# oracledb_cdc

Available in: Cloud, Self-Managed

License: This component requires an enterprise license. You can either upgrade to an Enterprise Edition license, or generate a trial license key that's valid for 30 days.

Enables Change Data Capture (CDC) by consuming from OracleDB. Introduced in version 4.83.0.

Streams changes from an Oracle database for Change Data Capture (CDC). If `stream_snapshot` is set to `true`, existing data in the database is also streamed.

```yaml
input:
  label: ""
  oracledb_cdc:
    connection_string: "" # No default (required)
    stream_snapshot: false
    max_parallel_snapshot_tables: 1
    snapshot_max_batch_size: 1000
    logminer:
      scn_window_size: 20000
      backoff_interval: 5s
      mining_interval: 300ms
      strategy: online_catalog
      max_transaction_events: 0
      lob_enabled: true
    include: [] # No default (required)
    exclude: [] # No default (optional)
    checkpoint_cache: "" # No default (optional)
    checkpoint_cache_table_name: RPCN.CDC_CHECKPOINT_CACHE
    checkpoint_cache_key: oracledb_cdc
    checkpoint_limit: 1024
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

## Metadata

This input adds the following
metadata fields to each message:

- `database_schema`: The database schema of the table that the message originates from.
- `table_name`: Name of the table that the message originated from.
- `operation`: Type of operation that generated the message: `read`, `delete`, `insert`, or `update`. `read` indicates messages read during the initial snapshot phase.
- `scn`: The Oracle System Change Number.
- `schema`: The table schema, for use with schema-aware downstream processors such as `schema_registry_encode`. When new columns are detected in CDC events, the schema is automatically refreshed from the Oracle catalog. Dropped columns are reflected after a connector restart.

## Permissions

When using the default Oracle-based cache, the Connect user requires permission to create tables and stored procedures, and the `rpcn` schema must already exist. See `checkpoint_cache_table_name` for more information.

## Fields

### auto_replay_nacks

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false`, these messages are instead deleted. Disabling auto replays can greatly improve memory efficiency of high-throughput streams, as the original shape of the data can be discarded immediately upon consumption and mutation.

Type: `bool`
Default: `true`

### batching

Allows you to configure a batching policy.

Type: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### batching.byte_size

An amount of bytes at which the batch should be flushed. If `0`, size-based batching is disabled.

Type: `int`
Default: `0`

### batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.
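As an illustrative sketch (not from the reference itself), the `check` query can also reference the metadata fields this input sets, for example to cut a batch as soon as a delete event arrives. The `@operation` reference below uses Bloblang's metadata accessor syntax:

```yaml
# Sketch: flush every 5s, or immediately when a delete is observed
batching:
  period: 5s
  check: '@operation == "delete"'
```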
Type: `string`
Default: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### batching.count

A number of messages at which the batch should be flushed. If `0`, count-based batching is disabled.

Type: `int`
Default: `0`

### batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: `string`
Default: `""`

```yaml
# Examples:
period: 1s

period: 1m

period: 500ms
```

### batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

Type: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```

### checkpoint_cache

A cache resource to use for storing the current System Change Number (SCN) that has been successfully delivered. This allows Redpanda Connect to continue from that SCN upon restart, rather than consume the entire state of the OracleDB redo logs. If not set, the default Oracle-based cache is used. See `checkpoint_cache_table_name` for more information.

Type: `string`

### checkpoint_cache_key

The key to use to store the snapshot position in `checkpoint_cache`. An alternative key can be provided if multiple CDC inputs share the same cache.

Type: `string`
Default: `oracledb_cdc`

### checkpoint_cache_table_name

The identifier for the checkpoint cache table. If no `checkpoint_cache` field is specified, this input automatically creates a table and stored procedure under the `rpcn` schema to act as a checkpoint cache. This table stores the latest processed System Change Number (SCN) that has been successfully delivered, allowing Redpanda Connect to resume from that point upon restart rather than reconsume the entire redo log.
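To store checkpoints outside Oracle, `checkpoint_cache` can point at a declared cache resource instead. The sketch below assumes a Redis instance is available; the `cdc_ckpt` label, connection details, and key name are hypothetical placeholders:

```yaml
# Sketch: external checkpoint cache instead of the default Oracle table
input:
  oracledb_cdc:
    connection_string: oracle://user:password@dbhost:1521/ORCL # placeholder
    include:
      - SCHEMA.PRODUCTS
    checkpoint_cache: cdc_ckpt      # references the cache resource below
    checkpoint_cache_key: products_cdc

cache_resources:
  - label: cdc_ckpt
    redis:
      url: redis://localhost:6379
```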
Type: `string`
Default: `RPCN.CDC_CHECKPOINT_CACHE`

```yaml
# Examples:
checkpoint_cache_table_name: RPCN.CHECKPOINT_CACHE
```

### checkpoint_limit

The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given System Change Number (SCN) is not acknowledged until all messages under that offset are delivered, in order to preserve at-least-once delivery guarantees.

Type: `int`
Default: `1024`

### connection_string

The connection string of the Oracle database to connect to.

Type: `string`

```yaml
# Examples:
connection_string: oracle://username:password@host:port/service_name
```

### exclude[]

Regular expressions for tables to exclude.

Type: `array`

```yaml
# Examples:
exclude:
  - SCHEMA.PRIVATETABLE
```

### include[]

Regular expressions for tables to include.

Type: `array`

```yaml
# Examples:
include:
  - SCHEMA.PRODUCTS
```

### logminer

LogMiner configuration settings.

Type: `object`

### logminer.backoff_interval

The interval between attempts to check for new changes once all data is processed. For low-traffic tables, increasing this value can reduce network traffic to the server.

Type: `string`
Default: `5s`

```yaml
# Examples:
backoff_interval: 5s

backoff_interval: 1m
```

### logminer.lob_enabled

When enabled, large object (CLOB, BLOB) columns are included in both snapshot and streaming change events. When disabled, these columns are still present but contain no values. Enabling this option introduces additional performance overhead and increases memory requirements.

Type: `bool`
Default: `true`

### logminer.max_transaction_events

The maximum number of events that can be buffered for a single transaction. If a transaction exceeds this limit, it is discarded and its events are not emitted. Set to `0` to disable the limit.

Type: `int`
Default: `0`

### logminer.mining_interval

The interval between mining cycles during normal operation. Controls how frequently LogMiner polls for new changes when not caught up.
Type: `string`
Default: `300ms`

```yaml
# Examples:
mining_interval: 100ms

mining_interval: 1s
```

### logminer.scn_window_size

The SCN range to mine per cycle. Each cycle reads changes between the current SCN and the current SCN plus `scn_window_size`. Smaller values mean more frequent queries with lower memory usage but higher overhead; larger values reduce query frequency and improve throughput at the cost of higher memory usage per cycle.

Type: `int`
Default: `20000`

### logminer.strategy

Controls how LogMiner retrieves data dictionary information. `online_catalog` uses the current data dictionary for best performance but cannot capture DDL changes. Currently, only `online_catalog` is supported.

Type: `string`
Default: `online_catalog`

### max_parallel_snapshot_tables

The number of tables to process in parallel during the snapshot stage.

Type: `int`
Default: `1`

### snapshot_max_batch_size

The maximum number of rows to stream in a single batch when taking a snapshot.

Type: `int`
Default: `1000`

### stream_snapshot

If set to `true`, the connector queries all existing data as part of the snapshot process. Otherwise, it starts from the current System Change Number position.

Type: `bool`
Default: `false`

```yaml
# Examples:
stream_snapshot: true
```
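As a closing end-to-end sketch, the fields above can be combined into a pipeline that snapshots existing rows and streams changes into Kafka. Everything here is a placeholder assumption (connection details, broker address, topic scheme), and the `kafka_franz` output is assumed to be available in the same Connect distribution; the topic interpolation uses the `table_name` metadata field this input sets:

```yaml
# Sketch: snapshot + CDC stream routed to per-table Kafka topics
input:
  oracledb_cdc:
    connection_string: oracle://user:password@dbhost:1521/ORCL # placeholder
    stream_snapshot: true
    max_parallel_snapshot_tables: 4
    include:
      - INVENTORY\..*
    batching:
      count: 500
      period: 1s

output:
  kafka_franz:
    seed_brokers: [ "localhost:9092" ]
    topic: oracle.cdc.${! @table_name }
```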