Debezium and Postgres: Simplifying Data Flow between Spring Boot Applications with CDC

Batuhan Gökçe
9 min read · Apr 23, 2023


When building applications in a microservices architecture, you’ve probably needed to propagate a change from one microservice’s database to another at least once. In this blog post, we’re going to explore CDC using Debezium in a Spring Boot-based microservices application.

What is CDC (Change Data Capture)?

As the name suggests, it is a technique for capturing data changes. Using CDC, we can identify and track changes in a database and deliver those changes to another system or service in real time. There are several ways to implement CDC. Here are some popular ones:

  1. Trigger-based CDC: We can define database triggers to capture changes on tables using SQL syntax. This is a simple way to implement CDC; however, it significantly reduces the performance of the database server, since every captured change requires additional writes to the database.
  2. Timestamp-based CDC: By adding a LAST_UPDATED_TIME column to a table, we can track when each row was last changed. This approach is also inefficient, since it requires scanning the entire table to identify changes.
  3. Log-based CDC: The most efficient way to implement CDC is to use the transaction log that databases provide. Databases store all events in a transaction log so they can recover when a crash happens. A log-based CDC solution identifies changes by reading this log and delivers those changes to downstream systems. One log-based CDC solution is Debezium, which is our next topic.

Debezium

Debezium is an open-source platform that implements the log-based change data capture technique. It monitors the transaction log of the source database and, whenever a change is made, publishes an event to a Kafka topic. Debezium acts as a bridge between the source database and the destination system by leveraging Kafka’s messaging capabilities. It also allows us to configure which tables to monitor, which schemas to include or exclude, and which Kafka topics to publish events to.

Use Cases

Here are some popular use cases where CDC and Debezium can be applied:

  • Command Query Responsibility Segregation (CQRS): The CDC pattern can be used to keep the query data model in sync with the command side.
  • Outbox Pattern: Debezium can be used to produce events to Kafka from outbox tables.
  • Data replication: Data can be replicated between microservices that use separate databases.
  • Audit logging: Debezium can be used to maintain an audit log and track changes to database tables.
  • Cache invalidation: A change in a database table can trigger a change in the cache.

Problem

Now that we’ve covered the concepts, let’s move on to an experiment. Suppose we have two microservices: product service and stock service, each with its own separate database. The product service handles general operations on products, while the stock service has only one responsibility: storing the quantity of products. To better understand the CDC pattern, I’ve kept the services’ structure simple.

Here’s the scenario: when a customer buys a product, the product service needs to reduce the quantity of that product by one, and we also need to notify the stock service to ensure that its data remains consistent.

Architecture for the problem where CDC is applicable

In this situation, using the CDC pattern could be advantageous.

Solution

Before diving into the coding part, let’s first look at the big picture of the complete solution and understand the data pipeline.

Solution architecture

Here is how the pipeline works:

  1. The Product service makes a change in the Product table. The database records the change in its transaction log.
  2. The Debezium connector, which is monitoring the database’s transaction log, detects the change and produces a change event into the Kafka topic.
  3. The Stock service, which is a subscriber of the topic that Debezium produces to, consumes the event and updates its own data accordingly.

Now, it’s time to implement this solution in our local environment. All source code and the required Docker compose files are available in my GitHub repository.

We will use Docker to run containers for Kafka, Kafka Connect, Postgres, and so on. We also need JDK 17 and Maven to run the microservices. Finally, we may want Postman to make requests; a collection of sample requests is available in the repository.

After cloning the repository, the first step is to start the containers using Docker compose.

docker compose up -d
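If you want to confirm that all containers came up before moving on, listing them is enough (container names will vary with the compose file):

docker compose ps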

We now have a Kafka environment consisting of Zookeeper, Kafka broker, and Kafka Connect containers, as well as a Postgres container running on port 15432. Additionally, a Kafka UI tool is running on port 8088, and a database called ‘cdc’ has been initialized with the necessary schemas and users using a custom script. To confirm that our environment has been properly set up, let’s use the Kafka UI tool.

Kafka initial topics before registering a connector

Upon connecting to our Kafka instance at ‘kafka:9093’, we can see that three topics have been created, in addition to the consumer offsets topic. These three topics are designated for internal use by Kafka Connect and Debezium (connector configurations, offsets, and status). We defined the configuration for these topics in the Docker compose file.

After ensuring that the required containers are running, we can proceed to start the product service and stock service in our local environment. These are basic Spring Boot applications that carry out specific tasks. The product service creates a ‘Product’ table in its designated schema, whereas the stock service creates a ‘Stock’ table in its designated schema. Furthermore, the stock service is configured to consume change events from the Kafka topic, allowing it to react to updates in real time, along the lines of the sketch below.
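A minimal sketch of what such a consumer can look like. The Stock entity and StockRepository here are hypothetical stand-ins for the repository’s actual code, and the topic name follows Debezium’s default naming, explained later in the post:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ProductChangeListener {

    private final StockRepository stockRepository; // hypothetical Spring Data repository
    private final ObjectMapper objectMapper = new ObjectMapper();

    public ProductChangeListener(StockRepository stockRepository) {
        this.stockRepository = stockRepository;
    }

    @KafkaListener(topics = "product.product_schema.product")
    public void onProductChange(String message) throws JsonProcessingException {
        // Debezium change events wrap the row data in a 'payload' object.
        JsonNode after = objectMapper.readTree(message).path("payload").path("after");
        // Mirror the product's current state into the stock table.
        stockRepository.save(new Stock(after.path("id").asLong(), after.path("quantity_left").asInt()));
    }
}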

All apps started with no integration yet

We have everything set up, but Debezium isn’t doing anything yet: it’s neither monitoring Postgres nor producing events to Kafka. We have to register a connector to wire these separate components together.

Kafka Connect provides a REST API for registering, querying, and deleting connectors. We will make an HTTP request to register the connector; all required requests are available in the Postman collection. Let’s make a POST request to the Kafka Connect container.

Sample request for registering a connector

We made a POST request to localhost:8083/connectors with a JSON body that defines the connector’s configuration. Let’s go over the important properties; a detailed explanation of all of them can be found here, in the official Debezium documentation.
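A minimal sketch of such a request; the connector name, hostnames, credentials, and the pgoutput plugin choice are assumptions based on this setup, and the exact body is available in the Postman collection:

~ curl -X POST -H "Content-Type: application/json" localhost:8083/connectors -d '{
  "name": "product-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "cdc",
    "topic.prefix": "product",
    "plugin.name": "pgoutput",
    "table.include.list": "product_schema.product",
    "column.include.list": "product_schema.product.id,product_schema.product.quantity_left"
  }
}'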

name: Defines the name of the connector.

connector.class: The name of the connector class. The source code for this class can be examined here.

database.*: Properties of the database that will be monitored. Here, I used the postgres user, but it is recommended to use a user with fewer privileges. The required privileges can be found here.

table.include.list: The schemas and table names to monitor. With the current configuration, only the table ‘product’ in schema ‘product_schema’ of the ‘cdc’ database will be monitored by Debezium; changes in other tables are not captured.

column.include.list: The column names that will be included in the change events.

We can also verify that our connector has been registered successfully by querying all connectors with another HTTP request.

Sample request for querying connectors
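For instance, with curl (the connector name matches the registration sketch above):

~ curl localhost:8083/connectors
["product-connector"]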

So far, we’ve run the containers, started the applications, and configured Debezium to monitor Postgres. Now, when we check the topic list on Kafka, we see that a new topic has been created for change events.

Kafka topic list after registering the connector

The ‘product.product_schema.product’ topic has been created. By default, Debezium determines the topic name using the pattern <topic.prefix>.<schema_name>.<table_name>. We can change the topic name if we want, as sketched below; the Debezium documentation covers the details.
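For example, one common option is Kafka Connect’s built-in RegexRouter transform, added to the connector configuration. A sketch, with an arbitrary replacement pattern:

"transforms": "rename",
"transforms.rename.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.rename.regex": "product.product_schema.(.*)",
"transforms.rename.replacement": "product-events-$1"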

Now, let’s create a new product by making a POST request to the product service. For the sake of simplicity, the fields of a new product are hard-coded in the app, so a simple POST request with no body is enough.

~ curl -X POST localhost:8090/api/product
{"id":2,"name":"Printer","quantity":349}

This request should create a new product row in the product table, and we also expect the product to appear in the stock table. Let’s check the tables in the database.
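If you prefer the terminal to a GUI client, the tables can be queried through the published Postgres port. A sketch, assuming the stock table lives in a ‘stock_schema’ schema:

~ psql -h localhost -p 15432 -U postgres -d cdc -c "SELECT * FROM product_schema.product"
~ psql -h localhost -p 15432 -U postgres -d cdc -c "SELECT * FROM stock_schema.stock"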

Synced product and stock databases via Debezium

After the product service inserts a new row into the product table, the Debezium connector detects the change and publishes a change event to the Kafka topic. The stock service, subscribed to that topic, consumes the event and inserts a corresponding stock row into its own table, keeping the data of the two separate microservices in sync.

Structure of Change Event

Let’s examine the message that has been produced after insertion of the product.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int64", "optional": false, "default": 0, "field": "id" },
          { "type": "int32", "optional": true, "field": "quantity_left" }
        ],
        "optional": true,
        "name": "product.product_schema.product.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int64", "optional": false, "default": 0, "field": "id" },
          { "type": "int32", "optional": true, "field": "quantity_left" }
        ],
        "optional": true,
        "name": "product.product_schema.product.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "product.product_schema.product.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": { "id": 2, "quantity_left": 349 },
    "source": {
      "version": "2.1.2.Final",
      "connector": "postgresql",
      "name": "product",
      "ts_ms": 1682193312424,
      "snapshot": "false",
      "db": "cdc",
      "sequence": "[\"24301848\",\"24302240\"]",
      "schema": "product_schema",
      "table": "product",
      "txId": 753,
      "lsn": 24302240,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1682193312514,
    "transaction": null
  }
}

The event may seem long, but it comprises just two main parts: schema and payload. The schema describes the fields that appear in the payload, such as the table schema or the source schema. The payload contains the details of the change, including its type, source, and before and after values. In this example, we use the ‘before’, ‘after’ and ‘op’ fields of the payload.

{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": {
      "id": 2,
      "quantity_left": 349
    },
    "op": "c" // c, u, d, r
  }
}

Since we inserted a new row, the “before” field is null and the “op” field contains the value ‘c’, which indicates a create. This field can also contain ‘u’ for update, ‘d’ for delete, or ‘r’ for read, which occurs during snapshots. The sketch below shows how a consumer might branch on these operations.
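Here is how the hypothetical listener from earlier might handle each case (JDK 17 switch syntax; the delete handling is an assumption, since the examples in this post only cover creates and updates):

@KafkaListener(topics = "product.product_schema.product")
public void onProductChange(String message) throws JsonProcessingException {
    JsonNode payload = objectMapper.readTree(message).path("payload");
    switch (payload.path("op").asText()) {
        // Creates, snapshot reads, and updates all carry the new row state in 'after'.
        case "c", "r", "u" -> {
            JsonNode after = payload.path("after");
            stockRepository.save(new Stock(after.path("id").asLong(), after.path("quantity_left").asInt()));
        }
        // Deletes carry no 'after' state; the row key comes from 'before'.
        case "d" -> stockRepository.deleteById(payload.path("before").path("id").asLong());
    }
}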

Let’s decrement the quantity of the product and check its change event as well.

~ curl -X POST localhost:8090/api/product/decrement-quantity/2

The quantity value in the database has been reduced by one, and this change event has been produced to the topic:

{
  "schema": { ... },
  "payload": {
    "before": {
      "id": 2,
      "quantity_left": 349
    },
    "after": {
      "id": 2,
      "quantity_left": 348
    },
    "op": "u"
  }
}

Conclusion

Implementing CDC with Debezium is a clean solution for ensuring data consistency between microservices that use separate databases. By leveraging the transaction log of a DBMS and the messaging capabilities of Kafka, Debezium makes it easy to identify changes in a source database and transfer those changes to destination systems.

In this blog post, we have covered the implementation of CDC using Debezium and Postgres in a Spring-based application environment. The tools and technologies are not limited to these choices, though: Debezium supports various databases, and the consuming services can be written in any language or framework.
