본문 바로가기

자바 심화/TIL

Kafka - 비동기 처리

개요

Kafka 비동기 처리에 대해 궁금했던 점에 대한 것과 해결 방안을 찾아볼 것이다.

 

Kafka 비동기 처리에 대해 궁금했던 점

  • Consumer가 Message를 받아 서비스를 호출해 작업을 하는데 동일한 작업에 대한 Controller 메서드가 필요한가?

 

Kafka 비동기 처리 시 Controller의 동일 메서드가 필요 없는 이유

  1. Kafka Consumer가 작업을 처리
    • 메시지가 producer에서 전달되면 consumer가 해당 메시지를 받아 특정 로직을 실행함
      • Consumer가 생성 작업을 호출하면, 이미 Kafka에서 수신된 메시지에 대한 처리를 완료하는 것이므로 별도의 REST API가 필요하지 않음
  2. Controller의 역할이 없을 경우
    • 컨트롤러는 주로 클라이언트 요청을 처리하는 역할을 함
    • 비동기 처리에서는 클라이언트가 REST API를 호출하지 않고 producer를 통해 메시지를 전송하므로 Controller에서 이를 직접 다룰 필요가 없음
  3. 예외 상황
    • Kafka 외에 별도의 요청 경로를 제공해야 하는 경우
      • Kafka 외에도 특정 클라이언트가 REST API를 통해 동일한 작업(생성 등)을 요청할 가능성이 있다면, Controller 메서드로 해당 작업을 처리해야 할 수도 있음
    • 테스트 및 디버깅 목적
      • Consumer나 비즈니스 로직을 디버깅하거나 테스트하기 위해 Controller에서 동일한 작업을 호출하는 임시 메서드를 만들 수도 있음
    • 서비스 확장성
      • Kafka와 REST API 둘 다 지원하려는 경우, Controller와 Consumer를 모두 유지하고 동일한 Service 계층에서 로직을 공유할 수 있음

결론

  • Kafka 비동기 처리를 처음 진행하기 때문에 우선적으로 REST API를 통해 기능이 정상적으로 동작하는지 테스트하고 비동기 처리로 동작하는지 확인

 

Controller로 Kafka 메시지 테스트가 가능한 이유

  • 비즈니스 로직의 분리
    • 일반적으로 Kafka 메시지를 처리하는 로직은 Consumer에서 작성되지만, 실제 처리는 Service 계층에서 이루어짐 
      • 이 Service 계층의 메서드는 Controller에서도 호출할 수 있기 때문에 Kafka 없이도 동일한 로직을 실행할 수 있다
  • Request 데이터를 Kafka 메시지 형태로 전달
    • Kafka 메시지가 Consumer에서 JSON 포맷이나 특정 객체로 변환되어 처리되는 구조라면, Controller에서도 동일한 데이터 구조를 받아 테스트할 수 있다. 
      • 예를 들어, Kafka 메시지와 동일한 DTO를 Controller의 Request Body로 받을 수 있다

예시

Kafka Consumer

@Component
public class KafkaMessageConsumer {

    private final MyService myService;

    public KafkaMessageConsumer(MyService myService) {
        this.myService = myService;
    }

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consumeMessage(String message) {
        MyDto myDto = convertToDto(message); // JSON -> DTO 변환
        myService.processData(myDto);
    }
}

 

Controller

@RestController
@RequestMapping("/api/test")
public class TestController {

    private final MyService myService;

    public TestController(MyService myService) {
        this.myService = myService;
    }

    @PostMapping("/process")
    public ResponseEntity<String> testProcess(@RequestBody MyDto myDto) {
        myService.processData(myDto); // Service 호출
        return ResponseEntity.ok("Processed successfully");
    }
}

 

Service

@Service
public class MyService {
    public void processData(MyDto myDto) {
        // Kafka 메시지와 동일한 로직 처리
        System.out.println("Processing: " + myDto);
    }
}

 

테스트 시나리오

  1. Kafka를 사용하지 않는 환경에서 테스트
    • POST /api/test/process 엔드포인트에 Kafka 메시지와 동일한 구조의 JSON 데이터를 전송
    • Service 계층 로직이 제대로 동작하는지 확인
  2. Kafka 환경 테스트 전 준비
    • Kafka Consumer가 사용할 로직이 Controller와 동일한지 확인할 수 있다
    • Kafka 메시지 처리 전후를 미리 검증할 수 있는 장점이 있다

