Unit Testing

The Redpanda Connect service offers a command rpk connect test for running unit tests on sections of a configuration file. This makes it easy to protect your config files from regressions over time.

Writing a test

Let’s imagine we have a configuration file foo.yaml containing some processors:

input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup

pipeline:
  processors:
  - mapping: '"%vend".format(content().uppercase().string())'

output:
  aws_s3:
    bucket: TODO
    path: '${! meta("kafka_topic") }/${! json("message.id") }.json'

One way to write our unit tests for this config is to accompany it with a file of the same name and extension but suffixed with _benthos_test, which in this case would be foo_benthos_test.yaml.

tests:
  - name: example test
    target_processors: '/pipeline/processors'
    environment: {}
    input_batch:
      - content: 'example content'
        metadata:
          example_key: example metadata value
    output_batches:
      -
        - content_equals: EXAMPLE CONTENTend
          metadata_equals:
            example_key: example metadata value

Under tests we have a list of any number of unit tests to execute for the config file. Each test is run in complete isolation, including any resources defined by the config file. Tests should be allocated a unique name that identifies the feature being tested.

The field target_processors is either the label of a processor to test, or a JSON Pointer that identifies the position of a processor, or list of processors, within the file which should be executed by the test. For example a value of foo would target a processor with the label foo, and a value of /input/processors would target all processors within the input section of the config.

The field environment allows you to define an object of key/value pairs that set environment variables to be evaluated during the parsing of the target config file. These are unique to each test, allowing you to test different environment variable interpolation combinations.

The field input_batch lists one or more messages to be fed into the targeted processors as a batch. Each message of the batch may have its raw content defined as well as metadata key/value pairs.

For the common case where the messages are in JSON format, you can use json_content instead of content to specify the message structurally rather than verbatim.

The field output_batches lists any number of batches of messages which are expected to result from the target processors. Each batch lists any number of messages, each one defining conditions to describe the expected contents of the message.

If the number of batches defined does not match the resulting number of batches the test will fail. If the number of messages defined in each batch does not match the number in the resulting batches the test will fail. If any condition of a message fails then the test fails.

Inline tests

Sometimes it’s more convenient to define your tests within the config being tested. This is fine, simply add the tests field to the end of the config being tested.

Bloblang tests

Sometimes when working with large Bloblang mappings it’s preferred to have the full mapping in a separate file to your Redpanda Connect configuration. In this case it’s possible to write unit tests that target and execute the mapping directly with the field target_mapping, which when specified is interpreted as either an absolute path or a path relative to the test definition file that points to a file containing only a Bloblang mapping.

For example, if we were to have a file cities.blobl containing a mapping:

root.Cities = this.locations.
                filter(loc -> loc.state == "WA").
                map_each(loc -> loc.name).
                sort().join(", ")

We can accompany it with a test file cities_test.yaml containing a regular test definition:

tests:
  - name: test cities mapping
    target_mapping: './cities.blobl'
    environment: {}
    input_batch:
      - content: |
          {
            "locations": [
              {"name": "Seattle", "state": "WA"},
              {"name": "New York", "state": "NY"},
              {"name": "Bellevue", "state": "WA"},
              {"name": "Olympia", "state": "WA"}
            ]
          }
    output_batches:
      -
        - json_equals: {"Cities": "Bellevue, Olympia, Seattle"}

And execute this test the same way we execute other Redpanda Connect tests (rpk connect test ./dir/cities_test.yaml, rpk connect test ./dir/…​, etc).

Fragmented tests

Sometimes the number of tests you need to define in order to cover a config file is so vast that it’s necessary to split them across multiple test definition files. This is possible but Redpanda Connect still requires a way to detect the configuration file being targeted by these fragmented test definition files. In order to do this we must prefix our target_processors field with the path of the target relative to the definition file.

The syntax of target_processors in this case is a full JSON Pointer that should look something like target.yaml#/pipeline/processors. For example, if we saved our test definition above in an arbitrary location like ./tests/first.yaml and wanted to target our original foo.yaml config file, we could do that with the following:

tests:
  - name: example test
    target_processors: '../foo.yaml#/pipeline/processors'
    environment: {}
    input_batch:
      - content: 'example content'
        metadata:
          example_key: example metadata value
    output_batches:
      -
        - content_equals: EXAMPLE CONTENTend
          metadata_equals:
            example_key: example metadata value

Input Definitions

content

Sets the raw content of the message.

json_content

json_content:
  foo: foo value
  bar: [ element1, 10 ]

Sets the raw content of the message to a JSON document matching the structure of the value.

