Docs Connect Components Processors dedupe dedupe Deduplicates messages by storing a key value in a cache using the add operator. If the key already exists within the cache it is dropped. # Config fields, showing default values label: "" dedupe: cache: "" # No default (required) key: ${! meta("kafka_key") } # No default (required) drop_on_err: true Caches must be configured as resources, for more information check out the cache documentation. When using this processor with an output target that might fail you should always wrap the output within an indefinite retry block. This ensures that during outages your messages aren’t reprocessed after failures, which would result in messages being dropped. Batch deduplication This processor enacts on individual messages only, in order to perform a deduplication on behalf of a batch (or window) of messages instead use the cache processor. Delivery guarantees Performing deduplication on a stream using a distributed cache voids any at-least-once guarantees that it previously had. This is because the cache will preserve message signatures even if the message fails to leave the Redpanda Connect pipeline, which would cause message loss in the event of an outage at the output sink followed by a restart of the Redpanda Connect instance (or a server crash, etc). This problem can be mitigated by using an in-memory cache and distributing messages to horizontally scaled Redpanda Connect pipelines partitioned by the deduplication key. However, in situations where at-least-once delivery guarantees are important it is worth avoiding deduplication in favour of implement idempotent behavior at the edge of your stream pipelines. Fields cache The cache resource to target with this processor. Type: string key An interpolated string yielding the key to deduplicate by for each message. This field supports interpolation functions. Type: string # Examples key: ${! meta("kafka_key") } key: ${! content().hash("xxhash64") } drop_on_err Whether messages should be dropped when the cache returns a general error such as a network issue. Type: bool Default: true Examples Deduplicate based on Kafka key The following configuration demonstrates a pipeline that deduplicates messages based on the Kafka key. pipeline: processors: - dedupe: cache: keycache key: ${! meta("kafka_key") } cache_resources: - label: keycache memory: default_ttl: 60s Back to top × Simple online edits For simple changes, such as fixing a typo, you can edit the content directly on GitHub. Edit on GitHub Or, open an issue to let us know about something that you want us to change. Open an issue Contribution guide For extensive content updates, or if you prefer to work locally, read our contribution guide . Was this helpful? thumb_up thumb_down group Ask in the community mail Share your feedback group_add Make a contribution decompress for_each