3. 数据输入

3.1 ItemReader 概述

写个MyReader

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import java.util.Iterator;
import java.util.List;

public class MyReader implements ItemReader<String> {

    private Iterator<String> iterator;

    public MyReader(List<String> list) {
        this.iterator = list.iterator();
    }

    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // 默认一个一个读数据
        if (iterator.hasNext()) {
            return this.iterator.next();
        } else {
            return null;
        }
    }

}

在Step使用reader()writer()

import com.sb.springbatch.listener.MyJobListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

@Configuration
public class ItemReaderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job itemReaderDemoJob() {
        return jobBuilderFactory.get("itemReaderDemoJob")
                .start(itemReaderDemoStep())
                .listener(new MyJobListener())
                .build();
    }

    @Bean
    public Step itemReaderDemoStep() {
        return stepBuilderFactory.get("itemReaderDemoStep")
                .chunk(2) // 每读2个才处理
                .reader(itemReaderDemoRead())
                .writer(list -> {
                    for (Object item : list) {
                        System.out.println(item + "...");
                    }
                }).build();
    }

    @Bean
    public MyReader itemReaderDemoRead() {
        List<String> data = Arrays.asList("鼠", "牛", "虎", "兔");
        return new MyReader(data);
    }

}

console(鼠牛一批输出,虎兔一批输出)

2023-03-30 16:20:50.224  INFO 5740 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=itemReaderDemoJob]] launched with the following parameters: [{}]
itemReaderDemoJob>before<
2023-03-30 16:20:50.249  INFO 5740 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [itemReaderDemoStep]
...
...
...
...
2023-03-30 16:20:50.274  INFO 5740 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [itemReaderDemoStep] executed in 25ms
itemReaderDemoJob>after<
2023-03-30 16:20:50.282  INFO 5740 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=itemReaderDemoJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 45ms
2023-03-30 16:20:50.286  INFO 5740 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-03-30 16:20:50.289  INFO 5740 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

Process finished with exit code 0

3.2 从数据库中读取数据

JdbcPagingltemReader

先在数据库中建个user表

CREATE TABLE USER (
`id` INT PRIMARY KEY AUTO_INCREMENT,
`username` VARCHAR(30),
`password` VARCHAR(20),
`age` INT
) ENGINE = INNODB CHARSET = utf8mb4 COMMENT '用户表';

mock点假数据

INSERT INTO USER
( `id`, `username`, `password`, `age` )
VALUES
( 1, "lisi", "123", 23 );

INSERT INTO USER
( `username`, `password`, `age` )
VALUES
( "wangwu", "456", 21 ),
( "zhaoliu", "666", 26 ),
( "xiaohong", "777", 24 ),
( "xiaoming", "999", 27 );

User.java

import lombok.Data;

@Data
public class User {
    private Integer id;
    private String username;
    private String password;
    private int age;
}

ItemReaderDbDemo.java

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;

import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableBatchProcessing
public class ItemReaderDbDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Autowired
    @Qualifier("dbJdbcWriter")
    private ItemWriter<? super User> dbJdbcWriter;

    @Bean
    public Job ItemReaderDbDemoJob() {
        return jobBuilderFactory.get("itemReaderDbDemoJob")
                .start(itemReaderDbStep())
                .build();
    }

    @Bean
    public Step itemReaderDbStep() {
        return stepBuilderFactory.get("itemReaderDemoStep")
                .<User, User>chunk(2)
                .reader(dbJdbcReader())
                .writer(dbJdbcWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<User> dbJdbcReader() {
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<User>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(2);
        // 把读取到的记录 转换成 User对象
        reader.setRowMapper(new RowMapper<User>() {
            // 结果集的映射
            @Override
            public User mapRow(ResultSet resultSet, int rowNum) throws SQLException {
                User user = new User();
                user.setId(resultSet.getInt("id"));
                user.setUsername(resultSet.getString("username"));
                user.setPassword(resultSet.getString("password"));
                user.setAge(resultSet.getInt("age"));
                return user;
            }
        });

        // 指定 sq1语句
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("id,username,password,age");
        provider.setFromClause("from user");
        // 指定 根据哪个字段排序
        Map<String, Order> sort = new HashMap<>(1);
        sort.put("id", Order.ASCENDING);
        provider.setSortKeys(sort);

        reader.setQueryProvider(provider);
        return reader;
    }
}

DbJdbcWriter.java(自定义的ItemWriter)

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.List;

@Component("dbJdbcWriter")
public class DbJdbcWriter implements ItemWriter<User> {
    @Override
    public void write(List<? extends User> list) throws Exception {
        System.out.println("FUCK");
        list.forEach(System.out::println);
    }
}

console

2023-04-02 13:06:03.452  INFO 23480 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=itemReaderDbDemoJob]] launched with the following parameters: [{}]
2023-04-02 13:06:03.477  INFO 23480 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [itemReaderDemoStep]
FUCK
User(id=1, username=lisi, password=123, age=23)
User(id=2, username=wangwu, password=456, age=21)
FUCK
User(id=3, username=zhaoliu, password=666, age=26)
User(id=4, username=xiaohong, password=777, age=24)
FUCK
User(id=5, username=xiaoming, password=999, age=27)
2023-04-02 13:06:03.517  INFO 23480 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [itemReaderDemoStep] executed in 40ms
2023-04-02 13:06:03.525  INFO 23480 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=itemReaderDbDemoJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 60ms
2023-04-02 13:06:03.529  INFO 23480 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-04-02 13:06:03.532  INFO 23480 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

