I’ve been using Kafka Connect for a few years now, but I’d never paid much attention to Single Message Transformations (SMTs) until recently. SMTs are simple transforms that are applied to individual messages before they’re delivered to a sink connector. They can drop a field, rename a field, add a timestamp, etc.
I always thought that any kind of transformation should be done in a processing layer (for example, Kafka Streams) before hitting the integration layer (Kafka Connect). However, my recent experience with configuring an Elasticsearch Sink connector proved me wrong! Complex transformations should definitely be handled outside of Connect, but SMTs can be quite handy for simple enrichment and routing!
SMTs and Routing
Before going to a concrete example, let’s understand how SMTs allow us to apply routing changes. In Kafka Connect, it’s common to use the Kafka topic name as a destination in the sink. For example, the S3 connector uses the topic name as a part of the destination path, and the Elasticsearch connector uses the topic name to create an index.
Kafka Connect ships with a few router SMTs like TimestampRouter, RegexRouter, etc. that provide various ways to modify the topic name inside the sink. The modified topic name is never written back to Kafka; it only exists for routing inside the sink connector.
For example, if we have a topic named a.metrics and we want to create daily indices in Elasticsearch, we could use a TimestampRouter like this:
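A minimal sketch (the AddSuffix alias is arbitrary; topic.format and timestamp.format are standard TimestampRouter options):

```properties
transforms=AddSuffix
transforms.AddSuffix.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.AddSuffix.topic.format=${topic}-${timestamp}
transforms.AddSuffix.timestamp.format=yyyy.MM.dd
```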
This transform configuration will generate intermediate topics (used only in the sink connector) like a.metrics-2020.01.02 and so on, which will be used for creating indices in Elasticsearch.
Elasticsearch Sink Connector Configuration
The example I’m going to provide is a simplified version of the connector I had to configure recently. I had one topic (source.topic) with different categories of messages inside. These categories had to be saved as separate indices in Elasticsearch (due to very different schemas). I also had to support multiple projects / topics / connectors, so all indices needed to be namespaced.
Finally, Elasticsearch and Kibana are most useful when dealing with time-series data. I needed to add a timestamp field in a specific format for Elasticsearch to parse.
So, let’s take a look at the connector configuration for this use-case:
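A sketch of what such a configuration could look like. The connector name, connection URL, project prefix and the aliases of the two timestamp transforms are assumptions; the transform types and their option names come from the stock Kafka Connect SMTs and the custom FieldRouter described next:

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "source.topic",
    "transforms": "ReplaceTopic,AddPrefix,AddSuffix,InsertTimestamp,ConvertTimestamp",
    "transforms.ReplaceTopic.type": "com.sap1ens.connect.transforms.FieldRouter",
    "transforms.ReplaceTopic.field": "category",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "(.*)",
    "transforms.AddPrefix.replacement": "project-name.$1",
    "transforms.AddSuffix.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.AddSuffix.topic.format": "${topic}-${timestamp}",
    "transforms.AddSuffix.timestamp.format": "yyyy.MM.dd",
    "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimestamp.timestamp.field": "@timestamp",
    "transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.ConvertTimestamp.field": "@timestamp",
    "transforms.ConvertTimestamp.target.type": "string",
    "transforms.ConvertTimestamp.format": "yyyy-MM-dd'T'HH:mm:ssZ"
  }
}
```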
The first three transforms (ReplaceTopic, AddPrefix and AddSuffix) are used to route a message based on the payload’s category field, the current date and a static prefix.
ReplaceTopic is a com.sap1ens.connect.transforms.FieldRouter SMT, a custom transform that looks like this:
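A sketch of what such a FieldRouter could look like (the package and the field config name follow the configuration above; error handling and schemaless support are simplified, and it needs the Kafka Connect transforms API on the classpath):

```java
package com.sap1ens.connect.transforms;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

public class FieldRouter<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String FIELD_CONFIG = "field";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                "Payload field to use as the new topic name");

    private String fieldName;

    @Override
    public void configure(Map<String, ?> configs) {
        SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
        fieldName = config.getString(FIELD_CONFIG);
    }

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            throw new DataException("FieldRouter requires a Struct value");
        }

        Struct value = (Struct) record.value();
        Object field = value.get(fieldName);
        if (field == null) {
            throw new DataException("Field '" + fieldName + "' is missing from the payload");
        }

        // Re-route by replacing the topic name; everything else stays unchanged
        return record.newRecord(
            field.toString(),
            record.kafkaPartition(),
            record.keySchema(), record.key(),
            record.valueSchema(), record.value(),
            record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}
```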
This SMT simply tries to extract a field from an Avro payload and use it as a topic name.
AddPrefix is an org.apache.kafka.connect.transforms.RegexRouter SMT. It adds a static project name to the topic once it has been transformed into a category.
AddSuffix is an org.apache.kafka.connect.transforms.TimestampRouter SMT that appends the current date to the topic name, which makes it possible to delete old indices.
After these three transforms are applied, a topic that looked like source.topic would be transformed into something like project-name.category-2020.01.02: a static project prefix, the extracted category and the current date.
In the end, the ConvertTimestamp SMTs add a @timestamp field in the right format for Elasticsearch to parse.
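One plausible way to build this step with stock SMTs is a pair of InsertField and TimestampConverter transforms (the aliases and the date format here are assumptions):

```properties
# Copy the record's Kafka timestamp into a new @timestamp field
transforms.InsertTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertTimestamp.timestamp.field=@timestamp

# Convert it to a date string Elasticsearch can parse
transforms.ConvertTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertTimestamp.field=@timestamp
transforms.ConvertTimestamp.target.type=string
transforms.ConvertTimestamp.format=yyyy-MM-dd'T'HH:mm:ssZ
```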
As you can see, SMTs can be quite powerful. With five SMTs, we were able to support non-trivial routing and enrich messages with a timestamp field, which made this Elasticsearch sink pretty much production-ready! Also, adding a new SMT was very straightforward; it only took ~50 lines of code to implement a new Router.