
TigerBeetle CDC Patterns

The tigerbeetle_cdc input streams financial ledger events from a TigerBeetle cluster in real time. Use these patterns to filter, transform, and route transfer events to Redpanda, S3, and other destinations.

Use this cookbook to:

  • Apply reusable patterns for capturing TigerBeetle CDC events

  • Adapt integration patterns to route financial ledger events to Redpanda and S3

  • Identify patterns for filtering and transforming financial transfer events

The tigerbeetle_cdc input is in beta. The API is subject to change.

Prerequisites

Before using these patterns, configure the following.

Cgo-enabled binary

The tigerbeetle_cdc input requires a cgo-enabled build of Redpanda Connect. The rpk CLI and the standard Docker image do not include the tigerbeetle_cdc input. To get a cgo-enabled binary:

TigerBeetle cluster

The tigerbeetle_cdc input requires a TigerBeetle cluster running version 0.16.57 or later. The Redpanda Connect TigerBeetle client version must not be newer than the cluster version.

For a local cluster, see the TigerBeetle documentation.

Progress cache

The tigerbeetle_cdc input requires a cache to store the last acknowledged event timestamp between restarts. The examples in this cookbook use a Redis cache:

export REDIS_URL=redis://localhost:6379

For production use, any persistent cache supported by Redpanda Connect works, such as aws_dynamodb or sql.
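As a rough sketch of a production alternative, the cache resource below swaps Redis for the aws_dynamodb cache. The label, table name, and key names are hypothetical placeholders, and the table is assumed to already exist with hash_key as its partition key:

```yaml
cache_resources:
  - label: dynamodb_progress            # hypothetical label; reference it from progress_cache
    aws_dynamodb:
      table: tigerbeetle_cdc_progress   # hypothetical table name
      hash_key: id                      # hypothetical partition key attribute
      data_key: value                   # hypothetical attribute that stores the cached value
```

To use it, set progress_cache: dynamodb_progress on the input instead of redis_cache.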

Environment variables

The examples in this cookbook use environment variables for configuration:

export TB_CLUSTER_ID="1" # (1)
export TB_REPLICA_1="192.168.1.10:3000" # (2)
export TB_REPLICA_2="192.168.1.11:3000"
export TB_REPLICA_3="192.168.1.12:3000"
export REDIS_URL=redis://localhost:6379 # (3)
export REDPANDA_BROKERS=localhost:9092 # (4)
export S3_BUCKET=cdc-archive # (5)

(1) The unique 128-bit TigerBeetle cluster ID. Small integers are valid (for example, 1 represents the 128-bit value 1).
(2) The addresses of each replica in host:port format. A standard cluster has three replicas.
(3) The Redis URL for progress tracking.
(4) The Redpanda broker addresses (for Redpanda output examples).
(5) The Amazon S3 bucket name (for S3 output examples).

Capture CDC events

The simplest pattern captures all events from a TigerBeetle cluster and outputs them with metadata:

input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache

pipeline:
  processors:
    - mapping: |
        root.event_type = meta("event_type")
        root.ledger = meta("ledger")
        root.transfer_code = meta("transfer_code")
        root.timestamp_ms = meta("timestamp_ms")
        root.transfer = this.transfer
        root.debit_account = this.debit_account
        root.credit_account = this.credit_account

output:
  stdout:
    codec: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

For details on the event structure and available metadata fields, see the metadata section in the connector reference.

Each event contains a full snapshot of the transfer and both the debit and credit accounts at the time of the event.

Filter CDC events

Process only settled transfers by filtering on the event_type metadata field. Settled events are single_phase (immediate transfers) and two_phase_posted (completed two-phase transfers):

input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache

pipeline:
  processors:
    - mapping: |
        root = if meta("event_type") != "single_phase" && meta("event_type") != "two_phase_posted" {
          deleted()
        }
    - mapping: |
        root.event_type = meta("event_type")
        root.ledger = meta("ledger")
        root.transfer_id = this.transfer.id
        root.amount = this.transfer.amount
        root.debit_account_id = this.debit_account.id
        root.credit_account_id = this.credit_account.id
        root.timestamp_ms = meta("timestamp_ms")

output:
  stdout:
    codec: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

This pattern:

  • Keeps only single_phase and two_phase_posted events, dropping pending, voided, and expired events

  • Flattens the event to key transfer fields and a millisecond timestamp
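If the set of accepted event types is likely to grow, the filter step can be written against a list instead of chained comparisons. This is a minimal sketch of an alternative to the first mapping above, not a required form. The variable name settled is illustrative:

```yaml
pipeline:
  processors:
    - mapping: |
        # Event types treated as settled; extend this list as needed.
        let settled = ["single_phase", "two_phase_posted"]
        # Keep the event unchanged if its type is in the list, otherwise drop it.
        root = if $settled.contains(meta("event_type")) { this } else { deleted() }
```

The second mapping that flattens the event can follow this processor unchanged.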

Route to Redpanda

Stream TigerBeetle transfer events to Redpanda, with a separate topic for each ledger:

input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache

pipeline:
  processors:
    - mapping: |
        meta topic = "transfers." + meta("ledger")

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! json("transfer.id") }
    batching:
      count: 100
      period: 1s

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

This pattern:

  • Creates a Redpanda topic per ledger (for example, transfers.1, transfers.2)

  • Batches messages for efficient delivery

  • Sets the message key to the transfer ID
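If a topic per ledger is more than you need, one possible variation writes every event to a single topic and uses the ledger as the message key. This sketch assumes the output's default partitioner hashes the message key, so events for the same ledger land on the same partition. The topic name transfers is a hypothetical placeholder:

```yaml
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: transfers              # hypothetical single topic for all ledgers
    key: ${! meta("ledger") }     # same ledger -> same key (assumes key-hash partitioning)
    batching:
      count: 100
      period: 1s
```

The trade-off is that the transfer ID is no longer the message key, so consumers that need it must read it from the event body.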

Route to S3

Archive transfer events to Amazon S3 for long-term storage and analytics:

input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache

pipeline:
  processors:
    - mapping: |
        root.event_type = meta("event_type")
        root.ledger = meta("ledger")
        root.transfer = this.transfer
        root.debit_account = this.debit_account
        root.credit_account = this.credit_account
        root.timestamp_ms = meta("timestamp_ms")

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/ledger/${! meta("ledger") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

This pattern:

  • Organizes files by ledger code and hourly time-based partitions (year/month/day/hour), derived from the time the file is written rather than from the event timestamp

  • Batches events and archives them as newline-delimited JSON

  • Uses UUID file names to prevent collisions
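For long-term archives, compressing each batch can reduce storage. The sketch below is one way to do that, assuming gzip is acceptable to downstream readers: it adds a compress processor after archive and changes the file suffix to .ndjson.gz. The rest of the output matches the pattern above:

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/ledger/${! meta("ledger") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson.gz
    batching:
      count: 1000
      period: 5m
      processors:
        # Join the batch into one newline-delimited payload, then gzip it.
        - archive:
            format: lines
        - compress:
            algorithm: gzip
```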

Route by event type

Route single-phase and two-phase transfers to separate Redpanda topics:

input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache

output:
  switch:
    cases:
      - check: meta("event_type") == "single_phase"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.single_phase
      - check: meta("event_type") == "two_phase_posted"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.two_phase
      - output:
          drop: {}

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

This pattern:

  • Routes single_phase events (immediate settlements) to one topic

  • Routes two_phase_posted events (settled two-phase transfers) to another topic

  • Drops the remaining two-phase events (two_phase_pending, two_phase_voided, two_phase_expired), which do not represent a settled transfer

  • Supports specialized consumers for each settlement model
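If pending, voided, and expired events should be kept for auditing instead of dropped, the final catch-all case can point at another topic. This is a sketch of the modified switch only. The topic name tigerbeetle.unsettled is a hypothetical placeholder:

```yaml
output:
  switch:
    cases:
      - check: meta("event_type") == "single_phase"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.single_phase
      - check: meta("event_type") == "two_phase_posted"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.two_phase
      # Catch-all: any event type not matched above.
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.unsettled   # hypothetical topic name
```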

Resume from a timestamp

By default, the tigerbeetle_cdc input streams all events available in the cluster. To start from a specific point in time, set timestamp_initial to a TigerBeetle nanosecond timestamp:

input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache
    timestamp_initial: "1745328372758695656" # (1)

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

(1) Start from events at or after this nanosecond timestamp. Ignored if progress_cache already contains a more recent acknowledged timestamp.
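To avoid hard-coding the starting point, the value can come from an environment variable like the other settings in this cookbook. In this sketch, TB_TIMESTAMP_INITIAL is a hypothetical variable name, and the value is assumed to be nanoseconds since the Unix epoch:

```yaml
input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache
    # TB_TIMESTAMP_INITIAL is a hypothetical variable holding nanoseconds since the Unix epoch.
    # For example, 1745328372758695656 corresponds to 2025-04-22T13:26:12.758695656Z.
    timestamp_initial: "${TB_TIMESTAMP_INITIAL}"
```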

Troubleshoot common issues

Use these steps to diagnose and fix common problems with the tigerbeetle_cdc input.

Component not available

If the pipeline fails with an error that tigerbeetle_cdc is not a recognized input:

  • Confirm you are using a cgo-enabled build of Redpanda Connect, not rpk or the standard Docker image.

  • Verify the binary version supports tigerbeetle_cdc (introduced in version 4.65.0).

Cannot connect to cluster

If the pipeline fails to connect to TigerBeetle:

  • Verify the addresses list contains the correct IP and port for each replica.

  • Confirm cluster_id matches the ID used when creating the cluster.

  • Check network access between Redpanda Connect and the TigerBeetle replicas.

Duplicate events after restart

The tigerbeetle_cdc input provides at-least-once delivery. During crash recovery, the pipeline may replay events that were already delivered. To handle duplicates:

  • Use idempotent processing in downstream systems.

  • Deduplicate using the timestamp metadata field, which is unique per event with nanosecond precision.
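As one possible way to deduplicate inside the pipeline, the sketch below uses the dedupe processor keyed on the timestamp metadata field. The cache label, key prefix, and TTL are hypothetical choices. The TTL should exceed the longest replay window you expect after a crash:

```yaml
pipeline:
  processors:
    - dedupe:
        cache: dedupe_cache
        key: ${! meta("timestamp") }   # unique per event, per the note above

cache_resources:
  - label: dedupe_cache                # hypothetical label, separate from the progress cache
    redis:
      url: ${REDIS_URL}
      prefix: "tb_dedupe:"             # hypothetical key prefix to avoid collisions
      default_ttl: 1h                  # hypothetical retention for seen keys
```

In a full configuration, list this resource alongside redis_cache under the same cache_resources key. Downstream idempotency is still advisable, because a dedupe cache only remembers keys for the TTL you configure.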