Kafka Connect is a modern open-source enterprise integration framework built on top of the Apache Kafka ecosystem. With Connect you get access to dozens of connectors that can move data between Kafka and various data stores (S3, JDBC databases, Elasticsearch, etc.).

Kafka Connect provides a REST API for managing connectors. It supports operations such as describing, adding, modifying, pausing, resuming, and deleting connectors.
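
For example, assuming a Connect worker listening on its default port 8083 (the host and connector name below are placeholders), a few of these operations look roughly like this with Python’s requests library:

python
import requests

CONNECT_URL = "http://localhost:8083"  # placeholder; point this at your Connect cluster

# List all deployed connectors
print(requests.get(CONNECT_URL + "/connectors").json())

# Describe a single connector (its config and tasks)
print(requests.get(CONNECT_URL + "/connectors/my-connector").json())

# Pause and resume a connector
requests.put(CONNECT_URL + "/connectors/my-connector/pause")
requests.put(CONNECT_URL + "/connectors/my-connector/resume")

# Delete a connector
requests.delete(CONNECT_URL + "/connectors/my-connector")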

Using the REST API to manage connectors can become tedious, especially when you have to deal with dozens of different connectors. Although it’s possible to use web UI tools like lensesio/kafka-connect-ui, it makes sense to follow basic deployment principles: config management, version control, CI/CD, etc. In other words, it’s perfectly fine to start with manual, ad-hoc REST API calls, but ultimately any large Kafka Connect cluster needs some kind of automation for deploying connectors.

I want to describe the approach that my team uses to make Connect management simple and reliable.

Git

All connector configs are stored in a single Git repo. We run a customized version of Connect (with our custom plugins, converters, and other tweaks), so connector configs live in the same repo. Every config change is reviewed and merged before deploying.

Jsonnet

The Connect REST API uses JSON as its payload format, so storing configuration in JSON files (one per connector) seems reasonable. However, as the number of connectors grows, it becomes obvious that there is a lot of duplication across files. Here’s a full config for a typical S3 connector we run:

json
{
  "connector.class": "com.activision.ds.connect.s3.DSS3SinkConnector",
  "flush.size": "1000000",
  "format.class": "com.activision.ds.connect.s3.format.DSParquetFormat",
  "locale": "en",
  "name": "SomeName-v1",
  "partitioner.class": "com.activision.ds.connect.s3.partitioner.DSTimeBasedPartitioner",
  "path.format": "dt=${ingestion_time}/other_key=${kafka.headers.other.value}",
  "rotate.schedule.interval.ms": "300000",
  "s3.bucket.name": "some-bucket",
  "s3.part.retries": "10",
  "s3.part.size": "5242880",
  "s3.region": "us-west-2",
  "s3.retry.backoff.ms": "1000",
  "schema.compatibility": "NEVER_CHANGE_SCHEMAS",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "tasks.max": "64",
  "timestamp.field": "ingestion_time",
  "timezone": "UTC",
  "topics": "some.kafka.topics",
  "topics.dir": "some/folder"
}

As you can guess, most of the config options above are the same for every other S3 connector. The values we actually care about are name, path.format, topics, topics.dir, and tasks.max.

Jsonnet is a simple templating language that extends JSON. It’s a pretty powerful language that supports variables, functions, inheritance, and much more. So, as a first step toward simplifying our configuration, we can define a set of defaults for all S3 connectors, for example (defaults.libsonnet):

jsonnet
{
  "connector.class": "com.activision.ds.connect.s3.DSS3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "timezone": "UTC",
  "locale": "en",

  "s3.region": "us-west-2",
  "s3.part.size": "5242880",
  "s3.bucket.name": "some-bucket",

  "s3.part.retries": "10",
  "s3.retry.backoff.ms": "1000",

  "partitioner.class": "com.activision.ds.connect.s3.partitioner.DSTimeBasedPartitioner",
  "schema.compatibility": "NEVER_CHANGE_SCHEMAS",

  "rotate.schedule.interval.ms": "600000",
  "flush.size": "500000"
}

and then the actual connector config could be simplified to:

jsonnet
local defaults = import '../defaults.libsonnet';

local connector = {
  "name": "SomeName-v1",

  "tasks.max": "64",

  "flush.size": "1000000",
  "rotate.schedule.interval.ms": "300000",

  "topics": "some.kafka.topics",

  "topics.dir": "some/folder",

  "format.class": "com.activision.ds.connect.s3.format.DSParquetFormat",

  "path.format": "dt=${ingestion_time}/other_key=${kafka.headers.other.value}",
  "timestamp.field": "ingestion_time"
};

defaults + connector

Jsonnet provides endless opportunities to template and optimize these configs even further; however, defining defaults and then merging in connector-specific values (which can override those defaults) is already a good start.
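
To sanity-check the merge locally, you can render a connector file with Jsonnet’s Python bindings (the same _jsonnet library the deployment script below uses) and confirm that connector-specific values override the defaults. A minimal sketch, assuming the file lives at connectors/prod/some-name-v1.jsonnet (an illustrative path):

python
import json
import _jsonnet  # Python bindings for Jsonnet (pip install jsonnet)

# Compile the Jsonnet template into a JSON string
rendered = _jsonnet.evaluate_file("connectors/prod/some-name-v1.jsonnet")
config = json.loads(rendered)

# Connector-specific values win over the defaults
assert config["flush.size"] == "1000000"   # overridden in the connector file
assert config["s3.part.retries"] == "10"   # inherited from defaults.libsonnet

print(json.dumps(config, indent=2, sort_keys=True))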

Deployment

Assuming we keep all connector configs in a folder named connectors, separated by environment like connectors/prod, connectors/staging, etc., we can use a simple Python script to call the REST API:

python
import argparse
import json
import logging
import os

import _jsonnet
import requests

logging.basicConfig(format='%(asctime)-15s: %(name)s - %(levelname)s: %(message)s')
LOGGER = logging.getLogger('connectors-deploy')
LOGGER.setLevel(logging.INFO)

API_ROOT_TEMPLATE = "https://kafka-connect.%(env)s.company.com"

CONNECTORS_CONFIG_ROOT_TEMPLATE = "./connectors/%(env)s"
CONNECTOR_EXT = ".jsonnet"


def main():
    args = parse_args()

    LOGGER.info("Starting...")

    raw_config_filenames = find_files(get_connectors_config_root(args))

    LOGGER.info("Found connector configs: %s" % raw_config_filenames)

    processed_configs = process_config_files(raw_config_filenames)

    update_or_create_connectors(processed_configs, args)

    LOGGER.info("Completed")


def find_files(path_to_use):
    # Collect all *.jsonnet files under the environment's config folder
    config_filenames = []

    for path, dirs, files in os.walk(path_to_use):
        for file in files:
            if file.endswith(CONNECTOR_EXT):
                config_filenames.append(os.path.abspath(os.path.join(path, file)))

    return config_filenames


def process_config_files(raw_config_filenames):
    # Compile each Jsonnet template into a JSON string
    configs = []

    for filename in raw_config_filenames:
        configs.append(_jsonnet.evaluate_file(filename))

    return configs


def get_api_root(args):
    return replace_args(API_ROOT_TEMPLATE, args)


def get_connectors_config_root(args):
    return replace_args(CONNECTORS_CONFIG_ROOT_TEMPLATE, args)


def replace_args(template, args):
    return template % {'env': args.env}


def update_or_create_connectors(configs, args):
    api_root = get_api_root(args)

    for config in configs:
        config_json = json.loads(config)

        LOGGER.info("Adding/updating %s connector" % config_json['name'])

        if args.dry_run:
            LOGGER.info("Dry run is enabled, just printing config: " + config)
        else:
            # Update or create a connector (PUT /connectors/{name}/config is idempotent)
            response = requests.put(api_root + "/connectors/" + config_json['name'] + "/config", data=config,
                                    headers={"Accept": "application/json", "Content-Type": "application/json"})

            LOGGER.info("Response: %s" % response.status_code)

            response.raise_for_status()


def parse_args():
    parser = argparse.ArgumentParser()

    parser.add_argument('--env', required=True, choices=['dev', 'staging', 'prod'], help='Kafka Connect environment')
    parser.add_argument('--dry-run', dest='dry_run', default=False, action='store_true', help='Dry-run mode')
    return parser.parse_args()


if __name__ == "__main__":
    main()

This script reads all files in the specified folder, compiles the Jsonnet templates into actual JSON payloads, and uses the PUT /connectors/{name}/config endpoint to update (or add) connectors in an idempotent way: if a connector with the given name doesn’t exist, it will be created; otherwise, it will be updated.
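
A natural extension (not part of the script above) is to verify that a connector actually comes up healthy after a deploy, using the GET /connectors/{name}/status endpoint. A rough sketch:

python
import requests


def check_connector_status(api_root, name):
    # GET /connectors/{name}/status returns the connector state plus the state of each task
    response = requests.get(api_root + "/connectors/" + name + "/status")
    response.raise_for_status()

    status = response.json()
    connector_state = status["connector"]["state"]
    failed_tasks = [task for task in status["tasks"] if task["state"] == "FAILED"]

    if connector_state != "RUNNING" or failed_tasks:
        raise RuntimeError("Connector %s is unhealthy: %s" % (name, status))

In practice a freshly created connector can take a few seconds to start its tasks, so a check like this typically needs a small delay or a retry loop.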

We can run this script manually or from a CI/CD tool like Jenkins, which simplifies dependency management and makes sure every update is auditable.