Effective batching in streaming dataflow jobs

How we make near real time monitoring possible

Reddit
LinkedIn

Background

Square processes nearly 2 billion payments annually. With payments being processed all over the world at such a fast rate, it becomes difficult to monitor the state of payments. We want to be able to monitor payment traffic in near real time so we can identify payment trends: how many payments are being processed per minute, the gross payment volume (GPV) within the hour, if any payments are failing (declining), etc.

The idea is straightforward. Within Square we have a service that sends Payment messages to a PubSub topic that we can continuously listen to for new data. This data flows through our data pipeline, and eventually ends up in our metrics aggregator: SignalFX. In order to process this data, transform it, and submit it to SignalFX, we use Google Dataflow.

The first thing we had to consider when establishing this process was how to format the data we would send to SignalFX. Although we were working on an individual payment message basis when consuming off the topic, we would need to aggregate payments somehow so that we weren’t sending a time series point for every payment. The aggregations would happen around what SignalFX calls dimensions. Our datapoint becomes the following:

{
  "metric_name": "payment.auth.amount",
  "metric_value": 35000,
  "dimensions": {
    "event_type": "AUTH"
    "payment_gateway": "PAYMENT_GATEWAY"
    "country_code": "USD",
    "instrument_type": "VISA",
    "bin": "123456",
    "status": "success"
  }
}

These dimensions are the keys we use to group the payments. Each grouping also has an aggregate function applied to the values. The values depend on the type of metric being emitted. A count metric would COUNT the number of keys grouped, whereas an amount metric would SUM the values grouped by the key. These transformations are very straightforward, and are easy to do with Apache Beam.

Dataflow Job

The dataflow job has to perform a few steps in order to emit to SignalFX.

image3

The steps presented in Figure 1 give a high level view into what the dataflow job does. There are some things that I’ll gloss over as they’re simple enough and not relevant to the context of this post. To start, the job will pull messages from its subscription on PubSub and apply a five second window. Within that window, we convert the message payload into a Collection of payments.

The next step is to create, aggregate, and send metrics to SignalFX. There are certain restrictions on SignalFX’s API that need to be considered when emitting. SignalFX performs rollup calculations on metric time series on a per-second basis. These rollup calculations are simple mathematical functions such as: SUM, AVG, DELTA, MIN, MAX, etc. This means we needed to try and limit the API calls to around once per second in order to not have too much skew in our data. In order to do that, we need to limit Beam from sending to multiple workers to emit to SignalFX.

A step in a Beam pipeline can either be a transform (PTransform) or a function (DoFn). We apply transforms to the data through the PTransform, which applies some logical function on our dataset. We use our PTransforms to aggregate the data (GroupByKey) and then pass it into our DoFn which will perform some IO, in this case emitting to an API. These steps are meant to be parallelizable so that a job can farm out small bundles of data to multiple machines in order to efficiently process data. However, in our case we didn’t want it to be parallelizable. We’re using Beam and Dataflow to easily and efficiently process data, but we wanted to limit our parallelization on the output step to be run on a single worker. So there’s a restriction on how the data gets to the final DoFn step: it can’t be divided into bundles and farmed out to multiple worker processes.

static class AggregatePaymentMetrics extends 
  PTransform<PCollection<Payments>, 
             PCollection<KV<DimensionGroupKey, Long>>> {

    @Override public PCollection<KV<DimensionGroupKey, Long>> expand(
      PCollection<HashMap<String, Object>> payments) {      

    // create AMOUNT metrics
    PCollection<KV<DimensionGroupKey, Long>> paymentAmounts = AMOUNT
    // create COUNT metrics
    PCollection<KV<DimensionGroupKey, Long>> paymentCounts = COUNTS

    return PCollectionList.of(paymentAmounts).and(paymentCounts);
  }
}

image4

This PTransform outputs a PCollection of KV, which is an immutable key-value pair class provided by Beam. Since the collection this will output is a list of key-value pairs, they will be split up and sent off to multiple workers in the DoFn step. This is because the dataset itself is splittable. The problem is we don’t want them to be split up. We want them all to be sent to the same worker.

How can we make sure that they’re all sent to the same worker? There are two options we can take here. The first is to save the aggregates of a window into memory, and pass that data into the final IO step as side-input. This would guarantee all data goes to a worker. The problem with this approach for us was that we would be emitting the data passed in as side input. If the ParDo created multiple workers, each would get the full side input, and as a result would emit duplicate metrics to SignalFX. That idea didn’t work out for us.

Our other option is to add another layer of grouping on top of the one we’re currently doing in the AggregatePaymentMetrics PTransform. This grouping will be done over a single key, a “dummy” key. This dummy key can be anything really, as long as it doesn’t change. The end result would be that the PCollection of payment metrics will all be mapped to a single key, and then the ParDo step will send them all into a single worker machine instead of multiple ones. To transform the dataset in order to have it mapped to a dummy key, we’ll create an additional step in our pipeline that all the transformed data will go through which just maps to that key. It looks something like:

public static class ApplyDummyKey
      extends DoFn<KV<DimensionGroupKey, Value>, 
              KV<Long, KV<DimensionGroupKey, Value>>> {
    static final long DUMMY_KEY = 1L;

    @ProcessElement public void processElement(ProcessContext context) {
      context.output(KV.of(DUMMY_KEY, context.element()));
    }
  }

Which turns our dataset from a list of KVs into a single KV where the value within it is the original list of KVs.

image2

To combine the two parts together, we’ll do the following:

  1. Aggregate the metrics like we were originally
  2. Map all of them to a single dummy key (such as a Long value of 1)
  3. Fetch an Iterable that contains all the values we want to process by the final DoFn

This looks like the following pipeline:

// create AMOUNT metrics
PCollection<KV<DimensionGroupKey, Long>> paymentAmounts = AMOUNTS
// create COUNT metrics
PCollection<KV<DimensionGroupKey, Long>> paymentCounts = COUNTS

PCollectionList<KV<DimensionGroupKey, Long>> createAggregates =
  PCollectionList.of(paymentCounts).and(paymentAmounts);

return createAggregates
          .apply("Flatten multiple lists into one", 
                  Flatten.pCollections())
          .apply("Apply dummy key", ParDo.of(new ApplyDummyKey()))
          .apply("Group elements by dummy key", GroupByKey.create())
          .apply("Fetch values per key", Values.create());

image1

This way, the ParDo that used to get a small chunk of the elements as they were bundled up and distributed will now get a single stream of the data. After implementing this logic the data is still split into smaller chunks, so a single call to the @ProcessElement within the PTransform gets called a number of times within the worker’s life cycle. This means we can’t emit to our external API SignalFX within that method. To overcome this final blocker, we can move our IO from the @ProcessElement to the @FinishBundle which Beam states will only be called upon completion of a ParDo worker which would happen only once for every window of data.

With this, we’re able to have all our other steps within the pipeline be parallelizable, but are able to control how our final IO step is called.

In this post we demonstrated how one can read from an unbounded source of data and control the way the unbounded data flows through the pipeline. This can be very useful when working with APIs with some strict conditions.