gcp_bigquery

Inserts message data as new rows in a Google Cloud BigQuery table.

# Common configuration fields, showing default values
output:
  label: ""
  gcp_bigquery:
    project: ""
    job_project: ""
    dataset: "" # No default (required)
    table: "" # No default (required)
    format: NEWLINE_DELIMITED_JSON
    max_in_flight: 64
    job_labels: {}
    credentials_json: "" # No default (optional)
    csv:
      header: []
      field_delimiter: ','
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
# All configuration fields, showing default values
output:
  label: ""
  gcp_bigquery:
    project: ""
    job_project: ""
    dataset: "" # No default (required)
    table: "" # No default (required)
    format: NEWLINE_DELIMITED_JSON
    max_in_flight: 64
    write_disposition: WRITE_APPEND
    create_disposition: CREATE_IF_NEEDED
    ignore_unknown_values: false
    max_bad_records: 0
    auto_detect: false
    job_labels: {}
    credentials_json: "" # No default (optional)
    csv:
      header: []
      field_delimiter: ','
      allow_jagged_rows: false
      allow_quoted_newlines: false
      encoding: UTF-8
      skip_leading_rows: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

Credentials

By default, Redpanda Connect uses a shared credentials file when connecting to GCP services. To authenticate as a specific service account instead, set the credentials_json field.

Format

The gcp_bigquery output currently supports only the NEWLINE_DELIMITED_JSON, CSV, and PARQUET formats, which are described in the following sections.

Newline-delimited JSON

Each JSON message may contain multiple elements separated by newlines. For example, a single message containing:

{"key": "1"}
{"key": "2"}

Is equivalent to two separate messages:

{"key": "1"}

And:

{"key": "2"}

The same is true for the CSV format.
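
You can also assemble multi-row messages from individual JSON messages by archiving batches before they're sent, using the archive processor shown in the batching.processors examples at the end of this page. The following sketch is illustrative only; the project, dataset, and table names are placeholders:

output:
  gcp_bigquery:
    project: my-project
    dataset: my_dataset
    table: my_table
    batching:
      count: 100
      period: 1s
      processors:
        # Join the batch into one newline-delimited message
        - archive:
            format: lines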

CSV

When the field csv.header is specified for the CSV format, a header row is inserted as the first line of each message batch. If this field is not provided, then the first message of each message batch must include a header line.
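
For example, the following sketch supplies an explicit header, so messages only need to contain data rows (the project, dataset, and table names are placeholders):

output:
  gcp_bigquery:
    project: my-project
    dataset: my_dataset
    table: my_table
    format: CSV
    csv:
      # Inserted as the first line of each batch
      header:
        - id
        - name
        - created_at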

Parquet

Each message sent to this output must be a Parquet file. You can use the parquet_encode processor to convert message data into the correct format. For example:

input:
  generate:
    # Generate 1000 example rows, emitted as a single batch
    mapping: |
      root = {
        "foo": random_int(),
        "bar": uuid_v4(),
        "time": now(),
      }
    interval: 0
    count: 1000
    batch_size: 1000
pipeline:
  processors:
    # Encode each batch into a single Parquet file
    - parquet_encode:
        schema:
          - name: foo
            type: INT64
          - name: bar
            type: UTF8
          - name: time
            type: UTF8
        default_compression: zstd
output:
  gcp_bigquery:
    project: "${PROJECT}"
    dataset: "my_bq_dataset"
    table: "redpanda_connect_ingest"
    format: PARQUET

Performance

The gcp_bigquery output benefits from sending multiple messages in parallel for improved performance. You can tune the maximum number of in-flight messages (or message batches) with the field max_in_flight.

This output also sends messages as a batch for improved performance. Redpanda Connect can form batches at both the input and output level. For more information, see Message Batching.
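
As an illustrative starting point, the following sketch raises the in-flight limit and forms batches at the output level (the values shown are arbitrary and should be tuned against your own workload):

output:
  gcp_bigquery:
    project: my-project
    dataset: my_dataset
    table: my_table
    # Allow up to 128 batches in flight at once
    max_in_flight: 128
    batching:
      count: 500
      byte_size: 1000000
      period: 5s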

Fields

project

Specify the project ID of the dataset to insert data into. If not set, the project ID is inferred from the project linked to the service account or read from the GOOGLE_CLOUD_PROJECT environment variable.

Type: string

Default: ""

job_project

Specify the project ID in which jobs are executed. If not set, the project value is used.

Type: string

Default: ""

dataset

The BigQuery Dataset ID.

Type: string

table

The table to insert messages into.

Type: string

format

The format of each incoming message.

Type: string

Default: NEWLINE_DELIMITED_JSON

Options: NEWLINE_DELIMITED_JSON, CSV, PARQUET

max_in_flight

The maximum number of message batches to have in flight at a given time. Increase this value to improve throughput.

Type: int

Default: 64

write_disposition

Specifies how existing data in a destination table is treated.

Type: string

Default: "WRITE_APPEND"

Options: WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE

create_disposition

Specifies the circumstances under which a destination table is created.

  • Use CREATE_IF_NEEDED to create the destination table if it does not already exist. Tables are created atomically on successful completion of a job.

  • Use CREATE_NEVER if the destination table must already exist.

Type: string

Default: CREATE_IF_NEEDED

Options: CREATE_IF_NEEDED, CREATE_NEVER
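
For example, to overwrite the contents of a table that must already exist, you could combine the two disposition fields as follows (the project, dataset, and table names are placeholders):

output:
  gcp_bigquery:
    project: my-project
    dataset: my_dataset
    table: my_table
    # Replace any existing rows on each load job
    write_disposition: WRITE_TRUNCATE
    # Fail if the destination table is missing
    create_disposition: CREATE_NEVER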

ignore_unknown_values

Set this value to true to ignore values that do not match the schema:

  • For the CSV format, extra values at the end of a line are ignored.

  • For the NEWLINE_DELIMITED_JSON format, values that do not match any column name are ignored.

By default, this value is set to false, and records containing unknown values are treated as bad records. Use the max_bad_records field to customize how bad records are handled.

Type: bool

Default: false

max_bad_records

The maximum number of bad records to tolerate when reading data before the load job fails. Records with values that don't match the schema, and that aren't ignored by ignore_unknown_values, count as bad records.

Type: int

Default: 0
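
For example, the following sketch drops unknown values while tolerating up to ten bad records per load job (the project, dataset, and table names are placeholders):

output:
  gcp_bigquery:
    project: my-project
    dataset: my_dataset
    table: my_table
    # Drop values that don't match a column in the schema
    ignore_unknown_values: true
    # Tolerate up to 10 bad records before the job fails
    max_bad_records: 10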

auto_detect

Whether this component automatically infers the options and schema for CSV and NEWLINE_DELIMITED_JSON sources.

If this value is set to false and the destination table doesn’t exist, the output throws an insertion error as it is unable to insert data.

This field delegates schema detection to the GCP BigQuery service. For the CSV format, values such as 'no' may be treated as booleans.

Type: bool

Default: false
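
For example, the following sketch lets BigQuery create the destination table and infer its schema from incoming JSON data (the project, dataset, and table names are placeholders):

output:
  gcp_bigquery:
    project: my-project
    dataset: my_dataset
    table: my_table
    format: NEWLINE_DELIMITED_JSON
    # Delegate schema inference to BigQuery
    auto_detect: true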

job_labels

A map of key/value labels to add to the load job.

Type: object

Default: {}
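
For example, the following sketch attaches two labels to each load job (the keys and values are arbitrary):

job_labels:
  environment: staging
  team: data-platform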

credentials_json

An optional JSON string containing Google Service Account credentials to use for authentication. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

Default: ""

csv

Specify how CSV data is interpreted.

Type: object

csv.header

A list of values to use as the header for each batch of messages. If not specified, the first line of each message is used as the header.

Type: array

Default: []

csv.field_delimiter

The separator for fields in a CSV file. The output uses this value when reading or exporting data.

Type: string

Default: ","

csv.allow_jagged_rows

Set to true to treat missing trailing optional columns as null values in CSV data.

Type: bool

Default: false

csv.allow_quoted_newlines

Whether quoted data sections containing new lines are allowed when reading CSV data.

Type: bool

Default: false

csv.encoding

The character encoding of CSV data.

Type: string

Default: "UTF-8"

Options: UTF-8, ISO-8859-1

csv.skip_leading_rows

The number of rows at the top of a CSV file that BigQuery will skip when reading data. The default value is 1, which allows Redpanda Connect to add the specified header in the first line of each batch sent to BigQuery.

Type: int

Default: 1

batching

Configure a batching policy.

Type: object

# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size.

Type: string

Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: array

# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array