# kafka_franz


---
title: kafka_franz
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: connect/components/outputs/kafka_franz
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: connect/components/outputs/kafka_franz.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/connect/components/outputs/kafka_franz.adoc
page-git-created-date: "2024-09-09"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/connect/components/outputs/kafka_franz.md -->

**Type:** Output

See also: the [`kafka_franz` input](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/inputs/kafka_franz/).

**Available in:** Cloud, [Self-Managed](https://docs.redpanda.com/connect/components/outputs/kafka_franz/ "View the Self-Managed version of this component")

> ⚠️ **WARNING: Deprecated in 4.68.0**
>
> This component is deprecated and will be removed in the next major version release. Consider migrating to the unified [`redpanda` input](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/inputs/redpanda/) and [`redpanda` output](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/outputs/redpanda/) components.

The `kafka_franz` output writes a batch of messages to Kafka brokers and waits for acknowledgment before propagating any acknowledgments back to the input. This output often outperforms the traditional `kafka` output and provides more useful logs and error messages.

This output uses the [Franz Kafka client library](https://github.com/twmb/franz-go).

#### Common

```yml
output:
  label: ""
  kafka_franz:
    seed_brokers: [] # No default (required)
    topic: "" # No default (required)
    key: "" # No default (optional)
    partition: "" # No default (optional)
    metadata:
      include_prefixes: []
      include_patterns: []
    max_in_flight: 10
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```
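
As a minimal sketch of the common fields in use, the following configuration writes to a single topic, keys each record from a field of the message, and flushes batches by count or by period. The label, broker address, topic name, and `id` field are placeholders chosen for illustration, not values from this reference.

```yaml
output:
  label: example_kafka_out # hypothetical label
  kafka_franz:
    seed_brokers:
      - "localhost:9092" # placeholder broker address
    topic: example_topic # placeholder topic name
    key: ${! json("id") } # assumes each message is JSON with an `id` field
    max_in_flight: 10
    batching:
      count: 100 # flush after 100 messages
      period: 1s # or after one second, whichever comes first
```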

#### Advanced

```yml
output:
  label: ""
  kafka_franz:
    seed_brokers: [] # No default (required)
    client_id: redpanda-connect
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    sasl: [] # No default (optional)
    metadata_max_age: 1m
    request_timeout_overhead: 10s
    conn_idle_timeout: 20s
    tcp:
      connect_timeout: 0s
      keep_alive:
        idle: 15s
        interval: 15s
        count: 9
      tcp_user_timeout: 0s
    topic: "" # No default (required)
    key: "" # No default (optional)
    partition: "" # No default (optional)
    metadata:
      include_prefixes: []
      include_patterns: []
    timestamp_ms: "" # No default (optional)
    max_in_flight: 10
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    partitioner: "" # No default (optional)
    idempotent_write: true
    compression: "" # No default (optional)
    allow_auto_topic_creation: true
    timeout: 10s
    max_message_bytes: 1MiB
    broker_write_max_bytes: 100MiB
```

## [](#fields)Fields

### [](#allow_auto_topic_creation)`allow_auto_topic_creation`

Enables topics to be auto created if they do not exist when fetching their metadata.

**Type**: `bool`

**Default**: `true`

### [](#batching)`batching`

Allows you to configure a [batching policy](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/batching/).

**Type**: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### [](#batching-byte_size)`batching.byte_size`

The number of bytes at which the batch is flushed. Set to `0` to disable size-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-check)`batching.check`

A [Bloblang query](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) that should return a boolean value indicating whether a message should end a batch.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### [](#batching-count)`batching.count`

The number of messages after which the batch is flushed. Set to `0` to disable count-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-period)`batching.period`

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as `100ms`, `1s`, or `5s`.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms
```

### [](#batching-processors)`batching.processors[]`

A list of [processors](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/about/) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.

**Type**: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate

# ---

processors:
  - archive:
      format: lines

# ---

processors:
  - archive:
      format: json_array
```

### [](#broker_write_max_bytes)`broker_write_max_bytes`

The maximum number of bytes this output can write to a broker connection in a single write. This field corresponds to Kafka’s `socket.request.max.bytes`.

**Type**: `string`

**Default**: `100MiB`

```yaml
# Examples:
broker_write_max_bytes: 128MB

# ---

broker_write_max_bytes: 50mib
```

### [](#client_id)`client_id`

An identifier for the client connection.

**Type**: `string`

**Default**: `redpanda-connect`

### [](#compression)`compression`

Set an explicit compression type (optional). When this field is not set, the output prefers `snappy` if the broker supports it and otherwise falls back to `none`.

**Type**: `string`

**Options**: `lz4`, `snappy`, `gzip`, `none`, `zstd`
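
For illustration, the following fragment pins the compression type instead of relying on the default preference. The choice of `zstd` here is arbitrary; any value from the options list is written the same way.

```yaml
# Illustrative only: explicitly select one of the listed options.
compression: zstd
```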

### [](#conn_idle_timeout)`conn_idle_timeout`

The maximum duration that connections can remain idle before they are automatically closed. This field accepts Go duration format strings such as `100ms`, `1s`, or `5s`.

**Type**: `string`

**Default**: `20s`

### [](#idempotent_write)`idempotent_write`

Enables the idempotent write producer option. This requires the `IDEMPOTENT_WRITE` permission on `CLUSTER`. Disable this option if the `IDEMPOTENT_WRITE` permission is unavailable.

**Type**: `bool`

**Default**: `true`
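
As a sketch of the fallback described above, a cluster that cannot grant the `IDEMPOTENT_WRITE` permission would turn the option off:

```yaml
# Assumes the IDEMPOTENT_WRITE permission on CLUSTER is unavailable.
idempotent_write: false
```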

### [](#key)`key`

An optional key to populate for each message. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`
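
As a brief sketch of interpolation in this field, the following fragment derives the key from the message payload. The `order_id` field is hypothetical and assumes that each message is a JSON document.

```yaml
# Assumes JSON messages that carry a (hypothetical) `order_id` field.
key: ${! json("order_id") }
```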

### [](#max_in_flight)`max_in_flight`

The maximum number of batches to send in parallel at any given time.

**Type**: `int`

**Default**: `10`

### [](#max_message_bytes)`max_message_bytes`

The maximum space (in bytes) that an individual message may use. Messages larger than this value are rejected. This field corresponds to Kafka’s `max.message.bytes`.

**Type**: `string`

**Default**: `1MiB`

```yaml
# Examples:
max_message_bytes: 100MB

# ---

max_message_bytes: 50mib
```

### [](#metadata)`metadata`

Configure which metadata values are added to messages as headers. This allows you to pass additional context information along with your messages.

**Type**: `object`
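
The two sub-fields can be set together. The following sketch reuses values from the per-field examples in this section and assumes that a metadata key is forwarded as a header when it matches either list.

```yaml
metadata:
  include_prefixes:
    - kafka_ # keys that start with `kafka_`
  include_patterns:
    - _timestamp_unix$ # keys that end with `_timestamp_unix`
```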

### [](#metadata-include_patterns)`metadata.include_patterns[]`

Provide a list of explicit metadata key regular expression (re2) patterns to match against.

**Type**: `array`

**Default**: `[]`

```yaml
# Examples:
include_patterns:
  - .*

# ---

include_patterns:
  - _timestamp_unix$
```

### [](#metadata-include_prefixes)`metadata.include_prefixes[]`

Provide a list of explicit metadata key prefixes to match against.

**Type**: `array`

**Default**: `[]`

```yaml
# Examples:
include_prefixes:
  - foo_
  - bar_

# ---

include_prefixes:
  - kafka_

# ---

include_prefixes:
  - content-
```

### [](#metadata_max_age)`metadata_max_age`

The maximum period of time after which metadata is refreshed. This field accepts Go duration format strings such as `100ms`, `1s`, or `5s`.

Lower values provide more responsive topic and partition discovery but may increase broker load. Higher values reduce broker queries but can delay detection of topology changes.

**Type**: `string`

**Default**: `1m`

### [](#partition)`partition`

Set a partition for each message (optional). This field is only relevant when the `partitioner` is set to `manual`. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

You must provide an interpolation string that is a valid integer.

**Type**: `string`

```yaml
# Examples:
partition: ${! meta("partition") }
```

### [](#partitioner)`partitioner`

Override the default murmur2 hashing partitioner.

**Type**: `string`

| Option | Summary |
| --- | --- |
| least_backup | Chooses the least backed up partition (the partition with the fewest buffered records). Partitions are selected per batch. |
| manual | Manually select a partition for each message. Requires the `partition` field to be specified. |
| murmur2_hash | Kafka’s default hash algorithm that uses a 32-bit murmur2 hash of the key to compute which partition the record will be on. |
| round_robin | Round-robins messages through all available partitions. This algorithm has lower throughput and causes higher CPU load on brokers, but can be useful if you want to ensure an even distribution of records to partitions. |
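
For illustration, the `manual` option is paired with the `partition` field. This sketch assumes that each message carries a `partition` metadata value that resolves to a valid integer.

```yaml
partitioner: manual
# Assumes a `partition` metadata key holding an integer partition number.
partition: ${! meta("partition") }
```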

### [](#request_timeout_overhead)`request_timeout_overhead`

Grants an additional buffer or overhead to requests that have timeout fields defined. This field is based on the behavior of Apache Kafka’s `request.timeout.ms` parameter, but with the option to extend the timeout deadline.

**Type**: `string`

**Default**: `10s`

### [](#sasl)`sasl[]`

Specify one or more methods or mechanisms of SASL authentication, which are attempted in order. If the broker supports the first SASL mechanism, all connections use it. If the first mechanism fails, the client picks the first supported mechanism. If the broker does not support any client mechanisms, all connections fail.

**Type**: `object`

```yaml
# Examples:
sasl:
  - mechanism: SCRAM-SHA-512
    password: bar
    username: foo
```

### [](#sasl-aws)`sasl[].aws`

Contains AWS-specific fields for when the `mechanism` is set to `AWS_MSK_IAM`.

**Type**: `object`
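
As a hedged sketch of how these fields nest, the following fragment selects the `AWS_MSK_IAM` mechanism and assumes a role. The region and role ARN are placeholders, and whether explicit credentials are needed at all depends on your environment.

```yaml
sasl:
  - mechanism: AWS_MSK_IAM
    aws:
      region: us-east-1 # placeholder region
      credentials:
        # Hypothetical role ARN, shown only to illustrate the field.
        role: "arn:aws:iam::123456789012:role/example-role"
```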

### [](#sasl-aws-credentials)`sasl[].aws.credentials`

Optional manual configuration of AWS credentials to use. More information can be found in [Amazon Web Services](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/cloud/aws/).

**Type**: `object`

### [](#sasl-aws-credentials-from_ec2_role)`sasl[].aws.credentials.from_ec2_role`

Use the credentials of a host EC2 machine configured to assume [an IAM role associated with the instance](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2.html).

**Type**: `bool`

### [](#sasl-aws-credentials-id)`sasl[].aws.credentials.id`

The ID of credentials to use.

**Type**: `string`

### [](#sasl-aws-credentials-profile)`sasl[].aws.credentials.profile`

A profile from `~/.aws/credentials` to use.

**Type**: `string`

### [](#sasl-aws-credentials-role)`sasl[].aws.credentials.role`

A role ARN to assume.

**Type**: `string`

### [](#sasl-aws-credentials-role_external_id)`sasl[].aws.credentials.role_external_id`

An external ID to provide when assuming a role.

**Type**: `string`

### [](#sasl-aws-credentials-secret)`sasl[].aws.credentials.secret`

The secret for the credentials being used.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

### [](#sasl-aws-credentials-token)`sasl[].aws.credentials.token`

The token for the credentials being used. Required when using short-term credentials.

**Type**: `string`

### [](#sasl-aws-endpoint)`sasl[].aws.endpoint`

Allows you to specify a custom endpoint for the AWS API.

**Type**: `string`

### [](#sasl-aws-region)`sasl[].aws.region`

The AWS region to target.

**Type**: `string`

### [](#sasl-aws-tcp)`sasl[].aws.tcp`

TCP socket configuration.

**Type**: `object`

### [](#sasl-aws-tcp-connect_timeout)`sasl[].aws.tcp.connect_timeout`

The maximum amount of time a dial waits for a connection to complete. Set to zero to disable the timeout.

**Type**: `string`

**Default**: `0s`

### [](#sasl-aws-tcp-keep_alive)`sasl[].aws.tcp.keep_alive`

TCP keep-alive probe configuration.

**Type**: `object`

### [](#sasl-aws-tcp-keep_alive-count)`sasl[].aws.tcp.keep_alive.count`

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

**Type**: `int`

**Default**: `9`

### [](#sasl-aws-tcp-keep_alive-idle)`sasl[].aws.tcp.keep_alive.idle`

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

**Type**: `string`

**Default**: `15s`

### [](#sasl-aws-tcp-keep_alive-interval)`sasl[].aws.tcp.keep_alive.interval`

Duration between keep-alive probes. Zero defaults to 15s.

**Type**: `string`

**Default**: `15s`

### [](#sasl-aws-tcp-tcp_user_timeout)`sasl[].aws.tcp.tcp_user_timeout`

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, `keep_alive.idle` must be greater than this value per RFC 5482. Zero disables.

**Type**: `string`

**Default**: `0s`

### [](#sasl-extensions)`sasl[].extensions`

Key/value pairs to add to OAUTHBEARER authentication requests.

**Type**: `string`

### [](#sasl-mechanism)`sasl[].mechanism`

The SASL mechanism to use.

**Type**: `string`

| Option | Summary |
| --- | --- |
| AWS_MSK_IAM | AWS IAM based authentication as specified by the 'aws-msk-iam-auth' java library. |
| OAUTHBEARER | OAuth Bearer based authentication. |
| PLAIN | Plain text authentication. |
| REDPANDA_CLOUD_SERVICE_ACCOUNT | Redpanda Cloud Service Account authentication when running in Redpanda Cloud. |
| SCRAM-SHA-256 | SCRAM based authentication as specified in RFC5802. |
| SCRAM-SHA-512 | SCRAM based authentication as specified in RFC5802. |
| none | Disable SASL authentication. |

### [](#sasl-password)`sasl[].password`

A password to provide for PLAIN or SCRAM-\* authentication.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

**Default**: `""`

### [](#sasl-token)`sasl[].token`

The token to use for a single session’s OAUTHBEARER authentication.

**Type**: `string`

**Default**: `""`

### [](#sasl-username)`sasl[].username`

A username to provide for PLAIN or SCRAM-\* authentication.

**Type**: `string`

**Default**: `""`

### [](#seed_brokers)`seed_brokers[]`

A list of broker addresses to connect to in order. Use commas to separate multiple addresses in a single list item.

**Type**: `array`

```yaml
# Examples:
seed_brokers:
  - "localhost:9092"

# ---

seed_brokers:
  - "foo:9092"
  - "bar:9092"

# ---

seed_brokers:
  - "foo:9092,bar:9092"
```

### [](#tcp)`tcp`

Configure TCP socket-level settings to optimize network performance and reliability. These low-level controls are useful for:

-   **High-latency networks**: Increase `connect_timeout` to allow more time for connection establishment

-   **Long-lived connections**: Configure `keep_alive` settings to detect and recover from stale connections

-   **Unstable networks**: Tune keep-alive probes to balance between quick failure detection and avoiding false positives

-   **Linux systems with specific requirements**: Use `tcp_user_timeout` (Linux 2.6.37+) to control data acknowledgment timeouts


Most users should keep the default values. Only modify these settings if you’re experiencing connection stability issues or have specific network requirements.

**Type**: `object`
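
The following fragment is a sketch of a non-default configuration for a high-latency or unstable network. The specific durations are illustrative assumptions, not recommendations. `tcp_user_timeout` is kept below `keep_alive.idle`, as its field description requires.

```yaml
tcp:
  connect_timeout: 5s # allow up to five seconds to establish a connection
  keep_alive:
    idle: 30s # wait 30s of inactivity before the first probe
    interval: 10s # then probe every 10s
    count: 3 # drop the connection after three unanswered probes
  tcp_user_timeout: 20s # Linux only; must stay below keep_alive.idle
```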

### [](#tcp-connect_timeout)`tcp.connect_timeout`

The maximum amount of time a dial waits for a connection to complete. Set to zero to disable the timeout.

**Type**: `string`

**Default**: `0s`

### [](#tcp-keep_alive)`tcp.keep_alive`

TCP keep-alive probe configuration.

**Type**: `object`

### [](#tcp-keep_alive-count)`tcp.keep_alive.count`

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

**Type**: `int`

**Default**: `9`

### [](#tcp-keep_alive-idle)`tcp.keep_alive.idle`

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

**Type**: `string`

**Default**: `15s`

### [](#tcp-keep_alive-interval)`tcp.keep_alive.interval`

Duration between keep-alive probes. Zero defaults to 15s.

**Type**: `string`

**Default**: `15s`

### [](#tcp-tcp_user_timeout)`tcp.tcp_user_timeout`

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, `keep_alive.idle` must be greater than this value per RFC 5482. Zero disables.

**Type**: `string`

**Default**: `0s`

### [](#timeout)`timeout`

The maximum period of time to wait for message sends before abandoning the request and retrying.

**Type**: `string`

**Default**: `10s`

### [](#timestamp_ms)`timestamp_ms`

Set a timestamp (in milliseconds) for each message (optional). When left empty, the current timestamp is used. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`

```yaml
# Examples:
timestamp_ms: ${! timestamp_unix_milli() }

# ---

timestamp_ms: ${! metadata("kafka_timestamp_ms") }
```

### [](#tls)`tls`

Configure Transport Layer Security (TLS) settings to secure network connections. This includes options for standard TLS as well as mutual TLS (mTLS) authentication where both client and server authenticate each other using certificates. Key configuration options include `enabled` to enable TLS, `client_certs` for mTLS authentication, `root_cas`/`root_cas_file` for custom certificate authorities, and `skip_cert_verify` for development environments.

**Type**: `object`
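
As a sketch of an mTLS setup that combines the options named above, the following fragment enables TLS, trusts a custom certificate authority, and presents a client certificate. The file paths are placeholders.

```yaml
tls:
  enabled: true # required for the other settings to take effect
  root_cas_file: ./root_cas.pem # placeholder path to the CA chain
  client_certs:
    # File-based pair: cert_file and key_file are used together.
    - cert_file: ./example.pem
      key_file: ./example.key
```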

### [](#tls-client_certs)`tls.client_certs[]`

A list of client certificates for mutual TLS (mTLS) authentication. Configure this field to enable mTLS, authenticating the client to the server with these certificates.

You must set `tls.enabled: true` for the client certificates to take effect.

**Certificate pairing rules**: For each certificate item, provide either:

-   Inline PEM data using both `cert` **and** `key` or

-   File paths using both `cert_file` **and** `key_file`.


Mixing inline and file-based values within the same item is not supported.

**Type**: `object`

**Default**: `[]`

```yaml
# Examples:
client_certs:
  - cert: foo
    key: bar

# ---

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
```

### [](#tls-client_certs-cert)`tls.client_certs[].cert`

A plain text certificate to use.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-cert_file)`tls.client_certs[].cert_file`

The path of a certificate to use.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-key)`tls.client_certs[].key`

A plain text certificate key to use.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-key_file)`tls.client_certs[].key_file`

The path of a certificate key to use.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-password)`tls.client_certs[].password`

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete `pbeWithMD5AndDES-CBC` algorithm is not supported for the PKCS#8 format.

Because the obsolete `pbeWithMD5AndDES-CBC` algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
password: foo

# ---

password: ${KEY_PASSWORD}
```

### [](#tls-enable_renegotiation)`tls.enable_renegotiation`

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message `local error: tls: no renegotiation`.

**Type**: `bool`

**Default**: `false`

### [](#tls-enabled)`tls.enabled`

Whether custom TLS settings are enabled.

**Type**: `bool`

**Default**: `false`

### [](#tls-root_cas)`tls.root_cas`

Specify a root certificate authority to use (optional). This is a string that represents a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for inline certificate data or `root_cas_file` for file-based certificate loading.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
```

### [](#tls-root_cas_file)`tls.root_cas_file`

Specify the path to a root certificate authority file (optional). This is a file, often with a `.pem` extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for file-based certificate loading or `root_cas` for inline certificate data.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
root_cas_file: ./root_cas.pem
```

### [](#tls-skip_cert_verify)`tls.skip_cert_verify`

Whether to skip server-side certificate verification. Set to `true` only for testing environments as this reduces security by disabling certificate validation. When using self-signed certificates or in development, this may be necessary, but should never be used in production. Consider using `root_cas` or `root_cas_file` to specify trusted certificates instead of disabling verification entirely.

**Type**: `bool`

**Default**: `false`

### [](#topic)`topic`

A topic to write messages to. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`
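
Because this field supports interpolation, the destination topic can be computed per message. The following sketch assumes JSON messages with a hypothetical `type` field and a hypothetical `events_` naming scheme.

```yaml
# Routes each message to a topic named after its (hypothetical) `type` field,
# for example `events_order` when `type` is "order".
topic: events_${! json("type") }
```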