장점

  • Kafka 의존 없이 로직 검증 가능
  • Kafka 설정이나 메시지 전달 오류를 배제한 상태에서 순수 비즈니스 로직을 테스트할 수 있음
  • 개발 초기에 로직 구현 및 단위 테스트를 쉽게 작성 가능

 

Kafka의 메시지 순서 보장

기본 Kafka Consumer의 동작 방식

  1. 메시지는 파티션 단위로 처리 - Kafka는 토픽을 여러 파티션으로 나누어 저장하며, 각 파티션은 한 Consumer 스레드에서 처리
    • 단일 파티션에서는 메시지가 순서 보장(FIFO)되며, 이전 메시지 처리가 끝나야 다음 메시지를 처리
    • 여러 파티션이 있다면 Consumer는 병렬로 메시지를 처리할 수 있음
  2. Consumer의 메시지 처리 방식
    • 동기 처리 - 메시지를 순차적으로 처리하며, 현재 메시지가 처리 완료되기 전까지 다음 메시지를 처리하지 않는다
    • 비동기 처리 - Consumer 애플리케이션에서 메시지를 병렬로 처리하는 로직을 구현하면, 동시에 여러 메시지를 처리할 수 있음
  3. Acknowledge(Offset Commit)
    • Kafka는 메시지를 처리 완료했다고 Consumer가 명시적으로 Offset Commit을 수행해야 다음 메시지를 처리로 간주함
      • 동기 처리에서는 메시지를 하나 처리할 때마다 Commit이 수행
      • 비동기 처리에서는 처리 완료 후 개별 메시지나 배치로 Commit을 수행할 수 있음

단일 서버 구성에서의 동작

  1. Producer가 메시지를 보내는 속도와 Consumer 처리 속도 비교
    • Producer가 메시지를 빠르게 보내고 Consumer가 동기적으로 처리한다면, Consumer는 한 메시지를 처리한 후에야 다음 메시지를 처리한다. 이 경우 Consumer가 병목이 될 수 있다.
    • Consumer가 비동기로 메시지를 처리한다면, 메시지가 순서와 관계없이 병렬로 처리될 수 있다.
  2. 단일 파티션의 경우
    • Producer가 보낸 메시지는 Kafka 토픽의 단일 파티션에 기록된다.
    • Consumer는 이 파티션의 메시지를 순서대로 처리해야 하므로, 한 메시지의 처리가 끝나야 다음 메시지를 처리한다.
  3. 여러 파티션의 경우
    • Consumer 그룹이 단일 Consumer로 구성되더라도 여러 파티션에서 병렬로 메시지를 가져올 수 있다.
    • Consumer 내부 로직에서 병렬 처리 코드를 구현하면 동시에 여러 메시지를 처리할 수 있다.

동작을 조정하는 방법

  1. 동기 처리 설정
    • 메시지를 순차적으로 처리하려면 Consumer에서 비동기 로직을 사용하지 않고, 처리 완료 후 Offset Commit을 수행하도록 구현
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
          for (ConsumerRecord<String, String> record : records) {
              processMessage(record); // 메시지 처리
              consumer.commitSync(); // 동기적으로 Commit
          }
      }


  2.  비동기 처리 설정
    • 병렬 처리가 필요하다면 스레드 풀을 사용하거나, 메시지를 큐에 넣어 처리하도록 구현
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
          for (ConsumerRecord<String, String> record : records) {
              executorService.submit(() -> processMessage(record)); // 비동기로 메시지 처리
          }
          consumer.commitAsync(); // 비동기 Commit
      }
  3.  Consumer 그룹을 활용한 병렬 처리
    • 단일 서버에서도 Kafka Consumer 그룹을 구성하면 여러 Consumer를 실행할 수 있다
      • 이 경우 Kafka는 각 Consumer에게 서로 다른 파티션의 메시지를 분배

 

단일 서버에서 Kafka 비동기 처리를 구성할 때, 메시지가 처리되는 방식은 Consumer의 설정에 따라 달라진다

  • 단일 파티션 + 동기 처리: 한 메시지 처리 후 다음 메시지를 처리.
  • 단일 파티션 + 비동기 처리: 메시지를 병렬로 처리 가능.
  • 여러 파티션: 메시지가 병렬로 처리 가능.

