카프카의 오프셋과 커밋

반응형
728x90
반응형

들어가며

컨슈머가 poll()을 호출할 때마다 컨슈머 그룹은 카프카에 저장되어있는 아직 읽지않은 메시지를 가져온다. 이렇게 동작할 수 있는 것은 컨슈머 그룹이 메시지를 어디까지 가져왔는지를 알 수 있기 때문이다. 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보를 기록하고 있다.

 

 

커밋

카프카는 각 컨슈머 그룹의 파티션 별로 오프셋 정보를 저장하기 위한 저장소가 별도로 필요하다. 만약 특정 컨슈머가 갑자기 다운되거나 컨슈머 그룹에 새로운 컨슈머를 추가하게된다면 컨슈머 그룹 내에서 리밸런스가 발생한다.

 

리밸런스가 일어난 후 각 컨슈머들은 이전에 처리했던 토픽의 파티션이 아닌 다른 새로운 파티션에 할당된다. 이때 컨슈머는 이전의 컨슈머가 가져간 데이터의 이후 시점부터 읽어들여야 한다. 이를 어떻게 알까?

 

오프셋 : 컨슈머들이 각각의 파티션에 자신이 가져간 메시지의 위치 정보
커밋 : 각 파티션에 대해 현재 위치를 업데이트하는 동작

 

컨슈머는 새로운 파티션에 대해 가장 최근 커밋된 오프셋을 읽고 그 이후부터 메시지들을 가져온다.

 

 

 

커밋의 단점

  • 1) 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 작으면?

마지막 처리된 오프셋과 커밋된 오프셋 사이의 메시지는 중복이 발생한다.

 

  • 2) 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 크면?

마지막 처리된 오프셋과 커밋된 오프셋 사이의 모든 메시지는 누락된다.

 

 

 

자동커밋

컨슈머 옵션을 설정하면 5초마다 컨슈머는 po()을 호출할때 가장 마지막 오프셋을 자동으로 커밋한다. 기본 설정 주기는 5초이고, 해당 주기는 변경이 가능하다.

enable.auto.commit=true
문제점 상황
컨슈머는 poll()을 요청할때마다 커밋할 시간인지 아닌지 체크하고 poll() 요청으로 가져온 마지막 오프셋을 커밋한다. 마지막 커밋 이후 poll() 주기 5초 진행 중에 3초가 지난 시점에 리밸런스가 발생하였다. 이렇게되면 마지막 자동 커밋 시점으로부터 진행된 3초의 데이터들의 중복이 발생하게 된다. 

 

 

 

수동커밋

메시지 처리가 완료된 시점을 메시지를 가져온 것으로 커밋할 경우에 사용한다. 만약 컨슈머가 데이터베이스에 메시지를 저장한다고 가정한다면, 데이터베이스에 데이터가 저장되는 시점까지를 메시지를 가져온 시점이라고 보고 이때 커밋하는 경우이다. 

 

문제점 상황
메시지를 데이터베이스에 저장하는 도중에 실패하게 되었다. 마지막 커밋된 오프셋부터 메시지를 다시 가져오게되는데, 이때 메시지들이 중복으로 저장될 수 있다. 왜냐하면 데이터베이스에 3개의 메시지가 저장되고 그 다음 메시지 저장되는 시점에 에러가 발생했다면 이미 저장되어있는 3개의 메시지들을 다시 가져오게되기 때문이다.

 

카프카는 적어도 한번(중복은 있지만 손실은 없다)을 보장한다.

 

 

 

특정 파티션 할당

카프카에서는 컨슈머 그룹의 컨슈머들에게 직접 파티션을 공평하게 분배한다. 하지만 만약 특정 파티션에 대해 세밀하게 제어하기를 원한다면 특정 파티션을 할당할 수 있다. 

 

1) 키-값의 형태로 파티션에 저장되어 있고, 특정 파티션에 대한 메시지들만 가져와야하는 경우

2) 컨슈머 프로세스가 가용성이 높은 구성인 경우 카프카가 컨슈머의 실패를 감지하고 재조정할 필요 없고 자동으로 컨슈머 프로세스가 다른 시스템에서 재시작되는 경우

 

수동으로 파티션을 할당하여 메시지를 가져올 경우 컨슈머 인스턴스마다 컨슈머 그룹 아이디를 서로 다르게 설정해야한다. 만약 동일한 컨슈머 그룹 아이디를 설정하게되면, 컨슈머마다 할당된 파티션에 대한 오프셋 정보를 서로 공유하기 때문에 종료된 컨슈머의 파티션을 다른 컨슈머가 할당받아 메시지를 이어가게 되고, 오프셋을 커밋하게 되어 원치 않는 형태로 동작할 수 있다

 

 

특정 오프셋으로부터 메시지 가져오기

카프카의 컨슈머 API를 사용하게되면 메시지 중복 처리 등의 이유로 경우에 따라 오프셋 관리를 수동으로 하는 경우도 있다. 이때 수동으로 어디부터 메시지를 읽어올지를 지정해야하는데 이때 seek() 메서드를 사용한다.

 

 

 

반응형

Designed by JB FACTORY