[아파치 카프카 어플리케이션 프로그래밍] 22. 스프링 카프카 컨슈머

반응형
728x90
반응형

스프링 카프카 컨슈머

스프링 카프카의 컨슈머는 기존 컨슈머를 2개의 타입으로 나누고 커밋을 7가지로 나누어 세분화했다.

리스너의 종류에 따라 한번 호출하는 메서드에서 처리하는 레코드의 개수가 달라진다. 

 

타입

레코드 리스너 (MessageListener)

- 단 1개의 레코드를 처리한다.

- 스프링 카프카 컨슈머의 기본 리스너이다.

 

배치 리스너 (BatchMessageListener)

- 기존 카프카 클라이언트 라이브러리의 poll() ㅔㅁ서드로 리턴받은 CnsumerRecords처럼 한번에 여러개 레코드들을 처리한다.

 

그 외

매뉴얼 커밋을 사용할 경우 Acknowledging 붙은 리스너를 사용하고, KafkaConsumer 인스턴스에 직접 접근하여 컨트롤하고 싶다면 ConsumerAware가 붙은 리스너를 사용하면 된다.

 

타입 리스너 이름 상세설명
RECORD MessageListener Record의 인스턴스 단위로 프로세싱, 오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우
AcknowledgingMessageListener Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하는 경우
ConsumerAwareMessageListener Record 인스턴스 단위로 프로세싱, 컨슈머 객체를 활용하고 싶은 경우
AcknowledgingConsumerAwareMessageListener Record 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우
BATCH BatchMessageListener Records 인스턴스 단위로 프로세싱, 오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우
BatchAcknowledgingMessageListener REcords 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하는 경우
BatchConsumerAwareMessageListener Records 인스턴스 단위로 프로세싱, 컨슈머 객체를 활용하고 싶은 경우
BatchAcknowledgingConsumerAwareMessageListener Records 인스턴스 단위로 프로세싱, 매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우

 

 

 

7가지 커밋

카프카 클라이언트 라이브러리에서 컨슈머를 구현할때 가장 어려운 부분이 커밋을 구현하는 것이다. 카프카 컨슈머에서 커밋을 직접 구현할때 실제 운영 환경에서 다양한 종류의 커밋을 구현해서 사용한다. 스프링 카프카에서는 사용자가 사용할만한 커밋의 종류를 7가지로 세분화하고 미리 로직을 만들어놓았다.

 

스프링 카프카에서는 커밋이라 부르지 않고, 'AckMode'라고 부른다. 프로듀서에서 사용하는 acks 옵션과 동일한 어원인 Acknowledgment를 사용하므로 AckMode와 acks를 혼동하지 않도록 주의해야한다.

 

스프링 카프카 컨슈머의 AckMode 기본값은 BATCH이고, 컨슈머의 enable.auto.commit 옵션은 false 로 지정된다.

AcksMode 설명
RECORD 레코드 단위로 프로세싱 이후 커밋
BATCH poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋 (스프링 카프카 컨슈머의 AckMode 기본값)
TIME 특정 시간 이후에 커밋
시간 간격을 선언하는 AckTIme도 함께 설정되어야한다.
COUNT 특정 개수만큼 레코드가 처리된 이후에 커밋
레코드 개수를 선언하는 AckCount도 함께 설정되어야한다.
COUNT_TIME TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋
MANUAL Acknowlegement, acknowledge() 메서드가 호출되면 다음번 poll() 때 커밋을 한다.
매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 동작한다.
이 옵션을 사용할 경우에는 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야한다.
MANUAL_IMMEDIATE Acknowledgement, acknowledge() 메서드를 호출한 즉시 커밋한다.
이 옵션을 사용할 경우에는 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야한다.

 

 

 

기본 리스너 컨테이너

기본 리스너 컨테이너는 기본 리스너 컨테이너 팩토리를 통해 생성된 리스너 컨테이너를 사용한다. 

 

application.yml
# record-listener
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092

 

SpringConsumerApplication.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
        
