반응형
728x90
반응형
예제 파일 생성
JobController.java
@RestController
@RequiredArgsConstructor
public class JobController {
private final JobRegistry jobRegistry;
private final JobOperator jobOperator;
private final JobExplorer jobExplorer;
}
Customer.java
package com.project.springbatch._77_operation;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.Date;
@Data
@AllArgsConstructor
public class Customer {
private final long id;
private final String firstName;
private final String lastName;
private final Date birthdate;
}
CustomerRowMapper.java
package com.project.springbatch._77_operation;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
public class CustomerRowMapper implements RowMapper<Customer> {
@Override
public Customer mapRow(ResultSet rs, int i) throws SQLException {
return new Customer(rs.getLong("id"),
rs.getString("firstName"),
rs.getString("lastName"),
rs.getDate("birthdate"));
}
}
JobInfo.java
package com.project.springbatch._77_operation;
import lombok.Data;
@Data
public class JobInfo {
private String id;
}
JobOperationConfiguration.java
package com.project.springbatch._77_operation;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@RequiredArgsConstructor
@Configuration
public class JobOperationConfiguration {
public final JobBuilderFactory jobBuilderFactory;
public final StepBuilderFactory stepBuilderFactory;
public final JobRegistry jobRegistry; // 스프링 배치가 초기화될 때 빈으로 등록
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) ->
{
System.out.println("step1 was executed");
Thread.sleep(5000);
return RepeatStatus.FINISHED;
}
)
.build();
}
@Bean
public Step step2() throws Exception {
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) ->
{
System.out.println("step2 was executed");
Thread.sleep(5000);
return RepeatStatus.FINISHED;
}
)
.build();
}
@Bean
public Job job1() throws Exception {
return jobBuilderFactory.get("batchJob")
.incrementer(new RunIdIncrementer())
.start(step1())
.next(step2())
.build();
}
/**
* JobRegistryBeanPostProcessor
*/
@Bean
public BeanPostProcessor jobRegistryBeanPostProcessor() throws Exception {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry); // 빈 셋팅
return postProcessor;
}
}
JobOperationConfiguration.java 디버깅
Job 등록 과정
1) JobRegistryBeanPostProcessor.java > postProcessAfterInitialization()
2) MapJobRegistry.java > register()
3) 위 과정을 반복하여 거친 후, jobRegistry 모습
JobExplorer
- JobRepository의 readonly 버전
- 실행중인 Job의 실행 정보인 JobExecution 또는 Step의 실행 정보인 StepExecution을 조회할 수 있다.
@RestController
@RequiredArgsConstructor
public class JobController {
private final JobRegistry jobRegistry;
private final JobOperator jobOperator;
private final JobExplorer jobExplorer;
@PostMapping(value = "/batch/restart")
public String restart() throws Exception {
for (Iterator<String> iterator = jobRegistry.getJobNames().iterator(); iterator.hasNext();){
SimpleJob job = (SimpleJob)jobRegistry.getJob(iterator.next());
System.out.println("job name: " + job.getName());
// /batch/stop 수행 후 batch/restart 수행하기
// 마지막 최종적으로 실행한 Job 의 정보를 가져와서 restart 한다.
JobInstance lastJobInstance = jobExplorer.getLastJobInstance(job.getName());
JobExecution lastJobExecution = jobExplorer.getLastJobExecution(lastJobInstance);
jobOperator.restart(lastJobExecution.getId());
}
return "batch is restarted";
}
}
1) 실패한 Job 정보 가져오기
Job이 성공적으로 끝나면 재시작이 불가능하고, Job이 실패되어야 재시작이 가능하다.
BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION 테이블에 데이터가 존재해야하므로 Job을 실행시키고 고의로 에러를 발생시킨 후, 실행해야한다.
SimpleJob job = (SimpleJob) jobRegistry.getJob(iterator.next());
2) 마지막 최종적으로 실행한 Job 정보 가져오기
JobInstance lastJobInstance = jobExplorer.getLastJobInstance(job.getName());
JobExecution lastJobExecution = jobExplorer.getLastJobExecution(lastJobInstance);
3) restart 수행
jobOperator.restart(lastJobExecution.getId());
▶ SimpleJobOperator.java > restart()
@Override
public Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {
if (logger.isInfoEnabled()) {
logger.info("Checking status of job execution with id=" + executionId);
}
JobExecution jobExecution = findExecutionById(executionId);
String jobName = jobExecution.getJobInstance().getJobName();
Job job = jobRegistry.getJob(jobName);
JobParameters parameters = jobExecution.getJobParameters();
if (logger.isInfoEnabled()) {
logger.info(String.format("Attempting to resume job with name=%s and parameters=%s", jobName, parameters));
}
try {
return jobLauncher.run(job, parameters).getId();
}
catch (JobExecutionAlreadyRunningException e) {
throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
jobName, parameters), e);
}
}
JobRegistry
- 생성된 Job을 자동으로 등록, 추적 및 관리하며 여러 곳에서 Job을 생성한 경우 ApplicationContext에서 Job을 수집하여 사용할 수 있다.
- 기본 구현체로 map 기반의 MapJobRegistry 클래스를 제공한다.
- JobName을 Key로 하고 job을 값으로 하여 매핑한다.
- Job 등록
- JobRegistryBeanPostProcessor - BeanPostProcessor 단계에서 bean 초기화시 자동으로 JobRegistry에 Job을 등록시킨다.
▶ MapJobRegistry.java > register()
@Override
public void register(JobFactory jobFactory) throws DuplicateJobException {
Assert.notNull(jobFactory, "jobFactory is null");
String name = jobFactory.getJobName();
Assert.notNull(name, "Job configuration must have a name.");
JobFactory previousValue = map.putIfAbsent(name, jobFactory);
if (previousValue != null) {
throw new DuplicateJobException("A job configuration with this name [" + name
+ "] was already registered");
}
}
JobController.java
@RestController
@RequiredArgsConstructor
public class JobController {
private final JobRegistry jobRegistry;
private final JobOperator jobOperator;
private final JobExplorer jobExplorer;
@PostMapping(value = "/batch/start")
public String start(@RequestBody JobInfo jobInfo) throws Exception {
for (Iterator<String> iterator = jobRegistry.getJobNames().iterator(); iterator.hasNext();){
SimpleJob job = (SimpleJob) jobRegistry.getJob(iterator.next()); // jobName 으로 job 조회
System.out.println("job name: " + job.getName());
jobOperator.start(job.getName(), "id=" + jobInfo.getId());
}
return "batch is started";
}
}
1) JobRegistry에 등록되어있는 Job을 반복문 수행
jobRegistry.getJobNames().iterator()
2) SimpleJob 객체 변환
SimpleJob job = (SimpleJob) jobRegistry.getJob(iterator.next());
3) jobName, jobParameters로 Job 실행하기
jobOperator.start(job.getName(), "id=" + jobInfo.getId());
▶ SimpleJobOperator.java > start()
@Override
public Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException {
if (logger.isInfoEnabled()) {
logger.info("Checking status of job with name=" + jobName);
}
JobParameters jobParameters = jobParametersConverter.getJobParameters(PropertiesConverter
.stringToProperties(parameters));
if (jobRepository.isJobInstanceExists(jobName, jobParameters)) {
throw new JobInstanceAlreadyExistsException(String.format(
"Cannot start a job instance that already exists with name=%s and parameters=%s", jobName,
parameters));
}
Job job = jobRegistry.getJob(jobName);
if (logger.isInfoEnabled()) {
logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters));
}
try {
return jobLauncher.run(job, jobParameters).getId();
}
catch (JobExecutionAlreadyRunningException e) {
throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
jobName, parameters), e);
}
catch (JobRestartException e) {
throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName,
parameters), e);
}
catch (JobInstanceAlreadyCompleteException e) {
throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already complete", jobName,
parameters), e);
}
}
JobOperator
- JobExplorer, JobRepostiroy, JobRegistry, JobLauncher를 포함하고있다.
- 배치의 중단, 재시작, Job 요약 등의 모니터링이 가능하다.
- 기본 구현체로 SimpleJobOperator 클래스를 제공한다.
@RestController
@RequiredArgsConstructor
public class JobController {
private final JobRegistry jobRegistry;
private final JobOperator jobOperator;
private final JobExplorer jobExplorer;
@PostMapping(value = "/batch/stop")
public String stop() throws Exception {
for (Iterator<String> iterator = jobRegistry.getJobNames().iterator(); iterator.hasNext();){
SimpleJob job = (SimpleJob)jobRegistry.getJob(iterator.next());
System.out.println("job name: " + job.getName());
// 현재 실행중인 Job 의 JobExecution 을 모두 가져온다. (Job 의 개수만큼 존재하겠다)
Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(job.getName());
JobExecution jobExecution = runningJobExecutions.iterator().next();
// 바로 중단되지는 않고, 현재 실행중인 Step까지는 실행하고 중단한다.
jobOperator.stop(jobExecution.getId());
}
return "batch is stopped";
}
}
1) 현재 실행중인 Job 정보 가져오기
Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(job.getName());
JobExecution jobExecution = runningJobExecutions.iterator().next();
2) Job 중단시키기
현재 실행중인 Step까지는 실행완료 후, 중단한다.
jobOperator.stop(jobExecution.getId());
▶ SimpleJobOperator.java > stop()
@Override
@Transactional
public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
JobExecution jobExecution = findExecutionById(executionId);
// Indicate the execution should be stopped by setting it's status to
// 'STOPPING'. It is assumed that
// the step implementation will check this status at chunk boundaries.
BatchStatus status = jobExecution.getStatus();
if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) {
throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution);
}
jobExecution.setStatus(BatchStatus.STOPPING);
jobRepository.update(jobExecution);
try {
Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object
//get the current stepExecution
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (stepExecution.getStatus().isRunning()) {
try {
//have the step execution that's running -> need to 'stop' it
Step step = ((StepLocator)job).getStep(stepExecution.getStepName());
if (step instanceof TaskletStep) {
Tasklet tasklet = ((TaskletStep)step).getTasklet();
if (tasklet instanceof StoppableTasklet) {
StepSynchronizationManager.register(stepExecution);
((StoppableTasklet)tasklet).stop();
StepSynchronizationManager.release();
}
}
}
catch (NoSuchStepException e) {
logger.warn("Step not found",e);
}
}
}
}
}
catch (NoSuchJobException e) {
logger.warn("Cannot find Job object in the job registry. StoppableTasklet#stop() will not be called",e);
}
return true;
}
반응형