반응형
728x90
반응형
리밸런싱 개념
아래 포스팅에서 리밸런싱 내용을 확인하자.
https://devfunny.tistory.com/757
예제
컨슈머 그룹에서 컨슈머가 추가 또는 제거되면 파티션을 컨슈머에 재할당하는 과정인 리밸런스가 일어난다.
poll() 메서드를 통해 반환받은 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터를 중복 처리할 수 있다.
poll() 메서드를 통해 받은 데이터 중 일부를 처리했으나 커밋하지 않았기 때문이다.
리밸런스 발생시 데이터를 중복 처리하지 않게 하기 위해서는 리밸런스 발생시 처리한 데이터를 기준으로 커밋을 시도해야한다.
리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스를 지원한다.
RebalanceListener.java
package com.example.consumer.rebalance;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
@Slf4j
class RebalanceListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
/**
* 리밸런스가 시작되기 직전에 호출되는 메서드
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.warn("Partitions are assigned");
}
/**
* 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출되는 메서드
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.warn("Partitions are revoked");
}
}
1) onPartitionsRevoked 메서드
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {...}
리밸런스가 시작되기 직전에 호출된다.
2) onPartitionsAssigned 메서드
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {...}
리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출된다.
RebalanceConsumer.java
package com.example.consumer.rebalance;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
@Slf4j
public class RebalanceConsumer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
private static KafkaConsumer<String, String> consumer;
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/* 명시적 오프셋 커밋 - 리밸런스 발생시 수동 커밋을 위한 설정 */
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumer = new KafkaConsumer<>(configs);
// 리밸런스 클래스를 메서드 오버라이드 변수로 포함
consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
// 컨슈머에 할당된 파티션 확인 방법
Set<TopicPartition> assignedTopicPartition = consumer.assignment();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
log.info("{}", record);
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null));
consumer.commitSync();
}
}
}
}
1) 리밸런스 클래스 설정
consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
리밸런스 클래스를 subscribe 메서드 오버라이드 변수로 포함한다.
반응형
'Coding > Apache Kafka' 카테고리의 다른 글
[아파치 카프카 어플리케이션 프로그래밍] 12. 카프카에서 제공하는 AdminClient 사용하여 정보 조회하기 (0) | 2022.05.24 |
---|---|
[아파치 카프카 어플리케이션 프로그래밍] 11. 컨슈머 토픽, 파티션 명시적 선언과 정상 종료 처리 실습 (0) | 2022.05.23 |
[아파치 카프카 어플리케이션 프로그래밍] 9. 컨슈머 오프셋 커밋 실습 (commit sync, commit async) (0) | 2022.05.23 |
[아파치 카프카 어플리케이션 프로그래밍] 8. 컨슈머의 중요 개념과 옵션값 (0) | 2022.05.22 |
[아파치 카프카 어플리케이션 프로그래밍] 7. 프로젝트 생성하여 카프카 컨슈머 실행 (2) | 2022.05.21 |