[SpringBatch 실습] 27. Paging - JdbcPagingItemReader, JpaPagingItemReader

반응형
728x90
반응형

Paging 방식

  • 페이징 단위로 데이터를 조회하는 방식으로 Page Size만큼 한번에 메모리로 가지고 온 다음 한개씩 읽는다.
  • 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 대량의 데이터를 처리하더라도 SocketTimeout 예외가 거의 발생하지 않는다.
  • 시작 행 번호를 지정하고 페이지에 반환시키고자 하는 행의 수를 지정한 후 사용한다. (offset-limit)
  • 페이징 단위의 결괌나 메모리에 할당하기 때문에 메모리 사용량이 적다.
  • Connection 연결 유지 기간이 길지 않고 메모리 공간을 효율적으로 새용해야하는 데이터 처리에 적합하다.

 

 

JdbcPagingItemReader

  • Paging 기반의 JDBC 구현체로써, 쿼리에 시작 행 번호(offset)와 페이지에서 반환할 행수(limit)을 지정해서 SQL을 실행한다.
  • 스프링 배치에서 offset, limit을 PageSize에 맞게 자동으로 생성해준다.
  • 페이징 단위로 데이터를 조회할 때마다 새로운 쿼리가 실행된다.
  • 페이지마다 새로운 쿼리를 실행하므로, 페이징 시 결과 데이터의 순서가 보장될 수 있도록 order by 구문이 작성되도록 한다.
    • 항상 정렬된 상태에서 페이징할 필요가 있다.
  • 멀티 스레드 환경에서 Thread 안정성을 보장하기 때문에 별도의 동기화를 할 필요가 없다.
    • read()할때 synchronized 구문이 있다.

 

Customer.java
package com.project.springbatch._55_reader_jdbcPaging;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Customer {

    private Integer id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

 

CustomRowMapper.java
package com.project.springbatch._55_reader_jdbcPaging;

import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class CustomerRowMapper implements RowMapper<Customer> {
    @Override
    public Customer mapRow(ResultSet rs, int i) throws SQLException {
        Customer customer = new Customer();

        customer.setId(rs.getInt("id"));
        customer.setFirstName(rs.getString("firstname"));
        customer.setLastName(rs.getString("lastname"));
        customer.setBirthdate(rs.getString("birthdate"));

        return customer;
    }
}

 

JdbcPagingConfiguration.java
package com.project.springbatch._55_reader_jdbcPaging;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@RequiredArgsConstructor
@Configuration
public class JdbcPagingConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    public final DataSource dataSource;

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();
    }

    @Bean
    public JdbcPagingItemReader<Customer> customItemReader() throws Exception {

        HashMap<String, Object> parameters = new HashMap<>();
        parameters.put("firstname", "A%");

        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("jdbcPagingItemReader")
                .pageSize(10)
                .fetchSize(10)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                .queryProvider(createQueryProvider())
                .parameterValues(parameters)
                .build();
    }
    
    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("id,firstName,lastName,birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setWhereClause("where firstName like :firstname");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        return queryProvider.getObject();
    }

    @Bean
    public ItemWriter<Customer> customItemWriter() {
        return items -> {
            for (Customer item : items) {
                System.out.println(item.toString());
            }
        };
    }
}

1) 각 속성 설명

 @Bean
    public JdbcPagingItemReader<Customer> customItemReader() throws Exception {

        HashMap<String, Object> parameters = new HashMap<>();
        parameters.put("firstname", "A%");

        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("jdbcPagingItemReader")
                .pageSize(10)
                .fetchSize(10)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                .queryProvider(createQueryProvider())
                .parameterValues(parameters)
                .build();
    }
method 설명
name ItemReader 이름
fetchSize 한번에 읽어올 데이터 갯수
- fetchSize 만큼 행단위로 데이터를 조회하도록 설정
- paging  방식의 Reader 는 커서 개념이 아닌 페이지 단위로 분할해서 처리하는 방식이라서 만약 pageSize 와 fetchSize 가 동일하다면 둘의 차이에 큰 의미가 없다.
- fetchSize 와 pageSize 가 다를 경우 DB 에 따라 차이가 있겠지만 내부적으로 pageSize 만큼 분할 처리가 완료된 이후 fetchSize 만큼 행단위로 읽어오는 식으로 처리가 이루어질 수도 있다.
- 특별한 이유가 없으면 chunkSize, pageSize, fetchSize 를 가급적 동일하게 설정해서 사용하는 것을 권장한다.
pageSize 페이지 크기 설정 (쿼리당 요청할 레코드 수)
dataSource DB에 접근하기 위해 Datasource 설정
queryProvider DB 페이징 전략에 따른 PagingQueryProvider 설정
rowMapper 쿼리 결과로 반환되는 데이터와 객체를 매핑하기 위한 RowMapper 설정
(QueryProvider API) selectClause select절 설정
(QueryProvider API) fromClause from절 설정
(QueryProvider API) whereClause where절 설정
(QueryProvider API) groupClause group절 설정
(QueryProvider API) sortKeys 정렬을 위한 유니크 키 설정 (HashMap 형태)
parameterValues 쿼리 파라미터 설정 (Map 형태)
maxItemCount 조회할 최대 item 수
currentItemCount 조회 item의 시작 지점
maxRows ResultSet 오프젝트가 포함할 수 있는 최대 행 수

 

2) PagingQueryProvider

  • 쿼리 실행에 필요한 쿼리문을 ItemReader에게 제공하는 클래스다.
  • 데이터베이스마다 페이징 전략이 다르기 때문에 각 데이터베이스 유형마다 다른 PagingQueryProvider를 사용한다.
  • Select절, From절, SortKey는 필수로 설정해야한다.
  • Where, Group by 절은 필수가 아니다.
@Bean
public PagingQueryProvider createQueryProvider() throws Exception {
    SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);
    queryProvider.setSelectClause("id,firstName,lastName,birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setWhereClause("where firstName like :firstname");

    Map<String, Order> sortKeys = new HashMap<>(1);
    sortKeys.put("id", Order.ASCENDING);

    queryProvider.setSortKeys(sortKeys);

    return queryProvider.getObject();
}

- Datasource 설정값으로 Provider 중 하나를 자동으로 선택한다.

SqlPagingQueryProviderFactoryBean.java
private Map<DatabaseType, AbstractSqlPagingQueryProvider> providers = new HashMap<>();


{
   providers.put(DB2, new Db2PagingQueryProvider());
   providers.put(DB2VSE, new Db2PagingQueryProvider());
   providers.put(DB2ZOS, new Db2PagingQueryProvider());
   providers.put(DB2AS400, new Db2PagingQueryProvider());
   providers.put(DERBY,new DerbyPagingQueryProvider());
   providers.put(HSQL,new HsqlPagingQueryProvider());
   providers.put(H2,new H2PagingQueryProvider());
   providers.put(MYSQL,new MySqlPagingQueryProvider());
   providers.put(ORACLE,new OraclePagingQueryProvider());
   providers.put(POSTGRES,new PostgresPagingQueryProvider());
   providers.put(SQLITE, new SqlitePagingQueryProvider());
   providers.put(SQLSERVER,new SqlServerPagingQueryProvider());
   providers.put(SYBASE,new SybasePagingQueryProvider());
}

providers에 각 DB(key)에 대한 QueryProvider(value)가 들어가있다. 



실행단계

단계 설명
init(dataSource) AbstractSqlPagingQueryProvider.java > init(dataSource)
doRead() AbstractPagingItemReader.java > doRead()
- synchronized (동기화 처리)

JdbcPagingItemReader.java > doReadPage()
- pageSize 만큼 데이터를 읽는다.
SimpleChunkProvider SimpleChunkProvider
- chunkSize 만큼 데이터 읽는게 반복된다.

1) AbstractPagingItemReader.java

▶ doRead()

@Nullable
@Override
protected T doRead() throws Exception {

   synchronized (lock) {

      if (results == null || current >= pageSize) {

         if (logger.isDebugEnabled()) {
            logger.debug("Reading page " + getPage());
         }

         doReadPage();
         page++;
         if (current >= pageSize) {
            current = 0;
         }

      }

      int next = current++;
      if (next < results.size()) {
         return results.get(next);
      }
      else {
         return null;
      }

   }

}

▶ doReadPage()

@Override
protected void doReadPage() {
   if (results == null) {
      results = new CopyOnWriteArrayList<>();
   }
   else {
      results.clear();
   }

   PagingRowMapper rowCallback = new PagingRowMapper();

   List<T> query;

   if (getPage() == 0) {
      if (logger.isDebugEnabled()) {
         logger.debug("SQL used for reading first page: [" + firstPageSql + "]");
      }
      if (parameterValues != null && parameterValues.size() > 0) {
         if (this.queryProvider.isUsingNamedParameters()) {
            query = namedParameterJdbcTemplate.query(firstPageSql,
                  getParameterMap(parameterValues, null), rowCallback);
         }
         else {
            query = getJdbcTemplate().query(firstPageSql,
                  getParameterList(parameterValues, null).toArray(), rowCallback);
         }
      }
      else {
         query = getJdbcTemplate().query(firstPageSql, rowCallback);
      }

   }
   else if (startAfterValues != null) {
      previousStartAfterValues = startAfterValues;
      if (logger.isDebugEnabled()) {
         logger.debug("SQL used for reading remaining pages: [" + remainingPagesSql + "]");
      }
      if (this.queryProvider.isUsingNamedParameters()) {
         query = namedParameterJdbcTemplate.query(remainingPagesSql,
               getParameterMap(parameterValues, startAfterValues), rowCallback);
      }
      else {
         query = getJdbcTemplate().query(remainingPagesSql,
               getParameterList(parameterValues, startAfterValues).toArray(), rowCallback);
      }
   }
   else {
      query = Collections.emptyList();
   }

   results.addAll(query);
}

 

 

 

 

