SpringBoot + Docker Kafka 연동하여 Producer, Consumer 설정하여 Message 주고받기

반응형
728x90
반응형

Kafka in Docker 설치

Docker 에 Kafka, Zookeeper 설치는 이전 포스팅을 참고바란다.

https://devfunny.tistory.com/429?category=820624 

 

docker로 Kafka, Zookeeper 설치 (with docker-compose)

Docker 이미지 설치 1) Kafka 설치 docker pull wurstmeister/kafka 2) zookeeper 설치 docker pull wurstmeister/zookeeper docker-compose 파일 생성 1) docker-compose.yml 파일 생성 (local PC에 경로는 자유..

devfunny.tistory.com

 

 

 

프로젝트 구조

  • 1) Producer 역할의 Service

- ProducerConfig.java

- TestProducer.java

 

  • 2) Consumer 역할의 Service

- ConsumerConfig.java

- TestConfig.java

 

 

 

Producer 프로젝트 셋팅

  • 1) pom.xml 
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

 

  • 2) ProducerConfig.java 파일 생성
@EnableKafka
@Configuration
public class ProducerConfig {
    /**
     * 접속하고자하는 카프카 정보
     * @return
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

 

 

Consumer 프로젝트 셋팅

  • 1) pom.xml 
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

 

  • 2) ConsumerConfig.java 파일 생성
@EnableKafka
@Configuration
public class ConsumerConfig {
    /**
     * 접속하고자하는 카프카 정보
     * @return
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupOne"); // 카프카에서 토픽에 쌓여져있는 메시지를 가져가는 컨슈머 그룹화 명
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                        = new ConcurrentKafkaListenerContainerFactory<>();

        /* 위 컨슈머 등록 */
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}

 

 

 

Producer 에서 메시지 보내기

  • 1) TestProducer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class TestProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final testTopic = "testTopic";

    public void save(TestDto testDto) {
        ObjectMapper mapper = new ObjectMapper();
        String resultString = "";

        try {
            jsonInString = mapper.writeValueAsString(testDto);

        } catch (JsonProcessingException je) {
            je.printStackTrace();
        }

        kafkaTemplate.send(testTopic, resultString);
    }
}

 

  • 2) 위 kafkaProducer 클래스의 send 메서드를 호출
@RequiredArgsConstructor
public class TestController {

    private final TestProducer testProducer;
    
    @PostMapping("/test")
    public String test(TestDto testDto) {
    	testProducer.save(testDto);
        ...
    }

 

/test API를 호출하게 되면 토픽에 메시지를 보낸다. 해당 메시지가 보내질때마다 아래 Consumer 설정을 통해 메소드가 실행됨을 확인할 수 있다.

 

 

 

Consumer 에서 메시지 받기

  • 1) TestConsumer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class TestConsumer {

    @KafkaListener(topics = "testTopic") // 토픽명
    public void testMessage(String returnValue) { // message 를 토픽에서 가져온다.
        log.info("returnValue : " + returnValue);
    }
}

 

@KafkaListener 이 선언되어 있으므로, 해당 토픽에 메시지가 오면 해당 메소드를 실행하게된다. (구독)

 

 

 

 

 

반응형

Designed by JB FACTORY