diff --git a/modules/reference/pages/sql/sql-statements/create-table.adoc b/modules/reference/pages/sql/sql-statements/create-table.adoc index 027ea6681..f92fc4d32 100644 --- a/modules/reference/pages/sql/sql-statements/create-table.adoc +++ b/modules/reference/pages/sql/sql-statements/create-table.adoc @@ -4,7 +4,7 @@ The `CREATE TABLE` statement maps a Redpanda topic to a SQL table through a catalog. After creating the table, you can query topic data using standard SQL. -NOTE: You must first xref:reference:sql/sql-statements/create-redpanda-catalog.adoc[create a Redpanda catalog connection] before creating tables. `CREATE TABLE` in Redpanda SQL maps Redpanda topics to SQL tables — it does not create standalone tables with user-defined schemas. +NOTE: You must first xref:reference:sql/sql-statements/create-redpanda-catalog.adoc[create a Redpanda catalog connection] before creating tables. `CREATE TABLE` in Redpanda SQL maps Redpanda topics to SQL tables and does not create standalone tables with user-defined schemas. == Syntax @@ -20,7 +20,7 @@ WITH (option = 'value' [, ...]); == Options -[cols="<30%,<15%,<10%,<45%",options="header"] +[cols="<30%,<10%,<15%,<45%",options="header"] |=== |Option |Type |Required |Description @@ -32,12 +32,12 @@ WITH (option = 'value' [, ...]); |`schema_subject` |STRING |No -|Schema Registry subject name to use for deserializing topic data. +|Schema Registry subject name to use for deserializing topic data. Defaults to the topic-name strategy (`-value`). |`schema_lookup_policy` |STRING |No -|How to resolve the schema version. Only `LATEST` is supported. +|How to resolve the schema version. `LATEST` is the only supported value. |`error_handling_policy` |STRING @@ -51,42 +51,118 @@ a|How to handle records that fail deserialization. |`struct_mapping_policy` |STRING |No -a|How to map nested structures to SQL columns. +a|How to map nested structures from the topic schema to SQL columns. -* `JSON` (default): Stores nested data as JSON. -* `FLATTEN`: Expands nested fields into top-level columns. -* `COMPOUND`: Maps to ROW types. -* `VARIANT`: Stores as a variant type. +* `COMPOUND` (default): Maps each nested structure to a SQL xref:reference:sql/sql-data-types/row.adoc[ROW] value with named fields, queryable using `(column).field_name` syntax. Cyclic types are not supported in `COMPOUND` mode. Use `JSON` for recursive schemas. +* `JSON`: Stores each nested structure as a JSON value. Required for recursive (cyclic) types. |`output_schema_message_full_name` |STRING |No |Full Protobuf message name. Required when the schema contains multiple message definitions. + +|`confluent_wire_protocol` +|STRING +|No +a|Whether records on the topic are encoded with the https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format[Confluent Schema Registry wire format^] (a magic byte followed by a 4-byte schema ID before the payload). + +* `'true'` (default): Records carry the Confluent wire-format prefix. Use this for topics whose values were produced by a Schema-Registry-aware client. +* `'false'`: Records are raw Protobuf or Avro without the wire-format prefix. + +Only valid when `schema_lookup_policy = 'LATEST'`. +|=== + +[#auto-added-columns] +== Auto-added columns + +Every catalog-mapped table includes two struct columns in addition to the columns derived from the topic's schema. Redpanda SQL adds these columns to both Kafka-backed and Iceberg-backed tables. The names `redpanda` and `redpanda_raw` are reserved. A topic schema cannot define columns with these names. + +=== `redpanda` + +Contains Kafka record metadata. Always present on every row. + +[cols="<22%,<28%,<10%,<40%",options="header"] +|=== +|Field |Type |Nullable |Description + +|`partition` +|`INT` +|No +|Partition the record was read from. + +|`offset` +|`BIGINT` +|No +|Offset of the record within its partition. + +|`timestamp` +|`TIMESTAMP WITH TIME ZONE` +|Yes +|Record timestamp. + +|`headers` +|Array of struct `{key TEXT, value BYTEA}` +|Yes +|Record headers, as an array where each element is a struct of header name and value bytes. + +|`key` +|`BYTEA` +|Yes +|Record key bytes. + +|`timestamp_type` +|`INT` +|Yes +|Kafka timestamp type code. `0` for `CreateTime`, `1` for `LogAppendTime`. `NULL` when not available. +|=== + +=== `redpanda_raw` + +Populated only when `error_handling_policy = 'FILL_NULL'` and a record fails to decode. In all other cases, `redpanda_raw` is `NULL`. + +Use `redpanda_raw` as a dead-letter pattern. Rows whose value fails schema deserialization remain queryable, with the malformed payload preserved for inspection or reprocessing. + +[cols="<22%,<28%,<10%,<40%",options="header"] +|=== +|Field |Type |Nullable |Description + +|`key` +|`BYTEA` +|Yes +|Raw record key bytes. + +|`value` +|`BYTEA` +|Yes +|Raw record value bytes that failed to decode. |=== == Examples -=== Create a basic table +=== Map a topic to a table Map the `transactions` topic to a table through `default_redpanda_catalog`: [source,sql] ---- CREATE TABLE default_redpanda_catalog=>transactions -WITH (topic = 'transactions'); +WITH ( + topic = 'transactions', + schema_subject = 'transactions-value' +); ---- -=== Specify a Schema Registry subject +=== Create a table from a multi-message Protobuf schema -Map a topic and specify the Schema Registry subject: +When the Protobuf schema for the topic defines more than one message, specify the message to use with `output_schema_message_full_name`: [source,sql] ---- -CREATE TABLE default_redpanda_catalog=>user_events +CREATE TABLE default_redpanda_catalog=>orders WITH ( - topic = 'user-events', - schema_subject = 'user-events-value', - schema_lookup_policy = 'LATEST' + topic = 'orders', + schema_subject = 'orders-value', + output_schema_message_full_name = 'com.example.orders.Order' ); ---- diff --git a/modules/sql/pages/query-data/index.adoc b/modules/sql/pages/query-data/index.adoc new file mode 100644 index 000000000..2daba009d --- /dev/null +++ b/modules/sql/pages/query-data/index.adoc @@ -0,0 +1,3 @@ += Query Data +:description: Query live and historical data in your Redpanda topics using standard PostgreSQL syntax. +:page-layout: index diff --git a/modules/sql/pages/query-data/query-streaming-topics.adoc b/modules/sql/pages/query-data/query-streaming-topics.adoc new file mode 100644 index 000000000..c26f8b917 --- /dev/null +++ b/modules/sql/pages/query-data/query-streaming-topics.adoc @@ -0,0 +1,87 @@ += Query Streaming Topics +:description: Map a Redpanda topic to a SQL table and run analytical queries directly against live streaming data. +:page-topic-type: how-to +:personas: app_developer, data_engineer +:learning-objective-1: Map a Redpanda topic to a SQL table using the default Redpanda catalog +:learning-objective-2: Run analytical SQL queries against live topic data + +Map a Redpanda topic to a SQL table to run analytical queries directly against live streaming data without building ETL pipelines. Redpanda SQL reads each record's fields from the topic's registered schema. + +To extend queries past your Redpanda retention window by reading the Iceberg history of Iceberg-enabled topics, see xref:sql:query-data/query-iceberg-topics.adoc[Query Iceberg-enabled Topics]. + +Use this page to: + +* [ ] {learning-objective-1} +* [ ] {learning-objective-2} + +== Prerequisites + +Before you query a topic with SQL: + +* Enable the Redpanda SQL engine on your Redpanda Bring Your Own Cloud (BYOC) cluster. See xref:sql:get-started/deploy-sql-cluster.adoc[Enable Redpanda SQL]. +* Have a Redpanda Cloud user with the *SQL: Access* (or *SQL: Manage*) data-plane RBAC permission. For a *SQL: Access* user to query a topic, a *SQL: Manage* user must first `GRANT SELECT` on the topic to that user. See xref:sql:manage/manage-access.adoc[Manage access to Redpanda SQL]. +* Connect to Redpanda SQL with `psql` or another PostgreSQL client. See xref:sql:get-started/sql-quickstart.adoc[] for a `psql` example, or xref:sql:connect-to-sql/index.adoc[Connect to Redpanda SQL]. +* Confirm that the Redpanda topic you want to query has a schema registered in Schema Registry. Redpanda SQL supports Protobuf, JSON, and Avro schemas. + +== Map the topic to a SQL table + +Each Redpanda topic appears as a SQL table inside a Redpanda catalog. When Redpanda SQL is enabled, a catalog named `default_redpanda_catalog` is created automatically and points at your cluster. + +Define a table against the topic with `CREATE TABLE`: + +[source,sql] +---- +CREATE TABLE default_redpanda_catalog=>orders WITH ( + topic = 'orders', + schema_subject = 'orders-value' +); +---- + +Replace `orders` with your topic name and `orders-value` with the Schema Registry subject that holds the topic's value schema. `schema_subject` is optional. If omitted, Redpanda SQL uses the topic-name strategy default (`-value`). + +If the topic uses a Protobuf schema that defines more than one message, also set `output_schema_message_full_name` to the fully-qualified name of the message to use: + +[source,sql] +---- +CREATE TABLE default_redpanda_catalog=>orders WITH ( + topic = 'orders', + schema_subject = 'orders-value', + output_schema_message_full_name = 'com.example.orders.Order' +); +---- + +The table inherits its column definitions from the registered schema. Each top-level field in the schema becomes a SQL column. For querying nested fields in struct types, see xref:sql:query-data/query-nested-fields.adoc[]. + +In addition to the columns derived from your topic's schema, Redpanda SQL adds two struct columns to every catalog-mapped table: + +* `redpanda`: Kafka record metadata such as partition, offset, and timestamp. +* `redpanda_raw`: Populated only when `error_handling_policy = 'FILL_NULL'` and a record fails to decode. + +For details, see xref:reference:sql/sql-statements/create-table.adoc#auto-added-columns[Auto-added columns]. + +== Run queries + +Query the table with standard `SELECT` syntax. The following query returns the first 10 records: + +[source,sql] +---- +SELECT * FROM default_redpanda_catalog=>orders LIMIT 10; +---- + +Aggregate and filter records using familiar PostgreSQL constructs: + +[source,sql] +---- +SELECT customer_id, SUM(amount) AS total +FROM default_redpanda_catalog=>orders +WHERE status = 'completed' +GROUP BY customer_id +ORDER BY total DESC +LIMIT 10; +---- + +== Next steps + +* xref:sql:query-data/query-iceberg-topics.adoc[Query Iceberg-enabled Topics]: run queries against historical data retained beyond your Redpanda retention window. +* xref:reference:sql/sql-statements/create-table.adoc[CREATE TABLE]: full reference for the table-against-topic syntax, including all options. +* xref:reference:sql/index.adoc[Redpanda SQL Reference]: supported SQL statements, clauses, data types, and functions.