Configure Producers

Along with brokers, topics, and consumers, producers are one of the essential components of a Redpanda deployment. Producers are client applications that write data to Redpanda in the form of events. Within streaming applications, they send messages to brokers, where the data is stored in topics made up of partitions. Producers communicate with Redpanda through the Kafka API.

When a producer publishes a message to a Redpanda cluster, it sends the message to a specific partition. Every event consists of a key and a value. If the key is blank, the producer distributes messages across the topic’s partitions in a round-robin fashion. If a key is provided, the partitioner hashes the key using the murmur2 algorithm and takes the result modulo the number of partitions to select the partition.
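The partition selection described above can be sketched in a few lines of Python. This is an illustrative model, not the code of any particular client: the `Partitioner` class is hypothetical, the murmur2 constants are the ones used by the Kafka Java client, and masking the hash to a non-negative value before the modulo is an assumption borrowed from that client.

```python
from itertools import count

SEED = 0x9747B28C  # murmur2 seed used by the Kafka Java client
M = 0x5BD1E995
R = 24
MASK = 0xFFFFFFFF  # keep every intermediate value within 32 bits


def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash of data, returned as an unsigned integer."""
    length = len(data)
    h = (SEED ^ length) & MASK
    # Mix the input four bytes at a time, read as little-endian integers.
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * M) & MASK
        k ^= k >> R
        k = (k * M) & MASK
        h = (h * M) & MASK
        h ^= k
    # Fold in the remaining one to three bytes, if any.
    tail = data[length - length % 4:]
    if tail:
        h ^= int.from_bytes(tail, "little")
        h = (h * M) & MASK
    h ^= h >> 13
    h = (h * M) & MASK
    h ^= h >> 15
    return h


class Partitioner:
    """Hypothetical partitioner: round-robin without a key, hash with one."""

    def __init__(self, num_partitions: int):
        self.num_partitions = num_partitions
        self._next = count()

    def partition_for(self, key):
        if not key:
            # Blank key: rotate through the partitions.
            return next(self._next) % self.num_partitions
        # Keyed message: the same key always maps to the same partition.
        return (murmur2(key) & 0x7FFFFFFF) % self.num_partitions
```

Because the keyed path is deterministic, all events that share a key land in the same partition, which is what preserves per-key ordering.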

Producer acknowledgment defines how producer clients and broker leaders communicate their status while transferring data. The acks (abbreviation of "acknowledgments") parameter controls how much confirmation the producer waits for from the broker before it considers a message successfully sent.

While it may seem logical that the producer should wait for the broker to acknowledge the message in every circumstance, there are cases when you can gain more by letting the messages fly to the broker, unconfirmed.

Sometimes trading speed for durability is the best strategy, but sometimes you may want it the other way around. A message is considered "acknowledged" when replication of the message to the majority of other brokers responsible for the partition in the cluster is complete.

Producer configuration options

There are many client configuration options for all components of a cluster. Finding the optimal combination of these options depends largely on your specific use case. The following sections describe producer configuration options, and how to combine these options with other component configurations for optimal results.

Producer acknowledgment settings

acks=0

The producer doesn’t wait for acknowledgments from the leader and doesn’t retry sending messages. This increases throughput and lowers latency at the expense of durability, with a risk of data loss.

This option allows a producer to immediately consider a message acknowledged when it is sent to the Redpanda broker. This means that a producer does not have to wait for any kind of response from the Redpanda broker. This is the least safe option because a leader-node crash can cause data loss if the data has not yet replicated to the other nodes in the replica set. However, this setting is useful in cases where you want to optimize for the highest throughput, and are willing to risk some data loss.

Because of the lack of guarantees, this setting is the most network bandwidth-efficient. This makes it useful for use cases like IoT/sensor data collection, where updates are periodic or stateless and you can afford some degree of data loss, but prefer gathering as much data as possible in a given time interval.

acks=1

The producer waits for an acknowledgment from the leader, but it doesn’t wait for the leader to get acknowledgments from followers. This setting doesn’t prioritize throughput, latency, or durability. Instead, acks=1 attempts to provide a balance between all of them.

Replication is not guaranteed with this setting because it happens in the background, after the leader broker sends an acknowledgment to the producer. This setting could result in data loss if the leader broker crashes before any followers manage to replicate the message or if a majority of replica nodes go down at the same time before flushing the message to the disk.

acks=all

The producer receives an acknowledgment after the majority of (implicitly, all) replicas acknowledge the message. This increases durability at the expense of lower throughput and increased latency.

Sometimes referred to as acks = -1, this option instructs the broker that replication is considered complete when the message has been replicated (and fsynced, meaning flushed to disk, making it durable in case of broker crashes) to the majority of the brokers responsible for the partition in the cluster. As soon as the fsync call is complete, the message is considered acknowledged and is made visible to readers.

This parameter has an important distinction compared to Kafka’s behavior. In Kafka, a message is considered acknowledged without the requirement that it has been fsynced. Messages that have not been flushed to disk will be lost in the event of a broker crash. So when using acks=all, the Redpanda default configuration is more resilient than Kafka’s.
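As a rough mental model of the three settings above, the following sketch expresses when a producer may treat a message as acknowledged. The function and its parameters are hypothetical and exist only for illustration; real clients and brokers do not expose such a call.

```python
def is_acknowledged(acks, leader_has_write, replicas_fsynced, replication_factor):
    """Return True once the producer may treat the message as acknowledged.

    acks: "0", "1", "all", or "-1" (alias of "all").
    leader_has_write: whether the partition leader has accepted the message.
    replicas_fsynced: number of replicas (leader included) that flushed it to disk.
    replication_factor: total number of replicas for the partition.
    """
    if acks == "0":
        # Fire and forget: no broker response is awaited.
        return True
    if acks == "1":
        # Leader only: followers replicate in the background.
        return leader_has_write
    if acks in ("all", "-1"):
        # A majority of replicas must have replicated and fsynced the message.
        majority = replication_factor // 2 + 1
        return replicas_fsynced >= majority
    raise ValueError(f"unsupported acks value: {acks!r}")
```

With a replication factor of 3, for example, acks=all is satisfied once two replicas have flushed the message.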

retries

This parameter controls the number of times a message is re-sent to the broker if the broker fails to acknowledge it. This is essentially the same as if the client application resends the failed message after receiving an error response. The default value of retries differs between client libraries, so check your client’s documentation. When it is 0, a message is not re-sent at all if the send fails.

If you increase this number to a higher value, be sure to check the max.in.flight.requests.per.connection value as well, because allowing more than one request in flight can cause ordering issues in the target topic. This occurs if two batches are sent to a single partition and the first fails and is retried, but the second succeeds, so the records in the second batch may appear first.

max.in.flight.requests.per.connection

This parameter controls how many unacknowledged messages are allowed to be sent to the broker simultaneously at any given time. The default value is 5 in most client libraries. If you set this parameter to 1, then the producer will not send any more messages until the previous one is either acknowledged or an error happens, which can prompt a retry. If you set this parameter to a value higher than 1, then the producer will send more messages at the same time, which can help to increase throughput, but adds a risk of message reordering if retries are enabled.
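The interaction between retries and in-flight requests can be illustrated with a small simulation. It is a deliberately simplified model with hypothetical names: each window of in-flight batches is processed together, and a batch whose first attempt fails is retried only after the rest of its window has been handled.

```python
def deliver(batches, fail_first_attempt, max_in_flight):
    """Return the order in which batches land in the partition log.

    batches: batch names in the order the producer sends them.
    fail_first_attempt: names whose first send attempt fails and is retried.
    max_in_flight: how many batches may be unacknowledged at once.
    """
    log = []
    pending = list(batches)
    failed_once = set()
    while pending:
        window, pending = pending[:max_in_flight], pending[max_in_flight:]
        retry = []
        for name in window:
            if name in fail_first_attempt and name not in failed_once:
                failed_once.add(name)
                retry.append(name)  # re-sent after the rest of the window
            else:
                log.append(name)    # written to the log
        pending = retry + pending
    return log


# Two batches in flight: A fails once, B overtakes it.
reordered = deliver(["A", "B"], {"A"}, max_in_flight=2)
# One batch in flight: B is not sent until A has succeeded.
ordered = deliver(["A", "B"], {"A"}, max_in_flight=1)
```

In this model `reordered` ends up as B followed by A, while `ordered` keeps the original sequence, at the cost of sending one batch at a time.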

In cases when you configure the producer to be idempotent, up to five requests can be guaranteed to be in flight with the order preserved.

enable.idempotence

To enable idempotence, set enable.idempotence to true (the default) in your Redpanda configuration.

When idempotence is enabled, the producer will try to make sure that exactly one copy of every message is written to the broker. When it is set to false and the producer retries sending a message for any reason (such as transient errors like brokers not being available or a not-enough-replicas exception), the retry can lead to duplicates.

In most client libraries enable.idempotence is set to true by default. Internally, this is implemented using a special identifier that is assigned to every producer (the producer ID or “PID”). This ID, along with a "sequence number", is included in every message that is being sent to the broker. The broker will check if the PID/sequence number combination is larger than the previous one and, if not, it will discard the message.

To guarantee true idempotent behavior, you must also set acks=all to ensure all brokers record messages in order, even in the event of node failures. In this configuration, both the producer and the broker prefer safety and durability over throughput.

Idempotence is only guaranteed within a session. A session starts once the producer is instantiated and a connection is established between the client and Redpanda broker. When the connection is closed, the session ends.

If your application code retries a request, the producer client will assign a new ID to that request, which may lead to duplicate messages.
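The producer ID and sequence number check described above can be modeled as follows. This is a minimal sketch with a hypothetical `PartitionLog` class; it tracks a single partition and ignores details such as batches and sequence gaps.

```python
class PartitionLog:
    """Hypothetical single-partition log that discards duplicate sends."""

    def __init__(self):
        self.records = []
        self._last_seq = {}  # producer ID -> highest sequence number accepted

    def append(self, pid, seq, value):
        """Store value unless this (pid, seq) pair was already seen."""
        if seq <= self._last_seq.get(pid, -1):
            return False  # retry of a message that was already written
        self._last_seq[pid] = seq
        self.records.append(value)
        return True


log = PartitionLog()
log.append(pid=7, seq=0, value="a")   # first send is stored
log.append(pid=7, seq=0, value="a")   # client-library retry is discarded
log.append(pid=7, seq=1, value="b")   # next message is stored
# A new session gets a new producer ID, so a resend is no longer recognized.
log.append(pid=8, seq=0, value="a")
```

The last call shows why idempotence holds only within a session: the broker in this model has no way to relate the new producer ID to the earlier one.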

Message batching

Batching is an efficient way to save on both network bandwidth and disk space, because messages compress more effectively when they are grouped together.

When a producer prepares to send messages to a broker, it first fills up a buffer. When this buffer is full, the producer compresses (if instructed to do so) and sends out this batch of messages to the broker. The number of batches that can be sent in a single request to the broker is limited by the max.request.size parameter. The number of requests that can simultaneously be in this "sending" state is controlled by the max.in.flight.requests.per.connection value, which defaults to 5 in most client libraries.

Tune the batching configuration using:

buffer.memory

buffer.memory is the value that controls the total amount of memory available to the producer for buffering. In a case where messages are sent faster than they can be delivered to the broker, the producer application may run out of memory, which will cause it to either block subsequent send calls or even throw an exception. The max.block.ms parameter controls the amount of time the producer will block before throwing an exception if it cannot immediately send messages to the broker.
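The blocking behavior can be pictured as a bounded pool of bytes. The sketch below is an assumption-laden model, not client code: `BoundedBuffer`, `reserve`, and `release` are hypothetical names, and a real producer releases memory itself once batches have been delivered.

```python
import threading


class BoundedBuffer:
    """Hypothetical byte pool sized like buffer.memory."""

    def __init__(self, buffer_memory):
        self.free_bytes = buffer_memory
        self._cond = threading.Condition()

    def reserve(self, nbytes, max_block_ms):
        """Claim nbytes, blocking up to max_block_ms before giving up."""
        with self._cond:
            has_room = self._cond.wait_for(
                lambda: self.free_bytes >= nbytes,
                timeout=max_block_ms / 1000,
            )
            if not has_room:
                raise TimeoutError("buffer full: send blocked for max.block.ms")
            self.free_bytes -= nbytes

    def release(self, nbytes):
        """Return nbytes to the pool, for example after a batch is delivered."""
        with self._cond:
            self.free_bytes += nbytes
            self._cond.notify_all()
```

A caller that cannot reserve space within the timeout sees an exception, which mirrors a send call failing after blocking for max.block.ms.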

batch.size

batch.size controls the maximum size of coupled messages that can be batched together in one request. The producer will automatically put messages being sent to the same partition into one batch. This configuration parameter is given in bytes as opposed to the number of messages.

When the producer is gathering messages to assign to a batch, at some point it will hit this byte-size limit, which triggers it to send the batch to the broker. However, the producer does not necessarily wait (for as much time as set using linger.ms) until the batch is full. Sometimes, it can even send single-message batches. This means that setting the batch size too large is not necessarily undesirable, as it won’t cause throttling when sending messages; rather, it will only cause increased memory usage.

Conversely, setting the batch size too small causes the producer to send many small batches, which increases network overhead and reduces throughput. The default value is usually 16384, but you can set this as low as 0, which turns off batching entirely.

linger.ms

linger.ms controls the maximum amount of time the producer will wait before sending out a batch of messages, if it is not already full. This means you can somewhat force the producer to make sure that batches are being filled up as efficiently as possible.

If you are willing to tolerate some latency, setting this value to a number larger than the default of 0 will cause the producer to send fewer, more efficient batches of messages. Even if you set the value to 0, messages that arrive at around the same time are still likely to be batched together.
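The interplay of batch.size and linger.ms can be sketched with a toy accumulator for a single partition. The `Batcher` class is hypothetical and uses caller-supplied timestamps instead of a real clock; it only checks the linger timer when `add` or `poll` is called, which is a simplification of how real clients behave.

```python
class Batcher:
    """Hypothetical accumulator: flushes on size or on linger expiry."""

    def __init__(self, batch_size, linger_ms):
        self.batch_size = batch_size  # limit in bytes, not in messages
        self.linger_ms = linger_ms
        self.sent = []                # batches handed to the network
        self._batch = []
        self._bytes = 0
        self._opened_at = None

    def add(self, record: bytes, now_ms: int):
        self.poll(now_ms)  # send a batch whose linger time has expired
        if self._batch and self._bytes + len(record) > self.batch_size:
            self._flush()  # record does not fit: send what we have
        if not self._batch:
            self._opened_at = now_ms
        self._batch.append(record)
        self._bytes += len(record)
        if self._bytes >= self.batch_size:
            self._flush()  # batch is full: send immediately

    def poll(self, now_ms: int):
        if self._batch and now_ms - self._opened_at >= self.linger_ms:
            self._flush()

    def _flush(self):
        self.sent.append(self._batch)
        self._batch, self._bytes, self._opened_at = [], 0, None
```

Feeding three 10-byte records into `Batcher(batch_size=30, linger_ms=10)` within a few milliseconds produces one full batch, while a lone record is sent by itself once 10 ms have passed.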

Commonly-used producer configuration options

compression.type

compression.type controls how the producer should compress a batch of messages before sending it to the broker. The default is none, which means the batch of messages will not be compressed at all. Compression occurs on full batches, so you can improve batching throughput by setting this parameter to use one of the available compression algorithms (along with increasing batch size). The available options are: zstd, lz4, gzip, and snappy.
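To see why compressing whole batches pays off, the following sketch compares compressing records one by one against compressing them as a single batch. It uses gzip from the Python standard library and made-up sensor records; actual ratios depend on your data and on the chosen algorithm.

```python
import gzip
import json

# Hypothetical, repetitive sensor records similar to typical event payloads.
records = [
    json.dumps({"sensor_id": i % 4, "status": "ok", "reading": 20 + i % 3}).encode()
    for i in range(200)
]

# Each record compressed on its own pays the gzip overhead every time.
individually = sum(len(gzip.compress(r)) for r in records)

# One batch lets the compressor exploit repetition across records.
as_one_batch = len(gzip.compress(b"".join(records)))

print(f"individually: {individually} bytes, as one batch: {as_one_batch} bytes")
```

Larger batches give the compressor more repeated structure to work with, which is why compression is usually tuned together with batch.size.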

Serializers

Serializers are responsible for converting a message to a byte array. You can influence the speed and memory efficiency of your streaming setup by choosing one of the built-in serializers or writing a custom one. The performance consequences of choosing one serializer over another are not typically significant.

For example, if you opt for the JSON serializer, you will have more data to transport with each message because every record will contain its schema in a verbose format, which impacts your compression speeds and network throughput. Alternatively, going with Avro or Protobuf allows you to define the schema in only one place, while also enabling features like schema evolution.
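The size difference between a self-describing and a schema-based encoding can be illustrated with the standard library alone. The fixed binary layout below is only a stand-in for formats such as Avro or Protobuf, not an implementation of either, and the record fields are invented for the example.

```python
import json
import struct

reading = {"sensor_id": 17, "temperature": 21.5, "timestamp_ms": 1700000000000}

# JSON repeats the field names in every record.
as_json = json.dumps(reading).encode("utf-8")

# Fixed layout agreed on in one place: unsigned 32-bit id,
# 64-bit float, unsigned 64-bit timestamp, little-endian, no padding.
LAYOUT = struct.Struct("<IdQ")
as_binary = LAYOUT.pack(
    reading["sensor_id"], reading["temperature"], reading["timestamp_ms"]
)

# The consumer needs the same layout to decode the bytes.
decoded = LAYOUT.unpack(as_binary)

print(f"JSON: {len(as_json)} bytes, binary: {len(as_binary)} bytes")
```

The binary form is smaller because the field names live in the shared layout rather than in each message, which is the same trade-off the schema-based serializers make.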

Broker timestamps

Redpanda employs a unique strategy to help ensure the accuracy of retention operations. In this strategy, closed segments are only eligible for deletion when the age of all messages in the segment exceeds a configured threshold. However, when a producer sends a message to a topic, the timestamp set by the producer may not accurately reflect the time the message reaches the broker. To address this time skew, each time a producer sends a message to a topic, Redpanda records the broker’s system date and time in the broker_timestamp property of the message. This property helps maintain accurate retention policies, even when the message’s creation timestamp deviates from the broker’s time.

Configure broker timestamp alerting

Each time a broker receives a message with a skewed created timestamp that is outside a configured range, Redpanda increments the vectorized_kafka_rpc_produce_bad_create_time metric. Two cluster properties control this range. The minimum accepted value for both of these properties is five minutes. Any attempt to set a value lower than that is rejected by Redpanda. The properties are:

  • log_message_timestamp_alert_before_ms: Defines the allowed skew before the broker’s time. This check is effectively disabled when the value is set to null. Minimum: 300000 ms (5 minutes), Default: null.

  • log_message_timestamp_alert_after_ms: Defines the allowed skew after the broker’s time. There is no way to disable this check. Minimum: 300000 ms (5 minutes), Default: 7200000 ms (2 hours).
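The range check behind this metric can be approximated as follows. This is a sketch under assumptions: the function name is hypothetical, and whether a timestamp exactly on the boundary counts as skewed is a guess rather than documented behavior.

```python
FIVE_MINUTES_MS = 300_000
TWO_HOURS_MS = 7_200_000


def is_bad_create_time(create_ms, broker_ms,
                       alert_before_ms=None, alert_after_ms=TWO_HOURS_MS):
    """Return True if the producer timestamp falls outside the allowed skew.

    alert_before_ms models log_message_timestamp_alert_before_ms
    (None disables the check); alert_after_ms models
    log_message_timestamp_alert_after_ms.
    """
    # Timestamp too far in the past relative to the broker clock.
    if alert_before_ms is not None and broker_ms - create_ms > alert_before_ms:
        return True
    # Timestamp too far in the future relative to the broker clock.
    return create_ms - broker_ms > alert_after_ms
```

With the defaults, a message stamped three hours ahead of the broker clock is flagged, while a message stamped a day in the past is not, because the "before" check is disabled.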

Disable broker timestamp retention

While not advised for typical use, Redpanda allows you to override the use of broker timestamps for retention policy. You accomplish this through use of the Admin API. Use the activate feature API to disable the broker_time_based_retention property.

If you disable this feature, make sure to specify your desired timestamp policy. This is stored in the log_message_timestamp_type cluster property. The timestamp policy defaults to CreateTime (client timestamp set by producer) but may be updated to LogAppendTime (server timestamp set by Redpanda).

Producer optimization strategies

You can optimize for speed (throughput and latency) or safety (durability and availability) by adjusting parameters. Finding the optimal configuration depends on your use case and requires some trial and error. The strategies described here are meant to be used as guidelines, not hard rules.

There are a wide variety of configuration options within Redpanda. The configuration options mentioned here work best when combined with other broker and consumer configuration options. For details, refer to Configuring Node Properties and Consumer Offsets.

Optimizing for speed

From a producer perspective, when you want to get data into Redpanda as quickly as possible, you can maximize throughput in a variety of ways. You can set other components’ parameters, like experimenting with the topic partition size. You can also test acks settings.

For example, the quicker a producer receives a reply from the broker that the message has been committed, the sooner it can send the next message, which generally results in higher throughput. Hence, if you set acks=1, then the leader broker would not have to wait for replication to occur, and it can reply as soon as it is finished committing the message. As mentioned earlier, this can result in less durability overall.

Another option to explore is how the producer batches messages. Increasing the value of batch.size and linger.ms can increase throughput by making the producer add more messages into one batch before sending it to the broker and waiting until the batches can properly fill up. This approach negatively impacts latency though. By contrast, if you minimize linger.ms (for example, to 0) and batch.size to 1, you can achieve lower latency, but sacrifice throughput.
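The two ends of this trade-off might look like the following property sets. The property names are standard producer settings, but the specific values, the `producer_config` helper, and the broker address are illustrative assumptions to be replaced by figures from your own testing.

```python
# Favors throughput: large batches, a short wait to fill them, compression.
THROUGHPUT_PROFILE = {
    "acks": "1",
    "batch.size": 262144,      # illustrative value, in bytes
    "linger.ms": 50,           # illustrative value
    "compression.type": "lz4",
}

# Favors latency: send each message as soon as it is produced.
LATENCY_PROFILE = {
    "acks": "1",
    "batch.size": 1,
    "linger.ms": 0,
    "compression.type": "none",
}


def producer_config(bootstrap_servers, profile):
    """Combine connection settings with one of the profiles above."""
    return {"bootstrap.servers": bootstrap_servers, **profile}


# "localhost:9092" is a placeholder address.
config = producer_config("localhost:9092", THROUGHPUT_PROFILE)
```

Most Kafka-compatible clients accept properties with these names, though the exact way of passing them differs between libraries.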

Optimizing for safety

For applications where you must guarantee that there are no lost messages, duplicates, or service downtime, you can use higher durability acks settings. If you set acks=all, then the producer will wait for a majority of replicas to acknowledge the message before it can send the next message, resulting in higher latency, because there is more communication required between brokers. This approach can guarantee higher durability because the message will be replicated to a majority of the brokers responsible for the partition before it is acknowledged.

You can also increase durability by increasing the number of retries the producer is allowed to make in case messages are not delivered successfully. The trade-off is that, unless idempotence is enabled, you may allow duplicates to enter the system and potentially alter the ordering of messages.
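A safety-oriented property set, together with a small check for the pitfalls mentioned in this guide, could be sketched like this. The `safety_warnings` helper and the retry count are hypothetical; the rules it encodes are the ones described in the sections on acks, retries, in-flight requests, and idempotence.

```python
SAFETY_PROFILE = {
    "acks": "all",
    "enable.idempotence": True,
    "retries": 5,  # illustrative value
    "max.in.flight.requests.per.connection": 5,
}


def safety_warnings(config):
    """Return human-readable warnings for risky setting combinations."""
    warnings = []
    idempotent = config.get("enable.idempotence", False)
    retries = config.get("retries", 0)
    in_flight = config.get("max.in.flight.requests.per.connection", 5)

    if config.get("acks") not in ("all", "-1"):
        warnings.append("acks is not 'all': acknowledged messages may be lost")
    if retries > 0 and not idempotent:
        warnings.append("retries without idempotence may create duplicates")
        if in_flight > 1:
            warnings.append("retries with several requests in flight may reorder messages")
    if idempotent and in_flight > 5:
        warnings.append("ordering is only preserved for up to five in-flight requests")
    return warnings
```

Running the check against `SAFETY_PROFILE` returns no warnings, while a configuration with acks=1, retries enabled, and idempotence disabled triggers all three of the first warnings.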