개요
MSA 예약 관리 시스템 프로젝트에서 예약 관리 서비스를 Kafka를 사용해 비동기 방식으로 처리하도록 구현했다.
Kafka 비동기 처리 구현 과정
Kafka 설치
1. Docker-Compose.yml 작성
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
platform: linux/amd64
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: wurstmeister/kafka:latest
platform: linux/amd64
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
kafka-ui:
image: provectuslabs/kafka-ui:latest
platform: linux/amd64
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_READONLY: "false"
2. Docker Compose 실행 및 설치 확인
docker-compose -f docker-compose.yml up -d
docker ps
비동기 처리 구현
1. Gradle - Kafka 의존성 추가
// Kafka
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
2. application.yml에 Kafka 구성
kafka:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: earliest
- bootstrap-server 주소 및 consumer auto-offset reset 방식만 작성
- producer, consumer에 대한 serializer 설정도 작성 가능
3. Kafka Producer/Consumer Configuration 파일 작성
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> kafkaConfig = new HashMap<>();
kafkaConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(kafkaConfig);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- Producer의 bootsrapServer, Serializer 설정
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> kafkaConfig = new HashMap<>();
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
kafkaConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(kafkaConfig);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
- Consumer의 bootstrap server, de-serializer, auto-offset reset, trusted pakages 설정
4. Message Class 작성
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class ReservationMessage {
private String userId;
private String userEmail;
private String storeId;
private String guestCount;
private String reservationDate;
private String reservationTime;
}
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class PaymentMessage {
private UUID paymentId;
private Integer amount;
private UUID reservationId;
}
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Builder(access = AccessLevel.PRIVATE)
public class ReservationToStoreMessage {
private UUID ReservationSlotId;
private Integer currentCapacity;
public static ReservationToStoreMessage of(UUID slotId, Integer currentCapacity) {
return ReservationToStoreMessage.builder()
.ReservationSlotId(slotId)
.currentCapacity(currentCapacity)
.build();
}
}
- 메시지를 보내거나 받기 위한 클래스를 생성한다.
- field 타입과 이름이 일치해야 된다.
5. Producer Class 작성
import com.quit.reservation.infrastructure.messaging.message.ReservationToStoreMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class ReservationMessagingProducer implements MessageProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public ReservationMessagingProducer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
//TODO: value 값 환경변수 처리 고려
@Value("reservation.confirm.success")
private String reservationTopic;
@Value("reservation.confirm.failed")
private String reservationFailedTopic;
/* 메시지 전송: 예약 -> 가게-예약 슬롯 예약 생성 정보 전송(slotId, guestCount)*/
@Override
public void sendReservationData(UUID slotId, Integer currentCapacity) {
ReservationToStoreMessage message = ReservationToStoreMessage.of(slotId, currentCapacity);
sendMessage(reservationTopic, slotId.toString(), message);
}
@Override
public void sendReservationFailed(UUID slotId, Integer currentCapacity) {
ReservationToStoreMessage message = ReservationToStoreMessage.of(slotId, currentCapacity);
sendMessage(reservationFailedTopic, slotId.toString(), message);
}
private void sendMessage(String topic, String key, Object message) {
kafkaTemplate.send(topic, key, message);
}
}
- Topic, 전송할 메시지 타입을 작성
6. Consumer Class 작성
import com.quit.reservation.application.dto.CreateReservationDto;
import com.quit.reservation.application.service.ReservationService;
import com.quit.reservation.domain.enums.ReservationStatus;
import com.quit.reservation.infrastructure.messaging.message.PaymentMessage;
import com.quit.reservation.infrastructure.messaging.message.ReservationMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.UUID;
@Slf4j
@Component
public class ReservationMessagingConsumer {
/* 수신할 메시지(topic)
* 1. 결제에서 보내는 결제 성공/실패 메시지(성공 상태/실패 상태 변경)
* 2. 가게에서 예약 확정 실패 메시지(실패 시 상태 변경) */
private final ReservationService reservationService;
public ReservationMessagingConsumer(ReservationService reservationService) {
this.reservationService = reservationService;
}
//TODO: topics 환경변수 설정 고려
// 대기열 -> 예약 서비스 메시지 수신 처리
@KafkaListener(topics = "queue.process.success", groupId = "reservation-group")
public void listenReservationCreate(ReservationMessage message) {
log.info("예약 정보 메시지 수신 - 가게 ID: {}", message.getStoreId());
UUID storeId = UUID.fromString(message.getStoreId());
Integer guestCount = Integer.valueOf(message.getGuestCount());
LocalDate reservationDate = LocalDate.parse(message.getReservationDate());
LocalTime reservationTime = LocalTime.parse(message.getReservationTime());
CreateReservationDto request = CreateReservationDto
.of(storeId, guestCount, reservationDate, reservationTime);
log.info("예약 정보 메시지 처리 호출");
reservationService.createReservation(request, message.getUserEmail());
}
// 결제 -> 예약 서비스 메시지 수신 처리(성공)
@KafkaListener(topics = "payment.create.success", groupId = "reservation-group")
public void listenReservationPaymentSuccess(PaymentMessage message) {
log.info("예약 결제 메시지 수신 - 결제 ID: {}", message.getPaymentId());
UUID reservationId = message.getReservationId();
Integer amount = message.getAmount();
log.info("예약 금액 정보 업데이트 호출");
reservationService.updateReservationPayment(reservationId, amount);
log.info("예약 상태 변경 호출");
ReservationStatus status = ReservationStatus.ACCEPTED;
reservationService.changeReservationStatusAsync(reservationId, status);
}
@KafkaListener(topics = "payment.create.failed", groupId = "reservation-group")
public void listenReservationPaymentFailed(PaymentMessage message) {
log.info("예약 결제 실패 메시지 수신 - 결제 ID: {}", message.getPaymentId());
UUID reservationId = message.getReservationId();
cancelReservationAsync(reservationId);
}
//TODO: 랜덤 값 수정 및 topics, message 정의 필요
@KafkaListener(topics = "store.reservation.failed", groupId = "reservation-group")
public void listenReservationConfirmFailed() {
log.info("예약 확정 실패 메시지 수신 - 예약 ID: {}", "실패");
UUID reservationId = UUID.randomUUID();
cancelReservationAsync(reservationId);
}
private void cancelReservationAsync(UUID reservationId) {
log.info("예약 취소 처리 호출");
reservationService.cancelReservationAsync(reservationId);
}
}
- 메시지를 수신 받을 Topic을 설정하고 메시지를 받은 후 로직을 작성
7. Producer Interface 작성
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public interface MessageProducer {
void sendReservationData(UUID slotId, Integer currentCapacity);
void sendReservationFailed(UUID slotId, Integer currentCapacity);
}
- application 서비스에서 producer를 사용하기 위한 interface 생성
- 서비스 코드에서 prodcuer를 직접 의존하지 않도록 하기 위함(DIP)
Jmeter 테스트
Kafka를 통한 비동기 처리가 정상적으로 동작하는지 확인하기 위한 테스트를 Jmeter를 사용해 진행했다.
Test Contorller
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/tests")
public class KafkaTestController {
private final ReservationMessagingProducer reservationMessagingProducer;
@PostMapping("/queue")
public ResponseEntity<String> createSend(@RequestBody ReservationMessage message) {
String key = message.getStoreId() + ":" + message.getUserId();
reservationMessagingProducer.sendMessage("queue.process.success", key, message);
return ResponseEntity.ok(key);
}
@PostMapping("/payment")
public ResponseEntity<String> paymentSend(@RequestBody PaymentMessage message) {
String key = String.valueOf(message.getReservationId());
reservationMessagingProducer.sendMessage("payment.create.success", key, message);
return ResponseEntity.ok(key);
}
}
Jmeter Setting
1. Thread Group 설정
2. HTTP Request 설정
3. Header 설정
4. 리스너 설정
스레드 그룹 우클릭 → 추가 → 리스너 → 결과들의 트리 보기/요약 보고서 추가
5. 테스트 실행 및 결과 확인
Jmeter
Kafka topics
DB
구현 중 어려웠던 점
Consumer 역직렬화 문제
2025-01-10T16:29:39.561+09:00 ERROR 28692 --- [reservation] [ntainer#1-0-C-1] [ ] o.a.k.c.c.internals.CompletedFetch : [Consumer clientId=consumer-reservation-group-4, groupId=reservation-group] Value Deserializers with error: Deserializers{keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer@7ff7101, valueDeserializer=org.springframework.kafka.support.serializer.JsonDeserializer@fc8fa5f}
2025-01-10T16:29:39.561+09:00 ERROR 28692 --- [reservation] [ntainer#1-0-C-1] [ ] o.s.k.l.KafkaMessageListenerContainer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192) ~[spring-kafka-3.3.1.jar:3.3.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1985) ~[spring-kafka-3.3.1.jar:3.3.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1380) ~[spring-kafka-3.3.1.jar:3.3.1]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing VALUE for partition payment.create.success-0 at offset 0. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.CompletedFetch.newRecordDeserializationException(CompletedFetch.java:346) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:330) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:284) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:667) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:618) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:591) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.8.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1685) ~[spring-kafka-3.3.1.jar:3.3.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1660) ~[spring-kafka-3.3.1.jar:3.3.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1438) ~[spring-kafka-3.3.1.jar:3.3.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-3.3.1.jar:3.3.1]
... 2 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.quit.payment.application.dto.PaymentEvent]
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:136) ~[spring-kafka-3.3.1.jar:3.3.1]
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:97) ~[spring-kafka-3.3.1.jar:3.3.1]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:587) ~[spring-kafka-3.3.1.jar:3.3.1]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.8.1.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:327) ~[kafka-clients-3.8.1.jar:na]
... 14 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.quit.payment.application.dto.PaymentEvent
at
- ClassNotFoundException:com.quit.payment.application.dto.PaymentEvent 에러 발생
원인
- 컨슈머가 역직렬화 할 시 메시지 헤더의 클래스 정보를 참조하도록 기본 설정 되어 있음
해결
- 메시지 역직렬화 시 메시지 헤더의 클래스 정보 참조하지 않도록 설정
- DeSerializer 클래스 타입 명시적 지정
- Message 타입 별로 Consumer 설정 분리, Consumer에 containerFactory 설정
Consumer Config
import com.quit.reservation.infrastructure.messaging.message.PaymentMessage;
import com.quit.reservation.infrastructure.messaging.message.ReservationMessage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
private Map<String, Object> commonKafkaConfig() {
Map<String, Object> kafkaConfig = new HashMap<>();
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
kafkaConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
kafkaConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
return kafkaConfig;
}
private <T> ConsumerFactory<String, T> createConsumerFactory(Class<T> valueType) {
Map<String, Object> kafkaConfig = commonKafkaConfig();
kafkaConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, valueType.getName());
return new DefaultKafkaConsumerFactory<>(kafkaConfig, new StringDeserializer(), new JsonDeserializer<>(valueType));
}
@Bean
public ConsumerFactory<String, PaymentMessage> paymentConsumerFactory() {
return createConsumerFactory(PaymentMessage.class);
}
@Bean
public ConsumerFactory<String, ReservationMessage> queueConsumerFactory() {
return createConsumerFactory(ReservationMessage.class);
}
private <T> ConcurrentKafkaListenerContainerFactory<String, T> createKafkaListenerContainerFactory(ConsumerFactory<String, T> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, T> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PaymentMessage> paymentKafkaListenerContainerFactory() {
return createKafkaListenerContainerFactory(paymentConsumerFactory());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ReservationMessage> queueKafkaListenerContainerFactory() {
return createKafkaListenerContainerFactory(queueConsumerFactory());
}
}
- 공통 설정 부분을 메서드화 하여 중복 코드 최소화
Consumer
import com.quit.reservation.application.dto.CreateReservationDto;
import com.quit.reservation.application.service.ReservationService;
import com.quit.reservation.domain.enums.ReservationStatus;
import com.quit.reservation.infrastructure.messaging.message.PaymentMessage;
import com.quit.reservation.infrastructure.messaging.message.ReservationMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.UUID;
@Slf4j
@Component
public class ReservationMessagingConsumer {
/* 수신할 메시지(topic)
* 1. 결제에서 보내는 결제 성공/실패 메시지(성공 상태/실패 상태 변경)
* 2. 가게에서 예약 확정 실패 메시지(실패 시 상태 변경) */
private final ReservationService reservationService;
public ReservationMessagingConsumer(ReservationService reservationService) {
this.reservationService = reservationService;
}
//TODO: topics 환경변수 설정 고려
// 대기열 -> 예약 서비스 메시지 수신 처리
@KafkaListener(topics = "queue.process.success", groupId = "reservation-group",
containerFactory = "queueKafkaListenerContainerFactory")
public void listenReservationCreate(ReservationMessage message) {
log.info("예약 정보 메시지 수신 - 가게 ID: {}", message.getStoreId());
UUID storeId = UUID.fromString(message.getStoreId());
Integer guestCount = Integer.valueOf(message.getGuestCount());
LocalDate reservationDate = LocalDate.parse(message.getReservationDate());
LocalTime reservationTime = LocalTime.parse(message.getReservationTime());
CreateReservationDto request = CreateReservationDto
.of(storeId, guestCount, reservationDate, reservationTime);
log.info("예약 정보 메시지 처리 호출");
reservationService.createReservation(request, message.getUserEmail());
}
// 결제 -> 예약 서비스 메시지 수신 처리(성공)
@KafkaListener(topics = "payment.create.success", groupId = "reservation-group",
containerFactory = "paymentKafkaListenerContainerFactory")
public void listenReservationPaymentSuccess(PaymentMessage message) {
log.info("예약 결제 메시지 수신 - 결제 ID: {}", message.getPaymentId());
UUID reservationId = message.getReservationId();
Integer amount = message.getAmount();
log.info("예약 금액 정보 업데이트 호출");
reservationService.updateReservationPayment(reservationId, amount);
log.info("예약 상태 변경 호출");
ReservationStatus status = ReservationStatus.ACCEPTED;
reservationService.changeReservationStatusAsync(reservationId, status);
}
// 결제 -> 예약 서비스 메시지 수신 처리(실패)
@KafkaListener(topics = "payment.create.failed", groupId = "reservation-group",
containerFactory = "paymentKafkaListenerContainerFactory")
public void listenReservationPaymentFailed(PaymentMessage message) {
log.info("예약 결제 실패 메시지 수신 - 결제 ID: {}", message.getPaymentId());
UUID reservationId = message.getReservationId();
cancelReservationAsync(reservationId);
}
private void cancelReservationAsync(UUID reservationId) {
log.info("예약 취소 처리 호출");
reservationService.cancelReservationAsync(reservationId);
}
}
- Topic별 해당하는 Consumer 설정 적용
문제점
- Producer로 보내는 메시지 타입이 많아질 경우 Consumer 설정을 추가로 구성해야 됨
문제점 해결 방안
- ObjectMapper를 사용해 메시지 직렬화 역직렬화(Serializer, Deserializer)를 Custom 처리
예제
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class EventSerializer {
private static final ObjectMapper objectMapper = new ObjectMapper();
public static String serialize(Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new RuntimeException("직렬화 중 오류 발생: " + e.getMessage());
}
}
public static <T> T deserialize(String json, Class<T> clazz) {
try {
return objectMapper.readValue(json, clazz);
} catch (JsonProcessingException e) {
throw new RuntimeException("역직렬화 중 오류 발생: " + e.getMessage());
}
}
}
정리
Kafka를 통한 비동기 처리가 이점이 많지만 그만큼 신경 써야 되는 부분도 많은 것 같다.
'자바 심화 > TIL' 카테고리의 다른 글
Redisson 분산 락 구현 (0) | 2025.01.15 |
---|---|
Kafka - 비동기 처리 (1) | 2024.12.31 |
동시성 제어 시점과 데이터 일관성 유지 관점 (1) | 2024.12.30 |
Redis - Redisson (2) | 2024.12.27 |
DB Lock (1) | 2024.12.26 |