schema_registry

Reads schemas from a schema registry. You can use this connector to extract and backup schemas during a data migration.

  • Common

  • Advanced

# Common configuration fields, showing default values
input:
  label: ""
  schema_registry:
    url: "" # No default (required)
    auto_replay_nacks: true
# All configuration fields, showing default values
input:
  label: ""
  schema_registry:
    url: "" # No default (required)
    include_deleted: false
    subject_filter: ""
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    auto_replay_nacks: true

Metadata

The schema_registry input adds the following metadata fields to each message:

- schema_registry_subject
- schema_registry_version

You can access these metadata fields using function interpolation.

Example

This example reads all schemas from a schema registry that are associated with subjects matching the ^foo.* filter, including deleted schemas.

input:
  schema_registry:
    url: http://localhost:8081
    include_deleted: true
    subject_filter: ^foo.*

Fields

url

The base URL of the schema registry service.

Type: string

include_deleted

Include deleted entities.

Type: bool

Default: false

subject_filter

Include only subjects which match the regular expression filter, or leave this field value blank to select all subjects.

Type: string

Default: ""

tls

Override system defaults with custom TLS settings.

Type: object

tls.enabled

Whether custom TLS settings are enabled.

Type: bool

Default: false

tls.skip_cert_verify

Whether to skip server side certificate verification.

Type: bool

Default: false

tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Type: bool

Default: false

tls.root_cas

Specify a certificate authority to use (optional). This is a string that represents a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.

This field contains sensitive information. Review your cluster security before adding it to your configuration.

Type: string

Default: ""

# Examples

root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file

Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.

Type: string

Default: ""

# Examples

root_cas_file: ./root_cas.pem

tls.client_certs

A list of client certificates to use. For each certificate specify values for either the cert and key fields, or cert_file and key_file fields.

Type: array

Default: []

# Examples

client_certs:
  - cert: foo
    key: bar

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert

The plain text certificate to use.

Type: string

Default: ""

tls.client_certs[].key

The plain text certificate key to use.

This field contains sensitive information. Review your cluster security before adding it to your configuration.

Type: string

Default: ""

tls.client_certs[].cert_file

The path to the certificate file to use.

Type: string

Default: ""

tls.client_certs[].key_file

The path to the certificate key to use.

Type: string

Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding Oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information. Review your cluster security before adding it to your configuration.

Type: string

Default: ""

# Examples

password: foo

password: ${KEY_PASSWORD}

auto_replay_nacks

Whether to automatically replay messages that are rejected (nacked) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool

Default: true