JpaPagingItemReader

  • Paging 기반의 JPA 구현체로써, EntityManagerFactory 객체가 필요하며, 쿼리는 JPQL을 사용한다.
  • JpaPagingItemReader 또한 doRead() 메서드에서 동기화 처리가 되어있어서 멀티 스레드 환경에서 안전하다.

 

Customer.java
package com.project.springbatch._54_reader_jpaCursor;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class Customer {
    @Id
    @GeneratedValue
    private Long Id;
    private String firstname;
    private String lastname;
    private String birthdate;
}

 

Address
package com.project.springbatch._56_reader_jpaPaging;

import lombok.Getter;
import lombok.Setter;

import javax.persistence.*;

@Getter
@Setter
@Entity
public class Address {

    @Id
    @GeneratedValue
    private Long Id;
    private String location;

    @OneToOne
    @JoinColumn(name = "customer_id")
    private Customer customer;
}

 

JpaPagingConfiguration.java
package com.project.springbatch._56_reader_jpaPaging;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;

@RequiredArgsConstructor
@Configuration
public class JpaPagingConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();
    }

    @Bean
    public JpaPagingItemReader<Customer> customItemReader() {
        return new JpaPagingItemReaderBuilder<Customer>()
                .name("jpaPagingItemReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(10)
                .queryString("select c from Customer c join fetch c.address")
                .build();
    }

    @Bean
    public ItemWriter<Customer> customItemWriter() {
        return items -> {
            for (Customer customer : items) {
                System.out.println(customer.getAddress().getLocation());
            }
        };
    }
}

1) 각 속성 설명

@Bean
public JpaPagingItemReader<Customer> customItemReader() {
    /*
       JpaPagingItemReader - doOpen() 호출 -> entityManager 생성
       AbstractItemCountingItemStreamItemReader - update() : 상태정보 업데이트
       JpaPagingItemReader - doReadPage()
       - 쿼리 수행 (pageSize 만큼 데이터를 가져온다.)
     */
    return new JpaPagingItemReaderBuilder<Customer>()
            .name("jpaPagingItemReader")
            .entityManagerFactory(entityManagerFactory)
            .pageSize(10)
            .queryString("select c from Customer c join fetch c.address")
            .build();
}
method 설명
name ItemReader 이름
pageSize 페이지 크기 설정 (쿼리 당 요청할 레코드 수)
queryString ItemReader가 조회할때 사용할 JPQL 문장 설정
EntityManagerFactory JPQL을 실행하는 EntityManager를 생성하는 팩토리
parameterValue 쿼리 파라미터 설정 (HashMap<String, Object> 형태)

 

 

수행단계

단계 설명
JpaPagingItemReader 생성 JpaPagingItemReader - doOpen() 호출 -> entityManager 생성
update() AbstractItemCountingItemStreamItemReader - update() : 상태정보 업데이트
doReadPage() JpaPagingItemReader - doReadPage()
- 쿼리 수행 (pageSize 만큼 데이터를 가져온다.)

1) JpaPagingItemReader.java

▶ doOpen()

@Override
protected void doOpen() throws Exception {
   super.doOpen();

   entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
   if (entityManager == null) {
      throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
   }
   // set entityManager to queryProvider, so it participates
   // in JpaPagingItemReader's managed transaction
   if (queryProvider != null) {
      queryProvider.setEntityManager(entityManager);
   }

}

 

2) AbstractItemCountingItemStreamItemReader.java

▶ update()

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
   super.update(executionContext);
   if (saveState) {
      Assert.notNull(executionContext, "ExecutionContext must not be null");
      executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
      if (maxItemCount < Integer.MAX_VALUE) {
         executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
      }
   }

}

 

3) AbstractPagingItemReader.java

▶ doRead()

@Nullable
@Override
protected T doRead() throws Exception {

   synchronized (lock) {

      if (results == null || current >= pageSize) {

         if (logger.isDebugEnabled()) {
            logger.debug("Reading page " + getPage());
         }

         doReadPage();
         page++;
         if (current >= pageSize) {
            current = 0;
         }

      }

      int next = current++;
      if (next < results.size()) {
         return results.get(next);
      }
      else {
         return null;
      }

   }

}

 

4) JpaPagingItemReader.java

▶ doRead()

@Override
@SuppressWarnings("unchecked")
protected void doReadPage() {

   EntityTransaction tx = null;
   
   if (transacted) {
      tx = entityManager.getTransaction();
      tx.begin();
      
      entityManager.flush();
      entityManager.clear();
   }//end if

   Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());

   if (parameterValues != null) {
      for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
         query.setParameter(me.getKey(), me.getValue());
      }
   }

   if (results == null) {
      results = new CopyOnWriteArrayList<>();
   }
   else {
      results.clear();
   }
   
   if (!transacted) {
      List<T> queryResult = query.getResultList();
      for (T entity : queryResult) {
         entityManager.detach(entity);
         results.add(entity);
      }//end if
   } else {
      results.addAll(query.getResultList());
      tx.commit();
   }//end if
}

 

 

반응형

Designed by JB FACTORY