Iterable
리스트를 생성해보자.
// 리스트로 받는 방법
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List 객체는 Collection을 상속한다.
public interface List<E> extends Collection<E> {
Collection 객체는 Iterable을 상속한다.
그러므로 List 타입은 Iterable의 서브타입이라고 할 수 있다.
public interface Collection<E> extends Iterable<E> {
Iterable은 아래와 같이 for-each 문에서 데이터를 순회할 수 있다.
for (Integer i : list) { // for-each
System.out.println(i);
}
Iterable 객체를 직접 만들어보자.
▶ 1 ~ 9까지의 원소를 가지고, 이 원소를 순회하는 Iterator 오브젝트를 리턴하는 Iterable
-- Lamda
Iterable<Integer> iter2 = () -> new Iterator<>() {
// Iterator 를 또 구현해야하는 이유는? Iterator 를 통해서 여러번 순회하기 위해서
int i = 0; // 시작값
final static int MAX = 10; // 종료값
@Override
public boolean hasNext() {
return i < MAX;
}
@Override
public Integer next() { // 받는쪽에서 호출하며, pull
return ++i;
}
};
-- 익명클래스
Iterable<Integer> iter2 = new Iterable<Integer>() {
@Override
public Iterator<Integer> iterator() {
return new Iterator<>() {
// Iterator 를 또 구현해야하는 이유는? Iterator 를 통해서 여러번 순회하기 위해서
int i = 0; // 시작값
final static int MAX = 10; // 종료값
@Override
public boolean hasNext() {
return i < MAX;
}
@Override
public Integer next() { // 받는쪽에서 호출하며, pull
return ++i;
}
};
}
};
1) Iterable을 구현한 객체로 순회 가능하다.
for (Integer i : iter2) { // for-each 사용 가능 (List, Array가 아님에도 가능)
System.out.println(i);
}
2) iter2.iterator()로 순회 가능하다.
for (Iterator<Integer> it = iter2.iterator(); it.hasNext();) {
System.out.println(it.next());
}
▶ Iterable <- duality (상대성) -> Observable
Iterable | Observable |
Pull : 데이터를 가져오는것 | Push : 데이터를 주는것 (통지하는것) |
Observable
Observer 추가
// Observer 선언
Observer ob = new Observer() { // Subscriber
@Override
public void update(Observable o, Object arg) {
System.out.println(Thread.currentThread().getName() + ":" + arg);
}
};
// Observer 추가
IntObservable intObservable = new IntObservable();
intObservable.addObserver(ob);
// main 스레드가 아닌 별도의 스레드로 동작하도록
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(intObservable); // Runnable 구현 객체
// intObservable.run();
// main 쓰레드
System.out.println(Thread.currentThread().getName() + " EXIT");
es.shutdown();
IntObservable.java (Publisher)
static class IntObservable extends Observable implements Runnable { // Publisher
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
setChanged();
notifyObservers(i); // 값을 던진다.
}
}
}
- Observable.setChange()
protected synchronized void setChanged() {
changed = true;
}
- Observable.notifyObservers()
public void notifyObservers(Object arg) {
Object[] arrLocal;
synchronized (this) {
if (!changed)
return;
arrLocal = obs.toArray();
clearChanged();
}
for (int i = arrLocal.length-1; i>=0; i--)
((Observer)arrLocal[i]).update(this, arg);
}
출력결과
main EXIT
pool-1-thread-1:1
pool-1-thread-1:2
pool-1-thread-1:3
pool-1-thread-1:4
pool-1-thread-1:5
pool-1-thread-1:6
pool-1-thread-1:7
pool-1-thread-1:8
pool-1-thread-1:9
pool-1-thread-1:10
Pub/Sub
Publisher | Subscriber |
연속된 요소들을 제공 | 정보를 요청하고, Publisher로부터 데이터를 받는다. |
Publisher.subscribe(Subscriber) 로 구독 | signal : Subscriber의 onSubscribe(), onNext(), onError(), onComplete() |
1) Subscriber 생성
// Subscriber 를 만들어보자.
Flow.Subscriber<Integer> s = new Flow.Subscriber<Integer>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) { // 필수
System.out.println("E03_PubSub.onSubscribe");
// subscription 저장
this.subscription = subscription;
// subscription의 reuqest()
// subscription.request(Long.MAX_VALUE); // 전부 다 받기
this.subscription.request(1); // 1개 받기
}
int bufferSize = 2;
/**
* publisher 에서 통지한 데이터를 처리
* @param item the item
*/
@Override
public void onNext(Integer item) {
System.out.println("E03_PubSub.onNext : " + item);
// 다음 데이터를 다시 요청
if (--bufferSize <= 0) {
bufferSize = 2;
this.subscription.request(2); // 2개 받기
}
}
/**
* 오류 처리 (Publisher 에서 어떤 종류의 에러가 발생하더라도 이 메서드에서 처리한다.)
* @param throwable the exception
*/
@Override
public void onError(Throwable throwable) {
System.out.println("E03_PubSub.onError");
}
@Override
public void onComplete() {
System.out.println("E03_PubSub.onComplete");
}
};
2) [1, 2, 3, 4, 5]를 가진 리스트를 던지는 Publisher 생성
// Publisher 를 만들어보자.
List<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
// 위 데이터를 던지는 Publisher
Flow.Publisher p = new Flow.Publisher() {
@Override
public void subscribe(Flow.Subscriber subscriber) {
subscriber.onSubscribe(new Flow.Subscription() { // Subscriber.onSubscribe() 호출
Iterator<Integer> it = itr.iterator();
@Override
public void request(long n) {
try {
while (n-- > 0) {
if (it.hasNext()) {
subscriber.onNext(it.next()); // 데이터 통지
} else {
subscriber.onComplete(); // 통지 완료
break;
}
}
} catch (RuntimeException e) {
subscriber.onError(e);
}
}
@Override
public void cancel() {
}
});
}
};
3) Publisher 객체의 subscribe(Subscriber 객체) 호출하여 구독
// 구독
p.subscribe(s);
호출 흐름
Subscriber -> Subscription(중개역할) -> Publisher -> 이벤트 발생 -> Subscriber
1) Publisher의 subscribe()에서 매개변수로 받은 Subscriber 객체의 onSubscribe(Subscription) 호출
- new Flow.Subscription()로 Subscription 객체를 생성하고, request(), cancel() 메서드를 오버라이드 선언한다.
@Override
public void subscribe(Flow.Subscriber subscriber) {
subscriber.onSubscribe(new Flow.Subscription() { // Subscriber.onSubscribe() 호출
Iterator<Integer> it = itr.iterator();
@Override
public void request(long n) {
try {
// Subscriber.onSubscribe() 안에서 호출
while (n-- > 0) {
if (it.hasNext()) {
subscriber.onNext(it.next()); // 데이터 통지
} else {
subscriber.onComplete(); // 통지 완료
break;
}
}
} catch (RuntimeException e) {
// 에러 발생시 Subscriber 에서 처리
subscriber.onError(e);
}
}
@Override
public void cancel() {
}
});
}
2) 호출된 Subscriber 객체의 onSubscribe() 에서 매개변수로 받은 Flow.Subscription 객체의 request() 호출한다.
- 받을 데이터 개수를 설정할 수 있다.
- subscription.request(1)는 '데이터 1개를 받는다'라는 의미다.
@Override
public void onSubscribe(Flow.Subscription subscription) { // 필수
System.out.println("E03_PubSub.onSubscribe");
// subscription 저장
this.subscription = subscription;
// subscription의 reuqest()
// subscription.request(Long.MAX_VALUE); // 전부 다 받기
this.subscription.request(1); // 1개 받기
}
3) Scription 객체의 request() 메서드에서 onNext()를 통해 데이터를 통지하거나, onComplete()를 통해 통지를 완료한다.
@Override
public void request(long n) {
try {
// Subscriber.onSubscribe() 안에서 호출
while (n-- > 0) {
if (it.hasNext()) {
subscriber.onNext(it.next()); // 데이터 통지
} else {
subscriber.onComplete(); // 통지 완료
break;
}
}
} catch (RuntimeException e) {
// 에러 발생시 Subscriber 에서 처리
subscriber.onError(e);
}
}
4) onNext()가 호출되면 Publisher에서 통지된 데이터가 처리된다.
@Override
public void onNext(Integer item) {
System.out.println("E03_PubSub.onNext : " + item);
// 다음 데이터를 다시 요청
if (--bufferSize <= 0) {
bufferSize = 2;
this.subscription.request(2); // 2개 받기
}
}
5) Publisher에서 어떤 종류의 에러가 발생한다면 onError()가 호출되고, 통지가 끝났다면 onComplete()가 호출된다.
- 그러므로, 둘 중 하나만 호출되겠다.
@Override
public void onError(Throwable throwable) {
System.out.println("E03_PubSub.onError");
}
@Override
public void onComplete() {
System.out.println("E03_PubSub.onComplete");
}
위 코드를 비동기로 변경해보자.
별도 스레드로 수행하기 위해 ExecutorService 객체를 선언하자.
// 별도 스레드
ExecutorService es = Executors.newSingleThreadExecutor();
https://devfunny.tistory.com/807?category=957918
Publisher는 쓰레드는 여러개 만들어서 데이터를 통지하는 것이 스펙상으로 불가능하다.
하나의 Subscriber는 순서대로 데이터가 통지됨으로 알고있다.
어느 한순간에는 적어도 한 스레드에서만 데이터가 날라올 수 있다고 본다. 동시에 다른 스레드에서 멀티 스레드 방식으로 동작하는건 가능하다.
1) Publisher 생성
Future 객체로 리턴받는다. 자바 비동기 작업의 결과 정보를 제공해주는 클래스다.
Flow.Publisher p = new Flow.Publisher() {
@Override
public void subscribe(Flow.Subscriber subscriber) {
subscriber.onSubscribe(new Flow.Subscription() { // Subscriber.onSubscribe() 호출
Iterator<Integer> it = itr.iterator();
@Override
public void request(long n) {
Future<?> submit = es.submit(() -> {
int i = 0;
try {
// Subscriber.onSubscribe() 안에서 호출
while (i++ < n) {
if (it.hasNext()) {
subscriber.onNext(it.next()); // 데이터 통지
} else {
subscriber.onComplete(); // 통지 완료
break;
}
}
} catch (RuntimeException e) {
subscriber.onError(e);
}
});
}
@Override
public void cancel() { // 작업을 취소시킬 수 있다. 이때 Future를 통해서 interrupt를 날릴 수 있다.
}
});
}
};
2) Subscriber 생성
- onSubscribe() 메서드는 subscribe()를 호출한 동일한 스레드 내에서 처리되어야한다.
- 즉, main 스레드에서 수행되어야한다.
Flow.Subscriber<Integer> s = new Flow.Subscriber<Integer>() {
Flow.Subscription subscription;
/*
subscribe()한 동일한 쓰레드 내에서 처리되야한다.
-> main 스레드
*/
@Override
public void onSubscribe(Flow.Subscription subscription) { // 필수
System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onSubscribe");
// subscription 저장
this.subscription = subscription;
// subscription의 reuqest()
// subscription.request(Long.MAX_VALUE); // 전부 다 받기
// 이것도 동일한 쓰레드 내에서 처리되야한다. (새로운 스레드 만들어서 request 날리면 안된다.)
this.subscription.request(1); // 1개 받기
}
int bufferSize = 2;
/**
* publisher 에서 통지한 데이터를 처리
* @param item the item
*/
@Override
public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onNext : " + item);
// 다음 데이터를 다시 요청
if (--bufferSize <= 0) {
bufferSize = 2;
this.subscription.request(2); // 2개 받기
}
}
/**
* 오류 처리 (Publisher 에서 어떤 종류의 에러가 발생하더라도 이 메서드에서 처리한다.)
* @param throwable the exception
*/
@Override
public void onError(Throwable throwable) {
System.out.println("E03_PubSub.onError : " + throwable);
}
@Override
public void onComplete() {
System.out.println("E03_PubSub.onComplete");
}
};
3) p.subscribe(s) 를 호출하여 구독 시작
public static void main(String[] args) throws InterruptedException {
...
// 구독
p.subscribe(s);
// time 걸어서 동작중에 shutdown 되지 않도록 한다.
es.awaitTermination(10, TimeUnit.HOURS);
es.shutdown();
}
결과
main : E03_PubSub.onSubscribe
pool-1-thread-1 : E03_PubSub.onNext : 1
동기 vs 비동기 결과 비교
Subscriber의 onSubscribe() 메서드에 각각 아래 한줄을 추가하여 결과를 비교해보자.
System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onSubscribe");
@Override
public void onSubscribe(Flow.Subscription subscription) { // 필수
System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onSubscribe");
// subscription 저장
this.subscription = subscription;
// subscription의 reuqest()
// subscription.request(Long.MAX_VALUE); // 전부 다 받기
// 이것도 동일한 쓰레드 내에서 처리되야한다. (새로운 스레드 만들어서 request 날리면 안된다.)
this.subscription.request(1); // 1개 받기
System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onSubscribe");
}
동기 | 비동기 |
E03_PubSub.onSubscribe E03_PubSub.onNext : 1 main : E03_PubSub.onSubscribe |
main : E03_PubSub.onSubscribe main : E03_PubSub.onSubscribe pool-1-thread-1 : E03_PubSub.onNext : 1 |
참고 : 토비의 봄 TV
흐름 정리