반응형
728x90
반응형
Job 생성
아래와 같은 Job을 만들자.
Processor 체이닝 리스트 | 기대 결과 |
CustomItemProcessor1 | [item1, item2, item3, item4, item5, item6, item7, item8, item9, item10] |
CustomItemProcessor2 | [item11, item22, item33, item44, item55, item66, item77, item88, item99, item1010] |
Job 수행
CompositionItemConfiguration.java
package com.project.springbatch._66_processor_delegate;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.builder.CompositeItemProcessorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/*
--job.name=CompositionItemJob
*/
@RequiredArgsConstructor
@Configuration
public class CompositionItemConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job CompositionItemJob() throws Exception {
return jobBuilderFactory.get("CompositionItemJob")
.incrementer(new RunIdIncrementer())
.start(CompositionItemStep1())
.build();
}
@Bean
public Step CompositionItemStep1() throws Exception {
return stepBuilderFactory.get("CompositionItemStep1")
.<String, String>chunk(10)
.reader(new ItemReader<String>() {
int i = 0;
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
i++;
return i > 10 ? null : "item";
}
})
.processor(compositionItemProcessor())
.writer(new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
System.out.println(items);
}
})
.build();
}
@Bean
public CompositeItemProcessor compositionItemProcessor() {
/*
ItemProcessor 체이닝 수행
*/
CompositeItemProcessor<String,String> compositeProcessor = new CompositeItemProcessor<>();
List itemProcessors = new ArrayList();
itemProcessors.add(new CustomItemProcessor1());
itemProcessors.add(new CustomItemProcessor2());
return new CompositeItemProcessorBuilder<>()
.delegates(itemProcessors)
.build();
}
}
디버깅
1) .reader()
.reader(new ItemReader<String>() {
int i = 0;
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
i++;
return i > 10 ? null : "item";
}
})
- i가 10일때
2) .processor(compositionItemProcessor())
▶ compositionItemProcessor()
Processor 체이닝을 수행한다.
@Bean
public CompositeItemProcessor compositionItemProcessor() {
/*
ItemProcessor 체이닝 수행
*/
CompositeItemProcessor<String,String> compositeProcessor = new CompositeItemProcessor<>();
List itemProcessors = new ArrayList();
itemProcessors.add(new CustomItemProcessor1());
itemProcessors.add(new CustomItemProcessor2());
return new CompositeItemProcessorBuilder<>()
.delegates(itemProcessors)
.build();
}
List로 선언된 itemProcessors에 CustomItemProcessor1, CustomItemProcessor2 를 등록한다.
▶ CompositeItemProcessorBuilder.build()
public CompositeItemProcessor<I, O> build() {
Assert.notNull(delegates, "A list of delegates is required.");
Assert.notEmpty(delegates, "The delegates list must have one or more delegates.");
CompositeItemProcessor<I, O> processor = new CompositeItemProcessor<>();
processor.setDelegates(this.delegates);
return processor;
}
CustomItemProcessor1.process()
item 에는 "item"이 담겨져있다.
read() 에서 아래 코드를 통해 i가 10이였으므로 "item"이 리턴된다.
return i > 10 ? null : "item";
CustomItemProcessor2.process()
위 CustomItemProcessor1.process()에서 받은 "item1"을 "item11"로 만든다.
위 process1(), process2()는 10번 반복 수행된다.
- i가 11일 되었을때
참고로, i가 10보다 커지면 null 이 리턴한다.
▶ SimpleChunkProvider.provide()
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {
final Chunk<I> inputs = new Chunk<>();
repeatOperations.iterate(new RepeatCallback() {
@Override
public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
I item = null;
Timer.Sample sample = Timer.start(Metrics.globalRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
item = read(contribution, inputs);
}
catch (SkipOverflowException e) {
// read() tells us about an excess of skips by throwing an
// exception
status = BatchMetrics.STATUS_FAILURE;
return RepeatStatus.FINISHED;
}
finally {
stopTimer(sample, contribution.getStepExecution(), status);
}
if (item == null) {
inputs.setEnd();
return RepeatStatus.FINISHED;
}
inputs.add(item);
contribution.incrementReadCount();
return RepeatStatus.CONTINUABLE;
}
});
return inputs;
}
이때, RepeatStatus.FINISED;가 호출되어 배치가 종료된다.
배치 수행 결과
[item11, item22, item33, item44, item55, item66, item77, item88, item99, item1010]
Processor 체이닝 위임 디버깅
CompositeItemProcessor.java
@Nullable
@Override
@SuppressWarnings("unchecked")
public O process(I item) throws Exception {
Object result = item;
for (ItemProcessor<?, ?> delegate : delegates) {
if (result == null) {
return null;
}
result = processItem(delegate, result);
}
return (O) result;
}
1) CustomItemProcessor1 수행일 경우
2) 우리가 정의한 CustomItemProcessor1의 process() 메서드를 호출한다.
CompositeItemProcessor.java > proessItem()
CustomItemProcessor2도 위와 동일하게 process()가 수행된다.
반응형