Don’t miss part one in this series: Using Apache Kafka for Real-Time Event Processing at New Relic.
If you’re a recent adopter of Apache Kafka, you’re undoubtedly trying to determine how to handle all the data streaming through your system. The Events Pipeline team at New Relic processes a huge amount of “event data” on an hourly basis, so we’ve thought about this question a lot. Unless you’re processing only a small amount of data, you need to distribute your data onto separate partitions.
In part one of this series—Using Apache Kafka for Real-Time Event Processing at New Relic—we explained how we built the underlying architecture of our event processing streams using Kafka. In this post, we explain how the partitioning strategy for your producers depends on what your consumers will do with the data.
Why partition your data in Kafka?
If you have enough load that you need more than a single instance of your application, you need to partition your data. The producer clients decide which topic partition data ends up in, but it’s what the consumer applications will do with that data that drives the decision logic. If possible, the best partitioning strategy to use is random.
However, you may need to partition on an attribute of the data if:
- The consumers of the topic need to aggregate by some attribute of the data
- The consumers need some sort of ordering guarantee
- Another resource is a bottleneck and you need to shard data
- You want to concentrate data for efficiency of storage and/or indexing
In part one, we used the diagram below to illustrate a simplification of a system we run for processing ongoing queries on event data:
Random partitioning of Kafka data
We use random partitioning on the input topic for our most CPU-intensive application, the match service. Because any event can land on any partition, all instances of the match service must know about all registered queries to be able to match any event. While the event volume is large, the number of registered queries is relatively small, and thus a single application instance can handle holding all of them in memory, for now at least.
The following diagram uses colored squares to represent events that match to the same query. It shows messages randomly allocated to partitions:
Random partitioning results in the most even spread of load for consumers, and thus makes scaling the consumers easier.
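As a minimal sketch of what random assignment looks like from the producer's side, the Python below picks a partition uniformly at random and counts how evenly a batch of events spreads. The partition count, the event count, and the function name `random_partition` are made up for illustration; a real producer would usually get a similar spread by sending records without a key and letting the client's partitioner choose.

```python
import random
from collections import Counter


def random_partition(num_partitions: int, rng: random.Random) -> int:
    """Pick a partition uniformly at random, ignoring the event's contents."""
    return rng.randrange(num_partitions)


# Hypothetical numbers, chosen only to make the spread visible.
num_partitions = 6
num_events = 60_000
rng = random.Random(42)  # seeded so the illustration is repeatable

counts = Counter(random_partition(num_partitions, rng) for _ in range(num_events))

for partition in sorted(counts):
    share = counts[partition] / num_events
    print(f"partition {partition}: {counts[partition]} events ({share:.1%})")
```

Because no attribute of the event influences the choice, no partition can become hot because of skew in the data itself.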
Partition by aggregate
On the topic consumed by the service that does the query aggregation, however, we must partition according to the query identifier since we need all of the events that we’re aggregating to end up at the same place.
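One way to picture this is a keyed partitioner: hash the query identifier and take the result modulo the partition count, so every event for a query maps to the same partition. The sketch below is an illustration under assumptions, not our production code; `partition_for_query`, the query IDs, and the partition count are hypothetical, and CRC32 stands in for whatever stable hash your client uses (Kafka's Java client hashes the key bytes with murmur2).

```python
import zlib
from collections import defaultdict


def partition_for_query(query_id: str, num_partitions: int) -> int:
    """Map a query identifier to a partition with a stable hash.

    Python's built-in hash() is salted per process for strings, so it would
    give different answers in different producer instances. CRC32 does not.
    """
    return zlib.crc32(query_id.encode("utf-8")) % num_partitions


num_partitions = 8  # hypothetical
# (query_id, event_id) pairs standing in for events that matched a query.
matched_events = [
    ("query-a", 1), ("query-b", 2), ("query-a", 3),
    ("query-c", 4), ("query-a", 5), ("query-b", 6),
]

by_partition = defaultdict(list)
for query_id, event_id in matched_events:
    by_partition[partition_for_query(query_id, num_partitions)].append((query_id, event_id))

# Record which partitions each query ended up on; it should be exactly one.
partitions_per_query = defaultdict(set)
for partition, events in by_partition.items():
    for query_id, _ in events:
        partitions_per_query[query_id].add(partition)

for query_id in sorted(partitions_per_query):
    print(query_id, "->", sorted(partitions_per_query[query_id]))
```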
This diagram shows that events matching to the same query are all co-located on the same partition. The colors represent which query each event matches to:
After releasing the original version of the service, we discovered that the top 1.5% of queries accounted for approximately 90% of the events processed for aggregation. As you can imagine, this resulted in some pretty bad hot spots on the unlucky partitions.
To mitigate the hot spots, we needed a more sophisticated partitioning strategy, so we also partitioned by time window to move the hot spots around. We hashed together the query identifier with the time window begin time. This spread the “hot” queries across the partitions in chunks.
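The combined key can be sketched as follows. The window length, the key format, and the names `window_start` and `partition_for` are assumptions for illustration, and CRC32 again stands in for a stable hash.

```python
import zlib

WINDOW_SECONDS = 60  # hypothetical window length


def window_start(event_ts: int, window_seconds: int = WINDOW_SECONDS) -> int:
    """Round an epoch timestamp (seconds) down to the start of its window."""
    return event_ts - (event_ts % window_seconds)


def partition_for(query_id: str, event_ts: int, num_partitions: int) -> int:
    """Hash the query identifier together with the window begin time."""
    key = f"{query_id}:{window_start(event_ts)}".encode("utf-8")
    return zlib.crc32(key) % num_partitions


num_partitions = 8  # hypothetical

# Follow one "hot" query across 50 consecutive windows.
partitions_used = {
    partition_for("hot-query", window_index * WINDOW_SECONDS, num_partitions)
    for window_index in range(50)
}
print("partitions visited by hot-query:", sorted(partitions_used))
```

Within a single window, every event for a query still lands on one partition, so the aggregator sees the whole window. The partition changes only when the window does.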
In the diagram below, the numbers indicate what time window each message belongs to:
We partition our final results by the query identifier, as the clients that consume from the results topic expect the windows to be provided in order:
Planning for resource bottlenecks and storage efficiency
When choosing a partition strategy, it’s important to plan for resource bottlenecks and storage efficiency.
(Note that the examples in this section reference other services that are not a part of the streaming query system I’ve been discussing.)
Resource bottleneck: We have another service that has a dependency on some databases that have been split into shards. We partition its topic according to how the shards are split in the databases. This approach produces a result similar to the diagram in our partition by aggregate example. Each consumer depends only on the database shard it is linked with, so issues with other database shards do not affect the instance or its ability to keep consuming from its partition. Also, if the application needs to keep database-related state in memory, it holds only the share for its own shard. Of course, this method of partitioning data is also prone to hot spots.
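A sketch of that idea, assuming the database picks a shard by hashing a record key: if the producer reuses the same function to pick the partition, each partition carries data for exactly one shard. `NUM_SHARDS`, `PARTITIONS_PER_SHARD`, `shard_for`, and the key format are hypothetical, and a real system would call whatever sharding function its database layer already defines.

```python
import zlib

NUM_SHARDS = 4            # hypothetical number of database shards
PARTITIONS_PER_SHARD = 2  # hypothetical; topic has NUM_SHARDS * PARTITIONS_PER_SHARD partitions


def _stable_hash(text: str) -> int:
    """Stable stand-in hash; not the function any particular database uses."""
    return zlib.crc32(text.encode("utf-8"))


def shard_for(record_key: str) -> int:
    """Stand-in for the database's own sharding function."""
    return _stable_hash(record_key) % NUM_SHARDS


def partition_for(record_key: str) -> int:
    """Pick a partition inside the block of partitions owned by the key's shard."""
    shard = shard_for(record_key)
    # A differently prefixed hash chooses the slot within the shard's block.
    slot = _stable_hash("slot:" + record_key) % PARTITIONS_PER_SHARD
    return shard * PARTITIONS_PER_SHARD + slot


def shard_of_partition(partition: int) -> int:
    """The single shard a consumer of this partition needs to talk to."""
    return partition // PARTITIONS_PER_SHARD


for key in ["account-17", "account-42", "account-99"]:
    p = partition_for(key)
    print(f"{key}: shard {shard_for(key)}, partition {p}")
```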
Storage efficiency: Our query processing system shares its source topic with the system that permanently stores the event data, which reads in all the same data using a separate consumer group. The data on this topic is partitioned by which customer account the data belongs to. For efficiency of storage and access, we concentrate an account’s data into as few nodes as possible. While many accounts are small enough to fit on a single node, some accounts must be spread across multiple nodes. If an account becomes too large, we have custom logic to spread it across nodes, and, when needed, we can shrink the node count back down.
Rebalance or statically assign partitions?
By default, whenever a consumer enters or leaves a consumer group, the brokers rebalance the partitions across consumers, meaning Kafka handles load balancing with respect to the number of partitions per application instance for you. This is great—it’s a major feature of Kafka. We use this default on nearly all our services.
When a rebalance happens, all consumers drop their partitions and are reassigned new ones. If you have an application that has state associated with the consumed data, such as our aggregator service, you need to drop that state and start fresh with data from the new partition.
However, if dropping state isn’t an option, an alternative is to not use a consumer group and instead use the Kafka API to statically assign partitions, which does not trigger rebalances. Of course, in that case, you must balance the partitions yourself and also make sure that all partitions are consumed.
Since New Relic deals with high-availability real-time systems, we cannot tolerate any downtime for deploys, so we do rolling deploys.
For our aggregator service, which collects events into aggregates over the course of several minutes, we use statically assigned partitions to avoid unnecessarily dropping this state when other application instances restart. The aggregator builds up state that it must drop at every rebalance/restart/deploy. It has to backtrack and rebuild the state it had from the last recorded publish or snapshot. Before we used statically assigned partitions, every restart during a rolling deploy triggered a rebalance, so we had to wait for every application instance to recover before we could restart the next one.
If you use static partitions, then you must manage the consumer partition assignment in your application manually. Here is how we do this in our aggregator service:
We set a configuration value for the number of partitions each application instance should attempt to grab. When each instance starts up, it gets assigned an ID through our Apache ZooKeeper cluster, and it calculates which partition numbers to assign itself. The instance holds onto those partitions for its lifetime. We always keep a couple of extra idle instances running, waiting to pick up partitions in the event that another instance goes down (either due to failure or because of a normal restart/deploy). All of our instances run in containers, and we orchestrate them with Marathon to always keep a minimum number of instances running.
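The calculation described above might look something like the sketch below. It assumes zero-based instance IDs that map to contiguous blocks of partitions, with IDs past the last block acting as idle spares; the actual ID scheme is not described here, so treat `partitions_for_instance` and `unassigned_partitions` as hypothetical helpers. `build_static_consumer` shows the static-assignment call using the third-party kafka-python client (`KafkaConsumer.assign` with `TopicPartition`), which is one client choice among several.

```python
from typing import Iterable, List


def partitions_for_instance(instance_id: int, partitions_per_instance: int,
                            total_partitions: int) -> List[int]:
    """Contiguous block of partitions for a zero-based instance ID.

    Returns an empty list for IDs past the last block (idle spares).
    """
    start = instance_id * partitions_per_instance
    end = min(start + partitions_per_instance, total_partitions)
    return list(range(start, end))


def unassigned_partitions(active_instance_ids: Iterable[int],
                          partitions_per_instance: int,
                          total_partitions: int) -> List[int]:
    """Partitions nobody is consuming; with static assignment this check is on you."""
    covered = set()
    for instance_id in active_instance_ids:
        covered.update(partitions_for_instance(
            instance_id, partitions_per_instance, total_partitions))
    return sorted(set(range(total_partitions)) - covered)


def build_static_consumer(topic: str, partition_numbers: List[int],
                          bootstrap_servers: str):
    """Create a consumer with fixed partitions and no consumer group.

    Needs the kafka-python package, imported lazily so the helpers above
    work without it.
    """
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=None,             # no group, so no rebalances
        enable_auto_commit=False,  # offsets must be tracked by the application
    )
    consumer.assign([TopicPartition(topic, p) for p in partition_numbers])
    return consumer


# Hypothetical setup: 6 partitions, 2 per instance, instance 1 is down.
print(partitions_for_instance(2, 2, 6))
print(unassigned_partitions([0, 2], 2, 6))
```

In this layout, an idle spare would take over by claiming the ID of the instance that went down and computing the same block of partitions.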
The diagram below shows the process of a partition being assigned to an aggregator instance. In this example, we have configured 1 partition per instance:
Your partitioning strategies will depend on the shape of your data and what type of processing your applications do. As you scale, you may need to adapt your strategies to handle new volume and shape of data. Consider what the resource bottlenecks are in your architecture, and spread load accordingly across your data pipelines. It may be CPU, database traffic, or disk space, but the principle is the same. Be efficient with your most limited/expensive resources.