I’ve been working with Kafka Streams for a few months and I love it! Here’s a great intro if you’re not familiar with the framework. In the sections below I assume that you understand the basic concepts like KTable, joins and windowing.
Message enrichment is a standard stream processing task and I want to show different options Kafka Streams provides to implement it properly.
Scenario 1: enriching using static (or mostly static) data
Let’s imagine the following situation: you have a stream of address updates and every message in the stream contains a state (province). You need to enrich this information with sales tax rates. Every state has a different sales tax rate, but this information doesn’t change very often (maybe once a year or once every few months), so it’s practically “static”.
In this case you’d need to represent your main stream of updates as a KStream and create a KTable containing state as a key and sales tax rate as a value. After that it should be pretty straightforward to apply a simple KStream-KTable non-windowed join.
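To make the join semantics concrete, here’s a minimal sketch in plain Java, without the Kafka Streams dependency. The AddressUpdate class, the output format and the method name are my assumptions for illustration, and a Map stands in for the KTable. In the DSL the same steps would roughly be selectKey, join and selectKey again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of a non-windowed stream-table join (no Kafka dependency).
final class SalesTaxJoinSketch {

    // Hypothetical message shape: an address update identified by addressId.
    static final class AddressUpdate {
        final String addressId;
        final String state;

        AddressUpdate(String addressId, String state) {
            this.addressId = addressId;
            this.state = state;
        }
    }

    // Inner join: updates whose state has no tax entry are dropped.
    static List<String> innerJoin(List<AddressUpdate> updates,
                                  Map<String, Double> taxRateByState) {
        List<String> enriched = new ArrayList<>();
        for (AddressUpdate update : updates) {
            // Re-key by state so the record key matches the table key.
            String joinKey = update.state;
            Double rate = taxRateByState.get(joinKey);
            if (rate == null) {
                continue; // no match on the table side, nothing is emitted
            }
            // Join the two values, then key the result by addressId again.
            enriched.add(update.addressId + " -> " + update.state + " @ " + rate);
        }
        return enriched;
    }
}
```

Note that an update for a state without a tax entry simply disappears from the output, which is exactly what an inner join does.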
If, for some reason, a sales tax entry might not exist for a particular state, we can use a leftJoin operation and either provide a fallback value or just skip the enrichment in this case.
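Both options can be sketched as plain Java functions shaped like a value joiner. In a left join the table-side value is null when there is no match, so the joiner has to handle that. The class name, the FALLBACK_RATE constant and the output format are hypothetical.

```java
// Plain-Java sketch of the two left-join strategies (no Kafka dependency).
final class LeftJoinFallbackSketch {

    // Illustrative fallback only, not a real tax rate.
    static final double FALLBACK_RATE = 0.0;

    // Option 1: substitute a fallback when the table has no entry.
    static String joinWithFallback(String state, Double rateOrNull) {
        double rate = (rateOrNull != null) ? rateOrNull : FALLBACK_RATE;
        return state + " @ " + rate;
    }

    // Option 2: skip the enrichment and pass the original value through.
    static String joinOrPassThrough(String state, Double rateOrNull) {
        return (rateOrNull == null) ? state : state + " @ " + rateOrNull;
    }
}
```

Which option fits depends on the consumers: a fallback keeps the output shape uniform, while passing through makes the missing enrichment visible downstream.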
Still, there is an issue with this implementation. Because the address update stream and the sales tax stream have different message keys, we need to repartition one stream twice: first to use the state as the key so we can join, and then to return to the address ID as the key.
If the KTable is not very big, we can use a GlobalKTable instead. This type of table doesn’t use partitioning and instead keeps a full copy of the table on every processing node, so the stream can be joined without being re-keyed first.
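Here’s a rough plain-Java model of why the re-keying goes away. A join against a fully replicated table can derive the lookup key from each record with a key mapper, while the record keeps its own key. The class and method names are hypothetical, and a Map stands in for both the stream (one record per key, for brevity) and the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a join against a fully replicated lookup table.
final class GlobalTableLookupSketch {

    // keyMapper derives the lookup key from the record; the record key is untouched.
    static <K, V, GK, GV, R> Map<K, R> join(Map<K, V> records,
                                            Map<GK, GV> globalTable,
                                            BiFunction<K, V, GK> keyMapper,
                                            BiFunction<V, GV, R> joiner) {
        Map<K, R> out = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records.entrySet()) {
            GK lookupKey = keyMapper.apply(record.getKey(), record.getValue());
            GV tableValue = globalTable.get(lookupKey);
            if (tableValue != null) {
                // The result is still keyed by the original record key.
                out.put(record.getKey(), joiner.apply(record.getValue(), tableValue));
            }
        }
        return out;
    }
}
```

The price is memory and startup time: every instance holds the whole table, which is why this only makes sense for small reference data.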
Scenario 2: enriching using data sources we control
Let’s look at another typical situation: we have a stream of user activity and we need to enrich it with additional user information (like email, address, preferences, etc.). It’s very common to only have a user ID in a message; everything else about the user can be found in a dedicated database / API. In this particular case we’re lucky, since we also control that database and we can use the following approach:
- Stream all database changes (CRUD) as a changelog, using event sourcing.
- Potentially duplicate a lot of data and introduce eventual consistency (both can be OK)
- In the end, have a separate “view” on user data based on the changelog in Kafka
After that the enrichment itself is trivial: a KStream-KTable join on the user ID.
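The “view” built from the changelog can be modelled in a few lines of plain Java. This is only a sketch of the table semantics, assuming that a null value in the changelog means “delete” (a tombstone); the class name, method names and string formats are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a table view built from a changelog (no Kafka dependency).
final class UserViewSketch {

    private final Map<String, String> usersById = new HashMap<>();

    // Applies one changelog record: non-null value = upsert, null = delete.
    void apply(String userId, String userDataOrNull) {
        if (userDataOrNull == null) {
            usersById.remove(userId);
        } else {
            usersById.put(userId, userDataOrNull);
        }
    }

    // Enriches an activity event keyed by userId.
    // Returns null when the user is unknown, like an inner join dropping the record.
    String enrich(String userId, String activity) {
        String user = usersById.get(userId);
        return (user == null) ? null : activity + " by " + user;
    }
}
```

The enrichment always sees the latest state of the user at the time the activity event is processed, which is where the eventual consistency mentioned above comes from.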
Here we don’t need to change the message keys or repartition data, because both data streams already use the user ID as a key.
And again, if for some reason user data might not be available, leftJoin can be used to provide a fallback value or skip the enrichment (take a look at the previous section).
Scenario 3: enriching using data sources we don’t control
Finally, the most complicated example. Imagine the same user activity stream as in the previous section. Now, every message contains an IP address and we want to do a simple ip2location transformation. So, every time we see an address like 126.96.36.199 we want to enrich the message with Vancouver, British Columbia, Canada.
The ip2location transformation itself is just an example, and it can be done with a lot of free and commercial APIs.
For your use cases you may need to call some other APIs, for example your billing system, CRM, etc. But it’s the same problem in the end: you don’t control the data source, so you can’t emit changelog events like in the previous section.
Scenario 3a: naive solution
So, how do we go about this? The very naive solution is to call the external API synchronously for every message as it flows through the topology.
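As a rough plain-Java illustration of the naive approach: the locationApi function below is a hypothetical stand-in for whatever client you’d use, and the call counter is only there to make the cost visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of per-message enrichment via an external call.
final class NaiveEnricherSketch {

    private final Function<String, String> locationApi; // hypothetical API client
    private int apiCalls = 0;

    NaiveEnricherSketch(Function<String, String> locationApi) {
        this.locationApi = locationApi;
    }

    List<String> enrichAll(List<String> ipAddresses) {
        List<String> out = new ArrayList<>();
        for (String ip : ipAddresses) {
            apiCalls++; // one blocking remote call per message, even for repeated IPs
            out.add(ip + " -> " + locationApi.apply(ip));
        }
        return out;
    }

    int apiCalls() {
        return apiCalls;
    }
}
```

Three messages mean three remote calls, even if two of them carry the same IP address.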
This implementation is going to kill the throughput, because we need to call an external API for every single message in our stream, which is very expensive. Also, what happens when the call fails? We need to think about proper error handling, retries, etc.
BTW: Apache Flink has a special Async I/O API to make this option actually usable.
Scenario 3b: caching
The implementation above can be slightly improved by introducing an internal cache. So, every time we need to call an external API we’re going to check the cache first, then call the API if nothing is found. This implementation is still relatively simple, but can be a good fit for some use-cases.
Kafka Streams provides state stores for managing state in an efficient and reliable way, so the stateful enrichment can be implemented as a low-level Processor that keeps the cache in a state store.
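The core of such a processor is a check-then-call routine. Here’s a minimal plain-Java sketch of that logic, where a HashMap stands in for the key-value state store and locationApi is a hypothetical client. In a real processor the get and put calls would go to the store instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of cache-first enrichment (no Kafka dependency).
final class CachingEnricherSketch {

    private final Map<String, String> store = new HashMap<>(); // stand-in for a state store
    private final Function<String, String> locationApi;        // hypothetical API client
    private int apiCalls = 0;

    CachingEnricherSketch(Function<String, String> locationApi) {
        this.locationApi = locationApi;
    }

    String enrich(String ip) {
        String location = store.get(ip);
        if (location == null) {
            // Cache miss: pay for the remote call once and remember the result.
            apiCalls++;
            location = locationApi.apply(ip);
            if (location != null) {
                store.put(ip, location);
            }
        }
        return location;
    }

    int apiCalls() {
        return apiCalls;
    }
}
```

A failed or empty lookup is deliberately not cached here, so the next message with the same IP triggers another attempt.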
To use it, we create suppliers for the state store and the processor, and then the processor is applied to all messages using a transform step.
This works as expected: every time a new IP address is observed it’ll take a bit of time to do the API call, but after that the transform step should be almost instantaneous.
Our state store is also backed by a local RocksDB database and a changelog Kafka topic, so it’s pretty reliable.
One final thing to note here: the state store we create in this example is going to grow all the time (which might be OK for some use-cases). If you’re not happy with that, you could use a window state store with a retention period like 1 hour or 1 day.
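To illustrate what a retention period buys you, here’s a small plain-Java model of a cache whose entries expire. It is not the window store API, just the idea behind it; the class name and the explicit nowMs parameter are my assumptions to keep the sketch deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a cache with a retention period (no Kafka dependency).
final class ExpiringCacheSketch {

    private static final class Entry {
        final String location;
        final long storedAtMs;

        Entry(String location, long storedAtMs) {
            this.location = location;
            this.storedAtMs = storedAtMs;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long retentionMs;

    ExpiringCacheSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(String ip, String location, long nowMs) {
        entries.put(ip, new Entry(location, nowMs));
    }

    // Returns null for unknown or expired entries; expired entries are removed.
    String get(String ip, long nowMs) {
        Entry entry = entries.get(ip);
        if (entry == null) {
            return null;
        }
        if (nowMs - entry.storedAtMs >= retentionMs) {
            entries.remove(ip);
            return null;
        }
        return entry.location;
    }

    int size() {
        return entries.size();
    }
}
```

With a one hour retention, an IP address seen again after more than an hour triggers a fresh API call, which also keeps stale locations from living forever.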
Scenario 3c: separate stream for extraction
Our implementation using the cache and a low-level processor may not be good enough if we want to process all messages as soon as possible, without blocking on external API calls.
So, in this case, we can create a separate stream of data, extracted from the source one. Once this stream is enriched, it’s joined back with the original one. In the end, it’s actually a bit more complex than that: the complete topology uses four Kafka topics in total.
First, we extract the IP addresses to a separate topic, wrapping each one in an IpLocationMessage.
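The extraction step is a simple mapping. In this plain-Java sketch the UserActivity class and its fields are hypothetical; IpLocationMessage follows the description in this post (IP address plus a location that defaults to an empty string).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the extraction step (no Kafka dependency).
final class IpExtractionSketch {

    // Hypothetical shape of a user activity message.
    static final class UserActivity {
        final String userId;
        final String ip;

        UserActivity(String userId, String ip) {
            this.userId = userId;
            this.ip = ip;
        }
    }

    // Container with an IP address and a location, empty until enriched.
    static final class IpLocationMessage {
        final String ip;
        final String location;

        IpLocationMessage(String ip) {
            this(ip, "");
        }

        IpLocationMessage(String ip, String location) {
            this.ip = ip;
            this.location = location;
        }
    }

    // One output message per activity; the IP would also become the message key.
    static List<IpLocationMessage> extract(List<UserActivity> activities) {
        List<IpLocationMessage> out = new ArrayList<>();
        for (UserActivity activity : activities) {
            out.add(new IpLocationMessage(activity.ip));
        }
        return out;
    }
}
```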
IpLocationMessage is just a little container class with IP address and location (empty string as a default) fields.
After that we can apply the approach from Scenario 3b to enrich this stream and write the results as the IP locations stream.
Now we need to join the original user activity stream and the enriched IP locations. But here’s the problem: we can’t use a KTable to represent the IP locations stream. If we do, we could easily miss a match in a join (and drop some messages), since it takes some time to do an external API call and the KTable will probably not contain all required enriched IP locations right away.
Solution: use a windowed join! In this case, even if the right side of a join is not immediately available, the join keeps records from both sides for the duration of the window, so a match is still produced if the other side arrives within it. We just need to make the window wide enough for that.
Unfortunately, Kafka Streams doesn’t provide windowed joins with KTables! So, we’ll have to join two KStreams.
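Here’s a simplified plain-Java model of what a windowed join between two streams does. Each side buffers its records, and an incoming record is matched against the other side’s buffer when the keys are equal and the timestamps are within the window. The class and method names are hypothetical, and buffers are never purged here, which a real implementation would do once the window has passed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a windowed stream-stream join (no Kafka dependency).
final class WindowedJoinSketch {

    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final long windowMs;
    private final List<Event> leftBuffer = new ArrayList<>();
    private final List<Event> rightBuffer = new ArrayList<>();

    WindowedJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // A user activity record arrives (keyed by IP in this scenario).
    List<String> onLeft(Event event) {
        leftBuffer.add(event);
        return match(event, rightBuffer, true);
    }

    // An enriched IP location record arrives.
    List<String> onRight(Event event) {
        rightBuffer.add(event);
        return match(event, leftBuffer, false);
    }

    private List<String> match(Event incoming, List<Event> otherSide, boolean incomingIsLeft) {
        List<String> joined = new ArrayList<>();
        for (Event other : otherSide) {
            boolean sameKey = other.key.equals(incoming.key);
            boolean inWindow = Math.abs(other.timestampMs - incoming.timestampMs) <= windowMs;
            if (sameKey && inWindow) {
                Event left = incomingIsLeft ? incoming : other;
                Event right = incomingIsLeft ? other : incoming;
                joined.add(left.value + " @ " + right.value);
            }
        }
        return joined;
    }
}
```

Notice that a second right-side record with the same key inside the window produces the same joined result again.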
The join is very similar to the one from Scenario 1, except it is a windowed join between two KStreams.
Downside? Duplicates are very likely, since every time the same entry appears on the right side (the same IP location) the join will happen again! It’s probably mandatory to implement some deduplication logic after all these steps.
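The simplest form of deduplication is remembering what you’ve already emitted. This plain-Java sketch assumes every message carries a unique event ID, which is my assumption and not something the scenario guarantees. The seen set grows without bound here, so in practice you’d want to bound it with a retention period as in Scenario 3b.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of ID-based deduplication (no Kafka dependency).
final class DeduplicatorSketch {

    private final Set<String> seenEventIds = new HashSet<>();

    // Keeps only the first occurrence of each event ID, preserving order.
    List<String> filter(List<String> eventIds) {
        List<String> unique = new ArrayList<>();
        for (String eventId : eventIds) {
            // Set.add returns false when the ID was already present.
            if (seenEventIds.add(eventId)) {
                unique.add(eventId);
            }
        }
        return unique;
    }
}
```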
Any message enrichment scenario is probably unique, so don’t try to use the same solution everywhere! I demonstrated a few different options, but the number of possible solutions is probably infinite, so don’t be afraid to experiment! ;-)
PS: https://github.com/confluentinc/kafka-streams-examples is a great resource in addition to the Kafka Streams documentation.