Error Handling

Redpanda Connect supports a range of processors, such as http and aws_lambda, that may fail when retry attempts are exhausted. When a processor fails, the message continues through the pipeline with its data mostly unchanged. The only addition is an error flag in the message metadata, which you can use to handle the failure.

This topic explains some common error-handling patterns, including dropping messages, recovering them with more processing, and routing them to a dead-letter queue. It also shows how to combine these approaches, where appropriate.

Abandon on failure

You can use the try processor to define a list of processors that are executed in sequence. If a processor fails for a particular message, that message skips the remaining processors.

For example:

  • If processor_1 fails to process a message, that message skips processor_2 and processor_3.

  • If a message is processed by processor_1, but processor_2 fails, that message skips processor_3, and so on.

pipeline:
  processors:
    - try:
      - resource: processor_1
      - resource: processor_2 # Skip if processor_1 fails
      - resource: processor_3 # Skip if processor_1 or processor_2 fails

Recover failed messages

You can also route failed messages through defined processing steps using a catch processor.

For example, if processor_1 fails to process a message, it is rerouted to processor_2.

pipeline:
  processors:
    - resource: processor_1 # Processor that might fail
    - catch:
      - resource: processor_2 # Processes rerouted messages

After messages complete all processing steps defined in the catch block, failure flags are removed and they are treated like regular messages.

To keep failure flags in messages, you can simulate a catch block using a switch processor:

pipeline:
  processors:
    - resource: processor_1 # Processor that might fail
    - switch:
      - check: errored()
        processors:
          - resource: processor_2 # Processes rerouted messages

Logging errors

When an error occurs, the error flag may hold useful information. You can use the Bloblang error() function in interpolations to write this information to logs. You can also use additional Bloblang functions, such as error_source_label(), to expose details about the processor that triggered the error.

For example, this configuration catches processor failures and writes the following information to logs:

  • The label of the processor (${!error_source_label()}) that failed

  • The cause of the failure (${!error()})

pipeline:
  processors:
    - try:
      - resource: processor_1 # Processor that might fail
      - resource: processor_2 # Processor that might fail
      - resource: processor_3 # Processor that might fail
    - catch:
      - log:
          message: "Processor ${!error_source_label()} failed due to: ${!error()}"

You could also add an error message to the message payload:

pipeline:
  processors:
    - resource: processor_1 # Processor that might fail
    - resource: processor_2 # Processor that might fail
    - resource: processor_3 # Processor that might fail
    - catch:
      - mapping: |
          root = this
          root.meta.error = error()
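
If you would rather leave the payload untouched, a similar sketch stores the error text in message metadata instead. The metadata key processing_error is a hypothetical name chosen for illustration. The sketch assumes that a mapping which does not assign to root keeps the original payload, and that metadata set inside the catch block stays on the message after the failure flag is cleared.

```yaml
pipeline:
  processors:
    - resource: processor_1 # Processor that might fail
    - catch:
      - mapping: |
          # processing_error is a hypothetical metadata key
          meta processing_error = error()
```

Later processors or outputs could then read the value with an interpolation such as ${! metadata("processing_error") }.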

Attempt until success

To keep reprocessing a message until it succeeds, use a retry processor:

pipeline:
  processors:
    - retry:
        backoff:
          initial_interval: 1s
          max_interval: 5s
          max_elapsed_time: 30s
        processors:
          # Retries this processor until the message is processed, or the maximum elapsed time is reached.
          - resource: processor_1
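
Retrying can be combined with the catch pattern to handle messages that still fail once the retry limits are reached. The following is a minimal sketch under two assumptions: that the retry processor accepts a max_retries field to cap the number of attempts, and that a message leaves the retry block with its error flag still set when the limits are exhausted. The limit of 5 and the log wording are illustrative only.

```yaml
pipeline:
  processors:
    - retry:
        max_retries: 5 # Assumed cap on attempts (illustrative value)
        backoff:
          initial_interval: 1s
          max_interval: 5s
          max_elapsed_time: 30s
        processors:
          - resource: processor_1 # Processor that might fail
    - catch:
      # Only reached by messages that are still flagged as failed
      - log:
          level: ERROR
          message: "Gave up retrying: ${!error()}"
```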

Drop failed messages

To filter out any failed messages from your pipeline, you can use a mapping processor:

pipeline:
  processors:
    - mapping: root = if errored() { deleted() }

The mapping uses the error flag to identify any failed messages in a batch and drops them. Dropping a message propagates an acknowledgement (also known as an "ack") upstream to the pipeline’s input.
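
Dropping can also be combined with logging, so that discarded messages leave a trace. The following sketch places a log processor and a delete mapping inside a catch block, so both steps apply only to failed messages. The log level and message wording are illustrative choices.

```yaml
pipeline:
  processors:
    - resource: processor_1 # Processor that might fail
    - catch:
      - log:
          level: WARN
          message: "Dropping failed message: ${!error()}"
      - mapping: root = deleted() # Runs only for failed messages
```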

Reject messages

Some inputs, such as nats, gcp_pubsub, and amqp_1, support nacking (rejecting) messages. Rather than delivering unprocessed messages to your output, you can use the reject_errored output to perform a nack (or rejection) on them:

output:
  reject_errored:
    resource: processor_1 # Only non-errored messages go here

Route to a dead-letter queue

You can also route failed messages to a different output by nesting the reject_errored output within a fallback output:

output:
  fallback:
    - reject_errored:
        resource: processor_1 # Only non-errored messages go here
    - resource: processor_2 # Only errored messages, or delivery failures to processor_1, go here
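
It is often useful to record why a message ended up in the dead-letter queue. The following sketch assumes that the fallback output sets a fallback_error metadata field on messages passed to later outputs, and that the dead-letter output accepts its own processors list. The field names reason and payload are hypothetical.

```yaml
output:
  fallback:
    - reject_errored:
        resource: processor_1 # Only non-errored messages go here
    - resource: processor_2 # Dead-letter output
      processors:
        - mapping: |
            # reason and payload are hypothetical field names
            root.reason = metadata("fallback_error").or("unknown")
            root.payload = content().string()
```

Wrapping the original content as a string keeps the mapping from failing on payloads that are not valid JSON.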

Finally, when you want to route data differently depending on the type of error, you can use a switch output:

output:
  switch:
    cases:
      # Capture specifically cat-related errors
      - check: errored() && error().contains("meow")
        output:
          resource: processor_1

      # Capture all other errors
      - check: errored()
        output:
          resource: processor_2

      # Finally, route all successfully processed messages here
      - output:
          resource: processor_3