kafka producer과 consumer를 java로 구현해보자.
일단 카프카가 설치 되어 있어야 한다.
카프카 설치는 여기에서 보면 된다.
아주 간단한 샘플용이니 참고만 하길 바란다.
kafka version은 0.9.0.1 버전으로 하였다.
바로 소스를 보자.
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.7</version>
</dependency>
</dependencies>
일단 디펜더시는 카프카 클라이언트와 필자는 logback을 자주 써서 logback으로 디펜더시를 설정 하였다.
다음 코드는 producer 코드이다. 간단하게 만들었다.
public static void main(String[] args) throws IOException {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("acks", "all");
configs.put("block.on.buffer.full", "true");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
producer.send(new ProducerRecord<>("test", "hello world"),
(metadata, exception) -> {
if (metadata != null) {
System.out.println(
"partition(" + metadata.partition() + "), offset(" + metadata.offset() + ")");
} else {
exception.printStackTrace();
}
});
producer.flush();
producer.close();
}
test라는 topic을 예전에 생성 해놔서 topic을 test로 지정해놨다.
http://kafka.apache.org/documentation.html#producerconfigs 여기 가면 producer config 파라미터들이 정리되어 있다.
소스는 딱히 설명 하지 않아도 보면 알 듯 싶다.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
중요한건 send API가 실제 보내는 API이다. ProducerRecord 클래스의 첫번째 인자는 topic이고 두번째 인자는 value로 지정 되어 있다. 만약 콘솔로 consumer를 띄었다면 콘솔에 찍혀 있는 걸 확인 할 수 있다.
다음으론 consumer예제 클래스이다.
public static void main(String[] args) {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("session.timeout.ms", "10000");
configs.put("group.id", "test");
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
switch (record.topic()) {
case "test":
System.out.println(record.value());
break;
default:
throw new IllegalStateException("get message on topic " + record.topic());
}
}
}
}
http://kafka.apache.org/documentation.html#consumerconfigs 마찬가지로 컨슈머의 설정은 해당 URL에 잘 정리 되어 있다. 예전 config와 새로운 config 설정이 있으니 참고하기 바란다.
consumer.subscribe(Arrays.asList("test"))
test라는 topic을 구독하겠다는 것이다 List로 들어가니 여러 topic을 구독 할 수 있다.
그리고 해당 topic(여기서는 test)으로 오면 실제 값을 출력 해준다.
이렇게 아주 간단하게 producer와 consumer를 구현해봤다. 물론 간단한 코드 이므로 실 운영에서는 사용하지 못할 듯하다. 좀더 리펙토링이 필요 할 것이다. 참고 정도만 하면 될듯 싶다.
소스는
github에 올려놔야 겠다.