Bloblang Functions
Functions can be placed anywhere and allow you to extract information from your environment, generate values, or access data from the underlying message being mapped:
root.doc.id = uuid_v4()
root.doc.received_at = now()
root.doc.host = hostname()
Functions support both named and nameless style arguments:
root.values_one = range(start: 0, stop: this.max, step: 2)
root.values_two = range(0, this.max, 2)
# In: {"max":10}
# Out: {"values_one":[0,2,4,6,8],"values_two":[0,2,4,6,8]}
batch_index
Returns the zero-based index of the current message within its batch. Use this to conditionally process messages based on their position, or to create sequential identifiers within a batch.
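Keep only the first message of each batch:
root = if batch_index() > 0 { deleted() } else { this }
Tag each message with its position in the batch (the position field name is illustrative):
root = this
root.position = batch_index()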
bytes
Creates a zero-initialized byte array of specified length. Use this to allocate fixed-size byte buffers for binary data manipulation or to generate padding.
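Allocate a zero-filled 16-byte buffer, for example as fixed-size padding. The padding field name is illustrative, and how the bytes appear in the output depends on how the message is serialized:
root.padding = bytes(16)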
content
Returns the raw message payload as bytes, regardless of the current mapping context. Use this to access the original message when working within nested contexts, or to store the entire message as a field.
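Store the entire raw payload as a string field alongside the parsed document (the raw field name is illustrative):
root.doc = this
root.raw = content().string()
# In: {"foo":"bar"}
# Out: {"doc":{"foo":"bar"},"raw":"{\"foo\":\"bar\"}"}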
count
This function is deprecated and will be removed in a future version.
The count function is a counter that starts at 1 and increments each time it is called. It takes an argument that identifies the counter, allowing you to define multiple independent counters in your configuration.
counter
Generates an incrementing sequence of integers starting from a minimum value (default 1). Each counter instance maintains its own independent state across message processing. When the maximum value is reached, the counter automatically resets to the minimum.
Parameters
| Name | Type | Description |
|---|---|---|
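| min | query expression | The starting value of the counter. This is the first value yielded. Evaluated once when the mapping is initialized. |
| max | query expression | The maximum value before the counter resets to min. Evaluated once when the mapping is initialized. |
| set | query expression | An optional query that controls counter behavior: when it resolves to a non-negative integer, the counter is set to that value; when it resolves to null, the current value is returned without incrementing; when it yields no value (for example, an if expression with no matching branch), the counter increments as normal. |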
Examples
Generate sequential IDs for each message:
root.id = counter()
# In: {}
# Out: {"id":1}
# In: {}
# Out: {"id":2}
Use a custom range for the counter:
root.batch_num = counter(min: 100, max: 200)
# In: {}
# Out: {"batch_num":100}
# In: {}
# Out: {"batch_num":101}
Increment a counter multiple times within a single mapping using a named map:
map increment {
  root = counter()
}
root.first_id = null.apply("increment")
root.second_id = null.apply("increment")
# In: {}
# Out: {"first_id":1,"second_id":2}
# In: {}
# Out: {"first_id":3,"second_id":4}
Conditionally reset a counter based on input data:
root.streak = counter(set: if this.status != "success" { 0 })
# In: {"status":"success"}
# Out: {"streak":1}
# In: {"status":"success"}
# Out: {"streak":2}
# In: {"status":"failure"}
# Out: {"streak":0}
# In: {"status":"success"}
# Out: {"streak":1}
Peek at the current counter value without incrementing by using null in the set parameter:
root.count = counter(set: if this.peek { null })
# In: {"peek":false}
# Out: {"count":1}
# In: {"peek":false}
# Out: {"count":2}
# In: {"peek":true}
# Out: {"count":2}
# In: {"peek":false}
# Out: {"count":3}
deleted
Returns a deletion marker that removes the target field or message. When applied to root, the entire message is dropped while still being acknowledged as successfully processed. Use this to filter data or conditionally remove fields.
Examples
root = this
root.bar = deleted()
# In: {"bar":"bar_value","baz":"baz_value","foo":"foo value"}
# Out: {"baz":"baz_value","foo":"foo value"}
Filter array elements by returning deleted for unwanted items:
root.new_nums = this.nums.map_each(num -> if num < 10 { deleted() } else { num - 10 })
# In: {"nums":[3,11,4,17]}
# Out: {"new_nums":[1,7]}
env
Reads an environment variable and returns its value as a string. Returns null if the variable is not set. By default, values are cached for performance.
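Read a variable with a fallback for when it is unset. Then bypass the cache for a value that may change at runtime, assuming env accepts the same no_cache flag as file_rel. APP_REGION and FEATURE_FLAG are hypothetical variable names:
root.region = env("APP_REGION").or("us-east-1")
root.flag = env(name: "FEATURE_FLAG", no_cache: true)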
error
Returns the error message string if the message has failed processing, otherwise null. Use this in error handling pipelines to log or route failed messages based on their error details.
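Record why a message failed while keeping its original content, for example inside a catch block (the error_message field name is illustrative):
root = this
root.error_message = error()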
error_source_label
Returns the user-defined label of the component that caused the error, an empty string if no label is set, or null if the message has no error. Use this for more human-readable error tracking when components have custom labels.
error_source_name
Returns the component name that caused the error, or null if the message has no error or the error has no associated component. Use this to identify which processor or component in your pipeline caused a failure.
error_source_path
Returns the dot-separated path to the component that caused the error, or null if the message has no error. Use this to identify the exact location of a failed component in nested pipeline configurations.
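Combine the error functions above into a structured failure report. Each field is null when the message has no error, and the failure field name is illustrative:
root = this
root.failure = {
  "message": error(),
  "label": error_source_label(),
  "component": error_source_name(),
  "path": error_source_path()
}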
errored
Returns true if the message has failed processing, false otherwise. Use this for conditional logic in error handling workflows or to route failed messages to dead letter queues.
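Flag failed messages so that a later step can route them to a dead letter queue (the status field name and codes are illustrative):
root = this
root.status = if errored() { 500 } else { 200 }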
fake
Generates realistic fake data for testing and development purposes. Supports a wide variety of data types including personal information, network addresses, dates/times, financial data, and UUIDs. Useful for creating mock data, populating test databases, or anonymizing sensitive information.
Supported functions: latitude, longitude, unix_time, date, time_string, month_name, year_string, day_of_week, day_of_month, timestamp, century, timezone, time_period, email, mac_address, domain_name, url, username, ipv4, ipv6, password, jwt, word, sentence, paragraph, cc_type, cc_number, currency, amount_with_currency, title_male, title_female, first_name, first_name_male, first_name_female, last_name, name, gender, chinese_first_name, chinese_last_name, chinese_name, phone_number, toll_free_phone_number, e164_phone_number, uuid_hyphenated, uuid_digit.
Parameters
| Name | Type | Description |
|---|---|---|
| function | string | The name of the faker function to use. See the list of supported functions above. |
Examples
Generate fake user profile data for testing:
root.user = {
  "id": fake("uuid_hyphenated"),
  "name": fake("name"),
  "email": fake("email"),
  "created_at": fake("timestamp")
}
Create realistic test data for network monitoring:
root.event = {
  "source_ip": fake("ipv4"),
  "mac_address": fake("mac_address"),
  "url": fake("url")
}
file
Reads a file and returns its contents as bytes. Paths are resolved from the process working directory. For paths relative to the mapping file, use file_rel. By default, files are cached after first read.
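Load and parse a JSON file resolved from the process working directory (the path is hypothetical):
root.defaults = file("./config/defaults.json").parse_json()
Re-read the file on every invocation, assuming file accepts the same no_cache parameter as file_rel:
root.defaults = file(path: "./config/defaults.json", no_cache: true).parse_json()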
file_rel
Reads a file and returns its contents as bytes. Paths are resolved relative to the mapping file’s directory, making it portable across different environments. By default, files are cached after first read.
Parameters
| Name | Type | Description |
|---|---|---|
| path | string | The path to the file, relative to the mapping file’s directory. |
| no_cache | bool | Disable caching to read the latest file contents on each invocation. |
Examples
root.schema = file_rel("./schemas/user.json").parse_json()
root.lookup = file_rel("../data/lookup.csv").parse_csv()
Use no_cache to read updated file contents during runtime, useful for reloading data files without restarting:
root.translations = file_rel(path: "./i18n/en.yaml", no_cache: true).parse_yaml()
json
Returns a field from the original JSON message by dot path, always accessing the root document regardless of mapping context. Use this to reference the source message when working in nested contexts or to extract specific fields.
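Extract a nested field by dot path:
root.mapped = json("foo.bar")
# In: {"foo":{"bar":"hello world"}}
# Out: {"mapped":"hello world"}
Reach a top-level field from inside a lambda, where this refers to the current array element rather than the whole document:
root.items = this.items.map_each(item -> item.name + "@" + json("store"))
# In: {"store":"north","items":[{"name":"a"},{"name":"b"}]}
# Out: {"items":["a@north","b@north"]}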
ksuid
Generates a K-Sortable Unique Identifier with built-in timestamp ordering. Use this for distributed unique IDs that sort chronologically and remain collision-resistant without coordination between generators.
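Attach a KSUID to each message. Because a KSUID begins with a timestamp component, sorting these IDs as strings approximately orders messages by creation time:
root = this
root.id = ksuid()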
meta
This function is deprecated and will be removed in a future version.
Returns the value of a metadata key from the input message as a string, or null if the key does not exist. Since values are extracted from the read-only input message, they do NOT reflect changes made from within the map. To query metadata mutations made within a mapping, use the root_meta function. This function supports extracting metadata from other messages of a batch with the from method.
metadata
Returns metadata from the input message by key, or null if the key doesn’t exist. This reads the original metadata; to access modified metadata during mapping, use the @ operator instead. Use this to extract message properties like topics, headers, or timestamps.
Examples
root.topic = metadata("kafka_topic")
Retrieve all metadata as an object by omitting the key parameter:
root.all_metadata = metadata()
Copy specific metadata fields to the message body:
root.source = {
  "topic": metadata("kafka_topic"),
  "partition": metadata("kafka_partition"),
  "timestamp": metadata("kafka_timestamp_unix")
}
nanoid
Generates a URL-safe unique identifier using Nano ID. Use this for compact, URL-friendly IDs with good collision resistance. Customize the length (default 21) or provide a custom alphabet for specific character requirements.
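Generate IDs with the default length, a custom length, and a custom alphabet. This assumes the positional argument order is length first, then alphabet:
root.id = nanoid()
root.short_id = nanoid(10)
root.hex_id = nanoid(16, "0123456789abcdef")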
now
Returns the current timestamp as an RFC 3339 formatted string with nanosecond precision. Use this to add processing timestamps to messages or measure time between events. Chain with ts_format to customize the format or timezone.
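Record the processing time, plus a copy reformatted with ts_format using a Go-style layout string and the UTC timezone (the field names are illustrative):
root.received_at = now()
root.received_at_utc = now().ts_format("2006-01-02 15:04:05", "UTC")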
random_int
Generates a pseudo-random non-negative 64-bit integer. Use this for creating random IDs, sampling data, or generating test values. Provide a seed for reproducible randomness, or use a dynamic seed like timestamp_unix_nano() for unique values per mapping instance.
Optional min and max parameters constrain the output range (both inclusive). For dynamic ranges based on message data, use the modulo operator instead: (random_int() % (dynamic_max - dynamic_min + 1)) + dynamic_min, which yields values from dynamic_min to dynamic_max inclusive.
Parameters
| Name | Type | Description |
|---|---|---|
| seed | query expression | A seed to use. If a query is provided, it is resolved only once during the lifetime of the mapping. |
| min | integer | The minimum value the randomly generated number will have. The default value is 0. |
| max | integer | The maximum value the randomly generated number will have. The default value is 9223372036854775806 (math.MaxInt64 - 1). |
Examples
root.first = random_int()
root.second = random_int(1)
root.third = random_int(max:20)
root.fourth = random_int(min:10, max:20)
root.fifth = random_int(timestamp_unix_nano(), 5, 20)
root.sixth = random_int(seed:timestamp_unix_nano(), max:20)
Use a dynamic seed for unique random values per mapping instance:
root.random_id = random_int(timestamp_unix_nano())
root.sample_percent = random_int(seed: timestamp_unix_nano(), min: 0, max: 100)
range
Creates an array of integers from start (inclusive) to stop (exclusive) with an optional step. Use this to generate sequences for iteration, indexing, or creating numbered lists.
Parameters
| Name | Type | Description |
|---|---|---|
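| start | integer | The start value (inclusive). |
| stop | integer | The stop value (exclusive). |
| step | integer | The step value. Use a negative step to count down. |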
Examples
root.a = range(0, 10)
root.b = range(start: 0, stop: this.max, step: 2) # Using named params
root.c = range(0, -this.max, -2)
# In: {"max":10}
# Out: {"a":[0,1,2,3,4,5,6,7,8,9],"b":[0,2,4,6,8],"c":[0,-2,-4,-6,-8]}
Generate a sequence for batch processing:
root.pages = range(0, this.total_items, 100).map_each(offset -> {
  "offset": offset,
  "limit": 100
})
# In: {"total_items":250}
# Out: {"pages":[{"limit":100,"offset":0},{"limit":100,"offset":100},{"limit":100,"offset":200}]}
root_meta
This function is deprecated and will be removed in a future version.
Returns the value of a metadata key from the new message being created as a string, or null if the key does not exist. Changes made to metadata during a mapping will be reflected by this function.
snowflake_id
Generates a unique, time-ordered Snowflake ID. Snowflake IDs are 64-bit integers that encode timestamp, node ID, and sequence information, making them ideal for distributed systems where sortable unique identifiers are needed. Returns a string representation of the ID.
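Assign a Snowflake ID to each message. The second line assumes that the function accepts an optional node ID argument, so that separate instances can use distinct node IDs and avoid collisions:
root.id = snowflake_id()
# Assumption: the argument is the node ID encoded into the ID
root.node_scoped_id = snowflake_id(2)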
throw
Immediately fails the mapping with a custom error message. Use this to halt processing when data validation fails or required fields are missing, causing the message to be routed to error handlers.
Parameters
| Name | Type | Description |
|---|---|---|
| why | string | A string explanation for why an error was thrown. This is added to the resulting error message. |
Examples
root.doc.type = match {
  this.exists("header.id") => "foo"
  this.exists("body.data") => "bar"
  _ => throw("unknown type")
}
root.doc.contents = (this.body.content | this.thing.body)
# In: {"header":{"id":"first"},"thing":{"body":"hello world"}}
# Out: {"doc":{"contents":"hello world","type":"foo"}}
# In: {"nothing":"matches"}
# Out: Error("failed assignment (line 1): unknown type")
Validate required fields before processing:
root = if this.exists("user_id") {
  this
} else {
  throw("missing required field: user_id")
}
# In: {"user_id":123,"name":"alice"}
# Out: {"name":"alice","user_id":123}
# In: {"name":"bob"}
# Out: Error("failed assignment (line 1): missing required field: user_id")
timestamp_unix
Returns the current Unix timestamp in seconds since epoch. Use this for numeric timestamps compatible with most systems, or as a seed for random number generation.
timestamp_unix_micro
Returns the current Unix timestamp in microseconds since epoch. Use this for high-precision timing measurements or when microsecond resolution is required.
timestamp_unix_nano
Returns the current Unix timestamp in nanoseconds since epoch. Use this for the highest precision timing or as a unique seed value that changes on every invocation.
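Capture the current time at each resolution. Then compute an elapsed duration in milliseconds from a hypothetical start_micros field written by an upstream producer:
root.ts_seconds = timestamp_unix()
root.ts_micros = timestamp_unix_micro()
root.ts_nanos = timestamp_unix_nano()
# start_micros is a hypothetical field holding an earlier timestamp_unix_micro() value
root.elapsed_ms = (timestamp_unix_micro() - this.start_micros) / 1000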
tracing_span
Returns the OpenTelemetry tracing span attached to the message as a text map object, or null if no span exists. Use this to propagate trace context to downstream systems via headers or metadata.
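Copy the trace context into the payload for a downstream consumer. The second line assumes the W3C Trace Context propagator, which stores the context under a traceparent key. The trace_context and headers field names are illustrative:
root = this
root.trace_context = tracing_span()
# Assumption: a span is attached and uses the W3C traceparent key
root.headers.traceparent = tracing_span().traceparent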
ulid
Generates a Universally Unique Lexicographically Sortable Identifier (ULID). ULIDs are 128-bit identifiers that are sortable by creation time, URL-safe, and case-insensitive. They consist of a 48-bit timestamp (millisecond precision) and 80 bits of randomness, making them ideal for distributed systems that need time-ordered unique IDs without coordination.
Parameters
| Name | Type | Description |
|---|---|---|
| encoding | string | Encoding format for the ULID. "crockford" produces 26-character Base32 strings (recommended). "hex" produces 32-character hexadecimal strings. |
| random_source | string | Randomness source: "secure_random" uses a cryptographically secure source (recommended for production); "fast_random" uses a faster but non-secure source (only for non-sensitive testing). |
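Examples
Generate ULIDs with the default Crockford encoding, with hex encoding, and with the faster non-secure source for test data:
root.id = ulid()
root.hex_id = ulid("hex")
root.fast_id = ulid("crockford", "fast_random")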
uuid_v7
Generates a time-ordered UUID version 7 with millisecond timestamp precision. Use this for sortable unique identifiers that maintain chronological ordering, ideal for database keys or event IDs. Optionally specify a custom timestamp.
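Generate a UUID v7 for the current time, then one derived from a hypothetical created_at field so that IDs sort by event time rather than processing time. This assumes the optional argument accepts a timestamp value, such as the result of ts_parse:
root.id = uuid_v7()
# created_at is a hypothetical RFC 3339 string field
root.event_id = uuid_v7(this.created_at.ts_parse("2006-01-02T15:04:05Z07:00"))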
with_schema_registry_header
Prepends a Confluent Schema Registry wire format header to message bytes. The header is 5 bytes: a magic byte (0x00) followed by a 4-byte big-endian schema ID. This format is required when producing messages to Kafka topics that use Confluent Schema Registry for schema validation and evolution.
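Prefix an already-encoded payload with the header for a placeholder schema ID of 42. This sketch assumes the function takes the schema ID first and the message bytes second, so check its parameter list for the exact names:
# Assumption: arguments are (schema ID, message bytes)
root = with_schema_registry_header(42, content())
For schema ID 42 (0x2A), the five prepended bytes are 0x00 (the magic byte) followed by 0x00 0x00 0x00 0x2A.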