3.3 从普通文件中读取数据

FlatFileItemReader

customer.txt(/resources下)

id,firstName,lastName,birthday
1,Stone,Barrett,1964-10-19 14:11:03
2,Raymond,Pace,1977-12-11 21 :44:30
3,Armando,Logan,1986-12-25 11:54:28
4,Latifah,Barnett,1959-07-24 06:00:16
5,Cassandra,Moses,1956-09-14 06:49:28
6,Audra,Hopkins,1984-08-30 04:18:10
7,Upton,Morrow,1973-82-04 05:26:05
8,Melodie,Velasquez,1953-04-26 11:16:26
9,Sybill,Nolan,1951-06-24 14:56:51
10,Glenna,Little,1953-08-27 13:15:08
11,Ingrid,Jackson,1957-09-05 21:36:47
12,Duncan,Castaneda,1979-01 21 18:31:27
13,Xaviera,Gillespie,1965-07-18 15:05:22
14,Rhoda,Lancaster,1990-09-11 15:52:54
15,Fatima,Combs,1979-06-01 06: 58: 54

Customer.java(提取出的实体类)

import lombok.Data;

@Data
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthday;
}

FileItemReaderDemo.java

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.validation.BindException;

@Configuration
@EnableBatchProcessing
public class FileItemReaderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("fileItemWriter")
    private ItemWriter<? super Customer> fileItemWriter;

    @Bean
    public Job FileItemReaderDemo() {
        return jobBuilderFactory.get("FileItemReaderDemo")
                .start(FileItemReaderDemoStep())
                .build();
    }

    @Bean
    public Step FileItemReaderDemoStep() {
        return stepBuilderFactory.get("FileItemReaderDemoStep")
                .<Customer, Customer>chunk(5)
                .reader(fileItemReader())
                .writer(fileItemWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> fileItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>();
        reader.setResource(new ClassPathResource("customer.txt"));
        reader.setLinesToSkip(1); // 跳过第一行

        // 数据解析 使用 DelimitedLineTokenizer (这个类用 "," 作为分隔符)
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
        // 把解析出的一个数据 映射为 Customer对象
        DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        // 映射
        mapper.setFieldSetMapper(fieldSet -> {
            Customer customer = new Customer();
            customer.setId(fieldSet.readLong("id"));
            customer.setFirstName(fieldSet.readString("firstName"));
            customer.setLastName(fieldSet.readString("lastName"));
            customer.setBirthday(fieldSet.readString("birthday"));
            return customer;
        });

        mapper.afterPropertiesSet();
        reader.setLineMapper(mapper);
        return reader;
    }
}

FileFileItemWriter.java(自定义的ItemWriter)

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.List;

@Component("fileItemWriter")
public class fileItemWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> list) throws Exception {
        list.forEach(System.out::println);
    }
}

console

