microsoft_sql_server_cdc

Enables Change Data Capture by consuming from Microsoft SQL Server’s change tables.

```yaml
input:
  label: ""
  microsoft_sql_server_cdc:
    connection_string: "" # No default (required)
    stream_snapshot: false # No default (required)
    snapshot_max_batch_size: 1000
    include: [] # No default (required)
    exclude: [] # No default (optional)
    checkpoint_cache: "" # No default (optional)
    checkpoint_cache_table_name: rpcn.CdcCheckpointCache
    checkpoint_cache_key: microsoft_sql_server_cdc
    checkpoint_limit: 1024
    stream_backoff_interval: 5s
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

Streams changes from a Microsoft SQL Server database for Change Data Capture (CDC). If stream_snapshot is set to true, the existing data in the database is streamed first, before any changes.

Metadata

This input adds the following metadata fields to each message:

  • schema (Schema of the table that the message originated from)

  • table (Name of the table that the message originated from)

  • operation (Type of operation that generated the message: "read", "insert", "delete", "update_before", or "update_after". "read" marks messages from the initial snapshot phase. An update produces two messages: "update_before" carries the row values before the change and "update_after" carries the values after it.)

  • lsn (The Log Sequence Number in Microsoft SQL Server)
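
As a sketch of how these metadata fields might be used downstream, the pipeline below uses a Bloblang mapping processor to copy the source table and operation into the message body, and drops the "update_before" half of each update. The field names table_name and operation in the output are hypothetical choices for illustration, not something this input produces on its own.

```yaml
# Hypothetical pipeline: annotate CDC messages using this input's metadata
pipeline:
  processors:
    - mapping: |
        root = this
        # Combine the schema and table metadata into one identifier, e.g. "dbo.orders"
        root.table_name = @schema + "." + @table
        root.operation = @operation
        # Keep only the post-update row values for updates
        if @operation == "update_before" {
          root = deleted()
        }
```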

Permissions

If checkpoint_cache is not set, this input uses a Microsoft SQL Server table as its checkpoint cache, so the user must have permissions to create tables and stored procedures. Refer to checkpoint_cache_table_name for additional details.

Fields

auto_replay_nacks

Whether to automatically replay messages that are rejected (nacked) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to drop rejected messages instead. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, because the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool

Default: true

batching

Configure a batching policy.

Type: object

Examples:

```yaml
batching:
  byte_size: 5000
  count: 0
  period: 1s
```

```yaml
batching:
  count: 10
  period: 1s
```

```yaml
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

Example:

```yaml
check: this.type == "end_of_transaction"
```

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as 100ms, 1s, or 5s.

Type: string

Default: ""

Examples:

```yaml
period: 1s
```

```yaml
period: 1m
```

```yaml
period: 500ms
```

batching.processors[]

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: processor

Examples:

```yaml
processors:
  - archive:
      format: concatenate
```

```yaml
processors:
  - archive:
      format: lines
```

```yaml
processors:
  - archive:
      format: json_array
```
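
Putting the batching fields together, the following sketch (with a hypothetical table name) flushes a batch after 100 messages or after one second, whichever comes first, and archives each flushed batch into a single JSON array message.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: sqlserver://username:password@host/instance
    stream_snapshot: false
    include:
      - dbo.orders # hypothetical table
    batching:
      count: 100 # flush after 100 messages...
      period: 1s # ...or after 1s, whichever comes first
      processors:
        # Combine all messages of the flushed batch into one JSON array
        - archive:
            format: json_array
```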

checkpoint_cache

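The name of a cache resource used to store the current Log Sequence Number (LSN) position. The cache holds the highest LSN that has been successfully delivered downstream, which lets the connector resume from the last processed position after a restart instead of reprocessing the change tables from the beginning. Because delivery is at-least-once, some messages may still be delivered again after a restart. If this field is not set, a table in the source database is used instead (see checkpoint_cache_table_name).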

Type: string
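
A minimal sketch, assuming a Redis server at the hypothetical address redis://localhost:6379 and a resource label cdc_checkpoints of your choosing: declare a cache resource and reference it by label from checkpoint_cache. The cache must survive restarts for resumption to work; an in-memory cache would lose the stored LSN when the process exits.

```yaml
cache_resources:
  - label: cdc_checkpoints # hypothetical resource label
    redis:
      url: redis://localhost:6379 # assumed Redis address

input:
  microsoft_sql_server_cdc:
    connection_string: sqlserver://username:password@host/instance
    stream_snapshot: true
    include:
      - dbo.orders
    # Refer to the cache resource by its label
    checkpoint_cache: cdc_checkpoints
```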

checkpoint_cache_key

The key under which the current LSN position is stored in checkpoint_cache. Provide a different key for each CDC input when multiple inputs share the same cache.

Type: string

Default: microsoft_sql_server_cdc
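
For example, two CDC inputs combined with a broker input can share one cache as long as each uses its own key. This sketch assumes a cache resource labelled cdc_checkpoints is declared under cache_resources elsewhere in the config; the database names, tables, and keys are hypothetical.

```yaml
input:
  broker:
    inputs:
      - microsoft_sql_server_cdc:
          connection_string: sqlserver://username:password@host/instance?database=sales
          stream_snapshot: false
          include:
            - dbo.orders
          checkpoint_cache: cdc_checkpoints
          # Distinct key so this input's LSN does not overwrite the other's
          checkpoint_cache_key: sales_orders_cdc
      - microsoft_sql_server_cdc:
          connection_string: sqlserver://username:password@host/instance?database=inventory
          stream_snapshot: false
          include:
            - dbo.products
          checkpoint_cache: cdc_checkpoints
          checkpoint_cache_key: inventory_products_cdc
```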

checkpoint_cache_table_name

The multipart identifier (schema.table) of the checkpoint cache table. If checkpoint_cache is not set, this input automatically creates a table and a stored procedure under the rpcn schema to act as the checkpoint cache. The table stores the latest Log Sequence Number (LSN) that has been successfully delivered, allowing Redpanda Connect to resume from that point after a restart rather than reconsuming the entire change table.

Type: string

Default: rpcn.CdcCheckpointCache

Example:

```yaml
checkpoint_cache_table_name: dbo.checkpoint_cache
```

checkpoint_limit

The maximum number of messages that can be processed concurrently before applying back pressure. Higher values enable better parallelization and batching but increase memory usage. Messages are processed in LSN order, and a given LSN is only acknowledged after all previous LSNs have been successfully delivered, ensuring at-least-once guarantees.

Type: int

Default: 1024
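
As an illustration, a high-volume table might raise checkpoint_limit so that more messages can be processed at once, while batching groups them for the output. The table name and the values 4096 and 500 are illustrative assumptions rather than recommendations; larger values trade memory for throughput.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: sqlserver://username:password@host/instance
    stream_snapshot: false
    include:
      - dbo.events # hypothetical high-volume table
    # Allow up to 4096 messages to be processed concurrently before back pressure applies
    checkpoint_limit: 4096
    batching:
      count: 500
      period: 1s
```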

connection_string

The connection string for the Microsoft SQL Server database. Use the format sqlserver://username:password@host/instance?param1=value&param2=value. For Windows Authentication, use sqlserver://host/instance?trusted_connection=yes. Include additional parameters like TrustServerCertificate=true for self-signed certificates or encrypt=disable to disable encryption.

Type: string

Example:

```yaml
connection_string: sqlserver://username:password@host/instance?param1=value&param2=value
```
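
A sketch of a connection string for a local development server with a self-signed certificate, assuming a hypothetical user cdc_user, a database named inventory on the default port 1433, and a password supplied through an environment variable named MSSQL_PASSWORD (interpolated with ${...}). Because the connection string is a URL, reserved characters in the username or password must be percent-encoded.

```yaml
input:
  microsoft_sql_server_cdc:
    # Hypothetical credentials, host, and database; TrustServerCertificate=true accepts a self-signed certificate
    connection_string: sqlserver://cdc_user:${MSSQL_PASSWORD}@localhost:1433?database=inventory&TrustServerCertificate=true
    stream_snapshot: false
    include:
      - dbo.products
```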

exclude[]

Regular expressions for tables to exclude from CDC streaming. Use this to filter out specific tables from the include patterns. Table names should follow the schema.table format. Exclude patterns are applied after include patterns, allowing you to include broad patterns while excluding specific tables.

Type: array

Example:

```yaml
exclude:
  - dbo.privatetable
```

include[]

Regular expressions for tables to include in CDC streaming. Specify table names using the format schema.table (such as dbo.orders, sales.customers). Each pattern is treated as a regular expression, allowing wildcards and pattern matching. All specified tables must have CDC enabled in SQL Server.

Type: array

Example:

```yaml
include:
  - dbo.products
```
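
Because each entry in include and exclude is a regular expression, the two fields can be combined. The sketch below streams every table in the dbo schema except hypothetical tables whose names start with audit_. The dot between schema and table is escaped so that it matches a literal period; anchoring with ^ and $ is a defensive assumption, since this page does not state whether patterns must match the whole name.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: sqlserver://username:password@host/instance
    stream_snapshot: false
    include:
      - '^dbo\..*$' # every table in the dbo schema
    exclude:
      - '^dbo\.audit_.*$' # hypothetical audit tables to skip
```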

snapshot_max_batch_size

The maximum number of rows to stream in a single batch during the initial snapshot phase. Larger batch sizes can improve throughput for initial data loads but may increase memory usage. This setting only applies when stream_snapshot is enabled.

Type: int

Default: 1000
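
For an initial load of a large table, a sketch might raise the snapshot batch size. The table name and the value 10000 are illustrative, not tuned recommendations.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: sqlserver://username:password@host/instance
    # Copy existing rows first, then switch to streaming changes
    stream_snapshot: true
    # Read up to 10000 rows per snapshot batch (illustrative value)
    snapshot_max_batch_size: 10000
    include:
      - dbo.orders
```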

stream_backoff_interval

The time interval to wait between polling attempts when no new CDC data is available. For low-traffic tables, increasing this value reduces database load and network traffic. Use Go duration format like 5s, 30s, or 1m. Shorter intervals provide lower latency for new changes but increase server load.

Type: string

Default: 5s

Examples:

```yaml
stream_backoff_interval: 5s
```

```yaml
stream_backoff_interval: 1m
```

stream_snapshot

Whether to stream a snapshot of all existing data before streaming CDC changes. When enabled, the connector first queries all existing table data, then switches to streaming incremental changes from the transaction log. Set to false to start streaming only new changes from the current LSN position.

Type: bool
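
A minimal sketch that skips the snapshot and streams only new changes, assuming a hypothetical low-traffic table dbo.orders that is polled less often to reduce load on the server:

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: sqlserver://username:password@host/instance
    # Skip existing rows; stream only new changes from the current LSN position
    stream_snapshot: false
    include:
      - dbo.orders
    # Poll less often because the table changes rarely
    stream_backoff_interval: 30s
```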