본문 바로가기

자바 심화/TIL

SAGA 패턴

개요

MSA 프로젝트에서 데이터의 일관성을 유지하기 위한 방법 중 하나인 SAGA 패턴에 대해 배우고 간단하게 적용해 볼 것이다.

 

SAGA Pattern?

분산 시스템에서 데이터의 일관성을 유지하기 위해 사용되는 설계 패턴으로, 하나의 분산 트랜잭션을 여러 개의 작은 로컬 트랜잭션으로 나누어 처리한다.

이 패턴은 각 서비스가 독립적으로 동작할 수 있도록 설계되었으며, 분산 트랜잭션의 어려움을 해결하기 위한 방법 중 하나.

 

SAGA 패턴 작동 방식

  1. Choreography 방식
    • 각 서비스가 자신의 작업을 완료한 후 다음 작업을 수행할 서비스를 호출하거나 이벤트를 발행한다.
    • 이벤트 기반으로 동작하며, 중앙 집중식 관리가 없다.
    • 서비스들이 서로 느슨하게 결합되어 있어 확장이 쉽다.
    • 단점: 서비스 간의 의존성이 높아질 수 있으며, 복잡도가 증가할 수 있다.
      • 예) 1.Order Service가 주문을 생성하고 이벤트 발행: Order Created
      • 2. Payment Service가 이벤트를 구독하여 결제를 처리하고, Payment Completed 이벤트를 발행
      • 3. Inventory Service가 결제 완료 이벤트를 구독하여 재고를 차감
  2. Orchestration 방식
    • 중앙에서 작업의 흐름을 제어하는 SAGA Coordinator가 존재한다.
    • SAGA Coordinator가 각 서비스에 작업 요청을 전송하며, 성공 또는 실패를 모니터링한다.
    • 중앙 관리가 가능하여 복잡한 비즈니스 로직을 쉽게 관리할 수 있다.
    • 단점: 중앙 Coordinator가 병목 지점이 될 수 있다.
      • 예) 1. SAGA Coordinator가 Order Service에 주문 생성 요청
      • 2. 주문 생성 성공 시, Payment Service에 결제 요청
      • 3. 결제 성공 시, Inventory Service에 재고 차감 요청

보상 트랜잭션

SAGA 패턴에서 한 트랜잭션이 실패할 경우, 이전 단계에서 수행된 작업을 취소하기 위해 보상 트랜잭션(Compensating Transaction)을 실행한다.

  • 예)
    1. 결제가 실패하면 이미 생성된 주문을 취소하는 보상 트랜잭션 실행
    2. 이미 차감된 재고를 복구하는 작업 실행

SAGA 패턴의 장단점

  • 장점
    • 분산 환경에서 데이터 일관성 유지 가능
    • 독립적인 로컬 트랜잭션으로 인해 높은 확장성과 유연성
    • 중앙 집중식 트랜잭션 관리 도구(DB) 없이도 운영 가능
  • 단점
    • 보상 트랜잭션 설계가 복잡할 수 있음
    • 데이터의 강력한 일관성 대신 최종적 일관성을 보장
    • Choreography 방식에서는 서비스 간의 의존성이 높아질 수 있음
    • Orchestration 방식에서는 Coordinator가 병목 지점이 될 가능성

SAGA 패턴 사용 사례

  1. 전자상거래 플랫폼
    • 주문 생성, 결제 처리, 재고 관리의 트랜잭션 분리
  2. 여행 예약 시스템
    • 항공권, 호텔 예약, 렌터카 예약 등 여러 단계의 트랜잭션 처리
  3. 마이크로서비스 아키텍쳐
    • 여러 서비스가 독립적으로 동작하면서 데이터 일관성을 유지해야 하는 환경

Spring Cloud를 활용한 SAGA 구현

  • Choreography 방식:
    • Kafka, RabbitMQ 등의 메시지 브로커를 활용한 이벤트 기반 통신.
  •  Orchestration 방식:
    • Spring State Machine 또는 Camunda와 같은 워크플로우 관리 툴 사용.

 

SAGA 패턴 적용 실습

Spring Cloud 기반 MSA에서 Choreography 방식으로 구현

 

1. Order Application

build.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    runtimeOnly 'com.mysql:mysql-connector-j'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
  • RabbitMQ, JPA, MySQL, Web을 dependency 추가

application.yml

spring:
  application:
    name: order
  rabbitmq:
    port: 5672
    username: guest
    password: guest
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/market
    username: username
    password: 'password'
  jpa:
    hibernate:
      ddl-auto: create
    properties:
      hibernate:
        show_sql: 'true'
        format_sql: 'true'
        use_sql_comments: 'true'

message:
  exchange: market
  queue:
    product: market.product
    payment: market.payment
    err:
      order: market.err.order
      product: market.err.product
  err:
    exchange: market.err
  • RabbitMQ, DB 설정

Message DTO

@Data
@Builder
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class DeliveryMessage {

    private UUID orderId;
    private UUID paymentId;

    private String userId;
    private Integer productId;
    private Integer productQuantity;
    private Integer payAmount;

    private String errorType;
}

 

Entity

@Builder
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "orders")
public class Order {
    /* orderStatus, errorType = enum*/
    @Id
    private UUID orderId;
    private String userId;
    private String orderStatus;
    private String errorType;

    public void cancelOrder(String receiveErrorType) {
        orderStatus = "CANCELLED";
        errorType = receiveErrorType;
    }
}
  • order는 MySQL 예약어이기 때문에 테이블 이름을 orders로 설정

ApplicationQueueConfig

@Configuration
public class OrderApplicationQueueConfig {

    @Bean
    public Jackson2JsonMessageConverter producerJackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Value("${message.exchange}")
    private String exchange;

    @Value("${message.queue.product}")
    private String queueProduct;

    @Value("${message.queue.payment}")
    private String queuePayment;


    @Value("${message.err.exchange}")
    private String exchangeErr;

    @Value("${message.queue.err.order}")
    private String queueErrOrder;

    @Value("${message.queue.err.product}")
    private String queueErrProduct;

    @Bean public TopicExchange exchange() { return new TopicExchange(exchange); }
    @Bean public Queue queueProduct() { return new Queue(queueProduct); }
    @Bean public Queue queuePayment() { return new Queue(queuePayment); }
    @Bean public Binding bindingProduct() { return BindingBuilder.bind(queueProduct()).to(exchange()).with(queueProduct); }
    @Bean public Binding bindingPayment() { return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment); }

    @Bean public TopicExchange exchangeErr() { return new TopicExchange(exchangeErr); }
    @Bean public Queue queueErrOrder() { return new Queue(queueErrOrder); }
    @Bean public Queue queueErrProduct() { return new Queue(queueErrProduct); }
    @Bean public Binding bindingErrOrder() { return BindingBuilder.bind(queueErrOrder()).to(exchangeErr()).with(queueErrOrder); }
    @Bean public Binding bindingErrProduct() { return BindingBuilder.bind(queueErrProduct()).to(exchangeErr()).with(queueErrProduct); }
}
  • jsonMessageConverter, exchange, queue, binding을 설정(정상, 에러용)

Repository

public interface OrderRepository extends JpaRepository<Order, UUID> {
    Optional<Order> findByOrderId(UUID orderId);
}

 

Endpoint

@Slf4j
@RestController
@RequiredArgsConstructor
public class OrderEndpoint {

    @RabbitListener(queues = "${message.queue.err.order}")
    public void errOrder(DeliveryMessage deliveryMessage) {
        log.info("Error Receive !!!");
        orderService.rollbackOrder(deliveryMessage);
    }

    private final OrderService orderService;

    @GetMapping("/order/{orderId}")
    public ResponseEntity<Order> getOrder(@PathVariable("orderId") UUID orderId) {
        Order order = orderService.getOrder(orderId);
        return ResponseEntity.ok(order);
    }

    @PostMapping("/order")
    public ResponseEntity<Order> order(@RequestBody OrderRequestDto orderRequestDto) {
        Order order = orderService.createOrder(orderRequestDto);
        return ResponseEntity.ok(order);
    }

    @Data
    public static class OrderRequestDto {
        private String userId;
        private Integer productId;
        private Integer productQuantity;
        private Integer payAmount;

        public Order toOrder() {
            return Order.builder()
                    .orderId(UUID.randomUUID())
                    .userId(userId)
                    .orderStatus("RECEIPT")
                    .build();
        }

        public DeliveryMessage toDeliveryMessage(UUID orderId) {
            return DeliveryMessage.builder()
                    .orderId(orderId)
                    .userId(userId)
                    .productId(productId)
                    .productQuantity(productQuantity)
                    .payAmount(payAmount)
                    .build();
        }
    }
}
  • Order 생성, 조회 Endpoint, DTO, error를 수신할 listener 설정

Service

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {

    @Value("${message.queue.product}")
    private String productQueue;
    // DB 없을 때 테스트용
    // private Map<UUID, Order> orderStore = new HashMap<>();
    private final OrderRepository orderRepository;
    private final RabbitTemplate rabbitTemplate;

    public Order createOrder(OrderEndpoint.OrderRequestDto orderRequestDto) {
        log.info("OrderRequestDto: {}", orderRequestDto);
        Order order = orderRequestDto.toOrder();
        //orderStore.put(order.getOrderId(), order);
        orderRepository.save(order);
        DeliveryMessage deliveryMessage = orderRequestDto.toDeliveryMessage(order.getOrderId());
        log.info("Delivery Message: {}", deliveryMessage);
        rabbitTemplate.convertAndSend(productQueue, deliveryMessage);

        return order;
    }

    public Order getOrder(UUID orderId) {
        // return orderStore.get(orderId);
        return orderRepository.findByOrderId(orderId)
                .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));
    }

    public void rollbackOrder(DeliveryMessage deliveryMessage) {
        Order order = orderRepository.findByOrderId(deliveryMessage.getOrderId())
                .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));

        order.cancelOrder(deliveryMessage.getErrorType());
        orderRepository.save(order);
    }
}
  • DTO를 받아 Order를 생성해 DB에 저장하고 rabbitTemplate을 사용해 메시지 송신하는 메서드, OrderId를 받아 DB의 Order를 찾아서 반환하는 메서드, 에러 발생 시 rollback 메서드 설정

1-1. Order Application 테스트

 

2. Product Application

build.gradle

dependencies {
    // Jackson 의존성
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    implementation 'com.fasterxml.jackson.core:jackson-core'
    implementation 'com.fasterxml.jackson.core:jackson-annotations'

    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
  • Order에서 Web을 빼고 Jackson 의존성 추가

application.yml

spring:
  application:
    name: product
  rabbitmq:
    port: 5672
    username: guest
    password: guest

message:
  exchange: market
  queue:
    product: market.product
    payment: market.payment
    err:
      order: market.err.order
      product: market.err.product
  err:
    exchange: market.err
  • RabbitMQ 설정

Message DTO

  • Order의 Message DTO와 동일

ApplicationQueueConfig

@Configuration
public class ProductApplicationQueueConfig {
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
  • JsonMessageConverter 설정

Endpoint

@Slf4j
@RequiredArgsConstructor
@Component
public class ProductEndpoint {

    private final ProductService productService;

    @RabbitListener(queues = "${message.queue.product}")
    public void receiveMessage(DeliveryMessage deliveryMessage) {
        log.info("Product Receive: {}", deliveryMessage.toString());
        productService.reduceProductAmount(deliveryMessage);
    }

    @RabbitListener(queues = "${message.queue.err.product}")
    public void receiveErrorMessage(DeliveryMessage deliveryMessage) {
        log.info("Error Receive !!!");
        productService.rollbackProduct(deliveryMessage);
    }
}
  • Order에서 송신한 메시지를 수신하기 위한 listener 설정, 에러 메시지를 수신하기 위한 listener 설정

Service

@Service
@RequiredArgsConstructor
public class ProductService {

    @Value("${message.queue.payment}")
    private String paymentQueue;

    @Value("${message.queue.err.order}")
    private String errOrderQueue;

    private final RabbitTemplate rabbitTemplate;

    public void reduceProductAmount(DeliveryMessage deliveryMessage) {
        Integer productId = deliveryMessage.getProductId();
        Integer productQuantity = deliveryMessage.getProductQuantity();

        if (productId != 1 || productQuantity > 1) {
            this.rollbackProduct(deliveryMessage);
            return;
        }

        rabbitTemplate.convertAndSend(paymentQueue, deliveryMessage);
    }

    public void rollbackProduct(DeliveryMessage deliveryMessage) {
        log.info("Product Rollback");
        if (!StringUtils.hasText(deliveryMessage.getErrorType())) {
            deliveryMessage.setErrorType("PRODUCT ERROR");
        }
        rabbitTemplate.convertAndSend(errOrderQueue, deliveryMessage);
    }
}
  • Order의 메시지를 받아 Product를 생성하고 Product에 문제가 있을 경우 OrderErrQueue에 에러 메시지를 송신하는 메서드를 호출하고 정상일 경우 Payment에 메시지 송신
  • Product에 문제가 있을 경우 ErrorType을 변경 후 메시지를 송신하는 메서드

2-1. Product Application 테스트

  • product application 실행 시 정상적으로 메시지를 수신한다.

  • Order를 통해 메시지를 보내 product에 대한 잘못된 값을 보내면 rollback 메서드가 실행되어 errorType이 변경된다.

 

3. Payment Application

build.gradle

dependencies {
    // Jackson 의존성
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    implementation 'com.fasterxml.jackson.core:jackson-core'
    implementation 'com.fasterxml.jackson.core:jackson-annotations'

    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    runtimeOnly 'com.mysql:mysql-connector-j'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
  • order의 dependency에서 jackson 의존성 추가

application.yml

spring:
  application:
    name: payment
  rabbitmq:
    port: 5672
    username: guest
    password: guest
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/market
    username: username
    password: 'password'
  jpa:
    hibernate:
      ddl-auto: create
    properties:
      hibernate:
        show_sql: 'true'
        format_sql: 'true'
        use_sql_comments: 'true'

message:
  exchange: market
  queue:
    product: market.product
    payment: market.payment
    err:
      order: market.err.order
      product: market.err.product
  err:
    exchange: market.err
  • DB, RabbitMQ 설정

Message DTO

  • Order, Product, Payment 동일하게 사용

Entity

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "payment")
public class Payment {

    @Id
    private UUID paymentId;

    private String userId;
    private Integer payAmount;
    private String payStatus;
}

 

ApplicationQueueConfig

  • Product와 동일한 설정 사용, Payment에 맞게 클래스 이름만 수정

Repository

public interface PaymentRepository extends JpaRepository<Payment, UUID> {
}

 

Endpoint

@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentEndpoint {

    private final PaymentService paymentService;

    @RabbitListener(queues = "${message.queue.payment}")
    public void receiveMessage(DeliveryMessage deliveryMessage) {
        log.info("Payment Message: {}", deliveryMessage);
        paymentService.createPayment(deliveryMessage);
    }
}
  • Product에서 송신한 메시지를 수신할 listener 설정

Service

@Slf4j
@Service
@RequiredArgsConstructor
public class PaymentService {

    @Value("${message.queue.err.product}")
    private String productErrorQueue;

    private final PaymentRepository paymentRepository;
    private final RabbitTemplate rabbitTemplate;

    public void createPayment(DeliveryMessage deliveryMessage) {
        Payment payment = Payment.builder()
                .paymentId(UUID.randomUUID())
                .userId(deliveryMessage.getUserId())
                .payAmount(deliveryMessage.getPayAmount())
                .payStatus("SUCCESS")
                .build();

        Integer payAmount = deliveryMessage.getPayAmount();
        if (payAmount >= 10000) {
            log.error("Payment amount exceeds limit : {}", payAmount);
            deliveryMessage.setErrorType("PAYMENT_LIMIT_EXCEEDED");
            this.rollbackPayment(deliveryMessage);
            return;
        }

        paymentRepository.save(payment);
    }

    public void rollbackPayment(DeliveryMessage deliveryMessage) {
        log.info("Payment Rollback !!!");
        rabbitTemplate.convertAndSend(productErrorQueue, deliveryMessage);
    }
}
  • 메시지를 수신해 Payment Entity 생성 payAmount 값이 10000보다 높으면 rollback 메서드를 호출해 productErrorQueue로 메시지 송신

3-1. 실행 테스트

  • Product의 메시지를 수신하고 잘못된 값이 들어온 경우 rollback 메서드를 통해 에러 메시지를 송신하는 것을 확인

 

4. SAGA 패턴 테스트

application을 모두 실행 후 정상 실행과 에러 상태를 모두 확인

  • PostMan 정상 요청과 product, payment에 대한 에러 상황을 발생시키도록 요청을 보내고 DB를 확인

  • Product, Payment에 대한 에러 타입과 주문 상태가 취소로 변경

  • Payment의 경우에도 정상 상태일 때만 저장되는 것을 확인할 수 있다.

 

정리

  • RabbitMQ를 사용해 SAGA 패턴 중 Choreography 방식을 간단하게 구현해 보았다.
  • 간단한 SAGA 패턴이지만 정상적인 경우에 대한 처리를 먼저 개발하고 에러 상황이 발생했을 때에 대한 부분을 개발하는 식으로 단계를 나눠서 진행이 필요했다.
  • MSA에서 데이터 일관성을 유지하기 위한 방법 중 다른 방식에 대한 것도 배워서 적용해보고 상황에 맞는 방법을 사용할 수 있도록 해야 될 것 같다.

'자바 심화 > TIL' 카테고리의 다른 글

프로젝트 문제 해결(역직렬화, git 에러)  (0) 2024.12.12
DDD(Domain-Driven Design)  (1) 2024.12.09
Kafka - 기초  (0) 2024.12.06
Rabbit MQ  (1) 2024.12.05
대규모 스트림 처리  (2) 2024.12.04