# Create a GCS Sink Connector

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [cloud-data-platform-full.txt](https://docs.redpanda.com/cloud-data-platform-full.txt)

---
title: Create a GCS Sink Connector
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: managed-connectors/create-gcs-connector
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: managed-connectors/create-gcs-connector.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/managed-connectors/create-gcs-connector.adoc
description: Use the Redpanda Cloud UI to create a GCS Sink Connector.
page-git-created-date: "2024-06-06"
page-git-modified-date: "2025-08-05"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/create-gcs-connector.md -->

> ❗ **IMPORTANT**
>
> -   To enable this feature, contact [Redpanda Support](https://support.redpanda.com/hc/en-us/requests/new). To disable this feature, see [Disable Kafka Connect](https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/disable-kc/).
>
> -   Redpanda Support does not manage or monitor Kafka Connect. For fully-supported connectors, consider [Redpanda Connect](https://docs.redpanda.com/cloud-data-platform/develop/connect/about/).
>
> -   When Kafka Connect is enabled, there is a dedicated node running even when no connectors are deployed.

The Google Cloud Storage (GCS) Sink connector stores Redpanda messages in a Google Cloud Storage bucket.

## [](#prerequisites)Prerequisites

Before you can create a GCS Sink connector in the Redpanda Cloud, you must:

1.  Create a [Google Cloud](https://cloud.google.com/) account.

2.  [Create a service account](https://cloud.google.com/iam/docs/service-accounts-create) that will be used to connect to the GCS service.

3.  [Create a service account key](https://cloud.google.com/iam/docs/keys-create-delete) and download it.

4.  Create a [custom role](https://cloud.google.com/iam/docs/creating-custom-roles), which must have the following permissions:

    -   `storage.objects.create` to create items in the GCS bucket

    -   `storage.objects.delete` to overwrite items in the GCS bucket


5.  [Create a GCS bucket](https://cloud.google.com/storage/docs/creating-buckets) to which to send data.

6.  [Grant permissions](https://cloud.google.com/storage/docs/access-control/using-iam-permissions) to the bucket your created for your service account. Use the role created in step 4.


## [](#limitations)Limitations

The GCS Sink connector has the following limitations:

-   You can use only the `STRING` and `BYTES` input formats for `CSV` output format.

-   You can use only the `PARQUET` format when your messages contain schema.


## [](#create-a-gcs-sink-connector)Create a GCS Sink connector

To create the GCS Sink connector:

1.  In Redpanda Cloud, click **Connectors** in the navigation menu, and then click **Create Connector**.

2.  Select **Export to Google Cloud Storage**.

3.  On the **Create Connector** page, specify the following required connector configuration options:

    | Property name | Property key | Description |
    | --- | --- | --- |
    | Topics to export | topics | Comma-separated list of the cluster topics you want to replicate to GCS. |
    | Topics regex | topics.regex | Java regular expression of topics to replicate. For example: specify .* to replicate all available topics in the cluster. Applicable only when Use regular expressions is selected. |
    | GCS Credentials JSON | gcs.credentials.json | JSON object with GCS credentials. |
    | GCS bucket name | gcs.bucket.name | Name of an existing GCS bucket to store output files in. |
    | Kafka message key format | key.converter | Format of the key in the Redpanda topic. Use BYTES for no conversion. |
    | Kafka message value format | value.converter | Format of the value in the Redpanda topic. Use BYTES for no conversion. |
    | GCS file format | format.output.type | Format of the files created in GCS: CSV (the default), JSON, JSONL AVRO, or PARQUET. You can use the CSV format output only with BYTES and STRING. |
    | Avro codec | avro.codec | The Avro compression codec to be used for Avro output files. Available values: null (the default), deflate, snappy, and bzip2. |
    | Max Tasks | tasks.max | Maximum number of tasks to use for this connector. The default is 1. Each task replicates exclusive set of partitions assigned to it. |
    | Connector name | name | Globally-unique name to use for this connector. |

4.  Click **Next**. Review the connector properties specified, then click **Create**.


### [](#advanced-gcs-sink-connector-configuration)Advanced GCS Sink connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require any additional property settings, then specify any of the following _optional_ advanced connector configuration properties by selecting **Show advanced options** on the **Create Connector** page:

| Property name | Property key | Description |
| --- | --- | --- |
| File name template | file.name.template | The template for file names on GCS. Supports {{ variable }} placeholders for substituting variables. Supported placeholders are:topicpartitionstart_offset (the offset of the first record in the file)timestamp:unit=yyyy|MM|dd|HH (the timestamp of the record)key (when used, other placeholders are not substituted) |
| File name prefix | file.name.prefix | The prefix to be added to the name of each file put in GCS. |
| Output fields | format.output.fields | Fields to place into output files. Supported values are: 'key', 'value', 'offset', 'timestamp', and 'headers'. |
| Value field encoding | format.output.fields.value.encoding | The type of encoding to be used for the value field. Supported values are: 'none' and 'base64'. |
| Envelope for primitives | format.output.envelope | Specifies whether or not to enable additional JSON object wrapping of the actual value. |
| Output file compression | file.compression.type | The compression type to be used for files put into GCS. Supported values are: 'none', 'gzip', 'snappy', and 'zstd'. |
| Max records per file | file.max.records | The maximum number of records to put in a single file. Must be a non-negative number. 0 is interpreted as "unlimited", which is the default. In this case files are only flushed after file.flush.interval.ms. |
| File flush interval milliseconds | file.flush.interval.ms | The time interval to periodically flush files and commit offsets. Value specified must be a non-negative number. Default is 60 seconds. 0 indicates that it is disabled. In this case, files are only flushed after reaching file.max.records record size. |
| GCS bucket check | gcs.bucket.check | If set to true, the connector will attempt to put a test file to the GCS bucket to validate access. Default is true. |
| GCS retry backoff initial delay milliseconds | gcs.retry.backoff.initial.delay.ms | Initial retry delay in milliseconds. The default value is 1000. |
| GCS retry backoff max delay milliseconds | gcs.retry.backoff.max.delay.ms | Maximum retry delay in milliseconds. The default value is 32000. |
| GCS retry backoff delay multiplier | gcs.retry.backoff.delay.multiplier | Retry delay multiplier. The default value is 2.0. |
| GCS retry backoff max attempts | gcs.retry.backoff.max.attempts | Retry max attempts. The default value is 6. |
| GCS retry backoff total timeout milliseconds | gcs.retry.backoff.total.timeout.ms | Retry total timeout in milliseconds. The default value is 50000. |
| Retry back-off | kafka.retry.backoff.ms | Retry backoff in milliseconds. In case of transient exceptions, useful for performing recovery. Maximum value is 86400000 (24 hours). |
| Error tolerance | errors.tolerance | Error tolerance response during connector operation. Default value is none and signals that any error will result in an immediate connector task failure. Value of all changes the behavior to skip over problematic records. |
| Dead letter queue topic name | errors.deadletterqueue.topic.name | The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, its transformations, or converters. The topic name is blank by default, which means that no messages are recorded in the DLQ. |
| Dead letter queue topic replication factor | errors.deadletterqueue.topic .replication.factor | Replication factor used to create the dead letter queue topic when it doesn’t already exist. |
| Enable error context headers | errors.deadletterqueue.context .headers.enable | When true, adds a header containing error context to the messages written to the dead letter queue. To avoid clashing with headers from the original record, all error context header keys, start with __connect.errors. |

## [](#map-data)Map data

Use the appropriate key or value converter (input data format) for your data as follows:

-   `JSON` (`org.apache.kafka.connect.json.JsonConverter`) when your messages are JSON-encoded. Select `Message JSON contains schema`, with the `schema` and `payload` fields.

-   `AVRO` (`io.confluent.connect.avro.AvroConverter`) when your messages contain AVRO-encoded messages, with schema stored in the Schema Registry.

-   `STRING` (`org.apache.kafka.connect.storage.StringConverter`) when your messages contain textual data.

-   `BYTES` (`org.apache.kafka.connect.converters.ByteArrayConverter`) when your messages contain arbitrary data.


You can also select the output data format for your GCS files as follows:

-   `CSV` to produce data in the `CSV` format. For `CSV` only, you can set `STRING` and `BYTES` input formats.

-   `JSON` to produce data in the `JSON` format as an array of record objects.

-   `JSONL` to produce data in the `JSON` format, each message as a separate JSON, one per line.

-   `PARQUET` to produce data in the `PARQUET` format when your messages contain schema.

-   `AVRO` to produce data in the `AVRO` format when your messages contain schema.


## [](#test-the-connection)Test the connection

After the connector is created, check the GCS bucket for a new file. Files should appear after the file flush interval (default is 60 seconds).

## [](#troubleshoot)Troubleshoot

If there are any connection issues, an error message is returned. Depending on the `GCS bucket check` property value, the error results in a failed connector (`GCS bucket check = true`) or a failed task (`GCS bucket check = false`). Select **Show Logs** to view error details.

Additional errors and corrective actions follow.

| Message | Action |
| --- | --- |
| Failed to read credentials from JSON string | The credentials given as JSON file in the GCS credentials JSON property are incorrect. Copy a valid key from the Google Cloud service account. |
| The specified bucket does not exist | Create the bucket if the bucket does not exist, or correct the bucket name if the bucket exists, but the specified GCS bucket name value is incorrect. |
| No files in the GCS bucket | Be sure to wait until the connector performs the first file flush (default is 60 seconds). |