Creating an S3 Sink Connector

The Amazon S3 Sink connector stores Apache Kafka messages in an Amazon S3 bucket.

Prerequisites

Before you can create an S3 sink connector in Redpanda Cloud, you must complete these tasks:

  • In the AWS management console, create an S3 bucket in your AWS account.

  • In the IAM console, create an IAM policy to govern access to the bucket. For the minimum list of required permissions, see the sample IAM policy below.

  • In the Redpanda Console, create a topic.

Sample IAM policy

The S3 sink connector that accesses the S3 bucket must have certain permissions, which are specified in an IAM policy. The multipart upload permissions are required because the connector writes objects using S3 multipart uploads. When you create a policy in the IAM console, you can copy and paste this text into the policy document section. Remember to replace both occurrences of bucket-name with the actual name of your S3 bucket. Note that s3:ListBucketMultipartUploads is a bucket-level action, so the policy must grant it on the bucket ARN itself, not only on the objects in the bucket.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:ListMultipartUploadParts",
                "s3:ListBucketMultipartUploads"
            ],
            "Resource": [
                "arn:aws:s3:::bucket-name",
                "arn:aws:s3:::bucket-name/*"
            ]
        }
    ]
}

Create an S3 sink connector

To create an S3 sink connector, first choose the connector type, then configure connector properties, and finally, review and apply the connector configuration. To test the connection, produce a message to the topic and verify that the S3 bucket contains the message.

Choose connector type

To start configuring an S3 sink connector, choose S3SinkConnector as the Connector Type.

  1. From the navigation menu of the Cluster page, select Kafka Connect.

  2. Click Create Connector.

  3. In Installation Target, select redpanda connect.

  4. Under Connector Type, select S3SinkConnector and click Next.

The next step is to configure connector properties.

Configure connector properties

Specify connector properties and list the topics you want to export to the S3 bucket. A sketch of a finished configuration appears after these steps.

  1. In Connector name, enter a name for your connector.

    Consider using a name that indicates which S3 bucket the connector sends data to.

  2. In Key converter class, select a converter. Choices include:

    • org.apache.kafka.connect.storage.StringConverter

    • org.apache.kafka.connect.json.JsonConverter

    • org.apache.kafka.connect.converters.ByteArrayConverter

    • io.confluent.connect.avro.AvroConverter

    The following converters are not supported:

    • io.confluent.connect.protobuf.ProtobufConverter

    • io.confluent.connect.json.JsonSchemaConverter

  3. In Topics, enter the name of the topic whose records will be exported to the S3 bucket. To export more than one topic, provide a comma-separated list without spaces; for example, topic1,topic2,topic3.

    As an alternative, you can enter a regular expression in Topics regex to specify the topics to export; for example, orders-.* matches every topic whose name begins with orders-. Set either Topics or Topics regex, but not both.

  4. In Value converter class, select a converter. The choices are the same as for the Key converter class.

  5. Click AWS and fill in the following information (available from the AWS management console):

    • aws.access.key.id

    • aws.secret.access.key

    • aws.s3.bucket.name

    • aws.s3.region

  6. Click File and fill in the following information:

    • file.name.prefix (optional)

    • file.name.template (optional)

    • file.compression.type (default is none)

    • file.max.records (default is 0)

    • file.name.timestamp.timezone

    • file.name.timestamp.source (default is WALLCLOCK)

  7. Click Format and fill in the following fields:

    • format.output.type (choices include csv, json, jsonl, and parquet)

    • format.output.fields (default is value)

    • format.output.fields.value.encoding (default is base64)

  8. Click Next Step.

    The Review page opens.
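For reference, the following is a minimal sketch of what the finished configuration might look like for a connector that exports two topics as JSON Lines. The connector.class value and the credential placeholders are illustrative, not literal: the class name is populated by the cluster when you select the connector type, and properties left at their defaults are omitted.

{
    "name": "s3-sink-my-bucket",
    "connector.class": "com.redpanda.kafka.connect.s3.S3SinkConnector",
    "topics": "topic1,topic2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "aws.access.key.id": "<your-access-key-id>",
    "aws.secret.access.key": "<your-secret-access-key>",
    "aws.s3.bucket.name": "bucket-name",
    "aws.s3.region": "us-east-1",
    "file.compression.type": "none",
    "format.output.type": "jsonl",
    "format.output.fields": "key,value",
    "format.output.fields.value.encoding": "none"
}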

Review connector configuration

Check that the Connector Plugin and the values in Connector Properties are correct.

  1. Verify that the Connector Properties file contains all the values you set on the S3SinkConnector page.

  2. Manually edit the file and add the following line: "value.converter.schemas.enable": "false"

    Remember to add a comma at the end of the previous line; see the example after these steps. This setting tells the JsonConverter not to expect an embedded schema in message values.

  3. Click Finish.

    The connectors summary page opens.
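As an example of the manual edit in step 2: if value.converter is the last property in the file before your edit, the end of the edited file looks like this, with a comma added to the previous line:

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
}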

Test the connection

After the connector is created, test the connection by writing to one of your topics, then checking the contents of the S3 bucket in the AWS management console.

  1. From the navigation menu, select Topics.

  2. On the Topics page, click the name of the topic you created for exporting to your S3 bucket.

    The summary page for that topic opens.

  3. Open the Actions menu and select Publish Message.

    The Produce Message dialog box opens.

  4. Select the Key tab and type a key on the first line.

    For example, type key-1.

  5. Select the Value tab and type a value on the first line.

    For example, type [{"name":"user-1"}].

  6. Click Publish.

  7. Click Close to close the success message.

  8. From the navigation menu, select Kafka Connect.

  9. Under Cluster, click the cluster name.

    The cluster summary page opens, and the state for Connector shows Running.

  10. In the AWS management console, open the Amazon S3 Buckets page.

  11. Click the refresh symbol to refresh the page, then verify that your S3 bucket is listed.

  12. Click your S3 bucket name to see the folder created for the topic.
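If the export succeeded, the topic folder contains an object holding the test record. As a rough sketch, assuming format.output.type is set to jsonl and format.output.fields to key,value with a value encoding of none, the object's contents would be a single line similar to this:

{"key": "key-1", "value": [{"name": "user-1"}]}

With the defaults (format.output.fields of value and base64 encoding), the record value appears base64-encoded instead.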