# read_until


---
title: read_until
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: connect/components/inputs/read_until
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: connect/components/inputs/read_until.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/connect/components/inputs/read_until.adoc
page-git-created-date: "2024-09-09"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/connect/components/inputs/read_until.md -->

**Available in:** Cloud, [Self-Managed](https://docs.redpanda.com/connect/components/inputs/read_until/ "View the Self-Managed version of this component")

Reads messages from a child input until a consumed message passes a [Bloblang query](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/), at which point the input closes. You can also configure an idle timeout, which closes the input if no new messages arrive within that period.

```yml
input:
  label: ""
  read_until:
    input: "" # No default (required)
    check: "" # No default (optional)
    idle_timeout: "" # No default (optional)
    restart_input: false
```

Messages are read continuously while the query check returns `false`. When the query returns `true`, the message that triggered the check is sent out and the input is closed. Use this to define inputs where the stream should end once a certain message appears.
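As a minimal sketch of this behavior, the following config wraps a `generate` child input, chosen here only so the example is self-contained. The field names `id` and `type` and the threshold of 5 are hypothetical. Under these assumptions, the fifth message is the first one for which the check returns `true`, so it is sent out and the input then closes.

```yaml
input:
  read_until:
    # Close the input once a message marked as the end of the stream arrives.
    check: this.type == "end"
    input:
      generate:
        interval: 1s
        mapping: |
          # Hypothetical payload: number each message and mark the fifth as "end".
          let id = counter()
          root.id = $id
          root.type = if $id >= 5 { "end" } else { "data" }
```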

If the idle timeout is configured, the input is closed if no new messages arrive within that period. Use this field if you want to empty out and close an input that doesn’t have a logical end.
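Both fields are optional, so they can be set together. The sketch below assumes that the input closes on whichever condition is met first, and reuses the placeholder Kafka settings from the examples on this page. The `type` field and its `end` value are hypothetical.

```yaml
input:
  read_until:
    # Stop when an explicit end marker arrives...
    check: this.type == "end"
    # ...or when no new messages have arrived for 30 seconds.
    idle_timeout: 30s
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo ]
        consumer_group: foogroup
```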

Sometimes inputs close themselves. For example, when the `file` input reaches the end of a file, it shuts down. By default, the `read_until` input also shuts down when its child input does. To restart the child input every time it shuts down until the query check is met, set `restart_input` to `true`.
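A sketch of the restart behavior, using the `file` input mentioned above. The path `./events.jsonl` and the `status` field are hypothetical, and this assumes the `file` input is available in your environment. Each restart is expected to read the file again from the beginning, so earlier messages may be delivered more than once.

```yaml
input:
  read_until:
    # Keep going until a message reports completion.
    check: this.status == "done"
    # Reopen the child input whenever it closes before the check has passed.
    restart_input: true
    input:
      file:
        paths: [ ./events.jsonl ] # hypothetical path
```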

## [](#metadata)Metadata

A metadata key `benthos_read_until` containing the value `final` is added to the first part of the message that triggers the input to stop.
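Downstream processors can use this key to detect the final message. The following sketch adds a hypothetical `is_last` field to each message; `.or("")` covers messages where the key is absent.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # True only for the message that caused read_until to stop.
        root.is_last = metadata("benthos_read_until").or("") == "final"
```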

## [](#fields)Fields

### [](#check)`check`

A [Bloblang query](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) that should return a boolean value indicating whether the input should now be closed.

**Type**: `string`

```yaml
# Examples:
check: this.type == "foo"

# ---

check: count("messages") >= 100
```

### [](#idle_timeout)`idle_timeout`

The maximum amount of time without receiving new messages after which the input is closed.

**Type**: `string`

```yaml
# Examples:
idle_timeout: 5s
```

### [](#input)`input`

The child input to consume from.

**Type**: `input`

### [](#restart_input)`restart_input`

Whether the input should be reopened if it closes itself before the condition has resolved to true.

**Type**: `bool`

**Default**: `false`

## [](#examples)Examples

### [](#consume-n-messages)Consume N Messages

A common reason to use this input is to consume only N messages from an input and then stop. This can easily be done with the [`count` function](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/functions/#count):

```yaml
# Only read 100 messages, and then exit.
input:
  read_until:
    check: count("messages") >= 100
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup
```

### [](#read-from-a-kafka-and-close-when-empty)Read from Kafka and close when empty

Another common use for this input is a job that consumes all available messages and exits once the source is empty:

```yaml
# Consume all messages and exit once no new message has arrived for 5s.
input:
  read_until:
    idle_timeout: 5s
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup
```