반응형
728x90
반응형
Kafka in Docker 설치
Docker 에 Kafka, Zookeeper 설치는 이전 포스팅을 참고바란다.
https://devfunny.tistory.com/429?category=820624
프로젝트 구조
- 1) Producer 역할의 Service
- ProducerConfig.java
- TestProducer.java
- 2) Consumer 역할의 Service
- ConsumerConfig.java
- TestConfig.java
Producer 프로젝트 셋팅
- 1) pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 2) ProducerConfig.java 파일 생성
@EnableKafka
@Configuration
public class ProducerConfig {
/**
* 접속하고자하는 카프카 정보
* @return
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Consumer 프로젝트 셋팅
- 1) pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 2) ConsumerConfig.java 파일 생성
@EnableKafka
@Configuration
public class ConsumerConfig {
/**
* 접속하고자하는 카프카 정보
* @return
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupOne"); // 카프카에서 토픽에 쌓여져있는 메시지를 가져가는 컨슈머 그룹화 명
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
/* 위 컨슈머 등록 */
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
Producer 에서 메시지 보내기
- 1) TestProducer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class TestProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final testTopic = "testTopic";
public void save(TestDto testDto) {
ObjectMapper mapper = new ObjectMapper();
String resultString = "";
try {
jsonInString = mapper.writeValueAsString(testDto);
} catch (JsonProcessingException je) {
je.printStackTrace();
}
kafkaTemplate.send(testTopic, resultString);
}
}
- 2) 위 kafkaProducer 클래스의 send 메서드를 호출
@RequiredArgsConstructor
public class TestController {
private final TestProducer testProducer;
@PostMapping("/test")
public String test(TestDto testDto) {
testProducer.save(testDto);
...
}
/test API를 호출하게 되면 토픽에 메시지를 보낸다. 해당 메시지가 보내질때마다 아래 Consumer 설정을 통해 메소드가 실행됨을 확인할 수 있다.
Consumer 에서 메시지 받기
- 1) TestConsumer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class TestConsumer {
@KafkaListener(topics = "testTopic") // 토픽명
public void testMessage(String returnValue) { // message 를 토픽에서 가져온다.
log.info("returnValue : " + returnValue);
}
}
@KafkaListener 이 선언되어 있으므로, 해당 토픽에 메시지가 오면 해당 메소드를 실행하게된다. (구독)
반응형
'Coding > Apache Kafka' 카테고리의 다른 글
[아파치 카프카 어플리케이션 프로그래밍] 1. 카프카의 탄생 (0) | 2022.05.18 |
---|---|
카프카 주키퍼 지노드 명령어 정리 (0) | 2021.07.08 |
카프카의 오프셋과 커밋 (0) | 2020.12.20 |
카프카의 컨슈머 그룹 (0) | 2020.12.20 |
카프카의 리플리케이션 관리 (0) | 2020.12.20 |