当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2!

常见批处理模式

有些批处理作业完全可以直接由 Spring Batch 提供的现成组件拼装而成。例如,通过配置不同的 ItemReaderItemWriter 实现,就可以覆盖相当广泛的场景。不过在大多数情况下,仍然需要编写一定的自定义代码。对应用开发者来说,最主要的 API 入口包括 TaskletItemReaderItemWriter 以及各种 listener 接口。多数简单批处理作业都可以直接使用 Spring Batch 提供的现成 ItemReader 作为输入,但在处理和写出阶段,往往仍会出现一些特定业务诉求,需要开发者自行实现 ItemWriterItemProcessor

本章给出了一些自定义业务逻辑中的常见模式示例。这些示例主要围绕 listener 接口展开。需要注意的是,在合适的情况下,ItemReaderItemWriter 本身也可以实现某个 listener 接口。

记录 Item 处理与失败信息

一个常见场景是:需要在 step 中对每个 item 的错误做特殊处理,例如记录到专用日志通道,或者向数据库插入一条错误记录。对于面向 chunk 的 Step(通过 step 工厂 bean 创建),可以很容易地借助 ItemReadListener 处理 read 阶段错误,并借助 ItemWriteListener 处理 write 阶段错误。下面的代码示例展示了一个同时记录读写失败的 listener:

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

实现好该 listener 之后,还必须把它注册到 step 中。

  • Java

  • XML

下面的示例展示了如何在 Java 配置中为 step 注册 listener:

Java 配置
@Bean
public Step simpleStep(JobRepository jobRepository) {
	return new StepBuilder("simpleStep", jobRepository)
				...
				.listener(new ItemFailureLoggerListener())
				.build();
}

下面的示例展示了如何在 XML 中为 step 注册 listener:

XML 配置
<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>
如果你的 listener 在 onError() 方法中执行了任何操作,那么这些操作必须位于一个即将回滚的事务之内。如果你需要在 onError() 方法中使用数据库这类事务性资源,应考虑为该方法增加声明式事务(详见 Spring Core Reference Guide),并将其传播属性设置为 REQUIRES_NEW

因业务原因手动停止 Job

Spring Batch 通过 JobOperator 接口提供了 stop() 方法,但它主要面向运维操作人员,而不是应用开发者。有时,从业务逻辑内部直接停止某次 job 执行会更方便,也更合理。

最简单的做法是抛出一个 RuntimeException,前提是这个异常既不会被无限重试,也不会被跳过。例如,可以像下面这样使用一个自定义异常类型:

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

另一种简单的方式是让 ItemReader 返回 null,从而停止 step 的执行,如下例所示:

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

前一个示例实际上依赖了这样一个事实:默认的 CompletionPolicy 策略实现会在待处理 item 为 null 时,认为当前批次已经完成。若有更复杂的需求,也可以自行实现更高级的完成策略,并通过 SimpleStepFactoryBean 注入到 Step 中。

  • Java

  • XML

下面的示例展示了如何在 Java 中向 step 注入完成策略:

Java 配置
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("simpleStep", jobRepository)
				.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
				.reader(reader())
				.writer(writer())
				.build();
}

下面的示例展示了如何在 XML 中向 step 注入完成策略:

XML 配置
<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

另一种做法是在 StepExecution 中设置一个标志位,框架中的 Step 实现在处理 item 的间隙会检查这个标志。要采用这种方式,就必须能够访问当前的 StepExecution,通常可通过实现一个 StepListener 并将其注册到 Step 上来实现。下面的示例展示了一个用于设置该标志的 listener:

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly();
       }
    }

}

当该标志被设置后,默认行为是让 step 抛出一个 JobInterruptedException。这一行为可以通过 StepInterruptionPolicy 进行控制。不过它本质上只有“抛异常”或“不抛异常”两种选择,因此这始终意味着 job 的一次非正常结束。

添加页脚记录

在写平面文件时,常常需要在所有处理完成之后,向文件末尾追加一条“页脚”记录。Spring Batch 提供的 FlatFileFooterCallback 接口可以用来实现这一需求。FlatFileFooterCallback(以及与之对应的 FlatFileHeaderCallback)都是 FlatFileItemWriter 的可选属性,可以直接配置到 item writer 上。

  • Java

  • XML

下面的示例展示了如何在 Java 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

Java 配置
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.headerCallback(headerCallback())
			.footerCallback(footerCallback())
			.build();
}

下面的示例展示了如何在 XML 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

XML 配置
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

页脚回调接口只有一个方法,当需要写出页脚时会调用它,其定义如下:

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

写出汇总页脚

与页脚记录相关的一个常见需求,是在输出过程中汇总信息,并在文件末尾追加这些内容。这样的页脚通常用于概括整个文件的结果,或者提供校验值。

