Apache Kafka 기본 개념 총정리

반응형
728x90
반응형

카프카의 탄생

카프카 도입 전의 단점을 파악하여 카프카가 왜 탄생했는지에 대해서 알아봅시다.

 

카프카 도입 전

https://www.confluent.io/blog/event-streaming-platform-1/

1) 일반적으로 데이터를 생성하는 '소스 애플리케이션'과 생성된 데이터가 적재되는 '타깃 애플리케이션'은 연결되어있습니다.

그림을 보면 end-to-end 연결 방식으로 여러 애플리케이션이 서로 데이터를 주고받습니다.

만약, 작게 시작했던 서비스가 점점 커지면서 위 그림처럼 복잡해진다면 앞으로 어떻게 확장을 해나가야 할까요?

각 애플리케이션의 의존도가 높아지면서 확장은 어려워지고 장애 발생 가능성은 커지게되어 서비스 관리가 힘들어집니다.

 

2) 한 쪽의 서비스에 장애가 발생한다면, 그와 연결된 서비스에 모두 영향을 미칩니다.

N:1 또는 1:N 구조를 가지고있기 때문에 한 서비스에 장애가 발생한다면 그에 의존하고 있는 모든 서비스에 문제가 발생합니다.

 

3) 각기 다른 데이터 파이프라인 연결 구조를 가집니다.

서비스가 커질수록 데이터 파이프라인 연결 구조도 복잡해지기 때문에 이점 또한 확장에 비효율적입니다.

 

▶ 데이터 파이프라인이 무엇일까요?

https://velog.io/@ginee_park/%EB%8D%B0%EC%9D%B4%ED%84%B0-%ED%8C%8C%EC%9D%B4%ED%94%84%EB%9D%BC%EC%9D%B8-%EA%B5%AC%EC%B6%95-%EC%9D%B4%EB%A1%A0

 

※ 데이터 파이프라인의 역할

역할 설명
Data extracting 데이터 추출
Data transforming 데이터 변경
Data combining 데이터 결합
Data validating 데이터 검증
Data loading 데이터 적재

데이터 파이프라인은 데이터를 한 시스템에서 또다른 시스템으로 옮기는 작업을 뜻합니다.

옮겨지는 데이터가 각 지점을 순조롭게 흐르도록(flow) Data의 추출(extracting), 변경(transforming), 결합(combining), 검증(validating) 그리고 적재(loading)하는 과정들을 자동화하는 것입니다.

데이터 파이프라인의 시작은 왜(why), 어디에서(where), 어떻게(how) 데이터를 수집할 것인가부터 시작합니다. 

 

데이터 파이프라인은 ETL을 포함하는 용어입니다.

ETL은 추출(Extract), 변환(Transform), 적재(Load)의 줄임말로써, ETL 시스템은 하나의 시스템에서 데이터를 추출하고, 변환하여 데이터베이스 등에 적재합니다. 

 

카프카 도입 후

1) 발생되는 모든 데이터와 데이터 흐름을 중앙에서 관리하고 있습니다.

데이터를 보내는 서비스(프로듀서)와 데이터를 받는 서비스(컨슈머)가 명확하게 구분되어 프로듀서가 보내는 데이터를 한 곳에 모아서 처리할 수 있도록 중앙집중화되었습니다. 

 

2) 한 쪽의 서비스에 장애가 발생하더라도 카프카로 장애를 유연하게 처리할 수 있습니다.

한 쪽의 서비스에 장애가 발생하여도, 카프카의 덕분에 전체 시스템에 영향을 주지 않습니다. 

카프카는 데이터의 유실 및 중복에 대해 대처할 수 있는 여러 방법이 있습니다.

 

3) 동일한 데이터를 여러 서비스에 전달이 가능합니다.

데이터를 보내는 서비스(프로듀서)가 전송한 데이터를 여러 컨슈머들에게 전달할 수 있습니다. 

 

※ 카프카는 Pub/Sub 모델의 Message Queue입니다.

카프카는 Pub/Sub (발행/구독) 모델을 사용하며, 메시지 큐의 일종입니다.

 

 

Message Queue

메시지 큐(Messqge Queue: MQ)는 메시지 기반의 미들웨어(서로 다른 애플리케이션이 서로 통시하는데 사용되는 소프트웨어)로, 메시지를 이용하여 여러 애플리케이션 및 서비스들을 연결해줍니다.

메시지 지향 미들웨어(Message Oriented Middleware: MOM)를 구현한 시스템입니다.

 

https://tecoble.techcourse.co.kr/post/2021-09-19-message-queue/

메시지 큐는 메시지를 임시로 저장하는 간단한 버퍼라고 생각할 수도 있습니다. 메시지를 전송 및 수신하기 위해 중간에 메시지 큐를 둡니다.

메시지 큐는 데이터를 가져가는 쪽(Consumer)이 실제로 언제 메시즈를 가져가서 처리할지는 보장하지 않습니다.

이러한 비동기적 특성이 존재하여, Queue에 넣어둔 데이터들을 나중에 처리할 수 있습니다. 

* 기존 동기화 방식은 많은 메시지가 전송될 경우 병목이 생길 수 있고, 뒤에 들어오는 요청이 딜레이가 될 수 있습니다.
중간에 Message Queue를 활용하여 전송만은 따로 위임하여 순차적으로 처리할 수 있게되었습니다.

 

▶ 메시지 지향 미들웨어가 무엇일까요?

