Connecting Storm and Kafka

Previously, we set up Kafka and we’ve also set up Storm.

Now, let’s connect the two!

We assume you were able to get the ExclamationTopology running from our previous guide. In this guide, we are going to use Kafka as the Spout for the ExclamationTopology.

I usually check out the binary version of Storm to run Storm itself, but I like to develop on the source version of Storm. To that end, let’s grab the source version of Storm and extract it:

$ wget http://apache.mirrors.pair.com/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating-src.tar.gz

$ tar -zvxf apache-storm-0.9.2-incubating-src.tar.gz

You can see that a Kafka-connector project has already been written for us, in external/storm-kafka.

To understand how to leverage this project, let’s make Kafka the Spout for ExclamationTopology.

First, copy ExclamationTopology to the storm-kafka project, since that’s where all the KafkaSpout classes are (there are other ways of doing this, such as bundling storm-kafka into a jar and using those libraries instead).

$ cp apache-storm-0.9.2-incubating-src/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java apache-storm-0.9.2-incubating-src/external/storm-kafka/src/jvm/storm/kafka

Now add the following imports into ExclamationTopology:

import storm.kafka.SpoutConfig;
import backtype.storm.spout.SchemeAsMultiScheme;

It should look something like this:

storm-kafka-import

Remember also to change the package declaration at the top of the file to storm.kafka (not storm.starter), as seen in the screenshot.

Then use the following to set up the Kafka Spout, assuming you have created a “test” topic as shown in our Kafka guide.

String zkConnString = "localhost:2181";
String topicName = "test";

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, "discovery");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

builder.setSpout("kafka", kafkaSpout);
// remember to point the topology's first bolt at this spout id,
// e.g. .shuffleGrouping("kafka")

Here is a screenshot:

storm-kafka-config

Then add the required plugin entries to the pom.xml file currently in that directory. These have to go inside the “plugins” section:
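The exact lines from the original post are not reproduced here. As a sketch only (not the original snippet), a maven-shade-plugin entry like the following, placed inside the plugins section, is one way to produce a jar-with-dependencies suitable for cluster submission:

```xml
<!-- Hypothetical example: one way to build a jar-with-dependencies
     for cluster submission (the original pom snippet is not shown) -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```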

Also, remove the “scope” field in the org.apache.kafka dependency in the pom.xml file under storm-kafka.

Then, compile the package, and submit it to a Storm production cluster.

If you remember from our tutorial on Kafka, you can open up a producer command-line terminal and start typing messages into Kafka. Go ahead and do this.

On the other side, you should see Storm adding exclamations to everything you type.

Congratulations, you have connected Storm with Kafka!

Setting up Storm and Running Your First Topology

This guide will set up Storm on a single Ubuntu instance, and show you how to run a simple Word Count Topology. This guide assumes no experience with Storm.

Storm was created by Nathan Marz, is designed for real-time event processing, and improves on some of Hadoop’s distributed design. Storm provides excellent documentation, which you are highly encouraged to go through. If you’re pressed for time, though, the following guide gets you started with running a simple real-time event processor (this is called a topology, but I assume you haven’t read any documentation and just want to get the thing up and running. Though this puts you at a conceptual disadvantage, there’s nothing like getting your hands dirty right away).

Setting up Storm

First grab version 0.9.2 of Storm (already compiled version)

$ wget http://apache.mesi.com.ar/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz

Extract the files:

$ tar -zxvf apache-storm-0.9.2-incubating.tar.gz

Grab the latest version of maven:

$ sudo apt-get install maven

If you are on a Mac:

$ brew install maven

also set the JAVA_HOME path in Mac through the ~/.bash_profile file:

$ export JAVA_HOME=$(/usr/libexec/java_home)

Check the maven version to see that it installed correctly:

$ mvn -version

mvn_version

If you checked out the src version of Storm, you can build and install the Storm jars locally with mvn clean install -DskipTests=true (run from the directory containing the pom.xml file). This doesn’t need to be done if you already downloaded the compiled version as this guide has shown. However, it’s worth noting now because you’ll be using this command to compile your projects if you modify any of the source code.

Instead of building jars for the Storm project (since we’ve checked out the compiled version), let’s build the jar file for the storm-starter example project. First go into the storm-starter project within the apache-storm-0.9.2-incubating/examples folder:

$ cd apache-storm-0.9.2-incubating/examples/storm-starter

Now compile and build the jar files:

$ mvn clean install -DskipTests=true

It should take a few minutes, and you’ll see a lot of output. At the end of the output, you should see:

mvn_success

You are now ready to run your first Storm job (called a “topology”). We are going to run the topology located in apache-storm-0.9.2-incubating/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java

Let’s run the topology first, and then go briefly into the details of what is happening.

In the storm-starter directory, issue:

$ mvn compile exec:java -Dstorm.topology=storm.starter.WordCountTopology

The whole process will take about 50 seconds, and you will see a whole bunch of output. The main thing to look for in this output is something like this:

wordcount_output

It should occur near the middle of all the output being shown. The end of the output should show a bunch of shutdown messages, along with a success message like this:

wordcount_success

Congratulations! You have run your first Storm topology!

Storm has a local mode (called “Local Cluster”) and a production-cluster mode. The local mode is helpful for development and debugging, while the production-cluster mode is intended to run in a production environment. You just submitted the WordCountTopology in local mode.

Let’s take a look at what you just did.

A Storm topology consists of “Spouts” and “Bolts”. You can think of Spouts as obtaining the data, and Bolts as transforming the data. In a topology, you typically have one or more Bolts stemming from one Spout. The “data” in a Storm topology is called a “Tuple”.

bolts_spouts

In the WordCountTopology, the Spout used is the RandomSentenceSpout:

wc_randomsetence

RandomSentenceSpout is located at apache-storm-0.9.2-incubating/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java

If you take a peek at this file, you can see the sentences being used:

randomsentence

That explains the output in our example – the words being “emitted” are taken from these sentences. The nextTuple method exists in every Spout, and determines what you want the Spout to do. It is the method you typically override.
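To make nextTuple concrete, here is a Python sketch of what a RandomSentenceSpout-style spout does. The real spout is Java; a Python version would use Storm’s multilang “storm” helper module, and run_spout() is just an illustrative wrapper, not part of storm-starter:

```python
import random

# two of the sentences used by RandomSentenceSpout
SENTENCES = [
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
]

def next_sentence():
    # what nextTuple emits each time Storm calls it: one random sentence
    return random.choice(SENTENCES)

def run_spout():
    # only runs under Storm: requires the multilang "storm" helper module
    import storm

    class RandomSentenceSpout(storm.Spout):
        def nextTuple(self):
            storm.emit([next_sentence()])

    RandomSentenceSpout().run()
```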

Let’s now take a look at the Bolts in WordCountTopology.java:

wc_bolts

These bolts are classes defined in the same file (WordCountTopology.java). From their names, we can guess what they do. Let’s take a look at “SplitSentence”:

split_sentence

It looks like it is calling a python script called “splitsentence.py”. Doing a little digging, we find this script located in apache-storm-0.9.2-incubating/examples/storm-starter/multilang/resources/splitsentence.py

We’ve just stumbled upon a cool thing that Storm can do – bolts are allowed to be language-agnostic! This means we can write our logic in any language we please. In this case, we are splitting sentences with Python! Here is the splitsentence.py logic:

splitsentence-py

As you can see, it’s splitting the sentences by a single space, and “emitting” each word in that sentence.
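The logic of splitsentence.py amounts to the sketch below. The storm module is Storm’s multilang helper shipped with storm-starter, and run_bolt() is an illustrative wrapper rather than part of the original script:

```python
def split_sentence(sentence):
    # splitsentence.py splits on a single space
    return sentence.split(" ")

def run_bolt():
    # only runs under Storm: requires the multilang "storm" helper module
    import storm

    class SplitSentenceBolt(storm.BasicBolt):
        def process(self, tup):
            # emit each word of the incoming sentence as its own tuple
            for word in split_sentence(tup.values[0]):
                storm.emit([word])

    SplitSentenceBolt().run()
```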

So our first bolt “SplitSentence” is actually a python script that splits the sentences into words. Let’s take a look at our second bolt, “WordCount”, which is defined in WordCountTopology.java:

wordcount_java

As you can see, a HashMap called “counts” is created, which stores the counts of each word going through.
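In Python terms (a sketch of the logic only, not the actual Java bolt), the counting amounts to:

```python
def word_count(words):
    # mirrors the Java bolt: a map from word to its running count
    counts = {}
    for word in words:
        counts[word] = counts.get(word, 0) + 1
    return counts
```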

This is the fundamental template of a Storm topology. All other topologies you see are just variations on it.

Just for completeness, let’s take a look at the rest of WordCountTopology:

wc_config

As you might guess based on the names of the variables, the rest of the file is used for configuration information.

conf.setDebug controls the verbosity of the output. The block of code within the “if” statement is configuration for production, and the block of code in the “else” statement is for local mode (which is what we just saw). The topology being submitted is called “word-count”, and we’ve asked the job to run for 10 seconds.

In the meantime, as a “homework” assignment, you are encouraged to get ExclamationTopology.java working, located in examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java

If you are feeling ambitious, try modifying the input Spout (TestWordSpout.java), and see how things change. However, you will need to download the source version and build storm-core from scratch, as TestWordSpout.java is part of storm-core. Remember to issue the compile command at the top-level storm directory after each modification of the code:

$ mvn clean install -DskipTests=true

Deploying Storm on Production

Package the project for use on a Storm cluster. For instance, in storm-starter, do:

$ mvn package

The package should be in:

target/storm-starter-{version}-jar-with-dependencies.jar

You can check out the binary version of Storm (as we did above), and use the “storm” command from there. You can also add the bin path to $PATH.

Read the Storm documentation on setting up a Storm cluster to fill in the storm.yaml part.

I only modified the following in storm.yaml:

storm.zookeeper.servers:
  - "localhost"
#  - "server2"
nimbus.host: "localhost"

Then start nimbus, supervisor, and UI:

$ storm nimbus
$ storm supervisor
$ storm ui
(localhost:8080 by default)

Then, from the machine that has the jar-with-dependencies jar, submit it:

$ storm jar /Users/haroldnguyen/workspace/storm-tutorial/apache-storm-0.9.2-incubating-src/examples/storm-starter/target/storm-starter-0.9.2-incubating-jar-with-dependencies.jar storm.starter.ExclamationTopology production-topology-1

The logs are located in the binary version of storm/logs.

The Storm UI is pretty neat, giving you a summary of running topologies and visualizations.

If you still have trouble, please read through this excellent documentation on Running Topologies on a Production Cluster.

Conclusion

Congratulations – you went from knowing nothing about Storm to running a Word Count topology! The world is really your oyster!

In our next post, we’ll see how to connect Storm with Kafka!

Setting up Zookeeper and Kafka

This was tested on an m1.medium instance on AWS on Ubuntu 12.04.

Installing Zookeeper

Zookeeper is needed for Kafka. To quote the Zookeeper Apache project page,

Zookeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

In other words, we need Zookeeper to maintain configuration for Kafka nodes in a multi-node environment. In this tutorial, we’ll only be setting up a single Kafka node instance, but Kafka still relies on Zookeeper for configuration information.

We can start off by grabbing Zookeeper:

$ wget http://mirrors.advancedhosters.com/apache/zookeeper/stable/zookeeper-3.4.6.tar.gz

Extract the files:

$ tar -zxvf zookeeper-3.4.6.tar.gz

Change to the zookeeper/conf directory:

$ cd zookeeper-3.4.6/conf

In this directory, create the zoo.cfg file from the provided template configuration file named zoo_sample.cfg:

$ cp zoo_sample.cfg zoo.cfg

Open up zoo.cfg and change the dataDir variable to point to /var/lib/zookeeper. (I used the vi editor to make these changes).

dataDir_var

Since this directory doesn’t exist yet, create it:

$ sudo mkdir /var/lib/zookeeper

Install Java on a fresh Ubuntu 12.04:

$ sudo apt-get update
$ sudo apt-get install openjdk-7-jdk

In the zookeeper-3.4.6 directory, start Zookeeper:

$ sudo bin/zkServer.sh start

You should see a message similar to:

zoo_start_success

You can test to see that it can be connected from Java:

$ bin/zkCli.sh -server 127.0.0.1:2181

zoo_connected

Great! Now you have Zookeeper running.

Installing Kafka

We’ll set up kafka_2.9.2, since at the time of this writing, Storm 0.9.2 (which we will be using) plays nicer with this version of Kafka.

First grab Kafka, and then extract it:

$ wget http://mirror.cogentco.com/pub/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
$ tar xvzf kafka_2.9.2-0.8.1.1.tgz

Change to the kafka_2.9.2-0.8.1.1/config directory:

$ cd kafka_2.9.2-0.8.1.1/config

Here, change the log.dirs property in the server.properties file to /var/lib/kafka-logs:

kafka_log_dir

Create that directory:

$ sudo mkdir /var/lib/kafka-logs

In the server.properties file again, point the advertised.host.name property to the public DNS (the address that others can connect to) of your instance:

advertised.host

In the kafka_2.9.2-0.8.1.1 directory, you can start the Kafka server:

$ sudo bin/kafka-server-start.sh config/server.properties &

Or, start it as a daemon:

$ sudo bin/kafka-server-start.sh -daemon config/server.properties

You can stop the server anytime:

$ sudo bin/kafka-server-stop.sh

Create a “topic” named test with a single partition and one replica:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

See it at the command line:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

list_topic_test

You can also start typing messages at the command line, and read them back in another terminal. First start the producer on the topic test:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Type a message and press ‘enter’. For instance, the message can be “hello”.

producer

Open another terminal, and in the kafka_2.9.2-0.8.1.1 directory, start the consumer:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This will start consuming all messages from the beginning of the topic, but you can set the offset. Messages are retained for a limited time by default (controlled by log.retention.hours in server.properties), which can be tuned to your needs.

You should see each message given from the producer:

consumer

Talking to Kafka with Python

Instead of the command line, maybe we’d like to push messages to Kafka programmatically, say, from a Python script.

First, get ‘pip’ so that you can install Python packages.

$ sudo apt-get install python-pip

Now install the kafka-python package:

$ sudo pip install kafka-python

While leaving your consumer terminal open, run the following Python script:
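The original script is not reproduced here; a minimal sketch using the kafka-python API of this era (KafkaClient/SimpleProducer) might look like the following. The names send_hello and build_message, and the broker/topic defaults, are illustrative, not part of the package:

```python
def build_message(text):
    # Kafka messages are raw bytes, so encode strings before sending
    return text.encode("utf-8")

def send_hello(broker="localhost:9092", topic="test"):
    # requires `pip install kafka-python` and a running broker;
    # KafkaClient/SimpleProducer is the old-style kafka-python API
    # matching this Kafka 0.8.1.1 setup
    from kafka import KafkaClient, SimpleProducer
    client = KafkaClient(broker)
    producer = SimpleProducer(client)
    producer.send_messages(topic, build_message("hello from python"))
    client.close()

# call send_hello() at the bottom of the script to push the message
```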

You should see the message show up in the consumer:

consumer_from_python_producer

You’ve now successfully pushed messages to Kafka using a Python script. There are many more things you can do with the kafka-python package, and you are encouraged to look through the documentation on the kafka-python package Github page.
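For instance, reading messages back programmatically is similar. Here is a sketch using the old SimpleConsumer API of the same kafka-python era (read_all and its defaults are illustrative names):

```python
def decode_message(raw_bytes):
    # messages come back from Kafka as raw bytes
    return raw_bytes.decode("utf-8")

def read_all(broker="localhost:9092", topic="test", group="my-group"):
    # requires a running broker; iterating a SimpleConsumer yields
    # offset-and-message pairs
    from kafka import KafkaClient, SimpleConsumer
    client = KafkaClient(broker)
    consumer = SimpleConsumer(client, group, topic)
    for offset_and_message in consumer:
        print(decode_message(offset_and_message.message.value))
```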

Summary

In this tutorial, you set up Zookeeper, Kafka, and used a Python script to push messages to Kafka. If you’ve seen any errors in this tutorial, please feel free to leave comments in the section below.