command

Executes a command for each message.

Introduced in version 4.21.0.

# Config fields, showing default values
label: ""
command:
  name: bash # No default (required)
  args_mapping: '[ "-c", this.script_path ]' # No default (optional)

The specified command is executed for each message processed, with the raw bytes of the message being fed into the stdin of the command process, and the resulting message having its contents replaced with the stdout of it.

Performance

Since this processor executes a new process for each message performance will likely be an issue for high throughput streams. If this is the case then consider using the subprocess processor instead as it keeps the underlying process alive long term and uses codecs to insert and extract inputs and outputs to it via stdin/stdout.

Error handling

If a non-zero error code is returned by the command then an error containing the entirety of stderr (or a generic message if nothing is written) is set on the message. These failed messages will continue through the pipeline unchanged, but can be dropped or placed in a dead letter queue according to your config, you can read about these patterns.

If the command is successful but stderr is written to then a metadata field command_stderr is populated with its contents.

Fields

name

The name of the command to execute. This field supports interpolation functions.

Type: string

# Examples

name: bash

name: go

name: ${! @command }

args_mapping

An optional Bloblang mapping that, when specified, should resolve into an array of arguments to pass to the command. Command arguments are expressed this way in order to support dynamic behavior.

Type: string

# Examples

args_mapping: '[ "-c", this.script_path ]'

Examples

  • Cron Scheduled Command

  • Dynamic Command Execution

This example uses a generate input to trigger a command on a cron schedule:

input:
  generate:
    interval: '0,30 */2 * * * *'
    mapping: 'root = ""' # Empty string as we do not need to pipe anything to stdin
  processors:
    - command:
        name: df
        args_mapping: '[ "-h" ]'

This example config takes structured messages of the form {"command":"echo","args":["foo"]} and uses their contents to execute the contained command and arguments dynamically, replacing its contents with the command result printed to stdout:

pipeline:
  processors:
    - command:
        name: ${! this.command }
        args_mapping: 'this.args'