[아파치 카프카 어플리케이션 프로그래밍] 18. 멀티 스레드 컨슈머 - 컨슈머 멀티 워커 스레드 전략과 카프카 컨슈머 멀티 스레드 전략

반응형
728x90
반응형

멀티 스레드 컨슈머

카프카는 처리량을 늘리기 위해 파티션과 컨슈머 개수를 늘려서 운영할 수 있다. 파티션을 여러개로 운영하는 경우 데이터를 병렬처리하기 위해서 파티션 개수와 컨슈머 개수를 동일하게 맞추는 것이 가장 좋은 방법이다. 

 

토픽의 파티션은 1개 이상으로 이루어져 있으며 1개의 파티션은 1개의 컨슈머가 할당되어 데이터를 처리할 수 있다.

파티션 개수가 n개라면 동일 컨슈머 그룹으로 묶인 컨슈머 스레드를 최대 n개 운영할 수 있다.

그러므로 n개의 스레드를 가진 1개의 프로세스를 운영하거나 1개의 스레드를 가진 프로세스를 n개 운영하는 방법도 있다.

 

카프카에서 공식적으로 지원하는 라이브러리인 자바는 멀티 스레드를 지원한다. 멀티 스레드로 동작하는 멀티 컨슈머 스레드를 개발, 적용할 수 있다. 

 

 

 

고려해야할점

멀티 스레드 애플리케이션으로 개발할 경우 하나의 프로세스 내부에 스레드가 여러개 생성되어 실행되기 때문에 하나의 컨슈머 스레드에서 예외적 상황(예: OutofMemoryException)이 발생할 경우 프로세스 자체가 종료될 수 있다. 컨슈머 스레드들이 비정상적으로 종료될 경우 데이터 처리에서 중복 또는 유실이 발생할 수 있기 때문에, 다른 컨슈머 스레드에까지 영향을 미칠 수 있다.

 

 

 

스레드 활용 방식

  • 컨슈머 스레드는 1개만 실행하고 데이터 처리를 담당한느 워커 스레드(worker thread)를 여러개 실행하는 워커 스레드 전략
  • 컨슈머 인스턴스에서 poll() 메서드를 호출하는 스레드를 여러개 띄어서 사용하는 컨슈머 멀티 스레드 전략

 

 

컨슈머 멀티 워커 스레드 전략

브로커로부터 전달받은 레코드들을 병렬로 처리한다면 1개의 컨슈머 스레드로 받은 데이터들을 더욱 향상된 속도로 처리할 수 있다. 데이터를 for 반복구문으로 처리할 경우 이전 레코드의 처리가 끝날 때까지 다음 레코드는 기다리게된다. 만약 레코드별로 처리해야하는 시간이 길 경우에는 더욱 오래 기다리게 되므로 처리 속도는 더 느려진다. 멀티 스레드를 사용하면 각기 다른 레코드들의 데이터 처리를 동시에 실행할 수 있어서 처리 시간을 줄일 수 있다.

 

멀티 스레드를 생성하는 ExecutorService 자바 라이브러리를 사용하면 레코드를 병렬처리하는 스레드를 효율적으로 생성하고 관리할 수 있다. Executors를 사용하여 스레드 개수를 제어하는 스레드 풀(thread pool)을 생성할 수 있는데, 데이터 처리 환경에 맞는 스레드 풀을 사용하면 된다. 작업 이후 스레드가 종료되어야 한다면 CachedThreadPool을 사용하여 스레드를 실행한다.

 

 

예제

CustomWorker.java
package com.example.consumer._workerThread;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerWorker implements Runnable {

    private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
    private String recordValue;

    ConsumerWorker(String recordValue) {
        this.recordValue = recordValue;
    }

    @Override
    public void run() {
        logger.info("thread:{}\trecord:{}", Thread.currentThread().getName(), recordValue);
    }
}

1) Runnable 인터페이스 구현

public class ConsumerWorker implements Runnable {

Runnable 인터페이스 구현한 ConsumerWorker 클래스는 스레드로 실행되며, 생성하고 나면 Runnable 인터페이스로 오버라이드된 run() 메서드가 실행된다.

 

2) thread 이름, record 클래스 로그 출력

@Override
public void run() {
    // thread 이름, record 클래스 로그 출력
    logger.info("thread:{}\trecord:{}", Thread.currentThread().getName(), recordValue);
}

 

 

CustomWithMultiWorkerThread.java
package com.example.consumer._workerThread;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerWithMultiWorkerThread {
    private final static Logger logger = LoggerFactory.getLogger(ConsumerWithMultiWorkerThread.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        ExecutorService executorService = Executors.newCachedThreadPool(); // 필요한 만큼 스레드 풀을 늘려서 스레드를 실행하는 방식

        while (true) {
            // poll() 메서드를 통해 리턴받은 레코드들을 처리하는 스레드를 레코드마다 개별 실행한다.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

            for (ConsumerRecord<String, String> record : records) {
                ConsumerWorker worker = new ConsumerWorker(record.value());

                // 스레드는 execute() 로 실행되고 레코드별 로그가 출력된다.
                executorService.execute(worker);
            }
        }
    }
}

1) ExecutorService 사용

ExecutorService는 다양한 스레드 풀을 제공하는데, 여기서는 레코드를 출력하고 출력이 완료되면 스레드를 종료하도록
CachedThreadPool 을 사용했다.