https://cerulean85.tistory.com/458

애플리케이션들을 연결하여 각 다른 애플리케이션이 서로 데이터를 교환할 수 있게 해주는 소프트웨어입니다.

구성요소 설명
클라이언트 메시지의 송/수신 중 하나의 행동만 수행할 수 있습니다.
메시지 전송되는 데이터
메시지 브로커 (broker) 브로커는 요청된 메시지를 목적지(destination)에 저장합니다.
수신측 클라이언트의 요청이 발생할때까지 보관합니다.

 

메시지 지향 미들웨어는 지점 간 메시징(point-to-point messaging)이나 게시-구독(publish-subscribe-messaging) 메시징 방식으로 통신합니다.

 

지점 간 메시징 (Message Queue 기반 패턴)

https://cerulean85.tistory.com/458

Receiver에 의해 데이터가 가져가진다면 해당 메시지는 즉시 삭제됩니다. 따라서 하나의 메시지를 여러 수신자가 소비할 수 없습니다.

구성요소 설명
발신자 (sender) 메시지 생산자
수신자 (receiver) 메시지 소비자
큐 (queue) 발신자와 수신자 간의 메시지 교환을 위한 메시지 목적지

 

게시-구독 메시징 (발행(Publish)-구독(Subscribe) 메세지)

https://cerulean85.tistory.com/458

구독자는 토픽에 저장된 메시지의 복사본을 가져가기 때문에 메시지가 즉시 삭제되지 않습니다. 

여러 구독자가 동시에 동일한 메시지를 소비할 수 있습니다.

예를들어, 카프카는 메시지가 대기열에 있어야하는 기간을 지정하여 메시지를 보존하는 정책이 존재합니다.

구성요소 설명
게시자 (publisher) 메시지 생산자
구독자 (subscriber) 메시지 소비자
토픽 (topic) 발신자와 수신자 간의 메시지 교환을 위한 메시지 목적지

 

메시지 큐의 장점은 아래와 같습니다.

장점 설명
비동기 (Asynchronous) Queue에 넣기 때문에 나중에 처리할 수 있습니다.
비동조 (Decoupling) 애플리케이션과 분리할 수 있습니다.
탄력성 (Resilience) 일부가 실패 시 전체에 영향을 받지 않습니다.
과잉 (Redundancy) 실패할 경우 재실행 가능합니다.
보증 (Guarantees) 작업이 처리된걸 확인할 수 있습니다.
확장성 (Scalable) 다수의 프로세스들이 큐에 메시지를 보낼 수 있습니다.

 

 

Pub/Sub 모델 (Publish/Subscribe) 

우리는 카프카 도입 후의 중앙집중화된 변화를 확인했습니다.

중앙에 메시징 시스템 서버(카프카)를 두고 메시지를 주고받는 형태의 통신을 Pub/Sub 모델이라고 합니다.

카프카는 Pub/Sub 모델을 기반으로 만들어진 메세징 시스템이기 때문에 우리는 Pub/Sub 모델에 대해서도 이해하고 넘어가야 합니다.

https://aws.amazon.com/ko/blogs/korea/introducing-amazon-sns-fifo-first-in-first-out-pub-sub-messaging/

 

▶ 발행자 (publisher) : 메시지를 보내는 쪽

발행자는 구독자의 정보를 알 필요가 없으며, 발행자-구독자 간의 의존성 또한 없습니다.

발행자는 메시지를 특정 채널(TOPIC)에 전송하는 역할만 처리합니다.

 

▶ 구독자 (subscriber) : 메시지를 받는 쪽

구독자는 발행자가 전송한 메시지 중 자신이 원하는 메시지를 읽어갑니다.

구독자는 발행자의 변경에 대해 아무런 영향을 끼치지 않으며, 오로지 자신이 원하는 메시지만 가져오면 됩니다.

 

※ 그렇다면 카프카에서는 Pub/Sub 모델을 어떻게 반영했을까요?

https://needjarvis.tistory.com/599

카프카는 Publish/Subscribe의 명칭을 아래와 같이 정했습니다.

Pub/Sub 모델 카프카
publisher producer (프로듀서)
subscriber consumer (컨슈머)

 

 

카프카 기본개념

앞으로 우리가 공부할 카프카의 기본 개념은 아래와 같습니다. 차근차근 알아보도록 합시다.

차례대로 공부해야하므로 우선 아래의 용어들은 간단하게 숙지하고 넘어갑니다.

Zookeeper : 카프카 브로커를 하나의 클러스터로 코디네티이하는 분산 코디네이션 시스템 
Kafka Broker : 카프카 애플리케이션이 설치되어 있는 서버 또는 노드
Topic : 카프카 데이터 저장소 
Partition : 각 토픽 당 데이터를 분산 처리하는 단위
Offset : 파티션 내의 각 레코드를 고유하게 식별하는 순차적인 ID
Producer : 메시지를 Broker의 Topic에 전달하는 애플리케이션
Consumer : Broker의 Topic에 저장된 메시지를 가져가는 애플리케이션
- 컨슈머 그룹 리밸런싱 (Rebalancing) : 컨슈머 그룹 내의 각 컨슈머의 파티션 소유권 이관 작업
Broker Partition Replication : 파티션 복제 기능
ISR :  Reader, Follower 파티션이 모두 동기화된 상태
- 컨슈머 랙 : 토픽의 최신 오프셋(LOG-END-OFFSET)과 컨슈머 오프셋(CURRENT-OFFSET) 간의 차이

 

 

 

