[JAVA8 병렬프로그래밍] Executors 클래스, ExecutorService 인터페이스

반응형
728x90
반응형

Executors 클래스

- Executor 인터페이스 : 컨커런트 API의 핵심 인터페이스다. 이 인터페이스를 구현한 여러 종류의 클래스를 기본으로 제공한다.

- 스레드 풀 : 스레드를 관리하기 위한 풀이다. 병렬 프로그래밍에서 스레드를 관리하기 위한 기능을 제공한다.

- 포크/조인 프레임워크 : JDK7에서 새롭게 선보인 포크/조인 프레임워크를 이용하면 스레드 간의 대기와 연관 관계 등을 정의할 수 있다.

 

java.util.concurrent 패키지에서 제공하는 Executor 인터페이스
인터페이스 설명
Executor 새로운 태스크를 생성하는데 가장 기본이 되는 인터페이스다.
ExecutorService Executor 인터페이스의 하위 인터페이스다. Executor 인터페이스에서 제공하는 기능 외에 작업(태스크)의 생명주기를 관리하는 기능을 제공한다.
ScheduledExecutorService ExecutorService 인터페이스의 하위 인터페이스이다. ExecutorService 인터페이스에서 제공하는 기능 외에 주기적으로 실행되거나 일정 시간 후에 실행할 수 있는 기능을 제공한다.

 

 

 

Executor 인터페이스

태스크를 실행하는데 가장 기본이 되는 인터페이스다. execute() 메서드 하나만 제공하여, 함수형 인터페이스다. 

package org.example.executor;

import java.util.concurrent.Executor;

public class ExecutorExample implements Executor {
    @Override
    public void execute(Runnable task) {
        // Runnable 인터페이스를 직접 실행한다.
        task.run();

        // Thread를 생성해서 실행한다.
        //new Thread(task).start();
    }

    public static void main(String[] args) {
        Executor executor = new ExecutorExample();
        executor.execute(() -> System.out.println("Hello, Executor!!"));
    }
}

Executor 인터페이스가 컨커런트 API의 최상위 인터페이스여서 가장 기본이 되는 메서드만 정의한다. 실제로 Executor 인터페이스를 직접 상속하고 정의할 일은 스레드를 생성하는 것으로 끝나기 때문에 거의 없다. 

 

 

ExecutorService 인터페이스

Executor 인터페이스를 상속하였고, 기본 제공하는 execute 메서드 외에 스레드를 생성하고 이를 관리하기 위한 메서드를 추가로 정의해놓았다. 가장 보편적이고 일반적으로 사용하는 방법이다.

제공 내용
ExecutorSErvice 컨커런트 API에서 가장 많이 사용되는 핵심 인터페이스
Executors ExecutorService 인터페이스를 직접 구현하기에는 어렵기 때문에 Executors 클래스를 사용하여 병렬 처리에 필요한 여러가지 패턴을 사전에 정의해서 제공
TimeUnit 컨커런트 API에 추가된 시간 관련 클래스

 

 

Executors의 메서드

Executors 클래스는 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, Callbable 클래스를 위한 팩터리와 유틸리티 메서드를 제공한다. 리턴 타입이 ExecutorService인 메서드를 유심히 보자.

메서드 설명
newSingleThreadExecutor - 오직 하나의 스레드로 처리하며 나머지 스레드 생성 요청은 현재 스레드가 종료될때까지 대기한다. 
- 현재 메인 클래스에서 오직 하나의 스레드로 작업을 수행할 때 안전하게 사용할 수 있으나, 여러개의 스레드를 생성할 수 없다.
newFixedThreadPool - 입력 파라미터로 생성할 스레드 풀의 크기를 정의한다. 스레드 풀의 크기 내에서 스레드가 생성되어 병렬 처리된다.
- 스레드 풀의 크기를 넘으면 풀에 여유가 생길때까지 대기한다.
newCachedThreadPool - 멀티 스레드 처리를 위한 스레드 풀을 생성하되 기존에 생성한 스레드를 가능한한 재사용한다.
- 멀티 스레드 기반으로 동작한다는 점에서 newFixedThreadpool과 동일하지만, 등록한 스레드를 모두 한번에 실행시키며 동시 처리에 대한 개수 제한이 없다.
newWorkStealingPool - 스레드 풀을 생성하며, 실행되는 하드웨어의 사용 가능한 모든 프로세스(CPU)의 코어를 쓰도록 병럴 처리 레벨을 설정한다.
- 해당 하드웨어의 자원을 모두 선점하려고 하기 때문에 다른 프로세스 혹은 애플리케이션 성능에 영향을 끼친다.
unconfigurableExecutorService - 메서드의 입력 파라미터로 반드시 ExecutorService 객체를 전달해야한다. 그리고 해당 객체를 표준 ExecutorService 객체로 위임해서 결과를 리턴한다.
- ExecutorService를 구현한 여러 클래스의 기능 중 ExecutorService의 메서드만 호출하고 나머지 기능을 사용하지 못하도록 제한할 필요가 있을때 사용한다.

 

 

