ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 카프카 핵심 가이드 2 : 프로듀서
    책책책 책을 읽읍시다/프로그래밍 2023. 6. 9. 21:35

    3. 카프카 프로듀서: 카프카에 메세지 쓰기


    카프카 프로듀서 생성하기


     카프카에 메세지를 쓰려면 우선 원하는 속성을 지정해서 프로듀서 객체를 생성해야 한다. 카프카 프로듀서는 아래의 3개의 필수 속성값을 갖는다.

    bootstrap.servers

     카프카 클러스터와 첫 연결을 생성하기 위해 프로듀서가 사용할 브로커의 host:port 목록이다. 이 값은 모든 브로커를 포함할 필요는 없는데, 프로듀서가 첫 연결을 생성한 뒤 추가 정보를 받아오게 되어 있기 때문이다. 다만 브로커 중 하나가 작동을 정지하는 경우에도 프로듀서가 클러스터에 연결할 수 있도록 최소 2개 이상을 지정할 것을 권장한다.

    key.serializer

     카프카에 쓸 레코드의 키의 값을 직렬화하기 위해 사용하는 시리얼라이저(serializer) 클래스의 이름이다. 카프카 브로커는 메세지의 키값, 밸류값으로 바이트 배열을 받는다. 하지만, 프로듀서 인터페이스는 임의의 자바 객체를 키 혹은 밸류로 전송할 수 있도록 매개변수화된 타입(parameterized type)을 사용할 수 있도록 한다. 덕분에 가독성 높은 코드를 작성할 수 있지만, 프로듀서 입장에서는 이 객체를 어떻게 바이트 배열로 바꿔야 하는지 알아야 한다는 의미이기도 하다. key.serializer에는 org.apache.kafka.common.serialization.Serializer 인터페이스를 구현하는 클래스의 이름이 지정되어야 한다. 카프카의 client 패키지에는 (별로 하는 일이 없는) ByteArraySerializer, StringSerializer, IntegerSerializer 등등이 포함되어 있으므로 자주 사용되는 타입을 사용할 경우 시리얼라이저를 직접 구현할 필요는 없다. 키값 없이 밸류값만 보낼 때도 key.serializer 설정은 해 줘야 하지만, VoidSerializer를 사용해서 키 타입으로 Void 타입을 설정할 수 있다.

    value.serializer

     카프카에 쓸 레코드의 밸류값을 직렬화하기 위해 사용하는 시리얼라이저 클래스의 이름이다. 키 값으로 쓰일 객체를 직렬화하기 위해 key.serializer에 클래스 이름을 설정하는 것과 마찬가지로 밸류값으로 쓰일 객체를 직렬화하는 클래스 이름을 value.serializer에 설정해주면 된다.

     

     다음 코드에서는 필수 속성만을 지정하고 나머지는 전부 기본 설정값을 사용하는 방식으로 새로운 프로듀서를 생성하는 방법을 보여준다.

    Properties kafkaProps = new Properties(); //1
    kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
    
    kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //2
    kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps); //3
    1. 우선 Properties 객체를 생성한다.
    2. 메세지의 키값과 밸류값으로 문자열(String 타입)을 사용할 것이므로, 카프카에서 기본 제공되는 StringSerializer를 사용한다.
    3. 적절한 키와 밸류 타입을 설정하고 Properties 객체를 넘겨줌으로써 새로운 프로듀서를 생성한다.

    상당히 단순한 인터페이스이지만, 단순히 정확한 설정값을 제공하는 것만으로도 프로듀서의 실행을 제어할 수 있음을 확인할 수 있다. 아파치 카프카 문서에는 모든 설정 항목이 나열되어 있는데, 그중에서 중요한 것들은 나중에 다시 보자.

     프로듀서 객체를 생성했으니 이제 메세지를 전송할 수 있다. 메세지 전송 방법에는 크게 3가지 방법이 있다.

     

    파이어 앤 포겟(Fire and Forget)

    메세지를 서버에 전송만 하고 성공 혹은 실패 여부에는 신경 쓰지 않는다. 카프카가 가용성이 높고 프로듀서는 자동으로 전송 실패한 메세지를 재전송 시도하기 때문에 대부분의 경우 메세지는 성공적으로 전달된다. 다만, 재시도를 할 수 없는 에러가 발생하거나 타임아웃이 발생했을 경우 메세지는 유실되며 애플리케이션은 여기에 대해 아무런 정보나 예외를 전달받지 않게 된다.

    동기적 전송(Synchronous send)

    기술적으로 이야기하자면, 카프카 프로듀서는 언제나 비동기적으로 작동한다. 즉, 메세지를 보내면 send() 메서드는 Future 객체를 리턴한다. 하지만 다음 메세지를 전송하기 전 get() 메서드를 호출해서 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인해야 한다.

    비동기적 전송(Asynchronous send)

    콜백 함수와 함께 send() 메서드를 호출하면 카프카 프로커로부터 응답을 받는 시점에서 자동으로 콜백 함수가 호출된다.

     

    비동기적으로 메세지 전송하기

     애플리케이션과 카프카 클러스터 사이의 네트워크 왕복 시간(network roundtrip time)이 10ms라고 가정해보자. 메세지를 보낼 때마다 응답을 기다린다면, 100개의 메세지를 전송하는 데 약 1초가 걸린다. 반면, 보내야 할 메세지를 전부 전송하고 응답을 기다리지 않는다면 100개의 메세지를 전송하더라도 거의 시간이 걸리지 않을 것이다. 실제로 대부분의 경우 굳이 응답이 필요 없다. 카프카는 레코드를 쓴 뒤 해당 레코드의 토픽, 파티션 그리고 오프셋을 리턴하는데, 대부분의 애플리케이션에서는 이런 메타데이터가 필요 없기 때문이다. 반대로, 메세지 전송에 완전히 실패했을 경우에는 그런 내용을 알아야 한다. 그래야 예외를 발생시키든지, 에러를 로그에 쓰든지, 아니면 사후 분석을 위해 에러 파일에 메세지를 쓰거나 할 수 있기 때문이다.

     메세지를 비동기적으로 전송하고도 여전히 에러를 처리하는 경우를 위해 프로듀서는 레코드를 전송할 때 콜백을 지정할 수 있도록 한다. 다음 예제는 콜백을 사용하는 방법을 보여준다.

    private class DemoProducerCallback implements Callback { //1
      @Override
      public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
          e.printStackTrace(); //2
        }
      }
    }
    
    ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); //3
    producer.sent(record, new DemoProducerCallback()); //4
    1. 콜백을 사용하려면 org.apache.kafka.clients.producer.Callback 인터페이스를 구현하는 클래스가 필요하다. 이 인터페이스에는 onCompletion() 단 하나의 메서드만 정의되어 있다.
    2. 만약 카프카가 에러를 리턴한다면 onCompletion() 메서드가 null이 아닌 Exception 객체를 받게 된다. 여기서는 그냥 내용을 화면에 출력해주는 정도로 처리했지만, 실제 애플리케이션에서는 좀 더 확실한 에러 처리 함수가 필요할 것이다.
    3. ProducerRecord를 전송하기 위해 프로듀서 객체의 send 메서드를 사용한다. 메세지는 버퍼에 저장되었다가 별도 스레드에 의해 브로커로 보내진다. 그리고 레코드를 전송할 때 Callback 객체를 함께 매개변수로 전달한다. 
    4. 그리고 레코드를 전송할 때 Callback 객체를 함께 매개변수로 전달한다.

    콜백은 프로듀서의 메인 스레드에서 실행된다. 만약 두 개의 메세지를 동일한 파티션에 전송한다면, 콜백 역시 보낸 순서대로 실행된다. 하지만 이는 뒤집어 생각하면, 전송되어야 할 메세지가 전송이 안되고 프로듀서가 지연되는 상황을 막기 위해서는 콜백이 충분히 빨라야 한다는 의미이기도 하다. 콜백 안에서 블로킹 작업을 수행하는 것 역시 권장되지 않는다. 대신, 블로킹 작업을 동시에 수행하는 다른 스레드를 사용해야 한다.

    batch.size

     같은 파티션에 다수의 레코드가 전송될 경우 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 전송한다. 이 매개변수는 각각의 배치에 사용될 메모리의 양을 결정한다('개수'가 아니라 '바이트' 단위임에 주의). 배치가 가득 차면 해당 배치에 들어 있는 모든 메세지가 한꺼번에 전송된다. 하지만 이것이 프로듀서가 각각의 배치가 가득 찰 때까지 기다린다는 의미는 아니다. 프로듀서는 절반만 찬 배치나 심지어 하나의 메세지만 들어 있는 배치도 전송한다. 그렇기 때문에 이 매개변수를 지나치게 큰 값으로 유지한다고 해서 메세지 전송에 지연이 발생하지는 않는다. 반면, 이 값을 지나치게 작게 설정할 경우 프로듀서가 지나치게 자주 메세지를 전송해야 하기 때문에 약간의 오버헤드가 발생한다.

    enable.idempotence

     0.11무터 카프타는 '정확히 한 번' 의미 구조(exactly once semantics)를 지원하기 시작했다. '정확히 한 번' 기능은 꽤 방대한 기능이고, 멱등적 프로듀 서는 그중에서도 간단하면서도 매우 강력한 부분이다.

     신뢰성을 최대화하는 방향으로 프로듀서를 설정했다고 가정해보자. acks=all으로 잡고 실패가 나더라도 충분히 재시도하도록 delivery.timout.ms는 꽤 큰 값으로 잡는다. 이 경우 메세지는 반드시 최소 한 번(at least once) 카프카에 쓰여지게 된다. 예를 들어서, 브로커가 프로듀서로부터 레코드를 받아서 로컬 디스크에 쓰고, 다른 브로커에도 성공적으로 복제되었다고 가정하자. 여기서 첫 번째 브로커가 프로듀서로 응답을 보내기 전에 크래시가 났다고 생각해보자. 프로듀서는 request.timeout.ms 만큼 대기한 뒤 재전송을 시도하게 된다. 이때 새로 보내진 메세지는 (이미 기존 쓰기 작업이 성공적으로 복제되었으므로) 이미 메세지를 받은 바 있는 새 리더 브로커로 전달되게 한다. 메세지가 중복되어 저장되는 것이다.

     enable.idempotence=true 설정을 잡아 주는 것은 바로 이러한 사태를 방지하기 위함이다. 멱등적 프로듀서 기능이 활성화된다면, 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여서 보내게 된다. 만약 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장하게 되며, 프로듀서는 별다른 문제를 발생시키지 않는 DuplicateSequenceException을 받게 된다.

    멱등적 프로듀서 기능을 활성화하기 위해서는 max.in.flight.requests.per.connection 매개변수는 5 이하로, retries는 1 이상으로 그리고 acks=all로 잡아 주어야 한다. 만약 이 조건을 만족하지 않는 설정값을 지정한다면 ConfigException이 발생한다.

     

    시리얼라이저

     카프카는 정숫값을 직렬화할 때 사용하는 IntegerSerializer, ByteArray에 사용되는 ByteArraySerializer, 문자열에 사용되는 StringSerializer 등을 포함하고 있다. 하지만 이것만으로 모든 데이터를 직렬화할 수는 없다. 결국에는 더 일반적인 레코드를 직려로하할 수 있어야 할 ㄷ것이다.

    커스텀 시리얼라이저

     카프카로 전송해야 하는 객체가 단순한 문자열이나 정숫값이 아닐 경우에는 두 가지의 선택지가 있을 수 있다.

    1. 레코드를 생성하기 위해 에이브로(Avro), 스리프트(Thrift), 프로토버프(Protobuf)와 같은 범용 직렬화 라이브러리를 사용한다.
    2. 사용하고 있는 객체를 직렬화하기 위한 커스텀 직렬화 로직을 작성한다.

    전자인 범용 직렬화 라이브러리를 사용하는 방안을 강력 권장한다. 시리얼라이저가 작동하는 방식과 왜 범용 직렬화 라이브러리를 사용하는 것이 좋은지를 이해하기 위해 커스텀 시리얼라이저를 작성해보자.

     

    Customer

    public class Customer {
        private int customerId;
        private String customerName;
    
        public Customer(int customerId, String customerName) {
            this.customerId = customerId;
            this.customerName = customerName;
        }
    
        public int getId() {
            return customerId;
        }
    
        public String getName() {
            return customerName;
        }
    }

    이제 이 클래스를 위한 커스텀 시리얼라이저를 작성해보자.

    CustomerSerializer

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class CustomerSerializer implements Serializer<Customer> {
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to configure
        }
    
        @Override
        /**
         We are serializing Customer as:
         4 byte int representing customerId
         4 byte int representing length of customerName in UTF-8 bytes (0 if name is Null)
         N bytes representing customerName in UTF-8
         **/
        public byte[] serialize(String topic, Customer data) {
            try {
                byte[] serializedName;
                int stringSize;
                if (data == null)
                    return null;
                else {
                    if (data.getName() != null) {
                        serializedName = data.getName().getBytes("UTF-8");
                        stringSize = serializedName.length;
                    } else {
                        serializedName = new byte[0];
                        stringSize = 0;
                    }
                }
    
                ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
                buffer.putInt(data.getId());
                buffer.putInt(stringSize);
                buffer.put(serializedName);
    
                return buffer.array();
            } catch (Exception e) {
                throw new SerializationException("Error when serializing Customer to byte[] " + e);
            }
        }
    
        @Override
        public void close() {
            Serializer.super.close();
        }
    }

     프로듀서를 생성할 때 CustomerSerializer를 사애요해서 설정값을 잡아주면, ProducerRecord<String, Customer>를 사용해서 Customer 객체를 바로 프로듀서에 전달할 수 있다. 단순한 예시지만, 코드에 취약점이 있음을 알 수 있을 것이다. 예를 들어서 만약 고객이 너무 많을 경우 CustomerId의 타입을 Long으로 바꿔 주어야 할 것이고, Customer에 startDate 필드를 추가해야 할 경우 기존 형식과 새 형식 사이의 호환성을 유지해야 하는 심각한 문제를 안게 된다. 서로 다른 버전의 직렬화/비직렬화 로직을 디버깅하는 것은 상당히 어려운 작업인데, 단순 바이트 뭉치를 일일이 비교해야 하기 때문이다. 더 심각한 문제는, 만약 같은 회사의 여러 팀에서 Customer 데이터를 카프카로 쓰는 작업을 수행하고 있다면 모두가 같은 로직을 사용하고 있어야 하기 때문에 코드를 동시에 변경해야 하는 상황이 발생한다.

     이러한 이유로 JSON, 아파치 에이브로, 스리프트 혹은 프로토버프와 같은 범용 라이브러리를 사용할 것을 권장한다. 아파치 에이브로가 무엇인지, 어떻게 직렬화하고 카프카로 전송하는지 알아보자.

    아파치 에이브로를 사용해서 직렬화하기

     아파치 에이브로는 언어 중립(language-neutral)적인 데이터 직렬화 형식이다. 이 프로젝트는 더 범용적인 데이터 파일 공유 방식을 제공하는 것을 목표로 시작되었다.

     에이브로 데이터는 스키마의 형태로 기술된다. 이 스키마는 보통 JSON 형식으로 정의되며, 주어진 데이터를 스키마에 따라 직렬화하면 이진 파일 형태로 결과물이 뽑혀 나오는 것이 보통이다(JSON 형태로 뽑는 것도 가능하다). 에이브로는 직렬화된 결과물이 저장된 파일을 읽거나 직렬화를 할 때 스키마 정보가 별도로 주어진다고 가정하고, 보통은 에이브로 파일 자체에 스키마를 내장하는 방법을 쓴다.

     에이브로의 재미있는 점이자 카프카와 같은 메세지 전달 시스템에 사용하는 데 적합한 이유는 메세지를 쓰는 애플리케이션이 새로운 스키마로 전환하더라도 기존 스키마와 호환성을 유지하는 한, 데이터를 읽는 애플리케이션은 일체의 변경이나 업데이트 없이 계속해서 메세지를 처리할 수 있다는 것이다.

    원래 스키마가 다음과 같다고 가정하자.

    {
      "namespace": "customerManagement.avro",
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "faxNumber", "type": ["null", "string"], "default": "null"} //1
      ]
    }
    1. id와 name 필드는 필수 사항인 반면 faxNumber는 선택 사항이고 기본값은 null이다.

     이 스키마를 몇 달 동안 사용해 왔고 이미 이 형식으로 몇 테라바이트의 데이터가 생성되어 있다고 가정해 보자. 이제 이 형식을 21세기에 맞춰 업데이트하기로 했다고 해보자. 팩스 번호(faxNumber) 필드를 삭제하고 대신 email 필드를 추가하는 것이다.

     새로운 스키마는 다음과 같다.

    {
      "namespace": "customerManagement.avro",
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": "null"}
      ]
    }

     이제 새로운 버전으로 업그레이드를 하면 예전 레코드는 faxNumber를 가지고 있는 반면, 새 레코드는 email을 가지고 있게 된다. 보통 업그레이드란 느리게 진행되는 게 보통이고 몇 달씩 걸리는 경우도 흔하다. 따라서 여전히 faxNumber를 가진 레코드를 처리하는 이전 버전 애플리케이션과 email을 가진 레코드를 처리하는 이후 버전 애플리케이션이 (구버전과 신버전이 뒤섞여 있는) 카프카에 저장된 모든 이벤트를 처리할 수 있도록 해줘야 한다.

     아직 업그레이드 전인 이벤트를 읽는 쪽 애플리케이션은 getName(), getId(), getFaxNumber()와 같은 메서드를 가지고 있을 것이다. 이 애플리케이션이 신버전의 스키마를 사용해서 쓰여진 메세지를 받을 경우 getName(), getId() 메서드는 변경 없이 작동하지만, getFaxNumber는 null을 리턴하게 된다(신버전 메세지에는 faxNumber가 없기 때문이다).

     이제, 우리가 읽는 쪽 애플리케이션을 업그레이드한다고 가정해 보자. 이제 이 애플리케이션에는 getFaxNumber() 같은 메서드는 없고 대신 getEmail()이 있다. 이 애플리케이션이 구버전 스키마로 쓰인 메세지를 받을 경우 getEmail()은 null을 리턴하게 된다(구버전 메세지에는 email이 없기 때문이다).

     이 예시는 에이브로를 사용할 때의 이점을 보여준다. 즉, 데이터를 읽는 쪽 애플리케이션을 전부 변경하지 않고 스키마를 변경하더라도 어떠한 예외나 에러가 발생하지 않으며, 기존 데이터를 새 스키마에 맞춰 업데이트하는 엄청난 작업을 할 필요도 없다는 것이다.

     다만, 이러한 시나리오에도 두 가지 주의해야 할 점이 있다.

    • 데이터를 쓸 때 사용하는 스키마와 읽을 때 기대하는 스키마가 호환되어야 한다. 에이브로 문서(https://avro.apache.org/docs/1.11.1/specification/)에 기술된 호환성 규칙을 참고(또는 https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html)하라.
    • 역직렬화를 할 때는 데이터를 쓸 때 사용했던 스키마에 접근이 가능해야 한다. 설령 그 스키마가 읽는 쪽 애플리케이션에서 기대하는 스키마와 다른 경우에도 마찬가지다. 이 조건을 만족시키기 위해 에이브로 파일을 쓰는 과정에는 사용된 스키마를 쓰는 과정이 포함되어 있지만, 카프카 메세지를 다룰 때는 좀 더 나은 방법이 있다.

    카프카에서 에이브로 레코드 사용하기

     파일 안에 전체 스키마를 저장함으로써 약간의 오버헤드를 감수하는 에이브로 파일과는 달리, 카프카 레코드에 전체 스키마를 저장할 경우 전체 레코드 사이즈는 2배 이상이 될 수 있다. 하지만, 에이브로는 레코드를 읽을 때 스키마 전체를 필요로 하기 떄문에 어딘가 스키마를 저장해 두기는 해야 한다. 이 문제를 해결하기 위해 스키마 레지스트리(Schema Registry)라 불리는 아키텍처 패턴을 사용한다. 스키마 레지스트리는 아파치 카프카의 일부가 아니며 여러 오픈소스 구현체 중 하나를 골라서 사용하면 된다.

     여기서 핵심 아이디어는 카프카에 데이터를 쓰기 위해 사용되는 모든 스키마를 레지스트리에 저장한다는 것이다. 그리고 카프카에 쓰는 레코드에는 사용된 스키마의 고유 식별자만 심어주면 되는 것이다. 컨슈머는 이 식별자를 사용해서 스키마 레지스트리에서 스키마를 가져와서 데이터를 역직렬화할 수 있다. 여기서 중요한 점은 이 모든 작업(스키마를 레지스트리에 저장하고 필요할 떄 가져오는)이 주어진 객체를 직렬화하는 시리얼라이저와의 직렬화된 데이터를 객체로 복원하는 디시리얼라이저 내부에서 수행된다는 점이다. 카프카에 데이터를 쓰는 코드는 그저 다른 시리얼라이저를 사용하듯이 에이브로 시리얼라이저를 사용하면 된다. 아래 그림은 이 과정을 보여준다.

    에이브로 레코드의 직렬화와 역직렬화 처리 흐름

     아래에서는 에이브로를 사용해서 생성한 객체를 카프카에 쓰는 방법을 보여준다.

    Properties props = new Properties();
    
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); //1
    props.put("schema.registry.url", schemaUrl); //2
    
    String topic = "customerContacts";
    
    Producer<String, Customer> producer = new KafkaProducer<>(props); //3
    
    while (true) {
        Customer customer = CustomerGenerator.getNext(); //4
        System.out.println("Generated customer " + customer.toString());
        ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getName(), customer); //5
        producer.send(record); //6
    }
    1. 에이브로를 사용해서 객체를 직렬화하기 위해 KafkaAvroSerializer를 사용한다. KafkaAvroSerializer는 객체뿐만 아니라 기본형 데이터 역시 처리할 수 있음을 명심하라. 이는 우리가 나중에 레코드 키 타입으로 String을, 밸류 타입으로 Customer를 사용할 수 있는 이유이기도 하다.
    2. schema.registry.url은 에이브로 시리얼라이저의 설정 매개변수로, 프로듀서가 시리얼라이저에 넘겨주는 값이다. 이 값은 우리가 스키마를 저장해 놓은 위치를 가리킨다.
    3. Customer는 우리가 생성한 클래스이며, 여기서는 프로듀서에 우리가 사용할 레코드의 밸류 타입이 Customer임을 알려준다.
    4. Customer 클래스는 일반적인 자바 클래스(POJO: Plain Old Java Object)가 아니라 에이브로의 코드 생성 기능을 사용해서 스키마로부터 생성된 에이브로 특화 객체다. 에이브로 시리얼라이저는 POJO 객체가 아닌 에이브로 객체만을 직렬화할 수 있기 때문이다. 에이브로 클래스를 생성하는 것은 avro-tool.jar를 사용하거나 에이브로 메이븐 플러그인을 사용해서 가능하다.
    5. 밸류 타입이 Customer인 ProducerRecord 객체를 생성하고, Customer 객체를 인수를 전달한다.
    6. 이제 다 되었다. Customer 객체를 전송하면 나머지는 KafkaAvroSerializer가 알아서 해 준다.

    파티션

     카프카 메세지는 키-밸류 순서쌍(key-value pair)이라고 할 수 있는데, 키의 기본값이 null인 만큼 토픽과 밸류의 값만 있어도 ProduceRecord 객체를 생성할 수는 있기는 하지만 대부분의 경우 키값이 지정된 레코드를 쓴다. 키의 역할은 두 가지로, 그 자체로 메세지에 (밸류값과) 함께 저장되는 추가적인 정보이기도 하지만 하나의 토픽에 속한 여러 개의 파티션 중 해당 메세지가 저장될 파티션을 결정짓는 기준점이기도 하다. (키값은 로그 압착 기능이 활성화된 토픽에서도 중요한 역할을 한다.) 같은 키값을 가진 모든 메세지는 같은 파티션에 저장되는 것이다. 즉, 임의의 프로세스가 전체 파티션 중 일부만을 읽어올 경우, 특정한 키값을 갖는 모든 메세지를 읽게 된다는 것이다. 키-밸류 레코드를 생성하기 위해서는 다음과 같이 ProduceRecord 객체를 생성하면 된다.

    ProducerRecord<String, String> record = new ProducerRecord<"CustomerCountry", "Laboratory Equipment", "USA");

    키값이 없는 레코드를 생성하기 위해서는 다음과 같이 키 부분을 생략하면 된다.

    ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "USA");

     여기서는 키값을 null로 잡아 주었다.

     기본 파티셔너(partitioner) 사용 중에 키값이 null인 레코드가 주어질 경우, 레코드는 현재 사용 가능한 토픽의 파티션 중 하나에 랜덤하게 저장된다. 각 파티션별로 저장되는 메세지 개수의 균형을 맞추기 위해 라운드 로빈(round robin) 알고리즘이 사용된다. 아파치 카프카 2.4 프로듀서부터 기본 파티셔너는 키값이 null인 경우, 접착성(sticky) 처리를 하기 위해 라운드 로빈 알고리즘을 사용한다. 이 기능을 구체적으로 설명하자면, 프로듀서가 메세지 배치를 채울 때 다음 배치로 넘어가기 전 이전 배치를 먼저 채우게 되어 있다. 이 기능은 더 적은 요청으로 같은 수의 메세지를 전송하게 함으로써 지연 시간을 줄이고 broker의 CPU 사용량을 줄인다.

     반대로 키값이 지정된 상황에서 기본 파티셔너를 사용할 경우, 카프카는 키값을 해시(hash)한 결과를 기주능로 메세지를 저장할 파티션을 특정한다. (이때 파티셔너는 자체적인 해싱 알고리즘을 사용하기 때문에 자바 버전이 업그레이드되어도 해시값은 변하지 않는다.) 이때 동일한 키값은 항상 동일한 파티션에 저장되는 것이 원칙이기 때문에 파티션을 선택할 때는 토픽의 모든 파티션을 대상으로 선택한다(즉, 사용 가능한 파티션만 대상으로 하지 않는다). 따라서 특정한 파티션에 장애가 발생한 상태에서 해당 파티션에 데이터를 쓰려고 할 경우 에러가 발생한다.

    댓글

Designed by Tistory.