개요
MSA 통신을 위해 사용되는 메시지 브로커인 Kafka, RabbitMQ 중 Kafka에 대해 배워볼 것이다.
Kafka(Apache Kafka)
분산 스트리밍 플랫폼, 주로 실시간 데이터 스트리밍 처리 및 메시지 브로커 역할.
대규모의 데이터 스트림을 처리하고 저장하며, 다양한 소비자가 데이터를 실시간으로 구독할 수 있도록 설계.
주요 특징 및 기능
- 분산 시스템
- Kafka는 여러 노드에 걸쳐 데이터와 작업을 분산하여 높은 처리량과 내구성을 제공한다.
- 내구성 및 확장성
- 데이터는 디스크에 저장되며, 복제를 통해 장애가 발생해도 데이터 유실을 방지한다.
- 수평 확장이 가능하여 데이터와 트래픽 증가에 대응할 수 있다.
- 실시간 처리
- 데이터를 실시간으로 생산(Producer)하고 소비(Consumer)할 수 있어 빠른 응답이 필요한 애플리케이션에 적합하다.
- Publish-Subscribe 모델
- Kafka는 Pub-Sub 패턴을 통해 생산자가 데이터를 주제(Topic)에 게시하고, 소비자가 이를 구독한다.
- 높은 처리량
- 배치 및 스트림 처리 모두 지원하며, 대용량 데이터를 효율적으로 처리할 수 있다.
- 내장 파티셔닝
- 토픽은 파티션으로 나뉘며, 데이터는 파티션 단위로 분배됩니다. 이를 통해 병렬 처리가 가능해진다.
장단점
- 장점
- 고성능: 높은 처리량과 낮은 지연 시간.
- 내구성: 데이터를 디스크에 저장하며 복제를 지원.
- 확장성: 노드를 추가하여 쉽게 확장 가능.
- 유연성: 실시간 스트리밍 외에도 배치 작업 지원.
- 단점
- 복잡성: 설정 및 관리가 어렵고, 학습 곡선이 가파름.
- 데이터 순서: 파티션 수준에서만 메시지 순서 보장.
- 추가적인 구성 필요: Zookeeper 또는 Kafka Raft Protocol(KRaft) 필요.
주요 구성 요소
- Topic (토픽)
- 데이터 분류 단위로, 프로듀서가 데이터를 전송하고 컨슈머가 데이터를 구독하는 논리적 채널.
- 하나의 토픽은 여러 파티션으로 나뉘며, 메시지는 특정 파티션에 저장된다.
- 메시지는 순차적 오프셋으로 저장되며, 컨슈머는 이 오프셋을 기반으로 데이터를 읽는다.
- 특징:
- 토픽은 읽기 전용: 데이터는 덮어쓰지 않고 새로운 데이터가 추가된다.
- 데이터를 보존 기간(Retention Period) 동안 유지하며, 이후 삭제된다.
- Partition (파티션)
- 토픽을 물리적 단위로 분할하여 분산 처리를 가능하게 한다.
- 각 파티션은 순차적인 데이터 구조를 가지며, 메시지는 오프셋으로 식별된다.
- Kafka의 병렬 처리 성능은 파티션 개수에 따라 결정된다.
- 리더와 팔로워:
- 각 파티션에는 **리더(Leader)**와 **팔로워(Follower)**가 존재하며, 리더는 모든 읽기/쓰기 요청을 처리한다.
- 팔로워는 데이터를 리더에서 복제하여 장애 발생 시 리더로 승격된다.
- Producer (프로듀서)
- 데이터 생산자로, 데이터를 특정 토픽으로 전송한다.
- 메시지를 어느 파티션에 보낼지는 파티셔닝 전략에 따라 결정된다:
- Round Robin: 순서대로 파티션에 분배.
- Key-based Partitioning: 특정 키를 기준으로 해시를 사용해 파티션 결정.
- 특징:
- 데이터 전송 시 Ack(확인) 옵션을 통해 안정성을 설정할 수 있다
- acks=0: 성공 여부 확인 없이 전송.
- acks=1: 리더가 수신 확인.
- acks=all: 모든 복제본이 수신 확인.
- 데이터 전송 시 Ack(확인) 옵션을 통해 안정성을 설정할 수 있다
- Consumer (컨슈머)
- 데이터 소비자로, 특정 토픽을 구독하여 메시지를 읽는다.
- 컨슈머는 컨슈머 그룹에 속할 수 있으며, 그룹 내에서 메시지를 파티션 단위로 분배한다.
- 특징:
- 오프셋 기반 메시지 처리: 컨슈머는 읽은 메시지의 오프셋을 관리한다.
- 컨슈머 그룹
- 그룹 내 각 컨슈머는 특정 파티션을 독점적으로 처리한다.
- 여러 컨슈머 그룹은 동일한 토픽을 독립적으로 구독할 수 있다.
- Broker (브로커)
- Kafka 서버 인스턴스로, 메시지를 저장하고 관리한다.
- 클러스터의 각 브로커는 특정 토픽의 파티션을 관리하며, 파티션의 리더 또는 팔로워 역할을 수행한다.
- 특징:
- 브로커는 분산 환경에서 메시지를 효율적으로 관리한다.
- 각 브로커에는 유니크 ID가 있으며, 클러스터 내에서 역할을 나눈다.
- ZooKeeper 또는 KRaft
- Kafka는 클러스터 메타데이터를 관리하기 위해 ZooKeeper 또는 새로운 KRaft(Kafka Raft Protocol)를 사용한다.
- ZooKeeper의 역할
- 브로커의 등록 및 상태 관리.
- 파티션의 리더 선출.
- 컨슈머 오프셋 정보 관리.
- KRaft의 특징 (Kafka 2.8 이상부터 도입)
- ZooKeeper 의존성을 제거하고 Kafka 자체에서 클러스터 메타데이터를 관리.
- 관리 복잡도를 줄이고 성능을 개선.
동작 원리
- Producer
- 메시지를 Kafka의 특정 토픽에 전송.
- 메시지는 파티션에 할당되며, 파티션 내에서 오프셋으로 정렬된다.
- Broker
- Kafka 클러스터의 노드로, 메시지를 저장하고 관리한다.
- 메시지는 디스크에 내구적으로 저장되며, 복제를 통해 안정성을 유지한다.
- Consumer
- 소비자는 토픽을 구독하고, 메시지를 오프셋 기반으로 읽어온다.
- 소비자 그룹을 구성하면 메시지 병렬 처리가 가능하다.
- Zookeeper
- 클러스터 메타데이터와 상태를 관리한다.
- Kafka는 Zookeeper를 통해 브로커를 등록하고 파티션 리더를 결정한다.
Kafka와 RabbitMQ의 차이점
Kafka와 Zookeeper
Kafka는 Zookeeper를 통해 클러스터 메타데이터를 관리한다.
Zookeeper는 다음과 같은 역할을 한다
- 브로커 등록 및 추적.
- 파티션 리더 선출.
- 소비자 그룹 오프셋 관리 (Kafka 버전에 따라 KRaft로 대체 가능).
Docker 사용 시 이유:
- Kafka는 클러스터 관리에 Zookeeper와의 통합을 필요로 한다.
- Docker 환경에서는 Zookeeper 컨테이너를 추가로 실행하여 Kafka와 함께 클러스터를 구성한다.
Kafka 사용 실습
1. Kafka 설치
docker-compose.yml
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
platform: linux/amd64
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: wurstmeister/kafka:latest
platform: linux/amd64
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
kafka-ui:
image: provectuslabs/kafka-ui:latest
platform: linux/amd64
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_READONLY: "false"
- 인텔리제이 루트 폴더에 docker-compose.yml 파일 생성
docker compose up -d
- 터미널에서 위 명령어로 도커 컴포즈 실행
- localhost:8080를 입력해 Kafka UI 접속
2. Producer Application
build.gradle
implementation 'org.springframework.kafka:spring-kafka'
- spring web, kafka, lombok 의존성 추가 후 프로젝트 빌드
application.yml
spring:
application:
name: producer
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
server:
port: 8090
- 서버 포트와 kafka producer 설정
KafkaConfig
@Configuration
public class ProducerApplicationKafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- Kafka 프로듀서를 생성하기 위해 ProducerFactory 정의
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: Kafka 브로커의 주소를 지정
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: 메시지 키를 직렬화할 클래스 지정
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: 메시지 값을 직렬화할 클래스 지정
- DefaultKafkaProducerFactory: configProps를 기반으로 Kafka 프로듀서를 생성
ProducerController
@RestController
@RequiredArgsConstructor
public class ProducerController {
private final ProducerService producerService;
@GetMapping("/send")
public String sendMessage(@RequestParam("topic") String topic,
@RequestParam("key") String key,
@RequestParam("message") String message) {
producerService.sendMessage(topic, key, message);
return "Message sent to Kafka topic";
}
}
- Producer 서비스를 호출해 특정 Kafka Topic에 메시지를 전송하는 API 작성
ProducerService
@Service
@RequiredArgsConstructor
public class ProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String message) {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(topic, key, message + " " + i);
}
}
}
- Kafka Producer를 이용해 특정 topic에 메시지를 전송하는 서비스 클래스
- KafkaTemplate을 사용해 topic으로 메시지 전송
3. Consumer Application
build.gradle - producer와 동일하게 적용
application.yml
spring:
application:
name: consumer
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
server:
port: 8091
- 서버 포트와 kafka consumer 설정
KafkaConfig
// 이 클래스는 Kafka 컨슈머 설정을 위한 Spring 설정 클래스입니다.
@EnableKafka // Kafka 리스너를 활성화하는 어노테이션입니다.
@Configuration // Spring 설정 클래스로 선언하는 어노테이션입니다.
public class ConsumerApplicationKafkaConfig {
// Kafka 컨슈머 팩토리를 생성하는 빈을 정의합니다.
// ConsumerFactory는 Kafka 컨슈머 인스턴스를 생성하는 데 사용됩니다.
// 각 컨슈머는 이 팩토리를 통해 생성된 설정을 기반으로 작동합니다.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
// 컨슈머 팩토리 설정을 위한 맵을 생성합니다.
Map<String, Object> configProps = new HashMap<>();
// Kafka 브로커의 주소를 설정합니다.
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 메시지 키의 디시리얼라이저 클래스를 설정합니다.
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 메시지 값의 디시리얼라이저 클래스를 설정합니다.
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 설정된 프로퍼티로 DefaultKafkaConsumerFactory를 생성하여 반환합니다.
return new DefaultKafkaConsumerFactory<>(configProps);
}
// Kafka 리스너 컨테이너 팩토리를 생성하는 빈을 정의합니다.
// ConcurrentKafkaListenerContainerFactory는 Kafka 메시지를 비동기적으로 수신하는 리스너 컨테이너를 생성하는 데 사용됩니다.
// 이 팩토리는 @KafkaListener 어노테이션이 붙은 메서드들을 실행할 컨테이너를 제공합니다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory를 생성합니다.
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 컨슈머 팩토리를 리스너 컨테이너 팩토리에 설정합니다.
factory.setConsumerFactory(consumerFactory());
// 설정된 리스너 컨테이너 팩토리를 반환합니다.
return factory;
}
}
ConsumerEndpoint
@Slf4j
@Component
public class ConsumerEndpoint {
@KafkaListener(groupId = "group_a", topics = "topic1")
public void consumeFromGroupA(String message) {
log.info("Group A consumed message from topic1 : " + message);
}
@KafkaListener(groupId = "group_b", topics = "topic1")
public void consumeFromGroupB(String message) {
log.info("Group B consumed message from topic1 : " + message);
}
@KafkaListener(groupId = "group_c", topics = "topic2")
public void consumeFromGroupC(String message) {
log.info("Group C consumed message from topic2 : " + message);
}
@KafkaListener(groupId = "group_c", topics = "topic3")
public void consumeFromGroupD(String message) {
log.info("Group C consumed message from topic3 : " + message);
}
@KafkaListener(groupId = "group_d", topics = "topic4")
public void consumeFromGroupE(String message) {
log.info("Group D consumed message from topic4 : " + message);
}
}
- Kafka Producer로 전송한 메시지를 수신하기 위한 Consumer Endpoint 설정
4. Application 실행 및 Kafka UI 확인
5. Producer API 호출 및 Consumer 확인
- PostMan을 사용해 요청 전송
- Test-Topic이 연결된 Consumer가 정상적으로 메시지를 수신한 것을 확인할 수 있다.
Topic이 2개의 Consumer Group과 연결된 경우
- 2개의 그룹으로 각각 10개의 메시지가 수신된 것을 확인할 수 있다.
Consumer Group이 같지만 Topic이 다른 경우
- 각각의 Topic에 대한 메시지를 Consumer Group C가 수신하는 것을 확인할 수 있다.
정리
- Kafka를 사용해 메시지를 송수신하는 방법에 대해 배웠다.
- RabbitMQ와 비슷한 점이 많은 것 같다, 상황에 따라 적절한 메시지 브로커를 선택해 사용하면 좋을 거 같다.
'자바 심화 > TIL' 카테고리의 다른 글
DDD(Domain-Driven Design) (1) | 2024.12.09 |
---|---|
SAGA 패턴 (1) | 2024.12.07 |
Rabbit MQ (1) | 2024.12.05 |
대규모 스트림 처리 (2) | 2024.12.04 |
Redis - Cache (0) | 2024.12.02 |