redpanda_migrator
Type: Input
Available in: Cloud, Self-Managed

Unified Kafka consumer for migrating data between Kafka/Redpanda clusters. Use this input with the redpanda_migrator output to safely transfer topic data, ACLs, schemas, and consumer group offsets between clusters. This component is designed for migration scenarios.

Introduced in version 4.67.5.

Common
input:
  label: ""
  redpanda_migrator:
    seed_brokers: [] # No default (required)
    topics: [] # No default (required)
    regexp_topics: false
    transaction_isolation_level: read_uncommitted
    consumer_group: "" # No default (optional)
    schema_registry:
      url: "" # No default (required)
      tls:
        enabled: false
        skip_cert_verify: false
        enable_renegotiation: false
        root_cas: ""
        root_cas_file: ""
        client_certs:
          - cert: ""
            key: ""
            cert_file: ""
            key_file: ""
            password: ""
      oauth:
        enabled: false
        consumer_key: ""
        consumer_secret: ""
        access_token: ""
        access_token_secret: ""
      basic_auth:
        enabled: false
        username: ""
        password: ""
      jwt:
        enabled: false
        private_key_file: ""
        signing_method: ""
        claims: {}
        headers: {}
    auto_replay_nacks: true

Advanced
input:
  label: ""
  redpanda_migrator:
    seed_brokers: [] # No default (required)
    client_id: benthos
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs:
        - cert: ""
          key: ""
          cert_file: ""
          key_file: ""
          password: ""
    sasl:
      - mechanism: "" # No default (required)
        username: ""
        password: ""
        token: ""
        extensions: "" # No default (optional)
        aws:
          region: "" # No default (optional)
          endpoint: "" # No default (optional)
          credentials:
            profile: "" # No default (optional)
            id: "" # No default (optional)
            secret: "" # No default (optional)
            token: "" # No default (optional)
            from_ec2_role: "" # No default (optional)
            role: "" # No default (optional)
            role_external_id: "" # No default (optional)
    metadata_max_age: 5m
    request_timeout_overhead: 10s
    conn_idle_timeout: 20s
    topics: [] # No default (required)
    regexp_topics: false
    rack_id: ""
    instance_id: ""
    rebalance_timeout: 45s
    session_timeout: 1m
    heartbeat_interval: 3s
    start_offset: earliest
    fetch_max_bytes: 50MiB
    fetch_max_wait: 5s
    fetch_min_bytes: 1B
    fetch_max_partition_bytes: 1MiB
    transaction_isolation_level: read_uncommitted
    consumer_group: "" # No default (optional)
    commit_period: 5s
    partition_buffer_bytes: 1MB
    topic_lag_refresh_period: 5s
    max_yield_batch_bytes: 32KB
    schema_registry:
      url: "" # No default (required)
      tls:
        enabled: false
        skip_cert_verify: false
        enable_renegotiation: false
        root_cas: ""
        root_cas_file: ""
        client_certs:
          - cert: ""
            key: ""
            cert_file: ""
            key_file: ""
            password: ""
      oauth:
        enabled: false
        consumer_key: ""
        consumer_secret: ""
        access_token: ""
        access_token_secret: ""
      basic_auth:
        enabled: false
        username: ""
        password: ""
      jwt:
        enabled: false
        private_key_file: ""
        signing_method: ""
        claims: {}
        headers: {}
    auto_replay_nacks: true

The redpanda_migrator input:
Reads a batch of messages from a broker.
Waits for the redpanda_migrator output to acknowledge the writes before updating the Kafka consumer group offset.
Provides the same delivery guarantees and ordering semantics as the redpanda input.

Specify a consumer group to make this input consume one or more topics and automatically balance the topic partitions across any other connected clients with the same consumer group. Otherwise, topics are consumed in their entirety or with explicit partitions.

This input requires a corresponding redpanda_migrator output in the same pipeline.
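The following is a minimal sketch of such a pairing. The broker addresses, topic, and consumer group are placeholders, and the output is shown with only its seed_brokers field; see the redpanda_migrator output documentation for its full configuration. The matching label values illustrate the pairing rule described below.

input:
  label: migrator
  redpanda_migrator:
    seed_brokers: [ "source-broker:9092" ]
    topics: [ "orders" ]
    consumer_group: migrator_cg

output:
  label: migrator
  redpanda_migrator:
    seed_brokers: [ "destination-broker:9092" ]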
Each pipeline must have both input and output components configured. For capabilities, guarantees, scheduling, and examples, see the output documentation.

Requirements
Must be paired with a redpanda_migrator output in the same pipeline.
Requires access to a source Kafka or Redpanda cluster.
Consumer group configuration is recommended for partition balancing.

Multiple migrator pairs
When using multiple migrator pairs in a single pipeline, coordination is based on the label field. The labels of the input and output must match exactly for correct pairing. If the labels do not match, migration fails for that pair.

Performance tuning for high throughput
For workloads with high message rates or large messages, adjust these fields:
partition_buffer_bytes: Increase to 10MB or higher (default: 1MB).
max_yield_batch_bytes: Increase to 100MB or higher (default: 32KB).
Increasing these values allows the consumer to buffer more data per partition and yield larger batches, which reduces overhead and improves throughput. Higher values increase memory usage, so monitor system resources accordingly. See the example that follows.
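A sketch of this tuning applied to the input; the values shown are the illustrative starting points mentioned above, not recommendations for every workload, and the broker address, topic, and consumer group are placeholders:

input:
  redpanda_migrator:
    seed_brokers: [ "source-broker:9092" ]
    topics: [ "orders" ]
    consumer_group: migrator_cg
    partition_buffer_bytes: 10MB
    max_yield_batch_bytes: 100MB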
Metrics
This input emits an input_redpanda_migrator_lag metric with topic and partition labels for each consumed topic. This metric records the number of produced messages that remain to be read from each topic/partition pair by the specified consumer group. Monitor this metric to track migration progress and detect bottlenecks.

Metadata
This input adds the following metadata fields to each message:
kafka_key
kafka_topic
kafka_partition
kafka_offset
kafka_lag
kafka_timestamp_ms
kafka_timestamp_unix
All record headers

Fields

auto_replay_nacks
Whether to automatically replay messages that are rejected (nacked) at the output level. If the cause of the rejections is persistent, leaving this option enabled can result in back pressure. Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams, as the original shape of the data is discarded immediately upon consumption and mutation.
Type: bool Default: true

client_id
An identifier for the client connection.
Type: string Default: benthos

commit_period
The period of time between each commit of the current partition offsets. Offsets are always committed during shutdown.
Type: string Default: 5s

conn_idle_timeout
The maximum duration that connections can remain idle before they are automatically closed. This field accepts Go duration format strings such as 100ms, 1s, or 5s.
Type: string Default: 20s

consumer_group
An optional consumer group. When specified, the partitions of the specified topics are automatically distributed across consumers sharing the consumer group, and partition offsets are automatically committed and resumed under this name. Consumer groups are not supported when explicit partitions to consume from are specified in the topics field.
Type: string

fetch_max_bytes
The maximum number of bytes that a broker tries to send during a fetch. If individual records are larger than the fetch_max_bytes value, brokers still send them.
Type: string Default: 50MiB

fetch_max_partition_bytes
The maximum number of bytes that are consumed from a single partition in a fetch request. This field is equivalent to the Java setting max.partition.fetch.bytes. If a single batch is larger than the fetch_max_partition_bytes value, the batch is still sent so that the client can make progress.
Type: string Default: 1MiB

fetch_max_wait
The maximum period of time a broker can wait for a fetch response to reach the required minimum number of bytes (fetch_min_bytes).
Type: string Default: 5s

fetch_min_bytes
The minimum number of bytes that a broker tries to send during a fetch. This field is equivalent to the Java setting fetch.min.bytes.
Type: string Default: 1B

heartbeat_interval
When you specify a consumer_group, heartbeat_interval sets how frequently a consumer group member should send heartbeats to Apache Kafka. Apache Kafka uses heartbeats to make sure that a group member's session is active. You must set heartbeat_interval to less than one-third of session_timeout. This field is equivalent to the Java heartbeat.interval.ms setting and accepts Go duration format strings such as 10s or 2m.
Type: string Default: 3s

instance_id
When you specify a consumer_group, assign a unique value to instance_id to define the group's static membership, which can prevent unnecessary rebalances during reconnections. When you assign an instance ID, the client does not automatically leave the consumer group when it disconnects. To remove the client, you must use an external admin command on behalf of the instance ID.
Type: string Default: ""

max_yield_batch_bytes
The maximum size (in bytes) of each batch yielded by this input. When routed to a redpanda output without modification, this roughly translates to the batch.bytes config field of a traditional producer.
Type: string Default: 32KB

metadata_max_age
The maximum period of time after which metadata is refreshed. This field accepts Go duration format strings such as 100ms, 1s, or 5s.
Type: string Default: 5m

partition_buffer_bytes
A buffer size (in bytes) for each consumed partition, which allows the internal queuing of records before they are flushed. Increasing this value may improve throughput but results in higher memory utilization. Each buffer can grow slightly beyond this value.
Type: string Default: 1MB

rack_id
A rack identifier for this client.
Type: string Default: ""

rebalance_timeout
When you specify a consumer_group, rebalance_timeout sets a time limit for all consumer group members to complete their work and commit offsets after a rebalance has begun. The timeout excludes the time taken to detect a failed or late heartbeat, which indicates a rebalance is required. This field accepts Go duration format strings such as 100ms, 1s, or 5s.
Type: string Default: 45s

regexp_topics
Whether listed topics are interpreted as regular expression patterns for matching multiple topics. When topics are specified with explicit partitions, this field must remain set to false.
Type: bool Default: false

request_timeout_overhead
Grants an additional buffer or overhead to requests that have timeout fields defined. This field is based on the behavior of Apache Kafka's request.timeout.ms parameter.
Type: string Default: 10s
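A sketch of a consumer group configuration with static membership, using the documented default timeouts; the broker address, topic, group name, and instance ID are placeholders:

input:
  redpanda_migrator:
    seed_brokers: [ "source-broker:9092" ]
    topics: [ "orders" ]
    consumer_group: migrator_cg
    instance_id: migrator-0
    session_timeout: 1m
    heartbeat_interval: 3s
    rebalance_timeout: 45s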
sasl[]
Specify one or more methods of SASL authentication, which are tried in order. If the broker supports the first mechanism, all connections use that mechanism. If the first mechanism fails, the client picks the first supported mechanism. Connections fail if the broker does not support any client mechanisms.
Type: object
# Examples:
sasl:
  - mechanism: SCRAM-SHA-512
    password: bar
    username: foo

sasl[].aws
Contains AWS specific fields for when the mechanism is set to AWS_MSK_IAM.
Type: object

sasl[].aws.credentials
Optional manual configuration of AWS credentials to use. More information can be found in Amazon Web Services.
Type: object

sasl[].aws.credentials.from_ec2_role
Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance. Requires version 4.2.0 or later.
Type: bool

sasl[].aws.credentials.id
The ID of credentials to use.
Type: string

sasl[].aws.credentials.profile
A profile from ~/.aws/credentials to use.
Type: string

sasl[].aws.credentials.role
A role ARN to assume.
Type: string

sasl[].aws.credentials.role_external_id
An external ID to provide when assuming a role.
Type: string

sasl[].aws.credentials.secret
The secret for the credentials being used. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string

sasl[].aws.credentials.token
The token for the credentials being used, required when using short term credentials.
Type: string

sasl[].aws.endpoint
Allows you to specify a custom endpoint for the AWS API.
Type: string

sasl[].aws.region
The AWS region to target.
Type: string

sasl[].extensions
Key/value pairs to add to OAUTHBEARER authentication requests.
Type: string

sasl[].mechanism
The SASL mechanism to use.
Type: string
Option Summary
AWS_MSK_IAM: AWS IAM based authentication as specified by the 'aws-msk-iam-auth' Java library.
OAUTHBEARER: OAuth Bearer based authentication.
PLAIN: Plain text authentication.
SCRAM-SHA-256: SCRAM based authentication as specified in RFC5802.
SCRAM-SHA-512: SCRAM based authentication as specified in RFC5802.
none: Disables SASL authentication.

sasl[].password
A password to provide for PLAIN or SCRAM-* authentication. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""

sasl[].token
The token to use for a single session's OAUTHBEARER authentication.
Type: string Default: ""

sasl[].username
A username to provide for PLAIN or SCRAM-* authentication.
Type: string Default: ""
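As an illustration of these authentication fields, the following sketch connects to a SASL/SCRAM-protected source cluster over TLS. The broker address is a placeholder, and the credentials are read from environment variables rather than stored in the configuration, in keeping with the guidance under Secrets:

input:
  redpanda_migrator:
    seed_brokers: [ "source-broker:9093" ]
    topics: [ "orders" ]
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-512
        username: "${SASL_USERNAME}"
        password: "${SASL_PASSWORD}"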
schema_registry
Configuration for schema registry integration. Enables migration of schema subjects, versions, and compatibility settings between clusters.
Type: object

schema_registry.basic_auth
Allows you to specify basic authentication.
Type: object

schema_registry.basic_auth.enabled
Whether to use basic authentication in requests.
Type: bool Default: false

schema_registry.basic_auth.password
A password to authenticate with. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""

schema_registry.basic_auth.username
A username to authenticate as.
Type: string Default: ""

schema_registry.jwt
Beta
Allows you to specify JWT authentication.
Type: object

schema_registry.jwt.claims
A value used to identify the claims that issued the JWT.
Type: object Default: {}

schema_registry.jwt.enabled
Whether to use JWT authentication in requests.
Type: bool Default: false

schema_registry.jwt.headers
Add optional key/value headers to the JWT.
Type: object Default: {}

schema_registry.jwt.private_key_file
The path of a file containing a PEM-encoded private key in PKCS#1 or PKCS#8 format.
Type: string Default: ""

schema_registry.jwt.signing_method
A method used to sign the token, such as RS256, RS384, RS512, or EdDSA.
Type: string Default: ""

schema_registry.oauth
Allows you to specify open authentication via OAuth version 1.
Type: object

schema_registry.oauth.access_token
A value used to gain access to the protected resources on behalf of the user.
Type: string Default: ""

schema_registry.oauth.access_token_secret
A secret provided in order to establish ownership of a given access token. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""

schema_registry.oauth.consumer_key
A value used to identify the client to the service provider.
Type: string Default: ""

schema_registry.oauth.consumer_secret
A secret used to establish ownership of the consumer key. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""

schema_registry.oauth.enabled
Whether to use OAuth version 1 in requests.
Type: bool Default: false

schema_registry.tls
Custom TLS settings can be used to override system defaults.
Type: object

schema_registry.tls.client_certs[]
A list of client certificates to use. For each certificate, specify either the fields cert and key, or cert_file and key_file, but not both.
Type: object Default: []
# Examples:
client_certs:
  - cert: foo
    key: bar
  - cert_file: ./example.pem
    key_file: ./example.key

schema_registry.tls.client_certs[].cert
A plain text certificate to use.
Type: string Default: ""

schema_registry.tls.client_certs[].cert_file
The path of a certificate to use.
Type: string Default: ""

schema_registry.tls.client_certs[].key
A plain text certificate key to use. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""

schema_registry.tls.client_certs[].key_file
The path of a certificate key to use.
Type: string Default: ""

schema_registry.tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format. Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""
# Examples:
password: foo
password: ${KEY_PASSWORD}

schema_registry.tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message local error: tls: no renegotiation. Requires version 3.45.0 or later.
Type: bool Default: false

schema_registry.tls.enabled
Whether custom TLS settings are enabled.
Type: bool Default: false

schema_registry.tls.root_cas
An optional root certificate authority to use. This is a string representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""
# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

schema_registry.tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string Default: ""
# Examples:
root_cas_file: ./root_cas.pem

schema_registry.tls.skip_cert_verify
Whether to skip server side certificate verification.
Type: bool Default: false

schema_registry.url
The base URL of the schema registry service. Required for schema migration functionality.
Type: string
# Examples:
url: http://localhost:8081
url: https://schema-registry.example.com:8081
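A sketch showing schema registry settings alongside the source cluster connection, so that schemas are migrated with the topic data; the registry URL, broker address, topic, and credentials are placeholders:

input:
  redpanda_migrator:
    seed_brokers: [ "source-broker:9092" ]
    topics: [ "orders" ]
    schema_registry:
      url: http://localhost:8081
      basic_auth:
        enabled: true
        username: "${SR_USERNAME}"
        password: "${SR_PASSWORD}"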
seed_brokers[]
A list of broker addresses to connect to in order. Use commas to separate multiple addresses in a single list item.
Type: array
# Examples:
seed_brokers:
  - "localhost:9092"
  - "foo:9092"
  - "bar:9092"
  - "foo:9092,bar:9092"

session_timeout
When you specify a consumer_group, session_timeout sets the maximum interval between heartbeats sent by a consumer group member to the broker. If a broker doesn't receive a heartbeat from a group member before the timeout expires, it removes the member from the consumer group and initiates a rebalance. This field accepts Go duration format strings such as 100ms, 1s, or 5s.
Type: string Default: 1m

start_offset
Specify the offset from which this input starts or restarts consuming messages. Restarts occur when the OffsetOutOfRange error is seen during a fetch.
Type: string Default: earliest
Option Summary
committed: Prevents consuming a partition in a group if the partition has no prior commits. Corresponds to Kafka's auto.offset.reset=none option.
earliest: Start from the earliest offset. Corresponds to Kafka's auto.offset.reset=earliest option.
latest: Start from the latest offset. Corresponds to Kafka's auto.offset.reset=latest option.

tls
Configure Transport Layer Security (TLS) settings to secure network connections. This includes options for standard TLS as well as mutual TLS (mTLS) authentication, where both client and server authenticate each other using certificates. Key configuration options include enabled to enable TLS, client_certs for mTLS authentication, root_cas/root_cas_file for custom certificate authorities, and skip_cert_verify for development environments.
Type: object

tls.client_certs[]
A list of client certificates for mutual TLS (mTLS) authentication. Configure this field to enable mTLS, authenticating the client to the server with these certificates. You must set tls.enabled: true for the client certificates to take effect.
Certificate pairing rules: For each certificate item, provide either inline PEM data using both cert and key, or file paths using both cert_file and key_file. Mixing inline and file-based values within the same item is not supported.
Type: object Default: []
# Examples:
client_certs:
  - cert: foo
    key: bar
  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert
A plain text certificate to use.
Type: string Default: ""

tls.client_certs[].cert_file
The path of a certificate to use.
Type: string Default: ""

tls.client_certs[].key
A plain text certificate key to use. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""

tls.client_certs[].key_file
The path of a certificate key to use.
Type: string Default: ""

tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format. Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""
# Examples:
password: foo
password: ${KEY_PASSWORD}

tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message local error: tls: no renegotiation. Requires version 3.45.0 or later.
Type: bool Default: false

tls.enabled
Whether custom TLS settings are enabled.
Type: bool Default: false

tls.root_cas
Specify a root certificate authority to use (optional). This is a string that represents a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for inline certificate data or root_cas_file for file-based certificate loading. This field contains sensitive information that usually shouldn't be added to a configuration directly. For more information, see Secrets.
Type: string Default: ""
# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file
Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for file-based certificate loading or root_cas for inline certificate data.
Type: string Default: ""
# Examples:
root_cas_file: ./root_cas.pem

tls.skip_cert_verify
Whether to skip server-side certificate verification. Set to true only for testing environments, as this reduces security by disabling certificate validation. When using self-signed certificates or in development, this may be necessary, but it should never be used in production. Consider using root_cas or root_cas_file to specify trusted certificates instead of disabling verification entirely.
Type: bool Default: false
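A sketch of mutual TLS against the source brokers, combining the fields above; the broker address, topic, and certificate paths are placeholders, and tls.enabled must be true for client_certs to take effect:

input:
  redpanda_migrator:
    seed_brokers: [ "source-broker:9093" ]
    topics: [ "orders" ]
    tls:
      enabled: true
      root_cas_file: ./root_cas.pem
      client_certs:
        - cert_file: ./client.pem
          key_file: ./client.key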
topic_lag_refresh_period
The interval between topic lag refresh cycles. During each cycle, this input recalculates the topic lag, that is, the number of produced messages that remain to be read from each topic/partition pair by the specified consumer group. This field accepts Go duration format strings such as 100ms, 1s, or 5s.
Type: string Default: 5s

topics[]
A list of topics to consume from. Use commas to separate multiple topics in a single element.
When a consumer_group is specified, partitions are automatically distributed across consumers of a topic. Otherwise, all partitions are consumed.
Alternatively, you can specify explicit partitions to consume by using a colon after the topic name. For example, foo:0 would consume partition 0 of the topic foo. This syntax supports ranges. For example, foo:0-10 would consume partitions 0 through 10 inclusive.
It is also possible to specify an explicit offset to consume from by adding another colon after the partition. For example, foo:0:10 would consume partition 0 of the topic foo starting from offset 10. If the offset is not present (or remains unspecified), the start_offset field determines which offset to start from.
Type: array
# Examples:
topics:
  - foo
  - bar
  - things.*
  - "foo,bar"
  - "foo:0"
  - "bar:1"
  - "bar:3"
  - "foo:0,bar:1,bar:3"
  - "foo:0-5"

transaction_isolation_level
The isolation level for handling transactional messages. This setting determines how transactions are processed and affects data consistency guarantees.
Type: string Default: read_uncommitted
Option Summary
read_committed: If set, only committed transactional records are processed.
read_uncommitted: If set, uncommitted records are also processed.
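A sketch restricting the migration to committed transactional records only; all other fields keep their defaults, and the broker address and topic are placeholders:

input:
  redpanda_migrator:
    seed_brokers: [ "source-broker:9092" ]
    topics: [ "orders" ]
    transaction_isolation_level: read_committed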
Troubleshooting
Ensure the input and output label fields match exactly.
Ensure both the input and output are present in the pipeline.
Verify the consumer group configuration for partition balancing.
Monitor the input_redpanda_migrator_lag metric to detect a stalled migration.

Suggested reading
redpanda_migrator output
Migrating from legacy components