# awk

Available in: Self-Managed

Executes an AWK program on messages. This processor is very powerful as it offers a range of custom functions for querying and mutating message contents and metadata.

```yaml
# Config fields, showing default values
label: ""
awk:
  codec: "" # No default (required)
  program: "" # No default (required)
```

The processor feeds message contents into the program as input, based on the chosen codec, and replaces the contents of each message with the result. If the result is empty (nothing is printed by the program) then the original message contents remain unchanged.

It comes with a wide range of custom functions for accessing message metadata, JSON fields, printing logs, and so on. These functions can be overridden by functions within the program.

Check out the examples section to see how this processor can be used.

This processor uses GoAWK. To understand how its behavior differs from other AWK implementations, read about it in goawk.differences.

## Fields

### codec

A codec defines how messages should be inserted into the AWK program as variables. The codec does not change which custom Redpanda Connect functions are available. The `text` codec is the closest to a typical AWK use case.

Type: `string`

Options: `none`, `text`, `json`.

### program

An AWK program to execute.

Type: `string`

## Examples

### JSON Mapping and Arithmetic

Because AWK is a full programming language it's much easier to map documents and perform arithmetic with it than with other Redpanda Connect processors. For example, if we were expecting documents of the form:

```json
{"doc":{"val1":5,"val2":10},"id":"1","type":"add"}
{"doc":{"val1":5,"val2":10},"id":"2","type":"multiply"}
```

And we wished to perform the arithmetic specified in the `type` field on the values `val1` and `val2`, and finally map the result into the document, giving us the following resulting documents:

```json
{"doc":{"result":15,"val1":5,"val2":10},"id":"1","type":"add"}
{"doc":{"result":50,"val1":5,"val2":10},"id":"2","type":"multiply"}
```

We can do that with the following:

```yaml
pipeline:
  processors:
    - awk:
        codec: none
        program: |
          function map_add_vals() {
            json_set_int("doc.result", json_get("doc.val1") + json_get("doc.val2"));
          }
          function map_multiply_vals() {
            json_set_int("doc.result", json_get("doc.val1") * json_get("doc.val2"));
          }
          function map_unknown(type) {
            json_set("error", "unknown document type");
            print_log("Document type not recognised: " type, "ERROR");
          }
          {
            type = json_get("type");
            if (type == "add") map_add_vals();
            else if (type == "multiply") map_multiply_vals();
            else map_unknown(type);
          }
```

### Stuff With Arrays

It's possible to iterate JSON arrays by appending an index value to the path; this can be used to do things like removing duplicates from arrays. For example, given the following input document:

```json
{"path":{"to":{"foos":["one","two","three","two","four"]}}}
```

We could create a new array `foos_unique` from `foos`, giving us the result:

```json
{"path":{"to":{"foos":["one","two","three","two","four"],"foos_unique":["one","two","three","four"]}}}
```

With the following config:

```yaml
pipeline:
  processors:
    - awk:
        codec: none
        program: |
          {
            array_path = "path.to.foos"
            array_len = json_length(array_path)

            for (i = 0; i < array_len; i++) {
              ele = json_get(array_path "." i)
              if ( ! ( ele in seen ) ) {
                json_append(array_path "_unique", ele)
                seen[ele] = 1
              }
            }
          }
```
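The `text` codec also supports classic field-splitting workflows. As a minimal additional sketch, assuming messages are whitespace-delimited log lines such as `2024-01-02 ERROR disk full`, the following config copies the second field into metadata; because the program prints nothing, the original message contents are left unchanged:

```yaml
pipeline:
  processors:
    - awk:
        codec: text
        program: |
          {
            # $2 is the second whitespace-delimited field of the message.
            # Nothing is printed, so the message body remains unchanged.
            metadata_set("level", $2);
          }
```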
## Codecs

The chosen codec determines how the contents of the message are fed into the program. Codecs only impact the input string and the variables initialized for your program; they do not change the range of custom functions available.

### none

An empty string is fed into the program. Functions can still be used in order to extract and mutate metadata and message contents.

This is useful when your program only uses functions and doesn't need the full text of the message to be parsed by the program, as it is significantly faster.

### text

The full contents of the message are fed into the program as a string, allowing you to reference tokenized segments of the message with variables (`$0`, `$1`, etc). Custom functions can still be used with this codec.

This is the default codec as it behaves most similarly to typical usage of the awk command line tool.

### json

An empty string is fed into the program, and variables are automatically initialized before execution of your program by walking the flattened JSON structure. Each value is converted into a variable by taking its full path, e.g. the object:

```json
{
  "foo": {
    "bar": {
      "value": 10
    },
    "created_at": "2018-12-18T11:57:32"
  }
}
```

Would result in the following variable declarations:

```text
foo_bar_value = 10
foo_created_at = "2018-12-18T11:57:32"
```

Custom functions can also still be used with this codec.

## AWK functions

### json_get

Signature: `json_get(path)`

Attempts to find a JSON value in the input message payload by a dot separated path and returns it as a string.

### json_set

Signature: `json_set(path, value)`

Attempts to set a JSON value in the input message payload identified by a dot separated path; the value argument will be interpreted as a string.

In order to set non-string values use one of the following typed varieties:

- `json_set_int(path, value)`
- `json_set_float(path, value)`
- `json_set_bool(path, value)`

### json_append

Signature: `json_append(path, value)`

Attempts to append a value to an array identified by a dot separated path. If the target does not exist it will be created. If the target exists but is not already an array then it will be converted into one, with its original contents set to the first element of the array.

The value argument will be interpreted as a string. In order to append non-string values use one of the following typed varieties:

- `json_append_int(path, value)`
- `json_append_float(path, value)`
- `json_append_bool(path, value)`

### json_delete

Signature: `json_delete(path)`

Attempts to delete a JSON field from the input message payload identified by a dot separated path.

### json_length

Signature: `json_length(path)`

Returns the size of the string or array value of a JSON field from the input message payload identified by a dot separated path. If the target field does not exist, or is not a string or array type, then zero is returned. In order to explicitly check the type of a field use `json_type`.

### json_type

Signature: `json_type(path)`

Returns the type of a JSON field from the input message payload identified by a dot separated path. Possible values are: "string", "int", "float", "bool", "undefined", "null", "array", "object".

### create_json_object

Signature: `create_json_object(key1, val1, key2, val2, …)`

Generates a valid JSON object of key value pair arguments. The arguments are variadic, meaning any number of pairs can be listed. The value will always resolve to a string regardless of the value type. E.g. the following call:

`create_json_object("a", "1", "b", 2, "c", "3")`

Would result in this string:

`{"a":"1","b":"2","c":"3"}`

### create_json_array

Signature: `create_json_array(val1, val2, …)`

Generates a valid JSON array of value arguments. The arguments are variadic, meaning any number of values can be listed. The value will always resolve to a string regardless of the value type. E.g. the following call:

`create_json_array("1", 2, "3")`

Would result in this string:

`["1","2","3"]`
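As a brief sketch of how these JSON functions compose (the paths `items` and `item_count` are illustrative, not part of any real schema), `json_type` can be used to guard `json_length` against missing or non-array fields:

```yaml
pipeline:
  processors:
    - awk:
        codec: none
        program: |
          {
            # json_length returns zero for missing or non-string/array
            # fields, so check the type explicitly before counting.
            if (json_type("items") == "array") {
              json_set_int("item_count", json_length("items"));
            } else {
              print_log("field items is not an array", "ERROR");
            }
          }
```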
### metadata_set

Signature: `metadata_set(key, value)`

Set a metadata key for the message to a value. The value will always resolve to a string regardless of the value type.

### metadata_get

Signature: `metadata_get(key) string`

Get the value of a metadata key from the message.

### timestamp_unix

Signature: `timestamp_unix() int`

Returns the current unix timestamp (the number of seconds since 01-01-1970).

### timestamp_unix

Signature: `timestamp_unix(date) int`

Attempts to parse a date string by detecting its format and returns the equivalent unix timestamp (the number of seconds since 01-01-1970).

### timestamp_unix

Signature: `timestamp_unix(date, format) int`

Attempts to parse a date string according to a format and returns the equivalent unix timestamp (the number of seconds since 01-01-1970).

The format is defined by showing how the reference time, defined to be Mon Jan 2 15:04:05 -0700 MST 2006, would be displayed if it were the value.

### timestamp_unix_nano

Signature: `timestamp_unix_nano() int`

Returns the current unix timestamp in nanoseconds (the number of nanoseconds since 01-01-1970).

### timestamp_unix_nano

Signature: `timestamp_unix_nano(date) int`

Attempts to parse a date string by detecting its format and returns the equivalent unix timestamp in nanoseconds (the number of nanoseconds since 01-01-1970).

### timestamp_unix_nano

Signature: `timestamp_unix_nano(date, format) int`

Attempts to parse a date string according to a format and returns the equivalent unix timestamp in nanoseconds (the number of nanoseconds since 01-01-1970).

The format is defined by showing how the reference time, defined to be Mon Jan 2 15:04:05 -0700 MST 2006, would be displayed if it were the value.

### timestamp_format

Signature: `timestamp_format(unix, format) string`

Formats a unix timestamp. The format is defined by showing how the reference time, defined to be Mon Jan 2 15:04:05 -0700 MST 2006, would be displayed if it were the value.

The format is optional, and if omitted RFC3339 (2006-01-02T15:04:05Z07:00) will be used.

### timestamp_format_nano

Signature: `timestamp_format_nano(unixNano, format) string`

Formats a unix timestamp in nanoseconds. The format is defined by showing how the reference time, defined to be Mon Jan 2 15:04:05 -0700 MST 2006, would be displayed if it were the value.

The format is optional, and if omitted RFC3339 (2006-01-02T15:04:05Z07:00) will be used.

### print_log

Signature: `print_log(message, level)`

Prints a Redpanda Connect log message at a particular log level. The log level is optional, and if omitted the level INFO will be used.

### base64_encode

Signature: `base64_encode(data)`

Encodes the input data to a base64 string.

### base64_decode

Signature: `base64_decode(data)`

Attempts to base64-decode the input data and returns the decoded string if successful. It will emit an error otherwise.
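As a final sketch tying several of these functions together (the field name `created_at` is illustrative, and the input is assumed to be a JSON document containing a parseable date string), a date field can be normalized to RFC3339 by parsing it with `timestamp_unix` and re-serializing it with `timestamp_format`:

```yaml
pipeline:
  processors:
    - awk:
        codec: none
        program: |
          {
            # Parse the date by auto-detecting its format, then
            # rewrite the field using the default RFC3339 format.
            ts = timestamp_unix(json_get("created_at"));
            json_set("created_at", timestamp_format(ts));
          }
```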