CREATE TABLE

The CREATE TABLE statement maps a Redpanda topic to a SQL table through a catalog. After creating the table, you can query the topic using standard SQL.

Create a Redpanda catalog connection before you create tables. CREATE TABLE in Redpanda SQL only maps Redpanda topics to SQL tables; it does not create standalone tables with user-defined schemas.

Syntax

CREATE TABLE [IF NOT EXISTS] catalog_name=>table_name
WITH (option = 'value' [, ...]);
  • catalog_name: Name of an existing Redpanda catalog.

  • table_name: Name for the new table.

  • IF NOT EXISTS: Optional. Prevents an error if a table with the same name already exists in the catalog.

Options

| Option | Type | Required | Description |
|---|---|---|---|
| topic | STRING | Yes | Name of the Redpanda topic to map to this table. |
| schema_subject | STRING | No | Schema Registry subject name to use for deserializing topic data. Defaults to the topic-name strategy (<topic>-value). |
| schema_lookup_policy | STRING | No | How to resolve the schema version. LATEST is the only supported value. |
| error_handling_policy | STRING | No | How to handle records that fail deserialization. FAIL (default): Raises an error. FILL_NULL: Replaces failed fields with NULL. DROP_RECORD: Skips the record. |
| struct_mapping_policy | STRING | No | How to map nested structures from the topic schema to SQL columns. COMPOUND (default): Maps each nested structure to a user-defined type with named fields, queryable using (column).field_name syntax. Cyclic types are not supported in COMPOUND mode. Use JSON for recursive schemas. See row for the field-access syntax. JSON: Stores each nested structure as a JSON value. Required for recursive (cyclic) types. |
| output_schema_message_full_name | STRING | No | Full Protobuf message name. Required when the schema contains multiple message definitions. |
| confluent_wire_protocol | STRING | No | Whether records on the topic are encoded with the Confluent Schema Registry wire format (a magic byte followed by a 4-byte schema ID before the payload). 'true' (default): Records carry the Confluent wire-format prefix. Use this for topics whose values were produced by a Schema-Registry-aware client. 'false': Records are raw Protobuf or Avro without the wire-format prefix. Only valid when schema_lookup_policy = 'LATEST'. |
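
To illustrate how several options combine, the following sketch maps a topic whose records are raw payloads without the Confluent prefix and whose schema is recursive. The topic name category-tree and the table name category_tree are hypothetical. The option names and values are the ones listed above, but this particular combination has not been verified against a running cluster.

```sql
-- Sketch only: the topic, subject, and table names are hypothetical.
CREATE TABLE IF NOT EXISTS default_redpanda_catalog=>category_tree
WITH (
  topic = 'category-tree',
  schema_subject = 'category-tree-value',
  schema_lookup_policy = 'LATEST',     -- the only supported value
  struct_mapping_policy = 'JSON',      -- required for recursive (cyclic) types
  confluent_wire_protocol = 'false'    -- records have no magic byte or schema ID prefix
);
```

With struct_mapping_policy = 'JSON', nested structures are stored as JSON values, so the (column).field_name access used in COMPOUND mode would not apply to them.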

Auto-added columns

Every catalog-mapped table includes two struct columns in addition to the columns derived from the topic’s schema. Redpanda SQL adds these columns to both Kafka-backed and Iceberg-backed tables. The names redpanda and redpanda_raw are reserved. A topic schema cannot define columns with these names.

redpanda

Contains Kafka record metadata. Present on every row.

| Field | Type | Nullable | Description |
|---|---|---|---|
| partition | int | No | Partition the record was read from. |
| offset | bigint | No | Offset of the record within its partition. |
| timestamp | timestamp with time zone | Yes | Record timestamp. |
| headers | Array of struct {key TEXT, value BYTEA} | Yes | Record headers, as an array where each element is a struct of header name and value bytes. |
| key | bytea | Yes | Record key bytes. |
| timestamp_type | int | Yes | Kafka timestamp type code. 0 for CreateTime, 1 for LogAppendTime. NULL when not available. |
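
A query over the metadata column might look like the sketch below. It assumes that a table named transactions has been mapped in default_redpanda_catalog, that a mapped table is referenced in FROM with the same catalog_name=>table_name form used by CREATE TABLE, and that struct fields are read with the (column).field_name syntax described for COMPOUND mapping. This page does not confirm these query forms, so treat them as assumptions.

```sql
-- Sketch only: assumes the FROM clause accepts catalog_name=>table_name.
SELECT
  (redpanda).partition,
  (redpanda).offset,
  (redpanda).timestamp,
  (redpanda).timestamp_type   -- 0 = CreateTime, 1 = LogAppendTime, NULL if unavailable
FROM default_redpanda_catalog=>transactions
LIMIT 10;
```

If the parser rejects a field name that is also a SQL keyword, such as offset, double-quoting the name is the usual remedy in SQL dialects.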

redpanda_raw

Populated only when error_handling_policy = 'FILL_NULL' and a record fails to decode. In all other cases, redpanda_raw is NULL.

Use redpanda_raw to implement a dead-letter pattern: rows whose value fails schema deserialization remain queryable, with the malformed payload preserved for inspection or reprocessing.

| Field | Type | Nullable | Description |
|---|---|---|---|
| key | bytea | Yes | Raw record key bytes. |
| value | bytea | Yes | Raw record value bytes that failed to decode. |
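
The dead-letter pattern can be sketched as follows. The table name sensor_readings_dlq is hypothetical, and the SELECT assumes the catalog_name=>table_name form in FROM and the (column).field_name syntax for struct fields. The filter tests the value field rather than the whole struct because some SQL dialects treat a struct with any NULL field as failing IS NOT NULL, and a record may legitimately have no key.

```sql
-- Keep undecodable records queryable instead of failing or dropping them.
-- Sketch only: the table name is hypothetical.
CREATE TABLE IF NOT EXISTS default_redpanda_catalog=>sensor_readings_dlq
WITH (
  topic = 'sensor-data',
  schema_subject = 'sensor-data-value',
  error_handling_policy = 'FILL_NULL'
);

-- List the records that failed to decode, with their position and raw bytes.
SELECT
  (redpanda).partition,
  (redpanda).offset,
  (redpanda_raw).key,
  (redpanda_raw).value
FROM default_redpanda_catalog=>sensor_readings_dlq
WHERE (redpanda_raw).value IS NOT NULL;
```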

Examples

Map a topic to a table

Map the transactions topic to a table through default_redpanda_catalog:

CREATE TABLE default_redpanda_catalog=>transactions
WITH (
  topic = 'transactions',
  schema_subject = 'transactions-value'
);
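
Once the table exists, a first query could look like the sketch below. It assumes the table is referenced in FROM with the same catalog_name=>table_name form used by CREATE TABLE, which this page does not state explicitly.

```sql
-- Sketch only: preview a few rows from the mapped topic.
SELECT *
FROM default_redpanda_catalog=>transactions
LIMIT 10;
```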

Create a table from a multi-message Protobuf schema

When the Protobuf schema for the topic defines more than one message, specify the message to use with output_schema_message_full_name:

CREATE TABLE default_redpanda_catalog=>orders
WITH (
  topic = 'orders',
  schema_subject = 'orders-value',
  output_schema_message_full_name = 'com.example.orders.Order'
);
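
With the default COMPOUND struct mapping, nested messages become columns whose fields are read with (column).field_name. The sketch below assumes the Order message has a nested message field named customer with a field named id. Both names are hypothetical, as is the catalog_name=>table_name form in FROM.

```sql
-- Sketch only: customer and id are hypothetical names from an assumed schema.
SELECT
  (customer).id AS customer_id
FROM default_redpanda_catalog=>orders
LIMIT 10;
```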

Create a table with error handling

Map a topic and skip records that fail deserialization:

CREATE TABLE IF NOT EXISTS default_redpanda_catalog=>sensor_readings
WITH (
  topic = 'sensor-data',
  schema_subject = 'sensor-data-value',
  error_handling_policy = 'DROP_RECORD'
);