[SpringBatch 실습] 23. ItemStreamReader, ItemStreamWriter 구현 클래스 생성하기 (open(), update(), close())

반응형
728x90
반응형

Job 생성

ItemStreamConfiguration.java
 package com.project.springbatch._41_itemStream;

 import com.project.springbatch._41_itemStream.custom.CustomItemStreamReader;
 import com.project.springbatch._41_itemStream.custom.CustomItemStreamWriter;
 import lombok.RequiredArgsConstructor;
 import org.springframework.batch.core.Job;
 import org.springframework.batch.core.Step;
 import org.springframework.batch.core.StepContribution;
 import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
 import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
 import org.springframework.batch.core.scope.context.ChunkContext;
 import org.springframework.batch.core.step.tasklet.Tasklet;
 import org.springframework.batch.item.ItemReader;
 import org.springframework.batch.item.ItemWriter;
 import org.springframework.batch.repeat.RepeatStatus;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;

 import java.util.ArrayList;
 import java.util.List;

/*
--job.name=itemStreamJob
 */
@Configuration
@RequiredArgsConstructor
public class ItemStreamConfiguration {
    // job 생성
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job itemStreamJob() {
        return this.jobBuilderFactory.get("itemStreamJob")
                /* step start */
                .start(itemStreamStep1())
                .next(itemStreamStep2())
                .build();
    }

    @Bean
    public Step itemStreamStep1() {
        return stepBuilderFactory.get("itemStreamStep1")
                .<String, String>chunk(5)
                .reader(customItemStreamReader())
                .writer(customItemStreamWriter())
                .build();
    }

    /**
     * item stream writer
     * @return
     */
    private ItemWriter<? super String> customItemStreamWriter() {
        return new CustomItemStreamWriter();
    }

    /**
     * item stream reader
     * @return
     */
    private ItemReader<String> customItemStreamReader() {
        List<String> items = new ArrayList<>();

        for (int i = 0; i <= 10; i++) {
            items.add(String.valueOf(i));
        }

        return new CustomItemStreamReader(items);
    }

    @Bean
    public Step itemStreamStep2() {
        return stepBuilderFactory.get("itemStreamStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("chunkJobConfiguration step2 was executed");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }
}

 

 

ItemStreamReader 구현 클래스 생성

CustomItemStreamReader.java
package com.project.springbatch._41_itemStream.custom;

import org.springframework.batch.item.*;

import java.util.List;

public class CustomItemStreamReader implements ItemStreamReader<String> {
    private final List<String> items;
    private int index = -1; 
    private boolean restart = false; // 재시작 여부

    public CustomItemStreamReader(List<String> items) {
        this.items = items;
        this.index = 0;
    }

    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        System.out.println("CustomItemStreamReader.read");
        String item = null;

        if (this.index < this.items.size()) {
            item = this.items.get(index);
            index++; // index 1씩 증가
        }

        if (this.index == 6 && !restart) { // 고의 실패 (재시작이 아닐 경우에만)
            throw new RuntimeException("Restart is required");
        }

        return item;
    }

    /**
     * 초기화작업
     * @param executionContext
     * @throws ItemStreamException
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("CustomItemStreamReader.open");

        // ExecutionContext 의 map 에 데이터 저장이 가능하다.
        if (executionContext.containsKey("index")) { // index 키 데이터가 DB에 저장되어있다는 얘기
            index = executionContext.getInt("index");
            this.restart = true; // 재시작 가능으로 변경
        } else {
            index = 0;
            executionContext.put("index", index);
        }
    }

    /**
     * chunk size 만큼의 1 cycle 완료시마다 실행된다.
     * @param executionContext
     * @throws ItemStreamException
     */
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("CustomItemStreamReader.update");

        // 잡이 재시작될때 여기서 저장된 마지막 index 를 가지고온다.
        executionContext.put("index", index);
    }

    /**
     * 리소스 해제 또는 초기화작업 해제 등 마무리 작업
     * @throws ItemStreamException
     */
    @Override
    public void close() throws ItemStreamException {
        System.out.println("CustomItemStreamReader.close");
    }
}

 

1) 변수 index 선언

private int index = -1;

아이템을 읽다가 실패했을 경우, 그때의 실패 시점의 데이터를 DB에 저장하는데, index 추적 가능하도록 저장한다.

 

2) Job 첫 실행시 고의로 에러를 발생시킨다.

if (this.index == 6 && !restart) { // 고의 실패 (재시작이 아닐 경우에만)
    throw new RuntimeException("Restart is required");
}
  • 조건1. index 가 6일 경우
  • 조건2. restart가 false인 경우

▶ Job을 재시작하면 위 조건문을 타지 않는다. (restart가 true가 되기 때문이다.)

