read_until

Reads messages from a child input until a consumed message passes a Bloblang query, at which point the input closes. It is also possible to configure a timeout after which the input is closed if no new messages arrive in that period.

# Config fields, showing default values
input:
  label: ""
  read_until:
    input: null # No default (required)
    check: this.type == "foo" # No default (optional)
    idle_timeout: 5s # No default (optional)
    restart_input: false

Messages are read continuously while the query check returns false. When the query returns true, the message that triggered the check is sent out and the input is closed. Use this to define inputs where the stream should end once a certain message appears.

If the idle timeout is configured, the input will be closed if no new messages arrive after that period of time. Use this field if you want to empty out and close an input that doesn’t have a logical end.
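As a rough sketch of how the two stop conditions can be combined, the config below sets both check and idle_timeout on the same input. It reuses the Kafka child input from the examples on this page (the addresses value is left as the same placeholder), and it assumes that whichever condition is met first closes the input.

```yaml
# Sketch only: stop on the first message whose type is "foo",
# or after 5s without any new messages (assumed: whichever comes first).
input:
  read_until:
    check: this.type == "foo"
    idle_timeout: 5s
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup
```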

Sometimes inputs close themselves. For example, when the file input type reaches the end of a file it will shut down. By default, read_until also shuts down when its child input does. If you want the child input to be restarted every time it shuts down, until the query check is met, set restart_input to true.
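A minimal sketch of the restart behaviour, assuming a file child input: the path ./data.jsonl and the "end" marker value are hypothetical, chosen only for illustration. A restarted file input may read the file again from the beginning, so this pattern is best suited to inputs where repeated reads are acceptable.

```yaml
# Sketch only: keep restarting the child input whenever it closes itself,
# until a message with type "end" is seen.
input:
  read_until:
    check: this.type == "end"   # hypothetical marker value
    restart_input: true
    input:
      file:
        paths: [ ./data.jsonl ] # hypothetical path
```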

Metadata

A metadata key benthos_read_until containing the value final is added to the first part of the message that triggers the input to stop.
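One way downstream processors might use this metadata is to flag the final message explicitly. The sketch below assumes a version that provides the mapping processor (older releases name it bloblang) and the Bloblang meta function. The field name is_final is hypothetical, and the messages are assumed to be JSON objects.

```yaml
# Sketch only: mark the message that caused the input to stop.
pipeline:
  processors:
    - mapping: |
        root = this
        # .or("") guards against the key being absent on ordinary messages.
        root.is_final = meta("benthos_read_until").or("") == "final"
```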

Fields

input

The child input to consume from.

Type: input

check

A Bloblang query that should return a boolean value indicating whether the input should now be closed.

Type: string

# Examples

check: this.type == "foo"

check: count("messages") >= 100

idle_timeout

The maximum amount of time without receiving new messages after which the input is closed.

Type: string

# Examples

idle_timeout: 5s

restart_input

Whether the input should be reopened if it closes itself before the condition has resolved to true.

Type: bool

Default: false

Examples

  • Consume N Messages

  • Read from Kafka and close when empty

A common reason to use this input is to consume only N messages from an input and then stop. This can easily be done with the count function:

# Only read 100 messages, and then exit.
input:
  read_until:
    check: count("messages") >= 100
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup

Another common reason to use this input is a job that consumes all available messages and exits once the input is empty:

# Consume all messages and exit once no new message has arrived for 5s.
input:
  read_until:
    idle_timeout: 5s
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup