Skip to content

add system defined tags and free form labels to datasets#1553

Open
nikhilsinhaparseable wants to merge 8 commits intoparseablehq:mainfrom
nikhilsinhaparseable:dataset-tags-lables
Open

add system defined tags and free form labels to datasets#1553
nikhilsinhaparseable wants to merge 8 commits intoparseablehq:mainfrom
nikhilsinhaparseable:dataset-tags-lables

Conversation

@nikhilsinhaparseable
Copy link
Contributor

@nikhilsinhaparseable nikhilsinhaparseable commented Feb 21, 2026

  • PUT /api/v1/logstream/{name} accepts X-P-Dataset-Tags
  • and X-P-Dataset-Labels headers (comma-separated) on stream creation
  • PUT /api/prism/v1/datasets/{name}- create or update dataset tags and labels(empty list clears all)
  • GET /api/prism/v1/datasets/{name}/correlated - find datasets sharing tags or labels
  • GET /api/prism/v1/datasets/tags/{tag} - find all datasets with a specific tag
  • include tags and labels in home api response

Summary by CodeRabbit

  • New Features

    • New dataset HTTP APIs: fetch correlated datasets, fetch by tag, and update dataset tags/labels; added corresponding server routes and authorization checks.
  • User-facing Data Model

    • Streams, metadata, headers, and responses now support multiple dataset tags and labels (lists instead of a single optional tag); request headers accept/parse multiple tags/labels.
  • Refactor

    • Added new dataset tag types (including APM and ServiceMap), improved parsing/deduplication, and ensured storage + in-memory state stay in sync; dataset operations now respect tenant scoping.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 21, 2026

Walkthrough

Replaces a single optional dataset_tag with dataset_tags: Vec<DatasetTag> and dataset_labels: Vec<String> across metadata, parseable APIs, storage, HTTP handlers, and stream-creation call sites; adds dataset HTTP endpoints, modal routes, header parsing, and an ObjectStorage API to update tags/labels.

Changes

Cohort / File(s) Summary
Dataset Tag Model & Parsing
src/handlers/mod.rs
Expanded DatasetTag variants (+Hash), added DATASET_TAGS_KEY/DATASET_LABELS_KEY, and new parse_dataset_tags()/parse_dataset_labels() with trimming/deduplication and warnings.
HTTP Handlers & Ingest
src/handlers/http/mod.rs, src/handlers/http/ingest.rs, src/handlers/http/prism_logstream.rs
Exported datasets module; propagated dataset_tags/dataset_labels through OTEL/ingest flow (pass explicit vec![] where None was used); post_datasets now passes tenant_id to dataset retrieval.
Datasets API & Modal Routes
src/handlers/http/datasets.rs, src/handlers/http/modal/server.rs
Added new dataset endpoints (GET correlated, GET by tag, PUT update metadata), error types, request/response models, and registered routes under /datasets with authorization guards.
Header / Stream Utilities
src/handlers/http/modal/utils/logstream_utils.rs
PutStreamHeaders now contains dataset_tags: Vec<DatasetTag> and dataset_labels: Vec<String>; header parsing switched to multi-value keys and new parsing helpers.
Parseable Stream APIs & Streams
src/parseable/mod.rs, src/parseable/streams.rs
Updated create_stream_if_not_exists/create_stream signatures to accept vectors; added get_dataset_tags/get_dataset_labels and setters; propagated vectors through creation/update flows.
Metadata & Storage Formats
src/metadata.rs, src/storage/mod.rs, src/storage/object_storage.rs
Replaced dataset_tag: Option with dataset_tags: Vec and dataset_labels: Vec in metadata, ObjectStoreFormat, and StreamInfo; added update_dataset_tags_and_labels_in_stream to ObjectStorage trait.
Home Response & Prism
src/prism/home/mod.rs, src/prism/logstream/mod.rs
Home/response dataset shape changed to tags: Vec and labels: Vec; PrismDatasetRequest::get_datasets now accepts explicit tenant_id parameter.
Migration & Call Sites
src/migration/mod.rs, src/connectors/kafka/processor.rs, src/storage/field_stats.rs
Adjusted migration and call sites to parse/forward dataset_tags/dataset_labels; call sites that previously passed None now pass vec![].

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant HTTP_API as HTTP API
    participant Parseable
    participant ObjectStore as ObjectStorage/Metastore
    participant Storage as PersistedStore

    Client->>HTTP_API: PUT /api/v1/datasets/{name} (tags/labels)
    HTTP_API->>Parseable: validate stream exists & authorize
    Parseable->>ObjectStore: update_dataset_tags_and_labels_in_stream(name, tags, labels, tenant_id)
    ObjectStore->>Storage: get_stream_json(name, tenant_id) / put_stream_json(updated)
    Storage-->>ObjectStore: ack
    ObjectStore-->>Parseable: success
    Parseable-->>HTTP_API: updated tags/labels
    HTTP_API-->>Client: 200 OK / error
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • parmesant
  • praveen5959
  • nitisht

Poem

🐇 I nibbled one tag into many a row,
Headers and streams now carry the show.
I hopped through routes and storage with cheer,
Labels and tags gathered — metadata near.
Hooray — the datasets blossom, hi-de-ho!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description lists key endpoints and features but lacks testing checklist completion and doesn't follow the provided template structure with required sections. Complete the testing checklist, add a proper Description section explaining goals and rationale, and document new features per the template.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately and concisely summarizes the main change: adding system-defined tags and free-form labels to datasets.
Docstring Coverage ✅ Passed Docstring coverage is 83.33% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
📝 Coding Plan
  • Generate coding plan for human review comments

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

