当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2!

拦截 Step 执行

Job 一样,在 Step 的执行过程中也会发生许多事件, 用户可能需要在这些时机执行某些逻辑。比如,向一个需要页脚的平面文件写数据时, ItemWriter 就需要在 Step 完成时收到通知,以便写出页脚。 这可以通过多种 Step 作用域的监听器来实现。

你可以通过 listeners 元素,把任何实现了 StepListener 某个扩展接口的类 (但不能是 StepListener 本身,因为它是空接口)应用到一个 step 上。 listeners 元素可以出现在 step、tasklet 或 chunk 声明中。 建议把监听器声明在它实际生效的那个层级上;如果一个监听器同时具备多种能力 (例如同时实现 StepExecutionListenerItemReadListener), 则应把它声明在适用范围最细的那个层级。

  • Java

  • XML

下面的示例展示了如何在 Java 中把一个监听器应用在 chunk 级别:

Java Configuration
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10).transactionManager(transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

下面的示例展示了如何在 XML 中把一个监听器应用在 chunk 级别:

XML Configuration
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

如果某个 ItemReaderItemWriterItemProcessor 本身实现了某个 StepListener 接口,那么在使用命名空间 <step> 元素或某个 *StepFactoryBean 工厂时,它会被自动注册到 Step 上。但这一自动注册只适用于直接注入到 Step 中的组件。 如果监听器嵌套在其他组件内部,就需要显式注册它 (如前面Step 注册 ItemStream一节所述)。

除了 StepListener 接口本身,Spring Batch 还提供了注解来解决同样的问题。 普通 Java 对象的方法上可以标注这些注解,框架会将它们转换成对应的 StepListener 类型。 在自定义 chunk 组件实现上使用这些注解也很常见,例如用于 ItemReaderItemWriterTasklet。XML 解析器会在处理 <listener/> 元素时分析这些注解, 构建器中的 listener 方法也会注册这些注解驱动的监听器, 因此你只需要通过 XML 命名空间或构建器把监听器注册到 step 上即可。

StepExecutionListener

StepExecutionListener 是最通用的 Step 执行监听器。 它允许你在 Step 启动之前以及结束之后收到通知,无论这个结束是正常完成还是失败,如下例所示:

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

afterStep 的返回类型是 ExitStatus, 这样监听器就有机会修改 Step 完成时返回的退出码。

与该接口对应的注解有:

  • @BeforeStep

  • @AfterStep

ChunkListener

“chunk” 指的是在一个事务范围内处理的那一组 item。 在每个提交间隔到来时,提交事务也就意味着提交一个 chunk。 你可以使用 ChunkListener 在 chunk 开始处理之前,或在 chunk 成功完成、失败之后执行逻辑, 如下接口定义所示:

public interface ChunkListener<I, O> extends StepListener {

    void beforeChunk(Chunk<I> chunk);
    void afterChunk(Chunk<O> chunk);
    void afterChunkError(Exception exception, Chunk<O> chunk);

}

beforeChunk 方法会在事务启动之后、读取完一个 chunk 的 item 之后、处理开始之前被调用。 相反,afterChunk 会在 chunk 写出之后、事务提交或回滚之前被调用。

并发 step 中不会调用 ChunkListener 监听器接口

与该接口对应的注解有:

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

ChunkListener 并不是为抛出受检异常而设计的。 错误必须在实现内部自行处理,否则 step 会终止。

ItemReadListener

前面讨论跳过逻辑时提到过,把被跳过的记录记入日志通常是有价值的, 以便后续再对它们进行处理。对于读取错误,可以通过 ItemReaderListener 来实现, 如下接口定义所示:

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead 方法会在每次调用 ItemReaderread 之前执行。 afterRead 方法会在每次成功读取之后执行,并接收刚刚读取到的 item。 如果读取过程中发生错误,则会调用 onReadError 方法,并把捕获到的异常传入, 以便进行日志记录。

与该接口对应的注解有:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 类似,item 的处理过程也可以被“监听”,如下接口定义所示:

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess 方法会在调用 ItemProcessorprocess 之前执行, 并接收即将被处理的 item。afterProcess 方法会在 item 成功处理之后调用。 如果处理过程中发生错误,则会调用 onProcessError 方法, 并把捕获到的异常以及尝试处理的 item 一并传入,以便记录日志。

与该接口对应的注解有:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

你也可以通过 ItemWriteListener 来“监听” item 的写出过程,如下接口定义所示:

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法会在调用 ItemWriterwrite 之前执行, 并接收将要写出的 item 列表。afterWrite 方法会在 item 成功写出之后、 但在 chunk 处理对应事务提交之前执行。如果写出过程中发生错误,则会调用 onWriteError 方法。 框架会把捕获到的异常以及尝试写出的 item 一并传入,以便记录日志。

与该接口对应的注解有:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了错误通知机制,但它们都不会直接告诉你“某条记录实际已经被跳过了”。 例如,即使某个 item 后续重试成功,onWriteError 仍然会被调用。 因此,Spring Batch 单独提供了一个用于跟踪被跳过 item 的接口,如下定义所示:

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInRead 会在某个 item 在读取阶段被跳过时调用。 需要注意的是,回滚可能导致同一个 item 被多次登记为已跳过。 onSkipInWrite 会在某个 item 在写出阶段被跳过时调用。 由于该 item 已经成功读取(并不是在读取阶段被跳过),因此方法参数中也会把这个 item 本身一并传入。

与该接口对应的注解有:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListener 与事务

SkipListener 最常见的用途之一,就是把被跳过的 item 记录下来, 以便后续由另一个批处理流程,甚至人工流程,来评估并修复导致跳过的问题。 由于原始事务在很多情况下可能会回滚,Spring Batch 提供了两个保证:

  • 合适的 skip 方法(取决于错误发生的阶段)对每个 item 只会调用一次。

  • SkipListener 总是在事务提交之前的最后时刻被调用。 这样做是为了确保监听器中访问到的任何事务性资源,不会因为 ItemWriter 内部发生失败而被回滚。