aws_dynamodb_cdc
Stream item-level changes from DynamoDB tables using DynamoDB Streams. This input automatically manages shards, checkpoints progress for recovery, and processes multiple shards concurrently.
Introduced in version 4.79.0.
Use this reference to:
- Look up configuration options for DynamoDB CDC streaming
- Find metadata fields available for message processing
- Identify checkpointing and performance tuning settings
Common configuration:

input:
  label: ""
  aws_dynamodb_cdc:
    tables: []
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon
    snapshot_mode: none

Advanced configuration:

input:
  label: ""
  aws_dynamodb_cdc:
    tables: []
    table_discovery_mode: single
    table_tag_filter: ""
    table_discovery_interval: 5m
    checkpoint_table: redpanda_dynamodb_checkpoints
    batch_size: 1000
    poll_interval: 1s
    start_from: trim_horizon
    checkpoint_limit: 1000
    max_tracked_shards: 10000
    throttle_backoff: 100ms
    snapshot_mode: none
    snapshot_segments: 1
    snapshot_batch_size: 100
    snapshot_throttle: 100ms
    snapshot_deduplicate: true
    snapshot_buffer_size: 100000
    region: "" # No default (optional)
    endpoint: "" # No default (optional)
    tcp:
      connect_timeout: 0s
      keep_alive:
        idle: 15s
        interval: 15s
        count: 9
      tcp_user_timeout: 0s
    credentials:
      profile: "" # No default (optional)
      id: "" # No default (optional)
      secret: "" # No default (optional)
      token: "" # No default (optional)
      from_ec2_role: "" # No default (optional)
      role: "" # No default (optional)
      role_external_id: "" # No default (optional)
Prerequisites
The source DynamoDB table must have DynamoDB Streams enabled. You can enable streams with one of these view types:
- KEYS_ONLY: Only the key attributes of the modified item
- NEW_IMAGE: The entire item as it appears after the modification
- OLD_IMAGE: The entire item as it appeared before the modification
- NEW_AND_OLD_IMAGES: Both the new and old item images
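If the table is managed with AWS CloudFormation, one way to enable streams is the StreamSpecification property. The following is a minimal sketch, not a complete template: the logical resource name, table name, and key schema are hypothetical, and the same setting can also be applied through the AWS console or CLI.

```yaml
Resources:
  OrdersTable:                     # hypothetical logical resource name
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: orders            # hypothetical table name
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
      # Enables DynamoDB Streams with both the new and old item images.
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
```

NEW_AND_OLD_IMAGES is chosen here because it lets both the newImage and oldImage fields be populated; a narrower view type reduces the data written to the stream.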
Checkpointing
Checkpoints are stored in a separate DynamoDB table (configured via checkpoint_table). This table is created automatically if it does not exist. On restart, the input resumes from the last checkpointed position for each shard.
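As an illustration, the checkpoint settings might be adjusted like this. This is a sketch only: the table names are hypothetical, and the lower checkpoint_limit is an example of trading extra checkpoint writes for less reprocessing after a restart.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [orders]                          # hypothetical source table
    region: us-east-1
    # Use a dedicated checkpoint table instead of the default name.
    checkpoint_table: orders_cdc_checkpoints  # hypothetical name
    # Checkpoint after at most 100 unacknowledged messages (default: 1000).
    checkpoint_limit: 100
```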
Alternative components
For better performance and longer retention (up to 1 year vs 24 hours), consider using Kinesis Data Streams for DynamoDB with the aws_kinesis input instead.
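A rough sketch of the Kinesis-based alternative is shown below. It assumes a Kinesis data stream (hypothetically named my-table-changes) has already been configured as the destination for the table's change records. The dynamodb block here refers to the aws_kinesis input's own checkpoint table, not the source table. The record format delivered through Kinesis may differ from the message structure described on this page, so verify mappings before switching.

```yaml
input:
  aws_kinesis:
    streams: [my-table-changes]      # hypothetical Kinesis stream name
    region: us-east-1
    start_from_oldest: true
    dynamodb:
      table: kinesis_cdc_checkpoints # hypothetical checkpoint table
      create: true
```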
Message structure
Each CDC event is delivered as a JSON message with the following structure. Use these fields in your Bloblang mappings with this.<field>:
{
  "eventID": "abc123-", (1)
  "eventName": "INSERT | MODIFY | REMOVE", (2)
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-1",
  "tableName": "my-table", (3)
  "dynamodb": {
    "keys": { (4)
      "pk": "user#123",
      "sk": "profile"
    },
    "newImage": { (5)
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice",
      "email": "alice@example.com"
    },
    "oldImage": { (6)
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice Smith"
    },
    "sequenceNumber": "12345678901234567890", (7)
    "sizeBytes": 256,
    "streamViewType": "NEW_AND_OLD_IMAGES"
  }
}
| 1 | Unique identifier for this change event. |
| 2 | Type of change: INSERT (new item), MODIFY (updated item), or REMOVE (deleted item). |
| 3 | Name of the source DynamoDB table. |
| 4 | Primary key attributes of the changed item. Always present. |
| 5 | Item state after the change. Present for INSERT and MODIFY events (requires NEW_IMAGE or NEW_AND_OLD_IMAGES stream view type). |
| 6 | Item state before the change. Present for MODIFY and REMOVE events (requires OLD_IMAGE or NEW_AND_OLD_IMAGES stream view type). |
| 7 | Position of this record in the shard, used for ordering and checkpointing. |
DynamoDB attribute values are automatically unmarshalled from DynamoDB’s type format ({"S": "value"}) to plain values ("value").
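For example, a mapping processor could flatten a CDC event into a smaller document using the fields described above. This is a minimal sketch: the output field names (id, op, table, key, item) are hypothetical, and it assumes a stream view type that includes the relevant images. If an image is not present, the corresponding value resolves to null.

```yaml
pipeline:
  processors:
    - mapping: |
        root.id = this.eventID
        root.op = this.eventName
        root.table = this.tableName
        root.key = this.dynamodb.keys
        # For deletions, keep the last known state; otherwise keep the new state.
        root.item = if this.eventName == "REMOVE" {
          this.dynamodb.oldImage
        } else {
          this.dynamodb.newImage
        }
```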
Metadata
This input adds the following metadata fields to each message:
- dynamodb_shard_id: The shard ID from which the record was read
- dynamodb_sequence_number: The sequence number of the record in the stream
- dynamodb_event_name: The type of change: INSERT, MODIFY, or REMOVE
- dynamodb_table: The name of the DynamoDB table
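As a sketch of how these metadata fields can be used, the mapping below copies them into the message body so they survive outputs that do not carry metadata. The source field and its sub-field names are hypothetical.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Copy the input's metadata into a hypothetical "source" object.
        root.source.table = @dynamodb_table
        root.source.shard_id = @dynamodb_shard_id
        root.source.sequence_number = @dynamodb_sequence_number
        root.source.event_name = @dynamodb_event_name
```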
Metrics
This input emits the following metrics:
- dynamodb_cdc_shards_tracked: Total number of shards being tracked (gauge)
- dynamodb_cdc_shards_active: Number of shards currently being read from (gauge)
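One possible way to expose only these gauges is a Prometheus metrics configuration with a metrics mapping, sketched below. It assumes the component's metrics pass through the general metrics mapping like any other metric, which is worth confirming in your deployment.

```yaml
metrics:
  prometheus: {}
  mapping: |
    # In a metrics mapping, `this` is the metric name.
    # Drop every metric whose name does not start with dynamodb_cdc_.
    root = if !this.has_prefix("dynamodb_cdc_") { deleted() }
```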
Fields
batch_size
Maximum number of records to read per shard in a single request. Valid range: 1-1000.
Type: int
Default: 1000
checkpoint_limit
Maximum number of unacknowledged messages before forcing a checkpoint update. Lower values provide better recovery guarantees but increase write overhead.
Type: int
Default: 1000
checkpoint_table
DynamoDB table name for storing checkpoints. Will be created if it doesn’t exist.
Type: string
Default: redpanda_dynamodb_checkpoints
credentials
Optional manual configuration of AWS credentials to use. More information can be found in Amazon Web Services.
Type: object
credentials.from_ec2_role
Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.
Requires version 4.2.0 or later.
Type: bool
credentials.secret
The secret for the credentials being used.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
credentials.token
The token for the credentials being used, required when using short term credentials.
Type: string
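To keep credentials out of the configuration file, the values can be supplied through environment variable interpolation. The following sketch assumes the standard AWS environment variable names are set in the process environment; the table name is hypothetical.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [my-table]
    region: us-east-1
    credentials:
      id: ${AWS_ACCESS_KEY_ID}
      secret: ${AWS_SECRET_ACCESS_KEY}
      # Only needed for short-term credentials.
      token: ${AWS_SESSION_TOKEN}
```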
max_tracked_shards
Maximum number of shards to track simultaneously. Prevents memory issues with extremely large tables.
Type: int
Default: 10000
poll_interval
Time to wait between polling attempts when no records are available.
Type: string
Default: 1s
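As an illustrative sketch, a table with infrequent changes could be polled less aggressively to reduce empty read requests. The values and table name below are examples, not recommendations.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [events]     # hypothetical low-traffic table
    region: us-east-1
    # Read at most 500 records per shard per request (default: 1000).
    batch_size: 500
    # Wait 5s between polls when a shard has no new records (default: 1s).
    poll_interval: 5s
```

A longer poll_interval increases the delay before a change is picked up after an idle period, so it suits workloads that can tolerate a few seconds of latency.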
snapshot_batch_size
Records per scan request during snapshot. Maximum 1000. Lower values provide better backpressure control but require more API calls.
Type: int
Default: 100
snapshot_buffer_size
Maximum CDC events to buffer for deduplication (approximately 100 bytes per entry). If exceeded, deduplication is disabled and duplicates may be emitted.
Type: int
Default: 100000
snapshot_deduplicate
Deduplicate records that appear in both the snapshot and the CDC stream. Requires buffering CDC events during the snapshot. If the buffer is exceeded, deduplication is disabled to prevent data loss.
Type: bool
Default: true
snapshot_mode
- none: Streams CDC events only (default).
- snapshot_only: Performs a one-time full table scan with no ongoing streaming.
- snapshot_and_cdc: Scans the entire table, then streams changes.
Type: string
Default: none
Options: none, snapshot_only, snapshot_and_cdc
snapshot_segments
Number of parallel scan segments (1-10). Higher parallelism scans faster but consumes more Read Capacity Units (RCUs). A lower value is safer to start with.
Type: int
Default: 1
snapshot_throttle
Minimum time between scan requests per segment. Use this to limit Read Capacity Unit (RCU) consumption during snapshot.
Type: string
Default: 100ms
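The snapshot settings combine to bound the scan rate. In the sketch below (example values, hypothetical table name), each segment issues at most one scan request every 200ms, or 5 requests per second. With 2 segments and 50 records per request, the snapshot reads at most about 2 × 5 × 50 = 500 items per second.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [products]
    region: us-east-1
    snapshot_mode: snapshot_only
    snapshot_segments: 2        # two parallel scan segments
    snapshot_batch_size: 50     # up to 50 records per scan request
    snapshot_throttle: 200ms    # at most 5 scan requests per second per segment
```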
start_from
Where to start reading when no checkpoint exists. trim_horizon starts from the oldest available record in the stream; latest starts from new records only.
Type: string
Default: trim_horizon
Options: trim_horizon, latest
table_discovery_interval
Interval for rescanning and discovering new tables when using tag or includelist mode. Set to 0 to disable periodic rescanning.
Type: string
Default: 5m
table_discovery_mode
- single: Streams from tables specified in the tables list.
- tag: Auto-discovers tables by tags (ignores the tables field).
- includelist: Streams from tables in the tables list. Use single instead; includelist is kept for backward compatibility.
Type: string
Default: single
Options: single, tag, includelist
table_tag_filter
Multi-tag filter in the format key1:v1,v2;key2:v3,v4. Matches tables where (key1=v1 OR key1=v2) AND (key2=v3 OR key2=v4). Required when table_discovery_mode is tag.
Type: string
Default: ""
tables[]
List of table names to stream from. For single table mode, provide one table. For multi-table mode, provide multiple tables.
Type: array
Default: []
tcp.connect_timeout
Maximum amount of time a dial will wait for a connect to complete. Zero disables.
Type: string
Default: 0s
tcp.keep_alive.count
Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.
Type: int
Default: 9
tcp.keep_alive.idle
Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.
Type: string
Default: 15s
tcp.keep_alive.interval
Duration between keep-alive probes. Zero defaults to 15s.
Type: string
Default: 15s
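As a sketch of how the tcp fields fit together, the following example (illustrative values, hypothetical table name) sets a dial timeout and tightens keep-alive probing. With these values, a dead connection is dropped roughly 60 seconds after it goes idle: 30s of idle time plus 3 unanswered probes sent 10s apart.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [my-table]
    region: us-east-1
    tcp:
      connect_timeout: 5s   # fail the dial if no connection within 5s
      keep_alive:
        idle: 30s           # first probe after 30s of inactivity
        interval: 10s       # then probe every 10s
        count: 3            # drop after 3 unanswered probes
```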
Examples
Consume CDC events
Read change events from a DynamoDB table with streams enabled.
input:
  aws_dynamodb_cdc:
    tables: [my-table]
    region: us-east-1
Start from latest
Only process new changes, ignoring existing stream data.
input:
  aws_dynamodb_cdc:
    tables: [orders]
    start_from: latest
    region: us-west-2
Snapshot and CDC
Scan all existing records, then stream ongoing changes.
input:
  aws_dynamodb_cdc:
    tables: [products]
    snapshot_mode: snapshot_and_cdc
    snapshot_segments: 5
    region: us-east-1
Auto-discover tables by tag
Automatically discover and stream from all tables with a specific tag.
input:
  aws_dynamodb_cdc:
    table_discovery_mode: tag
    table_tag_filter: "stream-enabled:true"
    table_discovery_interval: 5m
    region: us-east-1
Auto-discover tables by multiple tags
Discover tables matching multiple tag criteria with OR logic per key, AND logic across keys.
input:
  aws_dynamodb_cdc:
    table_discovery_mode: tag
    table_tag_filter: "environment:prod,staging;team:data,analytics"
    table_discovery_interval: 5m
    region: us-east-1
    # Matches tables with: (environment=prod OR environment=staging) AND (team=data OR team=analytics)
Suggested reading
For common patterns including filtering events, routing to Kafka or S3, and detecting changed fields, see the DynamoDB CDC Patterns cookbook.