[아파치 카프카 어플리케이션 프로그래밍] 6. 특정 파티션 설정, 프로듀서 전송 결과 Callback 클래스 생성하기

반응형
728x90
반응형

특정 파티션 설정

프로듀서 사용환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야할 때가 있다.

 

예를 들어, Pangyo라는 값을 가진 메시지 키가 0번 파티션으로 들어가야 한다고 가정하자.
기본 설정 파티셔너를 사용할 경우 메시지 키의 해시 값을 파티션에 매칭하여 데이터를 전송하므로 어느 파티션에 들어가는지 알 수 없다.
이때 Partitioner 인터페이스를 사용하여 사용자 정의 파티셔너를 생성하면 Pangyo라는 값을 가진 메시지 키에 대해서 무조건 파티션 0번으로 지정하도록 설정할 수 있다.
이렇게 지정할 경우 토픽의 파티션이 변경되더라도 Pangyo 라는 메시지 키를 가진 데이터는 파티션 0번에 적재된다.

 

  • CustomPartitioner.java
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            throw new InvalidRecordException("Need message key");
        }

        // 0번으로 지정
        if (((String) key).equals("testMessage")) {
            return 0;
        }

        List<PartitionInfo> partitions =  cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // 주어진 레코드가 들어갈 파티션 번호
        // testMessage 가 아닌 메시지 키를 가진 레코드는 해시값을 지정하여 특정 파티션에 매칭되도록 설정한다.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

 

  • SimpleProducer.java
@Slf4j
public class SimpleProducer {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 메시지 키, 값을 직렬화하기 위한 직렬화 클래스 선언
        // String 객체를 전송하므로 String 을 직렬화하는 클래스인 카프카의 라이브러리의 StringSerializer 사용
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String messageValue = "testMessage";
//        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);

        // 메시지 키가 포함된 레코드 전송 (토픽 이름, 메시지 키, 메시지 값 순서)
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", messageValue);

        // '배치 전송'
        // 파라미터로 들어간 record를 프로듀서 내부에 가지고 있다가, 배치 형태로 묶어서 브로커에 전송한다.
        RecordMetadata recordMetadata = producer.send(record).get();
        log.info("result : " + recordMetadata.toString());
        log.info("send : " + record);

        producer.flush();
        producer.close();

    }
}

 

아래 코드가 변경되었다.

// 메시지 키가 포함된 레코드 전송 (토픽 이름, 메시지 키, 메시지 값 순서)
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", messageValue);

 

 

 

프로듀서 전송 결과 확인

RecordMetadata recordMetadata = producer.send(record).get();

.send()

결괏값은 카프카 브로커로부터 응답을 기다렸다가 브로커로부터 응답이 오면 RecordMetadata 인스턴스를 반환한다.

 

.get()

send()가 Future 객체를 반환하는데, 이 객체는 RecordMetadata의 비동기 결과를 표현하는 것으로 ProducerRecord가 카프카 브로커에 정상적으로 적재되었는지에 대한 데이터가 포함되어있다.

get()을 사용하여 프로듀서로 보낸 데이터의 결과를 동기적으로 가져온다.

 

test1@2 -> 1: 파티션 번호, 2: 오프셋 번호

 

그러나 동기로 프로듀서의 전송 결과를 확인하는 것은 빠른 전송에 허들이 될 수 있다.

프로듀서가 전송하고 난 뒤, 브로커로부터 전송에 대한 응답 값을 받기 전까지 대기하기 때문이다.

따라서 이를 원하지 않는 경우를 위해 프로듀서는 비동기로 결과를 확인할 수 있도록 Callback 인터페이스를 제공하고 있다.

사용자는 사용자 정의 Callback 클래스를 생성하여 레코드의 전송 결과에 대응하는 로직을 만들 있다.

 

 

 

Callback 클래스 생성

  • ProducerCallback.java
@Slf4j
public class ProducerCallback implements Callback {

    /**
     * 레코드의 비동기 결과를 받기위해 사용한다.
     * 위 코드에서는 만약 브로커 적재에 이슈가 생겼을 경우 Exception 에 어떤 에러가 발생하였는지 담겨서
     * 메서드가 실행된다. 에러가 발생하지 않았을 경우에는 RecordMetadata를 통해 해당 레코드가 적재된 토픽 이름, 파티션 번호, 오프셋을 알 수 있다.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            log.info(exception.getMessage());
        } else {
            log.info(metadata.toString());
        }
    }
}

 

  • CustomPartitionerProducer.java
@Slf4j
public class CustomPartitionerProducer {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 메시지 키, 값을 직렬화하기 위한 직렬화 클래스 선언
        // String 객체를 전송하므로 String 을 직렬화하는 클래스인 카프카의 라이브러리의 StringSerializer 사용
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // custom partitioner 설정
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String messageValue = "testMessage";
//        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);

        // 메시지 키가 포함된 레코드 전송 (토픽 이름, 메시지 키, 메시지 값 순서)
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", messageValue);

        // '배치 전송'
        // 파라미터로 들어간 record를 프로듀서 내부에 가지고 있다가, 배치 형태로 묶어서 브로커에 전송한다.
//        RecordMetadata recordMetadata = producer.send(record).get();
//        log.info("result : " + recordMetadata.toString());

        producer.send(record, new ProducerCallback()); // callback 클래스 설정
        log.info("send : " + record);

        producer.flush();
        producer.close();

    }
}

 

아래 코드가 변경되었다.

producer.send(record, new ProducerCallback()); // callback 클래스 설정

 

 

반응형

Designed by JB FACTORY