Job 생성
ItemStreamConfiguration.java
package com.project.springbatch._41_itemStream;
import com.project.springbatch._41_itemStream.custom.CustomItemStreamReader;
import com.project.springbatch._41_itemStream.custom.CustomItemStreamWriter;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/*
--job.name=itemStreamJob
*/
@Configuration
@RequiredArgsConstructor
public class ItemStreamConfiguration {
// job 생성
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job itemStreamJob() {
return this.jobBuilderFactory.get("itemStreamJob")
/* step start */
.start(itemStreamStep1())
.next(itemStreamStep2())
.build();
}
@Bean
public Step itemStreamStep1() {
return stepBuilderFactory.get("itemStreamStep1")
.<String, String>chunk(5)
.reader(customItemStreamReader())
.writer(customItemStreamWriter())
.build();
}
/**
* item stream writer
* @return
*/
private ItemWriter<? super String> customItemStreamWriter() {
return new CustomItemStreamWriter();
}
/**
* item stream reader
* @return
*/
private ItemReader<String> customItemStreamReader() {
List<String> items = new ArrayList<>();
for (int i = 0; i <= 10; i++) {
items.add(String.valueOf(i));
}
return new CustomItemStreamReader(items);
}
@Bean
public Step itemStreamStep2() {
return stepBuilderFactory.get("itemStreamStep2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("chunkJobConfiguration step2 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
}
ItemStreamReader 구현 클래스 생성
CustomItemStreamReader.java
package com.project.springbatch._41_itemStream.custom;
import org.springframework.batch.item.*;
import java.util.List;
public class CustomItemStreamReader implements ItemStreamReader<String> {
private final List<String> items;
private int index = -1;
private boolean restart = false; // 재시작 여부
public CustomItemStreamReader(List<String> items) {
this.items = items;
this.index = 0;
}
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
System.out.println("CustomItemStreamReader.read");
String item = null;
if (this.index < this.items.size()) {
item = this.items.get(index);
index++; // index 1씩 증가
}
if (this.index == 6 && !restart) { // 고의 실패 (재시작이 아닐 경우에만)
throw new RuntimeException("Restart is required");
}
return item;
}
/**
* 초기화작업
* @param executionContext
* @throws ItemStreamException
*/
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
System.out.println("CustomItemStreamReader.open");
// ExecutionContext 의 map 에 데이터 저장이 가능하다.
if (executionContext.containsKey("index")) { // index 키 데이터가 DB에 저장되어있다는 얘기
index = executionContext.getInt("index");
this.restart = true; // 재시작 가능으로 변경
} else {
index = 0;
executionContext.put("index", index);
}
}
/**
* chunk size 만큼의 1 cycle 완료시마다 실행된다.
* @param executionContext
* @throws ItemStreamException
*/
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
System.out.println("CustomItemStreamReader.update");
// 잡이 재시작될때 여기서 저장된 마지막 index 를 가지고온다.
executionContext.put("index", index);
}
/**
* 리소스 해제 또는 초기화작업 해제 등 마무리 작업
* @throws ItemStreamException
*/
@Override
public void close() throws ItemStreamException {
System.out.println("CustomItemStreamReader.close");
}
}
1) 변수 index 선언
private int index = -1;
아이템을 읽다가 실패했을 경우, 그때의 실패 시점의 데이터를 DB에 저장하는데, index 추적 가능하도록 저장한다.
2) Job 첫 실행시 고의로 에러를 발생시킨다.
if (this.index == 6 && !restart) { // 고의 실패 (재시작이 아닐 경우에만)
throw new RuntimeException("Restart is required");
}
- 조건1. index 가 6일 경우
- 조건2. restart가 false인 경우
▶ Job을 재시작하면 위 조건문을 타지 않는다. (restart가 true가 되기 때문이다.)
실패하기 직전까지의 index를 저장하므로, 6에서 실패했으므로 index는 5까지 저장되어있다. (open() 메서드)
다음 Job을 재실행할때는 index가 5부터 시작된다.
3) oepn() : 초기화 작업을 수행한다.
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
// ExecutionContext 의 map 에 데이터 저장이 가능하다.
if (executionContext.containsKey("index")) { // index 키 데이터가 DB에 저장되어있다는 얘기
index = executionContext.getInt("index");
this.restart = true; // 재시작 가능으로 변경
} else {
index = 0;
executionContext.put("index", index);
}
}
4) write() : chunk size 만큼의 1 cycle이 완료시마다 실행된다.
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// 잡이 재시작될때 여기서 저장된 마지막 index 를 가지고온다.
executionContext.put("index", index);
}
5) close() : 배치 종료시 호출된다. 리소스 해제 또는 초기화 작업 해제 등 마무리 작업을 수행한다.
@Override
public void close() throws ItemStreamException {
System.out.println("close");
}
ItemStreamWriter 구현 클래스 생성
CustomItemStreamWriter.java
package com.project.springbatch._41_itemStream.custom;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import java.util.List;
public class CustomItemStreamWriter implements ItemStreamWriter<String> {
@Override
public void write(List<? extends String> items) throws Exception {
items.forEach(System.out::println);
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
System.out.println("CustomItemStreamWriter.open");
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
System.out.println("CustomItemStreamWriter.update");
}
@Override
public void close() throws ItemStreamException {
System.out.println("CustomItemStreamWriter.close");
}
}
디버깅
1) ItemStreamConfiguration.java > customItemStreamReader()
2) ItemStreamConfiguration.java > customItemStreamWriter()
3) CompositeItemStream.java > open() 에서 우리가 생성한 클래스의 open()을 호출한다.
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
for (ItemStream itemStream : streams) {
itemStream.open(executionContext);
}
}
반복문을 통해 아래 4)~5)번을 수행한다.
4) CustomItemStreamReader.java > open()
5) CustomItemStreamWriter.java > open()
6) CompositeItemStream.java > update() 에서 우리가 생성한 클래스의 update()을 호출한다.
@Override
public void update(ExecutionContext executionContext) {
for (ItemStream itemStream : streams) {
itemStream.update(executionContext);
}
}
반복문을 통해 아래 7)~8)번을 수행한다.
7) CustomItemStreamReader.java > update()
8) CustomItemStreamWriter.java > update()
9) CustomItemStreamReader.java > read()
10) chunkSize 5만큼 1 Cycle이 완료되었으므로, write() 수행
11) 배치의 1 cycle이 끝났으므로 다시 ItemStreamReader의 update() 수행
- CustomItemStreamReader.java > update()
- CustomItemStreamWriter.java > update()
12) CustomItemStreamReader.java > read() 수행
- 현재 item : 5
이 경우, 고의 에러의 조건을 만족하므로 에러가 발생한다.
13) 에러가 발생하면서 배치 종료시, CompositeItemStream.java > close() 수행
@Override
public void close() throws ItemStreamException {
for (ItemStream itemStream : streams) {
itemStream.close();
}
}
- CustomItemStreamReader.java > close() 수행
- CustomItemStreamWriter.java > close() 수행
배치 수행 결과
-- 수행 전, open()
CustomItemStreamReader.open
CustomItemStreamWriter.open
-- update()
CustomItemStreamReader.update
CustomItemStreamWriter.update
-- read()
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
0
1
2
3
4
-- 1 cycle 종료 후 update()
CustomItemStreamReader.update
CustomItemStreamWriter.update
-- 2 cycle 수행중..
CustomItemStreamReader.read
-- 에러발생
java.lang.RuntimeException: Restart is required
at com.project.springbatch._41_itemStream.custom.CustomItemStreamReader.read(CustomItemStreamReader.java:35) ~[main/:na]
at com.project.springbatch._41_itemStream.custom.CustomItemStreamReader.read(CustomItemStreamReader.java:7) ~[main/:na]
-- close()
CustomItemStreamReader.close
CustomItemStreamWriter.close
1) CustomItemStreamReader의 open() 호출 -> CustomItemStreamWriter의 open() 호출
- open() 메서드의 수행 순서
Class | Method |
AbstractStep.java | open() |
TaskletStep.java | open() |
CompositeItemStream.java | open() : 우리가 만든 CustomItemStreamReader의 open()을 호출한다. |
2) chunk 프로세스 수행 전 update()가 한번씩 수행된다.
3) read(), writer() 로직 수행한다.
4) chunk size 만큼의 1 cycle 종료시, update() 메서드를 호출한다.
- CustomItemStreamReader의 update() 호출 -> CustomItemStreamWriter의 update() 호출
5) 모든 수행이 끝나면 close() 메서드를 호출한다.
- CustomItemStreamReader의 close() 호출 -> CustomItemStreamWriter의 close() 호출
DB 테이블 조회
1) BATCH_JOB_EXECUTION
COLUMN | VALUE |
STATUS | FAILED |
EXIT_CODE | FAILED |
JOB_EXECUTION_ID | 2 |
2) BATCH_JOB_INSTANCE
COLUMN | VALUE |
JOB_NAME | itemStreamJob |
3) BATCH_STEP_EXECUTION
- itemStreamStep1
COLUMN | VALUE |
STEP_NAME | itemStreamStep1 |
EXIT_CODE | FAILED |
STATUS | FAILED |
READ_COUNT | 5 |
WRITE_COUNT | 5 |
JOB_EXECUTION_ID | 2 |
STEP_EXECUTION_ID | 2 |
- itemStreamStep2 (수행 X)
4) BATCH_STEP_EXECUTION_CONTEXT
COLUMN | VALUE |
STEP_EXECUTION_ID | 2 |
SHORT_CONTEXT | {"@class":"java.util.HashMap", "batch.taskletType":"org.springframework.batch.core.step.item.ChunkOrientedTasklet", "index":5, "batch.stepType":"org.springframework.batch.core.step.tasklet.TaskletStep"} |
여기서, 우리가 배치 수행 중에 CustomItemStreamReader.java 클래스의 update() 메서드를 사용해서 저장했던 값을 확인할 수 있다.
- CustomItemStreamReader.java > update()
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
System.out.println("CustomItemStreamReader.update");
// 잡이 재시작될때 여기서 저장된 마지막 index 를 가지고온다.
executionContext.put("index", index);
}
마지막 index를 저장했으므로 위 테이블의 데이터에 index:5가 저장되어있다.
따라서, 재시작할때 해당 index를 꺼내올 수 있다.
- CustomItemStreamReader.java > open()
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
System.out.println("CustomItemStreamReader.open");
// ExecutionContext 의 map 에 데이터 저장이 가능하다.
if (executionContext.containsKey("index")) { // index 키 데이터가 DB에 저장되어있다는 얘기
index = executionContext.getInt("index");
this.restart = true; // 재시작 가능으로 변경
} else {
index = 0;
executionContext.put("index", index);
}
}
Batch 재수행
수행 순서는 위 Batch 첫 수행과 동일하다. 재시작시 다르게 셋팅되는 부분을 확인해보자.
▶ CustomItemStreamReader.java > open()
▶ CustomItemStreamReader.java > update()
▶ CustomItemStreamReader.java
첫번째 실행했던 Job에서 마지막으로 수행중에 실패됬던 index는 5였으므로, 5부터 시작한다.
재수행 배치 수행 결과
-- open()
CustomItemStreamReader.open
CustomItemStreamWriter.open
-- update()
CustomItemStreamReader.update
CustomItemStreamWriter.update
-- read()
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
CustomItemStreamReader.read
-- 5부터 시작 (재시작했으므로)
5
6
7
8
9
-- 1 cycle 종료 후 update()
CustomItemStreamReader.update
CustomItemStreamWriter.update
-- 2 cycle 수행중..
CustomItemStreamReader.read
CustomItemStreamReader.read
10
-- 모든 아이템 수행 종료 후 update() (실질적으로 2 cycle 종료)
CustomItemStreamReader.update
CustomItemStreamWriter.update
-- step1 종료시 close()
CustomItemStreamReader.close
CustomItemStreamWriter.close
-- step2 실행
chunkJobConfiguration step2 was executed
DB 테이블 조회
1) BATCH_JOB_EXECUTION
COLUMN | VALUE |
STATUS | COMPLETED |
EXIT_CODE | COMPLETED |
JOB_EXECUTION_ID | 3 |
2) BATCH_JOB_INSTANCE
COLUMN | VALUE |
JOB_NAME | itemStreamJob |
3) BATCH_STEP_EXECUTION
- itemStreamStep1
COLUMN | VALUE |
STEP_NAME | itemStreamStep1 |
EXIT_CODE | COMPLETED |
STATUS | COMPLETED |
JOB_EXECUTION_ID | 3 |
- itemStreamStep2
COLUMN | VALUE |
STEP_NAME | itemStreamStep2 |
EXIT_CODE | COMPLETED |
STATUS | COMPLETED |
JOB_EXECUTION_ID | 3 |