[교재 EffectiveJava] 아이템 81. wait와 notify보다는 동시성 유틸리티를 애용하라

반응형
728x90
반응형

wait와 notify

wait와 notify는 올바르게 사용하기가 아주 까다로우니, java.util.concurrent의 고수준 동시성 유틸리티를 사용하자.

 

▶ wait(), notify(), notifyAll() 포스팅 바로가기

https://devfunny.tistory.com/855

 

wait()과 notify(), notifyAll()

wait(), notify(), notifyAll() synchronized로 동기화해서 공유 데이터를 보호할때 특정 스레드가 객체의 락을 가진 상태로 오랜 시간을 보내지 않도록 하는것도 중요하다. 락을 오랜시간 보유하게되면, 다

devfunny.tistory.com

 

 

1) 실행자 프레임워크

https://devfunny.tistory.com/807?category=957918 

 

[JAVA8 병렬프로그래밍] Executors 클래스, ExecutorService 인터페이스

Executors 클래스 - Executor 인터페이스 : 컨커런트 API의 핵심 인터페이스다. 이 인터페이스를 구현한 여러 종류의 클래스를 기본으로 제공한다. - 스레드 풀 : 스레드를 관리하기 위한 풀이다. 병렬

devfunny.tistory.com

 

 

2) 동시성 컬렉션 (concurrent collection)

List, Queue, Map 같은 표준 컬렉션 인터페이스에 동시성을 가미해 구현한 고성능 컬렉션이다.

높은 동시성에 도달하기 위해 동기화를 각자의 내부에서 수행한다. 

따라서 동시성 컬렉션에서 동시성을 무력화하는건 불가능하며, 외부에서 락을 추가로 사용하면 오히려 속도가 느려진다.

동시성 컬렉션에서 동시성을 무력화하지 못한다. 따라서 여러 메서드를 원자적으로 묶어 호출하는 일이 불가능하다. 

그래서 기본 동작을 하나의 원자적 동작으로 묶는 '상태 의존적 수정' 메서드들이 추가되었다. 이 메서드들은 디폴트 메서드 형태다.

package com.java.effective.item81;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class Intern {
    /* 동시성 정규화 맵 */
    private static final ConcurrentMap<String, String> map =
            new ConcurrentHashMap<>();

    public static String intern(String s) {
        // ConcurrentMap는 get과 같은 검색 기능이 최적화되어있어서, get을 먼저 호출하는게 낫다.
        String previousValue = map.putIfAbsent(s, s);
        return previousValue == null ? s : previousValue;
    }
}

get을 먼저 호출하여 필요할때만 putIfAbsent()를 호출하면 더 빠르다.

 

ConcurrentHashMap.java

get()
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

 

putIfAbsent() > putVal()

synchronized 키워드를 사용하여 동기화를 하고있다.

final V putVal(K key, V value, boolean onlyIfAbsent) {
   ...
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            ...
    }
    addCount(1L, binCount);
    return null;
}

 

아래와 같이 코드를 바꾸자.

package com.java.effective.item81;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class Intern {
    /* 동시성 정규화 맵 */
    private static final ConcurrentMap<String, String> map =
            new ConcurrentHashMap<>();

    public static String intern(String s) {
        String result = map.get(s);
        if (result == null) {
            result = map.putIfAbsent(s, s);
            if (result == null)
                result = s;
        }
        return result;
    }
}

ConcurrentHashMap은 동시성이 뛰어나며 속도가 무척 빠르다. 동기화 컬렉션은 동기화한 컬렉션(Collections.synchronizedMap)보다 훨씬 좋다. 동기화된 맵을 동시성 맵으로 교체하는 것만으로 동시성 애플리케이션의 성능은 극적으로 개선된다.

 

컬렉션 인터페이스 중 일부는 작업이 성공적으로 완료될 때까지 기다리도록 확장되었다. 

 

예시 BlockingQueue

