diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 4a61258d7..f918b6d26 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -64,7 +64,8 @@ impl ParseableSinkProcessor {
vec![log_source_entry],
TelemetryType::default(),
tenant_id,
- None,
+ vec![],
+ vec![],
)
.await?;
diff --git a/src/handlers/http/datasets.rs b/src/handlers/http/datasets.rs
new file mode 100644
index 000000000..cd998a385
--- /dev/null
+++ b/src/handlers/http/datasets.rs
@@ -0,0 +1,278 @@
+/*
+ * Parseable Server (C) 2022 - 2025 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::collections::HashSet;
+
+use actix_web::http::StatusCode;
+use actix_web::{HttpRequest, HttpResponse, web};
+use serde::{Deserialize, Serialize};
+
+use crate::event::format::LogSource;
+use crate::rbac::{self, Users, role::Action};
+use crate::utils::actix::extract_session_key_from_req;
+use crate::utils::get_tenant_id_from_request;
+use crate::{
+ handlers::DatasetTag,
+ parseable::PARSEABLE,
+ storage::{ObjectStorageError, StreamType},
+};
+
+/// Check if the caller is authorized to read a specific stream.
+fn can_access_stream(req: &HttpRequest, stream_name: &str) -> bool {
+ let Ok(key) = extract_session_key_from_req(req) else {
+ return false;
+ };
+ Users.authorize(key, Action::GetStreamInfo, Some(stream_name), None)
+ == rbac::Response::Authorized
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct CorrelatedDataset {
+ name: String,
+ log_source: LogSource,
+ shared_tags: Vec<DatasetTag>,
+ shared_labels: Vec<String>,
+}
+
+/// GET /api/v1/datasets/correlated/{name}
+/// Returns all datasets sharing at least one tag or label with the named dataset.
+/// Results are filtered to only include datasets the caller is authorized to read.
+pub async fn get_correlated_datasets(
+ req: HttpRequest,
+ path: web::Path<String>,
+) -> Result<HttpResponse, DatasetsError> {
+ let dataset_name = path.into_inner();
+ let tenant_id = get_tenant_id_from_request(&req);
+
+ // Authorize caller for the seed dataset
+ if !can_access_stream(&req, &dataset_name) {
+ return Err(DatasetsError::DatasetNotFound(dataset_name));
+ }
+
+ if !PARSEABLE
+ .check_or_load_stream(&dataset_name, &tenant_id)
+ .await
+ {
+ return Err(DatasetsError::DatasetNotFound(dataset_name));
+ }
+
+ let stream = PARSEABLE
+ .get_stream(&dataset_name, &tenant_id)
+ .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
+
+ let target_tags: HashSet<DatasetTag> = stream.get_dataset_tags().into_iter().collect();
+ let target_labels: HashSet<String> = stream.get_dataset_labels().into_iter().collect();
+
+ if target_tags.is_empty() && target_labels.is_empty() {
+ return Ok(HttpResponse::Ok().json(Vec::<CorrelatedDataset>::new()));
+ }
+
+ let all_streams = PARSEABLE.streams.list(&tenant_id);
+ let mut correlated = Vec::new();
+
+ for name in all_streams {
+ if name == dataset_name {
+ continue;
+ }
+ // Filter out datasets the caller cannot read
+ if !can_access_stream(&req, &name) {
+ continue;
+ }
+ if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) {
+ // Skip internal streams
+ if s.get_stream_type() == StreamType::Internal {
+ continue;
+ }
+
+ let s_tags: HashSet<DatasetTag> = s.get_dataset_tags().into_iter().collect();
+ let s_labels: HashSet<String> = s.get_dataset_labels().into_iter().collect();
+
+ let shared_tags: Vec<DatasetTag> = target_tags.intersection(&s_tags).copied().collect();
+ let shared_labels: Vec<String> =
+ target_labels.intersection(&s_labels).cloned().collect();
+
+ if !shared_tags.is_empty() || !shared_labels.is_empty() {
+ let log_source = s
+ .get_log_source()
+ .first()
+ .map(|entry| entry.log_source_format.clone())
+ .unwrap_or_default();
+ correlated.push(CorrelatedDataset {
+ name,
+ log_source,
+ shared_tags,
+ shared_labels,
+ });
+ }
+ }
+ }
+
+ Ok(HttpResponse::Ok().json(correlated))
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct TaggedDataset {
+ name: String,
+ log_source: LogSource,
+}
+
+/// GET /api/v1/datasets/tags/{tag}
+/// Returns all datasets that have the specified tag.
+/// Results are filtered to only include datasets the caller is authorized to read.
+pub async fn get_datasets_by_tag(
+ req: HttpRequest,
+ path: web::Path<String>,
+) -> Result<HttpResponse, DatasetsError> {
+ let tenant_id = get_tenant_id_from_request(&req);
+ let tag_str = path.into_inner();
+ let tag =
+ DatasetTag::try_from(tag_str.as_str()).map_err(|_| DatasetsError::InvalidTag(tag_str))?;
+
+ let all_streams = PARSEABLE.streams.list(&tenant_id);
+ let mut matching = Vec::new();
+
+ for name in all_streams {
+ // Filter out datasets the caller cannot read
+ if !can_access_stream(&req, &name) {
+ continue;
+ }
+ if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) {
+ if s.get_stream_type() == StreamType::Internal {
+ continue;
+ }
+ if s.get_dataset_tags().contains(&tag) {
+ let log_source = s
+ .get_log_source()
+ .first()
+ .map(|entry| entry.log_source_format.clone())
+ .unwrap_or_default();
+ matching.push(TaggedDataset { name, log_source });
+ }
+ }
+ }
+
+ Ok(HttpResponse::Ok().json(matching))
+}
+
+#[derive(Debug, Deserialize)]
+pub struct PutDatasetMetadataBody {
+ pub tags: Option<Vec<DatasetTag>>,
+ pub labels: Option<Vec<String>>,
+}
+
+/// PUT /api/v1/datasets/{name}
+/// Replaces the dataset's tags and/or labels.
+/// Only fields present in the body are updated; absent fields are left unchanged.
+pub async fn put_dataset_metadata(
+ req: HttpRequest,
+ path: web::Path<String>,
+ body: web::Json<PutDatasetMetadataBody>,
+) -> Result<HttpResponse, DatasetsError> {
+ let dataset_name = path.into_inner();
+ let body = body.into_inner();
+ let tenant_id = get_tenant_id_from_request(&req);
+
+ // Explicit per-dataset RBAC check — the middleware cannot extract the
+ // `{name}` path param (it looks for `logstream`), so we must verify here.
+ let session_key = extract_session_key_from_req(&req)
+ .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
+ if Users.authorize(session_key, Action::CreateStream, Some(&dataset_name), None)
+ != rbac::Response::Authorized
+ {
+ return Err(DatasetsError::Unauthorized(dataset_name));
+ }
+
+ if !PARSEABLE
+ .check_or_load_stream(&dataset_name, &tenant_id)
+ .await
+ {
+ return Err(DatasetsError::DatasetNotFound(dataset_name));
+ }
+
+ let stream = PARSEABLE
+ .get_stream(&dataset_name, &tenant_id)
+ .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
+
+ let final_tags = match body.tags {
+ Some(tags) => tags
+ .into_iter()
+ .collect::<HashSet<_>>()
+ .into_iter()
+ .collect(),
+ None => stream.get_dataset_tags(),
+ };
+ let final_labels = match body.labels {
+ Some(labels) => labels
+ .into_iter()
+ .collect::<HashSet<_>>()
+ .into_iter()
+ .collect(),
+ None => stream.get_dataset_labels(),
+ };
+
+ // Update storage first, then in-memory
+ let storage = PARSEABLE.storage.get_object_store();
+ storage
+ .update_dataset_tags_and_labels_in_stream(
+ &dataset_name,
+ &final_tags,
+ &final_labels,
+ &tenant_id,
+ )
+ .await
+ .map_err(DatasetsError::Storage)?;
+
+ stream.set_dataset_tags(final_tags.clone());
+ stream.set_dataset_labels(final_labels.clone());
+
+ Ok(HttpResponse::Ok().json(serde_json::json!({
+ "tags": final_tags,
+ "labels": final_labels,
+ })))
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum DatasetsError {
+ #[error("Dataset not found: {0}")]
+ DatasetNotFound(String),
+ #[error("Unauthorized access to dataset: {0}")]
+ Unauthorized(String),
+ #[error("Invalid tag: {0}")]
+ InvalidTag(String),
+ #[error("Storage error: {0}")]
+ Storage(ObjectStorageError),
+}
+
+impl actix_web::ResponseError for DatasetsError {
+ fn status_code(&self) -> StatusCode {
+ match self {
+ DatasetsError::DatasetNotFound(_) => StatusCode::NOT_FOUND,
+ DatasetsError::Unauthorized(_) => StatusCode::FORBIDDEN,
+ DatasetsError::InvalidTag(_) => StatusCode::BAD_REQUEST,
+ DatasetsError::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ }
+ }
+
+ fn error_response(&self) -> HttpResponse {
+ HttpResponse::build(self.status_code()).json(serde_json::json!({
+ "error": self.to_string()
+ }))
+ }
+}
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 0f3a87744..b40dc3499 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -32,7 +32,7 @@ use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion;
use crate::handlers::{
- CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
+ CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
};
use crate::metadata::SchemaVersion;
@@ -120,7 +120,8 @@ pub async fn ingest(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
- None,
+ vec![],
+ vec![],
)
.await
.map_err(|e| {
@@ -206,6 +207,8 @@ pub async fn setup_otel_stream(
expected_log_source: LogSource,
known_fields: &[&str],
telemetry_type: TelemetryType,
+ dataset_tags: Vec<DatasetTag>,
+ dataset_labels: Vec<String>,
) -> Result<(String, LogSource, LogSourceEntry, Option<String>), PostError> {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -239,7 +242,8 @@ pub async fn setup_otel_stream(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
- None,
+ dataset_tags,
+ dataset_labels,
)
.await?;
let mut time_partition = None;
@@ -362,6 +366,8 @@ pub async fn handle_otel_logs_ingestion(
LogSource::OtelLogs,
&OTEL_LOG_KNOWN_FIELD_LIST,
TelemetryType::Logs,
+ vec![],
+ vec![],
)
.await
.map_err(|e| {
@@ -386,6 +392,8 @@ pub async fn handle_otel_metrics_ingestion(
LogSource::OtelMetrics,
&OTEL_METRICS_KNOWN_FIELD_LIST,
TelemetryType::Metrics,
+ vec![],
+ vec![],
)
.await
.map_err(|e| {
@@ -417,6 +425,8 @@ pub async fn handle_otel_traces_ingestion(
LogSource::OtelTraces,
&OTEL_TRACES_KNOWN_FIELD_LIST,
TelemetryType::Traces,
+ vec![],
+ vec![],
)
.await
.map_err(|e| {
diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index 993a40b10..a4136e939 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -32,6 +32,7 @@ pub mod about;
pub mod alerts;
pub mod cluster;
pub mod correlation;
+pub mod datasets;
pub mod demo_data;
pub mod health_check;
pub mod ingest;
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index 29a977c3d..26d3516c5 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -200,14 +200,33 @@ impl Server {
}
pub fn get_prism_datasets() -> Scope {
- web::scope("/datasets").route(
- "",
- web::post()
- .to(http::prism_logstream::post_datasets)
- .authorize_for_resource(Action::GetStreamInfo)
- .authorize_for_resource(Action::GetStats)
- .authorize_for_resource(Action::GetRetention),
- )
+ web::scope("/datasets")
+ .route(
+ "",
+ web::post()
+ .to(http::prism_logstream::post_datasets)
+ .authorize_for_resource(Action::GetStreamInfo)
+ .authorize_for_resource(Action::GetStats)
+ .authorize_for_resource(Action::GetRetention),
+ )
+ .route(
+ "/correlated/{name}",
+ web::get()
+ .to(http::datasets::get_correlated_datasets)
+ .authorize(Action::GetStreamInfo),
+ )
+ .route(
+ "/tags/{tag}",
+ web::get()
+ .to(http::datasets::get_datasets_by_tag)
+ .authorize(Action::GetStreamInfo),
+ )
+ .route(
+ "/{name}",
+ web::put()
+ .to(http::datasets::put_dataset_metadata)
+ .authorize_for_resource(Action::CreateStream),
+ )
}
pub fn get_demo_data_webscope() -> Scope {
diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs
index 7b93fafbc..7930df77f 100644
--- a/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/src/handlers/http/modal/utils/logstream_utils.rs
@@ -16,17 +16,18 @@
*
*/
+use actix_web::http::header::HeaderMap;
+
use crate::{
event::format::LogSource,
handlers::{
- CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG,
- STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
- TelemetryType, UPDATE_STREAM_KEY,
+ CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag,
+ LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY,
+ TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY,
+ parse_dataset_labels, parse_dataset_tags,
},
storage::StreamType,
};
-use actix_web::http::header::HeaderMap;
-use tracing::warn;
#[derive(Debug, Default)]
pub struct PutStreamHeaders {
@@ -38,7 +39,8 @@ pub struct PutStreamHeaders {
pub stream_type: StreamType,
pub log_source: LogSource,
pub telemetry_type: TelemetryType,
- pub dataset_tag: Option<DatasetTag>,
+ pub dataset_tags: Vec<DatasetTag>,
+ pub dataset_labels: Vec<String>,
}
impl From<&HeaderMap> for PutStreamHeaders {
@@ -72,16 +74,17 @@ impl From<&HeaderMap> for PutStreamHeaders {
.get(TELEMETRY_TYPE_KEY)
.and_then(|v| v.to_str().ok())
.map_or(TelemetryType::Logs, TelemetryType::from),
- dataset_tag: headers
- .get(DATASET_TAG_KEY)
+ dataset_tags: headers
+ .get(DATASET_TAGS_KEY)
+ .or_else(|| headers.get(DATASET_TAG_KEY))
.and_then(|v| v.to_str().ok())
- .and_then(|v| match DatasetTag::try_from(v) {
- Ok(tag) => Some(tag),
- Err(err) => {
- warn!("Invalid dataset tag '{v}': {err}");
- None
- }
- }),
+ .map(parse_dataset_tags)
+ .unwrap_or_default(),
+ dataset_labels: headers
+ .get(DATASET_LABELS_KEY)
+ .and_then(|v| v.to_str().ok())
+ .map(parse_dataset_labels)
+ .unwrap_or_default(),
}
}
}
diff --git a/src/handlers/http/prism_logstream.rs b/src/handlers/http/prism_logstream.rs
index b4fb935e7..4f71941b0 100644
--- a/src/handlers/http/prism_logstream.rs
+++ b/src/handlers/http/prism_logstream.rs
@@ -43,10 +43,11 @@ pub async fn post_datasets(
req: HttpRequest,
) -> Result {
let session_key = extract_session_key_from_req(&req)?;
+ let tenant_id = get_tenant_id_from_request(&req);
let dataset = dataset_req
.map(|Json(r)| r)
.unwrap_or_default()
- .get_datasets(session_key)
+ .get_datasets(session_key, tenant_id)
.await?;
Ok(web::Json(dataset))
diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs
index 034e524b1..f08e8c88e 100644
--- a/src/handlers/mod.rs
+++ b/src/handlers/mod.rs
@@ -16,9 +16,11 @@
*
*/
+use std::collections::HashSet;
use std::fmt::Display;
use serde::{Deserialize, Serialize};
+use tracing::warn;
pub mod airplane;
pub mod http;
@@ -36,6 +38,8 @@ pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
pub const DATASET_TAG_KEY: &str = "x-p-dataset-tag";
+pub const DATASET_TAGS_KEY: &str = "x-p-dataset-tags";
+pub const DATASET_LABELS_KEY: &str = "x-p-dataset-labels";
pub const TENANT_ID: &str = "x-p-tenant";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
@@ -85,12 +89,14 @@ impl Display for TelemetryType {
}
/// Tag for categorizing datasets/streams by observability domain
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum DatasetTag {
AgentObservability,
K8sObservability,
DatabaseObservability,
+ APM,
+ ServiceMap,
}
impl TryFrom<&str> for DatasetTag {
@@ -101,8 +107,10 @@ impl TryFrom<&str> for DatasetTag {
"agent-observability" => Ok(DatasetTag::AgentObservability),
"k8s-observability" => Ok(DatasetTag::K8sObservability),
"database-observability" => Ok(DatasetTag::DatabaseObservability),
+ "apm" => Ok(DatasetTag::APM),
+ "service-map" => Ok(DatasetTag::ServiceMap),
_ => Err(
- "Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability",
+ "Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability, apm, service-map",
),
}
}
@@ -114,6 +122,40 @@ impl Display for DatasetTag {
DatasetTag::AgentObservability => "agent-observability",
DatasetTag::K8sObservability => "k8s-observability",
DatasetTag::DatabaseObservability => "database-observability",
+ DatasetTag::APM => "apm",
+ DatasetTag::ServiceMap => "service-map",
})
}
}
+
+pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
+ header_value
+ .split(',')
+ .filter_map(|s| {
+ let trimmed = s.trim();
+ if trimmed.is_empty() {
+ None
+ } else {
+ match DatasetTag::try_from(trimmed) {
+ Ok(tag) => Some(tag),
+ Err(err) => {
+ warn!("Invalid dataset tag '{trimmed}': {err}");
+ None
+ }
+ }
+ }
+ })
+ .collect::<HashSet<_>>()
+ .into_iter()
+ .collect()
+}
+
+pub fn parse_dataset_labels(header_value: &str) -> Vec<String> {
+ header_value
+ .split(',')
+ .map(|s| s.trim().to_string())
+ .filter(|s| !s.is_empty())
+ .collect::<HashSet<_>>()
+ .into_iter()
+ .collect()
+}
diff --git a/src/metadata.rs b/src/metadata.rs
index b0fa024fd..9ca1dd1e7 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -93,7 +93,8 @@ pub struct LogStreamMetadata {
pub stream_type: StreamType,
pub log_source: Vec<LogSourceEntry>,
pub telemetry_type: TelemetryType,
- pub dataset_tag: Option<DatasetTag>,
+ pub dataset_tags: Vec<DatasetTag>,
+ pub dataset_labels: Vec<String>,
}
impl LogStreamMetadata {
@@ -109,7 +110,8 @@ impl LogStreamMetadata {
schema_version: SchemaVersion,
log_source: Vec<LogSourceEntry>,
telemetry_type: TelemetryType,
- dataset_tag: Option<DatasetTag>,
+ dataset_tags: Vec<DatasetTag>,
+ dataset_labels: Vec<String>,
) -> Self {
LogStreamMetadata {
created_at: if created_at.is_empty() {
@@ -134,7 +136,8 @@ impl LogStreamMetadata {
schema_version,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
..Default::default()
}
}
diff --git a/src/migration/mod.rs b/src/migration/mod.rs
index 7d7027fe4..8e67d59e6 100644
--- a/src/migration/mod.rs
+++ b/src/migration/mod.rs
@@ -454,7 +454,8 @@ async fn setup_logstream_metadata(
stream_type,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
..
} = serde_json::from_value(stream_metadata_value).unwrap_or_default();
@@ -500,7 +501,8 @@ async fn setup_logstream_metadata(
stream_type,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
};
Ok(metadata)
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index d7d02ffad..484fd5b8e 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -416,7 +416,8 @@ impl Parseable {
let schema_version = stream_metadata.schema_version;
let log_source = stream_metadata.log_source;
let telemetry_type = stream_metadata.telemetry_type;
- let dataset_tag = stream_metadata.dataset_tag;
+ let dataset_tags = stream_metadata.dataset_tags;
+ let dataset_labels = stream_metadata.dataset_labels;
let mut metadata = LogStreamMetadata::new(
created_at,
time_partition,
@@ -428,7 +429,8 @@ impl Parseable {
schema_version,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
);
// Set hot tier fields from the stored metadata
@@ -474,7 +476,8 @@ impl Parseable {
vec![log_source_entry.clone()],
TelemetryType::Logs,
&tenant_id,
- None,
+ vec![],
+ vec![],
)
.await;
@@ -533,7 +536,8 @@ impl Parseable {
log_source: Vec<LogSourceEntry>,
telemetry_type: TelemetryType,
tenant_id: &Option<String>,
- dataset_tag: Option<DatasetTag>,
+ dataset_tags: Vec<DatasetTag>,
+ dataset_labels: Vec<String>,
) -> Result<bool, PostError> {
if self.streams.contains(stream_name, tenant_id) {
return Ok(true);
@@ -566,7 +570,8 @@ impl Parseable {
log_source,
telemetry_type,
tenant_id,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
)
.await?;
@@ -643,7 +648,8 @@ impl Parseable {
stream_type,
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
} = headers.into();
let stream_in_memory_dont_update =
@@ -717,7 +723,8 @@ impl Parseable {
vec![log_source_entry],
telemetry_type,
tenant_id,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
)
.await?;
@@ -779,7 +786,8 @@ impl Parseable {
log_source: Vec<LogSourceEntry>,
telemetry_type: TelemetryType,
tenant_id: &Option<String>,
- dataset_tag: Option<DatasetTag>,
+ dataset_tags: Vec<DatasetTag>,
+ dataset_labels: Vec<String>,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
if stream_type != StreamType::Internal {
@@ -804,7 +812,8 @@ impl Parseable {
},
log_source: log_source.clone(),
telemetry_type,
- dataset_tag,
+ dataset_tags: dataset_tags.clone(),
+ dataset_labels: dataset_labels.clone(),
..Default::default()
};
@@ -834,7 +843,8 @@ impl Parseable {
SchemaVersion::V1, // New stream
log_source,
telemetry_type,
- dataset_tag,
+ dataset_tags,
+ dataset_labels,
);
let ingestor_id = INGESTOR_META
.get()
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 0bf8bf2ca..1230946ab 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -963,8 +963,28 @@ impl Stream {
self.metadata.read().expect(LOCK_EXPECT).log_source.clone()
}
- pub fn get_dataset_tag(&self) -> Option<DatasetTag> {
- self.metadata.read().expect(LOCK_EXPECT).dataset_tag
+ pub fn get_dataset_tags(&self) -> Vec<DatasetTag> {
+ self.metadata
+ .read()
+ .expect(LOCK_EXPECT)
+ .dataset_tags
+ .clone()
+ }
+
+ pub fn get_dataset_labels(&self) -> Vec<String> {
+ self.metadata
+ .read()
+ .expect(LOCK_EXPECT)
+ .dataset_labels
+ .clone()
+ }
+
+ pub fn set_dataset_tags(&self, tags: Vec<DatasetTag>) {
+ self.metadata.write().expect(LOCK_EXPECT).dataset_tags = tags;
+ }
+
+ pub fn set_dataset_labels(&self, labels: Vec<String>) {
+ self.metadata.write().expect(LOCK_EXPECT).dataset_labels = labels;
}
pub fn add_log_source(&self, log_source: LogSourceEntry) {
diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs
index 4ade98c69..b9f1195d4 100644
--- a/src/prism/home/mod.rs
+++ b/src/prism/home/mod.rs
@@ -45,7 +45,8 @@ struct StreamMetadata {
time_partition: Option<String>,
dataset_format: LogSource,
ingestion: bool,
- tag: Option<DatasetTag>,
+ tags: Vec<DatasetTag>,
+ labels: Vec<String>,
}
type StreamMetadataResponse = Result<StreamMetadata, PrismHomeError>;
@@ -59,8 +60,10 @@ pub struct DataSet {
time_partition: Option<String>,
dataset_format: LogSource,
ingestion: bool,
- #[serde(skip_serializing_if = "Option::is_none")]
- tag: Option<DatasetTag>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ tags: Vec<DatasetTag>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ labels: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
@@ -147,7 +150,8 @@ pub async fn generate_home_response(
time_partition: sm.time_partition,
dataset_format: sm.dataset_format,
ingestion: sm.ingestion,
- tag: sm.tag,
+ tags: sm.tags,
+ labels: sm.labels,
});
}
Err(e) => {
@@ -233,7 +237,8 @@ async fn get_stream_metadata(stream: String, tenant_id: &Option<String>) -> Stre
let ingested = stream_jsons
.iter()
.any(|s| s.stats.current_stats.events > 0);
- let dataset_tag = stream_jsons[0].dataset_tag;
+ let dataset_tags = stream_jsons[0].dataset_tags.clone();
+ let dataset_labels = stream_jsons[0].dataset_labels.clone();
Ok(StreamMetadata {
stream,
@@ -242,7 +247,8 @@ async fn get_stream_metadata(stream: String, tenant_id: &Option) -> Stre
time_partition,
dataset_format,
ingestion: ingested,
- tag: dataset_tag,
+ tags: dataset_tags,
+ labels: dataset_labels,
})
}
diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs
index 38b31c207..ec7e62fab 100644
--- a/src/prism/logstream/mod.rs
+++ b/src/prism/logstream/mod.rs
@@ -40,7 +40,7 @@ use crate::{
rbac::{Users, map::SessionKey, role::Action},
stats,
storage::{StreamInfo, StreamType, retention::Retention},
- utils::{get_tenant_id_from_key, time::TimeParseError},
+ utils::time::TimeParseError,
};
#[derive(Serialize)]
@@ -231,8 +231,8 @@ impl PrismDatasetRequest {
pub async fn get_datasets(
mut self,
key: SessionKey,
+ tenant_id: Option<String>,
) -> Result<Vec<PrismDatasetResponse>, PrismLogstreamError> {
- let tenant_id = get_tenant_id_from_key(&key);
if self.streams.is_empty() {
self.streams = PARSEABLE.streams.list(&tenant_id);
}
diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs
index 5b3cc1b66..efc2aaaee 100644
--- a/src/storage/field_stats.rs
+++ b/src/storage/field_stats.rs
@@ -159,7 +159,8 @@ pub async fn calculate_field_stats(
vec![log_source_entry],
TelemetryType::Logs,
tenant_id,
- None,
+ vec![],
+ vec![],
)
.await?;
let vec_json = apply_generic_flattening_for_partition(
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 84d48d3c3..1694e5f5b 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -131,8 +131,10 @@ pub struct ObjectStoreFormat {
pub log_source: Vec<LogSourceEntry>,
#[serde(default)]
pub telemetry_type: TelemetryType,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub dataset_tag: Option<DatasetTag>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub dataset_tags: Vec<DatasetTag>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub dataset_labels: Vec<String>,
}
impl MetastoreObject for ObjectStoreFormat {
@@ -173,8 +175,10 @@ pub struct StreamInfo {
pub telemetry_type: TelemetryType,
#[serde(default)]
pub hot_tier_enabled: bool,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub dataset_tag: Option<DatasetTag>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub dataset_tags: Vec<DatasetTag>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub dataset_labels: Vec<String>,
}
impl StreamInfo {
@@ -197,7 +201,8 @@ impl StreamInfo {
log_source: metadata.log_source.clone(),
telemetry_type: metadata.telemetry_type,
hot_tier_enabled: metadata.hot_tier_enabled,
- dataset_tag: metadata.dataset_tag,
+ dataset_tags: metadata.dataset_tags.clone(),
+ dataset_labels: metadata.dataset_labels.clone(),
}
}
}
@@ -279,7 +284,8 @@ impl Default for ObjectStoreFormat {
hot_tier: None,
log_source: vec![LogSourceEntry::default()],
telemetry_type: TelemetryType::Logs,
- dataset_tag: None,
+ dataset_tags: Vec::new(),
+ dataset_labels: Vec::new(),
}
}
}
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 398b7e0cf..c82e32737 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -45,6 +45,7 @@ use ulid::Ulid;
use crate::catalog::{self, snapshot::Snapshot};
use crate::event::format::LogSource;
use crate::event::format::LogSourceEntry;
+use crate::handlers::DatasetTag;
use crate::handlers::http::fetch_schema;
use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
@@ -495,6 +496,31 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
Ok(())
}
+ async fn update_dataset_tags_and_labels_in_stream(
+ &self,
+ stream_name: &str,
+ tags: &[DatasetTag],
+ labels: &[String],
+ tenant_id: &Option<String>,
+ ) -> Result<(), ObjectStorageError> {
+ let mut format: ObjectStoreFormat = serde_json::from_slice(
+ &PARSEABLE
+ .metastore
+ .get_stream_json(stream_name, false, tenant_id)
+ .await
+ .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?,
+ )?;
+ format.dataset_tags = tags.to_owned();
+ format.dataset_labels = labels.to_owned();
+ PARSEABLE
+ .metastore
+ .put_stream_json(&format, stream_name, tenant_id)
+ .await
+ .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?;
+
+ Ok(())
+ }
+
/// Updates the first event timestamp in the object store for the specified stream.
///
/// This function retrieves the current object-store format for the given stream,