CDC with Postgres and Debezium to Kafka on Strimzi

Jay Ehsaniara
4 min read · Aug 27, 2020

In databases, change data capture (CDC) is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data.

In this example, I’m going to demonstrate how to capture data changes from Postgres (with logical replication enabled) and stream them into a Kafka cluster.

Postgres Database → Kafka Connect → Kafka

A little intro to Strimzi:

Strimzi is an open-source project that provides container images and operators for running Apache Kafka on Kubernetes and OpenShift. You can find more information on strimzi.io

A little intro to Debezium:

Debezium’s PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. PostgreSQL versions 9.6, 10, 11, and 12 are supported.

The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to the database. The connector generates data change event records and streams them to Kafka topics. By default, the connector streams all events generated for a table to a separate Kafka topic dedicated to that table. Applications and services consume data change event records from those topics.

Based on the Debezium documentation:

PostgreSQL’s logical decoding feature is a mechanism that allows the extraction of the changes that were committed to the transaction log and the processing of these changes in a user-friendly manner with the help of an output plug-in. The output plug-in enables clients to consume changes.

The PostgreSQL connector contains two main parts that work together to read and process database changes:

A logical decoding output plug-in

You might need to install the output plug-in that you choose to use. Before running the PostgreSQL server, you must configure a replication slot that uses your chosen output plug-in (see the configuration sketch after this list). The plug-in can be one of the following:

  • decoderbufs is based on Protobuf and maintained by the Debezium community.
  • wal2json is based on JSON and maintained by the wal2json community.
  • pgoutput is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community and used by PostgreSQL itself for logical replication. This plug-in is always present, so no additional libraries need to be installed. The Debezium connector interprets the raw replication event stream directly into change events.
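Regardless of the plug-in you pick, logical decoding must be enabled on the server side before it starts. A minimal sketch of the relevant postgresql.conf settings (the values below are illustrative assumptions; Debezium can usually create the replication slot itself on first connect):

# postgresql.conf — enable logical decoding before starting the server
wal_level = logical
max_wal_senders = 4          # at least one sender for the Debezium connection
max_replication_slots = 4    # at least one slot for the connector
# Optionally pre-create a slot for the pgoutput plug-in (run as a superuser):
# SELECT * FROM pg_create_logical_replication_slot('debezium', 'pgoutput');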

Java code (the actual Kafka Connect connector)

This is the code that reads the changes produced by the chosen logical decoding output plug-in. It uses PostgreSQL’s streaming replication protocol, by means of the PostgreSQL JDBC driver.

You can find more information about Debezium on debezium.io

Let's start now

Starting up Minikube

Start Minikube with the following commands to give it more resources:

minikube config set memory 4096
minikube config set cpus 4
minikube start

Once it has started, you can create your Postgres instance in a separate namespace. In a real scenario, your Postgres database may be outside of your Kubernetes cluster.

Create a Postgres database

Create a separate namespace:

kubectl create namespace postgres

Then run the following command to create the deployment and service:
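A minimal sketch of such a deployment and service, assuming the debezium/example-postgres image (which already ships with sample data and logical decoding enabled); the image tag and the credentials are assumptions chosen to match the values below:

cat <<EOF | kubectl -n postgres apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: debezium/example-postgres:1.2   # assumption: any logical-decoding-ready image works
          ports:
            - containerPort: 5432
          env:
            - name: POSTGRES_USER
              value: data_engineer
            - name: POSTGRES_PASSWORD
              value: password
---
apiVersion: v1
kind: Service
metadata:
  name: postgres
spec:
  selector:
    app: postgres
  ports:
    - port: 5432
      targetPort: 5432
EOF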

This Postgres image already contains some example data and the Postgres configuration needed for our purpose.

Username: data_engineer
Password: password

You can confirm it by running:

kubectl get pods -n postgres

Strimzi for Kafka

First, create a separate namespace for your Kafka cluster:

kubectl create namespace kafka

Then install the Strimzi operator:

curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.16.1/strimzi-cluster-operator-0.16.1.yaml | sed 's/namespace: .*/namespace: kafka/' | kubectl apply -f - -n kafka

And then create a single broker Kafka cluster, waiting until it’s ready:

kubectl -n kafka apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.16.1/examples/kafka/kafka-persistent-single.yaml && kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka

Create the KafkaConnect

You can create a Kafka Connect instance by running the following command:
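A minimal sketch of the KafkaConnect resource; the image reference and the storage topic names below are assumptions (the image must be a Kafka Connect image that bundles the Debezium PostgreSQL connector, for example a custom build of the Strimzi Kafka image with the Debezium plug-in under /opt/kafka/plugins):

cat <<EOF | kubectl -n kafka apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: postgres-connect-cluster
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  image: your-registry/connect-debezium-postgres:latest   # assumption: your own image with the Debezium plug-in
  config:
    group.id: postgres-connect-cluster
    offset.storage.topic: postgres-connect-cluster-offsets
    config.storage.topic: postgres-connect-cluster-configs
    status.storage.topic: postgres-connect-cluster-status
EOF

The Cluster Operator then creates the Connect deployment and a REST service named postgres-connect-cluster-connect-api, which is what we port-forward below.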

Create Postgres Connector

Once Kafka Connect has started, you can run the following curl command to create a connector for your Postgres database.

But before that, you should be able to reach Kafka Connect from your local PC, so you need to run the following line:

kubectl -n kafka port-forward svc/postgres-connect-cluster-connect-api 8083:8083

Then run the following command:
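A sketch of the registration request; the database name, schema whitelist, and plug-in name are assumptions based on the Debezium example dataset, while the server name server1 matches the topic consumed later:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres.postgres.svc.cluster.local",
    "database.port": "5432",
    "database.user": "data_engineer",
    "database.password": "password",
    "database.dbname": "postgres",
    "database.server.name": "server1",
    "schema.whitelist": "inventory",
    "plugin.name": "pgoutput"
  }
}'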

As you can see, you have defined your Postgres URL and credentials along with a whitelist of the schemas to capture. Keep in mind that you can also whitelist specific tables.

How To Test:

You can access your Postgres instance by running the following line in a separate terminal:

kubectl -n postgres port-forward svc/postgres 5432:5432

Also run the following line in another terminal to start a console consumer on the customers change topic:

kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic server1.inventory.customers

Now, if you insert, update, or delete records in the customers table, you will see the change events streamed into the Kafka topic.
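For example, with the Postgres port-forward from above you can update a row with psql; the inventory.customers table, its columns, and the row id are assumptions based on the Debezium example dataset:

PGPASSWORD=password psql -h localhost -p 5432 -U data_engineer -d postgres \
  -c "UPDATE inventory.customers SET first_name = 'Anne Marie' WHERE id = 1001;"

The console consumer in the other terminal should then print a change event for that row.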
