oracledb_cdc

Enables Change Data Capture by consuming from OracleDB.

Introduced in version 4.83.0.

Streams changes from an Oracle database for Change Data Capture (CDC). Additionally, if stream_snapshot is set to true, existing data in the database is also streamed.
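
As a sketch of typical usage, a minimal pipeline that snapshots existing rows and then streams changes might look like the following. The connection string, schema, and table names are placeholders; substitute your own values:

```yaml
input:
  oracledb_cdc:
    # Placeholder credentials, host, and service name
    connection_string: oracle://rpcn_user:secret@db-host:1521/ORCLPDB1
    stream_snapshot: true
    include:
      - MYSCHEMA\.ORDERS

output:
  stdout: {}
```

With stream_snapshot set to true, the connector first emits the existing rows of MYSCHEMA.ORDERS as "read" operations, then switches to streaming inserts, updates, and deletes.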

inputs:
  label: ""
  oracledb_cdc:
    connection_string: "" # No default (required)
    stream_snapshot: false
    max_parallel_snapshot_tables: 1
    snapshot_max_batch_size: 1000
    logminer:
      scn_window_size: 20000
      backoff_interval: 5s
      mining_interval: 300ms
      strategy: online_catalog
      max_transaction_events: 0
      lob_enabled: true
    include: [] # No default (required)
    exclude: [] # No default (optional)
    checkpoint_cache: "" # No default (optional)
    checkpoint_cache_table_name: RPCN.CDC_CHECKPOINT_CACHE
    checkpoint_cache_key: oracledb_cdc
    checkpoint_limit: 1024
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

Metadata

This input adds the following metadata fields to each message:

  • database_schema: The database schema for the table where the message originates from.

  • table_name: Name of the table that the message originated from.

  • operation: The type of operation that generated the message: "read", "delete", "insert", or "update". "read" indicates a message produced during the initial snapshot phase.

  • scn: The System Change Number in Oracle.

  • schema: The table schema, for use with schema-aware downstream processors such as schema_registry_encode. When new columns are detected in CDC events, the schema is automatically refreshed from the Oracle catalog. Dropped columns are reflected after a connector restart.
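
For example, these metadata fields can drive downstream routing or enrichment. The sketch below uses a mapping processor to copy the operation, table name, and SCN into the payload (the field names added to the payload are arbitrary):

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Metadata fields set by the oracledb_cdc input
        root.cdc_operation = @operation
        root.cdc_table = @table_name
        root.cdc_scn = @scn
```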

Permissions

When using the default Oracle-based cache, the Connect user requires permission to create tables and stored procedures, and the rpcn schema must already exist. See checkpoint_cache_table_name for more information.

Fields

auto_replay_nacks

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false, these messages are deleted instead. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, as the original shape of the data can be discarded immediately upon consumption and mutation.

Type: bool

Default: true

batching

Allows you to configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

The number of bytes at which the batch should be flushed. A value of 0 disables size-based batching.

Type: int

Default: 0

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

The number of messages at which the batch should be flushed. A value of 0 disables count-based batching.

Type: int

Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate


# ---

processors:
  - archive:
      format: lines


# ---

processors:
  - archive:
      format: json_array

checkpoint_cache

A cache resource to use for storing the current System Change Number (SCN) that has been successfully delivered. This allows Redpanda Connect to continue from that SCN upon restart, rather than consume the entire state of OracleDB redo logs. If not set, the default Oracle-based cache is used. See checkpoint_cache_table_name for more information.

Type: string
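
If you prefer an external cache over the default Oracle-based one, point checkpoint_cache at a cache resource defined elsewhere in the config. A sketch using a file-backed cache; the label and directory are arbitrary choices:

```yaml
input:
  oracledb_cdc:
    # Placeholder connection string
    connection_string: oracle://user:pass@host:1521/service
    include:
      - MYSCHEMA\..*
    # References the cache resource labelled below
    checkpoint_cache: cdc_checkpoints

cache_resources:
  - label: cdc_checkpoints
    file:
      directory: /var/lib/connect/cdc_checkpoints
```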

checkpoint_cache_key

The key to use to store the snapshot position in checkpoint_cache. An alternative key can be provided if multiple CDC inputs share the same cache.

Type: string

Default: oracledb_cdc

checkpoint_cache_table_name

The identifier for the checkpoint cache table name. If no checkpoint_cache field is specified, this input will automatically create a table and stored procedure under the rpcn schema to act as a checkpoint cache. This table stores the latest processed System Change Number (SCN) that has been successfully delivered, allowing Redpanda Connect to resume from that point upon restart rather than reconsume the entire redo log.

Type: string

Default: RPCN.CDC_CHECKPOINT_CACHE

# Examples:
checkpoint_cache_table_name: RPCN.CHECKPOINT_CACHE

checkpoint_limit

The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given System Change Number (SCN) is not acknowledged until all messages under that offset have been delivered, in order to preserve at-least-once delivery guarantees.

Type: int

Default: 1024

connection_string

The connection string of the Oracle database to connect to.

Type: string

# Examples:
connection_string: oracle://username:password@host:port/service_name

exclude[]

Regular expressions for tables to exclude.

Type: array

# Examples:
exclude:
  - SCHEMA.PRIVATETABLE

include[]

Regular expressions for tables to include.

Type: array

# Examples:
include:
  - SCHEMA.PRODUCTS
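
Because include and exclude entries are regular expressions, they can be combined to capture a whole schema while filtering out specific tables. A sketch, with placeholder schema and table names:

```yaml
include:
  - MYSCHEMA\..*        # capture every table in MYSCHEMA
exclude:
  - MYSCHEMA\.AUDIT_LOG # except the audit log table
```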

logminer

LogMiner configuration settings.

Type: object

logminer.backoff_interval

The interval between attempts to check for new changes once all data has been processed. For low-traffic tables, increasing this value can reduce network traffic to the server.

Type: string

Default: 5s

# Examples:
backoff_interval: 5s

# ---

backoff_interval: 1m

logminer.lob_enabled

When enabled, large object (CLOB, BLOB) columns are included in both snapshot and streaming change events. When disabled, these columns are still present but contain no values. Enabling this option introduces additional performance overhead and increases memory requirements.

Type: bool

Default: true

logminer.max_transaction_events

The maximum number of events that can be buffered for a single transaction. If a transaction exceeds this limit it is discarded and its events will not be emitted. Set to 0 to disable the limit.

Type: int

Default: 0

logminer.mining_interval

The interval between mining cycles during normal operation. Controls how frequently LogMiner polls for new changes when not caught up.

Type: string

Default: 300ms

# Examples:
mining_interval: 100ms

# ---

mining_interval: 1s

logminer.scn_window_size

The SCN range to mine per cycle. Each cycle reads changes between the current SCN and current SCN + scn_window_size. Smaller values mean more frequent queries with lower memory usage but higher overhead; larger values reduce query frequency and improve throughput at the cost of higher memory usage per cycle.

Type: int

Default: 20000

logminer.strategy

Controls how LogMiner retrieves data dictionary information. online_catalog uses the current data dictionary for best performance but cannot capture DDL changes. Currently, only online_catalog is supported.

Type: string

Default: online_catalog

max_parallel_snapshot_tables

The number of tables to process in parallel during the snapshot phase.

Type: int

Default: 1

snapshot_max_batch_size

The maximum number of rows to be streamed in a single batch when taking a snapshot.

Type: int

Default: 1000

stream_snapshot

If set to true, the connector queries all existing data as part of the snapshot process. Otherwise, it starts streaming from the current System Change Number (SCN) position.

Type: bool

Default: false

# Examples:
stream_snapshot: true