[SpringBatch 실습] 24. FlatFileItemReader (.csv 파일 읽어와서 데이터 출력하는 배치 생성)

반응형
728x90
반응형

Job 생성

FlatFileConfiguration.java
package com.project.springbatch._47_reader_FlatFiles;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

/*
--job.name=flatFilesJob
 */
@RequiredArgsConstructor
@Configuration
public class FlatFilesConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job flatFilesJob() {
        return jobBuilderFactory.get("flatFilesJob")
                .start(flatFilesStep1())
                .next(flatFilesStep2())
                .build();
    }

    @Bean
    public Step flatFilesStep1() {
        return stepBuilderFactory.get("flatFilesStep1")
                .<String, String>chunk(5)
                .reader(itemReader())
                .writer(items -> System.out.println("items = " + items))
                .build();
    }

    @Bean
    public Step flatFilesStep2() {
        return stepBuilderFactory.get("flatFilesStep2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public ItemReader itemReader(){

        FlatFileItemReader<Customer> itemReader = new FlatFileItemReader<>();
        itemReader.setResource(new ClassPathResource("/item47/customer.csv"));

        /* DefaultLineMapper */
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
        lineMapper.setFieldSetMapper(new CustomerFieldSetMapper()); /* filedSetMapper */

        itemReader.setLineMapper(lineMapper);
        itemReader.setLinesToSkip(1); // 첫번째 row 건너뜀

        return itemReader;
    }
}

1) FlatFileIteamReader 사용 로직

@Bean
public ItemReader itemReader(){

    FlatFileItemReader<Customer> itemReader = new FlatFileItemReader<>();
    itemReader.setResource(new ClassPathResource("/item47/customer.csv"));

    /* DefaultLineMapper */
    DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
    lineMapper.setFieldSetMapper(new CustomerFieldSetMapper()); /* filedSetMapper */

    itemReader.setLineMapper(lineMapper);
    itemReader.setLinesToSkip(1); // 첫번째 row 건너뜀

    return itemReader;
}

File 경로 설정

itemReader.setResource(new ClassPathResource("/item47/customer.csv"));

 FlatFileItemReader.setLineMapper() 메서드

Item Reader가 읽어온 Line에 대해서 어떻게 매핑할 것인지 설정한다.

public void setLineMapper(LineMapper<T> lineMapper) {
   this.lineMapper = lineMapper;
}

 

 FlatFileItemReader.setLinesToSkip(1) 메서드

linesToSkip 매개변수로 전달된 숫자까지의 행을 모두 skip 처리한다.

1이라면 첫행이 제외되고, 3이라면 1,2,3행 모두 제외된다.

public void setLinesToSkip(int linesToSkip) {
   this.linesToSkip = linesToSkip;
}

 

resources/item47/customer.csv
name,age,year
user1,30,2000
user2,29,2001
user3,28,2002
user4,27,2003
user5,26,2004
user6,25,2005
user7,24,2006
user8,23,2007
user9,22,2008
user10,21,2009

 

DefaultLineMapper.java
package com.project.springbatch._47_reader_FlatFiles;

import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineTokenizer;

public class DefaultLineMapper<T> implements LineMapper<T> {

    private LineTokenizer tokenizer;
    private FieldSetMapper<T> fieldSetMapper;


    /**
     * line 하나, lineNumber 인자로 받는다.
     * @param line
     * @param lineNumber
     * @return
     * @throws Exception
     */
    @Override
    public T mapLine(String line, int lineNumber) throws Exception {
        FieldSet fieldSet = tokenizer.tokenize(line); // tokenizer : DelimitedLineTokenizer (FlatFilesConfiguration 에서 설정)
        return fieldSetMapper.mapFieldSet(fieldSet);
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }

}

다시 FlatFilesConfiguration.java 코드를 보자.

@Bean
public ItemReader itemReader(){
    FlatFileItemReader<Customer> itemReader = new FlatFileItemReader<>();
    itemReader.setResource(new ClassPathResource("/item47/customer.csv"));

    /* DefaultLineMapper */
    DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
    lineMapper.setFieldSetMapper(new CustomerFieldSetMapper()); /* filedSetMapper */

    ...
}

1) setLineTokenizer()

하나의 Line에 대해서 어떻게 데이터를 가져올지 설정한다.

lineMapper.setLineTokenizer(new DelimitedLineTokenizer());

▶ DelimitedLineTokenizer()

public DelimitedLineTokenizer() {
   this(DELIMITER_COMMA);
}

,로 구분하여 하나의 Line에서 데이터를 가져온다.

user1,30,2000
-- user1
-- 30
-- 2000

 

2) setFieldSetMapper()

우리가 생성한 CustomerFieldSetMapper 클래스로 설정했다. 해당 클래스는 아래에서 분석해보자.

