当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2!

批处理领域语言

对于任何有经验的批处理架构师来说,Spring Batch 所采用的批处理整体概念都应当是熟悉且容易理解的。这里有“Job”、有“Step”,也有由开发者提供的处理单元,例如 ItemReaderItemWriter。不过,借助 Spring 的模式、操作、模板、回调与编程习惯,这套模型还能进一步带来如下收益:

  • 显著提升对清晰关注点分离原则的遵循程度。

  • 清晰划分的架构层次,以及以接口形式提供的服务。

  • 简单的默认实现,使系统能够快速接入并开箱即用。

  • 显著增强的可扩展性。

下图是一个已经使用了数十年的批处理参考架构的简化版本。它概括展示了构成批处理领域语言的核心组件。这套架构框架是一个经过长期验证的蓝图,已在多代平台上得到实践证明,包括大型机上的 COBOL、Unix 上的 C,以及如今无处不在的 Java。JCL 和 COBOL 开发者通常会像 C、C#、Java 开发者一样,对这些概念感到熟悉。Spring Batch 为这些层次、组件以及技术服务提供了具体实现,这些内容广泛存在于健壮、可维护的系统中,可用于支撑从简单到复杂的批处理应用构建,并通过其基础设施与扩展能力覆盖极其复杂的处理需求。

Figure 2.1: Batch Stereotypes
图 1. 批处理构件

上图突出了构成 Spring Batch 领域语言的关键概念。一个 Job 由一个或多个 Step 组成,而每个 Step 恰好包含一个 ItemReader、一个可选的 ItemProcessor 和一个 ItemWriter。Job 通过 JobOperator 来操作(例如启动、停止等),而运行过程中的元数据则保存在 JobRepository 中,并可从其中恢复。

Job

本节介绍与批处理作业概念相关的核心构件。Job 是一个封装完整批处理过程的实体。与其他 Spring 项目类似,Job 可以通过 XML 配置文件或基于 Java 的配置方式进行装配,这通常也被称为“作业配置(job configuration)”。不过,Job 只是整个层级结构中的顶层概念,如下图所示:

Job Hierarchy
图 2. Job 层级结构

在 Spring Batch 中,Job 本质上只是一个 Step 实例的容器。它把多个在逻辑上属于同一流程的步骤组合起来,并允许配置作用于所有步骤的全局属性,例如是否支持重启。一个 Job 的配置通常包括:

  • 作业名称。

  • Step 实例的定义及其执行顺序。

  • 该作业是否允许重启。

  • Java

  • XML

对于使用 Java 配置的用户,Spring Batch 提供了 Job 接口的默认实现,即 SimpleJob 类,它在 Job 的基础上提供了一些标准能力。在采用 Java 配置时,框架还提供了一组 builder 用于创建 Job,如下所示:

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

对于使用 XML 配置的用户,Spring Batch 同样通过 SimpleJob 提供了 Job 接口的默认实现。不过,批处理命名空间已经把直接实例化它的细节封装起来了,因此通常可以直接使用 <job> 元素,例如:

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

JobInstance

JobInstance 表示一次“逻辑上的作业运行”。假设有一个需要每天结束时执行一次的批处理作业,例如前图中的 EndOfDay Job。虽然只有一个 EndOfDay 作业定义,但它的每一次实际运行都必须被单独跟踪。对于这个例子来说,每一天都会对应一个逻辑上的 JobInstance。例如,会有 1 月 1 日的运行、1 月 2 日的运行,以此类推。如果 1 月 1 日那次运行第一次失败,并在第二天重新执行,它本质上仍然是“1 月 1 日的那次运行”。(通常这也对应它所处理的数据,即 1 月 1 日的运行处理的就是 1 月 1 日的数据。)因此,一个 JobInstance 可以拥有多个执行记录(即 JobExecution,本章稍后会详细说明),但在任意给定时刻,只能有一个特定的 JobInstance(即特定 Job 加上用于标识的 JobParameters)处于运行中。

