Spring Batch

[SpringBatch 실습] 33. 스프링 배치 SynchronizedItemStreamReader로 thread-safe 설정하기

LearnerKSH 2022. 7. 13. 10:17
728x90
반응형

SynchronizedItemStreamReader

Thread-safe 하지 않은 ItemReader를 Thread-safe하게 처리하도록 하는 기능을 제공한다.

 

Thread-safe Reader

각 스레드가 대기하고 있다가 순차적으로 Item을 읽어온다.

 

read()
public class SynchronizedItemStreamReader<T> implements ItemStreamReader<T>, InitializingBean {

   private ItemStreamReader<T> delegate;

   public void setDelegate(ItemStreamReader<T> delegate) {
      this.delegate = delegate;
   }

   /**
    * This delegates to the read method of the <code>delegate</code>
    */
   @Nullable
   public synchronized T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
      return this.delegate.read();
   }
   
   ...
   
}

 

 

Job 생성

SynchronizedConfiguration.java
package com.project.springbatch._46_syncrhnizedItemReader;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.sql.DataSource;

/*
--job.name=synchronizedItemStreamReaderJob
*/
@RequiredArgsConstructor
@Configuration
@Slf4j
public class SynchronizedConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Job synchronizedItemStreamReaderJob() throws Exception {
        return jobBuilderFactory.get("synchronizedItemStreamReaderJob")
                .incrementer(new RunIdIncrementer())
                .start(synchronizedItemStreamReaderStep1())
                .build();
    }

    @Bean
    public Step synchronizedItemStreamReaderStep1() {
        return stepBuilderFactory.get("synchronizedItemStreamReaderStep1")
                .<Customer, Customer>chunk(60)
                .reader(synchronizedItemStreamReaderCustomItemReader())
                .listener(new ItemReadListener<Customer>() {
                    @Override
                    public void beforeRead() {

                    }

                    @Override
                    public void afterRead(Customer item) {
                        System.out.println("item.getId() : " + item.getId());
                    }

                    @Override
                    public void onReadError(Exception ex) {

                    }
                })
                .writer(synchronizedItemStreamReaderCustomerItemWriter())
                .taskExecutor(synchronizedItemStreamReaderTaskExecutor())
                .build();
    }

    /**
     * Item Reader thread-safe set
     * @return
     */
    @Bean
    @StepScope
    public SynchronizedItemStreamReader<Customer> synchronizedItemStreamReaderCustomItemReader() {
        JdbcCursorItemReader<Customer> notSafetyReader = new JdbcCursorItemReaderBuilder<Customer>()
                .fetchSize(60)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                .sql("select id, firstName, lastName, birthdate from customer")
                .name("SafetyReader")
                .build();

        return new SynchronizedItemStreamReaderBuilder<Customer>()
                .delegate(notSafetyReader)
                .build();
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> synchronizedItemStreamReaderCustomerItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(this.dataSource);
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

    @Bean
    public TaskExecutor synchronizedItemStreamReaderTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setThreadNamePrefix("safety-thread-");
        return executor;
    }
}

1) SynchronizedItemStreamReader 설정

SynchronizedItemStreamReader는 스레드 안전을 위한 동기화 처리를 해주고, 데이터 처리는 JdbcCursorItemReader에게 위임한다.

@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> synchronizedItemStreamReaderCustomItemReader() {
    JdbcCursorItemReader<Customer> notSafetyReader = new JdbcCursorItemReaderBuilder<Customer>()
            .fetchSize(60)
            .dataSource(dataSource)
            .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
            .sql("select id, firstName, lastName, birthdate from customer")
            .name("SafetyReader")
            .build();

    return new SynchronizedItemStreamReaderBuilder<Customer>()
            .delegate(notSafetyReader)
            .build();
}

 

  • SynchronizedItemStreamReader.java > read()

 

배치 결과
item.getId() : 33
item.getId() : 34
item.getId() : 35
item.getId() : 36
item.getId() : 37
item.getId() : 38
item.getId() : 39
item.getId() : 40
item.getId() : 41
item.getId() : 42
item.getId() : 43
item.getId() : 44
item.getId() : 45
item.getId() : 46
item.getId() : 47
item.getId() : 48
item.getId() : 49
item.getId() : 50
item.getId() : 51
item.getId() : 52
item.getId() : 53
item.getId() : 54
item.getId() : 55
item.getId() : 56
item.getId() : 57
item.getId() : 58
item.getId() : 59
item.getId() : 60
item.getId() : 61
item.getId() : 62

 

 

JdbcCursorItemReader 관련 포스팅은 아래를 참고하자.

https://devfunny.tistory.com/827

 

[SpringBatch 실습] 26. Cursor - JdbcCursorItemReader, JpaCursorItemReader

Cursor 방식 JDBC ResultSet의 기본 메커니즘을 사용한다. 현재 행에 커서를 유지하며 다음 데이터를 호출하면 다음 행으로 커서를 이동하며 데이터 반환이 이루어진다. (Streaming 방식) ResultSet이 open()

devfunny.tistory.com

 

 

반응형