480-489: Consider a builder pattern for stream creation parameters.

The #[allow(clippy::too_many_arguments)] annotation is a reasonable workaround, but with 7+ parameters, this function is becoming unwieldy. For a future improvement, consider introducing a StreamCreationParams struct or builder pattern to group these related parameters.

This is a minor suggestion and doesn't block the current PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/mod.rs` around lines 480 - 489, The function
create_stream_if_not_exists has many parameters; refactor by introducing a
StreamCreationParams struct (or a builder StreamCreationParamsBuilder) to group
stream_name, stream_type, custom_partition, log_source, telemetry_type,
dataset_tags, and dataset_labels; update create_stream_if_not_exists signature
to accept a single StreamCreationParams (or builder output) and adapt internal
usage accordingly, add a conversion constructor or builder methods for easy
construction where callers currently pass those seven+ args, and preserve
existing behavior and validation inside the new struct or builder.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/parseable/mod.rs`:
- Around line 480-489: The function create_stream_if_not_exists has many
parameters; refactor by introducing a StreamCreationParams struct (or a builder
StreamCreationParamsBuilder) to group stream_name, stream_type,
custom_partition, log_source, telemetry_type, dataset_tags, and dataset_labels;
update create_stream_if_not_exists signature to accept a single
StreamCreationParams (or builder output) and adapt internal usage accordingly,
add a conversion constructor or builder methods for easy construction where
callers currently pass those seven+ args, and preserve existing behavior and
validation inside the new struct or builder.

coderabbitai[bot]
coderabbitai bot previously approved these changes Feb 21, 2026
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

480-527: Consider bundling stream creation parameters into a struct.

The function now accepts 8 parameters (including self), which triggers the clippy::too_many_arguments lint. While the #[allow] attribute suppresses it, this pattern repeats across create_stream_if_not_exists, create_stream, and create_update_stream.

A dedicated struct (e.g., StreamCreationParams or StreamConfig) would:

  • Improve readability at call sites
  • Make future parameter additions easier
  • Provide a natural place for default values
💡 Example struct-based approach
pub struct StreamCreationParams {
    pub stream_type: StreamType,
    pub custom_partition: Option<String>,
    pub log_source: Vec<LogSourceEntry>,
    pub telemetry_type: TelemetryType,
    pub dataset_tags: Vec<DatasetTag>,
    pub dataset_labels: Vec<String>,
}

impl Default for StreamCreationParams {
    fn default() -> Self {
        Self {
            stream_type: StreamType::UserDefined,
            custom_partition: None,
            log_source: vec![],
            telemetry_type: TelemetryType::Logs,
            dataset_tags: vec![],
            dataset_labels: vec![],
        }
    }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/mod.rs` around lines 480 - 527, Create a StreamCreationParams
(or StreamConfig) struct to bundle the multiple stream creation arguments and
refactor create_stream_if_not_exists, create_stream, and create_update_stream to
accept that struct instead of the long parameter list; update the call site in
create_stream_if_not_exists to build a StreamCreationParams (using Default for
sensible defaults) and pass it to create_stream, adjust create_stream signature
to destructure or reference the struct fields (stream_type, custom_partition,
log_source, telemetry_type, dataset_tags, dataset_labels), remove the
#[allow(clippy::too_many_arguments)] on those functions, and add
conversions/constructors where needed so existing call sites can migrate with
minimal changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 163-169: The labels vector built in the PUT handler (where
new_labels is created from body.into_inner().labels) must trim each label and
filter out empty or whitespace-only strings before deduplicating; update the
pipeline that builds new_labels to map each label through .trim(), filter out
strings that are empty after trimming, then collect into a HashSet to dedupe and
back into a Vec (preserving whatever ordering you need). Ensure you modify the
code around the new_labels construction so labels are normalized (trimmed) and
empty/whitespace-only entries are removed prior to deduplication.
- Around line 31-186: Handlers get_correlated_datasets, get_datasets_by_tag,
put_dataset_tags and put_dataset_labels currently use PARSEABLE.streams/list,
PARSEABLE.get_stream and storage APIs without tenant context; update each
handler to resolve the tenant from the request (e.g., call
get_tenant_id_from_request or read the normalized tenant header injected by
middleware), then scope all stream listing and lookups to that tenant (filter
list results or call tenant-aware APIs) and ensure any
storage.update_dataset_tags_and_labels_in_stream and PARSEABLE.get_stream calls
include/are called with the resolved tenant or have the tenant header
overwritten server-side so clients cannot spoof it; ensure the same tenant
resolution is applied to both read (listing/get) and write
(put_dataset_tags/put_dataset_labels) flows.

---

