ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • SCS(Spring Cloud Stream) 뽀개기2 - 스프링 가이드 무작정 따라하기
    책책책 책을 읽읍시다/프로그래밍 2024. 5. 27. 23:52

    들어가며

     우리 팀에서 각 MSA간 REST API 콜로 강결합되어 있던 코드들을 이벤트 드리븐 방식으로 하나씩 떼어내는 작업을 했다. 이 중 가장 큰 문제는 failover 처리가 전혀되어 있지 않다는 점이다. 아직 베타 단계라 트래픽이 적어서 드러나는 문제는 없지만 잠재적으로 이슈들이 터져나올 딱 좋은 환경이다. 프로젝트 완성도가 높지 않다는 변명만 해대고 있는 중이다 ㅠㅠ

     아무튼 그래서 최소한의 에러 처리는 해두기로 했다. 가장 쉬운 방법은 전 회사에서 처럼 컨슈머(kafka) 빈마다 try-catch로 감싸는 것이다. 여기에는 2가지 문제가 있다. 첫째, 컨슈머가 10개쯤은 되는데 다 try-catch로 감싸기 껄끄럽다. 센트리(Sentry)에도 알람을 보내야되고 ES 로그로 수집되게 로깅 작업도 해주어야 한다. 이 중복 로직에 하나라도 수정이 발생한다면 다 수정해야 한다. 둘째, 밀려오는 피처로 자기일 하기 바쁜 팀원들이 다 챙길수 없을 것 같다. 팀원들을 못 믿는게 아니라 누락이 발생할 여지가 크고 또 다른 관리포인트로 다가올 여지가 있어서 더 우아하게 처리하는게 좋겠다라는 생각이 들었다.

     그래서 생각한건 @ControllerAdvice처럼 전역 메세징 에러 핸들러를 만드는 것이었다. 커스텀 어노테이션을 만들고 이 어노테이션이 달린 곳에서 에러가 발생했을 때 pointCut을 통해 예외를 수집하면 될 것 같았다. 그런데 우리가 사용하고 있는 Spring Cloud Stream에서는 예외가 발생하면 이를 수신하는 별도의 레이어가 있어 내가 만든 핸들러에서 예외를 캐치하지 못하는 문제가 있었다. 서론이 길었는데 바로 여기서 내가 잘못 땜질하고 있다는 생각이 들었다. 우리 팀이 사용하고 있는 프레임워크에 대해 제대로 이해하지 못하고 있었던 것이다. 그래서 한번 딥다이브해보기로 했다.

     

     일단 간단한 애플리케이션부터 만들어보자. 참고로 Spring Cloud Stream이란 실시간 메세지 스트리밍을 위한 스프링 계열의 프레임워크다.

    스프링 클라우드 스트림 정의


    준비물

    https://start.spring.io 에 들어가서 아래와 같이 템플릿을 만들자.

    스프링 이니셜라이저로 SCS 프로젝트 만들기

     

    인텔리제이에 임포트하자.

    의존성 설정

     

    RabbitMQ도 docker로 설치하자.

    docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

    Rabbit MQ 띄우기

     

    이제 개발을 위한 준비는 다 되었다!!!


    만드려는 애플리케이션 아키텍처 개요

    전반적인 아키텍처

    하나 하나 읽어보자.

    3개의 애플리케이션을 만들 것이다.

    • name-source : 일정 주기로 임의의 문자열 메세지를 publish한다. SCS에서 생산자(producer)를 source라고 부른다.
    • name-processor : source가 발행한 메세지를 가공한다. SCS에서는 가공 역할을 하는 빈을 processor라고 부른다.
    • name-sink : prosessor가 가공한 메세지를 읽어 출력한다.

    source, rpocessor, sink는 아래와 같이 자바8의 함수형 인터페이스와 매칭된다.

    SCS Java
    Source Supplier
    Processor Function
    Sink Consumer

    Sink 만들기

    NameSinkConfiguration과 Person 클래스

    @Configuration
    public class NameSinkConfiguration {
        @Bean
        public Consumer<Person> nameSink() {
            return person -> {
                System.out.println(person.name());
                System.out.println(person.processedTimestamp());
            };
        }
    }
    
    public record Person(String name, Long processedTimestamp) {
    }

     

    application.yaml

    spring:
      application:
        name: demo
      cloud:
        stream:
          function:
            bindings:
              nameSink-in-0: sinkinput
    
    server:
      port: 0

     

    이제 어플리케이션을 실행시키자. 그리고 Sink 테스트를 위해 RabbitMQ 관리자 페이지에서 임의로 메세지를 발행해볼 것이다.

    http://localhost:15672로 들어가자. 그리고 username과 password는 guest/guest로 세팅되어 있으니 로그인해보자.

     

    상단의 Exchanges에 가면 application.yaml에 설정해둔 토픽인 sinkinput이 보인다. 클릭해서 publish message를 열어 메세지를 보내보자.

    RabbitMQ에서 메세지 보내기

     

    아래와 같이 애플리케이션 콘솔에 출력되는지 확인하자.

    Person 메세지를 받아 출력

    Json 형식의 메세지가 Person 객체로 직렬화되어 Consumer에 의해 정상적으로 소비되었다.


    Processor 만들기

    그래들로 모듈 하나 더 추가했다.

    name-processor

    NameProcessorConfiguration

    @Configuration
    public class NameProcessorConfiguration {
        
        @Bean
        public Function<String, Person> processName() {
            return name -> new Person(name, new Date().getTime());
        }
    }

     

    application.yaml - processor out의 destination이 sink의 in과 일치해야 한다. 그래야 가공한걸 sink가 받아 처리할 수 있으니까 말이다.

    spring:
      application:
        name: processor
      cloud:
        stream:
          function:
            bindings:
              processName-in-0: processorinput
              processName-out-0: sinkinput
    
    server:
      port: 1 # 포트는 다르게 설정해주자

     

    애플리케이션을 구동하고 RabbitMQ 관리자 페이지로 들어가면 processorinput이라는 exchange가 생성된 걸 볼 수 있다.

    processorinput에 메세지 발행해보기

    이제 processorinput에 임의의 String 메세지를 발행해보자.

    Sink까지 메세지 도달

    procssorinput에서 발행한 메세지가 name-processor를 거쳐 sink에서 출력된 걸 볼 수 있다.


     

    Source 만들기

    모듈을 하나 더 추가해주자.

    Source 모듈

    NameSourceConfiguration

    @Configuration
    public class NameSourceConfiguration {
    
        @Bean
        public Supplier<String> supplyName() {
            return () -> "Juhyun Oh";
        }
    }

    application.yaml - source의 out 목적지는 역시 processor의 input과 같아야한다.

    spring:
      application:
        name: processor
      cloud:
        stream:
          function:
            bindings:
              supplyName-out-0: processorinput
    
    server:
      port: 3

     

    이제 source 애플리케이션을 실행 시키면 processor를 거쳐 sink까지 메세지가 전달되어 콘솔에 출력된다.

    source -> processor -> sink로 메세지 전달


    마무리

    메세지 브로커와 스프링 애플리케이션이 이렇게 쉽게 통합되다니 추상화를 정말 잘 시켜놓은 것 같다. 물론 세부적인 설정은 더 해야겠지만 이전에 작업했던 것과 비교하면 수월해진 건 맞다. 일단 간단한 예제는 만들어 봤는데 아직 에러 핸들링까지 넘을 산이 많다. 퇴근하고 시간 나는대로 문서(https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html) 좀 읽어보고 에러 핸들링 예제까지 만들어보자.

     

     

     

     

     

     

    댓글

Designed by Tistory.