Create an S3 Sink Connector

The Amazon S3 Sink connector exports Apache Kafka messages to files in AWS S3 buckets.

Prerequisites

Before you can create an AWS S3 Sink connector in Redpanda Cloud, complete these tasks:

  1. Create an AWS account.

  2. Create an S3 bucket that you will send data to.

  3. Create an IAM user that will be used to connect to the S3 service.

  4. Attach the following policy to the user, replacing bucket-name with the name of the bucket you created in step 2.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                    "s3:ListBucketMultipartUploads"
                ],
                "Resource": [
                    "arn:aws:s3:::bucket-name/*",
                    "arn:aws:s3:::bucket-name"
                ]
            }
        ]
    }
  5. Create access keys for the user created in step 3.

  6. Copy the access key ID and the secret access key. You will need them to configure the connector.
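The policy from step 4 can also be generated for a given bucket name instead of being edited by hand. The following is a minimal sketch in Python. The function name build_s3_sink_policy and the bucket name in the usage example are hypothetical. The statement leaves out a Principal element, on the assumption that the policy is attached directly to the IAM user as an identity-based policy.

```python
import json


def build_s3_sink_policy(bucket_name: str) -> dict:
    """Return the step 4 policy with bucket_name substituted into both resource ARNs."""
    if not bucket_name:
        raise ValueError("bucket_name must not be empty")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # No "Principal" here: assumed to be an identity-based policy
                # attached to the IAM user from step 3.
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                    "s3:ListBucketMultipartUploads",
                ],
                "Resource": [
                    f"arn:aws:s3:::{bucket_name}/*",
                    f"arn:aws:s3:::{bucket_name}",
                ],
            }
        ],
    }


if __name__ == "__main__":
    # "my-export-bucket" is a placeholder; use the bucket created in step 2.
    print(json.dumps(build_s3_sink_policy("my-export-bucket"), indent=4))
```

The printed JSON can be pasted into the policy editor when attaching the policy to the user.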

Limitations

  • The CSV output format supports only the STRING and BYTES input formats.

  • You can use the PARQUET format only when your messages contain a schema.
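The two limitations above can be expressed as a small pre-flight check. This is a minimal sketch in Python. The function check_formats and its has_schema flag are hypothetical names used for illustration; they are not part of Redpanda Cloud.

```python
from typing import List


def check_formats(input_format: str, output_format: str, has_schema: bool) -> List[str]:
    """Return a list of problems; an empty list means the combination is allowed."""
    problems = []
    # CSV output accepts only the STRING and BYTES input formats.
    if output_format == "CSV" and input_format not in ("STRING", "BYTES"):
        problems.append("CSV output requires the STRING or BYTES input format")
    # PARQUET output needs messages that contain a schema.
    if output_format == "PARQUET" and not has_schema:
        problems.append("PARQUET output requires messages that contain a schema")
    return problems


if __name__ == "__main__":
    for problem in check_formats("AVRO", "CSV", has_schema=True):
        print(problem)
```

Running such a check before creating the connector catches an invalid combination earlier than a failed connector task would.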

Create an AWS S3 Sink connector

To create the AWS S3 Sink connector:

  1. In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.

  2. Select Export to S3.

  3. On the Create Connector page, specify the following required connector configuration options:

    Property name Property key Description

    Topics to export

    topics

    Comma-separated list of the cluster topics whose records will be exported to the S3 bucket.

    Topics regex

    topics.regex

    Java regular expression that matches the topics to export. For example, specify .* to export all available topics in the cluster. Applicable only when Use regular expressions is selected.

    AWS access key ID

    aws.access.key.id

    Enter the AWS access key ID.

    AWS secret access key

    aws.secret.access.key

    Enter the AWS secret access key.

    AWS S3 bucket name

    aws.s3.bucket.name

    Specify the name of the AWS S3 bucket to which the connector is to send data.

    AWS S3 region

    aws.s3.region

    Select the region for the S3 bucket used for storing the records. The default is us-east-1.

    Kafka message key format

    key.converter

    Format of the key in the Redpanda topic. The default is BYTES.

    Kafka message value format

    value.converter

    Format of the value in the Redpanda topic. The default is BYTES.

    S3 file format

    format.output.type

    Format of the files created in S3: CSV (the default), AVRO, JSON, JSONL, or PARQUET. You can use the CSV output format only with the BYTES and STRING input formats.

    Avro codec

    avro.codec

    The Avro compression codec to be used for Avro output files. Available values: null (the default), deflate, snappy, and bzip2.

    Max Tasks

    tasks.max

    Maximum number of tasks to use for this connector. The default is 1. Each task replicates an exclusive set of partitions assigned to it.

    Connector name

    name

    Globally unique name to use for this connector.

  4. Click Next. Review the connector properties specified, then click Create.
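The required properties from step 3 amount to a flat set of key/value pairs. The sketch below collects them in Python to show how the property keys fit together. It is an illustration under stated assumptions: the function build_s3_sink_config, the sample names, and the lowercase spelling of the format.output.type value are hypothetical, and the converter values reuse the class names listed under Map data.

```python
import json
import os


def build_s3_sink_config(name, topics, bucket, access_key_id, secret_access_key,
                         region="us-east-1", output_type="csv", tasks_max=1):
    """Collect the required connector properties into one flat mapping."""
    return {
        "name": name,
        "topics": ",".join(topics),  # comma-separated list of topics
        "aws.access.key.id": access_key_id,
        "aws.secret.access.key": secret_access_key,
        "aws.s3.bucket.name": bucket,
        "aws.s3.region": region,
        # BYTES is the documented default for both key and value.
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        # Assumption: the value is the lowercase form of the format shown in the UI.
        "format.output.type": output_type,
        "tasks.max": str(tasks_max),
    }


if __name__ == "__main__":
    config = build_s3_sink_config(
        name="s3-export-demo",          # placeholder connector name
        topics=["orders", "payments"],  # placeholder topics
        bucket="my-export-bucket",      # placeholder bucket
        access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "<access-key-id>"),
        secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", "<secret-access-key>"),
    )
    # Mask the secret before printing the configuration.
    print(json.dumps({**config, "aws.secret.access.key": "****"}, indent=2))
```

Reading the credentials from environment variables keeps the secret access key out of source files. The avro.codec property is left out because it applies only to Avro output files.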

Advanced AWS S3 Sink connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:

Property name Property key Description

File name template

file.name.template

The template for file names on S3. Supports {{ variable }} placeholders for substituting variables. Supported placeholders are:

  • topic

  • partition

  • start_offset (the offset of the first record in the file)

  • timestamp:unit=yyyy|MM|dd|HH (the timestamp of the record)

  • key (when used, other placeholders are not substituted)

File name prefix

file.name.prefix

The prefix to be added to the name of each file put in S3.

Output fields

format.output.fields

Fields to place into output files. Supported values are: 'key', 'value', 'offset', 'timestamp', and 'headers'.

Value field encoding

format.output.fields.value.encoding

The type of encoding to be used for the value field. Supported values are: 'none' and 'base64'.

Envelope for primitives

format.output.envelope

Specifies whether to wrap the actual value in an additional JSON object.

Output file compression

file.compression.type

The compression type to be used for files put into S3. Supported values are: 'none' (default), 'gzip', 'snappy', and 'zstd'.

Max records per file

file.max.records

The maximum number of records to put in a single file. Must be a non-negative number. 0 is interpreted as "unlimited", which is the default. In this case, files are flushed only when the file.flush.interval.ms interval elapses.

File flush interval milliseconds

file.flush.interval.ms

The interval, in milliseconds, at which files are flushed and offsets are committed. Must be a non-negative number. The default is 60000 (60 seconds). 0 disables periodic flushing. In this case, files are flushed only after reaching file.max.records records.

AWS S3 bucket check

aws.s3.bucket.check

If set to true (default), the connector will attempt to put a test file to the S3 bucket to validate access.

AWS S3 part size bytes

s3.part.size

The part size in S3 multi-part uploads in bytes. Maximum is 2147483647 (2GB) and default is 5242880 (5MB).

S3 retry backoff

aws.s3.backoff.delay.ms

S3 default base sleep time (in milliseconds) for non-throttled exceptions. Default is 100.

S3 maximum back-off

aws.s3.backoff.max.delay.ms

S3 maximum back-off time (in milliseconds) before retrying a request. Default is 20000.

S3 max retries

aws.s3.backoff.max.retries

Maximum retry limit (if the value is greater than 30, there can be integer overflow issues during delay calculation). Default is 3.

Error tolerance

errors.tolerance

How the connector responds to errors during operation. The default, none, means that any error results in an immediate connector task failure. A value of all makes the connector skip problematic records instead.

Dead letter queue topic name

errors.deadletterqueue.topic.name

The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, its transformations, or converters. The topic name is blank by default, which means that no messages are recorded in the DLQ.

Dead letter queue topic replication factor

errors.deadletterqueue.topic.replication.factor

Replication factor used to create the dead letter queue topic when it doesn’t already exist.

Enable error context headers

errors.deadletterqueue.context.headers.enable

When true, adds a header containing error context to the messages written to the dead letter queue. To avoid clashing with headers from the original record, all error context header keys start with __connect.errors.
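To make the file.name.template placeholders listed earlier in this table more concrete, the sketch below expands a template in Python. It illustrates the substitution only and is not the connector's implementation. The function render_file_name, the sample template, and the tolerance for spaces inside the braces are assumptions. The key placeholder is not modeled.

```python
import re
from datetime import datetime, timezone

# Map the documented timestamp units to strftime directives.
_UNIT_FORMATS = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}
# Matches {{ name }} with optional spaces around the name.
_PLACEHOLDER = re.compile(r"\{\{\s*([^{}]+?)\s*\}\}")


def render_file_name(template, topic, partition, start_offset, timestamp):
    """Replace each supported placeholder in template with its value."""
    def substitute(match):
        name = match.group(1)
        if name == "topic":
            return topic
        if name == "partition":
            return str(partition)
        if name == "start_offset":
            return str(start_offset)
        if name.startswith("timestamp:unit="):
            unit = name.split("=", 1)[1]
            return timestamp.strftime(_UNIT_FORMATS[unit])
        raise ValueError(f"unsupported placeholder: {name}")

    return _PLACEHOLDER.sub(substitute, template)


if __name__ == "__main__":
    template = ("{{topic}}/{{timestamp:unit=yyyy}}-{{timestamp:unit=MM}}-"
                "{{timestamp:unit=dd}}/{{partition}}-{{start_offset}}")
    when = datetime(2024, 1, 5, 14, 30, tzinfo=timezone.utc)
    print(render_file_name(template, "orders", 3, 1200, when))
```

With the sample values, the template yields a date-partitioned path such as orders/2024-01-05/3-1200, which groups files by topic and day.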

Map data

Use the appropriate key or value converter (input data format) for your data as follows:

  • JSON (org.apache.kafka.connect.json.JsonConverter) when your messages are JSON-encoded. Select Message JSON contains schema if each message includes the schema and payload fields.

  • AVRO (io.confluent.connect.avro.AvroConverter) when your messages are AVRO-encoded, with the schema stored in the Schema Registry.

  • STRING (org.apache.kafka.connect.storage.StringConverter) when your messages contain textual data.

  • BYTES (org.apache.kafka.connect.converters.ByteArrayConverter) when your messages contain arbitrary data.

You can also select the output data format for your S3 files as follows:

  • CSV to produce data in the CSV format. CSV supports only the STRING and BYTES input formats.

  • JSON to produce data in the JSON format as an array of record objects.

  • JSONL to produce data in the JSON Lines format, with each message as a separate JSON object, one per line.

  • PARQUET to produce data in the PARQUET format when your messages contain a schema.

  • AVRO to produce data in the AVRO format when your messages contain a schema.
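The difference between the JSON and JSONL output formats is easiest to see side by side. The following Python sketch serializes the same records both ways. The record contents and function names are hypothetical, and the exact fields and whitespace the connector writes may differ.

```python
import json


def to_json_array(records):
    """JSON output: a single document holding an array of record objects."""
    return json.dumps(records)


def to_json_lines(records):
    """JSONL output: one JSON object per line."""
    return "\n".join(json.dumps(record) for record in records)


if __name__ == "__main__":
    records = [{"value": "a"}, {"value": "b"}]  # sample records for illustration
    print(to_json_array(records))
    print(to_json_lines(records))
```

A JSONL file can be processed line by line, while a JSON file must be parsed as a whole array.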

Test the connection

After the connector is created, test the connection by writing to one of your topics, then checking the contents of the S3 bucket in the AWS management console. Files should appear after the file flush interval (default is 60 seconds).
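How long you wait for files depends on how file.max.records and file.flush.interval.ms interact. The sketch below restates the two property descriptions as a Python function. It is an illustration of the documented behavior, not the connector's actual logic, and the function name should_flush is hypothetical.

```python
def should_flush(record_count, ms_since_last_flush,
                 max_records=0, flush_interval_ms=60_000):
    """Return True when either documented trigger would flush the current file."""
    # 0 means "unlimited" for file.max.records and "disabled" for the interval.
    by_count = max_records > 0 and record_count >= max_records
    by_time = flush_interval_ms > 0 and ms_since_last_flush >= flush_interval_ms
    return by_count or by_time


if __name__ == "__main__":
    # With the defaults, only the 60-second interval triggers a flush.
    print(should_flush(record_count=10, ms_since_last_flush=1_000))
    print(should_flush(record_count=10, ms_since_last_flush=60_000))
```

With the default settings, a test message therefore appears in the bucket only after the flush interval has passed, regardless of how many records were written.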

Troubleshoot

If there are any connection issues, an error message is returned. Depending on the AWS S3 bucket check property value, the error results in a failed connector (AWS S3 bucket check = true) or a failed task (AWS S3 bucket check = false). Select Show Logs to view error details.

Additional errors and corrective actions follow.

Message Action

The AWS Access Key Id you provided does not exist in our records

The AWS access key ID is invalid. Confirm that a valid, existing AWS access key is specified.

The authorization header is malformed; the region us-east-1 is wrong; expecting us-east-2

The selected region (AWS S3 region) of the AWS bucket is incorrect. Check to confirm that you have specified the region in which the bucket was created.

The specified bucket does not exist

Create the bucket specified in the AWS S3 bucket name property, or provide the correct name of the existing bucket.

No files in the S3 bucket

Be sure to wait until the connector completes the first file flush (default 60 seconds). Verify that the topics specified are correct. Then verify that the topics contain messages to be pushed to S3.