Docs Connect Configuration Using CUE Using CUE CUE support is experimental. It may change for some time to improve CUE’s ability to type-check Redpanda Connect configurations at the expense of causing new validation errors when moving from one Redpanda Connect release to the next. CUE is a powerful configuration language that makes it easier and safer to build Redpanda Connect configurations. It achieves this by validating and type-checking configurations as well as allowing you to build useful utilities that reduce boilerplate. In this guide, we will see how to build a Redpanda Connect configuration using CUE, export it to YAML and execute it. Prerequisites Before you get started, ensure that you have installed CUE by following this guide. If this is your first time workin with it, then it’s a great idea to step through the CUE tutorial and familiarize yourself with the language. Create Create a directory for the CUE module that will contain the configuration: mkdir hello-cue cd hello-cue cue mod init example.com/hello-cue touch config.cue CUE modules must start with a hostname. This will typically be the URL of your repository. For example: cue mod init github.com/redpanda-data/hello-cue. The rpk connect list command will generate a CUE package containing the types we’ll need to build our configuration. Let’s write this package into our project: mkdir benthos rpk connect list --format cue > benthos/schema.cue At this point, you should now have the following directory structure: hello-cue/ benthos/ schema.cue cue.mod/ pkg/ usr/ module.cue config.cue We are now ready to write our Redpanda Connect config in CUE. Let’s start by editing our config.cue to include the following snippet: import "example.com/hello-cue/benthos" benthos.#Config & { input: { generate: { mapping: """ root = { "message": "Hello, CUE!" } """ } } pipeline: { processors: [ { mapping: """ root = this root.id = uuid_v4() """ } ] } output: { stdout: {} } } Let’s see what this will look like as YAML by running cue export while in the hello-cue directory: cue export --out yaml config.cue This should output something like this: input: generate: mapping: 'root = { "message": "Hello, CUE!" }' pipeline: processors: - mapping: |- root = this root.id = uuid_v4() output: stdout: {} tests: [] We can run this with Redpanda Connect to see that it indeed works: rpk connect run <(cue export --out yaml config.cue) When you are satisfied with the results, terminate the Redpanda Connect process and let’s move on to look at some of the nice features that we get with CUE. Enhance The config.cue above looks eerily like JSON. This is because CUE is a superset of JSON and shares its syntax. However, we can shorten our configuration to reduce indentation and curly brackets. Let’s rewrite config.cue to look like this: import "example.com/hello-cue/benthos" benthos.#Config & { input: generate: mapping: """ root = { "message": "Hello, CUE!" } """ pipeline: processors: [ { mapping: """ root = this root.id = uuid_v4() """ } ] output: stdout: {} } If you run the same cue export command from earlier, you’ll notice that the YAML output is the same. Next, we’ll look at what some error handling patterns might look like with CUE. One typical technique to detect messages with errors is to use the switch output to wrap another output with some error detection and handling. Another pattern involves limiting the number of retries on a given output that is misbehaving and rejecting or dropping messages with some useful logging. If we combine all these concepts together we get: output: switch: cases: - check: errored() output: reject: "failed to process message: ${! error() }" - output: retry: max_retries: 5 output: gcp_pubsub: project: "sample-project" topic: "sample-topic" There are quite a few lines of YAML here and we seem to be going sideways as we compose more functionality. We can try and make this more manageable with CUE! Let’s create a new file in our hello-cue directory called benthos/helpers.cue: touch benthos/helpers.cue In this file, add the following snippet: package benthos #Guarded: self = { // The desired output that will be wrapped with error handling mechanisms #output: #Output // The error text to emit if the output receives any messages which contained // processing errors #errorMessage: string // The number of retries to attempt on the desired output (default is 3) #maxRetries: uint | *3 // The error message to emit if the retry attempts are exhausted #retryErrorMessage: string // Whether to drop or reject any failed messages #errorHandling: "drop" | "reject" switch: cases: [ { check: "errored()" output: { if self.#errorHandling == "reject" { reject: self.#errorMessage } if self.#errorHandling == "drop" { drop: {} processors: [{ log: message: self.#errorMessage }] } } }, { output: fallback: [ { retry: { max_retries: self.#maxRetries output: self.#output } }, { if self.#errorHandling == "reject" { reject: self.#retryErrorMessage } if self.#errorHandling == "drop" { drop: {} processors: [{ log: message: self.#retryErrorMessage }] } } ] } ] } Now, let’s get back to config.cue and edit a few bits while leveraging this helper: import "example.com/hello-cue/benthos" benthos.#Config & { input: generate: { count: 1 interval: "0" mapping: """ root = { "message": "Hello, CUE!" } """ } output: benthos.#Guarded & { #errorMessage: "failed to process message: ${! error() }" #maxRetries: 3 #retryErrorMessage: "failed to output message after \(#maxRetries) retries" #errorHandling: "drop" #output: http_client: { url: "http://localhost:4195/sad-blob" retries: 0 } } } If you rerun cue export now, you’ll see that we’ve wrapped our output with a couple of error handling mechanisms. We also had access to powerful CUE features like conditional fields based on #errorHandling, default values and interpolations. input: generate: count: 1 interval: "0" mapping: 'root = { "message": "Hello, CUE!" }' output: switch: cases: - check: errored() output: drop: {} processors: - log: message: 'failed to process message: ${! error() }' - output: fallback: - retry: max_retries: 3 output: http_client: url: http://localhost:4195/sad-blob retries: 0 - drop: {} processors: - log: message: failed to output message after 3 retries tests: [] The final directory structure of your hello-cue project should look like this: hello-cue/ benthos/ schema.cue helpers.cue cue.mod/ pkg/ usr/ module.cue config.cue Included CUE types The benthos.cue file we emitted earlier contains a number of useful types that we can use when build configuration files and helpers. These include: benthos.#Config This definition describes the format of a Redpanda Connect config file. You’ll want to use it at the top of your configuration file to validate its overall structure. benthos.#Input benthos.#Output benthos.#Processor benthos.#RateLimit benthos.#Buffer benthos.#Cache benthos.#Metric benthos.#Tracer Each of these definitions is a disjunction that holds all the corresponding components in Redpanda Connect. In other words, a CUE field that is specified as benthos.#Input, such as myfield: benthos.#Input, must resolve to a valid Redpanda Connect input. Wrap up Being able to define helper packages and definitions like #Guarded and reusing them across your Redpanda Connect configurations is a really powerful feature of CUE. This will allow you to share consistent good practices without messy boilerplate across projects and teams! Back to top × Simple online edits For simple changes, such as fixing a typo, you can edit the content directly on GitHub. Edit on GitHub Or, open an issue to let us know about something that you want us to change. Open an issue Contribution guide For extensive content updates, or if you prefer to work locally, read our contribution guide . Was this helpful? thumb_up thumb_down group Ask in the community mail Share your feedback group_add Make a contribution Dynamic Inputs and Outputs Components