file_content

file_content: ./foo/bar.txt

Sets the raw content of the message by reading a file. The path of the file should be relative to the path of the test file.

metadata

A map of key/value pairs that sets the metadata values of the message.

Output Conditions

bloblang

bloblang: 'this.age > 10 && @foo.length() > 0'

Executes a Bloblang expression on a message, if the result is anything other than a boolean equalling true the test fails.

content_equals

content_equals: example content

Checks the full raw contents of a message against a value.

content_matches

content_matches: "^foo [a-z]+ bar$"

Checks whether the full raw contents of a message matches a regular expression (re2).

metadata_equals

metadata_equals:
  example_key: example metadata value

Checks a map of metadata keys to values against the metadata stored in the message. If there is a value mismatch between a key of the condition versus the message metadata this condition will fail.

file_equals

file_equals: ./foo/bar.txt

Checks that the contents of a message matches the contents of a file. The path of the file should be relative to the path of the test file.

file_json_equals

file_json_equals: ./foo/bar.json

Checks that both the message and the file contents are valid JSON documents, and that they are structurally equivalent. Will ignore formatting and ordering differences. The path of the file should be relative to the path of the test file.

json_equals

json_equals: { "key": "value" }

Checks that both the message and the condition are valid JSON documents, and that they are structurally equivalent. Will ignore formatting and ordering differences.

You can also structure the condition content as YAML and it will be converted to the equivalent JSON document for testing:

json_equals:
  key: value

json_contains

json_contains: { "key": "value" }

Checks that both the message and the condition are valid JSON documents, and that the message is a superset of the condition.

Running tests

Executing tests for a specific config can be done by pointing the subcommand test at either the config to be tested or its test definition, e.g. rpk connect test ./config.yaml and rpk connect test ./config_benthos_test.yaml are equivalent.

