Schema Registry

Redpanda’s Schema Registry provides the interface to store and manage event schemas. Producers and consumers register and retrieve the schemas they use from the registry. Schemas are versioned, and the registry supports configurable compatibility modes between schema versions. When a producer or a consumer makes a request to register a schema change, the registry checks for schema compatibility and returns an error for an incompatible change.

The Schema Registry has API endpoints that allow you to perform the following tasks:

  • Query supported serialization formats.

  • Register schemas for a subject.

  • Retrieve schemas of specific versions.

  • Retrieve a list of subjects.

  • Retrieve a list of schema versions for a subject.

  • Configure schema compatibility checking.

  • Delete schemas from the registry.

Schema

A schema is an external mechanism to describe the structure of data and its encoding. Using schemas provides multiple benefits:

  • Enables a loosely coupled, data-centric architecture that minimizes dependencies in code, between teams, and between producers and consumers.

  • Provides human-readable documentation for an API.

  • Verifies that data conforms to an API.

  • Supports the generation of serializers for data.

  • Manages the compatibility of evolving APIs, allowing new versions of services to be rolled out independently.

Serialization format

A data serialization format defines how data is converted to the bits that are stored and transmitted. Redpanda’s Schema Registry supports the following data serialization formats for its schemas:

  • Avro

  • Protobuf

Redpanda design overview

Redpanda built the Schema Registry directly into the Redpanda binary. It runs out-of-the-box with Redpanda’s default configuration, and it requires no new binaries to install and no new services to deploy or maintain.

Every broker allows mutating REST calls, so there is no need to configure leadership or failover strategies. Schemas are stored in a compacted topic, and the registry uses optimistic concurrency control at the topic level to detect and avoid collisions.

The Schema Registry publishes record metadata to an internal topic, _schemas, as its backend store. By default, _schemas is protected from deletion and configuration changes by Kafka clients (see the kafka_nodelete_topics cluster property).

Use Schema Registry

The following examples cover the basic functionality of the Redpanda Schema Registry based on an example Avro schema called sensor_sample. This schema contains fields that represent a measurement from a sensor for the value of the sensor topic, as defined below.

{
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "long"
    }
  ]
}

Prerequisites

To run the sample commands and code in each example, follow these steps to set up Redpanda and other tools:

  1. Download the jq utility.

  2. Install curl or Python.

    • If using Python, install the Requests module, then create an interactive Python session:

      import requests
      import json
      def pretty(text):
        print(json.dumps(text, indent=2))
      
      base_uri = "http://localhost:8081"
  3. Install Redpanda with Docker.

  4. Start Redpanda with Docker:

    docker network create redpanda-sr
    docker volume create redpanda-sr
    docker run \
    --pull=always \
    --name=redpanda-sr \
    --net=redpanda-sr \
    -v "redpanda-sr:/var/lib/redpanda/data" \
    -p 8081:8081 \
    -p 8082:8082 \
    -p 9092:9092 \
    --detach \
    docker.redpanda.com/redpandadata/redpanda start \
    --overprovisioned \
    --smp 1 \
    --memory 1G \
    --reserve-memory 0M \
    --node-id 0 \
    --check=false \
    --pandaproxy-addr 0.0.0.0:8082 \
    --advertise-pandaproxy-addr 127.0.0.1:8082 \
    --kafka-addr 0.0.0.0:9092 \
    --advertise-kafka-addr redpanda-sr:9092

After Redpanda is running, endpoints are documented at http://localhost:8081/v1.

Query supported schema formats

To get the supported data serialization formats in the Schema Registry, make a GET request to the /schemas/types endpoint:

  • Curl

  • Python

curl -s "http://localhost:8081/schemas/types" | jq .
res = requests.get(f'{base_uri}/schemas/types').json()
pretty(res)

If the request is successful, it returns the supported serialization formats:

[
  "PROTOBUF",
  "AVRO"
]

Register a schema

A schema is registered in the registry with a subject, which is a name that is associated with the schema as it evolves. Subjects are typically in the form <topic-name>-key or <topic-name>-value.

