Apache Kafka

[아파치 카프카 어플리케이션 프로그래밍] 12. 카프카에서 제공하는 AdminClient 사용하여 정보 조회하기

LearnerKSH 2022. 5. 24. 12:27
728x90
반응형

카프카의 AdminClient 제공

실제 운영환경에서는 프로듀서와 컨슈머를 통해 데이터를 주고받는것 만큼 카프카에 설정된 내부 옵션을 설정하고 확인하는것이 중요하다.
내부 옵션을 확인하는 가장 확실한 방법은 브로커 중 한대에 접속하여 카프카 브로커 옵션을 확인하는 것이다. 이는 매우 번거롭다.
카프카 클라이언트에서는 내부 옵션들을 설정하거나 조회하기 위해 AdminClient 클래스를 제공한다.
이 클래스를 활용하면 클러스터의 옵션과 관련된 부분을 자동화할 수 있다.

 

활용예시

  • 카프카 컨슈머를 멀티 스레드로 생성할때, 구독하는 파티션 개수만큼 스레드를 생성하고 싶을때, 스레드 생성 전에
    해당 토픽의 파티션 개수를 어드민 API를 통해 가져올 수 있다.
  • AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL(Access Control List)이 적용된 클러스터의 리소스 접근 권한
    규칙 추가를 할 수 있다.
  • 특정 토픽의 데이터양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션을 늘릴 수 있다.

 

 

예제파일

KafkaAdminClient.java
package com.example.consumer._admin;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

@Slf4j
public class KafkaAdminClient {
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) throws Exception {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");

        /** AdminClient 사용 */
        AdminClient admin = AdminClient.create(configs);

        /* 조회1 */
        log.info("== Get broker information");
        for (Node node : admin.describeCluster().nodes().get()) {
            log.info("node : {}", node);

            ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());

            DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
            describeConfigs.all().get().forEach((broker, config) -> {
                config.entries().forEach(configEntry -> log.info(configEntry.name() + "= " + configEntry.value()));
            });
        }

        /* 조회2 */
        log.info("== Get default num.partitions");
        for (Node node : admin.describeCluster().nodes().get()) {
            ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
            DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
            Config config = describeConfigs.all().get().get(cr);
            Optional<ConfigEntry> optionalConfigEntry = config.entries().stream().filter(v -> v.name().equals("num.partitions")).findFirst();
            ConfigEntry numPartitionConfig = optionalConfigEntry.orElseThrow(Exception::new);
            log.info("{}", numPartitionConfig.value());
        }

        /* 조회3 */
        log.info("== Topic list");
        for (TopicListing topicListing : admin.listTopics().listings().get()) {
            log.info("{}", topicListing.toString());
        }

        /* 조회4 */
        log.info("== test topic information");
        Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
        log.info("{}", topicInformation);

        /* 조회5 */
        log.info("== Consumer group list");
        ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
        listConsumerGroups.all().get().forEach(v -> {
            log.info("{}", v);
        });

        /* close */
        // 어드민 API 는 사용하고 나면 명시적으로 종료 메서드를 호출하여 리소스가 낭비되지 않도록 한다.
        // AdminClient 클래스의 close() 메서드를 사용하면 명시적으로 종료할 수 있다.
        admin.close();
    }
}

 

1) AdminClient 사용

AdminClient admin = AdminClient.create(configs);

 

2) 브로커 정보

/* 조회1 */
log.info("== Get broker information");
for (Node node : admin.describeCluster().nodes().get()) {
    log.info("node : {}", node);

    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());

    DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
    describeConfigs.all().get().forEach((broker, config) -> {
        config.entries().forEach(configEntry -> log.info(configEntry.name() + "= " + configEntry.value()));
    });
}

 

3) default 파티션 개수

/* 조회2 */
log.info("== Get default num.partitions");
for (Node node : admin.describeCluster().nodes().get()) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
    DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
    Config config = describeConfigs.all().get().get(cr);
    Optional<ConfigEntry> optionalConfigEntry = config.entries().stream().filter(v -> v.name().equals("num.partitions")).findFirst();
    ConfigEntry numPartitionConfig = optionalConfigEntry.orElseThrow(Exception::new);
    log.info("{}", numPartitionConfig.value());
}

 

4) 토픽 리스트

log.info("== Topic list");
for (TopicListing topicListing : admin.listTopics().listings().get()) {
    log.info("{}", topicListing.toString());
}

 

5) 토픽 정보

log.info("== test topic information");
Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
log.info("{}", topicInformation);

 

6) 컨슈머 그룹 리스트

log.info("== Consumer group list");
ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
listConsumerGroups.all().get().forEach(v -> {
    log.info("{}", v);
});

 

7) AdminClient 리소스 종료

어드민 API 는 사용하고 나면 명시적으로 종료 메서드를 호출하여 리소스가 낭비되지 않도록 한다.
AdminClient 클래스의 close() 메서드를 사용하면 명시적으로 종료할 수 있다.

admin.close();

 

 

 

반응형