[아파치 카프카 어플리케이션 프로그래밍] 12. 카프카에서 제공하는 AdminClient 사용하여 정보 조회하기

반응형
728x90
반응형

카프카의 AdminClient 제공

실제 운영환경에서는 프로듀서와 컨슈머를 통해 데이터를 주고받는것 만큼 카프카에 설정된 내부 옵션을 설정하고 확인하는것이 중요하다.
내부 옵션을 확인하는 가장 확실한 방법은 브로커 중 한대에 접속하여 카프카 브로커 옵션을 확인하는 것이다. 이는 매우 번거롭다.
카프카 클라이언트에서는 내부 옵션들을 설정하거나 조회하기 위해 AdminClient 클래스를 제공한다.
이 클래스를 활용하면 클러스터의 옵션과 관련된 부분을 자동화할 수 있다.

 

활용예시

  • 카프카 컨슈머를 멀티 스레드로 생성할때, 구독하는 파티션 개수만큼 스레드를 생성하고 싶을때, 스레드 생성 전에
    해당 토픽의 파티션 개수를 어드민 API를 통해 가져올 수 있다.
  • AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL(Access Control List)이 적용된 클러스터의 리소스 접근 권한
    규칙 추가를 할 수 있다.
  • 특정 토픽의 데이터양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션을 늘릴 수 있다.

 

 

예제파일

KafkaAdminClient.java
package com.example.consumer._admin;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

@Slf4j
public class KafkaAdminClient {
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) throws Exception {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");

        /** AdminClient 사용 */
        AdminClient admin = AdminClient.create(configs);

        /* 조회1 */
        log.info("== Get broker information");
        for (Node node : admin.describeCluster().nodes().get()) {
            log.info("node : {}", node);

            ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());

            DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
            describeConfigs.all().get().forEach((broker, config) -> {
                config.entries().forEach(configEntry -> log.info(configEntry.name() + "= " + configEntry.value()));
            });
        }

        /* 조회2 */
        log.info("== Get default num.partitions");
        for (Node node : admin.describeCluster().nodes().get()) {
            ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
            DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
            Config config = describeConfigs.all().get().get(cr);
            Optional<ConfigEntry> optionalConfigEntry = config.entries().stream().filter(v -> v.name().equals("num.partitions")).findFirst();
            ConfigEntry numPartitionConfig = optionalConfigEntry.orElseThrow(Exception::new);
            log.info("{}", numPartitionConfig.value());
        }

        /* 조회3 */
        log.info("== Topic list");
        for (TopicListing topicListing : admin.listTopics().listings().get()) {
            log.info("{}", topicListing.toString());
        }

        /* 조회4 */
        log.info("== test topic information");
        Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
        log.info("{}", topicInformation);

        /* 조회5 */
        log.info("== Consumer group list");
        ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
        listConsumerGroups.all().get().forEach(v -> {
            log.info("{}", v);
        });

        /* close */
        // 어드민 API 는 사용하고 나면 명시적으로 종료 메서드를 호출하여 리소스가 낭비되지 않도록 한다.
        // AdminClient 클래스의 close() 메서드를 사용하면 명시적으로 종료할 수 있다.
        admin.close();
    }
}

 

1) AdminClient 사용

AdminClient admin = AdminClient.create(configs);

 

2) 브로커 정보

/* 조회1 */
log.info("== Get broker information");
for (Node node : admin.describeCluster().nodes().get()) {
    log.info("node : {}", node);

    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());

    DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
    describeConfigs.all().get().forEach((broker, config) -> {
        config.entries().forEach(configEntry -> log.info(configEntry.name() + "= " + configEntry.value()));
    });
}

 

3) default 파티션 개수

/* 조회2 */
log.info("== Get default num.partitions");
for (Node node : admin.describeCluster().nodes().get()) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
    DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
    Config config = describeConfigs.all().get().get(cr);
    Optional<ConfigEntry> optionalConfigEntry = config.entries().stream().filter(v -> v.name().equals("num.partitions")).findFirst();
    ConfigEntry numPartitionConfig = optionalConfigEntry.orElseThrow(Exception::new);
    log.info("{}", numPartitionConfig.value());
}

 

4) 토픽 리스트

log.info("== Topic list");
for (TopicListing topicListing : admin.listTopics().listings().get()) {
    log.info("{}", topicListing.toString());
}

 

5) 토픽 정보

log.info("== test topic information");
Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
log.info("{}", topicInformation);

 

6) 컨슈머 그룹 리스트

log.info("== Consumer group list");
ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
listConsumerGroups.all().get().forEach(v -> {
    log.info("{}", v);
});

 

7) AdminClient 리소스 종료

어드민 API 는 사용하고 나면 명시적으로 종료 메서드를 호출하여 리소스가 낭비되지 않도록 한다.
AdminClient 클래스의 close() 메서드를 사용하면 명시적으로 종료할 수 있다.

admin.close();

 

 

 

반응형

Designed by JB FACTORY