At New Relic, we’ve built some critical parts of our pipeline with Apache Kafka. Over the years, we’ve hit plenty of issues and devised best practices for managing our Kafka clusters. And we’ve learned a lot about how Kafka works—both effectively and not so effectively—along the way.
One fundamental problem we’ve encountered involves Kafka’s consumer auto commit configuration—specifically, how data loss or data duplications can occur when the consumer service experiences an out of memory (OOM) kill or some other type of hard shutdown.
Let me explain this behavior and what it means for Kafka users.
(Note: This post assumes familiarity with the basics of Kafka, including producer and consumer groups, topic partitions, and offsets.)
A look at the problem
Every message your producers send to a Kafka partition has an offset—a sequential index number that identifies each message. To keep track of which messages have already been processed, your consumer needs to commit the offsets of the messages that were processed.
Unless you’re manually triggering commits, you’re most likely using the Kafka consumer auto commit mechanism. Auto commit is enabled out of the box and by default commits every five seconds.
For a simple data transformation service, “processed” means, simply, that a message has come in and been transformed and then produced back to Kafka. For any other scenario, we’d consider the message to be unprocessed. This is important because if you consume
<InputMessage>, but your service dies before managing to produce
<TransformedMessage>, you don’t want to commit the offset for that input message—you need to let it get picked up again.
This gets to the root of our problem because, unfortunately, that’s not how auto commits work. The Kafka consumer has no idea what you do with the message, and it’s much more nonchalant about committing offsets. As far as the consumer is concerned, as soon as a message is pulled in, it’s “processed.”
So now imagine that your consumer has pulled in 1,000 messages and buffered them into memory. Then an auto commit fires, committing the offsets for those 1,000 messages. However, say your service has now used too much memory and is promptly shut down by an OOM kill signal before all the messages are processed. Hundreds of those messages will never be processed. That’s data loss.
The inverse situation is also possible. You could successfully process those 1,000 messages and then experience a hard shut down before committing the offsets. In this scenario, you’ll re-process hundreds of messages on another instance after your consumers rebalance. That’s data duplication.
Ok, so what can I do about it?
The bad news is that there aren’t any easy fixes here. Fundamentally, this is a problem of weak consistency guarantees. What all Kafka users want is exactly-once processing—a guarantee that you will consume and process messages exactly once. Kafka version 0.11 attempts to solve this problem and has made things slightly better. It’s possible to write an exactly-once pipeline with Kafka 0.11, but to do exactly-once consumption, you need to implement your own transactional semantics in your consumers to tell Kafka when you’re done processing (or roll back if things go wrong).
Additionally, in our testing, we found that the transactional model for message production in Kafka 0.11 didn’t process messages as quickly as we needed it to, taking up to 10 – 100 milliseconds per message. That’s a lot of additional latency that we can’t afford to incur.
Another option would be to “roll your own” exactly-once strategy that would automatically commit offsets only for messages that had reached the end of the processing pipeline. This is a recommendation only for the bravest types out there, as getting this right is pretty hard, and you may cause more problems than you solve.
And there’s always the option to accept the risk of data loss or duplication. Sadly, this is the option that many Kafka users choose, but it’s not always an unreasonable choice. The amount of data you’d actually lose or duplicate in one of these scenarios is relatively small; the auto commit should only be a few seconds off from the actual last committed message. (Your throughout levels are probably the best indicator of how much duplication or loss you’d see). The frequency of occurrence should also be low—ideally your Kafka consumer services aren’t getting regular OOM kill signals.
So, compared to the engineering cost of solving this problem in all of your Kafka consumer services, this may just be a risk that you live with if the the data you work with permits a small amount of loss. Of course, you can always mitigate the risk by building reliable services.
As I said, we have a lot of experience at New Relic managing Kafka clusters. We’ve discovered that building highly reliable services gets tougher as we scale to handle massive data volumes, and we’ve made some practical decisions about how we handle this particular flaw in Kafka should one of our services ever experience a hard shutdown.
In the cases where you accept the risk of data loss, take the following steps to minimize it:
- Be aware of this problem, and document it in your risk matrix.
- Make sure your services are stable. Alert on your service’s SIGKILLs and OOMs.
- When building new services, consider using a streaming system that solves this problem for you right out of the box.