티스토리 뷰

 

시작하며

이전 글에서는 SSE를 선택한 이유와 기본 구현 방법을 다뤘습니다. 단일 서버 환경에서는 모든 것이 완벽하게 동작했지만, 실제 운영 환경은 달랐습니다. 로드밸런서 뒤에 여러 서버 인스턴스가 배포된 환경에서 예상치 못한 문제가 발생했습니다.

이번 글에서는 분산 환경에서 마주한 문제와 Redis Pub/Sub을 활용한 해결 과정을 공유합니다.


로드밸런싱(다중서버) 환경에서 발견한 문제

문제 상황

운영 환경은 다음과 같은 구조였습니다:

              [Load Balancer]
                    |
        +-----------+-----------+
        |                       |
   [Server A]            [Server B]
        |                       |
        +----------+------------+
                   |
            [RabbitMQ]

두 대의 서버가 로드밸런서 뒤에서 동작하고, 하나의 RabbitMQ 인스턴스를 공유하는 구조였습니다.

문제는 이렇게 발생했습니다:

  1. 가맹점주가 Server A에 SSE로 연결됨
  2. 고객이 결제를 완료
  3. RabbitMQ에 거래 메시지가 발행됨
  4. Server B의 리스너가 메시지를 소비
  5. Server B는 자신에게 연결된 클라이언트를 찾지만, 가맹점주는 Server A에 연결되어 있음
  6. 가맹점주는 거래 알림을 받지 못함 😱

왜 이런 일이?

RabbitMQ의 기본 동작 방식 때문입니다. 여러 컨슈머가 같은 큐를 구독하면 라운드로빈 방식으로 메시지를 분배합니다. 즉, 메시지가 Server A와 Server B에 번갈아가며 전달되는데, 클라이언트가 어느 서버에 연결되어 있는지는 고려하지 않습니다. 

거래 1: RabbitMQ → Server A (가맹점주는 Server B 연결) ❌
거래 2: RabbitMQ → Server B (가맹점주는 Server B 연결) ✅
거래 3: RabbitMQ → Server A (가맹점주는 Server B 연결) ❌

결과적으로 거래의 절반 정도만 가맹점주에게 전달되는 심각한 문제였습니다.


해결 방안 검토

이 문제를 해결하기 위해 여러 방안을 고민했습니다.

1. Sticky Session (검토 후 제외)

로드밸런서에서 특정 클라이언트를 항상 같은 서버로 라우팅하는 방식입니다.

문제점:

  • 해당 서버가 재시작되면 모든 연결이 끊김
  • 서버 간 부하 분산이 불균형해질 수 있음
  • RabbitMQ 메시지가 여전히 라운드로빈으로 분배됨

2. RabbitMQ Fanout Exchange (검토 후 제외)

모든 서버에 메시지를 브로드캐스트하는 방식입니다.

문제점:

  • 각 서버가 별도의 큐를 가져야 함
  • 기존에 있던 Exchange를 활용하는 것을 목표로 함
  • 메시지가 중복 처리될 수 있음
  • RabbitMQ 구조를 변경해야 함 (기존 시스템과의 호환성)

3. Redis Pub/Sub (채택!)

RabbitMQ에서 받은 메시지를 Redis Pub/Sub으로 재전송하여 모든 서버에 브로드캐스트하는 방식입니다.

장점:

  • 기존 RabbitMQ 구조 변경 불필요
  • 모든 서버가 메시지를 받을 수 있음
  • Redis는 이미 캐시용으로 사용 중이었음
  • 구현이 비교적 간단함

Redis Pub/Sub 구현

아키텍처 변경

[고객 결제]
     ↓
[결제 시스템]
     ↓
[RabbitMQ Queue]
     ↓
[Server A or B가 메시지 수신]
     ↓
[Redis Pub/Sub으로 재전송] ← 핵심 변경점
     ↓
[모든 서버가 Subscribe하여 수신]
     ↓
[각 서버가 자신의 SSE 클라이언트에게 전달]

이제 RabbitMQ에서 어느 서버가 메시지를 받든, Redis Pub/Sub을 통해 모든 서버에 전달됩니다.

Redis 설정

java
@Configuration
public class RedisConfig {
    
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, 
            new PatternTopic("transaction.*"));
        
        return container;
    }
    
    @Bean
    public MessageListenerAdapter listenerAdapter(
            RedisSubscriber subscriber) {
        return new MessageListenerAdapter(subscriber, "onMessage");
    }
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(
            RedisConnectionFactory connectionFactory) {
        
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        
        return template;
    }
}

RabbitMQ 리스너 수정

RabbitMQ에서 메시지를 받으면 Redis로 재전송하도록 수정했습니다.

java
@Component
public class TransactionQueueListener {
    
    private final RedisTemplate<String, Object> redisTemplate;
    
    @RabbitListener(queues = "transaction.queue")
    public void handleTransaction(TransactionMessage message) {
        // Redis Pub/Sub으로 브로드캐스트
        String channel = "transaction." + message.getMerchantId();
        redisTemplate.convertAndSend(channel, message);
        
        log.info("Published transaction to Redis: merchantId={}, txId={}", 
            message.getMerchantId(), message.getTransactionId());
    }
}

