ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • SCS(Spring Cloud Stream) 뽀개기2
    카테고리 없음 2024. 6. 5. 23:21

    들어가며

    저번(https://baby-care-dev.tistory.com/69)에 간단한 Spring Cloud Stream(줄여서 SCS) 기반 애플리케이션을 만들어 보았다. 이번에는 SCS 전반적인 내용에 대해 살펴보자. 스프링 공식 문서(https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference)를 참조하였다.


    프로그래밍 모델

    아래 그림이 SCS를 가장 간단하고 알기 쉽게 표현해준다고 생각한다. SCS의 핵심 기능은 메세지 브로커와의 연결을 추상화하여 비즈니스 로직에 집중할 수 있는 모델을 제공하는 것이다. JDBC나 JPA처럼 외부 시스템과 독립적으로 코드를 운영할 수 있다. 외부 메세지 브로커가 Rabbit MQ건 Kafka이건 상관없이 추상화 안쪽의 코드는 바뀌지 않는다.

    SCS 간단한 아키텍처

    추상화는 3가지 모델로 이루어진다. Destination Binder, Bindings, Message이다.

    Destination Binder

    외부 메세지 브로커와의 같은 통합을 담당한다. 여기에는 연결, 위임, 메세지 라우팅, 데이터 타입 변환, 코드 실행과 같은 기능이 있다.

    Bindings

    Binder보다 더 구체적으로 메세지 브로커의 채널과 어플리케이션과의 가교 역할을 한다. Kafka라면 Topic에 대한 producer, consumer 설정이고, Rabbit MQ라면 exchange에 대한 연결이다. 

    spring.cloud.stream.bindings."functionName"로 설정한다.

    코드를 보자.

    spring:
      cloud:
        stream:
          bindings:
            uppercase-in-0:
              destination: my-topic
    @SpringBootApplication
    public class SampleApplication {
    
    	@Bean
    	public Function<String, String> uppercase() {
    	    return value -> value.toUpperCase();
    	}
    }

    여기에서 uppercase라는 Function이 functionName이고 실행될 코드이다. 뒤에 붙은 in-0은 SCS 네이밍 컨벤션이다. 아래와 같은 규칙을 따른다.

    • input : <functionName> + -in- + <index>
    • output : <functionName> + -out- + <index>

    중간에 in과 out은 input/output으로 대응되고, index는 바인딩의 인덱스인데 한 Function에 바인딩이 여러개일 때 0,1,2가 의미가 있고 대부분 싱글 바인딩 구조에서는 0으로 지정하면 된다.

    destination은 메세지 브로커의 토픽이다. 종합하면 kafka 기준으로 my-topic이라는 토픽을 uppercase() Function이 컨슈밍한다는 의미이다.

    Message

    외부 시스템과 애플리케이션이 주고받을 메세지이다. 메세지를 생산하고 소비하는 행위가 있는데 각각 두가지가 있다.(더 있을지도?)

    • 소비 : Consumer와 Function이 있다. input 바인딩을 통해 메세지를 받는다. 차이점은 Consumer는 소비만한다. 즉, void 타입이다. 반면 Function은 메세지를 소비해서 새로운 메세지를 output 바인딩으로 전송하는 점이다.
    • 생산 : Supplier와 SteamBridge가 있다. Supplier는 프레임워크에 의해 무한히 메세지를 생산한다. 어떤 용례가 있는지는 잘 모르겠다. 당장 생각나는건 특정 시간에 특정 동작을 의도하는 트리거를 만드느 정도이다. SteamBridge는 사용자가 직접 호출하여 메세지를 생산한다. 일반적으로 카프카 프로듀서에 메세지를 전송할 때의 경우라고 보면 된다. 코드를 보자.

    Supplier는 아래처럼 외부에서 invocation하는게 아니라 그냥 계속 메세지를 생산해서 보낸다. 기본 값은 1초 주기이고 설정을 바꿀 수 있다.

    @SpringBootApplication
    public static class SupplierConfiguration {
    
    	@Bean
    	public Supplier<String> stringSupplier() {
    		return () -> "Hello from Supplier";
    	}
    }

     

    SteamBridge는 일반적인 웹 어플리케이션에서 봐왔던 것과 비슷하다. 예를들어 /api/some-resource로 요청이 오면 이를 DB에 저장하고 관련된 도메인 이벤트를 steamBridge.send를 통해 output 바인딩으로 전송하는 경우이다. toStream에 output 바인딩 이름이 들어간다. uppercase-out-0과 같은걸 넣어주면 된다.

    @SpringBootApplication
    @Controller
    public class WebSourceApplication {
    
    	public static void main(String[] args) {
    		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
    	}
    
    	@Autowired
    	private StreamBridge streamBridge;
    
    	@RequestMapping
    	@ResponseStatus(HttpStatus.ACCEPTED)
    	public void delegateToSupplier(@RequestBody String body) {
    		System.out.println("Sending " + body);
    		streamBridge.send("toStream", body);
    	}
    }

    그 외 알아두면 좋을 것들

    Batch Consumers / Batch Producsers

    메세지를 List로 여러 개 받을 수 있는 기능이다. 카프카에서 batchSize를 설정할 수 있는데 같이 활용하면 좋을 것 같다. spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode를 true로 설정하면 된다.

    @Bean
    public Function<List<Person>, Person> findFirstPerson() {
        return persons -> persons.get(0);
    }

     

    프로듀서도 있는데, List로 전송하긴하는데, 이게 binder에 정말 List로 전송되는게 아니고 개별적으로 전송된다. 

    @Bean
    public Function<String, List<Message<String>>> batch() {
    	return p -> {
    		List<Message<String>> list = new ArrayList<>();
    		list.add(MessageBuilder.withPayload(p + ":1").build());
    		list.add(MessageBuilder.withPayload(p + ":2").build());
    		list.add(MessageBuilder.withPayload(p + ":3").build());
    		list.add(MessageBuilder.withPayload(p + ":4").build());
    		return list;
    	};
    }

     

    에러 핸들링

    기본적으로 2개의 에러 핸들러를 제공한다. 첫번째는 단순히 로그를 찍는 핸들러이다. 두번째는 바인더별(또는 공통으로) DLQ로 보내거나 retry를 하는 등의 실질적인 동작을 하는 핸들러이다. 에러 핸들링을 별도로 하지 않으면 첫번째 핸들러에 의해 로그만 찍히고 메세지는 드랍된다.

    간단한 에러 핸들러를 아래와 같이 정의해보자.

    @Bean
    public Consumer<ErrorMessage> myErrorHandler() {
    	return v -> {
    		// send SMS notification code
    	};
    }

    주석에 슬랙으로 에러를 전송하는 코드같은게 들어가면 된다. 그럼 myErrorHandler라는 메서드는 어떻게 식별하게 해줄지 알아야 한다. spring.cloud.stream.bindings.<binding-name>.error-handler-definition=<핸들러 이름>으로 한다. 아래 예시를 보자.

    spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler

    저렇게 바인딩별로 에러 핸들러를 세밀하게 구성할 수 있지만 대부분은 글로벌 에러 핸들러를 쓰고 싶어할 것이다(나처럼). 그럼 아래 설정처럼 하면 된다.

    spring.cloud.stream.default.error-handler-definition=myErrorHandler

     

    바인딩 설정들

    org.springframework.cloud.stream.config.BindingProperties 클래스에 정의되어 있다.

    input/output 모두 spring.cloud.stream.bindings.<bindingName>. 까지 공통 접두어이다. spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock 처럼 말이다. default 설정을 공통으로 하고 싶을 땐 spring.cloud.stream.default 접두어를 사용하면 된다. spring.cloud.stream.default.contentType=application/json으로 하면 메세지 공통 타입을 json으로 한다는 의미이다.

     

    consumer/producer 공통으로 아래의 4가지 설정이 쓰인다.

    • destination : Kafka의 topic이나 Rabbit MQ의 exchange이다. 여러 개의 destination을 컴마(,)로도 설정할 수 있다. destnation=foo,bar 처럼 말이다.
    • group : 컨슈머에서만 설정하면 되고 컨슈머 그룹을 명시한다. default는 null이고 컨슈머인데 group을 설정 안하면 anonymous consumer로 동작한다.
    • contentType : 메세지의 컨텐츠 타입으로 default는 application/json 이다.
    • binder : 멀티 바인더를 쓸때 설정하면 되고 default는 null이다.

    consumer 전용 설정이다. spring.cloud.stream.bindings.<bindingName>.consumer.로 시작한다.

    컨슈머 전역적으로 설정하려면 spring.cloud.stream.default.consumer 접두어를 사용하면 된다. 다 적진 않았고 필요하면 문서 보면 될 것 같다.

    • autoStartup : 어플리케이션 시작할 때 컨슈머가 자동으로 시작된다는 의미로 default true이다.
    • maxAttempts : 에러 났을 때 메세지 retry를 얼마나 할 것인가로 default는 3이다. 1로하면 retry를 안한다.

    producer 전용 설정으로 spring.cloud.stream.bindings.<bindingName>.producer. 접두어를 사용한다. 전역 설정 역시 spring.cloud.stream.default.producer로 하면 된다.

    • autoStartup : consumer와 동일하게 자동 시작을 의미한다.
    • partitionKeyExpression : SpEL을 사용하며, 설정하였을 경우 outbound 데이터가 파티셔닝된다. 단, partitionCount가 1보다 크게 설정되어 있어야 효과가 있다.
    • partitionKeyExtractorName : 파티션키를 복잡하게 설정할 경우 해당 키 설정을 담당하는 bean 이름을 적는 곳이다.
    • partitionKeySelectorExpression : 파티션키 할당 전략이 별도로 있을 경우 해당 전략을 담당하는 bean 이름을 적는 곳이다. 설정되어 있지 않으면 hashCode(key) & partitionCount 공식으로 각 파티션에 파티셔닝된 메세지들이 할당된다.
    • partitionCount : 토픽 partition 개수를 설정한다. default는 1이다.

    마치며

    영어 문서는 다 봤는데 이걸 정리하려니까 한글로 된 자료보다 훨씬 어려웠다. 그래서 퀄리티도 만족스럽진 않지만 한번 더 정리하면 회사 내부에는 공유할만한 정도는 만들 수 있을 것 같다. 스프링에서 출간한 자료를 처음부터 끝까지 읽은 적은 처음인데 나름 괜찮은 것 같다. 이후에는 코드 레벨에서 좀 더 분석하는 기회가 있으면 성장하는데 도움이 될 것 같다.

    댓글

Designed by Tistory.