Configuration

Redpanda Connect pipelines are configured in a YAML file that consists of a number of root sections, arranged like so:

Common configuration

input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup

pipeline:
  processors:
  - mapping: |
      root.message = this
      root.meta.link_count = this.links.length()

output:
  aws_s3:
    bucket: TODO
    path: '${! meta("kafka_topic") }/${! json("message.id") }.json'

Full configuration
http:
  address: 0.0.0.0:4195
  debug_endpoints: false

input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup

buffer:
  none: {}

pipeline:
  processors:
  - mapping: |
      root.message = this
      root.meta.link_count = this.links.length()

output:
  aws_s3:
    bucket: TODO
    path: '${! meta("kafka_topic") }/${! json("message.id") }.json'

input_resources: []
cache_resources: []
processor_resources: []
rate_limit_resources: []
output_resources: []

logger:
  level: INFO
  static_fields:
    '@service': benthos

metrics:
  prometheus: {}

tracer:
  none: {}

shutdown_timeout: 20s
shutdown_delay: ""

Most sections represent a component type, which you can read about in more detail in this document.

These types are hierarchical. For example, an input can have a list of child processor types attached to it, which in turn can have their own processor children.
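
As a rough illustration of this nesting, the sketch below attaches a list of processors to an input, and the first of those processors (try) has a child processor of its own. The Kafka settings and the mapping are copied from the example above; the use of try here is only an illustrative choice.

```yaml
input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup
  # Processors attached to the input run on the messages it produces.
  processors:
    # try is a processor whose children are themselves processors.
    - try:
        - mapping: |
            root.message = this
            root.meta.link_count = this.links.length()
```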

This is powerful, but it can lead to large and cumbersome configuration files. This document outlines the tooling Redpanda Connect provides to help you write and manage them.

Testing

For guidance on how to write and run unit tests for your configuration files read this guide.

Customizing your configuration

Sometimes it’s useful to write a configuration where certain fields can be defined during deployment. For this purpose Redpanda Connect supports environment variable interpolation, allowing you to set fields in your config with environment variables like so:

input:
  kafka:
    addresses:
    - ${KAFKA_BROKER:localhost:9092}
    topics:
    - ${KAFKA_TOPIC:default-topic}

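Each ${NAME:default} reference resolves to the value of the environment variable NAME, or to the text after the first colon when that variable is not set. This makes it easy to share one configuration file across different deployment environments.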

Labels

Labels are unique, user-defined identifiers used throughout Redpanda Connect configurations. They serve two purposes:

  • Reference: Allow different parts of your pipeline to refer to specific components or resources.

  • Readability: Make your configuration more understandable for humans, especially in complex deployments.

You can assign labels to most pipeline components, including resources, inputs, outputs, processors, and entire pipelines. Using clear, descriptive labels improves both maintainability and clarity.

Labels are commonly applied to the following components:

Resources

Labels identify reusable resources such as processors, caches, and rate limiters, making them easy to reference elsewhere in your pipeline.

processor_resources:
  - label: my-transformer       # Processor resource label
    mapping: 'root = content().uppercase()'

cache_resources:
  - label: user-cache           # Cache resource label
    memory:
      default_ttl: 300s

rate_limit_resources:
  - label: api-limiter          # Rate limiter resource label
    local:
      count: 100
      interval: 1m
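
A minimal sketch of how these labels might then be referenced from a pipeline is shown below. It assumes the three resources defined above, and that each message is a JSON document with an id field; that field is a hypothetical name chosen for illustration.

```yaml
pipeline:
  processors:
    # Throttle messages using the shared rate limit.
    - rate_limit:
        resource: api-limiter

    # Store each payload in the shared cache, keyed by the
    # (hypothetical) id field of the message.
    - cache:
        resource: user-cache
        operator: set
        key: '${! json("id") }'
        value: '${! content() }'

    # Run the shared mapping defined under processor_resources.
    - resource: my-transformer
```

Because each component refers to its resource only by label, the resource definition can change in one place without touching the pipeline.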

Component labeling for clarity

You can also use labels on inputs, outputs, processors, and other components to improve the human-readability of your configuration and make troubleshooting easier. For example:

input:
  label: ingest_api
  http_server: {}

pipeline:
  label: user_data_ingest
  processors:
    - label: sanitize_fields
      mapping: 'root = this.trim()'
    - resource: my-transformer

Label naming requirements

Labels must meet the following criteria:

  • Length: 3-128 characters

  • Allowed characters: Alphanumeric, hyphens, and underscores (A-Za-z0-9-_)

  • Case sensitivity: Labels are case-sensitive

Example valid labels
my-processor
data_transformer_01
UserAnalytics-v2

Example invalid labels
ab                   // Too short (less than 3 characters)
my.processor         // Invalid character: period
my processor         // Invalid character: space

Reusing configuration snippets

Sometimes it’s necessary to use a rather large component multiple times. Instead of copy/pasting the configuration or using YAML anchors you can define your component as a resource.

In the following example we want to make an HTTP request with our payloads. Occasionally the payload might get rejected due to garbage within its contents, so we catch these rejected requests, attempt to "cleanse" the contents, and try the same HTTP request again. Since the HTTP request component is quite large (and likely to change over time), we avoid duplicating it by defining it once as a resource labeled get_foo:

pipeline:
  processors:
    - resource: get_foo
    - catch:
      - mapping: |
          root = this
          root.content = this.content.strip_html()
      - resource: get_foo

processor_resources:
  - label: get_foo
    http:
      url: http://example.com/foo
      verb: POST
      headers:
        SomeThing: "set-to-this"
        SomeThingElse: "set-to-something-else"

Shutting down

Under normal operating conditions, the Redpanda Connect process will shut down when there are no more messages produced by inputs and the final message has been processed. The shutdown procedure can also be initiated by sending the process an interrupt (SIGINT) or termination (SIGTERM) signal. There are two top-level configuration options that control the shutdown behavior: shutdown_timeout and shutdown_delay.

Shutdown delay

The shutdown_delay option can be used to delay the start of the shutdown procedure. This is useful for pipelines that need a short grace period to have their metrics and traces scraped. While the shutdown delay is in effect, the HTTP metrics endpoint continues to be available for scraping and any active tracers are free to flush remaining traces.

The shutdown delay can be interrupted by sending the Redpanda Connect process a second OS interrupt or termination signal.

Shutdown timeout

The shutdown_timeout option sets a hard deadline for the Redpanda Connect process to gracefully terminate. If this duration is exceeded, the process is forcefully terminated and any messages that were in flight are dropped.

If shutdown_delay is set, this timeout only takes effect after the delay has passed.
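
As a sketch of how the two options combine, the fragment below sets both at the top level of the configuration. The 5s delay is an illustrative value, not a recommendation; the 20s timeout matches the full configuration shown earlier.

```yaml
# Wait 5s after a shutdown signal before starting the shutdown
# procedure, so metrics and traces can still be scraped.
shutdown_delay: 5s

# Once the delay has passed, allow up to 20s for graceful
# termination before the process is forcefully stopped.
shutdown_timeout: 20s
```

With these values, the process could take up to roughly 25 seconds to exit after the first signal, unless a second signal interrupts the delay.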