postgres_cdc

Beta

Streams data changes from a PostgreSQL database using logical replication. There is also a configuration option to stream all existing data from the database.

# Configuration fields, showing default values
input:
  label: ""
  postgres_cdc:
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable # No default (required)
    include_transaction_markers: true
    stream_snapshot: false
    snapshot_memory_safety_factor: 1
    snapshot_batch_size: 0
    schema: public # No default (required)
    tables: [] # No default (required)
    checkpoint_limit: 1024
    temporary_slot: false
    slot_name: "" # No default (optional)
    pg_standby_timeout: 10s
    pg_wal_monitor_interval: 3s
    max_parallel_snapshot_tables: 1
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: "" # No default (optional)
      check: "" # No default (optional)
      processors: [] # No default (optional)

The postgres_cdc input uses logical replication to capture changes made to a PostgreSQL database in real time and streams them to Redpanda Connect. Redpanda Connect uses this replication method to allow you to choose which database tables in your source database to receive changes from. There are also two replication modes to choose from, and an option to receive TOAST and deleted values in your data updates.

Prerequisites

  • PostgreSQL version 10 or later

  • Network access from the cluster where your Redpanda Connect pipeline is running to the source database environment. For detailed networking information, including how to set up a VPC peering connection, see Redpanda Cloud Networking.

  • Logical replication enabled on your PostgreSQL cluster

    To check whether logical replication is already enabled, run the following query:

    SHOW wal_level;

    If the wal_level value is logical, you can start to use this connector. Otherwise, choose from the following sets of instructions to update your replication settings.

  • Cloud platforms

  • Self-Hosted PostgreSQL

Use an account with sufficient permissions (superuser) to update your replication settings.

  1. Open the postgresql.conf file.

  2. Find the wal_level parameter.

  3. Update the parameter value to wal_level = logical. If you already use replication slots, you may need to increase the limit on replication slots (max_replication_slots). The max_wal_senders parameter value must also be greater than or equal to max_replication_slots.

  4. Restart the PostgreSQL server.

For this input to make a successful connection to your database, also make sure that it allows replication connections.

  1. Open the pg_hba.conf file.

  2. Update this line.

    host    replication     <replication-username>   <connector-ip>/32    md5

    Replace the following placeholders with your own values:

    • <replication-username>: The username from an account with superuser privileges.

    • <connector-ip>: The IP address of the server where you are running Redpanda Connect.

  3. Restart the PostgreSQL server.

Choose a replication mode

When you run a pipeline that uses the postgres_cdc input, Redpanda Connect connects to your PostgreSQL database and creates a replication slot. The replication slot uses a copy of the Write-Ahead Log (WAL) file to subscribe to changes in your database records as they are applied to the database. There are two replication modes you can choose from: snapshot mode and streaming mode.

In snapshot mode, Redpanda Connect first takes a snapshot of the database and streams the contents before processing changes from the WAL. In streaming mode, Redpanda Connect directly processes changes from the WAL starting from the most recent changes without taking a snapshot first.

For local testing, you can use the example pipeline on this page, which runs in snapshot mode.

Snapshot mode

If you set the stream_snapshot field to true, Redpanda Connect:

  1. Creates a snapshot of your database.

  2. Streams the contents of the tables specified in the postgres_cdc input.

  3. Starts processing changes in the WAL that occurred since the snapshot was taken, and streams them to Redpanda Connect.

Once the initial replication process is complete, the snapshot is removed and the input keeps a connection open to the database so that it can receive data updates.

If the pipeline restarts during the replication process, Redpanda Connect resumes processing data changes from where it left off. If there are other interruptions while the snapshot is taken, you may need to restart the snapshot process. For more information, see Troubleshoot replication failures.

Streaming mode

If you set the stream_snapshot field to false, Redpanda Connect starts processing data changes from the end of the WAL. If the pipeline restarts, Redpanda Connect resumes processing data changes from the last acknowledged position in the WAL.

Monitor the replication process

You can monitor the initial replication of data using the following metrics:

Metric name Description

replication_lag_bytes

Indicates how far the connector is lagging behind the source database when processing the transaction log.

postgres_snapshot_progress

Shows the progress of snapshot processing for each table.

Troubleshoot replication failures

If the database snapshot fails, the replication slot has only an incomplete record of the existing data in your database. To maintain data integrity, you must drop the replication slot manually in your source database and run the Redpanda Connect pipeline again.

SELECT pg_drop_replication_slot(SLOT_NAME);

Receive TOAST and deleted values

For full visibility of all data updates, you can also choose to stream TOAST and deleted values. To enable this option, run the following query on your source database:

ALTER TABLE large_data REPLICA IDENTITY FULL;

Data mappings

The following table shows how selected PostgreSQL data types are mapped to data types supported in Redpanda Connect. All other data types are mapped to string values.

PostgreSQL data type Bloblang value

TEXT, TIMESTAMP, UUID, VARCHAR

JSON strings, for example: this data

BOOL

Boolean JSON fields, for example: true or false

Numeric types (INT4)

