[SpringBatch 실습] 26. Cursor - JdbcCursorItemReader, JpaCursorItemReader

반응형
728x90
반응형

Cursor 방식

  • JDBC ResultSet의 기본 메커니즘을 사용한다.
  • 현재 행에 커서를 유지하며 다음 데이터를 호출하면 다음 행으로 커서를 이동하며 데이터 반환이 이루어진다. (Streaming 방식)
  • ResultSet이 open()될 때마다 next()가 호출되어 Database의 데이터가 반환되고 객체와 매핑이 이루어진다.
  • DB Connection이 연결되면 배치 처리가 완료될 때까지 데이터를 읽어오기 때문에 DB와 SocketTimeout을 충분한 값으로 설정해야한다.
  • 모든 결과를 메모리에 할당하므로 메모리 사용량이 많다.
  • Connection 연결 유지 시간과 메모리 공간이 충분하다면 대량의 데이터 처리에 적합할 수 있다.

 

JdbcCursorItemReader

  • Cursor 기반의 JDBC 구현체로써, ResultSet과 함게 사용되며 Datasource에서 Connection을 얻어와서 SQL을 실행한다.
  • Thread 안정성을 보장하지 않는다.
  • 멀티 스레드 환경에서 사용할 경우 동시성 이슈가 발생하지 않도록 별도 동기화 처리가 필요하다.

 

Customer.java
package com.project.springbatch._53_reader_jdbcCursor;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Customer {

    private Long Id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

 

JdbcCursorConfiguration.java
package com.project.springbatch._53_reader_jdbcCursor;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/*
--job.name=jdbcCursorJob
 */
@RequiredArgsConstructor
@Configuration
public class JdbcCursorConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Job jdbcCursorJob() {
        return jobBuilderFactory.get("jdbcCursorJob")
                .incrementer(new RunIdIncrementer())
                .start(jdbcCursorStep1())
                .build();
    }

    @Bean
    public Step jdbcCursorStep1() {
        return stepBuilderFactory.get("jdbcCursorStep1")
                .<Customer, Customer>chunk(10)
                .reader(customJdbcCursorItemReader())
                .writer(customJdbcCursorItemWriter())
                .build();
    }

    @Bean
    public JdbcCursorItemReader<Customer> customJdbcCursorItemReader() {
        return new JdbcCursorItemReaderBuilder()
                .name("jdbcCursorItemReader")
                .fetchSize(10)
                .sql("select id, firstName, lastName, birthdate from customer where firstName like ? order by lastName, firstName")
                .beanRowMapper(Customer.class) // 자동으로 객체와 매핑
                .queryArguments("A%")
                .maxItemCount(3) // 조회할 최대 아이템 수
                .currentItemCount(2) // 조회 Item의 시작 시점
                .maxRows(100) // ResultSet이 포함할 수 있는 최대 row 수
                .dataSource(dataSource)
                .build();
    }

    @Bean
    public ItemWriter<Customer> customJdbcCursorItemWriter() {
        return items -> {
            for (Customer item : items) {
                System.out.println(item.toString());
            }
        };
    }
}

1) 각 속성 설명

@Bean
public JdbcCursorItemReader<Customer> customJdbcCursorItemReader() {
    return new JdbcCursorItemReaderBuilder()
            .name("jdbcCursorItemReader")
            .fetchSize(10)
            .sql("select id, firstName, lastName, birthdate from customer where firstName like ? order by lastName, firstName")
            .beanRowMapper(Customer.class) // 자동으로 객체와 매핑
            .queryArguments("A%")
            .maxItemCount(3) // 조회할 최대 아이템 수
            .currentItemCount(2) // 조회 Item의 시작 시점
            .maxRows(100) // ResultSet이 포함할 수 있는 최대 row 수
            .dataSource(dataSource)
            .build();
}
method 설명
name ItemReader 이름
fetchSize 한번에 읽어올 데이터 갯수
- chunk size와 동일하게 설정하는 것을 추천한다. 모두 5라고 가정했을때, 5개의 데이터를 한번에 가져와서 5개씩 Commit을 수행하게된다.
sql 실행할 쿼리
beanRowMapper 객체와 자동으로 매핑해주는 Mapper
queryArguments sql 쿼리에 사용될 쿼리 파라미터 설정
maxItemCount 조회할 최대 아이템 갯수
currentItemCount 조회 Item의 시작 시점
- 현재 ItemCount 갯수를 센다. MaxItemCount와 연동되어 사용되는데 만약 MaxItemCount이 20이고, CurrentItemCount가 20이면 더이상 읽어올 데이터가 없다.
maxRows ResultSet이 포함할 수 있는 최대 row 수
dataSource 연결할 DB의 dataSource

 

 

