Creating the first Job
ChunkConfiguration.java
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/*
 * Program argument to run this job: --job.name=chunkJob
 */
@Configuration
@RequiredArgsConstructor
public class ChunkConfiguration {

    // builder factories used to create the job and its steps
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job chunkJob() {
        return this.jobBuilderFactory.get("chunkJob")
                /* step start */
                .start(chunkStep1())
                .next(chunkStep2())
                .build();
    }

    @Bean
    public Step chunkStep1() {
        return stepBuilderFactory.get("chunkStep1")
                .<String, String>chunk(5)
                .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5")))
                .processor((ItemProcessor<String, String>) item -> {
                    Thread.sleep(300); // 0.3-second delay
                    System.out.println("item = " + item);
                    return "seohae : " + item;
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        // receives the whole chunk as a List
                        Thread.sleep(300); // 0.3-second delay
                        System.out.println("items = " + items);
                    }
                })
                .build();
    }

    @Bean
    public Step chunkStep2() {
        return stepBuilderFactory.get("chunkStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("chunkJobConfiguration step2 was executed");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }
}
1) The chunk size is set to 5.
.<String, String>chunk(5)
- reader(): reads the 5 items of the chunk one at a time.
- processor(): processes those 5 items one at a time.
- writer(): receives the 5 items that passed through processor() as a single list.
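To make the call order concrete, here is a minimal plain-Java sketch (no Spring Batch dependency) of what one chunk of size 5 does: every read happens first, then each buffered item is processed individually, and finally the writer is called once with the whole list. The class and method names (`ChunkTrace`, `runChunk`) are hypothetical; the `"seohae : "` prefix simply mirrors the processor in chunkStep1, and the `Thread.sleep` delays are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of one chunk: read N items, process each, write once.
class ChunkTrace {

    // Runs a single chunk and records every call in 'trace' so the ordering is visible.
    static List<String> runChunk(Iterator<String> reader, int chunkSize,
                                 Function<String, String> processor, List<String> trace) {
        // 1) read phase: pull items one at a time until the chunk is full or input runs out
        List<String> inputs = new ArrayList<>();
        while (inputs.size() < chunkSize && reader.hasNext()) {
            String item = reader.next();
            trace.add("read " + item);
            inputs.add(item);
        }
        // 2) process phase: transform each buffered item individually
        List<String> outputs = new ArrayList<>();
        for (String item : inputs) {
            trace.add("process " + item);
            outputs.add(processor.apply(item));
        }
        // 3) write phase: the writer is called once with the whole list
        trace.add("write " + outputs);
        return outputs;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        List<String> written = runChunk(
                List.of("item1", "item2", "item3", "item4", "item5").iterator(),
                5, item -> "seohae : " + item, trace);
        trace.forEach(System.out::println);
        System.out.println("items = " + written);
    }
}
```

In this model the trace lists five reads, then five process calls, then a single write, which matches the shape of the output below (processor lines first, one writer line last).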
Execution result
Output printed by processor()
item = item1
item = item2
item = item3
item = item4
item = item5
Output printed by writer()
items = [seohae : item1, seohae : item2, seohae : item3, seohae : item4, seohae : item5]
Creating the second Job
ChunkOrientedTaskletConfiguration.java
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/*
 * Program argument to run this job: --job.name=chunkOrientedTaskletJob
 */
@Configuration
@RequiredArgsConstructor
public class ChunkOrientedTaskletConfiguration {

    // builder factories used to create the job and its steps
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job chunkOrientedTaskletJob() {
        return this.jobBuilderFactory.get("chunkOrientedTaskletJob")
                /* step start */
                .start(chunkOrientedTaskletStep1())
                .next(chunkOrientedTaskletStep2())
                .build();
    }

    @Bean
    public Step chunkOrientedTaskletStep1() {
        return stepBuilderFactory.get("chunkOrientedTaskletStep1")
                .<String, String>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5", "item6")))
                .processor((ItemProcessor<String, String>) item -> {
                    System.out.println("item = " + item);
                    return "seohae : " + item;
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        // print each item of the chunk on its own line
                        items.forEach(System.out::println);
                    }
                })
                .build();
    }

    @Bean
    public Step chunkOrientedTaskletStep2() {
        return stepBuilderFactory.get("chunkOrientedTaskletStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("chunkOrientedTaskletConfiguration step2 was executed");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }
}
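1) The chunk size is set to 2.
.<String, String>chunk(2)
- reader(): reads items one at a time until 2 of them are collected in a list.
- processor(): processes those 2 items one at a time, collecting the results in a list.
- writer(): receives the 2 items that passed through processor() as a single list.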
How many times does this cycle run?
There are 6 items (item1 to item6) handled 2 at a time, so it runs 3 times.
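The number of chunks is the item count divided by the chunk size, rounded up, because the last chunk may be only partly filled. A small sketch with a hypothetical `ChunkMath` helper that counts chunks and splits a list the same way a size-based chunk would:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits items into consecutive chunks of at most chunkSize elements.
class ChunkMath {

    // Ceiling division: 6 items / 2 = 3 chunks, 5 items / 2 = 3 chunks (the last holds 1 item).
    static int chunkCount(int itemCount, int chunkSize) {
        if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be positive");
        return (itemCount + chunkSize - 1) / chunkSize;
    }

    // Groups items in reading order; each inner list is what the writer would receive once.
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be positive");
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

For the second job, `partition(List.of("item1", "item2", "item3", "item4", "item5", "item6"), 2)` groups the items as [item1, item2], [item3, item4], [item5, item6], one list per writer call.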
Execution result
First chunk
Output printed by processor()
item = item1
item = item2
Output printed by writer()
seohae : item1
seohae : item2
Second chunk
Output printed by processor()
item = item3
item = item4
Output printed by writer()
seohae : item3
seohae : item4
Third chunk
Output printed by processor()
item = item5
item = item6
Output printed by writer()
seohae : item5
seohae : item6
Conclusion
With a chunk size of 2:
- read(): reads 2 items and collects them in a list.
- process(): transforms those 2 items and collects the results in a list.
- write(): handles the list holding the 2 processed items.
Debugging
1) SimpleStepBuilder.createTasklet()
@Override
protected Tasklet createTasklet() {
    Assert.state(reader != null, "ItemReader must be provided");
    Assert.state(writer != null, "ItemWriter must be provided");
    RepeatOperations repeatOperations = createChunkOperations();
    SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations);
    SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter());
    chunkProvider.setListeners(new ArrayList<>(itemListeners));
    chunkProcessor.setListeners(new ArrayList<>(itemListeners));
    ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor);
    tasklet.setBuffering(!readerTransactionalQueue);
    return tasklet;
}
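- A ChunkOrientedTasklet is created from the chunkProvider (which reads items) and the chunkProcessor (which processes and writes them):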
ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor);
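The tasklet mostly glues the two halves together: the provider builds a chunk from the reader, the processor transforms and writes it, and the step keeps calling the tasklet until the chunk reports that the input has ended. Below is a hypothetical plain-Java model of that loop. `MiniChunkTasklet`, `MiniChunk`, `provide` and `execute` are illustrative names, not Spring Batch types; the boolean returned by `execute()` stands in for the `RepeatStatus.continueIf(!inputs.isEnd())` check that `ChunkOrientedTasklet.execute()` ends with.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of ChunkOrientedTasklet: provider (read) + processor (process/write).
class MiniChunkTasklet {

    // Stand-in for Spring Batch's Chunk: buffered items plus an end-of-input flag.
    static final class MiniChunk {
        final List<String> items = new ArrayList<>();
        boolean end;
    }

    private final Iterator<String> reader;
    private final int chunkSize;
    private final Function<String, String> processor;
    final List<List<String>> writes = new ArrayList<>(); // every list the "writer" received

    MiniChunkTasklet(Iterator<String> reader, int chunkSize, Function<String, String> processor) {
        this.reader = reader;
        this.chunkSize = chunkSize;
        this.processor = processor;
    }

    // Provider: read until the chunk is full; running out of input marks the end.
    MiniChunk provide() {
        MiniChunk chunk = new MiniChunk();
        while (chunk.items.size() < chunkSize) {
            if (!reader.hasNext()) {
                chunk.end = true;
                break;
            }
            chunk.items.add(reader.next());
        }
        return chunk;
    }

    // One tasklet call: provide, then process + write (skipped for an empty chunk).
    // Returns true while the step should call execute() again.
    boolean execute() {
        MiniChunk inputs = provide();
        if (!inputs.items.isEmpty()) {
            List<String> outputs = new ArrayList<>();
            for (String item : inputs.items) {
                outputs.add(processor.apply(item));
            }
            writes.add(outputs);
        }
        return !inputs.end;
    }
}
```

In this model, six items with a chunk size of 2 lead to four `execute()` calls but only three writes: the third chunk (item5, item6) fills up before the reader runs dry, so the end of input is only discovered by a fourth, empty call. With five items the third chunk is partial and already ends the loop.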
2) SimpleChunkProvider.provide()
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {
    final Chunk<I> inputs = new Chunk<>();
    repeatOperations.iterate(new RepeatCallback() {
        @Override
        public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
            I item = null;
            Timer.Sample sample = Timer.start(Metrics.globalRegistry);
            String status = BatchMetrics.STATUS_SUCCESS;
            try {
                item = read(contribution, inputs);
            }
            catch (SkipOverflowException e) {
                // read() tells us about an excess of skips by throwing an
                // exception
                status = BatchMetrics.STATUS_FAILURE;
                return RepeatStatus.FINISHED;
            }
            finally {
                stopTimer(sample, contribution.getStepExecution(), status);
            }
            if (item == null) {
                inputs.setEnd();
                return RepeatStatus.FINISHED;
            }
            inputs.add(item);
            contribution.incrementReadCount();
            return RepeatStatus.CONTINUABLE;
        }
    });
    return inputs;
}
3) Following read() leads to doRead()
@Nullable
protected final I doRead() throws Exception {
    try {
        listener.beforeRead();
        I item = itemReader.read();
        if (item != null) {
            listener.afterRead(item);
        }
        return item;
    }
    catch (Exception e) {
        if (logger.isDebugEnabled()) {
            logger.debug(e.getMessage() + " : " + e.getClass().getName());
        }
        listener.onReadError(e);
        throw e;
    }
}
doRead() is called once per item until chunkSize (2) items have been read, which fills the inputs list with item1 and item2.
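Note that the provide() callback never compares anything with chunkSize: it just returns CONTINUABLE after each item and FINISHED when read() returns null. The size limit comes from the repeat operations built in createTasklet(), whose completion policy is based on the chunk size. The sketch below imitates that split in plain Java; `MiniRepeat`, `Status`, `Provided` and `iterate` are hypothetical stand-ins for RepeatTemplate, RepeatStatus, Chunk and a size-based completion policy, not the real classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: the callback reads one item per call; the loop enforces chunkSize.
class MiniRepeat {

    enum Status { CONTINUABLE, FINISHED }

    // Result of one provide() call: the buffered items and whether input has ended.
    static final class Provided {
        final List<String> inputs = new ArrayList<>();
        boolean end; // set when the reader returned null
    }

    // Simplified stand-in for a repeat loop with a size-based completion policy.
    static void iterate(Supplier<Status> callback, int chunkSize) {
        int started = 0;
        while (started < chunkSize) {
            started++;
            if (callback.get() == Status.FINISHED) {
                return; // the callback reported that there is no more input
            }
        }
    }

    // Mirrors the provide() callback: read one item, stop on null, otherwise buffer it.
    static Provided provide(Iterator<String> reader, int chunkSize) {
        Provided chunk = new Provided();
        iterate(() -> {
            String item = reader.hasNext() ? reader.next() : null; // null means end of input
            if (item == null) {
                chunk.end = true; // corresponds to inputs.setEnd()
                return Status.FINISHED;
            }
            chunk.inputs.add(item);
            return Status.CONTINUABLE;
        }, chunkSize);
        return chunk;
    }
}
```

Because the reader is shared between calls, a second `provide()` on the same iterator continues where the first one stopped, which is exactly what happens when the step moves on to item3 and item4.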
4) SimpleChunkProcessor.transform()
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
    Chunk<O> outputs = new Chunk<>();
    for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
        final I item = iterator.next();
        O output;
        Timer.Sample sample = BatchMetrics.createTimerSample();
        String status = BatchMetrics.STATUS_SUCCESS;
        try {
            output = doProcess(item);
        }
        catch (Exception e) {
            /*
             * For a simple chunk processor (no fault tolerance) we are done
             * here, so prevent any more processing of these inputs.
             */
            inputs.clear();
            status = BatchMetrics.STATUS_FAILURE;
            throw e;
        }
        finally {
            stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
        }
        if (output != null) {
            outputs.add(output);
        }
        else {
            iterator.remove();
        }
    }
    return outputs;
}
doProcess() is called on each item in the inputs list (item1, item2), one at a time.
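One detail of transform() is easy to miss: when the processor returns null, the item is removed from inputs instead of being added to outputs, so it never reaches the writer. This is how an ItemProcessor filters items. A plain-Java sketch of that rule, using a hypothetical `MiniTransform` class and an arbitrary example filter:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the transform() loop: process each input in order;
// a null result filters the item out (dropped from inputs and never written).
class MiniTransform {

    // 'inputs' must be a mutable list, because filtered items are removed from it.
    static List<String> transform(List<String> inputs, Function<String, String> processor) {
        List<String> outputs = new ArrayList<>();
        for (Iterator<String> it = inputs.iterator(); it.hasNext();) {
            String output = processor.apply(it.next());
            if (output != null) {
                outputs.add(output);
            } else {
                it.remove(); // same effect as iterator.remove() in the code above
            }
        }
        return outputs;
    }
}
```

For example, a processor that returns null for "item2" turns the inputs [item1, item2, item3, item4] into the outputs [seohae : item1, seohae : item3, seohae : item4].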
5) SimpleChunkProcessor.doProcess()
protected final O doProcess(I item) throws Exception {
    if (itemProcessor == null) {
        @SuppressWarnings("unchecked")
        O result = (O) item;
        return result;
    }
    try {
        listener.beforeProcess(item);
        O result = itemProcessor.process(item);
        listener.afterProcess(item, result);
        return result;
    }
    catch (Exception e) {
        listener.onProcessError(item, e);
        throw e;
    }
}
process() is called separately for item1 and item2.
After these calls, the outputs list holds the processed values: seohae : item1, seohae : item2.
6) SimpleChunkProcessor.doWrite()
protected final void doWrite(List<O> items) throws Exception {
    if (itemWriter == null) {
        return;
    }
    try {
        listener.beforeWrite(items);
        writeItems(items);
        doAfterWrite(items);
    }
    catch (Exception e) {
        doOnWriteError(e, items);
        throw e;
    }
}
The List containing the 2 processed items is then written.
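doWrite() wraps the actual write in listener callbacks: beforeWrite runs first, then the writer, then the after-write callback; if anything throws, the error callback runs and the exception is rethrown to the caller. The sketch below reproduces that ordering in plain Java. The `MiniWrite`, `Listener`, `Writer` and `RecordingListener` types are hypothetical; the listener simply records each callback so the order can be checked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the callback order around a chunk write.
class MiniWrite {

    interface Listener {
        void beforeWrite(List<String> items);
        void afterWrite(List<String> items);
        void onWriteError(Exception e, List<String> items);
    }

    // Custom functional interface so the writer may throw checked exceptions.
    interface Writer {
        void write(List<String> items) throws Exception;
    }

    static void doWrite(List<String> items, Writer writer, Listener listener) throws Exception {
        if (writer == null) {
            return; // no writer configured: nothing to do
        }
        try {
            listener.beforeWrite(items);
            writer.write(items);
            listener.afterWrite(items);
        } catch (Exception e) {
            listener.onWriteError(e, items);
            throw e; // rethrown so the caller sees the failure
        }
    }

    // Listener that records each callback as a string, handy for checking the order.
    static final class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();

        public void beforeWrite(List<String> items) { events.add("beforeWrite " + items); }

        public void afterWrite(List<String> items) { events.add("afterWrite " + items); }

        public void onWriteError(Exception e, List<String> items) { events.add("onWriteError " + e.getMessage()); }
    }
}
```

On success the recorded events are beforeWrite, the write itself, then afterWrite; if the writer throws, afterWrite is skipped and onWriteError is recorded instead.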
Second iteration
1) SimpleChunkProvider.provide()
Since item1 and item2 have been processed, the next 2 items (item3, item4) are read.
2) SimpleChunkProcessor.doProcess()
process() is called separately for item3 and item4.
3) SimpleChunkProcessor.doWrite()
As in the first iteration, the List containing the 2 processed items (seohae : item3, seohae : item4) is written.