Docs Cloud Develop Kafka Connect MirrorMaker2 Source Connector Create a MirrorMaker2 Source Connector You can use a MirrorMaker2 Source connector to import messages from another Kafka cluster. You can also use it to: Replicate messages from an external Kafka or Redpanda cluster. Create topics on the local cluster, with a configuration matching external topics. Replicate topic access-control lists (ACLs). Prerequisites The external Kafka cluster must be accessible. A service account with full access to the external cluster must be available. You can also use a service account with read-only ACLs when the offset-syncs topic location is set to target. You must have describe and/or describe-configs ACLs for the connector to read topic configurations on the source cluster and create the topics on the target cluster, unless you create the topics yourself. Limitations ACLs are copied, but service accounts are not created. Only topic ACLs are copied (group ACLs are not). Only ACLs for topics matching the connector configuration are copied (write ACLs are not copied). All permissions ACLs are downgraded to read-only. Create a MirrorMaker2 Source connector To create the MirrorMaker2 Source connector: In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector. Select Import from Kafka cluster topics. On the Create Connector form page, specify the following required connector configuration options: Property name Property key Description Regexes of topics to import topics Comma-separated topic names and regexes you want to replicate. Source cluster broker list source.cluster.bootstrap.servers A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers regardless of which servers are specified here for bootstrapping. This list only impacts the initial hosts used to discover the full set of servers, and should be in the form host1:port1,host2:port2,.... Because these servers are only used for the initial connection to discover the full cluster membership (which may change dynamically), it need not contain the full set of servers (you may want more than one, though, in case a server is down). Source cluster security protocol source.cluster.security.protocol The protocol to use to communicate with source brokers. Default is PLAINTEXT. Source cluster SASL mechanism source.cluster.sasl.mechanism SASL mechanism used for connections to source cluster. Default is PLAIN. Source cluster SASL username source.cluster.sasl.username SASL username used for connections to source cluster. Source cluster SASL password source.cluster.sasl.password SASL password used for connections to source cluster. Sync topic configs enabled sync.topic.configs.enabled Specifies whether to periodically configure remote topics to match their corresponding upstream topics. Sync topic ACLs enabled sync.topic.acls.enabled Specifies whether or not to periodically configure remote topic ACLs to match their corresponding upstream topics. Connector name name Globally-unique name to use for this connector. Click Next. Review the connector properties specified, then click Create. Offsets are not guaranteed to match between the source and target. For example, if data-retention deletes occur on the source topic and the earliest offset is #5000, then when that event is created on the target topic the offset for that event will be #0. Events written on the target topic use the timestamp that was set on the source event. For example, if the source event has a timestamp 2023-05-22 17:00, then this would also be the timestamp on the target event. Advanced MirrorMaker2 Source connector configuration In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page: Property name Property key Description Source cluster SSL custom certificate source.cluster.ssl.truststore.certificates Trusted certificates in the PEM format. Source cluster SSL keystore key source.cluster.ssl.keystore.key Private key in the PEM format. Source cluster SSL keystore certificate chain source.cluster.ssl.keystore.certificate.chain Certificate chain in the PEM format. Sync topic configs interval seconds sync.topic.configs.interval.seconds Frequency of topic config sync. Sync topic ACLs interval seconds sync.topic.acls.interval.seconds Frequency of topic ACL sync. Topics exclude topics.exclude Excluded topics. Supports comma-separated topic names and regexes. Source cluster alias source.cluster.alias When using DefaultReplicationPolicy, topic names will be prefixed with it. Replication policy class replication.policy.class Class that defines the remote topic naming convention. Use IdentityReplicationPolicy to preserve topic names. DefaultReplicationPolicy prefixes the topic with the source cluster alias. Replication factor replication.factor Replication factor for newly created remote topics. Set -1 for cluster default. Refresh topics interval seconds refresh.topics.interval.seconds Frequency of topic refresh. Offset-Syncs topic location offset-syncs.topic.location The location (source or target) of the offset-syncs topic. The default is source. Offset-Syncs topic replication factor offset-syncs.topic.replication.factor Replication factor for offset-syncs topic. The default is -1. Config properties exclude config.properties.exclude Topic config properties that should not be replicated. Supports comma-separated property names and regexes. Compression type producer.override.compression.type The compression type for all data generated by the producer. The default is none (no compression). Max size of a request producer.override.max.request.size The maximum size of a request in bytes. The default is 1048576. Auto offset reset consumer.auto.offset.reset What to do when there is no initial offset in Kafka, or if the current offset does not exist any more on the server (for example, because that data has been deleted). 'earliest' - automatically reset the offset to the earliest offset. 'latest' - automatically reset the offset to the latest offset. 'none' - throw exception to the consumer if no previous offset is found for the consumer’s group. Offset lag max offset.lag.max How out-of-sync a remote partition can be before it is resynced. This setting impacts the MirrorMaker2 Checkpoint connector as it is the maximum lag for syncing consumer groups. The default is 100 records. Map data The value converter does not require any schema; it copies data as bytes. Test the connection After the connector is created: Ensure that there are no errors in logs and in Redpanda Console. Confirm that Redpanda topics are being replicated. You should see messages coming into the topics. Use the Connectors API When using the Connectors API, instead of specifying a value for source.cluster.sasl.username and source.cluster.sasl.password, you can specify a value for source.cluster.sasl.jaas.config. Troubleshoot Most MirrorMaker2 Source connector issues are reported as a failed task at the time of creation. Select Show Logs to view error details. Message Action Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available. / LOGS: Timed out while checking for or creating topic 'mm2-offset-syncs.target.internal'. This could indicate a connectivity issue / TimeoutException: Timed out waiting for a node assignment Make sure broker URLs are correct and that the security.protocol is correct. SaslAuthenticationException: SASL authentication failed: security: Invalid credentials Confirm that the username and password specified are correct. Terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue Error indicates that the SSL should be enabled using Source cluster security protocol (use SSL or SASL_SSL). RecordTooLargeException: The message is N bytes (…) Use producer.override.max.request.size property to change max request size. RecordTooLargeException: The request included (…) The target server is not able to receive messages because it is too large in size. Disabled compression can be a root cause. Consider enabling compression: "Compression type": "snappy", Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs MirrorMaker2 requires an authorizer to be configured by the broker side, but it is not. Change the Sync topic ACLs enabled MirrorMaker2 property to false (default is true) to disable ACL syncing. TopicAuthorizationException: Topic authorization failed Confirm the service account for the source cluster contains describe and/or describe-configs ACLs. OffsetOutOfRangeException Fetch position FetchPosition{offset=0, … ] If the 0 offset for your topic does not exist in the source cluster, set Auto offset reset to either earliest or latest. Back to top × Simple online edits For simple changes, such as fixing a typo, you can edit the content directly on GitHub. Edit on GitHub Or, open an issue to let us know about something that you want us to change. Open an issue Contribution guide For extensive content updates, or if you prefer to work locally, read our contribution guide . Was this helpful? thumb_up thumb_down group Ask in the community mail Share your feedback group_add Make a contribution JDBC Source Connector MirrorMaker2 Checkpoint Connector