Queue를 확장한 BlockingQueue에 추가된 메서드 중 take는 큐의 첫 원소를 꺼낸다. 이때 만약 큐가 비어져있다면 새로운 원소가 추가될 때까지 기다린다. 이런 특성 덕에 BlockingQueue는 작업 큐(생산자-소비자 큐)로 쓰기에 적합하다. 

 

LinkedBlockingQueue > take()
public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

작업 큐는 하나 이상의 생산자(producer) 스레드가 작업(work)을 큐에 추가하고, 하나 이상의 소비자(consumer) 스레드가 큐에 있는 작업을 꺼내 처리하는 형태다. 

 

ThreadPoolExecutor를 포함한 대부분의 실행자 서비스 구현체에서 이 BlockingQueue를 사용한다.

ThreadPoolExecutor.java
private final BlockingQueue<Runnable> workQueue;

 

 

3) 동기화 장치 (synchronizer)

스레드가 다른 스레드를 기다릴 수 있게 하여, 서로 작업을 조율할 수 있게 해준다.

 

CountDownLatch.java
package java.util.concurrent;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

 

위 CountDownLatch를 사용하여 '동시 실행 시간을 재는 간단한 프레임워크' 예제를 작성해보자.

package com.java.effective.item81;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

public class Time {
    /**
     * concurrency 매개변수로 지정한 지정한 동시성 수준만큼의 스레드를 생성할 수 있어야한다.
     * 그렇지못하면 이 메서드는 결코 끝나지 않게된다. (스레드 기아 교착상태)
     * @param executor ;실행자
     * @param concurrency
     * @param action
     * @return
     * @throws InterruptedException
     */
    public static long time(Executor executor, int concurrency,
                            Runnable action) throws InterruptedException {
        // 작업자 스레드들이 준비가 완료됐음을 타이머 스레드에 통지할때 사용
        CountDownLatch ready = new CountDownLatch(concurrency);

        // 모든 스레드가 동작 준비가 완료되고 작업을 들어가게 하기 위한 래치
        // 통지를 끝낸 작업자 스레드들은 두번째 래치인 start가 열리기를 기다린다.
        CountDownLatch start = new CountDownLatch(1);

        // 모든 스레드가 동작을 완료한 순간을 위한 래치
        /*
            마지막 작업자 스레드가 reday.countDown을 호출하면 타이머 스레드가 시작 시간을 기록한다.
            start.countDown이 호출하여 기다리던 작업자 스레드들을 깨운다. 그 직후 타이머 스레드는 세번째 래치인 done이 열리기를 기다린다.
            done 래치는 마지막 남은 작업자 스레드가 동작을 마치고 done.countDown을 호출하면 열린다.
            타이머 스레드는 done 래치가 열리자마자 깨어나 종료 시각을 기록한다.
         */
        CountDownLatch done  = new CountDownLatch(concurrency);

        for (int i = 0; i < concurrency; i++) {
            executor.execute(() -> {
                // 타이머에게 준비를 마쳤음을 알린다.
                ready.countDown();
                try {
                    // 모든 작업자 스레드가 준비될 때까지 기다린다.
                    start.await();
                    action.run();
                } catch (InterruptedException e) {
                    // 인터럽트를 되살리고 자신은 run 메서드에서 빠져나온다.
                    Thread.currentThread().interrupt();
                } finally {
                    // 타이머에게 작업을 마쳤음을 알린다.
                    done.countDown();
                }
            });
        }

        // 모든 작업자가 준비될 때까지 기다린다.
        ready.await();

        /*
            시간을 잴때는 System.nanoTime()를 사용하자.
            System.currentTimeMillis보다 더 정확하고 정밀하며 시스템의 실시간 시계의 시간 보정에 영향받지 않는다.
         */
        long startNanos = System.nanoTime();

        // 작업자들을 깨운다.
        start.countDown();

        // 모든 작업이 종료될 떄까지 대기한다.
        done.await();

        return System.nanoTime() - startNanos;
    }
}

 

 

