반응형
728x90
반응형
카프카의 AdminClient 제공
실제 운영환경에서는 프로듀서와 컨슈머를 통해 데이터를 주고받는것 만큼 카프카에 설정된 내부 옵션을 설정하고 확인하는것이 중요하다.
내부 옵션을 확인하는 가장 확실한 방법은 브로커 중 한대에 접속하여 카프카 브로커 옵션을 확인하는 것이다. 이는 매우 번거롭다.
카프카 클라이언트에서는 내부 옵션들을 설정하거나 조회하기 위해 AdminClient 클래스를 제공한다.
이 클래스를 활용하면 클러스터의 옵션과 관련된 부분을 자동화할 수 있다.
활용예시
- 카프카 컨슈머를 멀티 스레드로 생성할때, 구독하는 파티션 개수만큼 스레드를 생성하고 싶을때, 스레드 생성 전에
해당 토픽의 파티션 개수를 어드민 API를 통해 가져올 수 있다. - AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL(Access Control List)이 적용된 클러스터의 리소스 접근 권한
규칙 추가를 할 수 있다. - 특정 토픽의 데이터양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션을 늘릴 수 있다.
예제파일
KafkaAdminClient.java
package com.example.consumer._admin;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@Slf4j
public class KafkaAdminClient {
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
public static void main(String[] args) throws Exception {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
/** AdminClient 사용 */
AdminClient admin = AdminClient.create(configs);
/* 조회1 */
log.info("== Get broker information");
for (Node node : admin.describeCluster().nodes().get()) {
log.info("node : {}", node);
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
describeConfigs.all().get().forEach((broker, config) -> {
config.entries().forEach(configEntry -> log.info(configEntry.name() + "= " + configEntry.value()));
});
}
/* 조회2 */
log.info("== Get default num.partitions");
for (Node node : admin.describeCluster().nodes().get()) {
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
Config config = describeConfigs.all().get().get(cr);
Optional<ConfigEntry> optionalConfigEntry = config.entries().stream().filter(v -> v.name().equals("num.partitions")).findFirst();
ConfigEntry numPartitionConfig = optionalConfigEntry.orElseThrow(Exception::new);
log.info("{}", numPartitionConfig.value());
}
/* 조회3 */
log.info("== Topic list");
for (TopicListing topicListing : admin.listTopics().listings().get()) {
log.info("{}", topicListing.toString());
}
/* 조회4 */
log.info("== test topic information");
Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
log.info("{}", topicInformation);
/* 조회5 */
log.info("== Consumer group list");
ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
listConsumerGroups.all().get().forEach(v -> {
log.info("{}", v);
});
/* close */
// 어드민 API 는 사용하고 나면 명시적으로 종료 메서드를 호출하여 리소스가 낭비되지 않도록 한다.
// AdminClient 클래스의 close() 메서드를 사용하면 명시적으로 종료할 수 있다.
admin.close();
}
}
1) AdminClient 사용
AdminClient admin = AdminClient.create(configs);
2) 브로커 정보
/* 조회1 */
log.info("== Get broker information");
for (Node node : admin.describeCluster().nodes().get()) {
log.info("node : {}", node);
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
describeConfigs.all().get().forEach((broker, config) -> {
config.entries().forEach(configEntry -> log.info(configEntry.name() + "= " + configEntry.value()));
});
}
3) default 파티션 개수
/* 조회2 */
log.info("== Get default num.partitions");
for (Node node : admin.describeCluster().nodes().get()) {
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
Config config = describeConfigs.all().get().get(cr);
Optional<ConfigEntry> optionalConfigEntry = config.entries().stream().filter(v -> v.name().equals("num.partitions")).findFirst();
ConfigEntry numPartitionConfig = optionalConfigEntry.orElseThrow(Exception::new);
log.info("{}", numPartitionConfig.value());
}
4) 토픽 리스트
log.info("== Topic list");
for (TopicListing topicListing : admin.listTopics().listings().get()) {
log.info("{}", topicListing.toString());
}
5) 토픽 정보
log.info("== test topic information");
Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
log.info("{}", topicInformation);
6) 컨슈머 그룹 리스트
log.info("== Consumer group list");
ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
listConsumerGroups.all().get().forEach(v -> {
log.info("{}", v);
});
7) AdminClient 리소스 종료
어드민 API 는 사용하고 나면 명시적으로 종료 메서드를 호출하여 리소스가 낭비되지 않도록 한다.
AdminClient 클래스의 close() 메서드를 사용하면 명시적으로 종료할 수 있다.
admin.close();
반응형
'Coding > Apache Kafka' 카테고리의 다른 글
[아파치 카프카 어플리케이션 프로그래밍] 14. 토픽 정리 정책(cleanup.policy), 토픽 삭제 정책(delete policy), 토픽 압축 정책(compact policy) (0) | 2022.05.27 |
---|---|
[아파치 카프카 어플리케이션 프로그래밍] 13. 토픽과 파티션 (0) | 2022.05.26 |
[아파치 카프카 어플리케이션 프로그래밍] 11. 컨슈머 토픽, 파티션 명시적 선언과 정상 종료 처리 실습 (0) | 2022.05.23 |
[아파치 카프카 어플리케이션 프로그래밍] 10. 컨슈머 리밸런싱 리스너 클래스 생성하기 (0) | 2022.05.23 |
[아파치 카프카 어플리케이션 프로그래밍] 9. 컨슈머 오프셋 커밋 실습 (commit sync, commit async) (0) | 2022.05.23 |