[SpringBatch 실습] 35. 스프링배치JobExplorer, JobRegistry, JobOperator 알아보기 (With JobRegistryBeanPostProcessor)

반응형
728x90
반응형

예제 파일 생성

JobController.java
/**
 * REST controller skeleton for the batch operation example.
 * It only declares the three Spring Batch collaborators; no endpoints are defined here.
 */
@RestController
@RequiredArgsConstructor
public class JobController {
    // Registry of jobs, keyed by job name.
    private final JobRegistry jobRegistry;
    // Operator used to control job executions.
    private final JobOperator jobOperator;
    // Explorer used to query job/step execution metadata.
    private final JobExplorer jobExplorer;
}

 

Customer.java
package com.project.springbatch._77_operation;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.Date;

/**
 * Customer data holder with an id, first name, last name and birthdate.
 * All fields are final; accessors and the all-args constructor come from the
 * Lombok {@code @Data} and {@code @AllArgsConstructor} annotations.
 */
@Data
@AllArgsConstructor
public class Customer {

    // Customer identifier.
    private final long id;
    // Given name.
    private final String firstName;
    // Family name.
    private final String lastName;
    // Date of birth.
    private final Date birthdate;
}

 

CustomerRowMapper.java
package com.project.springbatch._77_operation;

import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Row mapper that builds a {@link Customer} from the columns
 * {@code id}, {@code firstName}, {@code lastName} and {@code birthdate}
 * of the current result-set row.
 */
public class CustomerRowMapper implements RowMapper<Customer> {

    /**
     * Reads the four customer columns from the current row, in declaration order.
     *
     * @param rs result set positioned on the row to map
     * @param i  row index supplied by the caller (not used)
     * @return a new {@link Customer} populated from the row
     * @throws SQLException if any column cannot be read
     */
    @Override
    public Customer mapRow(ResultSet rs, int i) throws SQLException {
        final long id = rs.getLong("id");
        final String firstName = rs.getString("firstName");
        final String lastName = rs.getString("lastName");
        final java.util.Date birthdate = rs.getDate("birthdate");

        return new Customer(id, firstName, lastName, birthdate);
    }
}

 

JobInfo.java
package com.project.springbatch._77_operation;

import lombok.Data;

/**
 * Simple holder for a single {@code id} string.
 * Accessors are generated by the Lombok {@code @Data} annotation.
 */
@Data
public class JobInfo {
    // Identifier value carried by this object.
    private String id;
}

 

JobOperationConfiguration.java
package com.project.springbatch._77_operation;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Batch configuration for the job-operation example: a two-step job named
 * {@code batchJob} plus a {@link JobRegistryBeanPostProcessor} wired to the
 * injected {@link JobRegistry}.
 */
@RequiredArgsConstructor
@Configuration
public class JobOperationConfiguration {

    public final JobBuilderFactory jobBuilderFactory;
    public final StepBuilderFactory stepBuilderFactory;
    public final JobRegistry jobRegistry; // registered as a bean when Spring Batch initializes

    /**
     * First step: prints a message, sleeps five seconds, then finishes.
     */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory
                .get("step1")
                .tasklet((contribution, chunkContext) -> printAndPause("step1 was executed"))
                .build();
    }

    /**
     * Second step: prints a message, sleeps five seconds, then finishes.
     */
    @Bean
    public Step step2() throws Exception {
        return stepBuilderFactory
                .get("step2")
                .tasklet((contribution, chunkContext) -> printAndPause("step2 was executed"))
                .build();
    }

    /**
     * Job {@code batchJob}: runs {@link #step1()} and then {@link #step2()},
     * using a {@link RunIdIncrementer} as its incrementer.
     */
    @Bean
    public Job job1() throws Exception {
        return jobBuilderFactory
                .get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(step2())
                .build();
    }

    /**
     * Creates the {@link JobRegistryBeanPostProcessor} and hands it the injected registry.
     */
    @Bean
    public BeanPostProcessor jobRegistryBeanPostProcessor() throws Exception {
        JobRegistryBeanPostProcessor registrar = new JobRegistryBeanPostProcessor();
        registrar.setJobRegistry(jobRegistry); // wire the registry bean
        return registrar;
    }

    /** Shared tasklet body: print the message, pause for 5 seconds, report completion. */
    private static RepeatStatus printAndPause(String message) throws InterruptedException {
        System.out.println(message);
        Thread.sleep(5000);
        return RepeatStatus.FINISHED;
    }
}

 

 

