[아파치 카프카 어플리케이션 프로그래밍] 4. 프로젝트 생성하여 카프카 프로듀서 실행

반응형
728x90
반응형

Topic 생성

  • 1) docker kafka 컨테이너 접속
docker container exec -it kafka bash

 

  • 2) 토픽 생성
    • 토픽명 : test
kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1

 

 

예제파일 만들기

build.gradle

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter'
	implementation 'org.springframework.kafka:spring-kafka'
	runtimeOnly 'com.h2database:h2'
}

 

SimpleProducer.java

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Slf4j
public class SimpleProducer {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 메시지 키, 값을 직렬화하기 위한 직렬화 클래스 선언
        // String 객체를 전송하므로 String 을 직렬화하는 클래스인 카프카의 라이브러리의 StringSerializer 사용
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String messageValue = "testMessage";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);

        // '배치 전송'
        // 파라미터로 들어간 record를 프로듀서 내부에 가지고 있다가, 배치 형태로 묶어서 브로커에 전송한다.
        RecordMetadata recordMetadata = producer.send(record).get();
        log.info("result : " + recordMetadata.toString());
        log.info("send : " + record);

        producer.flush();
        producer.close();

    }
}

 

result log
  • test : 토픽명
  • 0 : 파티션
  • 1 : 오프셋
[main] INFO com.example.producer.SimpleProducer - result : test-0@1

 

 

메시지 키를 가진 데이터 전송

@Slf4j
public class SimpleProducer {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 메시지 키, 값을 직렬화하기 위한 직렬화 클래스 선언
        // String 객체를 전송하므로 String 을 직렬화하는 클래스인 카프카의 라이브러리의 StringSerializer 사용
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String messageValue = "testMessage";
//        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);

        // 메시지 키가 포함된 레코드 전송 (토픽 이름, 메시지 키, 메시지 값 순서)
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", messageValue);

        // '배치 전송'
        // 파라미터로 들어간 record를 프로듀서 내부에 가지고 있다가, 배치 형태로 묶어서 브로커에 전송한다.
        RecordMetadata recordMetadata = producer.send(record).get();
        log.info("result : " + recordMetadata.toString());
        log.info("send : " + record);

        producer.flush();
        producer.close();

    }
}

 

아래와 같이 코드가 변경되었다.

// 메시지 키가 포함된 레코드 전송 (토픽 이름, 메시지 키, 메시지 값 순서)
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", messageValue);

 

컨슈머 확인

kafka-console-consumer.sh --bootstrap-server localhost:9092 
--topic test --property print.key=true -- property key.separator="-" --from-beginning

 

반응형

Designed by JB FACTORY