컨슈머 토픽, 파티션 명시적 선언 PartitionAssignConsumer.java package com.example.consumer._partition; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.*; @Slf4j public class PartitionAssignConsumer { private final static Strin..
리밸런싱 개념 아래 포스팅에서 리밸런싱 내용을 확인하자. https://devfunny.tistory.com/757 [아파치 카프카 어플리케이션 프로그래밍] 8. 컨슈머의 중요 개념과 옵션값 컨슈머 그룹 토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 2가지다. 1) 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영 2) 토픽의 특정 파티션만 구독하는 컨슈머 devfunny.tistory.com 예제 컨슈머 그룹에서 컨슈머가 추가 또는 제거되면 파티션을 컨슈머에 재할당하는 과정인 리밸런스가 일어난다. poll() 메서드를 통해 반환받은 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터를 중복 처리할 수 있다. poll() 메서드를 통해 받은 데이터 중 일부를 처리했으나 커밋하지..
Commit Sync Consumer CommitSyncConsumer.java @Slf4j public class CommitSyncConsumer { private final static String TOPIC_NAME = "test"; private final static String BOOTSTRAP_SERVERS = "localhost:9092"; private final static String GROUP_ID = "test-group"; public static void main(String[] args) { Properties configs = new Properties(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_S..
컨슈머 그룹 토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 2가지다. 1) 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영 2) 토픽의 특정 파티션만 구독하는 컨슈머를 운영 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영 컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 방식이다. 컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있다. 컨슈머 1개로 이루어진 컨슈머 그룹이 4개의 파티션에 할당 컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하다. 그리고 1개 컨슈머는 여러개의 파티션에 할당될 수 있다. 이러한 특징으로 컨슈머 그룹의 컨슈머..
프로듀서 실행 바로가기 https://devfunny.tistory.com/746?category=829528 [아파치 카프카 어플리케이션 프로그래밍] 4. 프로젝트 생성하여 카프카 프로듀서 실행 Topic 생성 1) docker kafka 컨테이너 접속 docker container exec -it kafka bash 2) 토픽 생성 토픽명 : test kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 예제파.. devfunny.tistory.com Topic 생성 1) docker kafka 컨테이너 접속 docker container exec -it kafka bash 2) 토픽 생성 토픽명 :..
특정 파티션 설정 프로듀서 사용환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야할 때가 있다. 예를 들어, Pangyo라는 값을 가진 메시지 키가 0번 파티션으로 들어가야 한다고 가정하자. 기본 설정 파티셔너를 사용할 경우 메시지 키의 해시 값을 파티션에 매칭하여 데이터를 전송하므로 어느 파티션에 들어가는지 알 수 없다. 이때 Partitioner 인터페이스를 사용하여 사용자 정의 파티셔너를 생성하면 Pangyo라는 값을 가진 메시지 키에 대해서 무조건 파티션 0번으로 지정하도록 설정할 수 있다. 이렇게 지정할 경우 토픽의 파티션이 변경되더라도 Pangyo 라는 메시지 키를 가진 데이터는 파티션 0번에 적재된다. CustomPartitioner.java public class Custom..
프로듀서 프로듀서는 카프카 브로커로 데이터를 전송할때 내부적으로 파티셔녀, 배치 생성 단계를 거친다. 전송하고자 하는 데이터는 ProducerRecord 인스턴스를 생성하여 설정한다. 필수 파라미터는 토픽과 메시지 값이다. ProducerRecord 생성시 추가 파라미터를 사용하여 오버로딩하여 ProductRecord의 내부 변수를 선언할 수 있다. 파티션 번호를 직접 지정하거나 타임스탬프를 설정, 메시지 키를 설정할 수도 있다. KafkaProducer 인스턴스가 send()를 호출하면 ProducerRecord는 파티셔녀(partitioner)에서 토픽의 어느 파티션으로 전송될 것인지 정해진다. Properties configs = new Properties(); configs.put... Kafka..
Topic 생성 1) docker kafka 컨테이너 접속 docker container exec -it kafka bash 2) 토픽 생성 토픽명 : test kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 예제파일 만들기 build.gradle dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka' runtimeOnly 'com.h2database:h2' } SimpleProducer.java import lombo..
카프카 브로커 카프카 클라이언트와 데이터를 주고받기 위해 사용하는 주체이자, 데이터를 분산 저장하여 장애가 발생하더라도 안전하게 사용할 수 있도록 도와주는 애플리케이션이다. 하나의 서버에는 한 개의 카프카 브로커 프로세스가 실행된다. 데이터를 안전하게 보관하고 처리하기 위해 3개 이상의 브로커 서버를 1개의 클러스터로 묶어서 운영한다. 카프카 클러스터로 묶인 브로커들은 프로듀서가 보낸 데이터 를 안전하게 분산 저장하고 복제하는 역할을 수행한다. 카프카 브로커가 프로듀서로부터 데이터를 전달받았을때 프로듀서가 요청한 토픽의 파티션에 데이터를 저장하고, 컨슈머가 데이터를 요청하면 파티션에 저장된 데이터를 전달한다. 프로듀서로부터 전달된 데이터는 파일 시스템에 저장된다. 카프카는 메모리나 데이터베이스에 저장하지..
"책으로 공부하는 Apache Kafka" 카프카 설치 교재에서는 AWS EC2를 사용하여 설치하고 있지만, Docker를 사용하여 실습해보자. https://devfunny.tistory.com/429 docker로 Kafka, Zookeeper 설치 (with docker-compose) Docker 이미지 설치 1) Kafka 설치 docker pull wurstmeister/kafka 2) zookeeper 설치 docker pull wurstmeister/zookeeper docker-compose 파일 생성 1) docker-compose.yml 파일 생성 (local PC에 경로는 자유.. devfunny.tistory.com Mac Docker 설치 https://docs.docker.co..
"책으로 공부하는 Apache Kafka" 아파치 카프카 (Apache Kafka) 아파치 카프카란, 대용량 또는 대규모 메시지 데이터를 빠르게 처리할 수 있도록 개발된 분산 메시징 플랫폼이다. 카프카는 각각의 애플리케이션끼리 연결하여 데이터를 처리하는 것이 아니라 한곳에 모아 처리할 수 있도록 중앙집중화했다. 카프카를 통해 데이터 스트림을 한 곳에서 실시간으로 관리할 수 있다. 카프카 내부에 데이터가 저장되는 파티션의 동작은 FIFO(First In First Out) 방식의 큐 자료구조와 유사하다. 프로듀서 (Producer) : 큐에 데이터를 보낸다. 컨슈머 (Consumer) : 큐에서 데이터를 가져간다. 카프카 특징 카프카를 통해 전달할 수 있는 데이터 포맷은 제한이 없다. 직렬화, 역직렬화를 ..
지노드란? 서로 연결되어있는 서버들이 상태 정보 등을 주고받는데, 이때 key-value 형식으로 카프카 지노드에 저장된다. 지노드에 저장된 key-value 를 사용하여 분산 애플리케이션이 서로 데이터를 주고받고, 이러한 지노드에 접속하여 상태 정보를 확인할 수 있는 명령어에 대해 더 자세히 알아보자. 지노드 경로 설정 카프카의 환경설정 파일에서 지노드 경로를 설정할 수 있다. vi /usr/local/kafka/config/server.properties 환경 설정 파일에서 아래 부분을 수정하자. zooleeper.connect=servername/test-kafka # test-kafka 경로로 설정 지노드 접속 /usr/local/zookeeper/bin/zkCli.sh 접속한 후, 아래 명령어를..