Java Integration with Kafka distributed message broker

Kafka is a major distributed, partitioned, replicated, commit log service used as a message  broker in the current tech industry open-sourced by Linked-In. It functions as a central messaging bus for various domains and frameworks specializing in big-data systems.

It works as a broker between the producers and consumers with a strong order guarantee than any traditional messaging system. It provides a single consumer abstraction model that generalizes the queuing and publish-subscribe model as a Consumer-Group.

Reference: http://kafka.apache.org/documentation.html#introduction

Github: https://github.com/vishwakarmarhl/kafka-sample

The API is provided by Kafka-clients version 0.9.0.0 

Here I am going through a sample that is capable of publishing a message over the kafka topic. There is a listener constantly waiting with a consumer group that works as a thread pool subscribed to the topic.

A. Setting up kafka is as simple as extracting the archive and starting the server.

Reference: http://kafka.apache.org/090/documentation.html

Download: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

Say you have a virtual machine as per my setup and the host IP as 192.168.56.101

  1. Download and extract the above kafka-2.11-0.9 build in /opt folder, and set the KAFKA_HOME variable
  2. Configure $KAFKA_HOME/config/server.properties
    advertised.host.name=192.168.56.101

kafka broker internals

If you are into spring and java then this is a great introduction resource from SpringDeveloper. However in this tutorial I am covering a very basic implementation.

 

B. Walk-through of the Producer and Listener

producer_consumer

The code has two main classes responsible for the send (KafkaSpeaker.java) and listen (KafkaListener.java) functionality. KafkaMain is responsible for launching the KafkaListener in a pre-configured consumer threadpool.

KafkaProducer.java

The producer is configured with the broker url and client identifier.

//The kafka producer is configured with the topic and broker address for sending the message as follows: 

 Properties props = new Properties();
 props.put("bootstrap.servers", prop.getProperty("producer.url"));
 props.put("client.id", prop.getProperty("producer.client.id"));
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 this.producer = new KafkaProducer<>(props);

The producer can send the message on a topic using the send api

producer.send(new ProducerRecord<String, String>(topic,
  String.format("{\"type\":\"sample\",\"t\":%.3f,\"k\":%d}",
                System.nanoTime()*1e-9,i)
  ));

 

KafkaListener.java

The listener has the concept of consumer group that can listen on each partition and transact on the messages queued for consumption and further processing.

consumer-groups

The consumer group is created using the threadpool executors as follows,

final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<KafkaListener> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
 KafkaListener consumer = new KafkaListener(i, prop, topics);
 consumers.add(consumer);
 executor.submit(consumer);
}

Configure the listener and create a consumer

props.put("bootstrap.servers", prop.getProperty("consumer.url"));
props.put("group.id", prop.getProperty("consumer.group.id")); 
props.put("client.id", prop.getProperty("consumer.client.id"));
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
this.timeOut = Integer.parseInt( prop.getProperty("consumer.poll.timeout"));

Subscribe and listen infinitely on the topic until the shutdown hook is executed for this thread pool.

consumer.subscribe(topics);
while (true) {
 ConsumerRecords<String, String> records = consumer.poll(this.timeOut);
 for (ConsumerRecord<String, String> record : records) {
    log.info(record.topic()+" - "+this.id + ", Data: " + record.value());
 }
}

 

The code available in github is self-sufficient and simple enough and was very simple to create.

Thanks to the great article that set me of on this trial project.

Advertisements

Spring Rest API and Mongodb with GridFS

Reference: https://github.com/vishwakarmarhl

RDataServer

Hello draft of the Mongo DB based file data store service

GIT USAGE

There are two more branches apart from the master branch which have the file upload functionality

  1. Checkout the branch
  2. Pull the current branch
    git pull
  3. After making changes in any of them index and commit
    git add .
    git commit -m “Updated the code and added a message”
  4. Push changes to the github repository
    git push origin FileUploadGridFSSpring
    git push origin MultiPartFileUpload

SETUP & START MONGO DB WIN x64

