Java Integration with the Kafka distributed message broker

Kafka is a distributed, partitioned, replicated commit-log service, open-sourced by LinkedIn, that is widely used as a message broker in the industry today. It functions as a central messaging bus for various domains and frameworks, particularly in big-data systems.

It works as a broker between producers and consumers, with stronger ordering guarantees than traditional messaging systems. It provides a single consumer abstraction, the consumer group, that generalizes both the queuing and publish-subscribe models.

Reference: http://kafka.apache.org/documentation.html#introduction

Github: https://github.com/vishwakarmarhl/kafka-sample

The API used here is provided by the kafka-clients library, version 0.9.0.0.

Here I walk through a sample that publishes messages to a Kafka topic. On the other side, a listener waits with a consumer group that works as a thread pool subscribed to the topic.

A. Setting up Kafka is as simple as extracting the archive and starting the server.

Reference: http://kafka.apache.org/090/documentation.html

Download: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

Say you have a virtual machine as per my setup and the host IP as 192.168.56.101

  1. Download and extract the above kafka_2.11-0.9.0.0 build into the /opt folder, and set the KAFKA_HOME variable
  2. Configure $KAFKA_HOME/config/server.properties
    advertised.host.name=192.168.56.101

(Figure: Kafka broker internals)

If you are into Spring and Java, the SpringDeveloper introduction to Kafka is a great resource. However, in this tutorial I am covering a very basic implementation.

 

B. Walk-through of the Producer and Listener

(Figure: producer and consumer overview)

The code has two main classes responsible for the send (KafkaSpeaker.java) and listen (KafkaListener.java) functionality. KafkaMain is responsible for launching the KafkaListener instances in a pre-configured consumer thread pool.

KafkaProducer.java

The producer is configured with the broker URL and a client identifier.

//The Kafka producer is configured with the broker address and client properties for sending messages, as follows:

 Properties props = new Properties();
 props.put("bootstrap.servers", prop.getProperty("producer.url"));
 props.put("client.id", prop.getProperty("producer.client.id"));
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 this.producer = new KafkaProducer<>(props);

The producer can then send messages on a topic using the send API:

producer.send(new ProducerRecord<String, String>(topic,
  String.format("{\"type\":\"sample\",\"t\":%.3f,\"k\":%d}",
                System.nanoTime()*1e-9,i)
  ));
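The send call above is asynchronous; the same kafka-clients 0.9 API also accepts a callback if per-record delivery confirmation is needed. A minimal sketch, assuming the class logger log from the sample and a placeholder payload (uses org.apache.kafka.clients.producer.Callback and RecordMetadata):

 // Optional: send with a callback to confirm delivery per record
 producer.send(new ProducerRecord<String, String>(topic, "{\"type\":\"sample\"}"),
     new Callback() {
         @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (exception != null) {
                 log.error("Send failed", exception);
             } else {
                 log.info("Sent to " + metadata.topic() + "-" + metadata.partition()
                         + " @ offset " + metadata.offset());
             }
         }
     });
 producer.flush(); // make sure buffered records are delivered before closing
 producer.close();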

 

KafkaListener.java

The listener uses the consumer-group concept: each consumer in the group listens on a subset of the partitions and processes the messages queued on them for consumption and further processing.

(Figure: consumer groups)

The consumer group is created using a thread-pool executor as follows:

final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<KafkaListener> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
 KafkaListener consumer = new KafkaListener(i, prop, topics);
 consumers.add(consumer);
 executor.submit(consumer);
}
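The shutdown hook itself is not shown in the snippet above. A minimal sketch of how it could look, assuming each KafkaListener exposes a shutdown() method that wakes up its consumer (the method is illustrative and is sketched in the listener section below; requires java.util.concurrent.TimeUnit):

 Runtime.getRuntime().addShutdownHook(new Thread() {
     @Override
     public void run() {
         // Ask each listener to break out of its poll loop
         for (KafkaListener consumer : consumers) {
             consumer.shutdown(); // assumed method, see the listener sketch below
         }
         executor.shutdown();
         try {
             executor.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         }
     }
 });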

Configure the listener and create a consumer:

props.put("bootstrap.servers", prop.getProperty("consumer.url"));
props.put("group.id", prop.getProperty("consumer.group.id")); 
props.put("client.id", prop.getProperty("consumer.client.id"));
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
this.timeOut = Integer.parseInt( prop.getProperty("consumer.poll.timeout"));

Subscribe and listen on the topic indefinitely, until the shutdown hook for this thread pool is executed (a graceful-shutdown sketch follows the loop below).

consumer.subscribe(topics);
while (true) {
 ConsumerRecords<String, String> records = consumer.poll(this.timeOut);
 for (ConsumerRecord<String, String> record : records) {
    log.info(record.topic()+" - "+this.id + ", Data: " + record.value());
 }
}
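To make this infinite loop exit cleanly, the usual 0.9 consumer pattern is to call wakeup() from another thread and catch the resulting WakeupException around the poll loop. A hedged sketch of how the listener could be wrapped; the shutdown() method, the closed flag and the call to consumer.close() are assumptions, not code from the repository (uses org.apache.kafka.common.errors.WakeupException and java.util.concurrent.atomic.AtomicBoolean):

 // Assumed field in KafkaListener: private final AtomicBoolean closed = new AtomicBoolean(false);
 public void shutdown() {
     closed.set(true);
     consumer.wakeup(); // causes a blocking poll() to throw WakeupException
 }

 // The poll loop wrapped for a clean exit (run() body):
 try {
     consumer.subscribe(topics);
     while (!closed.get()) {
         ConsumerRecords<String, String> records = consumer.poll(this.timeOut);
         for (ConsumerRecord<String, String> record : records) {
             log.info(record.topic() + " - " + this.id + ", Data: " + record.value());
         }
     }
 } catch (WakeupException e) {
     if (!closed.get()) throw e; // rethrow if the wakeup was not part of shutdown
 } finally {
     consumer.close();
 }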

 

The code available on GitHub is self-sufficient and was simple enough to put together.

Thanks to the great article that set me off on this trial project.


Spring REST API and MongoDB with GridFS

Reference: https://github.com/vishwakarmarhl

RDataServer

An initial draft of the MongoDB-based file data store service.
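The service stores file metadata in MongoDB, and the file-upload branches add GridFS-backed content storage. A minimal sketch of how an upload could be wired with Spring Data MongoDB's GridFsTemplate; the bean wiring, method and field names here are illustrative assumptions, not code from the repository:

 // Imports assumed: org.springframework.data.mongodb.gridfs.GridFsTemplate,
 // org.springframework.web.multipart.MultipartFile, java.io.IOException
 @Autowired
 private GridFsTemplate gridFsTemplate; // assumed to be configured as a Spring bean

 public String storeFile(MultipartFile file) throws IOException {
     // Store the uploaded content in GridFS under its original filename
     gridFsTemplate.store(file.getInputStream(),
                          file.getOriginalFilename(),
                          file.getContentType());
     return file.getOriginalFilename();
 }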

GIT USAGE

There are two more branches, apart from the master branch, which contain the file upload functionality.

  1. Check out the branch
  2. Pull the current branch
    git pull
  3. After making changes in either of them, index and commit
    git add .
    git commit -m "Updated the code and added a message"
  4. Push the changes to the GitHub repository
    git push origin FileUploadGridFSSpring
    git push origin MultiPartFileUpload

SETUP & START MONGO DB WIN x64

Download : http://www.mongodb.org/dr/downloads.mongodb.org/win32/mongodb-win32-x86_64-2.4.5.zip/download

  1. Unzip the archive contents into C:\mongodb\
  2. Create C:\data\db
  3. Execute C:\mongodb\bin\mongod.exe --dbpath C:\data\db

CREATE SAMPLE DATA

//The following command simply pre-allocates a 2 gigabyte, uncapped collection named "files".
db.createCollection("files", { size: 2147483648 })
db.files.save( { fileId: '1235', fileName: 'V_XXX.EXE', filePath: '/opt/storage/rhldata', fileSizeInKB: 123342, fileExtensionType: 'EXE' })

PROJECT USAGE

  1. Start the mongod.exe standalone server
  2. Import the source as a Maven project in the Eclipse STS IDE
  3. Build with mvn package
  4. Deploy on a Tomcat instance to see the data from MongoDB
  5. Alternatively, the Tomcat Maven plugin enables Maven-based initialization: mvn tomcat7:run
  6. Open http://localhost:8088/RDataServe to check out the grid of file data

APPENDIX/REFERENCE

Mongo Shell Commands

show dbs
show collections

//This command creates a capped collection named "files" with a maximum size of 5 megabytes and a maximum of 5000 documents.
db.createCollection("files", { capped : true, size : 5242880, max : 5000 } )

//The following command simply pre-allocates a 2 gigabyte, uncapped collection named "files".
db.createCollection("files", { size: 2147483648 })

//Drop a capped collection
db.files.drop()

//Insert
db.files.insert( { _id: 1, fileId: 1234, fileName: 'R_XXX.EXE', filePath: '/opt/storage/rhldata', fileSizeInKB: 123412, fileExtensionType: 'EXE' })

db.files.save( { fileId: '1235', fileName: 'V_XXX.EXE', filePath: '/opt/storage/rhldata', fileSizeInKB: 123342, fileExtensionType: 'EXE' })

//Query
db.files.find({fileId: 1234})
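The same queries can be issued from the Java side. A minimal sketch using the MongoDB Java driver 2.x API of that era; the host and database name here are assumptions, while the collection and query match the shell examples above:

 // Requires the MongoDB Java driver (com.mongodb.*)
 MongoClient mongoClient = new MongoClient("localhost", 27017);
 DB db = mongoClient.getDB("rdataserver");        // database name is an assumption
 DBCollection files = db.getCollection("files");

 // Equivalent of db.files.find({fileId: 1234})
 DBCursor cursor = files.find(new BasicDBObject("fileId", 1234));
 while (cursor.hasNext()) {
     System.out.println(cursor.next());
 }
 mongoClient.close();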

vRODBC setup for DistributedR and Vertica database cluster

Prerequisites

In this particular case, a Vertica database server is installed along with the Vertica R UDTF setup. The R runtime is provided from within Vertica, hence the environment variables are changed as below.

Reference: http://www.vertica.com/wp-content/uploads/2014/06/vRODBC-Installation-Guide.pdf

1. Add the following variables to root's .bashrc for library linking, and source it.

export R_HOME=/opt/vertica/R
export DR_HOME=/opt/hp/distributedR
export LD_LIBRARY_PATH=/usr/local/lib/:/opt/vertica/lib64/:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.34.x86_64/jre/lib/amd64/server:$DR_HOME/lib:$DR_HOME/third_party/lib:$DR_HOME/third_party/lib/atomicio/:$LD_LIBRARY_PATH

2. Install unixODBC from DistributedR-master/third_party/unixODBC-2.3.1.tar.gz

$ tar -xvzf unixODBC-2.3.1.tar.gz
$ cd unixODBC-2.3.1
$ ./configure && make
$ make install

3. Install the vRODBC package available in the DistributedR-master/ source folder

$ R CMD INSTALL vRODBC/

4. Create the config files for vRODBC

# Create a directory for the config files
$ mkdir /home/dbadmin/vRODBC/

5. Add the following content to vertica.ini in this folder:

 [Driver]
 DriverManagerEncoding = UTF-16
 ODBCInstLib = /usr/local/lib/libodbcinst.so
 ErrorMessagesPath = /opt/vertica/lib64
 LogLevel = 0
 LogPath = /tmp

6. Add the following content to odbc.ini in this folder:

 [Test]
 Description = vRODBC test client
 Driver = /opt/vertica/lib64/libverticaodbc.so
 Database = test
 Servername = 10.0.0.3
 UserName = dbadmin
 Password = test
 Port = 5433
 ConnSettings =
 Locale = en_US

7. Add the following environment variables to the user's .bashrc and source it:

 export R_HOME=/opt/vertica/R
 export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.34.x86_64/jre
 export DR_HOME=/opt/hp/distributedR
 export PATH=/opt/vertica/bin:/opt/vertica/R/bin:$DR_HOME/bin:$PATH
 export LD_LIBRARY_PATH=/usr/local/lib/:/opt/vertica/lib64/:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.34.x86_64/jre/lib/amd64/server:/home/dbadmin/JAGS/lib:$DR_HOME/lib:$DR_HOME/third_party/lib:$DR_HOME/third_party/lib/atomicio/:$LD_LIBRARY_PATH

 # ODBC Configurations
 export VERTICAINI=/home/dbadmin/vRODBC/vertica.ini
 export ODBCINI=/home/dbadmin/vRODBC/odbc.ini

8. Open the R console and test the connection using the following script:

library(vRODBC)
connect <- odbcConnect("Test")
segment <- sqlQuery(connect, "SELECT * FROM role")
segment
odbcClose(connect)

Set up open-source Distributed R on a three-node cluster and execute tests on the workers

------------------------------------------------------------------
                     DISTRIBUTED R
------------------------------------------------------------------

Reference: https://github.com/vertica/DistributedR

  1. Prerequisite packages
# Install dependencies
$ sudo yum install libtool zlib-devel automake pkgconfig gcc-c++ curl
$ sudo yum install -y make gcc gcc-c++ libxml2-devel rsync

# Install R
$ curl -O http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
$ sudo rpm -i epel-release-latest-7.noarch.rpm
$ sudo yum update
$ sudo yum install R R-devel

2. Move the installation archive to the target node. The source is from GitHub: https://github.com/vertica/DistributedR

scp -r disvert.tar disvert@10.0.0.5:/home/disvert/
ssh disvert@10.0.0.5

3. Remove any older version of the package and verify R installation

# Connect to the R console and make sure to remove any old versions
remove.packages('Rcpp')
remove.packages('RInside')

# Go to the Distributed R source code and run make clean
make clean
whereis R
make distclean

# Remove any old installation
rm -rf /opt/hp/distributedR/

4. Update the environment for execution. This can be done towards the end of the installation.

Make sure you have password-less SSH access from this node to the other cluster nodes.

# Symlink the R runtime into the bin path, in case it's installed separately
ln -s /opt/disvert/R/bin/R /bin/R
ln -s /opt/disvert/R/bin/R /sbin/R
# Update the environment variables in ~/.bashrc file for the libraries and executables path
export R_HOME=/opt/disvert/R
export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.34.x86_64/jre
export DR_HOME=/opt/hp/distributedR
export PATH=/opt/disvert/bin:/opt/disvert/R/bin:$DR_HOME/bin:$PATH
export LD_LIBRARY_PATH=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.34.x86_64/jre/lib/amd64/server:$DR_HOME/lib:$DR_HOME/third_party/lib:$DR_HOME/third_party/lib/atomicio/:$LD_LIBRARY_PATH

5. Install the following R packages from the DistributedR-master/third_party folder of the GitHub distribution.

Press tab to autocomplete the version as per the package archive name in the folder.

R CMD INSTALL Rcpp_
R CMD INSTALL RInside_
R CMD INSTALL XML_
R CMD INSTALL randomForest_
R CMD INSTALL chron_
R CMD INSTALL data.table_

6. Build dependencies. Go to the DistributedR-master/third_party/ directory and make -j4

make -j4 all

7. Build and install the actual code in the DistributedR-master directory

make -j4
make install

----------------------------------------------------------
8. Test the library execution in the R console
----------------------------------------------------------

library(distributedR)
distributedR_start()   # start DR
distributedR_status()
B <- darray(dim=c(4,4), blocks=c(2,2), data=1) # create a small distributed array (illustrative example)
B
getpartition(B)        # collect darray data
distributedR_shutdown() # stop DR

------------------------------------------------------------------
9. The cluster configuration for the nodes is available at /opt/hp/distributedR/conf/cluster_conf.xml
------------------------------------------------------------------

  • node0001 = 10.0.0.3,/home/disvert
  • node0002 = 10.0.0.4,/home/disvert
  • node0003 = 10.0.0.5,/home/disvert

The following configuration is for node0001 and will be replicated on the other nodes, with the ServerInfo section updated accordingly.

<MasterConfig>
  <ServerInfo>
    <Hostname>10.0.0.3</Hostname>
    <StartPortRange>50000</StartPortRange>
    <EndPortRange>50100</EndPortRange>
  </ServerInfo>
  <Workers>
    <Worker>
      <Hostname>10.0.0.3</Hostname>
      <StartPortRange>50000</StartPortRange>
      <EndPortRange>50100</EndPortRange>
      <Executors>0</Executors>
      <SharedMemory>0</SharedMemory>
    </Worker>
    <Worker>
      <Hostname>10.0.0.4</Hostname>
      <StartPortRange>50000</StartPortRange>
      <EndPortRange>50100</EndPortRange>
      <SharedMemory>0</SharedMemory>
      <Executors>15</Executors>
    </Worker>
    <Worker>
      <Hostname>10.0.0.5</Hostname>
      <StartPortRange>50000</StartPortRange>
      <EndPortRange>50100</EndPortRange>
      <SharedMemory>0</SharedMemory>
      <Executors>15</Executors>
    </Worker>
  </Workers>
</MasterConfig>

This will get you started on the Distributed R tests. I hope such a cluster configuration comes in handy for any data crunching you may want to do with R.

Secure an instance with SSH RSA based on a public and private key pair for access

Secure a public instance with SSH-2 RSA. This is done to provision private-key-based SSH access for a user, test, on the remote machine.

http://www.server-world.info/en/note?os=CentOS_7&p=ssh&f=4

A fuller reference is available at the above link. Here I have tried the multi-node, password-less setup for the test user.

  1. Create the user for secure access on the remote machine. In case it's a cluster, do this on all the nodes.
 $ adduser test
 $ passwd test
           t35t

2. Generate a public (id_rsa.pub) / private (id_rsa) key pair without a passphrase. The keys are placed in /home/test/.ssh.

 $ ssh-keygen -t rsa

3. Add the public key string from id_rsa.pub, which looks something like the one below, to the OpenSSH authorized keys file at /home/test/.ssh/authorized_keys

ssh-rsa AAAAB3NzaC1 ... ... ...3PGVu4D+37RA0expQUJX1p++JtLlaw== rsa-key-20150623-test-user

4. Move the generated keys id_rsa, id_rsa.pub and authorized_keys to all the test nodes we want password-less access to.

Make sure the files have the right permissions on the remote machines:
 chmod 700 ~/.ssh
 chmod 600 ~/.ssh/*

5. Access the nodes from another machine with the private key in the .ssh folder

$ ssh -v test@10.0.0.66 # should log in without a password

6. Share the key with another user in a .ppk file for PuTTY-based access.
Move the private key to another system, use Windows PuTTYgen to load it, and save it as a .ppk file. Use this private key file in Pageant to access the instance as the test user.

How to install NodeJS and use NVM for your Node versioning with the Sails.js framework, for no reason at all

  ------------------------------------------------------------------
                     NodeJS (https://nodejs.org)
  ------------------------------------------------------------------

1.) Always use NVM to manage the right version of the Node installation

$ curl https://raw.githubusercontent.com/creationix/nvm/v0.23.3/install.sh | bash
# or, using wget:
$ wget -qO- https://raw.githubusercontent.com/creationix/nvm/v0.23.3/install.sh | bash

Use nvm to list the remote and locally installed Node versions:

$ nvm list-remote
$ nvm install 0.12.0
$ nvm list

2.) Install node manually
$ wget http://nodejs.org/dist/latest/node-v0.12.0.tar.gz
$ tar -xvzf node-v0.12.0.tar.gz
$ cd node-v0.12.0/
$ sudo yum groups install "C Development Tools and Libraries"

$ ./configure
$ sudo make install

Verify Installation:

$ node --version
$ npm --version

3.) Set up the Sails.js framework on Node
References:
http://sailsjs.org/
https://github.com/balderdashy/sails-docs

Since everything runs on Node, use nvm to select the installed Node version for this Sails project:
$ nvm current
$ nvm ls
$ nvm use v0.12.0
$ npm -g install sails

$ sails --version

4.) Start the server
$ sails new ships
$ cd ships
$ sails lift

The Sails.js framework and its simplified management app have made web development remarkably simple. The nvm tool is more Linux-centric, but there is something like https://github.com/coreybutler/nvm-windows which I suppose does the trick on Windows.

5.) Let's add some web APIs to the project, adding more to the Sails code.
Generate a controller and a model individually; the routes are automagically taken care of.

$ sails generate controller user
$ sails generate model User name:string email:string type:string id:int

Generate a controller and model together as an API:
$ sails generate api vendor

$ sails lift

Now the default in-memory ORM kicks in and sets up the user and vendor models.

I will try to explore the Sails.js framework further so that I can really make use of it. Let's hope I can fold this into some of my planned work.

A command list for GIT based distributed version control and development

Git is an important distributed version control system whose discipline any developer should adhere to. It gives you a command-line perspective on development and organizes the basic operations. As a developer, one should be familiar with the daily chores of pull, commit and push on local as well as remote repositories.


Before jumping into any learning, it is essential to really understand the terminology, and GitHub's glossary page is very helpful there: https://help.github.com/articles/github-glossary/

When you start working with Git, there is a standard sequence that one tends to follow. I found http://gitref.org useful. The workflow diagram below shows a similar sequence and was drawn on http://www.draw.io.

(Figure: Overall Git versioning workflow diagram)

General Guideline

  1. To achieve sanitized version control, we should always maintain a master and a develop branch in the repository (https://github.com/rawnics/ships.git), as shown in the figure above, with their respective git commands listed in the sections below. PROJ-R1 has a stable master branch and a develop branch which is constantly in progress.
  2. Development proceeds in the develop branch, and the code there is unit tested, verified and committed by the developer (step 3 in the figure)
  3. The develop branch is stable to the extent of a nightly build or snapshot development release
  4. The master branch is the production release branch, which contains stable, release-quality code
  5. As and when development matures into a release, we merge the develop branch to master (step 4 in the figure)
  6. In the case of a feature product, say PROJ-R6, that we want to develop from the PROJ-R1 codebase, we create a branch dev_r6 for it (step 5 in the figure)
  7. Once this feature product matures and no commonality remains, we may move it to a separate repository altogether for further development, with a similar master/develop branch structure
  8. A tag is the most important identifier in the laundry list of commits and pushes. It should definitely be used on the production/master branch to track and identify releases

GIT Commands

A. Workflow and commands for branching to develop and merging back to master repository
(1) Initialize repository
      Initialize a repo       : git init
      Add a remote repo : git remote add origin https://github.com/rawnics/ships.git
      Pull the master       : git pull origin master
(2) Branch checkout,
     List branches           : git ls-remote origin
     Checkout branch     : git checkout -b develop master
     Select branch           : git checkout develop
(3)  Code Changes      : echo "Development branch code base changed" >> Ships\README.md
       (3.1) Index              : git add .
       (3.2) Commit          : git commit -a -m "Development branch code base"
       (3.3) Remote push : git push origin develop
(4) Merge
     (4.1) Compare branch : git diff master develop
     (4.2) Merge develop    : git checkout master
                                            git merge develop
     (4.3) Remote push      : git push
     (4.4) Delete branch     : git branch -d develop
(5) Create feature product branch
(6) Tag branch version
    (6.1) Tag annotated    : git tag "1.0.0" -m "Beta 1.0.0"
    (6.2) Push to tag         : git push origin tag "1.0.0"

    (6.3) Checkout tag      : git checkout "1.0.0"

B. Workflow to understand the basic checkout from and to master repo without branching

Initialize a repo          : git init
Add a remote repo    : git remote add origin https://github.com/rawnics/ships.git
Pull the master          : git pull origin master
Do your changes      : echo "Test Entry for demonstrating change" > Ships\README.md
Stage files                 : git add .
Commit file                : git commit -a -m "new buggy commit of README.md"
Push the change        : git push origin master
Check status            : git status

C. Workflow to contribute to a repository by Fork and Pull Request scenario

Fork the repository you want to contribute to into your GitHub account. This setup will have an upstream repository (the one you just forked), an origin repository pointing to your GitHub account, and your local, versioned master branch.
(1) Setup
Initialize a repo             : git init
Add a remote repo        : git remote add origin https://github.com/rawnics/ships.git
Add an upstream repo    : git remote add upstream https://github.com/AWNICS/ships.git
Merge the master          : git pull origin master
Merge the upstream     : git pull upstream master
(2) Checkout Branch
# Start working with the develop branch with a pull-merge
Checkout a branch        : git checkout develop
Merge/Pull a branch     : git pull upstream develop
(3) Commit & Push
# Commit and push branches to your origin like in Section B
git add .
git commit -m "Updated the code" --author="Rahul Vishwakarma <rahul@awnics.com>"
git push origin develop
git push origin master
(4) Pull Request
 # GitHub repository
 Create a pull request to the AWNICS repository from your GitHub page

Now there is some jargon that one needs to be familiar with, and it can be picked up from the GitHub help pages. While scouring the net I also came across a couple of videos that looked really promising, in case you want an exhaustive description with some of the fundamentals.