|
当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2! |
常见批处理模式
有些批处理作业完全可以直接由 Spring Batch 提供的现成组件拼装而成。例如,通过配置不同的 ItemReader 和 ItemWriter 实现,就可以覆盖相当广泛的场景。不过在大多数情况下,仍然需要编写一定的自定义代码。对应用开发者来说,最主要的 API 入口包括 Tasklet、ItemReader、ItemWriter 以及各种 listener 接口。多数简单批处理作业都可以直接使用 Spring Batch 提供的现成 ItemReader 作为输入,但在处理和写出阶段,往往仍会出现一些特定业务诉求,需要开发者自行实现 ItemWriter 或 ItemProcessor。
本章给出了一些自定义业务逻辑中的常见模式示例。这些示例主要围绕 listener 接口展开。需要注意的是,在合适的情况下,ItemReader 或 ItemWriter 本身也可以实现某个 listener 接口。
记录 Item 处理与失败信息
一个常见场景是:需要在 step 中对每个 item 的错误做特殊处理,例如记录到专用日志通道,或者向数据库插入一条错误记录。对于面向 chunk 的 Step(通过 step 工厂 bean 创建),可以很容易地借助 ItemReadListener 处理 read 阶段错误,并借助 ItemWriteListener 处理 write 阶段错误。下面的代码示例展示了一个同时记录读写失败的 listener:
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
实现好该 listener 之后,还必须把它注册到 step 中。
-
Java
-
XML
下面的示例展示了如何在 Java 配置中为 step 注册 listener:
@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
下面的示例展示了如何在 XML 中为 step 注册 listener:
<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>
如果你的 listener 在 onError() 方法中执行了任何操作,那么这些操作必须位于一个即将回滚的事务之内。如果你需要在 onError() 方法中使用数据库这类事务性资源,应考虑为该方法增加声明式事务(详见 Spring Core Reference Guide),并将其传播属性设置为 REQUIRES_NEW。
|
因业务原因手动停止 Job
Spring Batch 通过 JobOperator 接口提供了 stop() 方法,但它主要面向运维操作人员,而不是应用开发者。有时,从业务逻辑内部直接停止某次 job 执行会更方便,也更合理。
最简单的做法是抛出一个 RuntimeException,前提是这个异常既不会被无限重试,也不会被跳过。例如,可以像下面这样使用一个自定义异常类型:
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
另一种简单的方式是让 ItemReader 返回 null,从而停止 step 的执行,如下例所示:
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}
}
前一个示例实际上依赖了这样一个事实:默认的 CompletionPolicy 策略实现会在待处理 item 为 null 时,认为当前批次已经完成。若有更复杂的需求,也可以自行实现更高级的完成策略,并通过 SimpleStepFactoryBean 注入到 Step 中。
-
Java
-
XML
下面的示例展示了如何在 Java 中向 step 注入完成策略:
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
下面的示例展示了如何在 XML 中向 step 注入完成策略:
<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>
<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>
另一种做法是在 StepExecution 中设置一个标志位,框架中的 Step 实现在处理 item 的间隙会检查这个标志。要采用这种方式,就必须能够访问当前的 StepExecution,通常可通过实现一个 StepListener 并将其注册到 Step 上来实现。下面的示例展示了一个用于设置该标志的 listener:
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}
}
当该标志被设置后,默认行为是让 step 抛出一个 JobInterruptedException。这一行为可以通过 StepInterruptionPolicy 进行控制。不过它本质上只有“抛异常”或“不抛异常”两种选择,因此这始终意味着 job 的一次非正常结束。
添加页脚记录
在写平面文件时,常常需要在所有处理完成之后,向文件末尾追加一条“页脚”记录。Spring Batch 提供的 FlatFileFooterCallback 接口可以用来实现这一需求。FlatFileFooterCallback(以及与之对应的 FlatFileHeaderCallback)都是 FlatFileItemWriter 的可选属性,可以直接配置到 item writer 上。
-
Java
-
XML
下面的示例展示了如何在 Java 中使用 FlatFileHeaderCallback 和 FlatFileFooterCallback:
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
下面的示例展示了如何在 XML 中使用 FlatFileHeaderCallback 和 FlatFileFooterCallback:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>
页脚回调接口只有一个方法,当需要写出页脚时会调用它,其定义如下:
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
写出汇总页脚
与页脚记录相关的一个常见需求,是在输出过程中汇总信息,并在文件末尾追加这些内容。这样的页脚通常用于概括整个文件的结果,或者提供校验值。
例如,如果某个批处理作业要把 Trade 记录写入平面文件,并且要求在页脚中写出所有 Trade 的金额总和,那么就可以使用下面这个 ItemWriter 实现:
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
这个 TradeItemWriter 内部维护了一个 totalAmount 值,每次写出 Trade item 时,都会把对应的 amount 累加进去。当最后一个 Trade 处理完成后,框架会调用 writeFooter,把 totalAmount 写入文件。需要注意的是,write 方法中使用了一个临时变量 chunkTotal,用于保存当前 chunk 中 Trade 金额的总和。这样做是为了确保:如果 write 方法中发生 skip,totalAmount 不会被错误更新。只有在 write 方法顺利结束、确认没有抛出异常之后,才会真正更新 totalAmount。
为了让 writeFooter 方法能够被调用,TradeItemWriter(它实现了 FlatFileFooterCallback)必须作为 footerCallback 注入到 FlatFileItemWriter 中。
-
Java
-
XML
下面的示例展示了如何在 Java 中装配 TradeItemWriter:
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
下面的示例展示了如何在 XML 中装配 TradeItemWriter:
<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>
<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>
到目前为止,这个 TradeItemWriter 只有在 Step 不支持重启时才能正确工作。原因在于它是有状态的,因为它会保存 totalAmount,但这个值并没有持久化到数据库中,所以一旦发生重启就无法恢复。要让这个类支持重启,就需要实现 ItemStream 接口,并补上 open 与 update 方法,如下例所示:
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
update 方法会在 ExecutionContext 被持久化到数据库之前,将当前最新的 totalAmount 存进去。open 方法则会从 ExecutionContext 中读取已有的 totalAmount,并把它作为后续处理的起点,从而使 TradeItemWriter 能够在重启后从上次 Step 停止的位置继续执行。
驱动查询型 ItemReader
在读写器章节中,我们已经讨论过使用分页方式进行数据库读取。很多数据库厂商,例如 DB2,采用非常保守的锁策略,如果被读取的表同时还要被在线应用其他部分使用,就可能引发问题。此外,在某些数据库产品上,对超大数据集开启游标本身也可能带来风险。因此,很多项目更倾向于采用“驱动查询(Driving Query)”方式读取数据。这种方式不是直接迭代完整对象,而是先迭代对象键值,如下图所示:
如上图所示,该示例使用了与基于游标示例相同的 `FOO` 表。但不同的是,这里 SQL 语句并不查询整行数据,而只查询 ID。因此,read 返回的不是一个 FOO 对象,而是一个 Integer。随后再利用这个数字去查询详细信息,也就是完整的 Foo 对象,如下图所示:
通常应使用 ItemProcessor 将驱动查询获得的键转换为完整的 Foo 对象。这里可以直接复用已有 DAO,根据键查询对应的完整对象。
多行记录
在平面文件中,通常每条记录只占一行,但也很常见的一种情况是:同一个文件中的一条记录可能跨越多行,而且不同的行格式还不一样。下面这段文件内容就是一个典型示例:
HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34
从以 `HEA` 开头的那一行,到以 `FOT` 开头的那一行之间的所有内容,都应被视为一条完整记录。要正确处理这种情况,需要注意以下几点:
-
ItemReader不能再按“每次读取一条单行记录”的方式工作,而必须把多行记录中的所有行作为一个整体读出来,这样才能把完整记录交给ItemWriter。 -
不同类型的行可能需要采用不同的分词方式。
由于一条记录会跨越多行,而且这些行的数量可能事先并不确定,所以 ItemReader 必须确保每次都能完整读取一整条记录。通常的做法是实现一个自定义 ItemReader,并将其作为 FlatFileItemReader 的包装器。
-
Java
-
XML
下面的示例展示了如何在 Java 中实现自定义 ItemReader:
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
下面的示例展示了如何在 XML 中实现自定义 ItemReader:
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>
为了确保每一行都能被正确分词,尤其是在定长输入场景下,可以在委托的 FlatFileItemReader 上使用 PatternMatchingCompositeLineTokenizer。更多细节可参见读写器章节中的 FlatFileItemReader。随后,委托 reader 会借助 PassThroughFieldSetMapper,把每一行对应的 FieldSet 传回给外层包装的 ItemReader。
-
Java
-
XML
下面的示例展示了如何在 Java 中确保每一行都被正确分词:
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
下面的示例展示了如何在 XML 中确保每一行都被正确分词:
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>
这个包装器必须能够识别一条记录的结束位置,这样它才能持续调用其委托对象的 read(),直到读完整条记录。对于读取到的每一行,包装器都应逐步构建最终要返回的 item。一旦读到页脚,就可以把完整 item 返回给 ItemProcessor 和 ItemWriter,如下例所示:
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
执行系统命令
很多批处理作业都需要在执行过程中调用外部命令。虽然这种命令也可以由调度器单独触发,但那样就失去了统一维护本次运行元数据的优势。此外,多步骤 job 也会因此被迫拆分成多个独立 job。
由于这种需求非常常见,Spring Batch 提供了一个专门用于调用系统命令的 Tasklet 实现。
-
Java
-
XML
下面的示例展示了如何在 Java 中调用外部命令:
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
下面的示例展示了如何在 XML 中调用外部命令:
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 5 second timeout for the command to complete -->
<property name="timeout" value="5000" />
</bean>
未找到输入时如何处理 Step 完成状态
在很多批处理场景中,如果数据库或文件中没有找到可处理的数据,并不算异常情况。此时通常只会认为该 Step 没有找到工作要做,并以读取 0 个 item 的状态正常完成。Spring Batch 开箱即用提供的所有 ItemReader 实现,默认都遵循这一行为。不过这也可能带来困惑,例如明明输入存在却什么都没有输出,通常是因为文件名写错之类的问题。正因如此,应该通过检查元数据本身,来判断框架实际找到了多少可处理工作。但如果“没有输入”本身就被视为异常情况怎么办?这时,最佳方案就是通过程序检查元数据中已处理 item 数量是否为 0,并据此让步骤失败。由于这是一个常见需求,Spring Batch 直接提供了一个具备该功能的 listener,即 NoWorkFoundStepExecutionListener:
public class NoWorkFoundStepExecutionListener implements StepExecutionListener {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
上面的 StepExecutionListener 会在 `afterStep` 阶段检查 StepExecution 的 readCount 属性,以判断是否一个 item 都没有读取到。如果确实如此,它会返回退出码 FAILED,表示该 Step 应当失败;否则返回 null,即不影响 Step 的状态。
向后续 Steps 传递数据
在不同 step 之间传递信息通常是很有用的,这可以通过 ExecutionContext 来实现。不过这里有一个关键点:实际上存在两个 ExecutionContext,一个位于 Step 级别,另一个位于 Job 级别。Step 级别的 ExecutionContext 只在当前 step 生命周期内有效,而 Job 级别的 ExecutionContext 会贯穿整个 Job。另一方面,Step 的 ExecutionContext 会在每次 chunk 提交时更新,而 Job 的 ExecutionContext 则只会在每个 Step 结束时更新。
这一划分带来的直接结果是:在 Step 执行期间,所有需要保存的数据都应先放到 Step 的 ExecutionContext 中,这样才能确保在 step 运行过程中被正确持久化。如果你直接把数据放进 Job 的 ExecutionContext,那么在 Step 执行期间这些数据并不会被持久化;一旦 Step 失败,这些数据也就丢失了。
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
要让这些数据能够被后续 Steps 使用,就必须在当前 step 结束后,把它们“提升”到 Job 级别的 ExecutionContext 中。Spring Batch 为此提供了 ExecutionContextPromotionListener。这个 listener 需要配置那些必须被提升的 ExecutionContext 键。它还可以选择性配置一个退出码模式列表,只有命中这些退出码时才执行提升操作(默认是 COMPLETED)。和其他 listener 一样,它也必须注册到 Step 上。
-
Java
-
XML
下面的示例展示了如何在 Java 中把 step 级别数据提升到 Job 的 ExecutionContext:
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job1", jobRepository)
.start(step1)
.next(step2)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
return listener;
}
下面的示例展示了如何在 XML 中把 step 级别数据提升到 Job 的 ExecutionContext:
<job id="job1">
<step id="step1">
<tasklet>
<chunk reader="reader" writer="savingWriter" commit-interval="10"/>
</tasklet>
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="step2">
...
</step>
</job>
<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
<beans:property name="keys">
<list>
<value>someKey</value>
</list>
</beans:property>
</beans:bean>
最后,还需要从 Job 的 ExecutionContext 中把这些保存下来的值取出来,如下例所示:
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}