Brokerless ZeroMQ to share a live image/video data stream from a Raspberry Pi B+ (uses OpenCV)

Make sure you have the following:

  1. OpenCV 3.x compiled and installed on the Raspberry Pi
  2. Raspberry Pi B+ with Raspbian and a USB webcam attached

ZeroMQ

Read through "ZeroMQ in 100 words" for a brief description.


Installation: http://zeromq.org/bindings:python
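
For the Python binding, a typical install on the Pi is via pip (assuming Python and pip are already available):

pip install pyzmq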

 

Code

Now let's go through the simple code I wrote for publishing and subscribing to a live webcam stream from a Raspberry Pi to a workstation.

 

Make sure you have the dependencies and imports as below

import os, sys, datetime
import json, base64

import cv2
import zmq
import numpy as np
import imutils
from imutils.video import FPS

 

""" Publish """

  • In this piece of code we create a ZeroMQ context and prepare to send data to 'tcp://localhost:5555'
  • The OpenCV camera API is used to capture frames from the device at 640×480
  • The FPS module from imutils is used to estimate the frame rate of this capture
  • The byte buffer read from the webcam (via the capture() helper, sketched after the code below) is base64-encoded and sent as a string over the ZeroMQ TCP socket connection
  • Each encoded buffer is sent out on the TCP socket in a loop
def pubVideo(config):
    context = zmq.Context()
    footage_socket = context.socket(zmq.PUB)
    # 'tcp://localhost:5555'
    ip = "127.0.0.1"
    port = 5555
    target_address = "tcp://{}:{}".format(ip, port)
    print("Publish Video to ", target_address)
    footage_socket.connect(target_address)
    impath = 0  # For the first USB camera attached
    camera = cv2.VideoCapture(impath)  # init the camera
    camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    print("Start Time: ", datetime.datetime.now())
    fps = FPS().start()
    while True:
        try:
            # capture() reads a frame and returns its encoded byte buffer
            buffer = capture(config, camera)
            if not isinstance(buffer, (list, tuple, np.ndarray)):
                break
            buffer_encoded = base64.b64encode(buffer)
            footage_socket.send_string(buffer_encoded.decode('ascii'))
            # Update the FPS counter
            fps.update()
            cv2.waitKey(1)
        except KeyboardInterrupt:
            # stop the timer and display FPS information
            fps.stop()
            print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
            print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
            camera.release()
            cv2.destroyAllWindows()
            print("\n\nBye bye\n")
            break
    print("End Time: ", datetime.datetime.now())

 

""" Subscribe """

  • The ZeroMQ subscriber is listening on 'tcp://*:5555'
  • As each string is received, it is base64-decoded and converted back to an image using OpenCV
  • We use OpenCV to visualize this frame in a window
  • Every frame sent over the ZeroMQ TCP socket is visualized and appears as a live video stream
def subVideo(config):
    context = zmq.Context()
    footage_socket = context.socket(zmq.SUB)
    port = 5555
    bind_address = "tcp://*:{}".format(port)  # 'tcp://*:5555'
    print("Subscribe Video at ", bind_address)
    footage_socket.bind(bind_address)
    footage_socket.setsockopt_string(zmq.SUBSCRIBE, '')  # subscribe to all topics
    while True:
        try:
            frame = footage_socket.recv_string()
            img = base64.b64decode(frame)
            npimg = np.frombuffer(img, dtype=np.uint8)  # np.fromstring is deprecated
            source = cv2.imdecode(npimg, 1)  # decode JPEG buffer to a BGR image
            cv2.imshow("image", source)
            cv2.waitKey(1)
        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            print("\n\nBye bye\n")
            break

Github: https://github.com/vishwakarmarhl/cnaviz/blob/master/imcol/CvZmq.py

 

Run

Using the GitHub code above, you can run the test as follows:

PUB: Webcam source machine

python CvZmq.py pub --pubip=127.0.0.1 --pubport=5555

SUB: Target visualization machine

python CvZmq.py sub --subport=5555

 


Java Integration with Kafka distributed message broker

Kafka is a distributed, partitioned, replicated commit-log service used as a message broker across the tech industry, open-sourced by LinkedIn. It functions as a central messaging bus for various domains and frameworks specializing in big-data systems.

It works as a broker between producers and consumers, with stronger ordering guarantees than traditional messaging systems. It provides a single consumer abstraction, the Consumer Group, that generalizes both the queuing and the publish-subscribe models.

Reference: http://kafka.apache.org/documentation.html#introduction

Github: https://github.com/vishwakarmarhl/kafka-sample

The API is provided by the kafka-clients library, version 0.9.0.0.

Here I am going through a sample that publishes messages over a Kafka topic, with a listener constantly waiting: a consumer group that works as a thread pool subscribed to the topic.

A. Setting up Kafka is as simple as extracting the archive and starting the server.

Reference: http://kafka.apache.org/090/documentation.html

Download: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

Say you have a virtual machine as per my setup, with the host IP 192.168.56.101

  1. Download and extract the above kafka_2.11-0.9.0.0 build into the /opt folder, and set the KAFKA_HOME variable
  2. Configure $KAFKA_HOME/config/server.properties
    advertised.host.name=192.168.56.101
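
With that configured, start ZooKeeper and the Kafka server from $KAFKA_HOME using the standard scripts shipped with the 0.9 distribution, and create a topic to test against (the topic name here is illustrative):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test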

(Figure: Kafka broker internals)

If you are into Spring and Java, there is a great introduction resource from SpringDeveloper. However, in this tutorial I am covering a very basic implementation.

 

B. Walk-through of the Producer and Listener

(Figure: producer and consumer overview)

The code has two main classes responsible for the send (KafkaSpeaker.java) and listen (KafkaListener.java) functionality. KafkaMain is responsible for launching KafkaListener in a pre-configured consumer thread pool.

KafkaProducer.java

The producer is configured with the broker URL and a client identifier.

// The Kafka producer is configured with the broker address and client id for sending messages as follows:

 Properties props = new Properties();
 props.put("bootstrap.servers", prop.getProperty("producer.url"));
 props.put("client.id", prop.getProperty("producer.client.id"));
 props.put("acks", "all");       // wait for the full set of in-sync replicas to acknowledge
 props.put("retries", 0);        // do not retry failed sends
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 this.producer = new KafkaProducer<>(props);

The producer can then send messages on a topic using the send API:

producer.send(new ProducerRecord<String, String>(topic,
  String.format("{\"type\":\"sample\",\"t\":%.3f,\"k\":%d}",
                System.nanoTime()*1e-9,i)
  ));

 

KafkaListener.java

The listener is built around the consumer-group concept: each consumer in the group can listen on a partition and transact on the messages queued for consumption and further processing.

(Figure: Kafka consumer groups)

The consumer group is created using a thread-pool executor as follows:

final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<KafkaListener> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
 KafkaListener consumer = new KafkaListener(i, prop, topics);
 consumers.add(consumer);
 executor.submit(consumer);
}

Configure the listener and create a consumer:

props.put("bootstrap.servers", prop.getProperty("consumer.url"));
props.put("group.id", prop.getProperty("consumer.group.id")); 
props.put("client.id", prop.getProperty("consumer.client.id"));
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
this.timeOut = Integer.parseInt( prop.getProperty("consumer.poll.timeout"));
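
For reference, here is a sketch of the properties file these snippets read their settings from; the keys are taken from the getProperty calls above, while the values are illustrative assumptions based on the VM setup described earlier:

producer.url=192.168.56.101:9092
producer.client.id=kafka-speaker-0
consumer.url=192.168.56.101:9092
consumer.group.id=kafka-sample-group
consumer.client.id=kafka-listener-0
consumer.poll.timeout=1000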

Subscribe and listen on the topic indefinitely, until the shutdown hook is executed for this thread pool (a sketch of such a hook follows the loop below).

consumer.subscribe(topics);
while (true) {
 ConsumerRecords<String, String> records = consumer.poll(this.timeOut);
 for (ConsumerRecord<String, String> record : records) {
    log.info(record.topic()+" - "+this.id + ", Data: " + record.value());
 }
}
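
The shutdown hook itself is not shown in the snippet above. A minimal sketch, assuming each KafkaListener exposes a shutdown() helper (hypothetical here) that wraps KafkaConsumer.wakeup(), the one consumer method that is safe to call from another thread; consumers and executor refer to the thread-pool snippet earlier:

Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    // wakeup() makes the blocked poll() throw a WakeupException,
    // letting each listener thread exit its loop cleanly
    for (KafkaListener consumer : consumers) {
      consumer.shutdown(); // hypothetical helper calling this.consumer.wakeup()
    }
    executor.shutdown();
    try {
      executor.awaitTermination(5000, java.util.concurrent.TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
});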

 

The code available on GitHub is self-sufficient and simple enough to follow.

Thanks to the great article that set me off on this trial project.