Driving Analysis by Managing Data with ETL (a Ruby gem!)

A look at a Ruby gem that Square’s Business Intelligence team wrote to facilitate ETLs.

Written by Jeff Iacono.

Business Intelligence at Square

At Square there are two main teams who ensure that data is accessible and accurate for analysis across the company. The Data Engineering team takes data from disparate sources and feeds it into databases that are accessible to analysts. The Business Intelligence (BI) team is then responsible for transforming and organizing this data into highly optimized structures. This foundation empowers analysts within the Business Intelligence team and those across the company to derive insights and provide data-driven recommendations.

What problem does this library solve?

The BI team is responsible for understanding complicated data and transforming it into a form that supports fast, accurate, data-driven decision-making. The team developed an ETL (Extract, Transform, and Load) library that:

  • Empowers the development of robust, repeatable, and transformative data operations

  • Supports working with and transforming millions of records by leveraging intelligent (and built-in) iteration features

  • Facilitates deploying any ETL operation in the same manner as any source code

Who else could benefit from this library?

The ETL library mentioned above will benefit:

  • Analytics teams responsible for quickly creating data structures that support analysis

  • Data teams that are technically focused and dedicated to developing an ETL pipeline

  • Anyone who needs to perform repeated operations over large data sets

  • Analysts who need to transform large datasets into a form that facilitates analysis

  • Anyone who is primarily database-focused and looking to learn and apply Ruby

How does it work?

Installation

Add this line to your application’s Gemfile:

gem 'ETL'

And then execute:

$ bundle

Or install it yourself as:

$ gem install ETL

Basic ETL

Assume that we have a database connection represented by connection.
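For illustration, such a connection might come from the mysql2 gem (an assumption on our part; any client object that responds to #query should work):

require 'mysql2'

# Hypothetical credentials, for illustration only.
connection = Mysql2::Client.new(
  host:     'localhost',
  username: 'etl_user',
  password: ENV['DB_PASSWORD'],
  database: 'some_database'
)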

To run a basic ETL that is composed of sequential SQL statements, start by creating a new ETL instance:

etl = ETL.new(description: "a description of what this ETL does",
              connection:  connection)

which can then be configured:

etl.config do |etl|
  etl.ensure_destination do |etl|
    # For most ETLs you may want to ensure that the destination exists,
    # so the #ensure_destination block is ideally suited to fulfill
    # this requirement.
    #
    # By way of example:
    #
    etl.query %[
      CREATE TABLE IF NOT EXISTS some_database.some_destination_table (
        user_id INT UNSIGNED NOT NULL,
        created_date DATE NOT NULL,
        total_amount INT SIGNED NOT NULL,
        message VARCHAR(100) DEFAULT NULL,
        PRIMARY KEY (user_id),
        KEY (user_id, created_date),
        KEY (created_date)
      )]
  end

  etl.before_etl do |etl|
    # All pre-ETL work is performed in this block.
    #
    # This can be thought of as a before-ETL hook that will fire only once.
    # When you are not leveraging the ETL iteration capabilities, the value
    # of this block vs the #etl block is not very clear. We will see how
    # and when to leverage this block effectively when we introduce
    # iteration.
    #
    # As an example, let's say we want to get rid of all entries that have
    # an amount less than zero before moving on to our actual etl:
    #
    etl.query %[
      DELETE FROM some_database.some_source_table
      WHERE amount < 0]
  end

  etl.etl do |etl|
    # Here is where the magic happens! This block contains the main ETL
    # operation.
    #
    # For example:
    #
    etl.query %[
      REPLACE INTO some_database.some_destination_table (
          user_id
        , created_date
        , total_amount
      ) SELECT
          sst.user_id
        , DATE(sst.created_at) AS created_date
        , SUM(sst.amount) AS total_amount
      FROM
        some_database.some_source_table sst
      GROUP BY
          sst.user_id
        , DATE(sst.created_at)]
  end

  etl.after_etl do |etl|
    # All post-ETL work is performed in this block.
    #
    # Again, to finish up with an example:
    #
    etl.query %[
      UPDATE some_database.some_destination_table
      SET message = "WOW"
      WHERE total_amount > 100]
  end
end

At this point it is possible to run the ETL instance via:

etl.run

which executes #ensure_destination, #before_etl, #etl, and #after_etl in that order.

ETL with iteration

To add in iteration, simply supply #start, #step, and #stop blocks. This is useful when dealing with large data sets or when executing queries that, while optimized, are still slow.

Again, to kick things off:

etl = ETL.new(description: "a description of what this ETL does",
              connection:  connection)

where connection is the same as described above.

Next we can configure the ETL:

# assuming we have the ETL instance from above
etl.config do |etl|
  etl.ensure_destination do |etl|
    # For most ETLs you may want to ensure that the destination exists,
    # so the #ensure_destination block is ideally suited to fulfill
    # this requirement.
    #
    # By way of example:
    #
    etl.query %[
      CREATE TABLE IF NOT EXISTS some_database.some_destination_table (
          user_id INT UNSIGNED NOT NULL
        , created_date DATE NOT NULL
        , total_amount INT SIGNED NOT NULL
        , message VARCHAR(100) DEFAULT NULL
        , PRIMARY KEY (user_id, created_date)
        , KEY (created_date)
      )]
  end

  etl.before_etl do |etl|
    # All pre-ETL work is performed in this block.
    #
    # Now that we are leveraging iteration the #before_etl block
    # becomes more useful as a way to execute an operation once
    # before we begin our iteration.
    #
    # As an example, let's say we want to get rid of all entries
    # that have an amount less than zero before moving on to our
    # actual etl:
    #
    etl.query %[
      DELETE FROM some_database.some_source_table
      WHERE amount < 0]
  end

  etl.start do |etl|
    # This defines where the ETL should start. This can be a flat
    # number or date, or even SQL / other code can be executed to
    # produce a starting value.
    #
    # Usually, this is the last known entry for the destination table
    # with some sensible default if the destination does not yet
    # contain data.
    #
    # As an example:
    #
    # Note that we cast the default date as a DATE. If we don't, it
    # will be treated as a string and our iterator will fail under
    # the hood when testing if it is complete.
    res = etl.query %[
      SELECT COALESCE(MAX(created_date), DATE('2010-01-01')) AS the_max
      FROM some_database.some_destination_table]

    res.to_a.first['the_max']
  end

  etl.step do |etl|
    # The step block defines the size of each iteration window. To
    # iterate 10 records at a time, the step block should return 10;
    # to go 10,000 units at a time, it should return:
    #
    #   10_000 (Note: the underscore is just for readability)
    #
    # Since this ETL iterates over dates, returning 7 steps forward
    # 7 days at a time:
    #
    7
  end

  etl.stop do |etl|
    # The stop block defines when the iteration should halt.
    # Again, this can be a flat value or code. Either way, one value
    # *must* be returned.
    #
    # As a flat value:
    #
    #   1_000_000
    #
    # Or a date value:
    #
    #   Time.now.to_date
    #
    # Or as a code example:
    #
    res = etl.query %[
      SELECT DATE(MAX(created_at)) AS the_max
      FROM some_database.some_source_table]

    res.to_a.first['the_max']
  end

  etl.etl do |etl, lbound, ubound|
    # The etl block is the main part of the framework. Note: there are
    # two extra args with the iterator this time around: "lbound" and
    # "ubound"
    #
    # "lbound" is the lower bound of the current iteration. When
    # iterating from 0 to 10 and stepping by 2, the lbound would equal
    # 2 on the second iteration.
    #
    # "ubound" is the upper bound of the current iteration. In continuing
    # with the example above, when iterating from 0 to 10 and stepping by
    # 2, the ubound would equal 4 on the second iteration.
    #
    # These args can be used to "window" SQL queries or other code
    # operations.
    #
    # As a first example, to iterate over a set of ids:
    #
    #   etl.query %[
    #     REPLACE INTO some_database.some_destination_table (
    #         created_date
    #       , user_id
    #       , total_amount
    #     ) SELECT
    #         DATE(sst.created_at) AS created_date
    #       , sst.user_id
    #       , SUM(sst.amount) AS total_amount
    #     FROM
    #       some_database.some_source_table sst
    #     WHERE
    #       sst.user_id > #{lbound} AND sst.user_id <= #{ubound}
    #     GROUP BY
    #         DATE(sst.created_at)
    #       , sst.user_id]
    #
    # To "window" a SQL query using dates:
    #
    etl.query %[
      REPLACE INTO some_database.some_destination_table (
          created_date
        , user_id
        , total_amount
      ) SELECT
          DATE(sst.created_at) AS created_date
        , sst.user_id
        , SUM(sst.amount) AS total_amount
      FROM
        some_database.some_source_table sst
      WHERE
        -- Note the usage of quotes surrounding the lbound and ubound vars.
        -- This is required when dealing with dates / datetimes
        sst.created_at >= '#{lbound}' AND sst.created_at < '#{ubound}'
      GROUP BY
          DATE(sst.created_at)
        , sst.user_id]

    # Note that there is no SQL sanitization here, so there is *potential*
    # for SQL injection. That being said, you'll likely be using this gem
    # in an internal tool, so hopefully your co-workers are not looking to
    # sabotage your ETL pipeline. Just be aware of this and handle it as
    # you see fit.
  end

  etl.after_etl do |etl|
    # All post-ETL work is performed in this block.
    #
    # Again, to finish up with an example:
    #
    etl.query %[
      UPDATE some_database.some_destination_table
      SET message = "WOW"
      WHERE total_amount > 100]
  end
end

At this point it is possible to run the ETL instance via:

etl.run

which executes #ensure_destination, #before_etl, #etl, and #after_etl in that order.

Note that #etl executes #start and #stop once each and memoizes the results. It then iterates from the value #start returned up to the value #stop returned, advancing each window by the value #step returned.
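To make the windowing concrete, here is a simplified sketch of that loop (illustrative only, not the gem's actual source; start_value, stop_value, step_value, and etl_block are stand-ins for the memoized block results):

# Illustrative sketch -- not the gem's implementation. With start = 0,
# stop = 10, and step = 2, the #etl block would receive the windows
# (0, 2), (2, 4), (4, 6), (6, 8), and (8, 10).
lbound = start_value
while lbound < stop_value
  ubound = lbound + step_value
  etl_block.call(etl, lbound, ubound) # runs your #etl block for this window
  lbound = ubound
end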

Check out the source code on GitHub.

What’s Next?

The next step is to integrate each set of ETL operations that our library manages into a pipeline framework, which will serve as the backbone of our evolving data warehouse. We’re currently building both the pipeline and the data warehouse and look forward to sharing our progress with you in the near future. If you’d like to tackle these problems with us, get in touch by going to https://squareup.com/careers/business and clicking “Finance.”

Jeff Iacono: business intelligence and pretendgineer @Square. Alum of @elegantbuild, $LEH, @dartmouth.
