개요
MSA 프로젝트에서 데이터의 일관성을 유지하기 위한 방법 중 하나인 SAGA 패턴에 대해 배우고 간단하게 적용해 볼 것이다.
SAGA Pattern?
분산 시스템에서 데이터의 일관성을 유지하기 위해 사용되는 설계 패턴으로, 하나의 분산 트랜잭션을 여러 개의 작은 로컬 트랜잭션으로 나누어 처리한다.
이 패턴은 각 서비스가 독립적으로 동작할 수 있도록 설계되었으며, 분산 트랜잭션의 어려움을 해결하기 위한 방법 중 하나.
SAGA 패턴 작동 방식
- Choreography 방식
- 각 서비스가 자신의 작업을 완료한 후 다음 작업을 수행할 서비스를 호출하거나 이벤트를 발행한다.
- 이벤트 기반으로 동작하며, 중앙 집중식 관리가 없다.
- 서비스들이 서로 느슨하게 결합되어 있어 확장이 쉽다.
- 단점: 서비스 간의 의존성이 높아질 수 있으며, 복잡도가 증가할 수 있다.
- 예) 1.Order Service가 주문을 생성하고 이벤트 발행: Order Created
- 2. Payment Service가 이벤트를 구독하여 결제를 처리하고, Payment Completed 이벤트를 발행
- 3. Inventory Service가 결제 완료 이벤트를 구독하여 재고를 차감
- Orchestration 방식
- 중앙에서 작업의 흐름을 제어하는 SAGA Coordinator가 존재한다.
- SAGA Coordinator가 각 서비스에 작업 요청을 전송하며, 성공 또는 실패를 모니터링한다.
- 중앙 관리가 가능하여 복잡한 비즈니스 로직을 쉽게 관리할 수 있다.
- 단점: 중앙 Coordinator가 병목 지점이 될 수 있다.
- 예) 1. SAGA Coordinator가 Order Service에 주문 생성 요청
- 2. 주문 생성 성공 시, Payment Service에 결제 요청
- 3. 결제 성공 시, Inventory Service에 재고 차감 요청
보상 트랜잭션
SAGA 패턴에서 한 트랜잭션이 실패할 경우, 이전 단계에서 수행된 작업을 취소하기 위해 보상 트랜잭션(Compensating Transaction)을 실행한다.
- 예)
- 결제가 실패하면 이미 생성된 주문을 취소하는 보상 트랜잭션 실행
- 이미 차감된 재고를 복구하는 작업 실행
SAGA 패턴의 장단점
- 장점
- 분산 환경에서 데이터 일관성 유지 가능
- 독립적인 로컬 트랜잭션으로 인해 높은 확장성과 유연성
- 중앙 집중식 트랜잭션 관리 도구(DB) 없이도 운영 가능
- 단점
- 보상 트랜잭션 설계가 복잡할 수 있음
- 데이터의 강력한 일관성 대신 최종적 일관성을 보장
- Choreography 방식에서는 서비스 간의 의존성이 높아질 수 있음
- Orchestration 방식에서는 Coordinator가 병목 지점이 될 가능성
SAGA 패턴 사용 사례
- 전자상거래 플랫폼
- 주문 생성, 결제 처리, 재고 관리의 트랜잭션 분리
- 여행 예약 시스템
- 항공권, 호텔 예약, 렌터카 예약 등 여러 단계의 트랜잭션 처리
- 마이크로서비스 아키텍쳐
- 여러 서비스가 독립적으로 동작하면서 데이터 일관성을 유지해야 하는 환경
Spring Cloud를 활용한 SAGA 구현
- Choreography 방식:
- Kafka, RabbitMQ 등의 메시지 브로커를 활용한 이벤트 기반 통신.
- Orchestration 방식:
- Spring State Machine 또는 Camunda와 같은 워크플로우 관리 툴 사용.
SAGA 패턴 적용 실습
Spring Cloud 기반 MSA에서 Choreography 방식으로 구현
1. Order Application
build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
runtimeOnly 'com.mysql:mysql-connector-j'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
- RabbitMQ, JPA, MySQL, Web을 dependency 추가
application.yml
spring:
application:
name: order
rabbitmq:
port: 5672
username: guest
password: guest
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/market
username: username
password: 'password'
jpa:
hibernate:
ddl-auto: create
properties:
hibernate:
show_sql: 'true'
format_sql: 'true'
use_sql_comments: 'true'
message:
exchange: market
queue:
product: market.product
payment: market.payment
err:
order: market.err.order
product: market.err.product
err:
exchange: market.err
- RabbitMQ, DB 설정
Message DTO
@Data
@Builder
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class DeliveryMessage {
private UUID orderId;
private UUID paymentId;
private String userId;
private Integer productId;
private Integer productQuantity;
private Integer payAmount;
private String errorType;
}
Entity
@Builder
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "orders")
public class Order {
/* orderStatus, errorType = enum*/
@Id
private UUID orderId;
private String userId;
private String orderStatus;
private String errorType;
public void cancelOrder(String receiveErrorType) {
orderStatus = "CANCELLED";
errorType = receiveErrorType;
}
}
- order는 MySQL 예약어이기 때문에 테이블 이름을 orders로 설정
ApplicationQueueConfig
@Configuration
public class OrderApplicationQueueConfig {
@Bean
public Jackson2JsonMessageConverter producerJackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Value("${message.exchange}")
private String exchange;
@Value("${message.queue.product}")
private String queueProduct;
@Value("${message.queue.payment}")
private String queuePayment;
@Value("${message.err.exchange}")
private String exchangeErr;
@Value("${message.queue.err.order}")
private String queueErrOrder;
@Value("${message.queue.err.product}")
private String queueErrProduct;
@Bean public TopicExchange exchange() { return new TopicExchange(exchange); }
@Bean public Queue queueProduct() { return new Queue(queueProduct); }
@Bean public Queue queuePayment() { return new Queue(queuePayment); }
@Bean public Binding bindingProduct() { return BindingBuilder.bind(queueProduct()).to(exchange()).with(queueProduct); }
@Bean public Binding bindingPayment() { return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment); }
@Bean public TopicExchange exchangeErr() { return new TopicExchange(exchangeErr); }
@Bean public Queue queueErrOrder() { return new Queue(queueErrOrder); }
@Bean public Queue queueErrProduct() { return new Queue(queueErrProduct); }
@Bean public Binding bindingErrOrder() { return BindingBuilder.bind(queueErrOrder()).to(exchangeErr()).with(queueErrOrder); }
@Bean public Binding bindingErrProduct() { return BindingBuilder.bind(queueErrProduct()).to(exchangeErr()).with(queueErrProduct); }
}
- jsonMessageConverter, exchange, queue, binding을 설정(정상, 에러용)
Repository
public interface OrderRepository extends JpaRepository<Order, UUID> {
Optional<Order> findByOrderId(UUID orderId);
}
Endpoint
@Slf4j
@RestController
@RequiredArgsConstructor
public class OrderEndpoint {
@RabbitListener(queues = "${message.queue.err.order}")
public void errOrder(DeliveryMessage deliveryMessage) {
log.info("Error Receive !!!");
orderService.rollbackOrder(deliveryMessage);
}
private final OrderService orderService;
@GetMapping("/order/{orderId}")
public ResponseEntity<Order> getOrder(@PathVariable("orderId") UUID orderId) {
Order order = orderService.getOrder(orderId);
return ResponseEntity.ok(order);
}
@PostMapping("/order")
public ResponseEntity<Order> order(@RequestBody OrderRequestDto orderRequestDto) {
Order order = orderService.createOrder(orderRequestDto);
return ResponseEntity.ok(order);
}
@Data
public static class OrderRequestDto {
private String userId;
private Integer productId;
private Integer productQuantity;
private Integer payAmount;
public Order toOrder() {
return Order.builder()
.orderId(UUID.randomUUID())
.userId(userId)
.orderStatus("RECEIPT")
.build();
}
public DeliveryMessage toDeliveryMessage(UUID orderId) {
return DeliveryMessage.builder()
.orderId(orderId)
.userId(userId)
.productId(productId)
.productQuantity(productQuantity)
.payAmount(payAmount)
.build();
}
}
}
- Order 생성, 조회 Endpoint, DTO, error를 수신할 listener 설정
Service
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {
@Value("${message.queue.product}")
private String productQueue;
// DB 없을 때 테스트용
// private Map<UUID, Order> orderStore = new HashMap<>();
private final OrderRepository orderRepository;
private final RabbitTemplate rabbitTemplate;
public Order createOrder(OrderEndpoint.OrderRequestDto orderRequestDto) {
log.info("OrderRequestDto: {}", orderRequestDto);
Order order = orderRequestDto.toOrder();
//orderStore.put(order.getOrderId(), order);
orderRepository.save(order);
DeliveryMessage deliveryMessage = orderRequestDto.toDeliveryMessage(order.getOrderId());
log.info("Delivery Message: {}", deliveryMessage);
rabbitTemplate.convertAndSend(productQueue, deliveryMessage);
return order;
}
public Order getOrder(UUID orderId) {
// return orderStore.get(orderId);
return orderRepository.findByOrderId(orderId)
.orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));
}
public void rollbackOrder(DeliveryMessage deliveryMessage) {
Order order = orderRepository.findByOrderId(deliveryMessage.getOrderId())
.orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));
order.cancelOrder(deliveryMessage.getErrorType());
orderRepository.save(order);
}
}
- DTO를 받아 Order를 생성해 DB에 저장하고 rabbitTemplate을 사용해 메시지 송신하는 메서드, OrderId를 받아 DB의 Order를 찾아서 반환하는 메서드, 에러 발생 시 rollback 메서드 설정
1-1. Order Application 테스트
2. Product Application
build.gradle
dependencies {
// Jackson 의존성
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
- Order에서 Web을 빼고 Jackson 의존성 추가
application.yml
spring:
application:
name: product
rabbitmq:
port: 5672
username: guest
password: guest
message:
exchange: market
queue:
product: market.product
payment: market.payment
err:
order: market.err.order
product: market.err.product
err:
exchange: market.err
- RabbitMQ 설정
Message DTO
- Order의 Message DTO와 동일
ApplicationQueueConfig
@Configuration
public class ProductApplicationQueueConfig {
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
- JsonMessageConverter 설정
Endpoint
@Slf4j
@RequiredArgsConstructor
@Component
public class ProductEndpoint {
private final ProductService productService;
@RabbitListener(queues = "${message.queue.product}")
public void receiveMessage(DeliveryMessage deliveryMessage) {
log.info("Product Receive: {}", deliveryMessage.toString());
productService.reduceProductAmount(deliveryMessage);
}
@RabbitListener(queues = "${message.queue.err.product}")
public void receiveErrorMessage(DeliveryMessage deliveryMessage) {
log.info("Error Receive !!!");
productService.rollbackProduct(deliveryMessage);
}
}
- Order에서 송신한 메시지를 수신하기 위한 listener 설정, 에러 메시지를 수신하기 위한 listener 설정
Service
@Service
@RequiredArgsConstructor
public class ProductService {
@Value("${message.queue.payment}")
private String paymentQueue;
@Value("${message.queue.err.order}")
private String errOrderQueue;
private final RabbitTemplate rabbitTemplate;
public void reduceProductAmount(DeliveryMessage deliveryMessage) {
Integer productId = deliveryMessage.getProductId();
Integer productQuantity = deliveryMessage.getProductQuantity();
if (productId != 1 || productQuantity > 1) {
this.rollbackProduct(deliveryMessage);
return;
}
rabbitTemplate.convertAndSend(paymentQueue, deliveryMessage);
}
public void rollbackProduct(DeliveryMessage deliveryMessage) {
log.info("Product Rollback");
if (!StringUtils.hasText(deliveryMessage.getErrorType())) {
deliveryMessage.setErrorType("PRODUCT ERROR");
}
rabbitTemplate.convertAndSend(errOrderQueue, deliveryMessage);
}
}
- Order의 메시지를 받아 Product를 생성하고 Product에 문제가 있을 경우 OrderErrQueue에 에러 메시지를 송신하는 메서드를 호출하고 정상일 경우 Payment에 메시지 송신
- Product에 문제가 있을 경우 ErrorType을 변경 후 메시지를 송신하는 메서드
2-1. Product Application 테스트
- product application 실행 시 정상적으로 메시지를 수신한다.
- Order를 통해 메시지를 보내 product에 대한 잘못된 값을 보내면 rollback 메서드가 실행되어 errorType이 변경된다.
3. Payment Application
build.gradle
dependencies {
// Jackson 의존성
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
runtimeOnly 'com.mysql:mysql-connector-j'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
- order의 dependency에서 jackson 의존성 추가
application.yml
spring:
application:
name: payment
rabbitmq:
port: 5672
username: guest
password: guest
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/market
username: username
password: 'password'
jpa:
hibernate:
ddl-auto: create
properties:
hibernate:
show_sql: 'true'
format_sql: 'true'
use_sql_comments: 'true'
message:
exchange: market
queue:
product: market.product
payment: market.payment
err:
order: market.err.order
product: market.err.product
err:
exchange: market.err
- DB, RabbitMQ 설정
Message DTO
- Order, Product, Payment 동일하게 사용
Entity
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "payment")
public class Payment {
@Id
private UUID paymentId;
private String userId;
private Integer payAmount;
private String payStatus;
}
ApplicationQueueConfig
- Product와 동일한 설정 사용, Payment에 맞게 클래스 이름만 수정
Repository
public interface PaymentRepository extends JpaRepository<Payment, UUID> {
}
Endpoint
@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentEndpoint {
private final PaymentService paymentService;
@RabbitListener(queues = "${message.queue.payment}")
public void receiveMessage(DeliveryMessage deliveryMessage) {
log.info("Payment Message: {}", deliveryMessage);
paymentService.createPayment(deliveryMessage);
}
}
- Product에서 송신한 메시지를 수신할 listener 설정
Service
@Slf4j
@Service
@RequiredArgsConstructor
public class PaymentService {
@Value("${message.queue.err.product}")
private String productErrorQueue;
private final PaymentRepository paymentRepository;
private final RabbitTemplate rabbitTemplate;
public void createPayment(DeliveryMessage deliveryMessage) {
Payment payment = Payment.builder()
.paymentId(UUID.randomUUID())
.userId(deliveryMessage.getUserId())
.payAmount(deliveryMessage.getPayAmount())
.payStatus("SUCCESS")
.build();
Integer payAmount = deliveryMessage.getPayAmount();
if (payAmount >= 10000) {
log.error("Payment amount exceeds limit : {}", payAmount);
deliveryMessage.setErrorType("PAYMENT_LIMIT_EXCEEDED");
this.rollbackPayment(deliveryMessage);
return;
}
paymentRepository.save(payment);
}
public void rollbackPayment(DeliveryMessage deliveryMessage) {
log.info("Payment Rollback !!!");
rabbitTemplate.convertAndSend(productErrorQueue, deliveryMessage);
}
}
- 메시지를 수신해 Payment Entity 생성 payAmount 값이 10000보다 높으면 rollback 메서드를 호출해 productErrorQueue로 메시지 송신
3-1. 실행 테스트
- Product의 메시지를 수신하고 잘못된 값이 들어온 경우 rollback 메서드를 통해 에러 메시지를 송신하는 것을 확인
4. SAGA 패턴 테스트
application을 모두 실행 후 정상 실행과 에러 상태를 모두 확인
- PostMan 정상 요청과 product, payment에 대한 에러 상황을 발생시키도록 요청을 보내고 DB를 확인
- Product, Payment에 대한 에러 타입과 주문 상태가 취소로 변경
- Payment의 경우에도 정상 상태일 때만 저장되는 것을 확인할 수 있다.
정리
- RabbitMQ를 사용해 SAGA 패턴 중 Choreography 방식을 간단하게 구현해 보았다.
- 간단한 SAGA 패턴이지만 정상적인 경우에 대한 처리를 먼저 개발하고 에러 상황이 발생했을 때에 대한 부분을 개발하는 식으로 단계를 나눠서 진행이 필요했다.
- MSA에서 데이터 일관성을 유지하기 위한 방법 중 다른 방식에 대한 것도 배워서 적용해보고 상황에 맞는 방법을 사용할 수 있도록 해야 될 것 같다.
'자바 심화 > TIL' 카테고리의 다른 글
프로젝트 문제 해결(역직렬화, git 에러) (0) | 2024.12.12 |
---|---|
DDD(Domain-Driven Design) (1) | 2024.12.09 |
Kafka - 기초 (0) | 2024.12.06 |
Rabbit MQ (1) | 2024.12.05 |
대규모 스트림 처리 (2) | 2024.12.04 |