Cloud

gcp_bigquery_write_api

Streams data into BigQuery using the Storage Write API.

Writes messages to a BigQuery table using the Storage Write API. This provides higher throughput and lower latency than the legacy streaming API or load jobs.

Messages can be formatted as JSON (default) or raw protobuf bytes. When using JSON format the component automatically fetches the table schema and converts each message to the corresponding proto representation.

The proto3 JSON mapping encodes int64 and uint64 values as strings. JSON messages with integer fields must use string values (e.g. "age": "30" not "age": 30). Otherwise the write will fail with an unmarshalling error.

When batching is enabled the table name is resolved from the first message in each batch. All messages in the same batch are written to that table.

  • Common

  • Advanced

outputs:
  label: ""
  gcp_bigquery_write_api:
    project: ""
    dataset: "" # No default (required)
    table: "" # No default (required)
    message_format: json
    change_type: "" # No default (optional)
    change_sequence_number: "" # No default (optional)
    primary_keys: [] # No default (optional)
    max_in_flight: 4
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    credentials_json: ""
outputs:
  label: ""
  gcp_bigquery_write_api:
    project: ""
    dataset: "" # No default (required)
    table: "" # No default (required)
    message_format: json
    write_mode: default_stream
    change_type: "" # No default (optional)
    change_sequence_number: "" # No default (optional)
    primary_keys: [] # No default (optional)
    auto_create_table: false
    schema: []
    time_partitioning:
      type: "" # No default (optional)
      field: ""
      expiration: 0s
      require_filter: false
    clustering: []
    max_in_flight: 4
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    credentials_json: ""
    target_principal: ""
    delegates: []
    stream_idle_timeout: 5m
    stream_sweep_interval: 1m
    max_cached_streams: 1024
    schema_resolve_timeout: 15s
    schema_evolution_timeout: 30s
    endpoint:
      http: ""
      grpc: ""

Fields

auto_create_table

If true and the target table does not exist, the output creates it using the configured schema, time_partitioning, and clustering. AlreadyExists errors from concurrent creators are treated as success. When the table name is interpolated, every auto-created table receives the same schema and partition/clustering settings.

Type: bool

Default: false

batching

Allows you to configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

An amount of bytes at which the batch should be flushed. If 0 disables size based batching.

Type: int

Default: 0

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

A number of messages at which the batch should be flushed. If 0 disables count based batching.

Type: int

Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate


# ---

processors:
  - archive:
      format: lines


# ---

processors:
  - archive:
      format: json_array

change_sequence_number

Optional Bloblang expression resolving to the _CHANGE_SEQUENCE_NUMBER pseudo-column value. Format: 1 to 4 sections of 1 to 16 hexadecimal characters each, separated by /. Example: ${! metadata("scn") } or ${! "0/0/0/0" }. When unset, BigQuery resolves ordering by arrival time.

This field supports interpolation functions.

Type: string

change_type

Bloblang expression resolving to the _CHANGE_TYPE pseudo-column value for each row. Must resolve to UPSERT or DELETE (case-insensitive). Required when write_mode is upsert or upsert_delete. Example: ${! metadata("operation") }.

This field supports interpolation functions.

Type: string

clustering[]

Optional clustering columns (up to 4) applied during auto_create_table. All names must appear in schema.

Type: array

Default: []

credentials_json

An optional JSON string containing GCP credentials. If empty, credentials are loaded from the environment.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

Default: ""

dataset

The BigQuery dataset ID.

Type: string

delegates[]

Optional delegation chain for chained service account impersonation. Each service account must be granted roles/iam.serviceAccountTokenCreator on the next in the chain.

Type: array

Default: []

endpoint

Optional endpoint overrides for the BigQuery and Storage Write API clients.

Type: object

endpoint.grpc

Override the BigQuery Storage gRPC endpoint. Useful for local emulators.

Type: string

Default: ""

endpoint.http

Override the BigQuery HTTP endpoint. Useful for local emulators.

Type: string

