
Stream data to BigQuery with the Storage Write API

The gcp_bigquery_write_api output streams messages into BigQuery using the Storage Write API, which offers higher throughput and lower latency than the legacy streaming API or load jobs. This cookbook provides reusable patterns for streaming inserts, change data capture (CDC) upserts and deletes, automatic table creation, multi-table routing, and throughput tuning.

Use this cookbook to:

  • Find patterns for streaming JSON into BigQuery

  • Look up CDC upsert and delete patterns for BigQuery

  • Identify auto-create, multi-table routing, and throughput-tuning patterns

Prerequisites

Before using these patterns, ensure you have the following configured:

Redpanda CLI

Install the Redpanda CLI (rpk) to run Redpanda Connect. See Get Started with Redpanda Connect using rpk for installation instructions.

Enterprise license

The gcp_bigquery_write_api output is an enterprise component. To use it, you need a Redpanda Connect Enterprise license. See Enterprise Licensing.

BigQuery dataset and permissions

You need a Google Cloud project with a BigQuery dataset, and a service account with permission to write to (and, for the auto-create pattern, create) tables in that dataset. The BigQuery Data Editor (roles/bigquery.dataEditor) role covers both.

The output loads credentials from the standard Application Default Credentials chain. You can also supply a service account key directly with the credentials_json field.
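As a minimal sketch of the second option, the fragment below passes the service account key through an environment variable instead of embedding it in the configuration file. GCP_CREDENTIALS_JSON is a hypothetical variable name chosen for this example, not something the connector defines. Check the connector reference for the encoding that credentials_json expects.

```yaml
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    # GCP_CREDENTIALS_JSON is a hypothetical variable holding the service
    # account key. Omit this field to use Application Default Credentials.
    credentials_json: ${GCP_CREDENTIALS_JSON}
```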

Environment variables

The examples in this cookbook use environment variables to keep configuration separate from credentials.

# The Google Cloud project ID that owns the dataset.
export GCP_PROJECT=my-project
# The BigQuery dataset that holds the destination tables.
export BQ_DATASET=analytics
# The destination table for single-table examples.
export BQ_TABLE=events
# The Redpanda broker addresses for the source topic.
export REDPANDA_BROKERS=localhost:9092
# The Redpanda topic to consume from.
export SOURCE_TOPIC=events

Stream JSON to BigQuery

The simplest pattern consumes JSON messages from a Redpanda topic and streams them into a single table using the default stream, which provides at-least-once delivery with the lowest latency:

input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink

pipeline:
  processors:
    # BigQuery's proto3 JSON mapping encodes int64/uint64 as strings, so
    # convert integer columns to strings before writing.
    - mapping: |
        root = this
        root.id = this.id.string()

output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: default_stream
    batching:
      count: 1000
      period: 5s

The proto3 JSON mapping encodes int64 and uint64 values as strings. When a message maps to an INTEGER (INT64) column, the JSON field must use a string value, for example "age": "30" rather than "age": 30. Otherwise the write fails with an unmarshalling error.
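When a table has more than one INTEGER column, each one needs the same conversion. The following sketch assumes a hypothetical nullable column named age alongside the required id. It converts age only when a value is present, because calling string() on a missing field would likely produce literal text rather than leaving the column empty.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # REQUIRED INTEGER column: always present, so convert unconditionally.
        root.id = this.id.string()
        # NULLABLE INTEGER column (hypothetical): convert only when a value is
        # present. With no else branch, the assignment is skipped otherwise.
        root.age = if this.age != null { this.age.string() }
```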

For the full list of message formats and connection options, see the connector reference.

Guarantee exactly-once delivery per batch

To commit each batch atomically with exactly-once semantics, set write_mode to pending_stream. Each batch is written to a per-batch pending stream that commits as a single unit:

input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink

pipeline:
  processors:
    # Project only the columns the table expects and convert the integer id to
    # a string for BigQuery's proto3 JSON mapping.
    - mapping: |
        root.id = this.id.string()
        root.event = this.event

output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: pending_stream
    batching:
      count: 5000
      period: 10s

Exactly-once applies within a single committed batch. Tune batch size to balance latency against commit overhead. See Message Batching.

Create the destination table automatically

Set auto_create_table to true to have Redpanda Connect create the table if it does not exist, using the supplied schema. You can also define partitioning and clustering for the new table:

input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink

pipeline:
  processors:
    # Shape each message to match the table schema. The id column is INTEGER,
    # so send it as a string for BigQuery's proto3 JSON mapping.
    - mapping: |
        root.id = this.id.string()
        root.name = this.name
        root.created_at = this.created_at

output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    auto_create_table: true
    schema:
      - name: id
        type: INTEGER
        mode: REQUIRED
      - name: name
        type: STRING
      - name: created_at
        type: TIMESTAMP
        mode: REQUIRED
    time_partitioning:
      type: DAY
      field: created_at
    clustering:
      - id
    batching:
      count: 1000
      period: 5s

Each schema column accepts a name, type, and optional mode (NULLABLE, REQUIRED, or REPEATED). Partitioning and clustering apply only when the table is created.
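As an illustration of the REPEATED mode, the schema fragment below adds a hypothetical tags column that holds a list of strings. It is a sketch only: it assumes the output maps a JSON array in the message onto the repeated column, which you should confirm in the connector reference.

```yaml
    schema:
      - name: id
        type: INTEGER
        mode: REQUIRED
      # Hypothetical column: REPEATED allows zero or more values per row.
      - name: tags
        type: STRING
        mode: REPEATED
```

Under that assumption, a message such as {"id": "1", "tags": ["new", "trial"]} would populate tags with two values.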

Apply CDC upserts

To keep a BigQuery table in sync with a changing source, use change data capture (CDC) upserts. Set write_mode to upsert, declare the primary_keys, and provide a change_type expression. The output writes the resolved value to BigQuery's _CHANGE_TYPE pseudo-column for each row:

input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink

pipeline:
  processors:
    # Stringify the integer primary key for BigQuery's proto3 JSON mapping.
    - mapping: |
        root = this
        root.id = this.id.string()

output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: upsert
    primary_keys:
      - id
    change_type: '${! "UPSERT" }'
    auto_create_table: true
    schema:
      - name: id
        type: INTEGER
        mode: REQUIRED
      - name: name
        type: STRING
      - name: updated_at
        type: TIMESTAMP
    batching:
      count: 1000
      period: 5s

In upsert mode, change_type must resolve to UPSERT for every row. The target table must have a primary key. When auto_create_table is true, primary_keys defines it. For a pre-existing table, the output falls back to the PRIMARY KEY declared on the table. A primary key can span up to 16 columns, and composite keys are matched in the order listed.
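A composite key can be sketched by listing more than one column under primary_keys. The column names tenant_id and order_id are hypothetical. Both are assumed to be INTEGER columns, so the mapping stringifies each of them.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Both key columns are assumed to be INTEGER (hypothetical names).
        root.tenant_id = this.tenant_id.string()
        root.order_id = this.order_id.string()

output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: upsert
    # Composite key: rows are matched on tenant_id, then order_id.
    primary_keys:
      - tenant_id
      - order_id
    change_type: '${! "UPSERT" }'
```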

CDC modes write through the default stream, as required by BigQuery, so they provide at-least-once delivery rather than the per-batch exactly-once semantics of pending_stream.

Handle inserts, updates, and deletes

To propagate deletes as well as upserts, set write_mode to upsert_delete and resolve change_type to either UPSERT or DELETE per row (case-insensitive). The following example derives the change type from a source operation field in a mapping processor, stores it in metadata, and references it from the output. It also uses change_sequence_number to control ordering when rows for the same key can arrive out of order:

input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink

pipeline:
  processors:
    # Derive the CDC change type and sequence number from the source event.
    # This example assumes a Debezium-style envelope with an "op" field
    # ("c"/"u" for upsert, "d" for delete), the row in "after" (or in "before"
    # for deletes, where "after" is null), and a log sequence number in "lsn".
    - mapping: |
        let row = if this.op == "d" { this.before } else { this.after }
        root = $row
        root.id = $row.id.string()
        meta change_type = if this.op == "d" { "DELETE" } else { "UPSERT" }
        meta change_seq = this.lsn.string()

output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: upsert_delete
    primary_keys:
      - id
    change_type: '${! metadata("change_type") }'
    # Order changes by a monotonic sequence number so a late-arriving older
    # change cannot overwrite a newer row.
    change_sequence_number: '${! metadata("change_seq") }'
    batching:
      count: 1000
      period: 5s

When change_sequence_number is unset, BigQuery resolves ordering by arrival time. Set it from a monotonic source value (such as a log sequence number) so that a late-arriving older change cannot overwrite a newer one.

Route to multiple tables

The table field supports interpolation, so you can route messages to different tables based on their content or metadata. The table name is resolved from the first message in each batch, so batch by the routing key to keep each batch single-table:

input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink

pipeline:
  processors:
    # Stringify the integer id for BigQuery's proto3 JSON mapping.
    - mapping: |
        root = this
        root.id = this.id.string()
    # Split each incoming batch into single-table batches keyed by event_type.
    # The output resolves the table from the first message in each batch, so
    # every batch must contain rows for one table only.
    - group_by_value:
        value: '${! json("event_type") }'

output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: '${! json("event_type") }'
    message_format: json
    # No batching block here: this preserves the single-table batches produced
    # by group_by_value. Adding an output batching policy would re-mix tables.
    max_in_flight: 8
    max_cached_streams: 1024

When routing across many tables, raise max_cached_streams so the output can keep a write stream open per active table.
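Routing on metadata follows the same shape. The sketch below consumes two hypothetical topics, orders and payments, and writes each to a table named after its topic, using the kafka_topic metadata that the kafka_franz input attaches to each message. It assumes the topic names are also valid BigQuery table names and that the tables already exist.

```yaml
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    # Hypothetical topic names, one per destination table.
    topics: ["orders", "payments"]
    consumer_group: bigquery_sink

pipeline:
  processors:
    # Keep each batch single-table by grouping on the source topic.
    - group_by_value:
        value: '${! metadata("kafka_topic") }'

output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    # Resolved from the first message in each batch.
    table: '${! metadata("kafka_topic") }'
    message_format: json
```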

Tune batching and throughput

Throughput depends on batch size and the number of concurrent in-flight requests. Increase max_in_flight to send more batches in parallel, and size batches with a batching policy:

input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink

pipeline:
  processors:
    # Stringify the integer id for BigQuery's proto3 JSON mapping.
    - mapping: |
        root = this
        root.id = this.id.string()

output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: default_stream
    # Send more batches concurrently for higher throughput.
    max_in_flight: 16
    # Larger batches reduce per-request overhead at the cost of latency.
    batching:
      count: 10000
      byte_size: 5000000
      period: 10s

Troubleshoot

Writes fail with an unmarshalling error

BigQuery rejects integer fields supplied as JSON numbers. Ensure int64 and uint64 values are encoded as strings, for example "count": "42". Use a mapping processor to convert numeric fields to strings before the output if your source emits them as numbers.

Schema resolution times out

The output fetches table metadata before writing. If you route to many tables or BigQuery responds slowly, increase schema_resolve_timeout (default 15s). On the auto_create_table path this budget also covers table creation, so allow extra headroom.
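For example, the following fragment doubles the default budget. The value 30s is an arbitrary starting point for illustration, not a recommendation from the connector reference.

```yaml
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    # Allow more time to fetch table metadata (default 15s).
    schema_resolve_timeout: 30s
```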

CDC writes are rejected

For upsert and upsert_delete modes, confirm the target table has a primary key and that change_type resolves to a valid value (UPSERT, or DELETE for upsert_delete). Rows that resolve to an invalid change type are rejected by BigQuery.