Inputs

An input is a source of data piped through an array of optional processors:

    input:
      label: my_redis_input

      redis_streams:
        url: tcp://localhost:6379
        streams:
          - benthos_stream
        body_key: body
        consumer_group: benthos_group

      # Optional list of processing steps
      processors:
        - mapping: |
            root.document = this.without("links")
            root.link_count = this.links.length()

Some inputs have a logical end. For example, a csv input ends once the last row is consumed. When this happens, the input gracefully terminates and Redpanda Connect shuts itself down once all messages have been fully processed.

It’s also possible to specify a logical end for an input that otherwise doesn’t have one with the read_until input, which checks a condition against each consumed message to determine whether it should be the last.

Brokering

Only one input is configured at the root of a Redpanda Connect config. However, the root input can be a broker, which combines multiple inputs and merges their streams:

    input:
      broker:
        inputs:
          - kafka:
              addresses: [ TODO ]
              topics: [ foo, bar ]
              consumer_group: foogroup

          - redis_streams:
              url: tcp://localhost:6379
              streams:
                - benthos_stream
              body_key: body
              consumer_group: benthos_group

Labels

Inputs have an optional field, label, that can uniquely identify them in observability data such as metrics and logs. This is useful when running configs with multiple inputs; without a label, their metrics labels are generated based on their composition. For more information, check out the metrics documentation.

Sequential reads

Sometimes it’s useful to consume a sequence of inputs, where an input is only consumed once its predecessor is fully drained. You can achieve this with the sequence input.

Generating messages

It’s possible to generate data with Redpanda Connect using the generate input, which is also a convenient way to trigger scheduled pipelines.
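
As a minimal sketch of a scheduled trigger, the config below assumes the generate input's mapping, interval, and count fields. The label, the payload shape, and the five-second interval are illustrative choices and are not taken from this page.

```yaml
input:
  label: heartbeat_generator  # hypothetical label, chosen for illustration
  generate:
    # Bloblang mapping evaluated to build each generated message
    # (illustrative payload: a random ID and the current timestamp).
    mapping: |
      root.id = uuid_v4()
      root.created_at = now()
    # Emit one message every five seconds.
    interval: 5s
    # 0 means no limit, so this input has no logical end.
    count: 0
```

Setting count to a positive number should give the input a logical end: once that many messages have been generated and fully processed, Redpanda Connect would shut down as described above.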

Categories

Services
Inputs that consume from storage or message streaming services.
amqp_0_9, amqp_1, AWS Kinesis, AWS S3, AWS SQS, Azure Blob Storage, Azure Queue Storage, Azure Table Storage, beanstalkd, Cassandra, Discord, gcp_bigquery_select, GCP Cloud Storage, GCP Pub/Sub, HDFS, Kafka, Franz-go, MongoDB, MQTT, NATS, nats_kv, nats_jetstream, nats_stream, NSQ, ockam_kafka, postgres_cdc, Pulsar, redis_list, redis_pubsub, redis_scan, redis_streams, redpanda, redpanda_common, redpanda_migrator, redpanda_migrator_bundle, spicedb_watch, splunk, sql_raw, sql_select, timeplus, X/Twitter

AWS
Inputs that consume from Amazon Web Services products.
AWS Kinesis, AWS S3, AWS SQS

Azure
Inputs that consume from Microsoft Azure services.
Azure Blob Storage, Azure Cosmos DB, Azure Queue Storage, Azure Table Storage

Utility
Inputs that provide utility by generating data or combining/wrapping other inputs.
batched, broker, dynamic, generate, inproc, read_until, resource, sequence, subprocess

Integration
CockroachDB, Schema Registry

Local
Inputs that consume from the local machine/filesystem.
CSV, file, Parquet, stdin

Social
Inputs that consume from social applications and services.
Discord, X/Twitter

GCP
Inputs that consume from Google Cloud Platform services.
gcp_bigquery_select, GCP Cloud Storage, GCP Pub/Sub

Network
Inputs that consume directly from low level network protocols.
http_client, http_server, nanomsg, sftp, Socket, socket_server, WebSocket, zmq4