谁有我的老板是耶稣 电影的百度云呢?谢谢分享。

spring Batch实现数据库大数据量读写 -
- ITeye技术网站
博客分类:
1. data-source-context.xml
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"&
&!-- 1) USE ANNOTATIONS TO IDENTIFY AND WIRE SPRING BEANS. --&
&context:component-scan base-package="net.etongbao.vasp.ac" /&
&!-- 2) DATASOURCE, TRANSACTION MANAGER AND JDBC TEMPLATE --&
&bean id="dataSource" class="com.mchange.boPooledDataSource"
destroy-method="close" abstract="false" scope="singleton"&
&!-- oracle.jdbc.driver.oracleDriver --&
&property name="driverClass" value="oracle.jdbc.OracleDriver" /&
&property name="jdbcUrl" value="jdbc:oracle:thin:@192.168.1.23:1521:orcl01" /&
&property name="user" value="USR_DEV01" /&
&property name="password" value="2AF0829C" /&
&property name="checkoutTimeout" value="30000" /&
&property name="maxIdleTime" value="120" /&
&property name="maxPoolSize" value="100" /&
&property name="minPoolSize" value="2" /&
&property name="initialPoolSize" value="2" /&
&property name="maxStatements" value="0" /&
&property name="maxStatementsPerConnection" value="0" /&
&property name="idleConnectionTestPeriod" value="30" /&
&bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"&
&property name="dataSource" ref="dataSource" /&
&bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager"&
&property name="dataSource" ref="dataSource" /&
&tx:annotation-driven transaction-manager="transactionManager" /&
2. quartz-context.xml
commit-interval="10000"每次批量数据的条数,数值越大效率越高,可在此处添加事物处理,
每次回滚数就是commit-interval数
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"&
&import resource="data-source-context.xml"/&
JOB REPOSITORY - WE USE IN-MEMORY REPOSITORY FOR OUR EXAMPLE --&
&bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"&
&property name="transactionManager" ref="transactionManager" /&
&!-- batch config --&
&bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher"&
&property name="jobRepository" ref="jobRepository" /&
FINALLY OUR JOB DEFINITION. THIS IS A 1 STEP JOB --&
&batch:job id="ledgerJob"&
&batch:listeners&
&batch:listener ref="appJobExecutionListener" /&
&/batch:listeners&
&batch:step id="step1"&
&batch:tasklet transaction-manager="transactionManager"&
&batch:tasklet&
&batch:listeners&
&batch:listener ref="itemFailureLoggerListener" /&
&/batch:listeners&
&batch:chunk reader="ledgerReader" writer="ledgerWriter"
commit-interval="10000" /& &!-- 1万条进行一次commit --&
&/batch:tasklet&
&/batch:tasklet&
&/batch:step&
&/batch:job&
READER --&
&bean id="ledgerReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader"&
&property name="dataSource" ref="dataSource" /&
&property name="sql" value="select * from ledger" /&
&property name="rowMapper" ref="ledgerRowMapper" /&
&!-- Spring Batch Job同一个job instance,成功执行后是不允许重新执行的【失败后是否允许重跑,可通过配置Job的restartable参数来控制,默认是true】,如果需要重新执行,可以变通处理,
添加一个JobParameters构建类,以当前时间作为参数,保证其他参数相同的情况下却是不同的job instance --&
&bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" /&
&!-- 定时任务 开始 --&
&bean id="ledgerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"&
&property name="targetObject"&
&!-- 定时执行的类 --&
&ref bean="quartzLedgerJob" /&
&/property&
&property name="targetMethod"&
&!-- 定时执行的类方法 --&
&value&execute&/value&
&/property&
&bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean" &
&!-- 这里不可以直接在属性jobDetail中引用taskJob,因为他要求的是一个jobDetail类型的对象,所以我们得通过MethodInvokingJobDetailFactoryBean来转一下 --&
&property name="jobDetail" &
&ref bean="ledgerJobDetail" /&
&/property&
&!--在每天下午18点到下午18:59期间的每1分钟触发
&!--在每天上午10点40分准时触发
&property name="cronExpression" &
&!-- &value&0 * 15 * * ?&/value& --&
&value&0 45 10 * * ? * &/value&
&/property&
&!-- 触发器工厂,将所有的定时任务都注入工厂--&
&bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"&
&!-- 添加触发器 --&
&property name="triggers"&
&!-- 将上面定义的测试定时任务注入(可以定义多个定时任务,同时注入)--&
&ref local="ledgerCronTrigger" /&
&/property&
&!-- 定时任务 结束 --&
3.定时调度job类 QuartzLedgerJob.java
package net.etongbao.vasp.ac.
import java.util.D
import org.slf4j.L
import org.slf4j.LoggerF
import org.springframework.batch.core.J
import org.springframework.batch.core.JobParametersB
import org.springframework.batch.core.launch.JobL
import org.springframework.beans.factory.annotation.A
import org.
import org.springframework.util.StopW
* 定时调度类
* @author Fu Wei
@Component("quartzLedgerJob")
public class QuartzLedgerJob {
private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);
@Autowired
private JobLauncher jobL
@Autowired
private Job ledgerJ
@Autowired
JobParametersBuilder jobParameterB
private static long counter = 0l;
* 执行业务方法
* @throws Exception
public void execute() throws Exception {
LOG.debug("start...");
StopWatch sw = new StopWatch();
sw.start();
* Spring Batch Job同一个job instance,成功执行后是不允许重新执行的【失败后是否允许重跑,
* 可通过配置Job的restartable参数来控制,默认是true】,如果需要重新执行,可以变通处理,
* 添加一个JobParameters构建类,以当前时间作为参数,保证其他参数相同的情况下却是不同的job instance
jobParameterBulider.addDate("date", new Date());
jobLauncher.run(ledgerJob, jobParameterBulider.toJobParameters());
sw.stop();
LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", sw.prettyPrint(), ++counter);
4.程序启动类 StartQuartz.java
package net.etongbao.vasp.ac.
import java.io.FileNotFoundE
import org.springframework.context.support.ClassPathXmlApplicationC
* 启动定时调度
* @author Fu Wei
public class StartQuartz {
public static void main(String[] args) throws FileNotFoundException {
new ClassPathXmlApplicationContext("/net/etongbao/vasp/ac/resources/quartz-context.xml");
5.pojo类 Ledger.java
package net.etongbao.vasp.ac.
import java.io.S
import java.util.D
public class Ledger implements Serializable {
private Date receiptD
private String memberN
private String checkN
private Date checkD
private String paymentT
private double depositA
private double paymentA
public Ledger() {
public Ledger(int id, Date receiptDate, String memberName, String checkNumber, Date checkDate, String paymentType,
double depositAmount, double paymentAmount, String comments) {
this.receiptDate = receiptD
this.memberName = memberN
this.checkNumber = checkN
this.checkDate = checkD
this.paymentType = paymentT
this.depositAmount = depositA
this.paymentAmount = paymentA
public int getId() {
public void setId(int id) {
public Date getReceiptDate() {
return receiptD
public void setReceiptDate(Date receiptDate) {
this.receiptDate = receiptD
public String getMemberName() {
return memberN
public void setMemberName(String memberName) {
this.memberName = memberN
public String getCheckNumber() {
return checkN
public void setCheckNumber(String checkNumber) {
this.checkNumber = checkN
public Date getCheckDate() {
return checkD
public void setCheckDate(Date checkDate) {
this.checkDate = checkD
public String getPaymentType() {
return paymentT
public void setPaymentType(String paymentType) {
this.paymentType = paymentT
public double getDepositAmount() {
return depositA
public void setDepositAmount(double depositAmount) {
this.depositAmount = depositA
public double getPaymentAmount() {
return paymentA
public void setPaymentAmount(double paymentAmount) {
this.paymentAmount = paymentA
public String getComments() {
public void setComments(String comments) {
6. LedgerDaoImpl.java
package net.etongbao.vasp.ac.dao.
import java.sql.PreparedS
import java.sql.SQLE
import net.etongbao.vasp.ac.dao.LedgerD
import net.etongbao.vasp.ac.pojo.L
import org.springframework.beans.factory.annotation.A
import org.springframework.jdbc.core.JdbcT
import org.springframework.jdbc.core.PreparedStatementS
import org.springframework.stereotype.R
* ledger数据操作类
* @author Fu Wei
@Repository
public class LedgerDaoImpl implements LedgerDao {
private static final String SAVE_SQL = "insert into ledger_temp (rcv_dt, mbr_nm, chk_nbr, chk_dt, pymt_typ, dpst_amt, pymt_amt, comments) values(?,?,?,?,?,?,?,?)";
@Autowired
private JdbcTemplate jdbcT
public void save(final Ledger item) {
jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() {
public void setValues(PreparedStatement stmt) throws SQLException {
stmt.setDate(1, new java.sql.Date(item.getReceiptDate().getTime()));
stmt.setString(2, item.getMemberName());
stmt.setString(3, item.getCheckNumber());
stmt.setDate(4, new java.sql.Date(item.getCheckDate().getTime()));
stmt.setString(5, item.getPaymentType());
stmt.setDouble(6, item.getDepositAmount());
stmt.setDouble(7, item.getPaymentAmount());
stmt.setString(8, item.getComments());
7.接口 LedgerDao .java
package net.etongbao.vasp.ac.
import net.etongbao.vasp.ac.pojo.L
public interface LedgerDao {
public void save(final Ledger item) ;
8. JdbcTemplete 需要的LedgerRowMapper.java
package net.etongbao.vasp.ac.batch.
import java.sql.ResultS
import java.sql.SQLE
import net.etongbao.vasp.ac.pojo.L
import org.springframework.jdbc.core.RowM
import org.
* ledger行的映射类
* @author Administrator
@Component("ledgerRowMapper")
public class LedgerRowMapper implements RowMapper {
public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
Ledger ledger = new Ledger();
ledger.setId(rs.getInt("id"));
ledger.setReceiptDate(rs.getDate("rcv_dt"));
ledger.setMemberName(rs.getString("mbr_nm"));
ledger.setCheckNumber(rs.getString("chk_nbr"));
ledger.setCheckDate(rs.getDate("chk_dt"));
ledger.setPaymentType(rs.getString("pymt_typ"));
ledger.setDepositAmount(rs.getDouble("dpst_amt"));
ledger.setPaymentAmount(rs.getDouble("pymt_amt"));
ledger.setComments(rs.getString("comments"));
9.关键类LedgerWriter.java ,写入数据,负责数据的添加
package net.etongbao.vasp.ac.batch.
import java.util.L
import net.etongbao.vasp.ac.dao.LedgerD
import net.etongbao.vasp.ac.pojo.L
import org.springframework.batch.item.ItemW
import org.springframework.beans.factory.annotation.A
import org.
* ledger写入数据
* @author Fu Wei
@Component("ledgerWriter")
public class LedgerWriter implements ItemWriter&Ledger& {
@Autowired
private LedgerDao ledgerD
* 写入数据
* @param ledgers
public void write(List&? extends Ledger& ledgers) throws Exception {
for (Ledger ledger : ledgers) {
ledgerDao.save(ledger);
classPath:
&?xml version="1.0" encoding="UTF-8"?&
&classpath&
&classpathentry kind="src" path="src"/&
&classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jrockit-jdk1.6.0_24-R28.1.3-4.0.1"/&
&classpathentry kind="lib" path="lib/aopalliance-1.0.jar"/&
&classpathentry kind="lib" path="lib/c3p0-0.9.1.2.jar"/&
&classpathentry kind="lib" path="lib/commons-collections-3.2.1.jar"/&
&classpathentry kind="lib" path="lib/commons-lang-2.3.jar"/&
&classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/&
&classpathentry kind="lib" path="lib/etb-log4j-1.2.16.jar"/&
&classpathentry kind="lib" path="lib/etb-slf4j-api-1.5.8.jar"/&
&classpathentry kind="lib" path="lib/etb-slf4j-log4j12-1.5.8.jar"/&
&classpathentry kind="lib" path="lib/ojdbc6.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.aop-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.asm-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.aspects-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.beans-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.context-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.context.support-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.core-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.expression-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.instrument-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.instrument.tomcat-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.jdbc-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.jms-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.orm-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.oxm-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.test-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/org.springframework.transaction-3.0.5.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/quartz-all-1.6.5.jar"/&
&classpathentry kind="lib" path="lib/spring-batch-core-2.1.6.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/spring-batch-infrastructure-2.1.6.RELEASE.jar"/&
&classpathentry kind="lib" path="lib/spring-batch-test-2.1.6.RELEASE.jar"/&
&classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/&
&classpathentry kind="output" path="bin"/&
&/classpath&
总结: 测试数据8万多条,响应时间3分多钟。
关键在于quartz-context.xml 中&bean id="ledgerReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader"&
&property name="dataSource" ref="dataSource" /&
&property name="sql" value="select * from ledger" /&
&property name="rowMapper" ref="ledgerRowMapper" /&
&/bean& 负责读取数据 ,在程序执行时一次性抓取全部数据后在批量的交给LedgerWriter进行写操作。当然也可以使用分页读取JdbcPagingItemReader,但要分页数量与写入数量要大写相同,还可以对分页出来的数据进行添加悲观锁
LedgerWriter.java 负责写入数据,每次写入1000条。
下载次数: 1204
浏览 34806
JimmyLincole 写道1楼的问题,解决方法可以看一下这个:The issue here is that your reader is singleton scoped. This means that when the job runs the first time, the ItemReader is opened successfully and the job runs. However when the job attempts to run a second time, it's using the same instance it did the first time which is already initialized (hence the exception). I'd recommend changing the readyReqPoolReader to be a step scoped bean and see if that helps.You can read more about step scope here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/scope/StepScope.html3.0与2.2.7 有什么差别,我升级jar包后配置文件出错3.0与2.2.7有啥差别我也不太清楚呢,不好意思
1楼的问题,解决方法可以看一下这个:The issue here is that your reader is singleton scoped. This means that when the job runs the first time, the ItemReader is opened successfully and the job runs. However when the job attempts to run a second time, it's using the same instance it did the first time which is already initialized (hence the exception). I'd recommend changing the readyReqPoolReader to be a step scoped bean and see if that helps.You can read more about step scope here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/scope/StepScope.html3.0与2.2.7 有什么差别,我升级jar包后配置文件出错
很nice 的文章
浏览: 94803 次
来自: 北京
问个问题,spring batch 批量写数据时能否做日志输出 ...
yjc2020 写道JimmyLincole 写道1楼的问题, ...
JimmyLincole 写道1楼的问题,解决方法可以看一下这 ...
1楼的问题,解决方法可以看一下这个:The issue her ...
我也遇到了1楼的错误 Reader must be openSpring Batch 2.2.0.RELEASE 发布_Linux新闻_Linux公社-Linux系统门户网站
你好,游客
Spring Batch 2.2.0.RELEASE 发布
来源:Linux社区&
作者:Linux
Spring Batch 2.2.0.RELEASE 发布了,相关链接: |
该版本新特性包括:
支持 Spring Data
基于 Java 注解的配置
非标识作业参数
支持 SQLFire
依赖包的升级
其他的改进内容请看 .
SpringBatch,作为一个 Spring 组件,提供了通过使用 Spring 的 依赖注入(dependency injection) 来处理批处理的条件。下图是 Spring Batch 的体系结构。
Spring Batch 的详细介绍:Spring Batch 的下载地址:
相关资讯 & & &
& (12/12/:32)
& (06/26/:43)
& (08/25/:05)
& (10/29/:24)
& (06/26/:49)
& (07/27/:59)
   同意评论声明
   发表
尊重网上道德,遵守中华人民共和国的各项有关法律法规
承担一切因您的行为而直接或间接导致的民事或刑事法律责任
本站管理人员有权保留或删除其管辖留言中的任意内容
本站有权在网站内转载或引用您的评论
参与本评论即表明您已经阅读并接受上述条款京 东 价:
[定价:¥]
PLUS会员专享价
支  持:
重  量:
选择系列:
搭配赠品:
服务支持:
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
Spring Batch 批处理框架
商品介绍加载中...
扫一扫,精彩好书免费看
京东商城向您保证所售商品均为正品行货,京东自营商品开具机打发票或电子发票。
凭质保证书及京东商城发票,可享受全国联保服务(奢侈品、钟表除外;奢侈品、钟表由京东联系保修,享受法定三包售后服务),与您亲临商场选购的商品享受相同的质量保证。京东商城还为您提供具有竞争力的商品价格和,请您放心购买!
注:因厂家会在没有任何提前通知的情况下更改产品包装、产地或者一些附件,本司不能确保客户收到的货物与商城图片、产地、附件说明完全一致。只能确保为原厂正货!并且保证与当时市场上同样主流新品一致。若本商城没有及时更新,请大家谅解!
权利声明:京东上的所有商品信息、客户评价、商品咨询、网友讨论等内容,是京东重要的经营资源,未经许可,禁止非法转载使用。
注:本站商品信息均来自于合作方,其真实性、准确性和合法性由信息拥有者(合作方)负责。本站不提供任何保证,并不承担任何法律责任。
印刷版次不同,印刷时间和版次以实物为准。
价格说明:
京东价:京东价为商品的销售价,是您最终决定是否购买商品的依据。
划线价:商品展示的划横线价格为参考价,该价格可能是品牌专柜标价、商品吊牌价或由品牌供应商提供的正品零售价(如厂商指导价、建议零售价等)或该商品在京东平台上曾经展示过的销售价;由于地区、时间的差异性和市场行情波动,品牌专柜标价、商品吊牌价等可能会与您购物时展示的不一致,该价格仅供您参考。
折扣:如无特殊说明,折扣指销售商在原价、或划线价(如品牌专柜标价、商品吊牌价、厂商指导价、厂商建议零售价)等某一价格基础上计算出的优惠比例或优惠金额;如有疑问,您可在购买前联系销售商进行咨询。
异常问题:商品促销信息以商品详情页“促销”栏中的信息为准;商品的具体售价以订单结算页价格为准;如您发现活动商品售价或促销信息有异常,建议购买前先联系销售商咨询。
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
浏览了该商品的用户还浏览了
加载中,请稍候...
联系供应商
七日畅销榜
新书热卖榜
iframe(src='///ns.html?id=GTM-T947SH', height='0', width='0', style='display: visibility:')2106人阅读
Spring(6)
Spring Batch提供了多种方式用于处理并行,提高性能。主要分为2大类:
- 单个进程,多线程
- 多个进程
因此,可以细分为以下几类:
- 多线程Step(Multi-thread Step,single process)
- 并行Step(Parallel Steps, single process )
- Remote Chunking of Step( multi process)
- Partitioning a step(single or multi process)
2. Multi-Thread Step
最直接的方式是给Step配置一个TaskExecutor
&step id="loading"&
&tasklet task-executor="taskExecutor"&...&/tasklet&
此时,taskExecutor的线程并行来执行Item处理(统一item的read,process,write在同一个线程中执行)。可以限制TaskExecutor的阈值(默认为4):
&step id="loading"& &tasklet
task-executor="taskExecutor"
throttle-limit="20"&...&/tasklet&
需要注意的是,在多线程Step中,需要确保Reader、Processor和Writer是线程安全的,否则容易出现并发问题。Spring Batch提供的大部分组件都是非线程安全的,他们都保存有部分状态信息,主要是为了支持任务重启。
因此,使用多线程Step的核心任务是实现无状态化,例如不保存当前读取的item的cursor,而是同item的flag字段来区分item是否被处理过,已经被处理过的下次重启的时候,直接被过滤掉。
多线程Step实现的是单个Step的多线程化。
3. Parallel Steps
如果多个Step没有先后关系,可以并行执行,这是通过split和flow来实现的:
id="job1"&
id="split1" task-executor="taskExecutor" next="step4"&
id="step1" parent="s1" next="step2"/&
id="step2" parent="s2"/&
id="step3" parent="s3"/&
id="step4" parent="s4"/&
id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/&
该种模式提供的是多个Step的并行处理。
4. Remote Chunking
Remote chunking 的示意图如下:
Master为单个进程,因此只有在处理所需要的时间远远大于读取所需要的时间的时候,这个方式才适用,否则Master容易成为瓶颈。
Master是一个常规的Step实现,只不过它的ItemWriter知道如何将Items分块,并发送到中间件(例如JMS),通过实现ChunkProvider接口来实现。
public interface ChunkProvider&T&{
Chunk&T& provide(StepContribution contribution) throws E
void postProcess(StepContribution contribution, Chunk&T& chunk);
Slave则充当中间件的Listener,通过ItemProcessor和ItemWriter来实现item处理,具体的是通过实现ChunkProcessor接口
public interface ChunkProcessr&T& {
void process(StepContribution contribution,Chunk&T& chunk) throws E
可以看到,remote chunking实现的是(Processor、Writer)的并行化。分区不需要对数据源的结构有很明确的了解。
5. Partitioning
Step分区处理示意图如下:
一个分区配置如下:
&step id="step1.master"&
&partition step="step1" partitioner="partitioner"&
&handler grid-size="10" task-executor="taskExecutor"/&
&/partition&
&step id="step1"&
&chunk reader="" writer"" processor="" .../&
&/tasklet&
主要包括2个步骤:
1. 数据分区
2. 分区处理
具体的分区执行流程如下:
PartitionHandler
其中PartitionHandler知道集群环境,根据下面要介绍的Splitter进行分区,发送执行请求(通过WebService ,RMI等方式) 并收集执行结果,聚合,最终反馈给Job。Spring Batch提供了一个同一台机器上的Handler实现,在同一机器上创建多个Step Execution。
&step id="step1.master"&
&partition step="step1" handler="handler"/&
&bean class="org.spr...TaskExecutorPartitionHandler"&
&property name="taskExecutor" ref="taskExecutor"/&
&property name="step" ref="step1" /&
&property name="gridSize" value="10" /&
Partitioner
Partitioner负责生成执行上下文,作为Step Execution的输入参数,其接口定义如下:
public interface Partitioner {
Map&String, ExecutionContext& partition(int gridSize);
返回结果中Map的key,是一个唯一的名字,常见的实现方式是step_name + counter。或者通过PartitioneNameProvider来提供。 名字关联到对应的执行上下文。ExecutionContext只是一个key/value容器,因此它可能包含主键范围,行数等信息。
StepExecutionSplitter
Partitioner生成的ExecutionContext,经过StepExecutionSplitter处理之后形成StepExecution,然后交给Handler处理。StepExecutionSplitter接口定义如下:
public interface StepExecutionSplitter {
String getStepName();
Set&StepExecution& split(StepExecution stepExecution , int gridSize)
throws JobExecutionE
通常,Slave中的Step配置都是相同的,他们通过获取Partitioner划分好的ExecutionContext,获取Step的输入参数,动态绑定到Step中。例如划分的情况如下表:
step execution name(key)
ExecutionContext(value)
filecopy:partition0
file_name=/home/data/0
filecopy:partition1
file_name=/home/data/1
filecopy:partition2
file_name=/home/data/2
然后该文件名被绑定到Step的组件中:
&bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader"&
&property name="resource" value="#{stepExecutionContext[file_name]}/*"/&
整个具体流程如下:
可以看出,Patitioning提供的是(Reader、Processor、Writer)的并行化。分区模式需要对数据源的结构有一定的了解,比如知道主键范围。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:445446次
积分:4105
积分:4105
排名:第6374名
原创:137篇
转载:14篇
评论:87条

我要回帖

更多关于 我的老板是耶稣 的文章

 

随机推荐