[JAVA8 병렬프로그래밍] 포크/조인 프레임워크
- Coding/Java
- 2022. 6. 15.
포크/조인 프레임워크
컨커런트 API를 최종적으로 완성한 것은 자바 7에서 제공한 포크/조인 프레임워크다. 포크/조인 프레임워크는 java.util.concurrent 패키지의 핵심인 ExecutorService 인터페이스를 구현한 클래스다. 새로운 기능이 아니라, 인터페이스의 구현체가 추가된 것으로 이해하면 된다. 이 프레임워크의 주된 목적은 멀티 프로세서 혹은 멀티 코어를 가지고 있는 하드웨어 자원을 최대한 효율적으로 활용해서 병렬 처리가 가능하도록 하는 것이다.
- 포크 (Fork) : 다른 프로세스 혹은 스레드(태스크)를 여러 개로 쪼개서 새롭게 생성한다는 의미이다.
- 조인 (Join) : 포크해서 실행한 프로세스 혹은 스레드(태스크)의 결과를 취합한다는 의미이다.
어떤 할일이 있을때 그 일을 나눠서 할 수 있다면 여러개로 쪼개서 실행하고 (Fork), 최종적으로 실행이 완료된 내용을 취합(Join)해서 그 결과를 만들어내는 것이다.
하나의 큰 작업이 생성되면 해당 작업을 분할할 수 있는지 판단하고, 분할할 수 있다면 하위 작업으로 분할한다. 그리고 분할할 수 있을 때까지 계속 하위 작업으로 분할한다. 이후 분할된 모든 하위 작업이 완료될 때까지 대기한 후에 그 결과를 취합해서 최종 결과물로 만든다.
하위 작업(서브 태스크)으로 분할해서 동시에 병렬 처리하면 멀티 프로세서, 멀티 코어 기반의 하드웨어의 성능이 뒷받침된다면 처리 시간을 최대한 줄일 수 있기 때문에 포크/조인 프레임워크는 컨커런트 API를 극대화할 수 있다.
하위 작업으로 분할 가능한지는 어떻게 판단할까?
개발자가 분할 가능 여부에 대한 코드를 작성해야 하며, 정의해 놓지 않았다면 하위 작업으로 분할되지 않고 하나의 큰 작업으로 실행된다. 이런 경우에는 컨커런트 API의 장점과 포크/조인 프레임워크의 장점을 사용할 수 없다. 작업을 분할할 수 없을 경우에는 순차 처리를, 분할이 가능하면 별도의 태스크로 나눈 후 병렬 처리하는 방식을 택해야한다.
포크/조인 프레임워크 클래스
개발자가 직접적으로 개발하고 활용해야하는 클래스는 ForkJoinPool, RecursiveTask, RecursiveAction 이다.
Class | 설명 |
ForkJoinPool | 포크/조인 프레임워크의 모체이다. 포크/조인 프레임워크도 스레드 풀의 일종이며, 등록된 태스크를 관리하고 모니터링을 수행한다. 성능을 극대화하기 위해 특별히 워크-스틸링 알고리즘을 사용한다. 이 클래스에서 사용 가능한 CPU 개수를 기반으로 사용 가능한 스레드를 정의하고 관리한다. |
ForkJoinTask | RecursiveTask의 상위 클래스이다. ForkJoinPool에서 실행 가능한 태스크가 ForkJoinTask를 상속받은 클래스들이다. 개발자가 직접 컨트롤할 일은 없다. |
RecursiveTask | 실제 업무적으로 구현해야할 태스크가 반드시 상속받아야하는 추상 클래스이다. 추상 클래스의 compute 메서드를 구현해야한다. |
RecursiveAction | RecursiveTask와 비슷한 용도로 사용하지만 결과를 리턴하지 않는다. 즉, 태스크를 실행한 후 결과를 취합하기 위한 조인 작업이 필요 없다. |
RecursiveTask
포크/조인 프레임워크의 시작 지점은 RecursiveTask 추상 클래스를 구현하는 것이다. 이 클래스는 ForkJoinTask 클래스를 상속받았으며 해당 클래스는 다시 Future 인터페이스를 구현한 것이다. (비동기 연산작업의 결과를 표현하기 위한 인터페이스)
RecursiveTask 클래스는 compute 1개의 추상 메서드를 가진다. RecursiveTask 를 사용한다는 것은 compute 메서드를 구현한다는 것이며, 여기에 태스크를 분리하고 태스크를 실행시키는 등의 내용을 정의해야한다.
if (하위 작업으로 분리할 수 있는지 판단) {
하위 작업으로 분리,
재귀 호출
} else {
태스크 실행
}
포크/조인 프레임워크 예제
특정 디렉토리에 포함되어 있는 모든 파일의 크기를 구하는 예제를 작성해보자.
ForkJoinDirSize.java
package org.example.fork_join;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
/**
* 특정 디렉토리에 포함되어 있는 모든 파일의 크기를 구하는 프로그램
*/
public class ForkJoinDirSize extends RecursiveTask<Long> {
private final Path path;
public ForkJoinDirSize(Path path) {
this.path = path;
}
@Override
protected Long compute() {
long fileSize = 0;
// 디렉토리 일경우 SUBTASK 생성하고 호출
if (Files.isDirectory(path)) {
try {
List<Path> fileList = Files.list(path).collect(Collectors.toList());
List<ForkJoinDirSize> subTaskList = new ArrayList<>();
for (Path file : fileList) {
/* 디렉터리 하위를 재귀처리 */
ForkJoinDirSize subTask = new ForkJoinDirSize(file);
subTask.fork(); // 백그라운드에서 멀티 스레드 형태로 실행시키겠다는 의미
subTaskList.add(subTask); // 서브 태스크를 다시 List에 추가하여, 실행된 결과를 리턴받는다.
}
Long subSize = 0L;
for (ForkJoinDirSize subTask : subTaskList) {
// compute() 리턴 값을 얻는다.
subSize += subTask.join(); // 조인 작업은 포크된 모든 작업이 종료될 때까지 기다린다.
}
return subSize;
} catch(IOException e) {
System.out.println("Error : " + path);
}
} else { // 파일일 경우 크기 리턴
try {
fileSize = Files.size(path);
} catch(IOException e) {
System.out.println("Error : " + path);
}
}
return fileSize;
}
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
Path rootPath = Paths.get("c:\\Program Files");
/* 포크/조인 프레임워크로 개발된 코드를 실행하기 위해 ForkJoinPool을 이용한다. */
ForkJoinPool pool = new ForkJoinPool(); // 스레드풀의 크기를 지정하지 않으면 4개의 스레드 풀이 생성된다.
System.out.printf("병렬 처리 크기 : %s\n", pool.getParallelism());
System.out.printf("합계 : %s\n", pool.invoke(new ForkJoinDirSize(rootPath)));
long endTime = System.currentTimeMillis();
System.out.printf("처리 시간 : " + (endTime - startTime));
}
}
위 코드를 포크/조인 프레임워크를 사용하지 않고 순차 처리로 수행했을 경우
DirSize.java
package org.example.fork_join;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
/**
* 전통적인 방식 - 재귀
* 포크/조인 프레임워크가 아닌 기존의 순차 처리에 재귀 호출 방식으로 크기를 구해보자.
*/
public class DirSize {
protected Long compute(Path path) {
long fileSize = 0;
try {
List<Path> fileList = Files.list(path).collect(Collectors.toList());
for(Path file : fileList) {
if(Files.isDirectory(file)) {
fileSize += compute(file);
}
else {
fileSize += Files.size(file);
}
}
}
catch(IOException e) {
System.out.println("Error : " + path);
}
return fileSize;
}
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
Path rootPath = Paths.get("c:\\Program Files");
DirSize dirSize = new DirSize();
System.out.printf("합계 : %s\n", dirSize.compute(rootPath));
long endTime = System.currentTimeMillis();
System.out.printf("처리 시간 : " + (endTime - startTime));
}
}
포크/조인 프레임워크를 사용했을대에 비해서 시간이 많이 걸린다.
'Coding > Java' 카테고리의 다른 글
[JAVA8 병렬프로그래밍] 스트림 병렬처리 (0) | 2022.06.16 |
---|---|
[JAVA8 병렬프로그래밍] Future와 CompletableFuture (0) | 2022.06.15 |
[JAVA8 병렬프로그래밍] Executors 클래스, ExecutorService 인터페이스 (0) | 2022.06.14 |
[JAVA8 병렬프로그래밍] 컨커런트 API (0) | 2022.06.14 |
[Java] Stream 사용하여 String 이 null이 아닐 경우 concat(joining) 수행하기 (0) | 2022.03.23 |