[아파치 카프카 어플리케이션 프로그래밍] 19. 컨슈머 랙(LAG)

반응형
728x90
반응형

컨슈머 랙(LAG)

토픽의 최신 오프셋(LOG-END-OFFSET)과 컨슈머 오프셋(CURRENT-OFFSET) 간의 차이다. 프로듀서는 계속해서 새로운 데이터를 파티션에 저장하고 컨슈머는 자신이 처리할 수 있는 만큼 데이터를 가져간다. 컨슈머 랙은 컨슈머가 정상 동작하는지 여부를 확인할 수 있기 때문에 컨슈머 애플리케이션을 운영한다면 필수적으로 모니터링해야한다. 컨슈머 랙을 모니터링하는 것은 카프카를 통한 데이터 파이프라인을 운영하는 데에 핵심적인 역할을 한다. 컨슈머 랙을 모니터링함으로써 컨슈머의 장애를 확인할 수 있고 파티션의 개수를 정하는 데에 참고할 수 있기 때문이다. 

 

컨슈머 랙은 컨슈머 그룹과 토픽, 파티션별로 생성된다.

* 예시
1개의 토픽에 3개의 파티션이 있고, 1개의 컨슈머 그룹이 토픽을 구독하여 데이터를 가져가면 컨슈머 랙은 총 3개가 된다. 

 

컨슈머 랙 증가/감소

프로듀서가 보내는 데이터 양 > 컨슈머의 데이터 처리량 컨슈머 랙 증가
프로듀서가 보내는 데이터 양 < 컨슈머의 데이터 처리량 컨슈머 랙 감소 (최솟값 0 : 지연이 없음)

 

프로듀서의 데이터 양이 일정함에도 컨슈머의 장애로 인해 컨슈머 랙이 증가할 수도 있다. 컨슈머는 파티션 개수만큼 늘려서 병렬처리하면 파티션마다 컨슈머가 할당되어 데이터를 처리한다. 

* 예시
2개의 파티션으로 구성된 토픽에 2개의 컨슈머가 각각 할당되어 데이터를 처리한다고 가정하자. 프로듀서가 보내는 데이터는 동일한데 파티션 1번의 컨슈머 랙이 늘어나는 상황이 발생한다면 1번 파티션에 할당된 컨슈머에 이슈가 발생했음을 유추할 수 있다.

 

 

컨슈머 랙 확인 방법

1) 카프카 명령어를 사용

컨슈머 그룹 이름 : my-group

kafka-consumer-groups.sh --bootstrap-server localhost:8082 --group my-group --describe

 

2) 컨슈머 어플리케이션에서 metrics() 메서드를 사용

package com.example.consumer._consumer_lag;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Map;
import java.util.Properties;

@Slf4j
public class LagMetricsConsumer {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);

		/* metrics() 메서드를 사용하여 컨슈머 랙 조회 */
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            if ("record-lag-max".equals(entry.getKey().name())
                || "record-lags".equals(entry.getKey().name())
                || "record-lag-avg".equals(entry.getKey().name())) {
                Metric metric = entry.getValue();
                log.info(entry.getKey().name(), metric.metricValue());
            }
        }
    }
}

 

3) 외부 모니터링 툴을 사용 

데이터독(Datadog), 컨플루언트 컨트롤 센터(Confluent Control Center)와 같은 카프카 클러스터 종합 모니터링 툴을 사용할 수 있다. 컨슈머 랙 모니터링만을 위한 툴로 오픈소스로 공개되어있는 버로우(Burrow)도 있다.

 

 

 

 

 

 

 

 

반응형

Designed by JB FACTORY