반응형
728x90
반응형
SynchronizedItemStreamReader
Thread-safe 하지 않은 ItemReader를 Thread-safe하게 처리하도록 하는 기능을 제공한다.
Thread-safe Reader
각 스레드가 대기하고 있다가 순차적으로 Item을 읽어온다.
read()
public class SynchronizedItemStreamReader<T> implements ItemStreamReader<T>, InitializingBean {
private ItemStreamReader<T> delegate;
public void setDelegate(ItemStreamReader<T> delegate) {
this.delegate = delegate;
}
/**
* This delegates to the read method of the <code>delegate</code>
*/
@Nullable
public synchronized T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return this.delegate.read();
}
...
}
Job 생성
SynchronizedConfiguration.java
package com.project.springbatch._46_syncrhnizedItemReader;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sql.DataSource;
/*
--job.name=synchronizedItemStreamReaderJob
*/
@RequiredArgsConstructor
@Configuration
@Slf4j
public class SynchronizedConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
public Job synchronizedItemStreamReaderJob() throws Exception {
return jobBuilderFactory.get("synchronizedItemStreamReaderJob")
.incrementer(new RunIdIncrementer())
.start(synchronizedItemStreamReaderStep1())
.build();
}
@Bean
public Step synchronizedItemStreamReaderStep1() {
return stepBuilderFactory.get("synchronizedItemStreamReaderStep1")
.<Customer, Customer>chunk(60)
.reader(synchronizedItemStreamReaderCustomItemReader())
.listener(new ItemReadListener<Customer>() {
@Override
public void beforeRead() {
}
@Override
public void afterRead(Customer item) {
System.out.println("item.getId() : " + item.getId());
}
@Override
public void onReadError(Exception ex) {
}
})
.writer(synchronizedItemStreamReaderCustomerItemWriter())
.taskExecutor(synchronizedItemStreamReaderTaskExecutor())
.build();
}
/**
* Item Reader thread-safe set
* @return
*/
@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> synchronizedItemStreamReaderCustomItemReader() {
JdbcCursorItemReader<Customer> notSafetyReader = new JdbcCursorItemReaderBuilder<Customer>()
.fetchSize(60)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.sql("select id, firstName, lastName, birthdate from customer")
.name("SafetyReader")
.build();
return new SynchronizedItemStreamReaderBuilder<Customer>()
.delegate(notSafetyReader)
.build();
}
@Bean
@StepScope
public JdbcBatchItemWriter<Customer> synchronizedItemStreamReaderCustomerItemWriter() {
JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(this.dataSource);
itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
itemWriter.afterPropertiesSet();
return itemWriter;
}
@Bean
public TaskExecutor synchronizedItemStreamReaderTaskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setThreadNamePrefix("safety-thread-");
return executor;
}
}
1) SynchronizedItemStreamReader 설정
SynchronizedItemStreamReader는 스레드 안전을 위한 동기화 처리를 해주고, 데이터 처리는 JdbcCursorItemReader에게 위임한다.
@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> synchronizedItemStreamReaderCustomItemReader() {
JdbcCursorItemReader<Customer> notSafetyReader = new JdbcCursorItemReaderBuilder<Customer>()
.fetchSize(60)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.sql("select id, firstName, lastName, birthdate from customer")
.name("SafetyReader")
.build();
return new SynchronizedItemStreamReaderBuilder<Customer>()
.delegate(notSafetyReader)
.build();
}
- SynchronizedItemStreamReader.java > read()
배치 결과
item.getId() : 33
item.getId() : 34
item.getId() : 35
item.getId() : 36
item.getId() : 37
item.getId() : 38
item.getId() : 39
item.getId() : 40
item.getId() : 41
item.getId() : 42
item.getId() : 43
item.getId() : 44
item.getId() : 45
item.getId() : 46
item.getId() : 47
item.getId() : 48
item.getId() : 49
item.getId() : 50
item.getId() : 51
item.getId() : 52
item.getId() : 53
item.getId() : 54
item.getId() : 55
item.getId() : 56
item.getId() : 57
item.getId() : 58
item.getId() : 59
item.getId() : 60
item.getId() : 61
item.getId() : 62
JdbcCursorItemReader 관련 포스팅은 아래를 참고하자.
https://devfunny.tistory.com/827
반응형
'Coding > Spring Batch' 카테고리의 다른 글
[SpringBatch 실습] 35. 스프링배치JobExplorer, JobRegistry, JobOperator 알아보기 (With JobRegistryBeanPostProcessor) (0) | 2022.07.21 |
---|---|
[SpringBatch 실습] 34. 스프링 배치 Test 코드 작성 (0) | 2022.07.19 |
[SpringBatch 실습] 32. 스프링 배치 Partitioning(파티셔닝) Job 수행하여 분석 (0) | 2022.07.12 |
[SpringBatch 실습] 31. 스프링 배치 Parallel Steps(병렬) 수행하여 분석 (0) | 2022.07.11 |
[SpringBatch 실습] 30. 스프링 배치 MultiThread(멀티 스레드) Job 수행하여 분석 (0) | 2022.07.07 |