schema_registry

Beta

Reads schemas from a schema registry. You can use this connector to extract and back up schemas during a data migration. This input uses the Franz Kafka Schema Registry client.

Introduced in version 4.32.2.

# Common configuration fields, showing default values
input:
  label: ""
  schema_registry:
    url: "" # No default (required)
    auto_replay_nacks: true

# All configuration fields, showing default values
input:
  label: ""
  schema_registry:
    url: "" # No default (required)
    include_deleted: false
    subject_filter: ""
    fetch_in_order: true
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    auto_replay_nacks: true
    oauth:
      enabled: false
      consumer_key: ""
      consumer_secret: ""
      access_token: ""
      access_token_secret: ""
    basic_auth:
      enabled: false
      username: ""
      password: ""
    jwt:
      enabled: false
      private_key_file: ""
      signing_method: ""
      claims: {}
      headers: {}

Metadata

The schema_registry input adds the following metadata fields to each message:

- schema_registry_subject
- schema_registry_version

You can access these metadata fields using function interpolation.
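
As a minimal sketch of that interpolation, the following configuration writes each schema to its own file, named after the subject and version metadata. The file output, the ./schemas directory, and the .json naming scheme are assumptions chosen for illustration; they are not part of this input.

```yaml
input:
  schema_registry:
    url: http://localhost:8081

output:
  file:
    # Hypothetical backup location. The file name is built from the two
    # metadata fields that this input attaches to each message.
    path: ./schemas/${! metadata("schema_registry_subject") }_v${! metadata("schema_registry_version") }.json
    # Write each message to its own file in full.
    codec: all-bytes
```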

Example

This example reads all schemas from a schema registry that are associated with subjects matching the ^foo.* filter, including deleted schemas.

input:
  schema_registry:
    url: http://localhost:8081
    include_deleted: true
    subject_filter: ^foo.*

Fields

url

The base URL of the schema registry service.

Type: string

include_deleted

Include deleted entities.

Type: bool

Default: false

subject_filter

Include only subjects that match the regular expression filter. Leave this field blank to select all subjects.

Type: string

Default: ""

fetch_in_order

Indicate whether to fetch all schemas from the schema registry service and sort them by ID. Set this value to true if you use schemas that refer to other schemas (schema references).

Type: bool

Default: true

Requires version 4.37.0 or newer

tls

Override system defaults with custom TLS settings.

Type: object

tls.enabled

Whether custom TLS settings are enabled.

Type: bool

Default: false

tls.skip_cert_verify

Whether to skip server-side certificate verification.

Type: bool

Default: false

tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Type: bool

Default: false

Requires version 3.45.0 or newer

tls.root_cas

Specify a certificate authority to use (optional). This is a string that represents a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples

root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file

Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.

Type: string

Default: ""

# Examples

root_cas_file: ./root_cas.pem

tls.client_certs

A list of client certificates to use. For each certificate specify values for either the cert and key fields, or cert_file and key_file fields.

Type: array

Default: []

# Examples

client_certs:
  - cert: foo
    key: bar

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert

The plain text certificate to use.

Type: string

Default: ""

tls.client_certs[].key

The plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

tls.client_certs[].cert_file

The path to the certificate file to use.

Type: string

Default: ""

tls.client_certs[].key_file

The path to the certificate key to use.

Type: string

Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples

password: foo

password: ${KEY_PASSWORD}
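
The tls fields described above can be combined as in the following sketch, which assumes a registry served over HTTPS with a private certificate authority and client certificate authentication. The host name, the file paths, and the KEY_PASSWORD environment variable are placeholders, not values required by this input.

```yaml
input:
  schema_registry:
    # Hypothetical HTTPS endpoint.
    url: https://registry.example.com:8081
    tls:
      enabled: true
      # Trust the private CA that signed the registry's certificate.
      root_cas_file: ./root_cas.pem
      client_certs:
        - cert_file: ./example.pem
          key_file: ./example.key
          # Only needed when the private key is password encrypted.
          password: ${KEY_PASSWORD}
```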

auto_replay_nacks

Whether to automatically replay messages that are rejected (nacked) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, because the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool

Default: true

oauth

Configure OAuth version 1.0 to give this component authorized access to your schema registry.

Type: object

oauth.enabled

Whether to use OAuth version 1 in requests.

Type: bool

Default: false

oauth.consumer_key

The value used to identify this component or client to your schema registry.

Type: string

Default: ""

oauth.consumer_secret

The secret used to establish ownership of the consumer key.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

oauth.access_token

The value this component can use to gain access to the data in the schema registry.

Type: string

Default: ""

oauth.access_token_secret

The secret that establishes ownership of the oauth.access_token.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

basic_auth

Configure basic authentication for requests from this component to your schema registry.

Type: object

basic_auth.enabled

Whether to use basic authentication in requests.

Type: bool

Default: false

basic_auth.username

The username of the account credentials to authenticate as.

Type: string

Default: ""

basic_auth.password

The password of the account credentials to authenticate with.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""
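
A short sketch of basic authentication that keeps the credentials out of the configuration file by reading them from environment variables. The variable names SCHEMA_REGISTRY_USER and SCHEMA_REGISTRY_PASSWORD and the host name are assumptions made for this example.

```yaml
input:
  schema_registry:
    # Hypothetical HTTPS endpoint.
    url: https://registry.example.com:8081
    basic_auth:
      enabled: true
      # Hypothetical environment variables, resolved when the
      # configuration is loaded.
      username: ${SCHEMA_REGISTRY_USER}
      password: ${SCHEMA_REGISTRY_PASSWORD}
```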

jwt

BETA: Configure JSON Web Token (JWT) authentication for the secure transmission of data from your schema registry to this component.

Type: object

jwt.enabled

Whether to use JWT authentication in requests.

Type: bool

Default: false

jwt.private_key_file

A PEM-encoded file containing a private key that is formatted using either the PKCS#1 or PKCS#8 standard.

Type: string

Default: ""

jwt.signing_method

The method used to sign the token, such as RS256, RS384, RS512 or EdDSA.

Type: string

Default: ""

jwt.claims

Values used to pass the identity of the authenticated entity to the service provider; in this case, from this component to the schema registry.

Type: object

Default: {}

jwt.headers

The key/value pairs that identify the type of token and signing algorithm.

Type: object

Default: {}
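
The jwt fields might be combined as in the following sketch. The key file path, the claim, and the header value are hypothetical; the claims and headers that a particular schema registry expects depend on how it is deployed.

```yaml
input:
  schema_registry:
    # Hypothetical HTTPS endpoint.
    url: https://registry.example.com:8081
    jwt:
      enabled: true
      # Hypothetical path to a PEM-encoded private key.
      private_key_file: ./jwt_private_key.pem
      signing_method: RS256
      claims:
        # Hypothetical subject claim identifying this component.
        sub: schema-backup
      headers:
        # Hypothetical key ID header.
        kid: example-key-id
```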