[교재 EffectiveJava] 아이템 48. 스트림 병렬화는 주의해서 적용하라

반응형
728x90
반응형

파이프라인 병렬 실행

자바 8부터는 parallel 메서드만 한번 호출하면 파이프라인을 병렬 실행할 수 있는 스트림을 지원했다. 이를 올바르고 빠르게 작성하는 일은 여전히 어려운 작업이다. 동시성 프로그래밍을 할때는 안정성과 응답가능 상태를 유지하기 위해 애써야한다. 

 

메르센 소수를 생성하는 프로그램
package com.java.effective.item48;

import java.math.BigInteger;
import java.util.stream.Stream;

import static java.math.BigInteger.ONE;
import static java.math.BigInteger.TWO;

/**
 * 메르센 소수를 생성하는 코드
 */
public class Main {
    public static void main(String[] args) {
        primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
                .filter(mersenne -> mersenne.isProbablePrime(50))
                .limit(20)
                .forEach(System.out::println);
    }
    
    static Stream<BigInteger> primes() {
        return Stream.iterate(TWO, BigInteger::nextProbablePrime);
    }
}

 

이 프로그램을 내 컴퓨터에서 실행하면 즉각 소수를 찍기 시작해서 12.5초 만에 완료된다.

 

Stream.iterate()
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
        Objects.requireNonNull(f);
        Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE,
               Spliterator.ORDERED | Spliterator.IMMUTABLE) {
            T prev;
            boolean started;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                T t;
                if (started)
                    t = f.apply(prev);
                else {
                    t = seed;
                    started = true;
                }
                action.accept(prev = t);
                return true;
            }
        };
        return StreamSupport.stream(spliterator, false);
    }

 

 

상황

속도를 높이기 위해 스트림 파이프라인의 parallel()을 호출하겠다는 순진한 생각을 했다고 가정하자. 성능이 어느정도 빨라질까? 추가하게되면 이 프로그램은 아무것도 출력하지 못하고 CPU는 90%나 잡아먹는 상태가 무한히 계속된다. (응답 불가) 

 

이렇게 느려진 원인은 스트림 라이브러리가 이 파이프라인을 병렬화하는 방법을 찾아내지 못했기 때문이다. 환경이 아무리 좋더라도, 데이터 소스가 Stream.iterate 거나 중간 연산으로 limit를 쓰면 파이프라인 병렬화로는 성능 개선을 기대할 수 없다. 위 예제 코드는 이 문제점을 모두 가지고있다. 

 

파이프라인 병렬화는 limit을 다룰때 CPU 코어가 남는다면 원소를 몇개 더 처리한후 제한된 개수 이후의 결과를 버려도 아무런 해가 없다고 가정한다. 그런데 이 코드의 경우 새롭게 메르센 소수를 찾을때마다 그 전 소수를 찾을때보다 두배 정도 오래 걸린다. 즉, 원소를 하나 계산하는 비용이 대략 그 이전까지의 원소 전부를 계산한 비용을 합친 것만큼 든다는 뜻이다. 

 

결론적으로, 스트림 파이프라인을 마구잡이로 병렬화해서는 안된다. 성능이 오히려 나빠질 수 있다.

 

 

참조 지역성의 중요성

스트림의 소스가 ArrayList, HashMap, HashSet, ConcurrentHaspMap 의 인스턴스이거나, 배열, int 범위, long 범위일때 병렬화의 효과가 가장 좋다. 이 자료구조들은 모두 데이터를 원하는 크기로 정확하고 손쉽게 나눌 수 있어서 다수의 스레드에 분배하기에 좋다. 나누는 작업은 Spliterator 가 담당하며 Spliterator 객체는 Stream이나 Iterable의 spliterator 메서드로 얻어올 수 있다.

 

이 자료구조들의 공통점은 원소들을 순차적으로 실행할때의 참조 지역성이 뛰어나다는 것이다. 이웃한 원소의 참조들이 메모리에 연속해서 저장되어있다는 뜻이다. 참조지역성이 낮으면 스레드는 데이터가 주 메모리에서 캐시 메모리로 전송되어 오기를 기다리며 대부분 시간을 멍하니 보내게된다. 따라서 참조 지역성은 다량의 데이터를 처리하는 벌크 연산을 병렬화할 때 아주 중요한 요소다.

 

직접 구현한 Stream, Iterable, Collection이 병렬화의 이점을 제대로 누리게 하고 싶다면 spliterator 메서드를 반드시 재정의하고 결과 스트림의 병렬화 성능을 강도 높게 테스트하자.

 

 

파이프라인의 종단연산

