# aws_dynamodb_cdc

---
title: aws_dynamodb_cdc
latest-connect-version: 4.93.0
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-redpanda-tag: v26.1.9
docname: inputs/aws_dynamodb_cdc
page-component-name: connect
page-version: master
page-component-version: master
page-component-title: Connect
page-relative-src-path: inputs/aws_dynamodb_cdc.adoc
page-edit-url: https://github.com/redpanda-data/rp-connect-docs/edit/main/modules/components/pages/inputs/aws_dynamodb_cdc.adoc
description: Reads change data capture (CDC) events from DynamoDB Streams.
page-topic-type: reference
personas: data_engineer, streaming_developer, platform_operator
learning-objective-1: Look up configuration options for DynamoDB CDC streaming
learning-objective-2: Find metadata fields available for message processing
learning-objective-3: Identify checkpointing and performance tuning settings
page-git-created-date: "2026-03-02"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/connect/components/inputs/aws_dynamodb_cdc.md -->

**Available in:** [Cloud](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/inputs/aws_dynamodb_cdc/ "View the Cloud version of this component"), Self-Managed

**License**: This component requires an [enterprise license](https://docs.redpanda.com/redpanda-connect/get-started/licensing/). You can either [upgrade to an Enterprise Edition license](https://www.redpanda.com/upgrade), or [generate a trial license key](http://redpanda.com/try-enterprise) that's valid for 30 days.

Stream item-level changes from DynamoDB tables using DynamoDB Streams. This input automatically manages shards, checkpoints progress for recovery, and processes multiple shards concurrently.

Introduced in version 4.79.0.

Use this reference to:

-   Look up configuration options for DynamoDB CDC streaming

-   Find metadata fields available for message processing

-   Identify checkpointing and performance tuning settings


#### Common

```yml
input:
  label: ""
  aws_dynamodb_cdc:
    tables: []
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon
    snapshot_mode: none
```

#### Advanced

```yml
input:
  label: ""
  aws_dynamodb_cdc:
    tables: []
    table_discovery_mode: single
    table_tag_filter: ""
    table_discovery_interval: 5m
    checkpoint_table: redpanda_dynamodb_checkpoints
    batch_size: 1000
    poll_interval: 1s
    start_from: trim_horizon
    checkpoint_limit: 1000
    max_tracked_shards: 10000
    throttle_backoff: 100ms
    snapshot_mode: none
    snapshot_segments: 1
    snapshot_batch_size: 100
    snapshot_throttle: 100ms
    snapshot_deduplicate: true
    snapshot_buffer_size: 100000
    region: "" # No default (optional)
    endpoint: "" # No default (optional)
    tcp:
      connect_timeout: 0s
      keep_alive:
        idle: 15s
        interval: 15s
        count: 9
      tcp_user_timeout: 0s
    credentials:
      profile: "" # No default (optional)
      id: "" # No default (optional)
      secret: "" # No default (optional)
      token: "" # No default (optional)
      from_ec2_role: false # No default (optional)
      role: "" # No default (optional)
      role_external_id: "" # No default (optional)
```

## [](#prerequisites)Prerequisites

The source DynamoDB table must have [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) enabled. You can enable streams with one of these view types:

-   `KEYS_ONLY`: Only the key attributes of the modified item

-   `NEW_IMAGE`: The entire item as it appears after the modification

-   `OLD_IMAGE`: The entire item as it appeared before the modification

-   `NEW_AND_OLD_IMAGES`: Both the new and old item images


## [](#checkpointing)Checkpointing

Checkpoints are stored in a separate DynamoDB table (configured via `checkpoint_table`). This table is created automatically if it does not exist. On restart, the input resumes from the last checkpointed position for each shard.
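
As a minimal sketch, the following config names a dedicated checkpoint table and sets where reading starts when that table holds no checkpoint yet. The table names `orders` and `orders_cdc_checkpoints` are placeholders, not values from this reference.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [orders]                          # placeholder source table
    checkpoint_table: orders_cdc_checkpoints  # placeholder; created automatically if missing
    start_from: trim_horizon                  # only applies when no checkpoint exists
    region: us-east-1
```

After a restart, `start_from` no longer matters for shards that already have a checkpoint, because the input resumes from the stored position.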

## [](#alternative-components)Alternative components

DynamoDB Streams retains change records for 24 hours. If you need longer retention (up to 1 year) or better performance, consider using Kinesis Data Streams for DynamoDB with the `aws_kinesis` input instead.

## [](#message-structure)Message structure

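Each CDC event is delivered as a JSON message with the following structure. The numbered markers, such as `(1)`, are annotations explained in the table that follows the example and are not part of the payload. Use these fields in your Bloblang mappings with `this.<field>`: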

```json
{
  "eventID": "abc123-",                    (1)
  "eventName": "INSERT | MODIFY | REMOVE",   (2)
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-1",
  "tableName": "my-table",                   (3)
  "dynamodb": {
    "keys": {                                (4)
      "pk": "user#123",
      "sk": "profile"
    },
    "newImage": {                            (5)
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice",
      "email": "alice@example.com"
    },
    "oldImage": {                            (6)
      "pk": "user#123",
      "sk": "profile",
      "name": "Alice Smith"
    },
    "sequenceNumber": "12345678901234567890", (7)
    "sizeBytes": 256,
    "streamViewType": "NEW_AND_OLD_IMAGES"
  }
}
```

| Callout | Description |
| --- | --- |
| 1 | Unique identifier for this change event. |
| 2 | Type of change: `INSERT` (new item), `MODIFY` (updated item), or `REMOVE` (deleted item). |
| 3 | Name of the source DynamoDB table. |
| 4 | Primary key attributes of the changed item. Always present. |
| 5 | Item state after the change. Present for `INSERT` and `MODIFY` events (requires the `NEW_IMAGE` or `NEW_AND_OLD_IMAGES` stream view type). |
| 6 | Item state before the change. Present for `MODIFY` and `REMOVE` events (requires the `OLD_IMAGE` or `NEW_AND_OLD_IMAGES` stream view type). |
| 7 | Position of this record in the shard, used for ordering and checkpointing. |

> 📝 **NOTE**
>
> DynamoDB attribute values are automatically unmarshalled from DynamoDB’s type format (`{"S": "value"}`) to plain values (`"value"`).

### [](#example-mapping)Example mapping

```yaml
pipeline:
  processors:
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.keys = this.dynamodb.keys
        root.new_data = this.dynamodb.newImage
        root.old_data = this.dynamodb.oldImage
```
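
Because `newImage` and `oldImage` are only present for some event types and stream view types, a mapping may need to handle their absence. The sketch below is one way to do that, assuming you want a single `item` field that holds the most recent image available and falls back to the keys. The output field names (`op`, `item`, `is_delete`) are illustrative.

```yaml
pipeline:
  processors:
    - mapping: |
        root.op = this.eventName
        root.table = this.tableName
        # Prefer the post-change image, then the pre-change image, then the keys.
        root.item = this.dynamodb.newImage.or(this.dynamodb.oldImage).or(this.dynamodb.keys)
        # Flag deletions so downstream consumers can treat them differently.
        root.is_delete = this.eventName == "REMOVE"
```

The `.or()` method returns its argument when the target is null or fails to resolve, so the chain degrades gracefully for `REMOVE` events and for `KEYS_ONLY` streams.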

## [](#metadata)Metadata

This input adds the following metadata fields to each message:

-   `dynamodb_shard_id`: The shard ID from which the record was read

-   `dynamodb_sequence_number`: The sequence number of the record in the stream

-   `dynamodb_event_name`: The type of change: INSERT, MODIFY, or REMOVE

-   `dynamodb_table`: The name of the DynamoDB table
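
As an illustration, the metadata fields listed above can be read in a Bloblang mapping with the `metadata` function. This sketch copies them into the message body under a `source` object. The `source` field and its sub-fields are hypothetical names chosen for the example.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Copy input metadata into the payload so it survives outputs that drop metadata.
        root.source.table = metadata("dynamodb_table")
        root.source.shard_id = metadata("dynamodb_shard_id")
        root.source.sequence_number = metadata("dynamodb_sequence_number")
```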


## [](#metrics)Metrics

This input emits the following metrics:

-   `dynamodb_cdc_shards_tracked`: Total number of shards being tracked (gauge)

-   `dynamodb_cdc_shards_active`: Number of shards currently being read from (gauge)


## [](#fields)Fields

### [](#batch_size)`batch_size`

Maximum number of records to read per shard in a single request. Valid range: 1-1000.

**Type**: `int`

**Default**: `1000`

### [](#checkpoint_limit)`checkpoint_limit`

Maximum number of unacknowledged messages before forcing a checkpoint update. Lower values provide better recovery guarantees but increase write overhead.

**Type**: `int`

**Default**: `1000`
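
The polling and checkpoint fields interact, so it can help to see them together. The following sketch uses illustrative values, not recommendations. It lowers `checkpoint_limit` for more frequent checkpoints, at the cost of more writes to the checkpoint table.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [my-table]
    batch_size: 500         # records per shard per request (1-1000)
    poll_interval: 500ms    # wait between polls when no records are available
    checkpoint_limit: 100   # checkpoint more often than the default of 1000
    throttle_backoff: 200ms # wait when too many messages are in flight
    region: us-east-1
```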

### [](#checkpoint_table)`checkpoint_table`

Name of the DynamoDB table that stores checkpoints. The table is created automatically if it doesn’t exist.

**Type**: `string`

**Default**: `redpanda_dynamodb_checkpoints`

### [](#credentials)`credentials`

Optional manual configuration of AWS credentials to use. More information can be found in [Amazon Web Services](https://docs.redpanda.com/connect/guides/cloud/aws/).

**Type**: `object`

### [](#credentials-from_ec2_role)`credentials.from_ec2_role`

Use the credentials of a host EC2 machine configured to assume [an IAM role associated with the instance](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2.html).

Requires version 4.2.0 or later.

**Type**: `bool`

### [](#credentials-id)`credentials.id`

The ID of credentials to use.

**Type**: `string`

### [](#credentials-profile)`credentials.profile`

A profile from `~/.aws/credentials` to use.

**Type**: `string`

### [](#credentials-role)`credentials.role`

A role ARN to assume.

**Type**: `string`

### [](#credentials-role_external_id)`credentials.role_external_id`

An external ID to provide when assuming a role.

**Type**: `string`
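
For example, a config that assumes a role before reading might look like the following sketch. The role ARN and external ID are placeholders.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [my-table]
    region: us-east-1
    credentials:
      role: arn:aws:iam::123456789012:role/example-cdc-reader  # placeholder ARN
      role_external_id: example-external-id                    # placeholder
```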

### [](#credentials-secret)`credentials.secret`

The secret for the credentials being used.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Secrets](https://docs.redpanda.com/connect/configuration/secrets/).

**Type**: `string`
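
One way to keep the secret out of the config file is environment variable interpolation. This sketch assumes that `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are set in the environment where Redpanda Connect runs.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [my-table]
    region: us-east-1
    credentials:
      id: "${AWS_ACCESS_KEY_ID}"          # resolved from the environment at startup
      secret: "${AWS_SECRET_ACCESS_KEY}"  # never stored in the config file
```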

### [](#credentials-token)`credentials.token`

The token for the credentials being used. Required when using short-term credentials.

**Type**: `string`

### [](#endpoint)`endpoint`

Allows you to specify a custom endpoint for the AWS API.

**Type**: `string`
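
A custom endpoint is typically used to point the input at a local or emulated DynamoDB service during development. The sketch below assumes an emulator that supports DynamoDB Streams and listens on `http://localhost:8000`. The URL, table name, and dummy credentials are all assumptions.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [my-table]
    region: us-east-1
    endpoint: http://localhost:8000  # assumed local emulator address
    credentials:
      id: dummy      # placeholder; many emulators accept arbitrary credentials
      secret: dummy  # placeholder
```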

### [](#max_tracked_shards)`max_tracked_shards`

Maximum number of shards to track simultaneously. Prevents memory issues with extremely large tables.

**Type**: `int`

**Default**: `10000`

### [](#poll_interval)`poll_interval`

Time to wait between polling attempts when no records are available.

**Type**: `string`

**Default**: `1s`

### [](#region)`region`

The AWS region to target.

**Type**: `string`

### [](#snapshot_batch_size)`snapshot_batch_size`

Records per scan request during snapshot. Maximum 1000. Lower values provide better backpressure control but require more API calls.

**Type**: `int`

**Default**: `100`

### [](#snapshot_buffer_size)`snapshot_buffer_size`

Maximum number of CDC events to buffer for deduplication (approximately 100 bytes per entry, so about 10 MB at the default of 100000). If this limit is exceeded, deduplication is disabled and duplicates may be emitted.

**Type**: `int`

**Default**: `100000`

### [](#snapshot_deduplicate)`snapshot_deduplicate`

Deduplicates records that appear in both the snapshot and the CDC stream. This requires buffering CDC events during the snapshot. If the buffer (`snapshot_buffer_size`) is exceeded, deduplication is disabled to prevent data loss.

**Type**: `bool`

**Default**: `true`

### [](#snapshot_mode)`snapshot_mode`

-   `none`: Streams CDC events only (default).

-   `snapshot_only`: Performs a one-time full table scan with no ongoing streaming.

-   `snapshot_and_cdc`: Scans the entire table, then streams changes.

**Type**: `string`

**Default**: `none`

**Options**: `none`, `snapshot_only`, `snapshot_and_cdc`

### [](#snapshot_segments)`snapshot_segments`

Number of parallel scan segments (1-10). Higher parallelism scans faster but consumes more Read Capacity Units (RCUs). A lower value is safer to start with.

**Type**: `int`

**Default**: `1`

### [](#snapshot_throttle)`snapshot_throttle`

Minimum time between scan requests per segment. Use this to limit Read Capacity Unit (RCU) consumption during snapshot.

**Type**: `string`

**Default**: `100ms`
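
The snapshot fields can be combined to bound read capacity use during the initial scan. The following sketch uses illustrative values. With 2 segments, a batch size of 100, and a 200ms throttle, each segment issues at most 5 scan requests per second, so the scan reads at most about 2 × 5 × 100 = 1000 items per second.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [products]
    snapshot_mode: snapshot_and_cdc
    snapshot_segments: 2          # parallel scan segments (1-10)
    snapshot_batch_size: 100      # records per scan request (maximum 1000)
    snapshot_throttle: 200ms      # minimum gap between requests, per segment
    snapshot_deduplicate: true
    snapshot_buffer_size: 100000  # roughly 10 MB at about 100 bytes per entry
    region: us-east-1
```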

### [](#start_from)`start_from`

Where to start reading when no checkpoint exists. `trim_horizon` starts from the oldest record still available in the stream. `latest` skips existing stream data and reads only new changes.

**Type**: `string`

**Default**: `trim_horizon`

**Options**: `trim_horizon`, `latest`

### [](#table_discovery_interval)`table_discovery_interval`

Interval for rescanning and discovering new tables when using `tag` or `includelist` mode. Set to 0 to disable periodic rescanning.

**Type**: `string`

**Default**: `5m`

### [](#table_discovery_mode)`table_discovery_mode`

-   `single`: Streams from the tables specified in the `tables` list.

-   `tag`: Auto-discovers tables by tags (ignores the `tables` field).

-   `includelist`: Streams from the tables in the `tables` list. Kept for backward compatibility; use `single` instead.

**Type**: `string`

**Default**: `single`

**Options**: `single`, `tag`, `includelist`

### [](#table_tag_filter)`table_tag_filter`

Multi-tag filter in the format `key1:v1,v2;key2:v3,v4`. Matches tables where (key1=v1 OR key1=v2) AND (key2=v3 OR key2=v4). Required when `table_discovery_mode` is `tag`.

**Type**: `string`

**Default**: `""`

### [](#tables)`tables[]`

List of table names to stream from. Provide one name to stream from a single table, or several names to stream from multiple tables. Ignored when `table_discovery_mode` is `tag`.

**Type**: `array`

**Default**: `[]`

### [](#tcp)`tcp`

TCP socket configuration.

**Type**: `object`

### [](#tcp-connect_timeout)`tcp.connect_timeout`

Maximum amount of time a dial waits for a connection to complete. Zero disables the timeout.

**Type**: `string`

**Default**: `0s`

### [](#tcp-keep_alive)`tcp.keep_alive`

TCP keep-alive probe configuration.

**Type**: `object`

### [](#tcp-keep_alive-count)`tcp.keep_alive.count`

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

**Type**: `int`

**Default**: `9`

### [](#tcp-keep_alive-idle)`tcp.keep_alive.idle`

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

**Type**: `string`

**Default**: `15s`

### [](#tcp-keep_alive-interval)`tcp.keep_alive.interval`

Duration between keep-alive probes. Zero defaults to 15s.

**Type**: `string`

**Default**: `15s`

### [](#tcp-tcp_user_timeout)`tcp.tcp_user_timeout`

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, `keep_alive.idle` must be greater than this value per RFC 5482. Zero disables.

**Type**: `string`

**Default**: `0s`
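
The TCP fields constrain each other, so a combined example may help. In this sketch the timeout values are illustrative. Note that `keep_alive.idle` is set higher than `tcp_user_timeout`, as the field description requires.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [my-table]
    region: us-east-1
    tcp:
      connect_timeout: 5s
      keep_alive:
        idle: 30s            # must be greater than tcp_user_timeout
        interval: 15s
        count: 9
      tcp_user_timeout: 20s  # Linux only; ignored on other platforms
```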

### [](#throttle_backoff)`throttle_backoff`

Time to wait when applying backpressure due to too many in-flight messages.

**Type**: `string`

**Default**: `100ms`

## [](#examples)Examples

### [](#consume-cdc-events)Consume CDC events

Read change events from a DynamoDB table with streams enabled.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [my-table]
    region: us-east-1
```

### [](#start-from-latest)Start from latest

Only process new changes, ignoring existing stream data.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [orders]
    start_from: latest
    region: us-west-2
```

### [](#snapshot-and-cdc)Snapshot and CDC

Scan all existing records, then stream ongoing changes.

```yaml
input:
  aws_dynamodb_cdc:
    tables: [products]
    snapshot_mode: snapshot_and_cdc
    snapshot_segments: 5
    region: us-east-1
```

### [](#auto-discover-tables-by-tag)Auto-discover tables by tag

Automatically discover and stream from all tables with a specific tag.

```yaml
input:
  aws_dynamodb_cdc:
    table_discovery_mode: tag
    table_tag_filter: "stream-enabled:true"
    table_discovery_interval: 5m
    region: us-east-1
```

### [](#auto-discover-tables-by-multiple-tags)Auto-discover tables by multiple tags

Discover tables that match multiple tag criteria, with OR logic among the values of a key and AND logic across keys.

```yaml
input:
  aws_dynamodb_cdc:
    table_discovery_mode: tag
    table_tag_filter: "environment:prod,staging;team:data,analytics"
    table_discovery_interval: 5m
    region: us-east-1
    # Matches tables with: (environment=prod OR environment=staging) AND (team=data OR team=analytics)
```

### [](#stream-from-multiple-specific-tables)Stream from multiple specific tables

Stream from an explicit list of tables simultaneously.

```yaml
input:
  aws_dynamodb_cdc:
    table_discovery_mode: single
    tables:
      - orders
      - customers
      - products
    region: us-west-2
```

## [](#suggested-reading)Suggested reading

For common patterns including filtering events, routing to Kafka or S3, and detecting changed fields, see the [DynamoDB CDC Patterns](https://docs.redpanda.com/connect/cookbooks/dynamodb_cdc/) cookbook.