From f4f62df5ab08c2c85b4b2b7252f1e0833c2ac574 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 22 Feb 2026 00:42:54 +1100 Subject: [PATCH 01/10] add system defined tags and free form labels to datasets PUT /api/v1/logstream/{name} accepts X-P-Dataset-Tags and X-P-Dataset-Labels headers (comma-separated) on stream creation PUT /api/prism/v1/datasets/{name} - update tags and labels GET /api/prism/v1/datasets/{name}/correlated - find datasets sharing tags or labels GET /api/prism/v1/datasets/tags/{tag} - find all datasets with a specific tag include tags and labels in home api response --- src/connectors/kafka/processor.rs | 3 +- src/handlers/http/ingest.rs | 6 +- src/handlers/http/mod.rs | 1 + src/handlers/http/modal/server.rs | 41 ++++++++++--- .../http/modal/utils/logstream_utils.rs | 33 ++++++----- src/handlers/mod.rs | 58 ++++++++++++++++--- src/metadata.rs | 9 ++- src/migration/mod.rs | 6 +- src/parseable/mod.rs | 27 ++++++--- src/parseable/streams.rs | 24 +++++++- src/prism/home/mod.rs | 18 ++++-- src/storage/field_stats.rs | 3 +- src/storage/mod.rs | 18 ++++-- src/storage/object_storage.rs | 25 ++++++++ 14 files changed, 209 insertions(+), 63 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 4a61258d7..f918b6d26 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -64,7 +64,8 @@ impl ParseableSinkProcessor { vec![log_source_entry], TelemetryType::default(), tenant_id, - None, + vec![], + vec![], ) .await?; diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 0f3a87744..a02fe9323 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -120,7 +120,8 @@ pub async fn ingest( vec![log_source_entry.clone()], telemetry_type, &tenant_id, - None, + vec![], + vec![], ) .await .map_err(|e| { @@ -239,7 +240,8 @@ pub async fn setup_otel_stream( vec![log_source_entry.clone()], telemetry_type, &tenant_id, - None, + vec![], + vec![], ) 
.await?; let mut time_partition = None; diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 993a40b10..a4136e939 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -32,6 +32,7 @@ pub mod about; pub mod alerts; pub mod cluster; pub mod correlation; +pub mod datasets; pub mod demo_data; pub mod health_check; pub mod ingest; diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 29a977c3d..ed75f542b 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -200,14 +200,39 @@ impl Server { } pub fn get_prism_datasets() -> Scope { - web::scope("/datasets").route( - "", - web::post() - .to(http::prism_logstream::post_datasets) - .authorize_for_resource(Action::GetStreamInfo) - .authorize_for_resource(Action::GetStats) - .authorize_for_resource(Action::GetRetention), - ) + web::scope("/datasets") + .route( + "", + web::post() + .to(http::prism_logstream::post_datasets) + .authorize_for_resource(Action::GetStreamInfo) + .authorize_for_resource(Action::GetStats) + .authorize_for_resource(Action::GetRetention), + ) + .route( + "/tags/{tag}", + web::get() + .to(http::datasets::get_datasets_by_tag) + .authorize_for_resource(Action::GetStreamInfo), + ) + .route( + "/{name}/correlated", + web::get() + .to(http::datasets::get_correlated_datasets) + .authorize_for_resource(Action::GetStreamInfo), + ) + .route( + "/{name}/tags", + web::put() + .to(http::datasets::put_dataset_tags) + .authorize_for_resource(Action::CreateStream), + ) + .route( + "/{name}/labels", + web::put() + .to(http::datasets::put_dataset_labels) + .authorize_for_resource(Action::CreateStream), + ) } pub fn get_demo_data_webscope() -> Scope { diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 7b93fafbc..7930df77f 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -16,17 
+16,18 @@ * */ +use actix_web::http::header::HeaderMap; + use crate::{ event::format::LogSource, handlers::{ - CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, - STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, - TelemetryType, UPDATE_STREAM_KEY, + CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag, + LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, + TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY, + parse_dataset_labels, parse_dataset_tags, }, storage::StreamType, }; -use actix_web::http::header::HeaderMap; -use tracing::warn; #[derive(Debug, Default)] pub struct PutStreamHeaders { @@ -38,7 +39,8 @@ pub struct PutStreamHeaders { pub stream_type: StreamType, pub log_source: LogSource, pub telemetry_type: TelemetryType, - pub dataset_tag: Option, + pub dataset_tags: Vec, + pub dataset_labels: Vec, } impl From<&HeaderMap> for PutStreamHeaders { @@ -72,16 +74,17 @@ impl From<&HeaderMap> for PutStreamHeaders { .get(TELEMETRY_TYPE_KEY) .and_then(|v| v.to_str().ok()) .map_or(TelemetryType::Logs, TelemetryType::from), - dataset_tag: headers - .get(DATASET_TAG_KEY) + dataset_tags: headers + .get(DATASET_TAGS_KEY) + .or_else(|| headers.get(DATASET_TAG_KEY)) .and_then(|v| v.to_str().ok()) - .and_then(|v| match DatasetTag::try_from(v) { - Ok(tag) => Some(tag), - Err(err) => { - warn!("Invalid dataset tag '{v}': {err}"); - None - } - }), + .map(parse_dataset_tags) + .unwrap_or_default(), + dataset_labels: headers + .get(DATASET_LABELS_KEY) + .and_then(|v| v.to_str().ok()) + .map(parse_dataset_labels) + .unwrap_or_default(), } } } diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index 034e524b1..ebbea833b 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -16,9 +16,11 @@ * */ +use std::collections::HashSet; use std::fmt::Display; use serde::{Deserialize, Serialize}; +use tracing::warn; pub mod 
airplane; pub mod http; @@ -36,6 +38,8 @@ pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream"; pub const STREAM_TYPE_KEY: &str = "x-p-stream-type"; pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type"; pub const DATASET_TAG_KEY: &str = "x-p-dataset-tag"; +pub const DATASET_TAGS_KEY: &str = "x-p-dataset-tags"; +pub const DATASET_LABELS_KEY: &str = "x-p-dataset-labels"; pub const TENANT_ID: &str = "x-p-tenant"; const COOKIE_AGE_DAYS: usize = 7; const SESSION_COOKIE_NAME: &str = "session"; @@ -85,12 +89,14 @@ impl Display for TelemetryType { } /// Tag for categorizing datasets/streams by observability domain -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "kebab-case")] pub enum DatasetTag { - AgentObservability, - K8sObservability, + AgentMonitoring, + K8sMonitoring, DatabaseObservability, + ApplicationMonitoring, + ServiceMap, } impl TryFrom<&str> for DatasetTag { @@ -98,11 +104,13 @@ impl TryFrom<&str> for DatasetTag { fn try_from(s: &str) -> Result { match s.to_lowercase().as_str() { - "agent-observability" => Ok(DatasetTag::AgentObservability), - "k8s-observability" => Ok(DatasetTag::K8sObservability), + "agent-monitoring" => Ok(DatasetTag::AgentMonitoring), + "k8s-monitoring" => Ok(DatasetTag::K8sMonitoring), "database-observability" => Ok(DatasetTag::DatabaseObservability), + "application-monitoring" => Ok(DatasetTag::ApplicationMonitoring), + "service-map" => Ok(DatasetTag::ServiceMap), _ => Err( - "Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability", + "Invalid dataset tag. 
Supported values: agent-monitoring, k8s-monitoring, database-observability, application-monitoring, service-map", ), } } @@ -111,9 +119,43 @@ impl TryFrom<&str> for DatasetTag { impl Display for DatasetTag { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(match self { - DatasetTag::AgentObservability => "agent-observability", - DatasetTag::K8sObservability => "k8s-observability", + DatasetTag::AgentMonitoring => "agent-monitoring", + DatasetTag::K8sMonitoring => "k8s-monitoring", DatasetTag::DatabaseObservability => "database-observability", + DatasetTag::ApplicationMonitoring => "application-monitoring", + DatasetTag::ServiceMap => "service-map", }) } } + +pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> { + header_value + .split(',') + .filter_map(|s| { + let trimmed = s.trim(); + if trimmed.is_empty() { + None + } else { + match DatasetTag::try_from(trimmed) { + Ok(tag) => Some(tag), + Err(err) => { + warn!("Invalid dataset tag '{trimmed}': {err}"); + None + } + } + } + }) + .collect::<HashSet<_>>() + .into_iter() + .collect() +} + +pub fn parse_dataset_labels(header_value: &str) -> Vec<String> { + header_value + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect::<HashSet<_>>() + .into_iter() + .collect() +} diff --git a/src/metadata.rs index b0fa024fd..9ca1dd1e7 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -93,7 +93,8 @@ pub struct LogStreamMetadata { pub stream_type: StreamType, pub log_source: Vec<LogSourceEntry>, pub telemetry_type: TelemetryType, - pub dataset_tag: Option<DatasetTag>, + pub dataset_tags: Vec<DatasetTag>, + pub dataset_labels: Vec<String>, } impl LogStreamMetadata { @@ -109,7 +110,8 @@ impl LogStreamMetadata { schema_version: SchemaVersion, log_source: Vec<LogSourceEntry>, telemetry_type: TelemetryType, - dataset_tag: Option<DatasetTag>, + dataset_tags: Vec<DatasetTag>, + dataset_labels: Vec<String>, ) -> Self { LogStreamMetadata { created_at: if created_at.is_empty() { @@ -134,7 +136,8 @@ schema_version, log_source, telemetry_type, - 
dataset_tag, + dataset_tags, + dataset_labels, ..Default::default() } } diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 7d7027fe4..8e67d59e6 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -454,7 +454,8 @@ async fn setup_logstream_metadata( stream_type, log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, .. } = serde_json::from_value(stream_metadata_value).unwrap_or_default(); @@ -500,7 +501,8 @@ async fn setup_logstream_metadata( stream_type, log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, }; Ok(metadata) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index d7d02ffad..4ca4eac25 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -416,7 +416,8 @@ impl Parseable { let schema_version = stream_metadata.schema_version; let log_source = stream_metadata.log_source; let telemetry_type = stream_metadata.telemetry_type; - let dataset_tag = stream_metadata.dataset_tag; + let dataset_tags = stream_metadata.dataset_tags; + let dataset_labels = stream_metadata.dataset_labels; let mut metadata = LogStreamMetadata::new( created_at, time_partition, @@ -428,7 +429,8 @@ impl Parseable { schema_version, log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, ); // Set hot tier fields from the stored metadata @@ -533,7 +535,8 @@ impl Parseable { log_source: Vec, telemetry_type: TelemetryType, tenant_id: &Option, - dataset_tag: Option, + dataset_tags: Vec, + dataset_labels: Vec, ) -> Result { if self.streams.contains(stream_name, tenant_id) { return Ok(true); @@ -566,7 +569,8 @@ impl Parseable { log_source, telemetry_type, tenant_id, - dataset_tag, + dataset_tags, + dataset_labels, ) .await?; @@ -643,7 +647,8 @@ impl Parseable { stream_type, log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, } = headers.into(); let stream_in_memory_dont_update = @@ -717,7 +722,8 @@ impl Parseable { vec![log_source_entry], telemetry_type, tenant_id, - 
dataset_tag, + dataset_tags, + dataset_labels, ) .await?; @@ -779,7 +785,8 @@ impl Parseable { log_source: Vec, telemetry_type: TelemetryType, tenant_id: &Option, - dataset_tag: Option, + dataset_tags: Vec, + dataset_labels: Vec, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_type != StreamType::Internal { @@ -804,7 +811,8 @@ impl Parseable { }, log_source: log_source.clone(), telemetry_type, - dataset_tag, + dataset_tags: dataset_tags.clone(), + dataset_labels: dataset_labels.clone(), ..Default::default() }; @@ -834,7 +842,8 @@ impl Parseable { SchemaVersion::V1, // New stream log_source, telemetry_type, - dataset_tag, + dataset_tags, + dataset_labels, ); let ingestor_id = INGESTOR_META .get() diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 0bf8bf2ca..1230946ab 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -963,8 +963,28 @@ impl Stream { self.metadata.read().expect(LOCK_EXPECT).log_source.clone() } - pub fn get_dataset_tag(&self) -> Option { - self.metadata.read().expect(LOCK_EXPECT).dataset_tag + pub fn get_dataset_tags(&self) -> Vec { + self.metadata + .read() + .expect(LOCK_EXPECT) + .dataset_tags + .clone() + } + + pub fn get_dataset_labels(&self) -> Vec { + self.metadata + .read() + .expect(LOCK_EXPECT) + .dataset_labels + .clone() + } + + pub fn set_dataset_tags(&self, tags: Vec) { + self.metadata.write().expect(LOCK_EXPECT).dataset_tags = tags; + } + + pub fn set_dataset_labels(&self, labels: Vec) { + self.metadata.write().expect(LOCK_EXPECT).dataset_labels = labels; } pub fn add_log_source(&self, log_source: LogSourceEntry) { diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 4ade98c69..b9f1195d4 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -45,7 +45,8 @@ struct StreamMetadata { time_partition: Option, dataset_format: LogSource, ingestion: bool, - tag: Option, + tags: Vec, + labels: Vec, } type StreamMetadataResponse = Result; @@ 
-59,8 +60,10 @@ pub struct DataSet { time_partition: Option, dataset_format: LogSource, ingestion: bool, - #[serde(skip_serializing_if = "Option::is_none")] - tag: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + tags: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + labels: Vec, } #[derive(Debug, Serialize, Deserialize, Default)] @@ -147,7 +150,8 @@ pub async fn generate_home_response( time_partition: sm.time_partition, dataset_format: sm.dataset_format, ingestion: sm.ingestion, - tag: sm.tag, + tags: sm.tags, + labels: sm.labels, }); } Err(e) => { @@ -233,7 +237,8 @@ async fn get_stream_metadata(stream: String, tenant_id: &Option) -> Stre let ingested = stream_jsons .iter() .any(|s| s.stats.current_stats.events > 0); - let dataset_tag = stream_jsons[0].dataset_tag; + let dataset_tags = stream_jsons[0].dataset_tags.clone(); + let dataset_labels = stream_jsons[0].dataset_labels.clone(); Ok(StreamMetadata { stream, @@ -242,7 +247,8 @@ async fn get_stream_metadata(stream: String, tenant_id: &Option) -> Stre time_partition, dataset_format, ingestion: ingested, - tag: dataset_tag, + tags: dataset_tags, + labels: dataset_labels, }) } diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index 5b3cc1b66..efc2aaaee 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -159,7 +159,8 @@ pub async fn calculate_field_stats( vec![log_source_entry], TelemetryType::Logs, tenant_id, - None, + vec![], + vec![], ) .await?; let vec_json = apply_generic_flattening_for_partition( diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 84d48d3c3..1694e5f5b 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -131,8 +131,10 @@ pub struct ObjectStoreFormat { pub log_source: Vec, #[serde(default)] pub telemetry_type: TelemetryType, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub dataset_tag: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub dataset_tags: Vec, + 
#[serde(default, skip_serializing_if = "Vec::is_empty")] + pub dataset_labels: Vec<String>, } impl MetastoreObject for ObjectStoreFormat { @@ -173,8 +175,10 @@ pub struct StreamInfo { pub telemetry_type: TelemetryType, #[serde(default)] pub hot_tier_enabled: bool, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub dataset_tag: Option<DatasetTag>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub dataset_tags: Vec<DatasetTag>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub dataset_labels: Vec<String>, } impl StreamInfo { @@ -197,7 +201,8 @@ impl StreamInfo { log_source: metadata.log_source.clone(), telemetry_type: metadata.telemetry_type, hot_tier_enabled: metadata.hot_tier_enabled, - dataset_tag: metadata.dataset_tag, + dataset_tags: metadata.dataset_tags.clone(), + dataset_labels: metadata.dataset_labels.clone(), } } } @@ -279,7 +284,8 @@ impl Default for ObjectStoreFormat { hot_tier: None, log_source: vec![LogSourceEntry::default()], telemetry_type: TelemetryType::Logs, - dataset_tag: None, + dataset_tags: Vec::new(), + dataset_labels: Vec::new(), } } } diff --git a/src/storage/object_storage.rs index 398b7e0cf..54000b3d4 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -45,6 +45,7 @@ use ulid::Ulid; use crate::catalog::{self, snapshot::Snapshot}; use crate::event::format::LogSource; use crate::event::format::LogSourceEntry; +use crate::handlers::DatasetTag; use crate::handlers::http::fetch_schema; use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; use crate::handlers::http::modal::ingest_server::INGESTOR_META; @@ -495,6 +496,30 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(()) } + async fn update_dataset_tags_and_labels_in_stream( + &self, + stream_name: &str, + tags: &[DatasetTag], + labels: &[String], + ) -> Result<(), ObjectStorageError> { + let mut format: ObjectStoreFormat = serde_json::from_slice( + &PARSEABLE + .metastore
.get_stream_json(stream_name, false) + .await + .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?, + )?; + format.dataset_tags = tags.to_owned(); + format.dataset_labels = labels.to_owned(); + PARSEABLE + .metastore + .put_stream_json(&format, stream_name) + .await + .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?; + + Ok(()) + } + /// Updates the first event timestamp in the object store for the specified stream. /// /// This function retrieves the current object-store format for the given stream, From 30ed3001b0cf604ee9942c4524049bc27a56d48c Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 22 Feb 2026 01:18:08 +1100 Subject: [PATCH 02/10] add dataset handler file --- src/handlers/http/datasets.rs | 212 ++++++++++++++++++++++++++++++++++ 1 file changed, 212 insertions(+) create mode 100644 src/handlers/http/datasets.rs diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs new file mode 100644 index 000000000..22f259fa2 --- /dev/null +++ b/src/handlers/http/datasets.rs @@ -0,0 +1,212 @@ +/* + * Parseable Server (C) 2022 - 2025 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use std::collections::HashSet; + +use actix_web::http::StatusCode; +use actix_web::{HttpResponse, web}; +use serde::{Deserialize, Serialize}; + +use crate::{ + handlers::DatasetTag, + parseable::PARSEABLE, + storage::{ObjectStorageError, StreamType}, +}; + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct CorrelatedDataset { + name: String, + shared_tags: Vec, + shared_labels: Vec, +} + +/// GET /api/v1/datasets/{name}/correlated +/// Returns all datasets sharing at least one tag or label with the named dataset. +pub async fn get_correlated_datasets( + path: web::Path, +) -> Result { + let dataset_name = path.into_inner(); + + let stream = PARSEABLE + .get_stream(&dataset_name) + .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; + + let target_tags: HashSet = stream.get_dataset_tags().into_iter().collect(); + let target_labels: HashSet = stream.get_dataset_labels().into_iter().collect(); + + if target_tags.is_empty() && target_labels.is_empty() { + return Ok(HttpResponse::Ok().json(Vec::::new())); + } + + let all_streams = PARSEABLE.streams.list(); + let mut correlated = Vec::new(); + + for name in all_streams { + if name == dataset_name { + continue; + } + if let Ok(s) = PARSEABLE.get_stream(&name) { + // Skip internal streams + if s.get_stream_type() == StreamType::Internal { + continue; + } + + let s_tags: HashSet = s.get_dataset_tags().into_iter().collect(); + let s_labels: HashSet = s.get_dataset_labels().into_iter().collect(); + + let shared_tags: Vec = target_tags.intersection(&s_tags).copied().collect(); + let shared_labels: Vec = + target_labels.intersection(&s_labels).cloned().collect(); + + if !shared_tags.is_empty() || !shared_labels.is_empty() { + correlated.push(CorrelatedDataset { + name, + shared_tags, + shared_labels, + }); + } + } + } + + Ok(HttpResponse::Ok().json(correlated)) +} + +/// GET /api/v1/datasets/tags/{tag} +/// Returns all datasets that have the specified tag. 
+pub async fn get_datasets_by_tag(path: web::Path) -> Result { + let tag_str = path.into_inner(); + let tag = + DatasetTag::try_from(tag_str.as_str()).map_err(|_| DatasetsError::InvalidTag(tag_str))?; + + let all_streams = PARSEABLE.streams.list(); + let mut matching = Vec::new(); + + for name in all_streams { + if let Ok(s) = PARSEABLE.get_stream(&name) { + if s.get_stream_type() == StreamType::Internal { + continue; + } + if s.get_dataset_tags().contains(&tag) { + matching.push(name); + } + } + } + + Ok(HttpResponse::Ok().json(matching)) +} + +#[derive(Debug, Deserialize)] +pub struct PutTagsBody { + pub tags: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct PutLabelsBody { + pub labels: Vec, +} + +/// PUT /api/v1/datasets/{name}/tags +/// Replaces the dataset's tags with the provided list. +pub async fn put_dataset_tags( + path: web::Path, + body: web::Json, +) -> Result { + let dataset_name = path.into_inner(); + let new_tags: Vec = body + .into_inner() + .tags + .into_iter() + .collect::>() + .into_iter() + .collect(); + + let stream = PARSEABLE + .get_stream(&dataset_name) + .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; + + // Update storage first, then in-memory + let storage = PARSEABLE.storage.get_object_store(); + let existing_labels = stream.get_dataset_labels(); + storage + .update_dataset_tags_and_labels_in_stream(&dataset_name, &new_tags, &existing_labels) + .await + .map_err(DatasetsError::Storage)?; + + stream.set_dataset_tags(new_tags.clone()); + + Ok(HttpResponse::Ok().json(serde_json::json!({ "tags": new_tags }))) +} + +/// PUT /api/v1/datasets/{name}/labels +/// Replaces the dataset's labels with the provided list. 
+pub async fn put_dataset_labels( + path: web::Path, + body: web::Json, +) -> Result { + let dataset_name = path.into_inner(); + let new_labels: Vec = body + .into_inner() + .labels + .into_iter() + .collect::>() + .into_iter() + .collect(); + + let stream = PARSEABLE + .get_stream(&dataset_name) + .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; + + // Update storage first, then in-memory + let storage = PARSEABLE.storage.get_object_store(); + let existing_tags = stream.get_dataset_tags(); + storage + .update_dataset_tags_and_labels_in_stream(&dataset_name, &existing_tags, &new_labels) + .await + .map_err(DatasetsError::Storage)?; + + stream.set_dataset_labels(new_labels.clone()); + + Ok(HttpResponse::Ok().json(serde_json::json!({ "labels": new_labels }))) +} + +#[derive(Debug, thiserror::Error)] +pub enum DatasetsError { + #[error("Dataset not found: {0}")] + DatasetNotFound(String), + #[error("Invalid tag: {0}")] + InvalidTag(String), + #[error("Storage error: {0}")] + Storage(ObjectStorageError), +} + +impl actix_web::ResponseError for DatasetsError { + fn status_code(&self) -> StatusCode { + match self { + DatasetsError::DatasetNotFound(_) => StatusCode::NOT_FOUND, + DatasetsError::InvalidTag(_) => StatusCode::BAD_REQUEST, + DatasetsError::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn error_response(&self) -> HttpResponse { + HttpResponse::build(self.status_code()).json(serde_json::json!({ + "error": self.to_string() + })) + } +} From 122106ccafdfb1804e58c89151fcd77909a23fda Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 25 Feb 2026 17:30:01 +1100 Subject: [PATCH 03/10] single endpoint for tags and labels --- src/handlers/http/datasets.rs | 85 ++++++++++++------------------- src/handlers/http/modal/server.rs | 10 +--- 2 files changed, 34 insertions(+), 61 deletions(-) diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs index 22f259fa2..2e9d5ba7a 100644 --- a/src/handlers/http/datasets.rs +++ 
b/src/handlers/http/datasets.rs @@ -112,77 +112,56 @@ pub async fn get_datasets_by_tag(path: web::Path) -> Result, +pub struct PutDatasetMetadataBody { + pub tags: Option>, + pub labels: Option>, } -#[derive(Debug, Deserialize)] -pub struct PutLabelsBody { - pub labels: Vec, -} - -/// PUT /api/v1/datasets/{name}/tags -/// Replaces the dataset's tags with the provided list. -pub async fn put_dataset_tags( +/// PUT /api/v1/datasets/{name} +/// Replaces the dataset's tags and/or labels. +/// Only fields present in the body are updated; absent fields are left unchanged. +pub async fn put_dataset_metadata( path: web::Path, - body: web::Json, + body: web::Json, ) -> Result { let dataset_name = path.into_inner(); - let new_tags: Vec = body - .into_inner() - .tags - .into_iter() - .collect::>() - .into_iter() - .collect(); + let body = body.into_inner(); let stream = PARSEABLE .get_stream(&dataset_name) .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; - // Update storage first, then in-memory - let storage = PARSEABLE.storage.get_object_store(); - let existing_labels = stream.get_dataset_labels(); - storage - .update_dataset_tags_and_labels_in_stream(&dataset_name, &new_tags, &existing_labels) - .await - .map_err(DatasetsError::Storage)?; - - stream.set_dataset_tags(new_tags.clone()); - - Ok(HttpResponse::Ok().json(serde_json::json!({ "tags": new_tags }))) -} - -/// PUT /api/v1/datasets/{name}/labels -/// Replaces the dataset's labels with the provided list. 
-pub async fn put_dataset_labels( - path: web::Path, - body: web::Json, -) -> Result { - let dataset_name = path.into_inner(); - let new_labels: Vec = body - .into_inner() - .labels - .into_iter() - .collect::>() - .into_iter() - .collect(); - - let stream = PARSEABLE - .get_stream(&dataset_name) - .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; + let final_tags = match body.tags { + Some(tags) => tags + .into_iter() + .collect::>() + .into_iter() + .collect(), + None => stream.get_dataset_tags(), + }; + let final_labels = match body.labels { + Some(labels) => labels + .into_iter() + .collect::>() + .into_iter() + .collect(), + None => stream.get_dataset_labels(), + }; // Update storage first, then in-memory let storage = PARSEABLE.storage.get_object_store(); - let existing_tags = stream.get_dataset_tags(); storage - .update_dataset_tags_and_labels_in_stream(&dataset_name, &existing_tags, &new_labels) + .update_dataset_tags_and_labels_in_stream(&dataset_name, &final_tags, &final_labels) .await .map_err(DatasetsError::Storage)?; - stream.set_dataset_labels(new_labels.clone()); + stream.set_dataset_tags(final_tags.clone()); + stream.set_dataset_labels(final_labels.clone()); - Ok(HttpResponse::Ok().json(serde_json::json!({ "labels": new_labels }))) + Ok(HttpResponse::Ok().json(serde_json::json!({ + "tags": final_tags, + "labels": final_labels, + }))) } #[derive(Debug, thiserror::Error)] diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index ed75f542b..dde70bd1c 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -222,15 +222,9 @@ impl Server { .authorize_for_resource(Action::GetStreamInfo), ) .route( - "/{name}/tags", + "/{name}", web::put() - .to(http::datasets::put_dataset_tags) - .authorize_for_resource(Action::CreateStream), - ) - .route( - "/{name}/labels", - web::put() - .to(http::datasets::put_dataset_labels) + .to(http::datasets::put_dataset_metadata) 
.authorize_for_resource(Action::CreateStream), ) } From f0e3e02c1a35a329408642301649840951011ffe Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 25 Feb 2026 17:49:47 +1100 Subject: [PATCH 04/10] add tenant_id params in new handlers --- src/handlers/http/datasets.rs | 33 +++++++++++++++++++++++---------- src/parseable/mod.rs | 3 ++- src/storage/object_storage.rs | 5 +++-- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs index 2e9d5ba7a..9bd6f533f 100644 --- a/src/handlers/http/datasets.rs +++ b/src/handlers/http/datasets.rs @@ -19,9 +19,10 @@ use std::collections::HashSet; use actix_web::http::StatusCode; -use actix_web::{HttpResponse, web}; +use actix_web::{HttpRequest, HttpResponse, web}; use serde::{Deserialize, Serialize}; +use crate::utils::get_tenant_id_from_request; use crate::{ handlers::DatasetTag, parseable::PARSEABLE, @@ -39,12 +40,13 @@ struct CorrelatedDataset { /// GET /api/v1/datasets/{name}/correlated /// Returns all datasets sharing at least one tag or label with the named dataset. 
pub async fn get_correlated_datasets( + req: HttpRequest, path: web::Path, ) -> Result { let dataset_name = path.into_inner(); - + let tenant_id = get_tenant_id_from_request(&req); let stream = PARSEABLE - .get_stream(&dataset_name) + .get_stream(&dataset_name, &tenant_id) .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; let target_tags: HashSet = stream.get_dataset_tags().into_iter().collect(); @@ -54,14 +56,14 @@ pub async fn get_correlated_datasets( return Ok(HttpResponse::Ok().json(Vec::::new())); } - let all_streams = PARSEABLE.streams.list(); + let all_streams = PARSEABLE.streams.list(&tenant_id); let mut correlated = Vec::new(); for name in all_streams { if name == dataset_name { continue; } - if let Ok(s) = PARSEABLE.get_stream(&name) { + if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) { // Skip internal streams if s.get_stream_type() == StreamType::Internal { continue; @@ -89,16 +91,20 @@ pub async fn get_correlated_datasets( /// GET /api/v1/datasets/tags/{tag} /// Returns all datasets that have the specified tag. -pub async fn get_datasets_by_tag(path: web::Path) -> Result { +pub async fn get_datasets_by_tag( + req: HttpRequest, + path: web::Path, +) -> Result { + let tenant_id = get_tenant_id_from_request(&req); let tag_str = path.into_inner(); let tag = DatasetTag::try_from(tag_str.as_str()).map_err(|_| DatasetsError::InvalidTag(tag_str))?; - let all_streams = PARSEABLE.streams.list(); + let all_streams = PARSEABLE.streams.list(&tenant_id); let mut matching = Vec::new(); for name in all_streams { - if let Ok(s) = PARSEABLE.get_stream(&name) { + if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) { if s.get_stream_type() == StreamType::Internal { continue; } @@ -121,14 +127,16 @@ pub struct PutDatasetMetadataBody { /// Replaces the dataset's tags and/or labels. /// Only fields present in the body are updated; absent fields are left unchanged. 
pub async fn put_dataset_metadata( + req: HttpRequest, path: web::Path, body: web::Json, ) -> Result { let dataset_name = path.into_inner(); let body = body.into_inner(); + let tenant_id = get_tenant_id_from_request(&req); let stream = PARSEABLE - .get_stream(&dataset_name) + .get_stream(&dataset_name, &tenant_id) .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; let final_tags = match body.tags { @@ -151,7 +159,12 @@ pub async fn put_dataset_metadata( // Update storage first, then in-memory let storage = PARSEABLE.storage.get_object_store(); storage - .update_dataset_tags_and_labels_in_stream(&dataset_name, &final_tags, &final_labels) + .update_dataset_tags_and_labels_in_stream( + &dataset_name, + &final_tags, + &final_labels, + &tenant_id, + ) .await .map_err(DatasetsError::Storage)?; diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 4ca4eac25..10ae4ebf6 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -476,7 +476,8 @@ impl Parseable { vec![log_source_entry.clone()], TelemetryType::Logs, &tenant_id, - None, + vec![], + vec![] ) .await; diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 54000b3d4..c82e32737 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -501,11 +501,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { stream_name: &str, tags: &[DatasetTag], labels: &[String], + tenant_id: &Option, ) -> Result<(), ObjectStorageError> { let mut format: ObjectStoreFormat = serde_json::from_slice( &PARSEABLE .metastore - .get_stream_json(stream_name, false) + .get_stream_json(stream_name, false, tenant_id) .await .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?, )?; @@ -513,7 +514,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { format.dataset_labels = labels.to_owned(); PARSEABLE .metastore - .put_stream_json(&format, stream_name) + .put_stream_json(&format, stream_name, tenant_id) .await .map_err(|e| 
ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?; From 5d5a265a7b56c866936c3477687d51777b9a6488 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 18 Mar 2026 17:59:45 +1100 Subject: [PATCH 05/10] add validation for otel datasets --- src/handlers/http/datasets.rs | 27 +++++++++++++++++++++++++++ src/handlers/http/ingest.rs | 14 +++++++++++--- src/handlers/http/modal/server.rs | 4 ++-- src/handlers/mod.rs | 20 ++++++++++---------- src/parseable/mod.rs | 2 +- 5 files changed, 51 insertions(+), 16 deletions(-) diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs index 9bd6f533f..2bfcfb82c 100644 --- a/src/handlers/http/datasets.rs +++ b/src/handlers/http/datasets.rs @@ -22,6 +22,8 @@ use actix_web::http::StatusCode; use actix_web::{HttpRequest, HttpResponse, web}; use serde::{Deserialize, Serialize}; +use crate::rbac::{self, Users, role::Action}; +use crate::utils::actix::extract_session_key_from_req; use crate::utils::get_tenant_id_from_request; use crate::{ handlers::DatasetTag, @@ -29,6 +31,15 @@ use crate::{ storage::{ObjectStorageError, StreamType}, }; +/// Check if the caller is authorized to read a specific stream. +fn can_access_stream(req: &HttpRequest, stream_name: &str) -> bool { + let Ok(key) = extract_session_key_from_req(req) else { + return false; + }; + Users.authorize(key, Action::GetStreamInfo, Some(stream_name), None) + == rbac::Response::Authorized +} + #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct CorrelatedDataset { @@ -39,12 +50,19 @@ struct CorrelatedDataset { /// GET /api/v1/datasets/{name}/correlated /// Returns all datasets sharing at least one tag or label with the named dataset. +/// Results are filtered to only include datasets the caller is authorized to read. 
pub async fn get_correlated_datasets( req: HttpRequest, path: web::Path, ) -> Result { let dataset_name = path.into_inner(); let tenant_id = get_tenant_id_from_request(&req); + + // Authorize caller for the seed dataset + if !can_access_stream(&req, &dataset_name) { + return Err(DatasetsError::DatasetNotFound(dataset_name)); + } + let stream = PARSEABLE .get_stream(&dataset_name, &tenant_id) .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; @@ -63,6 +81,10 @@ pub async fn get_correlated_datasets( if name == dataset_name { continue; } + // Filter out datasets the caller cannot read + if !can_access_stream(&req, &name) { + continue; + } if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) { // Skip internal streams if s.get_stream_type() == StreamType::Internal { @@ -91,6 +113,7 @@ pub async fn get_correlated_datasets( /// GET /api/v1/datasets/tags/{tag} /// Returns all datasets that have the specified tag. +/// Results are filtered to only include datasets the caller is authorized to read. 
pub async fn get_datasets_by_tag( req: HttpRequest, path: web::Path, @@ -104,6 +127,10 @@ pub async fn get_datasets_by_tag( let mut matching = Vec::new(); for name in all_streams { + // Filter out datasets the caller cannot read + if !can_access_stream(&req, &name) { + continue; + } if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) { if s.get_stream_type() == StreamType::Internal { continue; diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index a02fe9323..b40dc3499 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -32,7 +32,7 @@ use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion; use crate::handlers::{ - CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, + CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType, }; use crate::metadata::SchemaVersion; @@ -207,6 +207,8 @@ pub async fn setup_otel_stream( expected_log_source: LogSource, known_fields: &[&str], telemetry_type: TelemetryType, + dataset_tags: Vec, + dataset_labels: Vec, ) -> Result<(String, LogSource, LogSourceEntry, Option), PostError> { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -240,8 +242,8 @@ pub async fn setup_otel_stream( vec![log_source_entry.clone()], telemetry_type, &tenant_id, - vec![], - vec![], + dataset_tags, + dataset_labels, ) .await?; let mut time_partition = None; @@ -364,6 +366,8 @@ pub async fn handle_otel_logs_ingestion( LogSource::OtelLogs, &OTEL_LOG_KNOWN_FIELD_LIST, TelemetryType::Logs, + vec![], + vec![], ) .await .map_err(|e| { @@ -388,6 +392,8 @@ pub async fn handle_otel_metrics_ingestion( LogSource::OtelMetrics, &OTEL_METRICS_KNOWN_FIELD_LIST, 
TelemetryType::Metrics, + vec![], + vec![], ) .await .map_err(|e| { @@ -419,6 +425,8 @@ pub async fn handle_otel_traces_ingestion( LogSource::OtelTraces, &OTEL_TRACES_KNOWN_FIELD_LIST, TelemetryType::Traces, + vec![], + vec![], ) .await .map_err(|e| { diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index dde70bd1c..b2c006f03 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -213,13 +213,13 @@ impl Server { "/tags/{tag}", web::get() .to(http::datasets::get_datasets_by_tag) - .authorize_for_resource(Action::GetStreamInfo), + .authorize(Action::GetStreamInfo), ) .route( "/{name}/correlated", web::get() .to(http::datasets::get_correlated_datasets) - .authorize_for_resource(Action::GetStreamInfo), + .authorize(Action::GetStreamInfo), ) .route( "/{name}", diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index ebbea833b..f08e8c88e 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -92,10 +92,10 @@ impl Display for TelemetryType { #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "kebab-case")] pub enum DatasetTag { - AgentMonitoring, - K8sMonitoring, + AgentObservability, + K8sObservability, DatabaseObservability, - ApplicationMonitoring, + APM, ServiceMap, } @@ -104,13 +104,13 @@ impl TryFrom<&str> for DatasetTag { fn try_from(s: &str) -> Result { match s.to_lowercase().as_str() { - "agent-monitoring" => Ok(DatasetTag::AgentMonitoring), - "k8s-monitoring" => Ok(DatasetTag::K8sMonitoring), + "agent-observability" => Ok(DatasetTag::AgentObservability), + "k8s-observability" => Ok(DatasetTag::K8sObservability), "database-observability" => Ok(DatasetTag::DatabaseObservability), - "application-monitoring" => Ok(DatasetTag::ApplicationMonitoring), + "apm" => Ok(DatasetTag::APM), "service-map" => Ok(DatasetTag::ServiceMap), _ => Err( - "Invalid dataset tag. 
Supported values: agent-monitoring, k8s-monitoring, database-observability, application-monitoring, service-map", + "Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability, apm, service-map", ), } } @@ -119,10 +119,10 @@ impl TryFrom<&str> for DatasetTag { impl Display for DatasetTag { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(match self { - DatasetTag::AgentMonitoring => "agent-monitoring", - DatasetTag::K8sMonitoring => "k8s-monitoring", + DatasetTag::AgentObservability => "agent-observability", + DatasetTag::K8sObservability => "k8s-observability", DatasetTag::DatabaseObservability => "database-observability", - DatasetTag::ApplicationMonitoring => "application-monitoring", + DatasetTag::APM => "apm", DatasetTag::ServiceMap => "service-map", }) } diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 10ae4ebf6..484fd5b8e 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -477,7 +477,7 @@ impl Parseable { TelemetryType::Logs, &tenant_id, vec![], - vec![] + vec![], ) .await; From 859e4e7bde979df57af96c2c15583c1b3540af35 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 18 Mar 2026 18:12:54 +1100 Subject: [PATCH 06/10] use middleware-normalized header for post datasets --- src/handlers/http/prism_logstream.rs | 3 ++- src/prism/logstream/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/handlers/http/prism_logstream.rs b/src/handlers/http/prism_logstream.rs index b4fb935e7..4f71941b0 100644 --- a/src/handlers/http/prism_logstream.rs +++ b/src/handlers/http/prism_logstream.rs @@ -43,10 +43,11 @@ pub async fn post_datasets( req: HttpRequest, ) -> Result { let session_key = extract_session_key_from_req(&req)?; + let tenant_id = get_tenant_id_from_request(&req); let dataset = dataset_req .map(|Json(r)| r) .unwrap_or_default() - .get_datasets(session_key) + .get_datasets(session_key, tenant_id) .await?; Ok(web::Json(dataset)) 
diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 38b31c207..ec7e62fab 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -40,7 +40,7 @@ use crate::{ rbac::{Users, map::SessionKey, role::Action}, stats, storage::{StreamInfo, StreamType, retention::Retention}, - utils::{get_tenant_id_from_key, time::TimeParseError}, + utils::time::TimeParseError, }; #[derive(Serialize)] @@ -231,8 +231,8 @@ impl PrismDatasetRequest { pub async fn get_datasets( mut self, key: SessionKey, + tenant_id: Option, ) -> Result, PrismLogstreamError> { - let tenant_id = get_tenant_id_from_key(&key); if self.streams.is_empty() { self.streams = PARSEABLE.streams.list(&tenant_id); } From 80e528d6e9529e7ac126ebd5307b3f52f32b4d6e Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 18 Mar 2026 18:17:06 +1100 Subject: [PATCH 07/10] fix route for correlated dataset --- src/handlers/http/datasets.rs | 2 +- src/handlers/http/modal/server.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs index 2bfcfb82c..b850f97f2 100644 --- a/src/handlers/http/datasets.rs +++ b/src/handlers/http/datasets.rs @@ -48,7 +48,7 @@ struct CorrelatedDataset { shared_labels: Vec, } -/// GET /api/v1/datasets/{name}/correlated +/// GET /api/v1/datasets/correlated/{name} /// Returns all datasets sharing at least one tag or label with the named dataset. /// Results are filtered to only include datasets the caller is authorized to read. 
pub async fn get_correlated_datasets( diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index b2c006f03..26d3516c5 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -210,15 +210,15 @@ impl Server { .authorize_for_resource(Action::GetRetention), ) .route( - "/tags/{tag}", + "/correlated/{name}", web::get() - .to(http::datasets::get_datasets_by_tag) + .to(http::datasets::get_correlated_datasets) .authorize(Action::GetStreamInfo), ) .route( - "/{name}/correlated", + "/tags/{tag}", web::get() - .to(http::datasets::get_correlated_datasets) + .to(http::datasets::get_datasets_by_tag) .authorize(Action::GetStreamInfo), ) .route( From 6cab92ea052f043ed1316c1bfa591881262ffedc Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 19 Mar 2026 04:59:26 +1100 Subject: [PATCH 08/10] add log source to the correlated and tagged dataset apis --- src/handlers/http/datasets.rs | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs index b850f97f2..9844db1e0 100644 --- a/src/handlers/http/datasets.rs +++ b/src/handlers/http/datasets.rs @@ -22,6 +22,7 @@ use actix_web::http::StatusCode; use actix_web::{HttpRequest, HttpResponse, web}; use serde::{Deserialize, Serialize}; +use crate::event::format::LogSource; use crate::rbac::{self, Users, role::Action}; use crate::utils::actix::extract_session_key_from_req; use crate::utils::get_tenant_id_from_request; @@ -44,6 +45,7 @@ fn can_access_stream(req: &HttpRequest, stream_name: &str) -> bool { #[serde(rename_all = "camelCase")] struct CorrelatedDataset { name: String, + log_source: LogSource, shared_tags: Vec, shared_labels: Vec, } @@ -99,8 +101,14 @@ pub async fn get_correlated_datasets( target_labels.intersection(&s_labels).cloned().collect(); if !shared_tags.is_empty() || !shared_labels.is_empty() { + let log_source = s + .get_log_source() + .first() + .map(|entry| 
entry.log_source_format.clone()) + .unwrap_or_default(); correlated.push(CorrelatedDataset { name, + log_source, shared_tags, shared_labels, }); @@ -111,6 +119,13 @@ pub async fn get_correlated_datasets( Ok(HttpResponse::Ok().json(correlated)) } +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct TaggedDataset { + name: String, + log_source: LogSource, +} + /// GET /api/v1/datasets/tags/{tag} /// Returns all datasets that have the specified tag. /// Results are filtered to only include datasets the caller is authorized to read. @@ -136,7 +151,12 @@ pub async fn get_datasets_by_tag( continue; } if s.get_dataset_tags().contains(&tag) { - matching.push(name); + let log_source = s + .get_log_source() + .first() + .map(|entry| entry.log_source_format.clone()) + .unwrap_or_default(); + matching.push(TaggedDataset { name, log_source }); } } } From 9fa85368626d767f5fd82140079d096d3ef10e2d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 20 Mar 2026 00:10:15 +1100 Subject: [PATCH 09/10] load stream from storage --- src/handlers/http/datasets.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs index 9844db1e0..4b13ab4c8 100644 --- a/src/handlers/http/datasets.rs +++ b/src/handlers/http/datasets.rs @@ -65,6 +65,13 @@ pub async fn get_correlated_datasets( return Err(DatasetsError::DatasetNotFound(dataset_name)); } + if !PARSEABLE + .check_or_load_stream(&dataset_name, &tenant_id) + .await + { + return Err(DatasetsError::DatasetNotFound(dataset_name)); + } + let stream = PARSEABLE .get_stream(&dataset_name, &tenant_id) .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; @@ -182,6 +189,13 @@ pub async fn put_dataset_metadata( let body = body.into_inner(); let tenant_id = get_tenant_id_from_request(&req); + if !PARSEABLE + .check_or_load_stream(&dataset_name, &tenant_id) + .await + { + return Err(DatasetsError::DatasetNotFound(dataset_name)); + } + let stream 
= PARSEABLE .get_stream(&dataset_name, &tenant_id) .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; From c37a57dcd8fa6199f8bdb84c4231b75b203844f9 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 20 Mar 2026 01:30:25 +1100 Subject: [PATCH 10/10] verify dataset rbac --- src/handlers/http/datasets.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs index 4b13ab4c8..cd998a385 100644 --- a/src/handlers/http/datasets.rs +++ b/src/handlers/http/datasets.rs @@ -189,6 +189,16 @@ pub async fn put_dataset_metadata( let body = body.into_inner(); let tenant_id = get_tenant_id_from_request(&req); + // Explicit per-dataset RBAC check — the middleware cannot extract the + // `{name}` path param (it looks for `logstream`), so we must verify here. + let session_key = extract_session_key_from_req(&req) + .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?; + if Users.authorize(session_key, Action::CreateStream, Some(&dataset_name), None) + != rbac::Response::Authorized + { + return Err(DatasetsError::Unauthorized(dataset_name)); + } + if !PARSEABLE .check_or_load_stream(&dataset_name, &tenant_id) .await @@ -242,6 +252,8 @@ pub async fn put_dataset_metadata( pub enum DatasetsError { #[error("Dataset not found: {0}")] DatasetNotFound(String), + #[error("Unauthorized access to dataset: {0}")] + Unauthorized(String), #[error("Invalid tag: {0}")] InvalidTag(String), #[error("Storage error: {0}")] @@ -252,6 +264,7 @@ impl actix_web::ResponseError for DatasetsError { fn status_code(&self) -> StatusCode { match self { DatasetsError::DatasetNotFound(_) => StatusCode::NOT_FOUND, + DatasetsError::Unauthorized(_) => StatusCode::FORBIDDEN, DatasetsError::InvalidTag(_) => StatusCode::BAD_REQUEST, DatasetsError::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR, }