Zookeeper

주키퍼는 분산 애플리케이션을 위한 코디네이션 시스템입니다. 

즉, 분산 애플리케이션이 안정적인 서비스를 할 수 있도록 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성 관리, 동기화 등의 서비스를 제공합니다.

https://engkimbs.tistory.com/660

주키퍼는 서버 여러대를 앙상블(클러스터)로 구성하고, 서버들의 상태 정보들을 주키퍼의 지노드(znode)에 key-value 형태로 저장합니다. 

 

▶ 지노드(znode)란 무엇일까요?

https://cornswrold.tistory.com/523

데이터를 저장하기 위한 공간 이름으로, 저장하는 데이터 크기는 매우 작으며, 자식 노드를 가지고 있는 계층형 구조로 구성되어 있습니다.

지노드의 데이터가 변경될 때마다 지노드의 버전 정보가 증가합니다. 

지노드에 저장되는 데이터는 메모리에 저장되므로 처리량이 매우 크고 속도가 빠릅니다.

 

▶ 최근에 Apache Kafka에서 Zookeeper을 제거한다는 소식이 있습니다.

https://www.itworld.co.kr/news/235784

 

아파치 카프카에서 ‘주키퍼’ 빠진다…

분산 이벤트 스트리밍 플랫폼 ‘아파치 카프카(Apache Kafka)’의 메타데이터 관리 도구 ‘주키퍼(ZooKeeper)’가 단계적으로 제거될 예정이다.  ⓒ Getty Images Bank아파치 카프카 프로젝트 관리 위원회

www.itworld.co.kr

 

주키퍼 클러스터에 대해 좀더 알아봅시다.

주키퍼는 기본적으로 다수의 노드를 가진 클러스터 구조에서 replicated mode로 수행되어야 합니다.

이러한 주키퍼의 클러스터 구조를 '앙상블'이라고 부릅니다.

 

1) 고가용성

주키퍼는 앙상블 내에서 과반수의 컴퓨터가 운영중인 동안에만 서비스를 제공할 수 있습니다.

따라서 앙상블에는 일반적으로 홀수의 노드를 권장합니다.

예를들어, 3개의 노드가 있을때 1개의 노드에 장애가 발생하면 과반수인 2개의 노드가 동작하므로 서비스를 유지할 수 있습니다.

 

2) 주키퍼는 앙상블의 과반수 노드에 복제를 보장합니다.

3개의 노드가 있을때 1개의 노드는 리더로 선출됩니다. 따라서 리더(leader)와 나머지인 팔로워(follower)로 구분됩니다.

리더의 데이터 변경시 나머지 팔로워들에 데이터 동기화가 발생합니다.

따라서 리더 노드에 장애가 발생했을시, 나머지 팔로워들 중에 하나를 선정하여 리더로 선출합니다.

 

 

Kafka Broker

카프카 브로커는 '실행된 카프카 애플리케이션 서버 중 1대'입니다.

카프카 클라이언트와 데이터를 주고받기 위해 사용되는 주체이며, 브로커 서버를 여러대로 구성하여 하나의 '클러스터'로 구성합니다.

https://freedeveloper.tistory.com/396

N개의 브로커 중 1대는 컨트롤러(Controller) 기능을 수행합니다.

 

▶ 컨트롤러(Controller)란 무엇일까요?

컨트롤러는 브로커들을 관리합니다.

컨트롤러는 각 브로커들에게 담당 파티션 할당을 수행하고, 브로커들의 상태를 체크하고, 장애를 감지합니다.

 

1) 컨트롤러를 선정하는 것은 위에서 본 Zookeeper의 역할입니다. 

주키퍼는 Controller을 선정하고, 만약 선정된 Controller에 장애가 발생한다면 새 Controller를 선출합니다.

 

2) 브로커는 상태를 가지지 않기 때문에 브로커들의 상태 체크는 주키퍼가 합니다.

주키퍼는 카프카 클러스터안의 모든 브로커에 대해 상태 정보를 기록합니다.

 

카프카 클러스터로 묶인 브로커들은 프로듀서가 보낸 데이터를 안전하게 분산 저장하고 복제하는 역할을 수행합니다.

브로커의 데이터 복제(Replication)에 대해서는 파티션의 개념을 배운 다음 'Broker Partition Replication' 단계에서 자세히 알아보겠습니다.

 

 

Topic

카프카 클러스터는 토픽에 데이터를 저장합니다. 

https://velog.io/@youngerjesus/Apache-Kafka-Design

하나의 토픽에 여러 프로듀서들이 데이터를 전송할 수 있고, 여러 컨슈머들이 하나의 토픽에서 데이터를 읽어올 수 있습니다.

토픽을 효율적으로 관리하기 위해서는, 여러 파티션(Partition)으로 구성되어있어야합니다.

 

 

Partition

파티션은 토픽을 분한할 것입니다. 

https://velog.io/@youngerjesus/Apache-Kafka-Design

만약 여러 프로듀서 A, B, C가 같은 'TEST' 토픽에 데이터를 전송했다고 가정해봅시다.

토픽에 파티션이 1개라면 프로듀서 A, B, C가 보내는 모든 데이터를 해당 파티션이 홀로 감당해야합니다.