JobInstance 的定义本身并不会直接决定要加载哪些数据。具体加载方式完全取决于 ItemReader 的实现。例如,在 EndOfDay 这个场景中,数据中可能会有一个字段标识其所属的 effective dateschedule date。于是,1 月 1 日的运行只会加载 1 日的数据,1 月 2 日的运行只会加载 2 日的数据。由于这种划分通常属于业务决策,因此由 ItemReader 来决定是合理的。不过,是否沿用同一个 JobInstance,会直接决定是否复用先前执行留下的“状态”(也就是本章后面会介绍的 ExecutionContext)。使用新的 JobInstance 往往意味着“从头开始”,而使用已有实例则通常意味着“从上次中断处继续”。

JobParameters

在讨论了 JobInstance 及其与 Job 的区别之后,一个自然的问题就是:“如何区分不同的 JobInstance?”答案就是:JobParametersJobParameters 对象保存了一组用于启动批处理作业的参数。这些参数既可以用于标识实例,也可以作为运行过程中的参考数据,如下图所示:

Job Parameters
图 3. Job 参数

在前面 JobInstance 一节的示例中,有两个实例,一个对应 1 月 1 日,一个对应 1 月 2 日。实际上,它们都属于同一个 Job,只是分别携带了两个不同的 JobParameter:一个以 01-01-2017 作为启动参数,另一个以 01-02-2017 作为启动参数。因此,可以把这个约定表述为:JobInstance = Job + 用于标识的 JobParameters。这意味着开发者可以通过控制传入的参数,精确地控制 JobInstance 的定义方式。

并不是所有作业参数都必须参与 JobInstance 的标识。默认情况下它们会参与标识,但框架同样允许提交一些不会影响 JobInstance 身份的参数。

JobExecution

JobExecution 表示一次作业运行尝试的技术概念。一次执行可能成功,也可能失败,但只有在执行成功完成时,与之对应的 JobInstance 才会被视为完成。仍以前面提到的 EndOfDay Job 为例,假设 01-01-2017 这个 JobInstance 第一次运行失败,那么当它使用与首次运行相同的标识性参数(01-01-2017)再次执行时,会创建一个新的 JobExecution。不过,从逻辑上讲,它仍然只对应同一个 JobInstance

Job 定义了一个作业“是什么”以及“应当如何执行”;JobInstance 则是一个纯粹用于组织执行记录的逻辑对象,主要用于支撑正确的重启语义。而 JobExecution 则是记录一次实际运行过程中“发生了什么”的核心存储机制,它包含了更多必须被控制和持久化的属性,如下表所示:

Table 1. JobExecution Properties

Property

Definition

Status

A BatchStatus object that indicates the status of the execution. While running, it is BatchStatus#STARTED. If it fails, it is BatchStatus#FAILED. If it finishes successfully, it is BatchStatus#COMPLETED

startTime

A java.time.LocalDateTime representing the current system time when the execution was started. This field is empty if the job has yet to start.

endTime

A java.time.LocalDateTime representing the current system time when the execution finished, regardless of whether it was successful or not. The field is empty if the job has yet to finish.

exitStatus

The ExitStatus, indicating the result of the run. It is most important, because it contains an exit code that is returned to the caller. See chapter 5 for more details. The field is empty if the job has yet to finish.

createTime

A java.time.LocalDateTime representing the current system time when the JobExecution was first persisted. The job may not have been started yet (and thus has no start time), but it always has a createTime, which is required by the framework for managing job-level ExecutionContexts.

lastUpdated

A java.time.LocalDateTime representing the last time a JobExecution was persisted. This field is empty if the job has yet to start.

executionContext

The “property bag” containing any user data that needs to be persisted between executions.

failureExceptions

The list of exceptions encountered during the execution of a Job. These can be useful if more than one exception is encountered during the failure of a Job.

