기본개념
우선 ItemReader, ItemProcessor, ItemWriter 에 대해 간단하게 살펴보자.
ItemReader
다양한 타입의 입력 데이터를 읽어오는 인터페이스로, 배치 수행의 대상이 될 데이터를 담는다.
package org.springframework.batch.item;
import org.springframework.lang.Nullable;
public interface ItemReader<T> {
@Nullable
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
ItemProcessor
ItemReader로 읽어온 데이터들을 가공하고 변환하는 과정을 수행한다. chunk 기반에서 생략이 가능한 단계다.
package org.springframework.batch.item;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
public interface ItemProcessor<I, O> {
@Nullable
O process(@NonNull I var1) throws Exception;
}
ItemWriter
ItemReader 을 통해 읽어들인 데이터는 ItemProcessor을 통해 변환되는 과정을 거쳐, ItemWriter에서 저장된다.
package org.springframework.batch.item;
import java.util.List;
public interface ItemWriter<T> {
void write(List<? extends T> var1) throws Exception;
}
스프링 배치에서의 Step 실행 단위
Chunk 기반
- 하나의 큰 덩어리를 chunk size만큼 나눠서 반복 실행된다.
- ItemReader, ItemProcessor, ItemWriter 를 사용하며 Chunk 기반 전용 Tasklet인 ChunkOrientedTasklet 구현체가 제공된다.
- 예시코드
@Bean
public Step chunkStep1() {
return stepBuilderFactory.get("chunkStep1")
.<String, String>chunk(5)
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5")))
.processor((ItemProcessor<String, String>) item -> {
return "my" + item;
})
.writer(new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
System.out.println("items = " + items);
}
})
.build();
}
Task 기반
- 위 Chunk 기반보다 단일 작업 기반으로 처리되는 것이 더 효율적인 경우 사용한다.
- 주로 Tasklet 구현체를 만들어서 사용한다.
- 예시코드
@Bean
public Step chunkStep2() {
return stepBuilderFactory.get("chunkStep2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("chunkJobConfiguration step2 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
Chunk
Chunk란, 여러 개의 아이템을 묶은 하나의 덩어리를 의미한다. Spring Batch에서는 하나의 아이템을 Chunk 단위의 덩어리로 만들고, Chunk 단위로 트랜잭션을 수행한다.
- Chunk 선언 방법
...
@Bean
public Step chunkStep1() {
return stepBuilderFactory.get("chunkStep1")
.<String, String>chunk(5)
...
}
...
<I, O>chunk(5) 로 선언되어있다.
1) I
ItemReader 로 읽은 하나의 아이템을 CHunk에서 정한 개수만큼 반복해서 저장하는 타입이다.
2) O
ItemReader로부터 전달받는 Chunk<I>를 참조해서 ItemProcessor 에서 적절하게 가공한 후, ItemWriter에 전달하는 타입이다.
ChunkOrientedTasklet
스프링 배치에서 제공하는 Tasklet의 구현체다. Chunk 지향 프로세싱을 담당하는 객체로, ItemReader, ItemWriter, ItemProcessor 를 사용하여 Chunk 기반의 데이터 입출력 처리를 담당한다.
- ChunkOrientedTasklet.java
package org.springframework.batch.core.step.item;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.lang.Nullable;
public class ChunkOrientedTasklet<I> implements Tasklet {
private static final String INPUTS_KEY = "INPUTS";
private final ChunkProcessor<I> chunkProcessor;
private final ChunkProvider<I> chunkProvider;
private boolean buffering = true;
private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class);
public ChunkOrientedTasklet(ChunkProvider<I> chunkProvider, ChunkProcessor<I> chunkProcessor) {
this.chunkProvider = chunkProvider;
this.chunkProcessor = chunkProcessor;
}
public void setBuffering(boolean buffering) {
this.buffering = buffering;
}
@Nullable
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Chunk<I> inputs = (Chunk)chunkContext.getAttribute("INPUTS");
if (inputs == null) {
inputs = this.chunkProvider.provide(contribution);
if (this.buffering) {
chunkContext.setAttribute("INPUTS", inputs);
}
}
this.chunkProcessor.process(contribution, inputs);
this.chunkProvider.postProcess(contribution, inputs);
if (inputs.isBusy()) {
logger.debug("Inputs still busy");
return RepeatStatus.CONTINUABLE;
} else {
chunkContext.removeAttribute("INPUTS");
chunkContext.setComplete();
if (logger.isDebugEnabled()) {
logger.debug("Inputs not busy, ended: " + inputs.isEnd());
}
return RepeatStatus.continueIf(!inputs.isEnd());
}
}
}
1) Chunk 처리 중 예외가 발생하여 재시도한 경우라면 버퍼에 담아놨던 데이터를 다시 가져온다.
Chunk<I> inputs = (Chunk)chunkContext.getAttribute("INPUTS");
2) 개별 Item 을 Chunk size만큼 반복해서 읽어서 inputs 에 저장한다.
inputs = this.chunkProvider.provide(contribution);
3) Chunk 를 캐싱하기 위해 버퍼에 담는다.
chunkContext.setAttribute("INPUTS", inputs);
4) ChunkProvider로부터 받은 Chunk<I> inputs의 아이템 개수만큼 데이터를 가공하고 저장한다.
this.chunkProcessor.process(contribution, inputs);
5) Chunk 단위 입출력이 완료되면 버퍼에 저장한 chunk 데이터를 삭제한다.
chunkContext.removeAttribute("INPUTS");
6) 읽을 item이 더 존재하면 chunk 프로세스를 반복하고, 존재하지 않는다면 프로세스를 종료한다.
return RepeatStatus.continueIf(!inputs.isEnd());
Chunk size만큼의 실행 과정
1) ChunkProvider -> read() -> ItemReader
chunk size 만큼 read() 한다.
2) ChunkProcessor -> process() -> ItemProcessor
ITemReader가 전달한 아이템 개수(chunk size)만큼 process()를 반복한다.
3) ChunkProcessor -> write(items) -> ItemWriter
read(), process()를 거친 List 타입의 items 를 저장한다.
4) 데이터를 모두 처리할때까지 1)~3)번의 과정을 반복한다.
Chunk 기반 배치 예제
package com.spring.batch.chunk;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/*
--job.name=chunkOrientedTaskletJob
*/
@Configuration
@RequiredArgsConstructor
public class ChunkOrientedTaskletConfiguration {
// job 생성
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job chunkOrientedTaskletJob() {
return this.jobBuilderFactory.get("chunkOrientedTaskletJob")
/* step start */
.start(chunkOrientedTaskletStep1())
.next(chunkOrientedTaskletStep2())
.build();
}
@Bean
public Step chunkOrientedTaskletStep1() {
return stepBuilderFactory.get("chunkOrientedTaskletStep1")
.<String, String>chunk(2)
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5", "item6")))
.processor((ItemProcessor<String, String>) item -> {
System.out.println("item = " + item);
return "get " + item;
})
.writer(new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
items.forEach(System.out::println);
}
})
.build();
}
@Bean
public Step chunkOrientedTaskletStep2() {
return stepBuilderFactory.get("chunkOrientedTaskletStep2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("chunkJobConfiguration step2 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
}
- 위 배치 예제에서 Step1()을 살펴보자.
@Bean
public Step chunkOrientedTaskletStep1() {
return stepBuilderFactory.get("chunkOrientedTaskletStep1")
.<String, String>chunk(2)
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5", "item6")))
.processor((ItemProcessor<String, String>) item -> {
System.out.println("item = " + item);
return "get " + item;
})
.writer(new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
items.forEach(System.out::println);
}
})
.build();
}
위에서 공부한 내용을 토대로 코드를 읽어보면 아래와 같다.
- I : String, O : String, chunk size : 2
.<String, String>chunk(2)
[반복과정 START]
1) ItemReader 에서 데이터를 한건씩 반복해서 읽어온다. chunk size가 2이므로, 2개가 담아진다면 반복을 종료한다.
2) ItemProcessor 에서 아이템 한건씩 반복해서 가공 또는 변환한다. chunk size가 2이므로, 2개가 처리된다면 반복을 종료한다.
3) ItemWriter 에서 List로 받은 items은 chunk size 가 2이므로 2개의 아이템이 담겨져있다. 이를 한번에 저장하거나 처리한다.
모든 데이터의 처리가 끝났다면 종료하고, 아직 남아있다면 위 1)~3)번의 과정을 반복한다.
예제코드 결과
[1번째 수행]
get item1
get item2
[2번째 수행]
get item3
get item4
[3번째 수행]
get item5
get item6
[종료]
'Coding > Spring Batch' 카테고리의 다른 글
[SpringBatch 실습] 2. ApplicationRunner 구현 클래스로 Job 수행시키기 (0) | 2022.05.21 |
---|---|
[SringBatch 실습] 1. Hello Spring Batch! (0) | 2022.05.21 |
[SpringBatch] @JobScope와 @StespScope 개념과 실행 예제 (0) | 2022.03.06 |
[SpringBatch] BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION의 EXIT_CODE, STATUS 컬럼 셋팅 정보 (0) | 2022.02.17 |
[SpringBatch] COMPLETED된 Step도 Job 재실행 대상에 포함하기 -allowStartIfComplete (0) | 2022.02.15 |