SynchronizedItemStreamReader
SynchronizedItemStreamReader wraps an ItemReader that is not thread-safe so that it can be used safely from multiple threads.
Thread-safe Reader
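Because read() is synchronized, only one thread can call the delegate at a time; the other threads wait and then read the next item in turn.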
read()
public class SynchronizedItemStreamReader<T> implements ItemStreamReader<T>, InitializingBean {

    private ItemStreamReader<T> delegate;

    public void setDelegate(ItemStreamReader<T> delegate) {
        this.delegate = delegate;
    }

    /**
     * This delegates to the read method of the <code>delegate</code>
     */
    @Nullable
    public synchronized T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return this.delegate.read();
    }

    ...
}
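The effect of the synchronized read() can be tried out without Spring Batch. The following is a minimal sketch in plain Java, not the library's implementation: SimpleReader, IteratorReader, SynchronizedReader, and readAllConcurrently are hypothetical names introduced only for this illustration. Several threads drain one wrapped reader, and the sketch collects every item that was read so it can be checked that nothing was read twice or skipped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SynchronizedReaderSketch {

    /** Hypothetical stand-in for ItemReader: returns null when the input is exhausted. */
    interface SimpleReader<T> {
        T read() throws Exception;
    }

    /** Not thread-safe: hasNext() and next() are two separate steps on shared state. */
    static final class IteratorReader<T> implements SimpleReader<T> {
        private final Iterator<T> iterator;

        IteratorReader(List<T> items) {
            this.iterator = items.iterator();
        }

        @Override
        public T read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    /** Same idea as the read() shown above: one lock around the call to the delegate. */
    static final class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;

        SynchronizedReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() throws Exception {
            return delegate.read();
        }
    }

    /** Drains one shared reader from several threads and returns the items read, sorted. */
    static List<Integer> readAllConcurrently(int itemCount, int threadCount) throws InterruptedException {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= itemCount; i++) {
            source.add(i);
        }

        SimpleReader<Integer> reader = new SynchronizedReader<>(new IteratorReader<>(source));
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());

        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        for (int t = 0; t < threadCount; t++) {
            // The lambda returns a value, so it is a Callable and may throw checked exceptions.
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
                return null;
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker threads did not finish in time");
        }

        List<Integer> sorted = new ArrayList<>(seen);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> items = readAllConcurrently(100, 4);
        System.out.println("items read: " + items.size());
    }
}
```

Which thread receives which item still varies from run to run; the lock only guarantees that each call to the delegate completes before the next one starts. Without the synchronized keyword on SynchronizedReader.read(), the hasNext()/next() pair in IteratorReader is no longer atomic, so items may be duplicated or skipped.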
Creating the Job
SynchronizedConfiguration.java
package com.project.springbatch._46_syncrhnizedItemReader;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sql.DataSource;
/*
--job.name=synchronizedItemStreamReaderJob
*/
@RequiredArgsConstructor
@Configuration
@Slf4j
public class SynchronizedConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Job synchronizedItemStreamReaderJob() throws Exception {
        return jobBuilderFactory.get("synchronizedItemStreamReaderJob")
                .incrementer(new RunIdIncrementer())
                .start(synchronizedItemStreamReaderStep1())
                .build();
    }

    @Bean
    public Step synchronizedItemStreamReaderStep1() {
        return stepBuilderFactory.get("synchronizedItemStreamReaderStep1")
                .<Customer, Customer>chunk(60)
                .reader(synchronizedItemStreamReaderCustomItemReader())
                .listener(new ItemReadListener<Customer>() {
                    @Override
                    public void beforeRead() {
                    }

                    @Override
                    public void afterRead(Customer item) {
                        System.out.println("item.getId() : " + item.getId());
                    }

                    @Override
                    public void onReadError(Exception ex) {
                    }
                })
                .writer(synchronizedItemStreamReaderCustomerItemWriter())
                .taskExecutor(synchronizedItemStreamReaderTaskExecutor())
                .build();
    }

    /**
     * Wraps the non-thread-safe JdbcCursorItemReader in a SynchronizedItemStreamReader.
     *
     * @return a thread-safe reader that delegates to the cursor reader
     */
    @Bean
    @StepScope
    public SynchronizedItemStreamReader<Customer> synchronizedItemStreamReaderCustomItemReader() {
        JdbcCursorItemReader<Customer> notSafetyReader = new JdbcCursorItemReaderBuilder<Customer>()
                .fetchSize(60)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                .sql("select id, firstName, lastName, birthdate from customer")
                .name("SafetyReader")
                .build();

        return new SynchronizedItemStreamReaderBuilder<Customer>()
                .delegate(notSafetyReader)
                .build();
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> synchronizedItemStreamReaderCustomerItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(this.dataSource);
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

    @Bean
    public TaskExecutor synchronizedItemStreamReaderTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setThreadNamePrefix("safety-thread-");

        return executor;
    }
}
1) Configuring SynchronizedItemStreamReader
SynchronizedItemStreamReader takes care of the synchronization needed for thread safety and delegates the actual data reading to JdbcCursorItemReader.
@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> synchronizedItemStreamReaderCustomItemReader() {
    JdbcCursorItemReader<Customer> notSafetyReader = new JdbcCursorItemReaderBuilder<Customer>()
            .fetchSize(60)
            .dataSource(dataSource)
            .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
            .sql("select id, firstName, lastName, birthdate from customer")
            .name("SafetyReader")
            .build();

    return new SynchronizedItemStreamReaderBuilder<Customer>()
            .delegate(notSafetyReader)
            .build();
}
- SynchronizedItemStreamReader.java > read()
Batch result
item.getId() : 33
item.getId() : 34
item.getId() : 35
item.getId() : 36
item.getId() : 37
item.getId() : 38
item.getId() : 39
item.getId() : 40
item.getId() : 41
item.getId() : 42
item.getId() : 43
item.getId() : 44
item.getId() : 45
item.getId() : 46
item.getId() : 47
item.getId() : 48
item.getId() : 49
item.getId() : 50
item.getId() : 51
item.getId() : 52
item.getId() : 53
item.getId() : 54
item.getId() : 55
item.getId() : 56
item.getId() : 57
item.getId() : 58
item.getId() : 59
item.getId() : 60
item.getId() : 61
item.getId() : 62
For more on JdbcCursorItemReader, see the post below.
https://devfunny.tistory.com/827
[SpringBatch Practice] 26. Cursor - JdbcCursorItemReader, JpaCursorItemReader