@SpringBootApplication
@Slf4j
public class SpringConsumerApplication {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringConsumerApplication_Batch.class);
        application.run(args);
    }

    /**
     * 가장 기본적인 리스너
     * poll()이 호출되어 가져온 레코드들은 차례대로 개별 레코드의 메시지 값을 파라미터로 받게된다.
     * 파라미터로 컨슈머 레코드를 받기 때문에 메시지 키, 메시지 값에 대한 처리를 이 메서드 안에서 수행하면 된다.
     * @param record
     */
    @KafkaListener(topics = "test",
            groupId = "test-group-00")
    public void recordListener(ConsumerRecord<String,String> record) {
        log.info(record.toString());
    }

    /**
     * 파라미터로 메시지 값을 받는다.
     * @param messageValue
     */
    @KafkaListener(topics = "test",
            groupId = "test-group-01")
    public void singleTopicListener(String messageValue) {
        log.info(messageValue);
    }

    /**
     * properties 옵션을 사용해서 개별 리스너에 카프카 컨슈머 옵션값을 부여할 수 있다.
     * @param messageValue
     */
    @KafkaListener(topics = "test",
            groupId = "test-group-02", properties = {
            "max.poll.interval.ms:60000",
            "auto.offset.reset:earliest"
    })
    public void singleTopicWithPropertiesListener(String messageValue) {
        log.info(messageValue);
    }

    /**
     * 2개 이상의 카프카 컨슈머 스레드를 실행하고 싶다면 concurrency 옵션을 사용하면 된다.
     * concurrency 옵션값에 해당하는만큼 컨슈머 스레드를 만들어서 병렬 처리한다.
     * 예시로, 파티션이 10개인 토픽을 구독할때 가장 좋은 효율은 concurrency 를 10으로 설정하는 것이다.
     * 이렇게 설정하면 10개 파티션에 10개의 컨슈머 스레드가 각각 할당되어 병렬 처리량이 늘어난다.
     * @param messageValue
     */
    @KafkaListener(topics = "test",
            groupId = "test-group-03",
            concurrency = "3")
    public void concurrentTopicListener(String messageValue) {
        log.info(messageValue);
    }

    /**
     * 특정 파티션만 구독하고 싶다면 topicPartitions 파라미터를 사용한다.
     * 여기에 추가로 PartitionOffset 어노테이션을 활용하면 특정 파티션의 특정 오프셋까지 지정할 수 있다.
     * 이 경우에는 그룹 아이디에 관계없이 항상 설정한 오프셋의 데이터부터 가져온다.
     * @param record
     */
    @KafkaListener(topicPartitions =
            {
                    @TopicPartition(topic = "test01", partitions = {"0", "1"}),
                    @TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3"))
            },
            groupId = "test-group-04")
    public void listenSpecificPartition(ConsumerRecord<String, String> record) {
        log.info(record.toString());
    }
}

 

 

 

배치 리스너

배치 리스너는 레코드 리스너와 다르게 KafkaListener로 사용되는 메서드의 파라미터를 List 또는 ConsumerRecords로 받는다. 배치 리스너를 사용하는 경우에는 파라미터를 List 또는 ConsumerRecords로 선언해야한다.

 

application.yml
# batch-listener
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
    listener:
      type: BATCH

 

SpringConsumerApplication.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;

@SpringBootApplication
@Slf4j
public class SpringConsumerApplication_Batch {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringConsumerApplication_Batch.class);
        application.run(args);
    }

    /**
     * 컨슈머 레코드 묶음을 파라미터로 받는다.
     * 카프카 클라이언트 라이브러리에서 poll() 메서드로 리턴받은 ConsumerRecords를 리턴받아 사용하는 것과 동일하다.
     * @param records
     */
    @KafkaListener(topics = "test",
            groupId = "test-group-01")
    public void batchListener(ConsumerRecords<String, String> records) {
        records.forEach(record -> log.info(record.toString()));
    }

    /**
     * 메시지 값들을 List 자료구조를 받아서 처리한다.
     * @param list
     */
    @KafkaListener(topics = "test",
            groupId = "test-group-02")
    public void batchListener(List<String> list) {
        list.forEach(recordValue -> log.info(recordValue));
    }

    /**
     * 2개 이상의 컨슈머 스레드로 배치 리스너를 운영할 경우에 concurrency 옵션을 함께 사용한다.
     * 3으로 설정하면 3개의 컨슈머 스레드가 생성된다.
     * @param records
     */
    @KafkaListener(topics = "test",
            groupId = "test-group-03",
            concurrency = "3")
    public void concurrentBatchListener(ConsumerRecords<String, String> records) {
        records.forEach(record -> log.info(record.toString()));
    }

}

 

 

 

배치 컨슈머 리스너

배치 컨슈머 리스너는 컨슈머를 직접 사용하기 위해 컨슈머 인스턴스를 파라미터로 받는다.  동기, 비동기 커밋이나 컨슈머 인스턴스에서 제공하는 메서드들을 활용하고 싶다면 배치 컨슈머 리스너를 사용한다.

 

application.yml
# batch-commit-listener
spring:
  kafka:
    consumer:
      bootstrap-servers: my-kafka:9092
    listener:
      type: BATCH
      ack-mode: MANUAL_IMMEDIATE

 

SpringConsumerApplication.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

@SpringBootApplication
@Slf4j
public class SpringConsumerApplication_Batch_Commit {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringConsumerApplication_Batch_Commit.class);
        application.run(args);
    }

    /**
     * 동기 커밋, 비동기 커밋을 사용하고 싶다면 컨슈머 인스턴스를 파라미터로 받아서 사용할 수 있다.
     * commitSync(), commitAsync() 메서드를 호출하면 사용자가 원하는 타이밍에 커밋할 수 있도록 로직을 추가할 수 있다.
     * 리스너가 커밋을 하지 않도록 AckMode를 MANUAL, MANUAL_IMMEDIATE로 설정해야한다.
     * @param records
     * @param consumer
     */
    @KafkaListener(topics = "test", groupId = "test-group-02")
    public void consumerCommitListener(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
        records.forEach(record -> log.info(record.toString()));
        consumer.commitSync();
    }

}

 

 

 

