|
当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2! |
拦截 Step 执行
和 Job 一样,在 Step 的执行过程中也会发生许多事件,
用户可能需要在这些时机执行某些逻辑。比如,向一个需要页脚的平面文件写数据时,
ItemWriter 就需要在 Step 完成时收到通知,以便写出页脚。
这可以通过多种 Step 作用域的监听器来实现。
你可以通过 listeners 元素,把任何实现了 StepListener 某个扩展接口的类
(但不能是 StepListener 本身,因为它是空接口)应用到一个 step 上。
listeners 元素可以出现在 step、tasklet 或 chunk 声明中。
建议把监听器声明在它实际生效的那个层级上;如果一个监听器同时具备多种能力
(例如同时实现 StepExecutionListener 和 ItemReadListener),
则应把它声明在适用范围最细的那个层级。
-
Java
-
XML
下面的示例展示了如何在 Java 中把一个监听器应用在 chunk 级别:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
下面的示例展示了如何在 XML 中把一个监听器应用在 chunk 级别:
<step id="step1">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"/>
<listeners>
<listener ref="chunkListener"/>
</listeners>
</tasklet>
</step>
如果某个 ItemReader、ItemWriter 或 ItemProcessor 本身实现了某个
StepListener 接口,那么在使用命名空间 <step> 元素或某个 *StepFactoryBean
工厂时,它会被自动注册到 Step 上。但这一自动注册只适用于直接注入到 Step 中的组件。
如果监听器嵌套在其他组件内部,就需要显式注册它
(如前面向 Step 注册 ItemStream一节所述)。
除了 StepListener 接口本身,Spring Batch 还提供了注解来解决同样的问题。
普通 Java 对象的方法上可以标注这些注解,框架会将它们转换成对应的 StepListener 类型。
在自定义 chunk 组件实现上使用这些注解也很常见,例如用于 ItemReader、
ItemWriter 或 Tasklet。XML 解析器会在处理 <listener/> 元素时分析这些注解,
构建器中的 listener 方法也会注册这些注解驱动的监听器,
因此你只需要通过 XML 命名空间或构建器把监听器注册到 step 上即可。
StepExecutionListener
StepExecutionListener 是最通用的 Step 执行监听器。
它允许你在 Step 启动之前以及结束之后收到通知,无论这个结束是正常完成还是失败,如下例所示:
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
afterStep 的返回类型是 ExitStatus,
这样监听器就有机会修改 Step 完成时返回的退出码。
与该接口对应的注解有:
-
@BeforeStep -
@AfterStep
ChunkListener
“chunk” 指的是在一个事务范围内处理的那一组 item。
在每个提交间隔到来时,提交事务也就意味着提交一个 chunk。
你可以使用 ChunkListener 在 chunk 开始处理之前,或在 chunk 成功完成、失败之后执行逻辑,
如下接口定义所示:
public interface ChunkListener<I, O> extends StepListener {
void beforeChunk(Chunk<I> chunk);
void afterChunk(Chunk<O> chunk);
void afterChunkError(Exception exception, Chunk<O> chunk);
}
beforeChunk 方法会在事务启动之后、读取完一个 chunk 的 item 之后、处理开始之前被调用。
相反,afterChunk 会在 chunk 写出之后、事务提交或回滚之前被调用。
并发 step 中不会调用 ChunkListener 监听器接口
|
与该接口对应的注解有:
-
@BeforeChunk -
@AfterChunk -
@AfterChunkError
ChunkListener 并不是为抛出受检异常而设计的。
错误必须在实现内部自行处理,否则 step 会终止。
ItemReadListener
前面讨论跳过逻辑时提到过,把被跳过的记录记入日志通常是有价值的,
以便后续再对它们进行处理。对于读取错误,可以通过 ItemReaderListener 来实现,
如下接口定义所示:
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
beforeRead 方法会在每次调用 ItemReader 的 read 之前执行。
afterRead 方法会在每次成功读取之后执行,并接收刚刚读取到的 item。
如果读取过程中发生错误,则会调用 onReadError 方法,并把捕获到的异常传入,
以便进行日志记录。
与该接口对应的注解有:
-
@BeforeRead -
@AfterRead -
@OnReadError
ItemProcessListener
与 ItemReadListener 类似,item 的处理过程也可以被“监听”,如下接口定义所示:
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
beforeProcess 方法会在调用 ItemProcessor 的 process 之前执行,
并接收即将被处理的 item。afterProcess 方法会在 item 成功处理之后调用。
如果处理过程中发生错误,则会调用 onProcessError 方法,
并把捕获到的异常以及尝试处理的 item 一并传入,以便记录日志。
与该接口对应的注解有:
-
@BeforeProcess -
@AfterProcess -
@OnProcessError
ItemWriteListener
你也可以通过 ItemWriteListener 来“监听” item 的写出过程,如下接口定义所示:
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
beforeWrite 方法会在调用 ItemWriter 的 write 之前执行,
并接收将要写出的 item 列表。afterWrite 方法会在 item 成功写出之后、
但在 chunk 处理对应事务提交之前执行。如果写出过程中发生错误,则会调用 onWriteError 方法。
框架会把捕获到的异常以及尝试写出的 item 一并传入,以便记录日志。
与该接口对应的注解有:
-
@BeforeWrite -
@AfterWrite -
@OnWriteError
SkipListener
ItemReadListener、ItemProcessListener 和 ItemWriteListener
都提供了错误通知机制,但它们都不会直接告诉你“某条记录实际已经被跳过了”。
例如,即使某个 item 后续重试成功,onWriteError 仍然会被调用。
因此,Spring Batch 单独提供了一个用于跟踪被跳过 item 的接口,如下定义所示:
public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);
}
onSkipInRead 会在某个 item 在读取阶段被跳过时调用。
需要注意的是,回滚可能导致同一个 item 被多次登记为已跳过。
onSkipInWrite 会在某个 item 在写出阶段被跳过时调用。
由于该 item 已经成功读取(并不是在读取阶段被跳过),因此方法参数中也会把这个 item 本身一并传入。
与该接口对应的注解有:
-
@OnSkipInRead -
@OnSkipInWrite -
@OnSkipInProcess