[아파치 카프카 어플리케이션 프로그래밍] 10. 컨슈머 리밸런싱 리스너 클래스 생성하기

반응형
728x90
반응형

리밸런싱 개념

아래 포스팅에서 리밸런싱 내용을 확인하자.

https://devfunny.tistory.com/757

 

[아파치 카프카 어플리케이션 프로그래밍] 8. 컨슈머의 중요 개념과 옵션값

컨슈머 그룹 토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 2가지다. 1) 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영 2) 토픽의 특정 파티션만 구독하는 컨슈머

devfunny.tistory.com

 

 

예제

컨슈머 그룹에서 컨슈머가 추가 또는 제거되면 파티션을 컨슈머에 재할당하는 과정인 리밸런스가 일어난다.

poll() 메서드를 통해 반환받은 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터를 중복 처리할 있다.

poll() 메서드를 통해 받은 데이터 일부를 처리했으나 커밋하지 않았기 때문이다.

 

리밸런스 발생시 데이터를 중복 처리하지 않게 하기 위해서는 리밸런스 발생시 처리한 데이터를 기준으로 커밋을 시도해야한다.

리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스를 지원한다.

 

RebalanceListener.java
package com.example.consumer.rebalance;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

@Slf4j
class RebalanceListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
    /**
     * 리밸런스가 시작되기 직전에 호출되는 메서드
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.warn("Partitions are assigned");
    }

    /**
     * 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출되는 메서드
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.warn("Partitions are revoked");
    }
}

 

1) onPartitionsRevoked 메서드

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {...}

리밸런스가 시작되기 직전에 호출된다.

 

2) onPartitionsAssigned 메서드

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {...}

 

리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출된다.

 

 

RebalanceConsumer.java
package com.example.consumer.rebalance;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

@Slf4j
public class RebalanceConsumer {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private final static String GROUP_ID = "test-group";


    private static KafkaConsumer<String, String> consumer;

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        /* 명시적 오프셋 커밋 - 리밸런스 발생시 수동 커밋을 위한 설정 */
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        consumer = new KafkaConsumer<>(configs);

        // 리밸런스 클래스를 메서드 오버라이드 변수로 포함
        consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
        
        // 컨슈머에 할당된 파티션 확인 방법
        Set<TopicPartition> assignedTopicPartition = consumer.assignment();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

            Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();

            for (ConsumerRecord<String, String> record : records) {
                log.info("{}", record);

                currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, null));
                consumer.commitSync();
            }
        }
    }
}

 

1) 리밸런스 클래스 설정

consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());

리밸런스 클래스를 subscribe 메서드 오버라이드 변수로 포함한다.

 

 

반응형

Designed by JB FACTORY