ExecutorServiceExample.java
package org.example.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceExample {
    public static void main(String argc[]) {
        // 객체를 생성한다.
//    ExecutorService execService = Executors.newSingleThreadExecutor();
//    ExecutorService execService = Executors.newFixedThreadPool(2);
        ExecutorService execService = Executors.newCachedThreadPool();
        // 쓰레드를 생성하고 실행시킨다.
        execService.execute(new MyTask("TODO 1"));
        execService.execute(new MyTask("TODO 2"));
        execService.execute(new MyTask("TODO 3"));
        // ExecutorService를 종료한다.
        execService.shutdown();
    }
}

 

MyTask.java
package org.example.executor;

import java.util.concurrent.TimeUnit;

public class MyTask implements Runnable {
    private String id;

    // for 루프를 이용해서 메시지를 10번 출력한다.
    @Override
    public void run() {
        for (int i = 0 ; i < 5 ; i++) {
            System.out.println("Task ID : " + id + ", running ... " + i);

            try {
                // 1초간 대기한다.
                /* TimeUnit : 컨커런트 API에 추가된 시간 관련 클래스 */
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public MyTask(String id) {
        this.id = id;
    }
}

 

 

newSingleThreadExecutor 메서드

package org.example.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceExample {
    public static void main(String argc[]) {
        // 객체를 생성한다.
    ExecutorService execService = Executors.newSingleThreadExecutor();
//    ExecutorService execService = Executors.newFixedThreadPool(2);
//    ExecutorService execService = Executors.newCachedThreadPool();
        // 쓰레드를 생성하고 실행시킨다.
        execService.execute(new MyTask("TODO 1"));
        execService.execute(new MyTask("TODO 2"));
        execService.execute(new MyTask("TODO 3"));
        // ExecutorService를 종료한다.
        execService.shutdown();
    }
}

 

실행결과
Task ID : TODO 1, running ... 0
Task ID : TODO 1, running ... 1
Task ID : TODO 1, running ... 2
Task ID : TODO 1, running ... 3
Task ID : TODO 1, running ... 4
Task ID : TODO 2, running ... 0
Task ID : TODO 2, running ... 1
Task ID : TODO 2, running ... 2
Task ID : TODO 2, running ... 3
Task ID : TODO 2, running ... 4
Task ID : TODO 3, running ... 0
Task ID : TODO 3, running ... 1
Task ID : TODO 3, running ... 2
Task ID : TODO 3, running ... 3
Task ID : TODO 3, running ... 4

 

오직 하나의 스레드만 처리될 수 있도록 스레드 풀을 생성한다. 따라서 위 결과는 순차적으로 처리되어, 등록된 3개의 스레드가 동시에 처리되지 않고 등록된 순서대로 실행되었다. 

 

 

new FixedThreadPool 메서드

package org.example.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceExample {
    public static void main(String argc[]) {
        // 객체를 생성한다.
//    ExecutorService execService = Executors.newSingleThreadExecutor();
    ExecutorService execService = Executors.newFixedThreadPool(2);
//    ExecutorService execService = Executors.newCachedThreadPool();
        // 쓰레드를 생성하고 실행시킨다.
        execService.execute(new MyTask("TODO 1"));
        execService.execute(new MyTask("TODO 2"));
        execService.execute(new MyTask("TODO 3"));
        // ExecutorService를 종료한다.
        execService.shutdown();
    }
}

 

실행결과
Task ID : TODO 2, running ... 0
Task ID : TODO 1, running ... 0
Task ID : TODO 2, running ... 1
Task ID : TODO 1, running ... 1
Task ID : TODO 2, running ... 2
Task ID : TODO 1, running ... 2
Task ID : TODO 2, running ... 3
Task ID : TODO 1, running ... 3
Task ID : TODO 2, running ... 4
Task ID : TODO 1, running ... 4
Task ID : TODO 3, running ... 0
Task ID : TODO 3, running ... 1
Task ID : TODO 3, running ... 2
Task ID : TODO 3, running ... 3
Task ID : TODO 3, running ... 4

2개의 스레드가 동시에 실행되도록 스레드 풀을 만든다. 만약 등록된 스레드가 2개 이상이라면 2개는 동시에 처리되고 나머지는 종료될때까지 대기한다.

 

TODO1, TODO2 스레드가 동시에 실행되어 교차 출력되고, 그 이후 TODO3은 대기하고 있다가 앞의 스레드가 종료되면 실행된다.

 

 

newCachedThreadPool 메서드

package org.example.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceExample {
    public static void main(String argc[]) {
        // 객체를 생성한다.
//    ExecutorService execService = Executors.newSingleThreadExecutor();
//    ExecutorService execService = Executors.newFixedThreadPool(2);
    ExecutorService execService = Executors.newCachedThreadPool();
        // 쓰레드를 생성하고 실행시킨다.
        execService.execute(new MyTask("TODO 1"));
        execService.execute(new MyTask("TODO 2"));
        execService.execute(new MyTask("TODO 3"));
        // ExecutorService를 종료한다.
        execService.shutdown();
    }
}

 

실행결과
Task ID : TODO 1, running ... 0
Task ID : TODO 3, running ... 0
Task ID : TODO 2, running ... 0
Task ID : TODO 3, running ... 1
Task ID : TODO 2, running ... 1
Task ID : TODO 1, running ... 1
Task ID : TODO 2, running ... 2
Task ID : TODO 3, running ... 2
Task ID : TODO 1, running ... 2
Task ID : TODO 1, running ... 3
Task ID : TODO 2, running ... 3
Task ID : TODO 3, running ... 3
Task ID : TODO 1, running ... 4
Task ID : TODO 2, running ... 4
Task ID : TODO 3, running ... 4

실행하는 스레드 수의 제한 없이 등록한 모든 스레드를 동시에 처리한다.

 

 

 

ScheduledExecutorService 인터페이스

ScheduledExecutorService는 ExecutorService의 하위 인터페이스로, ExecutorService 인터페이스에서 제공하는 기능 외에 추가적으로 Callable 혹은 Runnable 태스크를 특정 시간 이후에 실행하도록 구현할 수 있다. 또한 scheduleAtFixedRate 메서드나 scheduleWithFixeddelay 메서드를 이용하면 태스크를 주기적으로 실행할 수도 있다.

 

주기적으로 태스크를 실행시켜야 하거나 특정 시간에 태스크가 실행되도록 예약하는 기능을 구현할때 주로 사용한다. 이 인터페이스 역시 컨커런트 API에서 제공하는 Executors 클래스를 이용해서 생성하는 것이 일반적이다.

 

메서드 설명
newScheduledThreadPool - 스레드가 특정 시간 이후, 혹은 일정 시간 간격으로 실행되도록 하는 스레드 풀을 생성한다.
- 스레드 풀의 크기를 지정한다.
newSingleThreadScheduledExecutor - 스레드가 특정 시간 이후, 혹은 일정 시간 간격으로 실행되도록 하는 스레드 풀을 생성한다. 
- 하나의 스레드만 실행되며 나머지 스레드는 실행 시간이 지정되더라도 현재 실행중인 스레드가 종료될때까지 대기한다.
unconfigurableScheduledExecutorService - 메서드의 입력 파라미터로 반드시 ScheduledExecutorService 객체를 전달해야한다. 그리고 해당 객체를 표준 ScheduledExecutorService 객체로 위임해서 결과를 리턴한다.
- ScheduledExecutorService 를 구현한 여러 클래스의 기능 중 ExecutorService의 메서드만을 호출하고 나머지 기능을 제한할 필요가 있을때 사용한다.

 

 

newSingleThreadScheduledExecutor 메서드

package org.example.executor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedTaskExample {
    public static void main(String[] args) {
        // ScheduledExecutorService 객체 생성
        ScheduledExecutorService exeService =
                Executors.newSingleThreadScheduledExecutor();

        // 쓰레드 3개 등록 및 실행
        exeService.schedule(
                () -> System.out.println("TODO 1"), 10, TimeUnit.SECONDS);
        exeService.schedule(
                () -> System.out.println("TODO 2"), 20, TimeUnit.SECONDS);
        exeService.schedule(
                () -> System.out.println("TODO 3"), 30, TimeUnit.SECONDS);
    }
}

스레드를 예약해서 실행할 수 있으며, 한번에 오직 한개의 스레드만 실행한다. 위의 예제는 등록된 3개의 스레드가 순차적으로 그리고 무한 반복하며 실행된다.

 

 

 

ScheduledExecutorService 정책 설정

ScheduledExecutorService는 태스크를 등록할때 여러가지 정책을 설정할 수 있다.

 

1) schedule(Runnable command, long delay, TimeUnit unit)

Runnable 클래스를 delay 시간만큼 후에 실행한다. 한번만 실행되며 반복 호출하지 않는다.

delay command

 

2) schedule(Callable<V> callable, long delay, TimeUnit unit)

Callable 클래스를 delay 시간만큼 후에 실행한다. 한번만 실행되며 반복 호출하지 않는다.

 

3) scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

