Skip to content
3 changes: 2 additions & 1 deletion src/connectors/kafka/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl ParseableSinkProcessor {
vec![log_source_entry],
TelemetryType::default(),
tenant_id,
None,
vec![],
vec![],
)
.await?;

Expand Down
278 changes: 278 additions & 0 deletions src/handlers/http/datasets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
/*
* Parseable Server (C) 2022 - 2025 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::collections::HashSet;

use actix_web::http::StatusCode;
use actix_web::{HttpRequest, HttpResponse, web};
use serde::{Deserialize, Serialize};

use crate::event::format::LogSource;
use crate::rbac::{self, Users, role::Action};
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::get_tenant_id_from_request;
use crate::{
handlers::DatasetTag,
parseable::PARSEABLE,
storage::{ObjectStorageError, StreamType},
};

/// Reports whether the caller behind `req` may read `stream_name`.
///
/// The session key is extracted from the request and checked against RBAC for
/// `Action::GetStreamInfo` on the given stream. A request from which no session
/// key can be extracted is treated as not authorized.
fn can_access_stream(req: &HttpRequest, stream_name: &str) -> bool {
    match extract_session_key_from_req(req) {
        Ok(session) => {
            let verdict =
                Users.authorize(session, Action::GetStreamInfo, Some(stream_name), None);
            verdict == rbac::Response::Authorized
        }
        // No usable session key: deny.
        Err(_) => false,
    }
}

/// One entry of the `get_correlated_datasets` response: a dataset that shares
/// at least one tag or label with the requested dataset.
/// Serialized with camelCase field names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CorrelatedDataset {
/// Name of the correlated dataset.
name: String,
/// Format of the dataset's first log-source entry, or the default `LogSource`
/// when the dataset has no log-source entries.
log_source: LogSource,
/// Tags present on both the requested dataset and this one.
shared_tags: Vec<DatasetTag>,
/// Labels present on both the requested dataset and this one.
shared_labels: Vec<String>,
}

/// GET /api/v1/datasets/correlated/{name}
///
/// Lists every other dataset that has at least one tag or label in common with
/// the dataset named in the path. Internal streams and datasets the caller is
/// not authorized to read are left out of the result.
///
/// # Errors
///
/// Returns [`DatasetsError::DatasetNotFound`] when the caller may not read the
/// named dataset, or when it cannot be loaded or looked up.
pub async fn get_correlated_datasets(
    req: HttpRequest,
    path: web::Path<String>,
) -> Result<HttpResponse, DatasetsError> {
    let dataset_name = path.into_inner();
    let tenant_id = get_tenant_id_from_request(&req);

    // A seed dataset the caller cannot read is reported exactly like a missing one.
    let seed_readable = can_access_stream(&req, &dataset_name);
    if !seed_readable {
        return Err(DatasetsError::DatasetNotFound(dataset_name));
    }

    let seed_available = PARSEABLE
        .check_or_load_stream(&dataset_name, &tenant_id)
        .await;
    if !seed_available {
        return Err(DatasetsError::DatasetNotFound(dataset_name));
    }

    let seed = match PARSEABLE.get_stream(&dataset_name, &tenant_id) {
        Ok(found) => found,
        Err(_) => return Err(DatasetsError::DatasetNotFound(dataset_name.clone())),
    };

    let target_tags: HashSet<DatasetTag> = seed.get_dataset_tags().into_iter().collect();
    let target_labels: HashSet<String> = seed.get_dataset_labels().into_iter().collect();

    // Nothing to correlate on: answer with an empty list straight away.
    if target_tags.is_empty() && target_labels.is_empty() {
        return Ok(HttpResponse::Ok().json(Vec::<CorrelatedDataset>::new()));
    }

    let mut correlated = Vec::new();

    for candidate in PARSEABLE.streams.list(&tenant_id) {
        // Skip the seed itself and anything the caller cannot read.
        if candidate == dataset_name || !can_access_stream(&req, &candidate) {
            continue;
        }
        let Ok(other) = PARSEABLE.get_stream(&candidate, &tenant_id) else {
            continue;
        };
        // Internal streams are never reported.
        if other.get_stream_type() == StreamType::Internal {
            continue;
        }

        let other_tags: HashSet<DatasetTag> = other.get_dataset_tags().into_iter().collect();
        let other_labels: HashSet<String> = other.get_dataset_labels().into_iter().collect();

        let shared_tags: Vec<DatasetTag> =
            target_tags.intersection(&other_tags).copied().collect();
        let shared_labels: Vec<String> =
            target_labels.intersection(&other_labels).cloned().collect();

        if shared_tags.is_empty() && shared_labels.is_empty() {
            continue;
        }

        // Report the format of the first log-source entry, or the default if there is none.
        let sources = other.get_log_source();
        let log_source = sources
            .first()
            .map(|entry| entry.log_source_format.clone())
            .unwrap_or_default();

        correlated.push(CorrelatedDataset {
            name: candidate,
            log_source,
            shared_tags,
            shared_labels,
        });
    }

    Ok(HttpResponse::Ok().json(correlated))
}

/// One entry of the `get_datasets_by_tag` response.
/// Serialized with camelCase field names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaggedDataset {
/// Name of the dataset carrying the requested tag.
name: String,
/// Format of the dataset's first log-source entry, or the default `LogSource`
/// when the dataset has no log-source entries.
log_source: LogSource,
}

/// GET /api/v1/datasets/tags/{tag}
///
/// Lists every dataset that carries the tag given in the path. Internal
/// streams and datasets the caller is not authorized to read are left out.
///
/// # Errors
///
/// Returns [`DatasetsError::InvalidTag`] when the path segment cannot be
/// converted into a [`DatasetTag`].
pub async fn get_datasets_by_tag(
    req: HttpRequest,
    path: web::Path<String>,
) -> Result<HttpResponse, DatasetsError> {
    let tenant_id = get_tenant_id_from_request(&req);
    let tag_str = path.into_inner();
    let tag = match DatasetTag::try_from(tag_str.as_str()) {
        Ok(parsed) => parsed,
        Err(_) => return Err(DatasetsError::InvalidTag(tag_str)),
    };

    let mut matching = Vec::new();

    for candidate in PARSEABLE.streams.list(&tenant_id) {
        // Drop datasets the caller cannot read before looking at them.
        if !can_access_stream(&req, &candidate) {
            continue;
        }
        let Ok(stream) = PARSEABLE.get_stream(&candidate, &tenant_id) else {
            continue;
        };
        // Internal streams are never reported.
        if stream.get_stream_type() == StreamType::Internal {
            continue;
        }
        if !stream.get_dataset_tags().contains(&tag) {
            continue;
        }

        // Report the format of the first log-source entry, or the default if there is none.
        let sources = stream.get_log_source();
        let log_source = sources
            .first()
            .map(|entry| entry.log_source_format.clone())
            .unwrap_or_default();

        matching.push(TaggedDataset {
            name: candidate,
            log_source,
        });
    }

    Ok(HttpResponse::Ok().json(matching))
}

/// JSON request body accepted by `put_dataset_metadata`.
/// Both fields are optional; a field that is absent leaves the corresponding
/// stored value unchanged.
#[derive(Debug, Deserialize)]
pub struct PutDatasetMetadataBody {
/// Replacement tag list for the dataset, if provided.
pub tags: Option<Vec<DatasetTag>>,
/// Replacement label list for the dataset, if provided.
pub labels: Option<Vec<String>>,
}

/// PUT /api/v1/datasets/{name}
///
/// Replaces the dataset's tags and/or labels.
/// Only fields present in the body are updated; absent fields are left unchanged.
/// Duplicate entries in a supplied list are dropped, keeping the first
/// occurrence, so the stored and returned order follows the request.
///
/// The new values are written to object storage first and applied to the
/// in-memory stream only after that write succeeds. The response body is a JSON
/// object with the resulting `tags` and `labels`.
///
/// # Errors
///
/// * [`DatasetsError::Unauthorized`] — no session key could be extracted from
///   the request, or the caller is not authorized for `Action::CreateStream`
///   on the dataset.
/// * [`DatasetsError::DatasetNotFound`] — the dataset cannot be loaded or
///   looked up.
/// * [`DatasetsError::Storage`] — persisting the metadata failed.
pub async fn put_dataset_metadata(
    req: HttpRequest,
    path: web::Path<String>,
    body: web::Json<PutDatasetMetadataBody>,
) -> Result<HttpResponse, DatasetsError> {
    let dataset_name = path.into_inner();
    let body = body.into_inner();
    let tenant_id = get_tenant_id_from_request(&req);

    // Explicit per-dataset RBAC check — the middleware cannot extract the
    // `{name}` path param (it looks for `logstream`), so we must verify here.
    // A request without a usable session key is an authorization failure, not
    // a missing dataset, so it is reported the same way as a denied one.
    let session_key = extract_session_key_from_req(&req)
        .map_err(|_| DatasetsError::Unauthorized(dataset_name.clone()))?;
    if Users.authorize(session_key, Action::CreateStream, Some(&dataset_name), None)
        != rbac::Response::Authorized
    {
        return Err(DatasetsError::Unauthorized(dataset_name));
    }

    if !PARSEABLE
        .check_or_load_stream(&dataset_name, &tenant_id)
        .await
    {
        return Err(DatasetsError::DatasetNotFound(dataset_name));
    }

    let stream = PARSEABLE
        .get_stream(&dataset_name, &tenant_id)
        .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;

    // De-duplicate while preserving request order: `HashSet::insert` returns
    // `false` for a value already seen, so only first occurrences survive.
    // (Collecting through a `HashSet` and back would scramble the order.)
    let final_tags = match body.tags {
        Some(tags) => {
            let mut seen = HashSet::with_capacity(tags.len());
            tags.into_iter().filter(|tag| seen.insert(*tag)).collect()
        }
        None => stream.get_dataset_tags(),
    };
    let final_labels = match body.labels {
        Some(labels) => {
            let mut seen = HashSet::with_capacity(labels.len());
            labels
                .into_iter()
                .filter(|label| seen.insert(label.clone()))
                .collect()
        }
        None => stream.get_dataset_labels(),
    };

    // Update storage first, then in-memory, so a failed write leaves the
    // in-memory state untouched.
    let storage = PARSEABLE.storage.get_object_store();
    storage
        .update_dataset_tags_and_labels_in_stream(
            &dataset_name,
            &final_tags,
            &final_labels,
            &tenant_id,
        )
        .await
        .map_err(DatasetsError::Storage)?;

    stream.set_dataset_tags(final_tags.clone());
    stream.set_dataset_labels(final_labels.clone());

    Ok(HttpResponse::Ok().json(serde_json::json!({
        "tags": final_tags,
        "labels": final_labels,
    })))
}

/// Errors returned by the dataset HTTP handlers in this module.
/// Each variant is mapped to an HTTP status by the `ResponseError` impl below.
#[derive(Debug, thiserror::Error)]
pub enum DatasetsError {
/// The named dataset does not exist or could not be loaded (404).
/// `get_correlated_datasets` also returns this when the caller may not read the dataset.
#[error("Dataset not found: {0}")]
DatasetNotFound(String),
/// The caller is not authorized for the requested operation on the named dataset (403).
#[error("Unauthorized access to dataset: {0}")]
Unauthorized(String),
/// The supplied tag string could not be converted into a `DatasetTag` (400).
#[error("Invalid tag: {0}")]
InvalidTag(String),
/// Persisting dataset metadata to object storage failed (500).
#[error("Storage error: {0}")]
Storage(ObjectStorageError),
}

/// Converts a [`DatasetsError`] into an HTTP response.
impl actix_web::ResponseError for DatasetsError {
    /// HTTP status associated with each error variant.
    fn status_code(&self) -> StatusCode {
        match self {
            Self::InvalidTag(_) => StatusCode::BAD_REQUEST,
            Self::Unauthorized(_) => StatusCode::FORBIDDEN,
            Self::DatasetNotFound(_) => StatusCode::NOT_FOUND,
            Self::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR,
        }
    }

    /// Builds a JSON response of the form `{"error": "<message>"}` carrying
    /// the status from [`Self::status_code`].
    fn error_response(&self) -> HttpResponse {
        let mut response = HttpResponse::build(self.status_code());
        let payload = serde_json::json!({
            "error": self.to_string()
        });
        response.json(payload)
    }
}
16 changes: 13 additions & 3 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion;
use crate::handlers::{
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
};
use crate::metadata::SchemaVersion;
Expand Down Expand Up @@ -120,7 +120,8 @@ pub async fn ingest(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
None,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -206,6 +207,8 @@ pub async fn setup_otel_stream(
expected_log_source: LogSource,
known_fields: &[&str],
telemetry_type: TelemetryType,
dataset_tags: Vec<DatasetTag>,
dataset_labels: Vec<String>,
) -> Result<(String, LogSource, LogSourceEntry, Option<String>), PostError> {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
Expand Down Expand Up @@ -239,7 +242,8 @@ pub async fn setup_otel_stream(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
None,
dataset_tags,
dataset_labels,
)
.await?;
let mut time_partition = None;
Expand Down Expand Up @@ -362,6 +366,8 @@ pub async fn handle_otel_logs_ingestion(
LogSource::OtelLogs,
&OTEL_LOG_KNOWN_FIELD_LIST,
TelemetryType::Logs,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand All @@ -386,6 +392,8 @@ pub async fn handle_otel_metrics_ingestion(
LogSource::OtelMetrics,
&OTEL_METRICS_KNOWN_FIELD_LIST,
TelemetryType::Metrics,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -417,6 +425,8 @@ pub async fn handle_otel_traces_ingestion(
LogSource::OtelTraces,
&OTEL_TRACES_KNOWN_FIELD_LIST,
TelemetryType::Traces,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod about;
pub mod alerts;
pub mod cluster;
pub mod correlation;
pub mod datasets;
pub mod demo_data;
pub mod health_check;
pub mod ingest;
Expand Down
Loading
Loading