Create a Google BigQuery Sink Connector

The Google BigQuery Sink connector enables you to stream any structured data from Redpanda to BigQuery for advanced analytics.

Prerequisites

Before you can create a Google BigQuery Sink connector in Redpanda Cloud, you must:

  1. Create a Google Cloud account.

  2. On the Google Cloud home page:

    1. Select an existing project or create a new one.

    2. Create a new dataset for the project.

    3. (Optional if your data has a schema) After creating the dataset, create a new table to hold the data you intend to stream from Redpanda Cloud topics. Specify a structure for the table using schema values that align with your Redpanda topic data.

      This step is mandatory only if the data in Redpanda does not have a schema. If the data in Redpanda includes a schema, then the connector automatically creates the tables in BigQuery.
  3. Create a custom role.

    The role must have the following permissions:

    bigquery.datasets.get
    bigquery.tables.create
    bigquery.tables.get
    bigquery.tables.getData
    bigquery.tables.list
    bigquery.tables.update
    bigquery.tables.updateData
  4. Create a service account.

  5. Add the custom role to your service account.

  6. Create a service account key, and then download it.
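A quick way to confirm that a custom role covers everything listed in step 3 is to compare its permissions against the required set. The following is a minimal sketch, not part of any Redpanda or Google tooling: the `missing_permissions` helper and the `example_role` list are hypothetical, and the sketch assumes you have already copied the role's permission names from Google Cloud by some other means.

```python
# Permissions the custom role must include, copied from the list in step 3.
REQUIRED_PERMISSIONS = frozenset({
    "bigquery.datasets.get",
    "bigquery.tables.create",
    "bigquery.tables.get",
    "bigquery.tables.getData",
    "bigquery.tables.list",
    "bigquery.tables.update",
    "bigquery.tables.updateData",
})


def missing_permissions(granted):
    """Return the required permissions absent from `granted`, sorted by name."""
    return sorted(REQUIRED_PERMISSIONS - set(granted))


# Hypothetical role that lacks the two update permissions.
example_role = [
    "bigquery.datasets.get",
    "bigquery.tables.create",
    "bigquery.tables.get",
    "bigquery.tables.getData",
    "bigquery.tables.list",
]

print(missing_permissions(example_role))
# → ['bigquery.tables.update', 'bigquery.tables.updateData']
```

An empty list means the role includes every permission the connector needs.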

Limitations

The Google BigQuery Sink connector doesn’t support schemas with recursion.

Create a Google BigQuery Sink connector

To create the Google BigQuery Sink connector:

  1. In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.

  2. Select Export to Google BigQuery.

  3. On the Create Connector page, specify the following required connector configuration options:

    | Property name | Property key | Description |
    | --- | --- | --- |
    | Topics to export | topics | A comma-separated list of the cluster topics you want to replicate to Google BigQuery. |
    | Topics regex | topics.regex | A Java regular expression of topics to replicate. For example, specify .* to replicate all available topics in the cluster. Applicable only when Use regular expressions is selected. |
    | Credentials JSON | keyfile | A JSON key with BigQuery service account credentials. |
    | Project | project | The BigQuery project to which topic data will be written. |
    | Default dataset | defaultDataset | The default Google BigQuery dataset to be used. |
    | Kafka message value format | value.converter | The format of the value in the Redpanda topic. The default is JSON. |
    | Max Tasks | tasks.max | Maximum number of tasks to use for this connector. The default is 1. Each task replicates an exclusive set of partitions assigned to it. |
    | Connector name | name | Globally unique name to use for this connector. |

  4. Click Next. Review the connector properties specified, then click Create.
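The required properties above can also be pictured as a flat map of property keys to string values. The sketch below only illustrates how the keys fit together; `build_bigquery_sink_config` is a hypothetical helper, every value in the usage example is a placeholder, and passing the key file as JSON text is an assumption rather than something this page specifies. The converter class name is the one given for JSON in the Map data section.

```python
import json


def build_bigquery_sink_config(name, topics, project, dataset, keyfile_json, tasks_max=1):
    """Assemble the required connector properties as a string-to-string map."""
    if not topics:
        raise ValueError("at least one topic is required")
    return {
        "name": name,
        "topics": ",".join(topics),   # comma-separated topic list
        "keyfile": keyfile_json,      # assumption: service account key as JSON text
        "project": project,
        "defaultDataset": dataset,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "tasks.max": str(tasks_max),
    }


# Placeholder values for illustration only.
config = build_bigquery_sink_config(
    name="bigquery-sink-example",
    topics=["orders", "payments"],
    project="my-gcp-project",
    dataset="my_dataset",
    keyfile_json='{"type": "service_account"}',
)
print(json.dumps(config, indent=2))
```

To select topics by pattern instead, the `topics` entry would be replaced with `topics.regex`, as described in the table.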

Advanced Google BigQuery Sink connector configuration

In most cases, the preceding basic configuration properties are sufficient. If you need additional settings (for example, to automatically create BigQuery tables or to map topics to tables), select Show advanced options on the Create Connector page and specify any of the following optional properties:

| Property name | Property key | Description |
| --- | --- | --- |
| Auto create tables | autoCreateTables | Automatically create BigQuery tables that don’t already exist, based on the record schema. |
| Topic to table map | topic2TableMap | Map of topics to tables. Format: comma-separated tuples, for example topic1:table1,topic2:table2. |
| Allow new BigQuery fields | allowNewBigQueryFields | If true, new fields can be added to BigQuery tables during subsequent schema updates. |
| Allow BigQuery required field relaxation | allowBigQueryRequiredFieldRelaxation | If true, fields in the BigQuery schema can be changed from REQUIRED to NULLABLE. |
| Upsert enabled | upsertEnabled | Enables upsert functionality on the connector. |
| Delete enabled | deleteEnabled | Enables delete functionality on the connector. |
| Kafka key field name | kafkaKeyFieldName | The name of the BigQuery table field for the Kafka key. Must be set when upsert or delete is enabled. |
| Time partitioning type | timePartitioningType | The time partitioning type to use when creating tables. |
| BigQuery retry attempts | bigQueryRetry | The number of retry attempts made for each BigQuery request that fails with a backend or quota exceeded error. |
| BigQuery retry attempts interval | bigQueryRetryWait | The minimum amount of time, in milliseconds, to wait between BigQuery backend or quota exceeded error retry attempts. |
| Error tolerance | errors.tolerance | How the connector responds to errors during operation. The default value, none, causes any error to fail the connector task immediately. The value all makes the connector skip problematic records instead. |
| Dead letter queue topic name | errors.deadletterqueue.topic.name | The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, its transformations, or converters. The topic name is blank by default, which means that no messages are recorded in the DLQ. |
| Dead letter queue topic replication factor | errors.deadletterqueue.topic.replication.factor | Replication factor used to create the dead letter queue topic when it doesn’t already exist. |
| Enable error context headers | errors.deadletterqueue.context.headers.enable | When true, adds a header containing error context to the messages written to the dead letter queue. To avoid clashing with headers from the original record, all error context header keys start with __connect.errors. |
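Two of the constraints stated in the table lend themselves to a quick pre-flight check: the Kafka key field name must be set when upsert or delete is enabled, and the error tolerance is described as either none or all. The sketch below is illustrative only; `validate_advanced_options` is a hypothetical helper and does not reproduce the validation Redpanda Cloud itself performs.

```python
def validate_advanced_options(config):
    """Return a list of problems found in `config` (empty if none)."""
    problems = []

    def is_true(key):
        # Property values are treated as strings such as "true" or "false".
        return str(config.get(key, "false")).lower() == "true"

    needs_key = is_true("upsertEnabled") or is_true("deleteEnabled")
    if needs_key and not config.get("kafkaKeyFieldName"):
        problems.append("kafkaKeyFieldName must be set when upsert or delete is enabled")

    if config.get("errors.tolerance", "none") not in ("none", "all"):
        problems.append("errors.tolerance must be 'none' or 'all'")

    return problems


# Upsert is enabled here without a key field, so one problem is reported.
print(validate_advanced_options({"upsertEnabled": "true"}))
```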

Map data

Use the appropriate key or value converter (input data format) for your data as follows:

  • JSON (org.apache.kafka.connect.json.JsonConverter) when your messages are JSON-encoded. Select Message JSON contains schema if each message includes the schema and payload fields. If your messages do not contain a schema, manually create the tables in BigQuery.

  • AVRO (io.confluent.connect.avro.AvroConverter) when your messages are AVRO-encoded, with the schema stored in the Schema Registry.
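For the JSON case, a message that carries its own schema wraps the record in an envelope with a schema field and a payload field. The sketch below shows one way a producer might build such a message. The `wrap_with_schema` helper, the `example.Order` name, and the sample record are hypothetical; the type names (`int64`, `string`) follow the Kafka Connect JSON schema format, and every field is marked non-optional purely to keep the example short.

```python
import json


def wrap_with_schema(payload, fields, name="example.Record"):
    """Wrap `payload` in a schema/payload envelope.

    `fields` is a list of (field name, Connect type) pairs.
    """
    schema = {
        "type": "struct",
        "name": name,
        "optional": False,
        "fields": [
            {"field": field, "type": connect_type, "optional": False}
            for field, connect_type in fields
        ],
    }
    return json.dumps({"schema": schema, "payload": payload})


# Hypothetical record with an integer ID and a string status.
message = wrap_with_schema(
    {"id": 42, "status": "shipped"},
    [("id", "int64"), ("status", "string")],
    name="example.Order",
)
print(message)
```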

By default, the table name is the name of the topic, with each non-alphanumeric character replaced by an underscore character (_).

Use Topic to table map to remap topic names. For example, topic1:table1,topic2:table2.
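The naming rules above can be sketched as two small functions: one parses the Topic to table map format, and the other resolves a topic to its table name, falling back to the default underscore replacement. This is an approximation for illustration rather than the connector's actual code. Both function names are hypothetical, and treating only ASCII letters and digits as alphanumeric is an assumption.

```python
import re


def parse_topic_to_table_map(value):
    """Parse 'topic1:table1,topic2:table2' into a dict."""
    mapping = {}
    for pair in value.split(","):
        pair = pair.strip()
        if not pair:
            continue
        topic, sep, table = pair.partition(":")
        if not sep or not topic.strip() or not table.strip():
            raise ValueError(f"expected topic:table, got {pair!r}")
        mapping[topic.strip()] = table.strip()
    return mapping


def table_for_topic(topic, topic_to_table=None):
    """Resolve the destination table name for a topic."""
    if topic_to_table and topic in topic_to_table:
        return topic_to_table[topic]
    # Default: replace each non-alphanumeric character with an underscore.
    return re.sub(r"[^A-Za-z0-9]", "_", topic)


mapping = parse_topic_to_table_map("topic1:table1,topic2:table2")
print(table_for_topic("topic1", mapping))        # → table1
print(table_for_topic("orders.v1-eu", mapping))  # → orders_v1_eu
```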

Test the connection

After the connector is created, go to your BigQuery worksheets and query your table:

SELECT * FROM `project.dataset.table`

It may take a couple of minutes for the records to be visible in BigQuery.

Troubleshoot

Google credentials are checked for validity during connector creation, upon clicking Finish. If the credentials are invalid, the connector is not created.

Other issues are reported using a failed task error message. Select Show Logs to view error details.

| Message | Action |
| --- | --- |
| Not found: Project invalid-project-name | Check to make sure Project contains a valid BigQuery project. |
| Not found: Dataset project:invalid-dataset | Check to make sure Default dataset contains a valid BigQuery dataset. |
| An unexpected error occurred while validating credentials for BigQuery: Failed to create credentials from input stream | The credentials given as a JSON file in the Credentials JSON property are incorrect. Copy a valid key from the Google Cloud service account. |
| JsonConverter with schemas.enable requires "schema" and "payload" fields | The connector encountered an incorrect message format when reading from a topic. |
| JsonParseException: Unrecognized token 'test': was expecting JSON | While reading from a topic, the connector encountered a message that is not valid JSON. |
| Streaming to metadata partition of column-based partitioning table {table_name} is disallowed. | Check to confirm that the bigQueryPartitionDecorator property is set to false. You can check the property in the connector configuration JSON view. |
| Caused by: table: GenericData{classInfo=… insertion failed for the following rows:… no such field: | The Redpanda message contains a property that does not exist in a BigQuery table schema. |
| BigQueryConnectException … insertion failed for the following rows: … [row index 0] (location fieldname[0], reason: invalid): This field: fieldname is not a record. | The Redpanda message contains an array of records, but the BigQuery table expects an array of strings. |
| BigQueryConnectException: Failed to unionize schemas of records for the table… Could not convert to BigQuery schema with a batch of tombstone records. | The Redpanda message does not contain a schema, so the connector cannot create a BigQuery table. Create the BigQuery table manually. |
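Two of the errors above (invalid JSON, and missing schema and payload fields) can often be reproduced outside the connector by inspecting a sample message from the topic. The sketch below is a hypothetical helper for that kind of spot check. It only approximates the conditions behind those two messages, and it assumes the message value has already been read into a Python string.

```python
import json


def diagnose_json_message(raw):
    """Return a short diagnosis, or None if the envelope looks usable."""
    try:
        message = json.loads(raw)
    except ValueError:
        # json.JSONDecodeError is a subclass of ValueError.
        return "invalid JSON"
    if not isinstance(message, dict) or "schema" not in message or "payload" not in message:
        return 'missing "schema" and "payload" fields'
    return None


print(diagnose_json_message("test"))       # → invalid JSON
print(diagnose_json_message('{"id": 1}'))  # → missing "schema" and "payload" fields
```

A result of None only means the envelope is present. It does not confirm that the schema matches the BigQuery table.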