이러한 경우 하나의 메시지 전송 완료 이후 다음 메시지 전송이 가능해지면서 전송 속도에 영향을 주게됩니다.

 

▶ 만약 파티션이 여러개로 늘어난다면 어떻게 될까요?

여러 프로듀서 A, B, C가 한번에 데이터를 전송하더라도 'TEST' 토픽 안의 여러개의 파티션들에 나눠져서 병렬로 처리될 수 있습니다.

 

▶ 적절한 파티션의 수는 어떻게 정할까요?

토픽의 파티션 수를 정할때 원하는 목표 처리량의 기준을 잡아야 합니다.

 

(예시)
* 프로듀서 입장 - 목표 처리량의 기준 : 파티션 1개당 초당 10개의 메시지
4개의 프로듀서를 통해 각각 초당 10개의 메시지를 카프카의 토픽으로 보낸다고 하면, 카프카의 토픽에서 초당 40개의 메시지를 받아야합니다.
이런 경우에는 파티션을 4로 늘린다면 위 목표 처리량의 기준을 달성할 수 있습니다. 

* 컨슈머 입장 - 목표 처리량의 기준 : 파티션 1개당 초당 5개의 메시지
컨슈머의 입장에서 8개의 컨슈머를 통해 각각 초당 5개의 메시지를 카프카의 토픽에서 가져올 수 있다고 한다면, 토픽의 파티션 수는 컨슈머 수와 동일하게 8개로 맞추어 컨슈머마다 각각의 파티션에 접근할 수 있게 해야합니다.
  서버 수 서버당 메시지 전송 수 합계 필요 파티션 수
프로듀서 4 10메시지/초 40메시지/초 4
컨슈머 8 5메시지/초 40메시지/초 8

 

 

Offset

오프셋은 각 파티션마다 메시지가 저장되는 위치입니다.

https://medium.com/@umanking/%EC%B9%B4%ED%94%84%EC%B9%B4%EC%97%90-%EB%8C%80%ED%95%B4%EC%84%9C-%EC%9D%B4%EC%95%BC%EA%B8%B0-%ED%95%98%EA%B8%B0%EC%A0%84%EC%97%90-%EB%A8%BC%EC%A0%80-data%EC%97%90-%EB%8C%80%ED%95%B4%EC%84%9C-%EC%9D%B4%EC%95%BC%EA%B8%B0%ED%95%B4%EB%B3%B4%EC%9E%90-d2e3ca2f3c2

오프셋은 파티션마다 유니크한 값을 가집니다. 즉, 하나의 파티션 내에서만 유일한 숫자임을 보장합니다.

카프카에서는 이 오프셋으로 메시지의 순서를 보장하고, 컨슈머는 오프셋 순서대로만 데이터를 가져갈 수 있습니다.

 

 

Producer

프로듀서는 카프카 브로커로 데이터를 전송하는 역할을 수행합니다.

https://velog.io/@youngerjesus/Apache-Kafka-Design

프로듀서의 메시지 전달 과정

https://velog.io/@fj2008/%EC%B9%B4%ED%94%84%EC%B9%B4-%EA%B8%B0%EB%B3%B8-%EA%B0%9C%EB%85%90%EA%B3%BC-%EA%B5%AC%EC%A1%B0

메시지 전달 과정 설명
1. 직렬화 (Serializer) 프로듀서는 전송할 메시지를 직렬화합니다.
2. 파티셔닝 (Partitioning) 직렬화 과정을 거친 메시지는 Partitioner를 통해 토픽의 어떤 파티션에 저장될지 결정됩니다.
따로 설정하지 않으면 카프카에서 설정된 default Partitioner을 따릅니다.

default Partitioner : UniformStickyPartitioner(2.5 버전 이후)
UniformStickyPartitioner은 RoundRobinPartitioner의 단점을 개선했습니다.
UniformStickyPartitioner은 메시지 키가 없을때 파티션에 최대한 동일하게 분배하는 로직이 들어있습니다. 
RoundRobinPartitioner은 들어오는대로 파티션을 순회하면서 전송하기 때문에 배치로 묶이는 빈도가 적어, UniformStickyPartitioner가 기본 파티셔너로 설정되었습니다.
UniformStickyPartitioner는 배치로 묶인 데이터를 모두 동일한 파티션에 전송합니다.
3. 메시지 배치 (Record Accumulator) 메시지마다 매번 네트워크를 통해 전달하는 것은 비효율적이기 때문에 프로듀서는 지정된 만큼 메시지를 저장했다가 한번에 브로커로 전달합니다.
배치를 책임지는 프로듀서 내부의 Record Accumulator(RA)는 각 토픽 파티션에 대응하는 배치 Queue를 구성하고 메시지들을 Record Batch 형태로 묶어 Queue에 저장합니다.
각 배치 Queue에 저장된 레코드 배치들은 때가 되면 각각 브로커에 전달됩니다.
4. 압축 (Compression) 메시지 압축이 설정되어 있다면, 설정된 포맷에 맞춰 메시지를 압축합니다.
프로듀서가 메시지를 압축해서 전송하면 브로커는 디스크에 저장하고, 컨슈머에 보낼때도 압축한 메시지 그대로 전송합니다. 
프로듀서는 메시지를 압축해야하고, 컨슈머는 이를 다시 해제해야하므로 CPU 사용률이 올라갑니다.
하지만 I/O 비용은 줄어들기 때문에 성능 향상이 될 수도 있습니다.
5. 전달 (Sender) 최종적으로 메시지를 브로커에 전송합니다.
브로커의 응답을 기다리지 않을 경우, 전송 이후 해당 과정을 끝냅니다.
만약 브로커의 응답을 기다리도록 설정되어있다면, 메시지 전송 성공 여부를 응답으로 받습니다.
브로커에서 메시지 전송이 실패한 경우에는 설정 값에 따라 재시도를 시도하고, 재시도 횟수를 초과한 경우 예외가 발생합니다.
메시지 전송이 성공했다면, 메시지가 저장된 정보를 반환하여 이후 로직을 처리할 수 있습니다.

 

