[JAVA8 병렬프로그래밍] 스트림 병렬처리
- Coding/Java
- 2022. 6. 16.
스트림의 병렬 처리
스트림 API에서는 parallelStream 메서드를 이용해서 스트림 객체를 생성하는 것만으로도 병렬 처리가 된다. 손쉽게 대량의 데이터를 병렬 처리하는 코드를 작성할 수 있다.
ParallelReduceMinMax.java
package org.example.streamparallel;
import java.util.Arrays;
import java.util.List;
public class ParallelReduceMinMax {
public static void main(String[] args) {
List<Integer> intList = Arrays.asList(4, 2, 8, 1, 9, 6, 7, 3, 5);
// 최대 값 구하기 - 병렬
int max = intList.parallelStream().reduce(1, Integer::max);
System.out.printf("최대값 : %s\n", max);
// 최소 값 구하기 - 병렬
int min = intList.parallelStream().reduce(1, Integer::min);
System.out.printf("최소값 : %s\n", min);
}
}
위의 코드는 parallelStream과 stream 메서드를 통해 객체를 생성했을 때의 차이점을 느끼지 못할 정도로 동일한 결과가 나온다.
스트림의 병렬처리 내부
parallelStream으로 객체를 생성할때 내부적으로 몇개의 스레드가 어떤 이름으로 생성되고 실행되는지 확인해보자.
InsideParallelStream.java
package org.example.streamparallel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class InsideParallelStream {
public static void main(String[] args) {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 스트림 내부의 쓰레드 값을 구함.
intList.parallelStream().forEach(value -> {
// 현재 쓰레드 이름을 구함.
String threadName = Thread.currentThread().getName();
/* 스레드 이름과 데이터 값을 출력한다. */
LocalDateTime currentTime = LocalDateTime.now();
System.out.printf(currentTime.format(formatter) + " -> Thread Name : %s, Stream Value : %s\n", threadName, value);
// 시간 확인을 위해 2초간 sleep 함
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {}
});
}
}
실행결과
2022-06-16 19:07:02 -> Thread Name : ForkJoinPool.commonPool-worker-9, Stream Value : 2
2022-06-16 19:07:02 -> Thread Name : main, Stream Value : 7
2022-06-16 19:07:02 -> Thread Name : ForkJoinPool.commonPool-worker-13, Stream Value : 8
2022-06-16 19:07:02 -> Thread Name : ForkJoinPool.commonPool-worker-31, Stream Value : 4
2022-06-16 19:07:02 -> Thread Name : ForkJoinPool.commonPool-worker-27, Stream Value : 5
2022-06-16 19:07:02 -> Thread Name : ForkJoinPool.commonPool-worker-23, Stream Value : 6
2022-06-16 19:07:02 -> Thread Name : ForkJoinPool.commonPool-worker-19, Stream Value : 3
2022-06-16 19:07:02 -> Thread Name : ForkJoinPool.commonPool-worker-5, Stream Value : 9
2022-06-16 19:07:01 -> Thread Name : ForkJoinPool.commonPool-worker-7, Stream Value : 10
2022-06-16 19:07:02 -> Thread Name : ForkJoinPool.commonPool-worker-17, Stream Value : 1
1) 병렬 처리이므로 데이터의 순서가 1부터 10까지 순차적으로 출력되지 않고 섞여서 출력된다.
실행할때마다 결과가 다르게 나온다.
2) PC나 서버의 코어 수에 따라서 스레드가 생성된다.
내부적으로 PC 혹은 서버의 코어 수를 계싼한 다음 자동으로 스레드를 생성한다.
3) main 스레드는 1개이고 그 외는 ForkJoinPool으로 생성된다.
main 스레드는 스트림을 처리하기 위한 기본 스레드를 의미하고, 이 스레드가 그 외의 ForkJoinPool의 스레드를 생성한다. 코어 수 기반의 스레드 생성은 스트림에서 제어한 것이 아니라 컨커런트 API의 ForkJoinPool의 기본 값이다.
스레드 개수 제어
병렬 처리시 다른 서비스에 영향을 최소화하면서 수행되도록 스레드 개수를 제어할 필요가 있다.
- ForkJoinPool의 기본 스레드 값을 변경한다.
- ForkJoinPool이 아닌 다른 스레드 풀을 사용한다.
ForkJoinPool의 common Pool 개수를 조정해서 동시 처리 개수를 관리해보자.
InsideParallelStream2.java
package org.example.streamparallel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class InsideParallelStream2 {
public static void main(String[] args) {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 쓰레드 개수 2개로 설정
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
System.out.printf("## Thread Pool Size : %s\n", ForkJoinPool.getCommonPoolParallelism());
intList.parallelStream().forEach(value -> {
// 현재 쓰레드 이름을 구함.
String threadName = Thread.currentThread().getName();
LocalDateTime currentTime = LocalDateTime.now();
System.out.printf(currentTime.format(formatter) + " -> Thread Name : %s, Stream Value : %s\n", threadName, value);
// 시간 확인을 위해 2초간 sleep 함
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {}
});
}
}
실행결과
2022-06-16 19:19:45 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Value : 9
2022-06-16 19:19:45 -> Thread Name : main, Stream Value : 7
2022-06-16 19:19:45 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 3
2022-06-16 19:19:47 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 5
2022-06-16 19:19:47 -> Thread Name : main, Stream Value : 6
2022-06-16 19:19:47 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Value : 10
2022-06-16 19:19:49 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 4
2022-06-16 19:19:49 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Value : 8
2022-06-16 19:19:51 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 2
2022-06-16 19:19:51 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Value : 1
1) 시스템의 속성 정보를 변경
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
ForkJoinPool에서 commonPool을 생성할때 참조하는 환경 변수로, 기본 값은 CPU 코어이지만 설정 값을 강제로 수정 가능하다.
스레드 조절시 주의할점
1) 설정한 스레드 값(ForkJoinPool)에 하나를 더한 스레드(main)가 총 스레드 개수다.
총 2개의 스레드로 설정하려면 값을 1로 설정해야한다.
2) ForkJoinPool의 기본 값을 변경하는 것이기 때문에 자바 가상 머신 전체에 영향을 미친다.
특정한 프로세스 혹은 스레드에 대해서만 변경할 수 없으며 가상 머신이 종료될 때까지 설정이 유효하다.
ForkJoinPool이 아닌 사용자 정의 풀을 사용해보자.
InsideParallelStream3.java
package org.example.streamparallel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class InsideParallelStream3 {
public static void main(String[] args) throws Exception {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/* 별도의 스레드 풀 생성 */
ForkJoinPool customPool = new ForkJoinPool(2);
/* 비동기 수행 submit() */
customPool.submit(() -> {
// 쓰레드 풀 크기를 구한다.
System.out.printf("## Thread Pool Size : %s\n", customPool.getParallelism());
intList.parallelStream().forEach(value -> {
// 현재 쓰레드 이름을 구함.
String threadName = Thread.currentThread().getName();
LocalDateTime currentTime = LocalDateTime.now();
System.out.printf(currentTime.format(formatter) + " -> Thread Name : %s, Stream Value : %s\n", threadName, value);
// 시간 확인을 위해 2초간 sleep 함
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {}
});
}).get();
}
}
실행결과
2022-06-16 19:24:10 -> Thread Name : ForkJoinPool-1-worker-1, Stream Value : 3
2022-06-16 19:24:10 -> Thread Name : ForkJoinPool-1-worker-3, Stream Value : 7
2022-06-16 19:24:12 -> Thread Name : ForkJoinPool-1-worker-3, Stream Value : 6
2022-06-16 19:24:12 -> Thread Name : ForkJoinPool-1-worker-1, Stream Value : 5
2022-06-16 19:24:14 -> Thread Name : ForkJoinPool-1-worker-1, Stream Value : 4
2022-06-16 19:24:14 -> Thread Name : ForkJoinPool-1-worker-3, Stream Value : 9
2022-06-16 19:24:16 -> Thread Name : ForkJoinPool-1-worker-3, Stream Value : 10
2022-06-16 19:24:16 -> Thread Name : ForkJoinPool-1-worker-1, Stream Value : 2
2022-06-16 19:24:18 -> Thread Name : ForkJoinPool-1-worker-3, Stream Value : 8
2022-06-16 19:24:18 -> Thread Name : ForkJoinPool-1-worker-1, Stream Value : 1
이 코드는 commonPool로 실행했을때와 달리 mian 스레드가 없다. 그리고 스레드 설정을 2로 하면 실제로 3개의 스레드가 실행되는데 여기서는 2로 설정하면 실제로 2개의 스레드만 동작한다.
이렇게 별도의 스레드 풀을 생성하면 정확히 원하는 만큼의 스레드 풀을 생성할 수 있고, 원하는 업무 혹은 스트림에 한정해서 스레드 풀의 개수를 변경할 수 있다. 반면 별도의 스레드 풀을 생성해야하기 때문에 코드가 다소 복잡해질 수 있다.
parallel과 sequential
병렬로 하다가 중간에 순차 처리가 필요할 수도 있고, 반대로 순차 처리를 하다가 중간에 병렬 처리를 해야할 때가 있다. 이런 경우에 각 상황에 맞게 스트림의 순차/병렬을 변경하는 메서드를 제공한다.
메서드 | 설명 |
parallel | 순차 처리 스트림 -> 병렬 처리로 변경 |
sequential | 병렬 처리 스트림 -> 순차 처리로 변경 |
예시
list.stream().limit(100).parallel().reduce(Integer::sum);
limit 연산을 수행할때 데이터의 처음부터 100번째까지를 정확히 자르고 싶다면 병렬 처리가 아닌 순차 처리를 해야한다. 그리고 limit 이후로는 병렬 처리가 훨씬 효율적이므로 parallel 메서드로 변경해서 처리한다.
'Coding > Java' 카테고리의 다른 글
[JAVA8 병렬프로그래밍] 원자적 변수 atomic (0) | 2022.06.17 |
---|---|
[JAVA8 병렬프로그래밍] 분할반복 Spliterator (0) | 2022.06.16 |
[JAVA8 병렬프로그래밍] Future와 CompletableFuture (0) | 2022.06.15 |
[JAVA8 병렬프로그래밍] 포크/조인 프레임워크 (0) | 2022.06.15 |
[JAVA8 병렬프로그래밍] Executors 클래스, ExecutorService 인터페이스 (1) | 2022.06.14 |