aws_dynamodb_cdc

Stream item-level changes from DynamoDB tables using DynamoDB Streams. This input automatically manages shards, checkpoints progress for recovery, and processes multiple shards concurrently.

Introduced in version 4.79.0.

Use this reference to:

  • Look up configuration options for DynamoDB CDC streaming

  • Find metadata fields available for message processing

  • Identify checkpointing and performance tuning settings

Common fields:

input:
  label: ""
  aws_dynamodb_cdc:
    table: "" # No default (required)
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon

Advanced fields:

input:
  label: ""
  aws_dynamodb_cdc:
    table: "" # No default (required)
    checkpoint_table: redpanda_dynamodb_checkpoints
    batch_size: 1000
    poll_interval: 1s
    start_from: trim_horizon
    checkpoint_limit: 1000
    max_tracked_shards: 10000
    throttle_backoff: 100ms
    region: "" # No default (optional)
    endpoint: "" # No default (optional)
    tcp:
      connect_timeout: 0s
      keep_alive:
        idle: 15s
        interval: 15s
        count: 9
      tcp_user_timeout: 0s
    credentials:
      profile: "" # No default (optional)
      id: "" # No default (optional)
      secret: "" # No default (optional)
      token: "" # No default (optional)
      from_ec2_role: "" # No default (optional)
      role: "" # No default (optional)
      role_external_id: "" # No default (optional)

Prerequisites

The source DynamoDB table must have DynamoDB Streams enabled. You can enable streams with one of these view types:

  • KEYS_ONLY: Only the key attributes of the modified item

  • NEW_IMAGE: The entire item as it appears after the modification

  • OLD_IMAGE: The entire item as it appeared before the modification

  • NEW_AND_OLD_IMAGES: Both the new and old item images
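As a minimal sketch of enabling streams at table creation, the CloudFormation fragment below declares a table with a stream attached. The resource name, table name, billing mode, and key schema (pk, sk) are assumptions chosen for illustration; the StreamSpecification block is the part that matters here.

```yaml
Resources:
  ExampleTable:                      # hypothetical logical resource name
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: my-table            # assumed table name
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      # Enables DynamoDB Streams with both item images in each record.
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
```

NEW_AND_OLD_IMAGES is used here because it populates both newImage and oldImage in the messages this input produces; a narrower view type may be enough if you only need keys or one image.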

Checkpointing

Checkpoints are stored in a separate DynamoDB table (configured via checkpoint_table). This table is created automatically if it does not exist. On restart, the input resumes from the last checkpointed position for each shard.
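For illustration, a configuration along these lines gives one pipeline its own checkpoint table. The table names are assumptions; the field names come from this reference.

```yaml
input:
  aws_dynamodb_cdc:
    table: orders
    region: us-west-2
    # Hypothetical dedicated checkpoint table for this pipeline.
    # It is created automatically if it does not exist.
    checkpoint_table: orders_cdc_checkpoints
    # Applies only when no checkpoint exists yet for a shard.
    start_from: trim_horizon
```

Because a stored checkpoint takes precedence on restart, changing start_from later would not be expected to affect shards that already have a checkpoint.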

Alternative components

For better performance and longer retention, consider using Kinesis Data Streams for DynamoDB with the aws_kinesis input instead. Kinesis Data Streams can retain records for up to 1 year, whereas DynamoDB Streams retains them for 24 hours.

Message structure

Each CDC event is delivered as a JSON message with the following structure. Use these fields in your Bloblang mappings with this.<field>:

{
  "eventID": "abc123-",                    (1)
  "eventName": "INSERT | MODIFY | REMOVE",   (2)
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-1",
  "tableName": "my-table",                   (3)
  "dynamodb": {
    "keys": {                                (4)
      "pk": "user#123",
      "sk": "profile"
    },
    "newImage": {                            (5)
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice",
      "email": "alice@example.com"
    },
    "oldImage": {                            (6)
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice Smith"
    },
    "sequenceNumber": "12345678901234567890", (7)
    "sizeBytes": 256,
    "streamViewType": "NEW_AND_OLD_IMAGES"
  }
}
(1) Unique identifier for this change event.
(2) Type of change: INSERT (new item), MODIFY (updated item), or REMOVE (deleted item).
(3) Name of the source DynamoDB table.
(4) Primary key attributes of the changed item. Always present.
(5) Item state after the change. Present for INSERT and MODIFY events (requires NEW_IMAGE or NEW_AND_OLD_IMAGES stream view type).
(6) Item state before the change. Present for MODIFY and REMOVE events (requires OLD_IMAGE or NEW_AND_OLD_IMAGES stream view type).
(7) Position of this record in the shard, used for ordering and checkpointing.

DynamoDB attribute values are automatically unmarshalled from DynamoDB’s type format ({"S": "value"}) to plain values ("value").

Example mapping

pipeline:
  processors:
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.keys = this.dynamodb.keys
        root.new_data = this.dynamodb.newImage
        root.old_data = this.dynamodb.oldImage
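Because newImage is not present for REMOVE events, a mapping that needs a single "current item" field has to choose an image per event type. The following is a rough sketch of one way to do that; the output field names (op, keys, item) are assumptions, not part of the input's schema.

```yaml
pipeline:
  processors:
    - mapping: |
        root.op = this.eventName
        root.keys = this.dynamodb.keys
        # REMOVE events carry no newImage, so fall back to oldImage.
        root.item = match this.eventName {
          "REMOVE" => this.dynamodb.oldImage
          _ => this.dynamodb.newImage
        }
```

If the stream view type does not include the selected image (for example KEYS_ONLY), the referenced field is absent and item would be expected to resolve to null.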

Metadata

This input adds the following metadata fields to each message:

  • dynamodb_shard_id: The shard ID from which the record was read

  • dynamodb_sequence_number: The sequence number of the record in the stream

  • dynamodb_event_name: The type of change: INSERT, MODIFY, or REMOVE

  • dynamodb_table: The name of the DynamoDB table
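As a small sketch, these metadata fields can be copied into the message body with Bloblang's @ metadata syntax. The source object and its field names are hypothetical and exist only for this example.

```yaml
pipeline:
  processors:
    - mapping: |
        # Keep the original CDC event as-is.
        root = this
        # Attach stream position details from metadata (hypothetical field names).
        root.source.shard_id = @dynamodb_shard_id
        root.source.sequence_number = @dynamodb_sequence_number
```

Carrying the shard ID and sequence number downstream can help when you need to reason about ordering or deduplicate replayed records.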

Metrics

This input emits the following metrics:

  • dynamodb_cdc_shards_tracked: Total number of shards being tracked (gauge)

  • dynamodb_cdc_shards_active: Number of shards currently being read from (gauge)

Fields

batch_size

Maximum number of records to read per shard in a single request. Valid range: 1-1000.

Type: int

Default: 1000

checkpoint_limit

Maximum number of unacknowledged messages before forcing a checkpoint update. Lower values provide better recovery guarantees but increase write overhead.

Type: int

Default: 1000

checkpoint_table

Name of the DynamoDB table used to store checkpoints. The table is created automatically if it does not exist.

Type: string

Default: redpanda_dynamodb_checkpoints

credentials

Optional manual configuration of the AWS credentials to use. For more information, see the Amazon Web Services guide.

Type: object

credentials.from_ec2_role

Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.

Requires version 4.2.0 or later.

Type: bool

credentials.id

The ID of credentials to use.

Type: string

credentials.profile

A profile from ~/.aws/credentials to use.

Type: string

credentials.role

A role ARN to assume.

Type: string

credentials.role_external_id

An external ID to provide when assuming a role.

Type: string

credentials.secret

The secret for the credentials being used.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

credentials.token

The token for the credentials being used. Required when using short-term credentials.

Type: string
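To illustrate how the credentials fields combine, the sketch below assumes an IAM role rather than embedding an access key and secret. The role ARN, account ID, and external ID are placeholders, not real values.

```yaml
input:
  aws_dynamodb_cdc:
    table: my-table
    region: us-east-1
    credentials:
      # Placeholder ARN; replace with a role that can read the table's stream.
      role: arn:aws:iam::123456789012:role/example-cdc-reader
      # Placeholder external ID, only needed if the role's trust policy requires one.
      role_external_id: example-external-id
```

Assuming a role keeps long-lived secrets out of the configuration file, which fits the guidance above about not adding sensitive values directly.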

endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string

max_tracked_shards

Maximum number of shards to track simultaneously. Prevents memory issues with extremely large tables.

Type: int

Default: 10000

poll_interval

Time to wait between polling attempts when no records are available.

Type: string

Default: 1s

region

The AWS region to target.

Type: string

start_from

Where to start reading when no checkpoint exists. Use trim_horizon to start from the oldest available record, or latest to read only records written after the input starts.

Type: string

Default: trim_horizon

Options: trim_horizon, latest

table

The name of the DynamoDB table to read streams from.

Type: string

tcp

TCP socket configuration.

Type: object

tcp.connect_timeout

Maximum time to wait for a connection attempt to complete. Zero disables the timeout.

Type: string

Default: 0s

tcp.keep_alive

TCP keep-alive probe configuration.

Type: object

tcp.keep_alive.count

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

Type: int

Default: 9

tcp.keep_alive.idle

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

Type: string

Default: 15s

tcp.keep_alive.interval

Duration between keep-alive probes. Zero defaults to 15s.

Type: string

Default: 15s

tcp.tcp_user_timeout

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.

Type: string

Default: 0s
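The tcp fields described above could be combined as in the following sketch. All values are illustrative assumptions rather than recommendations; note that keep_alive.idle (30s) is set higher than tcp_user_timeout (20s) to satisfy the constraint described for that field.

```yaml
input:
  aws_dynamodb_cdc:
    table: my-table
    region: us-east-1
    tcp:
      connect_timeout: 5s      # give up on a connection attempt after 5 seconds
      keep_alive:
        idle: 30s              # must be greater than tcp_user_timeout
        interval: 10s
        count: 3
      tcp_user_timeout: 20s    # Linux-only; ignored on other platforms
```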

throttle_backoff

Time to wait when applying backpressure due to too many in-flight messages.

Type: string

Default: 100ms
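As a rough sketch of how the tuning fields fit together, the configuration below trades some throughput for tighter recovery guarantees. The specific values are assumptions for illustration and should be validated against your own workload.

```yaml
input:
  aws_dynamodb_cdc:
    table: my-table
    region: us-east-1
    batch_size: 500          # within the documented range of 1-1000
    poll_interval: 500ms     # poll more often when shards are idle
    checkpoint_limit: 200    # checkpoint more frequently, at the cost of more writes
    throttle_backoff: 250ms  # wait longer when too many messages are in flight
```

A lower checkpoint_limit reduces how many records may be replayed after a restart, while increasing writes to the checkpoint table.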

Examples

Consume CDC events

Read change events from a DynamoDB table with streams enabled.

input:
  aws_dynamodb_cdc:
    table: my-table
    region: us-east-1

Start from latest

Only process new changes, ignoring existing stream data.

input:
  aws_dynamodb_cdc:
    table: orders
    start_from: latest
    region: us-west-2

Suggested reading

For common patterns including filtering events, routing to Kafka or S3, and detecting changed fields, see the DynamoDB CDC Patterns cookbook.