当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2!

外部化批处理执行

到目前为止讨论的集成方式,大多对应的是由 Spring Integration 像外壳一样包裹 Spring Batch 的使用场景。不过,Spring Batch 也可以在内部使用 Spring Integration。借助这种方式,Spring Batch 用户可以把条目甚至整个 chunk 的处理委派给外部进程,从而卸载复杂处理逻辑。Spring Batch Integration 为以下场景提供了专门支持:

  • 远程 Chunking

  • 远程分区

远程 Chunking

下图展示了将 Spring Batch 与 Spring Integration 结合使用时, 远程 chunking 的一种工作方式:

Remote Chunking
图 1. 远程 Chunking

进一步来说,你还可以通过 ChunkMessageChannelItemWriter (由 Spring Batch Integration 提供)将 chunk 处理外部化。它负责把条目发送出去, 并收集处理结果。发送之后,Spring Batch 会继续读取和分组条目,而不会等待结果返回。 相反,由 ChunkMessageChannelItemWriter 负责汇总结果, 并将其重新整合回 Spring Batch 流程。

借助 Spring Integration,你可以完全控制处理流程的并发性, 例如使用 QueueChannel 替代 DirectChannel。 此外,依托 Spring Integration 丰富的通道适配器集合 (例如 JMS 和 AMQP),你还可以把批处理作业的 chunk 分发到外部系统中处理。

  • Java

  • XML

一个包含远程 chunking step 的作业,其 Java 配置可能类似如下:

Java Configuration
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
     return new JobBuilder("personJob", jobRepository)
             .start(new StepBuilder("step1", jobRepository)
                     .<Person, Person>chunk(200, transactionManager)
                     .reader(itemReader())
                     .writer(itemWriter())
                     .build())
             .build();
 }

一个包含远程 chunking step 的作业,其 XML 配置可能类似如下:

XML Configuration
<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

ItemReader 引用指向你希望在 manager 端用于读取数据的 Bean。 ItemWriter 引用则指向前面提到的一个特殊 ItemWriter, 即 ChunkMessageChannelItemWriter。处理器(如果有)不会配置在 manager 端, 而是配置在 worker 端。实现你的场景时,还应检查其他组件属性,例如节流限制等。

  • Java

  • XML

下面的 Java 配置给出了一个基本的 manager 端设置:

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}

下面的 XML 配置给出了一个基本的 manager 端设置:

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

前述配置会创建多个 Bean。我们使用 ActiveMQ 以及 Spring Integration 提供的 JMS 入站和出站适配器来配置消息中间件。如示例所示,被作业 step 引用的 itemWriter Bean 使用 ChunkMessageChannelItemWriter 通过已配置的中间件写出 chunk。

接下来可以继续看 worker 端配置,如下例所示:

  • Java

  • XML

下面的示例展示了 Java 版 worker 配置:

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the manager)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the manager)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

下面的示例展示了 XML 版 worker 配置:

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="incomingRequests"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkRequestHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

这些配置项中的大多数在 manager 端配置里已经出现过。worker 端不需要访问 Spring Batch 的 JobRepository,也不需要访问实际的作业配置文件。 这里最关键的 Bean 是 chunkProcessorChunkHandlerChunkProcessorChunkRequestHandlerchunkProcessor 属性 接收一个配置好的 SimpleChunkProcessor,你需要在这里提供 ItemWriter 的引用,以及可选的 ItemProcessor 引用, 它们会在 worker 接收到 manager 发来的 chunk 时执行。

更多信息请参见“可伸缩性”章节中关于 远程 Chunking 的部分。

从 4.1 版本开始,Spring Batch Integration 引入了 @EnableBatchIntegration 注解,可用于简化远程 chunking 配置。 该注解会提供两个可在应用上下文中自动注入的 Bean:

  • RemoteChunkingManagerStepBuilderFactory:用于配置 manager step

  • RemoteChunkingWorkerBuilder:用于配置远程 worker 集成流程

这些 API 会负责配置多个组件,如下图所示:

Remote Chunking Configuration
图 2. 远程 Chunking 配置

在 manager 端,RemoteChunkingManagerStepBuilderFactory 允许你通过声明以下内容来配置 manager step:

  • 用于读取条目并将其发送给 worker 的 item reader

  • 用于向 worker 发送请求的输出通道(“Outgoing requests”)

  • 用于接收 worker 回复的输入通道(“Incoming replies”)

你不必显式配置 ChunkMessageChannelItemWriterMessagingTemplate。当然,如果有充分理由,仍然可以手动配置它们。

