Partitioning
- A MasterStep launches the SlaveSteps.
- Each SlaveStep is executed independently by its own thread.
- Each SlaveStep gets its own independent StepExecution, and through it an independent parameter environment (ExecutionContext).
- Each SlaveStep runs with its own ItemReader / ItemProcessor / ItemWriter and processes its share of the work independently and in parallel.
- The MasterStep is a PartitionStep; the SlaveStep can be a TaskletStep, FlowStep, and so on. A minimal sketch of the slave side follows this list.
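To make the "independent StepExecution parameter environment" concrete, here is a minimal sketch (the rangeLoggingTasklet bean is hypothetical and not part of the example below) of a Tasklet-based slave step that pulls its own partition range out of its StepExecution's ExecutionContext:

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;

// Hypothetical bean, for illustration only: each thread gets its own
// step-scoped instance, so minValue/maxValue come from that thread's
// own StepExecution context.
@Bean
@StepScope
public Tasklet rangeLoggingTasklet(
        @Value("#{stepExecutionContext['minValue']}") Long minValue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
    return (contribution, chunkContext) -> {
        System.out.println(Thread.currentThread().getName()
                + " handles ids " + minValue + " ~ " + maxValue);
        return RepeatStatus.FINISHED;
    };
}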
Example code
PartitioningCustomer.java
package com.project.springbatch._45_partitioning;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.Date;

@Data
@AllArgsConstructor
public class PartitioningCustomer {
    private final long id;
    private final String firstName;
    private final String lastName;
    private final Date birthdate;
}
PartitioningCustomerRowMapper.java
package com.project.springbatch._45_partitioning;

import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class PartitioningCustomerRowMapper implements RowMapper<PartitioningCustomer> {
    @Override
    public PartitioningCustomer mapRow(ResultSet rs, int i) throws SQLException {
        return new PartitioningCustomer(rs.getLong("id"),
                rs.getString("firstName"),
                rs.getString("lastName"),
                rs.getDate("birthdate"));
    }
}
PartitioningStopWatchJobListener.java
package com.project.springbatch._45_partitioning;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class PartitioningStopWatchJobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // elapsed time = job end time - job start time (milliseconds)
        long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
        System.out.println("==========================================");
        System.out.println("Total elapsed time : " + time + "ms");
        System.out.println("==========================================");
    }
}
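Note that this listener only fires if it is registered on the job; the configuration further below does not register it. A sketch of the wiring in the job definition would be:

return jobBuilderFactory.get("partitioningJob")
        .incrementer(new RunIdIncrementer())
        .listener(new PartitioningStopWatchJobListener()) // register the stopwatch listener
        .start(masterStep())
        .build();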
ColumnRangePartitioner.java
package com.project.springbatch._45_partitioning;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * Based on the Spring Batch samples.
 */
public class ColumnRangePartitioner implements Partitioner {

    private JdbcOperations jdbcTemplate;
    private String table;
    private String column;

    public void setTable(String table) {
        this.table = table;
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // fetch MIN and MAX of the partitioning column (id)
        int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);
        int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class);

        // targetSize = rows per partition. With gridSize = 4 and 1000 rows:
        // (1000 - 1) / 4 + 1 = 249 + 1 = 250 rows per partition.
        int targetSize = (max - min) / gridSize + 1;

        Map<String, ExecutionContext> result = new HashMap<>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;

        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            // Ranges for the 1000-row example:
            // 1 ~ 250, 251 ~ 500, 501 ~ 750, 751 ~ 1000
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            start += targetSize;
            end += targetSize;
            number++;
        }

        // Builds one ExecutionContext per partition (gridSize of them), keyed
        // "partition" + number; each holds its own minValue/maxValue and will
        // be stored in the corresponding slave StepExecution.
        return result;
    }
}
1) With gridSize = 4 and 100 rows of data, the following entries are computed and stored in the result Map:

key        | value
-----------|-----------------------------
partition0 | minValue: 1,  maxValue: 25
partition1 | minValue: 26, maxValue: 50
partition2 | minValue: 51, maxValue: 75
partition3 | minValue: 76, maxValue: 100
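The range arithmetic can be checked in isolation; a minimal standalone sketch (hypothetical RangeMathCheck class, no database, hard-coded to the 100-row example):

public class RangeMathCheck {
    public static void main(String[] args) {
        int min = 1, max = 100, gridSize = 4;
        int targetSize = (max - min) / gridSize + 1; // (99 / 4) + 1 = 25
        int number = 0, start = min, end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max; // clamp the last partition to max
            }
            System.out.println("partition" + number + " | minValue: " + start + " maxValue: " + end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        // prints partition0 1~25, partition1 26~50, partition2 51~75, partition3 76~100
    }
}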
PartitioningConfiguration.java
package com.project.springbatch._45_partitioning;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/*
--job.name=partitioningJob
*/
@RequiredArgsConstructor
@Configuration
public class PartitioningConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Job partitioningJob() throws Exception {
        return jobBuilderFactory.get("partitioningJob")
                .incrementer(new RunIdIncrementer())
                // the slave step is wired inside masterStep
                .start(masterStep())
                .build();
    }

    @Bean
    public Step masterStep() throws Exception {
        return stepBuilderFactory.get("masterStep")
                // register the slave step and the partitioner
                .partitioner(slaveStep().getName(), partitioner())
                .step(slaveStep())
                .gridSize(4) // create 4 StepExecutions, one per slave step
                .taskExecutor(new SimpleAsyncTaskExecutor()) // multi-thread
                .build();
    }

    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep")
                .<PartitioningCustomer, PartitioningCustomer>chunk(1000)
                .reader(pagingItemReader(null, null))
                .writer(partitioningCustomerItemWriter())
                .build();
    }

    @Bean
    public ColumnRangePartitioner partitioner() {
        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
        columnRangePartitioner.setColumn("id"); // partitioning column
        columnRangePartitioner.setDataSource(this.dataSource);
        columnRangePartitioner.setTable("customer"); // table name
        return columnRangePartitioner;
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<PartitioningCustomer> pagingItemReader(
            // each thread has its own StepExecution, whose ExecutionContext
            // carries that partition's minValue/maxValue
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);

        JdbcPagingItemReader<PartitioningCustomer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new PartitioningCustomerRowMapper());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        // per-thread predicate: each thread reads a different id range
        queryProvider.setWhereClause("where id >= " + minValue + " and id <= " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);

        return reader;
    }

    @Bean
    // A separate ItemWriter instance is created per thread at runtime. A single
    // (even the main) thread could do the writing when the data set is small,
    // but declaring @StepScope lets the writer run on multiple threads as well.
    @StepScope
    public JdbcBatchItemWriter<PartitioningCustomer> partitioningCustomerItemWriter() {
        JdbcBatchItemWriter<PartitioningCustomer> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(this.dataSource);
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }
}
1) Creating masterStep: this step plays the Master role.

return stepBuilderFactory.get("masterStep")
        // register the slave step and the partitioner
        .partitioner(slaveStep().getName(), partitioner())
        .step(slaveStep())
        .gridSize(4) // create 4 StepExecutions, one per slave step
        .taskExecutor(new SimpleAsyncTaskExecutor()) // multi-thread
        .build();

1) A PartitionStepBuilder is created for building the PartitionStep, and the Partitioner is registered.
2) The Step that plays the Slave role is registered (a TaskletStep, FlowStep, etc. can be used).
3) gridSize sets how many partitions the data is split into.
4) A task executor is set; here a SimpleAsyncTaskExecutor is used (see the sketch after this list).
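Under the hood, these builder calls configure a TaskExecutorPartitionHandler; a minimal sketch of the equivalent explicit wiring (assuming the same beans as above):

import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
partitionHandler.setStep(slaveStep());    // the slave step executed per partition
partitionHandler.setGridSize(4);          // number of partitions
partitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());

The same handler could be supplied explicitly via .partitionHandler(partitionHandler) on the step builder.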
2) pagingItemReader()
Each thread running this reader has its own StepExecution, which in turn holds an ExecutionContext; minValue and maxValue are pulled out of that ExecutionContext via SpEL.
@Bean
@StepScope
public JdbcPagingItemReader<PartitioningCustomer> pagingItemReader(
        // each thread has its own StepExecution, whose ExecutionContext
        // carries that partition's minValue/maxValue
        @Value("#{stepExecutionContext['minValue']}") Long minValue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
    System.out.println("reading " + minValue + " to " + maxValue);
Batch output (the order varies between runs because the four slave steps run concurrently):
reading 1 to 25
reading 76 to 100
reading 51 to 75
reading 26 to 50
Debugging
1) StepBuilder.java > partitioner()
2) PartitionStep.java > doExecute()
The partition handler creates the worker threads and hands each one its own StepExecution object.
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
    stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

    // Wait for task completion and then aggregate the results
    Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
    stepExecution.upgradeStatus(BatchStatus.COMPLETED);
    stepExecutionAggregator.aggregate(stepExecution, executions);

    // If anything failed or had a problem we need to crap out
    if (stepExecution.getStatus().isUnsuccessful()) {
        throw new JobExecutionException("Partition handler returned an unsuccessful step");
    }
}
3) AbstractPartitionHandler.java
@Override
public Collection<StepExecution> handle(final StepExecutionSplitter stepSplitter,
        final StepExecution masterStepExecution) throws Exception {
    final Set<StepExecution> stepExecutions = stepSplitter.split(masterStepExecution, gridSize);
    return doHandle(masterStepExecution, stepExecutions);
}
4) SimpleStepExecutionSplitter.java > split()
5) SimpleStepExecutionSplitter.java > getContexts()
private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, int gridSize) {
    ExecutionContext context = stepExecution.getExecutionContext();
    String key = SimpleStepExecutionSplitter.class.getSimpleName() + ".GRID_SIZE";
    // If this is a restart we must retain the same grid size, ignoring the
    // one passed in...
    int splitSize = (int) context.getLong(key, gridSize);
    context.putLong(key, splitSize);

    Map<String, ExecutionContext> result;
    if (context.isDirty()) {
        // The context changed so we didn't already know the partitions
        jobRepository.updateExecutionContext(stepExecution);
        result = partitioner.partition(splitSize);
    }
    else {
        if (partitioner instanceof PartitionNameProvider) {
            result = new HashMap<>();
            Collection<String> names = ((PartitionNameProvider) partitioner).getPartitionNames(splitSize);
            for (String name : names) {
                /*
                 * We need to return the same keys as the original (failed)
                 * execution, but the execution contexts will be discarded
                 * so they can be empty.
                 */
                result.put(name, new ExecutionContext());
            }
        }
        else {
            // If no names are provided, grab the partition again.
            result = partitioner.partition(splitSize);
        }
    }
    return result;
}
partition() is called here
▶ This invokes our ColumnRangePartitioner.partition().
Each of the 4 slave steps processes 25 rows, and each one holds its own independent ExecutionContext.
▶ The result returned from getContexts() is the partition0 ~ partition3 map shown in the table above, each entry carrying its own minValue/maxValue. A way to observe this per thread is sketched below.
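To see each slave's independent context at runtime, a hypothetical StepExecutionListener (not part of the original example) could be registered on slaveStep via .listener(new PartitionContextLoggingListener()):

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public class PartitionContextLoggingListener implements StepExecutionListener {
    @Override
    public void beforeStep(StepExecution stepExecution) {
        // slave StepExecutions are named "slaveStep:partition0", "slaveStep:partition1", ...
        System.out.println(stepExecution.getStepName() + " -> "
                + stepExecution.getExecutionContext().get("minValue") + " ~ "
                + stepExecution.getExecutionContext().get("maxValue"));
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return stepExecution.getExitStatus();
    }
}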
6) Back to SimpleStepExecutionSplitter.java > split()
It creates one StepExecution per partition (gridSize of them) and collects them.
for (Entry<String, ExecutionContext> context : contexts.entrySet()) {
    // Make the step execution name unique and repeatable
    String stepName = this.stepName + STEP_NAME_SEPARATOR + context.getKey();
    StepExecution currentStepExecution = jobExecution.createStepExecution(stepName);
    boolean startable = isStartable(currentStepExecution, context.getValue());
    if (startable) {
        set.add(currentStepExecution);
    }
}
jobRepository.addAll(set);
Set<StepExecution> executions = new HashSet<>(set.size());
executions.addAll(set);
return executions;
7) TaskExecutorPartitionHandler.java > doHandle()
Each slave holds its own independent StepExecution.
The for loop submits one task per slave StepExecution.
for (final StepExecution stepExecution : partitionStepExecutions) {
    final FutureTask<StepExecution> task = createTask(step, stepExecution);
    try {
        taskExecutor.execute(task);
        tasks.add(task);
    } catch (TaskRejectedException e) {
        // couldn't execute one of the tasks
        ExitStatus exitStatus = ExitStatus.FAILED
                .addExitDescription("TaskExecutor rejected the task for this step.");
        /*
         * Set the status in case the caller is tracking it through the
         * JobExecution.
         */
        stepExecution.setStatus(BatchStatus.FAILED);
        stepExecution.setExitStatus(exitStatus);
        result.add(stepExecution);
    }
}
8) TaskExecutorPartitionHandler.java > createTask()
protected FutureTask<StepExecution> createTask(final Step step,
        final StepExecution stepExecution) {
    return new FutureTask<>(new Callable<StepExecution>() {
        @Override
        public StepExecution call() throws Exception {
            step.execute(stepExecution);
            return stepExecution;
        }
    });
}