At New Relic, we’re all about embracing modern frameworks, and our development teams are often given the ability to do so. Recently, the Account Experience (AX) team embraced the Apache Flink framework with the expectation that it would give us significant engineering velocity to solve business needs. Specifically, we needed two applications to publish usage data for our customers. I am happy to say Flink has paid off. As developers, we came up to speed on Flink quickly and were able to leverage it to solve some complex problems.

As an early adopter of Flink, I’ve put together this post to share Flink’s development experience—what it’s like to write the code. Before we get to my five key takeaways, though, a little background is in order:

What is Flink and what were we building with it?

Flink is an open source stream-processing framework. We chose Flink because it’s extremely accurate in its data ingestion, recovers from failures with ease while maintaining state, and was able to scale to meet our needs, all of which is covered in greater detail in Flink’s own introduction. (Note that this post is explicitly not about Flink’s architecture or operational issues.)

The AX team built two Flink-based services that now run in production: the Usage Calculator and the Usage Stamper. The Usage Calculator is an application that reads from Apache Kafka topics containing usage metadata from New Relic APM, New Relic Infrastructure, and New Relic Synthetics agents; the app aggregates data for 24 hours and then writes that data to a Kafka topic containing daily usage data. The Usage Stamper reads from that Kafka topic, and matches the usage data to its account hierarchy, which comes from a separate Kafka topic.

AX team Flink apps diagram

A quick look inside a Flink app—the Usage Calculator

Essentially, every Flink app reads from a stream of input, runs a handful of operations in parallel to transform the data, and writes the data out to a datastore. For the most part, what makes a program unique is the operations it runs.

Writing code to get a basic Flink application running is surprisingly simple and relatively concise. At least, the AX team was surprised and impressed. The Usage Calculator fits into the read, process, write model:

  1. The app consumes data on Kafka topics from the agents.
  2. The app runs the data through several operations to calculate the daily usage for each account for each product, using business rules specific to the product.
  3. The app writes the result to a Kafka topic that is read by the Usage Stamper app.

Flink code sample 1

Flink code sample 2

When we first started writing the app, it took one day to go from zero to a first-draft version. In total it’s about a hundred lines of code—not that complex at all.

Takeaway 1. Know your first-order Flink concepts

Here’s what we needed to understand to implement the first iteration of our application.

DataStream: A Flink class that represents an unbounded collection of data.

Flink app code sample 3

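Time Windows: Stream elements are grouped by the time at which they occur. A time window is expressed in processing time (the clock of the machine running the operator), event time (the timestamp carried by each record), or ingestion time (when the record enters Flink). In the Usage Calculator, we use event time.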

Flink app code sample 4
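To make the event-time idea concrete, here is a minimal plain-Java sketch (deliberately not Flink code, and not the Usage Calculator's actual implementation). It assigns each record to a 24-hour window based only on the record's own timestamp. The class and method names are hypothetical, chosen for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of event-time tumbling windows; not Flink code.
class EventTimeWindowSketch {
    // 24-hour window, matching the aggregation period described in the post.
    static final long WINDOW_MILLIS = 24L * 60 * 60 * 1000;

    // Start of the window that contains the given event timestamp.
    static long windowStart(long eventTimeMillis) {
        return eventTimeMillis - Math.floorMod(eventTimeMillis, WINDOW_MILLIS);
    }

    // Counts events per window, keyed by window start time.
    static Map<Long, Integer> countPerWindow(long[] eventTimesMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : eventTimesMillis) {
            counts.merge(windowStart(t), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        // Arrival order differs from event order; grouping uses only the timestamps.
        long[] events = {25 * hour, 1 * hour, 23 * hour, 26 * hour};
        System.out.println(countPerWindow(events));
    }
}
```

Because the window is derived from the timestamp inside the record, a record that arrives late still lands in the window where it belongs. That property is the usual reason to prefer event time for usage data.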

Operators: Operators transform DataStreams into new DataStreams. The Usage Calculator uses a map to transform the message into a business object and a reduce function to “count” the number of messages we get (as a proxy for usage).

Flink app code sample 5

In a deployed cluster, these operators can run on separate machines.
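The map-then-reduce pattern described above can be sketched in plain Java without any Flink dependency. This is an illustration of the idea only: the Usage class, the "accountId,product" message format, and the method names are assumptions made for the example, not the team's real model.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of "map to a business object, then reduce to a count".
class MapReduceCountSketch {
    // Hypothetical business object: usage observed for one account and product.
    static final class Usage {
        final String key;   // "accountId:product"
        final long count;
        Usage(String key, long count) { this.key = key; this.count = count; }
    }

    // "map": parse a raw message of the assumed form "accountId,product".
    static Usage toUsage(String rawMessage) {
        String[] parts = rawMessage.split(",");
        return new Usage(parts[0] + ":" + parts[1], 1L);
    }

    // "reduce": combine two usages that share a key by summing their counts.
    static Usage combine(Usage a, Usage b) {
        return new Usage(a.key, a.count + b.count);
    }

    // Applies map and reduce to a batch of messages, grouped by key.
    static Map<String, Long> countByKey(List<String> rawMessages) {
        Map<String, Usage> reduced = new TreeMap<>();
        for (String raw : rawMessages) {
            Usage usage = toUsage(raw);
            reduced.merge(usage.key, usage, MapReduceCountSketch::combine);
        }
        Map<String, Long> counts = new TreeMap<>();
        for (Usage usage : reduced.values()) {
            counts.put(usage.key, usage.count);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countByKey(Arrays.asList("1,apm", "1,apm", "2,infra")));
    }
}
```

In a stream processor the reduce step runs incrementally as each record arrives, rather than over a finished list, but the combining function has the same shape: two values in, one value out.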

Takeaway 2. Be prepared: Flink uses lazy evaluation to separate configuration and execution

Flink orchestrates operators running in parallel. A job manager sets up and coordinates multiple task managers. The task managers run user-defined operations on the data Flink passes into them. Flink separates the configuration of operators from the execution of the program by using “lazy evaluation.”

The main() method determines the configuration of the application, that is, how Flink will set up operators. The reading and processing of data does not begin until the execute() method is called on the environment; that call is when Flink starts the actual processing.

For example, this code sets up an object graph:

Flink app code sample 6

Flink app code sample 7
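The split between configuration and execution is easier to see in a toy example. The following plain-Java sketch is an analogy rather than Flink's API: a hypothetical LazyPipelineSketch class records each step when map() is called and only touches data when execute() is called. The log field exists purely to show when each thing happens.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy pipeline that separates configuration from execution; an analogy, not Flink.
class LazyPipelineSketch {
    private final List<Function<String, String>> steps = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    // Configuration: records the step. No data is processed here.
    LazyPipelineSketch map(String name, Function<String, String> fn) {
        log.add("configured " + name);
        steps.add(value -> {
            log.add("ran " + name + " on " + value);
            return fn.apply(value);
        });
        return this;
    }

    // Execution: only now does data flow through the configured steps.
    List<String> execute(List<String> input) {
        List<String> output = new ArrayList<>();
        for (String value : input) {
            String current = value;
            for (Function<String, String> step : steps) {
                current = step.apply(current);
            }
            output.add(current);
        }
        return output;
    }

    public static void main(String[] args) {
        LazyPipelineSketch pipeline = new LazyPipelineSketch()
                .map("trim", String::trim)
                .map("upper", String::toUpperCase);
        System.out.println("after configuration: " + pipeline.log);
        System.out.println("output: " + pipeline.execute(Arrays.asList(" a ", "b")));
        System.out.println("after execute: " + pipeline.log);
    }
}
```

A breakpoint inside map() would fire once per step at startup, while a breakpoint inside the lambda would fire once per record. That is the same distinction you run into when debugging a Flink job.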

If you monitor or set breakpoints in methods that are called directly from the main() method, you’ll see them executed only on startup. What you really want to do is monitor the operators themselves, or set breakpoints inside the operators.

Additionally, you could break out operators into separate classes (which also makes them easier to test), or you could set up streams separately as a pattern for keeping the main() method simple.

Takeaway 3. Your local development environment is not the same as the deployed environment

We learned and re-learned this lesson. As I mentioned earlier, Flink lets you run massively parallel operations on your data: a job manager coordinates multiple task managers, which run user-defined operations on the data Flink passes into them. In a fully deployed cluster spread across several machines, the job manager and each task manager can run on separate machines, which essentially means two things:

  1. Task managers have a separate lifecycle and environment from the job manager.
  2. Flink must be able to serialize operators so that it can send them to task managers that are on different machines.

So, what does this mean for a developer? Code that runs correctly in a local Flink environment may not get past the startup phase in a deployed environment.

When we were building and testing our app, we ran into this problem when we wanted to pass environment variables as part of the operators. Our first attempt was to initialize static variables in the UsageCalculator class and pass them to operators. We wrote code that checked that the environment variables were set before initializing.

Flink app code sample 8

That worked perfectly on our local machines, where the environment was shared across the job manager and task managers. However, as soon as we deployed to a cluster that was distributed across many machines, the environment variables were not present on the task managers and our code that checked for them failed.

So we wrote a separate class that got instantiated on the job manager and read environment variables. Then we passed that class into our operators, so they could access the same variables. Only this time, we didn’t make the environment class serializable. That didn’t work either; here’s the stack trace we got on startup:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FilterFunction is not serializable. The object probably contains or references non serializable fields.
     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
     at org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:627)
     at com.newrelic.UsageCalculator.main(UsageCalculator.java:54)
Caused by: java.io.NotSerializableException: com.newrelic.util.Env
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream…

Our final solution was to pass only individual fields to the operator to limit what the operator needs to know about.

Flink app code sample 9

Flink app code sample 10
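The serialization failure and the fix can be reproduced with nothing but the Java standard library. The sketch below is not our production code: Env here is a hypothetical stand-in for a non-serializable helper, and the two filter classes are illustrative names. It uses plain Java serialization to show why holding the helper object fails while holding only the needed field works.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Demonstrates why an operator's fields must be serializable; standard library only.
class OperatorSerializationSketch {
    // Hypothetical helper holding configuration; deliberately NOT Serializable.
    static final class Env {
        final String topic;
        Env(String topic) { this.topic = topic; }
    }

    // Operator that keeps a reference to the whole Env object: cannot be serialized.
    static final class FilterWithEnv implements Serializable {
        private static final long serialVersionUID = 1L;
        final Env env;
        FilterWithEnv(Env env) { this.env = env; }
    }

    // Operator that keeps only the individual field it needs: serializes fine.
    static final class FilterWithField implements Serializable {
        private static final long serialVersionUID = 1L;
        final String topic;
        FilterWithField(String topic) { this.topic = topic; }
    }

    // Returns true if Java serialization can write the object, false otherwise.
    static boolean canSerialize(Object operator) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(operator);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Env env = new Env("usage-topic");
        System.out.println("with Env:   " + canSerialize(new FilterWithEnv(env)));
        System.out.println("with field: " + canSerialize(new FilterWithField(env.topic)));
    }
}
```

A check like canSerialize() also makes a cheap unit test: it catches this class of problem on a laptop instead of at startup in a deployed cluster.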

Takeaway 4. Leverage Flink’s power to simplify complex tasks

We’ve seen the basics of a Flink application: input, processing, and output. We’ve hit a couple of potholes along the way: lazy evaluation and the differences between a local and a deployed environment. Now we’re ready to start leveraging Flink to make what we originally thought would be a difficult operation quite simple.

The Usage Stamper is the second Flink application in our pipeline. It takes in the information that the Usage Calculator published about usage in a New Relic account and creates events to record that usage across multiple related accounts. This means that the Usage Stamper can write up to three records for each incoming message. In order to do this, the Usage Stamper must know about the related accounts for every account that has usage—this is what the team thought would be the tough part.

We discussed several approaches but decided to merge data from the Usage Calculator with data from another stream in the New Relic ecosystem that stored account information. Flink has several operators for merging streams; we used the CoFlatMapFunction, which receives records from two connected streams and lets you define the result of the mapping.

It seemed straightforward, but we were still surprised at how simple it was to implement. The simplicity was inherent in the fact that Flink passes only single records from each stream. So instead of having to cache the entire account database on disk to look up the account for a usage, our code is able to operate with only a specific usage and its associated account. CoFlatMapFunction calls separate methods when it receives an account or a usage message, so it’s simple to process each separately and create a new business object when both have arrived.
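Here is a minimal plain-Java sketch of that two-callback shape. It is not the Usage Stamper's code and does not use Flink: onAccount and onUsage are hypothetical stand-ins for the two methods a CoFlatMapFunction exposes (flatMap1 and flatMap2), a Consumer plays the role of the output collector, and buffering usage until its account arrives is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java illustration of matching records from two inputs, one record at a time.
class TwoInputMatcherSketch {
    private final Map<String, String> parentByAccount = new HashMap<>();
    private final Map<String, List<Long>> pendingUsage = new HashMap<>();

    // Called for each record from the (hypothetical) account stream.
    void onAccount(String accountId, String parentId, Consumer<String> out) {
        parentByAccount.put(accountId, parentId);
        // Release any usage that arrived before its account did.
        List<Long> waiting = pendingUsage.remove(accountId);
        if (waiting != null) {
            for (long usage : waiting) {
                emit(accountId, parentId, usage, out);
            }
        }
    }

    // Called for each record from the (hypothetical) usage stream.
    void onUsage(String accountId, long usage, Consumer<String> out) {
        String parentId = parentByAccount.get(accountId);
        if (parentId == null) {
            // Assumption for this sketch: hold the usage until the account is known.
            pendingUsage.computeIfAbsent(accountId, k -> new ArrayList<>()).add(usage);
        } else {
            emit(accountId, parentId, usage, out);
        }
    }

    // Writes one record for the account itself and one for its parent.
    private static void emit(String accountId, String parentId, long usage, Consumer<String> out) {
        out.accept(accountId + "=" + usage);
        out.accept(parentId + "=" + usage);
    }

    public static void main(String[] args) {
        TwoInputMatcherSketch matcher = new TwoInputMatcherSketch();
        List<String> output = new ArrayList<>();
        matcher.onUsage("a1", 5L, output::add);      // account not seen yet, so buffered
        matcher.onAccount("a1", "p1", output::add);  // releases the buffered usage
        System.out.println(output);
    }
}
```

Each callback handles one record and a small amount of per-account state, which is why no full copy of the account database is needed.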

Takeaway 5. Defer and minimize state

At this point, the AX team was cruising with Flink development. We had multiple applications running in production; we’d learned to harness Flink’s power and were processing data at lightning speed. And then our product manager reminded us about a field we needed to calculate and add to our output.

This work needed to take place in the Usage Calculator, so let’s quickly recap what we saw the first version of the calculator doing:

  1. Reading records from a Kafka topic
  2. Mapping those records to a model class to use for our calculations
  3. Aggregating the records across a specific time period
  4. Creating events and sending them to our output Kafka topic

Ahh! All we needed to do to add the new calculation was:

  1. Add a field to our model class and populate it from the incoming data
  2. Revise the reduce operation to include calculation of the data before creating the event

Easy code to write. The hitch, however, was that adding the field to our model class meant that we had to change what field all of our operators were using. Not a big deal, right?

Except…

Flink stores state for certain operations so that it can be shut down and restarted without losing data. Adding the new field broke Flink’s ability to read in older versions of our model class that it had stored in state. The result? When we deployed, we lost all the usage data that had not been written to the database. Since our aggregation window (the amount of time before we write out data) is 24 hours, that meant we lost a day’s worth of usage data.

We didn’t want to lose data. Our PM didn’t want us to lose data. Our customers really didn’t want us to lose data. We needed to figure out how to evolve our code without losing data so we could continue to iterate quickly. We did this by minimizing the amount of information in our model so that it would change less frequently.

Originally, we mapped incoming messages to a data model. This data model contained all of the fields needed for business operations and was what Flink stored in our state. To minimize the changes that would break our ability to store and read state, we needed to minimize changes to that model; the model had to contain only the fields needed to identify an individual piece of data (passed to the keyBy method) and to aggregate it (i.e., count the number of hours of usage). We changed our code so that an incoming model class is created with the minimum amount of data needed and contains the payload of the incoming message. Only after the aggregation operation is the incoming model transformed into a business model on which we perform business operations, which may change over time.
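The shape of that split can be sketched in plain Java. This is an illustration under assumptions, not our actual classes: IncomingUsage, DailyUsage, and the "product=<name>" payload format are all hypothetical. The point is that only the small, stable class would live in state, while the class that changes with business needs is built afterwards.

```java
import java.io.Serializable;

// Illustration of keeping the stored model minimal and deferring business fields.
class MinimalStateSketch {
    // What would be kept in state: the key, the aggregate, and the raw payload.
    static final class IncomingUsage implements Serializable {
        private static final long serialVersionUID = 1L;
        final String key;      // identifies the piece of data (what keyBy would use)
        final long hours;      // the value being aggregated
        final String payload;  // raw incoming message, left unparsed

        IncomingUsage(String key, long hours, String payload) {
            this.key = key;
            this.hours = hours;
            this.payload = payload;
        }

        // Aggregation touches only the stable fields.
        IncomingUsage plus(IncomingUsage other) {
            return new IncomingUsage(key, hours + other.hours, other.payload);
        }
    }

    // Business model built only after aggregation; free to gain fields over time.
    static final class DailyUsage {
        final String key;
        final long hours;
        final String product;

        DailyUsage(String key, long hours, String product) {
            this.key = key;
            this.hours = hours;
            this.product = product;
        }

        // Assumed payload format for this sketch: "product=<name>".
        static DailyUsage from(IncomingUsage aggregated) {
            String product = aggregated.payload.substring(aggregated.payload.indexOf('=') + 1);
            return new DailyUsage(aggregated.key, aggregated.hours, product);
        }
    }

    public static void main(String[] args) {
        IncomingUsage first = new IncomingUsage("acct-1", 1L, "product=apm");
        IncomingUsage second = new IncomingUsage("acct-1", 1L, "product=apm");
        DailyUsage daily = DailyUsage.from(first.plus(second));
        System.out.println(daily.key + " " + daily.product + " " + daily.hours);
    }
}
```

Adding a new calculated field now means changing DailyUsage and its from() method. IncomingUsage, the class whose serialized form sits in state for up to 24 hours, stays the same.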

A successful project

The Account Experience team is happy with our choice to embrace Flink. Stream processing was new to most of us on the AX team when we started this project, but we were able to quickly get up to speed and leverage Flink’s features to simplify a complex problem. Along the way, we learned a new way of thinking about data processing and five important lessons for working with Flink:

  1. Know your first-order Flink concepts
  2. Flink uses lazy evaluation
  3. Your local development environment is not the same as the deployed environment
  4. Leverage Flink’s power to simplify complex tasks
  5. Defer and minimize state

Ruby has been happily developing software in Portland, Oregon since before “Portlandia” was a thing. She's super excited to be part of New Relic's Accounts Experience team. When she’s not working, she’s probably watching someone play music in a dark venue.