실행단계

단계 설명
JdbcCursorItemReader 생성 JdbcCursorItemReaderBuilder > build() -> JdbcCursorItemReader 생성
executeQuery() AbstractCursorItemReader > doOpen() > openCursor() - executeQuery()
update() AbstractItemContingItemStreamReader > update()
- 상태정보 기록
doRead() AbstractCursorItemReader > doRead() 
- doRead()를 호출하여 데이터를 하나씩 가져온다.
- fetchSize가 10이므로 resultSet 에 10개가 담겨져있다.
- rs.next()로 row 를 읽어온다.
writer() chunkSize 만큼 읽어오면 writer 로 수행

1) JdbcCursorItemReaderBuilder > build() -> JdbcCursorItemReader 생성

public JdbcCursorItemReader<T> build() {
   if(this.saveState) {
      Assert.hasText(this.name,
            "A name is required when saveState is set to true");
   }

   Assert.hasText(this.sql, "A query is required");
   Assert.notNull(this.dataSource, "A datasource is required");
   Assert.notNull(this.rowMapper, "A rowmapper is required");

   JdbcCursorItemReader<T> reader = new JdbcCursorItemReader<>();

   if(StringUtils.hasText(this.name)) {
      reader.setName(this.name);
   }

   reader.setSaveState(this.saveState);
   reader.setPreparedStatementSetter(this.preparedStatementSetter);
   reader.setRowMapper(this.rowMapper);
   reader.setSql(this.sql);
   reader.setCurrentItemCount(this.currentItemCount);
   reader.setDataSource(this.dataSource);
   reader.setDriverSupportsAbsolute(this.driverSupportsAbsolute);
   reader.setFetchSize(this.fetchSize);
   reader.setIgnoreWarnings(this.ignoreWarnings);
   reader.setMaxItemCount(this.maxItemCount);
   reader.setMaxRows(this.maxRows);
   reader.setQueryTimeout(this.queryTimeout);
   reader.setUseSharedExtendedConnection(this.useSharedExtendedConnection);
   reader.setVerifyCursorPosition(this.verifyCursorPosition);
   reader.setConnectionAutoCommit(this.connectionAutoCommit);

   return reader;
}

 

2) AbstractCursorItemReader > doOpen() > openCursor() - executeQuery()

▶ AbstractCursorItemReader.java > doOpen()

@Override
protected void doOpen() throws Exception {

   Assert.state(!initialized, "Stream is already initialized.  Close before re-opening.");
   Assert.isNull(rs, "ResultSet still open!  Close before re-opening.");

   initializeConnection();
   openCursor(con);
   initialized = true;

}

 

▶ JdbcCursorItemReader.java > openCursor()

@Override
protected void openCursor(Connection con) {
   try {
      if (isUseSharedExtendedConnection()) {
         preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
               ResultSet.HOLD_CURSORS_OVER_COMMIT);
      }
      else {
         preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
      }
      applyStatementSettings(preparedStatement);
      if (this.preparedStatementSetter != null) {
         preparedStatementSetter.setValues(preparedStatement);
      }
      this.rs = preparedStatement.executeQuery();
      handleWarnings(preparedStatement);
   }
   catch (SQLException se) {
      close();
      throw translateSqlException("Executing query", getSql(), se);
   }

}

 

3) AbstractCursorItemReader.java > doRead()

@Nullable
@Override
protected T doRead() throws Exception {
   if (rs == null) {
      throw new ReaderNotOpenException("Reader must be open before it can be read.");
   }

   try {
      if (!rs.next()) {
         return null;
      }
      int currentRow = getCurrentItemCount();
      T item = readCursor(rs, currentRow);
      verifyCursorPosition(currentRow);
      return item;
   }
   catch (SQLException se) {
      throw translateSqlException("Attempt to process next row failed", getSql(), se);
   }
}

 