2023-04-02 13:41:08.956  INFO 22752 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=FileItemReaderDemo]] launched with the following parameters: [{}]
2023-04-02 13:41:08.976  INFO 22752 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [FileItemReaderDemoStep]
Customer(id=1, firstName=Stone, lastName=Barrett, birthday=1964-10-19 14:11:03)
Customer(id=2, firstName=Raymond, lastName=Pace, birthday=1977-12-11 21 :44:30)
Customer(id=3, firstName=Armando, lastName=Logan, birthday=1986-12-25 11:54:28)
Customer(id=4, firstName=Latifah, lastName=Barnett, birthday=1959-07-24 06:00:16)
Customer(id=5, firstName=Cassandra, lastName=Moses, birthday=1956-09-14 06:49:28)
Customer(id=6, firstName=Audra, lastName=Hopkins, birthday=1984-08-30 04:18:10)
Customer(id=7, firstName=Upton, lastName=Morrow, birthday=1973-82-04 05:26:05)
Customer(id=8, firstName=Melodie, lastName=Velasquez, birthday=1953-04-26 11:16:26)
Customer(id=9, firstName=Sybill, lastName=Nolan, birthday=1951-06-24 14:56:51)
Customer(id=10, firstName=Glenna, lastName=Little, birthday=1953-08-27 13:15:08)
Customer(id=11, firstName=Ingrid, lastName=Jackson, birthday=1957-09-05 21:36:47)
Customer(id=12, firstName=Duncan, lastName=Castaneda, birthday=1979-01 21 18:31:27)
Customer(id=13, firstName=Xaviera, lastName=Gillespie, birthday=1965-07-18 15:05:22)
Customer(id=14, firstName=Rhoda, lastName=Lancaster, birthday=1990-09-11 15:52:54)
Customer(id=15, firstName=Fatima, lastName=Combs, birthday=1979-06-01 06: 58: 54)
2023-04-02 13:41:09.043  INFO 22752 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [FileItemReaderDemoStep] executed in 66ms
2023-04-02 13:41:09.059  INFO 22752 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=FileItemReaderDemo]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 100ms
2023-04-02 13:41:09.064  INFO 22752 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-04-02 13:41:09.068  INFO 22752 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

3.4 从.xml文件中读取数据

使用StaxEvenItemReader

customer.xml(/resources下)

<?xml version="1.0" encoding="UTF-8" ?>
<customers>
	<customer>
		<id>1</id>
		<firstName>Mufutau</firstName>
		<lastName>Maddox</lastName>
		<birthday>2017-06-05 19:43:51PM</birthday>
	</customer>
	<customer>
		<id>2</id>
		<firstName>Brenden</firstName>
		<lastName>Cobb</lastName>
		<birthday>2017-01-06 13:18:17PM</birthday>
	</customer>
	<customer>
		<id>3</id>
		<firstName>Kerry</firstName>
		<lastName>Joseph</lastName>
		<birthday>2016-09-15 18:32:33PM</birthday>
	</customer>
	<customer>
		<id>4</id>
		<firstName>asdasd</firstName>
		<lastName>Joseph</lastName>
		<birthday>2016-09-15 18:32:33PM</birthday>
	</customer>
	<customer>
		<id>5</id>
		<firstName>JOJO5</firstName>
		<lastName>Jobana</lastName>
		<birthday>2016-09-15 18:32:33PM</birthday>
	</customer>
	<customer>
		<id>6</id>
		<firstName>XuLun</firstName>
		<lastName>JoTaiLang</lastName>
		<birthday>2046-09-15 18:32:33PM</birthday>
	</customer>
	<customer>
		<id>7</id>
		<firstName>JaiLuo</firstName>
		<lastName>JieBeiLing</lastName>
		<birthday>2077-09-15 18:32:33PM</birthday>
	</customer>
</customers>

pom.xml 中添加如下依赖

<!-- xml相关 -->
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
	<groupId>com.thoughtworks.xstream</groupId>
	<artifactId>xstream</artifactId>
	<version>1.4.7</version>
</dependency>

Customer.java(提取出的实体类)

import lombok.Data;

@Data
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthday;
}

XmlItemReaderDemo.java

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableBatchProcessing
public class XmlItemReaderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("xmlItemWriter")
    private ItemWriter<? super Customer> xmlItemWriter;

    @Bean
    public Job xmlItemReaderDemoJob() {
        return jobBuilderFactory.get("xmlItemReaderDemoJob")
                .start(xmlItemReaderDemoStep())
                .build();
    }

    @Bean
    public Step xmlItemReaderDemoStep() {
        return stepBuilderFactory.get("xmlItemReaderDemoStep")
                .<Customer, Customer>chunk(5)
                .reader(xmlItemReader())
                .writer(xmlItemWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlItemReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<Customer>();
        reader.setResource(new ClassPathResource("customer.xml")); // 指定文件位置

        // 指定需要处理的根标签
        reader.setFragmentRootElementName("customer");
        // 把xml转成 对象
        XStreamMarshaller unmarshaller = new XStreamMarshaller();
        // 告诉unmarshaller 把xml转成什么类型
        Map<String, Class> map = new HashMap<>();
        map.put("customer", Customer.class);
        unmarshaller.setAliases(map);

        reader.setUnmarshaller(unmarshaller);
        return reader;
    }
}