Default: ""

max_cached_streams

Soft cap on the number of cached streams. When the cache exceeds this size, the least-recently-used stream is evicted. Set to 0 for unlimited (rely on idle-timeout sweeping only). Relevant when the table field uses interpolation to route to many tables.

Type: int

Default: 1024

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int

Default: 4

message_format

The format of input messages. Use 'json' to have the component convert JSON to proto automatically. Use 'protobuf' to supply raw proto-encoded bytes.

Type: string

Default: json

Options: json, protobuf

primary_keys[]

Optional list of primary-key column names. Required when auto_create_table is true and write_mode is upsert or upsert_delete. When the target table is pre-existing, the connector falls back to the PRIMARY KEY declared on the table. Up to 16 columns; composite keys are supported in the same order they are listed.

Type: array

project

The GCP project ID. If empty, the project is auto-detected from the environment.

Type: string

Default: ""

schema[]

Column definitions used by auto_create_table. Required when auto_create_table is true.

Type: object

Default: []

schema[].fields[]

For RECORD columns, the list of nested fields. Same shape as the top-level schema list.

Type: array

schema[].mode

Column mode: NULLABLE (default), REQUIRED, or REPEATED.

Type: string

Default: NULLABLE

schema[].name

Column name.

Type: string

schema[].type

BigQuery column type (STRING, BYTES, INTEGER/INT64, FLOAT/FLOAT64, NUMERIC, BIGNUMERIC, BOOLEAN/BOOL, TIMESTAMP, DATE, TIME, DATETIME, GEOGRAPHY, JSON, RECORD).

Type: string

schema_evolution_timeout

Total time budget for a single schema evolution attempt (Metadata + Update across all CAS retries on HTTP 412). Bounds how long the WriteBatch retry loop can be starved by a wedged backend.

Type: string

Default: 30s

schema_resolve_timeout

How long a single BigQuery table-metadata fetch can run before being aborted. Coalesced concurrent resolves share one fetch, so this bounds the time a wedged backend can stall every batch routing to the same table. On the auto_create_table path the budget covers Metadata→Create→Metadata, so it needs to absorb transient backend slowness on top of the metadata fetch itself.

Type: string

Default: 15s

stream_idle_timeout

How long a cached stream can remain unused before being closed. Relevant when the table field uses interpolation to route to many tables.

Type: string

Default: 5m

stream_sweep_interval

How often to check for idle streams to close.

Type: string

Default: 1m

table

The BigQuery table ID. Supports interpolation functions. When batching, resolved from the first message in each batch.

Type: string

target_principal

Service account email to impersonate. When set, the output obtains tokens acting as this service account. Requires the caller to have roles/iam.serviceAccountTokenCreator on the target.

Type: string

Default: ""

time_partitioning

Optional time-partitioning settings applied during auto_create_table. Setting type is the trigger — when omitted, the block is treated as absent.

Type: object

time_partitioning.expiration

Optional partition expiration. Zero means no expiration.

Type: string

Default: 0s

time_partitioning.field

Column to partition on. Must be of type DATE, TIMESTAMP, or DATETIME. If empty, the table uses ingestion-time partitioning (_PARTITIONTIME).

Type: string

Default: ""

time_partitioning.require_filter

If true, queries against the table must filter on the partition column.

Type: bool

Default: false

time_partitioning.type

Partitioning granularity.

Type: string

Options: DAY, HOUR, MONTH, YEAR

write_mode

How the output writes to BigQuery. default_stream uses the multiplexed default stream (at-least-once, lowest latency). pending_stream allocates a per-batch pending stream that commits atomically, providing exactly-once semantics within a single committed batch. upsert writes UPSERT-only rows to a BigQuery CDC-enabled table; the target table must have a PRIMARY KEY. upsert_delete allows both UPSERT and DELETE rows. Both CDC modes use the default stream as required by BigQuery.

Type: string

Default: default_stream

Options: default_stream, pending_stream, upsert, upsert_delete