# pg_stream

Beta

Available in: Cloud, Self-Managed

License: This component requires an Enterprise license. To upgrade, contact Redpanda sales.

Streams data changes from a PostgreSQL database using logical replication. There is also a configuration option to stream all existing data from the database.

Introduced in version 4.40.0.

```yaml
# Configuration fields, showing default values
input:
  label: ""
  pg_stream:
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable # No default (required)
    include_transaction_markers: true
    stream_snapshot: false
    snapshot_memory_safety_factor: 1
    snapshot_batch_size: 0
    schema: public # No default (required)
    tables: [] # No default (required)
    checkpoint_limit: 1024
    temporary_slot: false
    slot_name: "" # No default (optional)
    pg_standby_timeout: 10s
    pg_wal_monitor_interval: 3s
    max_parallel_snapshot_tables: 1
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: "" # No default (optional)
      check: "" # No default (optional)
      processors: [] # No default (optional)
```

The `pg_stream` input uses logical replication to capture changes made to a PostgreSQL database in real time and stream them to Redpanda Connect. Redpanda Connect uses this replication method to let you choose which tables in your source database to receive changes from. There are also two replication modes to choose from, and an option to receive TOAST and deleted values in your data updates.

## Prerequisites

- PostgreSQL version 10 or later
- Logical replication enabled on your PostgreSQL cluster

To check whether logical replication is already enabled, run the following query:

```sql
SHOW wal_level;
```

If the `wal_level` value is `logical`, you can start to use this connector. Otherwise, choose from the following sets of instructions to update your replication settings.

### Cloud platforms

- Amazon RDS for PostgreSQL
- Azure Database for PostgreSQL
- Google Cloud SQL for PostgreSQL, including creating a user with replication privileges
- Neon

### Self-hosted PostgreSQL

Use an account with sufficient permissions (superuser) to update your replication settings.

1. Open the `postgresql.conf` file.
2. Find the `wal_level` parameter.
3. Update the parameter value to `wal_level = logical`.

   If you already use replication slots, you may need to increase the limit on replication slots (`max_replication_slots`). The `max_wal_senders` parameter value must also be greater than or equal to `max_replication_slots`.
4. Restart the PostgreSQL server.

For this input to make a successful connection to your database, also make sure that the database allows replication connections:

1. Open the `pg_hba.conf` file.
2. Add or update the following line:

   ```
   host replication <replication-username> <connector-ip>/32 md5
   ```

   Replace the following placeholders with your own values:

   - `<replication-username>`: The username of an account with superuser privileges.
   - `<connector-ip>`: The IP address of the server where you are running Redpanda Connect.
3. Restart the PostgreSQL server.
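After restarting the server, you can confirm that the new settings are active before starting the connector. A minimal check, run from any client that can connect to the database:

```sql
-- Should return 'logical'
SHOW wal_level;

-- The slot and WAL sender limits set in postgresql.conf
SHOW max_replication_slots;
SHOW max_wal_senders;
```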
## Choose a replication mode

When you run a pipeline that uses the `pg_stream` input, Redpanda Connect connects to your PostgreSQL database and creates a replication slot. The replication slot uses a copy of the Write-Ahead Log (WAL) file to subscribe to changes in your database records as they are applied to the database.

There are two replication modes to choose from: snapshot mode and streaming mode. In snapshot mode, Redpanda Connect first takes a snapshot of the database and streams its contents before processing changes from the WAL. In streaming mode, Redpanda Connect processes changes directly from the WAL, starting from the most recent changes, without taking a snapshot first.

For local testing, you can use the example pipeline on this page, which runs in snapshot mode.

### Snapshot mode

If you set the `stream_snapshot` field to `true`, Redpanda Connect:

1. Creates a snapshot of your database.
2. Streams the contents of the tables specified in the `pg_stream` input.
3. Starts processing changes in the WAL that occurred since the snapshot was taken, and streams them to Redpanda Connect.

Once the initial replication process is complete, the snapshot is removed and the input keeps a connection open to the database so that it can receive data updates.

If the pipeline restarts during the replication process, Redpanda Connect resumes processing data changes from where it left off. If there are other interruptions while the snapshot is taken, you may need to restart the snapshot process. For more information, see Troubleshoot replication failures.

### Streaming mode

If you set the `stream_snapshot` field to `false`, Redpanda Connect starts processing data changes from the end of the WAL. If the pipeline restarts, Redpanda Connect resumes processing data changes from the last acknowledged position in the WAL.

## Monitor the replication process

You can monitor the initial replication of data using the following metrics:

| Metric name | Description |
| --- | --- |
| `replication_lag_bytes` | Indicates how far the connector is lagging behind the source database when processing the transaction log. |
| `postgres_snapshot_progress` | Shows the progress of snapshot processing for each table. |

## Troubleshoot replication failures

If the database snapshot fails, the replication slot has only an incomplete record of the existing data in your database. To maintain data integrity, you must drop the replication slot manually in your source database and run the Redpanda Connect pipeline again:

```sql
SELECT pg_drop_replication_slot('<slot-name>');
```

Replace `<slot-name>` with the name of your replication slot.

## Receive TOAST and deleted values

For full visibility of all data updates, you can also choose to stream TOAST and deleted values. To enable this option, run the following query on your source database:

```sql
ALTER TABLE large_data REPLICA IDENTITY FULL;
```

## Metadata

This input adds the following metadata fields to each message:

- `mode`: Whether a message is part of the snapshot processing of existing data in a database (`snapshot`) or a data update streamed to Redpanda Connect (`streaming`).
- `table`: The name of the database table from which the message originated.
- `operation`: The type of database operation that generated the message, such as `INSERT`, `UPDATE`, `DELETE`, `BEGIN`, and `COMMIT`. `BEGIN` and `COMMIT` operations are only included if the `include_transaction_markers` field is set to `true`.
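If you need these fields in the message payload itself, you can copy them in with a Bloblang `mapping` processor. The following is a minimal sketch; the `_table`, `_operation`, and `_mode` payload field names are illustrative, not part of this input:

```yaml
pipeline:
  processors:
    - mapping: |
        # Start from the original change event
        root = this
        # Copy the pg_stream metadata into the payload
        root._table = @table
        root._operation = @operation
        root._mode = @mode
```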
## Fields

### dsn

The data source name (DSN) of the PostgreSQL database from which you want to stream updates. Use the format `postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&…]`. For example, to disable SSL in a trusted environment, add `sslmode=disable` to the connection string.

This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.

Type: `string`

```yaml
# Examples
dsn: postgres://foouser:foopass@localhost:5432/foodb

dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
```

### include_transaction_markers

When set to `true`, this input creates empty messages for the `BEGIN` and `COMMIT` operations that start and complete each transaction. Messages with the `operation` metadata field set to `BEGIN` or `COMMIT` have null message payloads.

Type: `bool`

Default: `true`

### stream_snapshot

When set to `true`, this input streams a snapshot of all existing data in the source database before streaming data changes. To use this setting, all database tables that you want to replicate must have a primary key.

Type: `bool`

Default: `false`

```yaml
# Examples
stream_snapshot: true
```

### snapshot_memory_safety_factor

The fraction of available memory to use for streaming the database snapshot. Values between 0 and 1 represent the fraction of available memory to use. For example, `0.3` allocates 30% of the available memory. Lower values make initial streaming slower, but help prevent out-of-memory errors.

This option is only available when `stream_snapshot` is set to `true`. You can use either `snapshot_memory_safety_factor` or `snapshot_batch_size`, not both.

Type: `float`

Default: `1`

```yaml
# Examples
snapshot_memory_safety_factor: 0.2
```

### snapshot_batch_size

The number of table rows to fetch in each batch when querying the snapshot. Leave at `0` to let the input determine the batch size based on the `snapshot_memory_safety_factor` value.

This option is only available when `stream_snapshot` is set to `true`. You can use either `snapshot_batch_size` or `snapshot_memory_safety_factor`, not both.

Type: `int`

Default: `0`

```yaml
# Examples
snapshot_batch_size: 10000
```

### schema

The PostgreSQL schema from which to replicate data.

Type: `string`

```yaml
# Examples
schema: public
```

### tables

A list of database table names to include in the snapshot and logical replication. Specify each table name as a separate item.

Type: `array`

```yaml
# Examples
tables:
  - orders_table
  - customer_address_table
  - inventory_table
```

### checkpoint_limit

The maximum number of messages that this input can process at a given time. Increasing this limit enables parallel processing and batching at the output level. To preserve at-least-once guarantees, any given log sequence number (LSN) is not acknowledged until all messages under that offset are delivered.

Type: `int`

Default: `1024`

### temporary_slot

If set to `true`, the input creates a temporary replication slot that is automatically dropped when the connection to your source database is closed. You might use this option to:

- Avoid data accumulating in the replication slot when a pipeline is paused or stopped.
- Test the connector.

If the pipeline is restarted, another data snapshot is taken before data updates are streamed.

Type: `bool`

Default: `false`

### slot_name

The name of the PostgreSQL logical replication slot to use. If not provided, a random name is generated, unless you create a replication slot manually before starting replication.

Type: `string`

Default: `""`

```yaml
# Examples
slot_name: test_replication_slot
```

### pg_standby_timeout

The standby timeout after which an idle connection is refreshed to keep the connection alive.

Type: `string`

Default: `10s`

```yaml
# Examples
pg_standby_timeout: 30s
```

### pg_wal_monitor_interval

How often to report changes to the replication lag and write them to Redpanda Connect metrics.

Type: `string`

Default: `3s`

```yaml
# Examples
pg_wal_monitor_interval: 6s
```

### max_parallel_snapshot_tables

The maximum number of tables that are processed in parallel when the initial snapshot of the source database is taken.

Type: `int`

Default: `1`
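As an illustration of how the snapshot fields combine, the following sketch fetches the initial snapshot in fixed-size batches across two tables at a time. The connection details and table names are placeholders:

```yaml
input:
  pg_stream:
    dsn: postgres://user:password@localhost:5432/dbname
    schema: public
    tables:
      - orders_table
      - inventory_table
    stream_snapshot: true
    # Use fixed-size batches instead of memory-based sizing
    snapshot_batch_size: 5000
    # Snapshot up to two tables in parallel
    max_parallel_snapshot_tables: 2
```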
### auto_replay_nacks

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of the rejections is persistent, leaving this option enabled can result in back pressure. Set `auto_replay_nacks` to `false` to delete rejected messages. Disabling automatic replays can greatly improve the memory efficiency of high-throughput streams, as the original shape of the data is discarded immediately upon consumption and mutation.

Type: `bool`

Default: `true`

### batching

Allows you to configure a batching policy.

Type: `object`

```yaml
# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### batching.count

The number of messages after which the batch is flushed. Set to `0` to disable count-based batching.

Type: `int`

Default: `0`

### batching.byte_size

The number of bytes at which the batch is flushed. Set to `0` to disable size-based batching.

Type: `int`

Default: `0`

### batching.period

The period of time after which an incomplete batch is flushed, regardless of its size.

Type: `string`

Default: `""`

```yaml
# Examples
period: 1s

period: 1m

period: 500ms
```

### batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: `string`

Default: `""`

```yaml
# Examples
check: this.type == "end_of_transaction"
```

### batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch, even when you configure processors to split the batch into smaller batches.

Type: `array`

```yaml
# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```

## Example pipeline

You can run the following pipeline locally to check that data updates are streamed from your source database to Redpanda Connect. All transactions are written to stdout.

```yaml
input:
  label: "pg_stream"
  pg_stream:
    dsn: postgres://user:password@host:port/dbname
    include_transaction_markers: false
    slot_name: test_slot_native_decoder
    snapshot_batch_size: 100000
    stream_snapshot: true
    temporary_slot: true
    schema: schema_name
    tables:
      - table_name

cache_resources:
  - label: data_caching
    file:
      directory: /tmp/cache

output:
  label: main
  stdout: {}
```
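To send the captured changes to a Redpanda topic instead of stdout, you can swap the output section. A minimal sketch using the `redpanda` output, with a placeholder broker address and topic name:

```yaml
output:
  label: main
  redpanda:
    # Placeholder broker address; replace with your cluster's brokers
    seed_brokers:
      - localhost:9092
    # Placeholder topic name
    topic: postgres.changes
```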