프로듀서 설정 옵션값에 대해 알아봅시다.

프로듀서에는 필수 옵션과 선택 옵션이 있는데, 선택 옵션을 설정하지 않으면 선택 옵션의 기본값(default)을 파악해서 운영해야합니다.

 

▶ 필수 옵션

옵션값 설명
bootstrap.servers 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 hostname:port 를 1개 이상 작성합니다.
2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는데에 이슈가 없도록 설정 가능합니다.
key.serializer 레코드의 메시지 키를 직렬화하는 클래스를 지정합니다.
value.serializer 레코드의 메시지 값을 직렬화화는 클래스를 지정합니다.

 

▶ 선택 옵션 중 acks

value 설명
0 default 프로듀서가 전송한 즉시 브로커에 데이터 저장 여부와 상관 없이 성공으로 판단합니다.
1 리더 파티션에 데이터가 저장되면 전송 성공으로 판단합니다.
-1 or all min.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공하는 것으로 판단합니다.

 

▶ 선택 옵션

옵션값 설명
buffer.memory 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리양을 지정합니다.
기본 값은 32MB입니다.
retries 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정합니다.
기본값은 2147483647입니다.
batch.size 배치로 전송할 레코드 최대 용량을 지정합니다.
너무 작게 설정하면 프로듀서가 브로커로 더 자주 보내기 때문에 네트워크 부담이 있고 너무 크게 설정하면 메모리를 더 많이 사용하게 되는 점을 주의해야합니다.
기본값은 16384입니다.
linger.ms 배치를 전송하기 전까지 기다리는 최소 시간입니다.
기본값은 0입니다.
partitioner.class 레코드를 파티션에 전송할때 적용하는 파티셔너 클래스를 지정합니다.
기본값은 org.apache.kafka.clients.producer.internals.DefaultPartitioner입니다.
enable.idempotence 멱등성 프로듀서로 동작할지 여부를 설정합니다.
기본값은 false입니다.

* 멱등성 프로듀서란?
멱등성 프로듀서는 기본 프로듀서와 달리 데이터를 브로커로 전달할때 프로듀서 PID(Producer unique ID)와 시퀀스 넘버(sequence number)를 함께 전달합니다.
그러면 브로커는 프로듀서의 PID와 시퀀스 넘버를 확인하여 동일한 메시지의 적재 요청이 오더라도 단 한번만 데이터를 적재합니다.

네트워크 장애가 발생하더라도 브로커에 동일한 메시지는 적재하지 않습니다.
멱등성 프로듀서가 전송하는 데이터에 PID와 시퀀스 넘버가 있는데, 브로커는 PID와 시퀀스 넘버로 중복을 체크합니다.
transactional.id 프로듀서가 레코드를 전송할때 레코드를 트랜잭션 단위로 묶을지 여부를 설정합니다.
프로듀서의 고유한 트랜잭션 아이디를 설정할 수 있습니다.
이 값을 설정하면 트랜잭션 프로듀서를 동작합니다.
기본값은 null입니다.

 

 

Consumer

컨슈머는 카프카 브로커의 데이터를 가져오는 역할을 수행합니다.

https://velog.io/@youngerjesus/Apache-Kafka-Design

1) 카프카의 컨슈머는 Polling 구조를 가집니다.

Polling 구조를 가짐으로써, 컨슈머는 자신이 원하는 만큼의 메시지를 브로커에 요청할 수 있습니다.

 

2) 하나의 토픽에 여러개의 컨슈머들이 동시에 구독할 수 있습니다.

컨슈머가 메시지를 읽었을때 그 메시지가 삭제되지 않기 때문에 여러 컨슈머들이 하나의 동일한 메시지를 가져갈 수 있습니다.

각 컨슈머가 어느 토픽 파티션의 어느 오프셋까지 읽었는지는 어떻게 알 수 있을까요?

위에서 공부했던 'Offset'을 컨슈머 오프셋('_consumer_offset') 이라는 토픽에 저장합니다.

이러한 특징 덕분에, 장애가 발생하더라도 다시 구동이 되면 컨슈머 오프셋에 저장된 정보를 통해 어디서부터 메시지를 읽어야하는지 알 수 있어서 안정적인 메시지 polling이 가능합니다.

 

3) 컨슈머 그룹이 존재합니다. 컨슈머는 하나 이상의 컨슈머가 컨슈머 그룹(Consumer Group)을 구성하여 하나의 토픽을 구독할 수 있습니다.

https://colinch4.github.io/2021-01-15/404_consumer_core/

컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고 있습니다.

따라서 카프카 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지 않게 처리할 수 있습니다.

 

▶ 컨슈머 그룹의 컨슈머 개수 <= 토픽의 파티션 개수

컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야합니다.

https://colinch4.github.io/2021-01-15/404_consumer_core/

