spring batch批处理 入门ITeye - AG环亚娱乐集团

spring batch批处理 入门ITeye

2019年03月06日14时40分27秒 | 作者: 痴安 | 标签: 处理,批处理,读取 | 浏览: 1539

 

参阅:spring batch参阅

 

spring batch的处理流程:

 

读取数据 -> 处理数据 -> 写数据

 

reader -> processor -> writer

 

 

maven 依赖:

 <properties>
   <spring.version>3.2.2.RELEASE</spring.version>
   <spring.batch.version>2.2.0.RELEASE</spring.batch.version>
   <mysql.driver.version>5.1.25</mysql.driver.version>
   <junit.version>4.11</junit.version>
 </properties>
 <dependencies>
   <!-- Spring Core -->
   <dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-core</artifactId>
     <version>${spring.version}</version>
   </dependency>
   <!-- Spring jdbc, for database -->
   <dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-jdbc</artifactId>
     <version>${spring.version}</version>
   </dependency>
   <!-- Spring XML to/back object -->
   <dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-oxm</artifactId>
     <version>${spring.version}</version>
   </dependency>
   <!-- MySQL database driver -->
   <dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
     <version>${mysql.driver.version}</version>
   </dependency>
   <!-- Spring Batch dependencies -->
   <dependency>
     <groupId>org.springframework.batch</groupId>
     <artifactId>spring-batch-core</artifactId>
     <version>${spring.batch.version}</version>
   </dependency>
   <dependency>
     <groupId>org.springframework.batch</groupId>
     <artifactId>spring-batch-infrastructure</artifactId>
     <version>${spring.batch.version}</version>
   </dependency>
   <!-- Spring Batch unit test -->
   <dependency>
     <groupId>org.springframework.batch</groupId>
     <artifactId>spring-batch-test</artifactId>
     <version>${spring.batch.version}</version>
   </dependency>
   <!-- Junit -->
   <dependency>
     <groupId>junit</groupId>
     <artifactId>junit</artifactId>
     <version>${junit.version}</version>
     <scope>test</scope>
   </dependency>
 </dependencies>

 

POJO类:

 

package com.tch.test.spring.batch.entity;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Simple record read from the input file: a numeric id and a date.
 */
public class Report {
    /** Pattern used when rendering {@link #date} in {@link #toString()}. */
    private static final String DATE_PATTERN = "dd/MM/yyyy";

    private int id;
    private Date date;

    /** @return the report id */
    public int getId() {
        return id;
    }

    /** @param id the report id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the report date, or {@code null} if none has been set */
    public Date getDate() {
        return date;
    }

    /** @param date the report date; may be {@code null} */
    public void setDate(Date date) {
        this.date = date;
    }

    /**
     * Renders the report as {@code Report [id=..., date=dd/MM/yyyy]}.
     * A missing date is rendered as {@code null} instead of throwing.
     */
    @Override
    public String toString() {
        // SimpleDateFormat is not thread-safe, so a fresh formatter is created per call
        // rather than sharing one across instances or threads.
        String formattedDate = (date == null) ? "null" : new SimpleDateFormat(DATE_PATTERN).format(date);
        return "Report [id=" + id + ", date=" + formattedDate + "]";
    }
}

 

mapper类:

 