XmlItemWriter.java(自定义的ItemWriter)

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.List;

@Component("xmlItemWriter")
public class XmlItemWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> list) throws Exception {
        System.out.println("--each chunk--");
        list.forEach(System.out::println);
    }
}

console

2023-04-02 21:57:31.013  INFO 11360 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=xmlItemReaderDemoJob]] launched with the following parameters: [{}]
2023-04-02 21:57:31.026  INFO 11360 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [xmlItemReaderDemoStep]
--each chunk--
Customer(id=1, firstName=Mufutau, lastName=Maddox, birthday=2017-06-05 19:43:51PM)
Customer(id=2, firstName=Brenden, lastName=Cobb, birthday=2017-01-06 13:18:17PM)
Customer(id=3, firstName=Kerry, lastName=Joseph, birthday=2016-09-15 18:32:33PM)
Customer(id=4, firstName=asdasd, lastName=Joseph, birthday=2016-09-15 18:32:33PM)
Customer(id=5, firstName=JOJO5, lastName=Jobana, birthday=2016-09-15 18:32:33PM)
--each chunk--
Customer(id=6, firstName=XuLun, lastName=JoTaiLang, birthday=2046-09-15 18:32:33PM)
Customer(id=7, firstName=JaiLuo, lastName=JieBeiLing, birthday=2077-09-15 18:32:33PM)
2023-04-02 21:57:31.115  INFO 11360 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [xmlItemReaderDemoStep] executed in 89ms
2023-04-02 21:57:31.124  INFO 11360 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=xmlItemReaderDemoJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 107ms
2023-04-02 21:57:31.127  INFO 11360 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-04-02 21:57:31.130  INFO 11360 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

3.5 从多个文件中读取数据

使用MultiResourceItemReader

file1.txt(/resources下)

1,Stone, Barrett, 1964-10-19 14:11:03
2,Raymond, Pace,1977-12-11 21:44:30
3,Armando, Logan,1986-12-25 11:54:28
4,Latifah, Barnett,1959-07-24 06:00:16
5,Cassandra, Moses,1956-09-14 06:49:28
6,Audra, Hopkins,1984-08-30 04:18:10
7,Upton, Morrow,1973-02-04 05:26:05
8,Melodie, Velasquez,1953-04-26 11:16:26
9,sybill, Nolan,1951-06-24 14:56:51
10,Glenna, Little, 1953-08-27 13:15:08

file2.txt(/resources下)

11,Ingrid, Jackson,1957-09-05 21:36:47.
12,Duncan, Castaneda,1979-01-21 18:31:27
13,Xaviera, Gillespie,1965-07-18 15:05:22
14,Rhoda, Lancaster,1990-09-11 15:52:54
15,Fatima, Combs,1979-06-01 06:58:54
16,Merri1l, Hopkins ,1990-07-02 17:36:35
17,Felicia, Vinson,1959-12-19 20:23:12
18,Hanae , Harvey, 1984-12-27 10:36:49
19,Ramona, Acosta,1962-06-23 20:03:40
20,Katelyn, Hammond ,1988-11-12 19:05:13

Customer.java(提取出的实体类)

import lombok.Data;

@Data
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthday;
}

MultiFileItemReaderDemo.java

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.validation.BindException;