The test subcommand also supports wildcard patterns e.g. rpk connect test ./foo/*.yaml will execute all tests within matching files. In order to walk a directory tree and execute all tests found you can use the shortcut ./…​, e.g. rpk connect test ./…​ will execute all tests found in the current directory, any child directories, and so on.

If you want to allow components to write logs at a provided level to stdout when running the tests, you can use rpk connect test --log <level>. Please consult the logger docs for further details.

Mocking processors

BETA: This feature is currently in a BETA phase, which means breaking changes could be made if a fundamental issue with the feature is found.

Sometimes you’ll want to write tests for a series of processors, where one or more of them are networked (or otherwise stateful). Rather than creating and managing mocked services you can define mock versions of those processors in the test definition. For example, if we have a config with the following processors:

pipeline:
  processors:
    - mapping: 'root = "simon says: " + content()'
    - label: get_foobar_api
      http:
        url: http://example.com/foobar
        verb: GET
    - mapping: 'root = content().uppercase()'

Rather than create a fake service for the http processor to interact with we can define a mock in our test definition that replaces it with a mapping processor. Mocks are configured as a map of labels that identify a processor to replace and the config to replace it with:

tests:
  - name: mocks the http proc
    target_processors: '/pipeline/processors'
    mocks:
      get_foobar_api:
        mapping: 'root = content().string() + " this is some mock content"'
    input_batch:
      - content: "hello world"
    output_batches:
      - - content_equals: "SIMON SAYS: HELLO WORLD THIS IS SOME MOCK CONTENT"

With the above test definition the http processor will be swapped out for mapping: 'root = content().string() + " this is some mock content"'. For the purposes of mocking it is recommended that you use a mapping processor that simply mutates the message in a way that you would expect the mocked processor to.

It’s not currently possible to mock components that are imported as separate resource files (using --resource/-r). It is recommended that you mock these by maintaining separate definitions for test purposes (-r "./test/*.yaml").

More granular mocking

It is also possible to target specific fields within the test config by JSON pointers as an alternative to labels. The following test definition would create the same mock as the previous:

tests:
  - name: mocks the http proc
    target_processors: '/pipeline/processors'
    mocks:
      /pipeline/processors/1:
        mapping: 'root = content().string() + " this is some mock content"'
    input_batch:
      - content: "hello world"
    output_batches:
      - - content_equals: "SIMON SAYS: HELLO WORLD THIS IS SOME MOCK CONTENT"

Fields

The schema of a template file is as follows:

tests

A list of one or more unit tests to execute.

Type: array

tests[].name

The name of the test, this should be unique and give a rough indication of what behavior is being tested.

Type: string

tests[].environment

An optional map of environment variables to set for the duration of the test.

Type: object

tests[].target_processors

A [JSON Pointer][json-pointer] that identifies the specific processors which should be executed by the test. The target can either be a single processor or an array of processors. Alternatively a resource label can be used to identify a processor.

It is also possible to target processors in a separate file by prefixing the target with a path relative to the test file followed by a # symbol.

Type: string

Default: "/pipeline/processors"

# Examples

target_processors: foo_processor

target_processors: /pipeline/processors/0

target_processors: target.yaml#/pipeline/processors

target_processors: target.yaml#/pipeline/processors

tests[].target_mapping

A file path relative to the test definition path of a Bloblang file to execute as an alternative to testing processors with the target_processors field. This allows you to define unit tests for Bloblang mappings directly.

Type: string

Default: ""

tests[].mocks

An optional map of processors to mock. Keys should contain either a label or a JSON pointer of a processor that should be mocked. Values should contain a processor definition, which will replace the mocked processor. Most of the time you’ll want to use a [mapping processor][processors.mapping] here, and use it to create a result that emulates the target processor.

Type: object

# Examples

mocks:
  get_foobar_api:
    mapping: root = content().string() + " this is some mock content"

mocks:
  /pipeline/processors/1:
    mapping: root = content().string() + " this is some mock content"

tests[].input_batch

Define a batch of messages to feed into your test, specify either an input_batch or a series of input_batches.

Type: array

tests[].input_batch[].content

The raw content of the input message.

Type: string

tests[].input_batch[].json_content

Sets the raw content of the message to a JSON document matching the structure of the value.

Type: unknown

# Examples

json_content:
  bar:
    - element1
    - 10
  foo: foo value

tests[].input_batch[].file_content

Sets the raw content of the message by reading a file. The path of the file should be relative to the path of the test file.

Type: string

# Examples

file_content: ./foo/bar.txt

tests[].input_batch[].metadata

A map of metadata key/values to add to the input message.

Type: object

tests[].input_batches

Define a series of batches of messages to feed into your test, specify either an input_batch or a series of input_batches.

Type: two-dimensional array

tests[].input_batches[][].content

The raw content of the input message.

Type: string

tests[].input_batches[][].json_content

Sets the raw content of the message to a JSON document matching the structure of the value.

Type: unknown

# Examples

json_content:
  bar:
    - element1
    - 10
  foo: foo value

tests[].input_batches[][].file_content

Sets the raw content of the message by reading a file. The path of the file should be relative to the path of the test file.

Type: string

# Examples

file_content: ./foo/bar.txt

tests[].input_batches[][].metadata

A map of metadata key/values to add to the input message.

Type: object

tests[].output_batches

List of output batches.

Type: two-dimensional array

tests[].output_batches[][].bloblang

Executes a Bloblang mapping on the output message, if the result is anything other than a boolean equalling true the test fails.

Type: string

# Examples

bloblang: this.age > 10 && @foo.length() > 0

tests[].output_batches[][].content_equals

Checks the full raw contents of a message against a value.

Type: string

tests[].output_batches[][].content_matches

Checks whether the full raw contents of a message matches a regular expression (re2).

Type: string

# Examples

content_matches: ^foo [a-z]+ bar$

tests[].output_batches[][].metadata_equals

Checks a map of metadata keys to values against the metadata stored in the message. If there is a value mismatch between a key of the condition versus the message metadata this condition will fail.

Type: object

# Examples

metadata_equals:
  example_key: example metadata value

tests[].output_batches[][].file_equals

Checks that the contents of a message matches the contents of a file. The path of the file should be relative to the path of the test file.

Type: string

# Examples

file_equals: ./foo/bar.txt

tests[].output_batches[][].file_json_equals

Checks that both the message and the file contents are valid JSON documents, and that they are structurally equivalent. Will ignore formatting and ordering differences. The path of the file should be relative to the path of the test file.

Type: string

# Examples

file_json_equals: ./foo/bar.json

tests[].output_batches[][].json_equals

Checks that both the message and the condition are valid JSON documents, and that they are structurally equivalent. Will ignore formatting and ordering differences.

Type: unknown

# Examples

json_equals:
  key: value

tests[].output_batches[][].json_contains

Checks that both the message and the condition are valid JSON documents, and that the message is a superset of the condition.

Type: unknown

# Examples

json_contains:
  key: value

tests[].output_batches[][].file_json_contains

Checks that both the message and the file contents are valid JSON documents, and that the message is a superset of the condition. Will ignore formatting and ordering differences. The path of the file should be relative to the path of the test file.

Type: string

# Examples

file_json_contains: ./foo/bar.json