Slimming Down Your Kafka Streams Data
So you’ve just put the finishing touches on your fancy new Kafka Streams application, and are going through your list of to-dos to prepare it for a move to a production environment. Configuration? Check. Error handling? Check. Monitoring and logging? Check. Internal state cleanup… Wait, is that something I need to worry about? You might, depending upon the specifics of your application. Thinking about, and designing for, cleanup of your stateful application will save you time later and prevent problems once it’s deployed.
The scope of this post is limited to the default state store in Kafka Streams, which is RocksDB. If you’re plugging in a custom state store, then you’re on your own for state management (though you might want to read along anyway as many of the same concepts apply!). Additionally, the code examples I’ve included are using Spring, though this is just a personal preference; none of the strategies described require any Spring dependencies.
Local data storage is a common side-effect of processing data in a Kafka Streams application. This data is held in “state stores”, which are simple key/value stores backed by a RocksDB database. Aggregations and joins are examples of stateful transformations in the Kafka Streams DSL that will result in local data being created and saved in state stores. You could also put data directly into a state store without any transformations by consuming a topic and outputting it to a KTable, using the StreamsBuilder API. If your source data keeps introducing new unique keys, the state stores will continue to grow in size unless you do something to remove old data. Under the covers, the state stores are (by default) persisted to your application’s filesystem. Since you probably don’t have infinite filesystem storage, disk space typically becomes the limiting factor in how large your state store data can grow before it becomes a problem.
As a running example, consider a Kafka Streams topology that consumes a stream of “internet-of-things” (IoT) messages, groups them by a type and a date, and then aggregates that data into counts represented by an object called IotEventMetric.
Because we have a date in our group-by, and assuming new messages continue to come in, we have a never-ending stream of unique keys. This means that the storage required for this aggregation is going to grow unbounded and, if not cleaned up, could eventually exceed the filesystem limits of the application environment. How quickly that happens is highly dependent upon the size and volume of incoming messages as well as the amount of storage allocated to the application.
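To make the growth concrete, here is a minimal plain-Java sketch of the grouping and counting logic. It is a model, not Kafka Streams code: a HashMap stands in for the state store, and the fields on IotEvent and IotEventMetric, the key format, and the class name are all assumptions of mine, since all we know is that events carry a type and a date.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the grouped count; not Kafka Streams API code. */
final class IotAggregationSketch {

    /** Hypothetical event shape: we only assume a type and a date. */
    static final class IotEvent {
        final String type;
        final LocalDate date;

        IotEvent(String type, LocalDate date) {
            this.type = type;
            this.date = date;
        }
    }

    /** Hypothetical metric shape: a running count per group. */
    static final class IotEventMetric {
        long count;
    }

    /** Stand-in for the local state store: one entry per distinct group key. */
    final Map<String, IotEventMetric> store = new HashMap<>();

    /** Group-by step: type plus date, so every new day introduces new keys. */
    static String groupKey(IotEvent event) {
        return event.type + "|" + event.date;
    }

    /** Aggregation step: create the metric on first sight, then increment it. */
    void process(IotEvent event) {
        store.computeIfAbsent(groupKey(event), k -> new IotEventMetric()).count++;
    }

    public static void main(String[] args) {
        IotAggregationSketch sketch = new IotAggregationSketch();
        LocalDate start = LocalDate.of(2019, 1, 1);
        // Two event types per day for a year: nothing is ever removed.
        for (int day = 0; day < 365; day++) {
            sketch.process(new IotEvent("temperature", start.plusDays(day)));
            sketch.process(new IotEvent("humidity", start.plusDays(day)));
        }
        System.out.println("entries after one year: " + sketch.store.size());
    }
}
```

In a real topology the process step would roughly correspond to the aggregator you pass to the DSL, and the map would be the RocksDB-backed store. The point of the model is only that the number of entries tracks the number of distinct type and date combinations, which never stops increasing.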
How do we solve it?
Business requirements can vary greatly, but there is a good chance that there is a logical point in time after which your aggregated data is no longer needed and can therefore be deleted. This could be based on a date, or maybe a lifecycle event or status change that could make that data eligible to be removed from a state store. It would be great if there were a universal TTL (time-to-live) configuration for a state store that automatically removed data, but unfortunately that type of feature doesn’t exist (yet) in Kafka Streams. Nevertheless, there are a few ways you can prune your state stores of old data.
Windowing lets you further group your aggregation or join into segments of time, each defined by a start and end time. Optionally, you can add a retention period to the window, which sets a minimum time that the data will be maintained before being deleted. The example topology from earlier can be modified to use a windowed aggregation with a retention period.
In this example, we set a retention period of 30 days. Once all the records in this window are older than the retention period, Kafka Streams deletes them from the state store as well as the changelog topic, assuming you have fault tolerance enabled (and by default it IS enabled). Data removal from the changelog topic is important because it’s what your Kafka Streams application will use to rebuild the local state stores during application startup, or when migrating data due to the application joining or leaving the consumer group. In other words, if data was only deleted from the state store and not the changelog topic, that data would “magically” reappear later. Using a retention period like this is by far the simplest way to remove old data, though there are a few things to consider:
- Kafka Streams doesn’t delete expired records in the window individually; instead, it deletes the entire window once all records in that window have expired.
- There is no background thread to manage the cleanup of old windows; instead that gets handled during normal stream processing as new records are being consumed.
- Grouping your data into segments of time might not make sense for your particular application. Using windows where it doesn’t fit, just to be able to get at a retention period, probably isn’t the right trade-off.
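The retention behavior described above can be sketched with a small plain-Java model. This is a simplification of my own, not how Kafka Streams implements it internally: a TreeMap of windows stands in for the windowed store, the class and method names are hypothetical, and timestamps are assumed to be non-negative epoch milliseconds. In the real DSL, retention is configured on the windowed store itself (recent versions expose this through Materialized's withRetention method).

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of window retention; not the Kafka Streams implementation. */
final class WindowRetentionSketch {
    private final long windowSizeMs;
    private final long retentionMs;
    // Window start time -> counts per group key, ordered so the oldest windows come first.
    private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();
    // Highest record timestamp seen so far.
    private long streamTimeMs = Long.MIN_VALUE;

    WindowRetentionSketch(Duration windowSize, Duration retention) {
        this.windowSizeMs = windowSize.toMillis();
        this.retentionMs = retention.toMillis();
    }

    /** Counts one record; assumes timestampMs is a non-negative epoch-millis value. */
    void process(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);

        // Cleanup piggybacks on normal processing; there is no background thread.
        // A window is expired once its end plus the retention period has passed,
        // and it is dropped as a whole rather than record by record.
        long newestExpiredStart = streamTimeMs - retentionMs - windowSizeMs;
        windows.headMap(newestExpiredStart, true).clear();

        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        if (windowStart <= newestExpiredStart) {
            return; // late record for a window that has already expired
        }
        windows.computeIfAbsent(windowStart, w -> new HashMap<>()).merge(key, 1L, Long::sum);
    }

    int windowCount() {
        return windows.size();
    }

    long count(String key, long windowStartMs) {
        Map<String, Long> counts = windows.get(windowStartMs);
        return counts == null ? 0L : counts.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        WindowRetentionSketch sketch =
                new WindowRetentionSketch(Duration.ofDays(1), Duration.ofDays(30));
        long day = Duration.ofDays(1).toMillis();
        for (int d = 0; d < 90; d++) {
            sketch.process("temperature", d * day);
        }
        System.out.println("windows kept after 90 days: " + sketch.windowCount());
    }
}
```

Notice that nothing is removed until another record arrives and advances the clock. If the input goes quiet, old windows stay where they are, which is the second consideration in the list above.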
In contrast to the DSL, the Kafka Streams processor API gives you more control over the messages flowing through your application. One type of control is access to a writable instance of a state store. By creating a custom processor, you can schedule a punctuation that iterates over all the records in the state store and deletes them based on whatever criteria make sense for your application. You can add a custom processor within your topology code by leveraging the process() method.
Calling delete in this way also removes the record from the changelog topic for this state store, so the record will remain deleted even after an application restart or migration.
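The scan-and-delete step itself is simple enough to sketch in plain Java. The model below is an illustration under my own assumptions, not Processor API code: a HashMap stands in for the writable state store, the cutoff is a date, and IotEventMetric is given a hypothetical date field so there is something to compare against.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Plain-Java model of a punctuation that purges old entries; not Processor API code. */
final class StorePurgeSketch {

    /** Hypothetical metric shape that remembers which day it belongs to. */
    static final class IotEventMetric {
        final LocalDate date;
        final long count;

        IotEventMetric(LocalDate date, long count) {
            this.date = date;
            this.count = count;
        }
    }

    /** Removes every entry dated before the cutoff and returns how many were deleted. */
    static int purgeOlderThan(Map<String, IotEventMetric> store, LocalDate cutoff) {
        int deleted = 0;
        // Remove through the iterator so the map can be modified while scanning it.
        Iterator<Map.Entry<String, IotEventMetric>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().date.isBefore(cutoff)) {
                it.remove();
                deleted++;
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        Map<String, IotEventMetric> store = new HashMap<>();
        store.put("temperature|2019-01-01", new IotEventMetric(LocalDate.of(2019, 1, 1), 10));
        store.put("temperature|2019-03-01", new IotEventMetric(LocalDate.of(2019, 3, 1), 7));
        int deleted = purgeOlderThan(store, LocalDate.of(2019, 2, 1));
        System.out.println("deleted " + deleted + ", remaining " + store.size());
    }
}
```

In an actual processor, a loop like this would run inside the callback you register with the processor context's schedule method, reading entries through the store's all() iterator (which needs to be closed when you are done) and removing them with the store's delete method. Keep in mind that a full scan costs time proportional to the size of the store, so the punctuation interval is worth choosing with some care.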
If your state store is the result of an aggregation, and you don’t mind writing some additional code, you can make your topology react to messages with null values (tombstones) and delete data for that key. Conceptually, this is similar to how messages in a compacted topic are deleted: publish a null value into that topic for a given key and Kafka deletes the record from the topic. Our example topology can be enhanced to react to tombstones in this way.
With this change, our topology is now checking for a null value and, if present, sets a boolean “delete flag” on our IotEvent object marking it for deletion. In our aggregator, we look for that delete flag and return null if it’s true. Returning null removes the record from the state store as well as its changelog topic. Because aggregate and reduce functions ignore messages with null values, we need to handle this delete case explicitly by marking the object as a “delete record” before it gets to the aggregation.
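The three steps (mark the tombstone, return null from the aggregator, remove the key) can be sketched in plain Java as follows. Again this is a model under my own assumptions rather than the topology itself: a HashMap stands in for the state store, the records are assumed to arrive already keyed by their group key, and the shapes of IotEvent and IotEventMetric are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of tombstone-aware aggregation; not Kafka Streams API code. */
final class TombstoneAggregationSketch {

    /** Hypothetical event shape: only the delete flag matters for this sketch. */
    static final class IotEvent {
        final boolean delete;

        IotEvent(boolean delete) {
            this.delete = delete;
        }
    }

    /** Hypothetical metric shape: an immutable running count. */
    static final class IotEventMetric {
        final long count;

        IotEventMetric(long count) {
            this.count = count;
        }
    }

    /** Stand-in for the aggregation's state store. */
    final Map<String, IotEventMetric> store = new HashMap<>();

    /** Step 1: replace a null value with a marker event so it reaches the aggregator. */
    static IotEvent markTombstone(IotEvent value) {
        return value == null ? new IotEvent(true) : value;
    }

    /** Step 2: the aggregator returns null for a marker, otherwise the incremented count. */
    static IotEventMetric aggregate(IotEvent event, IotEventMetric current) {
        if (event.delete) {
            return null;
        }
        long previous = current == null ? 0L : current.count;
        return new IotEventMetric(previous + 1);
    }

    /** Step 3: a null aggregate removes the key; anything else overwrites it. */
    void process(String key, IotEvent value) {
        IotEventMetric updated = aggregate(markTombstone(value), store.get(key));
        if (updated == null) {
            store.remove(key);
        } else {
            store.put(key, updated);
        }
    }

    public static void main(String[] args) {
        TombstoneAggregationSketch sketch = new TombstoneAggregationSketch();
        sketch.process("temperature|2019-01-01", new IotEvent(false));
        sketch.process("temperature|2019-01-01", null); // tombstone
        System.out.println("entries after tombstone: " + sketch.store.size());
    }
}
```

The marker step is the part that is easy to forget. Without it, the null value would be skipped before the aggregator ever saw it, and the entry would stay in the store.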
Publishing tombstones into the topic that feeds your aggregation is probably best done in a different application, which may not always be practical. However, this approach is the most complete – data can be deleted from both your aggregation state store as well as the source topic, assuming it’s compacted (which it probably should be). A few things to keep in mind:
- Be sure to consider the “delete.retention.ms” configuration, which is set on the topic your application consumes (the default is 24 hours). This controls how long the tombstones will stick around in the topic before they get compacted away. The implication is that if your Streams application is lagging behind by more than this value, you may miss the tombstones and won’t have a chance to handle them in your topology.
- Don’t attempt to publish tombstones directly to the changelog topics; this will remove records from that topic, but it won’t remove the corresponding record from the state store.
All of these options require some effort and forethought, but they will help limit the size of your state stores over time. Using a data serialization tool, like Apache Avro, is worth considering not only for the organizational advantages that a schema can provide, but also for the compactness of the data. Even with a good deletion strategy, serializing your messages in a compact binary format will help reduce the footprint of your state stores. Hopefully there will be additional features in future releases of Kafka Streams that will help simplify the process of managing state stores. Until then, keep these tips in mind to help keep your data slim and trim!