종단 연산의 동작 방식은 병렬 수행 효율에 영향을 준다. 종단 연산에서 수행하는 작업량이 파이프라인 전체 작업에서 상당 비중을 차지하면서 순차적인 연산이라면 파이프라인 병렬 수행의 효과는 제한될 수 밖에 없다. 종단 연산 중 병렬화에 가장 적합한 것은 '축소'다. 축소는 파이프라인에 만들어진 모든 원소를 하나로 합치는 작업으로, Stream의 reduce, min, max, count, sum 등과 같이 완성된 형태로 제공되는 메서드 중 하나를 선택해 수행한다. anyMatch, allMatch, noneMatch 처럼 조건에 맞으면 바로 반환되는 메서드도 병렬화에 적합하다. 반면, Stream의 collect 메서드는 가변축소를 수행하여 컬렉션들을 합치는 부담이 크기 때문에 병렬화에 적합하지 않다.

 

 

잘못된 병렬화의 문제점

스트림을 잘못 병렬화하면 성능이 나빠질 뿐만 아니라 결과 자체가 잘못되거나 예상 못한 동작이 발생할 수 있다. 결과가 잘못되거나 오동작 하는 것은 안전 실패라고 한다. 안전 실패는 병렬화한 파이프라인이 사용하는 mappers, filters, 혹은 프로그래머가 제공한 다른 함수 객체가 명세대로 동작하지 않을때 벌어질 수 있다. 

* Stream 의 reduce 연산
acumulator(누적기), combiner(결합기) 함수는 결합 법칙을 만족하고, 간섭받지 않고, 상태를 갖지 않아야한다. 이를 지키지 못하더라도 순차적인 수행에서는 올바른 결과를 얻을 수 있지만, 병렬로 수행한다면 실패한다.

 

.parallel() 추가된 병렬화 버전
package com.java.effective.item48;

import java.math.BigInteger;
import java.util.stream.Stream;

import static java.math.BigInteger.ONE;
import static java.math.BigInteger.TWO;

/**
 * 메르센 소수를 생성하는 코드
 */
public class Main {
    public static void main(String[] args) {
        primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
                .parallel()
                .filter(mersenne -> mersenne.isProbablePrime(50))
                .limit(20)
                .forEach(System.out::println);
    }

    static Stream<BigInteger> primes() {
        return Stream.iterate(TWO, BigInteger::nextProbablePrime);
    }
}

혹시라도 완료되더라도, 출력된 소수의 순서가 올바르지 않을 수 있다. 출력 순서를 순차 버전처럼 정렬하고 싶다면 종단 연산을 forEachOrdered 로 바꿔주면된다. 이 연산은 병렬 스트림을 순회하며 소수를 발견한 순서대로 출력되도록 보장해준다.

 

 

성능 향상 추정 방법

스트림 안의 원소 수와 원소당 수행되는 코드 줄 수를 곱해보자. 이 값이 최소 수십만은 되어야 성능 향상을 맛볼 수 있다. 스트림 병렬화는 오직 성능 최적화 수단임을 기억해야한다. 다른 최적화와 마찬가지로 변경 전후로 반드시 성능을 테스트하여 병렬화를 사용할 가치가 있는지 확인해야한다. 

 

보통은 병렬 스트림 파이프라인도 공통의 포크-조인 풀에서 수행되므로(같은 스레드 풀을 사용), 잘못된 파이프라인 하나가 시스템의 다른부분의 성능에까지 악영향을 줄 수 있다. 

 

 

스트림 병렬화의 효과 예제

소수 계산 스트림 파이프라인 
static long pi(long n) {
    return LongStream.rangeClosed(2, n)
            .mapToObj(BigInteger::valueOf)
            .filter(i -> i.isProbablePrime(50))
            .count();
}

 

소수 계산 스트림 파이프라인 - 병렬화 버전
static long pi(long n) {
    return LongStream.rangeClosed(2, n)
            .parallel()
            .mapToObj(BigInteger::valueOf)
            .filter(i -> i.isProbablePrime(50))
            .count();
}

병렬화 버전 코드가 3.37배 정도 빨라졌다. 

 

 

SplittableRandom 인스턴스

무작위 수들로 이루어진 스트림을 병렬화하려거든 ThreadLocalRandom 보다는 SplittableRandom 인스턴스를 이용하자. SplittableRandom 은 정확히 이럴때 쓰고자 설계된 것이라 병렬화하면 성능이 선형으로 증가한다. ThreadLocalRandom 은 단일 스레드에서 쓰고자 만들어졌다. 병렬 스트림용 데이터 소스로도 사용할 수는 있지만, SplittableRandom 만큼 빠르지는 않을 것이다.  마지막으로 그냥 Random 은 모든 연산을 동기화하기 때문에 병렬 처리시 사용하지 말자.

 

 

결론

스트림의 많은 코드에서 스트림 병렬화가 효과를 보는 경우가 많지 않다. 스트림 병렬화를 하지 말라는 뜻은 아니나, 조건이 잘 갖춰졌을때 사용해야만, parallel 메서드 호출 하나로 거의 프로세서 코어 수에 비례하는 성능 향상을 수행할 수 있다.

 

 

 

반응형

Designed by JB FACTORY