Simple Flume Decorators with JRuby

Flume is a framework for moving chunks of data around on your network. It’s primary mission isto move log data from where it is generated (perhaps a web server) to someplace where it can actually be used – like an HDFS file system where it can be crunched by Hadoop. Flume’s design is very flexible – the final destination for your data could also be a database like HBase or Cassandra, a search index system like Elastic Search, another file system like an S3 bucket, or any of a myriad of other configurations. Flume will also go to some efforts to make sure that your data is delivered reliably – it includes some tunable reliability features out of the box.

The Flume User Guide does a good job of explaining how its component architecture works. You can configure data flows by chaining together systems of “nodes” – each node is a data moving unit – each has an input (“source”) and an output (“sink”). Sources can conceivably be anything that produces data – flume can tail sets of files on disk, listen to network ports, periodically run programs, etc. Sinks are a little more interesting – they can write data to disk, push data into an network connection, or into a database. Even more interesting, sinks can be composites – you can fan data out to many other sinks, or set up conditional sinks where if data fails to be accepted by the first sink, it will instead be routed to a second sink. Also, you can build “Decorators” that can modify the data as it moves down the path. Flume offers many sources, sinks, and decorators out of the box – but also gives you the ability to write your own through a Java-based plugin API.

Flume chops the data up into a series of “events”. For instance, if you are using flume to tail a log file, every line written to the file gets turned into a flume event. Each event carries with it the body of the data that produced it, as well as some meta-data: the machine that it was collected on, a time-stamp showing when the data was collected, the event’s priority, etc. You can even add your own key-value pairs to an event in its attributes. Flume sinks can store both the body data and the metadata of an event or in some cases, use the metadata to help ensure that the data lands in the right place – like with the “collectorSink” file bucketing mechanism.

To me, and to some of the other primates I work with at Infochimps, decorators are especially interesting. In a decorator, you get to do some processing on your data as it flies from wherever it was produced to its final destination(s). Flume comes with a handful of basic decorators that will allow you to do some small scale processing of flume events. For instance, the “value” decorator lets you set a value in the metadata of an event. The out-of-the-box decorators are not quite sufficient to handle my processing demands. I wanted a little more flexibility, so I wrote (in less time than it took me to write this blog entry) a quick interface to jRuby. Now I have access to my flume data in transit with a powerful scripting engine.

Enough with the intro – lets jump in. The following steps will lead you down the road to processing data on the fly through the flume with a jRuby script:

1. Install flume. Cloudera has good documentation on setting up Flume. I run Ubuntu, so I just added the cloudera apt package repository to my apt sources, and used “apt-get” to install the packages flume, flume-node and flume-master.

2. Get jRuby. If you use apt-get, you will be getting a slightly out-of-date version, but it will do for the moment. The jRuby website has more details If you need it.

3. Get my jRubyPlugin. For it to work, you have to have it and jruby.jar in Flume’s classpath. You can make custom adjustments to Flume’s runtime environment, including Flume classpath changes in the flume-env.sh script in flume/bin. The easy way is to just drop jruby-flume.jar in the /usr/lib/flume/lib directory (or wherever flume landed in your install process). Getting your jRuby envornment completely set up so that you can see jruby gems and stuff is going to involve making adjustments to your environment, but for now, just having the jruby.jar on the classpath will work. I just created a symbolic link to /usr/lib/jruby/lib/jruby.jar in /usr/lib/flume/lib.

( Aside: I don’t know the full answer to getting everything jruby set up in an embedded mode. However, if you add the following to your flume-env.sh script, you will be at least part of the way there
export UOPTS=”-Djruby.home=/usr/lib/jruby -Djruby.lib=/usr/lib/jruby/lib -Djruby.script=jruby”
)

4. You have to tell flume explicitly what classes to load as plugins when the nodes start up. To do this, create or edit “flume-site.xml” in the flume/conf directory. It should contain at least the following:
< ?xml version="1.0"?>
< ?xml-stylesheet type="text/xsl" href="configuration.xsl"?>



flume.plugin.classes
com.infochimps.flume.jruby.JRubyDecorator
List of plugin classes to load.