위 그림은 컨슈머 5개로 이루어진 컨슈머 그룹이 4개의 파티션에 할당되어 1개의 컨슈머가 유휴 상태가 된 모습을 보여줍니다.

파티션을 할당받지 못한 컨슈머는 스레드만 차지하고 실질적인 데이터 처리를 하지 못하므로 애플리케이션 실행에 있어서 불필요한 스레드로 남게됩니다.

 

컨슈머 설정 옵션값에 대해 알아봅시다.

컨슈머에는 필수 옵션과 선택 옵션이 있는데, 선택 옵션을 설정하지 않으면 선택 옵션의 기본값(default)을 파악해서 운영해야합니다.

 

▶ 필수 옵션

옵션값 설명
bootstrap.servers 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 hostname:port 를 1개 이상 작성합니다.
2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는데에 이슈가 없도록 설정 가능합니다.
key.serializer 레코드의 메시지 키를 역직렬화하는 클래스를 지정합니다.
value.serializer 레코드의 메시지 값을 역직렬화화는 클래스를 지정합니다.

 

▶ 선택 옵션 중 auto.offset.reset

컨슈머 그룹이 특정 파티션을 읽을때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션입니다.

이미 컨슈머 오프셋이 있다면 이 옵션값은 무시됩니다.

value 설명
latest (default) 가장 높은 오프셋부터 읽기 시작합니다. (가장 최근)
earliest 가장 낮은 오프셋부터 읽기 시작합니다. (가장 오래된)
none 컨슈머 그룹이 커밋한 기록이 있는지 찾아봅니다.
기록이 없으면 오류를 반환하고, 있다면 기존 커밋 기록 이후 오프셋부터 읽기 시작합니다.

 

▶ 선택 옵션

옵션값 설명
gorup.id 컨슈머 그룹 아이디를 지정합니다.
subscribe() 메서드로 토픽을 구독하여 사용할때는 이 옵션을 필수로 넣어야합니다.
- 기본값 : null
enable.auto.commit 자동 커밋으로 할지 수동 커밋으로 할지 선택합니다.
- 기본값 : true
auto.commit.interval.ms 자동 커밋(enable.auto.commit = true)일 경우 오프셋 커밋 간격을 지정합니다.
- 기본값 : 5000(5초)
max.poll.records poll() 메서드를 통해 반환되는 레코드 개수를 지정합니다.
- 기본값: 500
session.timeout.ms 컨슈머가 브로커와 연결이 끊기는 최대 시간입니다.
이 시간 내에 하트비트를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱을 시작합니다.
- 기본값 : 10000(10초)

* 리밸런싱이란?
컨슈머 그룹의 컨슈머에 장애가 발생한다면 리밸런싱을 수행합니다.
컨슈머 그룹으로 이루어진 컨슈머들 중 일부 컨슈머에 장애가 발생한다면 장애가 발생한 컨슈머에 할당된 파티션을 장애가 발생하지 않은 컨슈머에 소유권을 넘깁니다.

- 컨슈머가 추가되는 상황
- 컨슈머가 제외되는 상황

리밸런싱은 위와 같은 상황에서 발생합니다.
컨슈머 중 1개에 이슈가 발생하여 더는 동작을 안하고 있다면 이슈가 발생한 컨슈머에 할당된 파티션은 더는 데이터 처리를 하지 못하고 있으므로 데이터 처리에 지연이 발생합니다.
이를 해소하기 위해 이슈가 발생한 컨슈머를 컨슈머 그룹에서 제외하여 모든 파티션이 지속적으로 데이터를 처리할 수 있도록 가용성을 높여줍니다. 
heatbeat.interval.ms 하트비트를 전송하는 시간간격입니다.
- 기본값 : 3000(3초)
max.poll.interval.ms poll() 메서드를 호출하는 간격의 최대 시간을 지정합니다.
poll() 메서드를 호출한 이후에 데이터를 처리하는데에 시간이 너무 많이 걸리는 경우 비정상으로 판단하고 리밸런싱을 시작합니다.
- 기본값 : 300000(5분)
isolation.level 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용합니다.
이 옵션은 read_committed, read_uncommitted 로 설정할 수 있습니다.

1) read_committed : 커밋이 완료된 레코드만 읽는다.
2) read_uncommitted (default) : 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다.

 

 

컨슈머 그룹 리밸런싱 (Rebalancing)

컨슈머 그룹 내의 컨슈머들은 각자 특정 파티션에 대한 소유권을 지니고 있습니다.

컨슈머 그룹에서 컨슈머 A, 컨슈머 B가 있을때 만약 컨슈머 A에 장애가 발생했을때 컨슈머 B에 컨슈머 A의 소유권이 넘어갑니다.

이렇게 컨슈머 그룹 내의 소유권 이관 작업을 '리밸런싱'(Rebalancing)이라고 합니다.

 

▶ 그렇다면, 누가 리밸런싱을 수행할까요?

컨슈머 그룹 코디네이터(coordinator)가 수행합니다.

코디네이터는 브로커 중 한대로 지정됩니다. 코디네이터로 지정된 브로커는 특정 상황이 오면 리밸런싱을 실시합니다.

- 컨슈머 그룹 변화 대응 : 컨슈머가 추가되거나 제외된 경우

- 새로운 파티션의 변화 대응 : 컨슈머 그룹이 구독하고 있는 토픽의 파티션이 추가되거나 변경된 경우

 

