Java Integration with the Kafka Distributed Message Broker

Kafka is a distributed, partitioned, replicated commit-log service that is widely used as a message broker in the current tech industry. Originally open-sourced by LinkedIn, it functions as a central messaging bus for various domains and frameworks, especially in big-data systems.

It acts as a broker between producers and consumers with stronger ordering guarantees than traditional messaging systems, and it provides a single consumer abstraction, the consumer group, that generalizes both the queuing and the publish-subscribe models.

Reference: http://kafka.apache.org/documentation.html#introduction

Github: https://github.com/vishwakarmarhl/kafka-sample

The API used here is provided by the kafka-clients library, version 0.9.0.0.

Here I walk through a sample that can publish messages on a Kafka topic. On the other side, a listener is constantly waiting on the subscribed topic, with a consumer group that works as a thread pool.

A. Setting up Kafka is as simple as extracting the archive and starting the server.

Reference: http://kafka.apache.org/090/documentation.html

Download: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

Say you have a virtual machine set up as in my case, with the host IP 192.168.56.101:

  1. Download and extract the kafka_2.11-0.9.0.0 build above into the /opt folder, and set the KAFKA_HOME variable
  2. Configure $KAFKA_HOME/config/server.properties:
    advertised.host.name=192.168.56.101
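Then start ZooKeeper and the Kafka broker from $KAFKA_HOME, and create a topic for the sample. These are the standard 0.9 quickstart commands; the topic name and partition count below are only examples, use whatever the sample's properties configure:

 bin/zookeeper-server-start.sh config/zookeeper.properties
 bin/kafka-server-start.sh config/server.properties
 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test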

[Figure: Kafka broker internals]

If you are into Spring and Java, there is a great introduction resource from SpringDeveloper; in this tutorial, however, I am covering a very basic implementation.

 

B. Walk-through of the Producer and Listener

[Figure: Producer and consumer overview]

The code has two main classes responsible for the send (KafkaSpeaker.java) and listen (KafkaListener.java) functionality. KafkaMain is responsible for launching the KafkaListener instances in a pre-configured consumer thread pool.
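Both classes read their broker and client settings from a properties file (the prop object in the snippets below). A plausible kafka.properties layout, with key names taken from the code and values that are only assumptions matching the VM setup above:

 producer.url=192.168.56.101:9092
 producer.client.id=kafka-speaker
 consumer.url=192.168.56.101:9092
 consumer.group.id=kafka-sample-group
 consumer.client.id=kafka-listener
 consumer.poll.timeout=1000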

KafkaSpeaker.java

The producer is configured with the broker URL and a client identifier.

//The Kafka producer is configured with the broker address and client id for sending messages as follows:

 Properties props = new Properties();
 props.put("bootstrap.servers", prop.getProperty("producer.url"));
 props.put("client.id", prop.getProperty("producer.client.id"));
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 this.producer = new KafkaProducer<>(props);

The producer can then send messages on a topic using the send API:

producer.send(new ProducerRecord<String, String>(topic,
  String.format("{\"type\":\"sample\",\"t\":%.3f,\"k\":%d}",
                System.nanoTime()*1e-9,i)
  ));
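send() is asynchronous and returns a Future<RecordMetadata>, so the caller can block on it when a per-message delivery confirmation is needed. A minimal sketch under that assumption (not part of the repository code; needs java.util.concurrent.Future and org.apache.kafka.clients.producer.RecordMetadata, and the checked exceptions from get() must be handled):

 // Blocking on the Future surfaces send failures and returns the assigned partition/offset.
 Future<RecordMetadata> ack = producer.send(
     new ProducerRecord<String, String>(topic, "{\"type\":\"sample\"}"));
 RecordMetadata meta = ack.get(); // throws ExecutionException if the send failed
 log.info("Sent to partition " + meta.partition() + " at offset " + meta.offset());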

 

KafkaListener.java

The listener uses Kafka's consumer-group concept: within a group, each partition is consumed by at most one member, so the pool of listener threads shares the topic's partitions and transacts on the messages queued for consumption and further processing.

[Figure: Consumer groups across partitions]

The consumer group is created using a fixed thread-pool executor as follows:

final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<KafkaListener> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
 KafkaListener consumer = new KafkaListener(i, prop, topics);
 consumers.add(consumer);
 executor.submit(consumer);
}

Configure the listener and create a consumer

props.put("bootstrap.servers", prop.getProperty("consumer.url"));
props.put("group.id", prop.getProperty("consumer.group.id")); 
props.put("client.id", prop.getProperty("consumer.client.id"));
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
this.timeOut = Integer.parseInt( prop.getProperty("consumer.poll.timeout"));

Subscribe and poll the topic indefinitely, until the shutdown hook for this thread pool is triggered.

consumer.subscribe(topics);
while (true) {
 ConsumerRecords<String, String> records = consumer.poll(this.timeOut);
 for (ConsumerRecord<String, String> record : records) {
    log.info(record.topic()+" - "+this.id + ", Data: " + record.value());
 }
}
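The shutdown hook itself is not part of this excerpt. One way to wire it in KafkaMain, assuming KafkaListener exposes a shutdown() method that calls consumer.wakeup() so the blocking poll() throws a WakeupException and the loop exits cleanly, is a sketch like this (needs java.util.concurrent.TimeUnit):

 // Hypothetical shutdown wiring: ask every listener to stop, then drain the pool.
 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
   public void run() {
     for (KafkaListener consumer : consumers) {
       consumer.shutdown(); // assumed to call KafkaConsumer.wakeup() inside the listener
     }
     executor.shutdown();
     try {
       executor.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
   }
 }));

Inside the listener, the poll loop would then catch org.apache.kafka.common.errors.WakeupException and close the consumer in a finally block.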

 

The code available on GitHub is self-sufficient, simple to follow, and was straightforward to put together.

Thanks to the great article that set me off on this trial project.

Spring REST API and MongoDB with GridFS

Reference: https://github.com/vishwakarmarhl

RDataServer

An early, hello-world style draft of the MongoDB-based file data store service.

GIT USAGE

There are two branches apart from master, both of which add the file upload functionality (a sketch of a GridFS-backed upload follows the git steps below).

  1. Check out the branch, e.g.
    git checkout FileUploadGridFSSpring
  2. Pull the current branch
    git pull
  3. After making changes, stage and commit them
    git add .
    git commit -m "Updated the code and added a message"
  4. Push the changes to the GitHub repository
    git push origin FileUploadGridFSSpring
    git push origin MultiPartFileUpload
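The GridFS branch stores the uploaded file content through Spring Data MongoDB's GridFsTemplate. Below is a minimal sketch of what such an upload endpoint can look like; the class name, request mapping, and wiring are illustrative assumptions, not the repository's actual code.

 import java.io.IOException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.mongodb.gridfs.GridFsTemplate;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.multipart.MultipartFile;

 @Controller
 public class FileUploadController {

   @Autowired
   private GridFsTemplate gridFsTemplate;

   @RequestMapping(value = "/upload", method = RequestMethod.POST)
   @ResponseBody
   public String upload(@RequestParam("file") MultipartFile file) throws IOException {
     // Store the raw bytes in GridFS; metadata such as fileName/filePath can still go in the files collection.
     gridFsTemplate.store(file.getInputStream(), file.getOriginalFilename(), file.getContentType());
     return "Stored " + file.getOriginalFilename();
   }
 }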

SETUP & START MONGO DB WIN x64

Download : http://www.mongodb.org/dr/downloads.mongodb.org/win32/mongodb-win32-x86_64-2.4.5.zip/download

  1. Unzip the zip contents into C:\mongodb\
  2. Create C:\data\db
  3. Start the server: C:\mongodb\bin\mongod.exe --dbpath C:\data\db

CREATE SAMPLE DATA

//The following command pre-allocates a 2 gigabyte, uncapped collection named files.
db.createCollection("files", { size: 2147483648 })
db.files.save( { fileId: '1235', fileName: 'V_XXX.EXE', filePath: '/opt/storage/rhldata', fileSizeInKB: 123342, fileExtensionType: 'EXE' })
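On the Spring side, the same document shape can be mapped with Spring Data MongoDB. A minimal sketch follows; the class and field names are assumptions based on the sample document above, not the repository's actual entity.

 // Hypothetical mapping of the sample "files" document for Spring Data MongoDB.
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;

 @Document(collection = "files")
 public class FileMeta {
   @Id
   private String id;
   private String fileId;
   private String fileName;
   private String filePath;
   private long fileSizeInKB;
   private String fileExtensionType;
   // getters and setters omitted for brevity
 }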

PROJECT USAGE

  1. Start the mongod.exe standalone server
  2. Import the source as a Maven project in the Eclipse STS IDE
  3. Build it with mvn package
  4. Deploy the war on a Tomcat instance to see the data from MongoDB
  5. Alternatively, the Tomcat Maven plugin enables Maven-based startup: mvn tomcat7:run
  6. Open http://localhost:8088/RDataServe to check out the grid of file data

APPENDIX/REFERENCE

Mongo Shell Cmds

show dbs
show collections

//This command creates a capped collection named files with a maximum size of 5 megabytes and a maximum of 5000 documents.
db.createCollection("files", { capped : true, size : 5242880, max : 5000 } )

//The following command pre-allocates a 2 gigabyte, uncapped collection named files.
db.createCollection("files", { size: 2147483648 })

//Drop the capped collection
db.files.drop()

//Insert
db.files.insert( { _id: 1, fileId: 1234, fileName: 'R_XXX.EXE', filePath: '/opt/storage/rhldata', fileSizeInKB: 123412, fileExtensionType: 'EXE' })

db.files.save( { fileId: '1235', fileName: 'V_XXX.EXE', filePath: '/opt/storage/rhldata', fileSizeInKB: 123342, fileExtensionType: 'EXE' })

//Query
db.files.find({fileId:1234})