▶ JdbcCursorItemReader.java > readCursor()

@Nullable
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
   return rowMapper.mapRow(rs, currentRow);
}

 

 

 

 

JpaCursorItemReader

  • SpringBatch 4.3 버전부터 지원된다.
  • Cursor 기반의 JPA 구현체로써 EntityManagerFactory 객체가 필요하며 쿼리는 JPQL을 사용한다.

 

Customer.java
package com.project.springbatch._54_reader_jpaCursor;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class Customer {
    @Id
    @GeneratedValue
    private Long Id;
    private String firstname;
    private String lastname;
    private String birthdate;
}

 

JpaCursorConfiguration.java
package com.project.springbatch._54_reader_jpaCursor;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaCursorItemReader;
import org.springframework.batch.item.database.builder.JpaCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;
import java.util.HashMap;

@RequiredArgsConstructor
@Configuration
public class JpaCursorConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job jpaCursorJob() {
        return jobBuilderFactory.get("jpaCursorJob")
                .incrementer(new RunIdIncrementer())
                .start(jpaCursorStep1())
                .build();
    }

    @Bean
    public Step jpaCursorStep1() {
        return stepBuilderFactory.get("jpaCursorStep1")
                .<Customer, Customer>chunk(2)
                .reader(customJpaCursorItemReader())
                .writer(customJpaCursorItemWriter())
                .build();
    }

    @Bean
    public JpaCursorItemReader<Customer> customJpaCursorItemReader() {

        HashMap<String, Object> parameters = new HashMap<>();
        parameters.put("firstname", "A%");

        return new JpaCursorItemReaderBuilder()
                .name("jpaCursorItemReader")
                .queryString("select c from Customer c where firstname like :firstname") // jpql
                .entityManagerFactory(entityManagerFactory)
                .parameterValues(parameters)
//                .maxItemCount(10)
//                .currentItemCount(2)
                .build();
    }

    @Bean
    public ItemWriter<Customer> customJpaCursorItemWriter() {
        return items -> {
            for (Customer item : items) {
                System.out.println(item.toString());
            }
        };
    }
}

1) 각 속성 설명

@Bean
public JpaCursorItemReader<Customer> customJpaCursorItemReader() {

    HashMap<String, Object> parameters = new HashMap<>();
    parameters.put("firstname", "A%");

    return new JpaCursorItemReaderBuilder()
            .name("jpaCursorItemReader")
            .queryString("select c from Customer c where firstname like :firstname") // jpql
            .entityManagerFactory(entityManagerFactory)
            .parameterValues(parameters)
//                .maxItemCount(10)
//                .currentItemCount(2)
            .build();
}
method 설명
name ItemReader 이름
queryString 사용할 JPQL 쿼리문
entityManagerFactory JPA를 사용하기 위한 EntityManagerFactory 설정
parameterValues queryString에 사용된 쿼리의 파라미터 설정

 

 

수행단계

단계 설명
createQuery() JpaCursorItemReader - doOpen() - createQuery()
- 쿼리 수행 후, 결과를 query.getResultStream().iterator() 로 담는다.
update() JpaCursorItemReader - update()
doRead() JpaCursorItemReader - doRead()
- open() 메서드에서 읽어온 모든 데이터를 하나씩 리턴한다.
- 위 과정들이 chunk size 만큼 반복한다.

1) JdbcCursorItemReader.java

▶ doOpen()

@Override
@SuppressWarnings("unchecked")
protected void doOpen() throws Exception {
   this.entityManager = this.entityManagerFactory.createEntityManager();
   if (this.entityManager == null) {
      throw new DataAccessResourceFailureException("Unable to create an EntityManager");
   }
   if (this.queryProvider != null) {
      this.queryProvider.setEntityManager(this.entityManager);
   }
   Query query = createQuery();
   if (this.parameterValues != null) {
      this.parameterValues.forEach(query::setParameter);
   }
   this.iterator = query.getResultStream().iterator();
}

 createQuery()

private Query createQuery() {
   if (this.queryProvider == null) {
      return this.entityManager.createQuery(this.queryString);
   }
   else {
      return this.queryProvider.createQuery();
   }
}

 

▶ doRead()

@Override
protected T doRead() {
   return this.iterator.hasNext() ? this.iterator.next() : null;
}
반응형

Designed by JB FACTORY