▶ 컨슈머의 장애 발생을 어떻게 감지할까요?

컨슈머 그룹의 컨슈머는 메시지를 폴링하거나 커밋할때 하트비트(Heartbeat) 메시지를 그룹 코디네이터에게 전달합니다.

그룹 코디네티어는 하트비트의 상태에 따라 컨슈머의 상태를 감지합니다.

- 하트비트 정상 : 컨슈머가 정상적으로 작동중입니다.

- 하트비트 이슈 발생 : 하트비트를 받지 못하면, 해당 컨슈머에 이상이 발생한 것으로 판단하고 리밸런싱을 실시합니다.

 

즉, 컨슈머의 장애가 감지되면 리밸런싱을 통해 파티션 소유권을 다른 파티션에게 이관하게 됩니다.

 

특정 토픽의 파티션이 추가되거나 변경이 발생한 경우, 리밸런싱을 통해 해당 파티션에 대한 소유권을 재조정해야 합니다.

리밸런싱을 통해 컨슈머 그룹 내의 컨슈머가 추가된 파티션을 구독할 수 있도록 합니다.

 

▶ 리밸런싱의 수행 과정

https://medium.com/11st-pe-techblog/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8-%EC%95%A0%ED%94%8C%EB%A6%AC%EC%BC%80%EC%9D%B4%EC%85%98-%EB%B0%B0%ED%8F%AC-%EC%A0%84%EB%9E%B5-4cb2c7550a72

1. 코디네이터는 컨슈머 그룹 내의 모든 컨슈머들의 파티션 소유권을 박탈합니다.

2. 컨슈머들은 각각 poll을 호출하여 조인을 요청합니다.

3. 코디네이터는 제일 먼저 joinGroup을 요청한 컨슈머를 그룹 리더로 선정하고, 선정한 리더는 파티션 정도와 컨슈머 목록을 참고하여 파티션 할당을 결정합니다. 재조정된 파티션 소유권의 정보를 다시 코디네이터에게 전달합니다.

4. 코디네이터는 재조정된 파티션 소유권을 각 파티션에게 알리고 리밸런싱을 종료합니다.

 

▶ 리밸런싱 발생이 끼치는 영향은 무엇일까요?

리밸런싱은 컨슈머의 소유권을 재조정하기 때문에 리밸런싱이 발생한 컨슈머 그룹 내의 모든 컨슈머의 읽기 작업은 중단됩니다.

컨슈머의 일시적인 서비스 중단이 발생하므로 데이터 처리에 지연이 발생할 수 있습니다.

리밸런싱은 컨슈머가 데이터를 처리하는 도중에 언제든지 발생할 수 있으므로 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해야합니다.

 

 

Broker Partition Replication

데이터 복제(replication)는 카프카를 장애 허용 시스템으로 동작하도록 하는 원동력입니다.

복제의 이유는 클러스터로 묶인 브로커 중 일부에 장애가 발생하더라도 데이터를 유실하지 않고 안전하게 사용하기 위함입니다.

 

우리는 위에서 'Broker'에 대해 공부를 했습니다.

브로커 서버를 여러대로 구성하여 하나의 '클러스터'로 구성한다고 했습니다. 

Kafka Broker의 파티션 복제가 어떤 방식으로 발생되는지 알아봅시다.

 

카프카의 데이터 복제는 파티션 단위로 이루어집니다.

토픽을 생성할때 파티션의 복제 개수(replication factor)도 같이 설정되는데, 직접 옵션을 선택하지 않으면 브로커에 설정된 옵션값으로 설정됩니다.

옵션값 설명
복제개수의 최솟값 1 (복제 없음)
복제 개수의 최댓값 브로커 개수

 

▶ 브로커별 파티션의 복제 과정

브로커가 3개 존재한다고 가정합시다.

브로커 0 브로커 1 브로커 2
토픽 토픽 토픽
파티션1
(리더)
파티션1
(팔로워)
파티션1
(팔로워)

 

복제된 파티션은 리더(leader)와 팔로워(follower)로 구성됩니다.

  • 리더 : 프로듀서 또는 컨슈머와 직접 통신하는 파티션
  • 팔로워 : 나머지 복제 데이터를 가지고 있는 파티션

 

팔로워 파티션들은 리더 파티션의 오프셋을 확인하여 현재 자신이 가지고있는 오프셋과 차이가 나는 경우 리터 파티션으로부터 데이터를 가져와서 자신의 파티션에 저장합니다.

파티션 복제로 인해 나머지 브로커에도 파티션의 데이터가 복제되므로 복제 개수만큼의 저장 용량이 증가하게 됩니다.

그러나 복제를 통해 데이터를 안전하게 사용할 수 있으므로, 이러한 단점을 감수하고 복제를 수행합니다.

그러므로 운영시에 복제 개수를 2개 이상으로 정하는 것이 중요합니다.

(데이터가 일부 유실되어도 무관하고 데이터 처리 속도가 중요하다면 1 또는 2로 설정해도 됩니다.)

 

▷ 이때, 리더인 브로커 0번에 장애가 발생했습니다.

브로커 0 브로커 1 브로커 2
토픽 토픽 토픽
파티션1
(리더)
파티션1
(리더)
파티션1
(팔로워)

리더 파티션은 더이상 사용할 수 없으므로 팔로워 파티션 중 하나가 리더 파티션의 지위를 넘겨받습니다.