Redis Subscriber 구현

모든 서버는 Redis 채널을 구독하여 메시지를 수신합니다.

java
@Component
@Slf4j
public class RedisSubscriber {
    
    private final ObjectMapper objectMapper;
    private final TransactionNotificationService notificationService;
    
    public void onMessage(String message, String pattern) {
        try {
            TransactionMessage transaction = 
                objectMapper.readValue(message, TransactionMessage.class);
            
            String merchantId = transaction.getMerchantId();
            
            log.info("Received transaction from Redis: merchantId={}, txId={}", 
                merchantId, transaction.getTransactionId());
            
            // 해당 가맹점에 연결된 SSE 클라이언트에게 전송
            notificationService.sendToMerchant(merchantId, transaction);
            
        } catch (JsonProcessingException e) {
            log.error("Failed to parse transaction message", e);
        }
    }
}

ConcurrentHashMap으로 가맹점별 필터링

각 서버는 자신에게 연결된 클라이언트 정보만 ConcurrentHashMap에 저장하고 있습니다.

java
@Service
@Slf4j
public class TransactionNotificationService {
    
    // 가맹점 ID → SSE Emitter 리스트 매핑
    private final ConcurrentHashMap<String, List<SseEmitter>> emitters = 
        new ConcurrentHashMap<>();
    
    public void sendToMerchant(String merchantId, TransactionMessage message) {
        List<SseEmitter> merchantEmitters = emitters.get(merchantId);
        
        // 이 서버에 해당 가맹점이 연결되어 있지 않으면 무시
        if (merchantEmitters == null || merchantEmitters.isEmpty()) {
            log.debug("No emitters found for merchant: {}", merchantId);
            return;
        }
        
        log.info("Sending to {} emitters for merchant: {}", 
            merchantEmitters.size(), merchantId);
        
        List<SseEmitter> deadEmitters = new ArrayList<>();
        
        for (SseEmitter emitter : merchantEmitters) {
            try {
                emitter.send(SseEmitter.event()
                    .name("transaction")
                    .data(message));
                    
                log.debug("Successfully sent transaction to emitter");
                
            } catch (IOException e) {
                log.warn("Failed to send to emitter, marking as dead", e);
                deadEmitters.add(emitter);
            }
        }
        
        // 전송 실패한 emitter 제거
        deadEmitters.forEach(emitter -> removeEmitter(merchantId, emitter));
    }
    
    // subscribe, removeEmitter 메서드는 1편과 동일
}

테스트 및 검증

로컬 테스트

로드밸런서 없이 두 개의 서버 인스턴스를 직접 실행하여 테스트했습니다:

 
bash
# Server 1
java -jar merchantApp.jar --server.port=8081

# Server 2
java -jar merchantApp.jar --server.port=8082

그리고 각 포트로 번갈아가며 SSE를 연결한 뒤, RabbitMQ에 메시지를 발행하여 모든 경우에 정상 동작하는지 확인했습니다.

 

결과는 성공적이었습니다. Redis Pub/Sub의 성능도 충분했고, 메시지 유실 없이 안정적으로 동작했습니다.


고려사항 및 트레이드오프

Redis Pub/Sub의 특징

장점:

  • 매우 빠른 메시지 전달 (밀리초 단위)
  • 간단한 구현
  • 수평 확장 가능 (서버를 추가해도 동일하게 동작)

단점:

  • 메시지 지속성 없음: Redis가 다운되면 메시지 유실 가능
  • 구독자가 없으면 메시지 버려짐: 모든 서버가 다운된 상태에서 발행된 메시지는 사라짐

왜 이 트레이드오프를 수용했나?

  1. 거래내역 조회 기능 존재: 가맹점 포털에서 언제든지 거래내역을 조회할 수 있음
  2. 실시간 알림은 부가 기능: 실시간으로 못 받아도 조회하면 되므로 치명적이지 않음
  3. 사내 모니터링 알림: Redis 장애 시 즉시 감지하고 대응 가능
  4. 구현 복잡도 vs 비즈니스 가치: 완벽한 메시지 보장보다 빠른 출시가 우선

실무에서는 이런 비즈니스 요구사항과 기술적 완성도 사이의 균형을 찾는 것이 중요합니다.


마치며

로드밸런싱 환경에서의 실시간 통신은 단일 서버와 완전히 다른 접근이 필요합니다. 이번 프로젝트를 통해 분산 시스템의 특성과 메시지 브로드캐스팅의 중요성을 경험할 수 있었습니다.

Redis Pub/Sub은 완벽한 해결책은 아니지만, 우리 프로젝트의 요구사항과 제약사항을 고려했을 때 가장 실용적인 선택이었습니다.

하지만 구현 과정에서 또 다른 문제가 있었습니다. 사내에서 공통으로 사용하는 LoggingFilter가 SSE와 충돌하는 문제였죠.

 

다음 글에서는 LoggingFilter 충돌 문제와 그 해결 과정, 그리고 기술 부채에 대한 고민을 다루겠습니다.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/01   »
1 2 3
4 5 6 7 8 9 10
11 12 13 14 15 16 17
18 19 20 21 22 23 24
25 26 27 28 29 30 31
글 보관함