protobuf

Performs conversions to or from a protobuf message. This processor uses reflection, meaning conversions can be made directly from the target .proto files.

# Config fields, showing default values
label: ""
protobuf:
  operator: "" # No default (required)
  message: "" # No default (required)
  discard_unknown: false
  use_proto_names: false
  import_paths: []

The main functionality of this processor is to map to and from JSON documents, you can read more about JSON mapping of protobuf messages here: https://developers.google.com/protocol-buffers/docs/proto3#json

Using reflection for processing protobuf messages in this way is less performant than generating and using native code. Therefore when performance is critical it is recommended that you use Redpanda Connect plugins instead for processing protobuf messages natively, you can find an example of Redpanda Connect plugins at https://github.com/benthosdev/benthos-plugin-example

Operators

to_json

Converts protobuf messages into a generic JSON structure. This makes it easier to manipulate the contents of the document within Redpanda Connect.

from_json

Attempts to create a target protobuf message from a generic JSON structure.

Examples

  • JSON to Protobuf

  • Protobuf to JSON

If we have the following protobuf definition within a directory called testing/schema:

syntax = "proto3";
package testing;

import "google/protobuf/timestamp.proto";

message Person {
  string first_name = 1;
  string last_name = 2;
  string full_name = 3;
  int32 age = 4;
  int32 id = 5; // Unique ID number for this person.
  string email = 6;

  google.protobuf.Timestamp last_updated = 7;
}

And a stream of JSON documents of the form:

{
	"firstName": "caleb",
	"lastName": "quaye",
	"email": "caleb@myspace.com"
}

We can convert the documents into protobuf messages with the following config:

pipeline:
  processors:
    - protobuf:
        operator: from_json
        message: testing.Person
        import_paths: [ testing/schema ]

If we have the following protobuf definition within a directory called testing/schema:

syntax = "proto3";
package testing;

import "google/protobuf/timestamp.proto";

message Person {
  string first_name = 1;
  string last_name = 2;
  string full_name = 3;
  int32 age = 4;
  int32 id = 5; // Unique ID number for this person.
  string email = 6;

  google.protobuf.Timestamp last_updated = 7;
}

And a stream of protobuf messages of the type Person, we could convert them into JSON documents of the format:

{
	"firstName": "caleb",
	"lastName": "quaye",
	"email": "caleb@myspace.com"
}

With the following config:

pipeline:
  processors:
    - protobuf:
        operator: to_json
        message: testing.Person
        import_paths: [ testing/schema ]

Fields

operator

The operator to execute

Type: string

Options: to_json , from_json .

message

The fully qualified name of the protobuf message to convert to/from.

Type: string

discard_unknown

If true, the from_json operator discards fields that are unknown to the schema.

Type: bool

Default: false

use_proto_names

If true, the to_json operator deserializes fields exactly as named in schema file.

Type: bool

Default: false

import_paths

A list of directories containing .proto files, including all definitions required for parsing the target message. If left empty the current directory is used. Each directory listed will be walked with all found .proto files imported.

Type: array

Default: []