Use the Schema Registry API

Schemas provide human-readable documentation for an API. They verify that data conforms to an API, support the generation of serializers for data, and manage the compatibility of evolving APIs, allowing new versions of services to be rolled out independently.

The Schema Registry is built into Redpanda, and you can use it with the API or Redpanda Console. This section describes operations available in the Schema Registry API.

The Redpanda Schema Registry has API endpoints that allow you to perform the following tasks:

  • Register schemas for a subject. When data formats are updated, a new version of the schema can be registered under the same subject, allowing for backward and forward compatibility.

  • Retrieve schemas of specific versions.

  • Retrieve a list of subjects.

  • Retrieve a list of schema versions for a subject.

  • Configure schema compatibility checking.

  • Query supported serialization formats.

  • Delete schemas from the registry.

The following examples cover the basic functionality of the Redpanda Schema Registry based on an example Avro schema called sensor_sample. This schema contains fields that represent a measurement from a sensor for the value of the sensor topic, as defined below.

{
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "long"
    }
  ]
}

Prerequisites

To run the sample commands and code in each example, follow these steps to set up Redpanda and other tools:

  1. You need a running Redpanda cluster. If you don’t have one, you can follow the quickstart to deploy a self-hosted cluster.

    These examples assume that the Schema Registry is available locally at http://localhost:8081. If the Schema Registry is hosted on a different address or port in your cluster, change the URLs in the examples.

  2. Download the jq utility.

  3. Install curl or Python.

    You can also use rpk to interact with the Schema Registry. The rpk registry commands call the same API endpoints shown in the curl and Python examples.

    If using Python, install the Requests module, then create an interactive Python session:

    import requests
    import json
    def pretty(text):
      print(json.dumps(text, indent=2))
    
    base_uri = "http://localhost:8081"

Query supported schema formats

To get the supported data serialization formats in the Schema Registry, make a GET request to the /schemas/types endpoint:

  • Curl

  • Python

curl -s "http://localhost:8081/schemas/types" | jq .
res = requests.get(f'{base_uri}/schemas/types').json()
pretty(res)

This returns the supported serialization formats:

[
  "PROTOBUF",
  "AVRO"
]

Register a schema

A schema is registered in the registry with a subject, which is a name that is associated with the schema as it evolves. Subjects are typically in the form <topic-name>-key or <topic-name>-value.

To register the sensor_sample schema, make a POST request to the /subjects/sensor-value/versions endpoint with the Content-Type application/vnd.schemaregistry.v1+json:

  • rpk

  • Curl

  • Python

rpk registry schema create sensor-value --schema ~/code/tmp/sensor_sample.avro
curl -s \
  -X POST \
  "http://localhost:8081/subjects/sensor-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"}' \
  | jq
sensor_schema = {
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "long"
    }
  ]
}

res = requests.post(
    url=f'{base_uri}/subjects/sensor-value/versions',
    data=json.dumps({
      'schema': json.dumps(sensor_schema)
    }),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)

This returns the ID that uniquely identifies the schema in the Redpanda cluster:

  • rpk

  • Curl

SUBJECT        VERSION  ID    TYPE
sensor-value   1        1     AVRO
{
  "id": 1
}

When you register an evolved schema for an existing subject, the version ID is incremented by 1.
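
The request body wraps the schema as a JSON-encoded string inside a JSON object, which is why the curl example escapes every quote. The following is a minimal sketch of that double encoding. `build_register_payload` is a hypothetical helper name, not part of any Redpanda client, and the optional `schemaType` field is modeled on the Protobuf registration example later in this section.

```python
import json

def build_register_payload(schema, schema_type=None):
    """Return the JSON request body for POST /subjects/<subject>/versions.

    `schema` is a dict (an Avro schema) or a string (for example, Protobuf source).
    """
    # An Avro schema is itself a JSON document, so serialize the dict to a string first.
    schema_text = schema if isinstance(schema, str) else json.dumps(schema)
    body = {"schema": schema_text}
    if schema_type is not None:
        body["schemaType"] = schema_type
    # The outer dumps escapes the quotes inside the inner schema string.
    return json.dumps(body)

sensor_schema = {
    "type": "record",
    "name": "sensor_sample",
    "fields": [
        {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
        {"name": "identifier", "type": "string", "logicalType": "uuid"},
        {"name": "value", "type": "long"},
    ],
}

payload = build_register_payload(sensor_schema)
print(payload)
```

The resulting string can be passed as `data=payload` to `requests.post`, in place of the nested `json.dumps` calls in the Python example above.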

Retrieve a schema

To retrieve a registered schema from the registry, make a GET request to the /schemas/ids/<id> endpoint:

  • rpk

  • Curl

  • Python

rpk registry schema get --id 1
curl -s \
  "http://localhost:8081/schemas/ids/1" \
  | jq .
res = requests.get(f'{base_uri}/schemas/ids/1').json()
pretty(res)

The rpk output returns the subject and version, and the HTTP response returns the schema:

  • rpk

  • Curl

SUBJECT        VERSION  ID    TYPE
sensor-value   1        1     AVRO
{
  "schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"
}

List registry subjects

To list all registry subjects, make a GET request to the /subjects endpoint:

  • rpk

  • Curl

  • Python

rpk registry subject list --format json
curl -s \
  "http://localhost:8081/subjects" \
  | jq .
res = requests.get(f'{base_uri}/subjects').json()
pretty(res)

This returns the subject:

[
  "sensor-value"
]

Retrieve schema versions of a subject

To query the schema versions of a subject, make a GET request to the /subjects/<subject-name>/versions endpoint.

For example, to get the schema versions of the sensor-value subject:

  • Curl

  • Python

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions').json()
pretty(res)

This returns the version ID:

[
  1
]

Retrieve a schema of a subject

To retrieve a schema associated with a subject, make a GET request to the /subjects/<subject-name>/versions/<version-id> endpoint:

  • rpk

  • Curl

  • Python

rpk registry schema get sensor-value --schema-version 1
curl -s \
  "http://localhost:8081/subjects/sensor-value/versions/1" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)

The rpk output lists the subject, version, ID, and type. The HTTP response also includes the associated schema:

  • rpk

  • Curl

SUBJECT        VERSION  ID    TYPE
sensor-value   1        1     AVRO
{
  "subject": "sensor-value",
  "id": 1,
  "version": 1,
  "schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"
}

To get the latest version, use latest as the version ID:

  • rpk

  • Curl

  • Python

rpk registry schema get sensor-value --schema-version latest
curl -s \
  "http://localhost:8081/subjects/sensor-value/versions/latest" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/latest').json()
pretty(res)

To get only the schema, append /schema to the endpoint path:

  • Curl

  • Python

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions/latest/schema" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/latest/schema').json()
pretty(res)
{
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "long"
    }
  ]
}
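
The endpoints in this section differ only in their path segments. As a hedged sketch, a small helper can assemble them. `schema_version_url` is a hypothetical name, and percent-encoding the subject is an assumption made here for subjects that contain characters such as `/`.

```python
from urllib.parse import quote

BASE_URI = "http://localhost:8081"  # the address the examples in this section assume

def schema_version_url(subject, version="latest", schema_only=False, base_uri=BASE_URI):
    """Build the URL for /subjects/<subject-name>/versions/<version-id>[/schema]."""
    # safe="" also encodes "/" so the subject stays a single path segment.
    encoded_subject = quote(subject, safe="")
    url = f"{base_uri}/subjects/{encoded_subject}/versions/{version}"
    if schema_only:
        url += "/schema"
    return url

print(schema_version_url("sensor-value", 1))
print(schema_version_url("sensor-value", schema_only=True))
```

Defaulting `version` to `latest` mirrors the common case of fetching the newest schema for a subject.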

Configure schema compatibility

Applications are often modeled around a specific business object structure. As applications change and the shape of their data changes, producer schemas and consumer schemas may no longer be compatible. You can decide how a consumer handles data from a producer that uses an older or newer schema, and reduce the chance of consumers hitting deserialization errors.

You can configure different types of schema compatibility, which are applied to a subject when a new schema is registered. The Schema Registry supports the following compatibility types:

  • BACKWARD (default) - Consumers using the new schema (for example, version 10) can read data from producers using the previous schema (for example, version 9).

  • BACKWARD_TRANSITIVE - Consumers using the new schema (for example, version 10) can read data from producers using all previous schemas (for example, versions 1-9).

  • FORWARD - Consumers using the previous schema (for example, version 9) can read data from producers using the new schema (for example, version 10).

  • FORWARD_TRANSITIVE - Consumers using any previous schema (for example, versions 1-9) can read data from producers using the new schema (for example, version 10).

  • FULL - A new schema and the previous schema (for example, versions 10 and 9) are both backward and forward compatible with each other.

  • FULL_TRANSITIVE - Each schema is both backward and forward compatible with all registered schemas.

  • NONE - No schema compatibility checks are done.
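
To make the difference between the plain and transitive types concrete, the following sketch lists which already-registered versions a new schema is compared against under each type. It only models the rules in the list above. `versions_checked` is a hypothetical helper, not a Schema Registry API, and it performs no actual schema comparison.

```python
def versions_checked(compatibility, existing_versions):
    """Return the existing versions that a new schema is compared against."""
    versions = sorted(existing_versions)
    if compatibility == "NONE" or not versions:
        return []
    if compatibility in ("BACKWARD", "FORWARD", "FULL"):
        # Non-transitive types look only at the most recent version.
        return versions[-1:]
    if compatibility in ("BACKWARD_TRANSITIVE", "FORWARD_TRANSITIVE", "FULL_TRANSITIVE"):
        # Transitive types look at every registered version.
        return versions
    raise ValueError(f"unknown compatibility type: {compatibility}")

existing = range(1, 10)  # versions 1-9 are registered; version 10 is being added
for level in ("BACKWARD", "BACKWARD_TRANSITIVE", "NONE"):
    print(level, versions_checked(level, existing))
```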

Compatibility uses and constraints

  • A consumer that wants to read a topic from the beginning (for example, an AI learning process) benefits from backward compatibility. It can process the whole topic using the latest schema. This allows producers to remove fields and add attributes.

  • A real-time consumer that doesn’t care about historical events but wants to keep up with the latest data (for example, a typical streaming application) benefits from forward compatibility. Even if producers change the schema, the consumer can carry on.

  • Full compatibility lets a consumer process both historical and future data. This is the safest option, but it limits the changes that can be made: only optional fields can be added or removed.

If you make changes that are not inherently backward-compatible, you may need to change compatibility settings or plan a transitional period, updating producers and consumers to use the new schema while the old one is still accepted.

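The following lists show which tasks are backward-compatible and which are not, by schema format.

Avro

  • Backward-compatible tasks: Add fields with default values. Make fields nullable.

  • Not backward-compatible tasks: Remove fields. Change data types of fields. Change enum values. Change field constraints. Change record or field names.

Protobuf

  • Backward-compatible tasks: Add fields. Remove fields.

  • Not backward-compatible tasks: Remove required fields. Change data types of fields.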

To set the compatibility type for a subject, make a PUT request to /config/<subject-name> with the specific compatibility type:

  • rpk

  • Curl

  • Python

rpk registry compatibility-level set sensor-value --level BACKWARD
curl -s \
  -X PUT \
  "http://localhost:8081/config/sensor-value" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}' \
  | jq .
res = requests.put(
    url=f'{base_uri}/config/sensor-value',
    data=json.dumps(
        {'compatibility': 'BACKWARD'}
      ),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)

This returns the new compatibility type:

  • rpk

  • Curl

SUBJECT        LEVEL     ERROR
sensor-value   BACKWARD
{
  "compatibility": "BACKWARD"
}

If you POST an incompatible schema change, the request returns an error. For example, if you try to register a new schema with the value field’s type changed from long to int, and compatibility is set to BACKWARD, the request returns an error due to incompatibility:

  • Curl

  • Python

curl -s \
  -X POST \
  "http://localhost:8081/subjects/sensor-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"int\"}]}"}' \
  | jq
sensor_schema["fields"][2]["type"] = "int"

res = requests.post(
    url=f'{base_uri}/subjects/sensor-value/versions',
    data=json.dumps({
      'schema': json.dumps(sensor_schema)
    }),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)

The request returns this error:

{
  "error_code": 409,
  "message": "Schema being registered is incompatible with an earlier schema for subject \"{sensor-value}\""
}
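
Because the Python examples call `.json()` directly, a rejected registration shows up as a body containing `error_code` rather than as an exception. One way to separate the two cases is sketched below. `SchemaRegistryError` and `schema_id_from` are hypothetical names, and the sketch assumes that error bodies always carry the `error_code` and `message` fields shown above.

```python
class SchemaRegistryError(Exception):
    """Raised when a Schema Registry response body describes an error."""

    def __init__(self, error_code, message):
        super().__init__(f"{error_code}: {message}")
        self.error_code = error_code
        self.message = message

def schema_id_from(body):
    """Return the schema ID from a registration response body, or raise."""
    if "error_code" in body:
        raise SchemaRegistryError(body["error_code"], body.get("message", ""))
    return body["id"]

# Sample bodies copied from the responses shown in this section.
print(schema_id_from({"id": 1}))
try:
    schema_id_from({
        "error_code": 409,
        "message": "Schema being registered is incompatible with an earlier schema",
    })
except SchemaRegistryError as err:
    print("registration rejected:", err.error_code)
```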

For an example of a compatible change, register a schema with the value field’s type changed from long to double:

  • Curl

  • Python

curl -s \
  -X POST \
  "http://localhost:8081/subjects/sensor-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"double\"}]}"}' \
  | jq
sensor_schema["fields"][2]["type"] = "double"

res = requests.post(
    url=f'{base_uri}/subjects/sensor-value/versions',
    data=json.dumps({
      'schema': json.dumps(sensor_schema)
    }),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)

A successful registration returns the schema’s id:

{
  "id": 2
}

Reference a schema

To build more complex schema definitions, you can add a reference to other schemas. The following example registers a Protobuf schema in subject test-simple with a message name Simple.

  • rpk

  • Curl

rpk registry schema create test-simple --schema simple.proto
SUBJECT        VERSION  ID    TYPE
test-simple    1        2     PROTOBUF
curl -X POST -H 'Content-type: application/vnd.schemaregistry.v1+json' http://127.0.0.1:8081/subjects/test-simple/versions -d '{"schema": "syntax = \"proto3\";\nmessage Simple {\n  string id = 1;\n}","schemaType": "PROTOBUF"}'
{"id":2}

This schema is then referenced in a new schema in a different subject named import.

  • rpk

  • Curl

# --references flag takes the format {name}:{subject}:{schema version}
rpk registry schema create import --schema import_schema.proto --references simple:test-simple:1
SUBJECT        VERSION  ID    TYPE
import         1        3     PROTOBUF
curl -X POST -H 'Content-type: application/vnd.schemaregistry.v1+json' http://127.0.0.1:8081/subjects/import/versions -d '{"schema": "syntax = \"proto3\";\nimport \"simple\";\nmessage Test3 {\n  Simple id = 1;\n}","schemaType": "PROTOBUF", "references": [{"name": "simple", "subject": "test-simple", "version":1}]}'
{"id":3}
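
The rpk `--references` value and the `references` array in the HTTP body carry the same three pieces of information. The sketch below converts one form into the other. `parse_reference` is a hypothetical helper, and it assumes that neither the name nor the subject contains a colon.

```python
import json

def parse_reference(text):
    """Convert "name:subject:version" into a reference object for the request body."""
    parts = text.split(":")
    if len(parts) != 3:
        raise ValueError(f"expected name:subject:version, got {text!r}")
    name, subject, version = parts
    # The HTTP body expects the version as a number, not a string.
    return {"name": name, "subject": subject, "version": int(version)}

body = {
    "schema": 'syntax = "proto3";\nimport "simple";\nmessage Test3 {\n  Simple id = 1;\n}',
    "schemaType": "PROTOBUF",
    "references": [parse_reference("simple:test-simple:1")],
}
print(json.dumps(body, indent=2))
```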

You cannot delete a schema when it is used as a reference.

  • rpk

  • Curl

rpk registry schema delete test-simple --schema-version 1
One or more references exist to the schema {magic=1,keytype=SCHEMA,subject=test-simple,version=1}
curl -X DELETE -H 'Content-type: application/vnd.schemaregistry.v1+json' http://127.0.0.1:8081/subjects/test-simple/versions/1
{"error_code":42206,"message":"One or more references exist to the schema {magic=1,keytype=SCHEMA,subject=test-simple,version=1}"}

Call the /subjects/test-simple/versions/1/referencedby endpoint to see the schema IDs that reference version 1 for subject test-simple.

  • rpk

  • Curl

rpk registry schema references test-simple --schema-version 1
SUBJECT  VERSION  ID    TYPE
import   1        3     PROTOBUF
curl -H 'Content-type: application/vnd.schemaregistry.v1+json' http://127.0.0.1:8081/subjects/test-simple/versions/1/referencedby
[3]

Delete a schema

The Schema Registry API provides DELETE endpoints for deleting a single schema or all schemas of a subject:

  • /subjects/<subject>/versions/<version>

  • /subjects/<subject>

A schema cannot be deleted if any other schema references it.

A schema can be soft deleted (impermanently) or hard deleted (permanently), based on the boolean query parameter permanent. A soft deleted schema can be retrieved and re-registered. A hard deleted schema cannot be recovered.

Soft delete a schema

To soft delete a schema, make a DELETE request with the subject and version ID (where permanent=false is the default parameter value):

  • rpk

  • Curl

  • Python

rpk registry schema delete sensor-value --schema-version 1
curl -s \
  -X DELETE \
  "http://localhost:8081/subjects/sensor-value/versions/1" \
  | jq .
res = requests.delete(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)

This returns the version ID of the soft-deleted schema:

  • rpk

  • Curl

Successfully deleted schema. Subject: "sensor-value", version: "1"
1

Doing a soft delete for an already deleted schema returns an error:

  • rpk

  • Curl

Subject 'sensor-value' Version 1 was soft deleted. Set permanent=true to delete permanently
{
  "error_code": 40406,
  "message": "Subject 'sensor-value' Version 1 was soft deleted.Set permanent=true to delete permanently"
}

To list subjects of soft-deleted schemas, make a GET request with the deleted parameter set to true, /subjects?deleted=true:

  • rpk

  • Curl

  • Python

rpk registry subject list --deleted
curl -s \
  "http://localhost:8081/subjects?deleted=true" \
  | jq .
payload = { 'deleted' : 'true' }
res = requests.get(f'{base_uri}/subjects', params=payload).json()
pretty(res)

This returns all subjects, including deleted ones:

[
  "sensor-value"
]

To undo a soft deletion, first follow the steps to retrieve the schema, then register the schema.

Hard delete a schema

Redpanda doesn’t recommend hard (permanently) deleting schemas in a production system.

The DELETE APIs are primarily used during the development phase, when schemas are being iterated and revised.

To hard delete a schema, use the --permanent flag with the rpk registry schema delete command, or for curl or Python, make two DELETE requests with the second request setting the permanent parameter to true (/subjects/<subject>/versions/<version>?permanent=true):

  • rpk

  • Curl

  • Python

rpk registry schema delete sensor-value --schema-version 1 --permanent
curl -s \
  -X DELETE \
  "http://localhost:8081/subjects/sensor-value/versions/1" \
  | jq .
curl -s \
  -X DELETE \
  "http://localhost:8081/subjects/sensor-value/versions/1?permanent=true" \
  | jq .
res = requests.delete(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)
payload = { 'permanent' : 'true' }
res = requests.delete(f'{base_uri}/subjects/sensor-value/versions/1', params=payload).json()
pretty(res)

Each request returns the version ID of the deleted schema:

  • rpk

  • Curl

Successfully deleted schema. Subject: "sensor-value", version: "1"
1
1

A request for a hard-deleted schema returns an error:

  • rpk

  • Curl

Subject 'sensor-value' not found.
{
  "error_code": 40401,
  "message": "Subject 'sensor-value' not found."
}
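
The soft and hard delete flows above differ only in the `permanent` query parameter and in the number of requests. The following minimal sketch builds the request URLs without sending them. `delete_urls` is a hypothetical helper, and each returned URL would be passed, in order, to `requests.delete` or `curl -X DELETE`.

```python
from urllib.parse import urlencode

def delete_urls(subject, version, hard=False, base_uri="http://localhost:8081"):
    """Return the DELETE URLs, in order, for removing one schema version."""
    url = f"{base_uri}/subjects/{subject}/versions/{version}"
    urls = [url]  # soft delete: permanent defaults to false
    if hard:
        # A hard delete follows the soft delete with a second request
        # that sets permanent=true.
        query = urlencode({"permanent": "true"})
        urls.append(f"{url}?{query}")
    return urls

for u in delete_urls("sensor-value", 1, hard=True):
    print("DELETE", u)
```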