Unit Testing

The Redpanda Connect service offers a command, rpk connect test, for running unit tests on sections of a configuration file. This makes it easy to protect your config files from regressions over time.

Writing a test

Let’s imagine we have a configuration file foo.yaml containing some processors:

    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup

    pipeline:
      processors:
        - mapping: '"%vend".format(content().uppercase().string())'

    output:
      aws_s3:
        bucket: TODO
        path: '${! meta("kafka_topic") }/${! json("message.id") }.json'

One way to write our unit tests for this config is to accompany it with a file of the same name and extension but suffixed with _benthos_test, which in this case would be foo_benthos_test.yaml.

    tests:
      - name: example test
        target_processors: '/pipeline/processors'
        environment: {}
        input_batch:
          - content: 'example content'
            metadata:
              example_key: example metadata value
        output_batches:
          -
            - content_equals: EXAMPLE CONTENTend
              metadata_equals:
                example_key: example metadata value

Under tests we have a list of any number of unit tests to execute for the config file. Each test is run in complete isolation, including any resources defined by the config file. Tests should be allocated a unique name that identifies the feature being tested.

The field target_processors is either the label of a processor to test, or a JSON Pointer that identifies the position of a processor, or list of processors, within the file which should be executed by the test. For example, a value of foo would target a processor with the label foo, and a value of /input/processors would target all processors within the input section of the config.

The field environment allows you to define an object of key/value pairs that set environment variables to be evaluated during the parsing of the target config file.
These are unique to each test, allowing you to test different environment variable interpolation combinations.

The field input_batch lists one or more messages to be fed into the targeted processors as a batch. Each message of the batch may have its raw content defined as well as metadata key/value pairs.

For the common case where the messages are in JSON format, you can use json_content instead of content to specify the message structurally rather than verbatim.

The field output_batches lists any number of batches of messages which are expected to result from the target processors. Each batch lists any number of messages, each one defining conditions to describe the expected contents of the message.

If the number of batches defined does not match the resulting number of batches, the test will fail. If the number of messages defined in each batch does not match the number in the resulting batches, the test will fail. If any condition of a message fails, then the test fails.

Inline tests

Sometimes it’s more convenient to define your tests within the config being tested. This is fine: simply add the tests field to the end of the config being tested.

Bloblang tests

Sometimes when working with large Bloblang mappings it’s preferable to keep the full mapping in a file separate from your Redpanda Connect configuration. In this case it’s possible to write unit tests that target and execute the mapping directly with the field target_mapping, which when specified is interpreted as either an absolute path or a path relative to the test definition file that points to a file containing only a Bloblang mapping.

For example, if we were to have a file cities.blobl containing a mapping:

    root.Cities = this.locations.
                    filter(loc -> loc.state == "WA").
                    map_each(loc -> loc.name).
                    sort().join(", ")

We can accompany it with a test file cities_test.yaml containing a regular test definition:

    tests:
      - name: test cities mapping
        target_mapping: './cities.blobl'
        environment: {}
        input_batch:
          - content: |
              {
                "locations": [
                  {"name": "Seattle", "state": "WA"},
                  {"name": "New York", "state": "NY"},
                  {"name": "Bellevue", "state": "WA"},
                  {"name": "Olympia", "state": "WA"}
                ]
              }
        output_batches:
          -
            - json_equals: {"Cities": "Bellevue, Olympia, Seattle"}

And execute this test the same way we execute other Redpanda Connect tests (rpk connect test ./dir/cities_test.yaml, rpk connect test ./dir/…, etc.).

Fragmented tests

Sometimes the number of tests you need to define in order to cover a config file is so vast that it’s necessary to split them across multiple test definition files. This is possible, but Redpanda Connect still requires a way to detect the configuration file being targeted by these fragmented test definition files. In order to do this we must prefix our target_processors field with the path of the target relative to the definition file.

The syntax of target_processors in this case is a full JSON Pointer that should look something like target.yaml#/pipeline/processors. For example, if we saved our test definition above in an arbitrary location like ./tests/first.yaml and wanted to target our original foo.yaml config file, we could do that with the following:

    tests:
      - name: example test
        target_processors: '../foo.yaml#/pipeline/processors'
        environment: {}
        input_batch:
          - content: 'example content'
            metadata:
              example_key: example metadata value
        output_batches:
          -
            - content_equals: EXAMPLE CONTENTend
              metadata_equals:
                example_key: example metadata value

Input Definitions

content

Sets the raw content of the message.

json_content

    json_content:
      foo: foo value
      bar: [ element1, 10 ]

Sets the raw content of the message to a JSON document matching the structure of the value.
file_content

    file_content: ./foo/bar.txt

Sets the raw content of the message by reading a file. The path of the file should be relative to the path of the test file.

metadata

A map of key/value pairs that sets the metadata values of the message.

Output Conditions

bloblang

    bloblang: 'this.age > 10 && @foo.length() > 0'

Executes a Bloblang expression on a message. If the result is anything other than a boolean equalling true, the test fails.

content_equals

    content_equals: example content

Checks the full raw contents of a message against a value.

content_matches

    content_matches: "^foo [a-z]+ bar$"

Checks whether the full raw contents of a message matches a regular expression (re2).

metadata_equals

    metadata_equals:
      example_key: example metadata value

Checks a map of metadata keys to values against the metadata stored in the message. If there is a value mismatch between a key of the condition versus the message metadata, this condition will fail.

file_equals

    file_equals: ./foo/bar.txt

Checks that the contents of a message matches the contents of a file. The path of the file should be relative to the path of the test file.

file_json_equals

    file_json_equals: ./foo/bar.json

Checks that both the message and the file contents are valid JSON documents, and that they are structurally equivalent. Will ignore formatting and ordering differences. The path of the file should be relative to the path of the test file.

json_equals

    json_equals: { "key": "value" }

Checks that both the message and the condition are valid JSON documents, and that they are structurally equivalent. Will ignore formatting and ordering differences.

You can also structure the condition content as YAML and it will be converted to the equivalent JSON document for testing:

    json_equals:
      key: value

json_contains

    json_contains: { "key": "value" }

Checks that both the message and the condition are valid JSON documents, and that the message is a superset of the condition.
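Several of the output conditions above can be attached to the same expected message, in the same way the earlier example pairs content_equals with metadata_equals. The following is a minimal sketch rather than an example from the original documentation: the mapping, the field names (id, name) and the metadata key source are all hypothetical, and the tests are defined inline at the end of the config. It has not been verified against a particular release.

```yaml
# Hypothetical config for illustration only: one mapping processor plus inline tests.
pipeline:
  processors:
    - mapping: |
        root.id = this.id
        root.name = this.name.uppercase()
        meta source = "unit_test"

tests:
  - name: combines several output conditions
    target_processors: '/pipeline/processors'
    input_batch:
      # json_content describes the input structurally instead of as a raw string
      - json_content:
          id: a1
          name: alice
    output_batches:
      -
        # Every condition listed for this message must hold for the test to pass
        - json_equals: { "id": "a1", "name": "ALICE" }
          json_contains: { "name": "ALICE" }
          bloblang: 'this.name.length() > 3 && @source == "unit_test"'
          metadata_equals:
            source: unit_test
```

Here json_equals pins down the whole document while json_contains only requires the listed keys, so in practice you would usually pick whichever of the two matches how strict the test needs to be.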
Running tests

Executing tests for a specific config can be done by pointing the subcommand test at either the config to be tested or its test definition, e.g. rpk connect test ./config.yaml and rpk connect test ./config_benthos_test.yaml are equivalent.

The test subcommand also supports wildcard patterns, e.g. rpk connect test ./foo/*.yaml will execute all tests within matching files. In order to walk a directory tree and execute all tests found you can use the shortcut ./…, e.g. rpk connect test ./… will execute all tests found in the current directory, any child directories, and so on.

If you want to allow components to write logs at a provided level to stdout when running the tests, you can use rpk connect test --log <level>. Please consult the logger docs for further details.

Mocking processors

BETA: This feature is currently in a BETA phase, which means breaking changes could be made if a fundamental issue with the feature is found.

Sometimes you’ll want to write tests for a series of processors, where one or more of them are networked (or otherwise stateful). Rather than creating and managing mocked services you can define mock versions of those processors in the test definition. For example, if we have a config with the following processors:

    pipeline:
      processors:
        - mapping: 'root = "simon says: " + content()'
        - label: get_foobar_api
          http:
            url: http://example.com/foobar
            verb: GET
        - mapping: 'root = content().uppercase()'

Rather than create a fake service for the http processor to interact with, we can define a mock in our test definition that replaces it with a mapping processor.
Mocks are configured as a map of labels that identify a processor to replace and the config to replace it with:

    tests:
      - name: mocks the http proc
        target_processors: '/pipeline/processors'
        mocks:
          get_foobar_api:
            mapping: 'root = content().string() + " this is some mock content"'
        input_batch:
          - content: "hello world"
        output_batches:
          -
            - content_equals: "SIMON SAYS: HELLO WORLD THIS IS SOME MOCK CONTENT"

With the above test definition the http processor will be swapped out for mapping: 'root = content().string() + " this is some mock content"'. For the purposes of mocking it is recommended that you use a mapping processor that simply mutates the message in the way you would expect the mocked processor to.

It’s not currently possible to mock components that are imported as separate resource files (using --resource/-r). It is recommended that you mock these by maintaining separate definitions for test purposes (-r "./test/*.yaml").

More granular mocking

It is also possible to target specific fields within the test config by JSON pointers as an alternative to labels. The following test definition would create the same mock as the previous:

    tests:
      - name: mocks the http proc
        target_processors: '/pipeline/processors'
        mocks:
          /pipeline/processors/1:
            mapping: 'root = content().string() + " this is some mock content"'
        input_batch:
          - content: "hello world"
        output_batches:
          -
            - content_equals: "SIMON SAYS: HELLO WORLD THIS IS SOME MOCK CONTENT"

Fields

The schema of a test definition file is as follows:

tests

A list of one or more unit tests to execute.

Type: array

tests[].name

The name of the test. This should be unique and give a rough indication of what behavior is being tested.

Type: string

tests[].environment

An optional map of environment variables to set for the duration of the test.

Type: object

tests[].target_processors

A JSON Pointer that identifies the specific processors which should be executed by the test.
The target can either be a single processor or an array of processors. Alternatively a resource label can be used to identify a processor.

It is also possible to target processors in a separate file by prefixing the target with a path relative to the test file followed by a # symbol.

Type: string
Default: "/pipeline/processors"

    # Examples

    target_processors: foo_processor

    target_processors: /pipeline/processors/0

    target_processors: target.yaml#/pipeline/processors

tests[].target_mapping

A file path relative to the test definition path of a Bloblang file to execute as an alternative to testing processors with the target_processors field. This allows you to define unit tests for Bloblang mappings directly.

Type: string
Default: ""

tests[].mocks

An optional map of processors to mock. Keys should contain either a label or a JSON pointer of a processor that should be mocked. Values should contain a processor definition, which will replace the mocked processor. Most of the time you’ll want to use a mapping processor here, and use it to create a result that emulates the target processor.

Type: object

    # Examples

    mocks:
      get_foobar_api:
        mapping: root = content().string() + " this is some mock content"

    mocks:
      /pipeline/processors/1:
        mapping: root = content().string() + " this is some mock content"

tests[].input_batch

Defines a batch of messages to feed into your test. Specify either an input_batch or a series of input_batches.

Type: array

tests[].input_batch[].content

The raw content of the input message.

Type: string

tests[].input_batch[].json_content

Sets the raw content of the message to a JSON document matching the structure of the value.

Type: unknown

    # Examples

    json_content:
      bar:
        - element1
        - 10
      foo: foo value

tests[].input_batch[].file_content

Sets the raw content of the message by reading a file. The path of the file should be relative to the path of the test file.
Type: string

    # Examples

    file_content: ./foo/bar.txt

tests[].input_batch[].metadata

A map of metadata key/values to add to the input message.

Type: object

tests[].input_batches

Defines a series of batches of messages to feed into your test. Specify either an input_batch or a series of input_batches.

Type: two-dimensional array

tests[].input_batches[][].content

The raw content of the input message.

Type: string

tests[].input_batches[][].json_content

Sets the raw content of the message to a JSON document matching the structure of the value.

Type: unknown

    # Examples

    json_content:
      bar:
        - element1
        - 10
      foo: foo value

tests[].input_batches[][].file_content

Sets the raw content of the message by reading a file. The path of the file should be relative to the path of the test file.

Type: string

    # Examples

    file_content: ./foo/bar.txt

tests[].input_batches[][].metadata

A map of metadata key/values to add to the input message.

Type: object

tests[].output_batches

List of output batches.

Type: two-dimensional array

tests[].output_batches[][].bloblang

Executes a Bloblang mapping on the output message. If the result is anything other than a boolean equalling true, the test fails.

Type: string

    # Examples

    bloblang: this.age > 10 && @foo.length() > 0

tests[].output_batches[][].content_equals

Checks the full raw contents of a message against a value.

Type: string

tests[].output_batches[][].content_matches

Checks whether the full raw contents of a message matches a regular expression (re2).

Type: string

    # Examples

    content_matches: ^foo [a-z]+ bar$

tests[].output_batches[][].metadata_equals

Checks a map of metadata keys to values against the metadata stored in the message. If there is a value mismatch between a key of the condition versus the message metadata, this condition will fail.

Type: object

    # Examples

    metadata_equals:
      example_key: example metadata value

tests[].output_batches[][].file_equals

Checks that the contents of a message matches the contents of a file.
The path of the file should be relative to the path of the test file.

Type: string

    # Examples

    file_equals: ./foo/bar.txt

tests[].output_batches[][].file_json_equals

Checks that both the message and the file contents are valid JSON documents, and that they are structurally equivalent. Will ignore formatting and ordering differences. The path of the file should be relative to the path of the test file.

Type: string

    # Examples

    file_json_equals: ./foo/bar.json

tests[].output_batches[][].json_equals

Checks that both the message and the condition are valid JSON documents, and that they are structurally equivalent. Will ignore formatting and ordering differences.

Type: unknown

    # Examples

    json_equals:
      key: value

tests[].output_batches[][].json_contains

Checks that both the message and the condition are valid JSON documents, and that the message is a superset of the condition.

Type: unknown

    # Examples

    json_contains:
      key: value

tests[].output_batches[][].file_json_contains

Checks that both the message and the file contents are valid JSON documents, and that the message is a superset of the condition. Will ignore formatting and ordering differences. The path of the file should be relative to the path of the test file.

Type: string

    # Examples

    file_json_contains: ./foo/bar.json
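The input_batches field appears only in the field reference, so a short sketch of its two-dimensional shape may help. This is an illustrative assumption rather than an example from the original documentation: it presumes the test is saved as foo_benthos_test.yaml next to the foo.yaml config from "Writing a test", whose mapping uppercases the content and appends "end", and that each input batch yields one output batch.

```yaml
# Sketch only: assumes the foo.yaml config from "Writing a test" sits alongside this file.
tests:
  - name: two input batches
    target_processors: '/pipeline/processors'
    # Outer list: batches. Inner list: messages within each batch.
    input_batches:
      -
        - content: 'first'
        - content: 'second'
      -
        - content: 'third'
    # Expected to mirror the input shape: two batches, with two and one messages.
    output_batches:
      -
        - content_equals: FIRSTend
        - content_equals: SECONDend
      -
        - content_equals: THIRDend
```

Because the test fails when the batch count or the per-batch message count differs from what is listed, this layout also checks that the targeted processors neither merge nor split batches.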