|
当前版本仍在开发中,尚不被视为稳定版本。最新稳定版请使用 Spring Batch 文档 6.0.2! |
数据库
和大多数企业应用风格一样,数据库是批处理的核心存储机制。不过,批处理与其他应用风格的不同之处在于它往往要处理极其庞大的数据集。 如果一条 SQL 语句返回 100 万行,那么结果集很可能会在所有行都被读取完之前一直把这些结果保留在内存中。 Spring Batch 为这个问题提供了两类解决方案:
基于游标的 ItemReader 实现
对于大多数批处理开发者来说,使用数据库游标通常是默认做法,因为这是数据库处理关系型数据“流式读取”问题的标准方案。
Java 的 ResultSet 类本质上就是一个面向对象的游标操作机制。ResultSet 维护着一个指向当前数据行的游标,调用
ResultSet 的 next 方法会把游标移动到下一行。Spring Batch 中基于游标的 ItemReader 实现会在初始化时打开游标,
并在每次调用 read 时将游标向前移动一行,然后返回一个可供处理的映射对象。之后再调用 close,确保所有资源都被释放。
Spring Core 的 JdbcTemplate 则通过回调模式规避这个问题:它会在把 ResultSet 中所有行完整映射并关闭之后,才把控制权返回给调用方。
但在批处理中,这必须等到整个 step 完成后才能结束。下图展示了基于游标的 ItemReader 的通用工作方式。注意,虽然示例使用的是 SQL
(因为 SQL 最为人熟知),但这种基本思路并不局限于某一种技术。
这个示例说明了其基本模式。假设有一张名为 FOO 的表,包含三列:ID、NAME 和 BAR。
现在选出所有 ID 大于 1 且小于 7 的记录。这样,游标起始位置(第 1 行)就会落在 ID 为 2 的记录上。
这一行的结果应当被完整映射为一个 Foo 对象。再次调用 read() 时,游标会移动到下一行,也就是 ID 为 3 的那个 Foo。
每次 read 得到的结果都可以立即写出,从而使这些对象有机会被垃圾回收(前提是没有实例变量持有它们的引用)。
JdbcCursorItemReader
JdbcCursorItemReader 是基于游标技术的 JDBC 实现。它直接操作 ResultSet,并要求提供一条 SQL 语句,
在从 DataSource 获取的连接上执行。下面的数据库表结构将作为示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
很多人更倾向于把每一行映射为一个领域对象,因此下面的示例使用了一个 RowMapper 接口实现来映射
CustomerCredit 对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
由于 JdbcCursorItemReader 与 JdbcTemplate 共享一些关键接口,因此先看一个使用 JdbcTemplate
读取这批数据的例子,再与 ItemReader 做对比会更有帮助。为方便说明,假设 CUSTOMER 表中有 1000 行数据。
第一个示例使用的是 JdbcTemplate:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
执行完前面的代码片段后,customerCredits 列表中会包含 1000 个 CustomerCredit 对象。在 query 方法内部,
系统会从 DataSource 获取连接,在其上执行提供的 SQL,并对 ResultSet 中的每一行调用一次 mapRow 方法。
下面再看 JdbcCursorItemReader 的做法,与之进行对比:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
执行完前面的代码片段后,counter 的值会等于 1000。如果上面的代码把返回的 customerCredit 放进一个列表中,
那么结果会与 JdbcTemplate 示例完全相同。但 ItemReader 的最大优势在于它允许 item 以“流式”方式处理。
你可以调用一次 read 获取一个 item,再由 ItemWriter 把它写出,然后继续通过 read 获取下一个 item。
这就使得 item 的读取和写出可以按 chunk 方式分批处理并定期提交,而这正是高性能批处理的核心。此外,它也很容易被配置后注入到 Spring Batch 的 Step 中。
-
Java
-
XML
下面的示例展示了如何在 Java 中把 ItemReader 注入到一个 Step 中:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
下面的示例展示了如何在 XML 中把 ItemReader 注入到一个 Step 中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
附加属性
由于在 Java 中打开游标的方式存在很多差异,因此 JdbcCursorItemReader 也提供了许多可配置属性,如下表所示:
ignoreWarnings |
决定 SQLWarning 是仅记录日志还是触发异常。默认值为 |
fetchSize |
在 |
maxRows |
设置底层 |
queryTimeout |
设置驱动等待 |
verifyCursorPosition |
由于 |
saveState |
指示是否应将 reader 的状态保存到 |
driverSupportsAbsolute |
指示 JDBC 驱动是否支持在 |
setUseSharedExtendedConnection |
指示游标所使用的连接是否应被其他处理共享,从而参与同一事务。如果设为 |
StoredProcedureItemReader
有时必须通过存储过程来获取游标数据。StoredProcedureItemReader 的工作方式与 JdbcCursorItemReader 类似,
不同之处在于它不是执行查询来获取游标,而是执行一个返回游标的存储过程。这个存储过程可以通过以下三种方式返回游标:
-
作为返回的
ResultSet(SQL Server、Sybase、DB2、Derby 和 MySQL 采用这种方式)。 -
作为 out 参数返回的 ref-cursor(Oracle 和 PostgreSQL 采用这种方式)。
-
作为存储函数调用的返回值。
-
Java
-
XML
下面的 Java 配置示例沿用了前面同一个“customer credit”示例:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
下面的 XML 配置示例也沿用了前面的“customer credit”示例:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
前面的示例依赖于存储过程直接返回一个 ResultSet(也就是前面提到的第 1 种方式)。
如果存储过程返回的是一个 ref-cursor(第 2 种方式),那么就需要提供该 ref-cursor 所在 out 参数的位置。
-
Java
-
XML
下面的示例展示了如何在 Java 中处理“第一个参数就是 ref-cursor”的场景:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
下面的示例展示了如何在 XML 中处理“第一个参数就是 ref-cursor”的场景:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果游标是由存储函数返回的(第 3 种方式),那么需要把属性 “function” 设为 true。它的默认值是 false。
-
Java
-
XML
下面的示例展示了如何在 Java 中把该属性设为 true:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
下面的示例展示了如何在 XML 中把该属性设为 true:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有这些场景下,都需要定义一个 RowMapper,以及一个 DataSource 和实际的过程名称。
如果存储过程或函数有输入参数,那么必须通过 parameters 属性进行声明和设置。下面这个 Oracle 示例声明了三个参数:
第一个是返回 ref-cursor 的 out 参数,第二和第三个则是值类型为 INTEGER 的 in 参数。
-
Java
-
XML
下面的示例展示了如何在 Java 中处理这些参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
下面的示例展示了如何在 XML 中处理这些参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了参数声明之外,还需要指定一个 PreparedStatementSetter 实现,用于为调用设置参数值。它的工作方式与上面的
JdbcCursorItemReader 完全相同。在 附加属性 中列出的所有附加属性,同样也适用于 StoredProcedureItemReader。
基于分页的 ItemReader 实现
除了使用数据库游标之外,另一种做法是执行多次查询,每次查询只取回部分结果。我们把这一部分结果称为一页(page)。 每次查询都必须指定起始行号以及希望该页返回的行数。
JdbcPagingItemReader
分页型 ItemReader 的一个实现是 JdbcPagingItemReader。它需要一个 PagingQueryProvider,
用于提供检索组成某一页数据所需 SQL 的查询语句。由于每种数据库都有自己的分页支持策略,因此对每种受支持数据库,都需要使用不同的
PagingQueryProvider。此外,还有 SqlPagingQueryProviderFactoryBean,它会自动检测当前使用的数据库,
并确定合适的 PagingQueryProvider 实现。这样能简化配置,也是推荐的最佳实践。
SqlPagingQueryProviderFactoryBean 要求你提供 select 子句和 from 子句,也可以额外提供可选的
where 子句。这些子句再结合必需的 sortKey 用于构造 SQL 语句。
必须在 sortKey 上建立唯一键约束,以保证不同执行之间不会丢失数据。
|
reader 打开之后,会像其他任何 ItemReader 一样,每次调用 read 返回一个 item。真正的分页获取发生在后台,
只有当需要更多数据行时才会触发。
-
Java
-
XML
下面的 Java 配置示例沿用了前面基于游标的 ItemReader 所使用的类似 “customer credit” 示例:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
下面的 XML 配置示例也沿用了前面基于游标的 ItemReader 所使用的类似 “customer credit” 示例:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
这个已配置好的 ItemReader 会借助必须显式指定的 RowMapper 返回 CustomerCredit 对象。
pageSize 属性决定了每次执行查询时从数据库中读取多少条实体。
parameterValues 属性可用于为查询指定一个参数值 Map。如果在 where 子句中使用的是命名参数,
那么每个 entry 的 key 应与命名参数的名称一致;如果使用的是传统的 ? 占位符,那么每个 entry 的 key 应从 1 开始,
对应占位符的位置编号。
JpaPagingItemReader
分页型 ItemReader 的另一个实现是 JpaPagingItemReader。JPA 没有类似 Hibernate
StatelessSession 的概念,因此必须借助 JPA 规范提供的其他特性。由于 JPA 天然支持分页,所以在使用 JPA
进行批处理时,这是一个很自然的选择。每读取完一页数据后,这些实体都会变为 detached 状态,同时持久化上下文会被清空,
这样在该页处理完成后,这些实体就可以被垃圾回收。
JpaPagingItemReader 允许你声明一条 JPQL 语句,并传入一个 EntityManagerFactory。之后,它会像其他任何
ItemReader 一样,每次调用 read 返回一个 item。真正的分页获取发生在后台,只有当需要更多实体时才会触发。
-
Java
-
XML
下面的 Java 配置示例沿用了前面 JDBC reader 所使用的同一个 “customer credit” 示例:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
下面的 XML 配置示例也沿用了前面 JDBC reader 所使用的同一个 “customer credit” 示例:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
这个配置好的 ItemReader 会以与上面 JdbcPagingItemReader 相同的方式返回 CustomerCredit 对象,
前提是 CustomerCredit 对象已经具备正确的 JPA 注解或 ORM 映射文件。pageSize 属性决定了每次执行查询时从数据库中读取多少个实体。
数据库 ItemWriters
While both flat files and XML files have a specific ItemWriter instance, there is no exact equivalent
在数据库世界中并不存在一个完全等价的专用实现,因为事务本身已经提供了所需的全部能力。
之所以文件场景需要 ItemWriter 实现,是因为它们必须表现得“像事务性资源一样”,需要跟踪已写出的 item,并在合适的时机执行 flush 或 clear。
而数据库本身不需要这类额外能力,因为写操作本来就已经处于事务之中。用户既可以自己编写实现了 ItemWriter 接口的 DAO,
也可以使用出于通用处理目的而编写的自定义 ItemWriter。无论哪种方式,通常都可以正常工作。
不过有一点需要特别留意:批量输出在性能和错误处理方面会带来一些影响。这种情况在使用 Hibernate 作为 ItemWriter 时最常见,
但在使用 JDBC batch 模式时也可能遇到同样的问题。只要我们谨慎地执行 flush,并且数据本身没有错误,数据库批量写出本身并没有固有缺陷。
但一旦写入过程中发生错误,就可能带来困扰,因为你无法知道到底是哪个具体 item 导致了异常,甚至无法确定是否真的是某一个具体 item 负责,
如下图所示:
如果 item 在真正写出之前先被缓存在内存中,那么任何错误都要等到提交前执行 flush 时才会被抛出。举例来说,假设每个 chunk
写出 20 个 item,而第 15 个 item 抛出了 DataIntegrityViolationException。从 Step 的视角看,20 个 item 似乎都已经成功写出,
因为在真正写入数据库之前,根本无法得知是否会发生错误。一旦调用 Session#flush(),缓冲区被清空,异常才会出现。到这时,Step
已经无能为力,事务只能回滚。通常情况下,这类异常可能会触发 skip(取决于 skip/retry 策略),然后该 item 不再被重新写入。
但在批量写出的场景中,你无法判断究竟是哪一个 item 导致了问题,因为失败发生时,整个缓冲区都在一起写出。解决这个问题的唯一办法,
就是在每个 item 写出之后立即 flush,如下图所示:
这是一个非常常见的场景,尤其是在使用 Hibernate 时。对于 ItemWriter 的实现,一个简单的准则就是:每次调用
write() 后都执行 flush。这样可以让 item 的 skip 处理更加可靠,而 Spring Batch 会在内部负责出错后对 ItemWriter
调用粒度的控制。