[JAVA8 병렬프로그래밍] 포크/조인 프레임워크

반응형
728x90
반응형

포크/조인 프레임워크

컨커런트 API를 최종적으로 완성한 것은 자바 7에서 제공한 포크/조인 프레임워크다. 포크/조인 프레임워크는 java.util.concurrent 패키지의 핵심인 ExecutorService 인터페이스를 구현한 클래스다. 새로운 기능이 아니라, 인터페이스의 구현체가 추가된 것으로 이해하면 된다. 이 프레임워크의 주된 목적은 멀티 프로세서 혹은 멀티 코어를 가지고 있는 하드웨어 자원을 최대한 효율적으로 활용해서 병렬 처리가 가능하도록 하는 것이다. 

 

- 포크 (Fork) : 다른 프로세스 혹은 스레드(태스크)를 여러 개로 쪼개서 새롭게 생성한다는 의미이다.
- 조인 (Join) : 포크해서 실행한 프로세스 혹은 스레드(태스크)의 결과를 취합한다는 의미이다.

 

어떤 할일이 있을때 그 일을 나눠서 할 수 있다면 여러개로 쪼개서 실행하고 (Fork), 최종적으로 실행이 완료된 내용을 취합(Join)해서 그 결과를 만들어내는 것이다.

 

https://www.geeksforgeeks.org/difference-between-fork-join-framework-and-executorservice-in-java/

 

하나의 큰 작업이 생성되면 해당 작업을 분할할 수 있는지 판단하고, 분할할 수 있다면 하위 작업으로 분할한다. 그리고 분할할 수 있을 때까지 계속 하위 작업으로 분할한다. 이후 분할된 모든 하위 작업이 완료될 때까지 대기한 후에 그 결과를 취합해서 최종 결과물로 만든다.

 

하위 작업(서브 태스크)으로 분할해서 동시에 병렬 처리하면 멀티 프로세서, 멀티 코어 기반의 하드웨어의 성능이 뒷받침된다면 처리 시간을 최대한 줄일 수 있기 때문에 포크/조인 프레임워크는 컨커런트 API를 극대화할 수 있다.

 

 

하위 작업으로 분할 가능한지는 어떻게 판단할까?

개발자가 분할 가능 여부에 대한 코드를 작성해야 하며, 정의해 놓지 않았다면 하위 작업으로 분할되지 않고 하나의 큰 작업으로 실행된다.  이런 경우에는 컨커런트 API의 장점과 포크/조인 프레임워크의 장점을 사용할 수 없다. 작업을 분할할 수 없을 경우에는 순차 처리를, 분할이 가능하면 별도의 태스크로 나눈 후 병렬 처리하는 방식을 택해야한다. 

 

 

 

포크/조인 프레임워크 클래스

개발자가 직접적으로 개발하고 활용해야하는 클래스는 ForkJoinPool, RecursiveTask, RecursiveAction 이다.

Class 설명
ForkJoinPool 포크/조인 프레임워크의 모체이다. 포크/조인 프레임워크도 스레드 풀의 일종이며, 등록된 태스크를 관리하고 모니터링을 수행한다. 
성능을 극대화하기 위해 특별히 워크-스틸링 알고리즘을 사용한다. 이 클래스에서 사용 가능한 CPU 개수를 기반으로 사용 가능한 스레드를 정의하고 관리한다.
ForkJoinTask RecursiveTask의 상위 클래스이다. ForkJoinPool에서 실행 가능한 태스크가 ForkJoinTask를 상속받은 클래스들이다. 개발자가 직접 컨트롤할 일은 없다. 
RecursiveTask 실제 업무적으로 구현해야할 태스크가 반드시 상속받아야하는 추상 클래스이다. 추상 클래스의 compute 메서드를 구현해야한다.
RecursiveAction RecursiveTask와 비슷한 용도로 사용하지만 결과를 리턴하지 않는다. 즉, 태스크를 실행한 후 결과를 취합하기 위한 조인 작업이 필요 없다.

 

 

 

RecursiveTask

포크/조인 프레임워크의 시작 지점은 RecursiveTask 추상 클래스를 구현하는 것이다. 이 클래스는 ForkJoinTask 클래스를 상속받았으며 해당 클래스는 다시 Future 인터페이스를 구현한 것이다. (비동기 연산작업의 결과를 표현하기 위한 인터페이스)

 

