Oracle Database CDC Patterns

The Oracle Database CDC input captures row-level changes from Oracle tables using LogMiner. This cookbook provides reusable patterns for filtering, transforming, and routing Oracle CDC events to Redpanda, S3, and other destinations.

Use this cookbook to:

  • Find reusable patterns for capturing Oracle Database CDC events

  • Look up integration patterns for routing CDC data to Redpanda and S3

  • Identify patterns for filtering and transforming change events

Prerequisites

Before using these patterns, ensure you have the following configured:

Redpanda CLI

Install the Redpanda CLI (rpk) to run Redpanda Connect. See Get Started with Redpanda Connect using rpk for installation instructions.

Oracle Database setup

The Oracle database must be configured for CDC:

Enable supplemental logging

Supplemental logging must be enabled at the database and table level:

-- Enable database-level supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- Enable table-level supplemental logging (for each table)
ALTER TABLE my_schema.my_table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Enable archivelog mode

The database must run in ARCHIVELOG mode so that redo logs are archived, rather than overwritten, before LogMiner reads them:

-- Check whether ARCHIVELOG mode is enabled (SQL*Plus command; run as SYSDBA)
ARCHIVE LOG LIST;

-- Enable ARCHIVELOG mode if needed (run as SYSDBA; this restarts the database)
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

Grant permissions

The CDC user needs privileges to query the monitored tables and the data dictionary, and to read redo logs through LogMiner:

-- Create user for CDC
CREATE USER cdc_user IDENTIFIED BY <password>;

-- Grant necessary permissions
GRANT CREATE SESSION TO cdc_user;
GRANT SELECT ANY TABLE TO cdc_user;
GRANT SELECT_CATALOG_ROLE TO cdc_user;
GRANT EXECUTE_CATALOG_ROLE TO cdc_user;
GRANT SELECT ANY TRANSACTION TO cdc_user;
GRANT LOGMINING TO cdc_user;

-- Grant access to specific tables
GRANT SELECT ON my_schema.my_table TO cdc_user;

-- Create schema for checkpoint table
CREATE USER rpcn IDENTIFIED BY <password>;
GRANT CREATE TABLE TO rpcn;
GRANT CREATE PROCEDURE TO rpcn;
GRANT UNLIMITED TABLESPACE TO rpcn;

Oracle Wallet (for SSL connections)

For secure connections using Oracle Wallet:

  1. Create a wallet directory:

    mkdir -p /opt/oracle/wallet
  2. Use Oracle Wallet Manager or orapki to create and configure the wallet:

    orapki wallet create -wallet /opt/oracle/wallet -auto_login
    orapki wallet add -wallet /opt/oracle/wallet -trusted_cert -cert ca.crt
    orapki wallet add -wallet /opt/oracle/wallet -user_cert -cert client.crt -pwd <password>
  3. The wallet directory should contain either:

    • cwallet.sso (auto-login wallet, no password required)

    • ewallet.p12 (requires password)

Environment variables

The examples in this cookbook use environment variables for Oracle configuration. This allows you to keep credentials secure and separate from your pipeline configuration files.

export ORACLE_CONNECTION_STRING=oracle://cdc_user:password@host:1521/service_name (1)
export ORACLE_WALLET_PATH=/opt/oracle/wallet (2)
export ORACLE_WALLET_PASSWORD=wallet_password (3)
export REDPANDA_BROKERS=localhost:9092 (4)
export S3_BUCKET=my-cdc-bucket (5)
1 The Oracle connection string with credentials.
2 Path to Oracle Wallet directory (optional, for SSL).
3 Wallet password if using ewallet.p12 (optional).
4 The Redpanda broker addresses (for Redpanda output examples).
5 The S3 bucket name (for S3 output examples).
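Redpanda Connect replaces ${VAR} references with environment variable values when it loads a config file, and it also accepts a fallback value after a colon, which is handy for local testing. A minimal sketch; the topic name and the default broker address are placeholders:

```yaml
output:
  redpanda:
    # Falls back to localhost:9092 when REDPANDA_BROKERS is unset
    seed_brokers: ["${REDPANDA_BROKERS:localhost:9092}"]
    # Placeholder topic name for illustration
    topic: oracle-cdc-orders
```

Avoid defaults for ORACLE_CONNECTION_STRING and ORACLE_WALLET_PASSWORD, since a default would put credentials back into the config file.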

Capture CDC events

The simplest pattern captures change events from the included Oracle tables (skipping any excluded ones) and attaches CDC metadata to each message:

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
      - "MY_SCHEMA\\.CUSTOMERS"
    exclude:
      - "MY_SCHEMA\\..*_TEMP"

pipeline:
  processors:
    # Extract the change event details
    - mapping: |
        root = this
        root.metadata.operation = meta("operation")
        root.metadata.table = meta("table_name")
        root.metadata.schema = meta("database_schema")
        root.metadata.scn = meta("scn")
        # Time at which Redpanda Connect processed the event (not the Oracle commit time)
        root.metadata.timestamp = now()

output:
  stdout:
    codec: lines

For details on the CDC event message structure and available metadata fields, see the metadata section in the connector reference.

Filter CDC events

You can filter events to process only specific change types:

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"

pipeline:
  processors:
    # Filter out delete events
    - mapping: |
        root = if meta("operation") == "delete" { deleted() } else { this }

    # Add operation metadata to the message
    - mapping: |
        root = this.merge({
          "operation": meta("operation"),
          "table": meta("table_name")
        })

output:
  stdout:
    codec: lines

This example:

  • Drops delete events by mapping them to deleted()

  • Adds the operation type and table name to each message
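To keep an allowlist of operations instead of dropping a single type, test the operation against an array. A sketch that keeps only inserts and updates; with stream_snapshot enabled, it also drops snapshot read events:

```yaml
pipeline:
  processors:
    # Keep insert and update events; drop delete and snapshot read events
    - mapping: |
        root = if ["insert", "update"].contains(meta("operation")) { this } else { deleted() }
```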

Stream initial snapshot

Capture both existing data and ongoing changes:

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    stream_snapshot: true (1)
    snapshot_max_batch_size: 5000 (2)
    max_parallel_snapshot_tables: 2 (3)

pipeline:
  processors:
    - mapping: |
        root = this

        # Tag snapshot vs CDC events
        root.event_type = if meta("operation") == "read" {
          "snapshot"
        } else {
          "cdc"
        }

output:
  stdout:
    codec: lines
1 Enable initial snapshot of existing table data.
2 Number of rows per batch during snapshot (higher = faster, more memory).
3 Number of tables to snapshot in parallel.
Snapshot events have operation metadata set to "read", while CDC events use "insert", "update", or "delete".
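Because snapshot and CDC events differ only in their operation metadata, you can send the initial load and the ongoing changes to separate topics. A sketch, where orders-snapshot and orders-cdc are hypothetical topic names:

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Hypothetical topic names for the initial load and for ongoing changes
        meta target_topic = if meta("operation") == "read" { "orders-snapshot" } else { "orders-cdc" }

output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topic: '${! meta("target_topic") }'
```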

Route to Redpanda

Stream Oracle changes to Redpanda for real-time processing:

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
      - "MY_SCHEMA\\.CUSTOMERS"

pipeline:
  processors:
    # Use the primary key as the message key so that all changes to a row land
    # in the same partition (assumes each table has an ID primary key column)
    - mapping: |
        root = this
        meta kafka_key = this.ID.string()

output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topic: 'oracle-cdc-${! meta("table_name").lowercase() }' (1)
    key: '${! meta("kafka_key") }' (2)
    metadata:
      include_patterns: ["^operation$", "^table_name$", "^scn$"] (3)
    max_in_flight: 10
    compression: snappy
    batching:
      count: 100
      period: 100ms
1 Route to different topics based on table name.
2 Use the kafka_key metadata field as the message key.
3 Forward the CDC metadata fields as Kafka headers.

This example:

  • Creates a message key from the primary key for consistent partitioning

  • Routes each table to a separate topic

  • Batches messages for efficient delivery

  • Adds CDC metadata as Kafka headers
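If a table has a multi-column primary key, join the columns into a single key. A sketch for a hypothetical table whose primary key is (ORDER_ID, LINE_NO):

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # ORDER_ID and LINE_NO are hypothetical primary key columns
        meta kafka_key = [this.ORDER_ID.string(), this.LINE_NO.string()].join(":")

output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topic: 'oracle-cdc-${! meta("table_name").lowercase() }'
    key: '${! meta("kafka_key") }'
```

Keep the key format stable once consumers depend on it: changing it can send later changes for the same row to a different partition.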

Route to S3

Archive CDC events to S3 for long-term storage and analytics:

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"

pipeline:
  processors:
    - mapping: |
        root = this
        root.operation = meta("operation")
        root.table = meta("table_name")
        root.scn = meta("scn")
        root.captured_at = now().ts_format("2006-01-02T15:04:05Z07:00")

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: 'oracle-cdc/${! meta("table_name") }/${! timestamp_unix().ts_format("2006/01/02/15") }/${! uuid_v4() }.json.gz' (1)
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: json_array (2)
        - compress:
            algorithm: gzip (3)
1 Organize files by table and time-based partitions (year/month/day/hour).
2 Archive as JSON array for easier downstream parsing.
3 Compress with gzip to reduce storage costs.

This example:

  • Organizes files by table name and hourly partitions

  • Batches events and archives them as compressed JSON

  • Uses UUID file names to prevent collisions
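If your query engine reads newline-delimited JSON, archive each batch with the lines format instead of json_array and name the files to match. A sketch of the output section only:

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: 'oracle-cdc/${! meta("table_name") }/${! timestamp_unix().ts_format("2006/01/02/15") }/${! uuid_v4() }.jsonl.gz'
    batching:
      count: 1000
      period: 5m
      processors:
        # Join the messages in a batch with newlines: one JSON document per line
        - archive:
            format: lines
        - compress:
            algorithm: gzip
```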

Route by event type

Route different event types to different destinations:

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"

pipeline:
  processors:
    # Add routing metadata
    - mapping: |
        root = this
        root.event_type = meta("operation")
        root.table = meta("table_name")

output:
  switch:
    cases:
      # Route INSERT events
      - check: this.event_type == "insert"
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: orders-inserts

      # Route UPDATE events
      - check: this.event_type == "update"
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: orders-updates

      # Route DELETE events
      - check: this.event_type == "delete"
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: orders-deletes

      # Fallback for snapshot reads
      - output:
          drop: {}

This pattern:

  • Splits inserts, updates, and deletes into separate streams, each with a dedicated topic

  • Drops snapshot read events through the fallback case

  • Enables specialized downstream consumers per operation type
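When every operation type goes to the same kind of output, an interpolated topic name achieves the same routing with a single output. A sketch that reuses the topic names from the switch example:

```yaml
pipeline:
  processors:
    # Drop snapshot reads so that only insert, update, and delete events remain
    - mapping: |
        root = if meta("operation") == "read" { deleted() } else { this }

output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    # insert -> orders-inserts, update -> orders-updates, delete -> orders-deletes
    topic: 'orders-${! meta("operation") }s'
```

The switch output remains the better fit when cases need different output types or settings.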

Detect changed columns

For update operations, capture the updated row and its metadata as a starting point for detecting which columns changed:

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"

pipeline:
  processors:
    # Only process update operations
    - mapping: |
        root = if meta("operation") != "update" { deleted() } else { this }

    # Extract the update event details
    - mapping: |
        root.id = this.ID
        root.table = meta("table_name")
        root.operation = meta("operation")
        root.scn = meta("scn")

        # Include the full after-state of the row
        root.after = this

output:
  stdout:
    codec: lines

Oracle CDC captures the full row state after the update. To work out which columns changed, compare it with a previously stored version of the row (for example, one kept in an external cache) or use Oracle's built-in change tracking features.

Checkpointing

The Oracle CDC input automatically manages checkpoints using a table in the Oracle database:

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    checkpoint_cache_table_name: RPCN.CDC_CHECKPOINT_CACHE (1)
    checkpoint_cache_key: orders_cdc_job (2)
    checkpoint_limit: 500 (3)
1 Table name for storing checkpoint state (created automatically if it doesn’t exist).
2 Unique key to identify this CDC job (useful when running multiple CDC pipelines).
3 Checkpoint after every 500 messages (lower = better recovery, higher = fewer writes).

The checkpoint table stores the System Change Number (SCN) of the last successfully processed event. On restart, CDC resumes from the last checkpoint.
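Several CDC pipelines can share one checkpoint table as long as each uses its own checkpoint_cache_key. A sketch of a second job that captures a different table (customers_cdc_job is a hypothetical key):

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.CUSTOMERS"
    # Same checkpoint table as the orders job...
    checkpoint_cache_table_name: RPCN.CDC_CHECKPOINT_CACHE
    # ...but a different key, so the two jobs do not overwrite each other's checkpoint
    checkpoint_cache_key: customers_cdc_job
```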

Using an external cache

To store checkpoints outside Oracle, for example in a separate database, reference a cache resource instead:

cache_resources:
  - label: postgres_cache
    postgres:
      dsn: postgres://user:pass@postgres-host:5432/checkpoints

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    checkpoint_cache: postgres_cache (1)
    checkpoint_cache_key: orders_cdc_job
1 Use an external cache resource instead of the default Oracle table.
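Because checkpoint_cache references a cache resource by label, other cache types should work as well. A sketch that uses a Redis cache; the host name and key prefix are placeholders:

```yaml
cache_resources:
  - label: redis_cache
    redis:
      # Placeholder Redis address
      url: redis://redis-host:6379
      # Optional prefix to keep checkpoint keys apart from other data
      prefix: rpcn-cdc-

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    checkpoint_cache: redis_cache
    checkpoint_cache_key: orders_cdc_job
```

Leave default_ttl unset on the cache: if the checkpoint entry expires, the pipeline can no longer resume from its last SCN.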

Performance tuning

Optimize throughput and latency with these LogMiner settings:

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\..*"
    logminer:
      scn_window_size: 50000 (1)
      mining_interval: 100ms (2)
      backoff_interval: 1s (3)
      max_transaction_events: 10000 (4)
      lob_enabled: true (5)
    checkpoint_limit: 2000 (6)
    batching:
      count: 100
      period: 1s
1 Number of SCNs to process in each LogMiner session (higher = more memory, fewer sessions).
2 How often to query LogMiner for new changes (lower = less latency, more CPU).
3 Backoff when no changes are detected (reduces CPU usage during idle periods).
4 Maximum events to buffer per transaction (0 = unlimited).
5 Enable capture of LOB (CLOB/BLOB) columns.
6 Number of messages to process before checkpointing.

Throughput considerations

  • Larger scn_window_size reduces LogMiner overhead but increases memory usage

  • Shorter mining_interval reduces latency but increases database load

  • Higher checkpoint_limit improves throughput but increases recovery time after failures

  • LOB columns significantly impact performance; disable lob_enabled if not needed
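As an illustration of these trade-offs, the following sketch leans toward low latency for a table without LOB columns. The values are illustrative starting points, not tested recommendations:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    logminer:
      # Poll LogMiner more often: lower latency, more database load
      mining_interval: 50ms
      # The table has no LOB columns, so skip LOB capture
      lob_enabled: false
    batching:
      # Deliver each message as soon as it arrives
      count: 1
```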

Secure connections with Oracle Wallet

Use Oracle Wallet for SSL/TLS connections:

input:
  oracledb_cdc:
    connection_string: oracle://cdc_user:password@host:1522/service
    wallet_path: ${ORACLE_WALLET_PATH} (1)
    wallet_password: ${ORACLE_WALLET_PASSWORD} (2)
    include:
      - "MY_SCHEMA\\.ORDERS"
1 Path to the wallet directory containing cwallet.sso or ewallet.p12.
2 Wallet password (only required for ewallet.p12 files).

Alternatively, specify wallet configuration in the connection string:

input:
  oracledb_cdc:
    connection_string: oracle://user:password@host:1522/service?WALLET=/opt/oracle/wallet&SSL=true
    include:
      - "MY_SCHEMA\\.ORDERS"

Troubleshoot

No events received

If you’re not receiving events:

  1. Verify supplemental logging is enabled:

    -- Check database-level logging
    SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;
    
    -- Check table-level logging
    SELECT LOG_GROUP_NAME, TABLE_NAME, DECODE(ALWAYS, 'ALWAYS', 'Unconditional', NULL, 'Conditional') ALWAYS
    FROM DBA_LOG_GROUPS
    WHERE OWNER = 'MY_SCHEMA' AND TABLE_NAME = 'MY_TABLE';
  2. Check that the database is in ARCHIVELOG mode:

    SELECT LOG_MODE FROM V$DATABASE;
  3. Verify the user has the correct privileges and roles:

    SELECT * FROM DBA_SYS_PRIVS WHERE GRANTEE = 'CDC_USER';
    SELECT * FROM DBA_ROLE_PRIVS WHERE GRANTEE = 'CDC_USER';
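  4. Confirm that the monitored tables are receiving changes and that the include patterns match the schema and table names as Oracle stores them (unquoted identifiers are stored in uppercase)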

Slow event processing

If CDC is lagging behind:

  • Increase scn_window_size to process more changes per LogMiner session

  • Increase checkpoint_limit to reduce checkpoint overhead (at the cost of reprocessing more events after a restart)

  • Disable lob_enabled if LOB columns aren’t needed

  • Check Oracle alert logs for performance issues

  • Monitor the size of redo logs and archive logs
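To see how far the pipeline has progressed, log the SCN of each event and compare it with CURRENT_SCN from V$DATABASE. A sketch using the log processor; logging every event is noisy, so remove it once you have diagnosed the lag:

```yaml
pipeline:
  processors:
    # Log the table, operation, and SCN of every event
    - log:
        level: INFO
        message: 'table=${! meta("table_name") } op=${! meta("operation") } scn=${! meta("scn") }'
```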

Connection issues with Oracle Wallet

If SSL connections fail:

  1. Verify wallet files exist and have correct permissions:

    ls -la /opt/oracle/wallet
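  2. Check the Oracle TNS configuration, for example that the host and port in the connection string point at a listener configured for TCPS (SSL)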

  3. Verify the wallet password for ewallet.p12 files

  4. Test the connection using Oracle tools (sqlplus, SQL Developer)

Checkpoint table errors

If you see errors related to the checkpoint table:

  1. Ensure the rpcn schema exists:

    CREATE USER rpcn IDENTIFIED BY <password>;
    GRANT CREATE TABLE TO rpcn;
    GRANT CREATE PROCEDURE TO rpcn;
    GRANT UNLIMITED TABLESPACE TO rpcn;
  2. Verify the CDC user has access to the checkpoint table:

    GRANT SELECT, INSERT, UPDATE ON rpcn.cdc_checkpoint_cache TO cdc_user;