|
当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2! |
通过消息启动批处理作业
使用 Spring Batch 核心 API 启动批处理作业时,基本上有两种方式:
-
通过命令行,使用
CommandLineJobOperator -
通过编程方式,调用
JobOperator.start()
例如,当你通过 shell 脚本调用批处理作业时,可能会选择使用
CommandLineJobOperator。另一种做法是直接使用
JobOperator,例如把 Spring Batch 作为 Web 应用的一部分来运行。
但如果场景更复杂呢?你可能需要轮询远程 (S)FTP 服务器来获取批处理作业所需的数据,
或者应用需要同时支持多个不同的数据源。例如,数据文件可能不仅来自 Web,
还可能来自 FTP 或其他来源。在调用 Spring Batch 之前,输入文件也可能还需要额外的转换。
因此,借助 Spring Integration 及其丰富的适配器来执行批处理作业会更强大。
例如,你可以使用 File Inbound Channel Adapter 监控文件系统中的目录,
一旦输入文件到达就立即启动批处理作业。此外,你还可以仅通过配置创建
Spring Integration 流程,组合多个不同的适配器,从多个来源同时为批处理作业接入数据。
之所以容易实现这些场景,是因为 Spring Integration 支持以解耦、事件驱动的方式执行
JobOperator。
Spring Batch Integration 提供了
JobLaunchingMessageHandler,可用来启动批处理作业。
JobLaunchingMessageHandler 的输入由 Spring Integration 消息提供,
其负载类型为 JobLaunchRequest。这个类封装了要启动的 Job
以及启动该批处理作业所需的 JobParameters。
下图展示了启动批处理作业时所需的典型 Spring Integration 消息流。 EIP(Enterprise Integration Patterns)网站 提供了消息图标及其含义的完整说明。
将文件转换为 JobLaunchRequest
下面的示例展示了如何将文件转换为 JobLaunchRequest:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 响应
当批处理作业开始执行时,会返回一个 JobExecution 实例。
你可以使用该实例来判断本次执行的状态。只要 JobExecution 能够成功创建,
它就一定会被返回,而不受实际执行是否成功的影响。
JobExecution 的返回时机取决于所提供的
TaskExecutor。如果使用的是 synchronous(单线程)
TaskExecutor 实现,那么只有在作业完成 之后,
才会返回 JobExecution 响应。若使用
asynchronous TaskExecutor,
则会立即返回 JobExecution 实例。随后,你可以获取该
JobExecution 的 id
(通过 JobExecution.getJobInstanceId()),再借助
JobExplorer 查询 JobRepository,
以获取该作业的最新状态。更多信息可参见
查询仓库。
Spring Batch 集成配置
设想这样一个场景:需要创建一个文件 inbound-channel-adapter,
监听指定目录中的 CSV 文件,将它们交给转换器
(FileMessageToJobRequest)处理,再通过作业启动网关启动作业,
并使用 logging-channel-adapter 记录 JobExecution 的输出。
-
Java
-
XML
下面的示例展示了如何使用 Java 配置这一常见场景:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
下面的示例展示了如何使用 XML 配置这一常见场景:
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
ItemReader 配置示例
现在我们已经在轮询文件并启动作业,接下来需要配置 Spring Batch 的
ItemReader,使其读取由作业参数 "input.file.name"
指定位置上的文件,下面的 Bean 配置展示了这一做法:
-
Java
-
XML
下面的 Java 示例展示了所需的 Bean 配置:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
下面的 XML 示例展示了所需的 Bean 配置:
<bean id="itemReader" class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
上例中的关键点有两个:一是把
#{jobParameters['input.file.name']} 注入为 Resource 属性值;
二是把 ItemReader Bean 设为 step 作用域。将 Bean 设为
step 作用域后,就可以利用延迟绑定能力,从而访问
jobParameters 变量。