CDC with Postgres Debezium to Kafka Strimzi

Postgres Database — Kafka Connect — Kafka

A little intro to Strimzi:

Strimzi is an open-source project that provides container images and operators for running Apache Kafka on Kubernetes and OpenShift. You can find more information at strimzi.io.

A little intro to Debezium:

Debezium’s PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. PostgreSQL versions 9.6, 10, 11, and 12 are supported. The connector consists of the two parts described below.

A logical decoding output plug-in

You might need to install the output plug-in that you choose to use. You must configure a replication slot that uses your chosen output plug-in before running the PostgreSQL server. The plug-in can be one of the following:

  • decoderbufs: based on Protobuf and maintained by the Debezium community.
  • wal2json: based on JSON and maintained by the wal2json community.
  • pgoutput: the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community and used by PostgreSQL itself for logical replication. This plug-in is always present, so no additional libraries need to be installed. The Debezium connector interprets the raw replication event stream directly into change events.

Java code (the actual Kafka Connect connector)

The connector itself reads the changes produced by the chosen logical decoding output plug-in. It uses PostgreSQL’s streaming replication protocol by means of the PostgreSQL JDBC driver.

Starting up Minikube

Give Minikube more memory and CPUs before starting it:

minikube config set memory 4096
minikube config set cpus 4
minikube start

Create a Postgres database

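Create a separate namespace:

kubectl create namespace postgres

Deploy Postgres into this namespace with the following credentials:

Username: data_engineer
Password: password

Then check that the Postgres pod is running:

kubectl get pods -n postgres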

Strimzi for Kafka

First, create a separate namespace for your Kafka cluster:

kubectl create namespace kafka

Next, install the Strimzi cluster operator into that namespace:

curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.16.1/strimzi-cluster-operator-0.16.1.yaml | sed 's/namespace: .*/namespace: kafka/' | kubectl apply -f - -n kafka

Then create a single-node Kafka cluster with persistent storage and wait until it is ready:

kubectl -n kafka apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.16.1/examples/kafka/kafka-persistent-single.yaml && kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka

Create the KafkaConnect

You can create a Kafka Connect instance by running the following command:
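
The manifest for this step is not reproduced here, so the following is only a sketch of what a Strimzi KafkaConnect resource for this setup might look like. The resource name postgres-connect-cluster is inferred from the service name used later (svc/postgres-connect-cluster-connect-api). The image is a hypothetical placeholder for a Kafka Connect image that bundles the Debezium PostgreSQL connector, and the bootstrap address assumes the my-cluster example created above with its plain listener on port 9092.

```shell
# Sketch only: print a KafkaConnect manifest for Strimzi 0.16.x.
# CONNECT_IMAGE is a hypothetical placeholder; it must point to a Kafka Connect
# image that contains the Debezium PostgreSQL connector plug-in.
kafka_connect_manifest() {
  cat <<EOF
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: postgres-connect-cluster
spec:
  image: ${CONNECT_IMAGE:-example.com/my-org/connect-debezium:latest}
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    # A single-broker cluster cannot satisfy the default replication factor of 3
    # for Kafka Connect's internal topics, so it is lowered to 1 here.
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
EOF
}

# To apply it (not executed here):
#   kafka_connect_manifest | kubectl -n kafka apply -f -
```

Piping the output into kubectl, as in the trailing comment, would create the Connect cluster in the kafka namespace. Adjust the image and bootstrap address to match your environment.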

Create Postgres Connector

Once Kafka Connect has started, you can create a connector for your Postgres database with a curl request to its REST API. First, forward the REST port to your machine:

kubectl -n kafka port-forward svc/postgres-connect-cluster-connect-api 8083:8083
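
The curl request itself is not shown, so here is a minimal sketch of what registering the connector might look like. The property names are standard Debezium 1.x PostgreSQL connector settings (newer Debezium releases replace database.server.name with topic.prefix). The connector name, the database host name, and the database name are assumptions; the user and password come from the Postgres step, and server1 is taken from the topic name used in the test below.

```shell
# Sketch only: build a Debezium PostgreSQL connector definition as JSON.
# CONNECTOR_NAME, PG_HOST, PG_DB and CONNECT_URL are hypothetical overrides;
# their defaults are assumptions about this walkthrough's setup.
connector_json() {
  cat <<EOF
{
  "name": "${CONNECTOR_NAME:-inventory-connector}",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "${PG_HOST:-postgres.postgres.svc.cluster.local}",
    "database.port": "5432",
    "database.user": "data_engineer",
    "database.password": "password",
    "database.dbname": "${PG_DB:-postgres}",
    "database.server.name": "server1"
  }
}
EOF
}

# POST the definition to the Kafka Connect REST API.
# Requires the port-forward above to be running in another terminal.
register_connector() {
  connector_json | curl -sS -X POST \
    -H "Content-Type: application/json" \
    --data @- "${CONNECT_URL:-http://localhost:8083}/connectors"
}
```

Running register_connector in a second terminal submits the definition. A GET request to http://localhost:8083/connectors/inventory-connector/status should then report the state of the connector and its task.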

How To Test:

You can access your Postgres database by running the following line in a separate terminal:

kubectl -n postgres port-forward svc/postgres 5432:5432

To watch the change events as they arrive, start a console consumer on the connector's topic:

kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic server1.inventory.customers
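
The topic name in the consumer command follows Debezium's naming pattern for the PostgreSQL connector: the logical server name, the schema, and the table, joined with dots. A small sketch of that pattern, where topic_name is a hypothetical helper used only for illustration:

```shell
# Debezium names each change topic <server name>.<schema>.<table>.
topic_name() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Server name "server1", schema "inventory", table "customers".
topic_name server1 inventory customers
```

If your connector uses a different server name, or you capture a different table, the topic passed to the console consumer changes accordingly.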