Nitpick comments:
In `@src/parseable/mod.rs`:
- Around line 480-527: Create a StreamCreationParams (or StreamConfig) struct to
bundle the multiple stream creation arguments and refactor
create_stream_if_not_exists, create_stream, and create_update_stream to accept
that struct instead of the long parameter list; update the call site in
create_stream_if_not_exists to build a StreamCreationParams (using Default for
sensible defaults) and pass it to create_stream, adjust create_stream signature
to destructure or reference the struct fields (stream_type, custom_partition,
log_source, telemetry_type, dataset_tags, dataset_labels), remove the
#[allow(clippy::too_many_arguments)] on those functions, and add
conversions/constructors where needed so existing call sites can migrate with
minimal changes.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 139-151: The current read-modify-write in put_dataset_tags uses
PARSEABLE.get_stream(...), calls stream.get_dataset_labels(), then
storage.update_dataset_tags_and_labels_in_stream(...) and finally
stream.set_dataset_tags(...), which allows TOCTOU race with concurrent
put_dataset_labels; fix by serializing updates per-stream (or by providing a
combined tags+labels atomic endpoint). Add a per-stream mutex/async lock on the
stream (e.g., a metadata_lock() or similar on the stream returned by
PARSEABLE.get_stream) and acquire it around the read-modify-write sequence that
uses stream.get_dataset_labels,
storage.update_dataset_tags_and_labels_in_stream, and stream.set_dataset_tags so
concurrent tag/label updates for the same stream are executed sequentially.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 41-88: get_correlated_datasets currently iterates the global
PARSEABLE.streams.list() and PARSEABLE.get_stream(...) without restricting to
the tenant, causing cross-tenant exposure; update the handler to derive the
tenant context (e.g., from request auth/headers or a tenant path param) and then
restrict both the initial stream lookup (PARSEABLE.get_stream(&dataset_name))
and the loop over streams to the same tenant: either call a tenant-scoped
listing API (e.g., PARSEABLE.streams.list_for_tenant(tenant) if available) or
filter the results of PARSEABLE.streams.list() by a tenant identity exposed on
each stream (e.g., s.get_tenant() == tenant) before skipping and comparing;
ensure the check for the target dataset also validates it belongs to the tenant
so only tenant-scoped streams are considered.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

150-156: ⚠️ Potential issue | 🟡 Minor

Normalize label input in PUT body before dedupe.

Line 150-Line 156 deduplicates raw strings but does not trim/filter whitespace-only labels, so empty labels can still be persisted.

