개요
Kafka 비동기 처리에 대해 궁금했던 점에 대한 것과 해결 방안을 찾아볼 것이다.
Kafka 비동기 처리에 대해 궁금했던 점
- Consumer가 Message를 받아 서비스를 호출해 작업을 하는데 동일한 작업에 대한 Controller 메서드가 필요한가?
Kafka 비동기 처리 시 Controller의 동일 메서드가 필요 없는 이유
- Kafka Consumer가 작업을 처리
- 메시지가 producer에서 전달되면 consumer가 해당 메시지를 받아 특정 로직을 실행함
- Consumer가 생성 작업을 호출하면, 이미 Kafka에서 수신된 메시지에 대한 처리를 완료하는 것이므로 별도의 REST API가 필요하지 않음
- 메시지가 producer에서 전달되면 consumer가 해당 메시지를 받아 특정 로직을 실행함
- Controller의 역할이 없을 경우
- 컨트롤러는 주로 클라이언트 요청을 처리하는 역할을 함
- 비동기 처리에서는 클라이언트가 REST API를 호출하지 않고 producer를 통해 메시지를 전송하므로 Controller에서 이를 직접 다룰 필요가 없음
- 예외 상황
- Kafka 외에 별도의 요청 경로를 제공해야 하는 경우
- Kafka 외에도 특정 클라이언트가 REST API를 통해 동일한 작업(생성 등)을 요청할 가능성이 있다면, Controller 메서드로 해당 작업을 처리해야 할 수도 있음
- 테스트 및 디버깅 목적
- Consumer나 비즈니스 로직을 디버깅하거나 테스트하기 위해 Controller에서 동일한 작업을 호출하는 임시 메서드를 만들 수도 있음
- 서비스 확장성
- Kafka와 REST API 둘 다 지원하려는 경우, Controller와 Consumer를 모두 유지하고 동일한 Service 계층에서 로직을 공유할 수 있음
- Kafka 외에 별도의 요청 경로를 제공해야 하는 경우
결론
- Kafka 비동기 처리를 처음 진행하기 때문에 우선적으로 REST API를 통해 기능이 정상적으로 동작하는지 테스트하고 비동기 처리로 동작하는지 확인
Controller로 Kafka 메시지 테스트가 가능한 이유
- 비즈니스 로직의 분리
- 일반적으로 Kafka 메시지를 처리하는 로직은 Consumer에서 작성되지만, 실제 처리는 Service 계층에서 이루어짐
- 이 Service 계층의 메서드는 Controller에서도 호출할 수 있기 때문에 Kafka 없이도 동일한 로직을 실행할 수 있다
- 일반적으로 Kafka 메시지를 처리하는 로직은 Consumer에서 작성되지만, 실제 처리는 Service 계층에서 이루어짐
- Request 데이터를 Kafka 메시지 형태로 전달
- Kafka 메시지가 Consumer에서 JSON 포맷이나 특정 객체로 변환되어 처리되는 구조라면, Controller에서도 동일한 데이터 구조를 받아 테스트할 수 있다.
- 예를 들어, Kafka 메시지와 동일한 DTO를 Controller의 Request Body로 받을 수 있다
- Kafka 메시지가 Consumer에서 JSON 포맷이나 특정 객체로 변환되어 처리되는 구조라면, Controller에서도 동일한 데이터 구조를 받아 테스트할 수 있다.
예시
Kafka Consumer
@Component
public class KafkaMessageConsumer {
private final MyService myService;
public KafkaMessageConsumer(MyService myService) {
this.myService = myService;
}
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consumeMessage(String message) {
MyDto myDto = convertToDto(message); // JSON -> DTO 변환
myService.processData(myDto);
}
}
Controller
@RestController
@RequestMapping("/api/test")
public class TestController {
private final MyService myService;
public TestController(MyService myService) {
this.myService = myService;
}
@PostMapping("/process")
public ResponseEntity<String> testProcess(@RequestBody MyDto myDto) {
myService.processData(myDto); // Service 호출
return ResponseEntity.ok("Processed successfully");
}
}
Service
@Service
public class MyService {
public void processData(MyDto myDto) {
// Kafka 메시지와 동일한 로직 처리
System.out.println("Processing: " + myDto);
}
}
테스트 시나리오
- Kafka를 사용하지 않는 환경에서 테스트
- POST /api/test/process 엔드포인트에 Kafka 메시지와 동일한 구조의 JSON 데이터를 전송
- Service 계층 로직이 제대로 동작하는지 확인
- Kafka 환경 테스트 전 준비
- Kafka Consumer가 사용할 로직이 Controller와 동일한지 확인할 수 있다
- Kafka 메시지 처리 전후를 미리 검증할 수 있는 장점이 있다
장점
- Kafka 의존 없이 로직 검증 가능
- Kafka 설정이나 메시지 전달 오류를 배제한 상태에서 순수 비즈니스 로직을 테스트할 수 있음
- 개발 초기에 로직 구현 및 단위 테스트를 쉽게 작성 가능
Kafka의 메시지 순서 보장
기본 Kafka Consumer의 동작 방식
- 메시지는 파티션 단위로 처리 - Kafka는 토픽을 여러 파티션으로 나누어 저장하며, 각 파티션은 한 Consumer 스레드에서 처리
- 단일 파티션에서는 메시지가 순서 보장(FIFO)되며, 이전 메시지 처리가 끝나야 다음 메시지를 처리
- 여러 파티션이 있다면 Consumer는 병렬로 메시지를 처리할 수 있음
- Consumer의 메시지 처리 방식
- 동기 처리 - 메시지를 순차적으로 처리하며, 현재 메시지가 처리 완료되기 전까지 다음 메시지를 처리하지 않는다
- 비동기 처리 - Consumer 애플리케이션에서 메시지를 병렬로 처리하는 로직을 구현하면, 동시에 여러 메시지를 처리할 수 있음
- Acknowledge(Offset Commit)
- Kafka는 메시지를 처리 완료했다고 Consumer가 명시적으로 Offset Commit을 수행해야 다음 메시지를 처리로 간주함
- 동기 처리에서는 메시지를 하나 처리할 때마다 Commit이 수행
- 비동기 처리에서는 처리 완료 후 개별 메시지나 배치로 Commit을 수행할 수 있음
- Kafka는 메시지를 처리 완료했다고 Consumer가 명시적으로 Offset Commit을 수행해야 다음 메시지를 처리로 간주함
단일 서버 구성에서의 동작
- Producer가 메시지를 보내는 속도와 Consumer 처리 속도 비교
- Producer가 메시지를 빠르게 보내고 Consumer가 동기적으로 처리한다면, Consumer는 한 메시지를 처리한 후에야 다음 메시지를 처리한다. 이 경우 Consumer가 병목이 될 수 있다.
- Consumer가 비동기로 메시지를 처리한다면, 메시지가 순서와 관계없이 병렬로 처리될 수 있다.
- 단일 파티션의 경우
- Producer가 보낸 메시지는 Kafka 토픽의 단일 파티션에 기록된다.
- Consumer는 이 파티션의 메시지를 순서대로 처리해야 하므로, 한 메시지의 처리가 끝나야 다음 메시지를 처리한다.
- 여러 파티션의 경우
- Consumer 그룹이 단일 Consumer로 구성되더라도 여러 파티션에서 병렬로 메시지를 가져올 수 있다.
- Consumer 내부 로직에서 병렬 처리 코드를 구현하면 동시에 여러 메시지를 처리할 수 있다.
동작을 조정하는 방법
- 동기 처리 설정
- 메시지를 순차적으로 처리하려면 Consumer에서 비동기 로직을 사용하지 않고, 처리 완료 후 Offset Commit을 수행하도록 구현
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { processMessage(record); // 메시지 처리 consumer.commitSync(); // 동기적으로 Commit } }
- 메시지를 순차적으로 처리하려면 Consumer에서 비동기 로직을 사용하지 않고, 처리 완료 후 Offset Commit을 수행하도록 구현
- 비동기 처리 설정
- 병렬 처리가 필요하다면 스레드 풀을 사용하거나, 메시지를 큐에 넣어 처리하도록 구현
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { executorService.submit(() -> processMessage(record)); // 비동기로 메시지 처리 } consumer.commitAsync(); // 비동기 Commit }
- 병렬 처리가 필요하다면 스레드 풀을 사용하거나, 메시지를 큐에 넣어 처리하도록 구현
- Consumer 그룹을 활용한 병렬 처리
- 단일 서버에서도 Kafka Consumer 그룹을 구성하면 여러 Consumer를 실행할 수 있다
- 이 경우 Kafka는 각 Consumer에게 서로 다른 파티션의 메시지를 분배
- 단일 서버에서도 Kafka Consumer 그룹을 구성하면 여러 Consumer를 실행할 수 있다
단일 서버에서 Kafka 비동기 처리를 구성할 때, 메시지가 처리되는 방식은 Consumer의 설정에 따라 달라진다
- 단일 파티션 + 동기 처리: 한 메시지 처리 후 다음 메시지를 처리.
- 단일 파티션 + 비동기 처리: 메시지를 병렬로 처리 가능.
- 여러 파티션: 메시지가 병렬로 처리 가능.
Kafka 동기 비동기 전환이 간단한 이유
- Kafka 자체는 비동기 처리에 유연
- 서비스 계층 로직 재활용 가능
- Java 스레드 풀 활용
기존 동기 처리 코드
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record); // 동기적으로 메시지 처리
consumer.commitSync(); // 메시지 처리 후 동기적으로 Commit
}
}
스레드 풀 활용
ExecutorService executorService = Executors.newFixedThreadPool(10); // 스레드 풀 생성
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executorService.submit(() -> {
try {
processMessage(record); // 비동기로 메시지 처리
} catch (Exception e) {
e.printStackTrace();
// 예외 처리 로직
}
});
}
consumer.commitAsync(); // 비동기로 Offset Commit
}
변경점
- 스레드 풀을 사용해 메시지를 비동기로 처리.
- 메시지 처리가 끝나지 않아도 다음 메시지를 가져와 처리 가능.
- Offset Commit은 commitAsync를 사용해 처리.
CompletableFuture 활용
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
CompletableFuture.runAsync(() -> processMessage(record))
.exceptionally(e -> {
e.printStackTrace();
// 예외 처리 로직
return null;
});
}
consumer.commitAsync(); // 비동기로 Offset Commit
}
- CompletableFuture는 Java의 비동기 프로그래밍 표준 API로, 간단한 비동기 처리에 적합
전환 시 고려사항
- 메시지 처리 순서
- 단일 파티션에서는 Kafka가 메시지의 순서를 보장한다.
- 비동기 처리로 전환하면 처리 속도에 따라 메시지 순서가 뒤바뀔 수 있음에 유의해야 한다.
- 순서가 중요한 경우에는 메시지를 순차적으로 처리하는 큐를 추가하거나, CompletableFuture와 thenRun으로 순서를 보장하는 방식이 필요하다.
- Offset Commit 방식
- commitSync: 동기적으로 Offset을 Commit하며, 실패 시 재시도.
- commitAsync: 비동기로 Commit하며, 실패 시 재시도를 하지 않음.
- 비동기 처리에서는 commitAsync를 사용하되, 처리 완료 후 Offset Commit 타이밍을 정확히 조정해야 중복 처리를 방지할 수 있다.
- 예외 처리
- 비동기 작업에서 예외가 발생하면 스레드 풀이나 CompletableFuture의 예외 처리가 필요하다.
- 예외가 발생했을 때 메시지를 다시 처리할지, DLQ(Dead Letter Queue)로 보낼지 결정해야 하다.
- 스레드 풀 크기 조정
- 스레드 풀의 크기는 메시지 처리 속도, 서버 리소스(CPU, 메모리) 등을 고려해 적절히 설정해야 하다.
- 너무 작은 경우 병목이 발생하고, 너무 큰 경우 서버 리소스가 부족할 수 있다.
- 모니터링 및 성능 테스트
- 비동기로 전환 후 처리 속도 및 리소스 사용량을 반드시 테스트.
- Kafka Consumer의 Lag(처리되지 않은 메시지 수)을 모니터링해 성능을 튜닝.
Kafka에서 중복 방지가 어려운 이유
- At-least-once 메시징
- Kafka Consumer는 기본적으로 메시지를 적어도 한 번(at-least-once) 처리하도록 설계되었다. 따라서 Consumer가 메시지 처리 중 실패하면 동일한 메시지가 다시 처리될 수 있다.
- 중복 메시지 가능성
- Producer가 메시지를 전송 중 실패할 경우, 재시도 로직으로 인해 동일한 메시지가 여러 번 전송될 수 있다.
- Kafka 자체는 메시지의 고유성을 보장하지 않는다.
Kafka만으로 중복 방지 처리
Kafka를 사용하는 경우, 중복 메시지 방지는 주로 키(key)를 활용하거나 메시지 처리 전략을 설계하여 구현한다.
- Producer에서 메시지 Key 활용
- Kafka 메시지의 Key를 고유한 값(예: 예약 ID)으로 설정하면, 동일한 Key를 가진 메시지가 Kafka 토픽의 동일한 파티션으로 전송된다.
- 장점: 파티션 내 순서를 보장하므로 최신 상태를 덮어쓸 수 있음.
- 한계: 단순히 Key를 사용하면 메시지 덮어쓰기일 뿐, 중복 검증 로직이 없으면 예약 처리 결과는 여전히 중복될 수 있음.
- Kafka 메시지의 Key를 고유한 값(예: 예약 ID)으로 설정하면, 동일한 Key를 가진 메시지가 Kafka 토픽의 동일한 파티션으로 전송된다.
- Idempotent Producer 설정
- Kafka Producer에서 Idempotence(멱등성) 설정을 활성화하면, 동일한 메시지를 여러 번 전송하더라도 Kafka가 중복 메시지를 제거한다.
- 설정: enable.idempotence=true
- 한계: Producer 수준에서 중복 제거만 보장하므로, Consumer에서 중복 검증은 여전히 필요하다.
- Kafka Producer에서 Idempotence(멱등성) 설정을 활성화하면, 동일한 메시지를 여러 번 전송하더라도 Kafka가 중복 메시지를 제거한다.
Kafka와 추가적인 중복 방지 로직
Kafka만으로 중복 방지 구현이 완벽하지 않으므로, 애플리케이션 레벨에서 추가적인 중복 방지 로직을 설계하는 것이 필요.
- 중복 확인을 위한 데이터 저장소 활용
- 예약 서비스에서는 중복 확인을 위해 **데이터 저장소(예: Redis, RDBMS)**를 활용하는 것이 일반적.
- 전략:
- 예약 ID 또는 고유 키(예: 사용자 ID + 예약 시간)를 저장소에 저장.
- Kafka 메시지를 처리하기 전에 저장소에서 중복 여부를 확인.
- 중복이 발견되면 메시지를 무시.
@Service public class ReservationService { private final RedisTemplate<String, String> redisTemplate; public boolean isDuplicateReservation(String reservationId) { Boolean isDuplicate = redisTemplate.opsForValue().setIfAbsent(reservationId, "processed", 1, TimeUnit.HOURS); return !Boolean.TRUE.equals(isDuplicate); // 중복인 경우 false 반환 } }
- Consumer에서 Idempotent 처리
- Consumer에서 메시지를 처리할 때, 처리된 메시지(Offset 또는 ID)를 저장해 중복을 방지.
- 전략:
- 메시지 처리 후 고유 키(예약 ID)를 데이터베이스나 캐시에 저장.
- 새 메시지가 들어올 때 이 키를 조회하여 중복 여부 확인.
- 중복이면 메시지 무시.
@Component public class KafkaReservationConsumer { private final ReservationService reservationService; @KafkaListener(topics = "reservations", groupId = "reservation-group") public void consumeReservationMessage(String message) { String reservationId = extractReservationId(message); // 메시지에서 예약 ID 추출 if (reservationService.isDuplicateReservation(reservationId)) { System.out.println("Duplicate reservation detected. Skipping..."); return; } processReservation(message); // 예약 처리 } }
추가 고려 사항
- Exactly-once 처리 보장
- Kafka는 기본적으로 at-least-once 메시징을 보장하지만, Exactly-once 처리를 구현하려면 Kafka Streams나 Kafka Connect와 같은 도구를 사용해 메시지 상태를 관리해야 한다.
- 예약 충돌 방지 로직
- 예약 중복 방지 외에도, 예약 시간이 겹치거나 리소스가 중복 예약되지 않도록 비즈니스 로직 수준의 검증이 필요하다.
- Dead Letter Queue (DLQ) 활용
- 중복 메시지나 비정상 메시지를 처리하지 않고 별도의 DLQ로 이동시켜 나중에 분석할 수 있도록 설계.
- Kafka 설정 활용: Key와 Idempotent Producer를 사용해 중복 메시지를 최소화.
- 애플리케이션 레벨 중복 방지: Redis, RDBMS, 캐시 등을 활용해 중복 검증 로직을 구현.
- Exactly-once 처리: 필요에 따라 Kafka Streams나 사후 검증 로직을 추가.
정리
- MSA 프로젝트에서 비동기 처리를 위해 Kafka를 채택하였는데 Kafka를 간단히 사용하는데에도 많은 것을 고려하고 처리해야 된다는 것을 배웠다.
'자바 심화 > TIL' 카테고리의 다른 글
동시성 제어 시점과 데이터 일관성 유지 관점 (1) | 2024.12.30 |
---|---|
Redis - Redisson (2) | 2024.12.27 |
DB Lock (1) | 2024.12.26 |
장애 대응 (1) | 2024.12.24 |
시큐어 코딩(Secure Coding) (0) | 2024.12.23 |