Wednesday, September 3, 2014

Getting started with Apache Spark in Python

Apache Spark is one of the most promising new technologies for big data processing; it provides distributed, in-memory collections of data and lets you quickly compute aggregates and even complicated calculations. Spark provides interfaces for Java, Scala and Python.

Although setting up a production Spark cluster can be complicated (especially since you probably want to set up a Hadoop cluster too), you can run Spark locally, and in that case installation is a breeze (download, unpack :).

After you've installed it, running pyspark gives you a shell, which creates a context (from which you can access most functionality) in a variable called sc.
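
For example (assuming you unpacked Spark into a folder called spark; the actual folder name will depend on the version you downloaded), you can start the shell and run a quick sanity check through sc:
cd spark
./bin/pyspark
Then, inside the shell:
sc.parallelize(range(10)).count()   # turns a local list into an RDD and counts it; prints 10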

For this simple tutorial we will load a few files containing sales data, and aggregate it in a couple of different ways. You can read any kind of file, but you need to parse it yourself, so you normally want a simple format; we will read tab-separated files, where each line contains a fixed number of fields separated by tabs. Each line of our files will contain the day, the store id, the product id and the amount sold on that day.
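
For example, a few lines of one of these files might look like this (the values here are made up just to illustrate the format; fields are separated by actual tab characters, and the real sample files are linked at the end of the post):
2014-01-01	1	3	10
2014-01-01	2	3	6
2014-01-02	1	1	4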

To load a file we use the textFile method of the context; you pass it the name of a file (or a wildcard pattern matching more than one file).

For example, to load all of the files that start with sales we would do:
sc.textFile("sales_*.txt")

Now, this gives us a collection (Spark calls them RDDs) of lines; we normally want to parse each line into its fields, so we can call the map method and give it a function; this returns the collection resulting from applying that function to each of the elements of the previous one (and we can chain these calls). Since we have a tab-delimited file, we just need to call the split method on each line, which returns a list of strings; so we can do:
sales=sc.textFile("sales_*.txt").map(lambda x:x.split('\t'))

Here we use textFile to read all files that start with sales_ and end in .txt; this gets us a collection of lines. We then apply to each line a function that splits it by tabs ('\t'), so we now have a list of fields instead of each line; finally, we store that in a variable called sales. (As an aside, this doesn't actually read the files; Spark just remembers the transformations and won't compute anything until it's needed. If you want to see the results you can use first to get the first element, or collect to get the full collection.)
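
For example, we can peek at the first parsed record (this reads only as much data as it needs):
sales.first()   # something like [u'2014-01-01', u'1', u'3', u'10'], depending on your data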

Once we have the data loaded, we may want to do some calculations; most of the time, we want to calculate values for a group, something equivalent to a GROUP BY in SQL. We can use the reduceByKey method; it works on a collection of pairs, grouping them by the first element (the key) and then applying a reducing function to the values in each group.
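
As a tiny, self-contained illustration (using made-up pairs, not our sales data), reduceByKey adds up the values for each key:
pairs = sc.parallelize([('a', 1), ('a', 2), ('b', 5)])
pairs.reduceByKey(lambda x, y: x + y).collect()   # [('a', 3), ('b', 5)] (order may vary)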

Our sales RDD does not contain pairs, so we first need to apply a map to get pairs; the function we want to apply within each group is addition, so if we want to calculate the sales by day, we can do:
sales_by_day = sales.map(lambda x: (x[0], int(x[3]))) \
                    .reduceByKey(lambda x, y: x + y)

Remember that the day is the first field (so x[0]) and the amount sold is the fourth, x[3]. The map builds (day, amount) pairs, and reduceByKey groups them by the day and computes the sum of the amounts sold.

You can look at the elements in sales_by_day with:
sales_by_day.collect()

which, for our sample data, produces:
[(u'2014-01-02', 212), (u'2014-01-01', 357)]
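
The same pattern works for other groupings; for example, since the store id is the second field (x[1]), we could compute sales by store in exactly the same way:
sales_by_store = sales.map(lambda x: (x[1], int(x[3]))) \
                      .reduceByKey(lambda x, y: x + y)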

Many times we want to store the output in another text file; for that we can use saveAsTextFile. Notice that we normally want to apply map again, to transform our output into a more suitable format; for example, to store it again as a tab-delimited file we can do:
sales_by_day.map(lambda l: "{0}\t{1}".format(l[0], l[1])).saveAsTextFile("sales_by_day")

Notice this will actually produce a folder called sales_by_day containing a bunch of files (part-00000 and so on, plus a _SUCCESS marker if it succeeds), just like Hadoop would.
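
Since the output is just a folder of text files, you can read it back by pointing textFile at the folder name (a handy way to check small results):
sc.textFile("sales_by_day").collect()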

Sample data can be obtained from https://github.com/okaram/spark; I plan to make this into a series: the next post will be on joins, another on writing applications, and at least one more on performance.

2 comments:

  1. (reply) I think HDFS sees them as one file, so you can use them in the next stage; for using them in other places, you can just cat them together (as in cat part* > finalfile.txt).