본문 바로가기

자바 심화/TIL

Kafka - 기초

개요

MSA 통신을 위해 사용되는 메시지 브로커인 Kafka, RabbitMQ 중 Kafka에 대해 배워볼 것이다.

 

Kafka(Apache Kafka)

분산 스트리밍 플랫폼, 주로 실시간 데이터 스트리밍 처리 및 메시지 브로커 역할.

대규모의 데이터 스트림을 처리하고 저장하며, 다양한 소비자가 데이터를 실시간으로 구독할 수 있도록 설계.

 

주요 특징 및 기능

 

  1. 분산 시스템
    • Kafka는 여러 노드에 걸쳐 데이터와 작업을 분산하여 높은 처리량과 내구성을 제공한다.
  2. 내구성 및 확장성
    • 데이터는 디스크에 저장되며, 복제를 통해 장애가 발생해도 데이터 유실을 방지한다.
    • 수평 확장이 가능하여 데이터와 트래픽 증가에 대응할 수 있다.
  3. 실시간 처리
    • 데이터를 실시간으로 생산(Producer)하고 소비(Consumer)할 수 있어 빠른 응답이 필요한 애플리케이션에 적합하다.
  4. Publish-Subscribe 모델
    • Kafka는 Pub-Sub 패턴을 통해 생산자가 데이터를 주제(Topic)에 게시하고, 소비자가 이를 구독한다.
  5. 높은 처리량
    • 배치 및 스트림 처리 모두 지원하며, 대용량 데이터를 효율적으로 처리할 수 있다.
  6. 내장 파티셔닝
    • 토픽은 파티션으로 나뉘며, 데이터는 파티션 단위로 분배됩니다. 이를 통해 병렬 처리가 가능해진다.

 

장단점

  • 장점
    • 고성능: 높은 처리량과 낮은 지연 시간.
    • 내구성: 데이터를 디스크에 저장하며 복제를 지원.
    • 확장성: 노드를 추가하여 쉽게 확장 가능.
    • 유연성: 실시간 스트리밍 외에도 배치 작업 지원.
  • 단점
    • 복잡성: 설정 및 관리가 어렵고, 학습 곡선이 가파름.
    • 데이터 순서: 파티션 수준에서만 메시지 순서 보장.
    • 추가적인 구성 필요: Zookeeper 또는 Kafka Raft Protocol(KRaft) 필요.

 

주요 구성 요소

  1. Topic (토픽)
    • 데이터 분류 단위로, 프로듀서가 데이터를 전송하고 컨슈머가 데이터를 구독하는 논리적 채널.
    • 하나의 토픽은 여러 파티션으로 나뉘며, 메시지는 특정 파티션에 저장된다.
    • 메시지는 순차적 오프셋으로 저장되며, 컨슈머는 이 오프셋을 기반으로 데이터를 읽는다.
    • 특징:
      • 토픽은 읽기 전용: 데이터는 덮어쓰지 않고 새로운 데이터가 추가된다.
      • 데이터를 보존 기간(Retention Period) 동안 유지하며, 이후 삭제된다.
  2. Partition (파티션)
    • 토픽을 물리적 단위로 분할하여 분산 처리를 가능하게 한다.
    • 각 파티션은 순차적인 데이터 구조를 가지며, 메시지는 오프셋으로 식별된다.
    • Kafka의 병렬 처리 성능은 파티션 개수에 따라 결정된다.
    • 리더와 팔로워:
      • 각 파티션에는 **리더(Leader)**와 **팔로워(Follower)**가 존재하며, 리더는 모든 읽기/쓰기 요청을 처리한다.
      • 팔로워는 데이터를 리더에서 복제하여 장애 발생 시 리더로 승격된다.
  3. Producer (프로듀서)
    • 데이터 생산자로, 데이터를 특정 토픽으로 전송한다.
    • 메시지를 어느 파티션에 보낼지는 파티셔닝 전략에 따라 결정된다:
      • Round Robin: 순서대로 파티션에 분배.
      • Key-based Partitioning: 특정 키를 기준으로 해시를 사용해 파티션 결정.
    •  특징:
      • 데이터 전송 시 Ack(확인) 옵션을 통해 안정성을 설정할 수 있다
        • acks=0: 성공 여부 확인 없이 전송.
        • acks=1: 리더가 수신 확인.
        • acks=all: 모든 복제본이 수신 확인.
  4. Consumer (컨슈머)
    • 데이터 소비자로, 특정 토픽을 구독하여 메시지를 읽는다.
    • 컨슈머는 컨슈머 그룹에 속할 수 있으며, 그룹 내에서 메시지를 파티션 단위로 분배한다.
    • 특징:
      • 오프셋 기반 메시지 처리: 컨슈머는 읽은 메시지의 오프셋을 관리한다.
      • 컨슈머 그룹
        • 그룹 내 각 컨슈머는 특정 파티션을 독점적으로 처리한다.
        • 여러 컨슈머 그룹은 동일한 토픽을 독립적으로 구독할 수 있다.
  5. Broker (브로커)
    • Kafka 서버 인스턴스로, 메시지를 저장하고 관리한다.
    • 클러스터의 각 브로커는 특정 토픽의 파티션을 관리하며, 파티션의 리더 또는 팔로워 역할을 수행한다.
    • 특징:
      • 브로커는 분산 환경에서 메시지를 효율적으로 관리한다.
      • 각 브로커에는 유니크 ID가 있으며, 클러스터 내에서 역할을 나눈다.
  6. ZooKeeper 또는 KRaft
    • Kafka는 클러스터 메타데이터를 관리하기 위해 ZooKeeper 또는 새로운 KRaft(Kafka Raft Protocol)를 사용한다.
    • ZooKeeper의 역할
      • 브로커의 등록 및 상태 관리.
      • 파티션의 리더 선출.
      • 컨슈머 오프셋 정보 관리.
    •  KRaft의 특징 (Kafka 2.8 이상부터 도입)
      • ZooKeeper 의존성을 제거하고 Kafka 자체에서 클러스터 메타데이터를 관리.
      • 관리 복잡도를 줄이고 성능을 개선.

 

