본문 바로가기

자바 심화/TIL

Rabbit MQ

개요

MSA 통신을 위해 사용되는 메시지 브로커인 Kafka, RabbitMQ 중 RabbitMQ에 대해 배워볼 것이다.

 

RabbitMQ

오픈 소스 메시지 브로커 소프트웨어로 애플리케이션이나 시스템 간에 메시지를 교환하고 처리할 수 있도록 설계된  AMQP(Advanced Message Queuing Protocol) 기반의 도구.

 

주요 특징

  1. 메시지 큐 기능:
    • 생산자(Producer)가 보낸 메시지를 큐에 저장하고, 소비자(Consumer)가 이를 가져가서 처리한다.
    • 메시지는 대기열(FIFO 방식)로 관리된다.
  2. 확장성과 분산 처리:
    • 여러 소비자가 메시지를 동시에 처리하도록 분산 환경을 지원하며, 대규모 트래픽에도 대응할 수 있다.
  3. 유연한 라우팅:
    • 메시지를 큐로 전달하는 방식을 다양한 패턴(Direct, Fanout, Topic, Header)으로 설정할 수 있다.
  4. 내구성:
    • 메시지를 디스크에 저장하여 장애 발생 시에도 데이터를 보존할 수 있다.
  5. 멀티 프로토콜 지원:
    • 기본적으로 AMQP를 사용하지만 MQTT, STOMP 등 다양한 프로토콜을 지원한다.
  6. 관리 UI 제공:
    • RabbitMQ 관리 플러그인을 통해 메시지 흐름, 큐 상태 등을 직관적으로 모니터링하고 제어할 수 있다.

사용 사례

  • 비동기 작업 처리: 예를 들어 이메일 전송, 데이터 처리 등의 작업을 백그라운드에서 처리.
  • 마이크로서비스 통신: 서비스 간 비동기 메시지 교환을 통해 독립성과 확장성 확보.
  • 실시간 데이터 스트리밍: 로그 수집, 알림 시스템 등에 활용.

 

 

AMQP 주요 개념

  1. 프로듀서(Producer)
    • 메시지를 생성하고 브로커에 전송하는 역할.
  2. 익스체인지(Exchange)
    • 프로듀서로부터 받은 메시지를 적절한 큐로 라우팅하는 역할. 다양한 유형(Direct, Fanout, Topic, Header)이 존재.
  3. 큐(Queue)
    • 메시지가 저장되는 버퍼로, 소비자가 메시지를 가져가는 대기열.
  4. 소비자(Consumer)
    • 큐에서 메시지를 가져와 처리하는 역할.
  5. 바인딩(Binding)
    • 익스체인지와 큐를 연결하는 라우팅 규칙. 특정 조건에 따라 메시지가 큐에 전달됩니다.
  6. 라우팅 키(Routing Key)
    • 프로듀서가 메시지 전송 시 지정하는 키로, 익스체인지가 메시지를 적절한 큐로 라우팅할 때 사용.
  7. 메시지 확인(Acknowledgement)
    • 소비자가 메시지를 성공적으로 처리했음을 RabbitMQ에 알리는 메커니즘.
  8. 채널(Channel)
    • 하나의 연결(Connection) 내에서 여러 메시지를 송수신할 수 있도록 하는 가상 연결.

 

 

RabbitMQ 동작 원리 요약

  1. 생산자(Producer): 메시지를 생성하여 RabbitMQ로 전송.
  2. 익스체인지(Exchange): 메시지를 큐로 라우팅.
  3. 큐(Queue): 메시지가 저장되는 버퍼.
  4. 소비자(Consumer): 큐에서 메시지를 가져가 처리.

 

장점

  1. 다양한 메시징 패턴 지원:
    • Direct, Fanout, Topic, Header 등 다양한 메시지 라우팅 방식을 제공하여 유연한 메시지 전달 구조를 설계할 수 있다.
  2. AMQP 표준 준수:
    • 표준 프로토콜(AMQP)을 기반으로 설계되어 여러 언어와 플랫폼에서 쉽게 통합 및 사용 가능하다.
  3. 확장성:
    • 클러스터링 및 메시지 브로커 노드 추가를 통해 확장이 용이하며, 대규모 트래픽 처리에 적합하다.
  4. 내구성과 신뢰성:
    • 메시지를 디스크에 저장하거나, ACK재전송 메커니즘을 통해 데이터 손실을 방지한다.
  5. 멀티 프로토콜 지원:
    • 기본적으로 AMQP 외에도 MQTT, STOMP 등 다양한 프로토콜을 지원하여 유연성을 제공한다.
  6. 관리 도구 제공:
    • RabbitMQ Management Plugin을 통해 메시지 큐 상태, 트래픽, 연결 등을 시각적으로 모니터링하고 관리할 수 있다.
  7. 플러그인 시스템:
    • 다양한 플러그인을 통해 기능 확장이 가능하며, 필요에 따라 사용자 정의 플러그인 개발도 가능하다.
  8. 커뮤니티 및 문서화:
    • 활발한 커뮤니티와 풍부한 문서가 제공되어 문제 해결이 용이하다.

