欢迎使用Spring批处理示例。Spring批次是spring框架用于执行批处理作业的模块。我们可以使用spring批处理一系列作业。
目录
Spring批处理示例
在阅读spring批处理示例程序之前,让我们先了解一下Spring批处理术语。
- 一个作业可以由&;n’;个步骤组成。每个步骤包含读进程写任务或者它可以有一个单独的操作,称为tasklet。
- 读-处理-写入基本上是从数据库、CSV等源读取数据,然后处理数据并将其写入数据库、CSV、XML等源。
- Tasklet意味着执行单个任务或操作,如清理连接,在处理完成后释放资源。
- Read-Process-Write和微线程可以链接在一起以运行作业。
Spring批处理示例
让我们考虑一个springbatch实现的工作示例。出于实现目的,我们将考虑以下场景。
包含数据的CSV文件需要与数据一起转换为XML,并且标记将以列名命名。
下面是spring批处理示例中使用的重要工具和库。
- 阿帕奇Maven3.5.0–;用于项目构建和依赖关系管理。
- EclipseOxygen4.7.0和用于创建SpringBatch maven应用程序的IDE。
- Java 1.8版
- Spring芯4.3.12.释放
- SpringOXM 4.3.12.发布
- SpringJDBC4.3.12.放行
- Spring批次3.0.8.发布
- MySQL Java驱动程序5.1.25–;根据您的MySQL安装使用。这是Spring批处理元数据表所必需的。
Spring批处理示例目录结构
下图说明了springbatch示例项目中的所有组件。
Spring批处理Maven依赖项
以下是pom.xml文件包含spring批处理示例项目所需的所有依赖项的文件。
<project xmlns="https://maven.apache.org/POM/4.0.0" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.journaldev.spring</groupId>
<artifactId>SpringBatchExample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>SpringBatchDemo</name>
<url>https://maven.apache.org</url>
<properties>
<jdk.version>1.8</jdk.version>
<spring.version>4.3.12.RELEASE</spring.version>
<spring.batch.version>3.0.8.RELEASE</spring.batch.version>
<mysql.driver.version>5.1.25</mysql.driver.version>
<junit.version>4.11</junit.version>
</properties>
<dependencies>
<!-- Spring Core -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring jdbc, for database -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring XML to/back object -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- MySQL database driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.driver.version}</version>
</dependency>
<!-- Spring Batch dependencies -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<!-- Spring Batch unit test -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<!-- Junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>spring-batch</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.9</version>
<configuration>
<downloadSources>true</downloadSources>
<downloadJavadocs>false</downloadJavadocs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Spring批处理CSV输入文件
下面是我们的示例CSV文件的内容,用于spring批处理。
1001,Tom,Moody, 29/7/2013
1002,John,Parker, 30/7/2013
1003,Henry,Williams, 31/7/2013
Spring批处理作业配置
我们必须定义Spring Bean以及配置文件中的spring批处理作业。以下是job-batch-demo.xml
file, it’s the most important part of spring batch project.
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:batch="https://www.springframework.org/schema/batch" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch-3.0.xsd
https://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans-4.3.xsd
">
<import resource="../config/context.xml" />
<import resource="../config/database.xml" />
<bean id="report"
scope="prototype" />
<bean id="itemProcessor" />
<batch:job id="DemoJobXMLWriter">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="csvFileItemReader" writer="xmlItemWriter"
processor="itemProcessor" commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="csvFileItemReader">
<property name="resource" value="classpath:csv/input/report.csv" />
<property name="lineMapper">
<bean>
<property name="lineTokenizer">
<bean
>
<property name="names" value="id,firstname,lastname,dob" />
</bean>
</property>
<property name="fieldSetMapper">
<bean />
<!-- if no data type conversion, use BeanWrapperFieldSetMapper to map
by name <bean>
<property name="prototypeBeanName" value="report" /> </bean> -->
</property>
</bean>
</property>
</bean>
<bean id="xmlItemWriter">
<property name="resource" value="file:xml/outputs/report.xml" />
<property name="marshaller" ref="reportMarshaller" />
<property name="rootTagName" value="report" />
</bean>
<bean id="reportMarshaller">
<property name="classesToBeBound">
<list>
<value>com.journaldev.spring.model.Report</value>
</list>
</property>
</bean>
</beans>
- 我们正在使用
FlatFileItemReader
to read CSV file,CustomItemProcessor
to process the data and write to XML file usingStaxEventItemWriter
. batch:job
– This tag defines the job that we want to create. Id property specifies the ID of the job. We can define multiple jobs in a single xml file.- 步骤:批处理–此标记用于定义spring批处理作业的不同步骤。
- Spring批处理框架提供了两种不同类型的处理方式,分别是“;面向TaskletStep”;和“;面向块”;。本例中使用的面向块的样式是指在事务边界内逐个读取数据并创建将被写出的“块”。
- reader:Spring Bean用于读取数据。我们用过
csvFileItemReader
bean in this example that is instance ofFlatFileItemReader
. - 处理器:这是用于处理数据的类。我们用过
CustomItemProcessor
in this example. - writer:用于将数据写入xml文件的bean。
- 提交间隔:此属性定义处理完成后将提交的块的大小。基本上,这意味着ItemReader将逐个读取数据,ItemProcessor也将以相同的方式处理数据,但ItemWriter只会在数据等于提交间隔的大小时写入数据。
- 本项目中使用的三个重要接口是
ItemReader
,ItemProcessor
andItemWriter
fromorg.springframework.batch.item
package.
Spring批处理模型类
首先,我们将CSV文件读入java对象,然后使用JAXB公司将其写入xml文件。下面是我们使用所需JAXB的模型类注解.
package com.journaldev.spring.model;
import java.util.Date;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "record")
public class Report {
private int id;
private String firstName;
private String lastName;
private Date dob;
@XmlAttribute(name = "id")
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@XmlElement(name = "firstname")
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
@XmlElement(name = "lastname")
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@XmlElement(name = "dob")
public Date getDob() {
return dob;
}
public void setDob(Date dob) {
this.dob = dob;
}
@Override
public String toString() {
return "Report [id=" + id + ", firstname=" + firstName + ", lastName=" + lastName + ", DateOfBirth=" + dob
+ "]";
}
}
注意,模型类字段应该与springbatchmapper配置中定义的相同,即。property name="names" value="id,firstname,lastname,dob"
in our case.
Spring批次FieldSetMapper
习俗FieldSetMapper
is needed to convert a Date. If no data type conversion is required, then only BeanWrapperFieldSetMapper
should be used to map the values by name automatically.
扩展的java类FieldSetMapper
is ReportFieldSetMapper
.
package com.journaldev.spring;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import com.journaldev.spring.model.Report;
public class ReportFieldSetMapper implements FieldSetMapper<Report> {
private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
public Report mapFieldSet(FieldSet fieldSet) throws BindException {
Report report = new Report();
report.setId(fieldSet.readInt(0));
report.setFirstName(fieldSet.readString(1));
report.setLastName(fieldSet.readString(2));
// default format yyyy-MM-dd
// fieldSet.readDate(4);
String date = fieldSet.readString(3);
try {
report.setDob(dateFormat.parse(date));
} catch (ParseException e) {
e.printStackTrace();
}
return report;
}
}
Spring批处理项目处理器
现在,正如作业配置中定义的那样,itemProcessor将在itemWriter之前被激发。我们创造了一个CustomItemProcessor.java
class for the same.
package com.journaldev.spring;
import org.springframework.batch.item.ItemProcessor;
import com.journaldev.spring.model.Report;
public class CustomItemProcessor implements ItemProcessor<Report, Report> {
public Report process(Report item) throws Exception {
System.out.println("Processing..." + item);
String fname = item.getFirstName();
String lname = item.getLastName();
item.setFirstName(fname.toUpperCase());
item.setLastName(lname.toUpperCase());
return item;
}
}
我们可以在ItemProcessor实现中操作数据,如您所见,我正在将名字和姓氏值转换为大写。
Spring配置文件
在spring批处理配置文件中,我们导入了两个附加的配置文件–;context.xml
and database.xml
.
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
https://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<!-- stored job-meta in memory -->
<!--
<bean id="jobRepository"
>
<property name="transactionManager" ref="transactionManager" />
</bean>
-->
<!-- stored job-meta in database -->
<bean id="jobRepository"
>
<property name="dataSource" ref="dataSource" />
<property name="transactionManager" ref="transactionManager" />
<property name="databaseType" value="mysql" />
</bean>
<bean id="transactionManager"
/>
<bean id="jobLauncher"
>
<property name="jobRepository" ref="jobRepository" />
</bean>
</beans>
- 作业存储库jobsrepository负责将每个Java对象存储到其正确的springbatch元数据表中。
- 事务管理器它负责在提交间隔大小和处理的数据相等时提交事务。
- 作业启动程序–这是Spring批次的核心。此接口包含用于触发作业的run方法。
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:jdbc="https://www.springframework.org/schema/jdbc" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans-4.3.xsd
https://www.springframework.org/schema/jdbc
https://www.springframework.org/schema/jdbc/spring-jdbc-4.3.xsd">
<!-- connect to database -->
<bean id="dataSource"
>
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/Test" />
<property name="username" value="test" />
<property name="password" value="test123" />
</bean>
<bean id="transactionManager"
/>
<!-- create job-meta tables automatically -->
<!-- <jdbc:initialize-database data-source="dataSource"> <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql"
/> <jdbc:script location="org/springframework/batch/core/schema-mysql.sql"
/> </jdbc:initialize-database> -->
</beans>
springbatch使用一些元数据表来存储批处理作业信息。我们可以从spring批处理配置中创建它们,但是建议通过执行SQL文件来手动创建,正如您在上面的注解代码中看到的那样。从安全性的角度来看,spring的安全性更高。
Spring批处理表
Spring批处理表与用Java表示它们的域对象非常匹配。例如–;JobInstance、JobExecution、JobParameters和StepExecution分别映射到BATCH_JOB_INSTANCE、BATCH_JOB_EXECUTION、BATCH_JOB EXECUTION参数和BATCH_STEP_EXECUTION。
ExecutionContext映射到批处理作业执行上下文和批处理步骤执行上下文。
jobsrepository负责将每个java对象保存并存储到其正确的表中。 下面是每个元数据表的详细信息。
- 批处理作业实例:批处理作业实例表包含与作业实例相关的所有信息。
- 批处理作业执行参数:批处理作业执行参数表保存与JobParameters对象相关的所有信息。
- 批处理作业执行:批处理作业执行表保存与作业执行对象相关的数据。每次运行作业时都会添加一个新行。
- 批处理步骤执行:BATCH_STEP_EXECUTION表包含与StepExecution对象相关的所有信息。
- 批处理作业执行上下文:批处理作业执行上下文表保存与作业的执行上下文相关的数据。每个JobExecution只有一个Job ExecutionContext,它包含该特定作业执行所需的所有作业级别数据。此数据通常表示失败后必须检索的状态,以便JobInstance可以从失败的位置重新启动。
- 批处理步骤执行上下文:BATCH_STEP_EXECUTION上下文表保存与步骤的ExecutionContext相关的数据。每个StepExecution只有一个ExecutionContext,它包含特定步骤执行需要持久化的所有数据。此数据通常表示失败后必须检索的状态,以便JobInstance可以从失败的位置重新启动。
- 批处理作业执行:此表保存作业的数据执行顺序。
- 批处理步骤执行:此表保存用于步骤执行的序列的数据。
- 批处理作业:此表保存作业序列的数据,如果我们有多个作业,我们将得到多行。
Spring批量试验程序
我们的Spring批处理示例项目已经准备好了,最后一步是编写一个测试类,将其作为java程序执行。
package com.journaldev.spring;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class App {
public static void main(String[] args) {
String[] springConfig = { "spring/batch/jobs/job-batch-demo.xml" };
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("DemoJobXMLWriter");
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.toJobParameters();
try {
JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("Exit Status : " + execution.getStatus());
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Done");
context.close();
}
}
就像上面的程序一样运行。
<?xml version="1.0" encoding="UTF-8"?><report><record id="1001"><dob>2013-07-29T00:00:00+05:30</dob><firstname>TOM</firstname><lastname>MOODY</lastname></record><record id="1002"><dob>2013-07-30T00:00:00+05:30</dob><firstname>JOHN</firstname><lastname>PARKER</lastname></record><record id="1003"><dob>2013-07-31T00:00:00+05:30</dob><firstname>HENRY</firstname><lastname>WILLIAMS</lastname></record></report>
这就是Spring批处理的全部示例,您可以从下面的链接下载final project。
参考文献:官方指南