이를 통해 데이터가 유실되지 않고 컨슈머나 프로듀서와 데이터를 주고받도록 동작할 수 있습니다.

운영 시에는 데이터 종류마다 다른 복제 개수를 설정하고 상황에 따라서는 토픽마다 복제 개수를 다르게 설정하여 운영하기도 합니다.

 

 

ISR (In-Sync-Replicas)

리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 뜻합니다.

 

▶ 복제 개수가 2인 토픽을 가정해봅시다.

리터 파티션 1개와 팔로워 파티션 1개가 존재합니다.

리더 파티션에 0부터 3의 오프셋이 있다고 가정할때, 팔로워 파티션에 동기화가 완료되려면 0부터 3까지 오프셋이 존재해야합니다.

https://blossun.github.io/infra/kafka/02_%EC%B9%B4%ED%94%84%EC%B9%B4-%ED%95%B5%EC%8B%AC-%EC%9A%94%EC%86%8C-3%EA%B0%80%EC%A7%80/

동기화가 완료됐다는 의미는 리더 파티션의 모든 데이터가 팔로워 파티션에 복제된 상태를 말합니다.

리더 파티션과 팔로워 파티션이 동기화된 상태에서는 리더 또는 팔로워 파티션이 위치하는 브로커에 장애가 발생하더라도 데이터를 안전하게 사용할 수 있습니다.

 

팔로워 파티션이 리더 파티션으로부터 데이터를 복제하는 데에 시간이 걸립니다.

프로듀서가 특정 파티션에 데이터를 저장하는 작업은 리더 파티션을 통해 처리하고, 이때 리더 파티션에 새로운 레코드가 추가되어 오프셋이 증가하면 팔로워 파티션이 위치한 브로커는 리더 파티션의 데이터를 복제합니다.

리더 파티션에 데이터가 적제된 이후 팔로워 파티션이 복제하는 시간차 때문에 리더 파티션과 팔로워 파티션 간에 오프셋 차이가 발생할 수 있습니다.

 

▶ 옵션값

1) replica.lag.time.max.ms

이런 차이를 모니터링하기 위해 리더 파티션은 replica.lag.time.max.ms 값 만큼의 주기를 가지고 팔로워 파티션이 데이터를 복제하는지 확인합니다. 만약 팔로워 파티션이 replica.lag.time.max.ms 값보다 더 긴 시간동안 데이터를 가져가지 않는다면 해당 팔로워 파티션에 문제가 생긴것으로 판단하고 ISR 그룹에서 제외합니다.

 

ISR로 묶인 리더 파티션과 팔로워 파티션은 파티션에 존재하는 데이터가 모두 동일하기 때문에 팔로워 파티션은 리더 파티션으로 새로 선출될 자격을 가집니다.

반면, ISR로 묶이지 못한 팔로워 파티션은 리더로 선출될 자격이 없습니다.

 

2) unclean.leader.election.enable

일부 데이터 유실이 발생하더라도 서비스를 중단하지 않고 지속적으로 토픽을 사용하고 싶다면 ISR이 아닌 팔로워 파티션을 리더로 선출하도록 설정할 수 있습니다. 

토픽별로 설정이 가능합니다.

설명 설정
false ISR이 아닌 팔로워 파티션을 리더 파티션으로 선출하지 않습니다.
리더 파티션이 존재하는 브로커가 다시 시작되기까지 기다리며 기다리는 동안, 토픽을 사용하는 서비스가 중단됩니다.
대신 동기화되지 않은 팔로워 파티션이 리더로 선출되지 않기 때문에 데이터의 유실은 발생하지 않습니다.
데이터가 유실되면 안되는 경우
true ISR이 아닌 팔로워 파티션, 즉 동기화가 되지 않은 팔로워 파티션도 리더로 선출될 수 있습니다.
리더 파티션이 존재하는 브로커에서 장애가 발생하고 동기화되지 않은 팔로워 파티션이 리더로 선출되면 리더 파티션으로부터 동기화가 되지 않은 일부 데이터는 유실될 수 있습니다.
대신 서비스 중단은 없습니다.
일부 데이터가 유실되더라도 토픽과 연동 중인 서비스의 무중단 운영이 더 중요할 경우

 

 

컨슈머 랙 (LAG)

토픽의 최신 오프셋(LOG-END-OFFSET)과 컨슈머 오프셋(CURRENT-OFFSET) 간의 차이입니다.

프로듀서는 계속해서 새로운 데이터를 파티션에 저장하고 컨슈머는 자신이 처리할 수 있는 만큼 데이터를 가져갑니다.

 

▶ 컨슈머 랙은 컨슈머 그룹과 토픽, 파티션 별로 생성됩니다.

1개의 토픽에 3개의 파티션이 있고, 1개의 컨슈머 그룹이 토픽을 구독하여 데이터를 가져가면 컨슈머 랙은 총 3개가 됩니다. 

 

▶ 컨슈머 랙의 증가/감소

처리량 컨슈머 랙
프로듀서가 보내는 데이터 양 > 컨슈머의 데이터 처리량 컨슈머 랙 증가
프로듀서가 보내는 데이터 양 < 컨슈머의 데이터 처리량 컨슈머 랙 감소
(최솟값 0 : 지연이 없음)
프로듀서가 보내는 데이터 양은 일정
컨슈머의 장애 발생
컨슈머 랙 증가

 

 

반응형

Designed by JB FACTORY