[SpringBatch] ItemReader, ItemProcessor, ItemWriter 의 chunk 기반 배치 수행, ChunkOrientedTasklet에 대한 이해

반응형
728x90
반응형

기본개념

우선 ItemReader, ItemProcessor, ItemWriter 에 대해 간단하게 살펴보자.

 

ItemReader

다양한 타입의 입력 데이터를 읽어오는 인터페이스로, 배치 수행의 대상이 될 데이터를 담는다.

package org.springframework.batch.item;

import org.springframework.lang.Nullable;

public interface ItemReader<T> {
    @Nullable
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

 

ItemProcessor

ItemReader로 읽어온 데이터들을 가공하고 변환하는 과정을 수행한다. chunk 기반에서 생략이 가능한 단계다.

package org.springframework.batch.item;

import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;

public interface ItemProcessor<I, O> {
    @Nullable
    O process(@NonNull I var1) throws Exception;
}

 

ItemWriter

ItemReader 을 통해 읽어들인 데이터는 ItemProcessor을 통해 변환되는 과정을 거쳐, ItemWriter에서 저장된다. 

package org.springframework.batch.item;

import java.util.List;

public interface ItemWriter<T> {
    void write(List<? extends T> var1) throws Exception;
}

 

 

스프링 배치에서의 Step 실행 단위

Chunk 기반

- 하나의 큰 덩어리를 chunk size만큼 나눠서 반복 실행된다. 

- ItemReader, ItemProcessor, ItemWriter 를 사용하며 Chunk 기반 전용 Tasklet인 ChunkOrientedTasklet 구현체가 제공된다.

 

  • 예시코드
    @Bean
    public Step chunkStep1() {
        return stepBuilderFactory.get("chunkStep1")
                .<String, String>chunk(5)
                .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5")))
                .processor((ItemProcessor<String, String>) item -> {
                    return "my" + item;
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        System.out.println("items = " + items);
                    }
                })
                .build();
    }

 

 

Task 기반

- 위 Chunk 기반보다 단일 작업 기반으로 처리되는 것이 더 효율적인 경우 사용한다.

- 주로 Tasklet 구현체를 만들어서 사용한다.

 

  • 예시코드
    @Bean
    public Step chunkStep2() {
        return stepBuilderFactory.get("chunkStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("chunkJobConfiguration step2 was executed");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }

 

 

 

Chunk

Chunk란, 여러 개의 아이템을 묶은 하나의 덩어리를 의미한다. Spring Batch에서는 하나의 아이템을 Chunk 단위의 덩어리로 만들고, Chunk 단위로 트랜잭션을 수행한다. 

 

  • Chunk 선언 방법
...
    @Bean
    public Step chunkStep1() {
        return stepBuilderFactory.get("chunkStep1")
                .<String, String>chunk(5)
                ...
                
    }
...

 

<I, O>chunk(5) 로 선언되어있다.

 

1) I
ItemReader 로 읽은 하나의 아이템을 CHunk에서 정한 개수만큼 반복해서 저장하는 타입이다.

2) O
ItemReader로부터 전달받는 Chunk<I>를 참조해서 ItemProcessor 에서 적절하게 가공한 후, ItemWriter에 전달하는 타입이다.

 

 

 

ChunkOrientedTasklet

스프링 배치에서 제공하는 Tasklet의 구현체다. Chunk 지향 프로세싱을 담당하는 객체로, ItemReader, ItemWriter, ItemProcessor 를 사용하여 Chunk 기반의 데이터 입출력 처리를 담당한다. 

 

  • ChunkOrientedTasklet.java
package org.springframework.batch.core.step.item;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.lang.Nullable;

public class ChunkOrientedTasklet<I> implements Tasklet {
    private static final String INPUTS_KEY = "INPUTS";
    private final ChunkProcessor<I> chunkProcessor;
    private final ChunkProvider<I> chunkProvider;
    private boolean buffering = true;
    private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class);

    public ChunkOrientedTasklet(ChunkProvider<I> chunkProvider, ChunkProcessor<I> chunkProcessor) {
        this.chunkProvider = chunkProvider;
        this.chunkProcessor = chunkProcessor;
    }

    public void setBuffering(boolean buffering) {
        this.buffering = buffering;
    }

    @Nullable
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        Chunk<I> inputs = (Chunk)chunkContext.getAttribute("INPUTS");
        if (inputs == null) {
            inputs = this.chunkProvider.provide(contribution);
            if (this.buffering) {
                chunkContext.setAttribute("INPUTS", inputs);
            }
        }

        this.chunkProcessor.process(contribution, inputs);
        this.chunkProvider.postProcess(contribution, inputs);
        if (inputs.isBusy()) {
            logger.debug("Inputs still busy");
            return RepeatStatus.CONTINUABLE;
        } else {
            chunkContext.removeAttribute("INPUTS");
            chunkContext.setComplete();
            if (logger.isDebugEnabled()) {
                logger.debug("Inputs not busy, ended: " + inputs.isEnd());
            }

            return RepeatStatus.continueIf(!inputs.isEnd());
        }
    }
}

 

