# microsoft_sql_server_cdc

Available in: Cloud, Self-Managed

Enables Change Data Capture by consuming from Microsoft SQL Server’s change tables.

```yaml
input:
  label: ""
  microsoft_sql_server_cdc:
    connection_string: "" # No default (required)
    stream_snapshot: "" # No default (required)
    snapshot_max_batch_size: 1000
    include: [] # No default (required)
    exclude: [] # No default (optional)
    checkpoint_cache: "" # No default (optional)
    checkpoint_cache_table_name: rpcn.CdcCheckpointCache
    checkpoint_cache_key: microsoft_sql_server_cdc
    checkpoint_limit: 1024
    stream_backoff_interval: 5s
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

Streams changes from a Microsoft SQL Server database for Change Data Capture (CDC). If `stream_snapshot` is set to `true`, the existing data in the database is streamed as well.

## Metadata

This input adds the following metadata fields to each message:

- `schema`: Schema of the table that the message originated from.
- `table`: Name of the table that the message originated from.
- `operation`: Type of operation that generated the message: `read`, `delete`, `insert`, `update_before`, or `update_after`. `read` is used for messages that are read in the initial snapshot phase.
- `lsn`: The Log Sequence Number (LSN) in Microsoft SQL Server.

## Permissions

To use the default Microsoft SQL Server cache, the user must have permissions to create tables and stored procedures. Refer to `checkpoint_cache_table_name` for additional details.

## Fields

### auto_replay_nacks

Whether to automatically replay messages that are rejected (nacked) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set `auto_replay_nacks` to `false` to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams, as the original shape of the data is discarded immediately upon consumption and mutation.

Type: `bool`

Default: `true`

### batching

Configure a batching policy.

Type: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### batching.byte_size

The number of bytes at which the batch is flushed. Set to `0` to disable size-based batching.

Type: `int`

Default: `0`

### batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: `string`

Default: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### batching.count

The number of messages after which the batch is flushed. Set to `0` to disable count-based batching.

Type: `int`

Default: `0`

### batching.period

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as `100ms`, `1s`, or `5s`.

Type: `string`

Default: `""`

```yaml
# Examples:
period: 1s
period: 1m
period: 500ms
```

### batching.processors[]

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.
Type: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate
  - archive:
      format: lines
  - archive:
      format: json_array
```

### checkpoint_cache

A cache resource to store the current Log Sequence Number (LSN) position. This enables the connector to resume from the last processed position after restarts, preventing data loss and duplicate processing. The cache stores the highest LSN that has been successfully delivered downstream.

Type: `string`

### checkpoint_cache_key

The key to use to store the snapshot position in `checkpoint_cache`. An alternative key can be provided if multiple CDC inputs share the same cache.

Type: `string`

Default: `microsoft_sql_server_cdc`

### checkpoint_cache_table_name

The multipart identifier for the checkpoint cache table name. If no `checkpoint_cache` field is specified, this input will automatically create a table and stored procedure under the `rpcn` schema to act as a checkpoint cache. This table stores the latest processed Log Sequence Number (LSN) that has been successfully delivered, allowing Redpanda Connect to resume from that point upon restart rather than reconsume the entire change table.

Type: `string`

Default: `rpcn.CdcCheckpointCache`

```yaml
# Examples:
checkpoint_cache_table_name: dbo.checkpoint_cache
```

### checkpoint_limit

The maximum number of messages that can be processed concurrently before applying back pressure. Higher values enable better parallelization and batching but increase memory usage. Messages are processed in LSN order, and a given LSN is only acknowledged after all previous LSNs have been successfully delivered, ensuring at-least-once guarantees.

Type: `int`

Default: `1024`

### connection_string

The connection string for the Microsoft SQL Server database. Use the format `sqlserver://username:password@host/instance?param1=value&param2=value`. For Windows Authentication, use `sqlserver://host/instance?trusted_connection=yes`. Include additional parameters like `TrustServerCertificate=true` for self-signed certificates or `encrypt=disable` to disable encryption.
Type: `string`

```yaml
# Examples:
connection_string: sqlserver://username:password@host/instance?param1=value&param2=value
```

### exclude[]

Regular expressions for tables to exclude from CDC streaming. Use this to filter out specific tables from the include patterns. Table names should follow the `schema.table` format. Exclude patterns are applied after include patterns, allowing you to include broad patterns while excluding specific tables.

Type: `array`

```yaml
# Examples:
exclude:
  - dbo.privatetable
```

### include[]

Regular expressions for tables to include in CDC streaming. Specify table names using the format `schema.table` (such as `dbo.orders`, `sales.customers`). Each pattern is treated as a regular expression, allowing wildcards and pattern matching. All specified tables must have CDC enabled in SQL Server.

Type: `array`

```yaml
# Examples:
include:
  - dbo.products
```

### snapshot_max_batch_size

The maximum number of rows to stream in a single batch during the initial snapshot phase. Larger batch sizes can improve throughput for initial data loads but may increase memory usage. This setting only applies when `stream_snapshot` is enabled.

Type: `int`

Default: `1000`

### stream_backoff_interval

The time interval to wait between polling attempts when no new CDC data is available. For low-traffic tables, increasing this value reduces database load and network traffic. Use Go duration format like `5s`, `30s`, or `1m`. Shorter intervals provide lower latency for new changes but increase server load.

Type: `string`

Default: `5s`

```yaml
# Examples:
stream_backoff_interval: 5s
stream_backoff_interval: 1m
```

### stream_snapshot

Whether to stream a snapshot of all existing data before streaming CDC changes. When enabled, the connector first queries all existing table data, then switches to streaming incremental changes from the transaction log. Set to `false` to start streaming only new changes from the current LSN position.

Type: `bool`
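To show how the fields described above might fit together, here is a minimal sketch of a complete pipeline. It is illustrative only: the label, host, credentials, and table patterns are placeholders, the `pipeline` and `output` sections are assumptions added for context, and the mapping assumes each message payload parses as structured data.

```yaml
input:
  label: sqlserver_cdc # hypothetical label
  microsoft_sql_server_cdc:
    # Placeholder credentials and host; TrustServerCertificate is for self-signed certificates.
    connection_string: sqlserver://username:password@host/instance?TrustServerCertificate=true
    # Stream existing rows first, then switch to incremental changes.
    stream_snapshot: true
    snapshot_max_batch_size: 1000
    include:
      - 'dbo\..*' # intended to match every CDC-enabled table in the dbo schema
    exclude:
      - dbo.privatetable # applied after the include patterns
    # No checkpoint_cache is set, so the input falls back to the table below,
    # which requires permission to create tables and stored procedures.
    checkpoint_cache_table_name: rpcn.CdcCheckpointCache
    stream_backoff_interval: 5s
    batching:
      count: 10
      period: 1s

pipeline:
  processors:
    # Wrap each row in an envelope that carries the metadata added by the input.
    - mapping: |
        root.schema = metadata("schema")
        root.table = metadata("table")
        root.operation = metadata("operation")
        root.lsn = metadata("lsn")
        root.row = this

output:
  stdout: {}
```

Copying the metadata into the payload is one possible design choice. It lets downstream consumers distinguish snapshot rows (`read`) from `insert`, `delete`, `update_before`, and `update_after` events without needing access to message metadata.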