반응형
728x90
반응형
컨슈머 랙(LAG)
토픽의 최신 오프셋(LOG-END-OFFSET)과 컨슈머 오프셋(CURRENT-OFFSET) 간의 차이다. 프로듀서는 계속해서 새로운 데이터를 파티션에 저장하고 컨슈머는 자신이 처리할 수 있는 만큼 데이터를 가져간다. 컨슈머 랙은 컨슈머가 정상 동작하는지 여부를 확인할 수 있기 때문에 컨슈머 애플리케이션을 운영한다면 필수적으로 모니터링해야한다. 컨슈머 랙을 모니터링하는 것은 카프카를 통한 데이터 파이프라인을 운영하는 데에 핵심적인 역할을 한다. 컨슈머 랙을 모니터링함으로써 컨슈머의 장애를 확인할 수 있고 파티션의 개수를 정하는 데에 참고할 수 있기 때문이다.
컨슈머 랙은 컨슈머 그룹과 토픽, 파티션별로 생성된다.
* 예시
1개의 토픽에 3개의 파티션이 있고, 1개의 컨슈머 그룹이 토픽을 구독하여 데이터를 가져가면 컨슈머 랙은 총 3개가 된다.
컨슈머 랙 증가/감소
프로듀서가 보내는 데이터 양 > 컨슈머의 데이터 처리량 | 컨슈머 랙 증가 |
프로듀서가 보내는 데이터 양 < 컨슈머의 데이터 처리량 | 컨슈머 랙 감소 (최솟값 0 : 지연이 없음) |
프로듀서의 데이터 양이 일정함에도 컨슈머의 장애로 인해 컨슈머 랙이 증가할 수도 있다. 컨슈머는 파티션 개수만큼 늘려서 병렬처리하면 파티션마다 컨슈머가 할당되어 데이터를 처리한다.
* 예시
2개의 파티션으로 구성된 토픽에 2개의 컨슈머가 각각 할당되어 데이터를 처리한다고 가정하자. 프로듀서가 보내는 데이터는 동일한데 파티션 1번의 컨슈머 랙이 늘어나는 상황이 발생한다면 1번 파티션에 할당된 컨슈머에 이슈가 발생했음을 유추할 수 있다.
컨슈머 랙 확인 방법
1) 카프카 명령어를 사용
컨슈머 그룹 이름 : my-group
kafka-consumer-groups.sh --bootstrap-server localhost:8082 --group my-group --describe
2) 컨슈머 어플리케이션에서 metrics() 메서드를 사용
package com.example.consumer._consumer_lag;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Map;
import java.util.Properties;
@Slf4j
public class LagMetricsConsumer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
/* metrics() 메서드를 사용하여 컨슈머 랙 조회 */
for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
if ("record-lag-max".equals(entry.getKey().name())
|| "record-lags".equals(entry.getKey().name())
|| "record-lag-avg".equals(entry.getKey().name())) {
Metric metric = entry.getValue();
log.info(entry.getKey().name(), metric.metricValue());
}
}
}
}
3) 외부 모니터링 툴을 사용
데이터독(Datadog), 컨플루언트 컨트롤 센터(Confluent Control Center)와 같은 카프카 클러스터 종합 모니터링 툴을 사용할 수 있다. 컨슈머 랙 모니터링만을 위한 툴로 오픈소스로 공개되어있는 버로우(Burrow)도 있다.
반응형
'Coding > Apache Kafka' 카테고리의 다른 글
[아파치 카프카 어플리케이션 프로그래밍] 21. 스프링 카프카 프로듀서 (0) | 2022.06.09 |
---|---|
[아파치 카프카 어플리케이션 프로그래밍] 20. 컨슈머 배포 프로세스 (중단, 무중단) (0) | 2022.06.03 |
[아파치 카프카 어플리케이션 프로그래밍] 18. 멀티 스레드 컨슈머 - 컨슈머 멀티 워커 스레드 전략과 카프카 컨슈머 멀티 스레드 전략 (0) | 2022.06.01 |
[아파치 카프카 어플리케이션 프로그래밍] 17. 멱등성(idempotence) 프로듀서와 트랜잭션(transaction) 프로듀서 (1) | 2022.05.30 |
[아파치 카프카 어플리케이션 프로그래밍] 16. 카프카 프로듀서 - acks 옵션 (0) | 2022.05.29 |