@Configuration
@EnableBatchProcessing
public class MultiFileItemReaderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("multiFileWriter")
    private ItemWriter<? super Customer> multiFileWriter;

    @Value("classpath:/file*.txt")
    private Resource[] fileResources;

    @Bean
    public Job multiFileItemReaderDemoJob() {
        return jobBuilderFactory.get("multiFileItemReaderDemoJob")
                .start(multiFileItemReaderDemoStep())
                .build();
    }

    @Bean
    public Step multiFileItemReaderDemoStep() {
        return stepBuilderFactory.get("multiFileItemReaderDemoStep")
                .<Customer,Customer>chunk(5)
                .reader(multiFileReader())
                .writer(multiFileWriter)
                .build();
    }

    // multiFileReader 虽说是多文件读取 但其实是逐个读取单个文件
    @Bean
    @StepScope
    public MultiResourceItemReader<Customer> multiFileReader() {
        MultiResourceItemReader <Customer> reader = new MultiResourceItemReader<>();
        reader.setDelegate(fileItemReaderDemoReader());
        reader.setResources(fileResources);
        return reader;
    }

    // 单个文件读取
    @Bean
    public FlatFileItemReader<Customer> fileItemReaderDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>();
        reader.setResource(new ClassPathResource("file1.txt"));

        // 数据解析
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
        // 把解析出的一个数据映射为 Customer对象
        DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        // 映射
        mapper.setFieldSetMapper(fieldSet -> {
            Customer customer = new Customer();
            customer.setId(fieldSet.readLong("id"));
            customer.setFirstName(fieldSet.readString("firstName"));
            customer.setLastName(fieldSet.readString("lastName"));
            customer.setBirthday(fieldSet.readString("birthday"));
            return customer;
        });
        mapper.afterPropertiesSet();

        reader.setLineMapper(mapper);
        return reader;
    }
}

MultiFileWriter.java(自定义的ItemWriter)

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.List;

@Component("multiFileWriter")
public class MultiFileWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> list) throws Exception {
        System.out.println("--each chunk--");
        list.forEach(System.out::println);
    }
}

console

2023-04-02 22:37:04.224  INFO 22736 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multiFileItemReaderDemoJob]] launched with the following parameters: [{}]
2023-04-02 22:37:04.237  INFO 22736 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [multiFileItemReaderDemoStep]
--each chunk--
Customer(id=1, firstName=Stone, lastName=Barrett, birthday=1964-10-19 14:11:03)
Customer(id=2, firstName=Raymond, lastName=Pace, birthday=1977-12-11 21 :44:30)
Customer(id=3, firstName=Armando, lastName=Logan, birthday=1986-12-25 11:54:28)
Customer(id=4, firstName=Latifah, lastName=Barnett, birthday=1959-07-24 06:00:16)
Customer(id=5, firstName=Cassandra, lastName=Moses, birthday=1956-09-14 06:49:28)
--each chunk--
Customer(id=6, firstName=Audra, lastName=Hopkins, birthday=1984-08-30 04:18:10)
Customer(id=7, firstName=Upton, lastName=Morrow, birthday=1973-82-04 05:26:05)
Customer(id=8, firstName=Melodie, lastName=Velasquez, birthday=1953-04-26 11:16:26)
Customer(id=9, firstName=Sybill, lastName=Nolan, birthday=1951-06-24 14:56:51)
Customer(id=10, firstName=Glenna, lastName=Little, birthday=1953-08-27 13:15:08)
--each chunk--
Customer(id=11, firstName=Ingrid, lastName=Jackson, birthday=1957-09-05 21:36:47)
Customer(id=12, firstName=Duncan, lastName=Castaneda, birthday=1979-01 21 18:31:27)
Customer(id=13, firstName=Xaviera, lastName=Gillespie, birthday=1965-07-18 15:05:22)
Customer(id=14, firstName=Rhoda, lastName=Lancaster, birthday=1990-09-11 15:52:54)
Customer(id=15, firstName=Fatima, lastName=Combs, birthday=1979-06-01 06: 58: 54)
--each chunk--
Customer(id=16, firstName=Merri1l, lastName=Hopkins, birthday=1990-07-02 17:36:35)
Customer(id=17, firstName=Felicia, lastName=Vinson, birthday=1959-12-19 20:23:12)
Customer(id=18, firstName=Hanae, lastName=Harvey, birthday=1984-12-27 10:36:49)
Customer(id=19, firstName=Ramona, lastName=Acosta, birthday=1962-06-23 20:03:40)
Customer(id=20, firstName=Katelyn, lastName=Hammond, birthday=1988-11-12 19:05:13)
2023-04-02 22:37:04.274  INFO 22736 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [multiFileItemReaderDemoStep] executed in 37ms
2023-04-02 22:37:04.282  INFO 22736 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multiFileItemReaderDemoJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 53ms
2023-04-02 22:37:04.307  INFO 22736 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-04-02 22:37:04.311  INFO 22736 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

