# postgres_cdc


---
title: postgres_cdc
latest-connect-version: 4.93.0
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-redpanda-tag: v26.1.9
docname: inputs/postgres_cdc
page-component-name: connect
page-version: master
page-component-version: master
page-component-title: Connect
page-relative-src-path: inputs/postgres_cdc.adoc
page-edit-url: https://github.com/redpanda-data/rp-connect-docs/edit/main/modules/components/pages/inputs/postgres_cdc.adoc
page-git-created-date: "2024-12-05"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/connect/components/inputs/postgres_cdc.md -->

**Available in:** [Cloud](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/inputs/postgres_cdc/), Self-Managed

**License**: This component requires an [enterprise license](https://docs.redpanda.com/redpanda-connect/get-started/licensing/). You can either [upgrade to an Enterprise Edition license](https://www.redpanda.com/upgrade), or [generate a trial license key](http://redpanda.com/try-enterprise) that's valid for 30 days.

Streams data changes from a PostgreSQL database using logical replication. There is also a configuration option to [stream all existing data](#stream_snapshot) from the database.

Introduced in version 4.40.0 and renamed from `pg_stream` to `postgres_cdc` in version 4.43.0.

```yml
input:
  label: ""
  postgres_cdc:
    dsn: "" # No default (required)
    include_transaction_markers: false
    stream_snapshot: false
    snapshot_batch_size: 1000
    schema: "" # No default (required)
    tables: [] # No default (required)
    checkpoint_limit: 1024
    temporary_slot: false
    slot_name: "" # No default (required)
    pg_standby_timeout: 10s
    pg_wal_monitor_interval: 3s
    max_parallel_snapshot_tables: 1
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

The `postgres_cdc` input uses logical replication to capture changes made to a PostgreSQL database in real time and streams them to Redpanda Connect. Logical replication lets you choose which tables in your source database to receive changes from. There are also [two replication modes](#choose-a-replication-mode) to choose from, and an [option to receive TOAST and deleted values](#receive-toast-and-deleted-values) in your data updates.

## [](#prerequisites)Prerequisites

-   PostgreSQL version 14 or later

-   Logical replication enabled on your PostgreSQL cluster

    To check whether logical replication is already enabled, run the following query:

    ```SQL
    SHOW wal_level;
    ```

    If the `wal_level` value is `logical`, you can start to use this connector. Otherwise, choose from the following sets of instructions to update your replication settings.


### Cloud platforms

-   [Amazon RDS for PostgreSQL DB](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL.Concepts.General.FeatureSupport.LogicalReplication.html)

-   [Azure Database for PostgreSQL](https://learn.microsoft.com/en-us/azure/postgresql/flexible-server/concepts-logical#prerequisites-for-logical-replication-and-logical-decoding)

-   [Google Cloud SQL for PostgreSQL](https://cloud.google.com/sql/docs/postgres/replication/configure-logical-replication), including creating a user with replication privileges

-   [Neon](https://neon.tech/docs/guides/logical-replication-guide)

### Self-Hosted PostgreSQL

Use an account with sufficient permissions (superuser) to update your replication settings.

1.  Open the `postgresql.conf` file.

2.  Find the `wal_level` parameter.

3.  Update the parameter value to `wal_level = logical`. If you already use replication slots, you may need to increase the limit on replication slots (`max_replication_slots`). The `max_wal_senders` parameter value must also be greater than or equal to `max_replication_slots`.

4.  Restart the PostgreSQL server.


For this input to connect successfully, also make sure that your database allows replication connections.

1.  Open the `pg_hba.conf` file.

2.  Add the following line, or update it if it already exists.

    ```text
    host    replication     <replication-username>   <connector-ip>/32    md5
    ```

    Replace the following placeholders with your own values:

    -   `<replication-username>`: The username from an account with superuser privileges.

    -   `<connector-ip>`: The IP address of the server where you are running Redpanda Connect.


3.  Restart the PostgreSQL server.

## [](#choose-a-replication-mode)Choose a replication mode

When you run a pipeline that uses the `postgres_cdc` input, Redpanda Connect connects to your PostgreSQL database and creates a replication slot. The replication slot uses a copy of the Write-Ahead Log (WAL) file to subscribe to changes in your database records as they are applied to the database. There are two replication modes you can choose from: snapshot mode and streaming mode.

In snapshot mode, Redpanda Connect first takes a snapshot of the database and streams the contents before processing changes from the WAL. In streaming mode, Redpanda Connect directly processes changes from the WAL starting from the most recent changes without taking a snapshot first.

For local testing, you can use the [example pipeline on this page](#example-pipeline), which runs in snapshot mode.

### [](#snapshot-mode)Snapshot mode

If you set the [`stream_snapshot` field](#stream_snapshot) to `true`, Redpanda Connect:

1.  Creates a snapshot of your database.

2.  Streams the contents of the tables specified in the `postgres_cdc` input.

3.  Starts processing changes in the WAL that occurred since the snapshot was taken, and streams them to Redpanda Connect.


Once the initial replication process is complete, the snapshot is removed and the input keeps a connection open to the database so that it can receive data updates.

If the pipeline restarts during the replication process, Redpanda Connect resumes processing data changes from where it left off. If there are other interruptions while the snapshot is taken, you may need to restart the snapshot process. For more information, see [Troubleshoot replication failures](#troubleshoot-replication-failures).
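
The snapshot-mode steps above could be configured roughly as follows. This is a minimal sketch rather than a recommended setup: the DSN, table names, slot name, and tuning values are placeholders chosen for illustration, and only fields documented on this page are used.

```yaml
input:
  postgres_cdc:
    dsn: postgres://user:password@localhost:5432/dbname # placeholder credentials
    schema: public
    tables:
      - orders     # hypothetical table; snapshots require a primary key
      - customers  # hypothetical table
    slot_name: snapshot_slot         # hypothetical slot name
    stream_snapshot: true            # stream existing rows first, then WAL changes
    snapshot_batch_size: 5000        # rows fetched per snapshot query (illustrative)
    max_parallel_snapshot_tables: 2  # snapshot both tables at the same time
```

Larger batch sizes and more parallel tables may shorten the snapshot at the cost of extra load on the source database, so treat these numbers as starting points to tune.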

### [](#streaming-mode)Streaming mode

If you set the [`stream_snapshot` field](#stream_snapshot) to `false`, Redpanda Connect starts processing data changes from the end of the WAL. If the pipeline restarts, Redpanda Connect resumes processing data changes from the last acknowledged position in the WAL.
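
A streaming-mode configuration might look like the following sketch. The DSN, table, and slot name are placeholders. It assumes you want the replication slot to outlive the pipeline, so `temporary_slot` is left at `false`.

```yaml
input:
  postgres_cdc:
    dsn: postgres://user:password@localhost:5432/dbname # placeholder credentials
    schema: public
    tables:
      - orders                  # hypothetical table
    slot_name: streaming_slot   # hypothetical slot name
    stream_snapshot: false      # skip the snapshot; read changes from the WAL only
    temporary_slot: false       # keep the slot when the connection closes
```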

## [](#monitor-the-replication-process)Monitor the replication process

You can monitor the initial replication of data using the following metrics:

| Metric name | Description |
| --- | --- |
| replication_lag_bytes | Indicates how far the connector is lagging behind the source database when processing the transaction log. |
| postgres_snapshot_progress | Shows the progress of snapshot processing for each table. |
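
To view these metrics in a Self-Managed deployment, one option is to expose them through the Prometheus metrics exporter. The sketch below assumes the standard Redpanda Connect `metrics` and `http` configuration sections; the address shown is an example value, and the exact names under which the metrics are exported may differ from the names in the table above.

```yaml
# Serve pipeline metrics in Prometheus format from the built-in HTTP server.
metrics:
  prometheus: {}

http:
  enabled: true
  address: 0.0.0.0:4195 # example listen address; metrics are served at /metrics
```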

## [](#troubleshoot-replication-failures)Troubleshoot replication failures

If the database snapshot fails, the replication slot has only an incomplete record of the existing data in your database. To maintain data integrity, you must drop the replication slot manually in your source database and run the Redpanda Connect pipeline again. In the following query, replace `<slot-name>` with the name of the replication slot used by the pipeline.

```SQL
SELECT pg_drop_replication_slot('<slot-name>');
```

## [](#receive-toast-and-deleted-values)Receive TOAST and deleted values

For full visibility of all data updates, you can also choose to stream [TOAST](https://www.postgresql.org/docs/current/storage-toast.html) and deleted values. To enable this option, run the following query on your source database for each table you want to receive these values from. In this example, the table is named `large_data`:

```SQL
ALTER TABLE large_data REPLICA IDENTITY FULL;
```

## [](#data-mappings)Data mappings

The following table shows how selected PostgreSQL data types are mapped to data types supported in Redpanda Connect. All other data types are mapped to string values.

| PostgreSQL data type | Bloblang value |
| --- | --- |
| TEXT, TIMESTAMP, UUID, VARCHAR | JSON strings, for example: `this data` |
| BOOL | Boolean JSON fields, for example: `true` or `false` |
| Numeric types (INT4) | JSON number types, for example: `1` |
| JSONB | JSON objects, for example: `{ "message": "message text" }` |
| INTEGER[] | An array of integer values, for example: `[1,2,3]` |
| TEXT[] | An array of string values, for example: `["value1", "value2", "value3"]` |
| INET | A string that contains an IP address, for example: `"192.168.1.1"` |
| POINT | A string that represents a point in a two-dimensional plane, for example: `(x, y)` |
| TSRANGE | A string that includes range bounds, for example: `[2010-01-01 14:30, 2010-01-01 15:30)` |
| TSVECTOR | A string that includes vector data, for example: `"'the':2 'question':3 'is':4"` |

## [](#metadata)Metadata

This input adds the following metadata fields to each message:

-   `table`: The name of the database table from which the message originated.

-   `operation`: The type of database operation that generated the message, such as `read`, `insert`, `update`, `delete`, `begin` and `commit`. A `read` operation occurs when a snapshot of the database is processed. The `begin` and `commit` operations are only included if the `include_transaction_markers` field is set to `true`.

-   `lsn`: The [Log Sequence Number](https://www.postgresql.org/docs/current/datatype-pg-lsn.html) of each data update from the source PostgreSQL database. The `lsn` values are strings that can be sorted to determine the order in which data updates were written to the WAL.
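
Because metadata is not part of the message payload, you may want to copy these fields into the message body before writing to an output that discards metadata. The following sketch does this with a `mapping` processor. The output field names (`table`, `operation`, `lsn`, and `data`) are illustrative choices, not a format defined by this input.

```yaml
pipeline:
  processors:
    - mapping: |
        # Build a new document that wraps the row data with its CDC metadata.
        root.table = metadata("table")
        root.operation = metadata("operation")
        root.lsn = metadata("lsn")
        root.data = this
```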


## [](#fields)Fields

### [](#auto_replay_nacks)`auto_replay_nacks`

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set `auto_replay_nacks` to `false` to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.

**Type**: `bool`

**Default**: `true`

### [](#aws)`aws`

AWS IAM authentication configuration for PostgreSQL instances. When enabled, IAM credentials are used to generate temporary authentication tokens instead of a static password.

This is useful for connecting to Amazon RDS or Aurora PostgreSQL instances with IAM database authentication enabled. The generated tokens are valid for 15 minutes and are automatically refreshed.

For more information about AWS credentials configuration, see the [credentials for AWS](https://docs.redpanda.com/connect/guides/cloud/aws/) guide.

**Type**: `object`

### [](#aws-enabled)`aws.enabled`

Enable AWS IAM authentication for PostgreSQL. When enabled, an IAM authentication token is generated and used as the password.

**Type**: `bool`

**Default**: `false`

### [](#aws-endpoint)`aws.endpoint`

The PostgreSQL endpoint hostname (e.g., mydb.abc123.us-east-1.rds.amazonaws.com).

**Type**: `string`

### [](#aws-id)`aws.id`

The ID of credentials to use.

**Type**: `string`

### [](#aws-region)`aws.region`

The AWS region where the PostgreSQL instance is located. If no region is specified then the environment default will be used.

**Type**: `string`

### [](#aws-role)`aws.role`

Optional AWS IAM role ARN to assume for authentication. Alternatively, use `roles` array for role chaining instead.

**Type**: `string`

### [](#aws-role_external_id)`aws.role_external_id`

Optional external ID for the role assumption. Only used with the `role` field. Alternatively, use `roles` array for role chaining instead.

**Type**: `string`

### [](#aws-roles)`aws.roles[]`

Optional array of AWS IAM roles to assume for authentication. Roles can be assumed in sequence, enabling chaining for purposes such as cross-account access. Each role can optionally specify an external ID.

**Type**: `object`

### [](#aws-roles-role)`aws.roles[].role`

AWS IAM role ARN to assume.

**Type**: `string`

**Default**: `""`

### [](#aws-roles-role_external_id)`aws.roles[].role_external_id`

Optional external ID for the role assumption.

**Type**: `string`

**Default**: `""`

### [](#aws-secret)`aws.secret`

The secret for the credentials being used.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Secrets](https://docs.redpanda.com/connect/configuration/secrets/).

**Type**: `string`

### [](#aws-token)`aws.token`

The token for the credentials being used, required when using short term credentials.

**Type**: `string`
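
Putting the `aws` fields together, an IAM-authenticated connection could be configured along these lines. This is a sketch under assumptions: the hostname, region, database user, table, and slot name are placeholders, and the DSN omits a password on the assumption that the generated token is supplied in its place.

```yaml
input:
  postgres_cdc:
    # Placeholder user and host; no static password is set in the DSN.
    dsn: postgres://iam_user@mydb.abc123.us-east-1.rds.amazonaws.com:5432/mydb
    schema: public
    tables:
      - orders            # hypothetical table
    slot_name: iam_slot   # hypothetical slot name
    aws:
      enabled: true       # generate IAM tokens instead of using a static password
      region: us-east-1
      endpoint: mydb.abc123.us-east-1.rds.amazonaws.com
```

If you need cross-account access, the `aws.role` field or the `aws.roles` array can be added to this block as described in the following sections.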

### [](#batching)`batching`

Allows you to configure a [batching policy](https://docs.redpanda.com/connect/configuration/batching/).

**Type**: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### [](#batching-byte_size)`batching.byte_size`

The number of bytes at which the batch is flushed. Set to `0` to disable size-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-check)`batching.check`

A [Bloblang query](https://docs.redpanda.com/connect/guides/bloblang/about/) that returns a boolean value indicating whether a message should end a batch.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### [](#batching-count)`batching.count`

The number of messages after which the batch is flushed. Set to `0` to disable count-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-period)`batching.period`

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as `100ms`, `1s`, or `5s`.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms
```

### [](#batching-processors)`batching.processors[]`

A list of [processors](https://docs.redpanda.com/connect/components/processors/about/) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.

**Type**: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate

# ---

processors:
  - archive:
      format: lines

# ---

processors:
  - archive:
      format: json_array
```

### [](#checkpoint_limit)`checkpoint_limit`

The maximum number of messages that this input can process at a given time. Increasing this limit enables parallel processing, and batching at the output level. To preserve at-least-once guarantees, any given log sequence number (LSN) is not acknowledged until all messages under that offset are delivered.

**Type**: `int`

**Default**: `1024`
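
As a rough illustration of how this limit interacts with batching, the following sketch raises `checkpoint_limit` and adds a batching policy. The values are arbitrary examples and the connection details are placeholders; suitable numbers depend on your message sizes and output throughput.

```yaml
input:
  postgres_cdc:
    dsn: postgres://user:password@localhost:5432/dbname # placeholder credentials
    schema: public
    tables:
      - orders               # hypothetical table
    slot_name: batched_slot  # hypothetical slot name
    checkpoint_limit: 2048   # allow more unacknowledged messages in flight
    batching:
      count: 500             # flush after 500 messages...
      period: 1s             # ...or after one second, whichever comes first
```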

### [](#dsn)`dsn`

The data source name (DSN) of the PostgreSQL database from which you want to stream updates. Use the format `postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&…]`. For example, to disable SSL in a secure environment, add `sslmode=disable` to the connection string.

**Type**: `string`

```yaml
# Examples:
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
```

### [](#heartbeat_interval)`heartbeat_interval`

The interval between heartbeat messages, which Redpanda Connect writes to the WAL using the `pg_logical_emit_message` function.

Heartbeat messages are useful when you subscribe to data changes from tables with low activity, while other tables in the database have higher-frequency updates. Heartbeat messages allow Redpanda Connect to periodically acknowledge new messages even when no data updates occur. Each acknowledgement advances the committed point in the WAL, which ensures that PostgreSQL can safely reclaim older log segments, preventing excessive disk space usage.

Set `heartbeat_interval` to `0s` to disable heartbeats.

**Type**: `string`

**Default**: `1h`

```yaml
# Examples:
heartbeat_interval: 0s

# ---

heartbeat_interval: 24h
```

### [](#include_transaction_markers)`include_transaction_markers`

When set to `true`, creates empty messages for `BEGIN` and `COMMIT` operations, which start and complete each transaction. Messages with the `operation` metadata field set to `begin` or `commit` have null message payloads.

**Type**: `bool`

**Default**: `false`
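
Because marker messages have null payloads, downstream consumers may find them easier to handle if they are replaced with a small descriptive document. The sketch below shows one way to do that with a `mapping` processor. The `transaction_marker` and `lsn` output fields are illustrative names, the connection details are placeholders, and the operation value is lowercased defensively before it is compared.

```yaml
input:
  postgres_cdc:
    dsn: postgres://user:password@localhost:5432/dbname # placeholder credentials
    schema: public
    tables:
      - orders               # hypothetical table
    slot_name: markers_slot  # hypothetical slot name
    include_transaction_markers: true

pipeline:
  processors:
    - mapping: |
        # Replace null marker payloads with a small document; leave row changes as-is.
        let op = metadata("operation").lowercase()
        root = if $op == "begin" || $op == "commit" {
          {"transaction_marker": $op, "lsn": metadata("lsn")}
        }
```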

### [](#max_parallel_snapshot_tables)`max_parallel_snapshot_tables`

Specify the maximum number of tables that are processed in parallel when the initial snapshot of the source database is taken.

**Type**: `int`

**Default**: `1`

### [](#pg_standby_timeout)`pg_standby_timeout`

Specify the standby timeout after which an idle connection is refreshed to keep the connection alive.

**Type**: `string`

**Default**: `10s`

```yaml
# Examples:
pg_standby_timeout: 30s
```

### [](#pg_wal_monitor_interval)`pg_wal_monitor_interval`

How often to report changes to the replication lag and write them to Redpanda Connect metrics.

**Type**: `string`

**Default**: `3s`

```yaml
# Examples:
pg_wal_monitor_interval: 6s
```

### [](#schema)`schema`

The PostgreSQL schema from which to replicate data.

**Type**: `string`

```yaml
# Examples:
schema: public

# ---

schema: "MyCaseSensitiveSchemaNeedingQuotes"
```

### [](#slot_name)`slot_name`

The name of the PostgreSQL logical replication slot to use. If not provided, a random name is generated unless you create a replication slot manually before starting replication.

> 📝 **NOTE**
>
> Starting from version 4.48.1, Redpanda Connect no longer adds the prefix `rs_` to the names of replication slots it creates. To continue using a replication slot created by an earlier version after upgrading, add the `rs_` prefix to the `slot_name` value in your configuration.

**Type**: `string`

```yaml
# Examples:
slot_name: my_test_slot
```

### [](#snapshot_batch_size)`snapshot_batch_size`

The number of table rows to fetch in each batch when querying the snapshot.

This option is only available when `stream_snapshot` is set to `true`.

**Type**: `int`

**Default**: `1000`

```yaml
# Examples:
snapshot_batch_size: 10000
```

### [](#stream_snapshot)`stream_snapshot`

When set to `true`, this input streams a snapshot of all existing data in the source database before streaming data changes. To use this setting, all database tables that you want to replicate _must_ have a primary key.

**Type**: `bool`

**Default**: `false`

```yaml
# Examples:
stream_snapshot: true
```

### [](#tables)`tables[]`

A list of database table names to include in the snapshot and logical replication. Specify each table name as a separate item.

**Type**: `array`

```yaml
# Examples:
tables:
  - my_table_1
  - "MyCaseSensitiveTableNeedingQuotes"
```

### [](#temporary_slot)`temporary_slot`

If set to `true`, the input creates a temporary replication slot that is automatically dropped when the connection to your source database is closed. You might use this option to:

-   Avoid data accumulating in the replication slot when a pipeline is paused or stopped

-   Test the connector


If the pipeline is restarted, another data snapshot is taken before data updates are streamed.

**Type**: `bool`

**Default**: `false`

### [](#tls)`tls`

Custom TLS settings can be used to override system defaults.

**Type**: `object`

### [](#tls-client_certs)`tls.client_certs[]`

A list of client certificates to use. For each certificate either the fields `cert` and `key`, or `cert_file` and `key_file` should be specified, but not both.

**Type**: `object`

**Default**: `[]`

```yaml
# Examples:
client_certs:
  - cert: foo
    key: bar

# ---

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
```

### [](#tls-client_certs-cert)`tls.client_certs[].cert`

A plain text certificate to use.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-cert_file)`tls.client_certs[].cert_file`

The path of a certificate to use.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-key)`tls.client_certs[].key`

A plain text certificate key to use.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Secrets](https://docs.redpanda.com/connect/configuration/secrets/).

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-key_file)`tls.client_certs[].key_file`

The path of a certificate key to use.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-password)`tls.client_certs[].password`

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete `pbeWithMD5AndDES-CBC` algorithm is not supported for the PKCS#8 format.

Because the obsolete `pbeWithMD5AndDES-CBC` algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Secrets](https://docs.redpanda.com/connect/configuration/secrets/).

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
password: foo

# ---

password: ${KEY_PASSWORD}
```

### [](#tls-enable_renegotiation)`tls.enable_renegotiation`

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message `local error: tls: no renegotiation`.

Requires version 3.45.0 or later.

**Type**: `bool`

**Default**: `false`

### [](#tls-root_cas)`tls.root_cas`

An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Secrets](https://docs.redpanda.com/connect/configuration/secrets/).

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
```

### [](#tls-root_cas_file)`tls.root_cas_file`

An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
root_cas_file: ./root_cas.pem
```

### [](#tls-skip_cert_verify)`tls.skip_cert_verify`

Whether to skip server side certificate verification.

**Type**: `bool`

**Default**: `false`
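
Combining the `tls` fields, a connection that trusts a private certificate authority and presents a client certificate might be configured as in the following sketch. The file paths reuse the example values from the fields above, and the host, table, and slot name are placeholders. How these settings interact with any `sslmode` parameter in the DSN is not covered here, so verify the behavior in your environment.

```yaml
input:
  postgres_cdc:
    dsn: postgres://user:password@db.example.com:5432/dbname # placeholder host and credentials
    schema: public
    tables:
      - orders           # hypothetical table
    slot_name: tls_slot  # hypothetical slot name
    tls:
      root_cas_file: ./root_cas.pem    # CA chain used to verify the server
      client_certs:
        - cert_file: ./example.pem     # client certificate
          key_file: ./example.key      # client private key
      skip_cert_verify: false          # keep server certificate verification on
```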

### [](#unchanged_toast_value)`unchanged_toast_value`

Specify the value to emit when unchanged [TOAST values](#receive-toast-and-deleted-values) appear in the message stream. Unchanged values occur for data updates and deletes when `REPLICA IDENTITY` is not set to `FULL`.

**Type**: `unknown`

**Default**:

```yaml
null
```

```yaml
# Examples:
unchanged_toast_value: __redpanda_connect_unchanged_toast_value__
```

## [](#example-pipeline)Example pipeline

You can run the following pipeline locally to check that data updates are streamed from your source database to Redpanda Connect. All transactions are written to stdout.

```yml
input:
  label: "postgres_cdc"
  postgres_cdc:
    dsn: postgres://user:password@host:port/dbname
    include_transaction_markers: false
    slot_name: test_slot_native_decoder
    snapshot_batch_size: 100000
    stream_snapshot: true
    temporary_slot: true
    schema: schema_name
    tables:
      - table_name

cache_resources:
  - label: data_caching
    file:
      directory: /tmp/cache

output:
  label: main
  stdout: {}
```