Create a MirrorMaker2 Source Connector

You can use a MirrorMaker2 Source connector to import messages from another Kafka cluster. You can also use it to:

  • Replicate messages from an external Kafka or Redpanda cluster.

  • Create topics on the local cluster, with a configuration matching external topics.

  • Replicate topic access-control lists (ACLs).

Prerequisites

  • The external Kafka cluster must be accessible.

  • A service account with full access to the external cluster must be available. You can also use a service account with read-only ACLs when the offset-syncs topic location is set to target. You must have describe and/or describe-configs ACLs for the connector to read topic configurations on the source cluster and create the topics on the target cluster, unless you create the topics yourself.

Limitations

  • ACLs are copied, but service accounts are not created.

  • Only topic ACLs are copied (group ACLs are not).

  • Only ACLs for topics matching the connector configuration are copied (write ACLs are not copied).

  • All permissions ACLs are downgraded to read-only.

Create a MirrorMaker2 Source connector

To create the MirrorMaker2 Source connector:

  1. In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.

  2. Select Import from Kafka cluster topics.

  3. On the Create Connector form page, specify the following required connector configuration options:

    Property name Property key Description

    Regexes of topics to import

    topics

    Comma-separated topic names and regexes you want to replicate.

    Source cluster broker list

    source.cluster.bootstrap.servers

    A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers regardless of which servers are specified here for bootstrapping. This list only impacts the initial hosts used to discover the full set of servers, and should be in the form host1:port1,host2:port2,.... Because these servers are only used for the initial connection to discover the full cluster membership (which may change dynamically), it need not contain the full set of servers (you may want more than one, though, in case a server is down).

    Source cluster security protocol

    source.cluster.security.protocol

    The protocol to use to communicate with source brokers. Default is PLAINTEXT.

    Source cluster SASL mechanism

    source.cluster.sasl.mechanism

    SASL mechanism used for connections to source cluster. Default is PLAIN.

    Source cluster SASL username

    source.cluster.sasl.username

    SASL username used for connections to source cluster.

    Source cluster SASL password

    source.cluster.sasl.password

    SASL password used for connections to source cluster.

    Sync topic configs enabled

    sync.topic.configs.enabled

    Specifies whether to periodically configure remote topics to match their corresponding upstream topics.

    Sync topic ACLs enabled

    sync.topic.acls.enabled

    Specifies whether or not to periodically configure remote topic ACLs to match their corresponding upstream topics.

    Connector name

    name

    Globally-unique name to use for this connector.

  4. Click Next. Review the connector properties specified, then click Create.

Offsets are not guaranteed to match between the source and target. For example, if data-retention deletes occur on the source topic and the earliest offset is #5000, then when that event is created on the target topic the offset for that event will be #0.

Events written on the target topic use the timestamp that was set on the source event. For example, if the source event has a timestamp 2023-05-22 17:00, then this would also be the timestamp on the target event.

Advanced MirrorMaker2 Source connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:

Property name Property key Description

Source cluster SSL custom certificate

source.cluster.ssl.truststore.certificates

Trusted certificates in the PEM format.

Source cluster SSL keystore key

source.cluster.ssl.keystore.key

Private key in the PEM format.

Source cluster SSL keystore certificate chain

source.cluster.ssl.keystore.certificate.chain

Certificate chain in the PEM format.

Sync topic configs interval seconds

sync.topic.configs.interval.seconds

Frequency of topic config sync.

Sync topic ACLs interval seconds

sync.topic.acls.interval.seconds

Frequency of topic ACL sync.

Topics exclude

topics.exclude

Excluded topics. Supports comma-separated topic names and regexes.

Source cluster alias

source.cluster.alias

When using DefaultReplicationPolicy, topic names will be prefixed with it.

Replication policy class

replication.policy.class

Class that defines the remote topic naming convention. Use IdentityReplicationPolicy to preserve topic names. DefaultReplicationPolicy prefixes the topic with the source cluster alias.

Replication factor

replication.factor

Replication factor for newly created remote topics. Set -1 for cluster default.

Refresh topics interval seconds

refresh.topics.interval.seconds

Frequency of topic refresh.

Offset-Syncs topic location

offset-syncs.topic.location

The location (source or target) of the offset-syncs topic. The default is source.

Offset-Syncs topic replication factor

offset-syncs.topic.replication.factor

Replication factor for offset-syncs topic. The default is -1.

Config properties exclude

config.properties.exclude

Topic config properties that should not be replicated. Supports comma-separated property names and regexes.

Compression type

producer.override.compression.type

The compression type for all data generated by the producer. The default is none (no compression).

Max size of a request

producer.override.max.request.size

The maximum size of a request in bytes. The default is 1048576.

Auto offset reset

consumer.auto.offset.reset

What to do when there is no initial offset in Kafka, or if the current offset does not exist any more on the server (for example, because that data has been deleted). 'earliest' - automatically reset the offset to the earliest offset. 'latest' - automatically reset the offset to the latest offset. 'none' - throw exception to the consumer if no previous offset is found for the consumer’s group.

Offset lag max

offset.lag.max

How out-of-sync a remote partition can be before it is resynced. This setting impacts the MirrorMaker2 Checkpoint connector as it is the maximum lag for syncing consumer groups. The default is 100 records.

Map data

The value converter does not require any schema; it copies data as bytes.

Test the connection

After the connector is created:

  • Ensure that there are no errors in logs and in Redpanda Console.

  • Confirm that Redpanda topics are being replicated. You should see messages coming into the topics.

Use the Connectors API

When using the Connectors API, instead of specifying a value for source.cluster.sasl.username and source.cluster.sasl.password, you can specify a value for source.cluster.sasl.jaas.config.

Troubleshoot

Most MirrorMaker2 Source connector issues are reported as a failed task at the time of creation. Select Show Logs to view error details.

Message Action

Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available. / LOGS: Timed out while checking for or creating topic 'mm2-offset-syncs.target.internal'. This could indicate a connectivity issue / TimeoutException: Timed out waiting for a node assignment

Make sure broker URLs are correct and that the security.protocol is correct.

SaslAuthenticationException: SASL authentication failed: security: Invalid credentials

Confirm that the username and password specified are correct.

Terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue

Error indicates that the SSL should be enabled using Source cluster security protocol (use SSL or SASL_SSL).

RecordTooLargeException: The message is N bytes (…​)

Use producer.override.max.request.size property to change max request size.

RecordTooLargeException: The request included (…​)

The target server is not able to receive messages because it is too large in size. Disabled compression can be a root cause. Consider enabling compression: "Compression type": "snappy",

Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs

MirrorMaker2 requires an authorizer to be configured by the broker side, but it is not. Change the Sync topic ACLs enabled MirrorMaker2 property to false (default is true) to disable ACL syncing.

TopicAuthorizationException: Topic authorization failed

Confirm the service account for the source cluster contains describe and/or describe-configs ACLs.

OffsetOutOfRangeException Fetch position FetchPosition{offset=0, …​ ]

If the 0 offset for your topic does not exist in the source cluster, set Auto offset reset to either earliest or latest.