Download : http://www.mongodb.org/dr/downloads.mongodb.org/win32/mongodb-win32-x86_64-2.4.5.zip/download

  1. Unzip the zip contents in C:\mongodb\
  2. create C:\data\db
  3. Execute the C:\mongodb\bin\mongod.exe –dbpath C:\data\db

CREATE SAMPLE DATA

//The following command simply pre-allocates a 2 gigabyte, uncapped collection named people. db.createCollection(“files”, { size: 2147483648 }) db.files.save( { fileId: ‘1235’, fileName: ‘V_XXX.EXE’, filePath: ‘/opt/storage/rhldata’, fileSizeInKB: 123342, fileExtensionType: ‘EXE’ })

PROJECT USAGE

  1. Start the mongod.exe standalone server
  2. Import the source as a maven project in Eclipse STS IDE
  3. MVN PACKAGE
  4. Deploy on a tomcat instance to see the data from mongodb
  5. An alternate plugin for tomcat enables maven based initialization. mvn tomcat7:run
  6. Open up http://localhost:8088/RDataServe to checkout the grid for file data

APPENDIX/REFERENCE

Mongo Shell Cmds

show dbs show collections

//This command creates a collection named file with a maximum size of 5 megabytes and a maximum of 5000 documents. db.createCollection(“files”, { capped : true, size : 5242880, max : 5000 } )

//The following command simply pre-allocates a 2 gigabyte, uncapped collection named people. db.createCollection(“files”, { size: 2147483648 })

//Drop a collection capped db.files.drop()

//Insert db.files.insert( { _id: 1, fileId: 1234, fileName: ‘R_XXX.EXE’, filePath: ‘/opt/storage/rhldata’, fileSizeInKB: 123412, fileExtensionType: ‘EXE’ })

db.files.save( { fileId: ‘1235’, fileName: ‘V_XXX.EXE’, filePath: ‘/opt/storage/rhldata’, fileSizeInKB: 123342, fileExtensionType: ‘EXE’ })

//Query db.files.find({fileId:1234})

A quick setup for tomcat 7 on CentOS 6. Also, added the SSL configuration with self-signed certificates to run tomcat 7 on HTTPS secured SSL layer

Setup tomcat

1.) Pre-requisite:

Since Java is a major requirement

$ yum install java-1.7.0-openjdk-devel.x86_64

Add the JAVA_HOME environment variable to ~/.bashrc file 
  #Env variables for java
  export JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64
  export CATALINA_HOME=/opt/tomcat7
  export PATH=$PATH:$JAVA_HOME/bin

Open the ports that will be used by tomcat for service

Flush the tables before config
$ iptables -F
$ iptables -t nat -F

Now setup INPUT ports
$ iptables -I INPUT -p tcp --dport 8443 -j ACCEPT
$ iptables -I INPUT -p tcp --dport 8080 -j ACCEPT
$ service iptables save
$ service iptables restart

In case we want to route the access from port 80 to tomcats 8080

$ iptables -t nat -I PREROUTING -p tcp --dport 80 -j REDIRECT --to-ports 8080
$ iptables -t nat -I OUTPUT -p tcp --dport 80 -j REDIRECT --to-ports 8080

2.) Download and setup tomcat 7

$ wget http://mirrors.gigenet.com/apache/tomcat/tomcat-7/v7.0.62/bin/apache-tomcat-7.0.62.tar.gz
$ tar -xvzf apache-tomcat-7.0.62.tar.gz
$ mv apache-tomcat-7.0.62 tomcat7
$ mv tomcat7/ /opt/

3.) Create a tomcat specific user and user group. Since the tomcat would be running from a script it should not be root user.