JobOperationConfiguration.java 디버깅

Job 등록 과정

1) JobRegistryBeanPostProcessor.java > postProcessAfterInitialization()

 

2) MapJobRegistry.java > register()

 

3) 위 과정을 반복하여 거친 후, jobRegistry 모습

 

 

JobExplorer

  • JobRepository의 readonly 버전
  • 실행중인 Job의 실행 정보인 JobExecution 또는 Step의 실행 정보인 StepExecution을 조회할 수 있다.
/**
 * REST controller exposing a restart endpoint for every job in the {@link JobRegistry}.
 */
@RestController
@RequiredArgsConstructor
public class JobController {
    private final JobRegistry jobRegistry;
    private final JobOperator jobOperator;
    private final JobExplorer jobExplorer;

    /**
     * Restarts the most recent execution of each registered job.
     * Intended to be called after {@code /batch/stop}.
     *
     * <p>Jobs that have no recorded instance or execution are skipped instead of
     * failing with a {@code NullPointerException}. The job name is taken directly
     * from the registry, so no cast to a concrete job class is needed.
     *
     * @return a fixed confirmation message
     * @throws Exception if {@link JobOperator#restart(long)} rejects the restart
     */
    @PostMapping(value = "/batch/restart")
    public String restart() throws Exception {

        for (String jobName : jobRegistry.getJobNames()) {
            System.out.println("job name: " + jobName);

            // Look up the last job instance; there is none if the job never ran.
            JobInstance lastJobInstance = jobExplorer.getLastJobInstance(jobName);
            if (lastJobInstance == null) {
                System.out.println("no job instance to restart: " + jobName);
                continue;
            }

            // Look up the last execution of that instance and restart it.
            JobExecution lastJobExecution = jobExplorer.getLastJobExecution(lastJobInstance);
            if (lastJobExecution == null) {
                System.out.println("no job execution to restart: " + jobName);
                continue;
            }

            jobOperator.restart(lastJobExecution.getId());
        }

        return "batch is restarted";
    }
}

1) 실패한 Job 정보 가져오기

Job이 성공적으로 끝나면(COMPLETED) 재시작이 불가능하고, Job이 실패(FAILED)하거나 중단(STOPPED)된 경우에만 재시작이 가능하다.

BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION 테이블에 데이터가 존재해야하므로 Job을 실행시키고 고의로 에러를 발생시킨 후, 실행해야한다.

SimpleJob job = (SimpleJob) jobRegistry.getJob(iterator.next());

 

2) 마지막 최종적으로 실행한 Job 정보 가져오기

JobInstance lastJobInstance = jobExplorer.getLastJobInstance(job.getName());
JobExecution lastJobExecution = jobExplorer.getLastJobExecution(lastJobInstance);

 

3) restart 수행

jobOperator.restart(lastJobExecution.getId());

 

▶ SimpleJobOperator.java > restart()

/**
 * Restarts the job execution with the given id.
 * Looks the execution up, resolves its job by name from the registry, and launches
 * the job again with the same parameters that execution used.
 *
 * @param executionId id of the execution to restart
 * @return the id of the execution returned by the launcher
 */
@Override
public Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {

   if (logger.isInfoEnabled()) {
      logger.info("Checking status of job execution with id=" + executionId);
   }
   JobExecution jobExecution = findExecutionById(executionId);

   // Resolve the Job object and reuse the parameters of the existing execution.
   String jobName = jobExecution.getJobInstance().getJobName();
   Job job = jobRegistry.getJob(jobName);
   JobParameters parameters = jobExecution.getJobParameters();

   if (logger.isInfoEnabled()) {
      logger.info(String.format("Attempting to resume job with name=%s and parameters=%s", jobName, parameters));
   }
   try {
      return jobLauncher.run(job, parameters).getId();
   }
   catch (JobExecutionAlreadyRunningException e) {
      // Rethrown as an unchecked exception with the original as its cause.
      throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
            jobName, parameters), e);
   }

}

 

 