RecursiveTask 클래스는 compute 1개의 추상 메서드를 가진다. RecursiveTask 를 사용한다는 것은 compute 메서드를 구현한다는 것이며, 여기에 태스크를 분리하고 태스크를 실행시키는 등의 내용을 정의해야한다. 

if (하위 작업으로 분리할 수 있는지 판단) {
    하위 작업으로 분리,
    재귀 호출
} else {
	태스크 실행
}

 

 

 

포크/조인 프레임워크 예제

특정 디렉토리에 포함되어 있는 모든 파일의 크기를 구하는 예제를 작성해보자.

 

ForkJoinDirSize.java
package org.example.fork_join;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

/**
 * 특정 디렉토리에 포함되어 있는 모든 파일의 크기를 구하는 프로그램
 */
public class ForkJoinDirSize extends RecursiveTask<Long> {
    private final Path path;

    public ForkJoinDirSize(Path path) {
        this.path = path;
    }

    @Override
    protected Long compute() {
        long fileSize = 0;

        // 디렉토리 일경우 SUBTASK 생성하고 호출
        if (Files.isDirectory(path)) {
            try {
                List<Path> fileList = Files.list(path).collect(Collectors.toList());
                List<ForkJoinDirSize> subTaskList = new ArrayList<>();

                for (Path file : fileList) {
                    /* 디렉터리 하위를 재귀처리 */
                    ForkJoinDirSize subTask = new ForkJoinDirSize(file);
                    subTask.fork(); // 백그라운드에서 멀티 스레드 형태로 실행시키겠다는 의미
                    subTaskList.add(subTask); // 서브 태스크를 다시 List에 추가하여, 실행된 결과를 리턴받는다.
                }

                Long subSize = 0L;

                for (ForkJoinDirSize subTask : subTaskList) {
                    // compute() 리턴 값을 얻는다.
                    subSize += subTask.join(); // 조인 작업은 포크된 모든 작업이 종료될 때까지 기다린다.
                }

                return subSize;
            } catch(IOException e) {
                System.out.println("Error : " + path);
            }
        } else { // 파일일 경우 크기 리턴
            try {
                fileSize = Files.size(path);
            } catch(IOException e) {
                System.out.println("Error : " + path);
            }
        }

        return fileSize;
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();

        Path rootPath = Paths.get("c:\\Program Files");

        /* 포크/조인 프레임워크로 개발된 코드를 실행하기 위해 ForkJoinPool을 이용한다. */
        ForkJoinPool pool = new ForkJoinPool(); // 스레드풀의 크기를 지정하지 않으면 4개의 스레드 풀이 생성된다.
        System.out.printf("병렬 처리 크기 : %s\n", pool.getParallelism());
        System.out.printf("합계 : %s\n", pool.invoke(new ForkJoinDirSize(rootPath)));

        long endTime = System.currentTimeMillis();

        System.out.printf("처리 시간 : " + (endTime - startTime));
    }
}

 

 

 

위 코드를 포크/조인 프레임워크를 사용하지 않고 순차 처리로 수행했을 경우

DirSize.java
package org.example.fork_join;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

/**
 * 전통적인 방식 - 재귀
 * 포크/조인 프레임워크가 아닌 기존의 순차 처리에 재귀 호출 방식으로 크기를 구해보자.
 */
public class DirSize {
    protected Long compute(Path path) {
        long fileSize = 0;

        try {
            List<Path> fileList = Files.list(path).collect(Collectors.toList());

            for(Path file : fileList) {
                if(Files.isDirectory(file)) {
                    fileSize += compute(file);
                }
                else {
                    fileSize += Files.size(file);
                }
            }
        }
        catch(IOException e) {
            System.out.println("Error : " + path);
        }
        return fileSize;
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();

        Path rootPath = Paths.get("c:\\Program Files");

        DirSize dirSize = new DirSize();
        System.out.printf("합계 : %s\n", dirSize.compute(rootPath));

        long endTime = System.currentTimeMillis();

        System.out.printf("처리 시간 : " + (endTime - startTime));
    }
}

 포크/조인 프레임워크를 사용했을대에 비해서 시간이 많이 걸린다. 

 

 

 

반응형

Designed by JB FACTORY