# aws_dynamodb_cdc

Available in: Self-Managed

License: This component requires an enterprise license. You can either upgrade to an Enterprise Edition license, or generate a trial license key that's valid for 30 days.

Stream item-level changes from DynamoDB tables using DynamoDB Streams. This input automatically manages shards, checkpoints progress for recovery, and processes multiple shards concurrently.

Introduced in version 4.79.0.

Use this reference to:

- Look up configuration options for DynamoDB CDC streaming
- Find metadata fields available for message processing
- Identify checkpointing and performance tuning settings

Common:

```yaml
input:
  label: ""
  aws_dynamodb_cdc:
    table: "" # No default (required)
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon
```

Advanced:

```yaml
input:
  label: ""
  aws_dynamodb_cdc:
    table: "" # No default (required)
    checkpoint_table: redpanda_dynamodb_checkpoints
    batch_size: 1000
    poll_interval: 1s
    start_from: trim_horizon
    checkpoint_limit: 1000
    max_tracked_shards: 10000
    throttle_backoff: 100ms
    region: "" # No default (optional)
    endpoint: "" # No default (optional)
    tcp:
      connect_timeout: 0s
      keep_alive:
        idle: 15s
        interval: 15s
        count: 9
      tcp_user_timeout: 0s
    credentials:
      profile: "" # No default (optional)
      id: "" # No default (optional)
      secret: "" # No default (optional)
      token: "" # No default (optional)
      from_ec2_role: "" # No default (optional)
      role: "" # No default (optional)
      role_external_id: "" # No default (optional)
```

## Prerequisites

The source DynamoDB table must have DynamoDB Streams enabled.
You can enable streams with one of these view types:

- `KEYS_ONLY`: Only the key attributes of the modified item
- `NEW_IMAGE`: The entire item as it appears after the modification
- `OLD_IMAGE`: The entire item as it appeared before the modification
- `NEW_AND_OLD_IMAGES`: Both the new and old item images

## Checkpointing

Checkpoints are stored in a separate DynamoDB table, configured with `checkpoint_table`. This table is created automatically if it does not exist. On restart, the input resumes from the last checkpointed position for each shard.

## Alternative components

For better performance and longer retention (up to 1 year, compared with 24 hours for DynamoDB Streams), consider using Kinesis Data Streams for DynamoDB with the `aws_kinesis` input instead.

## Message structure

Each CDC event is delivered as a JSON message with the following structure. Use these fields in your Bloblang mappings with `this.<field>`:

```json
{
  "eventID": "abc123-",
  "eventName": "INSERT | MODIFY | REMOVE",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-1",
  "tableName": "my-table",
  "dynamodb": {
    "keys": {
      "pk": "user#123",
      "sk": "profile"
    },
    "newImage": {
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice",
      "email": "alice@example.com"
    },
    "oldImage": {
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice Smith"
    },
    "sequenceNumber": "12345678901234567890",
    "sizeBytes": 256,
    "streamViewType": "NEW_AND_OLD_IMAGES"
  }
}
```

- `eventID`: Unique identifier for this change event.
- `eventName`: Type of change: `INSERT` (new item), `MODIFY` (updated item), or `REMOVE` (deleted item).
- `tableName`: Name of the source DynamoDB table.
- `dynamodb.keys`: Primary key attributes of the changed item. Always present.
- `dynamodb.newImage`: Item state after the change. Present for `INSERT` and `MODIFY` events (requires the `NEW_IMAGE` or `NEW_AND_OLD_IMAGES` stream view type).
- `dynamodb.oldImage`: Item state before the change. Present for `MODIFY` and `REMOVE` events (requires the `OLD_IMAGE` or `NEW_AND_OLD_IMAGES` stream view type).
- `dynamodb.sequenceNumber`: Position of this record in the shard, used for ordering and checkpointing.
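Because `newImage` and `oldImage` are present only for some event types, a mapping often needs to choose between them. The following is a minimal sketch, not a recommended pattern: it assumes the stream uses the `NEW_AND_OLD_IMAGES` view type, and the output field names `op` and `item` are hypothetical names chosen for illustration.

```yaml
pipeline:
  processors:
    - mapping: |
        # Key attributes are present on every event.
        root.keys = this.dynamodb.keys
        root.op = this.eventName
        # REMOVE events carry no newImage, so use oldImage for deletions
        # and newImage for INSERT and MODIFY events.
        root.item = if this.eventName == "REMOVE" {
          this.dynamodb.oldImage
        } else {
          this.dynamodb.newImage
        }
```

With a `KEYS_ONLY` or `NEW_IMAGE` stream, `oldImage` is absent, so `item` would be empty for `REMOVE` events under this sketch.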
DynamoDB attribute values are automatically unmarshalled from DynamoDB's type format (`{"S": "value"}`) to plain values (`"value"`).

Example mapping:

```yaml
pipeline:
  processors:
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.keys = this.dynamodb.keys
        root.new_data = this.dynamodb.newImage
        root.old_data = this.dynamodb.oldImage
```

## Metadata

This input adds the following metadata fields to each message:

- `dynamodb_shard_id`: The shard ID from which the record was read
- `dynamodb_sequence_number`: The sequence number of the record in the stream
- `dynamodb_event_name`: The type of change: `INSERT`, `MODIFY`, or `REMOVE`
- `dynamodb_table`: The name of the DynamoDB table

## Metrics

This input emits the following metrics:

- `dynamodb_cdc_shards_tracked`: Total number of shards being tracked (gauge)
- `dynamodb_cdc_shards_active`: Number of shards currently being read from (gauge)

## Fields

### batch_size

Maximum number of records to read per shard in a single request. Valid range: 1-1000.

Type: int

Default: 1000

### checkpoint_limit

Maximum number of unacknowledged messages before forcing a checkpoint update. Lower values provide better recovery guarantees but increase write overhead.

Type: int

Default: 1000

### checkpoint_table

DynamoDB table name for storing checkpoints. The table is created if it doesn't exist.

Type: string

Default: redpanda_dynamodb_checkpoints

### credentials

Optional manual configuration of AWS credentials to use. More information can be found in Amazon Web Services.

Type: object

### credentials.from_ec2_role

Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance. Requires version 4.2.0 or later.

Type: bool

### credentials.id

The ID of credentials to use.

Type: string

### credentials.profile

A profile from `~/.aws/credentials` to use.

Type: string

### credentials.role

A role ARN to assume.

Type: string

### credentials.role_external_id

An external ID to provide when assuming a role.

Type: string

### credentials.secret

The secret for the credentials being used.
This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.

Type: string

### credentials.token

The token for the credentials being used, required when using short-term credentials.

Type: string

### endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string

### max_tracked_shards

Maximum number of shards to track simultaneously. Prevents memory issues with extremely large tables.

Type: int

Default: 10000

### poll_interval

Time to wait between polling attempts when no records are available.

Type: string

Default: 1s

### region

The AWS region to target.

Type: string

### start_from

Where to start reading when no checkpoint exists. `trim_horizon` starts from the oldest available record; `latest` starts from new records.

Type: string

Default: trim_horizon

Options: trim_horizon, latest

### table

The name of the DynamoDB table to read streams from.

Type: string

### tcp

TCP socket configuration.

Type: object

### tcp.connect_timeout

Maximum amount of time a dial will wait for a connect to complete. Zero disables the timeout.

Type: string

Default: 0s

### tcp.keep_alive

TCP keep-alive probe configuration.

Type: object

### tcp.keep_alive.count

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

Type: int

Default: 9

### tcp.keep_alive.idle

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

Type: string

Default: 15s

### tcp.keep_alive.interval

Duration between keep-alive probes. Zero defaults to 15s.

Type: string

Default: 15s

### tcp.tcp_user_timeout

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, `keep_alive.idle` must be greater than this value per RFC 5482. Zero disables the timeout.

Type: string

Default: 0s

### throttle_backoff

Time to wait when applying backpressure due to too many in-flight messages.
Type: string

Default: 100ms

## Examples

### Consume CDC events

Read change events from a DynamoDB table with streams enabled.

```yaml
input:
  aws_dynamodb_cdc:
    table: my-table
    region: us-east-1
```

### Start from latest

Only process new changes, ignoring existing stream data.

```yaml
input:
  aws_dynamodb_cdc:
    table: orders
    start_from: latest
    region: us-west-2
```

## Suggested reading

For common patterns including filtering events, routing to Kafka or S3, and detecting changed fields, see the DynamoDB CDC Patterns cookbook.