lineMapper.setFieldSetMapper(new CustomerFieldSetMapper());

 

3) mapLine() 구현 메서드

@Override
public T mapLine(String line, int lineNumber) throws Exception {
    FieldSet fieldSet = tokenizer.tokenize(line); // tokenizer : DelimitedLineTokenizer (FlatFilesConfiguration 에서 설정)
    return fieldSetMapper.mapFieldSet(fieldSet);
}

 tokenizer.tokenize(line)

tokenizer는 현재 DelimitedLineTokenizer 타입이다.

DelimitedLineTokenizer.java 는 AbstractLineTokenizer.java 를 구현하고있다.

 

▶ AbstractLineTokenizer.java > tokenize()

@Override
public FieldSet tokenize(@Nullable String line) {

   if (line == null) {
      line = "";
   }

   List<String> tokens = new ArrayList<>(doTokenize(line));
   
   // if names are set and strict flag is false
   if ( ( names.length != 0 ) && ( ! strict ) ) {
      adjustTokenCountIfNecessary( tokens );
   }
   
   String[] values = tokens.toArray(new String[tokens.size()]);

   if (names.length == 0) {
      return fieldSetFactory.create(values);
   }
   else if (values.length != names.length) {
      throw new IncorrectTokenCountException(names.length, values.length, line);
   }
   return fieldSetFactory.create(values, names);
}

 

해당 메서드 호출로, DelimitedLineTokenizer.doTokenize()를 호출한다.

List<String> tokens = new ArrayList<>(doTokenize(line));

 

CustomerFieldSetMapper.java
package com.project.springbatch._47_reader_FlatFiles;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;

public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {

    @Override
    public Customer mapFieldSet(FieldSet fs) {

        if (fs == null) {
            return null;
        }

        Customer customer = new Customer();
        customer.setName(fs.readString(0));
        customer.setAge(fs.readInt(1));
        customer.setYear(fs.readString(2));

        return customer;
    }
}

1) fs.readString(index)

▶ FieldSet.java 

String readString(int index); -- 현재는 이 메서드를 호출!
String readString(String name);

 

▶ DelimitedLineTokenizer() 로 인해서 fs는 아래와 같이 데이터가 들어있다.

user1,30,2000
-- user1
-- 30
-- 2000

따라서 결과는 아래와 같다.

method value
fs.readString(0) user1
fs.readString(1) 30
fs.readString(2) 2000

 

▶ fs.readString() 메서드 로직

DefaultFieldSet.readAndTrim() 메서드를 수행한다.

protected String readAndTrim(int index) {
   String value = tokens[index];

   if (value != null) {
      return value.trim();
   }
   else {
      return null;
   }
}

 

 

디버깅

1) FlatFilesConfiguiration.itemReader()

 

2) itemReader에 설정된 정보

 

3) DefaultLineMapper.mapLine()

DelimitedLineTokenizer.java > doTokenize() 메서드 수행 후 

 

4) CustomerFieldSetMapper.mapFieldSet()

▶ 위 메서드 수행 후 customer 객체 모습

 

5) 위 과정을 chunk size만큼 반복하여 write()가 호출된다.

items = [
    Customer(name=user2, age=29, year=2001), 
    Customer(name=user3, age=28, year=2002), 
    Customer(name=user4, age=27, year=2003), 
    Customer(name=user5, age=26, year=2004), 
    Customer(name=user6, age=25, year=2005)
]

 

6) 두번째 cycle에서의 wirte() 수행 결과

items = [
    Customer(name=user7, age=24, year=2006), 
    Customer(name=user8, age=23, year=2007), 
    Customer(name=user9, age=22, year=2008), 
    Customer(name=user10, age=21, year=2009)
]

 

 

오류 발생시 해당 row 패스하기

public FlatFileItemReader itemReader() {
    return new FlatFileItemReaderBuilder<Customer>()
            .name("flatFile")
            .resource(new ClassPathResource("customerException.txt"))
            .fieldSetMapper(new BeanWrapperFieldSetMapper<>())
            .targetType(Customer.class)
            .linesToSkip(1)
            .fixedLength()
            /* 예외를 잡지 않겠다 */
            .strict(false) // 예외가 발생하지 않고 오류가 있던 row 는 pass 된다.
            .addColumns(new Range(1,5))
            .addColumns(new Range(6,9))
            .addColumns(new Range(10,11))
            .names("name","year","age")
            .build();
}

하나의 Line을 기준으로 처리되는 FlatFileItemReader 에서는 strict() 메서드를 사용해서 현재 Line에 이슈가 발생했다면, 예외를 발생시키지 않고 오류가 있는 현재 Line을 패스한다.

 

 

반응형

Designed by JB FACTORY