当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2!

扩展与并行处理

很多批处理问题都可以通过单线程、单进程作业解决,因此在考虑更复杂的实现之前,先认真确认这种方式是否已经满足需求,始终是明智的做法。应先测量一个真实作业的性能,看看最简单的实现是否已经足够。即便使用普通硬件,读写一个几百兆的文件通常也能在一分钟以内完成。

当你准备开始实现带有并行处理能力的作业时,Spring Batch 提供了多种可选方案。本章会介绍这些方案,不过其中部分特性也会在其他章节中说明。从高层来看,并行处理主要有两种模式:

  • 单进程、多线程

  • 多进程

进一步还可以细分为以下几类:

  • 多线程 Step(单进程)

  • 并行 Steps(单进程)

  • Step 的本地 Chunking(单进程)

  • Step 的远程 Chunking(多进程)

  • Step 分区(单进程或多进程)

  • 远程 Step(多进程)

下面先介绍单进程方案,再介绍多进程方案。

多线程 Step

开始并行处理最简单的方式,就是在 Step 配置中加入一个 TaskExecutor

  • Java

  • XML

使用 Java 配置时,可以像下面这样为 step 添加一个 TaskExecutor

Java 配置
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10).transactionManager(transactionManager)
				.reader(itemReader())
				.processor(itemProcessor())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}

例如,你可以像下面这样为 tasklet 增加一个属性:

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

在这个示例中,taskExecutor 引用了另一个实现了 TaskExecutor 接口的 bean 定义。TaskExecutor 是 Spring 的标准接口,关于可选实现的详细说明,可以参考 Spring 用户指南。最简单的多线程 TaskExecutorSimpleAsyncTaskExecutor

上述配置的结果是,Step 会使用 task executor 提供的多个线程并发处理 item。因此,ItemProcessor 会被多个线程同时调用,这意味着 ItemProcessor 必须是线程安全的。如果处理过程中使用了有状态组件,就必须确保它们在并发访问时得到了正确同步。

item 的读取和写入仍然由执行该 step 的主线程串行完成,因此 ItemReaderItemWriter 不一定需要线程安全或显式同步。不过,step 的吞吐量可能会受到读写速度限制。如果遇到这种情况,可以考虑采用其他并发技术,例如本地 chunking 或本地分区。

还要注意,step 中使用的连接池类资源,例如 DataSource,也可能会限制并发度。应确保这些资源池的容量至少不小于该 step 期望的并发线程数。

并行 Steps

只要需要并行化的应用逻辑能够拆分成清晰的职责,并分配给不同的 step,就可以在单进程中实现并行。并行 Step 的执行配置简单,也比较容易使用。

  • Java

  • XML

使用 Java 配置时,让 (step1,step2)step3 并行执行非常直接,如下所示:

Java 配置
@Bean
public Job job(JobRepository jobRepository) {
    return new JobBuilder("job", jobRepository)
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

例如,让 (step1,step2)step3 并行执行可以按下面方式配置:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

可配置的 task executor 用于指定由哪个 TaskExecutor 实现去执行各个 flow。默认值是 SyncTaskExecutor,但如果要让 steps 真正并行运行,就必须使用异步 TaskExecutor。还要注意,job 会确保 split 中的每个 flow 都执行完成后,才会汇总退出状态并继续后续流转。

更多细节可参见 拆分 Flows 一节。

本地 Chunking

本地 chunking 是 v6.0 引入的新特性,它允许你在同一个 JVM 内使用多个线程并行处理 item 的各个 chunk。当待处理 item 数量很大,并且希望充分利用多核处理器时,这项特性尤其有用。借助本地 chunking,你可以把一个面向 chunk 的 step 配置成使用多个线程并发处理不同的 chunk。

这一能力通过 ChunkTaskExecutorItemWriter 实现。它是一个 item writer,会借助 TaskExecutor 将 chunk 请求提交给本地 worker:

@Bean
public ChunkTaskExecutorItemWriter<Vet> itemWriter(ChunkProcessor<Vet> chunkProcessor) {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(4);
    taskExecutor.setThreadNamePrefix("worker-thread-");
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.afterPropertiesSet();
    return new ChunkTaskExecutorItemWriter<>(chunkProcessor, taskExecutor);
}

ChunkTaskExecutorItemWriter 既需要一个 TaskExecutor 来并发处理 chunk,也需要一个 ChunkProcessor 来定义每个 chunk 的处理方式。task executor 中的每个线程都会获得属于自己的 chunk,并负责完成该 chunk 的处理,而 step 会负责最终结果的整体汇总。

下面是一个 chunk processor 的示例,它会把每个 chunk 中的 item 写入关系型数据库表:

@Bean
public ChunkProcessor<Vet> chunkProcessor(DataSource dataSource, TransactionTemplate transactionTemplate) {
    String sql = "insert into vets (firstname, lastname) values (?, ?)";
    JdbcBatchItemWriter<Vet> itemWriter = new JdbcBatchItemWriterBuilder<Vet>().dataSource(dataSource)
        .sql(sql)
        .itemPreparedStatementSetter((item, ps) -> {
            ps.setString(1, item.firstname());
            ps.setString(2, item.lastname());
        })
        .build();

    return (chunk, contribution) -> transactionTemplate.executeWithoutResult(transactionStatus -> {
        try {
            itemWriter.write(chunk);
            contribution.incrementWriteCount(chunk.size());
            contribution.setExitStatus(ExitStatus.COMPLETED);
        }
        catch (Exception e) {
            transactionStatus.setRollbackOnly();
            contribution.incrementWriteSkipCount(chunk.size());
            contribution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
        }
    });
}

需要注意的是,chunk 的事务管理以及容错能力(如 retry、skip、chunk 扫描等)既不由 ChunkTaskExecutorItemWriter 负责,也不由驱动它的 step 负责,这些都属于委托 chunk processor 的职责范围。此外,task executor 的生命周期也不是由 ChunkTaskExecutorItemWriter 管理的。

这种扩展方式的完整示例可以参见 Local Chunking Sample

远程 Chunking

在远程 chunking 中,Step 的处理会拆分到多个进程中执行,这些进程之间通过某种中间件通信。下图展示了这一模式:

Remote Chunking
图 1. 远程 Chunking

manager 组件是单个进程,而 workers 则是多个远程进程。如果 manager 不是瓶颈,这种模式的效果最佳,因此通常要求处理逻辑比 item 读取更耗时,而这在实际场景中也很常见。

manager 是一个 Spring Batch Step 的实现,只不过其中的 ItemWriter 被替换成了通用版本,该 writer 知道如何把 item 的 chunk 作为消息发送到中间件。workers 则是对应中间件的标准监听器,例如在 JMS 场景下就是 MesssageListener 实现,它们通过 ChunkProcessor 接口,使用标准的 ItemWriterItemProcessorItemWriter 来处理这些 chunk。这种模式的一个优势在于 reader、processor 和 writer 组件都可以直接复用现成实现,与本地执行 step 时使用的是同一套组件。item 会被动态拆分,工作通过中间件共享,因此如果监听器都是积极消费型的,负载均衡就会自动完成。

所使用的中间件必须是持久化的,能够保证消息投递,并确保每条消息只有一个消费者。JMS 是最自然的选择,但在网格计算和共享内存产品领域也存在其他可选方案,例如 JavaSpaces。

更多细节可参见 Spring Batch 集成 - Remote Chunking 一节。

分区

Spring Batch 还提供了一套 SPI,用于对 Step 执行进行分区并实现远程执行。在这种模式下,远端参与者是一些 Step 实例,而这些实例原本同样可以被配置成本地处理。下图展示了这一模式:

Partitioning 概览
图 2. 分区

在这张图中,左侧的 Job 以一系列 Step 实例的形式运行,其中一个 Step 被标记为 manager。图中的 workers 都是相同的 Step 实例,实际上它们甚至可以替代 manager 来执行,并为该 Job 得到相同结果。workers 通常是远程服务,但也可以是本地线程。manager 发送给 workers 的消息在这种模式下不要求持久化,也不要求具备投递保证。Spring Batch 在 JobRepository 中维护的元数据会确保每个 worker 在每次 Job 执行中只执行一次且仅执行一次。

Spring Batch 中这套 SPI 由一个特殊的 Step 实现(称为 PartitionStep)以及两个需要针对具体环境实现的策略接口组成。这两个策略接口分别是 PartitionHandlerStepExecutionSplitter,下方时序图展示了它们的职责:

Partitioning SPI
图 3. 分区 SPI

这里右侧的 Step 就是“远程” worker,因此理论上可能有很多对象或进程同时扮演这一角色,而图中展示的是由 PartitionStep 来驱动整个执行过程。

  • Java

  • XML

下面的示例展示了在使用 Java 配置时如何配置 PartitionStep

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

与多线程 step 中的 throttleLimit 方法类似,gridSize 方法可以防止 task executor 被单个 step 的请求压满。

下面的示例展示了在使用 XML 配置时如何配置 PartitionStep

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

与多线程 step 中的 throttle-limit 属性类似,grid-size 属性可以防止 task executor 被单个 step 的请求压满。

Spring Batch Samples 的单元测试套件中(见 partition*Job.xml 配置)提供了一个简单示例,你可以直接复制并扩展。

Spring Batch 会为这些分区创建名为 step1:partition0 等形式的 step execution。很多人为了保持一致性,更倾向于把 manager step 命名为 step1:manager。你也可以为 step 使用别名,例如通过指定 name 属性而不是 id 属性来实现。

PartitionHandler

PartitionHandler 是了解远程调用或网格环境底层通信机制的组件。它能够将 StepExecution 请求封装成某种环境相关的格式(例如 DTO),并发送给远端 Step 实例。它不需要关心输入数据如何拆分,也不需要负责多个 Step 执行结果的汇总。一般来说,它通常也不需要关心弹性恢复或故障转移,因为这些能力在很多情况下由底层环境自身提供。无论如何,Spring Batch 都会提供独立于底层环境的可重启能力。失败的 Job 总是可以重新启动,而重启时只会重新执行失败的那些 Steps

PartitionHandler 接口可以针对多种底层环境提供专门实现,包括简单的 RMI 远程调用、EJB 远程调用、自定义 Web Service、JMS、Java Spaces、共享内存网格(如 Terracotta 或 Coherence)以及网格执行平台(如 GridGain)。Spring Batch 本身并不提供任何专有网格或远程调用平台的实现。

不过,Spring Batch 确实提供了一个很实用的 PartitionHandler 实现,它使用 Spring 的 TaskExecutor 策略,在本地通过独立线程执行各个 Step 实例。这个实现名为 TaskExecutorPartitionHandler

  • Java

  • XML

你可以通过 Java 配置显式配置 TaskExecutorPartitionHandler,如下所示:

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

对于前面展示的 XML 命名空间方式配置出来的 step,TaskExecutorPartitionHandler 就是默认实现。当然,你也可以像下面这样显式配置它:

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

gridSize 属性决定要创建多少个独立的 step execution,因此它可以与 TaskExecutor 中线程池的大小相匹配。你也可以把它设得比可用线程数更大,这样就会把工作切分成更小的块。

TaskExecutorPartitionHandler 很适合 I/O 密集型的 Step,例如复制大量文件,或者将文件系统内容同步到内容管理系统。它也可以用于远程执行,只需提供一个充当远程调用代理的 Step 实现即可,例如基于 Spring Remoting 的代理实现。

Partitioner

Partitioner 的职责更简单一些:它只负责为新的 step execution 生成作为输入参数的执行上下文,而不需要关心重启问题。它只有一个方法,定义如下:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

这个方法的返回值会把每个 step execution 的唯一名称(即那个 String)与一个以 ExecutionContext 形式表示的输入参数关联起来。之后,这些名称会出现在 Batch 元数据中,作为分区后各个 StepExecution 的 step 名称。ExecutionContext 本质上只是一个键值对容器,因此它可以保存主键范围、行号或输入文件位置等信息。远程 Step 随后通常会通过 #{…​} 占位符来绑定这些上下文输入,也就是 step 作用域中的延迟绑定,下一节会展示这一点。

这些 step execution 的名称(即 Partitioner 返回 Map 中的键)必须在同一个 Job 的各个 step execution 中保持唯一,但除此之外没有其他特殊要求。最简单的做法,也是最便于用户理解的做法,是采用“前缀+后缀”的命名约定,其中前缀是正在执行的 step 名称(它在 Job 中本身就是唯一的),后缀则只是一个计数器。框架中提供了遵循这一约定的 SimplePartitioner

你还可以使用一个可选接口 PartitionNameProvider,将分区名称的提供与分区本身分离开来。如果某个 Partitioner 实现了这个接口,那么在重启时只会查询这些名称。如果分区计算开销较大,这会是一个很有价值的优化。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称保持一致。

将输入数据绑定到 Steps

如果由 PartitionHandler 执行的各个 step 具有相同配置,并且它们的输入参数能够在运行时从 ExecutionContext 绑定进来,那么整体执行会非常高效。借助 Spring Batch 的 StepScope 特性,这件事很容易完成(详见 延迟绑定 一节)。例如,如果 Partitioner 创建的 ExecutionContext 中包含一个名为 fileName 的属性键,并为每次 step 调用指向不同的文件或目录,那么 Partitioner 的输出可能类似下表:

表 1. 面向目录处理时,Partitioner 提供的 step execution 名称与执行上下文示例

Step Execution 名称(键)

ExecutionContext(值)

filecopy:partition0

fileName=/home/data/one

filecopy:partition1

fileName=/home/data/two

filecopy:partition2

fileName=/home/data/three

随后就可以通过对执行上下文进行延迟绑定,把文件名绑定到 step 中。

  • Java

  • XML

下面的示例展示了如何在 Java 中定义延迟绑定:

Java 配置
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}