$ groupadd tomcat
$ useradd -g 99 -s /sbin/nologin -d /opt/tomcat7 tomcat
$ passwd tomcat
Adjust Ownership For New Users And Groups. Give the new user access to the tomcat directories. 
$ chown -R tomcat:tomcat /opt/tomcat7
$ chmod 775 /opt/tomcat7/webapps
$ chmod +x /opt/tomcat7/bin/*.sh

4.) Create a startup service script

$ vim /etc/init.d/tomcat
Add the following content to this script
#!/bin/bash
# description: Tomcat Start Stop Restart
# processname: tomcat
# chkconfig: 234 20 80
PATH=$JAVA_HOME/bin:$PATH
export PATH
CATALINA_HOME=/opt/tomcat7
export CATALINA_HOME

case $1 in
start)
   cd $CATALINA_HOME/bin
   /bin/su -s /bin/bash tomcat ./startup.sh
   ;;
stop)
   cd $CATALINA_HOME/bin/
   /bin/su -s /bin/bash tomcat ./shutdown.sh
   ;;
restart)
   cd $CATALINA_HOME/bin/
   /bin/su -s /bin/bash tomcat ./shutdown.sh
   cd $CATALINA_HOME/bin/
   /bin/su -s /bin/bash tomcat ./startup.sh
   ;;
esac
exit 0

5.) Add the tomcat script as a service

$ chmod 755 /etc/init.d/tomcat
$ chkconfig --add tomcat
$ chkconfig --level 234 tomcat on
$ chkconfig --list tomcat

6.) Start/Stop the tomcat service

 $ service tomcat start
 $ service tomcat stop

SSL security with self-signed certificates on tomcat

In order to setup this tomcat on SSL Use the following configuration steps,

1.) Generate a keystore file for this server

This will be used as a self-signed certificate for secured connectivity. 
Default path: /home/%user.home%/.keystore
keytool -genkeypair -dname "CN=127.0.0.1, OU=Rahul, O=Luhar, L=Vishwakarma, ST=Karnataka, C=IN" -alias mysslsecuredserver -keyalg RSA -ext san=ip:127.0.0.1

2.) Add the relevant configuration to the tomcats https connector in conf/server.xml

 maxThreads="150" scheme="https" secure="true"
 clientAuth="false" sslProtocol="TLS" keystoreFile="${user.home}/.keystore" keystorePass="mypassman"/>

3.) Add the server IP to the truststore in order to allow for this self signed certificate

Use the InstallCert.java to add the IP to the trusted store
https://github.com/vishwakarmarhl/javahelper/blob/master/InstallCert.java
Compile InstallCert.java. Run the following two commands to generate jssecacerts binary. 127.0.0.1 is the web servers IP.
$ java InstallCert 127.0.0.1:8443
Copy the generated jssecacerts in this path to %JAVA_HOME%\jre\lib\security

You can also export and import the generated certificate from the keystore with the password and share it with other systems on the network that negotiates with this server.

$ keytool -export -alias mysslsecuredserver -file mysslsecuredserver.cer
  $ keytool -import -trustcacerts -alias mysslsecuredserver -file mysslsecuredserver.cer

Verify the tomcat running and secured via HTTPS.

For a proper SSL shared from a hosting provider. Look at the import into the java cacerts

keytool -import -trustcacerts -file NewRootCACertificate.crt -keystore "%JAVA_HOME%\jre\lib\security\cacert"

http://stackoverflow.com/questions/28521266/caused-by-sun-security-provider-certpath-suncertpathbuilderexception-unable-to

Test Link: https://127.0.0.1:8443

AspectJ component for my services audit logger

This is a blog that will leverage the advantages of AspectJ aspect oriented programming concept in solving a very basic problem of auditing the visitors to your service and the response times. These parameters are very important when we want to do some operations around these.

Pre-requisite for AspectJ in your pom.xml

 <!-- Spring AspectJ -->
 <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
    <version>${org.springframework-version}</version>
    <scope>compile</scope>
 </dependency>
 <dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>${org.aspectj-version}</version>
    <scope>compile</scope>
 </dependency>
 <dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
    <version>${org.aspectj-version}</version>
 </dependency>
 <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>${org.springframework-version}</version>
 </dependency>
 <dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib</artifactId>
    <version>3.1</version>
 </dependency>

In your application context add the configuration as per the standard aop proxy. You should also include component-scan and annotation-driven.

<!-- Aspects -->
 <aop:aspectj-autoproxy proxy-target-class="true"/>

Here is the actual code that will intercept around the public calls  in controller. It will print the service url, method, parameters and arguments. We can also obtain the user from the session context.

/**
 * @author Rahul Vishwakarma
 * This class will log all the service calls made to the Generic Application 
 * relying on the @Around advice
 * Ref: http://docs.spring.io/spring/docs/3.0.x/spring-framework-reference/html/aop.html#aop-understanding-aop-proxies 
 */