在 worker 端,RemoteChunkingWorkerBuilder 允许你将 worker 配置为:

  • 在输入通道(“Incoming requests”)上监听 manager 发送的请求

  • 针对每个请求调用 ChunkProcessorChunkRequestHandlerhandleChunk 方法,并使用配置好的 ItemProcessorItemWriter

  • 通过输出通道(“Outgoing replies”)向 manager 发送回复

你也不必显式配置 SimpleChunkProcessorChunkProcessorChunkRequestHandler。如果确有需要,依然可以手动配置。

下面的示例展示了如何使用这些 API:

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }

        // Middleware beans setup omitted

    }

}

远程 chunking 作业的完整示例可在 这里 查看。

远程分区

下图展示了一个典型的远程分区场景:

Remote Partitioning
图 3. 远程分区

相对而言,当瓶颈不在条目处理本身,而在相关 I/O 时,远程分区会更有价值。 通过远程分区,你可以把工作分发给执行完整 Spring Batch step 的 worker。 因此,每个 worker 都拥有自己的 ItemReaderItemProcessorItemWriter。为此,Spring Batch Integration 提供了 MessageChannelPartitionHandler

这个 PartitionHandler 接口实现通过 MessageChannel 向远程 worker 发送指令并接收响应,从而很好地抽象了与远程 worker 通信时所使用的传输层, 例如 JMS 和 AMQP。

“可伸缩性”章节中关于 远程分区 的部分,对配置远程分区所需的概念与组件做了概览,并给出了使用默认 TaskExecutorPartitionHandler 在本地不同线程中执行分区的示例。 若要把远程分区扩展到多个 JVM,则还需要两个额外组件:

  • 一个远程通信基础设施或网格环境

  • 一个支持目标远程通信基础设施或网格环境的 PartitionHandler 实现

与远程 chunking 类似,你也可以把 JMS 作为“远程通信基础设施”。 此时,就像前文所述那样,使用 MessageChannelPartitionHandler 作为 PartitionHandler 的实现即可。

  • Java

  • XML

下面的示例假定分区作业已经存在,重点展示 Java 中 MessageChannelPartitionHandler 与 JMS 的配置:

Java Configuration
/*
 * Configuration of the manager side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlow.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other propeties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlow.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}

下面的示例假定分区作业已经存在,重点展示 XML 中 MessageChannelPartitionHandler 与 JMS 的配置:

XML Configuration
<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

你还必须确保分区配置中的 handler 属性映射到 partitionHandler Bean。

  • Java

  • XML

下面的示例展示了如何在 Java 中把分区的 handler 属性映射到 partitionHandler

Java Configuration
	public Job personJob(JobRepository jobRepository) {
		return new JobBuilder("personJob", jobRepository)
				.start(new StepBuilder("step1.manager", jobRepository)
						.partitioner("step1.worker", partitioner())
						.partitionHandler(partitionHandler())
						.build())
				.build();
	}

下面的示例展示了如何在 XML 中把分区的 handler 属性映射到 partitionHandler

XML Configuration
<job id="personJob">
  <step id="step1.manager">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>

远程分区作业的完整示例可在 这里 查看。

你可以使用 @EnableBatchIntegration 注解来简化远程分区配置。 该注解会提供两个对远程分区有用的 Bean:

  • RemotePartitioningManagerStepBuilderFactory:用于配置 manager step

  • RemotePartitioningWorkerStepBuilderFactory:用于配置 worker step

这些 API 会负责配置多个组件,如下图所示:

Remote Partitioning Configuration (with job repository polling)
图 4. 远程分区配置(使用作业仓库轮询)
Remote Partitioning Configuration (with replies aggregation)
图 5. 远程分区配置(使用回复聚合)

在 manager 端,RemotePartitioningManagerStepBuilderFactory 允许你通过声明以下内容来配置 manager step:

  • 用于划分数据的 Partitioner

  • 用于向 worker 发送请求的输出通道(“Outgoing requests”)

  • 用于接收 worker 回复的输入通道(“Incoming replies”),适用于配置回复聚合的场景

  • 轮询间隔与超时参数,适用于配置作业仓库轮询的场景

你不必显式配置 MessageChannelPartitionHandlerMessagingTemplate。当然,如果有充分理由,仍然可以手动配置它们。

在 worker 端,RemotePartitioningWorkerStepBuilderFactory 允许你将 worker 配置为:

  • 在输入通道(“Incoming requests”)上监听 manager 发送的请求

  • 针对每个请求调用 StepExecutionRequestHandlerhandle 方法

  • 通过输出通道(“Outgoing replies”)向 manager 发送回复

你不必显式配置 StepExecutionRequestHandler。 如果确有需要,仍然可以手动配置它。

下面的示例展示了如何使用这些 API:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public Step managerStep() {
                 return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}