[Spring Boot] Kafka 기반 주문 서비스와 상품 서비스의 비동기 재고 차감
구조 시퀀스 다이어그램

재고 차감은 왜 비동기로?
재고 차감하는 로직에 있어서 카프카를 도입해 비동기로 메시지를 보내고 처리해야겠다라고 생각을 한 이유는
재고 차감이 동기식이라 하면 주문 처리하는 부분에 있어 처리 속도가 느려진다고 생각했다.
재고 차감이 조금만 오래 걸려도 주문 응답 자체가 느려진다라고 생각을 했고,
만약 주문 서비스에 트래픽이 몰리면 상품 서비스가 버티지 못할 수도 있을거라 생각했다.
주문 서비스는 스케일 아웃을 해둔 상태인데, 만약 상품 서비스는 확인하지 못하고 스케일 아웃 상태가 되지 않았을 수도 있다.
이 상황에서 트래픽이 몰리게 되었을 때 얘기하는건데, 동기 방식이면 동시에 들어오는 수백~수천 건의 재고 차감 요청을 받아야 한다.
동기 방식이다 보니 모두 처리될 때 까지 주문 API는 계속 대기 상태이다.
결국 하나라도 늦으면 전부 늦어진다.
그래서 이런 재고 차감 작업은 메시지 큐잉 방식인 카프카를 이용해서 해결하려고 해보았다.
동기 vs 비동기
| 구분 | 주문 생성(동기) | 재고 차감(비동기) |
| 목적 | 즉시 응답 필요 | 백그라운드 처리 적합 |
| 속도 | 상대적으로 느림 | 주문 응답과 분리되므로 빠름 |
| 장애 영향 | 상품/재고 서비스가 죽으면 실패 | Kafka 큐가 버퍼 역할 / 서비스 죽어도 대기 |
| 결합도 | 서비스 간 강한 결합 | 느슨한 결합 |
| 확장성 | 트래픽 몰림에 취약 | Consumer 늘리면 처리량 증가 |
| 데이터 처리 안정성 | 호출 순간 실패 → 바로 실패 | 큐에 남아 자동 재처리 가능 |
코드
- 상품 서비스
KafkaConsumerConfig
package com.example.catalogservice.infrastructure.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
KafkaConsumer
package com.example.catalogservice.infrastructure.kafka;
import com.example.catalogservice.domain.entity.CatalogEntity;
import com.example.catalogservice.infrastructure.repository.CatalogJpaRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogJpaRepository catalogJpaRepository;
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message : {}", kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {
});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
CatalogEntity entity = catalogJpaRepository.findByProductId((String)map.get("productId")).orElseThrow(
() -> new RuntimeException("product not found")
);
entity.updateStock((Integer)map.get("qty"));
catalogJpaRepository.save(entity);
}
}
- 주문 서비스
KafkaProducerConfig
package com.example.orderservice.infrastructure.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducer
package com.example.orderservice.infrastructure.kafka;
import com.example.orderservice.presentation.dto.request.OrderRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderRequest send(String topic, OrderRequest orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("kafka Producer sent data from Order microservice: {}", orderDto);
return orderDto;
}
}
그리고 주문 서비스에서 주문 생성 시 kafkaProducer를 호출해서 토픽이름과 변경할 데이터 값을 보내준다.

요약
재고 차감 까지 동기식으로 묶어버리게 되면 주문 API 전체 속도가 느려지고, 주문이 몰릴 때 상품 서비스가 동시에 들어오는 재고 감소 요청을 모두 즉시 처리해야 하므로 서비스 전체가 느려질 위험이 있다.
그래서 재고 차감은 kafka 같은 메시징을 사용해 비동기로 분리했고, 이로 인해 주문은 빠르게 끝나고 재고 차감은 뒤에서 처리가 된다.
기대해볼수 있는 것도 상품 서비스가 잠시 느리거나 장애가 발생해도 주문 서비스 자체는 영향을 받지 않을 것으로 기대가 되고, 메시지는 kafka에 남아있으므로 다시 복구되어도 이어서 처리할 수 있을거같다.
'BACKEND & SERVER > Spring Boot MSA' 카테고리의 다른 글
| [Spring Boot] MSA 분산 트랜잭션 처리 방식 비교 (0) | 2025.11.27 |
|---|---|
| [Spring Boot] Spring Kafka Consumer에서 JSON 처리하기 (0) | 2025.11.26 |
| [Spring Boot] FeignClient 사용 시 발생한 예외 처리 문제 (2) | 2025.11.19 |
| [Spring Boot] 스프링 부트 슬랙 API 연동하고 이메일 인증하기 (1) | 2025.11.11 |
| [Spring Boot] JitPack | MSA Project GateWay Swagger 구현 (0) | 2025.11.05 |
