Implementing Asynchronous Processing with Kafka

Overview

In the MSA reservation management system project, I implemented the reservation management service to process requests asynchronously using Kafka.

 

Kafka Asynchronous Processing Implementation Steps

Installing Kafka

1. Write docker-compose.yml

version: '3.8'
services:
  zookeeper:        # single-node ZooKeeper used by the Kafka broker
    image: wurstmeister/zookeeper:latest
    platform: linux/amd64
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:            # single broker with separate internal (29092) and external (9092) listeners
    image: wurstmeister/kafka:latest
    platform: linux/amd64
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  kafka-ui:         # web UI for inspecting topics and messages (http://localhost:8080)
    image: provectuslabs/kafka-ui:latest
    platform: linux/amd64
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_READONLY: "false"

 

2. Run Docker Compose and verify the installation

docker-compose -f docker-compose.yml up -d

docker ps
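docker ps only shows that the containers are up; to check that the broker itself responds, the Kafka scripts bundled in the wurstmeister image can be used. A minimal sketch, assuming the compose service name kafka and that kafka-topics.sh is on the container's PATH:

docker-compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list

The Kafka UI container started above is also reachable at http://localhost:8080.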

 

Implementing Asynchronous Processing

1. Gradle - add the Kafka dependency

// Kafka
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

 

2. Configure Kafka in application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
  • Only the bootstrap-servers address and the consumer auto-offset-reset policy are set here (the properties are nested under spring: so that the ${spring.kafka.*} placeholders in the config classes resolve).
    • Serializer settings for the producer and consumer can also be declared here, as sketched below.
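For reference, a minimal sketch of what that fuller configuration could look like, using Spring Boot's standard spring.kafka.* property names (this project sets the serializers in the Java config classes below instead):

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer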

 

3. Write the Kafka producer/consumer configuration classes

 
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> kafkaConfig = new HashMap<>();
        kafkaConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(kafkaConfig);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • Configures the producer's bootstrap servers and key/value serializers (String key, JSON value).

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> kafkaConfig = new HashMap<>();
        kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        kafkaConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(kafkaConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
  • Configures the consumer's bootstrap servers, deserializers, auto-offset-reset policy, and trusted packages.

 

4. Write the message classes

@Getter
@NoArgsConstructor
@AllArgsConstructor
public class ReservationMessage {
    private String userId;
    private String userEmail;
    private String storeId;
    private String guestCount;
    private String reservationDate;
    private String reservationTime;
}

@Getter
@NoArgsConstructor
@AllArgsConstructor
public class PaymentMessage {
    private UUID paymentId;
    private Integer amount;
    private UUID reservationId;
}

@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Builder(access = AccessLevel.PRIVATE)
public class ReservationToStoreMessage {
    private UUID reservationSlotId;
    private Integer currentCapacity;

    public static ReservationToStoreMessage of(UUID slotId, Integer currentCapacity) {
        return ReservationToStoreMessage.builder()
                .reservationSlotId(slotId)
                .currentCapacity(currentCapacity)
                .build();
    }
}
  • Create the classes used to send and receive messages.
    • Field names and types must match between the sending and receiving services, since the JSON property names are derived from them; the sketch below shows roughly what a ReservationToStoreMessage looks like on the wire.
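A minimal sketch of the serialized payload, assuming JsonSerializer's default field-by-field mapping (the UUID and capacity values are illustrative):

{
  "reservationSlotId": "5f3a2c1e-0b1d-4c2a-9e3f-1a2b3c4d5e6f",
  "currentCapacity": 3
}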

 

5. Write the producer class

import com.quit.reservation.infrastructure.messaging.message.ReservationToStoreMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class ReservationMessagingProducer implements MessageProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public ReservationMessagingProducer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    //TODO: consider externalizing these topic names (without ${...}, @Value injects the literal string)
    @Value("reservation.confirm.success")
    private String reservationTopic;

    @Value("reservation.confirm.failed")
    private String reservationFailedTopic;

    /* Send message: reservation -> store, reservation-slot update info (slotId, currentCapacity) */
    @Override
    public void sendReservationData(UUID slotId, Integer currentCapacity) {
        ReservationToStoreMessage message = ReservationToStoreMessage.of(slotId, currentCapacity);
        sendMessage(reservationTopic, slotId.toString(), message);
    }

    @Override
    public void sendReservationFailed(UUID slotId, Integer currentCapacity) {
        ReservationToStoreMessage message = ReservationToStoreMessage.of(slotId, currentCapacity);
        sendMessage(reservationFailedTopic, slotId.toString(), message);
    }

    private void sendMessage(String topic, String key, Object message) {
        kafkaTemplate.send(topic, key, message);
    }
}
  • Defines the topics and the message types to send.

 

6. Write the consumer class

import com.quit.reservation.application.dto.CreateReservationDto;
import com.quit.reservation.application.service.ReservationService;
import com.quit.reservation.domain.enums.ReservationStatus;
import com.quit.reservation.infrastructure.messaging.message.PaymentMessage;
import com.quit.reservation.infrastructure.messaging.message.ReservationMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.UUID;

@Slf4j
@Component
public class ReservationMessagingConsumer {

    /* Messages (topics) to consume:
     * 1. Payment success/failure messages from the payment service (update the reservation status accordingly)
     * 2. Reservation-confirmation failure messages from the store service (update the status on failure) */

    private final ReservationService reservationService;

    public ReservationMessagingConsumer(ReservationService reservationService) {
        this.reservationService = reservationService;
    }

    //TODO: consider externalizing the topic names as configuration

    // Queue -> reservation service: handle reservation-creation messages
    @KafkaListener(topics = "queue.process.success", groupId = "reservation-group")
    public void listenReservationCreate(ReservationMessage message) {
        log.info("예약 정보 메시지 수신 - 가게 ID: {}", message.getStoreId());

        UUID storeId = UUID.fromString(message.getStoreId());
        Integer guestCount = Integer.valueOf(message.getGuestCount());
        LocalDate reservationDate = LocalDate.parse(message.getReservationDate());
        LocalTime reservationTime = LocalTime.parse(message.getReservationTime());

        CreateReservationDto request = CreateReservationDto
                .of(storeId, guestCount, reservationDate, reservationTime);
        log.info("예약 정보 메시지 처리 호출");
        reservationService.createReservation(request, message.getUserEmail());
    }

    // Payment -> reservation service: handle payment success messages
    @KafkaListener(topics = "payment.create.success", groupId = "reservation-group")
    public void listenReservationPaymentSuccess(PaymentMessage message) {
        log.info("예약 결제 메시지 수신 - 결제 ID: {}", message.getPaymentId());

        UUID reservationId = message.getReservationId();
        Integer amount = message.getAmount();

        log.info("예약 금액 정보 업데이트 호출");
        reservationService.updateReservationPayment(reservationId, amount);

        log.info("예약 상태 변경 호출");
        ReservationStatus status = ReservationStatus.ACCEPTED;
        reservationService.changeReservationStatusAsync(reservationId, status);
    }

    @KafkaListener(topics = "payment.create.failed", groupId = "reservation-group")
    public void listenReservationPaymentFailed(PaymentMessage message) {
        log.info("예약 결제 실패 메시지 수신 - 결제 ID: {}", message.getPaymentId());

        UUID reservationId = message.getReservationId();
        cancelReservationAsync(reservationId);
    }

    //TODO: replace the random placeholder value and define the actual topic/message format
    @KafkaListener(topics = "store.reservation.failed", groupId = "reservation-group")
    public void listenReservationConfirmFailed() {
        log.info("예약 확정 실패 메시지 수신 - 예약 ID: {}", "실패");

        UUID reservationId = UUID.randomUUID();
        cancelReservationAsync(reservationId);
    }

    private void cancelReservationAsync(UUID reservationId) {
        log.info("예약 취소 처리 호출");
        reservationService.cancelReservationAsync(reservationId);
    }
}
  • Sets the topics to consume and implements the logic to run after a message is received.

 

7. Write the producer interface

import java.util.UUID;

public interface MessageProducer {
    void sendReservationData(UUID slotId, Integer currentCapacity);
    void sendReservationFailed(UUID slotId, Integer currentCapacity);
}
  • Creates an interface so that the application service can use the producer.
    • This keeps the service code from depending on the Kafka producer directly (DIP); a sketch of how a service would use it follows below.
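A minimal sketch of how an application service could depend on the interface instead of the Kafka implementation. The service name and method are illustrative, not the project's actual code:

import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class ReservationSlotSyncService {   // hypothetical example service

    // depends only on the abstraction; the Kafka details stay in the infrastructure layer
    private final MessageProducer messageProducer;

    public ReservationSlotSyncService(MessageProducer messageProducer) {
        this.messageProducer = messageProducer;
    }

    public void publishSlotUpdate(UUID slotId, Integer currentCapacity) {
        messageProducer.sendReservationData(slotId, currentCapacity);
    }
}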

 

JMeter Test

To verify that the Kafka-based asynchronous processing works correctly, I ran a test with JMeter.

 

Test Controller

import com.quit.reservation.infrastructure.messaging.message.PaymentMessage;
import com.quit.reservation.infrastructure.messaging.message.ReservationMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/tests")
public class KafkaTestController {

    private final ReservationMessagingProducer reservationMessagingProducer;

    // Note: sendMessage(...) is declared private in ReservationMessagingProducer above;
    // it has to be made public (or a test-only send method exposed) for this controller to compile.
    @PostMapping("/queue")
    public ResponseEntity<String> createSend(@RequestBody ReservationMessage message) {
        String key = message.getStoreId() + ":" + message.getUserId();
        reservationMessagingProducer.sendMessage("queue.process.success", key, message);
        return ResponseEntity.ok(key);
    }

    @PostMapping("/payment")
    public ResponseEntity<String> paymentSend(@RequestBody PaymentMessage message) {
        String key = String.valueOf(message.getReservationId());
        reservationMessagingProducer.sendMessage("payment.create.success", key, message);
        return ResponseEntity.ok(key);
    }
}

 

JMeter Settings

1. Thread Group settings

 

2. HTTP Request settings
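The HTTP Request sampler posts to the test controller, so its body is a JSON payload matching ReservationMessage. A sketch with illustrative values (POST /api/tests/queue, with Content-Type: application/json set in the Header Manager below):

{
  "userId": "1",
  "userEmail": "test@example.com",
  "storeId": "5f3a2c1e-0b1d-4c2a-9e3f-1a2b3c4d5e6f",
  "guestCount": "2",
  "reservationDate": "2025-01-10",
  "reservationTime": "18:00"
}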

 

3. Header settings

 

4. Listener settings

Right-click the Thread Group → Add → Listener → add View Results Tree / Summary Report.

 

5. Run the test and check the results

JMeter

 

Kafka topics

 

DB

 

Difficulties During Implementation

Consumer deserialization problem

2025-01-10T16:29:39.561+09:00 ERROR 28692 --- [reservation] [ntainer#1-0-C-1] [                                                 ] o.a.k.c.c.internals.CompletedFetch       : [Consumer clientId=consumer-reservation-group-4, groupId=reservation-group] Value Deserializers with error: Deserializers{keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer@7ff7101, valueDeserializer=org.springframework.kafka.support.serializer.JsonDeserializer@fc8fa5f}
2025-01-10T16:29:39.561+09:00 ERROR 28692 --- [reservation] [ntainer#1-0-C-1] [                                                 ] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
	at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1985) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1380) ~[spring-kafka-3.3.1.jar:3.3.1]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing VALUE for partition payment.create.success-0 at offset 0. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.CompletedFetch.newRecordDeserializationException(CompletedFetch.java:346) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:330) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:284) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:667) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:618) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:591) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.8.1.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1685) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1660) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1438) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-3.3.1.jar:3.3.1]
	... 2 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.quit.payment.application.dto.PaymentEvent]
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:136) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:97) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:587) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:327) ~[kafka-clients-3.8.1.jar:na]
	... 14 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.quit.payment.application.dto.PaymentEvent
	at
  • A ClassNotFoundException for com.quit.payment.application.dto.PaymentEvent is thrown while deserializing the consumed message.

Cause

  • By default, the consumer's JsonDeserializer resolves the target type from the type information the producer writes into the record headers (the __TypeId__ header carries the producer-side class name, here com.quit.payment.application.dto.PaymentEvent), and that class does not exist on the consumer's classpath.

Solution

  • Configure deserialization so it does not use the class information in the message headers
  • Explicitly specify the deserializer's target class type
  • Split the consumer configuration per message type and set the matching containerFactory on each @KafkaListener

 

Consumer Config

import com.quit.reservation.infrastructure.messaging.message.PaymentMessage;
import com.quit.reservation.infrastructure.messaging.message.ReservationMessage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    private Map<String, Object> commonKafkaConfig() {
        Map<String, Object> kafkaConfig = new HashMap<>();
        kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        kafkaConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        kafkaConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
        return kafkaConfig;
    }

    private <T> ConsumerFactory<String, T> createConsumerFactory(Class<T> valueType) {
        Map<String, Object> kafkaConfig = commonKafkaConfig();
        kafkaConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, valueType.getName());
        return new DefaultKafkaConsumerFactory<>(kafkaConfig, new StringDeserializer(), new JsonDeserializer<>(valueType));
    }

    @Bean
    public ConsumerFactory<String, PaymentMessage> paymentConsumerFactory() {
        return createConsumerFactory(PaymentMessage.class);
    }

    @Bean
    public ConsumerFactory<String, ReservationMessage> queueConsumerFactory() {
        return createConsumerFactory(ReservationMessage.class);
    }

    private <T> ConcurrentKafkaListenerContainerFactory<String, T> createKafkaListenerContainerFactory(ConsumerFactory<String, T> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, T> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PaymentMessage> paymentKafkaListenerContainerFactory() {
        return createKafkaListenerContainerFactory(paymentConsumerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ReservationMessage> queueKafkaListenerContainerFactory() {
        return createKafkaListenerContainerFactory(queueConsumerFactory());
    }
}
  • The common settings are factored out into a helper method to minimize duplicated code (an optional ErrorHandlingDeserializer variant is sketched below).
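Not something the project currently does, but the error message above also suggests an ErrorHandlingDeserializer; wrapping the JsonDeserializer with it keeps a record that still fails to deserialize from repeatedly crashing the listener container. A sketch of how createConsumerFactory could be adapted (ErrorHandlingDeserializer comes from org.springframework.kafka.support.serializer):

private <T> ConsumerFactory<String, T> createConsumerFactory(Class<T> valueType) {
    Map<String, Object> kafkaConfig = commonKafkaConfig();
    // JsonDeserializer(valueType, false) ignores type headers; ErrorHandlingDeserializer turns
    // deserialization failures into errors the container's error handler can process
    ErrorHandlingDeserializer<T> valueDeserializer =
            new ErrorHandlingDeserializer<>(new JsonDeserializer<>(valueType, false));
    return new DefaultKafkaConsumerFactory<>(kafkaConfig, new StringDeserializer(), valueDeserializer);
}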

Consumer

import com.quit.reservation.application.dto.CreateReservationDto;
import com.quit.reservation.application.service.ReservationService;
import com.quit.reservation.domain.enums.ReservationStatus;
import com.quit.reservation.infrastructure.messaging.message.PaymentMessage;
import com.quit.reservation.infrastructure.messaging.message.ReservationMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.UUID;

@Slf4j
@Component
public class ReservationMessagingConsumer {

    /* Messages (topics) to consume:
     * 1. Payment success/failure messages from the payment service (update the reservation status accordingly)
     * 2. Reservation-confirmation failure messages from the store service (update the status on failure) */

    private final ReservationService reservationService;

    public ReservationMessagingConsumer(ReservationService reservationService) {
        this.reservationService = reservationService;
    }

    //TODO: consider externalizing the topic names as configuration

    // Queue -> reservation service: handle reservation-creation messages
    @KafkaListener(topics = "queue.process.success", groupId = "reservation-group",
            containerFactory = "queueKafkaListenerContainerFactory")
    public void listenReservationCreate(ReservationMessage message) {
        log.info("예약 정보 메시지 수신 - 가게 ID: {}", message.getStoreId());

        UUID storeId = UUID.fromString(message.getStoreId());
        Integer guestCount = Integer.valueOf(message.getGuestCount());
        LocalDate reservationDate = LocalDate.parse(message.getReservationDate());
        LocalTime reservationTime = LocalTime.parse(message.getReservationTime());

        CreateReservationDto request = CreateReservationDto
                .of(storeId, guestCount, reservationDate, reservationTime);
        log.info("예약 정보 메시지 처리 호출");
        reservationService.createReservation(request, message.getUserEmail());
    }

    // Payment -> reservation service: handle payment success messages
    @KafkaListener(topics = "payment.create.success", groupId = "reservation-group",
            containerFactory = "paymentKafkaListenerContainerFactory")
    public void listenReservationPaymentSuccess(PaymentMessage message) {
        log.info("예약 결제 메시지 수신 - 결제 ID: {}", message.getPaymentId());

        UUID reservationId = message.getReservationId();
        Integer amount = message.getAmount();

        log.info("예약 금액 정보 업데이트 호출");
        reservationService.updateReservationPayment(reservationId, amount);

        log.info("예약 상태 변경 호출");
        ReservationStatus status = ReservationStatus.ACCEPTED;
        reservationService.changeReservationStatusAsync(reservationId, status);
    }

    // Payment -> reservation service: handle payment failure messages
    @KafkaListener(topics = "payment.create.failed", groupId = "reservation-group",
            containerFactory = "paymentKafkaListenerContainerFactory")
    public void listenReservationPaymentFailed(PaymentMessage message) {
        log.info("예약 결제 실패 메시지 수신 - 결제 ID: {}", message.getPaymentId());

        UUID reservationId = message.getReservationId();
        cancelReservationAsync(reservationId);
    }

    private void cancelReservationAsync(UUID reservationId) {
        log.info("예약 취소 처리 호출");
        reservationService.cancelReservationAsync(reservationId);
    }
}
  • Each @KafkaListener now specifies the container factory that matches its topic's message type.

 

Remaining problem

  • As the number of message types sent by producers grows, a consumer factory and container factory have to be added for every new type.

Possible solution

  • Handle message serialization/deserialization (Serializer/Deserializer) ourselves with an ObjectMapper-based helper, so messages travel as plain JSON strings.

Example

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EventSerializer {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static String serialize(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            // keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException("Error while serializing: " + e.getMessage(), e);
        }
    }

    public static <T> T deserialize(String json, Class<T> clazz) {
        try {
            return objectMapper.readValue(json, clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Error while deserializing: " + e.getMessage(), e);
        }
    }
}
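A rough sketch of how this could be wired in, so that every topic only needs String (de)serializers no matter how many message types are added. It assumes the producer and consumer are reconfigured with StringSerializer/StringDeserializer for values; this is not the project's current code:

// Producer side: serialize the payload to a JSON String before sending
public void sendReservationData(UUID slotId, Integer currentCapacity) {
    ReservationToStoreMessage message = ReservationToStoreMessage.of(slotId, currentCapacity);
    kafkaTemplate.send("reservation.confirm.success", slotId.toString(), EventSerializer.serialize(message));
}

// Consumer side: receive the raw JSON String and choose the target class per topic
@KafkaListener(topics = "payment.create.success", groupId = "reservation-group")
public void listenReservationPaymentSuccess(String json) {
    PaymentMessage message = EventSerializer.deserialize(json, PaymentMessage.class);
    // ...existing handling logic
}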

 

 

Summary

Asynchronous processing with Kafka has many advantages, but it seems there are just as many details that require careful attention.
