当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2!

Item 处理

ItemReader 和 ItemWriter 接口 在各自的职责范围内都非常有用,但如果你希望在写出之前插入一些业务逻辑呢?对于读取和写入两端,一种可行方案都是使用组合模式:创建一个包含另一个 ItemWriterItemWriter,或者创建一个包含另一个 ItemReaderItemReader。下面的代码展示了一个示例:

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(Chunk<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

上面的类内部包含另一个 ItemWriter,并在执行业务逻辑之后将调用委托给它。同样的模式也很容易应用到 ItemReader 上,例如基于主 ItemReader 提供的输入获取更多参考数据。如果你需要自行控制 write 的调用,这种方式也非常有用。不过,如果你的目标只是对即将写出的条目进行“转换”,而不是自己完成写出操作,那么你其实不必亲自调用 write,只需要修改该条目即可。针对这种场景,Spring Batch 提供了 ItemProcessor 接口,如下所示:

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessor 的概念非常简单:给定一个对象,对其进行转换,并返回另一个对象。返回对象可以与输入对象是同一类型,也可以不是。关键点在于,你可以在处理过程中注入业务逻辑,而具体如何实现完全由开发者决定。ItemProcessor 可以直接装配到一个 Step 中。举例来说,假设某个 ItemReader 读取出的对象类型是 Foo,而在写出之前需要把它转换成 Bar。下面的示例展示了一个完成该转换的 ItemProcessor

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar> {
    public void write(Chunk<? extends Bar> bars) throws Exception {
        //write bars
    }
}

在上面的示例中,定义了 Foo 类、Bar 类,以及遵循 ItemProcessor 接口的 FooProcessor 类。这里的转换很简单,但实际上你可以在这里完成任意类型的转换。BarWriter 负责写出 Bar 对象,如果传入其他类型就会抛出异常;同理,FooProcessor 也只接受 Foo 类型,否则会抛出异常。随后,你就可以像下面这样把 FooProcessor 注入到一个 Step 中:

  • Java

  • XML

Java Configuration
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Bar>chunk(2).transactionManager(transactionManager)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(barWriter())
				.build();
}
XML Configuration
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

ItemProcessorItemReaderItemWriter 的一个重要区别是:对于一个 Step 来说,ItemProcessor 并不是必需组件。

链式组合 ItemProcessor

在很多场景下,单次转换已经足够有用。但如果你希望把多个 ItemProcessor 串联起来形成“处理链”呢?这同样可以通过前面提到的组合模式来实现。沿用上一个示例,如果现在要把 Foo 先转换为 Bar,再把 Bar 转换为 Foobar,最后再写出,就可以采用如下方式:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar, Foobar> {
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(Chunk<? extends Foobar> items) throws Exception {
        //write items
    }
}

FooProcessorBarProcessor 可以像下面这样串联起来,得到最终的 Foobar

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);

与前面的示例一样,你也可以把这个组合处理器配置到 Step 中:

  • Java

  • XML

Java Configuration
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Foobar>chunk(2).transactionManager(transactionManager)
				.reader(fooReader())
				.processor(compositeProcessor())
				.writer(foobarWriter())
				.build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
	List<ItemProcessor> delegates = new ArrayList<>(2);
	delegates.add(new FooProcessor());
	delegates.add(new BarProcessor());

	CompositeItemProcessor processor = new CompositeItemProcessor();

	processor.setDelegates(delegates);

	return processor;
}
XML Configuration
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.infrastructure.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>

过滤记录

ItemProcessor 的一个典型用途,是在记录传递给 ItemWriter 之前先将其中一部分过滤掉。过滤与跳过是两个不同的概念。跳过表示这条记录无效;而过滤表示这条记录本身并不一定有问题,只是不应该被写出。

例如,假设某个批处理作业读取的文件中包含三种不同类型的记录:插入记录、更新记录和删除记录。如果系统并不支持删除操作,那么我们就不希望把这些“可删除记录”发送给 ItemWriter。不过,由于这些记录本身并不是错误记录,因此更合适的做法是将它们过滤掉,而不是跳过。最终,ItemWriter 只会接收到可插入和可更新的记录。

要过滤一条记录,你只需要让 ItemProcessor 返回 null。框架检测到结果为 null 后,就不会把该条目加入传递给 ItemWriter 的记录列表。相对地,如果 ItemProcessor 抛出异常,则会触发一次跳过。

校验输入

ItemReader 与 ItemWriter 一章介绍了多种输入解析方式。主要实现通常都会在输入“不符合格式”时抛出异常。例如,FixedLengthTokenizer 会在某段数据缺失时抛出异常;同样地,如果在 RowMapperFieldSetMapper 中访问了不存在的索引,或字段格式与预期不一致,也会抛出异常。这类异常都会发生在 read 返回之前。但这些机制并不能解决“返回的对象是否在业务上有效”这一问题。比如某个字段表示年龄,那么它不能为负数。它也许在语法上完全正确,因为字段存在且值是数字,但这并不会触发异常。由于现有的校验框架已经很多,Spring Batch 并没有再重复造一个,而是提供了一个简单接口 Validator,可以由各种校验框架来实现,如下所示:

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

这个接口的约定是:如果对象无效,validate 方法就抛出异常;如果对象有效,则正常返回。Spring Batch 提供了一个 ValidatingItemProcessor,如下所示:

  • Java

  • XML

Java Configuration
@Bean
public ValidatingItemProcessor itemProcessor() {
	ValidatingItemProcessor processor = new ValidatingItemProcessor();

	processor.setValidator(validator());

	return processor;
}

@Bean
public SpringValidator validator() {
	SpringValidator validator = new SpringValidator();

	validator.setValidator(new TradeValidator());

	return validator;
}
XML Configuration
<bean class="org.springframework.batch.infrastructure.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator" class="org.springframework.batch.infrastructure.item.validator.SpringValidator">
	<property name="validator">
		<bean class="org.springframework.batch.samples.domain.trade.internal.validator.TradeValidator"/>
	</property>
</bean>

你也可以使用 BeanValidatingItemProcessor 来校验带有 Bean Validation API(JSR-303)注解的条目。例如,考虑下面这个 Person 类型:

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
     this.name = name;
    }

    public String getName() {
     return name;
    }

    public void setName(String name) {
     this.name = name;
    }

}

你可以在应用上下文中声明一个 BeanValidatingItemProcessor Bean,并把它注册为面向块的 Step 中的处理器,以完成条目校验:

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}

容错

当一个 chunk 发生回滚时,在读取阶段被缓存的条目可能会被重新处理。如果某个 Step 被配置为支持容错(通常通过 skip 或 retry),那么其中使用的任何 ItemProcessor 都应当以幂等方式实现。通常这意味着:ItemProcessor 不应修改输入对象本身,而只更新作为处理结果返回的那个对象实例。