Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 93 additions & 17 deletions modules/reference/pages/sql/sql-statements/create-table.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

The `CREATE TABLE` statement maps a Redpanda topic to a SQL table through a catalog. After creating the table, you can query topic data using standard SQL.

NOTE: You must first xref:reference:sql/sql-statements/create-redpanda-catalog.adoc[create a Redpanda catalog connection] before creating tables. `CREATE TABLE` in Redpanda SQL maps Redpanda topics to SQL tables — it does not create standalone tables with user-defined schemas.
NOTE: You must first xref:reference:sql/sql-statements/create-redpanda-catalog.adoc[create a Redpanda catalog connection] before creating tables. `CREATE TABLE` in Redpanda SQL maps Redpanda topics to SQL tables and does not create standalone tables with user-defined schemas.

== Syntax

Expand All @@ -20,7 +20,7 @@ WITH (option = 'value' [, ...]);

== Options

[cols="<30%,<15%,<10%,<45%",options="header"]
[cols="<30%,<10%,<15%,<45%",options="header"]
|===
|Option |Type |Required |Description

Expand All @@ -32,12 +32,12 @@ WITH (option = 'value' [, ...]);
|`schema_subject`
|STRING
|No
|Schema Registry subject name to use for deserializing topic data.
|Schema Registry subject name to use for deserializing topic data. Defaults to the topic-name strategy (`<topic>-value`).

|`schema_lookup_policy`
|STRING
|No
|How to resolve the schema version. Only `LATEST` is supported.
|How to resolve the schema version. `LATEST` is the only supported value.

|`error_handling_policy`
|STRING
Expand All @@ -51,42 +51,118 @@ a|How to handle records that fail deserialization.
|`struct_mapping_policy`
|STRING
|No
a|How to map nested structures to SQL columns.
a|How to map nested structures from the topic schema to SQL columns.

* `JSON` (default): Stores nested data as JSON.
* `FLATTEN`: Expands nested fields into top-level columns.
* `COMPOUND`: Maps to ROW types.
* `VARIANT`: Stores as a variant type.
* `COMPOUND` (default): Maps each nested structure to a SQL xref:reference:sql/sql-data-types/row.adoc[ROW] value with named fields, queryable using `(column).field_name` syntax. Cyclic types are not supported in `COMPOUND` mode. Use `JSON` for recursive schemas.
* `JSON`: Stores each nested structure as a JSON value. Required for recursive (cyclic) types.

|`output_schema_message_full_name`
|STRING
|No
|Full Protobuf message name. Required when the schema contains multiple message definitions.

|`confluent_wire_protocol`
|STRING
|No
a|Whether records on the topic are encoded with the https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format[Confluent Schema Registry wire format^] (a magic byte followed by a 4-byte schema ID before the payload).

* `'true'` (default): Records carry the Confluent wire-format prefix. Use this for topics whose values were produced by a Schema-Registry-aware client.
* `'false'`: Records are raw Protobuf or Avro without the wire-format prefix.

Only valid when `schema_lookup_policy = 'LATEST'`.
|===

[#auto-added-columns]
== Auto-added columns

Every catalog-mapped table includes two struct columns in addition to the columns derived from the topic's schema. Redpanda SQL adds these columns to both Kafka-backed and Iceberg-backed tables. The names `redpanda` and `redpanda_raw` are reserved. A topic schema cannot define columns with these names.

=== `redpanda`

Contains Kafka record metadata. Always present on every row.

[cols="<22%,<28%,<10%,<40%",options="header"]
|===
|Field |Type |Nullable |Description

|`partition`
|`INT`
|No
|Partition the record was read from.

|`offset`
|`BIGINT`
|No
|Offset of the record within its partition.

|`timestamp`
|`TIMESTAMP WITH TIME ZONE`
|Yes
|Record timestamp.

|`headers`
|Array of struct `{key TEXT, value BYTEA}`
|Yes
|Record headers, as an array where each element is a struct of header name and value bytes.

|`key`
|`BYTEA`
|Yes
|Record key bytes.

|`timestamp_type`
|`INT`
|Yes
|Kafka timestamp type code. `0` for `CreateTime`, `1` for `LogAppendTime`. `NULL` when not available.
|===

=== `redpanda_raw`

Populated only when `error_handling_policy = 'FILL_NULL'` and a record fails to decode. In all other cases, `redpanda_raw` is `NULL`.

Use `redpanda_raw` as a dead-letter pattern. Rows whose value fails schema deserialization remain queryable, with the malformed payload preserved for inspection or reprocessing.

[cols="<22%,<28%,<10%,<40%",options="header"]
|===
|Field |Type |Nullable |Description

|`key`
|`BYTEA`
|Yes
|Raw record key bytes.

|`value`
|`BYTEA`
|Yes
|Raw record value bytes that failed to decode.
|===

== Examples

=== Create a basic table
=== Map a topic to a table

Map the `transactions` topic to a table through `default_redpanda_catalog`:

[source,sql]
----
CREATE TABLE default_redpanda_catalog=>transactions
WITH (topic = 'transactions');
WITH (
topic = 'transactions',
schema_subject = 'transactions-value'
);
----

=== Specify a Schema Registry subject
=== Create a table from a multi-message Protobuf schema

Map a topic and specify the Schema Registry subject:
When the Protobuf schema for the topic defines more than one message, specify the message to use with `output_schema_message_full_name`:

[source,sql]
----
CREATE TABLE default_redpanda_catalog=>user_events
CREATE TABLE default_redpanda_catalog=>orders
WITH (
topic = 'user-events',
schema_subject = 'user-events-value',
schema_lookup_policy = 'LATEST'
topic = 'orders',
schema_subject = 'orders-value',
output_schema_message_full_name = 'com.example.orders.Order'
);
----

Expand Down
3 changes: 3 additions & 0 deletions modules/sql/pages/query-data/index.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
= Query Data
:description: Query live and historical data in your Redpanda topics using standard PostgreSQL syntax.
:page-layout: index
87 changes: 87 additions & 0 deletions modules/sql/pages/query-data/query-streaming-topics.adoc
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder where should we mention the existence of redpanda and redpanda_raw structs. The first is iceberg equivalent so all partition offset etc properties are there. The second is DLQ equivalent, filled when FILL_NULL error policy is set

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JacekGalazka1 do those only exist for Iceberg topics? There is mention of them in this other doc specifically for querying Iceberg https://github.com/redpanda-data/cloud-docs/pull/575/changes#diff-3ab2a15f947f028cb3f75cdb5184029657557cac26b1c961cb27c72554ba3533R83 Is redpanda_raw populated only when FILL_NULL is set?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are always added to each kafka reader, so both pure kafka and iceberg backed will have it.
redpanda_raw is populated only when FILL_NULL is set and only for records that failed to decode. in all other cases it's NULL.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect. Good to go

Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
= Query Streaming Topics
:description: Map a Redpanda topic to a SQL table and run analytical queries directly against live streaming data.
:page-topic-type: how-to
:personas: app_developer, data_engineer
:learning-objective-1: Map a Redpanda topic to a SQL table using the default Redpanda catalog
:learning-objective-2: Run analytical SQL queries against live topic data

Map a Redpanda topic to a SQL table to run analytical queries directly against live streaming data without building ETL pipelines. Redpanda SQL reads each record's fields from the topic's registered schema.

To extend queries past your Redpanda retention window by reading the Iceberg history of Iceberg-enabled topics, see xref:sql:query-data/query-iceberg-topics.adoc[Query Iceberg-enabled Topics].
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly @kbatuigas ! :)


