Stream Data with StreamSets & Analyze with Spark


Tis the season of NFL football, and one way to capture excitement is Twitter data. I’ve tinkered around with Twitter’s Developer API before, but this time I wanted to use a streaming service I’ve heard good things about, called .

After I received the Tweets semi-raw data, I wanted to analyze the Tweet data using Spark. I choose Spark as the distributed nature of the RDD is great for using large amounts of data (and I’m not sure on how much I’ll be getting).

My idea was to do a count of tweets for a particular team/game and see if the volume of Tweets would predict whether that team actually wins or losses the game.

Data Collection Process


I have done a little work with the Twitter Developer API in the past, which I had used Python to parse the tweets as they arrived. I found this process very simple, but I was a little apprehensive brining StreamSets into the mix. However, having the knowledge of a scalable ETL and streaming program like StreamSets is good idea.

To use StreamSets, I did some google searches on “streaming twitter StreamSets”. I found a very well put together tutorial. It looked promising, so I felt confident enough to download the StreamSets application on my Mac and install it. I was a 145mb zip download, extracted as a java project.

After starting via the terminal, I was able to connect to it via localhost through my web browser, which I appreciate.

To connect to Twitter API via the StreamSets HTTP Client, I had to define Resource URL. Instead of getting all the tweets available, I decided to filter only tweets with “nfl” located in the tweet or hash tag. Also note, the Twitter API is random sampled real-time amount of Tweets. All other filtering and counting I was planning on doing in Spark later, but I’m sure some more of that ETL could have also been done in StreamSets.

As for the credentials to connect to Twitter, I had to enter four values: Consumer Key, Consumer Secret, Token, Token Secret. At this point as a test using the StreamSets UI, I connected the HTTP Client to save in Local FileSystem, then ran the pipeline.

I reviewed a few of lines of raw Tweet output in a text editor and online JSON viewer. I decided I didn’t need all the the JSON fields, so I created Field Removal into my pipeline between the HTTP Client and saving to the Local FileSystem. The fields I decided keep (I went with more rather than less as I didn’t know exactly what I’d need in Spark): Create Date/Time, userId, Tweet text, username, user location, user timezone, hashtags, retweet status, retweet count, location. After running, it looked good!

As I was in NFL week 13 (Thursday 11/30 – Sunday 12/03), I decided to run the Pipeline on the Thursday game as a test. I noticed plenty of data (around 5k tweets) relating to the NFL for those three hours of 7pm to 10pm. I thought this was a good proxy for plenty of data to capture for Sunday – when was my intended data to go after for analysis of the project.

Final pipeline diagram:

On Sunday, 12/3, I started the Pipeline at 11:59am and ran it until about 715pm that day. By running during that time, it would allow me have option of analysis on both the noon and 3pm games.

