当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2!

使用信息性消息提供反馈

由于 Spring Batch 作业可能运行较长时间,因此提供进度信息通常十分关键。 例如,当批处理作业的部分或全部处理失败时,相关人员可能希望收到通知。 Spring Batch 支持通过以下方式收集这类信息:

  • 主动轮询

  • 事件驱动监听器

当以异步方式启动 Spring Batch 作业时(例如通过 Job Launching Gateway),会返回一个 JobExecution 实例。因此,你可以使用 JobExecution.getJobInstanceId(),再借助 JobExplorerJobRepository 中获取更新后的 JobExecution 实例, 持续轮询状态变化。不过,这种方式通常不是最优选择,更推荐使用事件驱动的方法。

因此,Spring Batch 提供了多种监听器,其中最常用的三类包括:

  • StepListener

  • ChunkListener

  • JobExecutionListener

在下图示例中,某个 Spring Batch 作业配置了 StepExecutionListener。这样一来,Spring Integration 就可以接收并处理 step 执行前后的事件。例如,你可以使用 Router 检查接收到的 StepExecution,再根据检查结果执行不同操作, 比如把消息路由到邮件出站通道适配器,从而在满足某些条件时发送邮件通知。

Handling Informational Messages
图 1. 处理信息性消息

下面这个分为两部分的示例展示了如何配置监听器,使其在发生 StepExecution 事件时向 Gateway 发送消息, 并通过 logging-channel-adapter 记录输出。

第一步,创建通知相关的集成 Bean。

  • Java

  • XML

下面的示例展示了如何在 Java 中创建通知集成 Bean:

Java Configuration
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
你需要在配置中添加 @IntegrationComponentScan 注解。

下面的示例展示了如何在 XML 中创建通知集成 Bean:

XML Configuration
<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.listener.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

第二步,修改作业配置并添加 step 级监听器。

  • Java

  • XML

下面的示例展示了如何在 Java 中添加 step 级监听器:

Java Configuration
public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("importPayments", jobRepository)
        .start(new StepBuilder("step1", jobRepository)
                .chunk(200, transactionManager)
                .listener(notificationExecutionsListener())
                // ...
                .build();
              )
        .build();
}

下面的示例展示了如何在 XML 中添加 step 级监听器:

XML Configuration
<job id="importPayments">
    <step id="step1">
        <tasklet ../>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>