DynamoDB CDC Patterns

The DynamoDB CDC input enables capturing item-level changes from DynamoDB tables with streams enabled. This cookbook provides reusable patterns for filtering, transforming, and routing DynamoDB CDC events to Redpanda, S3, and other destinations.

Use this cookbook to:

  • Find reusable patterns for capturing DynamoDB CDC events

  • Look up integration patterns for routing CDC data to Redpanda and S3

  • Identify patterns for filtering and transforming change events

Prerequisites

Before using these patterns, ensure you have the following configured:

Redpanda CLI

Install the Redpanda CLI (rpk) to run Redpanda Connect. See Get Started with Redpanda Connect using rpk for installation instructions.

DynamoDB Streams

The source DynamoDB table must have DynamoDB Streams enabled with an appropriate view type:

  • KEYS_ONLY: Only the key attributes of the modified item

  • NEW_IMAGE: The entire item as it appears after the modification

  • OLD_IMAGE: The entire item as it appeared before the modification

  • NEW_AND_OLD_IMAGES: Both the new and old item images (recommended for detecting changes)

To enable streams on an existing table using the AWS CLI:

aws dynamodb update-table \
  --table-name my-table \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES

Environment variables

The examples in this cookbook use environment variables for AWS configuration. This allows you to keep credentials secure and separate from your pipeline configuration files.

export DYNAMODB_TABLE=my-table (1)
export AWS_REGION=us-east-1 (2)
export REDPANDA_BROKERS=localhost:9092 (3)
export S3_BUCKET=my-cdc-bucket (4)
1 The name of the DynamoDB table with streams enabled.
2 The AWS region where your DynamoDB table is located.
3 The Redpanda broker addresses (for Redpanda output examples).
4 The S3 bucket name (for S3 output examples).

Redpanda Connect loads AWS credentials from the standard credential chain (environment variables, ~/.aws/credentials, or IAM roles).
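For example, to supply credentials through environment variables (the values below are placeholders; the session token is only needed for temporary credentials):

export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
export AWS_SESSION_TOKEN=<your-session-token>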

Capture CDC events

The simplest pattern captures all change events from a DynamoDB table and outputs them with event details such as the event type, table name, keys, and sequence number:

input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon

pipeline:
  processors:
    # Extract the change event details
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.event_id = this.eventID
        root.keys = this.dynamodb.keys
        root.new_image = this.dynamodb.newImage
        root.old_image = this.dynamodb.oldImage
        root.sequence_number = this.dynamodb.sequenceNumber
        root.timestamp = now()

output:
  stdout:
    codec: lines

For details on the CDC event message structure and available fields for Bloblang mappings, see the message structure section in the connector reference.

Filter CDC events

You can filter events to process only specific change types:

input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}
    start_from: latest

pipeline:
  processors:
    # Filter to only process INSERT and MODIFY events (ignore REMOVE)
    - mapping: |
        root = if this.eventName == "REMOVE" { deleted() } else { this }

    # Transform to a simplified format
    - mapping: |
        root.event_type = this.eventName
        root.keys = this.dynamodb.keys
        root.new_data = this.dynamodb.newImage
        root.old_data = this.dynamodb.oldImage

output:
  stdout:
    codec: lines

This example:

  • Filters out REMOVE events using deleted()

  • Transforms the event to a simplified format

Route to Redpanda

Stream DynamoDB changes to Redpanda for real-time processing:

input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}
    checkpoint_table: redpanda_dynamodb_checkpoints
    batch_size: 100
    poll_interval: 500ms

pipeline:
  processors:
    # Transform to a Kafka-friendly format with a composite key
    - mapping: |
        let keys = this.dynamodb.keys
        meta kafka_key = [$keys.pk, $keys.sk].filter(v -> v != null).join("#")
        root.event_type = this.eventName
        root.table = this.tableName
        root.timestamp = now()
        root.keys = this.dynamodb.keys
        root.new_image = this.dynamodb.newImage
        root.old_image = this.dynamodb.oldImage

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: dynamodb-cdc-events
    key: ${! @kafka_key }
    partitioner: murmur2_hash
    compression: snappy
    batching:
      count: 100
      period: 1s

This example:

  • Creates a composite message key from the DynamoDB primary key

  • Transforms the DynamoDB format to plain JSON

  • Batches messages for efficient delivery

Route to S3

Archive CDC events to S3 for long-term storage and analytics:

input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon

pipeline:
  processors:
    # Add partitioning metadata for S3 organization
    - mapping: |
        let event_time = now()
        meta s3_path = "year=%s/month=%s/day=%s/hour=%s".format(
          $event_time.ts_format("2006"),
          $event_time.ts_format("01"),
          $event_time.ts_format("02"),
          $event_time.ts_format("15")
        )
        root.event_type = this.eventName
        root.table = this.tableName
        root.sequence_number = this.dynamodb.sequenceNumber
        root.event_time = $event_time
        root.keys = this.dynamodb.keys
        root.new_image = this.dynamodb.newImage
        root.old_image = this.dynamodb.oldImage

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: dynamodb-cdc/${DYNAMODB_TABLE}/${! @s3_path }/${! uuid_v4() }.json
    region: ${AWS_REGION}
    batching:
      count: 1000
      period: 1m
      processors:
        - archive:
            format: lines

This example:

  • Organizes files by time-based partitions (year/month/day/hour)

  • Batches events and archives them as newline-delimited JSON

  • Uses UUID file names to prevent collisions

Route by event type

Route different event types to different destinations:

input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}

pipeline:
  processors:
    # Transform to a common format
    - mapping: |
        root.event_type = this.eventName
        root.table = this.tableName
        root.timestamp = now()
        root.keys = this.dynamodb.keys
        root.data = if this.dynamodb.exists("newImage") {
          this.dynamodb.newImage
        } else {
          this.dynamodb.oldImage
        }

output:
  switch:
    cases:
      # Route INSERT events to a topic for new records
      - check: this.event_type == "INSERT"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: dynamodb-inserts

      # Route MODIFY events to a topic for updates
      - check: this.event_type == "MODIFY"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: dynamodb-updates

      # Route REMOVE events to a topic for deletes
      - check: this.event_type == "REMOVE"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: dynamodb-deletes

      # Fallback for any unexpected event types
      - output:
          drop: {}

This pattern:

  • Separates processing pipelines for inserts, updates, and deletes

  • Lets you apply different retention policies per event type by configuring each topic separately

  • Enables specialized downstream consumers
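Retention itself is configured on the topics rather than in the pipeline. As an illustrative sketch, you might give the deletes topic a shorter retention window with rpk (the 24-hour value here is an example, not a recommendation):

rpk topic alter-config dynamodb-deletes --set retention.ms=86400000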

Detect changed fields

Compare old and new images to identify which fields changed:

input:
  aws_dynamodb_cdc:
    table: ${DYNAMODB_TABLE}
    region: ${AWS_REGION}

pipeline:
  processors:
    # Only process MODIFY events
    - mapping: |
        root = if this.eventName != "MODIFY" { deleted() } else { this }

    # Compare old and new images to find changed fields
    - mapping: |
        let old_data = this.dynamodb.oldImage
        let new_data = this.dynamodb.newImage

        root.table = this.tableName
        root.keys = this.dynamodb.keys
        root.timestamp = now()

        # Find fields that changed by comparing key-value pairs
        root.changes = $new_data.key_values().filter(kv -> !$old_data.exists(kv.key) || $old_data.get(kv.key) != kv.value).map_each(kv -> {
          "field": kv.key,
          "old_value": if $old_data.exists(kv.key) { $old_data.get(kv.key) } else { null },
          "new_value": kv.value
        })

        # Find fields that were removed
        root.removed_fields = $old_data.keys().filter(k -> !$new_data.exists(k))

output:
  stdout:
    codec: lines

This pattern:

  • Filters to only MODIFY events

  • Compares old and new images to find differences

  • Outputs a list of changed fields with their old and new values

This pattern requires the NEW_AND_OLD_IMAGES stream view type. The .key_values() method converts an object to an array of key-value pairs that can be filtered and mapped.
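As a quick standalone illustration of the method (not part of the pipeline above), a mapping over the document {"name": "alice", "age": 30}:

root = this.key_values()
# Produces, in no guaranteed order:
# [{"key": "age", "value": 30}, {"key": "name", "value": "alice"}]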

Checkpointing

The DynamoDB CDC input automatically manages checkpoints in a separate DynamoDB table:

input:
  aws_dynamodb_cdc:
    table: my-table
    checkpoint_table: my-app-checkpoints (1)
    checkpoint_limit: 500 (2)
    start_from: trim_horizon (3)
1 Custom checkpoint table name (default: redpanda_dynamodb_checkpoints).
2 Checkpoint after every 500 messages (lower = better recovery, higher = fewer writes).
3 Start from the oldest available record when no checkpoint exists.

If a checkpoint table doesn’t exist, it’s created automatically with the required schema.

Performance tuning

Optimize throughput and latency with these settings:

input:
  aws_dynamodb_cdc:
    table: my-table
    batch_size: 1000 (1)
    poll_interval: 100ms (2)
    max_tracked_shards: 10000 (3)
    throttle_backoff: 50ms (4)
1 Maximum records per shard per request (1-1000).
2 Time between polls when no records are available.
3 Maximum shards to track (for very large tables).
4 Backpressure delay when too many messages are in-flight.

Throughput considerations

  • DynamoDB Streams allows 5 GetRecords calls per second per shard

  • Higher batch_size improves throughput but increases memory usage

  • Shorter poll_interval reduces latency but increases API calls

Troubleshoot

No events received

If you’re not receiving events:

  1. Verify streams are enabled on the table:

    aws dynamodb describe-table --table-name my-table \
      --query 'Table.StreamSpecification'
  2. Check that changes are being made to the table

  3. Verify start_from is set to trim_horizon to capture existing stream data

Duplicate events

Each stream record appears exactly once in DynamoDB Streams. However, if your pipeline fails before checkpointing, records may be re-read on restart, resulting in at-least-once processing semantics. To handle potential duplicates:

  • Use idempotent processing in downstream systems

  • Deduplicate using the dynamodb_sequence_number metadata

  • Lower checkpoint_limit to reduce the window of possible duplicates
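Sequence-number deduplication can be sketched with the dedupe processor backed by a cache resource. The cache label and TTL below are illustrative, and this assumes the input sets a dynamodb_sequence_number metadata field as described above:

cache_resources:
  - label: cdc_dedupe_cache
    memory:
      default_ttl: 5m

pipeline:
  processors:
    # Drop messages whose sequence number was already seen within the TTL window
    - dedupe:
        cache: cdc_dedupe_cache
        key: ${! @dynamodb_sequence_number }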

Stream retention

DynamoDB Streams retains data for 24 hours. If your pipeline is offline longer than that, trimmed records are permanently lost and cannot be re-read from the stream. To recover, backfill the missing changes from the table itself (for example, with a full scan or a point-in-time export) before resuming CDC processing.
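One way to backfill after a retention gap is a point-in-time export to S3. The command below is an illustrative sketch (the account ID is a placeholder, and point-in-time recovery must be enabled on the table):

aws dynamodb export-table-to-point-in-time \
  --table-arn arn:aws:dynamodb:us-east-1:123456789012:table/my-table \
  --s3-bucket my-cdc-bucket \
  --export-format DYNAMODB_JSON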

Suggested reading