@Component
@Aspect
public class GenericLoggerAspect {
 /** Logger for this class and subclasses */
 private static final Logger log = LoggerFactory.getLogger(RhlLoggerAspect.class);
 
 public static ConcurrentHashMap<String,RequestData> responseTime = new ConcurrentHashMap<String,RequestData>();
 
 @Autowired 
 Utility utility;
/**
 * Inner class for request info
 * @author Rahul
 *
 */
 public class RequestInfo{
 public int responseTimeMills = 0;
 public Date accessTime = null;
 public String urlPath;
 public String requestType;
 public String args;
 }
 /**
 * Inner class for Data capturing request information
 * @author Rahul
 *
 */
 public class RequestData {
   public RequestInfo requestInfo;
   public String api;
   public UserInfo userInfo;
 
   public RequestData(RequestInfo requestInfo,UserInfo userInfo,String methodSignature){
   this.requestInfo = requestInfo;
   this.userInfo = userInfo;
   this.api = methodSignature;
   }
 }

/**
 * User and client related info
 * @author Rahul
 *
 */
 public class UserInfo{
   public String userName;
   public int userId;
   public String sessionId;
   public String role;
   public String clientIp;
 }
 
 private String getRepresentation(Object [] params){
   StringBuilder sb = new StringBuilder();
   if(params!=null)
   {
   String value = null;
   for(int i=0;i<params.length;i++){
   value = params[i] + ",";
   if(value.contains("@")){
     value = "";
   }
     sb.append(value);
   }
    if(sb.length()>0)
    return sb.substring(0, sb.length() - 1);
  }
  return sb.toString();
}
enum IssueType{
  ISSUE_URL_SUFFIX, OTHER, NONE
}
@Around("execution(@*..RequestMapping * * (..))")
public Object log_around(ProceedingJoinPoint pjp) throws Throwable {
 
  Object obj = null;
  IssueType issueType = IssueType.NONE;
  String error="Error in GenericLoggerAspect";
 try{
  String methodSignature = pjp.getSignature()+"";
  StringBuffer args = new StringBuffer(); //getRepresentation(pjp.getArgs());
 
  //Append arguments
  Object[] arg = pjp.getArgs();
  for (int i = 0; i < arg.length; i++) {
    args.append(arg[i]).append(",");
  }
  if (arg.length > 0) {
   args.deleteCharAt(args.length() - 1);
  }
 
  log.info("\tSTART {}-{}", methodSignature+" ["+Thread.currentThread().getId()+"]",args.toString());

  UserInfo userInfo = new UserInfo();
  UserSecure userSecure = utility.getUserInfo();
  if(userSecure != null){
   userInfo.clientIp = userSecure.getClientIp();
   userInfo.userId = userSecure.getUserId();
   userInfo.sessionId = userSecure.getSessionId();
   userInfo.userName = userSecure.getUserName();
  if(userSecure.getAuthorities() != null)
   userInfo.role = userSecure.getAuthorities().toString();
  }
  ServletRequestAttributes sra = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();
 
  String urlPath=""; 
 
  if(sra!=null){
    HttpServletRequest req = sra.getRequest();
  if(req!=null){
    urlPath = req.getServletPath();
  } 
 }
 
 long start = System.currentTimeMillis();
 if(urlPath.endsWith("/")){
   issueType = IssueType.ISSUE_URL_SUFFIX;
   obj = null;
 } else
   obj = pjp.proceed();
 
 String requestType = "ajax";
 if(obj!=null && (obj instanceof ModelAndView)){
   requestType = "page";
 }
 int elapsedTime = (int) (System.currentTimeMillis() - start); 
 RequestInfo requestInfo = new RequestInfo();
 requestInfo.accessTime = (new Date());
 requestInfo.requestType = (requestType);
 requestInfo.responseTimeMills = (elapsedTime);
 requestInfo.urlPath = (urlPath);
 requestInfo.args = (args.toString());
 
 RequestData requestData = new RequestData(requestInfo, userInfo,methodSignature);
 log.info(userInfo.sessionId + " REQ {} by "+userInfo.userName +"@"+userInfo.clientIp+", time {} mills; args ["+args.toString()+"]", urlPath + " ["+ Thread.currentThread().getId() +"]" , elapsedTime);
 responseTime.put(methodSignature, requestData);
 //addUserAuditLog(requestData);
 
 }catch(Exception ex){
 issueType = IssueType.OTHER;
 error = ex.getMessage();
 log.error(ex.getMessage());
}
 
if(obj==null){
 switch(issueType){
 case ISSUE_URL_SUFFIX:throw new GenericException("URL ends with trailling /"); 
 case OTHER:throw new GenericException(error);
 default:
 break;
 } 
}
return obj;
}
/**
 * @param exceptions
 */
@AfterThrowing(pointcut="execution(public * com.generic.controller.*.*(..))",throwing="ex") 
 public void MethodError(Exception ex){ 
   log.error("@Exception {}", ex.toString()); 
 } 
 
@Pointcut("execution(public * *(..))")
private void anyPublicOperation() {
   log.info("Testing the public Execution call");
 }
}