To register the sensor_sample schema, make a POST request to the /subjects/sensor-value/versions endpoint with the Content-Type application/vnd.schemaregistry.v1+json:

  • Curl

  • Python

curl -s \
  -X POST \
  "http://localhost:8081/subjects/sensor-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"}' \
  | jq
sensor_schema = {
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "long"
    }
  ]
}

res = requests.post(
    url=f'{base_uri}/subjects/sensor-value/versions',
    data=json.dumps({
      'schema': json.dumps(sensor_schema)
    }),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)

If the request is successful, it returns a version id unique for the schema in the Redpanda cluster:

{
  "id": 1
}

When you register an evolved schema for an existing subject, the version id is incremented by 1.

Retrieve a schema

To retrieve a registered schema from the registry, make a GET request to the /schemas/ids/<id> endpoint:

  • Curl

  • Python

curl -s \
  "http://localhost:8081/schemas/ids/1" \
  | jq .
res = requests.get(f'{base_uri}/schemas/ids/1').json()
pretty(res)

If the request is successful, it returns the schema:

{
  "schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"
}

List registry subjects

To list all registry subjects, make a GET request to the /subjects endpoint:

  • Curl

  • Python

curl -s \
  "http://localhost:8081/subjects" \
  | jq .
res = requests.get(f'{base_uri}/subjects').json()
pretty(res)

If the request is successful, it returns the subject:

[
  "sensor-value"
]

Retrieve schema versions of a subject

To query the schema versions of a subject, make a GET request to the /subjects/<subject-name>/versions endpoint.

For example, to get the schema versions of the sensor-value subject:

  • Curl

  • Python

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions').json()
pretty(res)

If the request is successful, it returns the version ID:

[
  1
]

Retrieve a schema of a subject

To retrieve a schema associated with a subject, make a GET request to the /subjects/<subject-name>/versions/<version-id> endpoint:

  • Curl

  • Python

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions/1" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)

If the request is successful, it returns the subject and its associated schema:

{
  "subject": "sensor-value",
  "id": 1,
  "version": 1,
  "schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"
}

To get the latest version, use latest as the version ID:

  • Curl

  • Python

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions/latest" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/latest').json()
pretty(res)

To get only the schema, append /schema to the endpoint path:

  • Curl

  • Python

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions/latest/schema" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/latest/schema').json()
pretty(res)

If the request is successful, it returns the schema:

{
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "long"
    }
  ]
}

Configure schema compatibility

As applications change and their schemas evolve, you may find that producer schemas and consumer schemas are no longer compatible. You decide how you want a consumer to handle data from a producer that uses an older or newer schema.

You can configure different types of schema compatibility, which are applied to a subject when a new schema is registered. The Schema Registry supports the following compatibility types:

  • BACKWARD (default) - Consumers using the new schema (for example, version 10) can read data from producers using the previous schema (for example, version 9).

  • BACKWARD_TRANSITIVE - Consumers using the new schema (for example, version 10) can read data from producers using all previous schemas (for example, versions 1-9).

  • FORWARD - Consumers using the previous schema (for example, version 9) can read data from producers using the new schema (for example, version 10).

  • FORWARD_TRANSITIVE - Consumers using any previous schema (for example, versions 1-9) can read data from producers using the new schema (for example, version 10).

  • FULL - A new schema and the previous schema (for example, versions 10 and 9) are both backward and forward compatible with each other.

  • FULL_TRANSITIVE - Each schema is both backward and forward compatible with all registered schemas.

  • NONE - No schema compatibility checks are done.

To set the compatibility type for a subject, make a PUT request to /config/<subject-name> with the specific compatibility type:

  • Curl

  • Python

curl -s \
  -X PUT \
  "http://localhost:8081/config/sensor-value" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}' \
  | jq .
res = requests.put(
    url=f'{base_uri}/config/sensor-value',
    data=json.dumps(
        {'compatibility': 'BACKWARD'}
      ),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)

If the request is successful, it returns the new compatibility type:

