[SpringBatch Practice] 29. Running a Spring Batch Job Single-threaded vs Multi-threaded and Comparing the Results


Single thread vs multi thread

  • Single thread : a single thread handles a given task within the process.
  • Multi thread : multiple threads handle a given task within the process.
  • The criterion for choosing between them is which approach uses resources more efficiently and handles the workload more flexibly.
  • For complex processing or large data volumes, multi-threading is usually chosen to cut total elapsed time and improve performance.
  • Multi-threaded processing brings data-synchronization issues, so they must be weighed carefully before deciding.
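The trade-off above can be sketched outside Spring Batch with plain `java.util.concurrent`; the class and method names here are illustrative, not part of any framework API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadComparison {
    // Simulates per-item work (e.g., an ItemProcessor call) with a small delay.
    static String process(String item) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return item.toUpperCase();
    }

    // Single thread: items are processed one at a time, so delays add up.
    static List<String> processSequential(List<String> items) {
        List<String> out = new ArrayList<>();
        for (String item : items) {
            out.add(process(item));
        }
        return out;
    }

    // Multi thread: each item is handed to a pool worker; delays overlap.
    static List<String> processParallel(List<String> items, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String item : items) {
                futures.add(pool.submit(() -> process(item)));
            }
            List<String> out = new ArrayList<>();
            for (Future<String> f : futures) {
                out.add(f.get()); // blocks until this item's result is ready
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> items = Arrays.asList("reed", "hoyt", "leila", "denton");

        long t0 = System.currentTimeMillis();
        List<String> seq = processSequential(items);
        long seqMs = System.currentTimeMillis() - t0;

        t0 = System.currentTimeMillis();
        List<String> par = processParallel(items, 4);
        long parMs = System.currentTimeMillis() - t0;

        System.out.println("sequential: " + seqMs + " ms, parallel: " + parMs + " ms");
        System.out.println(seq.equals(par)); // prints true: same results either way
    }
}
```

Both modes produce identical output; only the elapsed time differs, which is exactly the comparison this post makes with Spring Batch.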

 

 

Spring Batch threading model

By default, Spring Batch processes work on a single thread.

For better performance and large-scale data jobs, it also provides asynchronous processing and scale-out features.

 

Let's study the following approaches in order.

  • AsyncItemProcessor / AsyncItemWriter : a separate thread is assigned to the ItemProcessor to do its work.
  • Multi-threaded Step : within a Step's chunk structure, multiple threads are assigned to the ItemReader, ItemProcessor, and ItemWriter.
  • Remote Chunking : Step processing is split across processes, as in a distributed environment, and sent to other external servers for processing.
  • Parallel Steps : a thread is assigned per Step, so multiple Steps run in parallel.
  • Partitioning : a Master/Slave scheme in which the Master partitions the data and assigns a thread to each partition, and the Slaves work independently.

 

 

AsyncItemProcessor / AsyncItemWriter

  • A structure in which the ItemProcessor runs asynchronously inside the Step.
  • AsyncItemProcessor and AsyncItemWriter must be configured together.
  • The final result the AsyncItemWriter receives from the AsyncItemProcessor is of type List<Future<T>>, and it waits until the asynchronous executions complete.
  • The following dependency must be added.
implementation 'org.springframework.batch:spring-batch-integration'
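The List<Future<T>> handshake described above can be sketched with JDK types alone; `FutureHandshake`, `processAsync`, and `writeAll` are hypothetical names, not the actual Spring Batch implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureHandshake {
    static ExecutorService executor = Executors.newCachedThreadPool();

    // "Async processor": submits the real work and returns a Future immediately.
    static Future<String> processAsync(String item) {
        return executor.submit(() -> item.toUpperCase());
    }

    // "Async writer": blocks on each Future, then has the unwrapped items ready to write.
    static List<String> writeAll(List<Future<String>> futures) throws Exception {
        List<String> unwrapped = new ArrayList<>();
        for (Future<String> f : futures) {
            unwrapped.add(f.get()); // waits until this item's processing completes
        }
        return unwrapped; // a real writer would call delegate.write(unwrapped) here
    }

    public static void main(String[] args) throws Exception {
        List<Future<String>> futures = new ArrayList<>();
        for (String item : Arrays.asList("reed", "hoyt")) {
            futures.add(processAsync(item));
        }
        System.out.println(writeAll(futures)); // prints [REED, HOYT]
        executor.shutdown();
    }
}
```

The key point mirrored here: the processor side never blocks, and all waiting is concentrated in the writer's `get()` loop.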

 

 

Single-threaded approach

Creating the job

AsyncCustomer.java
package com.project.springbatch._42_async;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.Date;

@Data
@AllArgsConstructor
public class AsyncCustomer {
    // private String name;
    private final long id;
    private final String firstName;
    private final String lastName;
    private final Date birthdate;
}

 

AsyncCustomerRowMapper.java
package com.project.springbatch._42_async;

import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class AsyncCustomerRowMapper implements RowMapper<AsyncCustomer> {
    @Override
    public AsyncCustomer mapRow(ResultSet rs, int i) throws SQLException {
        return new AsyncCustomer(rs.getLong("id"),
                rs.getString("firstName"),
                rs.getString("lastName"),
                rs.getDate("birthdate"));
    }
}

 

StopWatchJobListener.java
package com.project.springbatch._42_async;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class StopWatchJobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {

    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
        System.out.println("time : " + time);
    }
}

 

PostgreSQL queries

▶ create table

create table customer
(
    id          serial constraint customer_pk primary key,
    "firstname" varchar,
    "lastname"  varchar,
    birthdate   varchar
);

create table customer2
(
    id          serial constraint customer2_pk primary key,
    "firstname" varchar,
    "lastname"  varchar,
    birthdate   varchar
);

 

▶ insert data

INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (1,'Reed','Edwards','1952-08-16 12:34:53'),(2,'Hoyt','Park','1981-02-18 08:07:58'),(3,'Leila','Petty','1972-06-11 08:43:55'),(4,'Denton','Strong','1989-03-11 18:38:31'),(5,'Zoe','Romero','1990-10-02 13:06:31'),(6,'Rana','Compton','1957-06-09 12:51:11'),(7,'Velma','King','1988-02-02 05:52:25'),(8,'Uriah','Carter','1972-08-31 07:32:05'),(9,'Michael','Graves','1958-04-13 18:47:44'),(10,'Leigh','Stone','1967-06-23 23:41:43');
INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (11,'Iliana','Glenn','1965-02-27 14:33:56'),(12,'Harrison','Haley','1956-06-28 03:15:41'),(13,'Leonard','Zamora','1956-03-28 15:03:09'),(14,'Hiroko','Wyatt','1960-08-22 23:53:50'),(15,'Cameron','Carlson','1969-05-12 11:10:09'),(16,'Hunter','Avery','1953-11-19 12:52:42'),(17,'Aimee','Cox','1976-10-15 12:56:50'),(18,'Yen','Delgado','1990-02-06 10:25:36'),(19,'Gemma','Peterson','1989-04-02 23:42:09'),(20,'Lani','Faulkner','1970-09-18 17:22:14');
INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (21,'Iola','Cannon','1954-01-12 16:56:45'),(22,'Whitney','Shaffer','1951-03-19 01:27:18'),(23,'Jerome','Moran','1968-03-16 05:26:22'),(24,'Quinn','Wheeler','1979-06-19 16:24:22'),(25,'Mira','Wilder','1961-12-27 12:11:07'),(26,'Tobias','Holloway','1968-08-13 20:36:19'),(27,'Shaine','Schneider','1958-03-08 09:47:10'),(28,'Harding','Gonzales','1952-04-11 02:06:25'),(29,'Calista','Nieves','1970-02-17 13:29:59'),(30,'Duncan','Norman','1987-09-13 00:54:49');
INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (31,'Fatima','Hamilton','1961-06-16 14:29:11'),(32,'Ali','Browning','1979-03-27 17:09:37'),(33,'Erin','Sosa','1990-08-23 10:43:58'),(34,'Carol','Harmon','1972-01-14 07:19:39'),(35,'Illiana','Fitzgerald','1970-08-19 02:33:46'),(36,'Stephen','Riley','1954-06-05 08:34:03'),(37,'Hermione','Waller','1969-09-08 01:19:07'),(38,'Desiree','Flowers','1952-06-25 13:34:45'),(39,'Karyn','Blackburn','1977-03-30 13:08:02'),(40,'Briar','Carroll','1985-03-26 01:03:34');
INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (41,'Chaney','Green','1987-04-20 18:56:53'),(42,'Robert','Higgins','1985-09-26 11:25:10'),(43,'Lillith','House','1982-12-06 02:24:23'),(44,'Astra','Winters','1952-03-13 01:13:07'),(45,'Cherokee','Stephenson','1955-10-23 16:57:33'),(46,'Yuri','Shaw','1958-07-14 15:10:07'),(47,'Boris','Sparks','1982-01-01 10:56:34'),(48,'Wilma','Blake','1963-06-07 16:32:33'),(49,'Brynne','Morse','1964-09-21 01:05:25'),(50,'Ila','Conley','1953-11-02 05:12:57');
INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (51,'Sharon','Watts','1964-01-09 16:32:37'),(52,'Kareem','Vaughan','1952-04-18 15:37:10'),(53,'Eden','Barnes','1954-07-04 01:26:44'),(54,'Kenyon','Fulton','1975-08-23 22:17:52'),(55,'Mona','Ball','1972-02-11 04:15:45'),(56,'Moses','Cortez','1979-04-24 15:26:46'),(57,'Macy','Banks','1956-12-31 00:41:15'),(58,'Brenna','Mendez','1972-10-02 07:58:27'),(59,'Emerald','Ewing','1985-11-28 21:15:20'),(60,'Lev','Mcfarland','1951-05-20 14:30:07');
INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (61,'Norman','Tanner','1959-07-29 15:41:45'),(62,'Alexa','Walters','1977-12-06 16:41:17'),(63,'Dara','Hyde','1989-08-04 14:06:43'),(64,'Hu','Sampson','1978-11-01 17:10:23'),(65,'Jasmine','Cardenas','1969-02-15 20:08:06'),(66,'Julian','Bentley','1954-07-11 03:27:51'),(67,'Samson','Brown','1967-10-15 07:03:59'),(68,'Gisela','Hogan','1985-01-19 03:16:20'),(69,'Jeanette','Cummings','1986-09-07 18:25:52'),(70,'Galena','Perkins','1984-01-13 02:15:31');
INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (71,'Olga','Mays','1981-11-20 22:39:27'),(72,'Ferdinand','Austin','1956-08-08 09:08:02'),(73,'Zenia','Anthony','1964-08-21 05:45:16'),(74,'Hop','Hampton','1982-07-22 14:11:00'),(75,'Shaine','Vang','1970-08-13 15:58:28'),(76,'Ariana','Cochran','1959-12-04 01:18:36'),(77,'India','Paul','1963-10-10 05:24:03'),(78,'Karina','Doyle','1979-12-01 00:05:21'),(79,'Delilah','Johnston','1989-03-04 23:50:01'),(80,'Hilel','Hood','1959-08-22 06:40:48');
INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (81,'Kennedy','Hoffman','1963-10-14 20:18:35'),(82,'Kameko','Bell','1976-06-08 15:35:54'),(83,'Lunea','Gutierrez','1964-06-07 16:21:24'),(84,'William','Burris','1980-05-01 17:58:23'),(85,'Kiara','Walls','1955-12-27 18:57:15'),(86,'Latifah','Alexander','1980-06-19 10:39:50'),(87,'Keaton','Ward','1964-10-12 16:03:18'),(88,'Jasper','Clements','1970-03-05 00:29:49'),(89,'Claire','Brown','1972-02-11 00:43:58'),(90,'Noble','Morgan','1955-09-05 05:35:01');
INSERT INTO customer (id,firstname,lastname,birthdate) VALUES (91,'Evangeline','Horn','1952-12-28 14:06:27'),(92,'Jonah','Harrell','1951-06-25 17:37:35'),(93,'Mira','Espinoza','1982-03-26 06:01:16'),(94,'Brennan','Oneill','1979-04-23 08:49:02'),(95,'Dacey','Howe','1983-02-06 19:11:00'),(96,'Yoko','Pittman','1982-09-12 02:18:52'),(97,'Cody','Conway','1971-05-26 07:09:58'),(98,'Jordan','Knowles','1981-12-30 02:20:01'),(99,'Pearl','Boyer','1957-10-19 14:26:49'),(100,'Keely','Montoya','1985-03-24 01:18:09');

 

AsyncConfiguration.java
package com.project.springbatch._42_async;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.PostgresPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/*
 --job.name=asyncJob
  */
@Configuration
@RequiredArgsConstructor
public class AsyncConfiguration {
    // create the job
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final DataSource dataSource;

    @Bean
    public Job asyncJob() throws Exception {
        return this.jobBuilderFactory.get("asyncJob")
                /* step start */
                .incrementer(new RunIdIncrementer())
                .start(syncStep1())
                .listener(new StopWatchJobListener())
                .build();
    }

    /**
     * Synchronous approach.
     * A short sleep is added per item, so the total time grows by that amount.
     * @return
     */
    @Bean
    public Step syncStep1() {
        return stepBuilderFactory.get("syncStep1")
                .<AsyncCustomer, AsyncCustomer>chunk(100)
                .reader(pagingItemReader())
                .processor(asyncCustomItemProcessor())
                .writer(asyncCustomItemWriter())
                .build();
    }

    @Bean
    public JdbcPagingItemReader<AsyncCustomer> pagingItemReader() {
        JdbcPagingItemReader<AsyncCustomer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(this.dataSource);
        reader.setPageSize(100);
        reader.setRowMapper(new AsyncCustomerRowMapper());

        PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");

        Map<String, Order> sortKeys = new HashMap<>(1);

        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

    /**
     * Synchronous Processor
     * @return
     */
    @Bean
    public ItemProcessor asyncCustomItemProcessor() {
        return new ItemProcessor<AsyncCustomer, AsyncCustomer>() {
            @Override
            public AsyncCustomer process(AsyncCustomer item) throws Exception {
                Thread.sleep(30); // 30 ms per item

                return new AsyncCustomer(item.getId(),
                        item.getFirstName().toUpperCase(),
                        item.getLastName().toUpperCase(),
                        item.getBirthdate());
            }
        };
    }

    /**
     * Synchronous Writer
     * @return
     */
    @Bean
    public JdbcBatchItemWriter asyncCustomItemWriter() {
        JdbcBatchItemWriter<AsyncCustomer> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(this.dataSource);
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }
}

 

1) For JdbcPagingItemReader, refer to the post below.

https://devfunny.tistory.com/828

 


@Bean
public JdbcPagingItemReader<AsyncCustomer> pagingItemReader() {
    JdbcPagingItemReader<AsyncCustomer> reader = new JdbcPagingItemReader<>();

    reader.setDataSource(this.dataSource);
    reader.setPageSize(100);
    reader.setRowMapper(new AsyncCustomerRowMapper());
    
    ...
}

 

2) asyncCustomItemProcessor() adds a delay per item.

Thread.sleep(30); // 30 ms per item
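With 100 rows and a 30 ms sleep per item on a single thread, the sequential lower bound is roughly 100 × 30 ms = 3000 ms, which lines up with the 3509 ms measured below. A quick sanity check (`SleepBudget` is an illustrative name):

```java
public class SleepBudget {
    // Lower bound for sequential processing: every item sleeps on the same
    // thread, so the per-item delays simply add up.
    static long sequentialLowerBoundMs(int items, long sleepMs) {
        return items * sleepMs;
    }

    public static void main(String[] args) {
        System.out.println(sequentialLowerBoundMs(100, 30)); // prints 3000
    }
}
```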

 

3) Running asyncCustomItemWriter() saved the data correctly into the CUSTOMER2 table.

@Bean
public JdbcBatchItemWriter asyncCustomItemWriter() {
    JdbcBatchItemWriter<AsyncCustomer> itemWriter = new JdbcBatchItemWriter<>();

    itemWriter.setDataSource(this.dataSource);
    itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
    itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
    itemWriter.afterPropertiesSet();

    return itemWriter;
}

 

Customer2

 

Execution result
time : 3509 ms

 

 

Multi-threaded approach

AsyncConfiguration.java
package com.project.springbatch._42_async;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.PostgresPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/*
 --job.name=asyncJob
  */
@Configuration
@RequiredArgsConstructor
public class AsyncConfiguration {
    // create the job
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final DataSource dataSource;

    @Bean
    public Job asyncJob() throws Exception {
        return this.jobBuilderFactory.get("asyncJob")
                /* step start */
                .incrementer(new RunIdIncrementer())
//                .start(syncStep1())
                .start(asyncStep2())
                .listener(new StopWatchJobListener())
                .build();
    }

    /**
     * Synchronous approach.
     * A short sleep is added per item, so the total time grows by that amount.
     * @return
     */
    @Bean
    public Step syncStep1() {
        return stepBuilderFactory.get("syncStep1")
                .<AsyncCustomer, AsyncCustomer>chunk(100)
                .reader(pagingItemReader())
                .processor(asyncCustomItemProcessor())
                .writer(asyncCustomItemWriter())
                .build();
    }

    /**
     * Asynchronous approach
     * @return
     * @throws Exception
     */
    @Bean
    public Step asyncStep2() throws Exception {
        return stepBuilderFactory.get("asyncStep2")
                .<AsyncCustomer, AsyncCustomer>chunk(100)
                .reader(pagingItemReader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

    @Bean
    public JdbcPagingItemReader<AsyncCustomer> pagingItemReader() {
        JdbcPagingItemReader<AsyncCustomer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(this.dataSource);
        reader.setPageSize(100);
        reader.setRowMapper(new AsyncCustomerRowMapper());

        PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");

        Map<String, Order> sortKeys = new HashMap<>(1);

        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

    /**
     * Synchronous Processor
     * @return
     */
    @Bean
    public ItemProcessor asyncCustomItemProcessor() {
        return new ItemProcessor<AsyncCustomer, AsyncCustomer>() {
            @Override
            public AsyncCustomer process(AsyncCustomer item) throws Exception {
                Thread.sleep(30); // 30 ms per item

                return new AsyncCustomer(item.getId(),
                        item.getFirstName().toUpperCase(),
                        item.getLastName().toUpperCase(),
                        item.getBirthdate());
            }
        };
    }

    /**
     * Synchronous Writer
     * @return
     */
    @Bean
    public JdbcBatchItemWriter asyncCustomItemWriter() {
        JdbcBatchItemWriter<AsyncCustomer> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(this.dataSource);
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

    /**
     * Asynchronous Processor
     * @return
     * @throws Exception
     */
    @Bean
    public AsyncItemProcessor asyncItemProcessor() throws Exception {
        AsyncItemProcessor<AsyncCustomer, AsyncCustomer> asyncItemProcessor = new AsyncItemProcessor();

        asyncItemProcessor.setDelegate(asyncCustomItemProcessor()); // delegate to the sync processor
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());

        return asyncItemProcessor;
    }

    /**
     * Asynchronous Writer
     * @return
     * @throws Exception
     */
    @Bean
    public AsyncItemWriter asyncItemWriter() throws Exception {
        AsyncItemWriter<AsyncCustomer> asyncItemWriter = new AsyncItemWriter<>();

        asyncItemWriter.setDelegate(asyncCustomItemWriter());

        return asyncItemWriter;
    }
}

1) How it works

@Bean
public Step asyncStep2() throws Exception {
    return stepBuilderFactory.get("asyncStep2")
            .<AsyncCustomer, AsyncCustomer>chunk(100)
            .reader(pagingItemReader())
            .processor(asyncItemProcessor())
            .writer(asyncItemWriter())
            .build();
}

- ItemReader configuration

  • It is not executed asynchronously.

- AsyncItemProcessor()

  • Threads are created by the configured TaskExecutor (SimpleAsyncTaskExecutor spawns a new thread per task) and run asynchronously.
  • Internally it delegates execution to the actual ItemProcessor and stores the result in a Future.

- AsyncItemWriter()

  • It waits until all asynchronous results have been received.
  • Internally it hands the final results to the actual ItemWriter and delegates execution to it.

 

Execution result
time : 133 ms

 

 

Debugging

▶ asyncItemProcessor()

@Bean
public AsyncItemProcessor asyncItemProcessor() throws Exception {
    AsyncItemProcessor<AsyncCustomer, AsyncCustomer> asyncItemProcessor = new AsyncItemProcessor();

    asyncItemProcessor.setDelegate(asyncCustomItemProcessor()); // delegate to the sync processor
    asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());

    return asyncItemProcessor;
}

1) AsyncItemProcessor.java > process()

@Nullable
public Future<O> process(final I item) throws Exception {
   final StepExecution stepExecution = getStepExecution();
   FutureTask<O> task = new FutureTask<>(new Callable<O>() {
      public O call() throws Exception {
         if (stepExecution != null) {
            StepSynchronizationManager.register(stepExecution);
         }
         try {
            return delegate.process(item);
         }
         finally {
            if (stepExecution != null) {
               StepSynchronizationManager.close();
            }
         }
      }
   });
   taskExecutor.execute(task);
   return task;
}

The TaskExecutor creates a thread for asynchronous execution, and that thread runs the FutureTask.

  • The TaskExecutor here is the one we set via setTaskExecutor.
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());

 

  • The FutureTask runs the Callable, inside which the ItemProcessor does the actual processing.
return delegate.process(item);

 

  • The ItemProcessor we set as the delegate here handles the processing.
asyncItemProcessor.setDelegate(asyncCustomItemProcessor()); // delegate to the sync processor
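The FutureTask/Callable mechanics above can be reproduced with the JDK alone; `FutureTaskDemo` is an illustrative name, and a raw Thread stands in for SimpleAsyncTaskExecutor (which also spawns a new thread per task):

```java
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        // The Callable plays the role of delegate.process(item) in AsyncItemProcessor.
        FutureTask<String> task = new FutureTask<>(() -> "reed".toUpperCase());

        // SimpleAsyncTaskExecutor spawns a new thread per task; a raw Thread does the same here.
        new Thread(task).start();

        // FutureTask is both a Runnable (executed above) and a Future (awaited here).
        System.out.println(task.get()); // prints REED
    }
}
```

This is why AsyncItemProcessor can return the task immediately: the FutureTask is handed to the executor as a Runnable, while the caller keeps it as a Future to collect the result later.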

 

▶ asyncItemWriter()

@Bean
public AsyncItemWriter asyncItemWriter() throws Exception {
    AsyncItemWriter<AsyncCustomer> asyncItemWriter = new AsyncItemWriter<>();

    asyncItemWriter.setDelegate(asyncCustomItemWriter());

    return asyncItemWriter;
}

1) AsyncItemWriter.java > write()

public void write(List<? extends Future<T>> items) throws Exception {
   List<T> list = new ArrayList<>();
   for (Future<T> future : items) {
      try {
         T item = future.get();

         if(item != null) {
            list.add(future.get());
         }
      }
      catch (ExecutionException e) {
         Throwable cause = e.getCause();

         if(cause != null && cause instanceof Exception) {
            logger.debug("An exception was thrown while processing an item", e);

            throw (Exception) cause;
         }
         else {
            throw e;
         }
      }
   }
   
   delegate.write(list);
}

It waits until the results of the asynchronous executions running in the Processor have all been received.

T item = future.get();

 

 

Final result

Execution mode : Elapsed time
Single thread : 3509 ms
Multi thread : 133 ms

 

 
