# Oracle Database CDC Patterns

The Oracle Database CDC input enables capturing row-level changes from Oracle tables using LogMiner. This cookbook provides reusable patterns for filtering, transforming, and routing Oracle CDC events to Redpanda, S3, and other destinations.

Use this cookbook to:

- Find reusable patterns for capturing Oracle Database CDC events
- Look up integration patterns for routing CDC data to Redpanda and S3
- Identify patterns for filtering and transforming change events

## Prerequisites

Before using these patterns, ensure you have the following configured.

### Redpanda CLI

Install the Redpanda CLI (`rpk`) to run Redpanda Connect. See Get Started with Redpanda Connect using rpk for installation instructions.

### Oracle Database setup

The Oracle database must be configured for CDC.

#### Enable supplemental logging

Supplemental logging must be enabled at both the database and table level:

```sql
-- Enable database-level supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- Enable table-level supplemental logging (for each table)
ALTER TABLE my_schema.my_table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```

#### Enable archivelog mode

The database must be in ARCHIVELOG mode for LogMiner to access redo logs:

```sql
-- Check whether archivelog mode is enabled
ARCHIVE LOG LIST;

-- Enable archivelog mode (if needed)
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
```

#### Grant permissions

The database user needs specific permissions to read redo logs:

```sql
-- Create user for CDC
CREATE USER cdc_user IDENTIFIED BY <password>;

-- Grant necessary permissions
GRANT CREATE SESSION TO cdc_user;
GRANT SELECT ANY TABLE TO cdc_user;
GRANT SELECT_CATALOG_ROLE TO cdc_user;
GRANT EXECUTE_CATALOG_ROLE TO cdc_user;
GRANT SELECT ANY TRANSACTION TO cdc_user;
GRANT LOGMINING TO cdc_user;

-- Grant access to specific tables
GRANT SELECT ON my_schema.my_table TO cdc_user;

-- Create schema for checkpoint table
CREATE USER rpcn IDENTIFIED BY <password>;
GRANT CREATE TABLE TO rpcn;
GRANT CREATE PROCEDURE TO rpcn;
GRANT UNLIMITED TABLESPACE TO rpcn;
```

### Oracle Wallet (for SSL connections)

For secure connections using Oracle Wallet:

1. Create a wallet directory:

   ```bash
   mkdir -p /opt/oracle/wallet
   ```

2. Use Oracle Wallet Manager or `orapki` to create and configure the wallet:

   ```bash
   orapki wallet create -wallet /opt/oracle/wallet -auto_login
   orapki wallet add -wallet /opt/oracle/wallet -trusted_cert -cert ca.crt
   orapki wallet add -wallet /opt/oracle/wallet -user_cert -cert client.crt -pwd <password>
   ```

The wallet directory should contain either:

- `cwallet.sso` (auto-login wallet, no password required)
- `ewallet.p12` (requires password)

### Environment variables

The examples in this cookbook use environment variables for Oracle configuration. This keeps credentials secure and separate from your pipeline configuration files.

```bash
export ORACLE_CONNECTION_STRING=oracle://cdc_user:password@host:1521/service_name (1)
export ORACLE_WALLET_PATH=/opt/oracle/wallet (2)
export ORACLE_WALLET_PASSWORD=wallet_password (3)
export REDPANDA_BROKERS=localhost:9092 (4)
export S3_BUCKET=my-cdc-bucket (5)
```

1. The Oracle connection string with credentials.
2. Path to the Oracle Wallet directory (optional, for SSL).
3. Wallet password if using `ewallet.p12` (optional).
4. The Redpanda broker addresses (for Redpanda output examples).
5. The S3 bucket name (for S3 output examples).
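A malformed `ORACLE_CONNECTION_STRING` is a common source of connection failures. As a quick sanity check (a standalone sketch, not part of the connector), the DSN's shape can be verified with Python's standard `urllib.parse` before starting the pipeline:

```python
from urllib.parse import urlparse

def check_oracle_dsn(dsn: str) -> dict:
    """Split an oracle:// DSN into its parts for a quick sanity check."""
    parts = urlparse(dsn)
    if parts.scheme != "oracle":
        raise ValueError(f"expected oracle:// scheme, got {parts.scheme!r}")
    return {
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port or 1521,          # Oracle's default listener port
        "service": parts.path.lstrip("/"),   # service name follows the final slash
    }

print(check_oracle_dsn("oracle://cdc_user:password@host:1521/service_name"))
# {'user': 'cdc_user', 'host': 'host', 'port': 1521, 'service': 'service_name'}
```

If the service name or port comes out empty or wrong here, the connector will see the same malformed values.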
## Capture CDC events

The simplest pattern captures all change events from Oracle tables and outputs them with metadata:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
      - "MY_SCHEMA\\.CUSTOMERS"
    exclude:
      - "MY_SCHEMA\\..*_TEMP"

pipeline:
  processors:
    # Extract the change event details
    - mapping: |
        root = this
        root.metadata.operation = meta("operation")
        root.metadata.table = meta("table_name")
        root.metadata.schema = meta("database_schema")
        root.metadata.scn = meta("scn")
        root.metadata.timestamp = now()

output:
  stdout:
    codec: lines
```

For details on the CDC event message structure and available metadata fields, see the metadata section in the connector reference.

## Filter CDC events

You can filter events to process only specific change types:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"

pipeline:
  processors:
    # Filter out delete events
    - mapping: |
        root = if meta("operation") == "delete" {
          deleted()
        } else {
          this
        }
    # Add operation metadata to the message
    - mapping: |
        root = this.merge({
          "operation": meta("operation"),
          "table": meta("table_name")
        })

output:
  stdout:
    codec: lines
```

This example:

- Filters out delete events using `deleted()`
- Merges the operation type and table name into the message payload

## Stream initial snapshot

Capture both existing data and ongoing changes:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    stream_snapshot: true (1)
    snapshot_max_batch_size: 5000 (2)
    max_parallel_snapshot_tables: 2 (3)

pipeline:
  processors:
    - mapping: |
        root = this
        # Tag snapshot vs CDC events
        root.event_type = if meta("operation") == "read" {
          "snapshot"
        } else {
          "cdc"
        }

output:
  stdout:
    codec: lines
```

1. Enable an initial snapshot of existing table data.
2. Number of rows per batch during the snapshot (higher = faster, more memory).
3. Number of tables to snapshot in parallel.
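The same snapshot-versus-CDC classification can be mirrored in a downstream consumer. This hypothetical Python sketch (not part of the connector) applies the rule the mapping above encodes:

```python
# Hypothetical events shaped as the pipeline would emit them, with the
# operation metadata inlined alongside the row data.
events = [
    {"meta": {"operation": "read"},   "row": {"ID": 1}},
    {"meta": {"operation": "insert"}, "row": {"ID": 2}},
    {"meta": {"operation": "update"}, "row": {"ID": 1}},
]

def classify(event: dict) -> str:
    """Snapshot rows carry operation "read"; live changes carry insert/update/delete."""
    return "snapshot" if event["meta"]["operation"] == "read" else "cdc"

tagged = [dict(e, event_type=classify(e)) for e in events]
print([e["event_type"] for e in tagged])  # ['snapshot', 'cdc', 'cdc']
```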
Snapshot events have their operation metadata set to "read", while CDC events use "insert", "update", or "delete".

## Route to Redpanda

Stream Oracle changes to Redpanda for real-time processing:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
      - "MY_SCHEMA\\.CUSTOMERS"

pipeline:
  processors:
    # Create a composite key from the primary key fields
    - mapping: |
        meta kafka_key = this.ID.string()
        # Add CDC metadata as message headers
        meta operation = meta("operation")
        meta table = meta("table_name")
        meta scn = meta("scn")

output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topic: 'oracle-cdc-${! meta("table_name").lowercase() }' (1)
    max_in_flight: 10
    compression: snappy
    batching:
      count: 100
      period: 100ms
```

1. Route to different topics based on table name.

This example:

- Creates a message key from the primary key for consistent partitioning
- Routes each table to a separate topic
- Batches messages for efficient delivery
- Adds CDC metadata as Kafka headers

## Route to S3

Archive CDC events to S3 for long-term storage and analytics:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"

pipeline:
  processors:
    - mapping: |
        root = this
        root.operation = meta("operation")
        root.table = meta("table_name")
        root.scn = meta("scn")
        root.captured_at = now().ts_format("2006-01-02T15:04:05Z07:00")

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: 'oracle-cdc/${! meta("table_name") }/${! timestamp_unix().ts_format("2006/01/02/15") }/${! uuid_v4() }.jsonl' (1)
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: json_array (2)
        - compress:
            algorithm: gzip (3)
```

1. Organize files by table and time-based partitions (year/month/day/hour).
2. Archive as a JSON array for easier downstream parsing.
3. Compress with gzip to reduce storage costs.
This example:

- Organizes files by table name and hourly partitions
- Batches events and archives them as compressed JSON
- Uses UUID file names to prevent collisions

## Route by event type

Route different event types to different destinations:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"

pipeline:
  processors:
    # Add routing metadata
    - mapping: |
        root = this
        root.event_type = meta("operation")
        root.table = meta("table_name")

output:
  switch:
    cases:
      # Route INSERT events
      - check: this.event_type == "insert"
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: orders-inserts
      # Route UPDATE events
      - check: this.event_type == "update"
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: orders-updates
      # Route DELETE events
      - check: this.event_type == "delete"
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: orders-deletes
      # Fallback for snapshot reads
      - output:
          drop: {}
```

This pattern:

- Separates processing pipelines for inserts, updates, and deletes
- Routes each operation type to a dedicated topic
- Enables specialized downstream consumers per operation type

## Detect changed columns

For update operations, identify which columns changed:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
  processors:
    - mapping: |
        # Only process update operations
        root = if meta("operation") != "update" { deleted() }
        # Extract the update event details
        # Oracle CDC provides before/after values for updates
        root.id = this.ID
        root.table = meta("table_name")
        root.operation = meta("operation")
        root.scn = meta("scn")
        # Include the full after-state
        root.after = this

output:
  stdout:
    codec: lines
```

Oracle CDC captures the full row state after the update. For fine-grained change detection, you may need to maintain previous state externally or use Oracle's built-in change tracking features.
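Maintaining previous state externally can be as simple as keeping the last-seen row per primary key and diffing each new after-state against it. This Python sketch assumes events carry the full after-state and a primary-key column named `ID` (both names hypothetical):

```python
# Minimal sketch of external change detection. State lives in memory here;
# a real deployment would use a durable cache keyed the same way.
last_seen: dict[int, dict] = {}

def changed_columns(row: dict) -> list[str]:
    """Diff the incoming after-state against the previously seen row state."""
    key = row["ID"]
    previous = last_seen.get(key, {})
    changed = [col for col, val in row.items() if previous.get(col) != val]
    last_seen[key] = row  # remember the new state for the next update
    return changed

changed_columns({"ID": 1, "STATUS": "NEW", "TOTAL": 10})  # first sight: every column "changed"
print(changed_columns({"ID": 1, "STATUS": "SHIPPED", "TOTAL": 10}))  # ['STATUS']
```

Note that the first event per key reports every column as changed, since there is no prior state to compare against.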
## Checkpointing

The Oracle CDC input automatically manages checkpoints using a table in the Oracle database:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    checkpoint_cache_table_name: RPCN.CDC_CHECKPOINT_CACHE (1)
    checkpoint_cache_key: orders_cdc_job (2)
    checkpoint_limit: 500 (3)
```

1. Table name for storing checkpoint state (created automatically if it doesn't exist).
2. Unique key to identify this CDC job (useful when running multiple CDC pipelines).
3. Checkpoint after every 500 messages (lower = faster recovery, higher = fewer writes).

The checkpoint table stores the System Change Number (SCN) of the last successfully processed event. On restart, CDC resumes from the last checkpoint.

### Using an external cache

For better separation, or to use a different database for checkpointing:

```yaml
cache_resources:
  - label: postgres_cache
    postgres:
      dsn: postgres://user:pass@postgres-host:5432/checkpoints

input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\.ORDERS"
    checkpoint_cache: postgres_cache (1)
    checkpoint_cache_key: orders_cdc_job
```

1. Use an external cache resource instead of the default Oracle table.

## Performance tuning

Optimize throughput and latency with these LogMiner settings:

```yaml
input:
  oracledb_cdc:
    connection_string: ${ORACLE_CONNECTION_STRING}
    include:
      - "MY_SCHEMA\\..*"
    logminer:
      scn_window_size: 50000 (1)
      mining_interval: 100ms (2)
      backoff_interval: 1s (3)
      max_transaction_events: 10000 (4)
      lob_enabled: true (5)
    checkpoint_limit: 2000 (6)
    batching:
      count: 100
      period: 1s
```

1. Number of SCNs to process in each LogMiner session (higher = more memory, fewer sessions).
2. How often to query LogMiner for new changes (lower = less latency, more CPU).
3. Backoff when no changes are detected (reduces CPU usage during idle periods).
4. Maximum events to buffer per transaction (0 = unlimited).
5. Enable capture of LOB (CLOB/BLOB) columns.
6. Number of messages to process before checkpointing.
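The `checkpoint_limit` trade-off can be pictured with a small sketch (hypothetical, not the connector's implementation): track the highest processed SCN and persist it only every N messages, so a larger limit means fewer checkpoint writes but more events replayed after a crash:

```python
class CheckpointTracker:
    """Sketch of checkpoint-every-N behavior (not the connector's actual code)."""

    def __init__(self, limit: int):
        self.limit = limit
        self.pending = 0
        self.last_scn = None        # highest SCN processed so far
        self.persisted_scn = None   # what a restart would resume from
        self.writes = 0             # number of checkpoint writes performed

    def process(self, scn: int) -> None:
        self.last_scn = scn
        self.pending += 1
        if self.pending >= self.limit:
            self.persisted_scn = scn  # flush checkpoint to the cache table
            self.pending = 0
            self.writes += 1

tracker = CheckpointTracker(limit=500)
for scn in range(1, 1201):
    tracker.process(scn)

# 1200 events with limit=500 -> 2 checkpoint writes; the resume point is
# SCN 1000, so up to 200 events would be re-read after a crash.
print(tracker.writes, tracker.persisted_scn)  # 2 1000
```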
### Throughput considerations

- A larger `scn_window_size` reduces LogMiner overhead but increases memory usage
- A shorter `mining_interval` reduces latency but increases database load
- A higher `checkpoint_limit` improves throughput but increases recovery time after failures
- LOB columns significantly impact performance; disable `lob_enabled` if they aren't needed

## Secure connections with Oracle Wallet

Use Oracle Wallet for SSL/TLS connections:

```yaml
input:
  oracledb_cdc:
    connection_string: oracle://cdc_user:password@host:1522/service
    wallet_path: ${ORACLE_WALLET_PATH} (1)
    wallet_password: ${ORACLE_WALLET_PASSWORD} (2)
    include:
      - "MY_SCHEMA\\.ORDERS"
```

1. Path to the wallet directory containing `cwallet.sso` or `ewallet.p12`.
2. Wallet password (only required for `ewallet.p12` files).

Alternatively, specify the wallet configuration in the connection string:

```yaml
input:
  oracledb_cdc:
    connection_string: oracle://user:password@host:1522/service?WALLET=/opt/oracle/wallet&SSL=true
    include:
      - "MY_SCHEMA\\.ORDERS"
```

## Troubleshoot

### No events received

If you're not receiving events:

1. Verify supplemental logging is enabled:

   ```sql
   -- Check database-level logging
   SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;

   -- Check table-level logging
   SELECT LOG_GROUP_NAME, TABLE_NAME,
          DECODE(ALWAYS, 'ALWAYS', 'Unconditional', NULL, 'Conditional') ALWAYS
   FROM DBA_LOG_GROUPS
   WHERE OWNER = 'MY_SCHEMA' AND TABLE_NAME = 'MY_TABLE';
   ```

2. Check that the database is in ARCHIVELOG mode:

   ```sql
   SELECT LOG_MODE FROM V$DATABASE;
   ```

3. Verify the user has the correct permissions:

   ```sql
   SELECT * FROM DBA_SYS_PRIVS WHERE GRANTEE = 'CDC_USER';
   ```

4. Make sure changes are being made to the monitored tables.

### Slow event processing

If CDC is lagging behind:

- Increase `scn_window_size` to process more changes per LogMiner session
- Reduce `checkpoint_limit` to checkpoint more frequently (helps during restarts)
- Disable `lob_enabled` if LOB columns aren't needed
- Check the Oracle alert logs for performance issues
- Monitor the size of the redo logs and archive logs

### Connection issues with Oracle Wallet

If SSL connections fail:
1. Verify the wallet files exist and have the correct permissions:

   ```bash
   ls -la /opt/oracle/wallet
   ```

2. Check the Oracle TNS configuration.
3. Verify the wallet password for `ewallet.p12` files.
4. Test the connection using Oracle tools (`sqlplus`, SQL Developer).

### Checkpoint table errors

If you see errors related to the checkpoint table:

1. Ensure the `rpcn` schema exists:

   ```sql
   CREATE USER rpcn IDENTIFIED BY <password>;
   GRANT CREATE TABLE TO rpcn;
   GRANT CREATE PROCEDURE TO rpcn;
   GRANT UNLIMITED TABLESPACE TO rpcn;
   ```

2. Verify the CDC user has access to the checkpoint table:

   ```sql
   GRANT SELECT, INSERT, UPDATE ON rpcn.cdc_checkpoint_cache TO cdc_user;
   ```

## Suggested reading

- Oracle CDC Input Reference
- Redpanda Output
- AWS S3 Output
- Oracle LogMiner Documentation
- Configuring Supplemental Logging

## Suggested labs

- Stream Jira Issues to Redpanda for Real-Time Metrics