{
  "compatibility": "BACKWARD"
}

If you POST an incompatible schema change, the request returns an error. For example, if you try to register a new schema with the value field’s type changed from long to int, and compatibility is set to BACKWARD, the request returns an error due to incompatibility:

  • Curl

  • Python

curl -s \
  -X POST \
  "http://localhost:8081/subjects/sensor-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"int\"}]}"}' \
  | jq
sensor_schema["fields"][2]["type"] = "int"

res = requests.post(
    url=f'{base_uri}/subjects/sensor-value/versions',
    data=json.dumps({
      'schema': json.dumps(sensor_schema)
    }),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)

The request returns this error:

{
  "error_code": 409,
  "message": "Schema being registered is incompatible with an earlier schema for subject \"{sensor-value}\""
}

For an example of a compatible change, register a schema with the value field’s type changed from long to double:

  • Curl

  • Python

curl -s \
  -X POST \
  "http://localhost:8081/subjects/sensor-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"double\"}]}"}' \
  | jq
sensor_schema["fields"][2]["type"] = "double"

res = requests.post(
    url=f'{base_uri}/subjects/sensor-value/versions',
    data=json.dumps({
      'schema': json.dumps(sensor_schema)
    }),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)

A successful registration returns the schema’s id:

{
  "id": 2
}

Delete a schema

The Schema Registry API provides DELETE endpoints for deleting a single schema or all schemas of a subject:

  • /subjects/<subject>/versions/<version>

  • /subjects/<subject>

A schema can be soft deleted (impermanently) or hard deleted (permanently), based on the boolean query parameter permanent. A soft deleted schema can be retrieved and re-registered. A hard deleted schema cannot be recovered.

Soft delete a schema

To soft delete a schema, make a DELETE request with the subject and version ID (where permanent=false is the default parameter value):

  • Curl

  • Python

curl -s \
  -X DELETE \
  "http://localhost:8081/subjects/sensor-value/versions/1" \
  | jq .
res = requests.delete(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)

If the request is successful, it returns the ID of the soft deleted schema:

1

Doing a soft delete for an already deleted schema returns an error. For example:

{
  "error_code": 40406,
  "message": "Subject 'sensor-value' Version 1 was soft deleted.Set permanent=true to delete permanently"
}

To list subjects of soft-deleted schemas, make a GET request with the deleted parameter set to true, /subjects?deleted=true:

  • Curl

  • Python

curl -s \
  "http://localhost:8081/subjects?deleted=true" \
  | jq .
payload = { 'deleted' : 'true' }
res = requests.get(f'{base_uri}/subjects', params=payload).json()
pretty(res)

If the request is successful, it returns all subjects, including deleted ones:

[
  "sensor-value"
]

To undo a soft deletion, first follow the steps to retrieve the schema, then register the schema.

Hard delete a schema

Redpanda doesn’t recommend hard (permanently) deleting schemas in a production system.

The DELETE APIs are primarily used during the development phase, when schemas are being iterated and revised.

To hard delete a schema, make two DELETE requests with the second request setting the permanent parameter to true, /subjects/<subject>/versions/<version>?permanent=true:

  • Curl

  • Python

curl -s \
  -X DELETE \
  "http://localhost:8081/subjects/sensor-value/versions/1" \
  | jq .
curl -s \
  -X DELETE \
  "http://localhost:8081/subjects/sensor-value/versions/1?permanent=true" \
  | jq .
res = requests.delete(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)
payload = { 'permanent' : 'true' }
res = requests.delete(f'{base_uri}/subjects/sensor-value/versions/1', params=payload).json()
pretty(res)

If the requests are successful, each request returns the version ID of the deleted schema:

1
1

A request for a hard-deleted schema returns an error. For example:

{
  "error_code": 40401,
  "message": "Subject 'sensor-value' not found."
}

Cleanup

When you’re finished, stop and remove the containers in Docker running Redpanda:

docker stop redpanda-sr
docker rm redpanda-sr
docker volume remove redpanda-sr
docker network remove redpanda-sr