반응형

 

 

구조 시퀀스 다이어그램


 

 

 

재고 차감은 왜 비동기로?


 

재고 차감하는 로직에 있어서 카프카를 도입해 비동기로 메시지를 보내고 처리해야겠다라고 생각을 한 이유는 

 

재고 차감이 동기식이라 하면 주문 처리하는 부분에 있어 처리 속도가 느려진다고 생각했다. 

재고 차감이 조금만 오래 걸려도 주문 응답 자체가 느려진다라고 생각을 했고, 

 

만약 주문 서비스에 트래픽이 몰리면 상품 서비스가 버티지 못할 수도 있을거라 생각했다. 

주문 서비스는 스케일 아웃을 해둔 상태인데, 만약 상품 서비스는 확인하지 못하고 스케일 아웃 상태가 되지 않았을 수도 있다. 

이 상황에서 트래픽이 몰리게 되었을 때 얘기하는건데, 동기 방식이면 동시에 들어오는 수백~수천 건의 재고 차감 요청을 받아야 한다. 

동기 방식이다 보니 모두 처리될 때 까지 주문 API는 계속 대기 상태이다. 

결국 하나라도 늦으면 전부 늦어진다. 

 

그래서 이런 재고 차감 작업은 메시지 큐잉 방식인 카프카를 이용해서 해결하려고 해보았다. 

 

 

동기 vs 비동기


 

 

구분 주문 생성(동기) 재고 차감(비동기)
목적 즉시 응답 필요 백그라운드 처리 적합
속도 상대적으로 느림 주문 응답과 분리되므로 빠름
장애 영향 상품/재고 서비스가 죽으면 실패 Kafka 큐가 버퍼 역할 / 서비스 죽어도 대기
결합도 서비스 간 강한 결합 느슨한 결합
확장성 트래픽 몰림에 취약 Consumer 늘리면 처리량 증가
데이터 처리 안정성 호출 순간 실패 → 바로 실패 큐에 남아 자동 재처리 가능

 

 

코드


 

- 상품 서비스 

 

KafkaConsumerConfig

package com.example.catalogservice.infrastructure.kafka;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
            = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }

}

 

KafkaConsumer 

package com.example.catalogservice.infrastructure.kafka;

import com.example.catalogservice.domain.entity.CatalogEntity;
import com.example.catalogservice.infrastructure.repository.CatalogJpaRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {

    private final CatalogJpaRepository catalogJpaRepository;

    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage) {
        log.info("Kafka Message : {}", kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {
            });
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        CatalogEntity entity = catalogJpaRepository.findByProductId((String)map.get("productId")).orElseThrow(
            () -> new RuntimeException("product not found")
        );

        entity.updateStock((Integer)map.get("qty"));
        catalogJpaRepository.save(entity);
    }
}

 

- 주문 서비스 

 

KafkaProducerConfig

package com.example.orderservice.infrastructure.kafka;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());
    }

}

 

KafkaProducer 

package com.example.orderservice.infrastructure.kafka;

import com.example.orderservice.presentation.dto.request.OrderRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderRequest send(String topic, OrderRequest orderDto) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("kafka Producer sent data from Order microservice: {}", orderDto);

        return orderDto;
    }
}

 

그리고 주문 서비스에서 주문 생성 시 kafkaProducer를 호출해서 토픽이름과 변경할 데이터 값을 보내준다.

 

 

요약


 

재고 차감 까지 동기식으로 묶어버리게 되면 주문 API 전체 속도가 느려지고, 주문이 몰릴 때 상품 서비스가 동시에 들어오는 재고 감소 요청을 모두 즉시 처리해야 하므로 서비스 전체가 느려질 위험이 있다. 

 

그래서 재고 차감은 kafka 같은 메시징을 사용해 비동기로 분리했고, 이로 인해 주문은 빠르게 끝나고 재고 차감은 뒤에서 처리가 된다. 

기대해볼수 있는 것도 상품 서비스가 잠시 느리거나 장애가 발생해도 주문 서비스 자체는 영향을 받지 않을 것으로 기대가 되고, 메시지는 kafka에 남아있으므로 다시 복구되어도 이어서 처리할 수 있을거같다. 

 

728x90
반응형