gcp_bigquery_select

Executes a SELECT query against BigQuery and replaces messages with the rows returned.

Introduced in version 3.64.0.

# Config fields, showing default values
label: ""
gcp_bigquery_select:
  project: "" # No default (required)
  credentials_json: "" # No default (optional)
  table: bigquery-public-data.samples.shakespeare # No default (required)
  columns: [] # No default (required)
  where: type = ? and created_at > ? # No default (optional)
  job_labels: {}
  args_mapping: root = [ "article", now().ts_format("2006-01-02") ] # No default (optional)
  prefix: "" # No default (optional)
  suffix: "" # No default (optional)

Examples

  • Word count

Given a stream of English terms, enrich the messages with the word count from Shakespeare’s public works:

pipeline:
  processors:
    - branch:
        processors:
          - gcp_bigquery_select:
              project: test-project
              table: bigquery-public-data.samples.shakespeare
              columns:
                - word
                - sum(word_count) as total_count
              where: word = ?
              suffix: |
                GROUP BY word
                ORDER BY total_count DESC
                LIMIT 10
              args_mapping: root = [ this.term ]
        result_map: |
          root.count = this.get("0.total_count")

Fields

project

GCP project where the query job will execute.

Type: string

credentials_json

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

table

Fully-qualified BigQuery table name to query.

Type: string

# Examples

table: bigquery-public-data.samples.shakespeare

columns

A list of columns to query.

Type: array

where

An optional where clause to add. Placeholder arguments are populated with the args_mapping field. Placeholders should always be question marks (?).

Type: string

# Examples

where: type = ? and created_at > ?

where: user_id = ?

job_labels

A list of labels to add to the query job.

Type: object

Default: {}

args_mapping

An optional Bloblang mapping which should evaluate to an array of values matching in size to the number of placeholder arguments in the field where.

Type: string

# Examples

args_mapping: root = [ "article", now().ts_format("2006-01-02") ]

prefix

An optional prefix to prepend to the select query (before SELECT).

Type: string

suffix

An optional suffix to append to the select query.

Type: string