Kafka is a distributed, partitioned, replicated commit-log service, open-sourced by LinkedIn, that is widely used as a message broker in the industry. It functions as a central messaging bus for a variety of domains and big-data frameworks.
It brokers messages between producers and consumers with stronger ordering guarantees than traditional messaging systems, and it provides a single consumer abstraction, the consumer group, that generalizes both the queuing and the publish-subscribe models.
Reference: http://kafka.apache.org/documentation.html#introduction
Github: https://github.com/vishwakarmarhl/kafka-sample
The API used here is provided by the kafka-clients library, version 0.9.0.0.
Here I walk through a sample that publishes messages to a Kafka topic, with a listener that continuously consumes from that topic using a consumer group backed by a thread pool.
A. Setting up Kafka is as simple as extracting the archive and starting the server.
Reference: http://kafka.apache.org/090/documentation.html
Download: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
Say you have a virtual machine as per my setup, with the host IP 192.168.56.101
- Download and extract the kafka_2.11-0.9.0.0 build above into the /opt folder, and set the KAFKA_HOME variable
- Configure $KAFKA_HOME/config/server.properties
advertised.host.name=192.168.56.101
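With the config in place, starting the server boils down to a few commands. This is a sketch following the standard Kafka 0.9 quickstart; the install path and the topic name "test" are my own placeholders:

```shell
# assuming the archive was extracted under /opt
export KAFKA_HOME=/opt/kafka_2.11-0.9.0.0
cd $KAFKA_HOME

# start ZooKeeper (Kafka ships a convenience config), then the broker
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &

# create a topic to experiment with
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
```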
If you are into Spring and Java, then SpringDeveloper has a great introduction resource. However, in this tutorial I am covering a very basic implementation.
B. Walk-through of the Producer and Listener
The code has two main classes responsible for the send (KafkaSpeaker.java) and listen (KafkaListener.java) functionality. KafkaMain is responsible for launching the KafkaListener consumers in a pre-configured thread pool.
KafkaProducer.java
The producer is configured with the broker URL and a client identifier:

```java
// The Kafka producer is configured with the broker address and serializers:
Properties props = new Properties();
props.put("bootstrap.servers", prop.getProperty("producer.url"));
props.put("client.id", prop.getProperty("producer.client.id"));
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<>(props);
```
The producer can then send a message on a topic using the send API:

```java
producer.send(new ProducerRecord<String, String>(topic,
        String.format("{\"type\":\"sample\",\"t\":%.3f,\"k\":%d}",
                System.nanoTime() * 1e-9, i)));
```
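The payload here is a small JSON string built with String.format. As a quick illustration of what gets published (the timestamp and counter values are made up, and I add Locale.US so the decimal separator is predictable):

```java
import java.util.Locale;

public class PayloadFormat {
    public static void main(String[] args) {
        // hypothetical sample values standing in for System.nanoTime()*1e-9 and the loop index i
        double t = 1.234567;
        int i = 42;
        String payload = String.format(Locale.US,
                "{\"type\":\"sample\",\"t\":%.3f,\"k\":%d}", t, i);
        System.out.println(payload);  // {"type":"sample","t":1.235,"k":42}
    }
}
```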
KafkaListener.java
The listener uses a consumer group, whose members listen on the topic's partitions and process the messages queued for consumption.
The consumer group is created using a thread-pool executor as follows:

```java
final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<KafkaListener> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
    KafkaListener consumer = new KafkaListener(i, prop, topics);
    consumers.add(consumer);
    executor.submit(consumer);
}
```
Configure the listener and create a consumer
```java
props.put("bootstrap.servers", prop.getProperty("consumer.url"));
props.put("group.id", prop.getProperty("consumer.group.id"));
props.put("client.id", prop.getProperty("consumer.client.id"));
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
this.timeOut = Integer.parseInt(prop.getProperty("consumer.poll.timeout"));
```
Subscribe and listen on the topic indefinitely, until the shutdown hook for this thread pool is executed:

```java
consumer.subscribe(topics);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(this.timeOut);
    for (ConsumerRecord<String, String> record : records) {
        log.info(record.topic() + " - " + this.id + ", Data: " + record.value());
    }
}
```
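The loop above never returns on its own, so shutdown has to be coordinated through the thread pool. Here is a minimal, Kafka-free sketch of that pattern, using a flag where the real listener would call consumer.wakeup() from a JVM shutdown hook; the class name and details are my own, not from the sample repo:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownSketch {

    // returns true if all listener threads stopped cleanly
    static boolean runAndStop() throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            final int id = i;
            executor.submit(() -> {
                while (running.get()) {
                    // stand-in for consumer.poll(timeout) + record processing
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                System.out.println("consumer " + id + " stopped");
            });
        }
        // in KafkaMain this would live in Runtime.getRuntime().addShutdownHook(...),
        // and the real listeners would call consumer.wakeup() to break out of poll()
        running.set(false);
        executor.shutdown();
        return executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("clean shutdown: " + runAndStop());
    }
}
```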
The code available on GitHub is self-sufficient and simple enough to follow.
Thanks to the great article that set me off on this trial project.