In this tutorial, we will see AsyncItemProcessor Spring Batch Example using MySql. We will read data from the MySQL database and write it to a CSV file using Async ItemProcessor.
Consider we have few records in DataBase as below.
In this AsyncItemProcessor Spring Batch example, We will read data using JpaPagingItemReader and also sort on the basis of id and then write to excel
Before going ahead let’s create a student table
CREATE TABLE student
(
id int NOT NULL PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(260),
roll_number VARCHAR(260)
);
Note – We need to add the below dependency in order to use AsyncItemProcessor.
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
The directory structure of the example
Define pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.javatute.com</groupId>
<artifactId>springbatchasyncitemprocessor</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
<!-- Thanks for using https://jar-download.com -->
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
Define Student.java class
package com.springbatchexample.entity;
public class Student {
private int id;
private String name;
private String rollNumber;
public Student() {
}
public Student(int id, String name, String rollNumber) {
this.id = id;
this.name = name;
this.rollNumber = rollNumber;
}
//Getter-Setter
}
Create async reader
@Bean
public ItemReader<Student> asyncreader() {
JdbcPagingItemReaderBuilder jdbcPagingItemReaderBuilder = new JdbcPagingItemReaderBuilder();
jdbcPagingItemReaderBuilder.name("reader");
jdbcPagingItemReaderBuilder.dataSource(dataSource);
jdbcPagingItemReaderBuilder.selectClause("select id, roll_number, name ");
jdbcPagingItemReaderBuilder.fromClause("FROM student ");
jdbcPagingItemReaderBuilder.sortKeys(Collections.singletonMap("id", Order.ASCENDING));
jdbcPagingItemReaderBuilder.rowMapper(new StudentResultRowMapper());
return jdbcPagingItemReaderBuilder.build();
}
Create asyncItemProcessor
@Bean
public ItemProcessor<Student, Student> studentItemProcessor() {
return new StudentItemProcessor();
}
@Bean
public ItemProcessor<Student, Future<Student>> asyncItemProcessor() {
AsyncItemProcessor<Student, Student> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(studentItemProcessor());
asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
return asyncItemProcessor;
}
Create asyncWriter
@Bean
public ItemWriter<Future<Student>> asyncItemWriter() {
AsyncItemWriter<Student> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(writer());
return asyncItemWriter;
}
@Bean
public FlatFileItemWriter<Student> writer() {
FlatFileItemWriter<Student> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("C://data/batch/data.csv"));
writer.setLineAggregator(getDelimitedLineAggregator());
return writer;
}
private DelimitedLineAggregator<Student> getDelimitedLineAggregator() {
BeanWrapperFieldExtractor<Student> beanWrapperFieldExtractor = new BeanWrapperFieldExtractor<Student>();
beanWrapperFieldExtractor.setNames(new String[]{"id", "rollNumber", "name"});
DelimitedLineAggregator<Student> aggregator = new DelimitedLineAggregator<Student>();
aggregator.setDelimiter(",");
aggregator.setFieldExtractor(beanWrapperFieldExtractor);
return aggregator;
}
Create task executor
@Bean(name = "asyncExecutor")
public TaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(100);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("javatute 1-");
return executor;
}
Create Step and Job
@Bean
public Step asyncStep1() {
StepBuilder stepBuilder = stepBuilderFactory.get("asyncStep1");
SimpleStepBuilder<Student, Future<Student>> simpleStepBuilder = stepBuilder.chunk(10);
return simpleStepBuilder.reader(asyncreader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).build();
}
@Bean
public Job asyncJob() {
JobBuilder jobBuilder = jobBuilderFactory.get("asyncJob");
jobBuilder.incrementer(new RunIdIncrementer());
FlowJobBuilder flowJobBuilder = jobBuilder.flow(asyncStep1()).end();
Job job = flowJobBuilder.build();
return job;
}
We also need RowMapper
@Component
public class StudentResultRowMapper implements RowMapper<Student> {
@Override
public Student mapRow(ResultSet rs, int i) throws SQLException {
Student student = new Student();
student.setId(rs.getInt("id"));
student.setRollNumber(rs.getString("roll_number"));
student.setName(rs.getString("name"));
return student;
}
}
Let’s keep all configurations code together in the SpringBatchConfig file.
@EnableBatchProcessing
@Configuration
public class SpringBatchConfig {
@Autowired
private DataSource dataSource;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public ItemReader<Student> asyncreader() {
JdbcPagingItemReaderBuilder jdbcPagingItemReaderBuilder = new JdbcPagingItemReaderBuilder();
jdbcPagingItemReaderBuilder.name("reader");
jdbcPagingItemReaderBuilder.dataSource(dataSource);
jdbcPagingItemReaderBuilder.selectClause("select id, roll_number, name ");
jdbcPagingItemReaderBuilder.fromClause("FROM student ");
jdbcPagingItemReaderBuilder.sortKeys(Collections.singletonMap("id", Order.ASCENDING));
jdbcPagingItemReaderBuilder.rowMapper(new StudentResultRowMapper());
return jdbcPagingItemReaderBuilder.build();
}
@Bean
public ItemProcessor<Student, Student> studentItemProcessor() {
return new StudentItemProcessor();
}
@Bean
public ItemProcessor<Student, Future<Student>> asyncItemProcessor() {
AsyncItemProcessor<Student, Student> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(studentItemProcessor());
asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
return asyncItemProcessor;
}
@Bean
public ItemWriter<Future<Student>> asyncItemWriter() {
AsyncItemWriter<Student> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(writer());
return asyncItemWriter;
}
@Bean
public FlatFileItemWriter<Student> writer() {
FlatFileItemWriter<Student> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("C://data/batch/data.csv"));
writer.setLineAggregator(getDelimitedLineAggregator());
return writer;
}
private DelimitedLineAggregator<Student> getDelimitedLineAggregator() {
BeanWrapperFieldExtractor<Student> beanWrapperFieldExtractor = new BeanWrapperFieldExtractor<Student>();
beanWrapperFieldExtractor.setNames(new String[]{"id", "rollNumber", "name"});
DelimitedLineAggregator<Student> aggregator = new DelimitedLineAggregator<Student>();
aggregator.setDelimiter(",");
aggregator.setFieldExtractor(beanWrapperFieldExtractor);
return aggregator;
}
@Bean(name = "asyncExecutor")
public TaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(100);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("javatute 1-");
return executor;
}
@Bean
public Step asyncStep1() {
StepBuilder stepBuilder = stepBuilderFactory.get("asyncStep1");
SimpleStepBuilder<Student, Future<Student>> simpleStepBuilder = stepBuilder.chunk(10);
return simpleStepBuilder.reader(asyncreader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).build();
}
@Bean
public Job asyncJob() {
JobBuilder jobBuilder = jobBuilderFactory.get("asyncJob");
jobBuilder.incrementer(new RunIdIncrementer());
FlowJobBuilder flowJobBuilder = jobBuilder.flow(asyncStep1()).end();
Job job = flowJobBuilder.build();
return job;
}
}
Define SpringMain.class
package com.springbatchexample.main;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan(basePackages = "com.springbatchexample.*")
public class SpringMain {
public static void main(String[] args) {
SpringApplication.run(SpringMain.class, args);
}
}
application.properties
spring.datasource.url=jdbc:mysql://localhost:3306/springbootcrudexample
spring.datasource.username=root
spring.datasource.password=root
spring.jpa.hibernate.ddl-auto=create
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.format_sql=true
server.port = 9091
spring.batch.initialize-schema=always
That’s all about AsyncItemProcessor Spring Batch Example.
Download code from github.
See Spring batch AsyncItemProcessor example docs.
Other Spring Batch Examples.
- Spring Batch FlatFileItemReader Example
- JpaPagingItemReader Example
- JdbcPagingItemReader spring batch example
- StaxEventItemReader Example
- JsonItemReader Spring Batch Example
- JdbcCursorItemReader Spring Batch Example
- CompositeItemProcessor Spring Batch Example
- Spring Batch ItemReader Example
- StoredProcedureItemReader example