|
当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2! |
扩展与并行处理
很多批处理问题都可以通过单线程、单进程作业解决,因此在考虑更复杂的实现之前,先认真确认这种方式是否已经满足需求,始终是明智的做法。应先测量一个真实作业的性能,看看最简单的实现是否已经足够。即便使用普通硬件,读写一个几百兆的文件通常也能在一分钟以内完成。
当你准备开始实现带有并行处理能力的作业时,Spring Batch 提供了多种可选方案。本章会介绍这些方案,不过其中部分特性也会在其他章节中说明。从高层来看,并行处理主要有两种模式:
-
单进程、多线程
-
多进程
进一步还可以细分为以下几类:
-
多线程 Step(单进程)
-
并行 Steps(单进程)
-
Step 的本地 Chunking(单进程)
-
Step 的远程 Chunking(多进程)
-
Step 分区(单进程或多进程)
-
远程 Step(多进程)
下面先介绍单进程方案,再介绍多进程方案。
多线程 Step
开始并行处理最简单的方式,就是在 Step 配置中加入一个 TaskExecutor。
-
Java
-
XML
使用 Java 配置时,可以像下面这样为 step 添加一个 TaskExecutor:
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
例如,你可以像下面这样为 tasklet 增加一个属性:
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
在这个示例中,taskExecutor 引用了另一个实现了 TaskExecutor 接口的 bean 定义。TaskExecutor 是 Spring 的标准接口,关于可选实现的详细说明,可以参考 Spring 用户指南。最简单的多线程 TaskExecutor 是 SimpleAsyncTaskExecutor。
上述配置的结果是,Step 会使用 task executor 提供的多个线程并发处理 item。因此,ItemProcessor 会被多个线程同时调用,这意味着 ItemProcessor 必须是线程安全的。如果处理过程中使用了有状态组件,就必须确保它们在并发访问时得到了正确同步。
item 的读取和写入仍然由执行该 step 的主线程串行完成,因此 ItemReader 和 ItemWriter 不一定需要线程安全或显式同步。不过,step 的吞吐量可能会受到读写速度限制。如果遇到这种情况,可以考虑采用其他并发技术,例如本地 chunking 或本地分区。
还要注意,step 中使用的连接池类资源,例如 DataSource,也可能会限制并发度。应确保这些资源池的容量至少不小于该 step 期望的并发线程数。
并行 Steps
只要需要并行化的应用逻辑能够拆分成清晰的职责,并分配给不同的 step,就可以在单进程中实现并行。并行 Step 的执行配置简单,也比较容易使用。
-
Java
-
XML
使用 Java 配置时,让 (step1,step2) 与 step3 并行执行非常直接,如下所示:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
例如,让 (step1,step2) 与 step3 并行执行可以按下面方式配置:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
可配置的 task executor 用于指定由哪个 TaskExecutor 实现去执行各个 flow。默认值是 SyncTaskExecutor,但如果要让 steps 真正并行运行,就必须使用异步 TaskExecutor。还要注意,job 会确保 split 中的每个 flow 都执行完成后,才会汇总退出状态并继续后续流转。
更多细节可参见 拆分 Flows 一节。
本地 Chunking
本地 chunking 是 v6.0 引入的新特性,它允许你在同一个 JVM 内使用多个线程并行处理 item 的各个 chunk。当待处理 item 数量很大,并且希望充分利用多核处理器时,这项特性尤其有用。借助本地 chunking,你可以把一个面向 chunk 的 step 配置成使用多个线程并发处理不同的 chunk。
这一能力通过 ChunkTaskExecutorItemWriter 实现。它是一个 item writer,会借助 TaskExecutor 将 chunk 请求提交给本地 worker:
@Bean
public ChunkTaskExecutorItemWriter<Vet> itemWriter(ChunkProcessor<Vet> chunkProcessor) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setThreadNamePrefix("worker-thread-");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.afterPropertiesSet();
return new ChunkTaskExecutorItemWriter<>(chunkProcessor, taskExecutor);
}
ChunkTaskExecutorItemWriter 既需要一个 TaskExecutor 来并发处理 chunk,也需要一个 ChunkProcessor 来定义每个 chunk 的处理方式。task executor 中的每个线程都会获得属于自己的 chunk,并负责完成该 chunk 的处理,而 step 会负责最终结果的整体汇总。
下面是一个 chunk processor 的示例,它会把每个 chunk 中的 item 写入关系型数据库表:
@Bean
public ChunkProcessor<Vet> chunkProcessor(DataSource dataSource, TransactionTemplate transactionTemplate) {
String sql = "insert into vets (firstname, lastname) values (?, ?)";
JdbcBatchItemWriter<Vet> itemWriter = new JdbcBatchItemWriterBuilder<Vet>().dataSource(dataSource)
.sql(sql)
.itemPreparedStatementSetter((item, ps) -> {
ps.setString(1, item.firstname());
ps.setString(2, item.lastname());
})
.build();
return (chunk, contribution) -> transactionTemplate.executeWithoutResult(transactionStatus -> {
try {
itemWriter.write(chunk);
contribution.incrementWriteCount(chunk.size());
contribution.setExitStatus(ExitStatus.COMPLETED);
}
catch (Exception e) {
transactionStatus.setRollbackOnly();
contribution.incrementWriteSkipCount(chunk.size());
contribution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
});
}
|
需要注意的是,chunk 的事务管理以及容错能力(如 retry、skip、chunk 扫描等)既不由 |
这种扩展方式的完整示例可以参见 Local Chunking Sample。
远程 Chunking
在远程 chunking 中,Step 的处理会拆分到多个进程中执行,这些进程之间通过某种中间件通信。下图展示了这一模式:
manager 组件是单个进程,而 workers 则是多个远程进程。如果 manager 不是瓶颈,这种模式的效果最佳,因此通常要求处理逻辑比 item 读取更耗时,而这在实际场景中也很常见。
manager 是一个 Spring Batch Step 的实现,只不过其中的 ItemWriter 被替换成了通用版本,该 writer 知道如何把 item 的 chunk 作为消息发送到中间件。workers 则是对应中间件的标准监听器,例如在 JMS 场景下就是 MesssageListener 实现,它们通过 ChunkProcessor 接口,使用标准的 ItemWriter 或 ItemProcessor 加 ItemWriter 来处理这些 chunk。这种模式的一个优势在于 reader、processor 和 writer 组件都可以直接复用现成实现,与本地执行 step 时使用的是同一套组件。item 会被动态拆分,工作通过中间件共享,因此如果监听器都是积极消费型的,负载均衡就会自动完成。
所使用的中间件必须是持久化的,能够保证消息投递,并确保每条消息只有一个消费者。JMS 是最自然的选择,但在网格计算和共享内存产品领域也存在其他可选方案,例如 JavaSpaces。
更多细节可参见 Spring Batch 集成 - Remote Chunking 一节。
分区
Spring Batch 还提供了一套 SPI,用于对 Step 执行进行分区并实现远程执行。在这种模式下,远端参与者是一些 Step 实例,而这些实例原本同样可以被配置成本地处理。下图展示了这一模式:
在这张图中,左侧的 Job 以一系列 Step 实例的形式运行,其中一个 Step 被标记为 manager。图中的 workers 都是相同的 Step 实例,实际上它们甚至可以替代 manager 来执行,并为该 Job 得到相同结果。workers 通常是远程服务,但也可以是本地线程。manager 发送给 workers 的消息在这种模式下不要求持久化,也不要求具备投递保证。Spring Batch 在 JobRepository 中维护的元数据会确保每个 worker 在每次 Job 执行中只执行一次且仅执行一次。
Spring Batch 中这套 SPI 由一个特殊的 Step 实现(称为 PartitionStep)以及两个需要针对具体环境实现的策略接口组成。这两个策略接口分别是 PartitionHandler 和 StepExecutionSplitter,下方时序图展示了它们的职责:
这里右侧的 Step 就是“远程” worker,因此理论上可能有很多对象或进程同时扮演这一角色,而图中展示的是由 PartitionStep 来驱动整个执行过程。
-
Java
-
XML
下面的示例展示了在使用 Java 配置时如何配置 PartitionStep:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
与多线程 step 中的 throttleLimit 方法类似,gridSize 方法可以防止 task executor 被单个 step 的请求压满。
下面的示例展示了在使用 XML 配置时如何配置 PartitionStep:
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
与多线程 step 中的 throttle-limit 属性类似,grid-size 属性可以防止 task executor 被单个 step 的请求压满。
Spring Batch Samples 的单元测试套件中(见 partition*Job.xml 配置)提供了一个简单示例,你可以直接复制并扩展。
Spring Batch 会为这些分区创建名为 step1:partition0 等形式的 step execution。很多人为了保持一致性,更倾向于把 manager step 命名为 step1:manager。你也可以为 step 使用别名,例如通过指定 name 属性而不是 id 属性来实现。
PartitionHandler
PartitionHandler 是了解远程调用或网格环境底层通信机制的组件。它能够将 StepExecution 请求封装成某种环境相关的格式(例如 DTO),并发送给远端 Step 实例。它不需要关心输入数据如何拆分,也不需要负责多个 Step 执行结果的汇总。一般来说,它通常也不需要关心弹性恢复或故障转移,因为这些能力在很多情况下由底层环境自身提供。无论如何,Spring Batch 都会提供独立于底层环境的可重启能力。失败的 Job 总是可以重新启动,而重启时只会重新执行失败的那些 Steps。
PartitionHandler 接口可以针对多种底层环境提供专门实现,包括简单的 RMI 远程调用、EJB 远程调用、自定义 Web Service、JMS、Java Spaces、共享内存网格(如 Terracotta 或 Coherence)以及网格执行平台(如 GridGain)。Spring Batch 本身并不提供任何专有网格或远程调用平台的实现。
不过,Spring Batch 确实提供了一个很实用的 PartitionHandler 实现,它使用 Spring 的 TaskExecutor 策略,在本地通过独立线程执行各个 Step 实例。这个实现名为 TaskExecutorPartitionHandler。
-
Java
-
XML
你可以通过 Java 配置显式配置 TaskExecutorPartitionHandler,如下所示:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
对于前面展示的 XML 命名空间方式配置出来的 step,TaskExecutorPartitionHandler 就是默认实现。当然,你也可以像下面这样显式配置它:
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
gridSize 属性决定要创建多少个独立的 step execution,因此它可以与 TaskExecutor 中线程池的大小相匹配。你也可以把它设得比可用线程数更大,这样就会把工作切分成更小的块。
TaskExecutorPartitionHandler 很适合 I/O 密集型的 Step,例如复制大量文件,或者将文件系统内容同步到内容管理系统。它也可以用于远程执行,只需提供一个充当远程调用代理的 Step 实现即可,例如基于 Spring Remoting 的代理实现。
Partitioner
Partitioner 的职责更简单一些:它只负责为新的 step execution 生成作为输入参数的执行上下文,而不需要关心重启问题。它只有一个方法,定义如下:
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
这个方法的返回值会把每个 step execution 的唯一名称(即那个 String)与一个以 ExecutionContext 形式表示的输入参数关联起来。之后,这些名称会出现在 Batch 元数据中,作为分区后各个 StepExecution 的 step 名称。ExecutionContext 本质上只是一个键值对容器,因此它可以保存主键范围、行号或输入文件位置等信息。远程 Step 随后通常会通过 #{…} 占位符来绑定这些上下文输入,也就是 step 作用域中的延迟绑定,下一节会展示这一点。
这些 step execution 的名称(即 Partitioner 返回 Map 中的键)必须在同一个 Job 的各个 step execution 中保持唯一,但除此之外没有其他特殊要求。最简单的做法,也是最便于用户理解的做法,是采用“前缀+后缀”的命名约定,其中前缀是正在执行的 step 名称(它在 Job 中本身就是唯一的),后缀则只是一个计数器。框架中提供了遵循这一约定的 SimplePartitioner。
你还可以使用一个可选接口 PartitionNameProvider,将分区名称的提供与分区本身分离开来。如果某个 Partitioner 实现了这个接口,那么在重启时只会查询这些名称。如果分区计算开销较大,这会是一个很有价值的优化。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称保持一致。
将输入数据绑定到 Steps
如果由 PartitionHandler 执行的各个 step 具有相同配置,并且它们的输入参数能够在运行时从 ExecutionContext 绑定进来,那么整体执行会非常高效。借助 Spring Batch 的 StepScope 特性,这件事很容易完成(详见 延迟绑定 一节)。例如,如果 Partitioner 创建的 ExecutionContext 中包含一个名为 fileName 的属性键,并为每次 step 调用指向不同的文件或目录,那么 Partitioner 的输出可能类似下表:
Step Execution 名称(键) |
ExecutionContext(值) |
filecopy:partition0 |
fileName=/home/data/one |
filecopy:partition1 |
fileName=/home/data/two |
filecopy:partition2 |
fileName=/home/data/three |
随后就可以通过对执行上下文进行延迟绑定,把文件名绑定到 step 中。
-
Java
-
XML
下面的示例展示了如何在 Java 中定义延迟绑定:
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
下面的示例展示了如何在 XML 中定义延迟绑定:
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>
远程 Step 执行
从 v6.0 开始,Spring Batch 支持远程 step 执行,使你可以在远程机器或集群上执行批处理作业中的 step。在需要把工作负载分摊到多个节点上,以提升性能和可扩展性的大规模批处理场景中,这项能力尤其有用。远程 step 执行由 RemoteStep 类提供,它通过 Spring Integration 的消息通道打通本地作业执行环境与远程 step 执行器之间的通信。
RemoteStep 的配置方式与普通 step 类似,只需提供远程 step 名称,以及一个用于向远程 worker 发送 step 执行请求的消息模板:
@Bean
public Step step(MessagingTemplate messagingTemplate, JobRepository jobRepository) {
return new RemoteStep("step", "workerStep", jobRepository, messagingTemplate);
}
在 worker 端,你需要定义要执行的远程 step(本例中是 workerStep),并配置一条 Spring Integration 流,用于拦截 step 执行请求并调用 StepExecutionRequestHandler:
@Bean
public Step workerStep(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
return new StepBuilder("workerStep", jobRepository)
// define step logic
.build();
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory, JobRepository jobRepository,
StepLocator stepLocator) {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobRepository(jobRepository);
stepExecutionRequestHandler.setStepLocator(stepLocator);
return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.handle(stepExecutionRequestHandler, "handle")
.get();
}
@Bean
public StepLocator stepLocator(BeanFactory beanFactory) {
BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
beanFactoryStepLocator.setBeanFactory(beanFactory);
return beanFactoryStepLocator;
}
完整示例可参见 Remote Step Sample。