반응형
728x90
반응형
Topic 생성
- 1) docker kafka 컨테이너 접속
docker container exec -it kafka bash
- 2) 토픽 생성
- 토픽명 : test
kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1
예제파일 만들기
build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
runtimeOnly 'com.h2database:h2'
}
SimpleProducer.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@Slf4j
public class SimpleProducer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 메시지 키, 값을 직렬화하기 위한 직렬화 클래스 선언
// String 객체를 전송하므로 String 을 직렬화하는 클래스인 카프카의 라이브러리의 StringSerializer 사용
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
// '배치 전송'
// 파라미터로 들어간 record를 프로듀서 내부에 가지고 있다가, 배치 형태로 묶어서 브로커에 전송한다.
RecordMetadata recordMetadata = producer.send(record).get();
log.info("result : " + recordMetadata.toString());
log.info("send : " + record);
producer.flush();
producer.close();
}
}
result log
- test : 토픽명
- 0 : 파티션
- 1 : 오프셋
[main] INFO com.example.producer.SimpleProducer - result : test-0@1
메시지 키를 가진 데이터 전송
@Slf4j
public class SimpleProducer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 메시지 키, 값을 직렬화하기 위한 직렬화 클래스 선언
// String 객체를 전송하므로 String 을 직렬화하는 클래스인 카프카의 라이브러리의 StringSerializer 사용
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
// ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
// 메시지 키가 포함된 레코드 전송 (토픽 이름, 메시지 키, 메시지 값 순서)
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", messageValue);
// '배치 전송'
// 파라미터로 들어간 record를 프로듀서 내부에 가지고 있다가, 배치 형태로 묶어서 브로커에 전송한다.
RecordMetadata recordMetadata = producer.send(record).get();
log.info("result : " + recordMetadata.toString());
log.info("send : " + record);
producer.flush();
producer.close();
}
}
아래와 같이 코드가 변경되었다.
// 메시지 키가 포함된 레코드 전송 (토픽 이름, 메시지 키, 메시지 값 순서)
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", messageValue);
컨슈머 확인
kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic test --property print.key=true -- property key.separator="-" --from-beginning
반응형
'Coding > Apache Kafka' 카테고리의 다른 글
[아파치 카프카 어플리케이션 프로그래밍] 6. 특정 파티션 설정, 프로듀서 전송 결과 Callback 클래스 생성하기 (0) | 2022.05.20 |
---|---|
[아파치 카프카 어플리케이션 프로그래밍] 5. 프로듀서의 중요 개념과 옵션값 (0) | 2022.05.19 |
[아파치 카프카 어플리케이션 프로그래밍] 3. 카프카의 기본 개념 정리 (브로커, 페이지 캐시, 리플리케이션, 컨슈머 그룹, 코디네이터, 주키퍼, 토픽, 레코드, 오프셋) (0) | 2022.05.18 |
[아파치 카프카 어플리케이션 프로그래밍] 2. 카프카 프로듀서, 컨슈머 실행해보기 (with Docker) (0) | 2022.05.18 |
[아파치 카프카 어플리케이션 프로그래밍] 1. 카프카의 탄생 (0) | 2022.05.18 |