실패하기 직전까지의 index를 저장하므로, 6에서 실패했으므로 index는 5까지 저장되어있다. (open() 메서드)

다음 Job을 재실행할때는 index가 5부터 시작된다.

 

3) oepn() : 초기화 작업을 수행한다.

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
    // ExecutionContext 의 map 에 데이터 저장이 가능하다.
    if (executionContext.containsKey("index")) { // index 키 데이터가 DB에 저장되어있다는 얘기
        index = executionContext.getInt("index");
        this.restart = true; // 재시작 가능으로 변경
    } else {
        index = 0;
        executionContext.put("index", index);
    }
}

 

4) write() : chunk size 만큼의 1 cycle이 완료시마다 실행된다.

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    // 잡이 재시작될때 여기서 저장된 마지막 index 를 가지고온다.
    executionContext.put("index", index);
}

 

5) close() : 배치 종료시 호출된다. 리소스 해제 또는 초기화 작업 해제 등 마무리 작업을 수행한다.

@Override
public void close() throws ItemStreamException {
    System.out.println("close");
}

 

 

ItemStreamWriter 구현 클래스 생성

CustomItemStreamWriter.java
package com.project.springbatch._41_itemStream.custom;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;

import java.util.List;

public class CustomItemStreamWriter implements ItemStreamWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {
        items.forEach(System.out::println);
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("CustomItemStreamWriter.open");
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("CustomItemStreamWriter.update");
    }

    @Override
    public void close() throws ItemStreamException {
        System.out.println("CustomItemStreamWriter.close");
    }
}

 

 

디버깅

1) ItemStreamConfiguration.java > customItemStreamReader()

 

2) ItemStreamConfiguration.java > customItemStreamWriter()

 

3) CompositeItemStream.java > open() 에서 우리가 생성한 클래스의 open()을 호출한다.

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
   for (ItemStream itemStream : streams) {
      itemStream.open(executionContext);
   }
}

반복문을 통해 아래 4)~5)번을 수행한다.

 

4) CustomItemStreamReader.java > open()

 

5) CustomItemStreamWriter.java > open()

 

6) CompositeItemStream.java > update() 에서 우리가 생성한 클래스의 update()을 호출한다.

@Override
public void update(ExecutionContext executionContext) {
   for (ItemStream itemStream : streams) {
      itemStream.update(executionContext);
   }
}

반복문을 통해 아래 7)~8)번을 수행한다.

 

7) CustomItemStreamReader.java > update()

 

8) CustomItemStreamWriter.java > update()

 

9) CustomItemStreamReader.java > read()

 

10) chunkSize 5만큼 1 Cycle이 완료되었으므로, write() 수행

 

11) 배치의 1 cycle이 끝났으므로 다시 ItemStreamReader의 update() 수행

  • CustomItemStreamReader.java > update()

  • CustomItemStreamWriter.java > update()

 

12) CustomItemStreamReader.java > read() 수행 

  • 현재 item :  5

 

이 경우, 고의 에러의 조건을 만족하므로 에러가 발생한다.

 

13) 에러가 발생하면서 배치 종료시, CompositeItemStream.java > close() 수행

@Override
public void close() throws ItemStreamException {
   for (ItemStream itemStream : streams) {
      itemStream.close();
   }
}
  • CustomItemStreamReader.java > close() 수행

  • CustomItemStreamWriter.java > close() 수행

 

 

배치 수행 결과

-- 수행 전, open()
CustomItemStreamReader.open
CustomItemStreamWriter.open

-- update()
CustomItemStreamReader.update
CustomItemStreamWriter.update

-- read()
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read

0
1
2
3
4

-- 1 cycle 종료 후 update()
CustomItemStreamReader.update
CustomItemStreamWriter.update

-- 2 cycle 수행중.. 
CustomItemStreamReader.read

-- 에러발생
java.lang.RuntimeException: Restart is required
	at com.project.springbatch._41_itemStream.custom.CustomItemStreamReader.read(CustomItemStreamReader.java:35) ~[main/:na]
	at com.project.springbatch._41_itemStream.custom.CustomItemStreamReader.read(CustomItemStreamReader.java:7) ~[main/:na]

-- close()
CustomItemStreamReader.close
CustomItemStreamWriter.close

 

 

1) CustomItemStreamReader의 open() 호출 -> CustomItemStreamWriter의 open() 호출

  • open() 메서드의 수행 순서
Class Method
AbstractStep.java open()
TaskletStep.java open()
CompositeItemStream.java open() : 우리가 만든 CustomItemStreamReader의 open()을 호출한다.

 

2) chunk 프로세스 수행 전 update()가 한번씩 수행된다.

