Kafka Streams is an advanced stream-processing library with a high-level, intuitive DSL and a great set of features, including exactly-once processing, reliable stateful event-time processing, and more.

Naturally, after completing a few basic tutorials and examples, a question arises: how should I structure an application for a real production use case? The answer can vary a lot depending on your problem. However, I feel there are a few very useful patterns that apply to pretty much any application.

Any streaming application uses the idea of a topology - a definition of processing steps with one or more sources, a series of transformation steps, and one to many sinks. The ways to structure this kind of application have been discussed for many years; modern stream processing is just an iteration on top of the messaging ideas (there are some differences too, but conceptually stream processing and messaging feel like essentially the same thing). And every time we talk about messaging and patterns, it’s hard to avoid recalling the classic list of Enterprise Integration Patterns. They’re still very relevant today, so let’s apply them!

Router

Router is a simple pattern: you have one input message, an optional transformation, and different types of potential outputs. To use it in Kafka Streams, some kind of intermediate message envelope is needed to represent the different states of the processing result, each indicating a different destination. For example, we could decide to use MessageSucceeded, MessageFailed and MessageSkipped envelopes. Let’s also assume all these classes contain a genericRecord field, which is the original Avro payload. Now, just combine our message classes with the branch operator and we get our Router:

KStream[] streams = builder
    .stream(Pattern.compile(applicationConfig.getTopics()))
    // ... a transformation here returns one of the states below
    .branch(
            (key, value) -> value instanceof MessageSucceeded,
            (key, value) -> value instanceof MessageFailed,
            (key, value) -> value instanceof MessageSkipped
    );

// MessageSucceeded
// assuming 'MessageSucceeded' contains an Avro record field that we want to send to the output topic
streams[0].map((key, value) -> KeyValue.pair(key, ((MessageSucceeded) value).getGenericRecord()))
    .to(new SinkTopicNameExtractor());

// MessageFailed
// ... (see the Dead Letter Channel section below)

// MessageSkipped
// do nothing

branch is a very powerful operator that allows us to construct a set of completely different sub-topologies. Each sub-topology could write to a different topic or use a custom processor.
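To make the routing semantics concrete without a running Kafka cluster, here is a minimal plain-Java sketch. It assumes simplified envelope classes that carry a String payload instead of an Avro record, and the `RouterSketch` class with its `branch` and `route` helpers is hypothetical. It only mimics how branch behaves: predicates are checked in order, a record goes to the first branch whose predicate matches, and a record that matches nothing is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class RouterSketch {
    // Hypothetical envelopes: the payload is a String here, not an Avro GenericRecord.
    static abstract class Envelope {
        final String payload;
        Envelope(String payload) { this.payload = payload; }
    }
    static final class MessageSucceeded extends Envelope {
        MessageSucceeded(String payload) { super(payload); }
    }
    static final class MessageFailed extends Envelope {
        MessageFailed(String payload) { super(payload); }
    }
    static final class MessageSkipped extends Envelope {
        MessageSkipped(String payload) { super(payload); }
    }

    // Mimics branch(): one output list per predicate, first matching predicate wins,
    // records that match no predicate are dropped.
    static List<List<Envelope>> branch(List<Envelope> input, List<Predicate<Envelope>> predicates) {
        List<List<Envelope>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            outputs.add(new ArrayList<>());
        }
        for (Envelope record : input) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    outputs.get(i).add(record);
                    break;
                }
            }
        }
        return outputs;
    }

    // Same three predicates, in the same order, as in the topology above.
    static List<List<Envelope>> route(List<Envelope> input) {
        List<Predicate<Envelope>> predicates = new ArrayList<>();
        predicates.add(value -> value instanceof MessageSucceeded);
        predicates.add(value -> value instanceof MessageFailed);
        predicates.add(value -> value instanceof MessageSkipped);
        return branch(input, predicates);
    }
}
```

The index of each output list matches the index of its predicate, which is why the topology above reads the succeeded messages from streams[0].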

Dead Letter Channel

Dead Letter Channel (or Dead Letter Queue, DLQ below) is one of the most useful patterns out there. Eventually, your application will fail during message processing, and a very common thing to do in this case is to deliver that message to a DLQ for inspection and/or reprocessing.

Unfortunately, Apache Kafka doesn’t support DLQs natively, nor does Kafka Streams. There are a few ways to implement a custom DLQ in Kafka Streams; one option is described here. I feel the better solution is to combine the Router pattern above with a Processor that uses a custom Kafka Producer, something like this:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MessageFailureHandler implements Processor<String, MessageFailed> {
    // LOG is a class-level logger; its declaration is omitted here
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, MessageFailed value) {
        String reason = generateFailureReason(value);

        try {
            DeadLetterQueue.getInstance().send(
                // encode the key explicitly as UTF-8 instead of relying on the platform default charset
                key == null ? null : key.getBytes(StandardCharsets.UTF_8),
                AvroUtils.serialize(value.getGenericRecord()),
                context.headers(),
                context.topic(),
                reason
            );
        } catch (IOException e) {
            LOG.error("Could not re-serialize record!", e);
        }
    }

    @Override
    public void close() {}

    private String generateFailureReason(MessageFailed value) {
        // ... handle failure reason generation here
    }
}

DeadLetterQueue could be the following singleton:

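import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

public class DeadLetterQueue {
    // LOG is a class-level logger; its declaration is omitted here
    private final KafkaProducer<byte[], byte[]> dlqKafkaProducer;

    public static DeadLetterQueue getInstance() {
        // standard singleton logic here
    }

    private DeadLetterQueue() {
        Properties props = new Properties();
        // ... initialize props
        this.dlqKafkaProducer = new KafkaProducer<>(props);
    }

    public void send(byte[] key, byte[] value, Headers headers, String sourceTopic, String reason) throws KafkaException {
        // header values are encoded explicitly as UTF-8
        headers.add(new RecordHeader("failure.reason", reason.getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader("failure.time", String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8)));

        String dlqTopic = generateDLQTopic(sourceTopic);

        LOG.warn("Sending to Dead Letter Queue {}: {}", dlqTopic, reason);

        // null partition: let the producer's partitioner choose one
        dlqKafkaProducer.send(new ProducerRecord<>(
            dlqTopic,
            null,
            key,
            value,
            headers)
        );
    }

    private String generateDLQTopic(String sourceTopic) {
        // ... handle DLQ topic generation here
    }
}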

It’s important to highlight a few things:

  • DeadLetterQueue’s logic is oversimplified: there is no batching, no callback on the send method to check for an exception, etc. Tweaks like these depend on the specific use case.
  • We need a way to serialize a message outside of the Kafka SerDe logic. A solution in this case is to move the logic to some kind of Utils class, so it can be leveraged by the SerDe as well as by other components like the DLQ.
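The "standard singleton logic" left out of getInstance() can take several forms. One common option is the initialization-on-demand holder idiom, sketched below in plain Java. The `DeadLetterQueueSketch` class is hypothetical, and an in-memory list stands in for the Kafka producer so the example runs without a cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class DeadLetterQueueSketch {
    // Stand-in for the KafkaProducer: failure reasons are collected in memory.
    private final List<String> sent = Collections.synchronizedList(new ArrayList<String>());

    private DeadLetterQueueSketch() {
        // a real implementation would build the producer properties and the producer here
    }

    // The JVM loads and initializes Holder only on the first getInstance() call,
    // and class initialization is thread-safe, so no explicit locking is needed.
    private static final class Holder {
        static final DeadLetterQueueSketch INSTANCE = new DeadLetterQueueSketch();
    }

    static DeadLetterQueueSketch getInstance() {
        return Holder.INSTANCE;
    }

    void send(String reason) {
        sent.add(reason);
    }

    int sentCount() {
        return sent.size();
    }
}
```

Every caller gets the same instance, so all failure handlers in the application would share a single producer.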

The DLQ can be nicely integrated with a Router via a ProcessorSupplier:

public class MessageFailureHandlerSupplier implements ProcessorSupplier {
    @Override
    public Processor get() {
        return new MessageFailureHandler();
    }
}

// ...

// and then when defining your topology, initialize the supplier:
MessageFailureHandlerSupplier messageFailureHandlerSupplier = new MessageFailureHandlerSupplier();

// and use it on the MessageFailed branch of the Router:
streams[1].process(messageFailureHandlerSupplier);

Meter

There is no pattern called Meter in the original EIP list, but the idea here is somewhat similar to Detour or Wire Tap. We want to measure our application’s message rates, at least at the beginning and at the end of the pipeline (and potentially at every major transformation step as well). Let’s say we have some kind of metrics client and we just want to report message counts. In this case, using the peek operator in our topology does the trick:

// ...
.peek(metricsHandler) // MetricsHandler reports a count for every record that passes through
// ...

MetricsHandler can be smart enough to report different message states differently, for example, tag MessageSucceeded with success, MessageFailed with failure, etc.
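As a rough illustration of that tagging, here is a plain-Java sketch. Everything in it is an assumption: the `MetricsHandlerSketch` name, the empty stand-in envelope classes, the tag names, and the in-memory counters that replace a real metrics client. It implements `BiConsumer` so that it compiles without Kafka; in a real topology the handler would implement Kafka Streams' `ForeachAction`, whose `apply(key, value)` method has the same two-argument shape.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;

final class MetricsHandlerSketch implements BiConsumer<String, Object> {
    // Empty stand-ins for the envelope classes used by the Router.
    static final class MessageSucceeded {}
    static final class MessageFailed {}
    static final class MessageSkipped {}

    // In-memory counters, one per tag, standing in for a metrics client.
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Hypothetical tagging rule: one tag per envelope type.
    static String tagFor(Object value) {
        if (value instanceof MessageSucceeded) return "success";
        if (value instanceof MessageFailed) return "failure";
        if (value instanceof MessageSkipped) return "skipped";
        return "unknown";
    }

    // Called once per record; only counts, never modifies the record.
    @Override
    public void accept(String key, Object value) {
        counters.computeIfAbsent(tagFor(value), tag -> new LongAdder()).increment();
    }

    long count(String tag) {
        LongAdder adder = counters.get(tag);
        return adder == null ? 0L : adder.sum();
    }
}
```

Because the handler only observes records, it can be dropped in at any point of the pipeline without changing what flows downstream.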

But what if we want to report an overall application lag metric? One step is not enough: we need to record the time at the start AND the end of the pipeline and report the difference. An internal header can be used to pass the initial timestamp value. For example, imagine injecting two extra steps:

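// ...
.transform(...) // InputMetricsHandler: records the start timestamp in a header
.transform(...) // actual transformation business logic
.transform(...) // OutputMetricsHandler: reads the header and reports the lag
// ...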

Here, InputMetricsHandler records the current system timestamp and passes it along as a header, while OutputMetricsHandler records another timestamp and calculates the difference between the two, reporting the lag. We had to use transform instead of peek here to get access to the headers.
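The header round trip itself is simple enough to sketch in plain Java. In this hypothetical `LagSketch`, a `Map<String, byte[]>` stands in for the Kafka record headers, the header name `pipeline.start.ms` is made up, and the clock is injected so the arithmetic can be checked deterministically.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.LongSupplier;

final class LagSketch {
    // Hypothetical internal header name.
    static final String START_HEADER = "pipeline.start.ms";

    // Input step: store the current timestamp as an 8-byte header value.
    static void stampStart(Map<String, byte[]> headers, LongSupplier clock) {
        byte[] encoded = ByteBuffer.allocate(Long.BYTES).putLong(clock.getAsLong()).array();
        headers.put(START_HEADER, encoded);
    }

    // Output step: read the start timestamp back and return the lag in milliseconds,
    // or -1 if the header is missing or malformed.
    static long lagMillis(Map<String, byte[]> headers, LongSupplier clock) {
        byte[] raw = headers.get(START_HEADER);
        if (raw == null || raw.length != Long.BYTES) {
            return -1L;
        }
        return clock.getAsLong() - ByteBuffer.wrap(raw).getLong();
    }
}
```

In a running application both steps would be given `System::currentTimeMillis` as the clock, and the output step would hand the result to the metrics client instead of returning it.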

More?

Most of the EIP patterns are already present in Kafka / Kafka Streams or can be easily implemented. Some of them, like Pipes and Filters, Aggregator and Guaranteed Delivery, are part of its core functionality. What important patterns do you leverage? Leave a comment or hit me up on social media!