[아파치 카프카 어플리케이션 프로그래밍] 8. 컨슈머의 중요 개념과 옵션값

반응형
728x90
반응형

컨슈머 그룹

토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 2가지다.

1) 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영
2) 토픽의 특정 파티션만 구독하는 컨슈머를 운영

 

 

1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영

컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 방식이다.

컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있다.

 

  • 컨슈머 1개로 이루어진 컨슈머 그룹이 4개의 파티션에 할당

https://colinch4.github.io/2021-01-15/404_consumer_core/

 

컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하다.

그리고 1개 컨슈머는 여러개의 파티션에 할당될 수 있다.

이러한 특징으로 컨슈머 그룹의 컨슈머 개수를 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야한다.

(컨슈머 그룹의 컨슈머 개수 <= 토픽의 파티션 개수)

 

  • 컨슈머 2개로 이루어진 컨슈머 그룹이 4개의 파티션에 할당

https://colinch4.github.io/2021-01-15/404_consumer_core/

 

  • 컨슈머 4개로 이루어진 컨슈머 그룹이 4개의 파티션에 할당

https://colinch4.github.io/2021-01-15/404_consumer_core/

 

예를들어,  3개의 파티션을 가진 토픽을 효과적으로 처리하기 위해서는 3개 이하의 컨슈머로 이루어진 컨슈머 그룹으로 운영해야한다.

만약 4개의 컨슈머로 이루어진 컨슈머 그룹으로 3개의 파티션을 가진 토픽에서 데이터를 가져가기 위해 할당하면 1개의 컨슈머는 파티션을 할당받지 못하고 유휴 상태로 남게된다.

파티션을 할당받지 못한 컨슈머는 스레드만 차지하고 실질적인 데이터 처리를 하지 못하므로 애플리케이션 실행에 있어 불필요한 스레드로 남게된다.

 

  • 컨슈머 5개로 이루어진 컨슈머 그룹이 4개의 파티션에 할당되어 1개의 컨슈머가 유휴 상태

https://colinch4.github.io/2021-01-15/404_consumer_core/

 

 

컨슈머 그룹 특징

컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고있다. 따라서 카프카 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 글부끼리 영향을 받지 않게 처리할 수 있다는 장점을 가진다.

 

운영 서버의 주요 리소스인 CPU, 메모리 정보를 수집하는 데이터 파이프라인을 구축한다고 가정해보자.
실시간 리소스를 시간순으로 확인하기 위해서 데이터를 엘라스틱서치에 저장하고 이와 동시에 대용량 적재를 위해 하둡에 적재할 것이다.
만약 카프카를 활용한 파이프라인이 아니라면 서버에서 실행되는 리소스 수집 및 전송 에이전트는 수집한 리소스를 엘라스틱서치와 하둡에 적재하기 위해 동기적으로 적재를 요청할 것이다.
이렇게 동기로 실행되는 에이전트는 엘라스틱서치 또는 하둡 둘 중 하나에 장애가 발생한다면 더는 적재가 불가능할 수 있다.

 

카프카는 이러한 파이프라인을 운영함에 있어 최종 적재되는 저장소의 장애에 유연하게 대응할 수 있다.

각기 다른 저장소에 저장되는 컨슈머를 다른 컨슈머 그룹으로 묶음으로써 각 저장소의 장애에 격리되어 운영할 수 있다.

따라서 엘라스틱서치의 장애로 인해 더는 적재가 되지 못하더라도 하둡으로 데이터를 적재하는데에는 문제가 없다.

엘라스틱서치의 장애가 해소되면 엘라스틱서치로 적재하는 컨슈머의 컨슈머 그룹은 마지막으로 적재 완료한 데이터 이후부터 다시 적재를 수행하여 최종적으로 모두 정상화될 것이다. 이렇게 데이터 파이프라인을 운영함에 있어 적절히 컨슈머 그룹을 분리하여 운영하는 것이 매우 중요하다. 

 

  • 컨슈머 그룹으로 적재 로직을 분리하여 운영
    • (group1 : 엘라스틱서치, group2 : 하둡이라고 가정하자.)

https://jhleed.tistory.com/180