JSON number types, for example: 1.

JSONB

JSON objects, for example: { "message": "message text" }

INTEGER[]

An array of integer values, for example: [1,2,3]

TEXT[]

An array of string values, for example: ["value1", "value2", "value3"]

INET

A string that contains an IP address, for example: "192.168.1.1"

POINT

A string that represents a point in a two-dimensional plane, for example: (x, y)

TSRANGE

A string that includes range bounds, for example: [2010-01-01 14:30, 2010-01-01 15:30)

TSVECTOR

A string that includes vector data, for example: "'the':2 'question':3 'is':4"

Metadata

This input adds the following metadata fields to each message:

  • table: The name of the database table from which the message originated.

  • operation: The type of database operation that generated the message, such as read, insert, update, delete, begin and commit. A read operation occurs when a snapshot of the database is processed. The begin and commit operations are only included if the include_transaction_markers field is set to true.

  • lsn: The Log Sequence Number of each data update from the source PostgreSQL database. The lsn values are strings that can be sorted to determine the order in which data updates were written to the WAL.

Fields

dsn

The data source name (DSN) of the PostgreSQL database from which you want to stream updates. Use the format postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&…​]. For example, if you wanted to disable SSL in a secure environment, you would add sslmode=disable to the connection string.

Type: string

# Examples

dsn: postgres://foouser:foopass@localhost:5432/foodb

dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

include_transaction_markers

When set to true, creates empty messages for BEGIN and COMMIT operations which start and complete each transaction. Messages with the operation metadata field set to BEGIN or COMMIT have null message payloads.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: bool

Default: true

stream_snapshot

When set to true, this input streams a snapshot of all existing data in the source database before streaming data changes. To use this setting, all database tables that you want to replicate must have a primary key.

Type: bool

Default: false

# Examples

stream_snapshot: true

snapshot_memory_safety_factor

The fraction of available memory to use for streaming the database snapshot. Decimal values between 0 and 1 represent the percentage of memory to use. For example, 0.3 would allocate 30% of the available memory. Lower values make initial streaming slower, but help prevent out-of-memory errors.

This option is only available when stream_snapshot is set to true. You can use either snapshot_memory_safety_factor or snapshot_batch_size, not both.

Type: float

Default: 1

# Examples

snapshot_memory_safety_factor: 0.2

snapshot_batch_size

The number of table rows to fetch in each batch when querying the snapshot. Leave at 0 to allow the input to determine the batch size based on the snapshot_memory_safety_factor value.

This option is only available when stream_snapshot is set to true. You can use either snapshot_batch_size or snapshot_memory_safety_factor, not both.

Type: int

Default: 0

# Examples

snapshot_batch_size: 10000

schema

The PostgreSQL schema from which to replicate data.

Type: string

# Examples

schema: public

tables

A list of database table names to include in the snapshot and logical replication. Specify each table name as a separate item.

Type: array

tables:
  - orders_table
  - customer_address_table
  - inventory_table

checkpoint_limit

The maximum number of messages that this input can process at a given time. Increasing this limit enables parallel processing, and batching at the output level. To preserve at-least-once guarantees, any given log sequence number (LSN) is not acknowledged until all messages under that offset are delivered.

Type: int

Default: 1024

temporary_slot

If set to true, the input creates a temporary replication slot that is automatically dropped when the connection to your source database is closed. You might use this option to:

  • Avoid data accumulating in the replication slot when a pipeline is paused or stopped

  • Test the connector

If the pipeline is restarted, another data snapshot is taken before data updates are streamed.

Type: bool

Default: false

slot_name

The name of the PostgreSQL logical replication slot to use. If not provided, a random name is generated unless you create a replication slot manually before starting replication.

Type: string

Default: ""

# Examples

slot_name: test_replication_slot

pg_standby_timeout

Specify the standby timeout after which an idle connection is refreshed to keep the connection alive.

Type: string

Default: 10s

# Examples

pg_standby_timeout: 30s

pg_wal_monitor_interval

How often to report changes to the replication lag and write them to Redpanda Connect metrics.

Type: string

Default: 3s

# Examples

pg_wal_monitor_interval: 6s

max_parallel_snapshot_tables

Specify the maximum number of tables that are processed in parallel when the initial snapshot of the source database is taken.

Type: int

Default: 1

auto_replay_nacks

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool

Default: true

batching

Allows you to configure a batching policy.

Type: object

# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size.

Type: string

Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: array

# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

Example pipeline

You can run the following pipeline locally to check that data updates are streamed from your source database to Redpanda Connect. All transactions are written to stdout.

input:
  label: "postgres_cdc"
  postgres_cdc:
    dsn: postgres://user:password@host:port/dbname
    include_transaction_markers: false
    slot_name: test_slot_native_decoder
    snapshot_batch_size: 100000
    stream_snapshot: true
    temporary_slot: true
    schema: schema_name
    tables:
      - table_name

cache_resources:
  - label: data_caching
    file:
      directory: /tmp/cache

output:
  label: main
  stdout: {}