package com.tch.test.spring.batch;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import com.tch.test.spring.batch.entity.Report;
public class ReportFieldSetMapper implements FieldSetMapper Report {
 private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
 @Override
 public Report mapFieldSet(FieldSet fieldSet) throws BindException {
 Report report = new Report();
 report.setId(fieldSet.readInt(0));
 String date = fieldSet.readString(1);
 try {
 report.setDate(dateFormat.parse(date));
 } catch (ParseException e) {
 e.printStackTrace();
 return report;
}

 

 

 

processor类:

 

package com.tch.test.spring.batch;
import org.springframework.batch.item.ItemProcessor;
import com.tch.test.spring.batch.entity.Report;
public class CustomItemProcessor implements ItemProcessor Report, Report {
 @Override
 public Report process(Report report) throws Exception {
 System.out.println("Processing..." + report);
 return report;
}

 

 

writer类:

 

package com.tch.test.spring.batch;
import java.util.List;
import org.springframework.batch.item.ItemWriter;
import com.tch.test.spring.batch.entity.Report;
public class ReportItemWriter implements ItemWriter Report {
 public void write(List ? extends Report reports) throws Exception {
 for (Report m : reports) {
 System.out.println("write results : "+m);
}

 

 

beans.xml:(commit-interval 表示每次事务提交所处理的记录数,这里每次处理十条)

 

 <beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:batch="http://www.springframework.org/schema/batch"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://www.springframework.org/schema/batch
         http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

   <!-- NOTE(review): the class attributes were lost when this listing was extracted.
        The classes below are the usual choices for this kind of setup; confirm them
        against the original project. -->

   <!-- Processes each item between reading and writing -->
   <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

   <!-- One chunk-oriented step; a transaction is committed every 10 items -->
   <batch:job id="helloWorldJob">
     <batch:step id="step1">
       <batch:tasklet>
         <batch:chunk reader="cvsFileItemReader" writer="reportWriter"
             processor="itemProcessor" commit-interval="10">
         </batch:chunk>
       </batch:tasklet>
     </batch:step>
   </batch:job>

   <!-- Reads report.txt line by line and maps each line to a Report -->
   <bean id="cvsFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
     <property name="resource" value="classpath:report.txt" />
     <property name="lineMapper">
       <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
         <property name="lineTokenizer">
           <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
           </bean>
         </property>
         <property name="fieldSetMapper">
           <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
         </property>
       </bean>
     </property>
   </bean>

   <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

   <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
     <property name="jobRepository" ref="jobRepository" />
   </bean>

   <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
     <property name="transactionManager" ref="transactionManager" />
   </bean>

   <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
 </beans>

 

 

report.txt:

 

1001, 29/7/2013
1002, 30/7/2013
1003, 31/7/2013
1004, 29/7/2013
1005,30/7/2013
1006, 31/7/2013
1007, 29/7/2013
1008,30/7/2013
1009, 31/7/2013
1010, 29/7/2013
1011,30/7/2013
1012, 31/7/2013
1013, 29/7/2013
1014,30/7/2013
1015, 31/7/2013
1016, 29/7/2013
1017,30/7/2013
1018, 31/7/2013

 

Test:

 

package com.tch.test.spring.batch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Test {
 public static void main(String[] args) throws Exception {
 String[] springConfig = { "beans.xml" };
 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
 JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
 Job job = (Job) context.getBean("helloWorldJob");
 JobExecution execution = jobLauncher.run(job, new JobParameters());
 System.out.println("Exit Status : " + execution.getStatus());
 System.out.println("Done");
 context.close();
}

 

 

 

 

如果要读取多个文件,只需要使用 MultiResourceItemReader 即可:

 

beans.xml:

 

 <beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:batch="http://www.springframework.org/schema/batch"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://www.springframework.org/schema/batch
         http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

   <!-- NOTE(review): the class attributes were lost when this listing was extracted.
        The classes below are the usual choices for this kind of setup; confirm them
        against the original project. -->

   <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

   <batch:job id="helloWorldJob">
     <batch:step id="step1">
       <batch:tasklet>
         <batch:chunk reader="multiResourceReader" writer="reportWriter"
             processor="itemProcessor" commit-interval="10">
         </batch:chunk>
       </batch:tasklet>
     </batch:step>
   </batch:job>

   <!-- Reads every classpath file matching report*.txt through the delegate reader -->
   <bean id="multiResourceReader" class="org.springframework.batch.item.file.MultiResourceItemReader">
     <property name="resources" value="classpath:report*.txt" />
     <property name="delegate" ref="flatFileItemReader" />
   </bean>

   <!-- Reads a single file and maps each line to a Report -->
   <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
     <property name="resource" value="classpath:report.txt" />
     <property name="lineMapper">
       <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
         <property name="lineTokenizer">
           <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
           </bean>
         </property>
         <property name="fieldSetMapper">
           <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
         </property>
       </bean>
     </property>
   </bean>

   <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

   <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
     <property name="jobRepository" ref="jobRepository" />
   </bean>

   <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
     <property name="transactionManager" ref="transactionManager" />
   </bean>

   <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
 </beans>

 

 

 

spring batch 和 quartz 定时批处理:

 

beans.xml:

 

 <beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:batch="http://www.springframework.org/schema/batch"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://www.springframework.org/schema/batch
         http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

   <!-- NOTE(review): the class attributes were lost when this listing was extracted.
        The classes below are the usual choices for this kind of setup. In particular
        the Quartz beans may originally have been CronTriggerBean / JobDetailBean
        instead of the FactoryBean variants used here; confirm against the original
        project and the Quartz version in use. -->

   <!-- Item processing -->
   <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

   <!-- The job that processes the data -->
   <batch:job id="helloWorldJob">
     <batch:step id="step1">
       <batch:tasklet>
         <batch:chunk reader="multiResourceReader" writer="reportWriter"
             processor="itemProcessor" commit-interval="10">
         </batch:chunk>
       </batch:tasklet>
     </batch:step>
   </batch:job>

   <!-- Reader for multiple resources -->
   <bean id="multiResourceReader" class="org.springframework.batch.item.file.MultiResourceItemReader">
     <!-- Resource locations -->
     <property name="resources" value="classpath:report*.txt" />
     <!-- Delegates to the single-resource reader -->
     <property name="delegate" ref="flatFileItemReader" />
   </bean>

   <!-- Reader for a single resource -->
   <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
     <property name="lineMapper">
       <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
         <!-- Tokenizer -->
         <property name="lineTokenizer">
           <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
           </bean>
         </property>
         <!-- Maps the tokenized fields onto the entity -->
         <property name="fieldSetMapper">
           <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
         </property>
       </bean>
     </property>
   </bean>

   <!-- Custom write step, run after the items have been processed -->
   <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

   <!-- Job launcher -->
   <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
     <property name="jobRepository" ref="jobRepository" />
   </bean>

   <!-- Job repository -->
   <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
     <property name="transactionManager" ref="transactionManager" />
   </bean>

   <!-- Transaction management -->
   <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

   <!-- Job registration -->
   <bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
     <property name="jobRegistry" ref="jobRegistry" />
   </bean>
   <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

   <!-- Scheduling with Quartz (fires every 5 seconds) -->
   <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
     <property name="triggers">
       <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
         <!-- What to execute -->
         <property name="jobDetail" ref="jobDetail" />
         <!-- Cron expression -->
         <property name="cronExpression" value="*/5 * * * * ?" />
       </bean>
     </property>
   </bean>

   <bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
     <!-- Class that performs the work on each trigger firing -->
     <property name="jobClass" value="com.tch.test.spring.batch.JobLauncherDetails" />
     <property name="group" value="quartz-batch" />
     <property name="jobDataAsMap">
       <map>
         <entry key="jobName" value="helloWorldJob" />
         <entry key="jobLocator" value-ref="jobRegistry" />
         <entry key="jobLauncher" value-ref="jobLauncher" />
         <entry key="param1" value="value1" />
         <entry key="param2" value="value2" />
       </map>
     </property>
   </bean>
 </beans>

 

 

JobLauncherDetails:

 

package com.tch.test.spring.batch;
import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;
import org.quartz.JobExecutionContext;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.quartz.QuartzJobBean;
public class JobLauncherDetails extends QuartzJobBean {
 static final String JOB_NAME = "jobName";
 private JobLocator jobLocator;
 private JobLauncher jobLauncher;
 public void setJobLocator(JobLocator jobLocator) {
 this.jobLocator = jobLocator;
 public void setJobLauncher(JobLauncher jobLauncher) {
 this.jobLauncher = jobLauncher;
 @SuppressWarnings("unchecked")
 protected void executeInternal(JobExecutionContext context) {
 Map String, Object jobDataMap = context.getMergedJobDataMap();
 //拿到beans.xml中jobDetail里边装备的"jobName"对应的value
 String jobName = (String) jobDataMap.get(JOB_NAME);
 JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
 try {
 //运转对应的job
 jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
 } catch (JobExecutionException e) {
 e.printStackTrace();
 // 读取装备的参数
 private JobParameters getJobParametersFromJobMap(Map String, Object jobDataMap) {
 JobParametersBuilder builder = new JobParametersBuilder();
 for (Entry String, Object entry : jobDataMap.entrySet()) {
 String key = entry.getKey();
 Object value = entry.getValue();
 //过滤掉"jobName"
 if (value instanceof String !key.equals(JOB_NAME)) {
 builder.addString(key, (String) value);
 } else if (value instanceof Float || value instanceof Double) {
 builder.addDouble(key, ((Number) value).doubleValue());
 } else if (value instanceof Integer || value instanceof Long) {
 builder.addLong(key, ((Number) value).longValue());
 } else if (value instanceof Date) {
 builder.addDate(key, (Date) value);
 } else {
 //过滤掉beans.xml中jobDetail的jobDataAsMap装备的"jobLocator"、"jobLauncher"特点
 // JobDataMap contains values which are not job parameters
 // (ignoring)
 // need unique job parameter to rerun the completed job
 builder.addDate("run date", new Date());
 return builder.toJobParameters();
}

 

 

运转:

 

package com.tch.test.spring.batch;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Test {
 public static void main(String[] args) throws Exception {
 String[] springConfig = { "beans.xml" };
 context = new ClassPathXmlApplicationContext(springConfig);
}

 

 

 

 

 

 

 

版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表AG环亚娱乐集团立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章