동일 컨슈머 그룹으로 이루어진 컨슈머가 엘라스틱서치와 하둡에 동시 적재한다면 이전에 동기로 적재하는 에이전트와 동일한 이슈로 적재에 지연이 발생할 수 있기 때문이다. 현재 운영하고 있는 토픽의 데이터가 어디에 적재되는지, 어떻게 처리되는지 파악하고 컨슈머 그룹으로 따로 나눌 수 있는 것은 최대한 나누는 것이 좋다.

 

 

 

리밸런싱 

컨슈머 그룹의 컨슈머에 장애가 발생한다면? 리밸런싱을 수행한다.

컨슈머 그룹으로 이루어진 컨슈머들 중 일부 컨슈머에 장애가 발생하면 장애가 발생한 컨슈머에 할당된 파티션을 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다.

 

리밸런싱은 아래와 같은 상황에서 발생한다.

1) 컨슈머가 추가되는 상황
2) 컨슈머가 제외되는 상황

 

컨슈머 중 1개에 이슈가 발생하여 더는 동작을 안하고 있다면 이슈가 발생한 컨슈머에 할당된 파티션은 더는 데이터 처리를 하지 못하고 있으므로 데이터 처리에 지연이 발생할 수 있다.

이를 해소하기 위해 이슈가 발생한 컨슈머를 컨슈머 그룹에서 제외하여 모든 파티션이 지속적으로 데이터를 처리할 수 있도록 가용성을 높여준다. 리밸런싱은 컨슈머가 데이터를 처리하는 도중에 언제든지 발생할 수 있으므로 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해야한다.

 

https://medium.com/11st-pe-techblog/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8-%EC%95%A0%ED%94%8C%EB%A6%AC%EC%BC%80%EC%9D%B4%EC%85%98-%EB%B0%B0%ED%8F%AC-%EC%A0%84%EB%9E%B5-4cb2c7550a72

 

 

 

 

그룹 조정자 (group coodinator)

리밸런싱이 유용하지만, 자주 발생하면 안된다.

리밸런싱이 발생할때 파티션의 소유권을 컨슈머로 재할당하는 과정에서 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없기 때문이다. 그룹 조정자(group coodinator)는 리밸런싱을 발동시키는 역할을 하는데, 컨슈머 그룹의 컨슈머가 추가되고 삭제될 때를 감지한다. 카프카 브로커 중 한대가 그룹 조정자의 역할을 수행한다.

 

 

 

오프셋 커밋

컨슈머는 카프카 브로커로부터 데이터가 어디까지 가져갔는지 커밋(commit)을 통해 기록한다.

 

https://baek.dev/post/20/

특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽(_consumer_offsets)에 기록된다. 컨슈머 동작 이슈가 발생하여 _consume_offsets 토픽에 어느 레코드까지 읽어갔는지 오프셋 커밋이 기록되지 못했다면 데이터 처리의 중복이 발생할 수 있다.

그러므로, 데이터 처리의 중복이 발생하지 않도록 컨슈머 애플리케이션이 오프셋 커밋을 정상적으로 처리했는지 검증해야만한다.

 

 

 

비명시 오프셋 커밋

오프셋 커밋은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행할 수 있다.

기본 옵션은 poll() 메서드가 수행될때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit = true로 설정되어있다.

이렇게 일정 간격마다 자동으로 커밋되는 것을 비명시 '오프셋 커밋'이라고 부른다. 

 

auto.commit.interval.ms

poll() 메서드가 auto.commit.interval.ms에 설정된 값 이상이 지났을때 그 시점까지 읽은 레코드의 오프셋을 커밋한다.

 

비명시 오프셋 커밋은 편리하지만, poll() 메서드 호출 이후에 리밸런싱 또는 컨슈머 강제 종료 발생시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 가능성이 있는 취약한 구조를 가지고 있다.

그러므로 데이터 중복이나 유실을 허용하지 않는 서비스라면 자동 커밋을 사용해서는 안된다.

 

 

명시 오프셋 커밋

poll() 메서드 호출 이후에 반환받은 데이터의 처리가 완료되고 commitSync() 메서드를 호출하면 된다.

commitSync() 메서드는 poll() 메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행한다.

commitSync() 메서드는 브로커에 커밋 요청을 하고 커밋이 정상적으로 처리되었는지 응답하기까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼친다. 데이터 처리 시간에 비해 커밋 요청 및 응답에 시간이 오래 걸린다면 동일 시간당 데이터 처리량이 줄어들기 때문이다.