1) Chunk 처리 중 예외가 발생하여 재시도한 경우라면 버퍼에 담아놨던 데이터를 다시 가져온다.

Chunk<I> inputs = (Chunk)chunkContext.getAttribute("INPUTS");


2) 개별 Item 을 Chunk size만큼 반복해서 읽어서 inputs 에 저장한다.

inputs = this.chunkProvider.provide(contribution);

 

3) Chunk 를 캐싱하기 위해 버퍼에 담는다.

chunkContext.setAttribute("INPUTS", inputs);

 

4) ChunkProvider로부터 받은 Chunk<I> inputs의 아이템 개수만큼 데이터를 가공하고 저장한다.

this.chunkProcessor.process(contribution, inputs);

 

5) Chunk 단위 입출력이 완료되면 버퍼에 저장한 chunk 데이터를 삭제한다.

chunkContext.removeAttribute("INPUTS");

 

6) 읽을 item이 더 존재하면 chunk 프로세스를 반복하고, 존재하지 않는다면 프로세스를 종료한다.

return RepeatStatus.continueIf(!inputs.isEnd());

 

Chunk size만큼의 실행 과정

1) ChunkProvider -> read() -> ItemReader

chunk size 만큼 read() 한다.

 

2) ChunkProcessor -> process() -> ItemProcessor

ITemReader가 전달한 아이템 개수(chunk size)만큼 process()를 반복한다.

 

3) ChunkProcessor -> write(items) -> ItemWriter

read(), process()를 거친 List 타입의 items 를 저장한다.

 

4) 데이터를 모두 처리할때까지 1)~3)번의 과정을 반복한다. 

 

 

Chunk 기반 배치 예제

package com.spring.batch.chunk;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/*
--job.name=chunkOrientedTaskletJob
 */
@Configuration
@RequiredArgsConstructor
public class ChunkOrientedTaskletConfiguration {
    // job 생성
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job chunkOrientedTaskletJob() {
        return this.jobBuilderFactory.get("chunkOrientedTaskletJob")
                /* step start */
                .start(chunkOrientedTaskletStep1())
                .next(chunkOrientedTaskletStep2())
                .build();
    }

    @Bean
    public Step chunkOrientedTaskletStep1() {
        return stepBuilderFactory.get("chunkOrientedTaskletStep1")
                .<String, String>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5", "item6")))
                .processor((ItemProcessor<String, String>) item -> {
                    System.out.println("item = " + item);
                    return "get " + item;
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        items.forEach(System.out::println);
                    }
                })
                .build();
    }

    @Bean
    public Step chunkOrientedTaskletStep2() {
        return stepBuilderFactory.get("chunkOrientedTaskletStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("chunkJobConfiguration step2 was executed");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }
}

 

  • 위 배치 예제에서 Step1()을 살펴보자.
    @Bean
    public Step chunkOrientedTaskletStep1() {
        return stepBuilderFactory.get("chunkOrientedTaskletStep1")
                .<String, String>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5", "item6")))
                .processor((ItemProcessor<String, String>) item -> {
                    System.out.println("item = " + item);
                    return "get " + item;
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        items.forEach(System.out::println);
                    }
                })
                .build();
    }

 

위에서 공부한 내용을 토대로 코드를 읽어보면 아래와 같다.

 

- I : String, O : String, chunk size : 2

.<String, String>chunk(2)

 

[반복과정 START]

1) ItemReader 에서 데이터를 한건씩 반복해서 읽어온다. chunk size가 2이므로, 2개가 담아진다면 반복을 종료한다.

2) ItemProcessor 에서 아이템 한건씩 반복해서 가공 또는 변환한다. chunk size가 2이므로, 2개가 처리된다면 반복을 종료한다.

3) ItemWriter 에서 List로 받은 items은 chunk size 가 2이므로 2개의 아이템이 담겨져있다. 이를 한번에 저장하거나 처리한다.

 

모든 데이터의 처리가 끝났다면 종료하고, 아직 남아있다면 위 1)~3)번의 과정을 반복한다.

 

예제코드 결과

[1번째 수행]
get item1
get item2

[2번째 수행]
get item3
get item4

[3번째 수행]
get item5
get item6

[종료]

 

 

 

반응형

Designed by JB FACTORY