下面的示例展示了如何在 XML 中定义延迟绑定:

XML 配置
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>

远程 Step 执行

从 v6.0 开始,Spring Batch 支持远程 step 执行,使你可以在远程机器或集群上执行批处理作业中的 step。在需要把工作负载分摊到多个节点上,以提升性能和可扩展性的大规模批处理场景中,这项能力尤其有用。远程 step 执行由 RemoteStep 类提供,它通过 Spring Integration 的消息通道打通本地作业执行环境与远程 step 执行器之间的通信。

RemoteStep 的配置方式与普通 step 类似,只需提供远程 step 名称,以及一个用于向远程 worker 发送 step 执行请求的消息模板:

@Bean
public Step step(MessagingTemplate messagingTemplate, JobRepository jobRepository) {
    return new RemoteStep("step", "workerStep", jobRepository, messagingTemplate);
}

在 worker 端,你需要定义要执行的远程 step(本例中是 workerStep),并配置一条 Spring Integration 流,用于拦截 step 执行请求并调用 StepExecutionRequestHandler

@Bean
public Step workerStep(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
    return new StepBuilder("workerStep", jobRepository)
        // define step logic
        .build();
}

/*
 * Configure inbound flow (requests coming from the manager)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory, JobRepository jobRepository,
        StepLocator stepLocator) {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobRepository(jobRepository);
    stepExecutionRequestHandler.setStepLocator(stepLocator);
    return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
        .channel(requests())
        .handle(stepExecutionRequestHandler, "handle")
        .get();
}

@Bean
public StepLocator stepLocator(BeanFactory beanFactory) {
    BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
    beanFactoryStepLocator.setBeanFactory(beanFactory);
    return beanFactoryStepLocator;
}

完整示例可参见 Remote Step Sample