PostgreSQL CDC Patterns

The postgres_cdc input captures row-level changes from PostgreSQL tables using logical replication and the Write-Ahead Log (WAL). Use these patterns to filter, transform, and route PostgreSQL CDC events to Redpanda, S3, and other destinations.

Use this cookbook to:

  • Apply reusable patterns for capturing PostgreSQL CDC events

  • Adapt integration patterns to route CDC data to Redpanda and S3

  • Identify patterns for filtering and transforming change events

Prerequisites

Before using these patterns, configure the following.

Redpanda CLI

Install the Redpanda CLI (rpk) to run Redpanda Connect. For instructions, see rpk installation.

PostgreSQL logical replication

The source PostgreSQL database must have logical replication enabled. Verify the setting by running:

SHOW wal_level;

The wal_level value must be logical. For instructions on enabling logical replication, see the postgres_cdc prerequisites. For cloud-managed PostgreSQL, see your provider's documentation on enabling logical replication.

Replication slot

The postgres_cdc input uses a PostgreSQL replication slot to track its position in the WAL. The slot is created automatically when the pipeline starts, using the name specified in the slot_name field. Each pipeline must use a unique slot name. Replication slots persist in the database, so the pipeline resumes from where it left off after a restart.

Environment variables

The examples in this cookbook use environment variables for configuration:

# The PostgreSQL DSN in postgres://user:password@host:port/dbname format.
export PG_DSN="postgres://user:password@localhost:5432/mydb"
# The schema containing the tables to capture.
export PG_SCHEMA="public"
# A unique name for the replication slot.
export PG_SLOT_NAME="my_connect_slot"
# The Redpanda broker addresses (for Redpanda output examples).
export REDPANDA_BROKERS="localhost:9092"
# The S3 bucket name (for S3 output examples).
export S3_BUCKET="cdc-archive"
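
If some of these variables might be unset in a given environment, the ${VAR:default} interpolation syntax in Redpanda Connect can supply a fallback. The following is a minimal sketch; the fallback values are illustrative assumptions that mirror the exports above, and PG_DSN is deliberately left without a default so that credentials are never hard-coded:

```yaml
input:
  postgres_cdc:
    dsn: ${PG_DSN}                              # No default: must be set explicitly
    schema: ${PG_SCHEMA:public}                 # Falls back to "public" when unset
    tables:
      - orders
    slot_name: ${PG_SLOT_NAME:my_connect_slot}  # Assumed fallback slot name
    stream_snapshot: true
```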

Capture CDC events

The simplest pattern captures all change events from PostgreSQL tables and outputs them with metadata:

input:
  postgres_cdc:
    dsn: ${PG_DSN}
    schema: ${PG_SCHEMA}
    tables:
      - orders
    slot_name: ${PG_SLOT_NAME}
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        root.operation = meta("operation")
        root.table = meta("table")
        root.lsn = meta("lsn")
        root.data = this
        root.timestamp = now()

output:
  stdout:
    codec: lines

For details on the CDC event message structure and available metadata fields, see the metadata section in the connector reference.

Filter CDC events

Filter events to process only specific change types using the operation metadata field:

input:
  postgres_cdc:
    dsn: ${PG_DSN}
    schema: ${PG_SCHEMA}
    tables:
      - orders
    slot_name: ${PG_SLOT_NAME}
    stream_snapshot: false

pipeline:
  processors:
    - mapping: |
        root = if meta("operation") != "insert" && meta("operation") != "update" {
          deleted()
        }
    - mapping: |
        root.operation = meta("operation")
        root.table = meta("table")
        root.data = this
        root.timestamp = now()

output:
  stdout:
    codec: lines

This pattern:

  • Filters to only insert and update operations, dropping delete and read events

  • Transforms the event to a simplified format with a timestamp
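
The two mappings can also be combined into one. The following sketch is one possible variation rather than the recommended form: it stores the operation in a variable, keeps insert and update events, and drops everything else in a single step.

```yaml
pipeline:
  processors:
    - mapping: |
        # Read the operation once and reuse it below.
        let op = meta("operation")
        root = if ["insert", "update"].contains($op) {
          {
            "operation": $op,
            "table": meta("table"),
            "data": this,
            "timestamp": now()
          }
        } else {
          # Drop delete and read events.
          deleted()
        }
```

A single mapping keeps the filter and the output shape in one place, at the cost of a slightly denser expression.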

Route to Redpanda

Stream PostgreSQL changes to Redpanda for real-time processing:

input:
  postgres_cdc:
    dsn: ${PG_DSN}
    schema: ${PG_SCHEMA}
    tables:
      - orders
      - customers
    slot_name: ${PG_SLOT_NAME}
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        meta topic = meta("table")

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! json("id") }
    batching:
      count: 100
      period: 1s

This pattern:

  • Uses the table name as the Redpanda topic

  • Batches messages for efficient delivery

  • Sets the message key to the row's id column, which this example assumes is the primary key of each table
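
If topics should be namespaced rather than named directly after tables, the mapping can add a prefix. This sketch assumes a hypothetical postgres. prefix, chosen here only for illustration; the topics it produces (for example, postgres.orders) must exist or be auto-created on the cluster.

```yaml
pipeline:
  processors:
    - mapping: |
        # Hypothetical naming scheme: postgres.<table>
        meta topic = "postgres." + meta("table")

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! json("id") }   # Assumes each table has an id column
```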

Route to S3

Archive CDC events to S3 for long-term storage and analytics:

input:
  postgres_cdc:
    dsn: ${PG_DSN}
    schema: ${PG_SCHEMA}
    tables:
      - orders
    slot_name: ${PG_SLOT_NAME}
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        root.operation = meta("operation")
        root.table = meta("table")
        root.data = this
        root.timestamp = now()

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("table") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: lines

This pattern:

  • Organizes files by table and time-based partitions (year/month/day/hour)

  • Batches events and archives them as newline-delimited JSON

  • Uses UUID file names to prevent collisions
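
To reduce storage size, each archived batch can be compressed before upload. The following sketch adds a compress processor after the archive step and changes the file extension to match; the choice of gzip and the .ndjson.gz suffix are assumptions, not requirements of the aws_s3 output.

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("table") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson.gz
    batching:
      count: 1000
      period: 5m
      processors:
        # Join the batch into one newline-delimited message.
        - archive:
            format: lines
        # Compress the combined message before it is written.
        - compress:
            algorithm: gzip
```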

Route by event type

Route different event types to different destinations:

input:
  postgres_cdc:
    dsn: ${PG_DSN}
    schema: ${PG_SCHEMA}
    tables:
      - orders
    slot_name: ${PG_SLOT_NAME}
    stream_snapshot: true

output:
  switch:
    cases:
      - check: meta("operation") == "insert"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: postgres.orders.inserts
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: postgres.orders.changes

This pattern:

  • Routes insert events to one Redpanda topic and all other change events to another

  • Supports specialized downstream consumers per operation type
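
The same switch output extends to more cases. This sketch separates snapshot reads and deletes from other changes; the topic names postgres.orders.snapshot and postgres.orders.deletes are hypothetical and follow the naming used above.

```yaml
output:
  switch:
    cases:
      # Rows emitted during the initial snapshot.
      - check: meta("operation") == "read"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: postgres.orders.snapshot
      # Deleted rows.
      - check: meta("operation") == "delete"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: postgres.orders.deletes
      # Fallback: inserts and updates.
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: postgres.orders.changes
```

Cases are evaluated in order, so the final case without a check acts as the default.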

Configure replication mode

The postgres_cdc input supports two replication modes controlled by the stream_snapshot field:

  • stream_snapshot: true: Captures a full snapshot of existing table data before streaming live changes. Use this when you need a complete initial load.

  • stream_snapshot: false: Skips the snapshot and streams only changes from the current WAL position. Use this when you only need new changes going forward.

input:
  postgres_cdc:
    dsn: ${PG_DSN}
    schema: ${PG_SCHEMA}
    tables:
      - orders
    slot_name: ${PG_SLOT_NAME}
    stream_snapshot: true
    snapshot_batch_size: 1000 # Number of rows to read per batch during snapshot processing.

Troubleshoot common issues

Use these steps to diagnose and fix the most common problems with the postgres_cdc input.

No events received

If no events arrive:

  1. Verify logical replication is enabled:

    SHOW wal_level;
  2. Grant the required PostgreSQL privileges:

    ALTER ROLE your_user WITH REPLICATION;
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO your_user;

    Note: ALTER ROLE … WITH REPLICATION requires superuser privileges. Run this statement as a superuser, or have a database administrator run it on your behalf.
  3. Check that slot_name is unique and not already in use:

    SELECT slot_name, active FROM pg_replication_slots;

Snapshot fails mid-run

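If the snapshot fails partway through, the replication slot has an incomplete record of your database. To maintain data integrity, stop the pipeline (an active slot cannot be dropped), drop the replication slot, and then restart the pipeline so that the snapshot runs again from the beginning: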

SELECT pg_drop_replication_slot('my_connect_slot');

Replace my_connect_slot with your slot_name value.

Duplicate events

The postgres_cdc input provides at-least-once delivery. If the pipeline stops after delivering events but before acknowledging their WAL position, those events may be re-read on restart. To handle duplicates:

  • Use idempotent processing in downstream systems.

  • Deduplicate using the lsn metadata field.

  • Lower checkpoint_limit to reduce the window of possible duplicates.
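
As one way to deduplicate on the lsn metadata field inside the pipeline, the following sketch uses the dedupe processor with a Redis-backed cache. It makes several assumptions: a Redis instance is reachable at the hypothetical URL redis://localhost:6379, the combination of table and LSN identifies an event uniquely, and snapshot (read) events are skipped because they may not carry an LSN. An external cache is used because an in-memory cache is lost on restart, which is exactly when duplicates occur.

```yaml
pipeline:
  processors:
    - switch:
        # Only deduplicate streamed changes, not snapshot rows.
        - check: meta("operation") != "read"
          processors:
            - dedupe:
                cache: lsn_cache
                key: ${! meta("table") }:${! meta("lsn") }

cache_resources:
  - label: lsn_cache
    redis:
      url: redis://localhost:6379   # Hypothetical address
      default_ttl: 1h               # Assumed retention window for seen keys
```

The TTL only needs to cover the window in which events can be replayed, so a shorter value may be enough when checkpoint_limit is low.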