JobRegistry

  • 생성된 Job을 자동으로 등록, 추적 및 관리하며 여러 곳에서 Job을 생성한 경우 ApplicationContext에서 Job을 수집하여 사용할 수 있다.
  • 기본 구현체로 map 기반의 MapJobRegistry 클래스를 제공한다.
    • JobName을 Key로 하고 job을 값으로 하여 매핑한다.
  • Job 등록
    • JobRegistryBeanPostProcessor - BeanPostProcessor 단계에서 bean 초기화시 자동으로 JobRegistry에 Job을 등록시킨다.

 

▶ MapJobRegistry.java > register()

/**
 * Registers a job factory under its job name.
 *
 * @param jobFactory factory to register; must not be null and must have a non-null name
 * @throws DuplicateJobException if a factory is already registered under the same name
 */
@Override
public void register(JobFactory jobFactory) throws DuplicateJobException {
   Assert.notNull(jobFactory, "jobFactory is null");
   String name = jobFactory.getJobName();
   Assert.notNull(name, "Job configuration must have a name.");
   // putIfAbsent leaves an existing mapping untouched and returns it.
   JobFactory previousValue = map.putIfAbsent(name, jobFactory);
   if (previousValue != null) {
      throw new DuplicateJobException("A job configuration with this name [" + name
            + "] was already registered");
   }
}

 

JobController.java
/**
 * REST controller exposing a start endpoint for every job in the {@link JobRegistry}.
 */
@RestController
@RequiredArgsConstructor
public class JobController {
    private final JobRegistry jobRegistry;
    private final JobOperator jobOperator;
    private final JobExplorer jobExplorer;

    /**
     * Starts every registered job, passing {@code id=<jobInfo.id>} as the job parameters.
     *
     * <p>The job name comes directly from the registry. No cast to a concrete job
     * class is performed, so jobs that are not {@code SimpleJob} instances are
     * started as well instead of failing with a {@code ClassCastException}.
     *
     * @param jobInfo request body carrying the {@code id} parameter value
     * @return a fixed confirmation message
     * @throws Exception if {@link JobOperator#start(String, String)} fails
     */
    @PostMapping(value = "/batch/start")
    public String start(@RequestBody JobInfo jobInfo) throws Exception {

        // NOTE(review): the id is concatenated unescaped into the parameter string;
        // confirm callers cannot smuggle extra key=value pairs through it.
        String parameters = "id=" + jobInfo.getId();

        for (String jobName : jobRegistry.getJobNames()) {
            System.out.println("job name: " + jobName);

            jobOperator.start(jobName, parameters);
        }

        return "batch is started";
    }
}

1) JobRegistry에 등록되어있는 Job을 반복문 수행

jobRegistry.getJobNames().iterator()

 

2) SimpleJob 객체 변환

SimpleJob job = (SimpleJob) jobRegistry.getJob(iterator.next());

 

3) jobName, jobParameters로 Job 실행하기

jobOperator.start(job.getName(), "id=" + jobInfo.getId());

 

▶ SimpleJobOperator.java > start()

/**
 * Starts a new instance of the named job with parameters given as a string.
 *
 * @param jobName    name of the job to look up in the registry
 * @param parameters job parameters in string form, converted to properties and then to JobParameters
 * @return the id of the execution returned by the launcher
 * @throws JobInstanceAlreadyExistsException if an instance with this name and parameters already exists
 */
@Override
public Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException {
   if (logger.isInfoEnabled()) {
      logger.info("Checking status of job with name=" + jobName);
   }

   // String -> Properties -> JobParameters
   JobParameters jobParameters = jobParametersConverter.getJobParameters(PropertiesConverter
         .stringToProperties(parameters));

   // Refuse to start when the same name + parameters combination is already recorded.
   if (jobRepository.isJobInstanceExists(jobName, jobParameters)) {
      throw new JobInstanceAlreadyExistsException(String.format(
            "Cannot start a job instance that already exists with name=%s and parameters=%s", jobName,
            parameters));
   }

   Job job = jobRegistry.getJob(jobName);
   if (logger.isInfoEnabled()) {
      logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters));
   }
   try {
      return jobLauncher.run(job, jobParameters).getId();
   }
   // Each launcher exception below is rethrown as UnexpectedJobExecutionException with the cause kept.
   catch (JobExecutionAlreadyRunningException e) {
      throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
            jobName, parameters), e);
   }
   catch (JobRestartException e) {
      throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName,
            parameters), e);
   }
   catch (JobInstanceAlreadyCompleteException e) {
      throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already complete", jobName,
            parameters), e);
   }

}

 

 

