[아파치 카프카 어플리케이션 프로그래밍] 21. 스프링 카프카 프로듀서

반응형
728x90
반응형

스프링 카프카 프로듀서

'카프카 템플릿(Kafka Template)' 이라고 불리는 클래스를 사용하여 데이터를 전송할 수 있다. 카프카 템플릿은 프로듀서 팩토리(ProducerFactory) 클래스를 통해 생성할 수 있다. 카프카 템플릿을 사용하는 방법은 두가지다.

 

 

 

1) 스프링 카프카에서 제공하는 기본 카프카 템플릿을 사용한다.

  • build.gradle
dependencies {
    ...
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

 

  • application.yml
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      acks: all

위 카프카 옵션에는 필수 옵션 설정이 빠져있다.

 

* 필수 설정 옵션
bootstrap-servers
key-serializer
value-serializer

 

스프링 카프카에서 프로듀서를 사용할 경우에는 필수 옵션이 없다. 그렇기 때문에 옵션을 설정하지 않으면 bootstrap-servers는 localhost:9092, key-serializer, value-serializer는 StringSerializer로 자동 설정되어 실행된다.

 

  • SpringProducerApplication.java
import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

@SpringBootApplication
@RequiredArgsConstructor
public class SpringProducerApplication implements CommandLineRunner {
    private static String TOPIC_NAME = "test";

    private final KafkaTemplate<Integer, String> template;

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }

    @Override
    public void run(String... args) {
        for (int i = 0; i < 10; i++) {
            template.send(TOPIC_NAME, "test" + i);
        }

        System.exit(0);
    }
}

스프링 부트 애플리케이션을 실행하고 test0부터 test9까지 메시지 값을 클러스터로 보낸다.

 

 

 

2) 직접 사용자가 카프카 템플릿을 프로듀서 팩토리로 생성한다.

커스텀 카프카 템플릿은 프로듀서 팩토리를 통해 만든 카프카 템플릿 객체를 빈으로 등록하여 사용하는 것이다. 프로듀서에 필요한 각종 옵션을 선언하여 사용할 수 있으며 한 스프링 카프카 애플리케이션 내부에 다양한 종류의 카프카 프로듀서 인스턴스를 생성하고 싶다면 이 방식을 사용하면 된다. 

 

A클러스터로 전송하는 카프카 프로듀서와 B클러스터로 전송하는 카프카 프로듀서를 동시에 사용할 수 있다.

커스텀 카프카 템플릿을 사용하여 2개의 카프카 템플릿을 빈으로 등록하여 사용할 수 있다.

 

  • KafkaTemplateConfiguration.java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaTemplateConfiguration {
    /**
     * KafkaTemplate 빈 객체 등록
     * Bean name : customKafkaTemplate
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> customKafkaTemplate() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);

        /* 빈 객체로 사용할 KafkaTemplate 인스턴스를 초기화하고 리턴한다. */
        return new KafkaTemplate<>(pf);
    }
}

 

  • SpringProducerApplication.java
import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

@SpringBootApplication
@RequiredArgsConstructor
public class SpringProducerApplication implements CommandLineRunner {
    private static String TOPIC_NAME = "test";

    private final KafkaTemplate<Integer, String> customKafkaTemplate;

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }

    @Override
    public void run(String... args) {
        ListenableFuture<SendResult<Integer, String>> future = customKafkaTemplate.send(TOPIC_NAME, "test");
        future.addCallback(new KafkaSendCallback<Integer, String>() {
            /* 프로듀서가 보낸 데이터의 브로커 적재 여부를 비동기로 확인 */
            @Override // 정상일 경우
            public void onSuccess(SendResult<Integer, String> result) {

            }

            @Override // 이슈가 발생했을 경우
            public void onFailure(KafkaProducerException ex) {

            }
        });

        System.exit(0);
    }
}

1) ListenableFuture 

ListenableFuture 인스턴스에 addCallback 함수를 붙여 프로듀서가 보낸 데이터의 브로커의 적재 여부를 비동기로 확인할 수 있다.

onSuccess 정상적으로 적재되었을 경우 호출된다.
onFailure 적재되지 않고 이슈가 발생했을 경우 호출된다.

 

 

KafkaTemplate 종류

ReplyKafkaTemplate

컨슈머가 특정 데이터를 전달받았는지 여부를 확인할 수 있다.

 

RoutingKafkaTemplate

전송하는 토픽별로 옵션을 다르게 설정할 수 있다.

 

 

send() 메서드 

데이터 전송 메서드 오버로딩 제공 설명
send(String topic, K key, V data) 메시지 키, 메시지 값을 포함하여 특정 토픽으로 전달
send(String topic, Integer partition, K key, V data) 메시지 키, 메시지 값이 포함된 레코드를 특정 토픽의 특정 파티션으로 전달
send(String topic, Integer partition, Long timestamp, K key, V data) 메시지 키, 메시지 값, 타임스탬프가 포함된 레코드를 특정 토픽의 특정 파티션으로 전달
send(ProducerRecord<K, V> record) 프로듀서 레코드(ProducerRecord) 객체를 전송

 

 

반응형

Designed by JB FACTORY