단점

  1. 성능 한계 (고성능 스트리밍에는 부적합):
    • 고속 스트리밍 데이터 처리나 매우 높은 처리량(Throughput)이 요구되는 경우, Kafka와 같은 로그 중심 메시징 시스템에 비해 성능이 떨어질 수 있다.
  2. 복잡한 설정 및 관리:
    • 클러스터링 및 고가용성(HA) 설정이 비교적 복잡하며, 경험이 부족한 사용자는 초기 설정에 어려움을 겪을 수 있다.
  3. 리소스 소비:
    • 디스크 I/O와 메모리 사용량이 많아 고부하 환경에서 하드웨어 요구 사항이 증가할 수 있다.
  4. 메시지 오버헤드:
    • AMQP 프로토콜의 특성상 메시지 메타데이터 오버헤드가 존재하여, 작은 메시지를 대량으로 처리할 때 비효율적일 수 있다.
  5. 지속성 유지 시 성능 저하:
    • 메시지를 디스크에 저장(내구성 설정)하는 경우, 처리 속도가 크게 저하될 수 있다.
  6. 복잡한 유지보수:
    • 클러스터 구성, 장애 복구, 버전 업그레이드 등의 유지보수 작업이 상대적으로 어렵다.

 

RabbitMQ Exchange 유형

 

  1. Direct Exchange
    • 라우팅 키를 기반으로 메시지를 특정 큐로 전달.
    • 사용 사례: 정확히 일치하는 특정 큐로 메시지를 보내야 할 때.
    • 예) routingKey="order_created"인 메시지가 order_created 큐로 전달.
  2. Fanout Exchange
    • 라우팅 키와 관계없이 연결된 모든 큐에 메시지를 브로드캐스트.
    • 사용 사례: 모든 소비자가 동일한 메시지를 받아야 할 때 (예: 로그 전송, 알림).
    • 예) 익스체인지에 연결된 모든 큐에 메시지가 전달.
  3. Topic Exchange
    • 라우팅 키의 패턴을 기반으로 메시지를 큐로 전달.
    • 사용 사례: 라우팅 키에 와일드카드(*, #)를 사용하여 유연한 메시지 전달.
      • *: 단일 단어 일치
      • #: 여러 단어 일치
    • 예)
      • routingKey="user.*": user.created, user.deleted와 일치.
      • routingKey="order.#": order.created, order.updated, order.detail.viewed와 일치.
  4. Headers Exchange
    • 라우팅 키 대신 메시지 헤더의 속성을 기반으로 큐로 라우팅.
    • 사용 사례: 메시지의 헤더 속성을 기반으로 복잡한 라우팅이 필요할 때.
    • 예) 메시지 헤더에 type=order와 format=json이 포함된 경우에만 특정 큐로 전달.

 

 

RabbitMQ를 사용하기 적합한 상황

  • 비동기 작업 처리: 메시지 전달 속도보다 안정성이 중요한 경우.
  • 마이크로서비스 환경: 서비스 간 메시지 기반 통신을 구축하는 경우.
  • 복잡한 라우팅 요구사항: 다양한 메시지 라우팅 규칙을 적용해야 하는 경우.

RabbitMQ 대신 Kafka를 고려할 상황

  • 고성능 스트리밍: 초당 수백만 건의 메시지를 처리해야 하는 경우.
  • 순차 처리 및 내구성: 로그 중심 시스템 설계가 필요한 경우.

 

RabbitMQ 실습

Spring Cloud 환경에서 RabbitMQ를 사용한 비동기 처리 실습

 

1. Docker를 사용해 RabbitMQ 설치

docker run -d --name rabbitmq -p5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:management
  • localhost:15672에 접속하면 RabbitMQ 로그인 페이지가 나온다(초기 계정 정보 guest, guest).

RabbitMQ 로그인 페이지
RabbitMQ 대시보드

 

2. Spring Boot 프로젝트 생성

 

2-1. Order Application

// RabbitMQ
implementation 'org.springframework.boot:spring-boot-starter-amqp'
  • Spring for RabbitMQ, Lombok, Spring Web 의존성 주입

application.yml

spring:
  application:
    name: order
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

message:
  exchange: market
  queue:
    product: market.product
    payment: market.payment
  • rabbitmq의 host, port, user, password 설정
  • exchange, queue 이름 설정

ApplicationQueueConfig

@Configuration
public class OrderApplicationQueueConfig {

    @Value("${message.exchange}")
    private String exchange;

    @Value("${message.queue.product}")
    private String queueProduct;

    @Value("${message.queue.payment}")
    private String queuePayment;

    @Bean public TopicExchange exchange() { return new TopicExchange(exchange); }

    @Bean public Queue queueProduct() { return new Queue(queueProduct); }
    @Bean public Queue queuePayment() { return new Queue(queuePayment); }

    @Bean public Binding bindingProduct() { return BindingBuilder.bind(queueProduct()).to(exchange()).with(queueProduct); }
    @Bean public Binding bindingPayment() { return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment); }
}
  • yml의 변수 이름을 불러오고, exchange, queue, binding 설정

Controller

@RestController
@RequiredArgsConstructor
public class OrderController {

    private final OrderService orderService;

    @GetMapping("/order/{id}")
    public String order(@PathVariable("id") String id) {
        orderService.createOrder(id);
        return "Order complete";
    }
}
  • endpoint  설정

Service

@Service
@RequiredArgsConstructor
public class OrderService {

    @Value("${message.queue.product}")
    private String productQueue;

    @Value("${message.queue.payment}")
    private String paymentQueue;

    private final RabbitTemplate rabbitTemplate;

    public void createOrder(String orderId) {
        rabbitTemplate.convertAndSend(productQueue, orderId);
        rabbitTemplate.convertAndSend(paymentQueue, orderId);
    }
}
  • RabbitTemplate을 사용해 메시지 생성

2-2. Order Application를 실행 후 endpoint로 요청을 보낸다.

 

  • RabbitMQ 대시보드에서 market.payment, market.payment 큐에 메시지가 있는 것을 확인할 수 있다.

 

2-3. Payment Application

// RabbitMQ
implementation 'org.springframework.boot:spring-boot-starter-amqp'
  • RabbitMQ, Lombok 의존성 주입(Consumer 테스트만 할 것이므로 Web은 제외)

application.yml

spring:
  application:
    name: payment
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

message:
  queue:
    payment: market.payment

 

EndPoint

@Slf4j
@Component
public class PaymentEndpoint {

    @Value("${spring.application.name}")
    private String appName;

    @RabbitListener(queues = "${message.queue.payment}")
    public void receiveMessage(String orderId) {
        log.info("Received orderId: {}, appName: {}", orderId, appName);

    }
}
  • @RabbitListener를 사용해 market.payment 큐의 메시지 수신 

 

2-4. Payment Application 실행 및 메시지 수신 확인

  • 큐에서 메시지가 사라지고 log에서 메시지 수신 로그를 확인할 수 있다.

 

2-5. Product Application

  • Payment Application과 동일한 dependency 사용

application.yml

spring:
  application:
    name: product
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

message:
  queue:
    product: market.product

 

Endpoint

@Slf4j
@Component
public class ProductEndpoint {

    @Value("${spring.application.name}")
    private String appName;

    @RabbitListener(queues = "${message.queue.product}")
    public void receiveProduct(String orderId) {
        log.info("receive orderId: {}, appName: {}", orderId, appName);
    }
}

 

2-6. 컨슈머가 라운드로빈으로 메시지를 수신하는 것을 확인하기 위해 두 개의 Product 생성

 

2-7. Product Application 2개 실행 후 Order를 통해 메시지 송신

  • 2개의 Consumer가 번갈아 가면서 메시지를 수신하는 것을 확인할 수 있다.

 

정리

  • RabbitMQ를 사용해 메시지를 송신하고 수신하는 방법, RabbitMQ 대시보드를 통해 정보를 확인하는 방법을 배웠다.
  • RabbitMQ를 통해 MSA 통신이나 비동기 처리 등을 해보는 실습도 필요할 것 같다.

'자바 심화 > TIL' 카테고리의 다른 글

SAGA 패턴  (1) 2024.12.07
Kafka - 기초  (0) 2024.12.06
대규모 스트림 처리  (2) 2024.12.04
Redis - Cache  (0) 2024.12.02
Redis - Redis Template  (0) 2024.11.29