Kafka 동기 비동기 전환이 간단한 이유

  1. Kafka 자체는 비동기 처리에 유연
  2. 서비스 계층 로직 재활용 가능
  3. Java 스레드 풀 활용

기존 동기 처리 코드

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processMessage(record); // 동기적으로 메시지 처리
        consumer.commitSync(); // 메시지 처리 후 동기적으로 Commit
    }
}

 

스레드 풀 활용

ExecutorService executorService = Executors.newFixedThreadPool(10); // 스레드 풀 생성

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        executorService.submit(() -> {
            try {
                processMessage(record); // 비동기로 메시지 처리
            } catch (Exception e) {
                e.printStackTrace();
                // 예외 처리 로직
            }
        });
    }
    consumer.commitAsync(); // 비동기로 Offset Commit
}

 

변경점

  • 스레드 풀을 사용해 메시지를 비동기로 처리.
  • 메시지 처리가 끝나지 않아도 다음 메시지를 가져와 처리 가능.
  • Offset Commit은 commitAsync를 사용해 처리.

CompletableFuture 활용

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        CompletableFuture.runAsync(() -> processMessage(record))
                         .exceptionally(e -> {
                             e.printStackTrace();
                             // 예외 처리 로직
                             return null;
                         });
    }
    consumer.commitAsync(); // 비동기로 Offset Commit
}
  • CompletableFuture는 Java의 비동기 프로그래밍 표준 API로, 간단한 비동기 처리에 적합

 

전환 시 고려사항

  1. 메시지 처리 순서
    • 단일 파티션에서는 Kafka가 메시지의 순서를 보장한다.
    • 비동기 처리로 전환하면 처리 속도에 따라 메시지 순서가 뒤바뀔 수 있음에 유의해야 한다.
      • 순서가 중요한 경우에는 메시지를 순차적으로 처리하는 큐를 추가하거나, CompletableFuture와 thenRun으로 순서를 보장하는 방식이 필요하다.
  2.  Offset Commit 방식
    • commitSync: 동기적으로 Offset을 Commit하며, 실패 시 재시도.
    • commitAsync: 비동기로 Commit하며, 실패 시 재시도를 하지 않음.
    • 비동기 처리에서는 commitAsync를 사용하되, 처리 완료 후 Offset Commit 타이밍을 정확히 조정해야 중복 처리를 방지할 수 있다.
  3. 예외 처리
    • 비동기 작업에서 예외가 발생하면 스레드 풀이나 CompletableFuture의 예외 처리가 필요하다.
    • 예외가 발생했을 때 메시지를 다시 처리할지, DLQ(Dead Letter Queue)로 보낼지 결정해야 하다.
  4. 스레드 풀 크기 조정
    • 스레드 풀의 크기는 메시지 처리 속도, 서버 리소스(CPU, 메모리) 등을 고려해 적절히 설정해야 하다.
    • 너무 작은 경우 병목이 발생하고, 너무 큰 경우 서버 리소스가 부족할 수 있다.
  5. 모니터링 및 성능 테스트
    • 비동기로 전환 후 처리 속도 및 리소스 사용량을 반드시 테스트.
    • Kafka Consumer의 Lag(처리되지 않은 메시지 수)을 모니터링해 성능을 튜닝.

 

Kafka에서 중복 방지가 어려운 이유

  1. At-least-once 메시징
    • Kafka Consumer는 기본적으로 메시지를 적어도 한 번(at-least-once) 처리하도록 설계되었다. 따라서 Consumer가 메시지 처리 중 실패하면 동일한 메시지가 다시 처리될 수 있다.
  2. 중복 메시지 가능성
    • Producer가 메시지를 전송 중 실패할 경우, 재시도 로직으로 인해 동일한 메시지가 여러 번 전송될 수 있다.
    • Kafka 자체는 메시지의 고유성을 보장하지 않는다.

Kafka만으로 중복 방지 처리