레거시 코드를 다뤄야할때

wait 메서드는 스레드가 어떤 조건이 충족되기를 기다리게할 때 사용한다.

락 객체의 wait 메서드는 반드시 그 객체를 잠근 동기화 영역 안에서 호출해야한다.

 

wait 메서드를 사용하는 표준 방식
synchronized (obj) {
    while (<조건이 충족되지 않았다.>) {
        obj.wait(); // 락을 놓고, 깨어나면 다시 잡는다.
    }
    ... // 조건 충족됐을 대의 동작을 수행한다.
}

wait 메서드를 사용할때는 반드시 대기 반복문(wait loop) 관용구를 사용하라. 반복문 밖에서는 절대로 호출하지 말자.

이 반복문은 wait 호출 전후로 조건이 만족하는지를 검사하는 역할을 한다.

 

대기 전에 조건을 검사하여 조건이 이미 충족되었다면 wait를 건너뛰게 한 것은 응답 불가 상태를 예방하기 위한 조치다. 

만약 조건이 이미 충족되었는데 스레드가 notify(혹은 notifyAll) 메서드를 먼저 호출한 후 대기 상태로 빠지면, 그 스레드를 다시 깨울 수 있다고 보장할 수 없다.

 

대기 후에 조건을 검사하여 조건이 충족되지 않았다면 다시 대기하게 하는 것은 안전 실패를 막는 조치다.

만약 조건이 충족되지 않았는데 스레드가 동작을 이어가면 락이 보호하는 불변식을 깨뜨릴 위험이 있다. 

 

조건이 만족되지 않아도 스레드가 깨어날 수 있는 상황

  • 스레드가 notify를 호출한 다음 대기 중이던 스레드가 깨어나는 사이에 다른 스레드가 락을 얻어 그 락이 보호하는 상태를 변경한다.
  • 조건이 만족되지 않았음에도 다른 스레드가 실수 혹은 악의적으로 notify를 호출한다. 공개된 객체를 락으로 사용해 대기하는 클래스는 이런 위험에 노출되고, 외부에 노출된 객체의 동기화된 메소드 안에서 호출하는 wait는 모두 이 문제에 영향을 받는다.
  • 깨우는 스레드는 지나치게 관대해서, 대기 중인 스레드 중 일부만 조건이 충족되도 notifyAll을 호출해 모든 스레드를 깨울 수도 있다.
  • 대기중인 스레드가 (드물게) notify 없이도 깨어나는 경우가 있다. 허위 각성(spurious wakeup) 현상이다.

 

notify

스레드 하나만 깨운다.

 

notifyAll

모든 스레드를 깨운다.

 

일반적으로 언제나 notifyAll을 사용하라는게 합리적이고 안전한 조언이 될 것이다. 

깨어나야하는 모든 스레드가 깨어남을 보장하므로 항상 정확한 결과를 얻을 수 있다.

깨어난 스레드들은 기다리던 조건이 충족되었는지 확인하여, 충족되지 않았다면 다시 대기할 것이다.

 

모든 스레드가 같은 조건을 기다리고, 조건이 한번 충족될 때마다 단 하나의 스레드만 혜택을 받을 수 있다면 notifyAll 대신 notify를 사용하여 최적화할 수 있다.

 

하지만 이상의 전제조건들이 만족될지라도 notify 대신 notifyAll을 사용해야하는 이유는, 외부로 공개된 객체에 대해 실수로 혹은 악의적으로 notify를 호출하는 상황에 대비하기 위해 wait를 반복문 안에서 호출하듯, notify 대신 notifyAll을 사용하면 관련 없는 스레드가 실수로 혹은 악의적으로 wait를 호출하는 공격으로부터 보호할 수 있기 때문이다. 

그런 스레드가 중요한 notify를 삼켜버린다면 꼭 깨어났어야 할 스레드들이 영원히 대기하게 될 수 있다.

 

 

반응형

Designed by JB FACTORY