bloblang

Executes a Bloblang mapping on messages.

# Config fields, showing default values
label: ""
bloblang: ""

Bloblang is a powerful language that enables a wide range of mapping, transformation and filtering tasks. For more information see Bloblang.

If your mapping is large and you’d prefer for it to live in a separate file then you can execute a mapping directly from a file with the expression from "<path>", where the path must be absolute, or relative from the location that Redpanda Connect is executed from.

Component rename

This processor was recently renamed to the mapping processor in order to make the purpose of the processor more prominent. It is still valid to use the existing bloblang name but eventually it will be deprecated and replaced by the new name in example configs.

Examples

  • Mapping

  • More Mapping

Given JSON documents containing an array of fans:

{
  "id":"foo",
  "description":"a show about foo",
  "fans":[
    {"name":"bev","obsession":0.57},
    {"name":"grace","obsession":0.21},
    {"name":"ali","obsession":0.89},
    {"name":"vic","obsession":0.43}
  ]
}

We can reduce the fans to only those with an obsession score above 0.5, giving us:

{
  "id":"foo",
  "description":"a show about foo",
  "fans":[
    {"name":"bev","obsession":0.57},
    {"name":"ali","obsession":0.89}
  ]
}

With the following config:

pipeline:
  processors:
  - bloblang: |
      root = this
      root.fans = this.fans.filter(fan -> fan.obsession > 0.5)

When receiving JSON documents of the form:

{
  "locations": [
    {"name": "Seattle", "state": "WA"},
    {"name": "New York", "state": "NY"},
    {"name": "Bellevue", "state": "WA"},
    {"name": "Olympia", "state": "WA"}
  ]
}

We could collapse the location names from the state of Washington into a field Cities:

{"Cities": "Bellevue, Olympia, Seattle"}

With the following config:

pipeline:
  processors:
    - bloblang: |
        root.Cities = this.locations.
                        filter(loc -> loc.state == "WA").
                        map_each(loc -> loc.name).
                        sort().join(", ")

Error handling

Bloblang mappings can fail, in which case the message remains unchanged, errors are logged, and the message is flagged as having failed, allowing you to use standard processor error handling patterns.

However, Bloblang itself also provides powerful ways of ensuring your mappings do not fail by specifying desired fallback behavior, which you can read about in Error handling.