这些属性之所以重要,是因为它们都会被持久化保存,并可用于完整判断某次执行的状态。例如,如果 01-01 对应的 EndOfDay 作业在晚上 9:00 启动、9:30 失败,那么批处理元数据表中会留下如下记录:

Table 2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

Table 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01

TRUE

Table 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

为便于说明和排版,列名可能经过缩写或省略。

现在假设该作业失败之后,问题花了一整晚才定位清楚,此时“批处理窗口”已经关闭。再假设批处理窗口每天晚上 9:00 开始,那么 01-01 的作业会在第二天晚上再次启动,并从上次中断的位置继续执行,于 9:30 成功完成。由于此时已经是第二天,01-02 的作业也必须执行,因此它会紧接着在 9:31 启动,并在正常的一小时处理时间后于 10:30 完成。除非两个作业存在访问同一份数据并在数据库锁层面产生冲突的可能,否则并没有规定一个 JobInstance 必须等另一个结束后才能启动。何时运行某个 Job 完全由调度器决定。由于它们属于不同的 JobInstance,Spring Batch 不会主动阻止它们并发运行。(如果试图在某个 JobInstance 已经运行时再次启动同一个实例,则会抛出 JobExecutionAlreadyRunningException。)此时,JobInstance 表和 JobParameters 表中都会新增一条记录,而 JobExecution 表中则会新增两条记录,如下所示:

Table 5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

Table 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

2

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

3

DATE

schedule.Date

2017-01-02 00:00:00

TRUE

Table 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

2

1

2017-01-02 21:00

2017-01-02 21:30

COMPLETED

3

2

2017-01-02 21:31

2017-01-02 22:29

COMPLETED

Column names may have been abbreviated or removed for the sake of clarity and formatting.

Step

Step 是一个领域对象,用来封装批处理作业中一个独立、顺序执行的处理阶段。因此,每个 Job 都完全由一个或多个 Step 组成。Step 包含了定义和控制实际批处理过程所需的全部信息。之所以这个描述看起来略显宽泛,是因为某个具体 Step 的内容最终由编写 Job 的开发者自行决定。Step 可以非常简单,也可以非常复杂。一个简单的 Step 可能只是把数据从文件加载到数据库中,几乎不需要写任何代码(取决于所选实现);而复杂的 Step 则可能会在处理过程中应用大量复杂业务规则。与 Job 类似,Step 也有自己独立的 StepExecution,并且它会关联到某一个唯一的 JobExecution,如下图所示:

Figure 2.1: Job Hierarchy With Steps
图 4. 带 Step 的 Job 层级结构

StepExecution

StepExecution 表示一次执行 Step 的尝试。与 JobExecution 类似,每次运行一个 Step 时,都会创建一个新的 StepExecution。不过,如果某个步骤由于前置步骤失败而根本没有开始执行,那么它不会产生持久化的执行记录。只有当对应的 Step 真正启动时,StepExecution 才会被创建。

Step 的执行由 StepExecution 类的对象来表示。每次执行都会包含对所属 Step 和 JobExecution 的引用,以及与事务相关的数据,例如提交次数、回滚次数、开始时间和结束时间。此外,每个 Step 执行还包含一个 ExecutionContext,用于保存开发者希望在批处理运行之间持久化的数据,例如统计信息或重启所需的状态信息。下表列出了 StepExecution 的属性:

Table 8. StepExecution Properties

Property

Definition

Status

A BatchStatus object that indicates the status of the execution. While running, the status is BatchStatus.STARTED. If it fails, the status is BatchStatus.FAILED. If it finishes successfully, the status is BatchStatus.COMPLETED.

startTime

A java.time.LocalDateTime representing the current system time when the execution was started. This field is empty if the step has yet to start.

endTime

A java.time.LocalDateTime representing the current system time when the execution finished, regardless of whether it was successful or not. This field is empty if the step has yet to exit.

exitStatus

The ExitStatus indicating the result of the execution. It is most important, because it contains an exit code that is returned to the caller. See chapter 5 for more details. This field is empty if the job has yet to exit.

executionContext

The “property bag” containing any user data that needs to be persisted between executions.

readCount

The number of items that have been successfully read.

writeCount

The number of items that have been successfully written.

commitCount

The number of transactions that have been committed for this execution.

rollbackCount

The number of times the business transaction controlled by the Step has been rolled back.

readSkipCount

The number of times read has failed, resulting in a skipped item.

processSkipCount

The number of times process has failed, resulting in a skipped item.

filterCount

The number of items that have been “filtered” by the ItemProcessor.

writeSkipCount

The number of times write has failed, resulting in a skipped item.

ExecutionContext

ExecutionContext 表示一组由框架负责持久化和管理的键值对,它为开发者提供了一个位置,用于保存作用域限定在 StepExecutionJobExecution 上的持久状态。(如果你熟悉 Quartz,可以把它理解成与 JobDataMap 很接近的概念。)它最典型的用途就是支持重启。以平面文件输入为例,在逐行处理文件时,框架会在提交点周期性地持久化 ExecutionContext。这样一来,如果运行过程中发生致命错误,甚至机器断电,ItemReader 也能保存自己的当前状态。开发者只需要像下面这样把当前已读取的行号写入上下文,剩下的工作交给框架完成:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

继续使用前面 Job 一节中的 EndOfDay 示例。假设其中只有一个步骤 loadData,负责把文件加载到数据库中。第一次运行失败后,元数据表大致会变成如下状态:

Table 9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

Table 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

DATE

schedule.Date

2017-01-01

Table 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

Table 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

START_TIME

END_TIME

STATUS

1

1

loadData

2017-01-01 21:00

2017-01-01 21:30

FAILED

Table 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count=40321}

在上述场景中,这个 Step 运行了 30 分钟,并处理了 40,321 个“piece”,在当前示例里它们可以理解为文件中的行。这个值会在每次提交前由框架更新,ExecutionContext 中也可能包含多条与不同键对应的记录。若想在提交前收到通知,通常需要实现某种 StepListener(或 ItemStream),这部分会在后续章节详细说明。与前一个示例一样,这里假设该 Job 会在第二天重新启动。当作业重启时,上一次运行的 ExecutionContext 中保存的值会从数据库中重新组装出来。此时,ItemReader 在打开时就可以检查上下文中是否存在已保存状态,并据此完成自身初始化,如下所示:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这个例子中,执行完上面的代码后,当前行号就会变成 40,322,于是该 Step 可以从上次中断的位置继续执行。你也可以使用 ExecutionContext 来保存与本次运行相关、需要持久化的统计信息。例如,如果某个平面文件中一笔订单横跨多行,那么就可能需要记录“已经处理了多少个订单”(这与“已经读取了多少行”完全不同),以便在 Step 结束时发送邮件,在正文中汇总本次处理的订单总数。框架会为开发者负责这类数据的存储,并且把它正确限定在某个具体的 JobInstance 范围内。实际上,判断某个已有的 ExecutionContext 是否应当被复用并不容易。例如,在上面的 EndOfDay 场景中,当 01-01 的运行第二次启动时,框架会识别出它仍然属于同一个 JobInstance,于是会按 Step 维度把相应的 ExecutionContext 从数据库中取出,并作为 StepExecution 的一部分交给 Step。相反,对于 01-02 的运行,框架会识别出它属于不同实例,因此必须为这个 Step 提供一个空上下文。类似这样的判断,框架都会替开发者自动完成,以确保状态在正确的时机被交付给正确的对象。还需要特别注意的是:在任意时刻,每个 StepExecution 只会对应一个 ExecutionContext。因此,使用 ExecutionContext 的客户端必须足够谨慎,因为这意味着共享键空间。向其中写值时要避免覆盖已有数据。不过,Step 本身不会在上下文中存任何框架内部数据,所以通常不会对框架造成负面影响。

