# DynamoDB CDC Patterns

The DynamoDB CDC input enables capturing item-level changes from DynamoDB tables with streams enabled. This cookbook provides reusable patterns for filtering, transforming, and routing DynamoDB CDC events to Redpanda, S3, and other destinations.

Use this cookbook to:

- Find reusable patterns for capturing DynamoDB CDC events
- Look up integration patterns for routing CDC data to Redpanda and S3
- Identify patterns for filtering and transforming change events

## Prerequisites

Before using these patterns, ensure you have the following configured:

### Redpanda CLI

Install the Redpanda CLI (`rpk`) to run Redpanda Connect. See Get Started with Redpanda Connect using `rpk` for installation instructions.

### DynamoDB Streams

The source DynamoDB table must have DynamoDB Streams enabled with an appropriate view type:

- `KEYS_ONLY`: Only the key attributes of the modified item
- `NEW_IMAGE`: The entire item as it appears after the modification
- `OLD_IMAGE`: The entire item as it appeared before the modification
- `NEW_AND_OLD_IMAGES`: Both the new and old item images (recommended for detecting changes)

To enable streams on an existing table using the AWS CLI:

```bash
aws dynamodb update-table \
  --table-name my-table \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
```

### Environment variables

The examples in this cookbook use environment variables for AWS configuration. This allows you to keep credentials secure and separate from your pipeline configuration files.

```bash
export DYNAMODB_TABLE=my-table          # (1)
export AWS_REGION=us-east-1             # (2)
export REDPANDA_BROKERS=localhost:9092  # (3)
export S3_BUCKET=my-cdc-bucket          # (4)
```

1. The name of the DynamoDB table with streams enabled.
2. The AWS region where your DynamoDB table is located.
3. The Redpanda broker addresses (for Redpanda output examples).
4. The S3 bucket name (for S3 output examples).
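Before launching a pipeline, it can help to fail fast if any of these variables is missing. The following is an optional sketch, not part of the cookbook's pipelines; the placeholder values mirror the exports above, and you can trim the variable list to match the outputs you actually use:

```shell
# Export the example values (placeholders), then verify nothing is missing.
export DYNAMODB_TABLE=my-table
export AWS_REGION=us-east-1
export REDPANDA_BROKERS=localhost:9092
export S3_BUCKET=my-cdc-bucket

missing=""
for var in DYNAMODB_TABLE AWS_REGION REDPANDA_BROKERS S3_BUCKET; do
  # printenv prints nothing (and fails) when the variable is unset
  [ -n "$(printenv "$var")" ] || missing="$missing $var"
done

if [ -n "$missing" ]; then
  echo "missing required variables:$missing" >&2
  exit 1
fi
echo "environment ok"
```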
Redpanda Connect loads AWS credentials from the standard credential chain (environment variables, `~/.aws/credentials`, or IAM roles).

## Capture CDC events

The simplest pattern captures all change events from a DynamoDB table and outputs them with metadata:

```yaml
input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon

pipeline:
  processors:
    # Extract the change event details
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.event_id = this.eventID
        root.keys = this.dynamodb.keys
        root.new_image = this.dynamodb.newImage
        root.old_image = this.dynamodb.oldImage
        root.sequence_number = this.dynamodb.sequenceNumber
        root.timestamp = now()

output:
  stdout:
    codec: lines
```

For details on the CDC event message structure and available fields for Bloblang mappings, see the message structure section in the connector reference.

## Filter CDC events

You can filter events to process only specific change types:

```yaml
input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}
    start_from: latest

pipeline:
  processors:
    # Filter to only process INSERT and MODIFY events (ignore REMOVE)
    - mapping: |
        root = if this.eventName == "REMOVE" { deleted() } else { this }
    # Transform to a simplified format
    - mapping: |
        root.event_type = this.eventName
        root.keys = this.dynamodb.keys
        root.new_data = this.dynamodb.newImage
        root.old_data = this.dynamodb.oldImage

output:
  stdout:
    codec: lines
```

This example:

- Filters out `REMOVE` events using `deleted()`
- Transforms the event to a simplified format

## Route to Redpanda

Stream DynamoDB changes to Redpanda for real-time processing:

```yaml
input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}
    checkpoint_table: redpanda_dynamodb_checkpoints
    batch_size: 100
    poll_interval: 500ms

pipeline:
  processors:
    # Transform to a Kafka-friendly format with a composite key
    - mapping: |
        let keys = this.dynamodb.keys
        meta kafka_key = [$keys.pk, $keys.sk].filter(v -> v != null).join("#")
        root.event_type = this.eventName
        root.table = this.tableName
        root.timestamp = now()
        root.keys = this.dynamodb.keys
        root.new_image = this.dynamodb.newImage
        root.old_image = this.dynamodb.oldImage

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: dynamodb-cdc-events
    key: ${! @kafka_key }
    partitioner: murmur2_hash
    compression: snappy
    batching:
      count: 100
      period: 1s
```

This example:

- Creates a composite message key from the DynamoDB primary key
- Transforms the DynamoDB format to plain JSON
- Batches messages for efficient delivery

## Route to S3

Archive CDC events to S3 for long-term storage and analytics:

```yaml
input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon

pipeline:
  processors:
    # Add partitioning metadata for S3 organization
    - mapping: |
        let event_time = now()
        meta s3_path = "year=%s/month=%s/day=%s/hour=%s".format(
          $event_time.ts_format("2006"),
          $event_time.ts_format("01"),
          $event_time.ts_format("02"),
          $event_time.ts_format("15")
        )
        root.event_type = this.eventName
        root.table = this.tableName
        root.sequence_number = this.dynamodb.sequenceNumber
        root.event_time = $event_time
        root.keys = this.dynamodb.keys
        root.new_image = this.dynamodb.newImage
        root.old_image = this.dynamodb.oldImage

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: dynamodb-cdc/${DYNAMODB_TABLE}/${! @s3_path }/${! uuid_v4() }.json
    region: ${AWS_REGION}
    batching:
      count: 1000
      period: 1m
      processors:
        - archive:
            format: lines
```

This example:

- Organizes files by time-based partitions (year/month/day/hour)
- Batches events and archives them as newline-delimited JSON
- Uses UUID file names to prevent collisions

## Route by event type

Route different event types to different destinations:

```yaml
input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}

pipeline:
  processors:
    # Transform to a common format
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.timestamp = now()
        root.keys = this.dynamodb.keys
        root.data = if this.dynamodb.exists("newImage") {
          this.dynamodb.newImage
        } else {
          this.dynamodb.oldImage
        }

output:
  switch:
    cases:
      # Route INSERT events to a topic for new records
      - check: this.event_type == "INSERT"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: dynamodb-inserts
      # Route MODIFY events to a topic for updates
      - check: this.event_type == "MODIFY"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: dynamodb-updates
      # Route REMOVE events to a topic for deletes
      - check: this.event_type == "REMOVE"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: dynamodb-deletes
      # Fallback for any unexpected event types
      - output:
          drop: {}
```

This pattern:

- Separates processing pipelines for inserts, updates, and deletes
- Lets you apply different retention policies per event type, because each change type lands in its own topic
- Enables specialized downstream consumers

## Detect changed fields

Compare old and new images to identify which fields changed:

```yaml
input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}

pipeline:
  processors:
    # Only process MODIFY events
    - mapping: |
        root = if this.eventName != "MODIFY" { deleted() } else { this }
    # Compare old and new images to find changed fields
    - mapping: |
        let old_data = this.dynamodb.oldImage
        let new_data = this.dynamodb.newImage
        root.table = this.tableName
        root.keys = this.dynamodb.keys
        root.timestamp = now()
        # Find fields that changed by comparing key-value pairs
        root.changes = $new_data.key_values().filter(kv ->
          !$old_data.exists(kv.key) || $old_data.get(kv.key) != kv.value
        ).map_each(kv -> {
          "field": kv.key,
          "old_value": if $old_data.exists(kv.key) { $old_data.get(kv.key) } else { null },
          "new_value": kv.value
        })
        # Find fields that were removed
        root.removed_fields = $old_data.keys().filter(k -> !$new_data.exists(k))

output:
  stdout:
    codec: lines
```

This pattern:

- Filters to only MODIFY events
- Compares old and new images to find differences
- Outputs a list of changed fields with their old and new values

This pattern requires the `NEW_AND_OLD_IMAGES` stream view type. The `.key_values()` method converts an object to an array of key-value pairs that can be filtered and mapped.

## Checkpointing

The DynamoDB CDC input automatically manages checkpoints in a separate DynamoDB table:

```yaml
input:
  aws_dynamodb_cdc:
    table: my-table
    checkpoint_table: my-app-checkpoints # (1)
    checkpoint_limit: 500                # (2)
    start_from: trim_horizon             # (3)
```

1. Custom checkpoint table name (default: `redpanda_dynamodb_checkpoints`).
2. Checkpoint after every 500 messages (lower = better recovery, higher = fewer writes).
3. Start from the oldest available record when no checkpoint exists.

If the checkpoint table doesn't exist, it's created automatically with the required schema.

## Performance tuning

Optimize throughput and latency with these settings:

```yaml
input:
  aws_dynamodb_cdc:
    table: my-table
    batch_size: 1000          # (1)
    poll_interval: 100ms      # (2)
    max_tracked_shards: 10000 # (3)
    throttle_backoff: 50ms    # (4)
```

1. Maximum records per shard per request (1-1000).
2. Time between polls when no records are available.
3. Maximum shards to track (for very large tables).
4. Backpressure delay when too many messages are in-flight.
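These two settings bound your read rate: DynamoDB Streams allows at most 5 `GetRecords` calls per second per shard, and each call returns at most `batch_size` records. A back-of-envelope calculation of the resulting ceiling (illustrative only; actual throughput also depends on record size and network latency, and the shard count here is a hypothetical example):

```shell
# Rough per-shard throughput ceiling for DynamoDB Streams polling.
calls_per_sec=5   # DynamoDB Streams limit on GetRecords per shard
batch_size=1000   # maximum records per request (1-1000)
shards=4          # hypothetical shard count for this table

per_shard=$((calls_per_sec * batch_size))
total=$((per_shard * shards))

echo "per-shard ceiling: ${per_shard} records/sec"
echo "table ceiling:     ${total} records/sec"
```

With `batch_size: 1000` this works out to 5,000 records per second per shard, so tables with heavy write traffic rely on DynamoDB splitting the stream into more shards to scale beyond that.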
### Throughput considerations

- DynamoDB Streams allows 5 `GetRecords` calls per second per shard
- Higher `batch_size` improves throughput but increases memory usage
- Shorter `poll_interval` reduces latency but increases API calls

## Troubleshoot

### No events received

If you're not receiving events:

1. Verify streams are enabled on the table:

   ```bash
   aws dynamodb describe-table --table-name my-table \
     --query 'Table.StreamSpecification'
   ```

2. Check that changes are being made to the table
3. Verify `start_from` is set to `trim_horizon` to capture existing stream data

### Duplicate events

Each stream record appears exactly once in DynamoDB Streams. However, if your pipeline fails before checkpointing, records may be re-read on restart, resulting in at-least-once processing semantics. To handle potential duplicates:

- Use idempotent processing in downstream systems
- Deduplicate using the `dynamodb_sequence_number` metadata
- Lower `checkpoint_limit` to reduce the window of possible duplicates

### Stream retention

DynamoDB Streams retains data for 24 hours. If your pipeline is offline longer than that:

- Consider using Kinesis Data Streams for DynamoDB with the `aws_kinesis` input instead (up to 1 year retention)
- Implement a full-table scan fallback for disaster recovery

## Suggested reading

- DynamoDB CDC Input Reference
- AWS Configuration Guide
- Kinesis Input (for Kinesis Data Streams for DynamoDB)
- DynamoDB Streams Documentation