Runnable 클래스를 두번째 파라미터인 initialDelay 값만큼 대기했다가 실행한다. 종료되면서 다시 세번째 파라미터인 delay 시간만큼 대기했다가 실행하는 것을 반복한다. 그리고 마지막 TimeUnit은 앞의 initialDelay, delay 에서 사용할 시간 단위를 의미한다.

initialDelay command        
    delay command    
        delay command

 

4) scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

Runnable 클래스를 두번째 파라미터인 initialDelay 값만큼 대기했다가 실행하고 종료 여부와 상관 없이 다시 세번째 파라미터인 period 값 주기로 반복해서 실행한다. scheduleWithFixedDelay와 다른 점은 스레드 종료 여부와는 상관 없이 period 값만큼 반복한다는 점이다.

initialDelay command      
  period command    
    period command  
      period command

 

 

스케줄 작업 구현

별도의 쿼츠와 같은 라이브러리를 이용하지 않아도 스케줄 작업 구현이 가능하다.

package org.example.executor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodTaskExample {

    public static void main(String[] args) {
        // ScheduledExecutorService 객체 생성
        ScheduledExecutorService exeService =
                Executors.newScheduledThreadPool(2);

        // 5초 후에 실행, 종료 후 10초 대기 후 반복 실행
        exeService.scheduleWithFixedDelay(
                new MyTask("Delayed 1"), 5, 10, TimeUnit.SECONDS);

        // 5초 후에 실행, 10초 주기로 반복 실행
        exeService.scheduleAtFixedRate(
                new MyTask("Rate 1"), 5, 10, TimeUnit.SECONDS);
        
        // 5초 후에 실행, 10초 주기로 반복 실행
        exeService.scheduleAtFixedRate(
                new MyTask("Rate 2"), 5, 10, TimeUnit.SECONDS);
    }
}

 

 

 

