[Spring Batch Practice] 21. How reader(), processor(), and writer() Execute Depending on the Chunk Setting


Creating the first Job

ChunkConfiguration.java
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/*
--job.name=chunkJob
 */
@Configuration
@RequiredArgsConstructor
public class ChunkConfiguration {
    // builder factories used to create the job and its steps
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job chunkJob() {
        return this.jobBuilderFactory.get("chunkJob")
                /* step start */
                .start(chunkStep1())
                .next(chunkStep2())
                .build();
    }

    @Bean
    public Step chunkStep1() {
        return stepBuilderFactory.get("chunkStep1")
                .<String, String>chunk(5)
                .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5")))
                .processor((ItemProcessor<String, String>) item -> {
                    Thread.sleep(300); // 0.3-second delay
                    System.out.println("item = " + item);
                    return "seohae : " + item;
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        // List type item
                        Thread.sleep(300); // 0.3-second delay
                        System.out.println("items = " + items);
                    }
                })
                .build();
    }

    @Bean
    public Step chunkStep2() {
        return stepBuilderFactory.get("chunkStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("chunkJobConfiguration step2 was executed");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }
}

 

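1) chunkSize is set to 5.

.<String, String>chunk(5)
  • reader() : reads items one at a time until 5 items (the chunk size) have been collected.
  • processor() : processes those 5 items one at a time.
  • writer() : receives the 5 processed items as a single list and handles them together.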
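
The three roles above can be imitated in plain Java without Spring Batch. The following is only a minimal sketch, not the framework's implementation: the class name `ChunkLoopSketch` and its `run` method are hypothetical, and the processing step simply reuses the "seohae : " prefix from the step above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical plain-Java imitation of a chunk-oriented step (not Spring Batch code).
class ChunkLoopSketch {

    // Returns every list that the "writer" would receive, in order.
    static List<List<String>> run(List<String> source, int chunkSize) {
        Iterator<String> reader = source.iterator();
        List<List<String>> written = new ArrayList<>();

        while (reader.hasNext()) {
            // reader: collect items one at a time, up to chunkSize
            List<String> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }

            // processor: transform the collected items one at a time
            List<String> outputs = new ArrayList<>();
            for (String item : inputs) {
                outputs.add("seohae : " + item);
            }

            // writer: receive the whole processed list at once
            written.add(outputs);
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("item1", "item2", "item3", "item4", "item5");
        // With chunkSize 5, the writer is handed a single list of 5 items.
        System.out.println(run(items, 5));
    }
}
```

Calling `run` with the same five items and a chunk size of 2 would hand the writer three lists instead of one, the last of which holds a single item.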

 

Execution result

Output printed by processor()

item = item1
item = item2
item = item3
item = item4
item = item5

 

Output printed by writer()

items = [seohae : item1, seohae : item2, seohae : item3, seohae : item4, seohae : item5]

 

 

 

Creating the second Job

ChunkOrientedTaskletConfiguration.java
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/*
--job.name=chunkOrientedTaskletJob
 */
@Configuration
@RequiredArgsConstructor
public class ChunkOrientedTaskletConfiguration {
    // builder factories used to create the job and its steps
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job chunkOrientedTaskletJob() {
        return this.jobBuilderFactory.get("chunkOrientedTaskletJob")
                /* step start */
                .start(chunkOrientedTaskletStep1())
                .next(chunkOrientedTaskletStep2())
                .build();
    }

    @Bean
    public Step chunkOrientedTaskletStep1() {
        return stepBuilderFactory.get("chunkOrientedTaskletStep1")
                .<String, String>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5", "item6")))
                .processor((ItemProcessor<String, String>) item -> {
                    System.out.println("item = " + item);
                    return "seohae : " + item;
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        items.forEach(System.out::println);
                    }
                })
                .build();
    }

    @Bean
    public Step chunkOrientedTaskletStep2() {
        return stepBuilderFactory.get("chunkOrientedTaskletStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("chunkJobConfiguration step2 was executed");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }
}

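1) chunkSize is set to 2.

.<String, String>chunk(2)
  • reader() : reads items one at a time and adds them to a list until it holds 2 items (the chunk size).
  • processor() : processes those 2 items one at a time and adds each result to a list.
  • writer() : handles the list of 2 items produced by processor().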

 

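How many times does this cycle run?

The reader supplies 6 items (item1 through item6) and they are handled 2 at a time, so the cycle runs 3 times.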
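
The count is a ceiling division of the number of items by the chunk size. Here is a minimal sketch of that arithmetic; `ChunkCountSketch` and `chunkCount` are hypothetical names used only for illustration, and the calculation counts only chunks that actually contain items.

```java
// Hypothetical helper: how many non-empty chunks a given item count produces.
class ChunkCountSketch {

    // Integer ceiling of itemCount / chunkSize.
    static int chunkCount(int itemCount, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        return (itemCount + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        System.out.println(chunkCount(6, 2)); // 3 full chunks
        System.out.println(chunkCount(5, 2)); // 3 chunks, the last holding 1 item
    }
}
```

For the reader in this step (item1 through item6) and a chunk size of 2, the result is 3, which matches the three rounds of output in the execution result.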

 

 

Execution result

First iteration

Output printed by processor()

item = item1
item = item2

 

Output printed by writer()

seohae : item1
seohae : item2

 

Second iteration

Output printed by processor()

item = item3
item = item4

 

 

Output printed by writer()

seohae : item3
seohae : item4

 

Third iteration

Output printed by processor()

item = item5
item = item6

 

Output printed by writer()

seohae : item5
seohae : item6

 

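Conclusion

When chunkSize is 2

  • read() : reads 2 items (the chunk size) and puts them in a list.
  • process() : transforms those 2 items and puts the results in a list.
  • writer() : handles the list containing the 2 processed items.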

 

 

Debugging

1) SimpleStepBuilder.createTasklet()

@Override
protected Tasklet createTasklet() {
   Assert.state(reader != null, "ItemReader must be provided");
   Assert.state(writer != null, "ItemWriter must be provided");
   RepeatOperations repeatOperations = createChunkOperations();
   SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations);
   SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter());
   chunkProvider.setListeners(new ArrayList<>(itemListeners));
   chunkProcessor.setListeners(new ArrayList<>(itemListeners));
   ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor);
   tasklet.setBuffering(!readerTransactionalQueue);
   return tasklet;
}

 

  • A ChunkOrientedTasklet object is created
ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor);

 

2) SimpleChunkProvider.provide()

@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {

    final Chunk<I> inputs = new Chunk<>();
    repeatOperations.iterate(new RepeatCallback() {

        @Override
        public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
            I item = null;
            Timer.Sample sample = Timer.start(Metrics.globalRegistry);
            String status = BatchMetrics.STATUS_SUCCESS;
            try {
                item = read(contribution, inputs);
            }
            catch (SkipOverflowException e) {
                // read() tells us about an excess of skips by throwing an
                // exception
                status = BatchMetrics.STATUS_FAILURE;
                return RepeatStatus.FINISHED;
            }
            finally {
                stopTimer(sample, contribution.getStepExecution(), status);
            }
            if (item == null) {
                inputs.setEnd();
                return RepeatStatus.FINISHED;
            }
            inputs.add(item);
            contribution.incrementReadCount();
            return RepeatStatus.CONTINUABLE;
        }

    });

    return inputs;

}

 

3) Following the read() method leads to doRead()

@Nullable
protected final I doRead() throws Exception {
   try {
      listener.beforeRead();
      I item = itemReader.read();
      if(item != null) {
         listener.afterRead(item);
      }
      return item;
   }
   catch (Exception e) {
      if (logger.isDebugEnabled()) {
         logger.debug(e.getMessage() + " : " + e.getClass().getName());
      }
      listener.onReadError(e);
      throw e;
   }
}

This method is called once per item, so with a chunkSize of 2 it runs twice and the inputs list ends up holding 2 items.
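
The read loop can be sketched in plain Java to show how the chunk fills up and how a null from the reader marks the end of the input. This is an illustrative approximation only: `ProvideSketch`, `SimpleListReader`, and this `provide` signature are hypothetical names and do not reproduce Spring Batch's RepeatOperations or listener handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

// Hypothetical sketch of the read loop; names are illustrative, not Spring Batch API.
class ProvideSketch {

    // Minimal stateful reader: returns the next item, or null once the data is exhausted.
    static class SimpleListReader {
        private final LinkedList<String> remaining;

        SimpleListReader(List<String> items) {
            this.remaining = new LinkedList<>(items);
        }

        String read() {
            return remaining.isEmpty() ? null : remaining.removeFirst();
        }
    }

    // Calls read() until chunkSize items are collected or the reader returns null.
    static List<String> provide(SimpleListReader reader, int chunkSize) {
        List<String> inputs = new ArrayList<>();
        while (inputs.size() < chunkSize) {
            String item = reader.read();
            if (item == null) {
                break; // end of input, similar in spirit to inputs.setEnd()
            }
            inputs.add(item);
        }
        return inputs;
    }

    public static void main(String[] args) {
        SimpleListReader reader = new SimpleListReader(
                Arrays.asList("item1", "item2", "item3", "item4", "item5", "item6"));
        System.out.println(provide(reader, 2)); // first chunk
        System.out.println(provide(reader, 2)); // continues where the first chunk stopped
    }
}
```

Because the reader keeps its own position, each call to `provide` picks up where the previous one stopped, which is why the second chunk starts at item3.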

 

 

4) SimpleChunkProcessor.transform()

protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
   Chunk<O> outputs = new Chunk<>();
   for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
      final I item = iterator.next();
      O output;
      Timer.Sample sample = BatchMetrics.createTimerSample();
      String status = BatchMetrics.STATUS_SUCCESS;
      try {
         output = doProcess(item);
      }
      catch (Exception e) {
         /*
          * For a simple chunk processor (no fault tolerance) we are done
          * here, so prevent any more processing of these inputs.
          */
         inputs.clear();
         status = BatchMetrics.STATUS_FAILURE;
         throw e;
      }
      finally {
         stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
      }
      if (output != null) {
         outputs.add(output);
      }
      else {
         iterator.remove();
      }
   }
   return outputs;
}

doProcess() is called once for each item in the inputs list, here item1 and item2.
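
One detail in transform() is easy to miss: when doProcess() returns null, the item is removed from the chunk and never reaches the writer. The sketch below isolates that behaviour in plain Java. `TransformSketch` is a hypothetical name, and `java.util.function.Function` stands in for ItemProcessor as an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the per-item transform step (not Spring Batch code).
class TransformSketch {

    // Applies the processor to each input; null results are dropped (filtered).
    static <I, O> List<O> transform(List<I> inputs, Function<I, O> processor) {
        List<O> outputs = new ArrayList<>();
        for (I item : inputs) {
            O output = processor.apply(item);
            if (output != null) {
                outputs.add(output);
            }
            // a null result means the item is filtered out of this chunk
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("item1", "item2");

        // Same transformation as the step's processor.
        System.out.println(TransformSketch.<String, String>transform(
                inputs, item -> "seohae : " + item));

        // A processor that filters out item1 by returning null.
        System.out.println(TransformSketch.<String, String>transform(
                inputs, item -> item.equals("item1") ? null : "seohae : " + item));
    }
}
```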

 

 

5) SimpleChunkProcessor.doProcess()

protected final O doProcess(I item) throws Exception {

   if (itemProcessor == null) {
      @SuppressWarnings("unchecked")
      O result = (O) item;
      return result;
   }

   try {
      listener.beforeProcess(item);
      O result = itemProcessor.process(item);
      listener.afterProcess(item, result);
      return result;
   }
   catch (Exception e) {
      listener.onProcessError(item, e);
      throw e;
   }

}

 

 

process() is called for item1 and item2 individually.

After these calls, the outputs list holds the two processed values, seohae : item1 and seohae : item2.

 

 

6) SimpleChunkProcessor.doWrite()

protected final void doWrite(List<O> items) throws Exception {

   if (itemWriter == null) {
      return;
   }

   try {
      listener.beforeWrite(items);
      writeItems(items);
      doAfterWrite(items);
   }
   catch (Exception e) {
      doOnWriteError(e, items);
      throw e;
   }

}

The List containing the 2 processed items is passed to the writer.
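
The order of callbacks in doWrite() can be made visible with a small plain-Java sketch that records each step in a list. It is only an approximation under stated assumptions: `WriteSketch` is a hypothetical name, a `Consumer` stands in for ItemWriter, and the event strings are invented labels for the listener calls shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the write step's callback order (not Spring Batch code).
class WriteSketch {

    // Records the order of the listener-style callbacks around a single write call.
    static void doWrite(List<String> items, Consumer<List<String>> writer, List<String> events) {
        if (writer == null) {
            return; // nothing to do without a writer
        }
        try {
            events.add("beforeWrite");
            writer.accept(items); // the whole chunk is handed over in one call
            events.add("afterWrite");
        } catch (RuntimeException e) {
            events.add("onWriteError");
            throw e;
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        doWrite(Arrays.asList("seohae : item1", "seohae : item2"),
                items -> events.add("write " + items.size() + " items"),
                events);
        System.out.println(events);
    }
}
```

If the writer throws, the sketch records `onWriteError` in place of `afterWrite` and rethrows, mirroring the catch block in the method above.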

 

 

Second iteration

1) SimpleChunkProvider.provide()

Now that item1 and item2 have been fully handled, the next 2 items are read.

 

 

2) SimpleChunkProcessor.doProcess()

process() is called for item3 and item4 individually.

 

 

3) SimpleChunkProcessor.doWrite()

protected final void doWrite(List<O> items) throws Exception {

   if (itemWriter == null) {
      return;
   }

   try {
      listener.beforeWrite(items);
      writeItems(items);
      doAfterWrite(items);
   }
   catch (Exception e) {
      doOnWriteError(e, items);
      throw e;
   }

}

 

The List containing the 2 processed items is passed to the writer.

 

 