3.6 ItemReader的异常处理和重启

千锋Java教程:18.ItemReader的异常处理和重启_哔哩哔哩_bilibili

ItemStreamReader取代原先的ItemReader作为reader的实现接口

如此一来就要实现其中的3个抽象方法:

  • open() 在每个ItemReader/ItemStream开启之前
  • update() 在每个chunk写数据之后(即每批更新之后)
  • close() 在每个ItemReader/ItemStream关闭之后

img

(补充:ItemWriter也有类似的扩充接口即ItemStreamWriter

restart.txt(/resources下)※注意 Wrongname在第11行

1,Stone,Barrett,1964-10-19 14:11:03
2,Raymond,Pace,1977-12-11 21 :44:30
3,Armando,Logan,1986-12-25 11:54:28
4,Latifah,Barnett,1959-07-24 06:00:16
5,Cassandra,Moses,1956-09-14 06:49:28
6,Audra,Hopkins,1984-08-30 04:18:10
7,Upton,Morrow,1973-82-04 05:26:05
8,Melodie,Velasquez,1953-04-26 11:16:26
9,Sybill,Nolan,1951-06-24 14:56:51
10,Glenna,Little,1953-08-27 13:15:08
11,Wrongname,Jackson,1957-09-05 21:36:47
12,Duncan,Castaneda,1979-01 21 18:31:27
13,Xaviera,Gillespie,1965-07-18 15:05:22
14,Rhoda,Lancaster,1990-09-11 15:52:54
15,Fatima,Combs,1979-06-01 06: 58: 54
16,Merri1l,Hopkins,1990-07-02 17:36:35
17,Felicia,Vinson,1959-12-19 20:23:12
18,Hanae,Harvey,1984-12-27 10:36:49
19,Ramona,Acosta,1962-06-23 20:03:40
20,Katelyn,Hammond,1988-11-12 19:05:13

Customer.java(提取出的实体类)

import lombok.Data;

@Data
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthday;
}

RestartReader.java

import org.springframework.batch.item.*;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindException;

@Component("restartReader")
public class RestartReader implements ItemStreamReader<Customer> {

    private FlatFileItemReader<Customer> customerFlatFileItemReader = new FlatFileItemReader<>();
    private Long curLine = 0L;
    private boolean restart = false;
    private ExecutionContext executionContext;

    public RestartReader() {
        customerFlatFileItemReader.setResource(new ClassPathResource("restart.txt"));
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
        DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        mapper.setFieldSetMapper(new FieldSetMapper<Customer>() {
            @Override
            public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
                Customer customer = new Customer();
                customer.setId(fieldSet.readLong("id"));
                customer.setFirstName(fieldSet.readString("firstName"));
                customer.setLastName(fieldSet.readString("lastName"));
                customer.setBirthday(fieldSet.readString("birthday"));
                return customer;
            }
        });

        mapper.afterPropertiesSet();
        customerFlatFileItemReader.setLineMapper(mapper);
    }

    @Override
    public Customer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        Customer customer = null;
        this.curLine++;

        if (restart) {
            customerFlatFileItemReader.setLinesToSkip(this.curLine.intValue() - 1);
            restart = false;
            System.out.println("Start reading from line: " + this.curLine);
        }

        customerFlatFileItemReader.open(this.executionContext);
        customer = customerFlatFileItemReader.read();

        // 制造的异常: 当读到某一行的firstName是"Wrongname" 抛出一个异常
        if (customer != null && customer.getFirstName().equals("Wrongname")) {
            throw new RuntimeException("Something went wrong. Customer ID: " + customer.getId());
        }

        return customer;
    }


    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContext = executionContext;
        if (executionContext.containsKey("curLine")) {
            this.curLine = executionContext.getLong("curLine");
            this.restart = true;
        } else {
            this.curLine = 0L;
            executionContext.put("curLine", this.curLine);
            System.out.println("Start reading from line " + (this.curLine + 1));
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.put("curLine", this.curLine);
        System.out.println("Up to currentLine: " + this.curLine + " were done");
    }

    @Override
    public void close() throws ItemStreamException {

    }
}

RestartWriter.java

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.List;

@Component("restartWriter")
public class RestartWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> list) throws Exception {
        list.forEach(System.out::println);
    }
}

