Deploy Redpanda Connectors in Docker

The Redpanda Connectors Docker image is a community-supported artifact. For support, reach out to the Redpanda team in Redpanda Community Slack.
Note: This topic covers Redpanda Connectors, which are based on Kafka Connect. Redpanda Connect is a separate product with its own documentation.

This topic describes how to use the Redpanda Connectors Docker image, which includes a pre-configured instance of Kafka Connect that works with Redpanda.

The Redpanda Connectors Docker image contains only the MirrorMaker2 connectors, but you can build a custom image to install additional connectors. For a smoother experience, consider using the Managed Connectors available in Redpanda Cloud.

The latest Docker image contains:

  • Red Hat Enterprise Linux 8.9

  • OpenJDK 21 LTS

  • Kafka Connect

  • JMX-Exporter JMX Prometheus JavaAgent

The image also includes the following connectors as plugins:

  • MirrorSourceConnector

  • MirrorCheckpointConnector

  • MirrorHeartbeatConnector

Docker image configuration properties

The following table lists the available Docker image properties.

Property Description

CONNECT_BOOTSTRAP_SERVERS

Comma-separated list of host and port pairs that are the addresses of the Redpanda brokers.

CONNECT_CONFIGURATION

Properties-based Kafka Connect configuration.

CONNECT_ADDITIONAL_CONFIGURATION

Comma-separated Kafka Connect properties. This can be used as an alternative to the CONNECT_CONFIGURATION property. If the same Kafka Connect property is defined in CONNECT_CONFIGURATION and CONNECT_ADDITIONAL_CONFIGURATION, the one from CONNECT_ADDITIONAL_CONFIGURATION is used.

Example: offset.flush.interval.ms=1000,producer.linger.ms=1

CONNECT_SASL_MECHANISM

SASL mechanism. Allowed values: "plain", "scram-sha-256", or "scram-sha-512". Do not set if SASL is not used.

Default: not set

CONNECT_SASL_USERNAME

SASL username used to authenticate connecting to a Redpanda broker.

Default: not set

CONNECT_SASL_PASSWORD_FILE

Path to a file containing the SASL password, relative to the /opt/kafka/connect-password directory. For example, if the file is in /opt/kafka/connect-password/pass-dir/password, set this property to pass-dir/password. The file contains the SASL password in plain text.

Default: not set

CONNECT_TLS_ENABLED

Set to "true" if TLS is enabled, and "false" if not.

Default: "false"

CONNECT_TLS_AUTH_CERT

TLS authentication certificate location (relative path from /opt/kafka/connect-certs/).

For example: "user-secret/user.crt" when the file is in /opt/kafka/connect-certs/user-secret/user.crt

CONNECT_TLS_AUTH_KEY

TLS authentication key location (relative path from /opt/kafka/connect-certs/).

For example: "user-secret/user.key" when the file is in /opt/kafka/connect-certs/user-secret/user.key

CONNECT_TRUSTED_CERTS

Truststore locations (relative path from /opt/kafka/connect-certs/).

For example: "my-secret/ca.crt;my-secret/new-cert.crt" when the files are in /opt/kafka/connect-certs/my-secret/ca.crt and /opt/kafka/connect-certs/my-secret/new-cert.crt

CONNECT_ADDITIONAL_TLS_AUTH_CERT

Additional TLS authentication certificate location, used, for example, to connect with the source MM2 cluster (relative path from /opt/kafka/connect-certs/).

For example: "user-secret/user.crt" when the file is in /opt/kafka/connect-certs/user-secret/user.crt

CONNECT_ADDITIONAL_TLS_AUTH_KEY

Additional TLS authentication key location, used, for example, to connect with the source MM2 cluster (relative path from /opt/kafka/connect-certs/).

For example: "user-secret/user.key" when the file is in /opt/kafka/connect-certs/user-secret/user.key

CONNECT_ADDITIONAL_TRUSTED_CERTS

Additional truststore locations, used, for example, to connect with the source MM2 cluster (relative path from /opt/kafka/connect-certs/).

For example: "my-secret/cert.crt;my-secret/new-cert.crt" when the files are in /opt/kafka/connect-certs/my-secret/cert.crt and /opt/kafka/connect-certs/my-secret/new-cert.crt

CONNECT_METRICS_ENABLED

Set to "true" to enable Prometheus metrics on port 9404. Set to "false" to disable.

Default: "true"

CONNECT_PLUGIN_PATH

Comma-separated list of directories containing plugins for Kafka Connect to load.

Default: /opt/kafka/redpanda-plugins

CONNECT_GC_LOG_ENABLED

Set to "true" to enable GC logging. Set to "false" to disable GC logs.

Default: "false"

CONNECT_HEAP_OPTS

JVM heap options. For example: -Xms2G -Xmx2G

Default: -Xms256M -Xmx256M

CONNECT_LOG4J_CONFIGURATION

By default, Kafka Connect logs at the "info" level using the Console appender. Use this property to pass a custom log4j properties-based configuration.

CONNECT_LOG_LEVEL

By default, Kafka Connect logs at the "warn" level using the Console appender. Set to "info" to change the log level to info, or to "debug" for the debug log level.
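
The precedence between CONNECT_CONFIGURATION and CONNECT_ADDITIONAL_CONFIGURATION described in the table can be illustrated with a short Python sketch. This is not the image's startup code: the function names are hypothetical, and the parsing rules (one key=value pair per line for CONNECT_CONFIGURATION, comma-separated pairs for CONNECT_ADDITIONAL_CONFIGURATION) are assumptions based on the descriptions above.

```python
def parse_pairs(items):
    """Turn an iterable of 'key=value' strings into a dict, skipping blanks and # comments."""
    props = {}
    for item in items:
        item = item.strip()
        if not item or item.startswith("#"):
            continue
        key, _, value = item.partition("=")
        props[key.strip()] = value.strip()
    return props


def effective_configuration(configuration, additional_configuration):
    """Merge both properties; values from the additional configuration win on conflict."""
    merged = parse_pairs(configuration.splitlines())
    # Assumption: a plain comma split, so values that contain commas are not supported here.
    merged.update(parse_pairs(additional_configuration.split(",")))
    return merged
```

With the example value from the table, a property such as offset.flush.interval.ms that appears in both inputs ends up with the value 1000 from CONNECT_ADDITIONAL_CONFIGURATION.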

Install new connector type

To install a new connector type:

  1. Prepare a new connector jar. Place a fat-jar or a jar with all dependent jars in a dedicated directory.

    For example: ./connect-plugins/snowflake-sink/snowflake-sink-fat.jar

  2. Mount a volume to bind the directory with a container. For example, make the ./connect-plugins directory content visible in /opt/kafka/connect-plugins in a container:

    volumes:
      - ./connect-plugins:/opt/kafka/connect-plugins
  3. Set the CONNECT_PLUGIN_PATH image property to the directory that contains the new connector, so that Kafka Connect can discover it. For example:

    CONNECT_PLUGIN_PATH: "/opt/kafka/connect-plugins"
  4. The new connector type should be discovered by Kafka Connect automatically on startup. Use the /connector-plugins Kafka Connect REST endpoint to check available connector types. For example: curl localhost:8083/connector-plugins

    Create a separate child directory for each connector, and place the connector’s jar files and other resource files in that child directory.
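
To confirm that the new connector type was discovered, the /connector-plugins response can also be checked programmatically. The following is a minimal Python sketch, assuming the REST API is reachable at localhost:8083 and that each entry in the response carries a "class" field, as in the standard Kafka Connect REST API. The helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def fetch_connector_plugins(base_url="http://localhost:8083"):
    """Return the parsed JSON list from GET /connector-plugins."""
    with urlopen(f"{base_url}/connector-plugins", timeout=10) as response:
        return json.load(response)


def is_plugin_installed(plugins, connector_class):
    """Check whether a connector class appears in the plugin list."""
    return any(plugin.get("class") == connector_class for plugin in plugins)
```

For example, is_plugin_installed(fetch_connector_plugins(), "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector") should return True for the default image, because that connector is bundled with it. Substitute the class name of the connector you installed.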

Configure SASL

To configure SASL:

  1. Prepare the SASL user and password, making sure the user has the necessary permissions.

    • Required: Write access to internal topics and access to consumer groups (so all workers in the cluster can communicate with each other).

    • Other ACLs depend on the connector type (source or sink) and on the topics that the connectors use.

  2. Create a file containing the plain text password in a dedicated directory. For example, ./connect-password/redpanda-password/password, where the password file contains only the password.

  3. Mount a volume to bind the directory with a container. For example, make the ./connect-password directory content visible in /opt/kafka/connect-password in a container:

    volumes:
      - ./connect-password:/opt/kafka/connect-password
  4. Use CONNECT_SASL_USERNAME to set the SASL username, and use CONNECT_SASL_PASSWORD_FILE to set the relative path to a password file. For example, if the file is in /opt/kafka/connect-password/redpanda-password/password, use the redpanda-password/password value.

    CONNECT_SASL_USERNAME: "connect-user"
    CONNECT_SASL_PASSWORD_FILE: "redpanda-password/password"
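
Steps 2 and 4 can be sketched in Python as follows. The helper name is hypothetical, and writing the file without a trailing newline is an assumption drawn from the requirement that the file contain only the password.

```python
from pathlib import Path, PurePosixPath


def write_password_file(mount_dir, relative_path, password):
    """Write the password under mount_dir and return the CONNECT_SASL_PASSWORD_FILE value.

    mount_dir is the host directory bound to /opt/kafka/connect-password,
    for example ./connect-password.
    """
    target = Path(mount_dir).joinpath(*PurePosixPath(relative_path).parts)
    target.parent.mkdir(parents=True, exist_ok=True)
    # No trailing newline: the file holds just the password.
    target.write_text(password)
    # The property value is the path relative to the mounted directory.
    return PurePosixPath(relative_path).as_posix()
```

Calling write_password_file("./connect-password", "redpanda-password/password", password) creates the file from step 2 and returns the relative path to use in CONNECT_SASL_PASSWORD_FILE.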

Configure TLS

To configure TLS:

  1. Prepare the Redpanda cluster certificates and key, and place them in a dedicated directory. For example:

    ./connect-certs/ca.crt
    ./connect-certs/client.crt
    ./connect-certs/client.key
  2. Mount a volume to bind the directory with a container. For example, make the ./connect-certs directory content visible in /opt/kafka/connect-certs/user-secret in a container:

    volumes:
      - ./connect-certs:/opt/kafka/connect-certs/user-secret
  3. Set the CONNECT_TLS_ENABLED property to "true".

  4. Use the CONNECT_TLS_AUTH_CERT, CONNECT_TRUSTED_CERTS, and CONNECT_TLS_AUTH_KEY image properties to configure the relative path to the certificate and key. For example, if the files are in /opt/kafka/connect-certs/user-secret, use:

    CONNECT_TRUSTED_CERTS: "user-secret/ca.crt"
    CONNECT_TLS_AUTH_CERT: "user-secret/client.crt"
    CONNECT_TLS_AUTH_KEY: "user-secret/client.key"
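
Because the TLS properties are relative to /opt/kafka/connect-certs while the files live in a host directory, a mismatch between the two is easy to introduce. The following Python sketch maps each property value back to the host directory and reports files that are missing. The function names are hypothetical, and the default mount point is the one used in the volume example above.

```python
from pathlib import Path, PurePosixPath

CERTS_ROOT = PurePosixPath("/opt/kafka/connect-certs")
DEFAULT_MOUNT = "/opt/kafka/connect-certs/user-secret"


def host_path_for(value, host_dir, mount_point=DEFAULT_MOUNT):
    """Map a property value such as 'user-secret/ca.crt' to its file in host_dir."""
    container_path = CERTS_ROOT / value.strip()
    # relative_to raises ValueError if the path is outside the mounted directory.
    inside_mount = container_path.relative_to(PurePosixPath(mount_point))
    return Path(host_dir).joinpath(*inside_mount.parts)


def missing_tls_files(values, host_dir, mount_point=DEFAULT_MOUNT):
    """Return the property entries whose files do not exist in host_dir."""
    missing = []
    for value in values:
        # CONNECT_TRUSTED_CERTS may list several files separated by semicolons.
        for entry in value.split(";"):
            if not host_path_for(entry, host_dir, mount_point).is_file():
                missing.append(entry)
    return missing
```

An empty list from missing_tls_files(["user-secret/ca.crt", "user-secret/client.crt", "user-secret/client.key"], "./connect-certs") indicates that all three files from step 1 are present in the mounted directory.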

Connect with Docker Compose

You can use the following Docker Compose sample file to connect:

docker-compose.yaml
version: '3.8'
services:
  connect:
    image: docker.redpanda.com/redpandadata/connectors:latest
    volumes:
      - ./connect-password:/opt/kafka/connect-password
      - ./connect-plugins:/opt/kafka/connect-plugins
      - ./connect-certs:/opt/kafka/connect-certs/user-secret
    hostname: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_CONFIGURATION: |
        key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
        group.id=connectors-group
        offset.storage.topic=_connectors_offsets
        config.storage.topic=_connectors_configs
        status.storage.topic=_connectors_status
        config.storage.replication.factor=-1
        offset.storage.replication.factor=-1
        status.storage.replication.factor=-1
      CONNECT_BOOTSTRAP_SERVERS: ...data.redpanda:30499,...data.redpanda:30499,...data.redpanda:30499
      CONNECT_GC_LOG_ENABLED: "false"
      CONNECT_HEAP_OPTS: -Xms1G -Xmx1G
      CONNECT_METRICS_ENABLED: "false"
      CONNECT_SASL_MECHANISM: "scram-sha-256"
      CONNECT_SASL_USERNAME: "connect-user"
      CONNECT_SASL_PASSWORD_FILE: "redpanda-password/password"
      CONNECT_TLS_ENABLED: "true"
      CONNECT_TRUSTED_CERTS: "user-secret/ca.crt"
      CONNECT_TLS_AUTH_CERT: "user-secret/client.crt"
      CONNECT_TLS_AUTH_KEY: "user-secret/client.key"
      CONNECT_PLUGIN_PATH: "/opt/kafka/connect-plugins"

The sample file assumes the following directory layout:

├── ...
├── connect-certs
│   ├── ca.crt                                 # A file with Redpanda cluster CA cert
│   ├── client.crt                             # A file with Redpanda cluster cert
│   └── client.key                             # A file with Redpanda cluster key
├── connect-password
│   └── redpanda-password
│       └── password                           # A file with SASL password
├── connect-plugins
│   └── custom-connector
│       └── custom-sink-connector-fat.jar      # Connector fat jar or jar and dependency jars
└── docker-compose.yaml                        # A docker-compose file

To connect with Docker Compose:

  1. From a directory containing the docker-compose.yaml file, run:

    docker-compose up
  2. To list installed plugins, run:

    curl localhost:8083/connector-plugins
  3. To get Kafka Connect basic information, run:

    curl localhost:8083/
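  4. Metrics are available at localhost:9404/ when CONNECT_METRICS_ENABLED is set to "true" and port 9404 is published. The sample file above sets it to "false" and publishes only port 8083.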

  5. Use the Redpanda Console or Kafka Connect REST API to manage connectors.
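
The REST API is not available immediately after docker-compose up, so scripts that run the commands above may need to wait for it. The following is a minimal Python sketch of such a wait loop, assuming the API is published at localhost:8083 as in the sample file. The function names, retry count, and delay are hypothetical choices.

```python
import time
from urllib.request import urlopen


def rest_api_is_up(base_url="http://localhost:8083"):
    """Return True if GET / on the Kafka Connect REST API answers with HTTP 200."""
    try:
        with urlopen(base_url + "/", timeout=5) as response:
            return response.status == 200
    except OSError:
        # URLError and HTTPError are subclasses of OSError.
        return False


def wait_until_ready(probe=rest_api_is_up, attempts=30, delay_seconds=2.0):
    """Call probe until it returns True or the attempts are used up."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False
```

The probe is passed in as a parameter so that the loop can be reused with a different readiness check, for example one that also verifies the plugin list.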

Connect to a Redpanda Cloud cluster

To connect to a Redpanda Cloud cluster with Docker Compose:

  1. Use rpk or Redpanda Console (Security tab) to create a Redpanda user.

  2. Create ACLs for the user.

  3. Set the username in the CONNECT_SASL_USERNAME property.

  4. Create a file containing the user password (for example, in the path passwords/redpanda-password/password). Specify this path in the CONNECT_SASL_PASSWORD_FILE property.

  5. Specify a value in the CONNECT_BOOTSTRAP_SERVERS property. You can view this value in Redpanda Console > Overview > Kafka API, in the Bootstrap server URL option.

  6. Set the CONNECT_SASL_MECHANISM property value to "scram-sha-256".

  7. Set the CONNECT_TLS_ENABLED property value to "true".

docker-compose.yaml
version: '3.8'
services:
  connect:
    image: docker.redpanda.com/redpandadata/connectors:latest
    volumes:
      - ./passwords:/opt/kafka/connect-password/passwords
    hostname: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_CONFIGURATION: |
          key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
          value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
          group.id=connectors-group
          offset.storage.topic=_connectors_offsets
          config.storage.topic=_connectors_configs
          status.storage.topic=_connectors_status
          config.storage.replication.factor=-1
          offset.storage.replication.factor=-1
          status.storage.replication.factor=-1
      CONNECT_BOOTSTRAP_SERVERS: seed-....redpanda.com:9092
      CONNECT_GC_LOG_ENABLED: "false"
      CONNECT_HEAP_OPTS: -Xms1G -Xmx1G
      CONNECT_SASL_MECHANISM: "scram-sha-256"
      CONNECT_SASL_USERNAME: "connectors-user"
      CONNECT_SASL_PASSWORD_FILE: "passwords/redpanda-password/password"
      CONNECT_TLS_ENABLED: "true"

The sample file assumes the following directory layout:

├── ...
├── passwords
│   └── redpanda-password
│       └── password                           # A file with SASL password
└── docker-compose.yaml                        # A docker-compose file
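
The settings from steps 3 through 7 can be checked before starting the container. The following Python sketch validates an environment mapping against those steps. It is only an illustration: the function name and messages are hypothetical, and the checks mirror the steps above rather than any validation performed by the image itself.

```python
def check_cloud_environment(env):
    """Return a list of problems found in a dict of CONNECT_* settings."""
    problems = []
    # Steps 3 to 5: these properties must have a value.
    for name in (
        "CONNECT_SASL_USERNAME",
        "CONNECT_SASL_PASSWORD_FILE",
        "CONNECT_BOOTSTRAP_SERVERS",
    ):
        if not env.get(name):
            problems.append(f"{name} is not set")
    # Step 6: the SASL mechanism is scram-sha-256.
    if env.get("CONNECT_SASL_MECHANISM") != "scram-sha-256":
        problems.append('CONNECT_SASL_MECHANISM should be "scram-sha-256"')
    # Step 7: TLS is enabled.
    if env.get("CONNECT_TLS_ENABLED") != "true":
        problems.append('CONNECT_TLS_ENABLED should be "true"')
    return problems
```

An empty list means that every property named in the steps is present with the expected value.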

Redpanda Cloud Schema Registry

For converters that use Schema Registry (such as AvroConverter or JsonConverter), use the following connector configuration properties to set up a connection with Schema Registry:

Property Description

key.converter

Key converter class to use for the connector.

key.converter.schema.registry.url

Key converter Schema Registry URL, which you can view in the cluster Overview > Schema Registry.

key.converter.basic.auth.credentials.source

Key converter authentication method. Must be USER_INFO.

key.converter.basic.auth.user.info

Key converter user and password used for authentication, separated by a colon.

value.converter

Value converter class to use for the connector.

value.converter.schema.registry.url

Value converter Schema Registry URL, which you can view in the cluster Overview > Schema Registry.

value.converter.basic.auth.credentials.source

Value converter authentication method. Must be USER_INFO.

value.converter.basic.auth.user.info

Value converter user and password used for authentication, separated by a colon.

Example:

{
   ....
   "value.converter.schema.registry.url": "https://schema-registry-....redpanda.com:30081",
   "value.converter.basic.auth.credentials.source": "USER_INFO",
   "value.converter.basic.auth.user.info": "connect-user:secret-password"
}
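
Because the key and value converters take the same four properties with a different prefix, the properties can be generated rather than typed twice. The following Python sketch builds them for one side. The function name is hypothetical, and the converter class is whichever Schema Registry converter is installed in your image.

```python
def converter_properties(kind, converter_class, registry_url, username, password):
    """Build the Schema Registry properties for the 'key' or 'value' converter."""
    if kind not in ("key", "value"):
        raise ValueError("kind must be 'key' or 'value'")
    prefix = f"{kind}.converter"
    return {
        prefix: converter_class,
        f"{prefix}.schema.registry.url": registry_url,
        # The authentication method is always USER_INFO.
        f"{prefix}.basic.auth.credentials.source": "USER_INFO",
        # User and password are separated by a colon.
        f"{prefix}.basic.auth.user.info": f"{username}:{password}",
    }
```

The returned dictionaries for "key" and "value" can be merged into the connector configuration before it is sent to the Kafka Connect REST API.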

Manage connectors with Kafka Connect

You can manage connectors using the Kafka Connect REST API.

View version of Kafka Connect worker

To view the version of the Kafka Connect worker, run:

curl localhost:8083 | jq

View list of connector plugins

To view the list of available connector plugins, run:

curl localhost:8083/connector-plugins | jq

View list of active connectors

To view the list of active connectors, run:

curl 'http://localhost:8083/connectors?expand=status&expand=info' | jq

Create connector

To create the connector, run:

curl "localhost:8083/connectors" -H 'Content-Type: application/json' --data-raw '<connector-config>'

For example:

curl "localhost:8083/connectors" \
  -H 'Content-Type: application/json' \
  --data-raw '{"name": "heartbeat-connector", "config": {"connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "heartbeats.topic.replication.factor": "1", "replication.factor": "1", "source.cluster.alias": "source", "source.cluster.bootstrap.servers": "redpanda:29092", "target.cluster.bootstrap.servers": "redpanda:29092"}}'
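
The same request can be issued from Python when quoting a long JSON document in a shell command becomes awkward. The following is a minimal sketch using only the standard library, assuming the REST API is reachable at localhost:8083. The function names are hypothetical.

```python
import json
from urllib.request import Request, urlopen


def build_create_request(name, config, base_url="http://localhost:8083"):
    """Build the POST /connectors request for a connector name and its config."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(name, config, base_url="http://localhost:8083"):
    """Send the request and return the parsed JSON response."""
    with urlopen(build_create_request(name, config, base_url), timeout=30) as response:
        return json.load(response)
```

Building the request separately from sending it makes it possible to inspect the URL and body before anything is sent to the worker.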

View connector status

To view connector status, run:

curl localhost:8083/connectors/<connector-name>/status

For example:

curl localhost:8083/connectors/heartbeat-connector/status
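
The status response reports a state for the connector and for each of its tasks. The following Python sketch interprets a parsed response, assuming the standard Kafka Connect layout with a "connector" object and a "tasks" list that each carry a "state" field. The function names are hypothetical.

```python
def failed_task_ids(status):
    """Return the IDs of tasks whose state is FAILED."""
    return [task["id"] for task in status.get("tasks", []) if task.get("state") == "FAILED"]


def is_healthy(status):
    """True when the connector and all of its tasks report RUNNING."""
    connector_running = status.get("connector", {}).get("state") == "RUNNING"
    tasks_running = all(task.get("state") == "RUNNING" for task in status.get("tasks", []))
    return connector_running and tasks_running
```

A connector can report RUNNING while one of its tasks has failed, which is why the sketch checks both levels.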

Delete connector

To delete the connector, run:

curl "localhost:8083/connectors/<connector-name>" -X 'DELETE'

For example:

curl "localhost:8083/connectors/heartbeat-connector" -X 'DELETE'