cache
Performs operations against a cache resource for each message, allowing you to store or retrieve data within message payloads.
Common
processors:
  - label: ""
    cache:
      resource: "" # No default (required)
      operator: "" # No default (required)
      key: "" # No default (required)
      value: "" # No default (optional)
Advanced
processors:
  - label: ""
    cache:
      resource: "" # No default (required)
      operator: "" # No default (required)
      key: "" # No default (required)
      value: "" # No default (optional)
      ttl: "" # No default (optional)
For use cases where you wish to cache the result of processors, consider using the cached processor instead.
This processor will interpolate functions within the key and value fields individually for each message. This allows you to specify dynamic keys and values based on the contents of the message payloads and metadata. You can find a list of functions in Bloblang queries.
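As a minimal sketch of per-message interpolation, the configuration below stores each full payload under a key taken from the payload itself. The resource label foocache and the field path user.id are assumptions chosen for illustration, not names required by the processor:

```yaml
pipeline:
  processors:
    - cache:
        resource: foocache # assumed label of a configured cache resource
        operator: set
        # Resolved separately for every message (hypothetical field path)
        key: '${! json("user.id") }'
        # Stores the raw message payload as the cached value
        value: '${! content() }'
```

Because both fields are resolved per message, two messages with different user.id values end up under different keys.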
Examples
Deduplication
Deduplication can be done using the add operator with a key extracted from the message payload. Because add fails when the key already exists, duplicates can then be removed with a mapping processor:
pipeline:
  processors:
    - cache:
        resource: foocache
        operator: add
        key: '${! json("message.id") }'
        value: "storeme"
    - mapping: root = if errored() { deleted() }

cache_resources:
  - label: foocache
    redis:
      url: tcp://TODO:6379
Deduplication Batch-Wide
Sometimes it’s necessary to deduplicate a batch of messages (also known as a window) by a single identifying value. This can be done by introducing a branch processor, which executes the cache operation only once on behalf of the batch, in this case using an identifying value built from fields extracted from the first and last messages of the batch:
pipeline:
  processors:
    # Try and add one message to a cache that identifies the whole batch
    - branch:
        request_map: |
          root = if batch_index() == 0 {
            json("id").from(0) + json("meta.tail_id").from(-1)
          } else { deleted() }
        processors:
          - cache:
              resource: foocache
              operator: add
              key: ${! content() }
              value: t
    # Delete all messages if we failed
    - mapping: |
        root = if errored().from(0) {
          deleted()
        }
Hydration
It’s possible to enrich payloads with content previously stored in a cache by using the branch processor:
pipeline:
  processors:
    - branch:
        processors:
          - cache:
              resource: foocache
              operator: get
              key: '${! json("message.document_id") }'
        result_map: 'root.message.document = this'

        # NOTE: If the data stored in the cache is not valid JSON then use
        # something like this instead:
        # result_map: 'root.message.document = content().string()'

cache_resources:
  - label: foocache
    memcached:
      addresses: [ "TODO:11211" ]
Fields
operator
The operation to perform with the cache.
Type: string
Options: set, add, get, delete, exists
ttl
The time to live (TTL) of each individual item as a duration string. After this period an item will be eligible for removal during the next compaction. Not all caches support per-key TTLs. Those that do will have a configuration field default_ttl; those that do not will fall back to their generally configured TTL setting.
This field supports interpolation functions.
Requires version 3.33.0 or later.
Type: string
# Examples:
ttl: 60s
# ---
ttl: 5m
# ---
ttl: 36h
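To show where the field sits in a full processor, here is a hedged sketch of a set operation with a per-item TTL. The resource label foocache and the field path session.id are assumptions for illustration, and the TTL only takes effect if the underlying cache supports per-key TTLs:

```yaml
pipeline:
  processors:
    - cache:
        resource: foocache # assumed label of a configured cache resource
        operator: set
        key: '${! json("session.id") }' # hypothetical field path
        value: '${! content() }'
        # Item becomes eligible for removal five minutes after being written
        ttl: 5m
```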
value
A value to use with the cache (when applicable). This field supports interpolation functions.
Type: string
Operators
add
Set a key in the cache to a value. If the key already exists the action fails with a 'key already exists' error, which can be detected with processor error handling.
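One way to react to a failed add, sketched here under assumptions, is to follow the cache processor with a catch block that logs the error instead of dropping the message. The resource label foocache and the field path message.id are illustrative only. Note that catch handles any failure from the preceding processor, not just 'key already exists':

```yaml
pipeline:
  processors:
    - cache:
        resource: foocache # assumed label of a configured cache resource
        operator: add
        key: '${! json("message.id") }' # hypothetical field path
        value: "seen"
    # The processors under catch run only for messages flagged as failed
    - catch:
        - log:
            level: WARN
            message: 'cache add failed: ${! error() }'
```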
get
Retrieve the contents of a cached key and replace the original message payload with the result. If the key does not exist the action fails with an error, which can be detected with processor error handling.
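Because get replaces the payload and fails on a missing key, it is often wrapped in a branch so the original message survives. The sketch below goes one step further and substitutes a fallback document when the lookup fails. The resource label foocache, the field path id, the target field lookup, and the fallback object are all assumptions for illustration, and the result_map as written expects the cached value to be valid JSON:

```yaml
pipeline:
  processors:
    - branch:
        processors:
          - cache:
              resource: foocache # assumed label of a configured cache resource
              operator: get
              key: '${! json("id") }' # hypothetical field path
          # On a failed lookup, replace the branch payload with a fallback
          - catch:
              - mapping: 'root = {"found": false}'
        # Copy the cached document (or the fallback) into the original message
        result_map: 'root.lookup = this'
```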