Docs Cloud Redpanda Connect Components Inputs read_until read_until Available in: Cloud, Self-Managed Reads messages from a child input until a consumed message passes a Bloblang query, at which point the input closes. It is also possible to configure a timeout after which the input is closed if no new messages arrive in that period. # Config fields, showing default values input: label: "" read_until: input: null # No default (required) check: this.type == "foo" # No default (optional) idle_timeout: 5s # No default (optional) restart_input: false Messages are read continuously while the query check returns false, when the query returns true the message that triggered the check is sent out and the input is closed. Use this to define inputs where the stream should end once a certain message appears. If the idle timeout is configured, the input will be closed if no new messages arrive after that period of time. Use this field if you want to empty out and close an input that doesn’t have a logical end. Sometimes inputs close themselves. For example, when the file input type reaches the end of a file it will shut down. By default this type will also shut down. If you wish for the input type to be restarted every time it shuts down until the query check is met then set restart_input to true. Metadata A metadata key benthos_read_until containing the value final is added to the first part of the message that triggers the input to stop. Fields input The child input to consume from. Type: input check A Bloblang query that should return a boolean value indicating whether the input should now be closed. Type: string # Examples check: this.type == "foo" check: count("messages") >= 100 idle_timeout The maximum amount of time without receiving new messages after which the input is closed. Type: string # Examples idle_timeout: 5s restart_input Whether the input should be reopened if it closes itself before the condition has resolved to true. Type: bool Default: false Examples Consume N Messages Read from a kafka and close when empty A common reason to use this input is to consume only N messages from an input and then stop. This can easily be done with the count function: # Only read 100 messages, and then exit. input: read_until: check: count("messages") >= 100 input: kafka: addresses: [ TODO ] topics: [ foo, bar ] consumer_group: foogroup A common reason to use this input is a job that consumes all messages and exits once its empty: # Consumes all messages and exit when the last message was consumed 5s ago. input: read_until: idle_timeout: 5s input: kafka: addresses: [ TODO ] topics: [ foo, bar ] consumer_group: foogroup Back to top × Simple online edits For simple changes, such as fixing a typo, you can edit the content directly on GitHub. Edit on GitHub Or, open an issue to let us know about something that you want us to change. Open an issue Contribution guide For extensive content updates, or if you prefer to work locally, read our contribution guide . Was this helpful? thumb_up thumb_down group Ask in the community mail Share your feedback group_add Make a contribution nats_kv redis_list