ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • kafka producer consumer 구현하기
    카테고리 없음 2023. 4. 20. 09:53
    kafka producer과 consumer를 java로 구현해보자. 일단 카프카가 설치 되어 있어야 한다. 카프카 설치는 여기에서 보면 된다. 아주 간단한 샘플용이니 참고만 하길 바란다. kafka version은 0.9.0.1 버전으로 하였다. 바로 소스를 보자.
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>
    
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.7</version>
        </dependency>
    </dependencies>
    
    일단 디펜더시는 카프카 클라이언트와 필자는 logback을 자주 써서 logback으로 디펜더시를 설정 하였다. 다음 코드는 producer 코드이다. 간단하게 만들었다.
    public static void main(String[] args) throws IOException {
    
      Properties configs = new Properties();
      configs.put("bootstrap.servers", "localhost:9092");
      configs.put("acks", "all");
      configs.put("block.on.buffer.full", "true");
      configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
      KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
    
      producer.send(new ProducerRecord<>("test", "hello world"),
        (metadata, exception) -> {
          if (metadata != null) {
            System.out.println(
              "partition(" + metadata.partition() + "), offset(" + metadata.offset() + ")");
          } else {
            exception.printStackTrace();
          }
        });
      producer.flush();
      producer.close();
    }
    
    test라는 topic을 예전에 생성 해놔서 topic을 test로 지정해놨다. http://kafka.apache.org/documentation.html#producerconfigs 여기 가면 producer config 파라미터들이 정리되어 있다. 소스는 딱히 설명 하지 않아도 보면 알 듯 싶다.
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
    
    중요한건 send API가 실제 보내는 API이다. ProducerRecord 클래스의 첫번째 인자는 topic이고 두번째 인자는 value로 지정 되어 있다. 만약 콘솔로 consumer를 띄었다면 콘솔에 찍혀 있는 걸 확인 할 수 있다. 다음으론 consumer예제 클래스이다.
    public static void main(String[] args) {
      Properties configs = new Properties();
      configs.put("bootstrap.servers", "localhost:9092");
      configs.put("session.timeout.ms", "10000");
      configs.put("group.id", "test");
      configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
      consumer.subscribe(Arrays.asList("test"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(500);
        for (ConsumerRecord<String, String> record : records) {
          switch (record.topic()) {
            case "test":
              System.out.println(record.value());
              break;
            default:
              throw new IllegalStateException("get message on topic " + record.topic());
          }
        }
      }
    }
    
    http://kafka.apache.org/documentation.html#consumerconfigs 마찬가지로 컨슈머의 설정은 해당 URL에 잘 정리 되어 있다. 예전 config와 새로운 config 설정이 있으니 참고하기 바란다.
    consumer.subscribe(Arrays.asList("test"))
    
    test라는 topic을 구독하겠다는 것이다 List로 들어가니 여러 topic을 구독 할 수 있다. 그리고 해당 topic(여기서는 test)으로 오면 실제 값을 출력 해준다. 이렇게 아주 간단하게 producer와 consumer를 구현해봤다. 물론 간단한 코드 이므로 실 운영에서는 사용하지 못할 듯하다. 좀더 리펙토링이 필요 할 것이다. 참고 정도만 하면 될듯 싶다. 소스는 github에 올려놔야 겠다.

    댓글

Designed by Tistory.