Kafka Streams is an advanced stream-processing library with a high-level, intuitive DSL and a great set of features, including exactly-once processing semantics, reliable stateful event-time processing, and more.
Naturally, after completing a few basic tutorials and examples, a question arises: how should I structure an application for a real, production use-case? The answer depends heavily on your problem; however, I feel there are a few very useful patterns that can be applied to pretty much any application.
Any streaming application uses the idea of a topology: a definition of processing steps with one source, a series of transformation steps, and one to many sinks. The ways to structure this kind of application have been discussed for many years; modern stream processing is just an iteration on top of messaging ideas (there are some differences too, but conceptually stream processing and messaging feel like essentially the same thing). And every time we talk about messaging and patterns, it's hard to avoid recalling the classic list of Enterprise Integration Patterns. They're still very relevant today, so let's apply them!
Router

Router is a simple pattern: you have one input message, an optional transformation, and several types of potential output. To use it in Kafka Streams, some kind of intermediate message envelope is necessary to capture the different states of the processing result, which indicate different destinations. For example, we could define MessageSucceeded, MessageFailed and MessageSkipped envelopes. Let's also assume all these classes contain a genericRecord field, which holds the original Avro payload. Now, just combine our message classes with the branch operator and we get our Router:
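A minimal sketch of such a Router using the split()/Branched API (Kafka Streams 2.8+). The envelope classes and topic names here are assumptions, not part of Kafka Streams:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;

public class RouterTopology {
    // Hypothetical envelope hierarchy; in a real app these classes would
    // carry the genericRecord field with the original Avro payload
    interface Message {}
    static class MessageSucceeded implements Message {}
    static class MessageFailed implements Message {}
    static class MessageSkipped implements Message {}

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Message> messages = builder.stream("input-topic");

        messages.split()
            .branch((key, message) -> message instanceof MessageSucceeded,
                    Branched.withConsumer(stream -> stream.to("output-topic")))
            .branch((key, message) -> message instanceof MessageFailed,
                    Branched.withConsumer(stream -> stream.to("failures-topic")))
            .noDefaultBranch(); // MessageSkipped envelopes are simply dropped

        return builder;
    }
}
```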
branch is a very powerful operator that allows us to construct a set of completely different sub-topologies. Each sub-topology could write to a different topic or use a custom processor.
Dead Letter Channel
Dead Letter Channel (or Dead Letter Queue, DLQ below) is one of the most useful patterns out there. Eventually your application will fail during message processing, and a very common thing to do in this case is to deliver that message to a DLQ for inspection and/or reprocessing.
Unfortunately, Apache Kafka doesn't support DLQs natively, and neither does Kafka Streams. There are a few ways to implement a custom DLQ in Kafka Streams; one option is described here. I feel the better solution is to use the Router pattern above plus a Processor with a custom Kafka Producer, something like this:
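One possible shape for that processor, sketched with the Processor API. The DLQ topic name, the bootstrap servers and the byte[] payload format are all assumptions; in practice the producer should be shared (e.g. via the DeadLetterQueue singleton below) rather than created per task:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Sketch of a terminal processor that publishes failed messages to a DLQ topic
class DeadLetterQueueProcessor implements Processor<String, byte[], Void, Void> {
    private KafkaProducer<String, byte[]> producer;

    @Override
    public void init(ProcessorContext<Void, Void> context) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void process(Record<String, byte[]> record) {
        // Terminal step: publish to the DLQ topic, forward nothing downstream
        producer.send(new ProducerRecord<>("dlq-topic", record.key(), record.value()));
    }

    @Override
    public void close() {
        if (producer != null) producer.close();
    }
}
```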
DeadLetterQueue could be the following singleton:
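A sketch of that singleton; the configs, topic name and byte[] payload format are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of a producer-backed DLQ singleton shared across processor instances
public final class DeadLetterQueue {
    private static volatile DeadLetterQueue instance;
    private final KafkaProducer<String, byte[]> producer;

    private DeadLetterQueue() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
    }

    public static DeadLetterQueue getInstance() {
        if (instance == null) {
            synchronized (DeadLetterQueue.class) {
                if (instance == null) instance = new DeadLetterQueue();
            }
        }
        return instance;
    }

    public void send(String key, byte[] payload) {
        // No callback here: a real implementation should check for send errors
        producer.send(new ProducerRecord<>("dlq-topic", key, payload));
    }
}
```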
It’s important to highlight a few things:
- DeadLetterQueue's logic is oversimplified: there is no batching, no callback on the send method to check for an exception, etc. Tweaks like these depend on the specific use-case.
- We need a way to serialize a message outside of the Kafka SerDe logic. A solution in this case is to move the serialization logic to some kind of Utils class, so it can be leveraged by the SerDe as well as other components like the DLQ.
A DLQ can be nicely integrated with a Router via the branch operator:
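A sketch of that wiring, with placeholder envelope classes and topic names; the processor body (serialization plus the hand-off to the shared DLQ producer) is left as a comment:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

public class RouterWithDlq {
    // Placeholder envelope classes, mirroring the Router section
    interface Message {}
    static class MessageSucceeded implements Message {}
    static class MessageFailed implements Message {}

    static class DlqProcessor implements Processor<String, Message, Void, Void> {
        @Override
        public void process(Record<String, Message> record) {
            // serialize record.value() and hand it to the shared DLQ producer here
        }
    }

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Message> messages = builder.stream("input-topic");

        messages.split()
            .branch((key, message) -> message instanceof MessageSucceeded,
                    Branched.withConsumer(stream -> stream.to("output-topic")))
            .branch((key, message) -> message instanceof MessageFailed,
                    Branched.withConsumer(stream -> stream.process(DlqProcessor::new)))
            .noDefaultBranch();

        return builder;
    }
}
```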
Meter

There is no pattern called Meter in the original EIP list; however, the idea here is somewhat similar to Detour or Wire Tap. We want to measure our application's message rates, at least at the beginning and at the end of the pipeline (and potentially at every major transformation step as well). Let's say we have some kind of metrics client and we just want to report message counts. In this case, using the peek operator in our topology does the trick:
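A minimal sketch; plain AtomicLong counters stand in for a real metrics client (Micrometer, StatsD, etc.), and the topic names are placeholders:

```java
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.streams.StreamsBuilder;

public class MeterTopology {
    // Stand-ins for a real metrics client
    static final AtomicLong inputCount = new AtomicLong();
    static final AtomicLong outputCount = new AtomicLong();

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
            .peek((key, value) -> inputCount.incrementAndGet())   // input rate
            // ... the actual transformation steps go here ...
            .peek((key, value) -> outputCount.incrementAndGet())  // output rate
            .to("output-topic");
        return builder;
    }
}
```

peek is a side-effect-only operator, so it leaves the stream untouched, which makes it safe to sprinkle at any step of the pipeline.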
MetricsHandler can be smart enough to report different message states differently, for example by tagging the emitted metrics with the envelope type (succeeded, failed, skipped).
But what if we want to report an overall application lag metric? One step is not enough: we actually need to record the time at the start AND the end of the pipeline, and report the difference. An internal header can be used to pass the initial timestamp value. For example, imagine injecting two extra steps:
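A sketch of the two handlers; the header name is an assumption, and a println stands in for the real metrics call:

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LagMetrics {
    static final String HEADER = "pipeline-start-ms"; // assumed header name

    static class InputMetricsHandler
            implements Transformer<String, String, KeyValue<String, String>> {
        private ProcessorContext context;

        public void init(ProcessorContext context) { this.context = context; }

        public KeyValue<String, String> transform(String key, String value) {
            // Stamp the pipeline entry time into a record header
            byte[] ts = ByteBuffer.allocate(Long.BYTES)
                    .putLong(System.currentTimeMillis()).array();
            context.headers().add(HEADER, ts);
            return KeyValue.pair(key, value);
        }

        public void close() {}
    }

    static class OutputMetricsHandler
            implements Transformer<String, String, KeyValue<String, String>> {
        private ProcessorContext context;

        public void init(ProcessorContext context) { this.context = context; }

        public KeyValue<String, String> transform(String key, String value) {
            Header header = context.headers().lastHeader(HEADER);
            if (header != null) {
                long lagMs = System.currentTimeMillis()
                        - ByteBuffer.wrap(header.value()).getLong();
                System.out.println("pipeline lag ms: " + lagMs); // real metrics call here
            }
            return KeyValue.pair(key, value);
        }

        public void close() {}
    }
}
```

These would be injected with .transform(InputMetricsHandler::new) as the first step of the topology and .transform(OutputMetricsHandler::new) as the last one.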
InputMetricsHandler records the current system timestamp and passes it along as a header; OutputMetricsHandler records another timestamp, calculates the difference between the two, and reports the lag. We had to use transform instead of peek here to get access to the headers.
Most of the EIP patterns are already present in Kafka / Kafka Streams or can be easily implemented, and some, like Pipes and Filters, Aggregator and Guaranteed Delivery, describe its core functionality. What important patterns do you leverage? Leave a comment or hit me up on social media!