이를 해결하기 위해 commitSync() 메서드를 사용하여 커밋 요청을 전송하고 응답이 오기 전까지 데이터 처리를 수행할 수 있다. 

하지만 비동기 커밋은 커밋 요청이 실패했을 경우 현재 처리중인 데이터의 순서를 보장하지 않으며 데이터의 중복이 발생할 수 있다.

 

 

 

컨슈머 내부 구조

https://medium.com/11st-pe-techblog/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8-%EC%95%A0%ED%94%8C%EB%A6%AC%EC%BC%80%EC%9D%B4%EC%85%98-%EB%B0%B0%ED%8F%AC-%EC%A0%84%EB%9E%B5-4cb2c7550a72

 

컨슈머는 poll 메서드를 통해 레코드를 반환하지만, poll() 메서드를 호출하는 시점에 클러스터에서 데이터를 가져오는 것은 아니다. 컨슈머 애플리케이션을 실행하게되면 내부에서 Fetcher 인스턴스가 생성되어 poll() 메서드를 호출하기 전에 미리 레코드들을 내부 큐로 가지고온다. 

이후에 사용자가 명시적으로 poll() 메서드를 호출하면 컨슈머는 내부 큐에 있는 레코드들을 반환받아 처리를 수행한다.

 

 

 

컨슈머 주요 옵션

컨슈머 애플리케이션을 실행할때 설정해야하는 필수 옵션과 선택 옵션이 있다.

필수 옵션은 반드시 설정해야하는 옵션이고, 선택 옵션은 사용자의 설정을 필수로 받지 않고 옵션의 기본값을 따른다.

 

필수옵션
  • bootstrap.servers

프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 hostname:port 를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정 가능하다.

 

  • key.deserializer

레코드의 메시지 키를 역직렬화하는 클래스를 지정한다.

 

  • value.deserializer

레코드의 메시지 값을 역직렬화화는 클래스를 지정한다.

 

 

선택옵션
  • gorup.id

컨슈머 그룹 아이디를 지정한다. subscribe() 메서드로 토픽을 구독하여 사용할때는 이 옵션을 필수로 넣어야한다. 기본값 : null

 

  • auto.offset.reset

컨슈머 그룹이 특정 파티션을 읽을때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션이다.

이미 컨슈머 오프셋이 있다면 이 옵션값은 무시된다. 

latest (default) 가장 높은 오프셋부터 읽기 시작한다. (가장 최근)
earliest 가장 낮은 오프셋부터 읽기 시작한다. (가장 오래된)
none 컨슈머 그룹이 커밋한 기록이 있는지 찾아본다. 기록이 없으면 오류를 반환하고, 있다면 기존 커밋 기록 이후 오프셋부터 읽기 시작한다.

 

  • enable.auto.commit

자동 커밋으로 할지 수동 커밋으로 할지 선택한다.

기본값 : true

 

  • auto.commit.interval.ms

자동 커밋(enable.auto.commit = true)일 경우 오프셋 커밋 간격을 지정한다.

기본값 : 5000(5초)

 

  • max.poll.records

poll() 메서드를 통해 반환되는 레코드 개수를 지정한다.

기본값: 500

 

  • session.timeout.ms

컨슈머가 브로커와 연결이 끊기는 최대 시간이다. 이 시간 내에 하트비트를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱을 시작한다. 보통 파트비트 시간 간격의 3배로 설정한다.

기본값 : 10000(10초)

 

  • heatbeat.interval.ms

하트비트를 전송하는 시간간격이다.

기본값 : 3000(3초)

 

  • max.poll.interval.ms

poll() 메서드를 호출하는 간격의 최대 시간을 지정한다.

poll() 메서드를 호출한 이후에 데이터를 처리하는데에 시간이 너무 많이 걸리는 경우 비정상으로 판단하고 리밸런싱을 시작한다.

기본값 : 300000(5분)

 

  • isolation.level

트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다.

이 옵션은 read_committed, read_uncommitted 로 설정할 수 있다.

read_committed 커밋이 완료된 레코드만 읽는다.
read_uncommitted (default) 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다.

 

 

반응형

Designed by JB FACTORY