[SpringBatch Practice] 33. Making a reader thread-safe with Spring Batch's SynchronizedItemStreamReader


SynchronizedItemStreamReader

SynchronizedItemStreamReader wraps an ItemReader that is not thread-safe so that it can be used safely from multiple threads.

 

Thread-safe Reader

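Because read() is synchronized, only one thread at a time can call the delegate. The other threads wait, then read the next item in turn.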

 

read()
public class SynchronizedItemStreamReader<T> implements ItemStreamReader<T>, InitializingBean {

   private ItemStreamReader<T> delegate;

   public void setDelegate(ItemStreamReader<T> delegate) {
      this.delegate = delegate;
   }

   /**
    * This delegates to the read method of the <code>delegate</code>
    */
   @Nullable
   public synchronized T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
      return this.delegate.read();
   }
   
   ...
   
}
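The locking idea in read() can be tried without Spring Batch at all. The following is a minimal plain-Java sketch, not the library's code: SimpleReader, ListReader, SynchronizedReader, and SynchronizedReaderSketch are hypothetical names introduced only for illustration, and the list-backed reader merely stands in for a cursor-based reader.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an item reader: null means "no more items".
interface SimpleReader<T> {
    T read() throws Exception;
}

// Not thread-safe on its own: the iterator is unguarded shared state.
class ListReader<T> implements SimpleReader<T> {
    private final Iterator<T> iterator;

    ListReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}

// Same idea as the read() method shown above: take the lock, then delegate.
class SynchronizedReader<T> implements SimpleReader<T> {
    private final SimpleReader<T> delegate;

    SynchronizedReader(SimpleReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() throws Exception {
        return delegate.read();
    }
}

class SynchronizedReaderSketch {

    // Drains one shared reader from several threads.
    // Returns {total successful reads, number of distinct items seen}.
    static int[] readAllConcurrently(int itemCount, int threads) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= itemCount; i++) {
            items.add(i);
        }
        SimpleReader<Integer> reader = new SynchronizedReader<>(new ListReader<>(items));

        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger reads = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Void>> futures = new ArrayList<>();
        try {
            for (int t = 0; t < threads; t++) {
                futures.add(pool.submit(() -> {
                    Integer item;
                    while ((item = reader.read()) != null) {
                        seen.add(item);
                        reads.incrementAndGet();
                    }
                    return null;
                }));
            }
            for (Future<Void> future : futures) {
                future.get(); // rethrows any failure from a worker thread
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] {reads.get(), seen.size()};
    }

    public static void main(String[] args) {
        int[] result = readAllConcurrently(1000, 4);
        System.out.println("reads=" + result[0] + ", distinct=" + result[1]);
    }
}
```

Both counts equal the number of items, because the lock ensures each item is handed to exactly one thread. Passing the bare ListReader to the threads instead would remove that guarantee.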

 

 

Creating the Job

SynchronizedConfiguration.java
package com.project.springbatch._46_syncrhnizedItemReader;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.sql.DataSource;

/*
--job.name=synchronizedItemStreamReaderJob
*/
@RequiredArgsConstructor
@Configuration
@Slf4j
public class SynchronizedConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Job synchronizedItemStreamReaderJob() throws Exception {
        return jobBuilderFactory.get("synchronizedItemStreamReaderJob")
                .incrementer(new RunIdIncrementer())
                .start(synchronizedItemStreamReaderStep1())
                .build();
    }

    @Bean
    public Step synchronizedItemStreamReaderStep1() {
        return stepBuilderFactory.get("synchronizedItemStreamReaderStep1")
                .<Customer, Customer>chunk(60)
                .reader(synchronizedItemStreamReaderCustomItemReader())
                .listener(new ItemReadListener<Customer>() {
                    @Override
                    public void beforeRead() {

                    }

                    @Override
                    public void afterRead(Customer item) {
                        System.out.println("item.getId() : " + item.getId());
                    }

                    @Override
                    public void onReadError(Exception ex) {

                    }
                })
                .writer(synchronizedItemStreamReaderCustomerItemWriter())
                .taskExecutor(synchronizedItemStreamReaderTaskExecutor())
                .build();
    }

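    /**
     * Wraps the non-thread-safe JdbcCursorItemReader in a SynchronizedItemStreamReader
     * so that the multi-threaded step can share it safely.
     * @return the synchronized reader that delegates to the cursor reader
     */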
    @Bean
    @StepScope
    public SynchronizedItemStreamReader<Customer> synchronizedItemStreamReaderCustomItemReader() {
        JdbcCursorItemReader<Customer> notSafetyReader = new JdbcCursorItemReaderBuilder<Customer>()
                .fetchSize(60)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                .sql("select id, firstName, lastName, birthdate from customer")
                .name("SafetyReader")
                .build();

        return new SynchronizedItemStreamReaderBuilder<Customer>()
                .delegate(notSafetyReader)
                .build();
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> synchronizedItemStreamReaderCustomerItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(this.dataSource);
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

    @Bean
    public TaskExecutor synchronizedItemStreamReaderTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setThreadNamePrefix("safety-thread-");
        return executor;
    }
}

1) Configuring SynchronizedItemStreamReader

SynchronizedItemStreamReader handles the synchronization needed for thread safety and delegates the actual reading to JdbcCursorItemReader.

@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> synchronizedItemStreamReaderCustomItemReader() {
    JdbcCursorItemReader<Customer> notSafetyReader = new JdbcCursorItemReaderBuilder<Customer>()
            .fetchSize(60)
            .dataSource(dataSource)
            .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
            .sql("select id, firstName, lastName, birthdate from customer")
            .name("SafetyReader")
            .build();

    return new SynchronizedItemStreamReaderBuilder<Customer>()
            .delegate(notSafetyReader)
            .build();
}

 

  • SynchronizedItemStreamReader.java > read()

 

Batch result
item.getId() : 33
item.getId() : 34
item.getId() : 35
item.getId() : 36
item.getId() : 37
item.getId() : 38
item.getId() : 39
item.getId() : 40
item.getId() : 41
item.getId() : 42
item.getId() : 43
item.getId() : 44
item.getId() : 45
item.getId() : 46
item.getId() : 47
item.getId() : 48
item.getId() : 49
item.getId() : 50
item.getId() : 51
item.getId() : 52
item.getId() : 53
item.getId() : 54
item.getId() : 55
item.getId() : 56
item.getId() : 57
item.getId() : 58
item.getId() : 59
item.getId() : 60
item.getId() : 61
item.getId() : 62
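To see why ids come out without duplicates even though several threads are reading, here is a rough plain-Java sketch of worker threads filling chunks from one synchronized read method. It is an illustration under assumptions, not how Spring Batch implements a multi-threaded step: ChunkedReadSketch and its methods are hypothetical names, and an in-memory list of ids stands in for the database cursor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ChunkedReadSketch {
    private final Iterator<Integer> source;                    // stands in for the cursor
    private final List<Integer> readOrder = new ArrayList<>(); // guarded by "this"

    ChunkedReadSketch(int itemCount) {
        List<Integer> ids = new ArrayList<>();
        for (int id = 1; id <= itemCount; id++) {
            ids.add(id);
        }
        this.source = ids.iterator();
    }

    // Only one thread at a time may advance the "cursor".
    private synchronized Integer read() {
        if (!source.hasNext()) {
            return null;
        }
        Integer id = source.next();
        readOrder.add(id); // recorded while holding the lock
        return id;
    }

    // Each worker repeatedly fills a chunk of up to chunkSize items and "writes" it.
    List<List<Integer>> run(int threads, int chunkSize) {
        List<List<Integer>> written = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int t = 0; t < threads; t++) {
                futures.add(pool.submit(() -> {
                    while (true) {
                        List<Integer> chunk = new ArrayList<>();
                        Integer id;
                        while (chunk.size() < chunkSize && (id = read()) != null) {
                            chunk.add(id);
                        }
                        if (chunk.isEmpty()) {
                            return; // source exhausted
                        }
                        written.add(chunk);
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(written);
    }

    // Snapshot of the order in which ids left the synchronized read method.
    synchronized List<Integer> readOrder() {
        return new ArrayList<>(readOrder);
    }

    public static void main(String[] args) {
        ChunkedReadSketch sketch = new ChunkedReadSketch(250);
        int total = 0;
        for (List<Integer> chunk : sketch.run(4, 60)) {
            total += chunk.size();
        }
        System.out.println("items written: " + total);
    }
}
```

In this sketch the ids leave read() in source order, but which thread receives which id, and how the ids are grouped into chunks, can differ from run to run.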

 

 

For more on JdbcCursorItemReader, see the post below.

https://devfunny.tistory.com/827

 

[SpringBatch Practice] 26. Cursor - JdbcCursorItemReader, JpaCursorItemReader


 

 
