Java Integration with the Kafka Distributed Message Broker

Kafka is a distributed, partitioned, replicated commit-log service, open-sourced by LinkedIn and widely used as a message broker in the industry today. It functions as a central messaging bus for various domains and frameworks, particularly in big-data systems.

It works as a broker between producers and consumers, with stronger ordering guarantees than traditional messaging systems. It provides a single consumer abstraction, the consumer group, that generalizes both the queuing and the publish-subscribe models.

Reference: http://kafka.apache.org/documentation.html#introduction

Github: https://github.com/vishwakarmarhl/kafka-sample

The API used here is provided by kafka-clients version 0.9.0.0.

Here I walk through a sample that publishes messages to a Kafka topic. A listener waits on the other side: a consumer group, run as a thread pool, subscribed to that topic.

A. Setting up Kafka is as simple as extracting the archive and starting the server.

Reference: http://kafka.apache.org/090/documentation.html

Download: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

Say you have a virtual machine set up as in my configuration, with the host IP 192.168.56.101:

  1. Download and extract the kafka_2.11-0.9.0.0 build above into the /opt folder, and set the KAFKA_HOME variable
  2. Configure $KAFKA_HOME/config/server.properties
    advertised.host.name=192.168.56.101
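With the archive extracted and advertised.host.name set, bringing the broker up takes just a couple of commands from $KAFKA_HOME (the topic name below is only an example):

```shell
# Start Zookeeper (a single-node instance is bundled with the Kafka distribution)
bin/zookeeper-server-start.sh config/zookeeper.properties &

# Start the Kafka broker
bin/kafka-server-start.sh config/server.properties &

# Create a sample topic with 3 partitions ("sample-topic" is illustrative)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 3 --topic sample-topic
```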

(Figure: Kafka broker internals)

If you are into Spring and Java, then there is a great introductory resource from SpringDeveloper. However, in this tutorial I cover a very basic implementation.


B. Walk-through of the Producer and Listener

(Figure: producer and consumer overview)

The code has two main classes responsible for the send (KafkaSpeaker.java) and listen (KafkaListener.java) functionality. KafkaMain launches the KafkaListener instances in a pre-configured consumer thread pool.

KafkaSpeaker.java

The producer is configured with the broker URL and a client identifier.

 // Configure the Kafka producer with the broker address and client id
 Properties props = new Properties();
 props.put("bootstrap.servers", prop.getProperty("producer.url")); // broker host:port
 props.put("client.id", prop.getProperty("producer.client.id"));
 props.put("acks", "all");              // wait for the full commit of the record
 props.put("retries", 0);               // do not retry failed sends
 props.put("batch.size", 16384);        // batch size in bytes per partition
 props.put("linger.ms", 1);             // small delay to allow batching
 props.put("buffer.memory", 33554432);  // total memory available for buffering
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 this.producer = new KafkaProducer<>(props);

The producer can send a message on the topic using the send API:

producer.send(new ProducerRecord<String, String>(topic,
  String.format("{\"type\":\"sample\",\"t\":%.3f,\"k\":%d}",
                System.nanoTime()*1e-9,i)
  ));
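The value here is a small hand-built JSON string. A quick way to see what actually goes over the wire is to print the formatted payload on its own (the class name PayloadDemo is just for illustration; Locale.US is a small hardening over the original so the decimal point survives any system locale):

```java
import java.util.Locale;

public class PayloadDemo {
    public static void main(String[] args) {
        int i = 7; // message counter, as in the producer loop
        // Same format string as the ProducerRecord value above
        String payload = String.format(Locale.US,
                "{\"type\":\"sample\",\"t\":%.3f,\"k\":%d}",
                System.nanoTime() * 1e-9, i);
        System.out.println(payload);
    }
}
```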


KafkaListener.java

The listener uses the consumer-group concept: each consumer in the group can listen on a partition and transact on the messages queued for consumption and further processing.

(Figure: consumer groups across partitions)

The consumer group is created using a thread-pool executor as follows:

final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<KafkaListener> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
 KafkaListener consumer = new KafkaListener(i, prop, topics);
 consumers.add(consumer);
 executor.submit(consumer);
}

Configure the listener and create a consumer:

props.put("bootstrap.servers", prop.getProperty("consumer.url"));
props.put("group.id", prop.getProperty("consumer.group.id")); // group used for partition assignment
props.put("client.id", prop.getProperty("consumer.client.id"));
props.put("enable.auto.commit", "true");       // commit offsets automatically
props.put("auto.commit.interval.ms", "2000");  // offset commit frequency
props.put("session.timeout.ms", "30000");      // group membership timeout
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
this.timeOut = Integer.parseInt(prop.getProperty("consumer.poll.timeout"));
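Both classes pull their settings from a properties file via prop.getProperty(...). Based on the keys used in the snippets, a minimal file could look like this (all values are illustrative, not taken from the repository):

```properties
# Illustrative values; the keys are the ones referenced via prop.getProperty(...)
producer.url=192.168.56.101:9092
producer.client.id=kafka-speaker
consumer.url=192.168.56.101:9092
consumer.group.id=sample-group
consumer.client.id=kafka-listener
consumer.poll.timeout=1000
```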

Subscribe and listen on the topic indefinitely, until the shutdown hook for this thread pool is executed.

consumer.subscribe(topics);
while (true) {
 ConsumerRecords<String, String> records = consumer.poll(this.timeOut);
 for (ConsumerRecord<String, String> record : records) {
    log.info(record.topic()+" - "+this.id + ", Data: " + record.value());
 }
}
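The shutdown mechanics can be sketched without a broker at all. The following stand-alone sketch (all names are mine, not from the repository) uses a BlockingQueue in place of KafkaConsumer.poll: workers poll with a timeout until a shared flag is raised, which is exactly what a JVM shutdown hook would do before stopping the executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Broker-free sketch: workers poll a queue (standing in for consumer.poll)
// until a shutdown flag is raised, then the executor is drained cleanly.
public class ListenerShutdownSketch {

    static class Worker implements Runnable {
        private final BlockingQueue<String> topic; // stand-in for the subscribed topic
        private final AtomicBoolean shutdown;
        final List<String> seen = new ArrayList<>();

        Worker(BlockingQueue<String> topic, AtomicBoolean shutdown) {
            this.topic = topic;
            this.shutdown = shutdown;
        }

        @Override
        public void run() {
            while (!shutdown.get()) {
                try {
                    // Poll with a timeout, like consumer.poll(timeOut)
                    String record = topic.poll(50, TimeUnit.MILLISECONDS);
                    if (record != null) seen.add(record);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    static int runDemo() throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        AtomicBoolean shutdown = new AtomicBoolean(false);
        int numConsumers = 3;
        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            Worker w = new Worker(topic, shutdown);
            workers.add(w);
            executor.submit(w);
        }
        for (int i = 0; i < 10; i++) topic.put("msg-" + i);
        while (!topic.isEmpty()) Thread.sleep(10); // wait for the queue to drain
        shutdown.set(true);                        // what the shutdown hook would trigger
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return workers.stream().mapToInt(w -> w.seen.size()).sum();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumed " + runDemo() + " records");
    }
}
```

In the real listener, the hook would additionally call consumer.wakeup() so a consumer blocked in poll() exits promptly.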


The code available on GitHub is self-sufficient and was simple to put together.

Thanks to the great article that set me off on this trial project.