例如,如果某个批处理作业要把 Trade 记录写入平面文件,并且要求在页脚中写出所有 Trade 的金额总和,那么就可以使用下面这个 ItemWriter 实现:

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(Chunk<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

这个 TradeItemWriter 内部维护了一个 totalAmount 值,每次写出 Trade item 时,都会把对应的 amount 累加进去。当最后一个 Trade 处理完成后,框架会调用 writeFooter,把 totalAmount 写入文件。需要注意的是,write 方法中使用了一个临时变量 chunkTotal,用于保存当前 chunk 中 Trade 金额的总和。这样做是为了确保:如果 write 方法中发生 skip,totalAmount 不会被错误更新。只有在 write 方法顺利结束、确认没有抛出异常之后,才会真正更新 totalAmount

为了让 writeFooter 方法能够被调用,TradeItemWriter(它实现了 FlatFileFooterCallback)必须作为 footerCallback 注入到 FlatFileItemWriter 中。

  • Java

  • XML

下面的示例展示了如何在 Java 中装配 TradeItemWriter

Java 配置
@Bean
public TradeItemWriter tradeItemWriter() {
	TradeItemWriter itemWriter = new TradeItemWriter();

	itemWriter.setDelegate(flatFileItemWriter(null));

	return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.footerCallback(tradeItemWriter())
			.build();
}

下面的示例展示了如何在 XML 中装配 TradeItemWriter

XML 配置
<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

到目前为止,这个 TradeItemWriter 只有在 Step 不支持重启时才能正确工作。原因在于它是有状态的,因为它会保存 totalAmount,但这个值并没有持久化到数据库中,所以一旦发生重启就无法恢复。要让这个类支持重启,就需要实现 ItemStream 接口,并补上 openupdate 方法,如下例所示:

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

update 方法会在 ExecutionContext 被持久化到数据库之前,将当前最新的 totalAmount 存进去。open 方法则会从 ExecutionContext 中读取已有的 totalAmount,并把它作为后续处理的起点,从而使 TradeItemWriter 能够在重启后从上次 Step 停止的位置继续执行。

驱动查询型 ItemReader

读写器章节中,我们已经讨论过使用分页方式进行数据库读取。很多数据库厂商,例如 DB2,采用非常保守的锁策略,如果被读取的表同时还要被在线应用其他部分使用,就可能引发问题。此外,在某些数据库产品上,对超大数据集开启游标本身也可能带来风险。因此,很多项目更倾向于采用“驱动查询(Driving Query)”方式读取数据。这种方式不是直接迭代完整对象,而是先迭代对象键值,如下图所示:

Driving Query Job
图 1. 驱动查询作业

如上图所示,该示例使用了与基于游标示例相同的 `FOO` 表。但不同的是,这里 SQL 语句并不查询整行数据,而只查询 ID。因此,read 返回的不是一个 FOO 对象,而是一个 Integer。随后再利用这个数字去查询详细信息,也就是完整的 Foo 对象,如下图所示:

Driving Query Example
图 2. 驱动查询示例

通常应使用 ItemProcessor 将驱动查询获得的键转换为完整的 Foo 对象。这里可以直接复用已有 DAO,根据键查询对应的完整对象。

多行记录

在平面文件中,通常每条记录只占一行,但也很常见的一种情况是:同一个文件中的一条记录可能跨越多行,而且不同的行格式还不一样。下面这段文件内容就是一个典型示例:

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

从以 `HEA` 开头的那一行,到以 `FOT` 开头的那一行之间的所有内容,都应被视为一条完整记录。要正确处理这种情况,需要注意以下几点:

  • ItemReader 不能再按“每次读取一条单行记录”的方式工作,而必须把多行记录中的所有行作为一个整体读出来,这样才能把完整记录交给 ItemWriter

  • 不同类型的行可能需要采用不同的分词方式。

由于一条记录会跨越多行,而且这些行的数量可能事先并不确定,所以 ItemReader 必须确保每次都能完整读取一整条记录。通常的做法是实现一个自定义 ItemReader,并将其作为 FlatFileItemReader 的包装器。

  • Java

  • XML

下面的示例展示了如何在 Java 中实现自定义 ItemReader

Java 配置
@Bean
public MultiLineTradeItemReader itemReader() {
	MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

	itemReader.setDelegate(flatFileItemReader());

	return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
			.name("flatFileItemReader")
			.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
			.lineTokenizer(orderFileTokenizer())
			.fieldSetMapper(orderFieldSetMapper())
			.build();
	return reader;
}

下面的示例展示了如何在 XML 中实现自定义 ItemReader

XML 配置
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

为了确保每一行都能被正确分词,尤其是在定长输入场景下,可以在委托的 FlatFileItemReader 上使用 PatternMatchingCompositeLineTokenizer。更多细节可参见读写器章节中的 FlatFileItemReader。随后,委托 reader 会借助 PassThroughFieldSetMapper,把每一行对应的 FieldSet 传回给外层包装的 ItemReader

  • Java

  • XML

下面的示例展示了如何在 Java 中确保每一行都被正确分词:

Java 内容
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
	PatternMatchingCompositeLineTokenizer tokenizer =
			new PatternMatchingCompositeLineTokenizer();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

	tokenizers.put("HEA*", headerRecordTokenizer());
	tokenizers.put("FOT*", footerRecordTokenizer());
	tokenizers.put("NCU*", customerLineTokenizer());
	tokenizers.put("BAD*", billingAddressLineTokenizer());

	tokenizer.setTokenizers(tokenizers);

	return tokenizer;
}

