Job 생성
FlatFileConfiguration.java
package com.project.springbatch._47_reader_FlatFiles;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
/*
--job.name=flatFilesJob
*/
@RequiredArgsConstructor
@Configuration
public class FlatFilesConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job flatFilesJob() {
return jobBuilderFactory.get("flatFilesJob")
.start(flatFilesStep1())
.next(flatFilesStep2())
.build();
}
@Bean
public Step flatFilesStep1() {
return stepBuilderFactory.get("flatFilesStep1")
.<String, String>chunk(5)
.reader(itemReader())
.writer(items -> System.out.println("items = " + items))
.build();
}
@Bean
public Step flatFilesStep2() {
return stepBuilderFactory.get("flatFilesStep2")
.tasklet((contribution, chunkContext) -> {
System.out.println("step2 has executed");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public ItemReader itemReader(){
FlatFileItemReader<Customer> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new ClassPathResource("/item47/customer.csv"));
/* DefaultLineMapper */
DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new CustomerFieldSetMapper()); /* filedSetMapper */
itemReader.setLineMapper(lineMapper);
itemReader.setLinesToSkip(1); // 첫번째 row 건너뜀
return itemReader;
}
}
1) FlatFileIteamReader 사용 로직
@Bean
public ItemReader itemReader(){
FlatFileItemReader<Customer> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new ClassPathResource("/item47/customer.csv"));
/* DefaultLineMapper */
DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new CustomerFieldSetMapper()); /* filedSetMapper */
itemReader.setLineMapper(lineMapper);
itemReader.setLinesToSkip(1); // 첫번째 row 건너뜀
return itemReader;
}
▶ File 경로 설정
itemReader.setResource(new ClassPathResource("/item47/customer.csv"));
▶ FlatFileItemReader.setLineMapper() 메서드
Item Reader가 읽어온 Line에 대해서 어떻게 매핑할 것인지 설정한다.
public void setLineMapper(LineMapper<T> lineMapper) {
this.lineMapper = lineMapper;
}
▶ FlatFileItemReader.setLinesToSkip(1) 메서드
linesToSkip 매개변수로 전달된 숫자까지의 행을 모두 skip 처리한다.
1이라면 첫행이 제외되고, 3이라면 1,2,3행 모두 제외된다.
public void setLinesToSkip(int linesToSkip) {
this.linesToSkip = linesToSkip;
}
resources/item47/customer.csv
name,age,year
user1,30,2000
user2,29,2001
user3,28,2002
user4,27,2003
user5,26,2004
user6,25,2005
user7,24,2006
user8,23,2007
user9,22,2008
user10,21,2009
DefaultLineMapper.java
package com.project.springbatch._47_reader_FlatFiles;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineTokenizer;
public class DefaultLineMapper<T> implements LineMapper<T> {
private LineTokenizer tokenizer;
private FieldSetMapper<T> fieldSetMapper;
/**
* line 하나, lineNumber 인자로 받는다.
* @param line
* @param lineNumber
* @return
* @throws Exception
*/
@Override
public T mapLine(String line, int lineNumber) throws Exception {
FieldSet fieldSet = tokenizer.tokenize(line); // tokenizer : DelimitedLineTokenizer (FlatFilesConfiguration 에서 설정)
return fieldSetMapper.mapFieldSet(fieldSet);
}
public void setLineTokenizer(LineTokenizer tokenizer) {
this.tokenizer = tokenizer;
}
public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
this.fieldSetMapper = fieldSetMapper;
}
}
다시 FlatFilesConfiguration.java 코드를 보자.
@Bean
public ItemReader itemReader(){
FlatFileItemReader<Customer> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new ClassPathResource("/item47/customer.csv"));
/* DefaultLineMapper */
DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new CustomerFieldSetMapper()); /* filedSetMapper */
...
}
1) setLineTokenizer()
하나의 Line에 대해서 어떻게 데이터를 가져올지 설정한다.
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
▶ DelimitedLineTokenizer()
public DelimitedLineTokenizer() {
this(DELIMITER_COMMA);
}
,로 구분하여 하나의 Line에서 데이터를 가져온다.
user1,30,2000
-- user1
-- 30
-- 2000
2) setFieldSetMapper()
우리가 생성한 CustomerFieldSetMapper 클래스로 설정했다. 해당 클래스는 아래에서 분석해보자.
lineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
3) mapLine() 구현 메서드
@Override
public T mapLine(String line, int lineNumber) throws Exception {
FieldSet fieldSet = tokenizer.tokenize(line); // tokenizer : DelimitedLineTokenizer (FlatFilesConfiguration 에서 설정)
return fieldSetMapper.mapFieldSet(fieldSet);
}
▶ tokenizer.tokenize(line)
tokenizer는 현재 DelimitedLineTokenizer 타입이다.
DelimitedLineTokenizer.java 는 AbstractLineTokenizer.java 를 구현하고있다.
▶ AbstractLineTokenizer.java > tokenize()
@Override
public FieldSet tokenize(@Nullable String line) {
if (line == null) {
line = "";
}
List<String> tokens = new ArrayList<>(doTokenize(line));
// if names are set and strict flag is false
if ( ( names.length != 0 ) && ( ! strict ) ) {
adjustTokenCountIfNecessary( tokens );
}
String[] values = tokens.toArray(new String[tokens.size()]);
if (names.length == 0) {
return fieldSetFactory.create(values);
}
else if (values.length != names.length) {
throw new IncorrectTokenCountException(names.length, values.length, line);
}
return fieldSetFactory.create(values, names);
}
해당 메서드 호출로, DelimitedLineTokenizer.doTokenize()를 호출한다.
List<String> tokens = new ArrayList<>(doTokenize(line));
CustomerFieldSetMapper.java
package com.project.springbatch._47_reader_FlatFiles;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
@Override
public Customer mapFieldSet(FieldSet fs) {
if (fs == null) {
return null;
}
Customer customer = new Customer();
customer.setName(fs.readString(0));
customer.setAge(fs.readInt(1));
customer.setYear(fs.readString(2));
return customer;
}
}
1) fs.readString(index)
▶ FieldSet.java
String readString(int index); -- 현재는 이 메서드를 호출!
String readString(String name);
▶ DelimitedLineTokenizer() 로 인해서 fs는 아래와 같이 데이터가 들어있다.
user1,30,2000
-- user1
-- 30
-- 2000
따라서 결과는 아래와 같다.
method | value |
fs.readString(0) | user1 |
fs.readString(1) | 30 |
fs.readString(2) | 2000 |
▶ fs.readString() 메서드 로직
DefaultFieldSet.readAndTrim() 메서드를 수행한다.
protected String readAndTrim(int index) {
String value = tokens[index];
if (value != null) {
return value.trim();
}
else {
return null;
}
}
디버깅
1) FlatFilesConfiguiration.itemReader()
2) itemReader에 설정된 정보
3) DefaultLineMapper.mapLine()
DelimitedLineTokenizer.java > doTokenize() 메서드 수행 후
4) CustomerFieldSetMapper.mapFieldSet()
▶ 위 메서드 수행 후 customer 객체 모습
5) 위 과정을 chunk size만큼 반복하여 write()가 호출된다.
items = [
Customer(name=user2, age=29, year=2001),
Customer(name=user3, age=28, year=2002),
Customer(name=user4, age=27, year=2003),
Customer(name=user5, age=26, year=2004),
Customer(name=user6, age=25, year=2005)
]
6) 두번째 cycle에서의 wirte() 수행 결과
items = [
Customer(name=user7, age=24, year=2006),
Customer(name=user8, age=23, year=2007),
Customer(name=user9, age=22, year=2008),
Customer(name=user10, age=21, year=2009)
]
오류 발생시 해당 row 패스하기
public FlatFileItemReader itemReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("flatFile")
.resource(new ClassPathResource("customerException.txt"))
.fieldSetMapper(new BeanWrapperFieldSetMapper<>())
.targetType(Customer.class)
.linesToSkip(1)
.fixedLength()
/* 예외를 잡지 않겠다 */
.strict(false) // 예외가 발생하지 않고 오류가 있던 row 는 pass 된다.
.addColumns(new Range(1,5))
.addColumns(new Range(6,9))
.addColumns(new Range(10,11))
.names("name","year","age")
.build();
}
하나의 Line을 기준으로 처리되는 FlatFileItemReader 에서는 strict() 메서드를 사용해서 현재 Line에 이슈가 발생했다면, 예외를 발생시키지 않고 오류가 있는 현재 Line을 패스한다.