Array_Position Custom UDF in Hive

Working with arrays in Hive is pretty slick. However, I’ve run into an issue: among the published Hive UDFs there is no function that returns the position of an item you’re looking for within an array. So I took it upon myself to write one. This code runs in Hive:


#!/usr/bin/python
# array_position.py
import sys

def get_position(item, item_list_string):
    try:
        # The array arrives from Hive as a single string, so turn it back into a list
        item_list = item_list_string.split(',')
        array_position = 0  # 0 means "not found"
        for position, value in enumerate(item_list):
            # Strip the brackets and quotes Hive wraps around array elements
            value = value.replace('[', '').replace(']', '').replace('"', '')
            if value == item:
                array_position = position + 1  # 1-based position of the match
        # Print the output as a tab-delimited string to stdout for Hive
        print('\t'.join([str(array_position)]))
    except Exception:
        # On a malformed record, still emit a row so Hive's output stays aligned
        print('0')

def main():
    # Hive submits each record to stdin, one tab-delimited line per row
    for line in sys.stdin:
        line = line.strip()
        item, item_list_string = line.split('\t')
        get_position(item, item_list_string)

if __name__ == "__main__":
    main()


The Hive code to invoke this function is as follows:


ADD FILE array_position.py;
SELECT TRANSFORM (a.item, a.item_list_string)
USING 'array_position.py'
AS array_position
FROM
  (SELECT item, item_list_string
   FROM your_table) a;
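
To sanity check the script outside of Hive, you can feed it a single tab-delimited record on stdin, the same shape TRANSFORM sends it. Here is a minimal sketch; the item values are made up:

import subprocess

# simulate one Hive record: <item> TAB <array rendered as a string>
sample = 'b\t["a","b","c"]\n'
out = subprocess.run(['python', 'array_position.py'], input=sample,
                     stdout=subprocess.PIPE, universal_newlines=True)
print(out.stdout)  # expected output: 2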

Object-Oriented Design for Data Science

When working with complex datasets, custom code is often needed for the intended solution. When designing that code, object-oriented design practices promote code reusability and make functionality easier to update and extend. In this post, I’m going to look at the Titanic data (the ~2k passengers and whether each survived), which you can download here: Titanic dataset.

This is a basic dataset, but the principles apply to more complex ones. The idea is to perform a different operation on each row of data depending on the passenger class (1st, 2nd, 3rd, or the ship’s crew). This could be accomplished with a pile of “if, else” statements, but I’m looking for clean, reusable code here, which is a much better approach when working with complex data sets.

For your reference, here are the first few rows of the data:

Class,Sex,Age,Survived
3rd,Male,Child,No
3rd,Female,Child,Yes
2nd,Male,Adult,No
Crew,Male,Adult,No
1st,Male,Adult,No

And here is the code (with comments):

def readData():
    # source data column mapping: Class,Sex,Age,Survived
    hf = HandlerFactory()  # create factory instance
    with open('titanic.csv', "r") as fl:
        next(fl)  # skip header
        for line in fl:
            fields = line.strip('\n').split(',')
            passenger_class = fields[0]  # first column is the passenger class
            handler = hf.getHandler(passenger_class)
            handler.apply(fields)
            
class HandlerFactory():
    def __init__(self):
        self.handlers = {} #dict for each handler class
        self.register(CrewHandler())
        self.register(FirstClassHandler())
        self.register(SecondClassHandler())
        self.register(ThirdClassHandler())
    def register(self,handler):
        self.handlers[handler.getField()] = handler #add each class to dict
    def getHandler(self,fld):
        return self.handlers[fld] # returns the python class to handle specific passenger class

class FieldHandler:  # base python class each passenger-class handler inherits
    def __init__(self, field):
        self.field = field
    def getField(self):
        return self.field
    def setField(self, field):
        self.field = field
            
class CrewHandler(FieldHandler):  # print passenger class only
    def __init__(self):
        FieldHandler.__init__(self, "Crew")
    def apply(self, fields):
        print([fields[0]])

class FirstClassHandler(FieldHandler):  # print passenger class, gender, age, and survived
    def __init__(self):
        FieldHandler.__init__(self, "1st")
    def apply(self, fields):
        print([fields[0], fields[1], fields[2], fields[3]])

class SecondClassHandler(FieldHandler):  # print passenger class, gender, and survived
    def __init__(self):
        FieldHandler.__init__(self, "2nd")
    def apply(self, fields):
        print([fields[0], fields[1], fields[3]])

class ThirdClassHandler(FieldHandler):  # print passenger class and survived
    def __init__(self):
        FieldHandler.__init__(self, "3rd")
    def apply(self, fields):
        print([fields[0], fields[3]])

if __name__ == "__main__":
    readData()

And the results (notice how each class of passenger has different fields chosen):
['3rd', 'No']
['3rd', 'Yes']
['2nd', 'Male', 'No']
['Crew']
['1st', 'Male', 'Adult', 'No']
...
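
One of the benefits of this design is how little code a change requires. If the data ever picked up another passenger class, you would only write one new handler and register it; nothing else changes. A quick hypothetical sketch (there is no “Officer” class in the real data):

# Hypothetical: handle a new "Officer" passenger class
class OfficerHandler(FieldHandler):
    def __init__(self):
        FieldHandler.__init__(self, "Officer")
    def apply(self, fields):
        print([fields[0], fields[2], fields[3]])  # class, age, survived

# ...and register it in HandlerFactory.__init__:
#     self.register(OfficerHandler())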

Running Python in Hive/Hadoop

One of the things I love about running Hive is the ability to run Python and leverage the power of parallel processing. Below is a stripped-down example of how to combine a Hive statement and Python to aggregate data and prepare it for modeling. Keep in mind you can also use Hive and Python to transform data line by line, which is extremely handy for data transformation.

Use case: print out an array of products sold to a particular user. Again, this is a basic example, but you could build on it to generate the products sold for every user, then use a clustering algorithm such as k-means to generate clusters of users, or perhaps association rules to generate baskets.

Here is the Python script, which needs to be saved locally and added to Hive with ADD FILE so it gets shipped to the Hadoop nodes:


#!/usr/bin/python
import sys

items_sold = []  # global list that collects every purchased item

class Items:  # simple wrapper class to store and access each item
    def __init__(self, x):
        self.x = x

    def set_x(self, x):
        self.x = x

    def get_x(self):
        return self.x

def print_results():  # print the aggregated output back to Hive
    result_set = [item.get_x() for item in items_sold]
    print(result_set)

# Hive submits each record to stdin;
# each line is stripped of extra characters and collected
for line in sys.stdin:
    line = line.strip()
    purchased_item = line.split('\t')[0]  # single input column per line
    items_sold.append(Items(purchased_item))

print_results()

Here is the Hive statement:


ADD FILE blog_hive.py;
SELECT TRANSFORM (a.purchased_item)
USING 'blog_hive.py'
AS array_purchased
FROM (SELECT purchased_item FROM company_purchases WHERE user_id = 'u1') a;

The result in Hive will be similar to this: ['s_123', 's_234', 's890']
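
To go from one user to every user, the rows need to reach the script grouped by user, for example by adding user_id to the TRANSFORM columns and clustering the inner query by user_id; the script then emits one array per user. Here is a rough sketch under those assumptions (the two-column user_id / purchased_item input is an assumption, not the query above):

#!/usr/bin/python
import sys

current_user = None
basket = []

def emit(user, items):
    # one output row per user: user_id, then the list of items purchased
    if user is not None:
        print('{0}\t{1}'.format(user, items))

for line in sys.stdin:
    user_id, purchased_item = line.strip().split('\t')
    if user_id != current_user:
        emit(current_user, basket)  # flush the previous user's basket
        current_user, basket = user_id, []
    basket.append(purchased_item)

emit(current_user, basket)  # flush the final user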

Probability of a Revenue Threshold

A retailer’s website purchases have an average order size of $100 and a standard deviation of $75. What is the probability of 10 orders generating over $1,250 in Revenue?

mean = $100.00
stdev = $75.00

Generating $1,250 from 10 orders is the same as requiring an average order of $125, and the average of 10 orders has a standard error of stdev / sqrt(n):

avg_order_needed = $1,250 / 10 = $125.00
standard_error = $75 / sqrt(10) = $23.72
z-score = (125.00 - 100.00) / 23.72 = 1.05

We are looking to solve for this shaded area under the curve.

[Figure: normal curve with the upper tail beyond z = 1.05 shaded]

Looking up 1.05 on a z-table, the upper-tail probability is 0.1469, i.e., roughly a 14.7% chance of 10 random orders generating over $1,250 in Revenue.
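
If you would rather not read the value off a z-table, the same number drops out of a few lines of Python (this assumes SciPy is installed; it is not used anywhere else in these posts):

from math import sqrt
from scipy.stats import norm

se = 75 / sqrt(10)          # standard error of the average order size
z = (125.0 - 100.0) / se    # z-score, approximately 1.05
print(norm.sf(z))           # upper-tail probability, about 0.146 (0.1469 with z rounded to 1.05)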