当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2!

重复处理

RepeatTemplate

批处理本质上就是重复执行某些动作,这种重复有时是为了简单优化,有时则是作业流程本身的一部分。为了对重复逻辑进行抽象和统一,并提供一种类似迭代器框架的能力,Spring Batch 定义了 RepeatOperations 接口。其定义如下:

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回调本身也是一个接口,其定义如下,用于插入需要重复执行的业务逻辑:

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

回调会被反复执行,直到实现方判断本次迭代应当结束。这些接口的返回值是一个枚举,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 用来向重复操作的调用者说明是否还有剩余工作。通常来说,RepeatOperations 的实现会检查 RepeatStatus,并据此决定是否结束迭代。任何希望告知调用方“已经没有更多工作”的回调,都可以返回 RepeatStatus.FINISHED

RepeatOperations 最简单、最通用的实现是 RepeatTemplate

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在前面的示例中,我们返回 RepeatStatus.CONTINUABLE,表示还有工作要继续执行。回调也可以返回 RepeatStatus.FINISHED,用来通知调用方已经没有剩余工作。有些迭代会因为回调内部业务本身的条件而结束;另一些从回调视角看则近似于无限循环,此时是否完成的判定会交由外部策略处理,前面的示例就是这种情况。

RepeatContext

RepeatCallback 的方法参数是一个 RepeatContext。很多回调不会用到它,但如果有需要,可以把它当作一个属性容器,在整个迭代期间保存临时数据。等到 iterate 方法返回后,这个上下文也就不再存在了。

如果当前存在嵌套迭代,那么一个 RepeatContext 会持有其父上下文。父上下文有时很有用,因为它可以保存需要在多次 iterate 调用之间共享的数据。比如,你可能想统计某个事件在迭代中出现的次数,并在后续调用中继续保留该统计结果。

RepeatStatus

RepeatStatus 是 Spring Batch 用来表示处理是否完成的枚举类型。它只有两个可能取值:

表 1. RepeatStatus 属性

说明

CONTINUABLE

还有工作需要继续执行。

FINISHED

不应再继续重复执行。

你可以通过 RepeatStatusand() 方法对多个 RepeatStatus 值做逻辑与运算。它实际作用于是否可继续执行这个标志。换句话说,只要任意一方是 FINISHED,结果就是 FINISHED

完成策略

RepeatTemplate 内部,iterate 方法中的循环何时结束由 CompletionPolicy 决定,而它同时也是 RepeatContext 的工厂。RepeatTemplate 负责使用当前策略创建 RepeatContext,并在迭代各阶段把它传给 RepeatCallback。每次回调执行完 doInIteration 后,RepeatTemplate 都会调用 CompletionPolicy 来更新其状态,这些状态会保存在 RepeatContext 中。随后它再询问该策略,判断此次迭代是否已经完成。

Spring Batch 提供了一些简单通用的 CompletionPolicy 实现。SimpleCompletionPolicy 允许最多执行固定次数,而 RepeatStatus.FINISHED 则可以在任意时刻强制提前结束。

对于更复杂的完成条件,用户可能需要自行实现完成策略。例如,如果批处理只能在某个时间窗口内运行,而在线系统一旦开始使用就必须停止批任务,那么就需要自定义策略来实现这种约束。

异常处理

如果在 RepeatCallback 内部抛出了异常,RepeatTemplate 会交由 ExceptionHandler 处理,由它决定是否重新抛出该异常。

下面是 ExceptionHandler 接口的定义:

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

一个常见场景是统计某一类异常出现的次数,并在达到阈值时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 以及更灵活一些的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 包含一个阈值属性,以及一个需要与当前异常比较的异常类型。所给类型的所有子类也会被计入统计。在达到阈值之前,这类异常会被忽略;达到阈值后则会重新抛出。其他类型的异常始终会被重新抛出。

SimpleLimitExceptionHandler 一个很重要的可选属性是布尔标志 useParent。它默认是 false,因此阈值只在当前 RepeatContext 中统计。如果设为 true,则在嵌套迭代中,阈值会在同级上下文之间共享,例如同一个 step 中的一组 chunk。

监听器

很多时候,我们希望在多次不同的迭代过程中接收额外回调,以处理横切关注点。为此,Spring Batch 提供了 RepeatListener 接口。RepeatTemplate 允许用户注册 RepeatListener 实现,在迭代过程中合适的时机会携带 RepeatContextRepeatStatus 回调这些监听器。

RepeatListener 接口定义如下:

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose 回调分别发生在整个迭代开始前和结束后。beforeafteronError 则作用于每一次单独的 RepeatCallback 调用。

还要注意,当存在多个监听器时,它们会按照列表顺序执行。因此,openbefore 会按注册顺序调用,而 afteronErrorclose 则按相反顺序调用。

并行处理

RepeatOperations 的实现并不局限于按顺序执行回调。某些实现支持并行执行回调,这一点很重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring 的 TaskExecutor 策略来执行 RepeatCallback。默认情况下使用的是 SynchronousTaskExecutor,这意味着整个迭代会在同一个线程中执行,与普通的 RepeatTemplate 一样。

声明式迭代

有时你会明确知道某段业务处理每次发生时都应该重复执行。一个经典例子是消息流水线优化。如果一批消息经常成批到达,那么与其为每条消息单独承担一次事务开销,不如把它们集中处理更高效。为此,Spring Batch 提供了一个 AOP 拦截器,可将方法调用包装进 RepeatOperations 对象中。RepeatOperationsInterceptor 会执行被拦截的方法,并按照所提供 RepeatTemplate 中的 CompletionPolicy 规则重复执行。

  • Java

  • XML

下面的示例使用 Java 配置,对名为 processMessage 的方法进行重复服务调用。关于如何配置 AOP 拦截器的更多细节,请参见 Spring 用户指南

@Bean
public MyService myService() {
	ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
	factory.setInterfaces(MyService.class);
	factory.setTarget(new MyService());

	MyService service = (MyService) factory.getProxy();
	JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
	pointcut.setPatterns(".*processMessage.*");

	RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

	((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

	return service;
}

下面的示例展示了声明式迭代:它使用 Spring AOP 命名空间,对名为 processMessage 的方法进行重复服务调用。关于如何配置 AOP 拦截器的更多细节,请参见 Spring 用户指南

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

前面的示例在拦截器内部使用了默认的 RepeatTemplate。如果你想调整策略、监听器或其他细节,可以向拦截器中注入一个自定义的 RepeatTemplate 实例。

如果被拦截的方法返回值是 void,拦截器会始终返回 RepeatStatus.CONTINUABLE,因此如果 CompletionPolicy 没有明确终点,就存在无限循环的风险。否则,在被拦截方法返回值变为 null 之前,拦截器都会返回 RepeatStatus.CONTINUABLE;一旦返回值为 null,它就会返回 RepeatStatus.FINISHED。因此,目标方法内部的业务逻辑可以通过返回 null,或者抛出一个会被所提供 RepeatTemplateExceptionHandler 重新抛出的异常,来表明已经没有更多工作需要处理。