Executors 클래스 - Executor 인터페이스 : 컨커런트 API의 핵심 인터페이스다. 이 인터페이스를 구현한 여러 종류의 클래스를 기본으로 제공한다. - 스레드 풀 : 스레드를 관리하기 위한 풀이다. 병렬 프로그래밍에서 스레드를 관리하기 위한 기능을 제공한다. - 포크/조인 프레임워크 : JDK7에서 새롭게 선보인 포크/조인 프레임워크를 이용하면 스레드 간의 대기와 연관 관계 등을 정의할 수 있다. java.util.concurrent 패키지에서 제공하는 Executor 인터페이스 인터페이스 설명 Executor 새로운 태스크를 생성하는데 가장 기본이 되는 인터페이스다. ExecutorService Executor 인터페이스의 하위 인터페이스다. Executor 인터페이스에서 제공하는 기능 외..
컨터런트 API 자바 5에서 처음 소개한 컨커런트 API는 스레드에서 데이터 정합성을 확보하고 멀티 스레드 환경에서 프로그래밍하기 위해 필요한 5가지 특징이 있다. 병렬 애플리케이션에서 데이터의 동기화와 정합성을 확보하기 위해 Lock 객체를 제공하며 이를 통해 잠금 기능을 사용할 수 있다. 스레드를 실행하고 관리하는 고수준 API를 사용한 Executors 클래스를 제공한다. 이 클래스는 Executor 인터페이스를 구현한 것으로 대량 데이터를 병렬 처리하기에 적합하다. 병렬 프로그램에서 대량 데이터의 정합성을 유지한채 사용하기 위한 컬렉션 프레임워크의 확장판인 컨커런트 컬렉션 클래스를 제공한다. 원자적 변수는 동기화를 위한 synchronized 키워드 사용을 최소화하여 성능을 확보하면서 메모리 정합..
Job 생성 JobExecutionDeciderConfiguration.java import lombok.RequiredArgsConstructor; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import..
컬렉션의 map, filter 함수 filter, map은 리스트를 반환한다. 이 함수들은 결과 컬렉션을 즉시(eagerly) 생성한다. 이는 컬렉션 함수로 연쇄하면 매 단계마다 계산 중간 결과를 새로운 컬렉션에 임시로 담는다. package chapter5_람다로_프로그래밍._3_지연계산_컬렉션연산 data class Person(val name: String, val age: Int) fun main() { val people = listOf( Person("Alice", 29), Person("Bob", 31) ) val filter = people.map(Person::name).filter{ it.startsWith("김") } } 이는 이 연쇄 호출이 리스트를 2개 만든다는 뜻이다. 한 리스트..
Job 생성 CustomExitStatusConfiguration.java import lombok.RequiredArgsConstructor; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.conf..
Job 생성 import lombok.RequiredArgsConstructor; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.bu..
스프링 카프카 컨슈머 스프링 카프카의 컨슈머는 기존 컨슈머를 2개의 타입으로 나누고 커밋을 7가지로 나누어 세분화했다. 리스너의 종류에 따라 한번 호출하는 메서드에서 처리하는 레코드의 개수가 달라진다. 타입 레코드 리스너 (MessageListener) - 단 1개의 레코드를 처리한다. - 스프링 카프카 컨슈머의 기본 리스너이다. 배치 리스너 (BatchMessageListener) - 기존 카프카 클라이언트 라이브러리의 poll() ㅔㅁ서드로 리턴받은 CnsumerRecords처럼 한번에 여러개 레코드들을 처리한다. 그 외 매뉴얼 커밋을 사용할 경우 Acknowledging 붙은 리스너를 사용하고, KafkaConsumer 인스턴스에 직접 접근하여 컨트롤하고 싶다면 ConsumerAware가 붙은 리스..
Job 생성 import lombok.RequiredArgsConstructor; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.scope...
스프링 카프카 프로듀서 '카프카 템플릿(Kafka Template)' 이라고 불리는 클래스를 사용하여 데이터를 전송할 수 있다. 카프카 템플릿은 프로듀서 팩토리(ProducerFactory) 클래스를 통해 생성할 수 있다. 카프카 템플릿을 사용하는 방법은 두가지다. 1) 스프링 카프카에서 제공하는 기본 카프카 템플릿을 사용한다. build.gradle dependencies { ... implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test' } application.yml spring: kafka: producer: bootstrap-servers..
Job 생성 import lombok.RequiredArgsConstructor; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframe..
중단 배포 컨슈머 애플리케이션을 완전히 종료한 이후에 개선된 코드를 가진 애플리케이션을 배포하는 방식이다. 이 방법은 한정된 서버 자원을 운영하는 기업에 적합하다. 중단 배포는 기존 애플리케이션을 완전히 종료한 이후 신규 애플리케이션을 배포, 실행하여 버전을 올리는 방식이다. 컨슈머 애플리케이션이 완전히 종료된 이후에 신규 애플리케이션이 배포된다는 점이 중요하다. 기존 컨슈머 애플리케이션이 종료되면 더는 토픽의 데이터를 가져갈 수 없어서 컨슈머 랙이 늘어난다. 이는 지연이 발생한다는 뜻이다. 중단 배포를 사용할 경우의 장점 새로운 로직이 적용된 신규 애플리케이션의 실행 전후를 명확하게 특정 오프셋 지점으로 나눌 수 있다. 배포 시점의 오프셋을 로깅 컨슈머 랙과 파티션별 오프셋을 기록하는 버로우 사용 이러..
컨슈머 랙(LAG) 토픽의 최신 오프셋(LOG-END-OFFSET)과 컨슈머 오프셋(CURRENT-OFFSET) 간의 차이다. 프로듀서는 계속해서 새로운 데이터를 파티션에 저장하고 컨슈머는 자신이 처리할 수 있는 만큼 데이터를 가져간다. 컨슈머 랙은 컨슈머가 정상 동작하는지 여부를 확인할 수 있기 때문에 컨슈머 애플리케이션을 운영한다면 필수적으로 모니터링해야한다. 컨슈머 랙을 모니터링하는 것은 카프카를 통한 데이터 파이프라인을 운영하는 데에 핵심적인 역할을 한다. 컨슈머 랙을 모니터링함으로써 컨슈머의 장애를 확인할 수 있고 파티션의 개수를 정하는 데에 참고할 수 있기 때문이다. 컨슈머 랙은 컨슈머 그룹과 토픽, 파티션별로 생성된다. * 예시 1개의 토픽에 3개의 파티션이 있고, 1개의 컨슈머 그룹이 토픽..