当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2!

创建自定义 ItemReader 与 ItemWriter

到目前为止,本章已经介绍了 Spring Batch 中读取与写入的基础契约,以及一些常见实现方式。不过,这些实现大多比较通用,很多实际场景并不能完全由开箱即用的组件覆盖。本节通过一个简单示例说明如何创建自定义的 ItemReaderItemWriter,并正确实现它们的契约。示例中的 ItemReader 还实现了 ItemStream,以演示如何让 reader 或 writer 具备可重启能力。

自定义 ItemReader 示例

为了便于说明,这里创建一个简单的 ItemReader 实现,从给定的列表中读取数据。首先实现 ItemReader 最基础的契约,也就是 read 方法,如下所示:

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的类接收一个 item 列表,每次返回其中一个元素,并将其从列表中移除。当列表为空时,它返回 null,从而满足 ItemReader 最基本的要求。下面的测试代码说明了这一点:

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

ItemReader 支持重启

最后一个挑战是让 ItemReader 具备可重启能力。当前实现中,如果处理过程中断后再次启动,ItemReader 只能从头开始读取。在很多场景下这本身是成立的,但有时我们更希望批处理任务能够从上次中断的位置继续执行。关键区别通常在于 reader 是有状态还是无状态。无状态 reader 不需要考虑重启问题,而有状态 reader 则必须在重启时尽量恢复到上一次已知状态。因此,我们通常建议自定义 reader 能保持无状态就尽量保持无状态,这样就无需额外处理可重启性。

如果确实需要保存状态,就应该实现 ItemStream 接口:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

每次调用 ItemStreamupdate 方法时,都会将 ItemReader 当前读取位置以 current.index 这个键保存到提供的 ExecutionContext 中。当调用 ItemStreamopen 方法时,会检查 ExecutionContext 是否包含该键。如果存在,就把当前索引恢复到对应位置。这个示例虽然很简单,但已经满足了通用契约的要求:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数 ItemReader 的重启逻辑都会复杂得多。比如 JdbcCursorItemReader 会在游标中保存最后一条已处理记录的行 ID。

还需要注意,ExecutionContext 中使用的键名不能过于随意。原因在于,同一个 Step 中的所有 ItemStream 共享同一个 ExecutionContext。大多数情况下,只要在键名前加上类名,就足以保证唯一性。但在少数情况下,如果同一个 step 中使用了两个相同类型的 ItemStream,例如要同时输出两个文件,就需要使用更具区分度的名称。因此,Spring Batch 中很多 ItemReaderItemWriter 实现都提供了 setName() 属性,用于覆盖默认键名。

自定义 ItemWriter 示例

自定义 ItemWriter 的实现方式在很多方面与前面的 ItemReader 示例类似,但也有足够多的差异,因此值得单独举例说明。不过,为其增加可重启能力的方法本质上是一样的,所以这里不再重复展开。和前面的 ItemReader 示例一样,这里也使用一个 List 来尽量保持示例简单:

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

ItemWriter 支持重启

要让 ItemWriter 支持重启,可以沿用与 ItemReader 相同的思路,即增加并实现 ItemStream 接口,以便与执行上下文同步状态。在这个示例中,假设我们需要统计已处理 item 的数量,并在输出末尾追加一条页脚记录,那么就可以在 ItemWriter 中实现 ItemStream,这样在流重新打开时便能够从执行上下文中恢复该计数器。

在很多真实场景中,自定义 ItemWriter 还会委托给另一个本身支持重启的 writer,例如文件写出器;或者它写入的是事务性资源,因此由于自身无状态,也不需要支持重启。如果你的 writer 是有状态的,那么通常应当同时实现 ItemWriterItemStream。还要记住,writer 的调用方也必须感知到它实现了 ItemStream,因此你可能需要在配置中把它注册为 stream。