Data Mine

Intro to Wukong, a Ruby Framework for Hadoop

As Flip Kromer was quoted at the Strata Conference, “Java has many many virtues, but joy is not one of them”. A lot of developers might not think they can use Hadoop simply because they never learned or refuse to use Java.

Wukong allows you to leverage the agility and ease of use of Ruby with Hadoop. The same program you write on your machine can be deployed to the cloud.

In this video at Data Day Austin, Infochimps CTO Flip Kromer walks through how you can get started with Wukong.

Many thanks to Lynn Bender at GeekAustin for filming, and DataStax for sponsoring. You can find more videos from Data Day at this Blip Channel.

Installing Wukong

You can find more posts on big data, Hadoop, Pig, and Wukong at my personal blog, Data Recipes.
–JACOB PERKINS
(The Data Chef)

Wukong is hands down the simplest (and probably the most fun) tool to use with hadoop. It especially excels at the following use case:

You’ve got a huge amount of data (let that be whatever size you think is
huge). You want to perform a simple operation on each record. For
example, parsing out fields with a regular expression, adding two fields
together, stuffing those records into a data store, etc etc. These are
called map only jobs. They do NOT require a reduce. Can you imagine
writing a java map reduce program to add two fields together? Wukong
gives you all the power of ruby backed by all the power (and
parallelism) of hadoop streaming. Before we get into examples, and there
will be plenty, let’s make sure you’ve got wukong installed and running
locally.

Installing Wukong

First and foremost you’ve got to have ruby installed and running on your
machine. Most of the time you already have it. Try checking the version
in a terminal:

$: ruby --version
ruby 1.8.7 (2010-01-10 patchlevel 249) [x86_64-linux]

If that fails then I bet google can help you get ruby installed on
whatever os you happen to be using.

Next, make sure you’ve got rubygems installed:

$: gem --version
1.3.7

Once again, google can help you get it installed if you don’t have it.

Wukong is a rubygem so we can just install it that way:

sudo gem install wukong
sudo gem install json
sudo gem install configliere

Notice we also installed a couple of other gems to help us out (json and
configliere; some setups want extlib too). If at any time you
get weird errors (LoadError: no such file to load — somelibraryname)
then you probably just need to gem install somelibraryname.

An example

Moving on. You should be ready to test out running wukong locally now.
Here’s the most minimal working example I can come up with of a
map-only wukong script:

#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

class LineMapper < Wukong::Streamer::LineStreamer
  def process line
    yield line
  end
end

Wukong::Script.new(LineMapper, nil).run

Save that into a file called wukong_test.rb, make it executable
(chmod +x wukong_test.rb), and run it with the following:

cat wukong_test.rb | ./wukong_test.rb --map

If everything works as expected then you should see exactly the contents
of your script dumped onto your terminal. Let’s examine what’s actually
going on here.

Boilerplate ruby

First, we’re letting the interpreter know we want to use ruby with the
first line (somewhat obvious). Next, we’re including the libraries we
need.

The guts

Then we define a ruby class for our map job called LineMapper.
This guy subclasses wukong’s LineStreamer class, which does nothing more
than read records from stdin and hand them as arguments to
LineMapper’s process method. The process method then simply yields
the line back to the LineStreamer, which emits it to stdout.

The runner

Finally, we have to let wukong know we intend to run our script. We
create a new script object with LineMapper as the mapper class and nil as the reducer class.

More succinctly, we’ve written our own cat program. When we ran the
above command we simply streamed our script, line by line, through the
program. Try streaming some real data through the program and adding
some more stuff to the process method. Perhaps parsing the line with a
regular expression and yielding numbers? Yielding words? Yielding
characters? The choice is yours. Have fun with it.
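
Here’s one way that last suggestion might look: a quick sketch (class and file names made up, the regex is just an illustration) of a map-only script that yields words instead of lines:

#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

class WordMapper < Wukong::Streamer::LineStreamer
  # split each line on anything that isn't a letter and emit one word per record
  def process line
    line.scan(/[a-zA-Z']+/).each{|word| yield word.downcase }
  end
end

Wukong::Script.new(WordMapper, nil).run

Run it exactly like before (cat somefile.txt | ./word_mapper.rb --map) and you’ve got the map half of a word count.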

Meatier examples to come.

Graph Processing With Wukong and Hadoop

As a last (for now) tutorial-oriented post on Wukong, let’s process a network graph.

Get Data

This airport data (airport edges) from Infochimps is one such network graph with over 35 million edges. It represents the number of flights and passengers transported between two domestic airports in a given month. Go ahead and download it.

Explore Data

We’ve got to actually look at the data before we can make any decisions about how to process it and what questions we’d like answered:


$: head data/flights_with_colnames.tsv | wu-lign
origin_airport destin_airport passengers flights month
MHK AMW 21 1 200810
EUG RDM 41 22 199011
EUG RDM 88 19 199012
EUG RDM 11 4 199010
MFR RDM 0 1 199002
MFR RDM 11 1 199003
MFR RDM 2 4 199001
MFR RDM 7 1 199009
MFR RDM 7 2 199011

So it’s exactly what you’d expect: an adjacency list with (origin node, destination node, weight_1, weight_2, timestamp). There are thousands of data sets with similar characteristics…

Ask A Question

A simple question to ask (and probably the first question you should ask of a graph) is what the degree distribution is. Notice there are two flavors of degree in our graph:

1. Passenger Degree: For a given airport (node in the graph) the number of passengers in + the number of passengers out. Passengers in is called the ‘in degree’ and passengers out is (naturally) called the ‘out degree’.

2. Flights Degree: For a given airport the number of flights in + the number of flights out.

Let’s write the question wukong style:


#!/usr/bin/env ruby

require 'rubygems'
require 'wukong'

class EdgeMapper < Wukong::Streamer::RecordStreamer
  #
  # Yield both ways so we can sum (passengers in + passengers out) and (flights
  # in + flights out) individually in the reduce phase.
  #
  def process origin_code, destin_code, passengers, flights, month
    yield [origin_code, month, "OUT", passengers, flights]
    yield [destin_code, month, "IN",  passengers, flights]
  end
end

class DegreeCalculator < Wukong::Streamer::AccumulatingReducer
  #
  # What are we going to use as a key internally?
  #
  def get_key airport, month, in_or_out, passengers, flights
    [airport, month]
  end

  def start! airport, month, in_or_out, passengers, flights
    @out_degree = {:passengers => 0, :flights => 0}
    @in_degree  = {:passengers => 0, :flights => 0}
  end

  def accumulate airport, month, in_or_out, passengers, flights
    case in_or_out
    when "IN" then
      @in_degree[:passengers]  += passengers.to_i
      @in_degree[:flights]     += flights.to_i
    when "OUT" then
      @out_degree[:passengers] += passengers.to_i
      @out_degree[:flights]    += flights.to_i
    end
  end

  #
  # For every airport and month, calculate passenger and flight degrees
  #
  def finalize

    # Passenger degrees (out, in, and total)
    passengers_out   = @out_degree[:passengers]
    passengers_in    = @in_degree[:passengers]
    passengers_total = passengers_in + passengers_out

    # Flight degrees (out, in, and total)
    flights_out      = @out_degree[:flights]
    flights_in       = @in_degree[:flights]
    flights_total    = flights_in + flights_out

    yield [key, passengers_in, passengers_out, passengers_total, flights_in, flights_out, flights_total]
  end
end

#
# Need to use 2 fields for the partition so every record with the same airport and
# month lands on the same reducer
#
Wukong::Script.new(
  EdgeMapper,
  DegreeCalculator,
  :partition_fields => 2 # use two fields to partition records
).run

Don’t panic. There’s a lot going on in this script so here’s the breakdown (real gentle like):

Mapper

Here we’re using wukong’s RecordStreamer class which reads lines from $stdin and splits on tabs for us already. That’s how we know exactly what arguments the process method gets.

Next, as is often the case with low level map-reduce, we’ve got to be a bit clever in the way we yield data in the map. Here we yield the edge both ways and attach an extra piece of information (“OUT” or “IN”) depending on whether the passengers and flights were going into the airport in a month or out. This way we can distinguish between these two pieces of data in the reducer and process them independently.

Finally, we’ve carefully rearranged our records so that (airport, month) is always the first two fields. We’ll partition on this as the key. (We have to say that explicitly at the bottom of the script.)
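
To make that concrete, here’s roughly how the first record of the sample above travels through the map phase (wukong writes each yielded array back out as a tab-separated line, which is what hadoop streaming expects):

    # one input edge (tab-separated):        MHK  AMW  21  1  200810
    # RecordStreamer splits it and calls:    process("MHK", "AMW", "21", "1", "200810")
    # the two yields come out with (airport, month) up front:
    #    MHK  200810  OUT  21  1
    #    AMW  200810  IN   21  1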

Reducer

We’ve seen all these methods before except for one. The reducer needs to know what fields to use as the key (it defaults to the first field). Here we’ve explicitly told it to use the airport and month as the key with the ‘get_key’ method.

* start! – Here we initialize the internal state of the reducer with two ruby hashes. One, @out_degree, counts up all the passengers and flights out; the other, @in_degree, does the same for passengers and flights in. (Let’s all take a moment and think about how awful and unreadable that would be in java…)

* accumulate – Here we simply look at each record and decide which counters to increment depending on whether it’s “OUT” or “IN”.

* finalize – All we’re doing here is taking our accumulated counts, creating the record we care about, and yielding it out. Remember, the ‘key’ is just (airport,month).

Get An Answer
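
If you just want to sanity-check the job without a cluster, the --map trick from the earlier post chains straight into the reduce side. The script name here is whatever you saved it as; this assumes wukong’s --reduce flag mirrors the --map flag we used earlier, with sort standing in for hadoop’s shuffle:

cat data/flights_with_colnames.tsv | ./degree_distribution.rb --map | sort | ./degree_distribution.rb --reduce | head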

We know how to put the data on the hdfs and run the script by now so we’ll skip that part. Here’s what the output looks like:


$: hdp-catd /data/domestic/flights/degree_distribution | head -n20 | wu-lign
1B1 200906 1 1 2 1 1 2
ABE 200705 0 83 83 0 3 3
ABE 199206 0 31 31 0 1 1
ABE 200708 0 904 904 0 20 20
ABE 200307 0 91 91 0 2 2
ABE 200703 0 36 36 0 1 1
ABE 199902 0 84 84 0 1 1
ABE 200611 0 753 753 0 18 18
ABE 199209 0 99 99 0 1 1
ABE 200702 0 54 54 0 1 1
ABE 200407 0 98 98 0 1 1
ABE 200705 0 647 647 0 15 15
ABE 200306 0 27 27 0 1 1
ABE 200703 0 473 473 0 11 11
ABE 200309 0 150 150 0 1 1
ABE 200702 0 313 313 0 8 8
ABE 200103 0 0 0 0 1 1
ABE 199807 0 105 105 0 1 1
ABE 199907 0 91 91 0 1 1
ABE 199501 0 50 50 0 1 1

This is the point where you might bring the results back down to your local file system, crack open a program like R, make some plots, etc.

And we’re done for now. Hurray.

Access the Infochimps Query API via commandline

A tutorial on how to use chimps to access the Infochimps Query API via commandline.

  1. Sign up for the API
  2. When you get your API key, create your chimps dotfile: sudo nano ~/.chimps
  3. Put this in your dotfile:
    :query:
      :username: your_api_name
      :key:      your_api_key
  4. Install chimps: sudo gem install chimps. (make sure you have gemcutter as a source otherwise it won’t find the gem: gem sources -a http://gemcutter.org)
  5. Run a query! % chimps query soc/net/tw/influence screen_name=infochimps

It should return something like this:

{"replies_out":13,"account_age":602,"statuses":166,"id":15748351,"replies_in":22,"screen_name":"infochimps"}

That’s it!
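
And since the reply is plain JSON, the json gem we installed back in the wukong section will turn it into a ruby hash if you’d rather poke at it from code. A tiny sketch, assuming you pipe the chimps output straight in (field names as in the reply above):

    #!/usr/bin/env ruby
    require 'rubygems'
    require 'json'

    # e.g.  chimps query soc/net/tw/influence screen_name=infochimps | ./influence.rb
    reply = JSON.parse($stdin.read)
    puts "#{reply['screen_name']}: #{reply['statuses']} statuses, #{reply['replies_in']} replies in, #{reply['replies_out']} out"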

Little Tips

    # Run a command in each git submodule (show status)
    for foo in `find . -iname .git -type d ` ; do repo=`dirname $foo` ; ( echo "                == $repo ==" ; cd $repo ; git status ) ; done
    # Run a command in each git submodule (show remote URL)
    for foo in `find . -iname .git -type d ` ; do repo=`dirname $foo` ; ( cd $repo ; url=`git remote show origin | grep URL | cut -c 8-`; printf "%-47s\t%s\n" "$repo" "$url" ) ; done

Make your command-line history extend to the beginning of time

I save my entire command-line history, archived by month, and have a shell script that lets me search back through it — if I need to recall the command line parameters to do an ssh tunnel or to make curl do a form POST I can pull it up from that time in June when I figured out how.

  # no limit on history file size
  unset  HISTFILESIZE
  # 10k lines limit on in-memory history
  export HISTSIZE=10000
  # name the history file after the date
  export HISTFILE=$HOME/.history-bash/"hist-`date +%Y-%W`.hist"
  # if starting a brand-new history file
  if [[ ! -f $HISTFILE ]]; then
    # seed new history file with the last part of the most recent history file
    LASTHIST=~/.history-bash/`/bin/ls -1tr ~/.history-bash/ | tail -n 1`;
    if [[ -f "$LASTHIST" ]]; then tail -n 1000 "$LASTHIST" > $HISTFILE  ; fi
  fi
  # seed history buffer from history file
  history -n $HISTFILE

Password safety from the command line

For many commands — mysql, curl/wget, others — it’s convenient to pass your credentials from the command line rather than storing them (unsafely) in a file or (inconveniently) entering them each time. There’s a danger, though, that you’ll accidentally save that password in your .history for anyone with passing access to find.

In my .bashrc, I set export HISTCONTROL=ignorespace — now any command entered with leading spaces on the command line is NOT saved in the history buffer (use ignoreboth to also ignore repeated commands). If I know I’m going to be running repeated commands that require a password on the command line, I can just set an environment variable in an ignored line, and then recall the password variable:

womper ~/wukong$      DBPASS=my.sekritpass1234
womper ~/wukong$ mysql -u ics --password=$DBPASS ics_dev

or for another example,

womper ~/wukong$       twuserpass="myusername:twittterpassword"
womper ~/wukong$ curl -s -u $twuserpass http://stream.twitter.com/1/statuses/sample.json

TSV / Hadoop Streaming Fu

Hadoop streaming uses tab-separated text files.

Quickie histogram:

I keep around a few useful aliases for slicing TSV files (cuttab N grabs the Nth tab-separated column, cutc N keeps the first N characters):

    cat file.tsv | cuttab 3 | cutc 6 | sort | uniq -c


This takes file.tsv and extracts the third column (cuttab 3), takes the first six characters (YYYYMM), then sorts (putting all distinct entries together in a run), and takes the count of each run. Its output is

   4245 200904
  14660 200905
   7654 200906
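
If the aliases aren’t handy, the same histogram falls out of a few lines of ruby (the column index and character count match the pipeline above):

    # count records per YYYYMM, taken from the first six characters of column 3
    counts = Hash.new(0)
    File.foreach('file.tsv') { |line| counts[line.chomp.split("\t")[2][0,6]] += 1 }
    counts.sort.each { |yyyymm, n| printf("%7d %s\n", n, yyyymm) }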

A few other useful hadoop commands:

A filename of ‘-’ to the -put command makes it use STDIN. For example, this creates a file on the HDFS with a recursive listing of the somedir directory:

    hadoop fs -lsr somedir | hadoop fs -put - tmp/listing


Wukong’s hdp-du command emits tab-separated output:

    hdp-du somedir | hdp-put - tmp/listing


So you can also run an HDFS file through a quick filter:

   hdp-cat somefile.tsv | cuttab 2 | hdp-put - somefile-col2.tsv 


(If you brainfart and use ‘… > hdp-put …’ know that I’ve done so a dozen times too).

It's Hot, Damn Hot. So Hot I saw a Chimp in Orange Robes Burst into Flames.

It’s been ridiculously hot ridiculously early this year in Austin. A friend passed along this link to a visualization of 100+ degree days over the last 10 years. The author couldn’t find data extending back farther than 2000, but luckily I knew where to look.

I pulled the NCDC weather for Austin from 1948-present (see infochimps.org link for details) and got my Tufte on.

This temperature cycle is hotter than but comparable to the 1950-1965 era. I’ve got no idea if it’s global warming or the peak of a cycle. The fundamental conclusion — that this year so far, 2000 and 2008 were damn hot — stands up well.

(more…)

Start hacking: machetEC2 released!

machetEC2, the Infochimps Amazon Machine Image (AMI) designed for data processing, analysis, and visualization, has been released!

Amazon’s Cloud Computing services give you transformatively cheap and scalable computing power, and their Public Data Sets (AWS/PDS) collection (which infochimps is contributing to) is helping to put the world of free, open data at your fingertips.  MachetEC2 lets you summon a “batteries included” computer — or a hundred computers — from the cloud.  As soon as it loads, you’re ready to start crunching and transforming and visualizing data, whether from AWS/PDS, or infochimps.org, or your own pool.

When you SSH into an instance of machetEC2 (brief instructions after the jump), check the README files: they describe what’s installed, how to deal with volumes and Amazon Public Datasets, and how to use X11-based applications.  You can also visit the machetEC2 GitHub page to see the full list of packages installed, the list of gems, and the list of programs installed from source.

This machete is only as sharp as it is complete. If there’s software that you find indispensable, we encourage you to suggest it here, or even better to help add it to the toolkit (instructions are within).

(more…)

Hacking through the Amazon with a shiny new MachetEC2

Hold on to your pith helmets: the Infochimps are releasing an Amazon Machine Image designed for data processing, analysis, and visualization.

Amazon’s Elastic Compute Cloud (EC2) allows users to instantiate a virtual computer with a pre-installed operating system, software packages, and up to 1 TB of data loaded on disk, ready to work with, from a shared image (an “Amazon Machine Image”, or AMI).

MachetEC2 is an effort by a group of Infochimps to create an AMI for data processing, analysis, and visualization. If you create an instance of MachetEC2, you’ll have an environment with tools designed for working with data ready to go. You can load in your own data, grab one of our datasets, or try grabbing the data from one of Amazon’s Public Data Sets. No matter what, you’ll be hacking in minutes.

We’re taking suggestions for what software the community would be most interested in having installed on the image (peek inside to see what we’ve thought of so far…)

(more…)

What's Next: Infinite Monkeywrench starting to take form.

We’re starting beta testing of infochimps.org v1.0 — see the following post. In order to start really populating infochimps.org with dataset payloads, the Infinite Monkeywrench is about to get some major love. The following syntax is still evolving, but we’re already using it to do some really fun stuff: here’s a preview.

One of the data sets we’re proud to be liberating is the National Climatic Data Center’s global weather data. To use that data, you need the file describing each of the NCDC weather stations. (I’ll just describe the stations metadata file — the extraction cartoon for the main dataset is basically the same but like 10 feet wide.)

The weather station metadata is found at ftp://ftp.ncdc.noaa.gov/pub/data/gsod/ish-history.txt: it’s a flat file, it has a header of 17 lines, it contains fields describing each station’s latitude, longitude, call sign and all that, and has lines that look like

# USAF   WBAN  STATION NAME                  CTRY  ST CALL  LAT    LON     ELEV(.1M)
# 010014 99999 SOERSTOKKEN                   NO NO    ENSO  +59783 +005350 +00500

Here’s what a complete Infinite Monkeywrench script to download that file, spin each line into a table row, and export as CSV, YAML, and marked-up XML would look like:

    #!/usr/bin/env ruby
    require 'imw'; include IMW
    imw_components :datamapper, :flat_file_parser

    # Stage as an in-memory Sqlite3 connection:
    DataMapper.setup(:staging_db, 'sqlite3::memory:')

    # Load the infochimps schema -- this has table and field names including type info
    ncdc_station_schema = ICSSchema.load('ncdc_station_schema.icss.yaml')

    # Create the tables from the schema
    ncdc_station_schema.auto_migrate!

    # Parse the station info file
    stations = FlatFileParser.new({
      :database  => :staging_db,
      :schema    => ncdc_station_schema,
      :each_line => :station,
      :filepaths => [:ripd, ['ftp://ftp.ncdc.noaa.gov/pub/data/gsod/ish-history.txt']],
      :skip_head => 17,
      :cartoon   => %q{
        # USAF   WBAN  STATION NAME                  CTRY  ST CALL  LAT    LON     ELEV(.1M)
          s6    .s5   .s30                           s2.s2.s2.s4  ..ci5   .ci6    .ci5
      },
    })

    # Dump as CSV, YAML and XML
    stations.dump_all :out_file => [:fixd, "weather_station_info"], :formats => [:csv, :xml, :yaml]

Almost all of that is setup and teardown. Once the infochimps schema has field names, the only part you really have to figure out is the cartoon,

      s6    .s5   .s30                           s2.s2.s2.s4  ..ci5   .ci6    .ci5

If you’ve used perl’s unpack(), you’ll get the syntax — this says ‘take the USAF call sign from the initial 6-character string; ignore one junk character; … take one character as the latitude sign, and an integer of up to 5 digits as the scaled latitude, ….’
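
If you’ve never met unpack, ruby’s String#unpack speaks nearly the same dialect, so here’s the flavor of what the start of that cartoon means ('A6' is a 6-character field with trailing spaces stripped, 'x' skips one junk byte; the line is truncated for the example):

    line = '010014 99999 SOERSTOKKEN'
    usaf, wban, name = line.unpack('A6xA5xA30')
    # => ["010014", "99999", "SOERSTOKKEN"]

The real cartoon keeps going like that across the whole record, plus the sign/scale handling for the coordinate fields.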

Rather load it into a database? Leave the last line out, and stage right into your DB. (Any of MySQL 4.x+, Postgres 8.2+, SQLite3+ work.)

    # Load parsed files to the 'ncdc_weather' database in a remote MySQL DB store
    DataMapper.setup(:master_weather_db, 'mysql://remotedb.mycompany.com/ncdc_weather')

Surely a hand-tuned script will do this more thoroughly (and more quickly), but you can write this in a few minutes, set it loose on the gigabytes of data, and do all the rest from the comfort of your DB, your hadoop cluster, or a script that starts with populated datastructures given by a YAML file.

Another example. The US National Institute of Standards and Technology (NIST) publishes an authoritative guide to conversion factors for units of measurement. It is, unhelpfully, only available as an HTML table or a PDF file.

If we feed into the Infinite Monkeywrench

  • The fields
	fields:
	  - { name: unit_from,                  type: str},
	  - { name: unit_to,                    type: str},
	  - { name: conversion_mantissa,        type: float},
	  - { name: conversion_exponent,        type: float},
	  - { name: is_exact,                   type: boolean},
	  - { name: footnotes,
	      type: seq,
	      sequence: str }
  • The cartoon
	  { :each    => '//table.texttable/tr[@valign="top"]:not(:first-child)',
	    :makes   => :unit_conversion, # a UnitConversion struct
	    :mapping => [
	      '/td'             => [:unit_from, :unit_to, :conversion_mantissa, :conversion_exponent],
	      '/td/b'    	  => :is_exact,
	      '/td/a'    	  => :footnotes,
	    ]
	  }

We’d get back something like

  - unit_from: 		 'dyne centimeter (dyn · cm)'
    unit_to:		 ' newton meter (N · m)'
    conversion_mantissa:  1.0
    conversion_exponent: -7

  - unit_from: 		 'carat, metric'
    unit_to:		 'gram (g)'
    conversion_mantissa:  2.0
    conversion_exponent: -1
    is_exact: 		 true

  - unit_from: 		 'centimeter of mercury (0 °C) <a href="http://physics.nist.gov/Pubs/SP811/footnotes.html#f13">13</a>'
    unit_to:		 ' pascal (Pa)'
    conversion_mantissa: 1.33322
    conversion_exponent: 3
    footnotes:           [ '<a href="http://physics.nist.gov/Pubs/SP811/footnotes.html#f13">13</a>' ]

Now with some tweaking, you could do even more (and you’ll find you need to hand-correct a couple rows), but note:

  • Once one person’s done it nobody else has to.
  • This snippet gets you most of the way to a semantic dataset in your choice of universal formats.
  • In fact, there’s so little actual code left over we can eventually just take schema + url + cartoon as entered on the website, crawl the relevant pages, and provide each such dataset as CSV, XML, YAML, JSON, zip’d sqlite3 file … you get the idea — and we can do that without having to run code from strangers on our server.
  • Most importantly, for an end user this isn’t like trusting some random dude’s CSV file uploaded to a site named after a chimpanzee. The transformation from NIST’s data to something useful is so simple you can verify it by inspection. Of course, you can run the scripts yourself to check; or you can trace the Monkeywrench code itself; and once we have digital fingerprinting set up on infochimps.org anyone willing to stake their reputation on the veracity of a file can sign it — but it’s pretty easy to accept something this terse but expressive as valid. Our goal is to give transparent provenance of infochimps.org data to any desired degree.

Infinite Monkeywrench hosted on GitHub

Rejoice, you open-source orangutans, for the powerful, the weighty, the Infinite Monkeywrench is now hosted on GitHub! Download a copy and start hacking, if you will, and send us your questions and concerns.

The Infinite Monkeywrench (IMW) turns all the screws in the heaving contraption we call infochimps.org but can be put to good use on more modest projects as well. Learn more about IMW at the official IMW website.