SQL Server CDC Patterns

The microsoft_sql_server_cdc input captures row-level changes from SQL Server tables using SQL Server’s built-in CDC change tables. Use these patterns to filter, transform, and route SQL Server CDC events to Redpanda, S3, and other destinations.

Use this cookbook to:

  • Apply reusable patterns for capturing SQL Server CDC events

  • Adapt integration patterns to route CDC data to Redpanda, S3, and other destinations

  • Identify patterns for filtering and transforming change events

The microsoft_sql_server_cdc input is in beta. The API is subject to change.

Prerequisites

Before using these patterns, configure the following.

Redpanda CLI

Install the Redpanda CLI (rpk) to run Redpanda Connect. See rpk installation for instructions.

SQL Server CDC

CDC must be enabled on both the database and each table you want to capture. On a self-managed SQL Server instance, enabling CDC on the database requires sysadmin membership, and enabling it on a table requires db_owner membership:

-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db;

-- Enable CDC on a table
EXEC sys.sp_cdc_enable_table
  @source_schema = N'dbo',
  @source_name = N'orders',
  @role_name = NULL;

For cloud-managed SQL Server, follow your provider's instructions for enabling CDC.

Checkpoint storage

By default, the microsoft_sql_server_cdc input creates a table (rpcn.CdcCheckpointCache) and stored procedure in your SQL Server database to store the last processed Log Sequence Number (LSN). The pipeline user must have CREATE TABLE and CREATE PROCEDURE permissions.

To use an external cache instead, set the checkpoint_cache field to a cache label defined in cache_resources. See the connector reference for details.
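
As a minimal sketch of the external cache option, the following configuration stores checkpoints in Redis instead of SQL Server. The label cdc_checkpoints and the Redis URL are placeholders chosen for this example, not values taken from the connector reference. A persistent cache is assumed here because an in-memory cache would lose the stored LSN when the pipeline restarts.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true
    # Hypothetical label; it must match a label under cache_resources.
    checkpoint_cache: cdc_checkpoints

cache_resources:
  - label: cdc_checkpoints
    redis:
      # Placeholder address; point this at your own Redis instance.
      url: redis://localhost:6379
```

With this setup the pipeline user may not need CREATE TABLE or CREATE PROCEDURE permissions, since nothing is written to the rpcn schema. Confirm this against the connector reference before relying on it.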

Environment variables

The examples in this cookbook use environment variables for configuration:

# SQL Server connection string, in sqlserver://user:password@host:port?database=dbname format
export MSSQL_CONN="sqlserver://user:password@localhost:1433?database=mydb"
# Redpanda broker addresses (for Redpanda output examples)
export REDPANDA_BROKERS=localhost:9092
# Amazon S3 bucket name (for S3 output examples)
export S3_BUCKET=cdc-archive

Capture CDC events

The simplest pattern captures all change events from SQL Server tables and outputs them with metadata:

input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        root.operation = meta("operation")
        root.schema = meta("database_schema")
        root.table = meta("table")
        root.lsn = meta("lsn")
        root.data = this
        root.timestamp = now()

output:
  stdout:
    codec: lines

For details on the CDC event message structure and available metadata fields, see the metadata section in the connector reference.

SQL Server CDC generates two events for each row update: update_before (the previous row values) and update_after (the new row values). Most downstream use cases only need the update_after event.

Filter CDC events

Filter events to process only the current row state after each change, using the operation metadata field:

input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        root = if meta("operation") != "insert" && meta("operation") != "update_after" {
          deleted()
        }
    - mapping: |
        root.operation = meta("operation")
        root.table = meta("table")
        root.data = this
        root.timestamp = now()

output:
  stdout:
    codec: lines

This pattern:

  • Keeps only insert and update_after operations, dropping delete, update_before, and read events

  • Transforms the event to a simplified format with a processing timestamp (now() records when the pipeline handled the event, not when the change was committed)
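
If downstream consumers also need delete events and snapshot rows, a looser variant of the filter might drop only the update_before image and pass everything else through. This is a sketch that reuses the operation metadata values named above, not a configuration from the connector reference.

```yaml
pipeline:
  processors:
    - mapping: |
        # Drop only the pre-update row image; insert, update_after,
        # delete, and read events pass through unchanged.
        root = if meta("operation") == "update_before" { deleted() }
```

Keeping delete events matters when the destination has to remove rows rather than only upsert them.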

Route to Redpanda

Stream SQL Server changes to Redpanda for real-time processing:

input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
      - dbo.customers
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        meta topic = meta("database_schema") + "." + meta("table")

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! json("id") }
    batching:
      count: 100
      period: 1s

This pattern:

  • Uses database_schema.table as the Redpanda topic name

  • Batches messages for efficient delivery

  • Sets the message key to the primary key field (update json("id") to match your table’s primary key column)
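
For tables with a composite primary key, one possible approach is to join the key columns into a single message key. The sketch below assumes a hypothetical table whose primary key is (order_id, line_number). Those column names are illustrative only. The topic interpolation relies on the topic metadata set by the mapping in the pipeline above.

```yaml
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    # Hypothetical composite key: replace order_id and line_number
    # with the primary key columns of your own table.
    key: '${! json("order_id") }:${! json("line_number") }'
    batching:
      count: 100
      period: 1s
```

Using the full primary key as the message key keeps all changes for a given row in the same partition, which preserves their relative order for consumers.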

Route to S3

Archive CDC events to S3 for long-term storage and analytics:

input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        root.operation = meta("operation")
        root.schema = meta("database_schema")
        root.table = meta("table")
        root.data = this
        root.timestamp = now()

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("database_schema") }/${! meta("table") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: lines

This pattern:

  • Organizes files by database schema, table, and time-based partitions (year/month/day/hour)

  • Batches events and archives them as newline-delimited JSON

  • Uses UUID file names to prevent collisions

Route by event type

Route different event types to different destinations:

input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true

output:
  switch:
    cases:
      - check: meta("operation") == "insert"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: sqlserver.orders.inserts
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: sqlserver.orders.changes

This pattern:

  • Routes insert events to sqlserver.orders.inserts, and all other events (update_before, update_after, delete, and snapshot read events) to sqlserver.orders.changes

  • Supports specialized downstream consumers per operation type
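
The same switch output can be extended with more cases. As a sketch, the configuration below sends delete events to their own topic and discards update_before events with the drop output. The topic name sqlserver.orders.deletes is an assumption made for this example.

```yaml
output:
  switch:
    cases:
      - check: meta("operation") == "insert"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: sqlserver.orders.inserts
      - check: meta("operation") == "delete"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            # Hypothetical topic name for delete events.
            topic: sqlserver.orders.deletes
      - check: meta("operation") == "update_before"
        output:
          # Discard pre-update row images instead of publishing them.
          drop: {}
      # Fallback case: everything not matched above.
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: sqlserver.orders.changes
```

Cases are evaluated in order, so the final case without a check acts as the catch-all for remaining events.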

Configure replication mode

The microsoft_sql_server_cdc input supports two replication modes controlled by the stream_snapshot field:

  • stream_snapshot: true: Captures a full snapshot of existing table data before streaming live changes. Use this when you need a complete initial load.

  • stream_snapshot: false: Skips the snapshot and streams only changes from the current LSN position. Use this when you only need new changes going forward.

input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
      - dbo.customers
    stream_snapshot: true
    # Number of tables to snapshot in parallel
    max_parallel_snapshot_tables: 4
    # Number of rows to read per batch during snapshot processing
    snapshot_max_batch_size: 1000

Filter tables with patterns

The include field accepts regular expressions, so you can capture multiple tables without listing each one:

input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      # Captures all tables in the dbo schema
      - dbo.*
    exclude:
      # Excludes a specific table matched by the include pattern
      - dbo.audit_log
    stream_snapshot: false

Troubleshoot common issues

Use these steps to diagnose and fix the most common problems with the microsoft_sql_server_cdc input.

No events received

If no events arrive:

  1. Verify CDC is enabled on the database and table:

    SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'mydb';
    SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'orders';
  2. Confirm CDC agent jobs are running:

    EXEC sys.sp_cdc_help_jobs;
  3. Check that the table pattern in include matches the target table, using schema.table format.

Checkpoint table creation fails

If the pipeline fails to start because it cannot create the checkpoint table:

  • Grant CREATE TABLE and CREATE PROCEDURE permissions to the pipeline user, or

  • Create the schema manually (CREATE SCHEMA rpcn) and grant the user permissions on it, or

  • Use an external checkpoint_cache to avoid creating a table in SQL Server.

Duplicate events

The microsoft_sql_server_cdc input provides at-least-once delivery. If the pipeline fails between checkpoints, events may be re-read on restart. To handle duplicates:

  • Use idempotent processing in downstream systems.

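  • Deduplicate using the lsn metadata field. In SQL Server CDC, changes committed in the same transaction can share an LSN, so combine lsn with the table name and primary key when building a deduplication key.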

  • Lower checkpoint_limit to reduce the window of possible duplicates.
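
The deduplication and checkpoint suggestions above could be combined as in the following sketch. It assumes a primary key column named id, a Redis instance at a placeholder address, and a hypothetical cache label cdc_dedupe. The checkpoint_limit value and the TTL are illustrative, not recommendations. The key combines table, LSN, operation, and primary key on the assumption that the LSN alone may not identify a single event.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true
    # Illustrative value; tune it for your workload.
    checkpoint_limit: 256

pipeline:
  processors:
    - dedupe:
        # Hypothetical label; it must match a label under cache_resources.
        cache: cdc_dedupe
        # Assumes the primary key column is named id.
        key: '${! meta("table") }:${! meta("lsn") }:${! meta("operation") }:${! json("id") }'

cache_resources:
  - label: cdc_dedupe
    redis:
      # Placeholder address; a persistent cache is used so that
      # seen keys survive a pipeline restart.
      url: redis://localhost:6379
      default_ttl: 1h
```

The TTL only needs to cover the window in which replays can occur after a restart. A longer TTL increases cache size without improving deduplication.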