After you get this in place, restart your flume-master and flume-node. If everything went ok, the services will start up, and you can go to http://localhost:35871/masterext.jsp to see if the plugin loaded successfully. If you see “jRubyDecorator” listed under the decorators, you are in business.

5. Ok, now let’s build a decorator that does something simple. Create a directory somewhere to keep ruby scripts for flume – I like /usr/lib/flume/scripts. The files in this directory need to be readable by the user that flume is running as. Also, if you are in a distributed world, scripts are going to have to be available both on the master and on the machine that will house the logical node that will run the script.

Here is a simple script. Put it in /usr/lib/flume/scripts/reverse.rb:
# reverse.rb — jRubyDecorator script
require ‘java’
java_import ‘com.cloudera.flume.core.EventSinkDecorator’
java_import ‘com.cloudera.flume.core.Event’
java_import ‘com.cloudera.flume.core.EventImpl’
class ReverseDecorator < EventSinkDecorator
def append(e)
body = String.from_java_bytes e.getBody
super EventImpl.new( body.reverse.to_java_bytes, e.getTimestamp, e.getPriority, e.getNanos, e.getHost, e.getAttrs )
end
end
ReverseDecorator.new(nil)
What does it do? Well, it defines a subclass of com.cloudera.flume.core.EventSinkDecorator which redefines the append method. Our special append method builds a new event from an appended event, except that the text of the "body" field is reversed. Not too much nonsense, but we do have to be a little careful with strings. Flume likes its data to be represented as arrays of bytes, but ruby would prefer to deal with strings as Strings, so I convert both ways: String.from_java_bytes() to get a string object, and the to_java_bytes() method on string-like objects to convert back. Hidden in there, is the ruby string method "reverse".

The last line of the append method shows off some of the power of jRuby. It creates a new instance of EventImpl and passes it off to EventSinkDecorator's implementation of append - basically letting the parent class handle all of the difficult work.

Finally, the last line of the script instantiates a new object of the (jRuby!) ReverseDecorator class and returns it to jRubyDecorator. jRubyDecorator is really a factory class for producing decorator instances. It passes off our stuff as a java object, and flume never suspects what has happened.

Does it work? Lets see:
chris@basqueseed:~$ flume shell -c localhost
2011-02-23 17:35:03,785 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-23 17:35:03,993 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
==================================================
FlumeShell v0.9.3-CDH3B4
Copyright (c) Cloudera 2010, All Rights Reserved
==================================================
Type a command to execute (hint: many commands
only work when you are connected to a master node)

You may connect to a master node by typing:
connect host[:adminport=35873[:reportport=45678]]

[flume localhost:35873:45678] exec config basqueseed console '{jRubyDecorator("/usr/lib/flume/scripts/reverse.rb")=>console}’
[id: 0] Execing command : config
Command succeeded
[flume localhost:35873:45678] quit
So far, so good – the master node has decided that everything is kosher. By the way, be careful with the single and double quotes in flume shell commands. The flume shell is very picky about its input. If you have any structure to your sources or sinks, you must single quote the declaration. Lets now play with with a node:
chris@basqueseed:~$ sudo /etc/init.d/flume-node stop
[sudo] password for chris:
Stopping Flume node daemon (flume-node): stopping node

chris@basqueseed:~$ sudo -u flume flume node_nowatch
/2011-02-23 17:38:21,709 [main] INFO agent.FlumeNode: Flume 0.9.3-CDH3B4
2011-02-23 17:38:21,710 [main] INFO agent.FlumeNode: rev 822c62f0c13ab76921e96dd92e19f68007dbcbe2
2011-02-23 17:38:21,710 [main] INFO agent.FlumeNode: Compiled on Mon Feb 21 13:01:39 PST 2011
…{stuff deleted}…
2011-02-23 17:39:40,471 [logicalNode basqueseed-20] INFO console.JLineStdinSource: Opening stdin source
?was I tam a ti saW
2011-02-23 17:39:45,720 [logicalNode basqueseed-20] INFO debug.ConsoleEventSink: ConsoleEventSink( debug ) opened
basqueseed [INFO Wed Feb 23 17:39:45 CST 2011] Was it a mat I saw?
Holy chute! It works!

I think that is enough for today. Next time, I’ll try some more complicated scripts, deal with attributes and play some games with data flows.

Comments are closed.