Use this page to:

* [ ] {learning-objective-1}
* [ ] {learning-objective-2}

== Prerequisites

Before you query a topic with SQL:

* Enable the Redpanda SQL engine on your Redpanda Bring Your Own Cloud (BYOC) cluster. See xref:sql:get-started/deploy-sql-cluster.adoc[Enable Redpanda SQL].
* Have a Redpanda Cloud user with the *SQL: Access* (or *SQL: Manage*) data-plane RBAC permission. For a *SQL: Access* user to query a topic, a *SQL: Manage* user must first `GRANT SELECT` on the topic to that user. See xref:sql:manage/manage-access.adoc[Manage access to Redpanda SQL].
* Connect to Redpanda SQL with `psql` or another PostgreSQL client. See xref:sql:get-started/sql-quickstart.adoc[] for a `psql` example, or xref:sql:connect-to-sql/index.adoc[Connect to Redpanda SQL].
* Confirm that the Redpanda topic you want to query has a schema registered in Schema Registry. Redpanda SQL supports Protobuf, JSON, and Avro schemas.

== Map the topic to a SQL table

Each Redpanda topic appears as a SQL table inside a Redpanda catalog. When Redpanda SQL is enabled, a catalog named `default_redpanda_catalog` is created automatically and points at your cluster.

Define a table against the topic with `CREATE TABLE`:

[source,sql]
----
CREATE TABLE default_redpanda_catalog=>orders WITH (
topic = 'orders',
schema_subject = 'orders-value'
);
----

Replace `orders` with your topic name and `orders-value` with the Schema Registry subject that holds the topic's value schema. `schema_subject` is optional. If omitted, Redpanda SQL uses the topic-name strategy default (`<topic>-value`).

If the topic uses a Protobuf schema that defines more than one message, also set `output_schema_message_full_name` to the fully-qualified name of the message to use:

[source,sql]
----
CREATE TABLE default_redpanda_catalog=>orders WITH (
topic = 'orders',
schema_subject = 'orders-value',
output_schema_message_full_name = 'com.example.orders.Order'
);
----

The table inherits its column definitions from the registered schema. Each top-level field in the schema becomes a SQL column. For querying nested fields in struct types, see xref:sql:query-data/query-nested-fields.adoc[].

In addition to the columns derived from your topic's schema, Redpanda SQL adds two struct columns to every catalog-mapped table:

* `redpanda`: Kafka record metadata such as partition, offset, and timestamp.
* `redpanda_raw`: Populated only when `error_handling_policy = 'FILL_NULL'` and a record fails to decode.

For details, see xref:reference:sql/sql-statements/create-table.adoc#auto-added-columns[Auto-added columns].

== Run queries

Query the table with standard `SELECT` syntax. The following query returns the first 10 records:

[source,sql]
----
SELECT * FROM default_redpanda_catalog=>orders LIMIT 10;
----

Aggregate and filter records using familiar PostgreSQL constructs:

[source,sql]
----
SELECT customer_id, SUM(amount) AS total
FROM default_redpanda_catalog=>orders
WHERE status = 'completed'
GROUP BY customer_id
ORDER BY total DESC
LIMIT 10;
----

== Next steps

* xref:sql:query-data/query-iceberg-topics.adoc[Query Iceberg-enabled Topics]: run queries against historical data retained beyond your Redpanda retention window.
* xref:reference:sql/sql-statements/create-table.adoc[CREATE TABLE]: full reference for the table-against-topic syntax, including all options.
* xref:reference:sql/index.adoc[Redpanda SQL Reference]: supported SQL statements, clauses, data types, and functions.