Create a Google BigQuery Sink Connector
The Google BigQuery Sink connector enables you to stream any structured data from Redpanda to BigQuery for advanced analytics.
Prerequisites
Before you can create a Google BigQuery Sink connector in Redpanda Cloud, you must:
- Create a Google Cloud account.
- In the Google home page:
- Create a new dataset for the project.
- (Optional if your data has a schema) After creating the dataset, create a new table to hold the data you intend to stream from Redpanda Cloud topics. Specify a structure for the table using schema values that align with your Redpanda topic data.
  This step is mandatory only if the data in Redpanda does not have a schema. If the data in Redpanda includes a schema, then the connector automatically creates the tables in BigQuery.
- Create a custom role.
  The role must have the following permissions:
  - bigquery.datasets.get
  - bigquery.tables.create
  - bigquery.tables.get
  - bigquery.tables.getData
  - bigquery.tables.list
  - bigquery.tables.update
  - bigquery.tables.updateData
- Create a service account.
- Create a service account key, and then download it.
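Before pasting the downloaded key into the connector, it can help to confirm that the file is intact. The following is a minimal sketch, assuming the key is a standard Google service account JSON key; the function name and the set of fields checked are illustrative choices, not part of Redpanda Cloud.

```python
import json

# Fields a Google service account key file normally contains.
# This subset is an assumption chosen for a quick sanity check.
REQUIRED_KEY_FIELDS = ("type", "project_id", "private_key", "client_email")


def check_service_account_key(raw: str) -> list:
    """Return a list of problems found in the key text (empty if none)."""
    try:
        key = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(key, dict):
        return ["top-level JSON value is not an object"]

    problems = [
        f"missing field: {name}" for name in REQUIRED_KEY_FIELDS if not key.get(name)
    ]
    key_type = key.get("type")
    if key_type and key_type != "service_account":
        problems.append(f"unexpected type: {key_type}")
    return problems
```

An empty list means the file at least has the expected shape. It does not prove that the key is still active or that the custom role is attached to the service account.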
Create a Google BigQuery Sink connector
To create the Google BigQuery Sink connector:
- In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.
- Select Export to Google BigQuery.
- On the Create Connector page, specify the following required connector configuration options:
Property name | Property key | Description |
---|---|---|
Topics to export | topics | A comma-separated list of the cluster topics you want to replicate to Google BigQuery. |
Topics regex | topics.regex | A Java regular expression of topics to replicate. For example: specify .* to replicate all available topics in the cluster. Applicable only when Use regular expressions is selected. |
Credentials JSON | keyfile | A JSON key with BigQuery service account credentials. |
Project | project | The BigQuery project to which topic data will be written. |
Default dataset | defaultDataset | The default Google BigQuery dataset to be used. |
Kafka message value format | value.converter | The format of the value in the Kafka topic. The default is JSON. |
Max Tasks | tasks.max | Maximum number of tasks to use for this connector. The default is 1. Each task replicates an exclusive set of partitions assigned to it. |
Connector name | name | Globally-unique name to use for this connector. |
- Click Next. Review the connector properties specified, then click Create.
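For reference, the required properties form a flat set of key/value pairs. The sketch below assembles one in Python using the property keys from the required options. The function name, the sample values, and the rule that exactly one of topics or topics.regex is set (the usual Kafka Connect convention for sink connectors) are assumptions for illustration; Redpanda Cloud collects these values through the Create Connector page.

```python
import json


def build_bigquery_sink_config(name, project, dataset, keyfile_json,
                               topics=None, topics_regex=None, tasks_max=1):
    """Assemble the required connector properties as a flat dict of strings."""
    # Assumption: a sink connector takes either a topic list or a regex, not both.
    if bool(topics) == bool(topics_regex):
        raise ValueError("set exactly one of topics or topics_regex")

    config = {
        "name": name,
        "project": project,
        "defaultDataset": dataset,
        "keyfile": keyfile_json,
        # Converter class that corresponds to the JSON format option.
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "tasks.max": str(tasks_max),
    }
    if topics:
        config["topics"] = ",".join(topics)
    else:
        config["topics.regex"] = topics_regex
    return config


# Hypothetical values, for illustration only.
example = build_bigquery_sink_config(
    name="bigquery-sink-orders",
    project="my-project",
    dataset="my_dataset",
    keyfile_json="{}",
    topics=["orders", "payments"],
)
print(json.dumps(example, indent=2))
```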
Advanced Google BigQuery Sink connector configuration
In most cases, the preceding basic configuration properties are sufficient. If you need additional settings (for example, to automatically create BigQuery tables or to map topics to tables), select Show advanced options on the Create Connector page and specify any of the following optional advanced connector configuration properties:
Property name | Property key | Description |
---|---|---|
| | Automatically create BigQuery tables if they don’t already exist. If the table does not exist, then it is created based on the record schema. |
| | Map of topics to tables. Format: comma-separated tuples, for example topic1:table1,topic2:table2. |
| | If true, new fields can be added to BigQuery tables during subsequent schema updates. |
| | If true, fields in the BigQuery schema can be changed from |
| | Enables upsert functionality on the connector. |
| | Enables delete functionality on the connector. |
| | The name of the BigQuery table field for the Kafka key. Must be set when upsert or delete is enabled. |
| | The time partitioning type to use when creating tables. |
| | The number of retry attempts made for each BigQuery request that fails with a backend or quota exceeded error. |
| | The minimum amount of time, in milliseconds, to wait between BigQuery backend or quota exceeded error retry attempts. |
| | Error tolerance response during connector operation. Default value is |
| | The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, its transformations, or converters. The topic name is blank by default, which means that no messages are recorded in the DLQ. |
| | Replication factor used to create the dead letter queue topic when it doesn’t already exist. |
| | When |
Map data
Use the appropriate key or value converter (input data format) for your data as follows:
- JSON (org.apache.kafka.connect.json.JsonConverter) when your messages are JSON-encoded. Select Message JSON contains schema when each message has the schema and payload fields. If your messages do not contain a schema, manually create tables in BigQuery.
- AVRO (io.confluent.connect.avro.AvroConverter) when your messages are AVRO-encoded, with the schema stored in the Schema Registry.
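To make the schema and payload fields concrete, here is a minimal sketch of a producer-side helper that wraps a record in the envelope JsonConverter reads when schemas are enabled. The helper name, the record name example.Record, and the sample fields are hypothetical; the type names (int64, string) are Kafka Connect JSON schema types.

```python
import json


def with_connect_schema(payload, fields, name="example.Record"):
    """Wrap a payload in the schema/payload envelope read by JsonConverter.

    fields is a list of (field_name, connect_type, optional) tuples.
    """
    schema = {
        "type": "struct",
        "name": name,
        "optional": False,
        "fields": [
            {"field": field, "type": type_name, "optional": optional}
            for field, type_name, optional in fields
        ],
    }
    return json.dumps({"schema": schema, "payload": payload})


# Hypothetical record with a required id and an optional email.
message = with_connect_schema(
    {"id": 42, "email": "a@example.com"},
    [("id", "int64", False), ("email", "string", True)],
)
print(message)
```

Messages produced without this envelope have no schema for the connector to read, which is the case where the BigQuery table has to be created manually.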
By default, the table name is the name of the topic, with non-alphanumeric characters replaced by an underscore character (_).
Use Topic to table map to remap topic names. For example, topic1:table1,topic2:table2.
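The naming rules above can be sketched as a small lookup: use the mapped table if the topic appears in the map, otherwise derive the name from the topic. This is an approximation for predicting table names, not the connector's actual code. The function names are hypothetical, and treating "alphanumeric" as ASCII letters and digits is an assumption.

```python
import re


def parse_topic_table_map(value):
    """Parse 'topic1:table1,topic2:table2' into a dict of topic -> table."""
    mapping = {}
    for pair in filter(None, (p.strip() for p in value.split(","))):
        topic, sep, table = pair.partition(":")
        if not sep or not topic.strip() or not table.strip():
            raise ValueError(f"expected topic:table, got {pair!r}")
        mapping[topic.strip()] = table.strip()
    return mapping


def table_for_topic(topic, topic_table_map=""):
    """Return the table name a topic is expected to be written to."""
    mapping = parse_topic_table_map(topic_table_map)
    if topic in mapping:
        return mapping[topic]
    # Default: replace every character that is not a letter, digit,
    # or underscore with an underscore (ASCII-only is an assumption).
    return re.sub(r"[^A-Za-z0-9_]", "_", topic)


print(table_for_topic("orders.v1-eu"))
print(table_for_topic("topic1", "topic1:table1,topic2:table2"))
```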
Test the connection
After the connector is created, go to your BigQuery worksheets and query your table:
SELECT * FROM `project.dataset.table`
It may take a couple of minutes for the records to be visible in BigQuery.
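The same check can be scripted. The sketch below builds a row-count query for a fully qualified table and, optionally, runs it with the google-cloud-bigquery client library. The function names are hypothetical, the identifier check is deliberately conservative (it rejects some names BigQuery allows), and running the query assumes the library is installed and that credentials are available in the environment.

```python
import re

# Conservative identifier check (an assumption, stricter than BigQuery itself).
_IDENT = re.compile(r"^[A-Za-z0-9_-]+$")


def row_count_query(project, dataset, table):
    """Build a COUNT(*) query against the fully qualified table name."""
    for part in (project, dataset, table):
        if not _IDENT.match(part):
            raise ValueError(f"unexpected character in identifier: {part!r}")
    return f"SELECT COUNT(*) AS row_count FROM `{project}.{dataset}.{table}`"


def run_row_count(project, dataset, table):
    """Run the query with the BigQuery client library (imported lazily)."""
    from google.cloud import bigquery  # pip install google-cloud-bigquery

    client = bigquery.Client(project=project)
    rows = client.query(row_count_query(project, dataset, table)).result()
    return next(iter(rows))["row_count"]


print(row_count_query("my-project", "my_dataset", "orders"))
```

A count of zero shortly after the connector starts is not necessarily a failure, because records can take a couple of minutes to appear.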
Troubleshoot
Google credentials are checked for validity when you click Finish during connector creation. If the credentials are invalid, the connector is not created.
Other issues are reported using a failed task error message. Select Show Logs to view error details.
Message | Action |
---|---|
Not found: Project invalid-project-name | Check to make sure |
Not found: Dataset project:invalid-dataset | Check to make sure |
An unexpected error occurred while validating credentials for BigQuery: Failed to create credentials from input stream | The credentials given as a JSON file in the |
JsonConverter with schemas.enable requires "schema" and "payload" fields | The connector encountered an incorrect message format when reading from a topic. |
JsonParseException: Unrecognized token 'test': was expecting JSON | During reading from a topic the connector encountered a message that is invalid JSON. |
Streaming to metadata partition of column-based partitioning table {table_name} is disallowed. | Check to confirm that the |
Caused by: table: GenericData{classInfo=… insertion failed for the following rows:… no such field: | The Redpanda message contains a property that does not exist in a BigQuery table schema. |
BigQueryConnectException … insertion failed for the following rows: … [row index 0] (location fieldname[0], reason: invalid): This field: fieldname is not a record. | The Redpanda message contains an array of records, but the BigQuery table expects an array of strings. |
BigQueryConnectException: Failed to unionize schemas of records for the table… Could not convert to BigQuery schema with a batch of tombstone records. | The Redpanda message does not contain a schema, so the connector cannot create a BigQuery table. Create the BigQuery table manually. |
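Two of the errors above, the JsonParseException and the missing "schema" and "payload" fields, can often be caught before a message reaches the connector. The following is a minimal sketch of such a pre-flight check; the function name and return values are hypothetical, and it mirrors the JsonConverter envelope rule only approximately.

```python
import json


def check_json_message(raw, schemas_enabled=True):
    """Return None if the message looks readable, else a short reason."""
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        return "invalid JSON"
    if schemas_enabled:
        # With schemas enabled, the envelope is expected to be an object
        # holding exactly the schema and payload fields.
        if not isinstance(value, dict) or set(value) != {"schema", "payload"}:
            return 'missing "schema" and "payload" fields'
    return None


print(check_json_message("test"))
print(check_json_message('{"id": 1}'))
```

Running a sample of topic messages through a check like this can help narrow down whether a failed task is caused by the message format or by the BigQuery side.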