下面的示例展示了如何在 XML 中确保每一行都被正确分词:

XML 内容
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

这个包装器必须能够识别一条记录的结束位置,这样它才能持续调用其委托对象的 read(),直到读完整条记录。对于读取到的每一行,包装器都应逐步构建最终要返回的 item。一旦读到页脚,就可以把完整 item 返回给 ItemProcessorItemWriter,如下例所示:

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

执行系统命令

很多批处理作业都需要在执行过程中调用外部命令。虽然这种命令也可以由调度器单独触发,但那样就失去了统一维护本次运行元数据的优势。此外,多步骤 job 也会因此被迫拆分成多个独立 job。

由于这种需求非常常见,Spring Batch 提供了一个专门用于调用系统命令的 Tasklet 实现。

  • Java

  • XML

下面的示例展示了如何在 Java 中调用外部命令:

Java 配置
@Bean
public SystemCommandTasklet tasklet() {
	SystemCommandTasklet tasklet = new SystemCommandTasklet();

	tasklet.setCommand("echo hello");
	tasklet.setTimeout(5000);

	return tasklet;
}

下面的示例展示了如何在 XML 中调用外部命令:

XML 配置
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

未找到输入时如何处理 Step 完成状态

在很多批处理场景中,如果数据库或文件中没有找到可处理的数据,并不算异常情况。此时通常只会认为该 Step 没有找到工作要做,并以读取 0 个 item 的状态正常完成。Spring Batch 开箱即用提供的所有 ItemReader 实现,默认都遵循这一行为。不过这也可能带来困惑,例如明明输入存在却什么都没有输出,通常是因为文件名写错之类的问题。正因如此,应该通过检查元数据本身,来判断框架实际找到了多少可处理工作。但如果“没有输入”本身就被视为异常情况怎么办?这时,最佳方案就是通过程序检查元数据中已处理 item 数量是否为 0,并据此让步骤失败。由于这是一个常见需求,Spring Batch 直接提供了一个具备该功能的 listener,即 NoWorkFoundStepExecutionListener

public class NoWorkFoundStepExecutionListener implements StepExecutionListener {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

上面的 StepExecutionListener 会在 `afterStep` 阶段检查 StepExecutionreadCount 属性,以判断是否一个 item 都没有读取到。如果确实如此,它会返回退出码 FAILED,表示该 Step 应当失败;否则返回 null,即不影响 Step 的状态。

向后续 Steps 传递数据

在不同 step 之间传递信息通常是很有用的,这可以通过 ExecutionContext 来实现。不过这里有一个关键点:实际上存在两个 ExecutionContext,一个位于 Step 级别,另一个位于 Job 级别。Step 级别的 ExecutionContext 只在当前 step 生命周期内有效,而 Job 级别的 ExecutionContext 会贯穿整个 Job。另一方面,StepExecutionContext 会在每次 chunk 提交时更新,而 JobExecutionContext 则只会在每个 Step 结束时更新。

这一划分带来的直接结果是:在 Step 执行期间,所有需要保存的数据都应先放到 StepExecutionContext 中,这样才能确保在 step 运行过程中被正确持久化。如果你直接把数据放进 JobExecutionContext,那么在 Step 执行期间这些数据并不会被持久化;一旦 Step 失败,这些数据也就丢失了。

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

要让这些数据能够被后续 Steps 使用,就必须在当前 step 结束后,把它们“提升”到 Job 级别的 ExecutionContext 中。Spring Batch 为此提供了 ExecutionContextPromotionListener。这个 listener 需要配置那些必须被提升的 ExecutionContext 键。它还可以选择性配置一个退出码模式列表,只有命中这些退出码时才执行提升操作(默认是 COMPLETED)。和其他 listener 一样,它也必须注册到 Step 上。

  • Java

  • XML

下面的示例展示了如何在 Java 中把 step 级别数据提升到 JobExecutionContext

Java 配置
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
	return new JobBuilder("job1", jobRepository)
				.start(step1)
				.next(step2)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10).transactionManager(transactionManager)
				.reader(reader())
				.writer(savingWriter())
				.listener(promotionListener())
				.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
	ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

	listener.setKeys(new String[] {"someKey"});

	return listener;
}

下面的示例展示了如何在 XML 中把 step 级别数据提升到 JobExecutionContext

XML 配置
<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

最后,还需要从 JobExecutionContext 中把这些保存下来的值取出来,如下例所示:

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}