[SpringBatch Practice] 32. Running and Analyzing a Spring Batch Partitioning Job


Partitioning

  • The structure: a MasterStep launches the SlaveSteps.
  • Each SlaveStep is executed independently by its own thread.
  • Each SlaveStep gets an independent StepExecution, and therefore its own ExecutionContext parameters.
  • Each SlaveStep runs with its own ItemReader / ItemProcessor / ItemWriter and processes its share of the work independently, in parallel.
  • The MasterStep is a PartitionStep; a SlaveStep can be a TaskletStep, a FlowStep, etc.

 

 

 

Example code

PartitioningCustomer.java
package com.project.springbatch._45_partitioning;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.Date;

@Data
@AllArgsConstructor
public class PartitioningCustomer {
    private final long id;
    private final String firstName;
    private final String lastName;
    private final Date birthdate;
}

 

PartitioningCustomerRowMapper.java
package com.project.springbatch._45_partitioning;

import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class PartitioningCustomerRowMapper implements RowMapper<PartitioningCustomer> {
    @Override
    public PartitioningCustomer mapRow(ResultSet rs, int i) throws SQLException {
        return new PartitioningCustomer(rs.getLong("id"),
                rs.getString("firstName"),
                rs.getString("lastName"),
                rs.getDate("birthdate"));
    }
}

 

PartitioningStopWatchJobListener.java
package com.project.springbatch._45_partitioning;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class PartitioningStopWatchJobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // elapsed wall-clock time for the whole job, in milliseconds
        long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
        System.out.println("==========================================");
        System.out.println("Total time taken (ms): " + time);
        System.out.println("==========================================");
    }
}
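Note that for this output to actually appear, the listener has to be registered on the job. The configuration listing later in this post does not show that wiring, so presumably partitioningJob() registers it roughly like this (a sketch, not from the original listing):

@Bean
public Job partitioningJob() throws Exception {
    return jobBuilderFactory.get("partitioningJob")
            .incrementer(new RunIdIncrementer())
            .listener(new PartitioningStopWatchJobListener()) // logs total elapsed time after the job
            .start(masterStep())
            .build();
}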

 

ColumnRangePartitioner.java
package com.project.springbatch._45_partitioning;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * Range partitioner adapted from the Spring Batch samples.
 */
public class ColumnRangePartitioner implements Partitioner {

    private JdbcOperations jdbcTemplate;

    private String table;

    private String column;

    public void setTable(String table) {
        this.table = table;
    }
    public void setColumn(String column) {
        this.column = column;
    }
    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // look up the min and max of the partition column (id)
        int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);
        int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class);

        // e.g. gridSize = 4 with 1000 rows (min = 1, max = 1000):
        // targetSize = (1000 - 1) / 4 + 1 = 249 + 1 = 250 rows per partition
        int targetSize = (max - min) / gridSize + 1;

        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;

        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);

            if (end >= max) {
                end = max;
            }

            // resulting ranges: 1~250, 251~500, 501~750, 751~1000
            value.putInt("minValue", start);
            value.putInt("maxValue", end);

            start += targetSize;
            end += targetSize;
            number++;
        }

        // key   : "partition" + number
        // value : an ExecutionContext holding this partition's minValue/maxValue
        // One ExecutionContext per partition (gridSize of them) is created here and
        // later stored in the corresponding worker StepExecution.
        return result;
    }
}

1) With gridSize = 4 and 100 rows of data, the ranges below are computed and put into the result map:

key          value
partition0   minValue: 1,  maxValue: 25
partition1   minValue: 26, maxValue: 50
partition2   minValue: 51, maxValue: 75
partition3   minValue: 76, maxValue: 100
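The range arithmetic is easy to verify without a database. A standalone sketch of just the loop from ColumnRangePartitioner, with min, max, and gridSize hard-coded to match the table above:

import java.util.LinkedHashMap;
import java.util.Map;

public class RangeCheck {
    public static void main(String[] args) {
        int min = 1, max = 100, gridSize = 4;        // 100 rows, 4 partitions
        int targetSize = (max - min) / gridSize + 1; // (100 - 1) / 4 + 1 = 25

        Map<String, int[]> result = new LinkedHashMap<>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;

        while (start <= max) {
            result.put("partition" + number, new int[] { start, Math.min(end, max) });
            start += targetSize;
            end += targetSize;
            number++;
        }

        // prints partition0 -> [1, 25] ... partition3 -> [76, 100]
        result.forEach((key, range) ->
                System.out.println(key + " -> minValue=" + range[0] + ", maxValue=" + range[1]));
    }
}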

 

PartitioningConfiguration.java
package com.project.springbatch._45_partitioning;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/*
--job.name=partitioningJob
*/
@RequiredArgsConstructor
@Configuration
public class PartitioningConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Job partitioningJob() throws Exception {
        return jobBuilderFactory.get("partitioningJob")
                .incrementer(new RunIdIncrementer())
                // the slaveStep is wired up inside masterStep below
                .start(masterStep())
                .build();
    }

    @Bean
    public Step masterStep() throws Exception {
        return stepBuilderFactory.get("masterStep")
                // register the worker step name and the partitioner that splits the data
                .partitioner(slaveStep().getName(), partitioner())
                .step(slaveStep())
                .gridSize(4) // create 4 StepExecutions, one per slave partition
                .taskExecutor(new SimpleAsyncTaskExecutor()) // run partitions on separate threads
                .build();
    }

    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep")
                .<PartitioningCustomer, PartitioningCustomer>chunk(1000)
                .reader(pagingItemReader(null, null))
                .writer(partitioningCustomerItemWriter())
                .build();
    }

    @Bean
    public ColumnRangePartitioner partitioner() {
        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();

        columnRangePartitioner.setColumn("id"); // partition key column
        columnRangePartitioner.setDataSource(this.dataSource);
        columnRangePartitioner.setTable("customer"); // source table

        return columnRangePartitioner;
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<PartitioningCustomer> pagingItemReader(
            // bound at runtime: each thread's StepExecution carries its own
            // ExecutionContext, which holds this partition's minValue/maxValue
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);
        JdbcPagingItemReader<PartitioningCustomer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new PartitioningCustomerRowMapper());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        // per-thread predicate: each thread reads a different id range
        queryProvider.setWhereClause("where id >= " + minValue + " and id <= " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>(1);

        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

    @Bean
    @StepScope // a separate ItemWriter instance is created per partition thread at runtime
    // Writing from a single (main) thread would also be fine for small data sets,
    // but with @StepScope the writer runs on each partition thread as well.
    public JdbcBatchItemWriter<PartitioningCustomer> partitioningCustomerItemWriter() {
        JdbcBatchItemWriter<PartitioningCustomer> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(this.dataSource);
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }


}

1) Creating masterStep: this step plays the Master role.

return stepBuilderFactory.get("masterStep")
        // register the worker step name and the partitioner
        .partitioner(slaveStep().getName(), partitioner())
        .step(slaveStep())
        .gridSize(4) // create 4 StepExecutions, one per slave partition
        .taskExecutor(new SimpleAsyncTaskExecutor()) // run partitions on separate threads
        .build();

1) A PartitionStepBuilder is created to build the PartitionStep, and the Partitioner is registered on it.

2) Sets the Step that plays the Slave role (a TaskletStep, FlowStep, etc. can be used).

3) Sets gridSize, i.e. how many partitions to split the data into.

4) Sets the TaskExecutor. Here SimpleAsyncTaskExecutor is used; note that it spawns a brand-new thread for every partition rather than drawing from a pool. A pooled alternative is sketched below.
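SimpleAsyncTaskExecutor creates a fresh thread for each task and never reuses threads, so for larger grid sizes a bounded pool is the usual choice. A minimal sketch that could drop into PartitioningConfiguration (the bean name partitionTaskExecutor and the pool sizes are illustrative choices, not from the original code; it needs the org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor import):

// A bounded pool sized to the grid, so at most 4 worker threads run at once and are reused.
@Bean
public ThreadPoolTaskExecutor partitionTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);                       // matches gridSize(4)
    executor.setMaxPoolSize(4);
    executor.setThreadNamePrefix("partition-thread-");
    return executor;                                   // Spring initializes the pool automatically for beans
}

masterStep() would then use .taskExecutor(partitionTaskExecutor()) instead of new SimpleAsyncTaskExecutor().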

 

2) pagingItemReader()

Each thread that runs this reader has its own StepExecution, which in turn carries its own ExecutionContext. The reader pulls this partition's minValue and maxValue out of that ExecutionContext via the stepExecutionContext SpEL binding.

@Bean
@StepScope
public JdbcPagingItemReader<PartitioningCustomer> pagingItemReader(
        // bound at runtime: each thread's StepExecution carries its own
        // ExecutionContext, which holds this partition's minValue/maxValue
        @Value("#{stepExecutionContext['minValue']}") Long minValue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
    System.out.println("reading " + minValue + " to " + maxValue);
    // ... (query construction identical to the full listing above)
}
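The original reader splices minValue/maxValue straight into the WHERE clause as strings, which is safe here because the bounds come from our own partitioner rather than user input. JdbcPagingItemReader also supports named parameters via setParameterValues(); a variant sketch of the same bean where only the WHERE-clause handling differs (imports as in PartitioningConfiguration, plus java.util.Collections):

@Bean
@StepScope
public JdbcPagingItemReader<PartitioningCustomer> pagingItemReader(
        @Value("#{stepExecutionContext['minValue']}") Long minValue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
    JdbcPagingItemReader<PartitioningCustomer> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(new PartitioningCustomerRowMapper());

    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setWhereClause("where id >= :minValue and id <= :maxValue"); // named parameters
    queryProvider.setSortKeys(Collections.singletonMap("id", Order.ASCENDING));
    reader.setQueryProvider(queryProvider);

    // the partition bounds are bound as query parameters instead of concatenated into the SQL
    Map<String, Object> parameters = new HashMap<>();
    parameters.put("minValue", minValue);
    parameters.put("maxValue", maxValue);
    reader.setParameterValues(parameters);

    return reader;
}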

 

 

Batch results

Because the four slave steps run on separate threads, the ranges are logged out of order, and the exact order differs from run to run:

reading 1 to 25
reading 76 to 100
reading 51 to 75
reading 26 to 50

 

 

Debugging

1) StepBuilder.java > partitioner()

Calling .partitioner(stepName, partitioner) on the step builder returns a PartitionStepBuilder, which is what ultimately builds the PartitionStep.

2) PartitionStep.java > doExecute()

The partition handler spins up the worker threads and gives each one its own StepExecution; the aggregator then merges the results back into the master's StepExecution.

@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
   stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

   // Wait for task completion and then aggregate the results
   Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
   stepExecution.upgradeStatus(BatchStatus.COMPLETED);
   stepExecutionAggregator.aggregate(stepExecution, executions);

   // If anything failed or had a problem we need to crap out
   if (stepExecution.getStatus().isUnsuccessful()) {
      throw new JobExecutionException("Partition handler returned an unsuccessful step");
   }
}

 

3) AbstractPartitionHandler.java > handle()

The splitter turns the master StepExecution into a set of worker StepExecutions, which are handed to doHandle().

@Override
public Collection<StepExecution> handle(final StepExecutionSplitter stepSplitter,
      final StepExecution masterStepExecution) throws Exception {
   final Set<StepExecution> stepExecutions = stepSplitter.split(masterStepExecution, gridSize);

   return doHandle(masterStepExecution, stepExecutions);
}

 

4) SimpleStepExecutionSplitter.java > split()

split() first calls getContexts() (shown next) to obtain the partition map, then creates one StepExecution per entry (see step 6).

5) SimpleStepExecutionSplitter.java > getContexts()

private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, int gridSize) {

   ExecutionContext context = stepExecution.getExecutionContext();
   String key = SimpleStepExecutionSplitter.class.getSimpleName() + ".GRID_SIZE";

   // If this is a restart we must retain the same grid size, ignoring the
   // one passed in...
   int splitSize = (int) context.getLong(key, gridSize);
   context.putLong(key, splitSize);

   Map<String, ExecutionContext> result;
   if (context.isDirty()) {
      // The context changed so we didn't already know the partitions
      jobRepository.updateExecutionContext(stepExecution);
      result = partitioner.partition(splitSize);
   }
   else {
      if (partitioner instanceof PartitionNameProvider) {
         result = new HashMap<>();
         Collection<String> names = ((PartitionNameProvider) partitioner).getPartitionNames(splitSize);
         for (String name : names) {
            /*
             * We need to return the same keys as the original (failed)
             * execution, but the execution contexts will be discarded
             * so they can be empty.
             */
            result.put(name, new ExecutionContext());
         }
      }
      else {
         // If no names are provided, grab the partition again.
         result = partitioner.partition(splitSize);
      }
   }

   return result;
}

Calling partition()

This executes our own ColumnRangePartitioner.partition(). The four SlaveSteps each end up handling 25 rows, and you can see that every one of them holds its own independent ExecutionContext.

The result map returned by getContexts() is exactly the partition0–partition3 map from the table shown earlier.

6) Back to SimpleStepExecutionSplitter.java > split()

One StepExecution is created and registered per partition (gridSize in total). Each worker execution is named stepName + ":" + partition key, e.g. slaveStep:partition0.

for (Entry<String, ExecutionContext> context : contexts.entrySet()) {

    // Make the step execution name unique and repeatable
    String stepName = this.stepName + STEP_NAME_SEPARATOR + context.getKey();

    StepExecution currentStepExecution = jobExecution.createStepExecution(stepName);

    boolean startable = isStartable(currentStepExecution, context.getValue());

    if (startable) {
        set.add(currentStepExecution);
    }
}

jobRepository.addAll(set);

Set<StepExecution> executions = new HashSet<>(set.size());
executions.addAll(set);

return executions;

 

7) TaskExecutorPartitionHandler.java > doHandle()

Each task holds its own StepExecution, and the for loop submits one task per slave step partition. (After this loop, the handler waits on each FutureTask and collects the finished StepExecutions.)

for (final StepExecution stepExecution : partitionStepExecutions) {
    final FutureTask<StepExecution> task = createTask(step, stepExecution);

    try {
        taskExecutor.execute(task);
        tasks.add(task);
    } catch (TaskRejectedException e) {
        // couldn't execute one of the tasks
        ExitStatus exitStatus = ExitStatus.FAILED
                .addExitDescription("TaskExecutor rejected the task for this step.");
        /*
         * Set the status in case the caller is tracking it through the
         * JobExecution.
         */
        stepExecution.setStatus(BatchStatus.FAILED);
        stepExecution.setExitStatus(exitStatus);
        result.add(stepExecution);
    }
}

 

8) TaskExecutorPartitionHandler.java > createTask()

protected FutureTask<StepExecution> createTask(final Step step,
                                               final StepExecution stepExecution) {
    return new FutureTask<>(new Callable<StepExecution>() {
        @Override
        public StepExecution call() throws Exception {
            step.execute(stepExecution);
            return stepExecution;
        }
    });
}

 