TimeUnit

컨커런트 API에서 제공하는 TimeUnit은 7개의 속성을 제공한다.

속성 설명
DAYS 하루를 의미하며, 24시간과 동일하다. (설정할 수 있는 최댓값)
HOURS 한시간을 의미하며, 60분과 동일하다.
MINUTES 일분을 의미하며, 60초와 동일하다.
SECONDS 일초를 의미한다.
MILLISECONDS 1/1000 초를 의미한다. 1초는 1000밀리초이다.
MICROSECONDS 1/1000 밀리초를 의미한다. 1밀리초는 1000마이크로초이다.
NANOSECONDS 1/1000 마이크로초를 의미한다. 1마이크로초는 1000나노초이다.

 

TimeUnit.java
package java.util.concurrent;

import...

public enum TimeUnit {
    /**
     * Time unit representing one thousandth of a microsecond.
     */
    NANOSECONDS(TimeUnit.NANO_SCALE),
    /**
     * Time unit representing one thousandth of a millisecond.
     */
    MICROSECONDS(TimeUnit.MICRO_SCALE),
    /**
     * Time unit representing one thousandth of a second.
     */
    MILLISECONDS(TimeUnit.MILLI_SCALE),
    /**
     * Time unit representing one second.
     */
    SECONDS(TimeUnit.SECOND_SCALE),
    /**
     * Time unit representing sixty seconds.
     * @since 1.6
     */
    MINUTES(TimeUnit.MINUTE_SCALE),
    /**
     * Time unit representing sixty minutes.
     * @since 1.6
     */
    HOURS(TimeUnit.HOUR_SCALE),
    /**
     * Time unit representing twenty four hours.
     * @since 1.6
     */
    DAYS(TimeUnit.DAY_SCALE);
    
    ...
    
}

 

  • 과거
Thread.sleep(1000); / 1,000밀리초동안 대기한다.

 

  • TimeUnit 사용
TimeUnit.SECONDS.sleep(1); // 1초 동안 대기한다.
TimeUnit.MILLISECONDS.sleep(1000); // 1,000밀리초 동안 대기한다.

 

 

 

반응형

Designed by JB FACTORY