Use the Schema Registry API
Schemas provide human-readable documentation for an API. They verify that data conforms to an API, support the generation of serializers for data, and manage the compatibility of evolving APIs, allowing new versions of services to be rolled out independently.
The Schema Registry is built into Redpanda, and you can use it with the API or the Redpanda Console UI. This section describes operations available in the Schema Registry API.
The Redpanda Schema Registry has API endpoints that allow you to perform the following tasks:
- Register schemas for a subject. A subject is a logical grouping or category for schemas. When data formats are updated, a new version of the schema can be registered under the same subject, allowing for backward and forward compatibility.
- Retrieve schemas of specific versions.
- Retrieve a list of subjects.
- Retrieve a list of schema versions for a subject.
- Configure schema compatibility checking.
- Query supported serialization formats.
- Delete schemas from the registry.
The following examples cover the basic functionality of the Redpanda Schema Registry, based on an example Avro schema called sensor_sample. This schema contains fields that represent a measurement from a sensor for the value of the sensor topic, as defined below.
{
"type": "record",
"name": "sensor_sample",
"fields": [
{
"name": "timestamp",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "identifier",
"type": "string",
"logicalType": "uuid"
},
{
"name": "value",
"type": "long"
}
]
}
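To make the field types concrete, the following minimal sketch builds a Python dict shaped like one sensor_sample record. The `make_sample` helper is hypothetical (it is not part of Redpanda or of any Avro library) and only illustrates what each field is expected to hold.

```python
import time
import uuid


def make_sample(value, identifier=None, timestamp_ms=None):
    """Build a dict shaped like a sensor_sample record (hypothetical helper)."""
    if not isinstance(value, int) or isinstance(value, bool):
        raise TypeError("value must be an integer (Avro long)")
    return {
        # timestamp-millis: milliseconds since the Unix epoch
        "timestamp": int(time.time() * 1000) if timestamp_ms is None else timestamp_ms,
        # uuid: canonical string form of a UUID
        "identifier": str(uuid.uuid4()) if identifier is None else identifier,
        "value": value,
    }


sample = make_sample(42, timestamp_ms=1700000000000)
print(sorted(sample))
```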
Prerequisites
To run the sample commands and code in each example, follow these steps to set up Redpanda and other tools:
- You need a running Redpanda cluster. If you don’t have one, see the Redpanda Quickstart.
  These examples assume that the Schema Registry is available locally at http://localhost:8081. If the Schema Registry is hosted on a different address or port in your cluster, change the URLs in the examples.
- Download the jq utility.
- If using Python, install the Requests module, then create an interactive Python session:
import requests
import json

def pretty(text):
    print(json.dumps(text, indent=2))

base_uri = "http://localhost:8081"
Query supported schema formats
To get the supported data serialization formats in the Schema Registry, make a GET request to the /schemas/types endpoint:
Curl:
curl -s "http://localhost:8081/schemas/types" | jq .
Python:
res = requests.get(f'{base_uri}/schemas/types').json()
pretty(res)
If the request is successful, it returns the supported serialization formats:
[ "PROTOBUF", "AVRO" ]
Register a schema
A schema is registered in the registry with a subject, which is a name that is associated with the schema as it evolves. Subjects are typically in the form <topic-name>-key or <topic-name>-value.
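As a small illustration of this naming convention, a subject name can be derived from a topic name. The `subject_name` helper below is hypothetical and is shown only to make the pattern explicit.

```python
def subject_name(topic, is_key=False):
    """Return '<topic>-key' or '<topic>-value' (hypothetical helper)."""
    return f"{topic}-{'key' if is_key else 'value'}"


print(subject_name("sensor"))
print(subject_name("sensor", is_key=True))
```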
To register the sensor_sample schema, make a POST request to the /subjects/sensor-value/versions endpoint with the Content-Type application/vnd.schemaregistry.v1+json:
Curl:
curl -s \
-X POST \
"http://localhost:8081/subjects/sensor-value/versions" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"}' \
| jq .
Python:
sensor_schema = {
"type": "record",
"name": "sensor_sample",
"fields": [
{
"name": "timestamp",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "identifier",
"type": "string",
"logicalType": "uuid"
},
{
"name": "value",
"type": "long"
}
]
}
res = requests.post(
url=f'{base_uri}/subjects/sensor-value/versions',
data=json.dumps({
'schema': json.dumps(sensor_schema)
}),
headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)
If the request is successful, it returns the schema’s id, which is unique within the Redpanda cluster:
{
"id": 1
}
When you register an evolved schema for an existing subject, it is stored as a new version under that subject, and the subject’s version number is incremented by 1.
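Note that the request body in the examples above encodes the schema twice: the Avro schema is serialized to a JSON string, and that string becomes the value of the `schema` key in the outer JSON document. A minimal sketch of that step follows. The `registration_body` helper is hypothetical, and the schema is shortened to a single field for brevity.

```python
import json


def registration_body(avro_schema):
    """Wrap an Avro schema (a dict) in a registration request body (hypothetical helper)."""
    # Inner dumps: the schema itself becomes a JSON string.
    # Outer dumps: the request body is a JSON object holding that string.
    return json.dumps({"schema": json.dumps(avro_schema)})


body = registration_body({
    "type": "record",
    "name": "sensor_sample",
    "fields": [{"name": "value", "type": "long"}],
})
outer = json.loads(body)
print(type(outer["schema"]).__name__)  # the schema travels as a string
```

The resulting string can be passed as the `data` argument of `requests.post`, as in the Python example above.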
Retrieve a schema
To retrieve a registered schema from the registry, make a GET request to the /schemas/ids/<id> endpoint:
Curl:
curl -s \
"http://localhost:8081/schemas/ids/1" \
| jq .
Python:
res = requests.get(f'{base_uri}/schemas/ids/1').json()
pretty(res)
If the request is successful, it returns the schema:
{
"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"
}
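Because the `schema` value in this response is itself a JSON-encoded string, it needs a second decoding step before the fields can be inspected. The following sketch assumes a response body of the shape shown above, shortened here to a single field.

```python
import json

# Assumed response body, abbreviated to one field for illustration.
response = {
    "schema": '{"type":"record","name":"sensor_sample","fields":[{"name":"value","type":"long"}]}'
}

# Decode the embedded schema string into a dict.
schema = json.loads(response["schema"])
field_names = [field["name"] for field in schema["fields"]]
print(field_names)
```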
List registry subjects
To list all registry subjects, make a GET request to the /subjects endpoint:
Curl:
curl -s \
"http://localhost:8081/subjects" \
| jq .
Python:
res = requests.get(f'{base_uri}/subjects').json()
pretty(res)
If the request is successful, it returns a list of subjects:
[
"sensor-value"
]
Retrieve schema versions of a subject
To query the schema versions of a subject, make a GET request to the /subjects/<subject-name>/versions endpoint.
For example, to get the schema versions of the sensor-value subject:
Curl:
curl -s \
"http://localhost:8081/subjects/sensor-value/versions" \
| jq .
Python:
res = requests.get(f'{base_uri}/subjects/sensor-value/versions').json()
pretty(res)
If the request is successful, it returns a list of version IDs:
[
1
]
Retrieve a schema of a subject
To retrieve a schema associated with a subject, make a GET request to the /subjects/<subject-name>/versions/<version-id> endpoint:
Curl:
curl -s \
"http://localhost:8081/subjects/sensor-value/versions/1" \
| jq .
Python:
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)
If the request is successful, it returns the subject and its associated schema:
{
"subject": "sensor-value",
"id": 1,
"version": 1,
"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"
}
To get the latest version, use latest as the version ID:
Curl:
curl -s \
"http://localhost:8081/subjects/sensor-value/versions/latest" \
| jq .
Python:
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/latest').json()
pretty(res)
To get only the schema, append /schema to the endpoint path:
Curl:
curl -s \
"http://localhost:8081/subjects/sensor-value/versions/latest/schema" \
| jq .
Python:
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/latest/schema').json()
pretty(res)
If the request is successful, it returns the schema:
{
"type": "record",
"name": "sensor_sample",
"fields": [
{
"name": "timestamp",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "identifier",
"type": "string",
"logicalType": "uuid"
},
{
"name": "value",
"type": "long"
}
]
}
Configure schema compatibility
Applications are often modeled around a specific business object structure. As applications change and the shape of their data changes, producer schemas and consumer schemas may no longer be compatible. You can decide how a consumer handles data from a producer that uses an older or newer schema, and reduce the chance of consumers hitting deserialization errors.
You can configure different types of schema compatibility, which are applied to a subject when a new schema is registered. The Schema Registry supports the following compatibility types:
- BACKWARD (default): Consumers using the new schema (for example, version 10) can read data from producers using the previous schema (for example, version 9).
- BACKWARD_TRANSITIVE: Consumers using the new schema (for example, version 10) can read data from producers using all previous schemas (for example, versions 1-9).
- FORWARD: Consumers using the previous schema (for example, version 9) can read data from producers using the new schema (for example, version 10).
- FORWARD_TRANSITIVE: Consumers using any previous schema (for example, versions 1-9) can read data from producers using the new schema (for example, version 10).
- FULL: A new schema and the previous schema (for example, versions 10 and 9) are both backward and forward compatible with each other.
- FULL_TRANSITIVE: Each schema is both backward and forward compatible with all registered schemas.
- NONE: No schema compatibility checks are done.
Compatibility uses and constraints
- A consumer that wants to read a topic from the beginning (for example, an AI learning process) benefits from backward compatibility. It can process the whole topic using the latest schema. This allows producers to remove fields and add attributes.
- A real-time consumer that doesn’t care about historical events but wants to keep up with the latest data (for example, a typical streaming application) benefits from forward compatibility. Even if producers change the schema, the consumer can carry on.
- Full compatibility can process historical data and future data. This is the safest option, but it limits the changes that can be made: it only allows the addition and removal of optional fields.
If you make changes that are not inherently backward-compatible, you may need to change compatibility settings or plan a transitional period, updating producers and consumers to use the new schema while the old one is still accepted.
| | Backward-compatible tasks | Not backward-compatible tasks |
|---|---|---|
| Avro | Add fields with default values; make fields nullable | Remove fields; change data types of fields; change enum values; change field constraints; change record or field names |
| Protobuf | Add fields; remove fields | Remove required fields; change data types of fields |
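To illustrate what a backward compatibility check looks at, the following is a deliberately simplified sketch, not the Schema Registry’s actual implementation. It assumes flat Avro records with primitive field types, applies Avro’s primitive type promotion rules (for example, data written as long can be read as double, but not as int), and requires a default value for any field that exists only in the new schema. It does not evaluate removed fields, nested types, unions, enums, or logical types. The names `can_read` and `is_backward_compatible` are hypothetical.

```python
# Avro primitive promotions: data written with the key type can be read
# by a consumer whose schema declares any of the listed types.
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def can_read(reader_type, writer_type):
    """True if a reader declaring reader_type can read writer_type data."""
    return reader_type == writer_type or reader_type in PROMOTIONS.get(writer_type, set())


def is_backward_compatible(new_schema, old_schema):
    """Simplified check: can a consumer on new_schema read old_schema data?"""
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    for field in new_schema["fields"]:
        old = old_fields.get(field["name"])
        if old is None:
            # A field that only exists in the new schema needs a default.
            if "default" not in field:
                return False
        elif not can_read(field["type"], old["type"]):
            return False
    return True


def sensor_schema_with(value_type):
    """Abbreviated sensor_sample schema with a configurable value type."""
    return {
        "type": "record",
        "name": "sensor_sample",
        "fields": [{"name": "value", "type": value_type}],
    }


old = sensor_schema_with("long")
print(is_backward_compatible(sensor_schema_with("int"), old))     # False
print(is_backward_compatible(sensor_schema_with("double"), old))  # True
```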
To set the compatibility type for a subject, make a PUT request to /config/<subject-name> with the specific compatibility type:
Curl:
curl -s \
-X PUT \
"http://localhost:8081/config/sensor-value" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}' \
| jq .
Python:
res = requests.put(
url=f'{base_uri}/config/sensor-value',
data=json.dumps(
{'compatibility': 'BACKWARD'}
),
headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)
If the request is successful, it returns the new compatibility type:
{
"compatibility": "BACKWARD"
}
If you POST an incompatible schema change, the request returns an error. For example, if you try to register a new schema with the value field’s type changed from long to int, and compatibility is set to BACKWARD, the request returns an error due to incompatibility:
Curl:
curl -s \
-X POST \
"http://localhost:8081/subjects/sensor-value/versions" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"int\"}]}"}' \
| jq .
Python:
sensor_schema["fields"][2]["type"] = "int"
res = requests.post(
url=f'{base_uri}/subjects/sensor-value/versions',
data=json.dumps({
'schema': json.dumps(sensor_schema)
}),
headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)
The request returns this error:
{
"error_code": 409,
"message": "Schema being registered is incompatible with an earlier schema for subject \"{sensor-value}\""
}
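Error responses like this one arrive as ordinary JSON, so the Python examples print them with `pretty(res)` rather than raising an exception. A minimal sketch of detecting them explicitly follows. The `check_response` helper is hypothetical and assumes that error bodies are JSON objects with an `error_code` key, as in the example above; the message is shortened.

```python
def check_response(body):
    """Raise if a decoded response body looks like a Schema Registry error."""
    if isinstance(body, dict) and "error_code" in body:
        raise RuntimeError(
            f"Schema Registry error {body['error_code']}: {body.get('message', '')}"
        )
    return body


# Abbreviated copy of the error body shown above.
error_body = {
    "error_code": 409,
    "message": "Schema being registered is incompatible with an earlier schema",
}

try:
    check_response(error_body)
    outcome = "ok"
except RuntimeError as exc:
    outcome = str(exc)
print(outcome)
```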
For an example of a compatible change, register a schema with the value field’s type changed from long to double:
Curl:
curl -s \
-X POST \
"http://localhost:8081/subjects/sensor-value/versions" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"double\"}]}"}' \
| jq .
Python:
sensor_schema["fields"][2]["type"] = "double"
res = requests.post(
url=f'{base_uri}/subjects/sensor-value/versions',
data=json.dumps({
'schema': json.dumps(sensor_schema)
}),
headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)
A successful registration returns the schema’s id:
{
"id": 2
}
Reference a schema
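To build more complex schema definitions, you can add references to other schemas. The following example registers a Protobuf schema under the simple subject, registers a second schema that references it, and shows that the referenced schema cannot be deleted while the reference exists. The final request lists the IDs of the schemas that reference it.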
curl -X POST -H 'Content-type: application/vnd.schemaregistry.v1+json' http://127.0.0.1:8081/subjects/simple/versions -d '{"schema": "syntax = \"proto3\";\nmessage Simple {\n string id = 1;\n}","schemaType": "PROTOBUF"}'
{"id":2}
curl -X POST -H 'Content-type: application/vnd.schemaregistry.v1+json' http://127.0.0.1:8081/subjects/import/versions -d '{"schema": "syntax = \"proto3\";\nimport \"simple\";\nmessage Test3 {\n Simple id = 1;\n}","schemaType": "PROTOBUF", "references": [{"name": "simple", "subject": "simple", "version":1}]}'
{"id":3}
curl -X DELETE -H 'Content-type: application/vnd.schemaregistry.v1+json' http://127.0.0.1:8081/subjects/simple/versions/1
{"error_code":42206,"message":"One or more references exist to the schema {magic=1,keytype=SCHEMA,subject=simple,version=1}"}
curl -H 'Content-type: application/vnd.schemaregistry.v1+json' http://127.0.0.1:8081/subjects/simple/versions/1/referencedby
[3]
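The same request bodies can be built in Python. The sketch below only constructs the JSON payloads used in the curl commands above and does not send them; the `protobuf_body` helper is hypothetical. The resulting strings could be passed as the `data` argument of `requests.post`, as in the earlier registration example.

```python
import json

simple_proto = 'syntax = "proto3";\nmessage Simple {\n string id = 1;\n}'
import_proto = 'syntax = "proto3";\nimport "simple";\nmessage Test3 {\n Simple id = 1;\n}'


def protobuf_body(schema_text, references=()):
    """Build a Protobuf registration body (hypothetical helper)."""
    body = {"schema": schema_text, "schemaType": "PROTOBUF"}
    if references:
        # Each reference names the import, the subject, and the version.
        body["references"] = list(references)
    return json.dumps(body)


simple_body = protobuf_body(simple_proto)
import_body = protobuf_body(
    import_proto,
    [{"name": "simple", "subject": "simple", "version": 1}],
)
print(json.loads(import_body)["references"])
```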
Delete a schema
The Schema Registry API provides DELETE endpoints for deleting a single schema or all schemas of a subject:
- /subjects/<subject>/versions/<version>
- /subjects/<subject>
A schema cannot be deleted if any other schema references it.
A schema can be soft deleted (impermanently) or hard deleted (permanently), based on the boolean query parameter permanent. A soft deleted schema can be retrieved and re-registered. A hard deleted schema cannot be recovered.
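As a sketch of how the two endpoints and the permanent parameter combine, the following builds the DELETE URLs without sending any request. The `delete_url` helper is hypothetical, and the base URI is the local address assumed throughout these examples.

```python
from urllib.parse import quote, urlencode


def delete_url(base_uri, subject, version=None, permanent=False):
    """Build a DELETE URL for a subject or one of its versions (hypothetical helper)."""
    url = f"{base_uri}/subjects/{quote(subject, safe='')}"
    if version is not None:
        url += f"/versions/{version}"
    if permanent:
        # Hard delete: permanent=true; omitting it means a soft delete.
        url += "?" + urlencode({"permanent": "true"})
    return url


print(delete_url("http://localhost:8081", "sensor-value", 1))
print(delete_url("http://localhost:8081", "sensor-value", 1, permanent=True))
print(delete_url("http://localhost:8081", "sensor-value"))
```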
Soft delete a schema
To soft delete a schema, make a DELETE request with the subject and version ID (where permanent=false is the default parameter value):
Curl:
curl -s \
-X DELETE \
"http://localhost:8081/subjects/sensor-value/versions/1" \
| jq .
Python:
res = requests.delete(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)
If the request is successful, it returns the ID of the soft deleted schema:
1
Doing a soft delete for an already deleted schema returns an error. For example:
{
"error_code": 40406,
"message": "Subject 'sensor-value' Version 1 was soft deleted.Set permanent=true to delete permanently"
}
To list subjects of soft-deleted schemas, make a GET request with the deleted parameter set to true (/subjects?deleted=true):
Curl:
curl -s \
"http://localhost:8081/subjects?deleted=true" \
| jq .
Python:
payload = { 'deleted' : 'true' }
res = requests.get(f'{base_uri}/subjects', params=payload).json()
pretty(res)
If the request is successful, it returns all subjects, including deleted ones:
[
"sensor-value"
]
To undo a soft deletion, first follow the steps to retrieve the schema, then register the schema.
Hard delete a schema
Redpanda doesn’t recommend hard (permanently) deleting schemas in a production system. The DELETE APIs are primarily used during the development phase, when schemas are being iterated and revised.
To hard delete a schema, make two DELETE requests, with the second request setting the permanent parameter to true (/subjects/<subject>/versions/<version>?permanent=true):
Curl:
curl -s \
-X DELETE \
"http://localhost:8081/subjects/sensor-value/versions/1" \
| jq .
curl -s \
-X DELETE \
"http://localhost:8081/subjects/sensor-value/versions/1?permanent=true" \
| jq .
Python:
res = requests.delete(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)
payload = { 'permanent' : 'true' }
res = requests.delete(f'{base_uri}/subjects/sensor-value/versions/1', params=payload).json()
pretty(res)
If the requests are successful, each request returns the version ID of the deleted schema:
1
1
A request for a hard-deleted schema returns an error. For example:
{
"error_code": 40401,
"message": "Subject 'sensor-value' not found."
}
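The two-step order of a hard delete (soft delete first, then the permanent delete) can be wrapped in a small function. The sketch below is hypothetical: `hard_delete` accepts any callable with the same calling convention as `requests.delete`, and a stand-in is used here so that the order of calls can be shown without a running cluster.

```python
def hard_delete(delete, url):
    """Soft delete, then permanently delete, the schema version at url.

    delete is any callable that accepts (url, params=None), such as
    requests.delete.
    """
    soft = delete(url)  # first request: permanent defaults to false
    hard = delete(url, params={"permanent": "true"})  # second request
    return soft, hard


# Stand-in for requests.delete that records each call.
calls = []


def fake_delete(url, params=None):
    calls.append((url, params))
    return 1


hard_delete(fake_delete, "http://localhost:8081/subjects/sensor-value/versions/1")
print(calls)
```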