schema_registry

Publishes schemas to a schema registry. This output uses the Franz Kafka Schema Registry client.

  • Common

  • Advanced

# Common configuration fields, showing default values
output:
  label: ""
  schema_registry:
    url: "" # No default (required)
    subject: "" # No default (required)
    max_in_flight: 64
# All configuration fields, showing default values
output:
  label: ""
  schema_registry:
    url: "" # No default (required)
    subject: "" # No default (required)
    backfill_dependencies: true
    input_resource: schema_registry_input
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    max_in_flight: 64
    oauth:
      enabled: false
      consumer_key: ""
      consumer_secret: ""
      access_token: ""
      access_token_secret: ""
    basic_auth:
      enabled: false
      username: ""
      password: ""
    jwt:
      enabled: false
      private_key_file: ""
      signing_method: ""
      claims: {}
      headers: {}

Performance

The schema_registry output sends multiple messages in parallel for improved performance. You can use the max_in_flight field to tune the maximum number of in-flight messages, or message batches.

Example

This example writes schemas to a schema registry instance and logs errors for existing schemas.

output:
  fallback:
    - schema_registry:
        url: http://localhost:8082
        subject: ${! @schema_registry_subject }
    - switch:
        cases:
          - check: '@fallback_error == "request returned status: 422"'
            output:
              drop: {}
              processors:
                - log:
                    message: |
                      Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() }
          - output:
              reject: ${! @fallback_error }

Fields

url

The base URL of the schema registry service.

Type: string

subject

The subject name.

This field supports interpolation functions.

Type: string

backfill_dependencies

Backfill missing schema references and previous schema versions. If set to true, you must also configure a schema_registry input to read source schemas.

Type: bool

Default: true

input_resource

The label of the schema_registry input from which to read source schemas.

Type: string

Default: schema_registry_input

tls

Override system defaults with custom TLS settings.

Type: object

tls.enabled

Whether custom TLS settings are enabled.

Type: bool

Default: false

tls.skip_cert_verify

Whether to skip server side certificate verification.

Type: bool

Default: false

tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Type: bool

Default: false

tls.root_cas

Specify a certificate authority to use (optional). This is a string that represents a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

Default: ""

# Examples

root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file

Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate.

Type: string

Default: ""

# Examples

root_cas_file: ./root_cas.pem

tls.client_certs

A list of client certificates to use. For each certificate specify values for either the cert and key fields, or cert_file and key_file fields.

Type: array

Default: []

# Examples

client_certs:
  - cert: foo
    key: bar

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert

The plain text certificate to use.

Type: string

Default: ""

tls.client_certs[].key

The plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

Default: ""

tls.client_certs[].cert_file

The path to the certificate to use.

Type: string

Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string

Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding Oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

Default: ""

# Examples

password: foo

password: ${KEY_PASSWORD}

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this number to improve throughput.

Type: int

Default: 64

oauth

Configure OAuth version 1.0 to give this component authorized access to your schema registry.

Type: object

oauth.enabled

Whether to use OAuth version 1 in requests.

Type: bool

Default: false

oauth.consumer_key

The value used to identify this component or client to your schema registry.

Type: string

Default: ""

oauth.consumer_secret

The secret used to establish ownership of the consumer key.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

Default: ""

oauth.access_token

The value this component can use to gain access to the schema registry.

Type: string

Default: ""

oauth.access_token_secret

The secret that establishes ownership of the oauth.access_token.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

Default: ""

basic_auth

Configure basic authentication for requests from this component to your schema registry.

Type: object

basic_auth.enabled

Whether to use basic authentication in requests.

Type: bool

Default: false

basic_auth.username

The username of the account credentials to authenticate as.

Type: string

Default: ""

basic_auth.password

The password of the account credentials to authenticate with.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

Default: ""

jwt

BETA: Configure JSON Web Token (JWT) authentication for the secure transmission of data from this component to your schema registry.

Type: object

jwt.enabled

Whether to use JWT authentication in requests.

Type: bool

Default: false

jwt.private_key_file

A PEM-encoded file containing a private key that is formatted using either PKCS1 or PKCS8 standards.

Type: string

Default: ""

jwt.signing_method

The method used to sign the token, such as RS256, RS384, RS512 or EdDSA.

Type: string

Default: ""

jwt.claims

Values used to pass the identity of the authenticated entity to the service provider. In this case, between this component and the schema registry.

Type: object

Default: {}

jwt.headers

The key/value pairs that identify the type of token and signing algorithm.

Type: object

Default: {}