RestartDemo.java

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class RestartDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("restartReader")
    private ItemReader<? extends Customer> restartReader;

    @Autowired
    @Qualifier("restartWriter")
    private ItemWriter<? super Customer> restartWriter;

    @Bean
    public Job restartJob() {
        return jobBuilderFactory.get("restartJob")
                .start(restartStep())
                .build();
    }

    @Bean
    public Step restartStep() {
        return stepBuilderFactory.get("restartStep")
                .<Customer, Customer>chunk(5)
                .reader(restartReader)
                .writer(restartWriter)
                .build();
    }
}

控制台结果(遇到Wrongname即抛出异常)

2023-04-03 22:01:32.557  INFO 19784 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=restartJob]] launched with the following parameters: [{}]
2023-04-03 22:01:32.567  INFO 19784 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [restartStep]
Start reading from line 1
Up to currentLine: 0 were done
Customer(id=1, firstName=Stone, lastName=Barrett, birthday=1964-10-19 14:11:03)
Customer(id=2, firstName=Raymond, lastName=Pace, birthday=1977-12-11 21 :44:30)
Customer(id=3, firstName=Armando, lastName=Logan, birthday=1986-12-25 11:54:28)
Customer(id=4, firstName=Latifah, lastName=Barnett, birthday=1959-07-24 06:00:16)
Customer(id=5, firstName=Cassandra, lastName=Moses, birthday=1956-09-14 06:49:28)
Up to currentLine: 5 were done
Customer(id=6, firstName=Audra, lastName=Hopkins, birthday=1984-08-30 04:18:10)
Customer(id=7, firstName=Upton, lastName=Morrow, birthday=1973-82-04 05:26:05)
Customer(id=8, firstName=Melodie, lastName=Velasquez, birthday=1953-04-26 11:16:26)
Customer(id=9, firstName=Sybill, lastName=Nolan, birthday=1951-06-24 14:56:51)
Customer(id=10, firstName=Glenna, lastName=Little, birthday=1953-08-27 13:15:08)
Up to currentLine: 10 were done
2023-04-03 22:01:32.597 ERROR 19784 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step restartStep in job restartJob

java.lang.RuntimeException: Something went wrong. Customer ID: 11
...
2023-04-03 22:01:32.599  INFO 19784 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [restartStep] executed in 31ms
2023-04-03 22:01:32.609  INFO 19784 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=restartJob]] completed with the following parameters: [{}] and the following status: [FAILED] in 47ms
2023-04-03 22:01:32.613  INFO 19784 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-04-03 22:01:32.616  INFO 19784 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

控制台结果(把问题行Wrongname修正之后再跑)

2023-04-03 22:05:06.084  INFO 20748 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=restartJob]] launched with the following parameters: [{}]
2023-04-03 22:05:06.097  INFO 20748 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [restartStep]
Up to currentLine: 10 were done
Start reading from line: 11
Customer(id=11, firstName=Gu, lastName=Jackson, birthday=1957-09-05 21:36:47)
Customer(id=12, firstName=Duncan, lastName=Castaneda, birthday=1979-01 21 18:31:27)
Customer(id=13, firstName=Xaviera, lastName=Gillespie, birthday=1965-07-18 15:05:22)
Customer(id=14, firstName=Rhoda, lastName=Lancaster, birthday=1990-09-11 15:52:54)
Customer(id=15, firstName=Fatima, lastName=Combs, birthday=1979-06-01 06: 58: 54)
Up to currentLine: 15 were done
Customer(id=16, firstName=Merri1l, lastName=Hopkins, birthday=1990-07-02 17:36:35)
Customer(id=17, firstName=Felicia, lastName=Vinson, birthday=1959-12-19 20:23:12)
Customer(id=18, firstName=Hanae, lastName=Harvey, birthday=1984-12-27 10:36:49)
Customer(id=19, firstName=Ramona, lastName=Acosta, birthday=1962-06-23 20:03:40)
Customer(id=20, firstName=Katelyn, lastName=Hammond, birthday=1988-11-12 19:05:13)
Up to currentLine: 20 were done
Up to currentLine: 21 were done
2023-04-03 22:05:06.122  INFO 20748 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [restartStep] executed in 25ms
2023-04-03 22:05:06.130  INFO 20748 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=restartJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 43ms
2023-04-03 22:05:06.133  INFO 20748 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-04-03 22:05:06.135  INFO 20748 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.