This repository contains four example Apache Kafka® Streams applications, which read and process Apache Avro™ messages. They demonstrate use of Generic and Specific Avro Serdes, logging, copying and filtering messages.
All default to reading from a topic called logistics_data_gen.
If you're using an Aiven for Apache Kafka service, then the Sample data
generator for "Logistics" will write appropriate messages to the
logistics_data_gen topic.
All message values use the Confluent Wire Format, which means the schema id is inserted before each value. This is sometimes also referred to as AvroConfluent. Because the schema id is present, the Serdes (serializer/deserializer) code can look the schema up in a schema registry, and all the apps take advantage of that.
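As an illustration of that framing (this is a sketch of the wire format only, not code from this repository), the five-byte header - a magic byte of 0 followed by a 4-byte big-endian schema id - can be written and read with plain Java:

```java
import java.nio.ByteBuffer;

// Sketch of the Confluent wire format framing: one magic byte (0),
// a 4-byte big-endian schema id, then the Avro-encoded payload.
public class WireFormatDemo {
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(5 + avroPayload.length)
                .put((byte) 0)      // magic byte
                .putInt(schemaId)   // schema registry id (ByteBuffer is big-endian by default)
                .put(avroPayload)
                .array();
    }

    static int schemaIdOf(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Not Confluent wire format");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] msg = frame(42, new byte[] {1, 2, 3});
        System.out.println(schemaIdOf(msg)); // prints 42
    }
}
```

The Serdes use the extracted id to fetch the matching schema from the registry before decoding the payload.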
- `GenericLogApp.java` - This uses a GenericAvroSerde to read each message value, and logs it. It does not write to an output topic.
- `GenericCopyApp.java` - This uses a GenericAvroSerde to read each message value, and copies that message to a different topic, using the same Serde. It defaults to writing to the topic `logistics_data_copied`.
The next two applications assume that the incoming messages match the schema in logistics_gen.avsc, corresponding to the messages generated by the Sample data generator for "Logistics".
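For orientation, that schema might look roughly like the following. This is a hypothetical sketch based only on the field names mentioned in this README (the record name and field types are guesses); see logistics_gen.avsc in the repository for the real definition.

```json
{
  "type": "record",
  "name": "LogisticsGen",
  "fields": [
    {"name": "time_utc", "type": "string"},
    {"name": "tracking_id", "type": "string"},
    {"name": "state", "type": "string"},
    {"name": "carrier", "type": "string"},
    {"name": "manifest", "type": "string"}
  ]
}
```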
- `GenericFilterApp.java` - This uses a GenericAvroSerde to read each message value.
  - It ignores messages whose `state` is not `Delivered`.
  - It writes messages with values `timeUtc` (instead of `time_utc`), `trackingId` (instead of `tracking_id`), `carrier` and `manifest`.
  - It writes those altered messages to the output topic using a SpecificAvroSerde and the schema in logistics_delivered.avsc. The output topic defaults to `logistics_data_delivered`.
- `SpecificFilterApp.java` - This is identical to `GenericFilterApp.java`, except that it uses a SpecificAvroSerde and the schema in logistics_gen.avsc to read each message value.
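The per-record logic of the two filter apps can be sketched, independently of Kafka Streams and Avro, as a plain Java function on maps. This is illustrative only - the real apps operate on Avro records via their Serdes - but it shows the filter condition and the field renaming:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class FilterSketch {
    // Returns the renamed record, or empty if the state is not "Delivered".
    static Optional<Map<String, Object>> filterAndRename(Map<String, Object> in) {
        if (!"Delivered".equals(in.get("state"))) {
            return Optional.empty();
        }
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("timeUtc", in.get("time_utc"));        // time_utc -> timeUtc
        out.put("trackingId", in.get("tracking_id"));  // tracking_id -> trackingId
        out.put("carrier", in.get("carrier"));
        out.put("manifest", in.get("manifest"));
        return Optional.of(out);
    }

    public static void main(String[] args) {
        Map<String, Object> delivered = Map.of(
            "state", "Delivered", "time_utc", "2024-01-01T00:00:00Z",
            "tracking_id", "T123", "carrier", "ACME", "manifest", "m1");
        System.out.println(filterAndRename(delivered).isPresent());                  // true
        System.out.println(filterAndRename(Map.of("state", "InTransit")).isPresent()); // false
    }
}
```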
For those applications that read from and write to a topic, both topics must be in the same Kafka service.
The applications are designed to be run in a container - a Dockerfile and associated run scripts (run.sh and setup_auth.sh) are provided.
By default the container and scripts will run the GenericLogApp, which
should work for any Avro messages. You can choose a different app using the
APP_NAME environment variable, either at the docker build command line,
or by editing the Dockerfile itself.
The project uses Gradle and Groovy for configuration and building.
Find the URL for the Kafka service and download its certificates.
Set an environment variable for the Kafka service URL - something like

```
export KAFKA_BOOTSTRAP_SERVERS=<service uri>
```

or in the Fish shell

```
set -x KAFKA_BOOTSTRAP_SERVERS <service uri>
```

Do the same for the schema registry URL and password (the program defaults to the standard Karapace username of avnadmin, so we don't need to specify that).

```
export SCHEMA_REGISTRY_URL=<schema registry url>
export SCHEMA_REGISTRY_PASSWORD=<schema registry password>
```

(The Fish shell equivalents are left as an exercise for Fish shell users :).)
Set an environment variable to the content of each certificate file. Typically:

- Download the certificate files for the Kafka service (ca.pem, service.cert and service.key). For an Aiven for Kafka service you can do this from the Connection information in the service Overview.
- Put the files into a directory called certs and use one of the convenience shell scripts to read the content of those files and set the environment variables:

  ```
  source prep_cert_env.sh
  ```

  or for Fish

  ```
  source prep_cert_env.fish
  ```
- To build a container image for the default GenericLogApp:

  ```
  docker build -t appimage .
  ```

- To build a container image for a specific app (for instance, GenericFilterApp):

  ```
  docker build --build-arg APP_NAME=GenericFilterApp -t appimage .
  ```
Run the container image:

```
docker run -d --name kafka-streams-container -p 3000:3000 \
    -e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
    -e KAFKA_CA_CERT="$KAFKA_CA_CERT" \
    -e KAFKA_ACCESS_CERT="$KAFKA_ACCESS_CERT" \
    -e KAFKA_ACCESS_KEY="$KAFKA_ACCESS_KEY" \
    -e SCHEMA_REGISTRY_URL=$SCHEMA_REGISTRY_URL \
    -e SCHEMA_REGISTRY_USERNAME=$SCHEMA_REGISTRY_USERNAME \
    -e SCHEMA_REGISTRY_PASSWORD=$SCHEMA_REGISTRY_PASSWORD \
    -e EXACTLY_ONCE=false \
    appimage
```

We don't actually use the port for anything at the moment.
Several of those environment variable arguments have defaults, so you can leave them off if you're happy with the default:
- `SCHEMA_REGISTRY_USERNAME` - run.sh defaults this to `avnadmin`
- `INPUT_TOPIC` - the program has a default
- `OUTPUT_TOPIC` - the program has a default
- `EXACTLY_ONCE` - run.sh defaults this to `false`
All variants of the Java app take the following arguments (of course
OUTPUT_TOPIC is not used by the Log app). Common code to handle these is in
Config.java. The names chosen
match the environment variables used by the container file and run.sh.
- `-DKAFKA_BOOTSTRAP_SERVERS` - the URL for the Kafka service
- `-DKAFKA_CA_CERT` - the contents of the `ca.pem` file
- `-DKAFKA_ACCESS_CERT` - the contents of the `service.cert` file
- `-DKAFKA_ACCESS_KEY` - the contents of the `service.key` file
- `-DSCHEMA_REGISTRY_URL` - the URL for the schema registry
- `-DSCHEMA_REGISTRY_USERNAME` - the user name for accessing the schema registry. This defaults to `avnadmin`, which is the default user name for Karapace.
- `-DSCHEMA_REGISTRY_PASSWORD` - the password for accessing the schema registry
- `-DINPUT_TOPIC` - the input topic name. This defaults to `logistics_data_gen`, which is the name of the topic written to by the Logistics data stream creator.
- `-DOUTPUT_TOPIC` - the output topic name. This defaults to `logistics_data_delivered` for the two filter programs, and `logistics_data_copied` for the copy program.
- `-DEXACTLY_ONCE` - request exactly once semantics. A value of `true` requests exactly once semantics; a value of `false`, an empty string or the absence of this property does not. The value is case insensitive. Any other value is an error.
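The EXACTLY_ONCE rules above can be expressed as a small parser. This is a hypothetical sketch of the behaviour just described, not the actual code in Config.java:

```java
public class ExactlyOnceFlag {
    // "true" requests EOS; "false", the empty string or an absent value
    // (null) does not. Comparison is case insensitive; anything else errors.
    static boolean parseExactlyOnce(String value) {
        if (value == null || value.isEmpty() || value.equalsIgnoreCase("false")) {
            return false;
        }
        if (value.equalsIgnoreCase("true")) {
            return true;
        }
        throw new IllegalArgumentException(
            "EXACTLY_ONCE must be true or false, not: " + value);
    }

    public static void main(String[] args) {
        System.out.println(parseExactlyOnce("TRUE")); // true
        System.out.println(parseExactlyOnce(null));   // false
    }
}
```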
This is a two-stage container file.
The APP_NAME variable determines which app is being built and run. It
defaults to GenericLogApp (which will work for any input topic).
The first stage builds a fat (uber) JAR for the program, so that a single self-contained artifact can be passed to the second stage.
It uses jdeps and jlink to work out which modules the JAR depends on, and to extract a minimal JRE from the larger JRE provided by the operating system used in that first stage.
The second stage then downloads rocksdb (used by Kafka Streams).
It then copies over the minimal JRE prepared in the first stage, and the fat
JAR itself, as well as the run.sh and setup_auth.sh files, and
finally runs the run.sh script.
The run.sh file expects the following environment variables as input (you'll recognise all but APP_NAME from the instructions on running the container and the Java app itself):
- `KAFKA_BOOTSTRAP_SERVERS` - the URL of the Kafka service we're using
- `KAFKA_CA_CERT` - the contents of the `ca.pem` file
- `KAFKA_ACCESS_CERT` - the contents of the `service.cert` file
- `KAFKA_ACCESS_KEY` - the contents of the `service.key` file
- `SCHEMA_REGISTRY_URL` - the URL for the schema registry
- `SCHEMA_REGISTRY_USERNAME` - the user name for accessing the schema registry. This is optional and if it is not given, a value of `avnadmin` will be assumed
- `SCHEMA_REGISTRY_PASSWORD` - the password for accessing the schema registry
- `INPUT_TOPIC` - the input topic name. This is optional as the Java app has a sensible default, `logistics_data_gen`.
- `OUTPUT_TOPIC` - the output topic name. This is optional as the Java app has a sensible default.
- `EXACTLY_ONCE` - whether exactly once semantics is wanted. This is optional and if it is not given, defaults to `false`. Request `true` if you want exactly once semantics.
- `APP_NAME` - the name of the application to run. This is optional and defaults to `GenericLogApp`.
It sources the setup_auth.sh script which makes sure that the
KAFKA_CA_CERT, KAFKA_ACCESS_CERT and KAFKA_ACCESS_KEY
environment variables contain data that is correctly split into lines.
Finally the run.sh script runs the fat Java JAR with the necessary
arguments.
We use a fat (uber) JAR in the container, so that all of the program's non-standard dependencies (the ones not provided by the JRE) are frozen into the final executable.
You can build that fat JAR file with

```
gradle ${APP_NAME}UberJar
```

where $APP_NAME is one of GenericLogApp, GenericCopyApp, GenericFilterApp or SpecificFilterApp - for instance:

```
gradle SpecificFilterAppUberJar
```

(See app/build.gradle for the definition of the UberJar tasks.)

If you want to run the app using the provided run.sh script, then you'll also need to copy the result to the top-level directory

```
cp app/build/libs/$APP_NAME-uber.jar .
```

With a normal Kafka Streams application, it is possible that a message might be processed once, more than once, or never at all (networks are unreliable, services can crash, and so on).
Exactly once semantics (EOS) in Kafka Streams guarantees that each message will be processed once, no more and no less. It's been available since 2017.
There is quite a lot of good documentation about exactly once semantics for Kafka Streams - the following is by no means an exhaustive list.
See Confluent's Exactly-Once Semantics Are Possible: Here’s How Kafka Does It (2017/2025) for a good introduction to how this works.
The Kafka Streams Core Concepts document (this link is for Apache Kafka 4.1.x which is current at time of writing) is also pretty good. It talks about exactly once in the Processing Guarantees section.
Zeinab Dashti's Developer Guide to Achieve Transactional Processing in Kafka Streams (2025) is good at the pragmatics, and notes that:
When `processing.guarantee=exactly_once_v2` is set, Kafka Streams automatically enforces the required producer and consumer configurations:

- `enable.idempotence=true` (on the Kafka producer)
- `isolation.level=read_committed` (on the Kafka consumer)

You don’t need to set these manually — doing so isn’t harmful if you match the required values, but Kafka Streams will log a warning or ignore conflicting settings.
and
Any external consumer reading from Kafka output topics that are written transactionally must configure:

```
isolation.level=read_committed
```

Kafka Streams enforces this by default within its topology, but it must be set explicitly for standalone consumers (e.g., Kafka Connect, other microservices). Without it, consumers could read uncommitted or aborted records, which may result in data duplication or inconsistency.
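As a configuration sketch of those two quotations (using the standard Kafka property names): the Streams application requests EOS, and any standalone consumer of the transactionally-written output topic must set the isolation level itself.

```properties
# In the Kafka Streams application (what EXACTLY_ONCE=true turns on):
processing.guarantee=exactly_once_v2

# In any standalone consumer reading the transactionally-written output topic:
isolation.level=read_committed
```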
There are minimal unit tests for the four applications.
Run them with, for instance:

```
gradle clean cleanTest test
```

It's possible to run the run.sh script locally, and indeed this is useful for testing. It's important to remember to:
- Copy the built app into the same directory as the run.sh script
- Set the various required environment variables first - these are also documented at the top of the run.sh file.
For instance

```
./run.sh
```

or

```
APP_NAME=GenericFilterApp ./run.sh
```

or

```
APP_NAME=GenericCopyApp OUTPUT_TOPIC=logistics_copy ./run.sh
```

In the reporting directory there is a command line program report_messages.py which reads messages from both the input and output topics and shows them using a text UI.
If all the environment variables discussed before are set up, then you can run it with
```
reporting/report_messages.py
```

Get help on what it does with

```
reporting/report_messages.py -h
```

In that same directory there is an experimental wrapper (serve.py) which allows it to be run as a web app in a Docker container.

For instance:
```
cd reporting
docker build -t report_image .
docker run -d --name report-messages-container -p 3000:3000 \
    -e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
    -e KAFKA_CA_CERT="$KAFKA_CA_CERT" \
    -e KAFKA_ACCESS_CERT="$KAFKA_ACCESS_CERT" \
    -e KAFKA_ACCESS_KEY="$KAFKA_ACCESS_KEY" \
    -e SCHEMA_REGISTRY_URL=$SCHEMA_REGISTRY_URL \
    report_image
```

It deliberately uses the same environment variables as are needed to run the actual application.
Note: it assumes that the $SCHEMA_REGISTRY_URL includes the username and password in the URL.
Note For trying out this Kafka Streams app, a free Aiven for Kafka service will work just fine. The instructions below show how to use that, as well as how to use a paid service if that's more suitable.
It's possible to do everything in this section using the Aiven web
console, but for documentation purposes here I
shall use the avn command line tool.
Since avn is a Python tool, make sure you're in a virtual environment and
download it:
```
python -m venv venv
source venv/bin/activate   # If you're using fish, activate.fish
pip install aiven-client
```

Retrieve an Aiven session token (see the documentation) and log in, using the email address you logged in to the console with, and pasting the token when prompted:
```
avn user login <your-email> --token
```

For convenience, set the project to your current project - this means you don't have to specify it on every command:

```
avn project switch <project-name>
```

Set an environment variable for the service name - perhaps something like "kafka-streams-example"

```
export KAFKA_SERVICE_NAME=<service name>
```

or for Fish shell

```
set -x KAFKA_SERVICE_NAME <service name>
```

Create the Aiven for Kafka service. We'll show how to create a free or paid service. There are notes about each command after the command.
- For trying out this app, a free Aiven for Kafka service will work just fine. Create the service using the following command:

  ```
  avn service create $KAFKA_SERVICE_NAME \
      --service-type kafka \
      --cloud do-ams \
      --plan free-0 \
      -c schema_registry=true \
      -c kafka.auto_create_topics_enable=true
  ```

  Notes:

  - The details of how the free cloud and plan are specified at the command line may change. This is one case where it's actually simpler to do this in the Aiven web console, as there you just choose the free Kafka tier and then what part of the world you want.
  - `-c schema_registry=true` says we want to enable the Karapace schema registry. This is also free, and we need it to handle Avro messages.
  - `-c kafka.auto_create_topics_enable=true` says we want producers to be able to create topics. You don't want this in production, but it's often a good idea in development, and it means the output topics will get created as we need them.
- If you prefer (or if you're already using your free Aiven for Kafka service for something else and don't want to add new topics to it), you can instead create a paid service. For that, use a command like the following:

  ```
  avn service create $KAFKA_SERVICE_NAME \
      --service-type kafka \
      --cloud aws-eu-west-1 \
      --plan startup-4 \
      --no-project-vpc \
      -c schema_registry=true \
      -c kafka.auto_create_topics_enable=true
  ```

  Notes:

  - Choose a cloud and plan that match your needs. There's no need to go for anything above the minimum plan (`startup-4` in this case).
  - In the case of this cloud and region, I knew there was a VPC (virtual private cloud) available to my organization, so I needed to tell the command I did not want to use it. It doesn't hurt to specify the switch even if there is no VPC.
  - The last two switches are the same as in the free example above.
While that's running, get the service URL for the new service

```
export KAFKA_BOOTSTRAP_SERVERS=$(avn service get $KAFKA_SERVICE_NAME --format '{service_uri}')
```

or for Fish shell

```
set -x KAFKA_BOOTSTRAP_SERVERS (avn service get $KAFKA_SERVICE_NAME --format '{service_uri}')
```

Get the schema registry (Karapace) URL

```
export SCHEMA_REGISTRY_URL=$(avn service get $KAFKA_SERVICE_NAME --json | jq -r '.connection_info.schema_registry_uri')
```

or for Fish shell

```
set -x SCHEMA_REGISTRY_URL (avn service get $KAFKA_SERVICE_NAME --json | jq -r '.connection_info.schema_registry_uri')
```

Get the schema registry password

```
export SCHEMA_REGISTRY_PASSWORD=$(avn service get $KAFKA_SERVICE_NAME --json | jq -r '.users[0].password')
```

or for Fish shell

```
set -x SCHEMA_REGISTRY_PASSWORD (avn service get $KAFKA_SERVICE_NAME --json | jq -r '.users[0].password')
```

We assume the default username for the schema registry, so don't need to look that up, but if you do need it then you can get it with

```
export SCHEMA_REGISTRY_USERNAME=$(avn service get $KAFKA_SERVICE_NAME --json | jq -r '.users[0].username')
```

or for Fish shell

```
set -x SCHEMA_REGISTRY_USERNAME (avn service get $KAFKA_SERVICE_NAME --json | jq -r '.users[0].username')
```

Wait for it to reach Running state

```
avn service wait $KAFKA_SERVICE_NAME
```

Once the Kafka service is running, you can create the two topics yourself. If you're using the standard topic names you don't actually need to, because:
- The Logistics sample data stream generator will create the `logistics_data_gen` topic for you.
- The `-c kafka.auto_create_topics_enable=true` specified when creating the service means the `logistics_data_delivered` topic will get created when the program tries to write to it.
```
avn service topic-create \
    --partitions 1 \
    --replication 2 \
    $KAFKA_SERVICE_NAME logistics_data_gen
```

```
avn service topic-create \
    --partitions 1 \
    --replication 2 \
    $KAFKA_SERVICE_NAME logistics_data_delivered
```

Download the certificate files (the command will create the directory if necessary)

```
avn service user-creds-download $KAFKA_SERVICE_NAME --username avnadmin -d certs
```

```
ls certs
```

should report

```
ca.pem service.cert service.key
```
Set the environment variables for the certificate file contents

```
source prep_cert_env.sh
```

or for Fish shell

```
source prep_cert_env.fish
```

And now you're ready to run the program, either via ./run.sh or via Docker.