JobOperator

  • JobExplorer, JobRepository, JobRegistry, JobLauncher를 포함하고 있다.
  • 배치의 중단, 재시작, Job 요약 등의 모니터링이 가능하다.
  • 기본 구현체로 SimpleJobOperator 클래스를 제공한다.
/**
 * REST controller exposing a stop endpoint for every job in the {@link JobRegistry}.
 */
@RestController
@RequiredArgsConstructor
public class JobController {
    private final JobRegistry jobRegistry;
    private final JobOperator jobOperator;
    private final JobExplorer jobExplorer;

    /**
     * Requests a stop for every running execution of every registered job.
     *
     * <p>Jobs without a running execution are skipped, so an empty result no longer
     * raises {@code NoSuchElementException}. When a job has several running
     * executions, all of them are stopped, not just the first one. The job name is
     * taken directly from the registry, so no cast to a concrete job class is needed.
     *
     * @return a fixed confirmation message
     * @throws Exception if {@link JobOperator#stop(long)} fails
     */
    @PostMapping(value = "/batch/stop")
    public String stop() throws Exception {

        for (String jobName : jobRegistry.getJobNames()) {
            System.out.println("job name: " + jobName);

            // Fetch every currently running execution of this job (may be empty).
            Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(jobName);

            for (JobExecution jobExecution : runningJobExecutions) {
                // Not stopped immediately: the step that is currently running finishes first.
                jobOperator.stop(jobExecution.getId());
            }
        }

        return "batch is stopped";
    }
}

1) 현재 실행중인 Job 정보 가져오기

Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(job.getName());
JobExecution jobExecution = runningJobExecutions.iterator().next();

 

2) Job 중단시키기

현재 실행중인 Step까지는 실행완료 후, 중단한다.

jobOperator.stop(jobExecution.getId());

 

▶ SimpleJobOperator.java > stop()

/**
 * Requests a stop of the job execution with the given id.
 * Marks the execution as STOPPING, persists that status, and, when the job and a
 * running step can be resolved, calls stop() on a running StoppableTasklet.
 *
 * @param executionId id of the execution to stop
 * @return always {@code true} when no exception is thrown
 * @throws JobExecutionNotRunningException if the execution status is neither STARTED nor STARTING
 */
@Override
@Transactional
public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {

   JobExecution jobExecution = findExecutionById(executionId);
   // Indicate the execution should be stopped by setting its status to
   // 'STOPPING'. It is assumed that
   // the step implementation will check this status at chunk boundaries.
   BatchStatus status = jobExecution.getStatus();
   if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) {
      throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution);
   }
   jobExecution.setStatus(BatchStatus.STOPPING);
   jobRepository.update(jobExecution);

   try {
      Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
      if (job instanceof StepLocator) {// can only proceed when the job is a StepLocator: that is the only way to get the step object
         // find the step execution(s) that are currently running
         for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
            if (stepExecution.getStatus().isRunning()) {
               try {
                  // this step execution is running -> need to 'stop' it
                  Step step = ((StepLocator)job).getStep(stepExecution.getStepName());
                  if (step instanceof TaskletStep) {
                     Tasklet tasklet = ((TaskletStep)step).getTasklet();
                     if (tasklet instanceof StoppableTasklet) {
                        // register the step execution around the stop() call, then release it
                        StepSynchronizationManager.register(stepExecution);
                        ((StoppableTasklet)tasklet).stop();
                        StepSynchronizationManager.release();
                     }
                  }
               }
               catch (NoSuchStepException e) {
                  // logged only; the loop continues with the next step execution
                  logger.warn("Step not found",e);
               }
            }
         }
      }
   }
   catch (NoSuchJobException e) {
      // logged only; the STOPPING status has already been saved above
      logger.warn("Cannot find Job object in the job registry. StoppableTasklet#stop() will not be called",e);
   }

   return true;
}

 

 

반응형

Designed by JB FACTORY