Configure Producers
Along with brokers, topics, and consumers, producers are one of the essential components of a Redpanda deployment. Producers are client applications that write data to Redpanda in the form of events: they send messages to topics in the cluster, where the data is stored in partitions on the brokers. Producers communicate with Redpanda through the Kafka API.
When a producer publishes a message to a Redpanda cluster, it sends it to a specific partition. Every event consists of a key and a value. To select a partition, the producer checks the key: if the key is empty, it distributes messages across the topic's partitions in a round-robin fashion; if a key is provided, the partitioner hashes the key with the murmur2 algorithm and takes the result modulo the number of partitions to choose the target partition.
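As a minimal sketch (the broker address, topic name, and key are assumptions for illustration), the Java Kafka client demonstrates both cases:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // assumed Redpanda Kafka API address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Keyed record: murmur2(key) % partition count always selects the same partition for this key.
    producer.send(new ProducerRecord<>("orders", "customer-42", "order created"));
    // Unkeyed record: distributed across the topic's partitions.
    producer.send(new ProducerRecord<>("orders", "heartbeat"));
}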
Producer acknowledgment defines how producer clients and broker leaders communicate their status while transferring data. The acks parameter (short for "acknowledgments") controls when the producer considers a message acknowledged by the broker, and is useful to understand when you are working with Redpanda clusters.
While it may seem logical that the producer should wait for the broker to acknowledge the message in every circumstance, there are cases when you can gain more by letting the messages fly to the broker, unconfirmed.
Sometimes trading speed for durability is the best strategy, but sometimes you may want it the other way around. A message is considered "acknowledged" when replication of the message to the majority of other brokers responsible for the partition in the cluster is complete.
Producer configuration options
There are myriad client configuration options for all components of a cluster. Finding the optimal combination of these options depends largely on your specific use case. The following sections describe the producer configuration options and how to combine them with other component configurations for optimal results.
Producer acknowledgment settings
acks=0
The producer doesn’t wait for acknowledgments from the leader and doesn’t retry sending messages. This increases throughput and lowers latency of the system at the expense of durability and data loss.
This option allows a producer to immediately consider a message acknowledged when it is sent to the Redpanda broker. This means that a producer does not have to wait for any kind of response from the Redpanda broker. This is the least safe option because a leader-node crash can cause data loss if the data has not yet replicated to the other nodes in the replica set. However, this setting is useful in cases where you want to optimize for the highest throughput, and are willing to risk some data loss.
Because of the lack of guarantees, this setting is the most network bandwidth-efficient. This makes it useful for use cases like IoT/sensor data collection, where updates are periodic or stateless and you can afford some degree of data loss, but prefer gathering as much data as possible in a given time interval.
acks=1
The producer waits for an acknowledgment from the leader, but it doesn't wait for the leader to get acknowledgments from followers. This setting doesn't prioritize throughput, latency, or durability. Instead, acks=1 attempts to provide a balance between all of them.
Replication is not guaranteed with this setting because it happens in the background, after the acknowledgement is sent to the producer from the leader broker. This setting could result in data loss if the leader broker crashes before any followers manage to replicate the message.
acks=all
The producer receives an acknowledgment after the majority of (implicitly, all) replicas acknowledge the message. This increases durability at the expense of lower throughput and increased latency.
Sometimes referred to as acks=-1, this option instructs the broker that replication is considered complete when the message has been replicated (and fsynced, meaning flushed to disk, making it durable in case of broker crashes) to the majority of the brokers responsible for the partition in the cluster. As soon as the fsync call is complete, the message is considered acknowledged and is made visible to readers.
This parameter has an important distinction compared to Kafka's behavior. In Kafka, a message is considered acknowledged without the requirement that it has been fsynced, so messages that have not been flushed to disk are lost in the event of a broker crash. When using acks=all, the Redpanda default configuration is therefore more resilient than Kafka's.
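The acks level is set on the producer client, not the broker. As a minimal sketch (the broker address is an assumption), requesting the strongest acknowledgment guarantee looks like this; swap in "1" or "0" to trade durability for lower latency and higher throughput:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // assumed Redpanda Kafka API address
props.put("acks", "all");                           // or "1" / "0" for lower latency, weaker durability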
retries
This parameter controls the number of times a message is re-sent to the broker if the broker fails to acknowledge it. This is essentially the same as if the client application resends the erroneous message after receiving an error response. The default value of retries in most client libraries is 0, which means that if the send fails, the message is not re-sent at all.
If you increase this number to a higher value, be sure to also check the max.in.flight.requests.per.connection value, as leaving that parameter at its default can potentially cause ordering issues in the target topic. This occurs if two batches are sent to a single partition and the first fails and is retried, but the second succeeds, so the records in the second batch may appear first.
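As a sketch of how to guard against this (the retry count is illustrative), capping in-flight requests at one preserves ordering at some cost to throughput:

Properties props = new Properties();
props.put("retries", 5);                                // illustrative retry count
props.put("max.in.flight.requests.per.connection", 1); // prevents a later batch from overtaking a retried one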
max.in.flight.requests.per.connection
This parameter controls how many unacknowledged messages are allowed to be in flight to the broker at any given time. The default value is 5 in most client libraries.
If you set this parameter to 1, the producer does not send any more messages until the previous one is either acknowledged or an error happens, which can prompt a retry. If you set it to a value higher than 1, the producer sends more messages at the same time, which can help increase throughput but adds a risk of message reordering if retries are enabled.
When you configure the producer to be idempotent, up to five requests can be in flight at once with ordering still guaranteed.
enable.idempotence
To enable idempotence, set enable.idempotence to true (the default) in your producer configuration.
When idempotence is enabled, the producer tries to make sure that exactly one copy of every message is written to the broker. When it is set to false, a producer that retries sending a message for any reason (such as transient errors like a broker not being available or a not-enough-replicas exception) can introduce duplicates.
In most client libraries, enable.idempotence is set to true by default.
Internally, this is implemented using a special identifier that is assigned to every producer (the producer ID, or "PID"). This ID, along with a sequence number, is included in every message sent to the broker. The broker checks whether the PID/sequence number combination is larger than the previous one and, if not, discards the message.
To guarantee true idempotent behavior, you must also set acks=all to ensure all brokers record messages in order, even in the event of node failures. In this configuration, both the producer and the broker prefer safety and durability over throughput.
Idempotence is only guaranteed within a session. A session starts once the producer is instantiated and a connection is established between the client and Redpanda broker. When the connection is closed, the session ends.
If your application code retries a request, the producer client will assign a new ID to that request, which may lead to duplicate messages.
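A sketch of an idempotence-oriented producer configuration (the values shown are typical client defaults or illustrative choices, not Redpanda-specific requirements):

Properties props = new Properties();
props.put("enable.idempotence", true);    // default in most client libraries
props.put("acks", "all");                 // required for full idempotence guarantees
props.put("retries", Integer.MAX_VALUE);  // retries are safe: the broker discards duplicates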
Message batching
Batching is an efficient way to save on both network bandwidth and disk space, because batches of messages can be compressed more efficiently.
When a producer prepares to send messages to a broker, it first fills up a buffer. When this buffer is full, the producer compresses the batch (if instructed to do so) and sends it to the broker. The number of batches that can be sent in a single request to the broker is limited by the max.request.size parameter. The number of requests that can simultaneously be in this "sending" state is controlled by the max.in.flight.requests.per.connection value, which defaults to 5 in most client libraries.
Tune the batching configuration using:
buffer.memory
buffer.memory is the value that controls the total amount of memory available to the producer for buffering. If messages are sent faster than they can be delivered to the broker, the producer application may run out of memory, which causes it to either block subsequent send calls or throw an exception. The max.block.ms parameter controls the amount of time the producer blocks before throwing an exception if it cannot immediately send messages to the broker.
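For illustration, a sketch with hypothetical sizes and timeouts:

Properties props = new Properties();
props.put("buffer.memory", 64 * 1024 * 1024L);  // 64 MiB producer-side buffer (example value)
props.put("max.block.ms", 30000);               // block send() for up to 30 seconds before throwing (example value)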
batch.size
batch.size controls the maximum size of a batch of messages that can be sent together in one request. The producer automatically puts messages being sent to the same partition into one batch. This configuration parameter is given in bytes, as opposed to a number of messages.
When the producer is gathering messages for a batch, at some point it hits this byte-size limit, which triggers it to send the batch to the broker. However, the producer does not necessarily wait (for as much time as set using linger.ms) until the batch is full. Sometimes it can even send single-message batches. This means that setting the batch size too large is not necessarily undesirable, as it won't cause throttling when sending messages; rather, it will only cause increased memory usage.
Conversely, setting the batch size too small causes the producer to send smaller batches more frequently, which adds network overhead and reduces throughput. The default value is usually 16384 bytes, but you can set it as low as 0, which turns off batching entirely.
linger.ms
linger.ms controls the maximum amount of time the producer waits before sending out a batch of messages that is not already full. It gives you a way to encourage the producer to fill batches as efficiently as possible.
If you are willing to tolerate some latency, setting this value to a number larger than the default of 0 causes the producer to send fewer, more efficient batches of messages. Even at the default of 0, there is still a high chance that messages arriving at around the same time are batched together.
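As a sketch, a throughput-leaning batching configuration might look like the following (the specific values are illustrative, not recommendations):

Properties props = new Properties();
props.put("batch.size", 64 * 1024);  // allow larger batches (bytes); the default is usually 16384
props.put("linger.ms", 10);          // wait up to 10 ms for a batch to fill before sending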
Commonly-used producer configuration options
compression.type
compression.type controls how the producer should compress a batch of messages before sending it to the broker. The default is none, which means the batch of messages is not compressed at all. Compression occurs on full batches, so you can improve batching throughput by setting this parameter to use one of the available compression algorithms (along with increasing the batch size). The available options are: zstd, lz4, gzip, and snappy.
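For example (the choice of algorithm here is illustrative):

Properties props = new Properties();
props.put("compression.type", "zstd");  // compress full batches; pairs well with a larger batch.size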
Serializers
Serializers are responsible for converting a message to a byte array. You can influence the speed and memory efficiency of your streaming setup by choosing one of the built-in serializers or writing a custom one. The performance cost of the serializers themselves is typically not significant.
For example, if you opt for the JSON serializer, you will have more data to transport with each message because every record will contain its schema in a verbose format, which impacts your compression speeds and network throughput. Alternatively, going with AVRO or Protobuf allows you to only define the schema in one place, while also enabling features like schema evolution.
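As a sketch, serializers are configured per key and value on the producer. The classes below ship with the Java Kafka client; a schema-based format such as Avro would typically swap in a schema-registry-aware serializer instead:

Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");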
Producer optimization strategies
You can optimize for speed (throughput and latency) or safety (durability and availability) by adjusting parameters. Finding the optimal configuration depends on your use case and requires some trial and error. The strategies described here are meant to be used as guidelines, not hard rules.
There are a wide variety of configuration options within Redpanda. The configuration options mentioned here work best when combined with other broker and consumer configuration options. For details, refer to Configuring Node Properties and Consumer Offsets.
Optimizing for speed
From a producer perspective, when you want to get data into Redpanda as quickly as possible, you can maximize throughput in a variety of ways. You can adjust other components' parameters, such as experimenting with the number of partitions in a topic. You can also test acks settings.
For example, the quicker a producer receives a reply from the broker that the message has been committed, the sooner it can send the next message, which generally results in higher throughput. If you set acks=1, the leader broker does not have to wait for replication to occur and can reply as soon as it has finished committing the message. As mentioned earlier, this can result in less durability overall.
Another option to explore is how the producer batches messages. Increasing the values of batch.size and linger.ms can increase throughput by making the producer add more messages into one batch before sending it to the broker, waiting until the batches can properly fill up. This approach negatively impacts latency, though. By contrast, if you minimize linger.ms (for example, to 0) and set batch.size to 1, you can achieve lower latency but sacrifice throughput.
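Pulled together, a throughput-oriented sketch might look like the following (all values are illustrative):

Properties props = new Properties();
props.put("acks", "1");               // reply after the leader commits, before replication completes
props.put("batch.size", 128 * 1024);  // larger batches
props.put("linger.ms", 20);           // give batches time to fill
props.put("compression.type", "lz4"); // shrink full batches on the wire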
Optimizing for safety
For applications where you must guarantee that there are no lost messages, duplicates, or service downtime, you can use higher-durability acks settings. If you set acks=all, the producer waits for a majority of replicas to acknowledge the message before it can send the next one, resulting in higher latency because more communication is required between brokers. This approach guarantees higher durability because the message is replicated to a majority of the brokers responsible for the partition before it is acknowledged.
You can also increase durability by increasing the number of retries the producer is allowed to make when messages are not delivered successfully. The trade-off is that you may allow duplicates to enter the system and potentially alter the ordering of messages.
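A corresponding safety-oriented sketch (values are illustrative; enabling idempotence keeps the extra retries from producing duplicates or reordering):

Properties props = new Properties();
props.put("acks", "all");
props.put("enable.idempotence", true);
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5); // safe to keep above 1 when idempotence is enabled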