ExecutorService executorService = Executors.newCachedThreadPool(); // 필요한 만큼 스레드 풀을 늘려서 스레드를 실행하는 방식

 

2) poll() 메서드 호출

poll() 메서드를 통해 리턴받은 레코드들을 처리하는 스레드를 레코드마다 개별 실행한다.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

 

3) execute() 메서드 호출

스레드는 execute() 로 실행되고 레코드별 로그가 출력된다.

executorService.execute(worker);

 

 

스레드 사용 주의할점

스레드를 사용하면 한번 poll()를 통해 받은 데이터를 병렬처리함으로써 속도의 이점을 확실히 얻을 수 있다.  하지만 몇가지 주의사항은 있다.

 

1) 스레드를 사용함으로써 데이터 처리가 끝나지 않았음에도 커밋을 하기 때문에 리밸런싱, 컨슈머 장애 시 데이터 유실이 발생할 수 있다.

각 레코드의 데이터 처리가 끝났음을 스레드로부터 리턴받지 않고 그 다음 poll() 메서드를 호출 시에 커밋할 수 있기 때문이다.

 

2) 레코드 처리의 역전현상이 발생할 수 있다.

for 반복구문으로 스레드를 생성하므로 레코드별로 스레드의 생성은 순서대로 진행된다. 그러나 스레드의 처리 시간은 다를 수 있다. 나중에 생성된 스레드의 레코드 처리 시간이 더 짧을 경우 이전 레코드가 다음 레코드보다 나중에 처리될 수 있다.

 

레코드 처리에 있어 중복이 발생하거나, 데이터의 역전현상이 발생해도 되며 매우 빠른 처리 속도가 필요한 데이터 처리에 적합하다.

 

 

 

카프카 컨슈머 멀티 스레드 전략

하나의 파티션은 동일 컨슈머 중 최대 1개까지 할당될 수 있다. 그리고 하나의 컨슈머는 여러 파티션에 할당될 수 있다. 이런 특징을 가장 잘 살리는 방법은 1개의 애플리케이션에 구독하고자하는 토픽의 파티션 개수만큼 컨슈머 스레드 개수를 늘려서 운영하는 것이다.

 

컨슈머 스레드를 늘려서 운영하면 각 스레드에 각 파티션이 할당되며, 파티션의 레코드들을 병렬처리할 수 있다.

 

* 주의할점
토픽의 파티션 개수만큼 컨슈머 스레드를 운영하는 것이다.
컨슈머 스레드 > 파티션 개수가 되면 할당할 파티션 개수가 더는 없으므로 파티션에 할당되지 못한 컨슈머 스레드를 데이터 처리를 하지 않게 된다. 

 

 

예제

ConsumerWorker.java
package com.example.consumer._multiConsumerThread;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWorker implements Runnable {

    private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);

    private final Properties prop;
    private final String topic;
    private final String threadName;
    private KafkaConsumer<String, String> consumer;

    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic)); // 토픽을 명시적으로 구독하기 시작한다.
        try {
            while (true) {
                // 리턴받은 레코드들을 처리한다.
                // 이때 스레드 이름을 함께 로그에 출력함으로써 어떤 스레드가 어떤 케로드를 처리했는지 확인할 수 있다.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("{}", record);
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println(threadName + " trigger WakeupException");
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}

1) 생성자 변수에 컨슈머 옵션 prop, 토픽이름, 스레드 구별할 스레드 번호를 받는다.

ConsumerWorker(Properties prop, String topic, int number) {
    this.prop = prop;
    this.topic = topic;
    this.threadName = "consumer-thread-" + number;
}

 

2) KafkaConsumer 클래스

KafkaConsumer 클래스는 스레드 세이프 하지 않다.
그래서 스레드별로 KafkaConsumer 인스턴스를 별개로 만들어서 운영해야만한다.
만약 KafkaConsumer 인스턴스를 여러 스레드에서 실행하면 ConcurrentModificationException 예외가 발생한다.

consumer = new KafkaConsumer<>(prop);

 

3) poll() 메서드 호출

리턴받은 레코드들을 처리한다.
이때 스레드 이름을 함께 로그에 출력함으로써 어떤 스레드가 어떤 케로드를 처리했는지 확인할 수 있다.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

 

 

MultiConsumerThreadByPartition.java
package com.example.consumer._multiConsumerThread;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiConsumerThreadByPartition {
    private final static Logger logger = LoggerFactory.getLogger(MultiConsumerThreadByPartition.class);

    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        int CONSUMER_COUNT = 3;
        logger.info("Set thread count : {}", CONSUMER_COUNT);

        // newCachedThreadPool 는 내부 작업이 완료되면 스레드를 종료한다.
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);

            // 3개의 컨슈머 스레드를 execute() 메서드를 통해 실행한다.
            executorService.execute(worker);
        }
    }
}

1) newCachedThreadPool() 사용

newCachedThreadPool 는 내부 작업이 완료되면 스레드를 종료한다.

ExecutorService executorService = Executors.newCachedThreadPool();

 

2) execute() 메서드 호출

3개의 컨슈머 스레드를 execute() 메서드를 통해 실행한다.

executorService.execute(worker);

 

 

반응형

Designed by JB FACTORY