동작 원리

  1. Producer
    • 메시지를 Kafka의 특정 토픽에 전송.
    • 메시지는 파티션에 할당되며, 파티션 내에서 오프셋으로 정렬된다.
  2. Broker
    • Kafka 클러스터의 노드로, 메시지를 저장하고 관리한다.
    • 메시지는 디스크에 내구적으로 저장되며, 복제를 통해 안정성을 유지한다.
  3. Consumer
    • 소비자는 토픽을 구독하고, 메시지를 오프셋 기반으로 읽어온다.
    • 소비자 그룹을 구성하면 메시지 병렬 처리가 가능하다.
  4. Zookeeper
    • 클러스터 메타데이터와 상태를 관리한다.
    • Kafka는 Zookeeper를 통해 브로커를 등록하고 파티션 리더를 결정한다.

 

Kafka와 RabbitMQ의 차이점

 

 

Kafka와 Zookeeper

Kafka는 Zookeeper를 통해 클러스터 메타데이터를 관리한다.
Zookeeper는 다음과 같은 역할을 한다

  • 브로커 등록 및 추적.
  • 파티션 리더 선출.
  • 소비자 그룹 오프셋 관리 (Kafka 버전에 따라 KRaft로 대체 가능).

Docker 사용 시 이유:

  • Kafka는 클러스터 관리에 Zookeeper와의 통합을 필요로 한다.
  • Docker 환경에서는 Zookeeper 컨테이너를 추가로 실행하여 Kafka와 함께 클러스터를 구성한다.

 

Kafka 사용 실습

1. Kafka 설치

docker-compose.yml

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    platform: linux/amd64
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: wurstmeister/kafka:latest
    platform: linux/amd64
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    platform: linux/amd64
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_READONLY: "false"
  • 인텔리제이 루트 폴더에 docker-compose.yml 파일 생성
docker compose up -d
  • 터미널에서 위 명령어로 도커 컴포즈 실행

  • localhost:8080를 입력해 Kafka UI 접속

 

2. Producer Application

build.gradle

implementation 'org.springframework.kafka:spring-kafka'
  • spring web, kafka, lombok 의존성 추가 후 프로젝트 빌드

application.yml

spring:
  application:
    name: producer
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
server:
  port: 8090
  • 서버 포트와 kafka producer 설정

KafkaConfig

@Configuration
public class ProducerApplicationKafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • Kafka 프로듀서를 생성하기 위해 ProducerFactory 정의
    • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: Kafka 브로커의 주소를 지정
    • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: 메시지 키를 직렬화할 클래스 지정
    • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: 메시지 값을 직렬화할 클래스 지정
    • DefaultKafkaProducerFactory: configProps를 기반으로 Kafka 프로듀서를 생성

ProducerController

@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final ProducerService producerService;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("topic") String topic,
                              @RequestParam("key") String key,
                              @RequestParam("message") String message) {

        producerService.sendMessage(topic, key, message);
        return "Message sent to Kafka topic";
    }
}
  • Producer 서비스를 호출해 특정 Kafka Topic에 메시지를 전송하는 API 작성

ProducerService