배치 커밋 리스너

배치 커밋 리스너는 컨테이너에서 관리하는 AckMode를 사용하기 위해 Acknowledgement 인스턴스를 파라미터로 받는다. Acknowledgement 인스턴스는 커밋을 수행하기 위한 한정적인 메서드만 제공한다. 컨슈머 컨테이너에서 관리하는 AckMode를 사용하여 커밋하고 싶다면 배치 커밋 리스너를 사용하면 된다.

 

application.yml
# batch-commit-listener
spring:
  kafka:
    consumer:
      bootstrap-servers: my-kafka:9092
    listener:
      type: BATCH
      ack-mode: MANUAL_IMMEDIATE

 

SpringConsumerApplication.java
package com.example.springconsumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

@SpringBootApplication
@Slf4j
public class SpringConsumerApplication_Batch_Commit {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringConsumerApplication_Batch_Commit.class);
        application.run(args);
    }

    /**
     * AckMode를 MANUAL, MANUAL_IMMEDIATE로 사용할 경우에는 수동 커밋을 해야한다.
     * 수동 커밋을 하기위해 파라미터로 Acknowledgment 인스턴스를 받아야한다.
     * acknowledge() 를 호출함으로써 커밋을 수행한다.
     * @param records
     * @param ack
     */
    @KafkaListener(topics = "test", groupId = "test-group-01")
    public void commitListener(ConsumerRecords<String, String> records, Acknowledgment ack) {
        records.forEach(record -> log.info(record.toString()));
        ack.acknowledge();
    }
}

 

 

 

커스텀 리스너 컨테이너

서로 다른 설정을 가진 2개 이상의 리스너를 구현하거나 리밸런스 리스너를 구현하기 위해서는 커스텀 리스너 컨테이너를 사용해야한다.

 

KafkaListenerContainerFactory
커스텀 리스너 컨테이너를 만들기 위해서 스프링 카프카에서 카프카 리스너 컨테이너 팩토리 인스턴스를 생성해야한다.


카프카 리스너 컨테이너 팩토리를 빈으로 등록하고 KafkaListener 어노테이션에서 커스텀 리스너 컨테이너 팩토리를 등록하면 커스텀 리스너 컨테이너를 사용할 수 있다.

 

ListenerContainerConfiguration.java
package com.example.springconsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * 서로 다른 설정을 가진 2개 이상의 리스너를 구현하거나 리밸런스 리스너를 구현하기 위해서는
 * 커스텀 리스너 컨테이너를 사용해야한다.
 * 커스텀 리스너 컨테이너를 만들기 위해서 스프링 카프카에서 카프카 리스너 컨테이너 팩토리 인스턴스를 생성해야한다. (KafkaListenerContainerFactory)
 * 카프카 리스너 컨테이너 팩토리를 빈으로 등록하고 KafkaListener 어노테이션에서 커스텀 리스너 컨테이너 팩토리를 등록하면 커스텀 리스너 컨테이너를 사용할 수 있다.
 */
@Configuration
public class ListenerContainerConfiguration {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        /* DefaultKafkaConsumerFactory 는 리스너 컨테이너 팩토리를 생성할때 컨슈머 기본 옵션을 설정하는 용도로 사용된다. */
        DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);

        /* ConcurrentKafkaListenerContainerFactory 는 리스너 컨테이너를 만들기위해 사용한다. */
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        /* 리밸런스 리스너를 선언한다. */
        factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
            /**
             * 커밋이 되기 전에 리밸런스가 발생했을때
             * @param consumer the consumer.
             * @param partitions the partitions.
             */
            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

            }

            /**
             * 커밋이 된 후에 리밸런스가 발생했을때
             * @param consumer the consumer.
             * @param partitions the partitions.
             */
            @Override
            public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

            }

            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {

            }
        });
        /* 레코드 리스너를 사용함을 명시 */
        factory.setBatchListener(false);
        /* AckMode 설정 */
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        /* 컨슈머 팩토리 설정 */
        factory.setConsumerFactory(cf);

        return factory;
    }
}

 

SpringConsumerApplication.java
package com.example.springconsumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
@Slf4j
public class SpringConsumerApplication_Custom {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringConsumerApplication_Custom.class);
        application.run(args);
    }

    /**
     * customContainerFactory 를 옵션값으로 설정한다.
     * 커스텀 컨테이너 팩토리로 생성된 커스텀 리스너 컨테이너를 사용할 수 있다.
     * @param data
     */
    @KafkaListener(topics = "test",
            groupId = "test-group",
            containerFactory = "customContainerFactory")
    public void customListener(String data) {
        log.info(data);
    }
}

 

반응형

Designed by JB FACTORY