Pump: A Faster Way To Write Cascading Flows

Check out the new helper we’ve released to cut down on the verbosity of Cascading Flows.

Written by Bryan Duxbury.

One of the tools that Square uses for “big data” analysis is Cascading. There are plenty of competitors in the same space (1, 2, 3), but we particularly like Cascading because of its language model and its extensibility.

However, one of the things we don’t like about Cascading is how much code it can take to write even simple Flows. This is an understandable consequence of being a DSL hosted in Java, but it can be a real drag on productivity, particularly when trying to review the Flows that others have written. For instance, check this one out:

Pipe pipe **=** **new** **Pipe(**"input"**);**
pipe **=** **new** **Each(**pipe**,** **new** **Fields(**"line"**),** **new** **RegexFilter(**"^[0-9]+"**));**
pipe **=** **new** **Each(**pipe**,** **new** **Fields(**"line"**),** **new** **RegexSplitter(new** **Fields(**"timestamp"**,** "tag"**),** ","**),** **new** **Fields(**"timestamp"**,** "tag"**));**
pipe **=** **new** **Coerce(new** **Fields(**"timestamp"**),** **long.**class**);**
pipe **=** **new** **Each(**pipe**,** **new** **Fields(**"timestamp"**),** **new** **BucketizeTimestamp(),** **new** **Fields(**"tag"**,** "bucketized_timestamp"**);**
pipe **=** **new** **Discard(**pipe**,** **new** **Fields(**"timestamp"**));**
pipe **=** **new** **Rename(**pipe**,** **new** **Fields(**"bucketized_timestamp"**),** **new** **Fields(**"timestamp"**));**
pipe **=** **new** **GroupBy(**pipe**,** **new** **Fields(**"timestamp"**,** "tag"**));**
pipe **=** **new** **Every(**pipe**,** Fields**.**NONE**,** **new** **Count(new** **Fields(**"count"**)));**

This Flow takes in text lines, discards any that don’t start with a string of digits, splits on commas, bucketizes the digit (timestamp) field using a user-defined function, and then counts occurrences of tags by the new timestamp. In all, it’s not that hard to express. Each individual line only does a relatively small amount of work. However, there’s a lot of boilerplate on every line that adds friction, and even with more aggressive multi-line indentation, ultimately it’s more difficult to read. And this is only a trivial Flow — real-world examples could easily have dozens of steps or more!

To combat this problem, we’ve introduced a new helper class, Pump, which can be found in our cascading-helpers project on GitHub. Pump wraps the existing Cascading API to give developers a much less verbose way to express their Flows. Here’s the same Flow as above, rewritten using Pump:

  **.**each**(new** **RegexFilter(**"^[0-9]+"**),** "line"**)**
  **.**each**(new** **RegexSplitter(new** **Fields(**"timestamp"**,** "tag"**),** ","**),** "line"**)**
  **.**coerce**(long.**class**,** "timestamp"**)**
  **.**each**(new** **BucketizeTimestamp(),** "bucketized_timestamp"**)**
  **.**rename**(**"bucketized_timestamp"**,** "timestamp"**)**
  **.**groupby**(**"timestamp"**,** "tag"**)**
  **.**every**(new** **Count(new** **Fields(**"count"**)));**

The first thing you should notice is that Pump uses the Builder pattern. This eliminates a ton of the overhead of declaring your Flow by allowing you to omit variable assignment and the various pipe instantiations. Further, Pump largely does away with arguments of type Fields, since that requires you to throw in another one or two instantiations per operation. We also use varargs liberally to cut down on the need for creation of collections. Finally, we have added builder-style methods for many of the Cascading built-in assemblies for tuple algebra so that you don’t have to break out of the builder to complete these common operations.

The other thing worth noting is that Pump doesn’t change everything about the way Cascading Flows are written, nor does it provide a solution for 100% of the use cases that Cascading supports. Pump’s objective is to make most of the things you want to do easier, but on occasion you will probably want to do something crazy and/or specific, and in that case, you’re better off doing it how the Cascading developers envisioned it. To this end (and ultimately to facilitate actually running your Flow), the result of any Pump helper method can easily be converted back into a Pipe by calling toPipe().

Next Steps

We’ve started using Pump internally and have seen a nice boost to our productivity and code readability. We’re planning to continue extending it, and we’d love to see other Cascading users contribute their improvements! Particularly if you have any ideas around better syntax for managing CoGroups, please get in touch with us.

As a side note, the cascading-helpers project is envisioned as a shared repository of tools that make writing and running Cascading Flows much easier. As a tease, one such class that everyone should use is CascadingHelper — it will make your Cascading unit tests run 10x faster. We’d love to see more external contributions! Bryan Duxbury (@bryanduxbury) | Twitter *The latest Tweets from Bryan Duxbury (@bryanduxbury). Building data tools @StreamSets. Co-Founder @8bitlit. Formerly…*twitter.com

Table Of Contents