Oracle Database CDC Patterns
The Oracle Database CDC input enables capturing row-level changes from Oracle tables using LogMiner. This cookbook provides reusable patterns for filtering, transforming, and routing Oracle CDC events to Redpanda, S3, and other destinations.
Use this cookbook to:
- Find reusable patterns for capturing Oracle Database CDC events
- Look up integration patterns for routing CDC data to Redpanda and S3
- Identify patterns for filtering and transforming change events
Prerequisites
Before using these patterns, ensure you have the following configured:
Redpanda CLI
Install the Redpanda CLI (rpk) to run Redpanda Connect. See Get Started with Redpanda Connect using rpk for installation instructions.
Oracle Database setup
The Oracle database must be configured for CDC:
Enable supplemental logging
Supplemental logging must be enabled at the database and table level:
-- Enable database-level supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Enable table-level supplemental logging (for each table)
ALTER TABLE my_schema.my_table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Enable archivelog mode
The database must be in ARCHIVELOG mode for LogMiner to access redo logs:
-- Check if archivelog is enabled
ARCHIVE LOG LIST;
-- Enable archivelog mode (if needed)
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
Grant permissions
The database user needs specific permissions to read redo logs:
-- Create user for CDC
CREATE USER cdc_user IDENTIFIED BY <password>;
-- Grant necessary permissions
GRANT CREATE SESSION TO cdc_user;
GRANT SELECT ANY TABLE TO cdc_user;
GRANT SELECT_CATALOG_ROLE TO cdc_user;
GRANT EXECUTE_CATALOG_ROLE TO cdc_user;
GRANT SELECT ANY TRANSACTION TO cdc_user;
GRANT LOGMINING TO cdc_user;
-- Grant access to specific tables
GRANT SELECT ON my_schema.my_table TO cdc_user;
-- Create schema for checkpoint table
CREATE USER rpcn IDENTIFIED BY <password>;
GRANT CREATE TABLE TO rpcn;
GRANT CREATE PROCEDURE TO rpcn;
GRANT UNLIMITED TABLESPACE TO rpcn;
Oracle Wallet (for SSL connections)
For secure connections using Oracle Wallet:
- Create a wallet directory:
  mkdir -p /opt/oracle/wallet
- Use Oracle Wallet Manager or orapki to create and configure the wallet:
  orapki wallet create -wallet /opt/oracle/wallet -auto_login
  orapki wallet add -wallet /opt/oracle/wallet -trusted_cert -cert ca.crt
  orapki wallet add -wallet /opt/oracle/wallet -user_cert -cert client.crt -pwd <password>
- The wallet directory should contain either:
  - cwallet.sso (auto-login wallet, no password required)
  - ewallet.p12 (requires password)
Environment variables
The examples in this cookbook use environment variables for Oracle configuration. This allows you to keep credentials secure and separate from your pipeline configuration files.
export ORACLE_CONNECTION_STRING=oracle://cdc_user:password@host:1521/service_name (1)
export ORACLE_WALLET_PATH=/opt/oracle/wallet (2)
export ORACLE_WALLET_PASSWORD=wallet_password (3)
export REDPANDA_BROKERS=localhost:9092 (4)
export S3_BUCKET=my-cdc-bucket (5)
| 1 | The Oracle connection string with credentials. |
| 2 | Path to Oracle Wallet directory (optional, for SSL). |
| 3 | Wallet password if using ewallet.p12 (optional). |
| 4 | The Redpanda broker addresses (for Redpanda output examples). |
| 5 | The S3 bucket name (for S3 output examples). |
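Redpanda Connect also accepts a default value in environment variable interpolation, using the ${NAME:default} form. The sketch below assumes you want local fallbacks for non-sensitive settings such as the broker address, while the Oracle connection string must always come from the environment. The default values and the topic name are placeholders.

```yaml
# Sketch: fall back to local defaults when a variable is unset.
# The defaults below are placeholders for local testing only.
input:
  oracledb_cdc:
    # No default: credentials should always be supplied through the environment
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
output:
  redpanda:
    # localhost:9092 is used only if REDPANDA_BROKERS is not set
    seed_brokers: ["${REDPANDA_BROKERS:localhost:9092}"]
    topic: oracle-cdc-orders
```

Keeping defaults limited to non-sensitive values avoids accidentally running against a real database with placeholder credentials.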
Capture CDC events
The simplest pattern captures all change events from Oracle tables and outputs them with metadata:
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
      - "MY_SCHEMA\\.CUSTOMERS"
    exclude:
      - "MY_SCHEMA\\..*_TEMP"
pipeline:
  processors:
    # Extract the change event details
    - mapping: |
        root = this
        root.metadata.operation = meta("operation")
        root.metadata.table = meta("table_name")
        root.metadata.schema = meta("database_schema")
        root.metadata.scn = meta("scn")
        root.metadata.timestamp = now()
output:
  stdout:
    codec: lines
For details on the CDC event message structure and available metadata fields, see the metadata section in the connector reference.
Filter CDC events
You can filter events to process only specific change types:
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
pipeline:
  processors:
    # Filter out delete events
    - mapping: |
        root = if meta("operation") == "delete" { deleted() } else { this }
    # Add operation metadata to the message
    - mapping: |
        root = this.merge({
          "operation": meta("operation"),
          "table": meta("table_name")
        })
output:
  stdout:
    codec: lines
This example:
- Filters out delete events using deleted()
- Merges the operation type and table name into each message body
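To keep a set of change types rather than drop a single one, you can test the operation against an allowlist. A minimal sketch, assuming you only want inserts and updates to pass through:

```yaml
pipeline:
  processors:
    # Keep only the listed operation types; everything else is dropped
    - mapping: |
        let keep = ["insert", "update"]
        root = if $keep.contains(meta("operation")) { this } else { deleted() }
```

An allowlist fails closed: operation types you did not anticipate, such as snapshot reads, are dropped rather than passed through.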
Stream initial snapshot
Capture both existing data and ongoing changes:
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    stream_snapshot: true (1)
    snapshot_max_batch_size: 5000 (2)
    max_parallel_snapshot_tables: 2 (3)
pipeline:
  processors:
    - mapping: |
        root = this
        # Tag snapshot vs CDC events
        root.event_type = if meta("operation") == "read" {
          "snapshot"
        } else {
          "cdc"
        }
output:
  stdout:
    codec: lines
| 1 | Enable initial snapshot of existing table data. |
| 2 | Number of rows per batch during snapshot (higher = faster, more memory). |
| 3 | Number of tables to snapshot in parallel. |
Snapshot events have operation metadata set to "read", while CDC events use "insert", "update", or "delete".
Route to Redpanda
Stream Oracle changes to Redpanda for real-time processing:
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
      - "MY_SCHEMA\\.CUSTOMERS"
pipeline:
  processors:
    # Use the ID primary key column as the message key (both tables are assumed to have an ID column)
    - mapping: |
        meta kafka_key = this.ID.string()
        # Set the CDC metadata that is exported as message headers
        meta operation = meta("operation")
        meta table = meta("table_name")
        meta scn = meta("scn")
output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topic: 'oracle-cdc-${! meta("table_name").lowercase() }' (1)
    # Apply the key set in the mapping above
    key: '${! meta("kafka_key") }'
    # Only metadata matching these patterns is written as headers
    metadata:
      include_patterns: ["^operation$", "^table$", "^scn$"]
    max_in_flight: 10
    compression: snappy
    batching:
      count: 100
      period: 100ms
| 1 | Route to different topics based on table name. |
This example:
- Creates a message key from the primary key for consistent partitioning
- Routes each table to a separate topic
- Batches messages for efficient delivery
- Adds CDC metadata as Kafka headers
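The example above keys messages on a single ID column. For a table whose primary key spans several columns, you can join the key columns into one string. A sketch, where ORDER_ID and LINE_NO are hypothetical column names used only for illustration:

```yaml
pipeline:
  processors:
    # Hypothetical composite primary key (ORDER_ID, LINE_NO); substitute your own key columns
    - mapping: |
        meta kafka_key = [this.ORDER_ID.string(), this.LINE_NO.string()].join(":")
output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topic: 'oracle-cdc-${! meta("table_name").lowercase() }'
    # Messages with the same key are written to the same partition
    key: '${! meta("kafka_key") }'
```

The separator keeps keys such as ("1", "23") and ("12", "3") distinct, provided the key values themselves cannot contain a colon.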
Route to S3
Archive CDC events to S3 for long-term storage and analytics:
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
pipeline:
  processors:
    - mapping: |
        root = this
        root.operation = meta("operation")
        root.table = meta("table_name")
        root.scn = meta("scn")
        root.captured_at = now().ts_format("2006-01-02T15:04:05Z07:00")
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    # Each object is a gzip-compressed JSON array, hence the .json.gz extension
    path: 'oracle-cdc/${! meta("table_name") }/${! timestamp_unix().ts_format("2006/01/02/15") }/${! uuid_v4() }.json.gz' (1)
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: json_array (2)
        - compress:
            algorithm: gzip (3)
| 1 | Organize files by table and time-based partitions (year/month/day/hour). |
| 2 | Archive as JSON array for easier downstream parsing. |
| 3 | Compress with gzip to reduce storage costs. |
This example:
- Organizes files by table name and hourly partitions
- Batches events and archives them as compressed JSON
- Uses UUID file names to prevent collisions
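If downstream tools expect newline-delimited JSON (JSON Lines) rather than a single JSON array per file, the archive step can use the lines format instead. A sketch of the output section only; the input and pipeline sections stay as in the example above:

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    # One JSON document per line, gzip-compressed
    path: 'oracle-cdc/${! meta("table_name") }/${! timestamp_unix().ts_format("2006/01/02/15") }/${! uuid_v4() }.jsonl.gz'
    batching:
      count: 1000
      period: 5m
      processors:
        # Join the batch into a single newline-delimited message
        - archive:
            format: lines
        - compress:
            algorithm: gzip
```

JSON Lines files can be split and read record by record, whereas a JSON array must be parsed as a whole.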
Route by event type
Route different event types to different destinations:
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
pipeline:
  processors:
    # Add routing metadata
    - mapping: |
        root = this
        root.event_type = meta("operation")
        root.table = meta("table_name")
output:
  switch:
    cases:
      # Route INSERT events
      - check: this.event_type == "insert"
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: orders-inserts
      # Route UPDATE events
      - check: this.event_type == "update"
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: orders-updates
      # Route DELETE events
      - check: this.event_type == "delete"
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: orders-deletes
      # Fallback for snapshot reads
      - output:
          drop: {}
This pattern:
- Separates processing pipelines for inserts, updates, and deletes
- Routes each operation type to a dedicated topic
- Enables specialized downstream consumers per operation type
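When every operation type goes to Redpanda and only the topic name differs, an interpolated topic can replace the switch output. A sketch, assuming the same orders-inserts, orders-updates, and orders-deletes topics and that snapshot reads should still be dropped; the input section is unchanged:

```yaml
pipeline:
  processors:
    # Drop snapshot reads before they reach the output
    - mapping: |
        root = if meta("operation") == "read" { deleted() } else { this }
output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    # insert -> orders-inserts, update -> orders-updates, delete -> orders-deletes
    topic: 'orders-${! meta("operation") }s'
```

This uses a single producer instead of three. The switch output remains the better fit when cases need different output types or settings.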
Detect changed columns
Isolate update events and capture each row's after-state as a starting point for identifying which columns changed:
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
pipeline:
  processors:
    # Only process update operations
    - mapping: |
        root = if meta("operation") != "update" { deleted() } else { this }
    # Extract the update event details
    - mapping: |
        root.id = this.ID
        root.table = meta("table_name")
        root.operation = meta("operation")
        root.scn = meta("scn")
        # Include the full after-state of the row
        root.after = this
output:
  stdout:
    codec: lines
| Oracle CDC captures the full row state after the update. For fine-grained change detection, you may need to maintain a previous state externally or use Oracle’s built-in change tracking features. |
Checkpointing
The Oracle CDC input automatically manages checkpoints using a table in the Oracle database:
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    checkpoint_cache_table_name: RPCN.CDC_CHECKPOINT_CACHE (1)
    checkpoint_cache_key: orders_cdc_job (2)
    checkpoint_limit: 500 (3)
| 1 | Table name for storing checkpoint state (created automatically if it doesn’t exist). |
| 2 | Unique key to identify this CDC job (useful when running multiple CDC pipelines). |
| 3 | Checkpoint after every 500 messages (lower = better recovery, higher = fewer writes). |
The checkpoint table stores the System Change Number (SCN) of the last successfully processed event. On restart, CDC resumes from the last checkpoint.
Using an external cache
To keep checkpoint state out of the source database, or to store it in a different database, use an external cache:
cache_resources:
  - label: postgres_cache
    postgres:
      dsn: postgres://user:pass@postgres-host:5432/checkpoints
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    checkpoint_cache: postgres_cache (1)
    checkpoint_cache_key: orders_cdc_job
| 1 | Use an external cache resource instead of the default Oracle table. |
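Because checkpoint_cache references a cache resource by label, other cache types can be plugged in the same way. A sketch using a Redis cache, assuming a Redis server reachable at the placeholder address below:

```yaml
cache_resources:
  - label: redis_checkpoints
    redis:
      # Placeholder address; point this at your own Redis deployment
      url: redis://redis-host:6379
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    checkpoint_cache: redis_checkpoints
    checkpoint_cache_key: orders_cdc_job
```

Whichever backend you choose, it must retain data across pipeline restarts; an in-memory cache would lose the stored SCN when the process stops.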
Performance tuning
Optimize throughput and latency with these LogMiner settings:
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\..*"
    logminer:
      scn_window_size: 50000 (1)
      mining_interval: 100ms (2)
      backoff_interval: 1s (3)
      max_transaction_events: 10000 (4)
      lob_enabled: true (5)
    checkpoint_limit: 2000 (6)
    batching:
      count: 100
      period: 1s
| 1 | Number of SCNs to process in each LogMiner session (higher = more memory, fewer sessions). |
| 2 | How often to query LogMiner for new changes (lower = less latency, more CPU). |
| 3 | Backoff when no changes are detected (reduces CPU usage during idle periods). |
| 4 | Maximum events to buffer per transaction (0 = unlimited). |
| 5 | Enable capture of LOB (CLOB/BLOB) columns. |
| 6 | Number of messages to process before checkpointing. |
Throughput considerations
- A larger scn_window_size reduces LogMiner overhead but increases memory usage
- A shorter mining_interval reduces latency but increases database load
- A higher checkpoint_limit improves throughput but increases recovery time after failures
- LOB columns significantly impact performance; disable lob_enabled if you don't need them
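Applying these considerations, a throughput-oriented configuration for tables without LOB columns might look like the following sketch. The values are illustrative starting points rather than measured recommendations; tune them against your own workload:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\..*"
    logminer:
      # Wider window: fewer LogMiner sessions, more memory per session
      scn_window_size: 100000
      # Poll less often to reduce database load, at the cost of latency
      mining_interval: 500ms
      backoff_interval: 5s
      # The captured tables have no LOB columns, so skip LOB capture
      lob_enabled: false
    # Checkpoint less often; more events may be reprocessed after a restart
    checkpoint_limit: 5000
    batching:
      count: 500
      period: 1s
```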
Secure connections with Oracle Wallet
Use Oracle Wallet for SSL/TLS connections:
input:
  oracledb_cdc:
    connection_string: oracle://cdc_user:password@host:1522/service
    wallet_path: ${ORACLE_WALLET_PATH} (1)
    wallet_password: ${ORACLE_WALLET_PASSWORD} (2)
    include:
      - "MY_SCHEMA\\.ORDERS"
| 1 | Path to the wallet directory containing cwallet.sso or ewallet.p12. |
| 2 | Wallet password (only required for ewallet.p12 files). |
Alternatively, specify wallet configuration in the connection string:
input:
  oracledb_cdc:
    connection_string: oracle://user:password@host:1522/service?WALLET=/opt/oracle/wallet&SSL=true
    include:
      - "MY_SCHEMA\\.ORDERS"
Troubleshoot
No events received
If you’re not receiving events:
- Verify supplemental logging is enabled:
  -- Check database-level logging
  SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;
  -- Check table-level logging
  SELECT LOG_GROUP_NAME, TABLE_NAME,
         DECODE(ALWAYS, 'ALWAYS', 'Unconditional', NULL, 'Conditional') ALWAYS
  FROM DBA_LOG_GROUPS
  WHERE OWNER = 'MY_SCHEMA' AND TABLE_NAME = 'MY_TABLE';
- Check that the database is in ARCHIVELOG mode:
  SELECT LOG_MODE FROM V$DATABASE;
- Verify the user has the correct permissions:
  SELECT * FROM DBA_SYS_PRIVS WHERE GRANTEE = 'CDC_USER';
- Make sure changes are being made to the monitored tables
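To check whether any changes reach the pipeline at all, a throwaway configuration that logs each event's metadata and discards the payload can isolate the input from downstream processing. A minimal sketch:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
pipeline:
  processors:
    # Log one line per change event with its operation, table, and SCN
    - log:
        level: INFO
        message: 'operation=${! meta("operation") } table=${! meta("table_name") } scn=${! meta("scn") }'
output:
  # Discard messages so that output problems cannot mask input problems
  drop: {}
```

If no log lines appear after you change a monitored table and commit the transaction, look at the Oracle configuration and the include patterns rather than at downstream components.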
Slow event processing
If CDC is lagging behind:
- Increase scn_window_size to process more changes per LogMiner session
- Increase checkpoint_limit to checkpoint less often; this improves throughput but means more events are reprocessed after a restart
- Disable lob_enabled if LOB columns aren't needed
- Check Oracle alert logs for performance issues
- Monitor the size of redo logs and archive logs
Connection issues with Oracle Wallet
If SSL connections fail:
- Verify wallet files exist and have correct permissions:
  ls -la /opt/oracle/wallet
- Check Oracle TNS configuration
- Verify the wallet password for ewallet.p12 files
- Test the connection using Oracle tools (sqlplus, SQL Developer)
Checkpoint table errors
If you see errors related to the checkpoint table:
- Ensure the rpcn schema exists:
  CREATE USER rpcn IDENTIFIED BY <password>;
  GRANT CREATE TABLE TO rpcn;
  GRANT CREATE PROCEDURE TO rpcn;
  GRANT UNLIMITED TABLESPACE TO rpcn;
- Verify the CDC user has access to the checkpoint table:
  GRANT SELECT, INSERT, UPDATE ON rpcn.cdc_checkpoint_cache TO cdc_user;