Change Data Capture and Kafka: Practical Overview of Connectors

Dora Horvat

DATA ENGINEER

Introduction

Change Data Capture (CDC) is the process of extracting changes from a source database system and delivering them to a downstream system or process. CDC enables capturing the state of the database and tracking new changes made to it.

This blog post presents a detailed overview of two Kafka Connect connectors which enable CDC in Kafka. Confluent differentiates between two types of CDC in the context of ingesting data from databases into Kafka: query-based and log-based CDC.

Query-based CDC

With query-based CDC, database tables are periodically queried to find all updated rows, based on an incrementing ID column or a timestamp column in the table. The key limitation of this batch-oriented approach is that not every change made to the database can be captured: if a CDC query runs against a table every 24 hours and a row is modified several times in that interval, only its latest state is captured, and rows deleted in the meantime are not seen at all. Query-based CDC can also put excessive load on the database system, since it queries tables directly.
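To make this limitation concrete, here is a minimal sketch that simulates a timestamp-based poller against an in-memory SQLite table. The table, its columns, and the `poll` helper are hypothetical, and this is not how the JDBC connector is implemented; it only illustrates what a periodic query can and cannot see.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT, updated_at INTEGER)"
)

def poll(last_seen):
    """Return rows modified after last_seen, as a timestamp-based poller would."""
    return conn.execute(
        "SELECT id, name, updated_at FROM customer "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_seen,),
    ).fetchall()

# Five changes happen between two polls: row 1 is inserted and updated twice,
# row 2 is inserted and then deleted.
conn.execute("INSERT INTO customer VALUES (1, 'Ana', 10)")
conn.execute("UPDATE customer SET name = 'Anna', updated_at = 20 WHERE id = 1")
conn.execute("UPDATE customer SET name = 'Anne', updated_at = 30 WHERE id = 1")
conn.execute("INSERT INTO customer VALUES (2, 'Bob', 25)")
conn.execute("DELETE FROM customer WHERE id = 2")

changes = poll(last_seen=0)
print(changes)  # [(1, 'Anne', 30)]
```

Only the final state of row 1 is returned; the intermediate names and the short-lived row 2 never reach the poller.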

Log-based CDC

Instead of querying the database directly, log-based CDC makes use of the database’s binary transaction log, which records all changes made to the database. Since each change is treated as a separate event, no change is missed, including deletions (in contrast to query-based CDC). Log-based CDC can support (near) real-time data ingestion, for example for real-time database replication or to feed real-time streaming analytics.

CDC & Kafka – Kafka Connect

Kafka Connect is a part of Apache Kafka that provides a scalable and reliable way to copy data between Kafka and external systems – it can be used to pull data from the external datastore into Kafka or to push data from Kafka to an external store.

For data ingestion into Kafka, query-based CDC can be implemented with the JDBC Kafka Connect Connector, while the log-based CDC can be implemented with the Debezium Connector.

Connectors

Connectors are plugins that can be added to a Kafka Connect runtime. They can be thought of as an interface between external systems and the Connect runtime. A connector itself is composed of one or more JAR files that implement the Connect API.

There are two types of connectors:

  • Sink Connectors – for exporting data from Kafka to external systems
  • Source Connectors – for importing records from external systems into Kafka

A connector typically targets a single system or protocol. Out of the box, Apache Kafka provides only a handful of ready-made source and sink connectors. Connectors for specific external systems can be found in repositories like Confluent Hub or the Event Streams connector catalog.

 

Confluent JDBC Source Connector

Confluent’s JDBC Source Connector enables the ingestion of data into Kafka from all relational databases that provide a JDBC driver (MySQL, Postgres, Oracle, SQL Server, DB2). With the JDBC connector, data is loaded periodically by executing a SQL query on the database, providing query-based CDC.

The Confluent JDBC Source Connector is available on the Confluent Hub website.
The full list of configuration properties for the JDBC Source Connector can be found in the connector’s documentation.

 

Confluent JDBC connector configuration settings example

In this blog post, both the JDBC and Debezium connectors are deployed to a Strimzi Kafka Connect cluster, using the KafkaConnector Strimzi resource.


apiVersion: 'kafka.strimzi.io/v1beta2'
kind: 'KafkaConnector'
metadata:
  name: 'jdbc'
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: io.confluent.connect.jdbc.JdbcSourceConnector
  tasksMax: 1
  pause: false
  config:
    connection.url: 'jdbc:postgresql://25.123.134.145:5432/postgres'
    connection.user: '${secrets:my-namespace/my-credentials:username}'
    connection.password: '${secrets:my-namespace/my-credentials:password}'
    connection.attempts: 3
    mode: 'incrementing'
    query: 'SELECT * FROM customer'
    table.types: 'TABLE'
    topic.prefix: 'topic-customers'
    heartbeat.interval.ms: 10000
    session.timeout.ms: 40000
    poll.interval.ms: 5000
    incrementing.column.name: 'id'
    transforms: 'valueToKey,extractField'
    transforms.valueToKey.type: 'org.apache.kafka.connect.transforms.ValueToKey'
    transforms.valueToKey.fields: 'id'
    transforms.extractField.type: 'org.apache.kafka.connect.transforms.ExtractField$Key'
    transforms.extractField.field: 'id'

Table filtering

The connector will try to read every table in the database by default. The following configurations can be used to modify this:

  • catalog.pattern – select tables with a particular catalog of schemas
  • schema.pattern – select tables with a particular schema
  • table.blacklist – exclude specific tables
  • table.whitelist – select specific tables
  • query – execute a specific query

Table querying

By default, the connector will only detect tables of type TABLE from the source. The table.types configuration can be set to a comma-separated list of table types to extract from the database. The possible types are:

  • TABLE
  • VIEW
  • SYSTEM TABLE
  • GLOBAL TEMPORARY
  • LOCAL TEMPORARY
  • ALIAS
  • SYNONYM

When querying tables, a maximum number of rows to include in a single batch can be specified with batch.max.rows. This can limit the amount of data that is internally buffered in the connector. The default value for batch.max.rows configuration is 100.

Data collection mode

The JDBC source connector supports two kinds of use cases: loading the entire database into a Kafka topic and providing a stream of events (updates made to a database). The mode configuration setting can be set to the following values:

  • bulk – specifies that the connector will read all selected tables in their entirety and send an event to Kafka for each row
    • every table to which the user has access will be copied into Kafka

To only stream the rows that have changed since it was last polled:

  • incrementing – specifies that the connector will send a new event to the topic every time a higher increment appears in the selected column
    • events will happen only for new rows
    • updates to existing rows or row deletions can’t be detected
    • the incrementing.column.name must be set to an incrementing column in the table
  • timestamp – specifies that the connector will identify new or modified rows by tracking the column containing a modification timestamp that is updated every time the row is updated
    • the connector keeps track of the last timestamp it has seen
    • if a row is inserted/updated with an old timestamp, the update will be missed
    • the timestamp.column.name must be set to a timestamp column which is updated at each write
  • timestamp+incrementing – uses a timestamp column to detect new and modified rows together with an incrementing column to tell apart rows that share a timestamp; this is the most reliable mode for identifying new and updated rows

The timestamp and/or incrementing ID column that you specify must be present in all tables handled by the connector. If different tables have timestamp/ID columns with different names, separate connector configurations should be created.
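The reason the combined mode is more robust can be sketched with a small SQLite example. The table, its values, and the `fetch_changes` helper are made up for illustration, and the predicate is a simplified version of the idea rather than the connector's actual SQL: the offset is a (timestamp, id) pair, so rows that share the last seen timestamp are still picked up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT, updated_at INTEGER)"
)
conn.executemany(
    "INSERT INTO customer VALUES (?, ?, ?)",
    [(1, "Ana", 100), (2, "Bob", 100), (3, "Cy", 100), (4, "Dee", 105)],
)

def fetch_changes(last_ts, last_id):
    """Rows strictly after the (timestamp, id) offset, in a stable order."""
    return conn.execute(
        """
        SELECT id, name, updated_at FROM customer
        WHERE updated_at > ? OR (updated_at = ? AND id > ?)
        ORDER BY updated_at, id
        """,
        (last_ts, last_ts, last_id),
    ).fetchall()

# Suppose the previous poll stopped at timestamp 100, id 2.
rows = fetch_changes(last_ts=100, last_id=2)

# A timestamp-only offset would skip row 3, which shares timestamp 100.
timestamp_only = conn.execute(
    "SELECT id FROM customer WHERE updated_at > ? ORDER BY id", (100,)
).fetchall()
```

Here `rows` contains rows 3 and 4, while `timestamp_only` contains just row 4.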

 

Specifying a WHERE clause with query modes

For more flexible ingestion of data, the query configuration can be used.

You can use query with a WHERE clause if you enable mode=bulk. This loads all rows from a table at each iteration.

If you want to use a WHERE clause with modes other than bulk, it must be possible to append the WHERE clause to the query. One way to do this is to write the query within a sub-select that includes the incremental and/or timestamp field in its returned fields, like in this example:


SELECT *
FROM
  (SELECT ID_COL,
          TIMESTAMP_COL,
          COL1,
          COL2
   FROM TABLE_A INNER JOIN TABLE_B ON PK=FK
   WHERE COL1='FOO') FILTERED
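As a rough illustration of why the sub-select matters, the sketch below appends an incremental condition to such a query and runs it against SQLite. The tables, their contents, and the appended clause are assumptions made for the example; the real connector builds its own clause.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE table_a (pk INTEGER PRIMARY KEY, id_col INTEGER, col1 TEXT);
    CREATE TABLE table_b (fk INTEGER, col2 TEXT);
    INSERT INTO table_a VALUES (1, 1, 'FOO'), (2, 2, 'BAR'), (3, 3, 'FOO');
    INSERT INTO table_b VALUES (1, 'x'), (2, 'y'), (3, 'z');
""")

# The configured query: the custom filter lives inside the sub-select,
# so the outer query has no WHERE clause of its own yet.
base_query = """
    SELECT * FROM (
        SELECT id_col, col1, col2
        FROM table_a INNER JOIN table_b ON pk = fk
        WHERE col1 = 'FOO'
    ) filtered
"""

# What an incrementing-mode poller could append to it.
incremental_query = base_query + " WHERE id_col > ? ORDER BY id_col ASC"

rows = conn.execute(incremental_query, (1,)).fetchall()
print(rows)  # [(3, 'FOO', 'z')]
```

Had the filter been written directly in the outer query, appending a second WHERE would have produced invalid SQL.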

Partitioning and parallelism

When ingesting data from multiple tables, the total time can be reduced by carrying out the work concurrently.

The maximum number of tasks can be configured with the tasks.max configuration. The connector will create a number of tasks equal to either the tasks.max or the number of tables, whichever is smaller (the default number of tasks is 1). If the query configuration property is used, the connector will start only a single task.
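The rule above can be written down as a tiny helper. The function name and parameters are hypothetical and only restate the behaviour described in this section.

```python
def effective_tasks(tasks_max, table_count, uses_query=False):
    """Number of tasks the connector starts, following the rules above."""
    if uses_query:
        # With the query property set, only a single task is started.
        return 1
    return min(tasks_max, table_count)

print(effective_tasks(tasks_max=4, table_count=10))                  # 4
print(effective_tasks(tasks_max=8, table_count=3))                   # 3
print(effective_tasks(tasks_max=8, table_count=3, uses_query=True))  # 1
```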

For partitioned tables, the connector will create only one task, even if the tasks.max configuration is set to a higher value.

For Postgres partitioned tables, the JDBC connector must be version 10.6.0 or higher.

Message keys

The JDBC connector does not generate the record key by default, so with multiple partitions, the records will be spread across partitions. To send a message key for the JDBC connector, the ValueToKey and ExtractField SMTs (Single Message Transformations) can be added to the JDBC connector configuration:


...
  transforms: 'valueToKey,extractField'
  transforms.valueToKey.type: 'org.apache.kafka.connect.transforms.ValueToKey'
  transforms.valueToKey.fields: 'id'
  transforms.extractField.type: 'org.apache.kafka.connect.transforms.ExtractField$Key'
  transforms.extractField.field: 'id'
...

Table capture

If you are using the timestamp mode, you can specify the timestamp from which the connector starts tracking changes with the timestamp.initial property. By default, the connector retrieves all of the data. To start tracking from the current time, set timestamp.initial=-1. An arbitrary epoch timestamp (in milliseconds) can also be used.
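If a specific starting point is needed, the epoch value can be computed with a few lines of Python. This is a small sketch that assumes timestamp.initial expects epoch milliseconds; the date is an arbitrary example.

```python
from datetime import datetime, timezone

def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to epoch milliseconds."""
    return int(moment.timestamp() * 1000)

start = datetime(2024, 1, 1, tzinfo=timezone.utc)  # arbitrary example date
timestamp_initial = to_epoch_millis(start)
print(timestamp_initial)  # 1704067200000
```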

 

Debezium PostgreSQL Source connector

Debezium is an open-source distributed platform offering Kafka connectors for log-based CDC. These connectors record all row-level changes for each data table in a database in the form of a change event stream. In this blog post, the PostgreSQL Debezium connector is described. In addition to the PostgreSQL connector, Debezium offers connectors for MySQL, MongoDB, SQL Server, Oracle, IBM Db2, Cassandra, Vitess and Spanner.

The Debezium Postgres Connector has two main components:

  • A logical decoding output plugin
    • pgoutput – native logical decoding output plugin in Postgres 10+
    • decoderbufs – maintained by the Debezium community
  • Java code (the actual connector) that reads the changes produced by the logical decoding output plugin

When the Postgres server commits a transaction, a separate server process invokes a callback function from the logical decoding output plugin (e.g. pgoutput) which converts the changes to a JSON or Protobuf format and writes them to an output stream. The Debezium connector acts as a client which can consume this output stream and transform the events into Debezium create, update, or delete events that include the Log Sequence Number (position in the transaction log) of the event. It then sends the transformed events to Kafka Connect, which asynchronously writes the change event records in the same order they were generated to a Kafka topic. Kafka Connect records the LSN from the change events in a separate offset topic.

Postgres purges the transaction log (the write-ahead log, WAL) after some period of time, so the connector does not have the complete history of all changes made to the database. For this reason, the connector performs a snapshot of each of the database schemas the first time it starts. After the initial snapshot, it continues streaming changes from the exact point at which the snapshot was created.

If the connector stops for any reason, after a restart it continues reading the WAL from where it last left off.
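The resume behaviour can be pictured with a small simulation. Everything here is made up for illustration: the `wal` list stands in for the transaction log, `offset_store` stands in for the Kafka Connect offset topic, and the LSN values are arbitrary.

```python
# Simulated WAL: (lsn, change) pairs in commit order.
wal = [
    (100, "insert id=1"),
    (108, "update id=1"),
    (115, "delete id=1"),
    (121, "insert id=2"),
]

def stream(wal_entries, committed_lsn):
    """Yield the entries whose LSN is greater than the last committed offset."""
    for lsn, change in wal_entries:
        if lsn > committed_lsn:
            yield lsn, change

offset_store = {"lsn": 0}

# First run: two events are processed, then the connector "stops".
for lsn, change in stream(wal, offset_store["lsn"]):
    offset_store["lsn"] = lsn  # record the position of the processed event
    if lsn == 108:
        break

# After the restart, streaming resumes right after the stored LSN.
resumed = list(stream(wal, offset_store["lsn"]))
print(resumed)  # [(115, 'delete id=1'), (121, 'insert id=2')]
```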

In the event of data corruption or Kafka topic deletion, the snapshot can be re-run on demand; this type of snapshot is called an ad-hoc snapshot. Ad-hoc snapshots require the use of signaling tables.

PostgreSQL configuration

WAL (Write-Ahead-Log) configuration

As mentioned, Debezium uses Postgres’ logical decoding, which uses replication slots. To enable logical decoding, first check the current wal_level setting and, if needed, set it to logical. The ALTER SYSTEM command writes the value to postgresql.auto.conf, which takes precedence over postgresql.conf:

SHOW wal_level;
ALTER SYSTEM SET wal_level = logical;

After altering the wal_level, the PostgreSQL server must be restarted.

         

Publication

If the pgoutput logical decoding plugin is used, create a publication that lists the tables the connector will monitor for changes:

CREATE PUBLICATION debezium_publication FOR TABLE <table_name>;

If the publication does not exist, the connector creates it at start-up, and by default it includes all tables. This behavior can be changed with the publication.autocreate.mode property, for example to create a publication that contains only the tables matching schema.include.list, schema.exclude.list, table.include.list, and table.exclude.list. The connector user must have superuser permissions to create the publication.

Replication slot

After that, create a replication slot that will be used to store information about the current state of the replication:

SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
          

User permissions

The Debezium connector requires a database user that has permission to perform replication. Instead of using a superuser, it is recommended to create a dedicated Debezium user that has only the minimum required privileges: REPLICATION and LOGIN.

To enable replication, the Postgres database must be configured to permit replication from the hosts that run the Debezium connector. Make sure the entries in the pg_hba.conf file allow replication:

local   replication     <youruser>                          trust
host    replication     <youruser>  127.0.0.1/32            trust
host    replication     <youruser>  ::1/128                 trust
            

Debezium connector configuration

Kafka Connect configuration file

              
              apiVersion: kafka.strimzi.io/v1beta2
              kind: KafkaConnect
              metadata:
                name: debezium-connect-cluster
                annotations:
                  strimzi.io/use-connector-resources: "true"
              spec:
                replicas: 1
                bootstrapServers: k-cluster-kafka-bootstrap:9092
                config:
                  group.id: debezium-connect-cluster
                  offset.storage.topic: debezium-cluster-offsets
                  config.storage.topic: debezium-cluster-configs
                  status.storage.topic: debezium-cluster-status
                  config.storage.replication.factor: 1
                  offset.storage.replication.factor: 1
                  status.storage.replication.factor: 1
                build:
                  output:
                    type: docker
                    image: scratch.hub.syntio.xyz/labs/strimzi-testing:latest
                    pushSecret: regcred
                  plugins:
                    - name: debezium-postgres-connector
                      artifacts:
                        - type: tgz
                          url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.1.1.Final/debezium-connector-postgres-2.1.1.Final-plugin.tar.gz
                template:
                  pod:
                    imagePullSecrets:
                      - name: regcred
                logging:
                  type: inline
                  loggers:
                    log4j.rootLogger: "INFO"
                readinessProbe:
                  initialDelaySeconds: 30
                  timeoutSeconds: 5
                livenessProbe:
                  initialDelaySeconds: 30
                  timeoutSeconds: 5
              

Debezium PostgreSQL Connector configuration file

                
                apiVersion: kafka.strimzi.io/v1beta2
                kind: KafkaConnector
                metadata:
                  name: debezium-connector-postgres
                  labels:
                    strimzi.io/cluster: debezium-connect-cluster
                spec:
                  class: io.debezium.connector.postgresql.PostgresConnector
                  tasksMax: 1
                  config:
                    plugin.name: pgoutput
                    slot.name: debezium_slot
                    publication.name: debezium_publication
                    tasks.max: 1
                    database.hostname: 25.123.134.145
                    database.port: 5432
                    database.user: ${secrets:my-namespace/my-credentials:username}
                    database.password: ${secrets:my-namespace/my-credentials:password}
                    database.dbname: testdb
                    topic.prefix: debezium
                    table.include.list: public.customer
                

Properties:

  • slot.name specifies the name of the previously created replication slot for the Debezium connector.
  • publication.name specifies the name of the previously created publication for the Debezium connector.
  • tasks.max defines the maximum number of tasks created for the connector.
  • topic.prefix defines the prefix for the names of the Kafka topics that receive records from the connector (topic names have the form topicPrefix.schemaName.tableName).
  • table.include.list is an optional list of tables (table identifiers of the form schemaName.tableName) from which the connector will capture changes.
  • table.exclude.list is an optional list of tables (table identifiers of the form schemaName.tableName) from which the connector will not capture changes.
  • schema.include.list is an optional list of schemas (schema identifiers of the form schemaName) for which you want to capture changes.
  • schema.exclude.list is an optional list of schemas (schema identifiers of the form schemaName) for which you do not want to capture changes.
  • column.include.list is an optional list of columns (column identifiers of the form schemaName.tableName.columnName) that should be included in change event record values.
  • column.exclude.list is an optional list of columns (column identifiers of the form schemaName.tableName.columnName) that should not be included in change event record values.

The PostgreSQL connector always uses a single task, so the default tasks.max value of 1 is always acceptable.

The topic.prefix value should not be changed once the connector is in use, because after a restart the connector would send subsequent events to new topics.

If an .include.list property is set in the configuration, the matching .exclude.list property should not be set (for example, table.include.list and table.exclude.list should not be used together).
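The naming rule and the include/exclude rule can be checked with a short sketch. The helper names are hypothetical and the `config` dictionary only mirrors a few of the properties from the example configuration above; this is not part of Debezium.

```python
def topic_for(topic_prefix, table_id):
    """Build the topic name for a table identifier such as 'public.customer'."""
    schema_name, table_name = table_id.split(".")
    return f"{topic_prefix}.{schema_name}.{table_name}"

def conflicting_filters(config):
    """Return the filter kinds that have both an include and an exclude list."""
    return [
        kind
        for kind in ("schema", "table", "column")
        if f"{kind}.include.list" in config and f"{kind}.exclude.list" in config
    ]

config = {"topic.prefix": "debezium", "table.include.list": "public.customer"}

topics = [
    topic_for(config["topic.prefix"], table_id)
    for table_id in config["table.include.list"].split(",")
]
print(topics)                       # ['debezium.public.customer']
print(conflicting_filters(config))  # []
```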

Data change events

The Debezium connector creates a data change event for every insert, update, and delete operation committed to a database. Each event contains a key and a value.
Because events are streamed continuously, their structure can change over time, so each event contains a schema for its content (or a schema ID if you are using a schema registry).

                     

Change event key

The key schema specifies a Kafka Connect schema that describes the structure of the key payload. By default, the key of a data change event is the primary key of the table that was modified; a different key can be configured with the message.key.columns property.

For a single-partition topic, all operations against a table will be emitted in order.

Key schema and payload

  • The optional field specifies whether the event key must contain a value in the payload. A value in the payload is optional when a table does not have a primary key.
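As a rough illustration, the key of an event for the public.customer table might look like the JSON below when the JSON converter embeds schemas. The schema name, field type, and values are assumptions made for this sketch, not output captured from a running connector.

```python
import json

# Hypothetical event key with an embedded schema; all values are made up.
raw_key = """
{
  "schema": {
    "type": "struct",
    "name": "debezium.public.customer.Key",
    "optional": false,
    "fields": [{"field": "id", "type": "int32", "optional": false}]
  },
  "payload": {"id": 1}
}
"""

key = json.loads(raw_key)

# Column names that make up the key, taken from the key schema.
key_columns = [field["field"] for field in key["schema"]["fields"]]

# False here, so the payload must contain a value.
key_is_optional = key["schema"]["optional"]

print(key_columns, key["payload"], key_is_optional)
```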

Change event value

The event schema specifies a Kafka Connect schema that describes the event’s payload, that is, the structure of the data table row that was modified.

Value schema

Value payload

  • The before field specifies the state of the row before the change event occurred.
  • The after field specifies the state of the row after the change event occurred.
  • The source field describes the source metadata for the change event.
  • The op field describes the type of database operation that caused the change event. Possible values of this field are:
    • c – create
    • u – update
    • d – delete
    • r – read (for snapshots)
    • t – truncate
    • m – message (a message passed to the logical decoding plugin through the WAL, usually with the pg_logical_emit_message function)
  • The ts_ms field displays the time (based on the system clock in the JVM running the Kafka Connect task) at which the connector processed the change event.

The availability of the before field depends on the REPLICA IDENTITY setting for the data table.

In the source object, the ts_ms field displays the time at which the change was made in the database. The lag between the source database update and the Debezium connector can be determined by comparing the values of the payload.source.ts_ms and payload.ts_ms fields.
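The lag calculation and the op codes can be sketched as follows. The payload is a hypothetical, trimmed-down update event with made-up values, and `describe` is an illustrative helper rather than anything provided by Debezium.

```python
# Hypothetical, trimmed-down payload of an update event; all values are made up.
payload = {
    "before": {"id": 1, "name": "Ana"},
    "after": {"id": 1, "name": "Anna"},
    "source": {"schema": "public", "table": "customer", "ts_ms": 1700000000000},
    "op": "u",
    "ts_ms": 1700000000250,
}

# Operation codes as listed above.
OPERATIONS = {
    "c": "create",
    "u": "update",
    "d": "delete",
    "r": "read",
    "t": "truncate",
    "m": "message",
}

def describe(event_payload):
    """Return the operation name and the connector lag in milliseconds."""
    lag_ms = event_payload["ts_ms"] - event_payload["source"]["ts_ms"]
    return OPERATIONS[event_payload["op"]], lag_ms

operation, lag_ms = describe(payload)
print(operation, lag_ms)  # update 250
```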

Replica Identity

REPLICA IDENTITY is a Postgres table-level setting that determines how much information about the previous values of the affected columns is available to the logical decoding plug-in for update and delete events. Possible values are:

  • DEFAULT – emitted events contain the previous values for the primary key columns of a table
  • NOTHING – emitted events do not contain any information about the previous values of any table column
  • FULL – emitted events contain the previous values of all table columns
  • INDEX index-name – emitted events contain the previous values of the columns contained in the specified index

Replica identity for a specific table can be altered with the following command:

ALTER TABLE <table_name> REPLICA IDENTITY <replica_identity>;
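The effect of each setting on the before field can be modelled with a simplified sketch. The function, the sample row, and the column names are hypothetical; the logic only restates the four options listed above and ignores finer details of Postgres behaviour.

```python
def before_image(old_row, replica_identity, primary_key=(), index_columns=()):
    """Previous column values exposed for an update or delete event."""
    if replica_identity == "FULL":
        return dict(old_row)
    if replica_identity == "DEFAULT":
        kept = primary_key
    elif replica_identity == "INDEX":
        kept = index_columns
    elif replica_identity == "NOTHING":
        kept = ()
    else:
        raise ValueError(f"unknown replica identity: {replica_identity}")
    return {column: old_row[column] for column in kept}

old_row = {"id": 1, "email": "ana@example.com", "name": "Ana"}

print(before_image(old_row, "DEFAULT", primary_key=("id",)))      # {'id': 1}
print(before_image(old_row, "INDEX", index_columns=("email",)))   # {'email': 'ana@example.com'}
print(before_image(old_row, "NOTHING"))                           # {}
```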
                          

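Query-based vs. log-based CDC (JDBC vs. Debezium connector)

Confluent’s JDBC source connector is an example of query-based CDC: the database is queried directly to capture the changes made to the data.
Debezium’s connector is an example of log-based CDC: it captures changes from the database’s transaction log.

Both of these CDC methods have their strengths and weaknesses:

  • Query-based CDC works with any relational database that provides a JDBC driver, but it can miss intermediate updates and deletions between polls, and it puts additional query load on the database.
  • Log-based CDC captures every change, including deletions, in (near) real time, but it requires database-side setup such as logical decoding, a replication slot, and a user with replication permissions.

For real-time data streaming use cases, log-based CDC should be used.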