Create a MirrorMaker2 Source Connector
You can use a MirrorMaker2 Source connector to import messages from another Kafka cluster. You can also use it to:
- Replicate messages from an external Kafka or Redpanda cluster.
- Create topics on the local cluster, with a configuration matching external topics.
- Replicate topic access-control lists (ACLs).
Prerequisites
- The external Kafka cluster must be accessible.
- A service account with full access to the external cluster must be available. You can also use a service account with read-only ACLs when the `offset-syncs` topic location is set to `target`. The service account must have describe and/or describe-configs ACLs so that the connector can read topic configurations on the source cluster and create the topics on the target cluster, unless you create the topics yourself.
Limitations
- ACLs are copied, but service accounts are not created.
- Only topic ACLs are copied (group ACLs are not).
- Only ACLs for topics matching the connector configuration are copied (write ACLs are not copied).
- ACLs that grant all permissions are downgraded to read-only.
Create a MirrorMaker2 Source connector
To create the MirrorMaker2 Source connector:
- In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.
- Select Import from Kafka cluster topics.
- On the Create Connector form page, specify the following required connector configuration options:
| Property name | Property key | Description |
|---|---|---|
| Regexes of topics to import | `topics` | Comma-separated topic names and regexes you want to replicate. |
| Source cluster broker list | `source.cluster.bootstrap.servers` | A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers regardless of which servers are specified here for bootstrapping. This list only impacts the initial hosts used to discover the full set of servers, and should be in the form `host1:port1,host2:port2,...`. Because these servers are only used for the initial connection to discover the full cluster membership (which may change dynamically), it need not contain the full set of servers (you may want more than one, though, in case a server is down). |
| Source cluster security protocol | `source.cluster.security.protocol` | The protocol to use to communicate with source brokers. Default is `PLAINTEXT`. |
| Source cluster SASL mechanism | `source.cluster.sasl.mechanism` | SASL mechanism used for connections to source cluster. Default is `PLAIN`. |
| Source cluster SASL username | `source.cluster.sasl.username` | SASL username used for connections to source cluster. |
| Source cluster SASL password | `source.cluster.sasl.password` | SASL password used for connections to source cluster. |
| Sync topic configs enabled | `sync.topic.configs.enabled` | Specifies whether to periodically configure remote topics to match their corresponding upstream topics. |
| Sync topic ACLs enabled | `sync.topic.acls.enabled` | Specifies whether or not to periodically configure remote topic ACLs to match their corresponding upstream topics. |
| Connector name | `name` | Globally-unique name to use for this connector. |
- Click Next. Review the connector properties specified, then click Create.
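The required options listed in this procedure map onto a flat set of key/value pairs. As a minimal sketch, the helper below assembles those keys into a dictionary and prints it as JSON. Only the property keys and the `PLAINTEXT` and `PLAIN` defaults come from this page. The function name `build_mm2_source_config`, the sample topic names, hostnames, and credentials are hypothetical, and the `SASL_SSL` and `SCRAM-SHA-256` values are assumptions about what a source cluster might require.

```python
import json


def build_mm2_source_config(name, topics, bootstrap_servers,
                            username=None, password=None,
                            security_protocol="PLAINTEXT",
                            sasl_mechanism="PLAIN",
                            sync_topic_configs=True,
                            sync_topic_acls=True):
    """Assemble the basic connector options into a flat, string-valued dict."""
    # Illustrative validation only: these three values have no documented default.
    if not name or not topics or not bootstrap_servers:
        raise ValueError("name, topics, and bootstrap_servers must be non-empty")

    config = {
        "name": name,
        # Topic names and regexes are passed as one comma-separated string.
        "topics": ",".join(topics),
        # host1:port1,host2:port2,... used only for the initial connection.
        "source.cluster.bootstrap.servers": ",".join(bootstrap_servers),
        "source.cluster.security.protocol": security_protocol,
        "source.cluster.sasl.mechanism": sasl_mechanism,
        "sync.topic.configs.enabled": str(sync_topic_configs).lower(),
        "sync.topic.acls.enabled": str(sync_topic_acls).lower(),
    }
    # Credentials are added only when both are supplied.
    if username is not None and password is not None:
        config["source.cluster.sasl.username"] = username
        config["source.cluster.sasl.password"] = password
    return config


# Hypothetical values, for illustration only.
example = build_mm2_source_config(
    name="mm2-source-example",
    topics=["orders", "payments-.*"],
    bootstrap_servers=["kafka-1.example.com:9092", "kafka-2.example.com:9092"],
    username="mirror-user",
    password="change-me",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
)
print(json.dumps(example, indent=2))
```

Listing more than one bootstrap server, as in the example, keeps the initial connection working if one server is down.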
Offsets are not guaranteed to match between the source and target. For example, if data-retention deletes occur on the source topic and the earliest offset is
Events written on the target topic use the timestamp that was set on the source event. For example, if the source event has a timestamp
Advanced MirrorMaker2 Source connector configuration
In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:
| Property name | Property key | Description |
|---|---|---|
| | | Trusted certificates in the PEM format. |
| | | Private key in the PEM format. |
| | | Certificate chain in the PEM format. |
| | | Frequency of topic config sync. |
| | | Frequency of topic ACL sync. |
| | | Excluded topics. Supports comma-separated topic names and regexes. |
| | | When using DefaultReplicationPolicy, topic names will be prefixed with it. |
| | | Class that defines the remote topic naming convention. Use IdentityReplicationPolicy to preserve topic names. DefaultReplicationPolicy prefixes the topic with the source cluster alias. |
| | | Replication factor for newly created remote topics. Set -1 for cluster default. |
| | | Frequency of topic refresh. |
| | | The location ( |
| | | Replication factor for offset-syncs topic. The default is |
| | | Topic config properties that should not be replicated. Supports comma-separated property names and regexes. |
| | | The compression type for all data generated by the producer. The default is none (no compression). |
| | | The maximum size of a request in bytes. The default is 1048576. |
| | | What to do when there is no initial offset in Kafka, or if the current offset does not exist any more on the server (for example, because that data has been deleted). 'earliest' - automatically reset the offset to the earliest offset. 'latest' - automatically reset the offset to the latest offset. 'none' - throw exception to the consumer if no previous offset is found for the consumer’s group. |
| | | How out-of-sync a remote partition can be before it is resynced. This setting impacts the MirrorMaker2 Checkpoint connector as it is the maximum lag for syncing consumer groups. The default is 100 records. |
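To make the topic-selection and naming options more concrete, the sketch below models how a comma-separated list of topic names and regexes might select topics, and how the two replication policies described above name the remote topic. It is a sketch under stated assumptions, not the connector's implementation: it assumes each entry must match the whole topic name, that exclusions take precedence over inclusions, and that the prefix is joined to the topic name with a `.` separator. Python's `re` syntax also differs in places from the Java regexes the connector uses. The function names and sample topics are hypothetical.

```python
import re


def compile_topic_list(spec):
    """Turn 'orders,payments-.*' into one pattern matching any listed entry."""
    parts = [p.strip() for p in spec.split(",") if p.strip()]
    if not parts:
        return None
    return re.compile("|".join(f"(?:{p})" for p in parts))


def is_replicated(topic, include_spec, exclude_spec=""):
    """True if topic matches the include list and not the exclude list."""
    include = compile_topic_list(include_spec)
    exclude = compile_topic_list(exclude_spec)
    # Assumption: an entry must match the full topic name, not a substring.
    if include is None or not include.fullmatch(topic):
        return False
    # Assumption: exclusions win over inclusions.
    return exclude is None or not exclude.fullmatch(topic)


def remote_topic_name(topic, source_alias,
                      policy="DefaultReplicationPolicy", separator="."):
    """Name of the topic created on the target cluster."""
    if policy == "IdentityReplicationPolicy":
        return topic  # topic names are preserved
    # DefaultReplicationPolicy: prefix with the source cluster alias.
    # The "." separator is an assumption, not stated on this page.
    return f"{source_alias}{separator}{topic}"


for name in ["orders", "orders-archive", "payments-eu", "payments-test"]:
    selected = is_replicated(name, "orders,payments-.*", "payments-test")
    print(name, selected, remote_topic_name(name, "source") if selected else "-")
```

Under these assumptions, `orders-archive` is not selected by the entry `orders`, because the entry has to match the full name. Use `orders.*` to include it.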
Test the connection
After the connector is created:
- Ensure that there are no errors in the logs or in Redpanda Console.
- Confirm that Redpanda topics are being replicated. You should see messages coming into the topics.
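If your deployment exposes the standard Kafka Connect REST API, the error check can also be scripted against the status document returned by `GET /connectors/{name}/status`. Whether that endpoint is reachable in your environment is an assumption. The sketch below only parses such a document and does not make a network call. The function name `failed_tasks` and the sample payload are hypothetical.

```python
import json


def failed_tasks(status):
    """Return (id, first line of trace) for each failed connector or task entry."""
    failures = []

    def first_line(trace):
        lines = trace.splitlines()
        return lines[0] if lines else ""

    connector = status.get("connector", {})
    if connector.get("state") == "FAILED":
        failures.append(("connector", first_line(connector.get("trace", ""))))
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            failures.append((task.get("id"), first_line(task.get("trace", ""))))
    return failures


# Hypothetical status payload in the Kafka Connect status format.
sample = json.loads("""
{
  "name": "mm2-source-example",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "10.0.0.1:8083",
     "trace": "SaslAuthenticationException: SASL authentication failed\\n\\tat ..."}
  ]
}
""")

for task_id, reason in failed_tasks(sample):
    print(f"task {task_id} failed: {reason}")
```

An empty result means that no connector or task reported a `FAILED` state. It does not by itself confirm that messages are arriving in the target topics.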
Use the Connectors API
When using the Connectors API, instead of specifying values for `source.cluster.sasl.username` and `source.cluster.sasl.password`, you can specify a value for `source.cluster.sasl.jaas.config`.
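As an illustration of that substitution, the sketch below builds a JAAS configuration string from a username and password and swaps it into a connector configuration. The login module class names are the standard Apache Kafka client modules for the PLAIN and SCRAM mechanisms. The function names, the sample credentials, and the decision to reject values containing quotes or backslashes (instead of escaping them) are choices made for this example.

```python
# Standard Apache Kafka client login modules, keyed by SASL mechanism.
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def jaas_config(mechanism, username, password):
    """Build a JAAS configuration string for the given SASL mechanism."""
    if mechanism not in LOGIN_MODULES:
        raise ValueError(f"unsupported SASL mechanism in this sketch: {mechanism}")
    for value in (username, password):
        # Kept deliberately simple: refuse values that would need escaping.
        if '"' in value or "\\" in value:
            raise ValueError("quotes and backslashes are not handled by this sketch")
    module = LOGIN_MODULES[mechanism]
    return f'{module} required username="{username}" password="{password}";'


def with_jaas_config(config):
    """Return a copy of config that uses sasl.jaas.config instead of username/password."""
    updated = dict(config)
    username = updated.pop("source.cluster.sasl.username")
    password = updated.pop("source.cluster.sasl.password")
    # PLAIN is the documented default mechanism.
    mechanism = updated.get("source.cluster.sasl.mechanism", "PLAIN")
    updated["source.cluster.sasl.jaas.config"] = jaas_config(mechanism, username, password)
    return updated


# Hypothetical credentials, for illustration only.
original = {
    "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
    "source.cluster.sasl.username": "mirror-user",
    "source.cluster.sasl.password": "change-me",
}
print(with_jaas_config(original)["source.cluster.sasl.jaas.config"])
```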
Troubleshoot
Most MirrorMaker2 Source connector issues are reported as a failed task at the time of creation. Select Show Logs to view error details.
| Message | Action |
|---|---|
| Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available. / LOGS: Timed out while checking for or creating topic 'mm2-offset-syncs.target.internal'. This could indicate a connectivity issue / TimeoutException: Timed out waiting for a node assignment | Make sure broker URLs are correct and that the |
| SaslAuthenticationException: SASL authentication failed: security: Invalid credentials | Confirm that the username and password specified are correct. |
| Terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue | Error indicates that the SSL should be enabled using |
| RecordTooLargeException: The message is N bytes (…) | Use |
| RecordTooLargeException: The request included (…) | The target server is not able to receive the messages because they are too large. Disabled compression can be a root cause. Consider enabling compression: |
| Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs | MirrorMaker2 requires an authorizer to be configured on the broker side, but none is configured. Change the |
| TopicAuthorizationException: Topic authorization failed | Confirm the service account for the source cluster contains describe and/or describe-configs ACLs. |
| OffsetOutOfRangeException Fetch position FetchPosition{offset=0, … ] | If the 0 offset for your topic does not exist in the source cluster, set |