When we run the system we expect the following response in the log which does the @Around joint point.

Before:

17-Jul-2014 03:25:35,067-INFO – GenericLoggerAspect:130 –       START List com.humesis.generic.controller.UserController.getUsers(HttpServletResponse)

After:

17-Jul-2014 03:25:35,106-INFO – GenericLoggerAspect:176 – 674B8F114926E5A3BB143E7126D828C7 REQ /users [28] by rahul@0:0:0:0:0:0:0:1, time 36 mills; args [HttpSessionSecurityContextRepository]

This data can Asynchronously be audited or logged for generating the access pattern or hotspots in the service access.

 

CAS-ify and Implement Single sign on in your application, Oh what a rellief…

 

Ah, there comes a time in a developers life when the application they develop requires to be actually used and in this particular case I am talking about multiple applications in the eco-system. Now this developer in discussion appears to be an enterprise scale geek. In order to use these applications, people need to be scrutinized by a central or single sign on like security entity. Here comes a Central Authentication Service to rescue your CASe.

 

I will take you through a set of steps here,

 

A. Configure CAS single sign on Server on a Tomcat with SSL configured.

Steps are detailed here.

B.. Create a service application to actually authenticate with this CAS server and service your request.

RESTful client for CAS secured services. A sample example is available here

 

C. Generate your SSL trusted certificates so that This CAS Server and your Service application can actually interact

Setup Certificates:   The SSL related certificates used for development are self-signed in nature and are restricted to IP on which server and services are running. The keytool command provisioned by the JDK is used for this purpose.

Self-Signed Certificate Setup steps for CAS:

  1. Configure: https://wiki.jasig.org/display/CASUM/RESTful+API
  2. Test : Using commons http client
  3.  Workflow,
  • - Get the TicketGrantingTicket from server = "https://localhost:8443/cas/v1/tickets";
  • - Get the ServiceTicket service = "https://localhost:8443/cas-sample/secure";
  • - Based on the service ticket GET access to the secured REST API service

D. Certification Path Exception for SSL handshake:
Reference: http://www.mkyong.com/webservices/jax-ws/suncertpathbuilderexception-unable-to-find-valid-certification-path-to-requested-target/
Source: https://github.com/vishwakarmarhl/javahelper/blob/master/InstallCert.java
Command: java InstallCert localhost:8443 // Also add trust for the service and cas_server IP

E. Use a Http Test client to authenticate and call the service.