Kafka를 사용하는 경우, 중복 메시지 방지는 주로 키(key)를 활용하거나 메시지 처리 전략을 설계하여 구현한다.

  1. Producer에서 메시지 Key 활용
    • Kafka 메시지의 Key를 고유한 값(예: 예약 ID)으로 설정하면, 동일한 Key를 가진 메시지가 Kafka 토픽의 동일한 파티션으로 전송된다.
      • 장점: 파티션 내 순서를 보장하므로 최신 상태를 덮어쓸 수 있음.
      • 한계: 단순히 Key를 사용하면 메시지 덮어쓰기일 뿐, 중복 검증 로직이 없으면 예약 처리 결과는 여전히 중복될 수 있음.
  2. Idempotent Producer 설정
    • Kafka Producer에서 Idempotence(멱등성) 설정을 활성화하면, 동일한 메시지를 여러 번 전송하더라도 Kafka가 중복 메시지를 제거한다.
      • 설정: enable.idempotence=true
      • 한계: Producer 수준에서 중복 제거만 보장하므로, Consumer에서 중복 검증은 여전히 필요하다.

 

Kafka와 추가적인 중복 방지 로직

Kafka만으로 중복 방지 구현이 완벽하지 않으므로, 애플리케이션 레벨에서 추가적인 중복 방지 로직을 설계하는 것이 필요.

  1. 중복 확인을 위한 데이터 저장소 활용
    • 예약 서비스에서는 중복 확인을 위해 **데이터 저장소(예: Redis, RDBMS)**를 활용하는 것이 일반적.
    • 전략:
      1. 예약 ID 또는 고유 키(예: 사용자 ID + 예약 시간)를 저장소에 저장.
      2. Kafka 메시지를 처리하기 전에 저장소에서 중복 여부를 확인.
      3. 중복이 발견되면 메시지를 무시.
        @Service
        public class ReservationService {
            private final RedisTemplate<String, String> redisTemplate;
        
            public boolean isDuplicateReservation(String reservationId) {
                Boolean isDuplicate = redisTemplate.opsForValue().setIfAbsent(reservationId, "processed", 1, TimeUnit.HOURS);
                return !Boolean.TRUE.equals(isDuplicate); // 중복인 경우 false 반환
            }
        }

  2.  Consumer에서 Idempotent 처리
    • Consumer에서 메시지를 처리할 때, 처리된 메시지(Offset 또는 ID)를 저장해 중복을 방지.
    • 전략:
      1. 메시지 처리 후 고유 키(예약 ID)를 데이터베이스나 캐시에 저장.
      2. 새 메시지가 들어올 때 이 키를 조회하여 중복 여부 확인.
      3. 중복이면 메시지 무시.
        @Component
        public class KafkaReservationConsumer {
            private final ReservationService reservationService;
        
            @KafkaListener(topics = "reservations", groupId = "reservation-group")
            public void consumeReservationMessage(String message) {
                String reservationId = extractReservationId(message); // 메시지에서 예약 ID 추출
                if (reservationService.isDuplicateReservation(reservationId)) {
                    System.out.println("Duplicate reservation detected. Skipping...");
                    return;
                }
                processReservation(message); // 예약 처리
            }
        }

추가 고려 사항

  1. Exactly-once 처리 보장
    • Kafka는 기본적으로 at-least-once 메시징을 보장하지만, Exactly-once 처리를 구현하려면 Kafka Streams나 Kafka Connect와 같은 도구를 사용해 메시지 상태를 관리해야 한다.
  2. 예약 충돌 방지 로직
    • 예약 중복 방지 외에도, 예약 시간이 겹치거나 리소스가 중복 예약되지 않도록 비즈니스 로직 수준의 검증이 필요하다.
  3. Dead Letter Queue (DLQ) 활용
    • 중복 메시지나 비정상 메시지를 처리하지 않고 별도의 DLQ로 이동시켜 나중에 분석할 수 있도록 설계.
  • Kafka 설정 활용: Key와 Idempotent Producer를 사용해 중복 메시지를 최소화.
  • 애플리케이션 레벨 중복 방지: Redis, RDBMS, 캐시 등을 활용해 중복 검증 로직을 구현.
  • Exactly-once 처리: 필요에 따라 Kafka Streams나 사후 검증 로직을 추가.

 

정리

  • MSA 프로젝트에서 비동기 처리를 위해 Kafka를 채택하였는데 Kafka를 간단히 사용하는데에도 많은 것을 고려하고 처리해야 된다는 것을 배웠다.

'자바 심화 > TIL' 카테고리의 다른 글

동시성 제어 시점과 데이터 일관성 유지 관점  (1) 2024.12.30
Redis - Redisson  (2) 2024.12.27
DB Lock  (1) 2024.12.26
장애 대응  (1) 2024.12.24
시큐어 코딩(Secure Coding)  (0) 2024.12.23