Suggested fix
     let final_labels = match body.labels {
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
         None => stream.get_dataset_labels(),
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 150 - 156, Normalize and filter
incoming labels before deduplication: when building final_labels from
body.labels (the match arm handling Some(labels)), map each label to trimmed
string and filter out labels that are empty or whitespace-only (e.g.,
labels.into_iter().map(|s| s.trim().to_string()).filter(|s| !s.is_empty())...),
then collect into a HashSet to dedupe and back into the desired collection; keep
the None branch using stream.get_dataset_labels() unchanged.

142-172: ⚠️ Potential issue | 🔴 Critical

Concurrent partial metadata updates can still lose data.

Line 142-Line 172 does a read-modify-write using current in-memory values for absent fields, so two concurrent requests (tags-only and labels-only) can overwrite each other with stale counterparts.

Suggested direction
 pub async fn put_dataset_metadata(...) -> Result<HttpResponse, DatasetsError> {
     ...
     let stream = PARSEABLE
         .get_stream(&dataset_name, &tenant_id)
         .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
+
+    // Serialize metadata updates for this stream (or use atomic storage merge).
+    // let _guard = stream.metadata_lock().await;

     let final_tags = match body.tags { ... };
     let final_labels = match body.labels { ... };

     storage
         .update_dataset_tags_and_labels_in_stream(...)
         .await
         .map_err(DatasetsError::Storage)?;

     stream.set_dataset_tags(final_tags.clone());
     stream.set_dataset_labels(final_labels.clone());

If a stream-scoped lock is unavailable, prefer a storage-level atomic patch API that updates only provided fields server-side.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 142 - 172, Current code computes
final_tags/final_labels from in-memory stream values and then does a
read-modify-write, which lets concurrent requests overwrite each other; instead
either acquire the stream-scoped lock around the read-modify-write (use
stream.get_dataset_tags/get_dataset_labels and
stream.set_dataset_tags/set_dataset_labels inside the lock) or, preferably,
change the update to an atomic storage-side patch that accepts
Option<HashSet<_>> and only updates provided fields (modify the call to
storage.update_dataset_tags_and_labels_in_stream to pass Option types and
implement server-side merge), removing the reliance on in-memory reads to
prevent lost updates.
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

982-988: Prefer atomic update for tags+labels to avoid transient mixed state.

Line 982 and Line 986 update related metadata in two separate writes. A single setter that updates both fields under one lock would avoid short-lived inconsistent reads.

♻️ Suggested refactor
+    pub fn set_dataset_metadata(&self, tags: Vec<DatasetTag>, labels: Vec<String>) {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        metadata.dataset_tags = tags;
+        metadata.dataset_labels = labels;
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 982 - 988, The two setters
set_dataset_tags and set_dataset_labels perform separate metadata.write() calls
causing transient inconsistent state; add a new atomic setter (e.g.,
set_dataset_tags_and_labels(&self, tags: Vec<DatasetTag>, labels: Vec<String>))
that acquires metadata.write() once and assigns both dataset_tags and
dataset_labels inside the same lock, then update call sites to use the new
method (or have the existing setters delegate to it) so modifications never
occur in two separate writes; reference the metadata RwLock field and the
existing set_dataset_tags/set_dataset_labels method names when making the
change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/server.rs`:
- Around line 222-227: The route currently mounts only PUT /{name} using
http::datasets::put_dataset_metadata (authorized with Action::CreateStream), but
the API requires distinct replacement endpoints for tags and labels; add two
separate routes for "/{name}/tags" and "/{name}/labels" each using web::put()
and wired to the appropriate handlers (e.g., http::datasets::put_dataset_tags
and http::datasets::put_dataset_labels) with the same
authorize_for_resource(Action::CreateStream) policy, or if those handlers do not
exist, split http::datasets::put_dataset_metadata into two functions and route
them accordingly so clients can call PUT /datasets/{name}/tags and PUT
/datasets/{name}/labels.

In `@src/handlers/mod.rs`:
- Around line 95-114: The TryFrom<&str> for DatasetTag currently only accepts
the new literals (e.g., "agent-monitoring", "k8s-monitoring") causing older
values to fail; update the DatasetTag enum and its impl TryFrom to accept legacy
names by adding serde aliases on the enum variants (e.g., add #[serde(alias =
"agent-observability")] and #[serde(alias = "k8s-observability")] to
DatasetTag::AgentMonitoring and ::K8sMonitoring) and extend the match in impl
TryFrom (the function try_from) to also match the old strings
("agent-observability", "k8s-observability") mapping them to the corresponding
variants, keeping the existing error message unchanged.

In `@src/storage/mod.rs`:
- Around line 134-137: The struct currently only reads dataset_tags so legacy
single-key dataset_tag values are dropped; update deserialization/migration to
map dataset_tag -> dataset_tags by adding a compatibility conversion: in
src/storage/mod.rs ensure deserialization recognizes a legacy dataset_tag (alias
or custom Deserialize) and appends it into the dataset_tags Vec, or add a
migration step that, when bumping
CURRENT_OBJECT_STORE_VERSION/CURRENT_SCHEMA_VERSION, transforms any legacy
dataset_tag value into dataset_tags before validation; target the symbols
dataset_tags, dataset_tag, CURRENT_OBJECT_STORE_VERSION and
CURRENT_SCHEMA_VERSION when making this change so older metadata is preserved
and participates in tag-based APIs.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 150-156: Normalize and filter incoming labels before
deduplication: when building final_labels from body.labels (the match arm
handling Some(labels)), map each label to trimmed string and filter out labels
that are empty or whitespace-only (e.g., labels.into_iter().map(|s|
s.trim().to_string()).filter(|s| !s.is_empty())...), then collect into a HashSet
to dedupe and back into the desired collection; keep the None branch using
stream.get_dataset_labels() unchanged.
- Around line 142-172: Current code computes final_tags/final_labels from
in-memory stream values and then does a read-modify-write, which lets concurrent
requests overwrite each other; instead either acquire the stream-scoped lock
around the read-modify-write (use stream.get_dataset_tags/get_dataset_labels and
stream.set_dataset_tags/set_dataset_labels inside the lock) or, preferably,
change the update to an atomic storage-side patch that accepts
Option<HashSet<_>> and only updates provided fields (modify the call to
storage.update_dataset_tags_and_labels_in_stream to pass Option types and
implement server-side merge), removing the reliance on in-memory reads to
prevent lost updates.

---

Nitpick comments:
In `@src/parseable/streams.rs`:
- Around line 982-988: The two setters set_dataset_tags and set_dataset_labels
perform separate metadata.write() calls causing transient inconsistent state;
add a new atomic setter (e.g., set_dataset_tags_and_labels(&self, tags:
Vec<DatasetTag>, labels: Vec<String>)) that acquires metadata.write() once and
assigns both dataset_tags and dataset_labels inside the same lock, then update
call sites to use the new method (or have the existing setters delegate to it)
so modifications never occur in two separate writes; reference the metadata
RwLock field and the existing set_dataset_tags/set_dataset_labels method names
when making the change.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 419f53d and 4a3e23c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (15)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/handlers/http/mod.rs
  • src/storage/field_stats.rs
  • src/handlers/http/ingest.rs

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/server.rs`:
- Around line 210-220: The two route registrations conflict because
"/tags/{tag}" will match "/tags/correlated" before "/{name}/correlated", causing
get_correlated_datasets to be unreachable for a dataset named "tags"; change the
correlated endpoint path to avoid structural collision (for example replace
"/{name}/correlated" with "/correlated/{name}" in the route registration where
web::get().to(http::datasets::get_correlated_datasets).authorize_for_resource(Action::GetStreamInfo)
is set) and update the path extractor in the get_correlated_datasets handler and
any clients/docs referencing the old path so they use the new
"/correlated/{name}" form.
- Around line 200-227: The post_datasets handler currently derives tenant from
BasicAuth via extract_session_key_from_req; change it to read the tenant using
get_tenant_id_from_request (the same middleware-normalized header used by
get_info and the other dataset handlers) to enforce tenant scoping and prevent
header spoofing: locate the post_datasets function in http::prism_logstream,
remove or stop using extract_session_key_from_req for tenant resolution, call
get_tenant_id_from_request(req) (or equivalent helper used elsewhere) and pass
that tenant downstream to any dataset creation/validation logic so it matches
the behavior of get_datasets_by_tag, get_correlated_datasets, and
put_dataset_metadata.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a3e23c and 4a2b764.

📒 Files selected for processing (1)
  • src/handlers/http/modal/server.rs

PUT /api/v1/logstream/{name} accepts X-P-Dataset-Tags and
X-P-Dataset-Labels headers (comma-separated) on stream creation
PUT /api/prism/v1/datasets/{name} - update tags and labels
GET /api/prism/v1/datasets/{name}/correlated - find datasets sharing tags or labels
GET /api/prism/v1/datasets/tags/{tag} - find all datasets with a specific tag
include tags and labels in home api response
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

197-202: ⚠️ Potential issue | 🟡 Minor

Normalize request-body labels before storing them.

This path still stores label bodies verbatim, so blank labels and whitespace variants can persist and skew correlation results.

Suggested fix
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 197 - 202, The current
final_labels assignment uses body.labels verbatim and can retain blank or
whitespace-only variants; update the transformation so that when handling
Some(labels) you trim each label (e.g., call trim on the string), discard empty
results (filter out labels where trimmed.is_empty()), and then deduplicate by
collecting into a HashSet before converting back to the Vec used by
final_labels; apply this change to the final_labels computation that consumes
body.labels so stored labels are normalized (trimmed and non-empty).

189-216: ⚠️ Potential issue | 🔴 Critical

Partial PUTs still have a lost-update race.

Combining tags and labels into one storage call does not make this atomic because omitted fields are still read from current in-memory state first. A concurrent {"tags": ...} request and {"labels": ...} request can each write back a stale copy of the other field and clobber one update. Either make PUT a full replacement that requires both arrays, or serialize this read/merge/write per stream.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 189 - 216, The current PUT
handler builds final_tags and final_labels by reading in-memory via
stream.get_dataset_tags()/get_dataset_labels() and then calls
storage.update_dataset_tags_and_labels_in_stream(...), which creates a
lost-update race when concurrent partial PUTs update only one field; to fix,
either require full replacement (validate that body.tags and body.labels are
both Some and reject partial PUTs) or serialize read/merge/write per stream
(introduce a per-stream mutex/lock when computing final_tags/final_labels and
calling update_dataset_tags_and_labels_in_stream) so concurrent handlers cannot
interleave; locate and modify the code that constructs final_tags/final_labels
and the call to update_dataset_tags_and_labels_in_stream to implement one of
these two strategies, using identifiers final_tags, final_labels,
stream.get_dataset_tags/get_dataset_labels, and
update_dataset_tags_and_labels_in_stream to find the right spot.
🧹 Nitpick comments (2)
src/handlers/mod.rs (2)

153-161: Same non-deterministic ordering concern for parse_dataset_labels.

Similar to parse_dataset_tags, the label ordering is non-deterministic after HashSet deduplication. This may affect API response consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 153 - 161, parse_dataset_labels currently
deduplicates via HashSet which yields non-deterministic ordering; change it to
produce a deterministic, stable order by using either a BTreeSet for
deduplication or collect into a Vec then sort before returning. Update the
function parse_dataset_labels (and mirror the same approach used for
parse_dataset_tags if present) to trim and filter as now, then dedupe
deterministically (BTreeSet::from_iter or dedupe Vec + sort) and return a
Vec<String> with a stable sorted order.

131-151: Non-deterministic ordering in parse_dataset_tags.

The function collects into a HashSet for deduplication, then converts to Vec. This results in non-deterministic ordering of the returned tags. If consistent ordering matters for API responses or comparison purposes, consider sorting or using IndexSet from the indexmap crate.

// Current: non-deterministic order
.collect::<HashSet<_>>()
.into_iter()
.collect()

// Alternative: deterministic order
.collect::<HashSet<_>>()
.into_iter()
.sorted() // requires Ord impl or sorted_by
.collect()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 131 - 151, parse_dataset_tags currently
deduplicates via a HashSet then collects into a Vec which yields
non-deterministic ordering; change it to produce a deterministic order by either
(a) collecting into the HashSet for dedupe and then sorting the iterator
(requiring DatasetTag: Ord or using a comparator) before collecting into Vec, or
(b) replace the HashSet with an IndexSet from the indexmap crate to preserve
insertion order; update the function parse_dataset_tags (and any uses)
accordingly to ensure stable, deterministic ordering of the returned
Vec<DatasetTag>.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 68-70: The 404 is returned too early because
PARSEABLE.get_stream(&dataset_name, &tenant_id) only checks the in-memory
registry; before mapping to DatasetsError::DatasetNotFound you must attempt the
same storage-hydration used by Parseable's logstream handlers to load the
dataset from object storage and populate the in-memory registry, then retry
get_stream; apply this change to the current occurrence and the similar one at
the later block (around lines 185-187) so that you only return
DatasetsError::DatasetNotFound if the stream is still absent after trying the
storage-load path.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 197-202: The current final_labels assignment uses body.labels
verbatim and can retain blank or whitespace-only variants; update the
transformation so that when handling Some(labels) you trim each label (e.g.,
call trim on the string), discard empty results (filter out labels where
trimmed.is_empty()), and then deduplicate by collecting into a HashSet before
converting back to the Vec used by final_labels; apply this change to the
final_labels computation that consumes body.labels so stored labels are
normalized (trimmed and non-empty).
- Around line 189-216: The current PUT handler builds final_tags and
final_labels by reading in-memory via
stream.get_dataset_tags()/get_dataset_labels() and then calls
storage.update_dataset_tags_and_labels_in_stream(...), which creates a
lost-update race when concurrent partial PUTs update only one field; to fix,
either require full replacement (validate that body.tags and body.labels are
both Some and reject partial PUTs) or serialize read/merge/write per stream
(introduce a per-stream mutex/lock when computing final_tags/final_labels and
calling update_dataset_tags_and_labels_in_stream) so concurrent handlers cannot
interleave; locate and modify the code that constructs final_tags/final_labels
and the call to update_dataset_tags_and_labels_in_stream to implement one of
these two strategies, using identifiers final_tags, final_labels,
stream.get_dataset_tags/get_dataset_labels, and
update_dataset_tags_and_labels_in_stream to find the right spot.

---

Nitpick comments:
In `@src/handlers/mod.rs`:
- Around line 153-161: parse_dataset_labels currently deduplicates via HashSet
which yields non-deterministic ordering; change it to produce a deterministic,
stable order by using either a BTreeSet for deduplication or collect into a Vec
then sort before returning. Update the function parse_dataset_labels (and mirror
the same approach used for parse_dataset_tags if present) to trim and filter as
now, then dedupe deterministically (BTreeSet::from_iter or dedupe Vec + sort)
and return a Vec<String> with a stable sorted order.
- Around line 131-151: parse_dataset_tags currently deduplicates via a HashSet
then collects into a Vec which yields non-deterministic ordering; change it to
produce a deterministic order by either (a) collecting into the HashSet for
dedupe and then sorting the iterator (requiring DatasetTag: Ord or using a
comparator) before collecting into Vec, or (b) replace the HashSet with an
IndexSet from the indexmap crate to preserve insertion order; update the
function parse_dataset_tags (and any uses) accordingly to ensure stable,
deterministic ordering of the returned Vec<DatasetTag>.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9baefa2e-2d8a-4e4b-8fe1-ab8ef9c04ab3

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2b764 and 6cab92e.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (7)
  • src/handlers/http/mod.rs
  • src/storage/field_stats.rs
  • src/connectors/kafka/processor.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/ingest.rs
  • src/parseable/streams.rs
  • src/handlers/http/modal/utils/logstream_utils.rs

Comment on lines +68 to +70
let stream = PARSEABLE
.get_stream(&dataset_name, &tenant_id)
.map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Load storage-backed datasets before returning DatasetNotFound.

These get_stream(...) calls only see the in-memory registry. On query/prism nodes, a real dataset can still live only in object storage until something else hydrates it, so both endpoints can 404 an existing stream. Reuse the same storage-load path used elsewhere before this lookup. Based on learnings: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes by loading from storage when the stream is absent from memory.

Also applies to: 185-187

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 68 - 70, The 404 is returned too
early because PARSEABLE.get_stream(&dataset_name, &tenant_id) only checks the
in-memory registry; before mapping to DatasetsError::DatasetNotFound you must
attempt the same storage-hydration used by Parseable's logstream handlers to
load the dataset from object storage and populate the in-memory registry, then
retry get_stream; apply this change to the current occurrence and the similar
one at the later block (around lines 185-187) so that you only return
DatasetsError::DatasetNotFound if the stream is still absent after trying the
storage-load path.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/prism/home/mod.rs (1)

205-209: ⚠️ Potential issue | 🟠 Major

Pick a canonical metadata copy for tags and labels.

get_all_stream_jsons can yield multiple ObjectStoreFormat entries for one dataset, but this response copies dataset_tags and dataset_labels from stream_jsons[0] only. Since tags/labels are now mutable through PUT /datasets/{name}, the home API can return stale metadata depending on which copy the metastore returns first. Read an explicit authoritative stream.json here instead of the first entry.

Also applies to: 240-251

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/home/mod.rs` around lines 205 - 209, The code currently calls
PARSEABLE.metastore.get_all_stream_jsons and then copies
dataset_tags/dataset_labels from stream_jsons[0], which can be stale; update
get_stream_metadata to instead fetch the canonical/authoritative stream.json
(not rely on the first entry) and extract dataset_tags and dataset_labels from
that single authoritative result (e.g., use a metastore method that returns the
canonical stream.json or a get_stream_json/get_authoritative_stream_json
variant), and apply the same fix to the other block that currently reads
stream_jsons[0] (the second occurrence mentioned at lines 240-251) so both
places use the explicit authoritative stream.json when building
StreamMetadataResponse.
src/prism/logstream/mod.rs (1)

231-245: ⚠️ Potential issue | 🟠 Major

Enforce the full per-stream permission set before building PrismDatasetResponse.

This path eventually returns schema, stats, retention, and counts, but the only dataset-level gate in process_stream is Action::ListStream. A user who can list a dataset can therefore still receive data that should sit behind GetSchema/Query or other stronger actions. Validate the full action set per stream before calling build_dataset_response, or trim the response to only fields covered by the existing check.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/logstream/mod.rs` around lines 231 - 245, The code currently only
gates datasets by Action::ListStream in get_datasets/process_stream but still
builds full PrismDatasetResponse (schema, stats, retention, counts); fix by
enforcing per-stream permission checks for the full action set (e.g.,
Action::GetSchema, Action::Query, Action::GetRetention, Action::GetCounts, etc.)
before calling build_dataset_response — either (A) perform explicit permission
checks for those actions inside get_datasets (or have process_stream return the
set of allowed actions) and only include fields allowed by those actions, or (B)
change build_dataset_response to accept an allowed_actions flag and trim out
schema/stats/retention/counts when the caller lacks the corresponding
permission; apply this change around get_datasets, process_stream, and
build_dataset_response so responses never expose data beyond the allowed
per-stream actions.
♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

197-203: ⚠️ Potential issue | 🟡 Minor

Normalize labels before deduplication and persistence.

Line 197-203 dedupes labels but does not trim/filter empty values, so whitespace-only labels can still be stored and matched inconsistently.

💡 Proposed fix
-    let final_labels = match body.labels {
-        Some(labels) => labels
-            .into_iter()
-            .collect::<HashSet<_>>()
-            .into_iter()
-            .collect(),
+    let final_labels = match body.labels {
+        Some(labels) => labels
+            .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect(),
         None => stream.get_dataset_labels(),
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 197 - 203, final_labels is
deduped but not normalized, allowing whitespace-only or untrimmed labels to
persist; update the handling of body.labels (and the branch that falls back to
stream.get_dataset_labels() if needed) to map each label through trim(), filter
out labels that are empty after trimming, then collect into a HashSet to
deduplicate and finally into the expected collection type used later (use the
same symbol final_labels). Ensure both the Some(labels) branch and any code path
that constructs labels from stream.get_dataset_labels() apply the same
trim+filter+dedupe normalization.

68-70: ⚠️ Potential issue | 🟠 Major

Defer DatasetNotFound until storage hydration is attempted.

Line 68-70 and Line 185-187 map in-memory get_stream misses straight to 404. In query/standalone flows, this can incorrectly 404 datasets that exist in object storage but are not hydrated yet. Please apply the same storage-hydration existence pattern used in logstream handlers before returning not found.

Based on learnings: stream existence checks should attempt storage-backed hydration in query/standalone mode before returning not found.

Also applies to: 185-187

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 68 - 70, The current code calls
PARSEABLE.get_stream(&dataset_name, &tenant_id) and immediately maps any error
to DatasetsError::DatasetNotFound (e.g., in the block around
PARSEABLE.get_stream and the similar check at lines 185-187), which causes a
premature 404 for datasets that exist in object storage but aren't hydrated;
instead, follow the logstream handler pattern: on get_stream error, attempt
storage-backed hydration (invoke the same hydrate/storage-check routine used by
logstream handlers for query/standalone flows) and only return
DatasetsError::DatasetNotFound if hydration confirms the dataset is absent or
hydration fails definitively; update the code paths around PARSEABLE.get_stream
and the duplicated check to defer mapping to DatasetNotFound until after the
hydration attempt completes.
🧹 Nitpick comments (2)
src/prism/home/mod.rs (1)

63-66: Keep tags and labels in the serialized shape even when empty.

With skip_serializing_if = "Vec::is_empty", datasets that simply have no metadata look the same as responses from servers that do not support these fields at all. Returning [] keeps the home-response schema stable for clients.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/home/mod.rs` around lines 63 - 66, The struct fields `tags:
Vec<DatasetTag>` and `labels: Vec<String>` in src/prism/home/mod.rs should
always be present in serialized output even when empty; remove the
`skip_serializing_if = "Vec::is_empty"` attribute while keeping
`#[serde(default)]` so empty vectors serialize as `[]`. Locate the `tags` and
`labels` field definitions in the struct (the lines with `tags: Vec<DatasetTag>`
and `labels: Vec<String>`) and delete only the `skip_serializing_if` portion of
their serde attributes, leaving `default` intact to still provide an empty Vec
when missing during deserialization.
src/parseable/streams.rs (1)

982-988: Add a single setter for dataset metadata.

The new dataset update flow has to call set_dataset_tags and set_dataset_labels separately, so readers can observe a mixed state between the two writes. A combined setter would keep the in-memory view coherent and avoid the second lock acquisition.

♻️ Suggested refactor
+    pub fn set_dataset_metadata(&self, tags: Vec<DatasetTag>, labels: Vec<String>) {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        metadata.dataset_tags = tags;
+        metadata.dataset_labels = labels;
+    }
+
     pub fn set_dataset_tags(&self, tags: Vec<DatasetTag>) {
         self.metadata.write().expect(LOCK_EXPECT).dataset_tags = tags;
     }
 
     pub fn set_dataset_labels(&self, labels: Vec<String>) {
         self.metadata.write().expect(LOCK_EXPECT).dataset_labels = labels;
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 982 - 988, Introduce a single atomic
setter that updates both tags and labels under one write lock to avoid
intermediate inconsistent reads: add a new method (e.g. set_dataset_metadata or
set_dataset_tags_and_labels) that takes (tags: Vec<DatasetTag>, labels:
Vec<String>) and performs one self.metadata.write().expect(LOCK_EXPECT)
assignment setting both dataset_tags and dataset_labels in the same critical
section, and update call sites to use this combined setter instead of calling
set_dataset_tags and set_dataset_labels separately; keep the existing individual
setters only if backward compatibility is required but prefer the combined
setter to prevent two separate lock acquisitions and mixed-state visibility.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@src/prism/home/mod.rs`:
- Around line 205-209: The code currently calls
PARSEABLE.metastore.get_all_stream_jsons and then copies
dataset_tags/dataset_labels from stream_jsons[0], which can be stale; update
get_stream_metadata to instead fetch the canonical/authoritative stream.json
(not rely on the first entry) and extract dataset_tags and dataset_labels from
that single authoritative result (e.g., use a metastore method that returns the
canonical stream.json or a get_stream_json/get_authoritative_stream_json
variant), and apply the same fix to the other block that currently reads
stream_jsons[0] (the second occurrence mentioned at lines 240-251) so both
places use the explicit authoritative stream.json when building
StreamMetadataResponse.

In `@src/prism/logstream/mod.rs`:
- Around line 231-245: The code currently only gates datasets by
Action::ListStream in get_datasets/process_stream but still builds full
PrismDatasetResponse (schema, stats, retention, counts); fix by enforcing
per-stream permission checks for the full action set (e.g., Action::GetSchema,
Action::Query, Action::GetRetention, Action::GetCounts, etc.) before calling
build_dataset_response — either (A) perform explicit permission checks for those
actions inside get_datasets (or have process_stream return the set of allowed
actions) and only include fields allowed by those actions, or (B) change
build_dataset_response to accept an allowed_actions flag and trim out
schema/stats/retention/counts when the caller lacks the corresponding
permission; apply this change around get_datasets, process_stream, and
build_dataset_response so responses never expose data beyond the allowed
per-stream actions.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 197-203: final_labels is deduped but not normalized, allowing
whitespace-only or untrimmed labels to persist; update the handling of
body.labels (and the branch that falls back to stream.get_dataset_labels() if
needed) to map each label through trim(), filter out labels that are empty after
trimming, then collect into a HashSet to deduplicate and finally into the
expected collection type used later (use the same symbol final_labels). Ensure
both the Some(labels) branch and any code path that constructs labels from
stream.get_dataset_labels() apply the same trim+filter+dedupe normalization.
- Around line 68-70: The current code calls PARSEABLE.get_stream(&dataset_name,
&tenant_id) and immediately maps any error to DatasetsError::DatasetNotFound
(e.g., in the block around PARSEABLE.get_stream and the similar check at lines
185-187), which causes a premature 404 for datasets that exist in object storage
but aren't hydrated; instead, follow the logstream handler pattern: on
get_stream error, attempt storage-backed hydration (invoke the same
hydrate/storage-check routine used by logstream handlers for query/standalone
flows) and only return DatasetsError::DatasetNotFound if hydration confirms the
dataset is absent or hydration fails definitively; update the code paths around
PARSEABLE.get_stream and the duplicated check to defer mapping to
DatasetNotFound until after the hydration attempt completes.

---

Nitpick comments:
In `@src/parseable/streams.rs`:
- Around line 982-988: Introduce a single atomic setter that updates both tags
and labels under one write lock to avoid intermediate inconsistent reads: add a
new method (e.g. set_dataset_metadata or set_dataset_tags_and_labels) that takes
(tags: Vec<DatasetTag>, labels: Vec<String>) and performs one
self.metadata.write().expect(LOCK_EXPECT) assignment setting both dataset_tags
and dataset_labels in the same critical section, and update call sites to use
this combined setter instead of calling set_dataset_tags and set_dataset_labels
separately; keep the existing individual setters only if backward compatibility
is required but prefer the combined setter to prevent two separate lock
acquisitions and mixed-state visibility.

In `@src/prism/home/mod.rs`:
- Around line 63-66: The struct fields `tags: Vec<DatasetTag>` and `labels:
Vec<String>` in src/prism/home/mod.rs should always be present in serialized
output even when empty; remove the `skip_serializing_if = "Vec::is_empty"`
attribute while keeping `#[serde(default)]` so empty vectors serialize as `[]`.
Locate the `tags` and `labels` field definitions in the struct (the lines with
`tags: Vec<DatasetTag>` and `labels: Vec<String>`) and delete only the
`skip_serializing_if` portion of their serde attributes, leaving `default`
intact to still provide an empty Vec when missing during deserialization.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: e591f47e-21dd-4410-8761-13a3ec03a40b

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2b764 and 6cab92e.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (7)
  • src/handlers/http/mod.rs
  • src/storage/field_stats.rs
  • src/migration/mod.rs
  • src/connectors/kafka/processor.rs
  • src/handlers/http/ingest.rs
  • src/parseable/mod.rs
  • src/handlers/mod.rs

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/handlers/mod.rs (1)

131-151: Non-deterministic ordering in parsed tags/labels.

The parse_dataset_tags and parse_dataset_labels functions collect into a HashSet for deduplication then convert to Vec. Since HashSet iteration order is non-deterministic, the resulting vector order may vary across calls with the same input. This could cause issues with:

  • Consistent API responses
  • Testing
  • Diff-based change detection

If ordering matters for any downstream use case (e.g., serialization stability), consider sorting the result or using IndexSet from the indexmap crate to preserve insertion order while deduplicating.

♻️ Optional fix to ensure deterministic ordering
 pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
-    header_value
+    let mut tags: Vec<DatasetTag> = header_value
         .split(',')
         .filter_map(|s| {
             let trimmed = s.trim();
             if trimmed.is_empty() {
                 None
             } else {
                 match DatasetTag::try_from(trimmed) {
                     Ok(tag) => Some(tag),
                     Err(err) => {
                         warn!("Invalid dataset tag '{trimmed}': {err}");
                         None
                     }
                 }
             }
         })
         .collect::<HashSet<_>>()
         .into_iter()
-        .collect()
+        .collect();
+    tags.sort_by(|a, b| a.to_string().cmp(&b.to_string()));
+    tags
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 131 - 151, The functions parse_dataset_tags
(and parse_dataset_labels) deduplicate via HashSet then collect into Vec, which
yields non-deterministic ordering; update these functions to produce
deterministic output by either (a) replacing the HashSet dedupe with an IndexSet
from the indexmap crate to preserve insertion order while removing duplicates,
or (b) after collecting into a Vec, sort the Vec deterministically (e.g.,
lexicographically by tag/label string or by a stable Ord implementation) before
returning; apply the same change to both parse_dataset_tags and
parse_dataset_labels so repeated calls with the same input yield a stable
ordering.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/handlers/mod.rs`:
- Around line 131-151: The functions parse_dataset_tags (and
parse_dataset_labels) deduplicate via HashSet then collect into Vec, which
yields non-deterministic ordering; update these functions to produce
deterministic output by either (a) replacing the HashSet dedupe with an IndexSet
from the indexmap crate to preserve insertion order while removing duplicates,
or (b) after collecting into a Vec, sort the Vec deterministically (e.g.,
lexicographically by tag/label string or by a stable Ord implementation) before
returning; apply the same change to both parse_dataset_tags and
parse_dataset_labels so repeated calls with the same input yield a stable
ordering.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9baefa2e-2d8a-4e4b-8fe1-ab8ef9c04ab3

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2b764 and 6cab92e.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
✅ Files skipped from review due to trivial changes (2)
  • src/handlers/http/mod.rs
  • src/parseable/streams.rs
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/connectors/kafka/processor.rs
  • src/storage/field_stats.rs
  • src/storage/object_storage.rs
  • src/handlers/http/modal/server.rs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant