postgres_cdc
Streams data changes from a PostgreSQL database using logical replication. There is also a configuration option to stream all existing data from the database.
input:
  label: ""
  postgres_cdc:
    dsn: "" # No default (required)
    include_transaction_markers: false
    stream_snapshot: false
    snapshot_batch_size: 1000
    schema: "" # No default (required)
    tables: [] # No default (required)
    checkpoint_limit: 1024
    temporary_slot: false
    slot_name: "" # No default (required)
    pg_standby_timeout: 10s
    pg_wal_monitor_interval: 3s
    max_parallel_snapshot_tables: 1
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
The postgres_cdc input uses logical replication to capture changes made to a PostgreSQL database in real time and streams them to Redpanda Connect. Redpanda Connect uses this replication method to allow you to choose which database tables in your source database to receive changes from. There are also two replication modes to choose from, and an option to receive TOAST and deleted values in your data updates.
Prerequisites
- PostgreSQL version 14 or later
- Network access from the cluster where your Redpanda Connect pipeline is running to the source database environment. For detailed networking information, including how to set up a VPC peering connection, see Redpanda Cloud Networking.
- Logical replication enabled on your PostgreSQL cluster
To check whether logical replication is already enabled, run the following query:
SHOW wal_level;
If the wal_level value is logical, you can start to use this connector. Otherwise, choose from the following sets of instructions to update your replication settings.
Cloud platforms:
- Google Cloud SQL for PostgreSQL, including creating a user with replication privileges
Self-Hosted PostgreSQL:
Use an account with sufficient permissions (superuser) to update your replication settings.
- Open the postgresql.conf file.
- Find the wal_level parameter.
- Update the parameter value to wal_level = logical. If you already use replication slots, you may need to increase the limit on replication slots (max_replication_slots). The max_wal_senders parameter value must also be greater than or equal to max_replication_slots.
- Restart the PostgreSQL server.
For this input to make a successful connection to your database, also make sure that it allows replication connections.
- Open the pg_hba.conf file.
- Update this line:
  host replication <replication-username> <connector-ip>/32 md5
  Replace the following placeholders with your own values:
  - <replication-username>: The username from an account with superuser privileges.
  - <connector-ip>: The IP address of the server where you are running Redpanda Connect.
- Restart the PostgreSQL server.
Choose a replication mode
When you run a pipeline that uses the postgres_cdc input, Redpanda Connect connects to your PostgreSQL database and creates a replication slot. The replication slot uses a copy of the Write-Ahead Log (WAL) file to subscribe to changes in your database records as they are applied to the database. There are two replication modes you can choose from: snapshot mode and streaming mode.
In snapshot mode, Redpanda Connect first takes a snapshot of the database and streams the contents before processing changes from the WAL. In streaming mode, Redpanda Connect directly processes changes from the WAL starting from the most recent changes without taking a snapshot first.
For local testing, you can use the example pipeline on this page, which runs in snapshot mode.
Snapshot mode
If you set the stream_snapshot field to true, Redpanda Connect:
- Creates a snapshot of your database.
- Streams the contents of the tables specified in the postgres_cdc input.
- Starts processing changes in the WAL that occurred since the snapshot was taken, and streams them to Redpanda Connect.
Once the initial replication process is complete, the snapshot is removed and the input keeps a connection open to the database so that it can receive data updates.
If the pipeline restarts during the replication process, Redpanda Connect resumes processing data changes from where it left off. If there are other interruptions while the snapshot is taken, you may need to restart the snapshot process. For more information, see Troubleshoot replication failures.
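As a minimal sketch of the snapshot-mode behavior described above, the following input fragment uses only fields documented on this page. The DSN, slot name, and table names are placeholders, and the batch size and parallelism values are illustrative rather than recommended settings.

```yaml
input:
  postgres_cdc:
    # Placeholder connection string: replace with your own credentials and host.
    dsn: postgres://user:password@localhost:5432/dbname
    slot_name: example_slot          # hypothetical slot name
    schema: public
    tables:                          # hypothetical tables; each needs a primary key
      - orders
      - customers
    stream_snapshot: true            # stream existing rows before WAL changes
    snapshot_batch_size: 5000        # rows fetched per snapshot query
    max_parallel_snapshot_tables: 2  # snapshot up to two tables at once
```

Setting stream_snapshot to false in the same fragment switches the input to streaming mode, in which case snapshot_batch_size and max_parallel_snapshot_tables have no effect.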
Streaming mode
If you set the stream_snapshot field to false, Redpanda Connect starts processing data changes from the end of the WAL. If the pipeline restarts, Redpanda Connect resumes processing data changes from the last acknowledged position in the WAL.
Monitor the replication process
You can monitor the initial replication of data using the following metrics:
| Metric name | Description |
|---|---|
| | Indicates how far the connector is lagging behind the source database when processing the transaction log. |
| | Shows the progress of snapshot processing for each table. |
Troubleshoot replication failures
If the database snapshot fails, the replication slot has only an incomplete record of the existing data in your database. To maintain data integrity, you must drop the replication slot manually in your source database and run the Redpanda Connect pipeline again.
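SELECT pg_drop_replication_slot('SLOT_NAME');
Replace SLOT_NAME with the name of your replication slot (the value of the slot_name field), keeping the single quotes.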
Receive TOAST and deleted values
For full visibility of all data updates, you can also choose to stream TOAST and deleted values. To enable this option, run the following query on your source database for each table you replicate (large_data is an example table name):
ALTER TABLE large_data REPLICA IDENTITY FULL;
Data mappings
The following table shows how selected PostgreSQL data types are mapped to data types supported in Redpanda Connect. All other data types are mapped to string values.
| PostgreSQL data type | Bloblang value |
|---|---|
| TEXT, TIMESTAMP, UUID, VARCHAR | JSON strings |
| BOOL | Boolean JSON fields |
| Numeric types (INT4) | JSON number types |
| JSONB | JSON objects |
| INTEGER[] | An array of integer values |
| TEXT[] | An array of string values |
| INET | A string that contains an IP address |
| POINT | A string that represents a point in a two-dimensional plane |
| TSRANGE | A string that includes range bounds |
| TSVECTOR | A string that includes vector data |
Metadata
This input adds the following metadata fields to each message:
- table: The name of the database table from which the message originated.
- operation: The type of database operation that generated the message, such as read, insert, update, delete, begin and commit. A read operation occurs when a snapshot of the database is processed. The begin and commit operations are only included if the include_transaction_markers field is set to true.
- lsn: The Log Sequence Number of each data update from the source PostgreSQL database. The lsn values are strings that can be sorted to determine the order in which data updates were written to the WAL.
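The metadata fields above can be copied into the message body with a Bloblang mapping. The following is a sketch only: the _cdc field name is hypothetical, and the mapping assumes that each payload is a JSON object, so it is not suitable when include_transaction_markers is enabled and marker messages carry null payloads.

```yaml
pipeline:
  processors:
    - mapping: |
        # Keep the row data as-is.
        root = this
        # Attach the CDC metadata under a hypothetical "_cdc" field.
        root._cdc = {
          "table": @table,
          "operation": @operation,
          "lsn": @lsn
        }
```

Because lsn values are sortable strings, downstream consumers could use the copied field to order updates as they were written to the WAL.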
Fields
auto_replay_nacks
Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.
Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.
Type: bool
Default: true
aws
AWS IAM authentication configuration for PostgreSQL instances. When enabled, IAM credentials are used to generate temporary authentication tokens instead of a static password.
This is useful for connecting to Amazon RDS or Aurora PostgreSQL instances with IAM database authentication enabled. The generated tokens are valid for 15 minutes and are automatically refreshed.
For more information about AWS credentials configuration, see the credentials for AWS guide.
Type: object
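A minimal sketch of an input that enables IAM authentication, using the aws fields documented below. The hostname reuses the example endpoint from this page, and the user, database, slot, and table names are placeholders. Omitting the password from the DSN is an assumption based on the generated token being used as the password.

```yaml
input:
  postgres_cdc:
    # Assumption: no static password in the DSN, because the IAM token replaces it.
    dsn: postgres://iam_user@mydb.abc123.us-east-1.rds.amazonaws.com:5432/dbname
    slot_name: example_slot   # hypothetical slot name
    schema: public
    tables:
      - orders                # hypothetical table
    aws:
      enabled: true
      endpoint: mydb.abc123.us-east-1.rds.amazonaws.com
      region: us-east-1
```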
aws.enabled
Enable AWS IAM authentication for PostgreSQL. When enabled, an IAM authentication token is generated and used as the password.
Type: bool
Default: false
aws.endpoint
The PostgreSQL endpoint hostname (e.g., mydb.abc123.us-east-1.rds.amazonaws.com).
Type: string
aws.region
The AWS region where the PostgreSQL instance is located. If no region is specified then the environment default will be used.
Type: string
aws.role
Optional AWS IAM role ARN to assume for authentication. To chain multiple roles, use the roles array instead.
Type: string
aws.role_external_id
Optional external ID for the role assumption. Only used with the role field. To chain multiple roles, use the roles array instead.
Type: string
aws.roles[]
Optional array of AWS IAM roles to assume for authentication. Roles can be assumed in sequence, enabling chaining for purposes such as cross-account access. Each role can optionally specify an external ID.
Type: object
aws.secret
The secret for the credentials being used.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
aws.token
The token for the credentials being used, required when using short term credentials.
Type: string
batching
Allows you to configure a batching policy.
Type: object
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
# ---
batching:
  count: 10
  period: 1s
# ---
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.byte_size
The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.check
A Bloblang query that returns a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples:
check: this.type == "end_of_transaction"
batching.count
The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.period
The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as 100ms, 1s, or 5s.
Type: string
Default: ""
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
batching.processors[]
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.
Type: processor
# Examples:
processors:
  - archive:
      format: concatenate
# ---
processors:
  - archive:
      format: lines
# ---
processors:
  - archive:
      format: json_array
checkpoint_limit
The maximum number of messages that this input can process at a given time. Increasing this limit enables parallel processing, and batching at the output level. To preserve at-least-once guarantees, any given log sequence number (LSN) is not acknowledged until all messages under that offset are delivered.
Type: int
Default: 1024
dsn
The data source name (DSN) of the PostgreSQL database from which you want to stream updates. Use the format postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&…]. For example, if you wanted to disable SSL in a secure environment, you would add sslmode=disable to the connection string.
Type: string
# Examples:
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
heartbeat_interval
The interval between heartbeat messages, which Redpanda Connect writes to the WAL using the pg_logical_emit_message function.
Heartbeat messages are useful when you subscribe to data changes from tables with low activity, while other tables in the database have higher-frequency updates. Heartbeat messages allow Redpanda Connect to periodically acknowledge new messages even when no data updates occur. Each acknowledgement advances the committed point in the WAL, which ensures that PostgreSQL can safely reclaim older log segments, preventing excessive disk space usage.
Set heartbeat_interval to 0s to disable heartbeats.
Type: string
Default: 1h
# Examples:
heartbeat_interval: 0s
# ---
heartbeat_interval: 24h
include_transaction_markers
When set to true, creates empty messages for the BEGIN and COMMIT operations that start and complete each transaction. Messages with the operation metadata field set to begin or commit have null message payloads.
Type: bool
Default: false
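One possible use of transaction markers is to close a batch whenever a commit marker arrives, so that each batch roughly aligns with a transaction boundary. The following sketch combines this field with the batching.check field. The connection details are placeholders, and the 5s period is an illustrative fallback, not a recommended value.

```yaml
input:
  postgres_cdc:
    dsn: postgres://user:password@localhost:5432/dbname  # placeholder
    slot_name: example_slot                              # hypothetical slot name
    schema: public
    tables:
      - orders                                           # hypothetical table
    include_transaction_markers: true
    batching:
      # End the batch when the commit marker for a transaction is seen.
      # Quoted because a YAML plain scalar cannot start with "@".
      check: '@operation == "commit"'
      # Flush incomplete batches after 5 seconds regardless.
      period: 5s
```

Because marker messages have null payloads, downstream processors in such a pipeline need to tolerate or filter out null messages.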
max_parallel_snapshot_tables
Specify the maximum number of tables that are processed in parallel when the initial snapshot of the source database is taken.
Type: int
Default: 1
pg_standby_timeout
Specify the standby timeout after which an idle connection is refreshed to keep the connection alive.
Type: string
Default: 10s
# Examples:
pg_standby_timeout: 30s
pg_wal_monitor_interval
How often to report changes to the replication lag and write them to Redpanda Connect metrics.
Type: string
Default: 3s
# Examples:
pg_wal_monitor_interval: 6s
schema
The PostgreSQL schema from which to replicate data.
Type: string
# Examples:
schema: public
# ---
schema: "MyCaseSensitiveSchemaNeedingQuotes"
slot_name
The name of the PostgreSQL logical replication slot to use. If not provided, a random name is generated unless you create a replication slot manually before starting replication.
Type: string
# Examples:
slot_name: my_test_slot
snapshot_batch_size
The number of table rows to fetch in each batch when querying the snapshot.
This option is only available when stream_snapshot is set to true.
Type: int
Default: 1000
# Examples:
snapshot_batch_size: 10000
stream_snapshot
When set to true, this input streams a snapshot of all existing data in the source database before streaming data changes. To use this setting, all database tables that you want to replicate must have a primary key.
Type: bool
Default: false
# Examples:
stream_snapshot: true
tables[]
A list of database table names to include in the snapshot and logical replication. Specify each table name as a separate item.
Type: array
# Examples:
tables:
  - my_table_1
  - "MyCaseSensitiveTableNeedingQuotes"
temporary_slot
If set to true, the input creates a temporary replication slot that is automatically dropped when the connection to your source database is closed. You might use this option to:
- Avoid data accumulating in the replication slot when a pipeline is paused or stopped
- Test the connector
If the pipeline is restarted, another data snapshot is taken before data updates are streamed.
Type: bool
Default: false
tls.client_certs[]
A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.
Type: object
Default: []
# Examples:
client_certs:
  - cert: foo
    key: bar
# ---
client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
tls.client_certs[].key
A plain text certificate key to use.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.
Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
# Examples:
password: foo
# ---
password: ${KEY_PASSWORD}
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Type: bool
Default: false
tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examples:
root_cas_file: ./root_cas.pem
tls.skip_cert_verify
Whether to skip server side certificate verification.
Type: bool
Default: false
unchanged_toast_value
Specify the value to emit when unchanged TOAST values appear in the message stream. Unchanged values occur for data updates and deletes when REPLICA IDENTITY is not set to FULL.
Type: unknown
Default: null
# Examples:
unchanged_toast_value: __redpanda_connect_unchanged_toast_value__
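As a sketch of how the sentinel value from the example above might be handled downstream, the following configuration sets unchanged_toast_value and then uses a Bloblang mapping to drop any column that carries the sentinel. It assumes that each payload is a flat JSON object. The connection details and table name are placeholders.

```yaml
input:
  postgres_cdc:
    dsn: postgres://user:password@localhost:5432/dbname  # placeholder
    slot_name: example_slot                              # hypothetical slot name
    schema: public
    tables:
      - large_data
    unchanged_toast_value: __redpanda_connect_unchanged_toast_value__

pipeline:
  processors:
    - mapping: |
        # Remove key/value pairs whose value is the unchanged-TOAST sentinel.
        root = this.filter(kv -> kv.value != "__redpanda_connect_unchanged_toast_value__")
```

Dropping the column makes it clear to consumers that the value was not part of the update, rather than being set to the sentinel string.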
Example pipeline
You can run the following pipeline locally to check that data updates are streamed from your source database to Redpanda Connect. All transactions are written to stdout.
input:
  label: "postgres_cdc"
  postgres_cdc:
    dsn: postgres://user:password@host:port/dbname
    include_transaction_markers: false
    slot_name: test_slot_native_decoder
    snapshot_batch_size: 100000
    stream_snapshot: true
    temporary_slot: true
    schema: schema_name
    tables:
      - table_name
cache_resources:
  - label: data_caching
    file:
      directory: /tmp/cache
output:
  label: main
  stdout: {}