|
当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2! |
重复处理
RepeatTemplate
批处理本质上就是重复执行某些动作,这种重复有时是为了简单优化,有时则是作业流程本身的一部分。为了对重复逻辑进行抽象和统一,并提供一种类似迭代器框架的能力,Spring Batch 定义了 RepeatOperations 接口。其定义如下:
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
回调本身也是一个接口,其定义如下,用于插入需要重复执行的业务逻辑:
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
回调会被反复执行,直到实现方判断本次迭代应当结束。这些接口的返回值是一个枚举,可以是 RepeatStatus.CONTINUABLE 或 RepeatStatus.FINISHED。RepeatStatus 用来向重复操作的调用者说明是否还有剩余工作。通常来说,RepeatOperations 的实现会检查 RepeatStatus,并据此决定是否结束迭代。任何希望告知调用方“已经没有更多工作”的回调,都可以返回 RepeatStatus.FINISHED。
RepeatOperations 最简单、最通用的实现是 RepeatTemplate:
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
在前面的示例中,我们返回 RepeatStatus.CONTINUABLE,表示还有工作要继续执行。回调也可以返回 RepeatStatus.FINISHED,用来通知调用方已经没有剩余工作。有些迭代会因为回调内部业务本身的条件而结束;另一些从回调视角看则近似于无限循环,此时是否完成的判定会交由外部策略处理,前面的示例就是这种情况。
完成策略
在 RepeatTemplate 内部,iterate 方法中的循环何时结束由 CompletionPolicy 决定,而它同时也是 RepeatContext 的工厂。RepeatTemplate 负责使用当前策略创建 RepeatContext,并在迭代各阶段把它传给 RepeatCallback。每次回调执行完 doInIteration 后,RepeatTemplate 都会调用 CompletionPolicy 来更新其状态,这些状态会保存在 RepeatContext 中。随后它再询问该策略,判断此次迭代是否已经完成。
Spring Batch 提供了一些简单通用的 CompletionPolicy 实现。SimpleCompletionPolicy 允许最多执行固定次数,而 RepeatStatus.FINISHED 则可以在任意时刻强制提前结束。
对于更复杂的完成条件,用户可能需要自行实现完成策略。例如,如果批处理只能在某个时间窗口内运行,而在线系统一旦开始使用就必须停止批任务,那么就需要自定义策略来实现这种约束。
异常处理
如果在 RepeatCallback 内部抛出了异常,RepeatTemplate 会交由 ExceptionHandler 处理,由它决定是否重新抛出该异常。
下面是 ExceptionHandler 接口的定义:
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
一个常见场景是统计某一类异常出现的次数,并在达到阈值时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 以及更灵活一些的 RethrowOnThresholdExceptionHandler。SimpleLimitExceptionHandler 包含一个阈值属性,以及一个需要与当前异常比较的异常类型。所给类型的所有子类也会被计入统计。在达到阈值之前,这类异常会被忽略;达到阈值后则会重新抛出。其他类型的异常始终会被重新抛出。
SimpleLimitExceptionHandler 一个很重要的可选属性是布尔标志 useParent。它默认是 false,因此阈值只在当前 RepeatContext 中统计。如果设为 true,则在嵌套迭代中,阈值会在同级上下文之间共享,例如同一个 step 中的一组 chunk。
监听器
很多时候,我们希望在多次不同的迭代过程中接收额外回调,以处理横切关注点。为此,Spring Batch 提供了 RepeatListener 接口。RepeatTemplate 允许用户注册 RepeatListener 实现,在迭代过程中合适的时机会携带 RepeatContext 和 RepeatStatus 回调这些监听器。
RepeatListener 接口定义如下:
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
open 和 close 回调分别发生在整个迭代开始前和结束后。before、after 与 onError 则作用于每一次单独的 RepeatCallback 调用。
还要注意,当存在多个监听器时,它们会按照列表顺序执行。因此,open 和 before 会按注册顺序调用,而 after、onError 和 close 则按相反顺序调用。
并行处理
RepeatOperations 的实现并不局限于按顺序执行回调。某些实现支持并行执行回调,这一点很重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring 的 TaskExecutor 策略来执行 RepeatCallback。默认情况下使用的是 SynchronousTaskExecutor,这意味着整个迭代会在同一个线程中执行,与普通的 RepeatTemplate 一样。
声明式迭代
有时你会明确知道某段业务处理每次发生时都应该重复执行。一个经典例子是消息流水线优化。如果一批消息经常成批到达,那么与其为每条消息单独承担一次事务开销,不如把它们集中处理更高效。为此,Spring Batch 提供了一个 AOP 拦截器,可将方法调用包装进 RepeatOperations 对象中。RepeatOperationsInterceptor 会执行被拦截的方法,并按照所提供 RepeatTemplate 中的 CompletionPolicy 规则重复执行。
-
Java
-
XML
下面的示例使用 Java 配置,对名为 processMessage 的方法进行重复服务调用。关于如何配置 AOP 拦截器的更多细节,请参见 Spring 用户指南:
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
下面的示例展示了声明式迭代:它使用 Spring AOP 命名空间,对名为 processMessage 的方法进行重复服务调用。关于如何配置 AOP 拦截器的更多细节,请参见 Spring 用户指南:
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
前面的示例在拦截器内部使用了默认的 RepeatTemplate。如果你想调整策略、监听器或其他细节,可以向拦截器中注入一个自定义的 RepeatTemplate 实例。
如果被拦截的方法返回值是 void,拦截器会始终返回 RepeatStatus.CONTINUABLE,因此如果 CompletionPolicy 没有明确终点,就存在无限循环的风险。否则,在被拦截方法返回值变为 null 之前,拦截器都会返回 RepeatStatus.CONTINUABLE;一旦返回值为 null,它就会返回 RepeatStatus.FINISHED。因此,目标方法内部的业务逻辑可以通过返回 null,或者抛出一个会被所提供 RepeatTemplate 中 ExceptionHandler 重新抛出的异常,来表明已经没有更多工作需要处理。