oracledb_cdc
Enables Change Data Capture by consuming from OracleDB.
Introduced in version 4.83.0.
Streams changes from an Oracle database for Change Data Capture (CDC).
If stream_snapshot is set to true, existing data in the included tables is also streamed as part of an initial snapshot.
input:
  label: ""
  oracledb_cdc:
    connection_string: "" # No default (required)
    wallet_path: "" # No default (optional)
    wallet_password: "" # No default (optional)
    stream_snapshot: false
    max_parallel_snapshot_tables: 1
    snapshot_max_batch_size: 1000
    logminer:
      scn_window_size: 20000
      backoff_interval: 5s
      mining_interval: 300ms
      strategy: online_catalog
      max_transaction_events: 0
      lob_enabled: true
    include: [] # No default (required)
    exclude: [] # No default (optional)
    checkpoint_cache: "" # No default (optional)
    checkpoint_cache_table_name: RPCN.CDC_CHECKPOINT_CACHE
    checkpoint_cache_key: oracledb_cdc
    checkpoint_limit: 1024
    pdb_name: "" # No default (optional)
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
Metadata
This input adds the following metadata fields to each message:
- database_schema: The database schema of the table that the message originated from.
- table_name: The name of the table that the message originated from.
- operation: The type of operation that generated the message: "read", "delete", "insert", or "update". "read" indicates a row read during the initial snapshot phase.
- scn: The Oracle System Change Number (SCN) associated with the message.
- schema: The table schema, for use with schema-aware downstream processors such as schema_registry_encode. When new columns are detected in CDC events, the schema is automatically refreshed from the Oracle catalog. Dropped columns are reflected only after a connector restart.
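The metadata fields above can be read by downstream processors. The following is a minimal sketch that copies some of them into each message body with a mapping processor. The output field names source_table, op, and scn are hypothetical, and the string concatenation assumes database_schema and table_name hold string values.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Record which table the change came from, e.g. "SALES.ORDERS".
        root.source_table = @database_schema + "." + @table_name
        # "read" for snapshot rows, otherwise "insert", "update", or "delete".
        root.op = @operation
        root.scn = @scn
```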
Permissions
When using the default Oracle-based checkpoint cache, the Oracle user that Redpanda Connect connects as requires permission to create tables and stored procedures, and the rpcn schema must already exist. See checkpoint_cache_table_name for more information.
Fields
auto_replay_nacks
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false, these messages are instead deleted. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, as the original shape of the data can be discarded immediately upon consumption and mutation.
Type: bool
Default: true
batching
Allows you to configure a batching policy.
Type: object
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
# ---
batching:
  count: 10
  period: 1s
# ---
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.byte_size
The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples:
check: this.type == "end_of_transaction"
batching.count
The number of messages at which the batch is flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
batching.processors[]
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, so splitting the batch into smaller batches with these processors is a no-op.
Type: processor
# Examples:
processors:
  - archive:
      format: concatenate
# ---
processors:
  - archive:
      format: lines
# ---
processors:
  - archive:
      format: json_array
checkpoint_cache
A cache resource used to store the latest System Change Number (SCN) that has been successfully delivered. This allows Redpanda Connect to resume from that SCN upon restart rather than reconsume the entire OracleDB redo log. If not set, the default Oracle-based cache is used. See checkpoint_cache_table_name for more information.
Type: string
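To keep checkpoints outside Oracle, declare a cache resource and reference it by its label. This is a minimal sketch assuming a Redis instance reachable at redis://localhost:6379; the label cdc_checkpoints, the connection details, and the SALES table pattern are placeholders.

```yaml
input:
  oracledb_cdc:
    connection_string: oracle://cdc_user:password@oracle-host:1521/ORCLPDB1
    include:
      - '^SALES\..*$'
    # Refers to the cache resource declared below by its label.
    checkpoint_cache: cdc_checkpoints

cache_resources:
  - label: cdc_checkpoints
    redis:
      url: redis://localhost:6379
```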
checkpoint_cache_key
The key under which the current SCN position is stored in checkpoint_cache. If multiple CDC inputs share the same cache, give each input a different key.
Type: string
Default: oracledb_cdc
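A sketch of two CDC inputs sharing one cache through a broker input, each with its own key so their SCN positions do not overwrite each other. It assumes a cache resource labeled cdc_checkpoints is declared under cache_resources; the keys, schemas, and connection details are placeholders.

```yaml
input:
  broker:
    inputs:
      - oracledb_cdc:
          connection_string: oracle://cdc_user:password@oracle-host:1521/ORCLPDB1
          include:
            - '^SALES\..*$'
          checkpoint_cache: cdc_checkpoints
          # Distinct key so this input's SCN is stored separately.
          checkpoint_cache_key: oracledb_cdc_sales
      - oracledb_cdc:
          connection_string: oracle://cdc_user:password@oracle-host:1521/ORCLPDB1
          include:
            - '^HR\..*$'
          checkpoint_cache: cdc_checkpoints
          checkpoint_cache_key: oracledb_cdc_hr
```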
checkpoint_cache_table_name
The identifier of the checkpoint cache table. If checkpoint_cache is not set, this input automatically creates a table and stored procedure under the rpcn schema to act as a checkpoint cache. This table stores the latest System Change Number (SCN) that has been successfully delivered, allowing Redpanda Connect to resume from that point upon restart rather than reconsume the entire redo log.
Type: string
Default: RPCN.CDC_CHECKPOINT_CACHE
# Examples:
checkpoint_cache_table_name: RPCN.CHECKPOINT_CACHE
checkpoint_limit
The maximum number of messages that can be processed at any given time. Increasing this limit enables parallel processing and batching at the output level. To preserve at-least-once delivery guarantees, a System Change Number (SCN) is not acknowledged until all messages up to that SCN have been delivered.
Type: int
Default: 1024
connection_string
The connection string of the Oracle database to connect to. You can supply additional connection options as URL query parameters, for example: oracle://user:password@host:1522/service?WALLET=/opt/oracle/wallet&SSL=true.
Type: string
# Examples:
connection_string: oracle://username:password@host:port/service_name
# ---
connection_string: oracle://user:password@host:1522/service?WALLET=/opt/oracle/wallet&SSL=true
exclude[]
Regular expressions matching the tables to exclude.
Type: array
# Examples:
exclude:
  - SCHEMA.PRIVATETABLE
include[]
Regular expressions matching the tables to include. This field is required.
Type: array
# Examples:
include:
  - SCHEMA.PRODUCTS
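Because both fields take regular expressions, an unescaped dot in a schema-qualified name matches any character. A minimal sketch, assuming patterns are matched against SCHEMA.TABLE names, that captures every table in a placeholder SALES schema except those prefixed with TMP_. The explicit ^ and $ anchors are added in case patterns are not anchored automatically.

```yaml
input:
  oracledb_cdc:
    connection_string: oracle://cdc_user:password@oracle-host:1521/ORCLPDB1
    # Single quotes keep the backslashes literal in YAML.
    include:
      - '^SALES\..*$'
    exclude:
      - '^SALES\.TMP_.*$'
```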
logminer.backoff_interval
The interval between checks for new changes once all available changes have been processed. For low-traffic tables, increasing this value can reduce network traffic to the server.
Type: string
Default: 5s
# Examples:
backoff_interval: 5s
# ---
backoff_interval: 1m
logminer.lob_enabled
When enabled, large object (CLOB, BLOB) columns are included in both snapshot and streaming change events. When disabled, these columns are still present but contain no values. Enabling this option introduces additional performance overhead and increases memory requirements.
Type: bool
Default: true
logminer.max_transaction_events
The maximum number of events that can be buffered for a single transaction. If a transaction exceeds this limit, it is discarded and none of its events are emitted. Set to 0 to disable the limit.
Type: int
Default: 0
logminer.mining_interval
The interval between mining cycles during normal operation. Controls how frequently LogMiner polls for new changes when not caught up.
Type: string
Default: 300ms
# Examples:
mining_interval: 100ms
# ---
mining_interval: 1s
logminer.scn_window_size
The SCN range to mine per cycle. Each cycle reads changes between the current SCN and current SCN + scn_window_size. Smaller values mean more frequent queries with lower memory usage but higher overhead; larger values reduce query frequency and improve throughput at the cost of higher memory usage per cycle.
Type: int
Default: 20000
logminer.strategy
Controls how LogMiner retrieves data dictionary information. online_catalog uses the current data dictionary for best performance but cannot capture DDL changes. Currently, only online_catalog is supported.
Type: string
Default: online_catalog
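The logminer options can be combined to trade latency against load on the database. The values below are illustrative placeholders, not recommendations: a larger scn_window_size for throughput, a shorter mining_interval while catching up, a longer backoff_interval for quiet tables, a cap on buffered transaction events (transactions over the cap are dropped, not emitted), and LOB values disabled to reduce memory.

```yaml
input:
  oracledb_cdc:
    connection_string: oracle://cdc_user:password@oracle-host:1521/ORCLPDB1
    include:
      - '^SALES\..*$'
    logminer:
      # Fewer, larger mining queries at the cost of more memory per cycle.
      scn_window_size: 100000
      # Poll frequently while behind, less often once caught up.
      mining_interval: 100ms
      backoff_interval: 30s
      # Transactions buffering more than this many events are discarded.
      max_transaction_events: 500000
      # CLOB/BLOB columns remain in events but carry no values.
      lob_enabled: false
      strategy: online_catalog
```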
max_parallel_snapshot_tables
The number of tables processed in parallel during the snapshot phase.
Type: int
Default: 1
pdb_name
The name of the pluggable database (PDB) to monitor. When connecting to a CDB root, LogMiner output is scoped to this PDB via SRC_CON_NAME filtering, and catalog queries use ALTER SESSION SET CONTAINER to switch context. The connecting user requires GRANT SET CONTAINER TO <user> CONTAINER=ALL.
Type: string
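A sketch of monitoring a single PDB while connected to the CDB root. The common user C##CDC_USER, the CDB service ORCLCDB, and the PDB name ORCLPDB1 are placeholders. Because # starts a URL fragment, each # in the user name is percent-encoded as %23, assuming the driver decodes percent-encoded credentials.

```yaml
input:
  oracledb_cdc:
    # c%23%23cdc_user is the common user C##CDC_USER with each "#" encoded.
    connection_string: oracle://c%23%23cdc_user:password@oracle-host:1521/ORCLCDB
    # Scope LogMiner output and catalog queries to this PDB.
    pdb_name: ORCLPDB1
    include:
      - '^SALES\..*$'
```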
snapshot_max_batch_size
The maximum number of rows to be streamed in a single batch when taking a snapshot.
Type: int
Default: 1000
stream_snapshot
If set to true, the input queries all existing data in the included tables as part of an initial snapshot. Otherwise, it starts from the current System Change Number (SCN).
Type: bool
Default: false
# Examples:
stream_snapshot: true
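A sketch combining the snapshot options. Existing rows are emitted with operation set to "read" before change streaming begins. The parallelism and batch size shown are placeholder values, and the SALES pattern and connection details are hypothetical.

```yaml
input:
  oracledb_cdc:
    connection_string: oracle://cdc_user:password@oracle-host:1521/ORCLPDB1
    include:
      - '^SALES\..*$'
    # Read existing rows first, then stream changes.
    stream_snapshot: true
    # Snapshot up to four tables at a time.
    max_parallel_snapshot_tables: 4
    # Stream at most 5000 rows per snapshot batch.
    snapshot_max_batch_size: 5000
```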
wallet_password
Password for the ewallet.p12 PKCS#12 wallet file. Only use this when the wallet directory contains ewallet.p12 rather than cwallet.sso.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
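One way to keep secrets out of the configuration file is Redpanda Connect's ${VAR} environment variable interpolation. In this sketch the variable names ORACLE_PASSWORD and ORACLE_WALLET_PASSWORD, the wallet directory, and the assumption that wallet_path points at the directory containing ewallet.p12 are all illustrative.

```yaml
input:
  oracledb_cdc:
    # Both secrets are resolved from the environment when the config loads.
    connection_string: oracle://cdc_user:${ORACLE_PASSWORD}@oracle-host:1522/ORCLPDB1?SSL=true
    # Assumed to be the directory holding ewallet.p12.
    wallet_path: /opt/oracle/wallet
    wallet_password: ${ORACLE_WALLET_PASSWORD}
    include:
      - '^SALES\..*$'
```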