ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 카프카 핵심 가이드 3 : 컨슈머
    책책책 책을 읽읍시다 2023. 6. 11. 01:23

    카프카 컨슈머: 개념


    컨슈머와 컨슈머 그룹

     카프카 토픽으로부터 메세지를 읽어서 몇 가지 검사를 한 후, 다른 데이터 저장소에 저장하는 애플리케이션을 개발해야 한다고 가정해 보자. 이 경우 애플리케이션은 컨슈머 객체(KafkaConsumer 인스턴스)를 생성하고, 해당 토픽을 구독하고, 메세지를 받기 시작한 뒤 받은 메세지를 받아 검사하고 결과를 써야 한다. 한동안이야 잘 작동하겠지만, 만약 프로듀서가 애플리케이션이 검사할 수 있는 속도보다 더 빠른 속도로 토픽에 메세지를 쓰게 된다면 어떻게 될까? 만약 데이터를 읽고 처리하는 컨슈머가 하나뿐이라면 애플리케이션은 새로 추가되는 메세지의 속도를 따라잡을 수 없기 때문에 메세지 처리가 계속해서 뒤로 밀리게 될 것이다. 따라서, 우리는 토픽으로부터 데이터를 읽어 오는 작업을 확장(scale)할 수 있어야 한다. 여러 개의 프로듀서가 동일한 토픽에 메세지를 쓰듯이, 여러 개의 컨슈머가 같은 토픽으로부터 데이터를 분할해서 읽어올 수 있게 해야 하는 것이다.

     카프카 컨슈머는 보통 컨슈머 그룹(consumer group)의 일부로서 작동한다. 동일한 컨슈머 그룹에 속한 여러 개의 컨슈머들이 동일한 토픽을 구독할 경우, 각각의 컨슈머는 해당 토픽에서 서로 다른 파티션의 메세지를 받는 것이다.

     네 개의 파티션을 갖는 T1이라는 토픽이 있다고 생각해 보자. 그리고 G1 컨슈머 그룹에 속한 유일한 컨슈머인 C1을 생성해서 T1 토픽을 구독했다고 가정해보자. 컨슈머 C1은 T1 토픽의 네 파티션 모두에서 모든 메세지를 받게 될 것이다.

    네 개의 파티션과 하나의 컨슈머

     이제 G1에 새로운 컨슈머 C2를 추가한다. 이제 각각의 컨슈머는 2개의 파티션에서 메세지를 받으면 된다. 예를 들어, 아래 그림에서는 C1이 파티션 0과 2를, C2가 1과 3을 맡았다.

    네 개의 파티션과 같은 컨슈머 그룹의 두 컨슈머

     만약 G1에 컨슈머가 4개 있다면, 아래와 같이 각각의 컨슈머가 하나의 파티션에서 메세지를 읽어오게 된다.

    각각 하나의 파티션을 할당받은 같은 컨슈머 그룹의 네 컨슈머

     만약 하나의 토픽을 구독하는 하나의 컨슈머 그룹에 파티션 수보다 더 많은 컨슈머를 추가한다면, 컨슈머 중 몇몇은 유휴 상태가 되어 메세지를 전혀 받지 못한다.

    파티션 개수보다 컨슈머 그룹에 속한 컨슈머가 더 많을 때 유휴 컨슈머가 발생한다.

     이처럼 컨슈머 그룹에 컨슈머를 추가하는 것은 카프카 토픽에서 읽어오는 데이터 양을 확장하는 주된 방법이다. 카프카 컨슈머가 지연 시간이 긴 작업(데이터베이스에 쓴다던가, 데이터에 대해 시간이 오래 걸리는 연산을 수행한다던가)을 수행하는 것은 흔하다. 이러한 경우, 하나의 컨슈머로 토픽에 들어오는 데이터의 속도를 감당할 수 없을 수도 있기 때문에 컨슈머를 추가함으로써 단위 컨슈머가 처리하는 파티션과 메세지의 수를 분산시키는 것이 일반적인 규모 확장 방식이다. 이것은 토픽을 생성할 때 파티션 수를 크게 잡아주는 게 좋은 이유이기도 한데, 부하가 증가함에 따라서 더 많은 컨슈머를 추가할 수 있게 해주기 때문이다. 토픽에 설정된 파티션 수 이상으로 컨슈머를 투입하는 것이 아무 의미 없다는 점을 명심하라(몇몇 컨슈머는 그냥 놀게 될 것이다). 

     한 애플리케이션의 규모를 확장하기 위해 컨슈머 수를 늘리는 경우 이외에도 여러 애플리케이션이 동일한 토픽에서 데이터를 읽어와야 하는 경우 역시 매우 흔하다. 사실, 카프카의 주 디자인 목표 중 하나는 카프카 토픽에 쓰여진 데이터를 전체 조직 안에서 여러 용도로 사용할 수 있도록 만드는 것이 있다. 이러한 경우 우리는 각각의 애플리케이션이 전체 메세지의 일부만 받는 게 아니라 전부 다 받도록 해야 한다. 그리고 이렇게 하려면, 애플리케이션이 각자의 컨슈머 그룹을 갖도록 해야 한다. 다른 전통적인 메세지 전달 시스템과는 다르게 카프카는 성능 저하 없이 많은 수의 컨슈머와 컨슈머 그룹으로 확장이 가능하다.

     앞의 예에서 만약 하나의 컨슈머를 갖는 새로운 컨슈머 그룹 G2를 추가하게 된다면 이 컨슈머는 G1 컨슈머 그룹에서 무엇을 하고 있든지 상관없이 T1 토픽의 모든 메세지를 받게 된다. G2 역시 (G1과 마찬가지로) 2개 이상의 컨슈머를 가질 수 있는데, 이 경우 G1과 마찬가지로 각각의 컨슈머는 전체 파티션을 나눠서 할당(assign)받게 된다. 하지만, G2 전체를 놓고 보면 다른 컨슈머 그룹과는 상관없이 여전히 전체 메세지를 받게 된다.

    새로운 컨슈머 그룹을 추가하면 두 그룹 모두 모든 메세지를 받게 된다

     지금까지 이야기한 내용을 요약하면 다음과 같다. 즉, 1개 이상의 토픽에 대해 모든 메세지를 받아야 하는 애플리케이션별로 새로운 컨슈머 그룹을 생성한다. 토픽에서 메세지를 읽거나 처리하는 규모를 확장하기 위해서는 이미 존재하는 컨슈머 그룹에 새로운 컨슈머를 추가함으로써 해당 그룹 내의 컨슈머 각각이 메세지의 일부만을 받아서 처리하도록 한다.

     

    카프카 컨슈머 생성하기


     카프카 레코드를 읽어오기 위한 첫 번째 단계는 KafkaConsumer 인스턴스를 생성하는 것이다. KafkaConsumer 인스턴스를 생성하는 것은 KafkaProducer 인스턴스를 생성하는 것과 매우 비슷한데, 컨슈머에 넘겨주고자 하는 설정을 담은 Java Properties 객체를 생성하면 되는 것이다. 모든 속성들에 대해서는 이 장의 뒤쪽에서 자세히 살펴볼 것이다. 우선, 반드시 지정해야만 하는 속성은 3개뿐으로 bootstrap.servers, key.deserializer, value.deserialzer다.

     첫 번째 속성인 bootstrap.servers는 카프카 클러스터로의 연결 문자열이다. 사용 방법은 KafkaProducer와 동일하다. 다른 두 속성, key.deserializer와 value.deserializer는 프로듀서의 시리얼라이저와 비슷하지만, 자바 객체를 바이트 배열로 변환하는 클래스를 지정하는 게 아니라 바이트 배열을 자바 객체로 변환하는 클래스를 지정한다.

     엄격히 말해서 반드시 지정해야만 하는 것은 아니지만, 매우 일반적으로 사용되는 네 번째 속성이 있다. KafkaConsumer 인스턴스가 속하는 컨슈머 그룹을 지정하는 group.id 속성이다. 어떤 컨슈머 그룹에도 속하지 않는 컨슈머를 생성하는 것이 가능하기는 하지만 일반적인 것은 아니기 때문에 예제의 컨슈머는 특정 컨슈머 그룹의 일부라고 보면 된다.

     다음의 코드는 KafkaConsumer를 생성하는 방법을 보여준다.

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,brkoer2:9092");
    props.put("group.id", "CountryCounter");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

     

    토픽 구독하기


     컨슈머를 생성하고 나서 다음으로 할 일은 1개 이상의 토픽을 구독하는 것이다. subscribe() 메서드는 토픽 목록을 매개변수로 받기 때문에 사용법이 꽤나 간단하다.

    consumer.subscribe(Collections.singletonList("costomerCountries"));

     

    폴링 루프


     컨슈머 API의 핵심은 서버에 추가 데이터가 들어왔는지 폴링하는 단순한 루프다. 컨슈머 애플리케이션의 주요 코드는 다음과 같다.

    while (true) { //1
        ConsumerRecords<String, String> records = consumer.poll(timeout); //2
    
        for (ConsumerRecord<String, String> record : records) { //3
            System.out.printf("topic = %s, partition = %d, offset = %d, "
                    + "customer = %s, country = %s\n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);
    
            JsonObject json = new JsonObject(custCountryMap);
            System.out.println(json.toString()); //4
        }
    }
    1. 이 루프는 무한 루프이기 때문에 종료되지 않는다. 컨슈머 애플리케이션은 보통 계속해서 카프카에 추가 데이터를 폴링하는, 오랫동안 돌아가는 애플리케이션이다.
    2. 가장 중요한 코드다. 바닷속의 상어가 계속해서 움직이지 않으면 죽듯이, 컨슈머는 카프카를 계속해서 폴링하지 않으면 죽은 것으로 간주되어 이 컨슈머가 읽어오고 있던 파티션들은(처리를 계속하기 위해) 그룹 내 다른 컨슈머에게 넘겨진다. poll()에 전달하는 매개변수는 컨슈머 버퍼에 데이터가 없을 경우 poll()이 블록될 수 있는 최대 시간을 결정한다. 만약 이 값이 0으로 지정되거나 버퍼 안에 이미 레코드가 준비되어 있을 경우 poll()은 즉시 리턴된다. 그게 아닐 경우 지정된 밀리초만큼 기다린다.
    3. poll()은 레코드들이 저장된 List 객체를 리턴한다. 각각의 레코드는 레코드가 저장되어 있던 토픽, 파티션, 파티션에서의 오프셋 그리고 (당연하게도) 키값과 밸류값을 포함한다. 통상적으로 우리는 이 List를 반복해 가며 각각의 레코드를 하나씩 처리한다.
    4. 처리가 끝날 때는 결과물을 데이터 저장소에 쓰거나 이미 저장된 레코드를 갱신한다. 여기서는 국가별 고객 수를 (계속해서) 집계하는 것이 목적이므로 해시 테이블을 갱신하고 결과를 JSON 형식으로 출력한다. 좀 더 현실에 가까운 예제라면 데이터 저장소 안의 결과물을 갱신할 것이다.

     이 폴링 루프는 단순한 데이터를 가져오는 것보다 훨씬 더 많은 일을 한다. 새 컨슈머에서 처음으로 poll()을 호출하면 컨슈머는 GroupCoordinator를 찾아서 컨슈머 그룹에 첨가하고, 파티션을 할당받는다. 리밸런스 역시 연관된 콜백들과 함께 여기서 처리된다. 즉, 컨슈머 혹은 콜백에서 뭔가 잘못될 수 있는 거의 모든 것들은 poll()에서 예외의 형태로 발생되는 것이다.

     poll()이 max.poll.interval.ms에 지정된 시간 이상으로 호출되지 않을 경우, 컨슈머는 죽은 것으로 판정되어 컨슈머 그룹에서 퇴출된다는 점을 명심하라. 따라서 폴링 루프 안에서 예측 불가능한 시간 동안 블록되는 작업을 수행하는 것은 피해야 한다.

    스레드 안정성

     하나의 스레드에서 동일한 그룹 내에 여러 개의 컨슈머를 생성할 수는 없으며, 같은 컨슈머를 다수의 스레드가 안전하게 사용할 수도 없다. 하나의 스레드당 하나의 컨슈머, 이것이 원칙이다. 하나의 애플리케이션에서 동일한 그룹에 속하는 여러 개의 컨슈머를 운용하고 싶다면 스레드를 여러 개 띄워서 각각의 컨슈머를 하나씩 돌리는 수밖에 없다. 컨슈머 로직을 자체적인 객체로 감싼 다음 자바의 ExecutorService를 사용해서 각자의 컨슈머를 가지는 다수의 스레드를 시작시키면 좋다. 이러한 방식을 구현하는 방법에 대한 튜토리얼(https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/)이 있다.

     또 다른 방법으로는 이벤트를 받아서 큐에 넣는 컨슈머 하나와 이 큐에서 이벤트를 꺼내서 처리하는 여러 개의 워커 스레드(Worker Thread)를 사용하는 것이다. 이 패턴에 대한 예제는 Igo Buzatovic의 글(https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/)에서 살펴볼 수 있다.

     

    컨슈머 설정하기


     대부분의 매개변수는 합리적인 기본값을 가지고 있기 떄문에 딱히 변경할 필요는 없다. 하지만 몇몇 매개변수는 컨슈머의 성능과 가용성에 영향을 준다. 상대적으로 조금 더 중요한 속성들에 대해서 알아보자.

    fetch.min.bytes

     이 속성은 컨슈머가 브로커로부터 레코드를 얻어올 때 받는 데이터의 최소량(바이트)를 지정할 수 있게 해 준다(기본값은 1바이트). 만약 브로커가 컨슈머로부터 레코드 요청을 받았는데 새로 보낼 레코드의 양이 fetch.min.bytes보다 작을 경우, 브로커는 충분한 메세지를 보낼 수 있을 때까지 기다린 뒤 컨슈머에게 레코드를 보내준다. 이것은 토픽에 새로운 메세지가 많이 들어오지 않거나 하루 중 쓰기 요청이 적은 시간대일 때와 같은 상황에서 오가는 메세지 수를 줄임으로써 컨슈머와 브로커 양쪽에 대해 부하를 줄요주는 효과가 있다. 만약 읽어올 데이터가 그리 많지 않을 때 컨슈머가 CPU 자원을 너무 많이 사용하고 있거나 컨슈머 수가 많을 때 브로커의 부하를 줄여야 할 경우 이 값을 기본값보다 더 올려잡아 주는 게 좋다. 단, 이 값을 증가시킬 경우 처리량이 적은 상황에서 지연 또한 증가할 수 있음을 명심하라.

    fetch.max.wait.ms

     fetch.min.bytes를 설정함으로써 카프카가 컨슈머에게 응답하기 전 충분한 데이터가 모일 떄까지 기다리도록 할 수 있다. fetch.max.wiat.ms는 얼마나 오래 기다릴 것인지를 결정한다. 기본적으로 카프카는 500밀리초를 기다리도록 되어 있다. 결과적으로 카프카는 토픽에 컨슈머에게 리턴할 데이터가 부족할 경우 리턴할 데이터 최소량 조건을 맞추기 위해 500밀리초까지 기다리게 되는 것이다. 잠재적인 지연을 제한하고 싶을 경우(대개 애플리케이션의 최대 지연에 관련된 SLA 때문인 경우가 많다). fetch.max.wait.ms를 더 작게 잡아주면 된다. 만약 fetch.amx.wait.ms를 100밀리초로, fetch.min.bytes를 1MB로 잡아줄 경우, 카프카는 컨슈머로부터의 읽기(fetch) 요청을 받았을 때 리턴할 데이터가 1MB 이상 모이거나 100밀리초가 지나거나, 두 조건 중 하나가 만족되는 대로 리턴하게 된다.

    fetch.max.bytes

     이 속성은 컨슈머가 브로커를 폴링할 때 카프카가 리턴하는 최대 바이트 수를 지정한다(기본값은 50MB). 이것은 컨슈머가 서버로부터 받은 데이터를 저장하기 위해 사용하는 메모리의 양을 제한하기 위해 사용된다(얼마나 많은 파티션으로부터 얼마나 많은 메세지를 받았는지와는 무관하다). 브로커가 컨슈머에 레코드를 보낼 때는 배치 단위로 보내며, 만약 브로커가 보내야 하는 첫 번째 레코드 배치의 크기가 이 설정값을 넘길 경우, 제한값을 무시하고 해당 배치를 그대로 전송한다. 이것은 컨슈머가 읽기 작업을 계속해서 진행할 수 있도록 보장해 준다. 브로커 설정에도 최대 읽기 크기를 제한할 수 있게 해주는 설정이 있다는 점을 짚고 넘어갈 필요가 있겠다. 대량의 데이터에 대한 요청은 대량의 디스크 읽기와 오랜 네트워크 전송 시간을 초래하여 브로커 부하를 증가시킬 수 있기 때문에, 이러한 사태를 막기 위해서 브로커 설정을 사용할 수 있다.

    max.poll.records

     이 속성은 poll()을 호출할 때마다 리턴되는 최대 레코드 수를 지정한다. 애플리케이션이 폴링 루프를 반복할 때마다 처리해야 하는 레코드의 개수('크기'가 아니라)을 제어하려면 이 설정을 사용하면 된다.

     

    오프셋과 커밋

     poll()을 호출할 떄마다 카프카에 쓰여진 메세지 중에서 컨슈머 그룹에 속한 컨슈머들이 아직 읽지 않은 레코드가 리턴된다. 뒤집어 말하면, 이를 이용해서 그룹 내의 컨슈머가 어떤 레코드를 읽었는지를 판단할 수 있다는 얘기다. 카프카의 고유한 특성 중 하나는 많은 JMS 큐들이 하는 것처럼 컨슈머로부터의 응답을 받는 방식이 아니다. 대신, 컨슈머가 카프카를 사용해서 각 파티션에서의 위치를 추적할 수 있게 한다.

     카프카에서는 파티션에서의 현재 위치를 업데이트하는 작업을 오프셋 커밋(offset commit)이라고 부른다. 전통적인 메세지 큐와는 다르게, 카프카는 레코드를 개별적으로 커밋하지 않는다. 대신, 컨슈머는 파티션에서 성공적으로 처리해 낸 마지막 메세지를 커밋함으로써 그 앞의 모든 메세지들 역시 성공적으로 처리되었음을 암묵적으로 나타낸다.

     컨슈머는 어떻게 오프셋을 커밋하는가? 이것은 카프카의 특수 토픽인 _consumer_offsets 토픽에 각 파티션별로 커밋된 오프셋을 업데이트하도록 하는 메세지를 보냄으로써 이루어진다. 모든 컨슈머들이 정상적으로 실행중일 때는 이것이 아무런 영향을 주지 않는다. 하지만, 컨슈머가 크래시되거나 새로운 컨슈머가 그룹에 추가될 경우 리밸런스가 발생한다. 리밸런스 이후 각각의 컨슈머는 리밸런스 이전에 처리하고 있던 것과는 다른 파티션들을 할당받을 수 있다. 어디서부터 작업을 재개해야 하는지를 알아내기 위해 컨슈머는 각 파티션의 마지막으로 커밋된 메세지를 읽어온 뒤 거기서부터 처리를 재개한다.

     만약 커밋된 오프셋이 클라이언트가 처리한 마지막 메세지의 오프셋보다 작을 경우, 마지막으로 처리된 오프셋과 커밋된 오프셋 사이의 메세지들은 두 번 처리되게 된다.

    재처리된 메세지

     만약 커밋된 메세지가 클라이언트가 실제로 처리한 마지막 메세지의 오프셋보다 클 경우, 마지막으로 처리된 오프셋과 커밋된 오프셋 사이의 모든 메세지들은 컨슈머 그룹에서 누락되게 된다.

    오프셋 사이에서 누락된 메세지

     당연히 오프셋 관리는 클라이언트 애플리케이션에 큰 영향을 미친다. KafkaConsumer API는 오프셋을 커밋하는 다양한 방법을 지원한다.

    자동 커밋

     오프셋을 커밋하는 가장 쉬운 방법은 컨슈머가 대신하도록 하는 것이다. enable.auto.commit 설정을 true로 잡아주면 컨슈머는 5초에 한 번, poll()을 통해 받은 메세지 중 마지막 메세지의 오프셋을 커밋한다. 5초 간격은 기본값으로, auto.commit.interval.ms 설정을 잡아 줌으로써 바꿀 수 있다. 컨슈머의 모든 다른 것들과 마찬가지로, 자동 커밋은 폴링 루프에 의해서 실행된다. poll() 메서드를 실행할 때마다 컨슈머는 커밋해야 하는지를 확인한 뒤 그러할 경우에는 마지막 poll() 호출에서 리턴된 오프셋을 기다린다.

     하지만 이 편리한 옵션을 사용하기 전에, 이 옵션을 사용할 때 발생하는 결과를 이해할 필요가 있다. 기본적으로 자동 커밋은 5초에 한 번 발생한다. 마지막으로 커밋한 지 3초 뒤에 컨슈머가 크래시되었다고 해 보자. 리밸런싱이 완료된 뒤부터 남은 컨슈머들은 크래시된 컨슈머가 읽고 있던 파티션들을 이어받아서 읽기 시작한다. 문제는 남은 컨슈머들이 마지막으로 커밋된 오프셋부터 작업을 시작한다는 것이다. 이 경우 커밋되어 있는 오프셋은 3초 전의 것이기 때문에 크래시되기 3초 전까지 읽혔던 이벤트들은 두 번 처리되게 된다. 오프셋을 더 자주 커밋하여 레코드가 중복될 수 있는 윈도우를 즐어들도록 커밋 간격을 줄여서 설정해 줄 수도 있지만, 중복을 완전히 없애는 것은 불가능하다.

     자동 커밋 기능이 켜진 상태에서 오프셋을 커밋할 때가 되면, 다음 번에 호출된 poll()이 이전 호출에서 리턴된 마지막 오프셋을 커밋한다. 이 작동은 어느 이벤트가 실제로 처리되었는지 알지 못하기 때문에 poll()을 다시 호출하기 전 이전 호추에서 리턴된 모든 이벤트들을 처리하는 게 중요하다.(poll()과 마찬가지로 close() 역시 원자적(atomic)으로 오프셋을 커밋한다.) 이것은 보통 문제가 되지 않지만, 폴링 루프에서 예외를 처리하거나 루프를 일찍 벗어날 떄는 주의하라.

     자동 커밋은 편리하다. 그러나 개발자가 중복 메세지를 방지하기엔 충분하지 않다.

    현재 오프셋 커밋하기

     대부분의 개발자들은 오프셋이 커밋되는 시각을 제어하고자 한다. 메세지 유실의 가능성을 제거함과 동시에 리밸런스 발생시 중복되는 메세지 수를 줄이기 위해서다. 컨슈머 API는 타이머 시간이 아닌 애플리케이션 개발자가 원하는 시간에 현재 오프셋을 커밋하는 옵션을 제공한다.

     enable.auto.commit=false로 설정해 줌으로써 애플리케이션이 명시적을 커밋하려 할 때만 오프셋이 커밋되게 할 수 있다. 가장 간단하고 또 신뢰성 있는 커밋 API는 commitSync()이다. 이 API는 poll()이 리턴한 마지막 오프셋을 커밋한 뒤 커밋이 성공적으로 완료되면 리턴, 어떠한 이유로 실패하면 예외를 발생시킨다.

     commitSync()는 poll()에 의해 리턴된 마지막 오프셋을 커밋한다는 점에 유의하자. 따라서 만약 poll()에서 리턴된 모든 레코드의 처리가 완료되기 전 commitSync()를 호출하게 될 경우 애플리케이션이 크래시되었을 때 커밋은 되었지만 아직 처리되지 않은 메세지들이 누락될 위험을 감수해야 할 것이다. 만약 애플리케이션이 아직 레코드들을 처리하는 와중에 크래시가 날 경우 마지막 메세지 배치의 맨 앞 레코드에서부터 리밸런시 시작 시점까지의 모든 레코드들은 두 번 처리될 것이다(이것이 메세지 유실보다 더 나을지 아닐지는 상황에 따라 다르다).

     가장 최근의 메세지 배치를 처리한 뒤 commitSync()를 호출해서 오프셋을 커밋하는 예는 다음과 같다.

    Duration timeout = Duration.ofMillis(100);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
    
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, "
                    + "customer = %s, country = %s\n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value()); //1
        }
        try {
            consumer.commitSync(); //2
        } catch (CommitFailedException e) {
            log.error("commit failed", e); //3
        }
    }
    1. 레코드의 내용물이 출력되면 처리가 끝나는 것으로 간주하자. 아마도 실제 애플리케이션은 레코드를 가지고 더 많은 처리를 할 것이다. 즉, 수정하고, 확장하고, 집계하고, 대시보드에 출력하고, 혹은 중요한 이벤트에 대해 사용자에게 알려줘야 할 수도 있는 것이다. 사례에 따라 각 레코드의 처리가 언제 '완료'되었는지 결정할 필요가 있다.
    2. 현재 배치의 모든 레코드에 대한 '처리'가 완료되면, 추가 메세지를 폴링하기 전에 commitSync를 호출해서 해당 배치의 마지막 오프셋을 커밋한다.
    3. 해결할 수 없는 에러가 발생하지 않는 한, commitSync는 커밋을 재시도한다. 하지만 실제로 회복할 수 없는 에러가 발생할 경우, 에러를 로깅하는 것 외에 할 수 있는 게 별로 없다.

    비동기적 커밋

     수동 커밋의 단점 중 하나는 브로커가 커밋 요청에 응답할 떄까지 애플리케이션이 블록된다는 점이다. 이것은 애플리케이션의 처리량을 제한하게 된다. 덜 자주 커밋한다면 처리량이야 올라가겠지만 리밸런스에 의해 발생하는 잠재적인 중복 메세지의 수는 늘어난다. 또 다른 방법은 비동기적 커밋 API를 사용하는 것이다. 브로커가 커밋에 응답할 때까지 기다리는 대신 요청만 보내고 처리를 계속한다.

    Duration timeout = Duration.ofMillis(100);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
    
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, "
                    + "customer = %s, country = %s\n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }

     이 방식의 단점은 commitSync()가 성공하거나 재시도 불가능한 실패가 발생할 때까지 재시도하는 반면, commitAsync()는 재시도를 하지 않는다는 점이다. 왜냐하면, commitAsync()가 서버로부터 응답을 받은 시점에는 이미 다른 커밋 시도가 성공했을 수도 있기 때문이다. 우리가 오프셋 2000을 커밋하는 요청을 보냈는데, 일시적인 통신 장애로 브로커가 해당 요청을 못 받아서 응답도 하지 않은 경우를 상상해 보자. 그 사이 우리는 다른 배치를 처리한 뒤 성공적으로 오프셋 3000을 커밋한다. 이 시점에서 commitAsync()가 실패한 앞의 커밋 요청을 재시도해서 성공한다면, 오프셋 3000까지 처리되서 커밋까지 완료된 다음에 오프셋 2000이 커밋되는 사태가 발생할 수 있다.

     오프셋을 올바른 순서로 커밋하는 문제의 복잡성과 중요성에 대해 언급하는 이유는 commitAsync()에는 브로커가 보낸 응답을 받았을 대 호출되는 콜백을 지정할 수 있는 옵션 역시 있기 떄문이다. 이 콜백은 커밋 에러를 로깅하거나 커밋 에러 수를 지표 형태로 집계하기 위해 사용되는 것이 보통이지만, 재시도를 하기 위해 콜백을 사용하고자 할 경우 커밋 순서 관련된 문제에 주의를 기울이는 것이 좋다.

    Duration timeout = Duration.ofMillis(100);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
    
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, "
                    + "customer = %s, country = %s\n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    log.error("Commit failed for offsets {}", offsets, exception);
                }
            }
        });
    }

     

    '책책책 책을 읽읍시다' 카테고리의 다른 글

    Real MySQL 8.0 2 : 인덱스  (0) 2023.04.29

    댓글

Designed by Tistory.