After I stopped the Pipeline I had 9 folders of data (one folder for each hour, which was default setting in StreamSets local file system settings (the first hour was only one min, and the last folder representing 7pm was also very few). The size of all the Sunday tweets was about 52mb.

Before diving into Spark, I wanted to get an idea on the amount of tweets in my data for data validation purposes. Using the terminal, I did a “wc -l filename” for the 12pm and 3pm hours. The total lines were 3,145 and 4,110. Since I have about 7 full hours, I would expect my data in Spark to have about 20k – 25k Tweets.

Spark Processing and Validation


I had the data in my local drive on the Cluster, so now I copied that data to HDFS for Spark to access. After starting the Spark-shell, I went to read in the data using the HDFS path + “/*”. However, after doing a count on the Tweets, it seems very low. It turns out, I needed additional “/*” added to access at the subdirectories. I did a count on the RDD, and came out to 20,202, which validated to the linux command I ran on my local in which I estimated 20k – 25k Tweets.

Moving on to what I was looking for, which at this point counting number of Tweets during a game for a particular team playing. I decided to break the dataset into two RDDs. The first would be mapping and getting just the “hour” of the Tweet. The second would be mapping to get the “text” of the Tweet.

The final data structure would need to combine the two RDDs, so I could count across specific hours and Tweet contained the team name. I decided on the tuple data structure. Then, I just filter the tuple by hours of a game and team name. For example, for the Vikings/Rams game (which started at noon), would be an hour representing noon, 1pm, 2pm and Tweet text containing “vikings” or “rams”.

I had to repeat this process for each team in the noon games, which there were seven games. At this point, I decided to create a JAR and submit the job via Spark-Submit. The input the shell-script to run the JAR on the cluster was to enter Input Data Location, Output Data Location, and team name. By doing this, it sped up the process of gathering the count of Tweets for each team as I just had to update the team name in the shell-script and running it right from the Cluster..

I was making the assumption noon game would run from noon – 3pm.

Other Programs


I used text editors for writing my code. On my Windows, it was Notepad++.

For creating the JAR file for Spark-Submit, I used the Cloudera VM, and run Eclipse IDE.

For moving JARs along with connecting to the cluster & Cloudera VM, I used Putty and WinSCP.  If using my mac, I would would just SCP.

For visualizing the results I used Excel. If I were using more variables in the dataset and looking for more of a dynamic visualization, I most likely would have used Tableau.



Of the 7 games played at noon, 4 of the 7 who were winners had more Tweets. I don’t think that is it significant to say the Tweet activity predicted the outcome, but interesting nonetheless.



I have used a Hadoop cluster many times over the past 3 years. From a Data Science perspective, it’s really not the greatest tool due to effort needed to move data and lack on statistical/visualizations tools built-in. Going forward, if I were to consult similar tools, I would look into something like Cloudera’s Data Science Workbench. However, I’m a firm believer in the knowledge to perform all functions through the command line, so this project further enhanced my skillset.

Array_Position Custom UDF in Hive


Working with arrays in hive is pretty slick. However, I’ve run into an issue in which in the published Hive UDFs there is no function to return an index of a value within an array when it contains an item you’re looking for. So I took it upon myself to write it. This code runs on hive:
import sys

def get_position (item, item_list_string):
                # making a list as it would be a string coming from hive
		item_list = item_list_string.split(',') 
		array_position = 0
		for position, value in enumerate(item_list):
			value = value.replace('[', '')
			value= value.replace(']', '')
			value = value.replace('\"', '')
			if value == item:
				array_position = position + 1
		# Add all the output values to a list
		output = [str(array_position)]
		# Print output as tab delimited string objects to stdout
		print '\t'.join(map(str, output))
		m = 0

def main(argv):
	# Hive submits each record to stdin
	# The record/line is stripped of extra characters
	for line in sys.stdin:
		line = line.strip()
		item, item_list_string = line.split('\t')
		get_position(item, item_list_string)

if __name__ == "__main__":
	main(sys.argv[0]) # 0 as there are no args besides the hive query fields

The hive code to invoke this function is as follows:

SELECT TRANSFORM (a.item, a.item_list_string)
AS array_position
(select item, item_list_string
from your_table) a; 

Running Python in Hive/Hadoop


One of the things I love about running Hive is the ability to run Python and leverage the power of the parallel processing. Below I’m going to show a stripped down example of how to integrate a Hive statement & Python together to aggregate data to prepare it for modeling. Keep in mind, you can also use Hive & Python to transform data line by line as well, and it extremely handy for data transformation.

Use case: print out an array of products sold to a particular user. Again is a basic example, but you can build upon this and generate products sold for every user, then use KNN to generate clusters of users, or perhaps Association Rules to generate baskets.

Here is the Python script, which will have to be saved in local Hadoop path:

import sys

items_sold = []  # create global list variable

class Items:  # create class to store and access items added
    def __init__(self, x):
    	self.x = x

    def set_x(self, x):
        self.x = x
    def get_x(self):
        return self.x

def print_results():  # print output in Hive
	result_set = [item.get_x() for item in items_sold];
	print (result_set)

	# Hive submits each record to stdin
	# The record/line is stripped of extra characters and submitted
for line in sys.stdin:
	line = line.strip()
	purchased_item = line.split('\t')


Here is the hive statement:

add file; 
select TRANSFORM (a.purchased_item)
using ''
AS array_purchased
from (select purchased_item from company_purchases where user_id = 'u1') a;

Result in Hive will be similar to this: [‘s_123’, ‘s_234’, ‘s890’]