还要注意,每个 JobExecution 至少有一个 ExecutionContext,而每个 StepExecution 也各自拥有一个。例如,下面这段代码:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

正如注释所示,ecStep 并不等于 ecJob。它们是两个不同的 ExecutionContext。其中,作用域属于 Step 的上下文会在 Step 的每一个提交点保存,而作用域属于 Job 的上下文则会在每个 Step 执行之间进行保存。

ExecutionContext 中,所有非瞬态条目都必须是 Serializable 的。对执行上下文进行正确序列化,是 Step 和 Job 支持重启能力的基础。如果你使用的键或值本身不具备原生可序列化能力,就必须采用定制的序列化方案。否则,执行上下文的持久化过程可能会失效,从而导致失败的作业无法被正确恢复。

JobRepository

JobRepository 是前文提到的所有领域对象的持久化机制。 它为 JobLauncherJobStep 的实现提供 CRUD 操作。一个 Job 首次启动时,会先从仓库中取得一个 JobExecution。此外,在执行过程中, StepExecutionJobExecution 实例也会通过传递给该仓库而被持久化。

  • Java

  • XML

使用 Java 配置时,@EnableBatchProcessing 注解会自动配置一个 JobRepository 组件。

Spring Batch 的 XML 命名空间支持通过 <job-repository> 标签来配置 JobRepository 实例,如下例所示:

<job-repository id="jobRepository"/>

JobOperator

JobOperator 表示一个简洁的操作接口,用于启动、停止和重启作业等操作,如下例所示:

public interface JobOperator {

    JobExecution start(Job job, JobParameters jobParameters) throws Exception;
    JobExecution startNextInstance(Job job) throws Exception;
    boolean stop(JobExecution jobExecution) throws Exception;
    JobExecution restart(JobExecution jobExecution) throws Exception;
    JobExecution abandon(JobExecution jobExecution) throws Exception;

}

Job 会使用给定的一组 JobParameters 启动。其实现通常应当先从 JobRepository 获取一个有效的 JobExecution,然后执行该 Job

ItemReader

ItemReader 是一个抽象,表示为某个 Step 逐条读取输入数据的过程。 当 ItemReader 可提供的数据已经读完时,会通过返回 null 来表明这一点。 关于 ItemReader 接口及其各种实现的更多细节,可参见 Readers And Writers

ItemWriter

ItemWriter 是一个抽象,表示某个 Step 的输出,一次写出一个批次或一个 chunk 中的数据项。通常,ItemWriter 并不知道下一次会接收到什么输入,只处理当前这次调用传入的项。 关于 ItemWriter 接口及其各种实现的更多细节,可参见 Readers And Writers

ItemProcessor

ItemProcessor 是一个抽象,用于表示对单个数据项的业务处理。 ItemReader 负责读取单项,ItemWriter 负责写出单项,而 ItemProcessor 则提供对数据进行转换或应用其他业务处理的入口。 如果在处理过程中判定该项无效,则返回 null 表示该项不应被写出。 关于 ItemProcessor 接口的更多细节,可参见 Readers And Writers

Batch 命名空间

前文列出的许多领域概念都需要在 Spring 的 ApplicationContext 中进行配置。虽然上述接口的实现都可以通过标准 bean 定义来使用, 但 Spring Batch 也提供了一个专用命名空间,以便更方便地完成配置,如下例所示:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要声明了 batch 命名空间,就可以使用其中的任意元素。关于如何配置 Job 的更多信息, 可参见 配置并运行 Job。 关于如何配置 Step 的更多信息,可参见 配置 Step

Batch XML 命名空间自 Spring Batch 6.0 起已被弃用,并将在 7.0 版本中移除。