3) read(), writer() 로직 수행한다.

4) chunk size 만큼의 1 cycle 종료시, update() 메서드를 호출한다.

  • CustomItemStreamReader의 update() 호출 -> CustomItemStreamWriter의 update() 호출

5) 모든 수행이 끝나면 close() 메서드를 호출한다.

  • CustomItemStreamReader의 close() 호출 -> CustomItemStreamWriter의 close() 호출

 

 

DB 테이블 조회

1) BATCH_JOB_EXECUTION

COLUMN VALUE
STATUS FAILED
EXIT_CODE FAILED
JOB_EXECUTION_ID 2

 

2) BATCH_JOB_INSTANCE

COLUMN VALUE
JOB_NAME itemStreamJob

 

3) BATCH_STEP_EXECUTION

  • itemStreamStep1
COLUMN VALUE
STEP_NAME itemStreamStep1
EXIT_CODE FAILED
STATUS FAILED
READ_COUNT 5
WRITE_COUNT 5
JOB_EXECUTION_ID 2
STEP_EXECUTION_ID 2

 

  • itemStreamStep2 (수행 X)

 

4) BATCH_STEP_EXECUTION_CONTEXT

COLUMN VALUE
STEP_EXECUTION_ID 2
SHORT_CONTEXT {"@class":"java.util.HashMap",
"batch.taskletType":"org.springframework.batch.core.step.item.ChunkOrientedTasklet",
"index":5,
"batch.stepType":"org.springframework.batch.core.step.tasklet.TaskletStep"}

여기서, 우리가 배치 수행 중에 CustomItemStreamReader.java 클래스의 update() 메서드를 사용해서 저장했던 값을 확인할 수 있다.

  • CustomItemStreamReader.java > update()
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    System.out.println("CustomItemStreamReader.update");

    // 잡이 재시작될때 여기서 저장된 마지막 index 를 가지고온다.
    executionContext.put("index", index);
}

마지막 index를 저장했으므로 위 테이블의 데이터에 index:5가 저장되어있다. 

따라서, 재시작할때 해당 index를 꺼내올 수 있다.

 

  • CustomItemStreamReader.java > open()
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
    System.out.println("CustomItemStreamReader.open");

    // ExecutionContext 의 map 에 데이터 저장이 가능하다.
    if (executionContext.containsKey("index")) { // index 키 데이터가 DB에 저장되어있다는 얘기
        index = executionContext.getInt("index");
        this.restart = true; // 재시작 가능으로 변경
    } else {
        index = 0;
        executionContext.put("index", index);
    }
}

 

 

Batch 재수행 

수행 순서는 위 Batch 첫 수행과 동일하다. 재시작시 다르게 셋팅되는 부분을 확인해보자.

 

▶ CustomItemStreamReader.java > open()

 

▶ CustomItemStreamReader.java > update()

 

▶ CustomItemStreamReader.java

첫번째 실행했던 Job에서 마지막으로 수행중에 실패됬던 index는 5였으므로, 5부터 시작한다.

 

 

 

재수행 배치 수행 결과

-- open()
CustomItemStreamReader.open
CustomItemStreamWriter.open

-- update()
CustomItemStreamReader.update
CustomItemStreamWriter.update

-- read()
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read

-- 5부터 시작 (재시작했으므로)
5
6
7
8
9

-- 1 cycle 종료 후 update()
CustomItemStreamReader.update
CustomItemStreamWriter.update

-- 2 cycle 수행중..
CustomItemStreamReader.read
CustomItemStreamReader.read

10

-- 모든 아이템 수행 종료 후 update() (실질적으로 2 cycle 종료)
CustomItemStreamReader.update
CustomItemStreamWriter.update

-- step1 종료시 close()
CustomItemStreamReader.close
CustomItemStreamWriter.close

-- step2 실행
chunkJobConfiguration step2 was executed

 

 

DB 테이블 조회

1) BATCH_JOB_EXECUTION

COLUMN VALUE
STATUS COMPLETED
EXIT_CODE COMPLETED
JOB_EXECUTION_ID 3

 

2) BATCH_JOB_INSTANCE

COLUMN VALUE
JOB_NAME itemStreamJob

 

3) BATCH_STEP_EXECUTION

  • itemStreamStep1
COLUMN VALUE
STEP_NAME itemStreamStep1
EXIT_CODE COMPLETED
STATUS COMPLETED
JOB_EXECUTION_ID 3

 

  • itemStreamStep2
COLUMN VALUE
STEP_NAME itemStreamStep2
EXIT_CODE COMPLETED
STATUS COMPLETED
JOB_EXECUTION_ID 3

 

 

반응형

Designed by JB FACTORY