@Service
@RequiredArgsConstructor
public class ProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String message) {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send(topic, key, message + " " + i);
        }
    }
}
  • Kafka Producer를 이용해 특정 topic에 메시지를 전송하는 서비스 클래스
    • KafkaTemplate을 사용해 topic으로 메시지 전송

3. Consumer Application

build.gradle - producer와 동일하게 적용

 

application.yml

spring:
  application:
    name: consumer
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
server:
  port: 8091
  • 서버 포트와 kafka consumer 설정

KafkaConfig

// 이 클래스는 Kafka 컨슈머 설정을 위한 Spring 설정 클래스입니다.
@EnableKafka // Kafka 리스너를 활성화하는 어노테이션입니다.
@Configuration // Spring 설정 클래스로 선언하는 어노테이션입니다.
public class ConsumerApplicationKafkaConfig {

    // Kafka 컨슈머 팩토리를 생성하는 빈을 정의합니다.
    // ConsumerFactory는 Kafka 컨슈머 인스턴스를 생성하는 데 사용됩니다.
    // 각 컨슈머는 이 팩토리를 통해 생성된 설정을 기반으로 작동합니다.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // 컨슈머 팩토리 설정을 위한 맵을 생성합니다.
        Map<String, Object> configProps = new HashMap<>();
        // Kafka 브로커의 주소를 설정합니다.
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 메시지 키의 디시리얼라이저 클래스를 설정합니다.
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 메시지 값의 디시리얼라이저 클래스를 설정합니다.
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 설정된 프로퍼티로 DefaultKafkaConsumerFactory를 생성하여 반환합니다.
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Kafka 리스너 컨테이너 팩토리를 생성하는 빈을 정의합니다.
    // ConcurrentKafkaListenerContainerFactory는 Kafka 메시지를 비동기적으로 수신하는 리스너 컨테이너를 생성하는 데 사용됩니다.
    // 이 팩토리는 @KafkaListener 어노테이션이 붙은 메서드들을 실행할 컨테이너를 제공합니다.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // ConcurrentKafkaListenerContainerFactory를 생성합니다.
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 컨슈머 팩토리를 리스너 컨테이너 팩토리에 설정합니다.
        factory.setConsumerFactory(consumerFactory());
        // 설정된 리스너 컨테이너 팩토리를 반환합니다.
        return factory;
    }
}

 

ConsumerEndpoint

@Slf4j
@Component
public class ConsumerEndpoint {

    @KafkaListener(groupId = "group_a", topics = "topic1")
    public void consumeFromGroupA(String message) {
        log.info("Group A consumed message from topic1 : " + message);
    }

    @KafkaListener(groupId = "group_b", topics = "topic1")
    public void consumeFromGroupB(String message) {
        log.info("Group B consumed message from topic1 : " + message);
    }

    @KafkaListener(groupId = "group_c", topics = "topic2")
    public void consumeFromGroupC(String message) {
        log.info("Group C consumed message from topic2 : " + message);
    }

    @KafkaListener(groupId = "group_c", topics = "topic3")
    public void consumeFromGroupD(String message) {
        log.info("Group C consumed message from topic3 : " + message);
    }

    @KafkaListener(groupId = "group_d", topics = "topic4")
    public void consumeFromGroupE(String message) {
        log.info("Group D consumed message from topic4 : " + message);
    }
}
  • Kafka Producer로 전송한 메시지를 수신하기 위한 Consumer Endpoint 설정

 

4. Application 실행 및 Kafka UI 확인

 

5. Producer API 호출 및 Consumer 확인

  • PostMan을 사용해 요청 전송

  • Test-Topic이 연결된 Consumer가 정상적으로 메시지를 수신한 것을 확인할 수 있다.

Topic이 2개의 Consumer Group과 연결된 경우

  • 2개의 그룹으로 각각 10개의 메시지가 수신된 것을 확인할 수 있다.

Consumer Group이 같지만 Topic이 다른 경우

  • 각각의 Topic에 대한 메시지를 Consumer Group C가 수신하는 것을 확인할 수 있다.

 

정리

  • Kafka를 사용해 메시지를 송수신하는 방법에 대해 배웠다.
  • RabbitMQ와 비슷한 점이 많은 것 같다, 상황에 따라 적절한 메시지 브로커를 선택해 사용하면 좋을 거 같다.

'자바 심화 > TIL' 카테고리의 다른 글

DDD(Domain-Driven Design)  (1) 2024.12.09
SAGA 패턴  (1) 2024.12.07
Rabbit MQ  (1) 2024.12.05
대규모 스트림 처리  (2) 2024.12.04
Redis - Cache  (0) 2024.12.02