This is the second part of the “Customizing Flink” series. In the first part, I explained why you may want to customize Flink and how to approach creating and maintaining a fork. It’s not easy. It requires setting up an additional repo or two, a branching strategy, a CI/CD process, etc.

But there is an easier approach that I want to demonstrate in this post. A word of caution: this technique is not officially supported and can probably be considered an anti-pattern.

Class Shadowing

Class shadowing is not specific to Flink. In Java, when a classloader is asked to load a class by name, it searches the configured classpath (essentially an ordered collection of JAR files and directories) and stops the moment it finds the first match.

This can lead to major issues when the same class is present in multiple dependencies, e.g. in different versions. Typically, you either eliminate all but one version of the dependency or apply shading, a technique for relocating a class to a different package.

So, in our case, we actually want to use class shadowing to be able to override any class from Flink’s codebase.
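When experimenting with this, it helps to confirm which JAR a class was actually resolved from. Here is a small diagnostic sketch (the class name is just an example, pick any class you care about):

```scala
// Prints the JAR (or directory) the class was actually loaded from.
// FileSystemJobResultStore is used purely as an example class name.
val clazz = Class.forName("org.apache.flink.runtime.highavailability.FileSystemJobResultStore")
val location = Option(clazz.getProtectionDomain.getCodeSource).map(_.getLocation)
println(location) // e.g. Some(file:/opt/flink/lib/flink-dist-1.17.1.jar)
```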

Avoiding Dynamic Classloading

One prerequisite to using class shadowing with Flink is avoiding dynamic classloading. Flink uses several different classloaders, and we want our classes to be loaded by the system classloader rather than the dynamic user-code classloader.

In practice, this means placing the JAR with our code in the /opt/flink/lib folder instead of submitting it together with the job.
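To confirm the JAR is really picked up by the system classloader and not by Flink's dynamic user-code classloader, you can print the classloader of one of the classes you ship (again, the class name is just an example):

```scala
// Sketch: check which classloader loaded a given class. With the JAR in
// /opt/flink/lib you should see the application/system classloader
// (something like jdk.internal.loader.ClassLoaders$AppClassLoader),
// not Flink's user-code classloader.
val loader = Class.forName("org.apache.flink.runtime.highavailability.FileSystemJobResultStore").getClassLoader
println(loader)
```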

Configuring Your Build Tools

Some build tools won’t be happy that you have two classes with the same name but different content. For example, with SBT and the sbt-assembly plugin, you can see something like this:

[error] deduplicate: different file contents found in the following:
[error] org/apache/flink/runtime/highavailability/FileSystemJobResultStore.class
[error] /Users/sap1ens/.m2/repository/org/apache/flink/flink-runtime/1.17.1/flink-runtime-1.17.1.jar:org/apache/flink/runtime/highavailability/FileSystemJobResultStore.class

You need to ensure that the build tool chooses your version of the class and ignores the Flink one (in this specific case, it can be done using MergeStrategy.first).
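Here is a minimal build.sbt sketch for sbt-assembly (1.x syntax; the matched path corresponds to the class from the error above, so adjust it, and add more cases, for whatever you end up shadowing):

```scala
// build.sbt: prefer our shadowed copy of the class over the one inside the Flink JARs.
ThisBuild / assemblyMergeStrategy := {
  case PathList("org", "apache", "flink", "runtime", "highavailability", "FileSystemJobResultStore.class") =>
    MergeStrategy.first
  case other =>
    // Keep the default behaviour for everything else.
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(other)
}
```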

The Secret Ingredient

And here is one more detail needed to make class shadowing work: name the custom JAR file that ends up in /opt/flink/lib something like a-.... In fact, you can probably start with other letters too, as long as they sort before f. Why? Here’s a typical /opt/flink/lib content:

flink-cep-1.17.1.jar
flink-connector-files-1.17.1.jar
flink-csv-1.17.1.jar
flink-dist-1.17.1.jar
flink-json-1.17.1.jar
flink-scala_2.12-1.17.1.jar
flink-table-api-java-uber-1.17.1.jar
flink-table-planner-loader-1.17.1.jar
flink-table-runtime-1.17.1.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar

As you see, all standard Flink JAR files follow the flink-... format. For the class shadowing to work, we want our JAR to appear first in the classpath. Starting with any letter before f puts it first, thanks to alphabetical sorting.
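With sbt-assembly, naming the output JAR is a one-liner; a-flink-patches.jar below is just an example name:

```scala
// build.sbt: make the assembled JAR sort before flink-*.jar in /opt/flink/lib.
assembly / assemblyJarName := "a-flink-patches.jar"
```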

Things You Can Override

Well, pretty much anything! As long as your class has the same fully qualified name (package included), it should work. Want to override that pesky FileSource behaviour that reads files in a weird order? Just create your own FileSplitAssigner that performs sorting and update FileSource to use it as DEFAULT_SPLIT_ASSIGNER (see the sketch below). Want to change the way Flink loads configuration? Just tweak GlobalConfiguration and you’re good to go.
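To illustrate the first example, here is a rough sketch of what a sorting split assigner could look like. Only the FileSplitAssigner interface comes from Flink (1.17 here); the class name, package, and ordering are mine, so treat it as a starting point rather than a drop-in replacement:

```scala
package com.example.flink

import java.util.{ArrayList => JArrayList, Collection => JCollection, Optional}
import scala.collection.mutable

import org.apache.flink.connector.file.src.FileSourceSplit
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner

// Hands out file splits in a deterministic order instead of whatever order
// the enumerator discovered them in.
class SortedSplitAssigner(initialSplits: JCollection[FileSourceSplit]) extends FileSplitAssigner {

  // Order splits by (path, offset, splitId) so reads are deterministic.
  private implicit val ordering: Ordering[FileSourceSplit] =
    Ordering.by(s => (s.path().toString, s.offset(), s.splitId()))

  private val pending = mutable.SortedSet.empty[FileSourceSplit]
  initialSplits.forEach(s => pending += s)

  override def getNext(hostname: String): Optional[FileSourceSplit] =
    pending.headOption match {
      case Some(split) =>
        pending -= split
        Optional.of(split)
      case None =>
        Optional.empty()
    }

  override def addSplits(splits: JCollection[FileSourceSplit]): Unit =
    splits.forEach(s => pending += s)

  override def remainingSplits(): JCollection[FileSourceSplit] = {
    val copy = new JArrayList[FileSourceSplit](pending.size)
    pending.foreach(s => copy.add(s))
    copy
  }
}
```

The shadowed FileSource would then be a copy of the original source file, identical except that DEFAULT_SPLIT_ASSIGNER points at this assigner (e.g. SortedSplitAssigner::new).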

By the way, you’re not limited to Flink classes… You can override classes from any of Flink’s dependencies too! Think about applying Debezium patches when using Flink CDC, for example.

Upgrades Can Be Tough

Of course, any time you decide to upgrade the Flink version, you’d need to compare every single file you modified with its new version. Sometimes files are deleted or refactored. To make life easier for your future self, keep your changes minimal and well-documented.

Outro

And that’s it! We can easily override any Flink class by making a few changes to the build process. No fork repo maintenance required! Of course, use this approach wisely and sparingly: it’s easy to start using but hard to stop. Sometimes investing in a proper fork setup is worth it.