카테고리 없음

kafka producer consumer 구현하기

머룽 2023. 4. 20. 09:53
kafka producer과 consumer를 java로 구현해보자. 일단 카프카가 설치 되어 있어야 한다. 카프카 설치는 여기에서 보면 된다. 아주 간단한 샘플용이니 참고만 하길 바란다. kafka version은 0.9.0.1 버전으로 하였다. 바로 소스를 보자.
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>

    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.7</version>
    </dependency>
</dependencies>
일단 디펜더시는 카프카 클라이언트와 필자는 logback을 자주 써서 logback으로 디펜더시를 설정 하였다. 다음 코드는 producer 코드이다. 간단하게 만들었다.
public static void main(String[] args) throws IOException {

  Properties configs = new Properties();
  configs.put("bootstrap.servers", "localhost:9092");
  configs.put("acks", "all");
  configs.put("block.on.buffer.full", "true");
  configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

  producer.send(new ProducerRecord<>("test", "hello world"),
    (metadata, exception) -> {
      if (metadata != null) {
        System.out.println(
          "partition(" + metadata.partition() + "), offset(" + metadata.offset() + ")");
      } else {
        exception.printStackTrace();
      }
    });
  producer.flush();
  producer.close();
}
test라는 topic을 예전에 생성 해놔서 topic을 test로 지정해놨다. http://kafka.apache.org/documentation.html#producerconfigs 여기 가면 producer config 파라미터들이 정리되어 있다. 소스는 딱히 설명 하지 않아도 보면 알 듯 싶다.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
중요한건 send API가 실제 보내는 API이다. ProducerRecord 클래스의 첫번째 인자는 topic이고 두번째 인자는 value로 지정 되어 있다. 만약 콘솔로 consumer를 띄었다면 콘솔에 찍혀 있는 걸 확인 할 수 있다. 다음으론 consumer예제 클래스이다.
public static void main(String[] args) {
  Properties configs = new Properties();
  configs.put("bootstrap.servers", "localhost:9092");
  configs.put("session.timeout.ms", "10000");
  configs.put("group.id", "test");
  configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
  consumer.subscribe(Arrays.asList("test"));
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(500);
    for (ConsumerRecord<String, String> record : records) {
      switch (record.topic()) {
        case "test":
          System.out.println(record.value());
          break;
        default:
          throw new IllegalStateException("get message on topic " + record.topic());
      }
    }
  }
}
http://kafka.apache.org/documentation.html#consumerconfigs 마찬가지로 컨슈머의 설정은 해당 URL에 잘 정리 되어 있다. 예전 config와 새로운 config 설정이 있으니 참고하기 바란다.
consumer.subscribe(Arrays.asList("test"))
test라는 topic을 구독하겠다는 것이다 List로 들어가니 여러 topic을 구독 할 수 있다. 그리고 해당 topic(여기서는 test)으로 오면 실제 값을 출력 해준다. 이렇게 아주 간단하게 producer와 consumer를 구현해봤다. 물론 간단한 코드 이므로 실 운영에서는 사용하지 못할 듯하다. 좀더 리펙토링이 필요 할 것이다. 참고 정도만 하면 될듯 싶다. 소스는 github에 올려놔야 겠다.