반응형
728x90
반응형
Paging 방식
- 페이징 단위로 데이터를 조회하는 방식으로 Page Size만큼 한번에 메모리로 가지고 온 다음 한개씩 읽는다.
- 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 대량의 데이터를 처리하더라도 SocketTimeout 예외가 거의 발생하지 않는다.
- 시작 행 번호를 지정하고 페이지에 반환시키고자 하는 행의 수를 지정한 후 사용한다. (offset-limit)
- 페이징 단위의 결괌나 메모리에 할당하기 때문에 메모리 사용량이 적다.
- Connection 연결 유지 기간이 길지 않고 메모리 공간을 효율적으로 새용해야하는 데이터 처리에 적합하다.
JdbcPagingItemReader
- Paging 기반의 JDBC 구현체로써, 쿼리에 시작 행 번호(offset)와 페이지에서 반환할 행수(limit)을 지정해서 SQL을 실행한다.
- 스프링 배치에서 offset, limit을 PageSize에 맞게 자동으로 생성해준다.
- 페이징 단위로 데이터를 조회할 때마다 새로운 쿼리가 실행된다.
- 페이지마다 새로운 쿼리를 실행하므로, 페이징 시 결과 데이터의 순서가 보장될 수 있도록 order by 구문이 작성되도록 한다.
- 항상 정렬된 상태에서 페이징할 필요가 있다.
- 멀티 스레드 환경에서 Thread 안정성을 보장하기 때문에 별도의 동기화를 할 필요가 없다.
- read()할때 synchronized 구문이 있다.
Customer.java
package com.project.springbatch._55_reader_jdbcPaging;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Customer {
private Integer id;
private String firstName;
private String lastName;
private String birthdate;
}
CustomRowMapper.java
package com.project.springbatch._55_reader_jdbcPaging;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
public class CustomerRowMapper implements RowMapper<Customer> {
@Override
public Customer mapRow(ResultSet rs, int i) throws SQLException {
Customer customer = new Customer();
customer.setId(rs.getInt("id"));
customer.setFirstName(rs.getString("firstname"));
customer.setLastName(rs.getString("lastname"));
customer.setBirthdate(rs.getString("birthdate"));
return customer;
}
}
JdbcPagingConfiguration.java
package com.project.springbatch._55_reader_jdbcPaging;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@RequiredArgsConstructor
@Configuration
public class JdbcPagingConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
public final DataSource dataSource;
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("batchJob")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(customItemReader())
.writer(customItemWriter())
.build();
}
@Bean
public JdbcPagingItemReader<Customer> customItemReader() throws Exception {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("firstname", "A%");
return new JdbcPagingItemReaderBuilder<Customer>()
.name("jdbcPagingItemReader")
.pageSize(10)
.fetchSize(10)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.queryProvider(createQueryProvider())
.parameterValues(parameters)
.build();
}
@Bean
public PagingQueryProvider createQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("id,firstName,lastName,birthdate");
queryProvider.setFromClause("from customer");
queryProvider.setWhereClause("where firstName like :firstname");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
public ItemWriter<Customer> customItemWriter() {
return items -> {
for (Customer item : items) {
System.out.println(item.toString());
}
};
}
}
1) 각 속성 설명
@Bean
public JdbcPagingItemReader<Customer> customItemReader() throws Exception {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("firstname", "A%");
return new JdbcPagingItemReaderBuilder<Customer>()
.name("jdbcPagingItemReader")
.pageSize(10)
.fetchSize(10)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.queryProvider(createQueryProvider())
.parameterValues(parameters)
.build();
}
method | 설명 |
name | ItemReader 이름 |
fetchSize | 한번에 읽어올 데이터 갯수 - fetchSize 만큼 행단위로 데이터를 조회하도록 설정 - paging 방식의 Reader 는 커서 개념이 아닌 페이지 단위로 분할해서 처리하는 방식이라서 만약 pageSize 와 fetchSize 가 동일하다면 둘의 차이에 큰 의미가 없다. - fetchSize 와 pageSize 가 다를 경우 DB 에 따라 차이가 있겠지만 내부적으로 pageSize 만큼 분할 처리가 완료된 이후 fetchSize 만큼 행단위로 읽어오는 식으로 처리가 이루어질 수도 있다. - 특별한 이유가 없으면 chunkSize, pageSize, fetchSize 를 가급적 동일하게 설정해서 사용하는 것을 권장한다. |
pageSize | 페이지 크기 설정 (쿼리당 요청할 레코드 수) |
dataSource | DB에 접근하기 위해 Datasource 설정 |
queryProvider | DB 페이징 전략에 따른 PagingQueryProvider 설정 |
rowMapper | 쿼리 결과로 반환되는 데이터와 객체를 매핑하기 위한 RowMapper 설정 |
(QueryProvider API) selectClause | select절 설정 |
(QueryProvider API) fromClause | from절 설정 |
(QueryProvider API) whereClause | where절 설정 |
(QueryProvider API) groupClause | group절 설정 |
(QueryProvider API) sortKeys | 정렬을 위한 유니크 키 설정 (HashMap 형태) |
parameterValues | 쿼리 파라미터 설정 (Map 형태) |
maxItemCount | 조회할 최대 item 수 |
currentItemCount | 조회 item의 시작 지점 |
maxRows | ResultSet 오프젝트가 포함할 수 있는 최대 행 수 |
2) PagingQueryProvider
- 쿼리 실행에 필요한 쿼리문을 ItemReader에게 제공하는 클래스다.
- 데이터베이스마다 페이징 전략이 다르기 때문에 각 데이터베이스 유형마다 다른 PagingQueryProvider를 사용한다.
- Select절, From절, SortKey는 필수로 설정해야한다.
- Where, Group by 절은 필수가 아니다.
@Bean
public PagingQueryProvider createQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("id,firstName,lastName,birthdate");
queryProvider.setFromClause("from customer");
queryProvider.setWhereClause("where firstName like :firstname");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
- Datasource 설정값으로 Provider 중 하나를 자동으로 선택한다.
SqlPagingQueryProviderFactoryBean.java
private Map<DatabaseType, AbstractSqlPagingQueryProvider> providers = new HashMap<>();
{
providers.put(DB2, new Db2PagingQueryProvider());
providers.put(DB2VSE, new Db2PagingQueryProvider());
providers.put(DB2ZOS, new Db2PagingQueryProvider());
providers.put(DB2AS400, new Db2PagingQueryProvider());
providers.put(DERBY,new DerbyPagingQueryProvider());
providers.put(HSQL,new HsqlPagingQueryProvider());
providers.put(H2,new H2PagingQueryProvider());
providers.put(MYSQL,new MySqlPagingQueryProvider());
providers.put(ORACLE,new OraclePagingQueryProvider());
providers.put(POSTGRES,new PostgresPagingQueryProvider());
providers.put(SQLITE, new SqlitePagingQueryProvider());
providers.put(SQLSERVER,new SqlServerPagingQueryProvider());
providers.put(SYBASE,new SybasePagingQueryProvider());
}
providers에 각 DB(key)에 대한 QueryProvider(value)가 들어가있다.
실행단계
단계 | 설명 |
init(dataSource) | AbstractSqlPagingQueryProvider.java > init(dataSource) |
doRead() | AbstractPagingItemReader.java > doRead() - synchronized (동기화 처리) JdbcPagingItemReader.java > doReadPage() - pageSize 만큼 데이터를 읽는다. |
SimpleChunkProvider | SimpleChunkProvider - chunkSize 만큼 데이터 읽는게 반복된다. |
1) AbstractPagingItemReader.java
▶ doRead()
@Nullable
@Override
protected T doRead() throws Exception {
synchronized (lock) {
if (results == null || current >= pageSize) {
if (logger.isDebugEnabled()) {
logger.debug("Reading page " + getPage());
}
doReadPage();
page++;
if (current >= pageSize) {
current = 0;
}
}
int next = current++;
if (next < results.size()) {
return results.get(next);
}
else {
return null;
}
}
}
▶ doReadPage()
@Override
protected void doReadPage() {
if (results == null) {
results = new CopyOnWriteArrayList<>();
}
else {
results.clear();
}
PagingRowMapper rowCallback = new PagingRowMapper();
List<T> query;
if (getPage() == 0) {
if (logger.isDebugEnabled()) {
logger.debug("SQL used for reading first page: [" + firstPageSql + "]");
}
if (parameterValues != null && parameterValues.size() > 0) {
if (this.queryProvider.isUsingNamedParameters()) {
query = namedParameterJdbcTemplate.query(firstPageSql,
getParameterMap(parameterValues, null), rowCallback);
}
else {
query = getJdbcTemplate().query(firstPageSql,
getParameterList(parameterValues, null).toArray(), rowCallback);
}
}
else {
query = getJdbcTemplate().query(firstPageSql, rowCallback);
}
}
else if (startAfterValues != null) {
previousStartAfterValues = startAfterValues;
if (logger.isDebugEnabled()) {
logger.debug("SQL used for reading remaining pages: [" + remainingPagesSql + "]");
}
if (this.queryProvider.isUsingNamedParameters()) {
query = namedParameterJdbcTemplate.query(remainingPagesSql,
getParameterMap(parameterValues, startAfterValues), rowCallback);
}
else {
query = getJdbcTemplate().query(remainingPagesSql,
getParameterList(parameterValues, startAfterValues).toArray(), rowCallback);
}
}
else {
query = Collections.emptyList();
}
results.addAll(query);
}
JpaPagingItemReader
- Paging 기반의 JPA 구현체로써, EntityManagerFactory 객체가 필요하며, 쿼리는 JPQL을 사용한다.
- JpaPagingItemReader 또한 doRead() 메서드에서 동기화 처리가 되어있어서 멀티 스레드 환경에서 안전하다.
Customer.java
package com.project.springbatch._54_reader_jpaCursor;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class Customer {
@Id
@GeneratedValue
private Long Id;
private String firstname;
private String lastname;
private String birthdate;
}
Address
package com.project.springbatch._56_reader_jpaPaging;
import lombok.Getter;
import lombok.Setter;
import javax.persistence.*;
@Getter
@Setter
@Entity
public class Address {
@Id
@GeneratedValue
private Long Id;
private String location;
@OneToOne
@JoinColumn(name = "customer_id")
private Customer customer;
}
JpaPagingConfiguration.java
package com.project.springbatch._56_reader_jpaPaging;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.persistence.EntityManagerFactory;
@RequiredArgsConstructor
@Configuration
public class JpaPagingConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("batchJob")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(customItemReader())
.writer(customItemWriter())
.build();
}
@Bean
public JpaPagingItemReader<Customer> customItemReader() {
return new JpaPagingItemReaderBuilder<Customer>()
.name("jpaPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(10)
.queryString("select c from Customer c join fetch c.address")
.build();
}
@Bean
public ItemWriter<Customer> customItemWriter() {
return items -> {
for (Customer customer : items) {
System.out.println(customer.getAddress().getLocation());
}
};
}
}
1) 각 속성 설명
@Bean
public JpaPagingItemReader<Customer> customItemReader() {
/*
JpaPagingItemReader - doOpen() 호출 -> entityManager 생성
AbstractItemCountingItemStreamItemReader - update() : 상태정보 업데이트
JpaPagingItemReader - doReadPage()
- 쿼리 수행 (pageSize 만큼 데이터를 가져온다.)
*/
return new JpaPagingItemReaderBuilder<Customer>()
.name("jpaPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(10)
.queryString("select c from Customer c join fetch c.address")
.build();
}
method | 설명 |
name | ItemReader 이름 |
pageSize | 페이지 크기 설정 (쿼리 당 요청할 레코드 수) |
queryString | ItemReader가 조회할때 사용할 JPQL 문장 설정 |
EntityManagerFactory | JPQL을 실행하는 EntityManager를 생성하는 팩토리 |
parameterValue | 쿼리 파라미터 설정 (HashMap<String, Object> 형태) |
수행단계
단계 | 설명 |
JpaPagingItemReader 생성 | JpaPagingItemReader - doOpen() 호출 -> entityManager 생성 |
update() | AbstractItemCountingItemStreamItemReader - update() : 상태정보 업데이트 |
doReadPage() | JpaPagingItemReader - doReadPage() - 쿼리 수행 (pageSize 만큼 데이터를 가져온다.) |
1) JpaPagingItemReader.java
▶ doOpen()
@Override
protected void doOpen() throws Exception {
super.doOpen();
entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
if (entityManager == null) {
throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
}
// set entityManager to queryProvider, so it participates
// in JpaPagingItemReader's managed transaction
if (queryProvider != null) {
queryProvider.setEntityManager(entityManager);
}
}
2) AbstractItemCountingItemStreamItemReader.java
▶ update()
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
super.update(executionContext);
if (saveState) {
Assert.notNull(executionContext, "ExecutionContext must not be null");
executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
if (maxItemCount < Integer.MAX_VALUE) {
executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
}
}
}
3) AbstractPagingItemReader.java
▶ doRead()
@Nullable
@Override
protected T doRead() throws Exception {
synchronized (lock) {
if (results == null || current >= pageSize) {
if (logger.isDebugEnabled()) {
logger.debug("Reading page " + getPage());
}
doReadPage();
page++;
if (current >= pageSize) {
current = 0;
}
}
int next = current++;
if (next < results.size()) {
return results.get(next);
}
else {
return null;
}
}
}
4) JpaPagingItemReader.java
▶ doRead()
@Override
@SuppressWarnings("unchecked")
protected void doReadPage() {
EntityTransaction tx = null;
if (transacted) {
tx = entityManager.getTransaction();
tx.begin();
entityManager.flush();
entityManager.clear();
}//end if
Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
if (parameterValues != null) {
for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
query.setParameter(me.getKey(), me.getValue());
}
}
if (results == null) {
results = new CopyOnWriteArrayList<>();
}
else {
results.clear();
}
if (!transacted) {
List<T> queryResult = query.getResultList();
for (T entity : queryResult) {
entityManager.detach(entity);
results.add(entity);
}//end if
} else {
results.addAll(query.getResultList());
tx.commit();
}//end if
}
반응형