[Spring Batch Practice] 36. Running multiple CustomItemProcessors in sequence (configuring processor chaining)


Creating the Job

Let's build a Job like the one below.

Expected result after each processor in the chain:
CustomItemProcessor1: [item1, item2, item3, item4, item5, item6, item7, item8, item9, item10]
CustomItemProcessor2: [item11, item22, item33, item44, item55, item66, item77, item88, item99, item1010]

 

 

 

Running the Job

CompositionItemConfiguration.java
package com.project.springbatch._66_processor_delegate;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.builder.CompositeItemProcessorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/*
--job.name=CompositionItemJob
*/
@RequiredArgsConstructor
@Configuration
public class CompositionItemConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job CompositionItemJob() throws Exception {
        return jobBuilderFactory.get("CompositionItemJob")
                .incrementer(new RunIdIncrementer())
                .start(CompositionItemStep1())
                .build();
    }

    @Bean
    public Step CompositionItemStep1() throws Exception {
        return stepBuilderFactory.get("CompositionItemStep1")
                .<String, String>chunk(10)
                .reader(new ItemReader<String>() {
                    int i = 0;
                    @Override
                    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                        i++;
                        return i > 10 ? null : "item";
                    }
                })
                .processor(compositionItemProcessor())
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        System.out.println(items);
                    }
                })
                .build();
    }

    @Bean
    public CompositeItemProcessor<String, String> compositionItemProcessor() {
        // Chain the ItemProcessors: each delegate receives the output of the previous one.
        List<ItemProcessor<?, ?>> itemProcessors = new ArrayList<>();
        itemProcessors.add(new CustomItemProcessor1());
        itemProcessors.add(new CustomItemProcessor2());

        return new CompositeItemProcessorBuilder<String, String>()
                .delegates(itemProcessors)
                .build();
    }
}

 

 

Debugging

1) .reader()

.reader(new ItemReader<String>() {
    int i = 0;
    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        i++;
        return i > 10 ? null : "item";
    }
})

- When i is 10

 

2) .processor(compositionItemProcessor())

▶ compositionItemProcessor()

This method sets up the processor chain.

@Bean
public CompositeItemProcessor<String, String> compositionItemProcessor() {
    // Chain the ItemProcessors: each delegate receives the output of the previous one.
    List<ItemProcessor<?, ?>> itemProcessors = new ArrayList<>();
    itemProcessors.add(new CustomItemProcessor1());
    itemProcessors.add(new CustomItemProcessor2());

    return new CompositeItemProcessorBuilder<String, String>()
            .delegates(itemProcessors)
            .build();
}

CustomItemProcessor1 and CustomItemProcessor2 are added to itemProcessors, which is declared as a List, and the list is passed to the builder as its delegates.

 

▶ CompositeItemProcessorBuilder.build()

public CompositeItemProcessor<I, O> build() {
   Assert.notNull(delegates, "A list of delegates is required.");
   Assert.notEmpty(delegates, "The delegates list must have one or more delegates.");

   CompositeItemProcessor<I, O> processor = new CompositeItemProcessor<>();
   processor.setDelegates(this.delegates);
   return processor;
}

 

CustomItemProcessor1.process()

The item parameter holds "item".

In read(), i was 10 (not greater than 10), so the code below returned "item".

return i > 10 ? null : "item";

 

CustomItemProcessor2.process()

It turns the "item1" received from CustomItemProcessor1.process() above into "item11".

 

The two process() calls above (CustomItemProcessor1, then CustomItemProcessor2) are repeated 10 times, once for each item in the chunk.
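The source of CustomItemProcessor1 and CustomItemProcessor2 is not shown in this post. The sketch below is one possible implementation that matches the expected results, assuming each processor simply appends its own call counter to the item. The local ItemProcessor interface is only a stand-in for org.springframework.batch.item.ItemProcessor so the example runs without Spring Batch on the classpath, and the class bodies and the runChain helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainingSketch {

    // Stand-in for org.springframework.batch.item.ItemProcessor (assumption: same method shape).
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Hypothetical CustomItemProcessor1: appends its own call counter to the item.
    static class CustomItemProcessor1 implements ItemProcessor<String, String> {
        private int cnt = 0;

        @Override
        public String process(String item) {
            cnt++;
            return item + cnt;
        }
    }

    // Hypothetical CustomItemProcessor2: same idea, with an independent counter.
    static class CustomItemProcessor2 implements ItemProcessor<String, String> {
        private int cnt = 0;

        @Override
        public String process(String item) {
            cnt++;
            return item + cnt;
        }
    }

    // Feeds "item" through processor 1, then processor 2, itemCount times.
    static List<String> runChain(int itemCount) throws Exception {
        ItemProcessor<String, String> first = new CustomItemProcessor1();
        ItemProcessor<String, String> second = new CustomItemProcessor2();
        List<String> results = new ArrayList<>();
        for (int n = 0; n < itemCount; n++) {
            results.add(second.process(first.process("item")));
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        // prints [item11, item22, item33, item44, item55, item66, item77, item88, item99, item1010]
        System.out.println(runChain(10));
    }
}
```

Because each processor keeps its own counter, the tenth item becomes "item10" after the first processor and "item1010" after the second.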

 

- When i reaches 11

Note that once i is greater than 10, read() returns null.

 

▶ SimpleChunkProvider.provide()

@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {

   final Chunk<I> inputs = new Chunk<>();
   repeatOperations.iterate(new RepeatCallback() {

      @Override
      public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
         I item = null;
         Timer.Sample sample = Timer.start(Metrics.globalRegistry);
         String status = BatchMetrics.STATUS_SUCCESS;
         try {
            item = read(contribution, inputs);
         }
         catch (SkipOverflowException e) {
            // read() tells us about an excess of skips by throwing an
            // exception
            status = BatchMetrics.STATUS_FAILURE;
            return RepeatStatus.FINISHED;
         }
         finally {
            stopTimer(sample, contribution.getStepExecution(), status);
         }
         if (item == null) {
            inputs.setEnd();
            return RepeatStatus.FINISHED;
         }
         inputs.add(item);
         contribution.incrementReadCount();
         return RepeatStatus.CONTINUABLE;
      }

   });

   return inputs;

}

At this point item is null, so RepeatStatus.FINISHED is returned and the batch finishes.
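The read loop in provide() can be hard to follow because of the callback and the metrics code. Here is a stripped-down sketch of the same idea in plain Java. It is not Spring Batch's actual implementation: the ItemReader interface is a local stand-in, and readChunk is a hypothetical helper that keeps only the "read until the chunk is full or the reader returns null" logic.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkReadSketch {

    // Stand-in for org.springframework.batch.item.ItemReader (assumption: same method shape).
    interface ItemReader<T> {
        T read() throws Exception;
    }

    // Same logic as the anonymous reader in the step: 10 items, then null.
    static class CountingReader implements ItemReader<String> {
        private int i = 0;

        @Override
        public String read() {
            i++;
            return i > 10 ? null : "item";
        }
    }

    // Hypothetical, simplified analogue of provide(): fills at most one chunk.
    static List<String> readChunk(ItemReader<String> reader, int chunkSize) throws Exception {
        List<String> inputs = new ArrayList<>();
        while (inputs.size() < chunkSize) {
            String item = reader.read();
            if (item == null) {
                break;            // plays the role of inputs.setEnd() + RepeatStatus.FINISHED
            }
            inputs.add(item);     // plays the role of RepeatStatus.CONTINUABLE
        }
        return inputs;
    }

    public static void main(String[] args) throws Exception {
        CountingReader reader = new CountingReader();
        System.out.println(readChunk(reader, 10).size()); // first chunk: 10 items
        System.out.println(readChunk(reader, 10).size()); // second chunk: 0 items, read() returned null
    }
}
```

The first call fills a full chunk of 10 items without ever seeing null. The null is only observed on the next read, when i becomes 11, which is why the step ends on the second pass.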

 

Batch execution result
[item11, item22, item33, item44, item55, item66, item77, item88, item99, item1010]

 

 

Debugging the processor chain delegation

CompositeItemProcessor.java
@Nullable
@Override
@SuppressWarnings("unchecked")
public O process(I item) throws Exception {
   Object result = item;

   for (ItemProcessor<?, ?> delegate : delegates) {
      if (result == null) {
         return null;
      }

      result = processItem(delegate, result);
   }
   return (O) result;
}

1) When the current delegate is CustomItemProcessor1

 

2) The process() method of the CustomItemProcessor1 we defined is called.

CompositeItemProcessor.java > processItem()

For CustomItemProcessor2, process() is invoked in the same way.
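One detail of the loop in CompositeItemProcessor.process() is easy to miss: if any delegate returns null, the remaining delegates are skipped and null is returned, which in Spring Batch means the item is filtered out and never reaches the writer. The sketch below reproduces just that loop in plain Java. The ItemProcessor interface is a local stand-in, and processAll and the lambdas are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class CompositeSketch {

    // Stand-in for org.springframework.batch.item.ItemProcessor (assumption: same method shape).
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Simplified version of the delegation loop: each delegate gets the previous result.
    static String processAll(List<ItemProcessor<String, String>> delegates, String item) throws Exception {
        String result = item;
        for (ItemProcessor<String, String> delegate : delegates) {
            if (result == null) {
                return null;      // an earlier delegate filtered the item; stop the chain
            }
            result = delegate.process(result);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        ItemProcessor<String, String> appendOne = s -> s + "1";
        ItemProcessor<String, String> filterOut = s -> null;
        ItemProcessor<String, String> mustNotRun = s -> {
            throw new IllegalStateException("should not be reached");
        };

        // Two delegates applied in order: "item" -> "item1" -> "item11"
        System.out.println(processAll(Arrays.asList(appendOne, appendOne), "item"));

        // The first delegate returns null, so the second is never called: prints null
        System.out.println(processAll(Arrays.asList(filterOut, mustNotRun), "item"));
    }
}
```

In the job from this post neither custom processor returns null, so every item passes through both delegates.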

 

 
