# DynamoDB CDC Patterns


---
title: DynamoDB CDC Patterns
latest-connect-version: 4.93.0
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-redpanda-tag: v26.1.9
docname: dynamodb_cdc
page-component-name: connect
page-version: master
page-component-version: master
page-component-title: Connect
page-relative-src-path: dynamodb_cdc.adoc
page-edit-url: https://github.com/redpanda-data/rp-connect-docs/edit/main/modules/cookbooks/pages/dynamodb_cdc.adoc
description: Learn how to capture, filter, transform, and route DynamoDB change data capture (CDC) events with Redpanda Connect.
page-topic-type: cookbook
personas: streaming_developer, data_engineer
learning-objective-1: Find reusable patterns for capturing DynamoDB CDC events
learning-objective-2: Look up integration patterns for routing CDC data to Redpanda and S3
learning-objective-3: Identify patterns for filtering and transforming change events
page-git-created-date: "2026-03-02"
page-git-modified-date: "2026-03-03"
---

<!-- Source: https://docs.redpanda.com/connect/cookbooks/dynamodb_cdc.md -->

The DynamoDB CDC input enables capturing item-level changes from DynamoDB tables with streams enabled. This cookbook provides reusable patterns for filtering, transforming, and routing DynamoDB CDC events to Redpanda, S3, and other destinations.

Use this cookbook to:

-   Find reusable patterns for capturing DynamoDB CDC events

-   Look up integration patterns for routing CDC data to Redpanda and S3

-   Identify patterns for filtering and transforming change events


## [](#prerequisites)Prerequisites

Before using these patterns, ensure you have the following configured:

### [](#redpanda-cli)Redpanda CLI

Install the Redpanda CLI (`rpk`) to run Redpanda Connect. See [Get Started with Redpanda Connect using `rpk`](https://docs.redpanda.com/connect/get-started/quickstarts/rpk/) for installation instructions.

### [](#dynamodb-streams)DynamoDB Streams

The source DynamoDB table must have [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) enabled with an appropriate view type:

-   `KEYS_ONLY`: Only the key attributes of the modified item

-   `NEW_IMAGE`: The entire item as it appears after the modification

-   `OLD_IMAGE`: The entire item as it appeared before the modification

-   `NEW_AND_OLD_IMAGES`: Both the new and old item images (recommended for detecting changes)


To enable streams on an existing table using the AWS CLI:

```bash
aws dynamodb update-table \
  --table-name my-table \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
```

### [](#environment-variables)Environment variables

The examples in this cookbook read the table name, AWS region, broker addresses, and bucket name from environment variables, which keeps environment-specific values out of your pipeline configuration files.

```bash
export DYNAMODB_TABLE=my-table          # (1)
export AWS_REGION=us-east-1             # (2)
export REDPANDA_BROKERS=localhost:9092  # (3)
export S3_BUCKET=my-cdc-bucket          # (4)
```

| Callout | Description |
| --- | --- |
| 1 | The name of the DynamoDB table with streams enabled. |
| 2 | The AWS region where your DynamoDB table is located. |
| 3 | The Redpanda broker addresses (for Redpanda output examples). |
| 4 | The S3 bucket name (for S3 output examples). |

Redpanda Connect loads AWS credentials from the standard [credential chain](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html) (environment variables, `~/.aws/credentials`, or IAM roles).
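
If the default chain does not pick the identity you want, a minimal sketch of pinning one explicitly could look like the following. It assumes the input accepts the same `credentials` block as other Redpanda Connect AWS components; the profile name and role ARN are hypothetical placeholders.

```yaml
input:
  aws_dynamodb_cdc:
    tables: ["${DYNAMODB_TABLE}"]
    region: ${AWS_REGION}
    credentials:
      # Hypothetical named profile from ~/.aws/credentials
      profile: my-profile
      # Alternatively, assume an IAM role (hypothetical ARN):
      # role: arn:aws:iam::123456789012:role/cdc-reader
```

Check the connector reference to confirm which credential fields your version supports before relying on this.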

## [](#capture-cdc-events)Capture CDC events

The simplest pattern captures all change events from a DynamoDB table and outputs them with metadata:

```yaml
input:
  aws_dynamodb_cdc:
    tables: ["${DYNAMODB_TABLE}"]
    region: ${AWS_REGION}
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon

pipeline:
  processors:
    # Extract the change event details
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.event_id = this.eventID
        root.keys = this.dynamodb.keys
        root.new_image = this.dynamodb.newImage
        root.old_image = this.dynamodb.oldImage
        root.sequence_number = this.dynamodb.sequenceNumber
        root.timestamp = now()

output:
  stdout:
    codec: lines
```

For details on the CDC event message structure and available fields for Bloblang mappings, see the [message structure](https://docs.redpanda.com/connect/components/inputs/aws_dynamodb_cdc/#_message_structure) section in the connector reference.

## [](#filter-cdc-events)Filter CDC events

You can filter events to process only specific change types:

```yaml
input:
  aws_dynamodb_cdc:
    tables: ["${DYNAMODB_TABLE}"]
    region: ${AWS_REGION}
    start_from: latest

pipeline:
  processors:
    # Filter to only process INSERT and MODIFY events (ignore REMOVE)
    - mapping: |
        root = if this.eventName == "REMOVE" { deleted() } else { this }

    # Transform to a simplified format
    - mapping: |
        root.event_type = this.eventName
        root.keys = this.dynamodb.keys
        root.new_data = this.dynamodb.newImage
        root.old_data = this.dynamodb.oldImage

output:
  stdout:
    codec: lines
```

This example:

-   Filters out `REMOVE` events using `deleted()`

-   Transforms the event to a simplified format
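
The same filter can also be written as an allow-list, which may be easier to extend than a chain of comparisons. This is a sketch of that variant; the variable name `allowed` is an illustrative choice.

```yaml
pipeline:
  processors:
    # Keep only the event types named in the allow-list; drop everything else
    - mapping: |
        let allowed = ["INSERT", "MODIFY"]
        root = if $allowed.contains(this.eventName) { this } else { deleted() }
```

To keep deletes as well, add `"REMOVE"` to the list.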


## [](#route-to-redpanda)Route to Redpanda

Stream DynamoDB changes to Redpanda for real-time processing:

```yaml
input:
  aws_dynamodb_cdc:
    tables: ["${DYNAMODB_TABLE}"]
    region: ${AWS_REGION}
    checkpoint_table: redpanda_dynamodb_checkpoints
    batch_size: 100
    poll_interval: 500ms

pipeline:
  processors:
    # Transform to a Kafka-friendly format with a composite key
    # Assumes the table's key attributes are named pk and sk; adjust to your key schema
    - mapping: |
        let keys = this.dynamodb.keys
        meta kafka_key = [$keys.pk, $keys.sk].filter(v -> v != null).join("#")
        root.event_type = this.eventName
        root.table = this.tableName
        root.timestamp = now()
        root.keys = this.dynamodb.keys
        root.new_image = this.dynamodb.newImage
        root.old_image = this.dynamodb.oldImage

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: dynamodb-cdc-events
    key: ${! @kafka_key }
    partitioner: murmur2_hash
    compression: snappy
    batching:
      count: 100
      period: 1s
```

This example:

-   Creates a composite message key from the DynamoDB primary key

-   Reshapes the CDC event into a simplified JSON document

-   Batches messages for efficient delivery
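
When the key attribute names are not known in advance, the message key could be derived from whatever key attributes the event carries. The sketch below is one possible approach, assuming key values are strings or numbers; it sorts the attributes by name so the key is stable across events.

```yaml
pipeline:
  processors:
    # Build the message key from all key attributes, ordered by attribute name
    - mapping: |
        meta kafka_key = this.dynamodb.keys.key_values().sort_by(kv -> kv.key).map_each(kv -> kv.value.string()).join("#")
```

The `.string()` call converts numeric key values to text before joining, because `join` expects an array of strings.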


## [](#route-to-s3)Route to S3

Archive CDC events to S3 for long-term storage and analytics:

```yaml
input:
  aws_dynamodb_cdc:
    tables: ["${DYNAMODB_TABLE}"]
    region: ${AWS_REGION}
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon

pipeline:
  processors:
    # Add partitioning metadata for S3 organization
    - mapping: |
        let event_time = now()
        meta s3_path = "year=%s/month=%s/day=%s/hour=%s".format(
          $event_time.ts_format("2006"),
          $event_time.ts_format("01"),
          $event_time.ts_format("02"),
          $event_time.ts_format("15")
        )
        root.event_type = this.eventName
        root.table = this.tableName
        root.sequence_number = this.dynamodb.sequenceNumber
        root.event_time = $event_time
        root.keys = this.dynamodb.keys
        root.new_image = this.dynamodb.newImage
        root.old_image = this.dynamodb.oldImage

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: dynamodb-cdc/${DYNAMODB_TABLE}/${! @s3_path }/${! uuid_v4() }.json
    region: ${AWS_REGION}
    batching:
      count: 1000
      period: 1m
      processors:
        - archive:
            format: lines
```

This example:

-   Organizes files by time-based partitions (year/month/day/hour) derived from the processing time (`now()`), not the time of the original change

-   Batches events and archives them as newline-delimited JSON

-   Uses UUID file names to prevent collisions
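
For larger archives, each batch could also be compressed before upload. This is a sketch of the same output with a `compress` processor added after `archive`; the `.json.gz` suffix is a naming choice, not a requirement.

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: dynamodb-cdc/${DYNAMODB_TABLE}/${! @s3_path }/${! uuid_v4() }.json.gz
    region: ${AWS_REGION}
    batching:
      count: 1000
      period: 1m
      processors:
        # Join the batch into one newline-delimited document
        - archive:
            format: lines
        # Gzip the combined document before it is written to S3
        - compress:
            algorithm: gzip
```

Downstream readers then need to decompress each object before parsing it as newline-delimited JSON.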


## [](#route-by-event-type)Route by event type

Route different event types to different destinations:

```yaml
input:
  aws_dynamodb_cdc:
    tables: ["${DYNAMODB_TABLE}"]
    region: ${AWS_REGION}

pipeline:
  processors:
    # Transform to a common format
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.timestamp = now()
        root.keys = this.dynamodb.keys
        root.data = if this.dynamodb.exists("newImage") {
          this.dynamodb.newImage
        } else {
          this.dynamodb.oldImage
        }

output:
  switch:
    cases:
      # Route INSERT events to a topic for new records
      - check: this.event_type == "INSERT"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: dynamodb-inserts

      # Route MODIFY events to a topic for updates
      - check: this.event_type == "MODIFY"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: dynamodb-updates

      # Route REMOVE events to a topic for deletes
      - check: this.event_type == "REMOVE"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: dynamodb-deletes

      # Fallback for any unexpected event types
      - output:
          drop: {}
```

This pattern:

-   Separates inserts, updates, and deletes into dedicated topics

-   Lets you apply different retention policies per topic

-   Enables specialized downstream consumers
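
If the only difference between the cases is the topic name, a single output with an interpolated topic may be simpler. The sketch below assumes topic names derived from the event type, which differ from the names used in the `switch` example.

```yaml
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    # Resolves to dynamodb-insert, dynamodb-modify, or dynamodb-remove
    topic: dynamodb-${! this.event_type.lowercase() }
```

Unlike the `switch` output, this variant has no fallback case, so an unexpected event type would be written to a topic named after it rather than dropped.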


## [](#detect-changed-fields)Detect changed fields

Compare old and new images to identify which fields changed:

```yaml
input:
  aws_dynamodb_cdc:
    tables: ["${DYNAMODB_TABLE}"]
    region: ${AWS_REGION}

pipeline:
  processors:
    # Only process MODIFY events
    - mapping: |
        root = if this.eventName != "MODIFY" { deleted() } else { this }

    # Compare old and new images to find changed fields
    - mapping: |
        let old_data = this.dynamodb.oldImage
        let new_data = this.dynamodb.newImage

        root.table = this.tableName
        root.keys = this.dynamodb.keys
        root.timestamp = now()

        # Find fields that changed by comparing key-value pairs
        root.changes = $new_data.key_values().filter(kv -> !$old_data.exists(kv.key) || $old_data.get(kv.key) != kv.value).map_each(kv -> {"field": kv.key, "old_value": if $old_data.exists(kv.key) { $old_data.get(kv.key) } else { null }, "new_value": kv.value})

        # Find fields that were removed
        root.removed_fields = $old_data.keys().filter(k -> !$new_data.exists(k))

output:
  stdout:
    codec: lines
```

This pattern:

-   Filters to only MODIFY events

-   Compares old and new images to find differences

-   Outputs a list of changed fields with their old and new values


> 📝 **NOTE**
>
> This pattern requires the `NEW_AND_OLD_IMAGES` stream view type. The `.key_values()` method converts an object to an array of key-value pairs that can be filtered and mapped.
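
When only one attribute matters, the comparison can be reduced to a filter. The following sketch keeps `MODIFY` events in which a hypothetical `status` attribute changed and drops everything else; substitute an attribute from your own table.

```yaml
pipeline:
  processors:
    # Keep only MODIFY events where the hypothetical `status` attribute differs
    - mapping: |
        root = if this.eventName == "MODIFY" && this.dynamodb.oldImage.status != this.dynamodb.newImage.status { this } else { deleted() }
```

Like the full comparison, this relies on both images being present, so it also needs the `NEW_AND_OLD_IMAGES` view type.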

## [](#checkpointing)Checkpointing

The DynamoDB CDC input automatically manages checkpoints in a separate DynamoDB table:

```yaml
input:
  aws_dynamodb_cdc:
    tables: ["my-table"]
    checkpoint_table: my-app-checkpoints  # (1)
    checkpoint_limit: 500  # (2)
    start_from: trim_horizon  # (3)
```

| Callout | Description |
| --- | --- |
| 1 | Custom checkpoint table name (default: `redpanda_dynamodb_checkpoints`). |
| 2 | Checkpoint after every 500 messages (lower = better recovery, higher = fewer writes). |
| 3 | Start from the oldest available record when no checkpoint exists. |

If a checkpoint table doesn’t exist, it’s created automatically with the required schema.

## [](#performance-tuning)Performance tuning

Optimize throughput and latency with these settings:

```yaml
input:
  aws_dynamodb_cdc:
    tables: ["my-table"]
    batch_size: 1000  # (1)
    poll_interval: 100ms  # (2)
    max_tracked_shards: 10000  # (3)
    throttle_backoff: 50ms  # (4)
```

| Callout | Description |
| --- | --- |
| 1 | Maximum records per shard per request (1-1000). |
| 2 | Time between polls when no records are available. |
| 3 | Maximum shards to track (for very large tables). |
| 4 | Backpressure delay when too many messages are in-flight. |

### [](#throughput-considerations)Throughput considerations

-   DynamoDB Streams allows 5 `GetRecords` calls per second per shard

-   Higher `batch_size` improves throughput but increases memory usage

-   Shorter `poll_interval` reduces latency but increases API calls


## [](#troubleshoot)Troubleshoot

### [](#no-events-received)No events received

If you’re not receiving events:

1.  Verify streams are enabled on the table:

    ```bash
    aws dynamodb describe-table --table-name my-table \
      --query 'Table.StreamSpecification'
    ```

2.  Check that changes are being made to the table

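3.  Check `start_from`. With `latest`, only changes made after the pipeline starts are captured; set it to `trim_horizon` to read records already in the stream. The setting applies only when no checkpoint exists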


### [](#duplicate-events)Duplicate events

Each stream record appears exactly once in DynamoDB Streams. However, if your pipeline fails before checkpointing, records may be re-read on restart, resulting in at-least-once processing semantics. To handle potential duplicates:

-   Use idempotent processing in downstream systems

-   Deduplicate using the `dynamodb_sequence_number` metadata

-   Lower `checkpoint_limit` to reduce the window of possible duplicates
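
Deduplication inside the pipeline could be sketched with the `dedupe` processor and a cache resource, as below. Because duplicates arise after a restart, the cache needs to outlive the pipeline process; this example assumes a Redis instance reachable through a hypothetical `REDIS_URL` environment variable, and the cache label is an illustrative name.

```yaml
cache_resources:
  - label: cdc_dedupe_cache
    redis:
      # Hypothetical Redis instance; an in-memory cache would not survive restarts
      url: ${REDIS_URL}
      # Keep entries for the 24-hour DynamoDB Streams retention window
      default_ttl: 24h

pipeline:
  processors:
    # Drop any record whose sequence number has already been seen
    - dedupe:
        cache: cdc_dedupe_cache
        key: ${! @dynamodb_sequence_number }
```

This reduces duplicates but does not replace idempotent handling downstream, since a failure between the cache write and the output write can still cause a record to be skipped or repeated.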


### [](#stream-retention)Stream retention

DynamoDB Streams retains data for 24 hours. If your pipeline is offline longer than that:

-   Consider using [Kinesis Data Streams for DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html) with the [`aws_kinesis` input](https://docs.redpanda.com/connect/components/inputs/aws_kinesis/) instead (up to 1 year retention)

-   Implement a full-table scan fallback for disaster recovery
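
A minimal sketch of the Kinesis alternative follows. It assumes the table already streams to a Kinesis data stream whose name is supplied through a hypothetical `KINESIS_STREAM` environment variable; the checkpoint table name is also a placeholder.

```yaml
input:
  aws_kinesis:
    streams: ["${KINESIS_STREAM}"]
    region: ${AWS_REGION}
    # The Kinesis input stores its own checkpoints in a DynamoDB table
    dynamodb:
      table: redpanda_kinesis_checkpoints
      create: true
    # Read from the oldest retained record when no checkpoint exists
    start_from_oldest: true
```

Records delivered through Kinesis may not share the message structure produced by the `aws_dynamodb_cdc` input, so the mappings in this cookbook would likely need adjusting.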


## [](#suggested-reading)Suggested reading

-   [DynamoDB CDC Input Reference](https://docs.redpanda.com/connect/components/inputs/aws_dynamodb_cdc/)

-   [AWS Configuration Guide](https://docs.redpanda.com/connect/guides/cloud/aws/)

-   [Kinesis Input](https://docs.redpanda.com/connect/components/inputs/aws_kinesis/) (for Kinesis Data Streams for DynamoDB)

-   [DynamoDB Streams Documentation](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)

