컨슈머 토픽, 파티션 명시적 선언
PartitionAssignConsumer.java
package com.example.consumer._partition;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
@Slf4j
public class PartitionAssignConsumer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static int PARTITION_NUMBER = 0;
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/* 명시적 오프셋 커밋 */
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
// consumer.subscribe(Arrays.asList(TOPIC_NAME));
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// 파라미터 추가
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
// for loop 를 통해 poll() 메서드가 반환한 ConsumerRecord 데이터들을 순차적으로 처리한다.
for (ConsumerRecord<String, String> record : records) {
log.info("{}", record);
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null));
consumer.commitSync();
}
}
}
}
1) assign() 메서드
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
assign 메서드를 사용하여 컨슈머가 어떤 토픽, 파티션을 할당할지 명시적으로 선언할 수 있다.
TopicPartitions 인스턴스를 지닌 자바 컬렉션 타입을 파라미터로 받는다.
TopicPartition 클래스는 카프카 라이브러리 내/외부에서 사용되는 토픽, 파티션의 정보를 담는 객체로 사용된다.
test의 0번 파티션을 할당하여 레코드를 가져오는 구문이다.
subscribe() 메서드를 사용할 때와 다르게 직접 컨슈머가 특정 토픽, 특정 파티션에 할당되므로 리밸런싱하는 과정이 없다.
컨슈머 정상 종료 수행
컨슈머 애플리케이션은 안전하게 종료되어야 한다.
정상적으로 종료되지 않은 컨슈머는 세션 타임아웃이 발생할때까지 컨슈머 그룹에 남게된다.
이로 인해 실제로는 종료되었지만 더는 동작하지 않는 컨슈머가 존재하기 때문에, 파티션의 데이터는 소모되지 못하고 컨슈머 랙이 늘어나게된다. 컨슈머 랙이 늘어나면 데이터 처리 지연이 발생한다.
컨슈머를 안전하게 종료하기 위해 wakeup() 메서드를 지원한다.
CloseConsumer.java
package com.example.consumer._close;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Slf4j
public class CloseConsumer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
private static KafkaConsumer<String, String> consumer;
public static void main(String[] args) {
// 셧다운훅(kill -TERM 프로세스번호)가 발생하면, wakeup() 메서드가 호출되어 컨슈머를 안전하게 종료한다.
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
// for loop 를 통해 poll() 메서드가 반환한 ConsumerRecord 데이터들을 순차적으로 처리한다.
for (ConsumerRecord<String, String> record : records) {
log.info("{}", record);
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null));
consumer.commitSync();
}
}
} catch (WakeupException we) {
log.warn("Wakeup consumer");
} finally {
consumer.close();
}
}
static class ShutdownThread extends Thread {
public void run() {
log.info("Shutdown hook");
consumer.wakeup();
}
}
}
1) 셧다운훅 발생 경우
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
셧다운훅(kill -TERM 프로세스번호)가 발생하면, wakeup() 메서드가 호출되어 컨슈머를 안전하게 종료한다.
2) wakeup() 메서드
static class ShutdownThread extends Thread {
public void run() {
log.info("Shutdown hook");
consumer.wakeup();
}
}
wakeup() 메서드를 실행하여 KafkaConsumer 인스턴스를 안전하게 종료할 수 있다.
3) wakeup 메서드 실행에 따른 try~catch 예외처리
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
...
}
} catch (WakeupException we) {
log.warn("Wakeup consumer");
} finally {
consumer.close();
}
wakeup()이 실행된 이후 poll() 메서드가 호출되면 WakeupException 예외가 발생한다.
WakeupException 예외를 받은 뒤에는 데이터 처리를 위해 사용한 자원을 해제하면 된다.
4) close() 메서드
consumer.close();
마지막에는 close() 를 호출하여 카프카 클러스터에 컨슈머가 안전하게 종료되었음을 명시적으로 알려주면, 종료가 완료되었다고 볼 수 있다.
close() 메서드는 해당 컨슈머는 더는 동작하지 않는다는 것을 명시적으로 알려준다. 컨슈머 그룹에서 이탈되고 나머지 컨슈머들이 파티션을 할당받게된다.
'Coding > Apache Kafka' 카테고리의 다른 글
[아파치 카프카 어플리케이션 프로그래밍] 13. 토픽과 파티션 (0) | 2022.05.26 |
---|---|
[아파치 카프카 어플리케이션 프로그래밍] 12. 카프카에서 제공하는 AdminClient 사용하여 정보 조회하기 (0) | 2022.05.24 |
[아파치 카프카 어플리케이션 프로그래밍] 10. 컨슈머 리밸런싱 리스너 클래스 생성하기 (0) | 2022.05.23 |
[아파치 카프카 어플리케이션 프로그래밍] 9. 컨슈머 오프셋 커밋 실습 (commit sync, commit async) (0) | 2022.05.23 |
[아파치 카프카 어플리케이션 프로그래밍] 8. 컨슈머의 중요 개념과 옵션값 (0) | 2022.05.22 |