Create a MirrorMaker2 Checkpoint Connector

You can use the MirrorMaker2 Checkpoint connector to import consumer group offsets from other Kafka clusters.

Prerequisites

  • The external Kafka cluster is accessible.

  • A service account with read-only access to the external cluster is available.

  • The Kafka cluster topics connector is running for the same source cluster, with a matching configuration.

Limitations

The MirrorMaker2 Checkpoint connector does not migrate consumer group offsets that are lower than the highest offsets synced by the MirrorMaker2 Source connector by the time the MirrorMaker2 Checkpoint connector is started.

Create a MirrorMaker2 Checkpoint connector

To create the MirrorMaker2 Checkpoint connector:

  1. In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.

  2. Select Import from Kafka cluster offsets.

  3. On the Create Connector page, specify the following required connector configuration options:

    Property name Property key Description

    Topics to replicate

    topics

    Comma-separated topic names and regexes you want to replicate.

    Source cluster broker list

    source.cluster.bootstrap.servers

    A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers regardless of which servers are specified here for bootstrapping.

    Source cluster security protocol

    source.cluster.security.protocol

    The protocol used to communicate with source brokers. The default is PLAINTEXT.

    Source cluster SASL mechanism

    source.cluster.sasl.mechanism

    SASL mechanism used for connections to source cluster. Default is PLAIN.

    Source cluster SASL username

    source.cluster.sasl.username

    SASL username used for connections to source cluster.

    Source cluster SASL password

    source.cluster.sasl.password

    SASL password used for connections to source cluster.

    Groups

    groups

    Consumer groups to replicate. Supports comma-separated group IDs and regexes.

    Connector name

    name

    Globally-unique name to use for this connector.

  4. Click Next. Review the connector properties specified, then click Create.

Advanced MirrorMaker2 Checkpoint connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:

Property name Property key Description

Source cluster SSL custom certificate

source.cluster.ssl.truststore.certificates

Trusted certificates in the PEM format.

Source cluster SSL keystore key

source.cluster.ssl.keystore.key

Private key in the PEM format.

Source cluster SSL keystore certificate chain

source.cluster.ssl.keystore.certificate.chain

Certificate chain in the PEM format.

Topics exclude

topics.exclude

Excluded topics. Supports comma-separated topic names and regexes.

Source cluster alias

source.cluster.alias

When using DefaultReplicationPolicy, topic names will be prefixed with it.

Replication policy class

replication.policy.class

Class that defines the remote topic naming convention. Use IdentityReplicationPolicy to preserve topic names. DefaultReplicationPolicy prefixes the topic with the source cluster alias.

Emit checkpoints interval seconds

emit.checkpoints.interval.seconds

Frequency of checkpoints. The default is 60.

Sync group offsets enabled

sync.group.offsets.enabled

Specifies whether or not to periodically write the translated offsets to the __consumer_offsets topic in the target cluster, as long as no active consumers in that group are connected to the target cluster.

Sync group offsets interval seconds

sync.group.offsets.interval.seconds

Frequency of consumer group offset sync. The default is 60.

Refresh groups interval seconds

refresh.groups.interval.seconds

Frequency of group refreshes. The default is 600.

Offset-Syncs topic location

offset-syncs.topic.location

The location (source or target) of the offset-syncs topic. The default is source.

Checkpoints topic replication factor

checkpoints.topic.replication.factor

Replication factor for checkpoints topic. The default is -1.

Test the connection

After the connector is created:

  • Ensure that there are no errors in logs and in Redpanda Console.

  • Wait for the Kafka cluster topics connector to catch up. Then check to confirm that the consumer groups are replicated.

Use the Connectors API

When using the Connectors API, instead of specifying a value for source.cluster.sasl.username and source.cluster.sasl.password, you can specify a value for source.cluster.sasl.jaas.config.

Troubleshoot

Most MirrorMaker2 Checkpoint connector issues are reported as a failed task at the time of creation. Select Show Logs to view error details.

Message Action

Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available. / LOGS: Timed out while checking for or creating topic 'mm2-offset-syncs.target.internal'. This could indicate a connectivity issue / TimeoutException: Timed out waiting for a node assignment

Make sure broker URLs are correct and that the source cluster security protocol is correct.

SaslAuthenticationException: SASL authentication failed: security: Invalid credentials

Check to confirm that the username and password specified are correct.

java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config

Check to confirm that the username and password specified are correct.

Client SASL mechanism 'PLAIN' not enabled in the server, enabled mechanisms are [SCRAM-SHA-256, SCRAM-SHA-512]

Check to confirm that the respective Source cluster SASL mechanism is correct.

SaslAuthenticationException: SASL authentication failed: security: Invalid credentials

Make sure the respective Source cluster SASL mechanism is correct (for example, SCRAM-SHA-256 instead of SCRAM-SHA-512).

terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue

Enable the SSL using Source cluster security protocol (specify SSL or SASL_SSL).