I’ve been using Kafka Connect for a few years now, but until recently I never paid much attention to Single Message Transformations (SMTs). SMTs are simple transforms applied to individual messages before they’re delivered to a sink connector. They can drop a field, rename a field, add a timestamp, and so on.
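
For instance, dropping a couple of fields only takes a built-in ReplaceField transform in the connector configuration. A minimal sketch (the field names here are made up; depending on your Connect version the option is called blacklist or, in newer releases, exclude):

"transforms": "DropFields",
"transforms.DropFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.DropFields.blacklist": "debug_info,internal_id"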

I always thought that any kind of transformation should be done in a processing layer (for example, Kafka Streams) before hitting the integration layer (Kafka Connect). However, my recent experience with configuring an Elasticsearch Sink connector proved me wrong! Complex transformations should definitely be handled outside of Connect, but SMTs can be quite handy for simple enrichment and routing!

SMTs and Routing

Before getting to a concrete example, let’s understand how SMTs can be used for routing. In Kafka Connect, it’s common to use the Kafka topic name as the destination in the sink. For example, the S3 connector uses the topic name as part of the destination path, and the Elasticsearch connector uses the topic name as the index name.

Kafka Connect ships with a few Router SMTs like TimestampRouter and RegexRouter that provide various ways to modify the topic name inside the sink connector. The modified topic name is never written back to Kafka; it’s only used by Connect for routing, nothing else.

For example, if we have a topic named a.metrics and we want to create daily indices in Elasticsearch, we could use a TimestampRouter like this:

"transforms": "TimestampRouter",
"transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.TimestampRouter.topic.format": "${topic}-${timestamp}",
"transforms.TimestampRouter.timestamp.format": "yyyy.MM.dd"

This transform configuration generates intermediate topic names (used only inside the sink connector) like a.metrics-2020.01.01, a.metrics-2020.01.02, and so on, which are then used to create daily indices in Elasticsearch.

Elasticsearch Sink Connector Configuration

The example I’m going to provide is a simplified version of the connector I had to configure recently. I had one topic (source.topic) with different categories of messages inside. These categories had to be saved as separate indices in Elasticsearch (due to very different schemas). I also had to support multiple projects / topics / connectors, so all indices needed to be namespaced.

Finally, Elasticsearch and Kibana are most useful when dealing with time-series data, so I also needed to add a timestamp field in a format Elasticsearch can parse.

So, let’s take a look at the connector configuration for this use-case:

{
  "name": "ESSink-v1",
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "connection.url": "http://elasticsearch.host",

  // not used for ES 7
  "type.name": "",

  "tasks.max": "10",

  "topics": "source.topic",

  "transforms": "ReplaceTopic,AddPrefix,AddSuffix,InsertTimestamp,ConvertTimestamp",

  "transforms.ReplaceTopic.type": "com.sap1ens.connect.transforms.FieldRouter",
  "transforms.ReplaceTopic.field": "category",

  "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.AddPrefix.regex": ".*",
  "transforms.AddPrefix.replacement": "project_a-$0",

  "transforms.AddSuffix.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.AddSuffix.topic.format": "${topic}-${timestamp}",
  "transforms.AddSuffix.timestamp.format": "yyyy.MM.dd",

  "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTimestamp.timestamp.field": "@timestamp",

  "transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.ConvertTimestamp.field": "@timestamp",
  "transforms.ConvertTimestamp.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
  "transforms.ConvertTimestamp.target.type": "string"
}

The first three transforms (ReplaceTopic, AddPrefix, AddSuffix) route each message based on the category field in the payload, a static project prefix, and the current date.

ReplaceTopic uses com.sap1ens.connect.transforms.FieldRouter, a custom SMT that looks like this:

package com.sap1ens.connect.transforms;

import io.confluent.connect.storage.util.DataUtils;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

public class FieldRouter<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC = "Extract and replace topic value with provided field";

    private static final String FIELD_CONFIG = "field";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.");

    private String fieldName;

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        fieldName = config.getString(FIELD_CONFIG);
    }

    @Override
    public R apply(R record) {
        String topic;

        // Extract the (possibly nested) routing field from the record value
        if (record.value() instanceof Struct) {
            topic = DataUtils.getNestedFieldValue(record.value(), fieldName).toString();
        } else {
            throw new DataException("Record value must be a Struct to extract the routing field");
        }

        // Keep the original topic if the field is blank, otherwise re-route the record
        if (StringUtils.isBlank(topic)) {
            return record;
        } else {
            return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
        }
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {

    }
}

This SMT simply extracts a field from the record value (a Struct, e.g. a deserialized Avro payload) and uses it as the topic name.
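
Here’s a quick sketch of how it behaves; the schema and field values are made up for illustration, and it assumes the connector’s dependencies are on the classpath:

import java.util.Collections;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class FieldRouterExample {

    public static void main(String[] args) {
        // Hypothetical value schema with a "category" field, mirroring the connector config above
        Schema schema = SchemaBuilder.struct()
            .field("category", Schema.STRING_SCHEMA)
            .field("value", Schema.FLOAT64_SCHEMA)
            .build();

        Struct value = new Struct(schema)
            .put("category", "some_category")
            .put("value", 42.0);

        FieldRouter<SinkRecord> router = new FieldRouter<>();
        router.configure(Collections.singletonMap("field", "category"));

        // topic, partition, keySchema, key, valueSchema, value, offset
        SinkRecord record = new SinkRecord("source.topic", 0, null, null, schema, value, 0);
        SinkRecord routed = router.apply(record);

        // Prints "some_category" instead of "source.topic"
        System.out.println(routed.topic());
    }
}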

AddPrefix is an org.apache.kafka.connect.transforms.RegexRouter SMT. It prepends a static project name to the topic once it has been replaced with the category: with the regex .* and the replacement project_a-$0, some_category becomes project_a-some_category.

AddSuffix is an org.apache.kafka.connect.transforms.TimestampRouter SMT that appends the current date to the topic name, which makes it easy to delete old indices later.

After these three transforms are applied, a topic that looked like source.topic would be transformed into project_a-some_category-2020.01.01.

Finally, the InsertTimestamp and ConvertTimestamp SMTs add a @timestamp field in the right format for Elasticsearch to parse: InsertField$Value copies the Kafka record timestamp into a new @timestamp field, and TimestampConverter$Value turns it into a string using the configured format.
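
To make that concrete, a record value that looked roughly like this (a made-up payload and timestamp, just for illustration):

{"category": "some_category", "value": 42}

ends up in Elasticsearch with an extra field:

{"category": "some_category", "value": 42, "@timestamp": "2020-01-01T12:34:56Z"}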

Summary

As you can see, SMTs can be quite powerful. With five SMTs, we were able to support non-trivial routing and enrich messages with a timestamp field, which made this Elasticsearch sink pretty much production-ready! Also, adding a new SMT was very straightforward; it only took ~50 lines of code to implement a new Router.