반응형
728x90
반응형
Cursor 방식
- JDBC ResultSet의 기본 메커니즘을 사용한다.
- 현재 행에 커서를 유지하며 다음 데이터를 호출하면 다음 행으로 커서를 이동하며 데이터 반환이 이루어진다. (Streaming 방식)
- ResultSet이 open()될 때마다 next()가 호출되어 Database의 데이터가 반환되고 객체와 매핑이 이루어진다.
- DB Connection이 연결되면 배치 처리가 완료될 때까지 데이터를 읽어오기 때문에 DB와 SocketTimeout을 충분한 값으로 설정해야한다.
- 모든 결과를 메모리에 할당하므로 메모리 사용량이 많다.
- Connection 연결 유지 시간과 메모리 공간이 충분하다면 대량의 데이터 처리에 적합할 수 있다.
JdbcCursorItemReader
- Cursor 기반의 JDBC 구현체로써, ResultSet과 함게 사용되며 Datasource에서 Connection을 얻어와서 SQL을 실행한다.
- Thread 안정성을 보장하지 않는다.
- 멀티 스레드 환경에서 사용할 경우 동시성 이슈가 발생하지 않도록 별도 동기화 처리가 필요하다.
Customer.java
package com.project.springbatch._53_reader_jdbcCursor;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Customer {
private Long Id;
private String firstName;
private String lastName;
private String birthdate;
}
JdbcCursorConfiguration.java
package com.project.springbatch._53_reader_jdbcCursor;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
/*
--job.name=jdbcCursorJob
*/
@RequiredArgsConstructor
@Configuration
public class JdbcCursorConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
public Job jdbcCursorJob() {
return jobBuilderFactory.get("jdbcCursorJob")
.incrementer(new RunIdIncrementer())
.start(jdbcCursorStep1())
.build();
}
@Bean
public Step jdbcCursorStep1() {
return stepBuilderFactory.get("jdbcCursorStep1")
.<Customer, Customer>chunk(10)
.reader(customJdbcCursorItemReader())
.writer(customJdbcCursorItemWriter())
.build();
}
@Bean
public JdbcCursorItemReader<Customer> customJdbcCursorItemReader() {
return new JdbcCursorItemReaderBuilder()
.name("jdbcCursorItemReader")
.fetchSize(10)
.sql("select id, firstName, lastName, birthdate from customer where firstName like ? order by lastName, firstName")
.beanRowMapper(Customer.class) // 자동으로 객체와 매핑
.queryArguments("A%")
.maxItemCount(3) // 조회할 최대 아이템 수
.currentItemCount(2) // 조회 Item의 시작 시점
.maxRows(100) // ResultSet이 포함할 수 있는 최대 row 수
.dataSource(dataSource)
.build();
}
@Bean
public ItemWriter<Customer> customJdbcCursorItemWriter() {
return items -> {
for (Customer item : items) {
System.out.println(item.toString());
}
};
}
}
1) 각 속성 설명
@Bean
public JdbcCursorItemReader<Customer> customJdbcCursorItemReader() {
return new JdbcCursorItemReaderBuilder()
.name("jdbcCursorItemReader")
.fetchSize(10)
.sql("select id, firstName, lastName, birthdate from customer where firstName like ? order by lastName, firstName")
.beanRowMapper(Customer.class) // 자동으로 객체와 매핑
.queryArguments("A%")
.maxItemCount(3) // 조회할 최대 아이템 수
.currentItemCount(2) // 조회 Item의 시작 시점
.maxRows(100) // ResultSet이 포함할 수 있는 최대 row 수
.dataSource(dataSource)
.build();
}
method | 설명 |
name | ItemReader 이름 |
fetchSize | 한번에 읽어올 데이터 갯수 - chunk size와 동일하게 설정하는 것을 추천한다. 모두 5라고 가정했을때, 5개의 데이터를 한번에 가져와서 5개씩 Commit을 수행하게된다. |
sql | 실행할 쿼리 |
beanRowMapper | 객체와 자동으로 매핑해주는 Mapper |
queryArguments | sql 쿼리에 사용될 쿼리 파라미터 설정 |
maxItemCount | 조회할 최대 아이템 갯수 |
currentItemCount | 조회 Item의 시작 시점 - 현재 ItemCount 갯수를 센다. MaxItemCount와 연동되어 사용되는데 만약 MaxItemCount이 20이고, CurrentItemCount가 20이면 더이상 읽어올 데이터가 없다. |
maxRows | ResultSet이 포함할 수 있는 최대 row 수 |
dataSource | 연결할 DB의 dataSource |
실행단계
단계 | 설명 |
JdbcCursorItemReader 생성 | JdbcCursorItemReaderBuilder > build() -> JdbcCursorItemReader 생성 |
executeQuery() | AbstractCursorItemReader > doOpen() > openCursor() - executeQuery() |
update() | AbstractItemContingItemStreamReader > update() - 상태정보 기록 |
doRead() | AbstractCursorItemReader > doRead() - doRead()를 호출하여 데이터를 하나씩 가져온다. - fetchSize가 10이므로 resultSet 에 10개가 담겨져있다. - rs.next()로 row 를 읽어온다. |
writer() | chunkSize 만큼 읽어오면 writer 로 수행 |
1) JdbcCursorItemReaderBuilder > build() -> JdbcCursorItemReader 생성
public JdbcCursorItemReader<T> build() {
if(this.saveState) {
Assert.hasText(this.name,
"A name is required when saveState is set to true");
}
Assert.hasText(this.sql, "A query is required");
Assert.notNull(this.dataSource, "A datasource is required");
Assert.notNull(this.rowMapper, "A rowmapper is required");
JdbcCursorItemReader<T> reader = new JdbcCursorItemReader<>();
if(StringUtils.hasText(this.name)) {
reader.setName(this.name);
}
reader.setSaveState(this.saveState);
reader.setPreparedStatementSetter(this.preparedStatementSetter);
reader.setRowMapper(this.rowMapper);
reader.setSql(this.sql);
reader.setCurrentItemCount(this.currentItemCount);
reader.setDataSource(this.dataSource);
reader.setDriverSupportsAbsolute(this.driverSupportsAbsolute);
reader.setFetchSize(this.fetchSize);
reader.setIgnoreWarnings(this.ignoreWarnings);
reader.setMaxItemCount(this.maxItemCount);
reader.setMaxRows(this.maxRows);
reader.setQueryTimeout(this.queryTimeout);
reader.setUseSharedExtendedConnection(this.useSharedExtendedConnection);
reader.setVerifyCursorPosition(this.verifyCursorPosition);
reader.setConnectionAutoCommit(this.connectionAutoCommit);
return reader;
}
2) AbstractCursorItemReader > doOpen() > openCursor() - executeQuery()
▶ AbstractCursorItemReader.java > doOpen()
@Override
protected void doOpen() throws Exception {
Assert.state(!initialized, "Stream is already initialized. Close before re-opening.");
Assert.isNull(rs, "ResultSet still open! Close before re-opening.");
initializeConnection();
openCursor(con);
initialized = true;
}
▶ JdbcCursorItemReader.java > openCursor()
@Override
protected void openCursor(Connection con) {
try {
if (isUseSharedExtendedConnection()) {
preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
else {
preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
}
applyStatementSettings(preparedStatement);
if (this.preparedStatementSetter != null) {
preparedStatementSetter.setValues(preparedStatement);
}
this.rs = preparedStatement.executeQuery();
handleWarnings(preparedStatement);
}
catch (SQLException se) {
close();
throw translateSqlException("Executing query", getSql(), se);
}
}
3) AbstractCursorItemReader.java > doRead()
@Nullable
@Override
protected T doRead() throws Exception {
if (rs == null) {
throw new ReaderNotOpenException("Reader must be open before it can be read.");
}
try {
if (!rs.next()) {
return null;
}
int currentRow = getCurrentItemCount();
T item = readCursor(rs, currentRow);
verifyCursorPosition(currentRow);
return item;
}
catch (SQLException se) {
throw translateSqlException("Attempt to process next row failed", getSql(), se);
}
}
▶ JdbcCursorItemReader.java > readCursor()
@Nullable
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
return rowMapper.mapRow(rs, currentRow);
}
JpaCursorItemReader
- SpringBatch 4.3 버전부터 지원된다.
- Cursor 기반의 JPA 구현체로써 EntityManagerFactory 객체가 필요하며 쿼리는 JPQL을 사용한다.
Customer.java
package com.project.springbatch._54_reader_jpaCursor;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class Customer {
@Id
@GeneratedValue
private Long Id;
private String firstname;
private String lastname;
private String birthdate;
}
JpaCursorConfiguration.java
package com.project.springbatch._54_reader_jpaCursor;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaCursorItemReader;
import org.springframework.batch.item.database.builder.JpaCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.persistence.EntityManagerFactory;
import java.util.HashMap;
@RequiredArgsConstructor
@Configuration
public class JpaCursorConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job jpaCursorJob() {
return jobBuilderFactory.get("jpaCursorJob")
.incrementer(new RunIdIncrementer())
.start(jpaCursorStep1())
.build();
}
@Bean
public Step jpaCursorStep1() {
return stepBuilderFactory.get("jpaCursorStep1")
.<Customer, Customer>chunk(2)
.reader(customJpaCursorItemReader())
.writer(customJpaCursorItemWriter())
.build();
}
@Bean
public JpaCursorItemReader<Customer> customJpaCursorItemReader() {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("firstname", "A%");
return new JpaCursorItemReaderBuilder()
.name("jpaCursorItemReader")
.queryString("select c from Customer c where firstname like :firstname") // jpql
.entityManagerFactory(entityManagerFactory)
.parameterValues(parameters)
// .maxItemCount(10)
// .currentItemCount(2)
.build();
}
@Bean
public ItemWriter<Customer> customJpaCursorItemWriter() {
return items -> {
for (Customer item : items) {
System.out.println(item.toString());
}
};
}
}
1) 각 속성 설명
@Bean
public JpaCursorItemReader<Customer> customJpaCursorItemReader() {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("firstname", "A%");
return new JpaCursorItemReaderBuilder()
.name("jpaCursorItemReader")
.queryString("select c from Customer c where firstname like :firstname") // jpql
.entityManagerFactory(entityManagerFactory)
.parameterValues(parameters)
// .maxItemCount(10)
// .currentItemCount(2)
.build();
}
method | 설명 |
name | ItemReader 이름 |
queryString | 사용할 JPQL 쿼리문 |
entityManagerFactory | JPA를 사용하기 위한 EntityManagerFactory 설정 |
parameterValues | queryString에 사용된 쿼리의 파라미터 설정 |
수행단계
단계 | 설명 |
createQuery() | JpaCursorItemReader - doOpen() - createQuery() - 쿼리 수행 후, 결과를 query.getResultStream().iterator() 로 담는다. |
update() | JpaCursorItemReader - update() |
doRead() | JpaCursorItemReader - doRead() - open() 메서드에서 읽어온 모든 데이터를 하나씩 리턴한다. - 위 과정들이 chunk size 만큼 반복한다. |
1) JdbcCursorItemReader.java
▶ doOpen()
@Override
@SuppressWarnings("unchecked")
protected void doOpen() throws Exception {
this.entityManager = this.entityManagerFactory.createEntityManager();
if (this.entityManager == null) {
throw new DataAccessResourceFailureException("Unable to create an EntityManager");
}
if (this.queryProvider != null) {
this.queryProvider.setEntityManager(this.entityManager);
}
Query query = createQuery();
if (this.parameterValues != null) {
this.parameterValues.forEach(query::setParameter);
}
this.iterator = query.getResultStream().iterator();
}
▶ createQuery()
private Query createQuery() {
if (this.queryProvider == null) {
return this.entityManager.createQuery(this.queryString);
}
else {
return this.queryProvider.createQuery();
}
}
▶ doRead()
@Override
protected T doRead() {
return this.iterator.hasNext() ? this.iterator.next() : null;
}
반응형