add system defined tags and free form labels to datasets #1553
nikhilsinhaparseable wants to merge 8 commits into parseablehq:main
Conversation
Walkthrough
Replaces a single optional dataset tag with system-defined tags and free-form labels on datasets.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant HTTP_API as HTTP API
    participant Parseable
    participant ObjectStore as ObjectStorage/Metastore
    participant Storage as PersistedStore
    Client->>HTTP_API: PUT /api/v1/datasets/{name} (tags/labels)
    HTTP_API->>Parseable: validate stream exists & authorize
    Parseable->>ObjectStore: update_dataset_tags_and_labels_in_stream(name, tags, labels, tenant_id)
    ObjectStore->>Storage: get_stream_json(name, tenant_id) / put_stream_json(updated)
    Storage-->>ObjectStore: ack
    ObjectStore-->>Parseable: success
    Parseable-->>HTTP_API: updated tags/labels
    HTTP_API-->>Client: 200 OK / error
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
🧹 Nitpick comments (1)
src/parseable/mod.rs (1)
480-489: Consider a builder pattern for stream creation parameters.
The `#[allow(clippy::too_many_arguments)]` annotation is a reasonable workaround, but with 7+ parameters, this function is becoming unwieldy. For a future improvement, consider introducing a `StreamCreationParams` struct or builder pattern to group these related parameters. This is a minor suggestion and doesn't block the current PR.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/parseable/mod.rs` around lines 480 - 489, The function create_stream_if_not_exists has many parameters; refactor by introducing a StreamCreationParams struct (or a builder StreamCreationParamsBuilder) to group stream_name, stream_type, custom_partition, log_source, telemetry_type, dataset_tags, and dataset_labels; update create_stream_if_not_exists signature to accept a single StreamCreationParams (or builder output) and adapt internal usage accordingly, add a conversion constructor or builder methods for easy construction where callers currently pass those seven+ args, and preserve existing behavior and validation inside the new struct or builder.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/parseable/mod.rs`:
- Around line 480-489: The function create_stream_if_not_exists has many
parameters; refactor by introducing a StreamCreationParams struct (or a builder
StreamCreationParamsBuilder) to group stream_name, stream_type,
custom_partition, log_source, telemetry_type, dataset_tags, and dataset_labels;
update create_stream_if_not_exists signature to accept a single
StreamCreationParams (or builder output) and adapt internal usage accordingly,
add a conversion constructor or builder methods for easy construction where
callers currently pass those seven+ args, and preserve existing behavior and
validation inside the new struct or builder.
Force-pushed: 12ae8ee to 4fb9620
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/parseable/mod.rs (1)
480-527: Consider bundling stream creation parameters into a struct.
The function now accepts 8 parameters (including `self`), which triggers the `clippy::too_many_arguments` lint. While the `#[allow]` attribute suppresses it, this pattern repeats across `create_stream_if_not_exists`, `create_stream`, and `create_update_stream`.
A dedicated struct (e.g., `StreamCreationParams` or `StreamConfig`) would:
- Improve readability at call sites
- Make future parameter additions easier
- Provide a natural place for default values
💡 Example struct-based approach
```rust
pub struct StreamCreationParams {
    pub stream_type: StreamType,
    pub custom_partition: Option<String>,
    pub log_source: Vec<LogSourceEntry>,
    pub telemetry_type: TelemetryType,
    pub dataset_tags: Vec<DatasetTag>,
    pub dataset_labels: Vec<String>,
}

impl Default for StreamCreationParams {
    fn default() -> Self {
        Self {
            stream_type: StreamType::UserDefined,
            custom_partition: None,
            log_source: vec![],
            telemetry_type: TelemetryType::Logs,
            dataset_tags: vec![],
            dataset_labels: vec![],
        }
    }
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/parseable/mod.rs` around lines 480 - 527, Create a StreamCreationParams (or StreamConfig) struct to bundle the multiple stream creation arguments and refactor create_stream_if_not_exists, create_stream, and create_update_stream to accept that struct instead of the long parameter list; update the call site in create_stream_if_not_exists to build a StreamCreationParams (using Default for sensible defaults) and pass it to create_stream, adjust create_stream signature to destructure or reference the struct fields (stream_type, custom_partition, log_source, telemetry_type, dataset_tags, dataset_labels), remove the #[allow(clippy::too_many_arguments)] on those functions, and add conversions/constructors where needed so existing call sites can migrate with minimal changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 163-169: The labels vector built in the PUT handler (where
new_labels is created from body.into_inner().labels) must trim each label and
filter out empty or whitespace-only strings before deduplicating; update the
pipeline that builds new_labels to map each label through .trim(), filter out
strings that are empty after trimming, then collect into a HashSet to dedupe and
back into a Vec (preserving whatever ordering you need). Ensure you modify the
code around the new_labels construction so labels are normalized (trimmed) and
empty/whitespace-only entries are removed prior to deduplication.
- Around line 31-186: Handlers get_correlated_datasets, get_datasets_by_tag,
put_dataset_tags and put_dataset_labels currently use PARSEABLE.streams/list,
PARSEABLE.get_stream and storage APIs without tenant context; update each
handler to resolve the tenant from the request (e.g., call
get_tenant_id_from_request or read the normalized tenant header injected by
middleware), then scope all stream listing and lookups to that tenant (filter
list results or call tenant-aware APIs) and ensure any
storage.update_dataset_tags_and_labels_in_stream and PARSEABLE.get_stream calls
include/are called with the resolved tenant or have the tenant header
overwritten server-side so clients cannot spoof it; ensure the same tenant
resolution is applied to both read (listing/get) and write
(put_dataset_tags/put_dataset_labels) flows.
---
Nitpick comments:
In `@src/parseable/mod.rs`:
- Around line 480-527: Create a StreamCreationParams (or StreamConfig) struct to
bundle the multiple stream creation arguments and refactor
create_stream_if_not_exists, create_stream, and create_update_stream to accept
that struct instead of the long parameter list; update the call site in
create_stream_if_not_exists to build a StreamCreationParams (using Default for
sensible defaults) and pass it to create_stream, adjust create_stream signature
to destructure or reference the struct fields (stream_type, custom_partition,
log_source, telemetry_type, dataset_tags, dataset_labels), remove the
#[allow(clippy::too_many_arguments)] on those functions, and add
conversions/constructors where needed so existing call sites can migrate with
minimal changes.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 139-151: The current read-modify-write in put_dataset_tags uses
PARSEABLE.get_stream(...), calls stream.get_dataset_labels(), then
storage.update_dataset_tags_and_labels_in_stream(...) and finally
stream.set_dataset_tags(...), which allows TOCTOU race with concurrent
put_dataset_labels; fix by serializing updates per-stream (or by providing a
combined tags+labels atomic endpoint). Add a per-stream mutex/async lock on the
stream (e.g., a metadata_lock() or similar on the stream returned by
PARSEABLE.get_stream) and acquire it around the read-modify-write sequence that
uses stream.get_dataset_labels,
storage.update_dataset_tags_and_labels_in_stream, and stream.set_dataset_tags so
concurrent tag/label updates for the same stream are executed sequentially.
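The per-stream lock suggested above can be sketched with std types. This is a minimal, self-contained sketch, not Parseable's actual API: `StreamLocks` and `lock_for` are hypothetical names, and a real implementation would likely use an async lock (e.g., `tokio::sync::Mutex`) held across the read-modify-write.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical per-stream lock registry: every caller updating a given
// stream's metadata first acquires that stream's mutex, so concurrent
// tag and label updates for the same stream run sequentially.
#[derive(Default)]
struct StreamLocks {
    locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl StreamLocks {
    // Returns the (shared) lock handle for a stream, creating it on first use.
    fn lock_for(&self, stream: &str) -> Arc<Mutex<()>> {
        let mut locks = self.locks.lock().unwrap();
        locks.entry(stream.to_string()).or_default().clone()
    }
}

fn main() {
    let registry = StreamLocks::default();
    let handle = registry.lock_for("app-logs");
    let _guard = handle.lock().unwrap(); // held across read-merge-write
    // ... read current tags/labels, merge with request body, persist,
    // then update the in-memory copy, all while the guard is held ...
    println!("metadata update serialized for app-logs");
}
```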
---
Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 41-88: get_correlated_datasets currently iterates the global
PARSEABLE.streams.list() and PARSEABLE.get_stream(...) without restricting to
the tenant, causing cross-tenant exposure; update the handler to derive the
tenant context (e.g., from request auth/headers or a tenant path param) and then
restrict both the initial stream lookup (PARSEABLE.get_stream(&dataset_name))
and the loop over streams to the same tenant: either call a tenant-scoped
listing API (e.g., PARSEABLE.streams.list_for_tenant(tenant) if available) or
filter the results of PARSEABLE.streams.list() by a tenant identity exposed on
each stream (e.g., s.get_tenant() == tenant) before skipping and comparing;
ensure the check for the target dataset also validates it belongs to the tenant
so only tenant-scoped streams are considered.
Force-pushed: 419f53d to 4a3e23c
Actionable comments posted: 3
♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)
150-156: ⚠️ Potential issue | 🟡 Minor
Normalize label input in PUT body before dedupe.
Line 150-Line 156 deduplicates raw strings but does not trim/filter whitespace-only labels, so empty labels can still be persisted.
Suggested fix
```diff
 let final_labels = match body.labels {
     Some(labels) => labels
         .into_iter()
+        .map(|label| label.trim().to_string())
+        .filter(|label| !label.is_empty())
         .collect::<HashSet<_>>()
         .into_iter()
         .collect(),
     None => stream.get_dataset_labels(),
 };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/datasets.rs` around lines 150 - 156, Normalize and filter incoming labels before deduplication: when building final_labels from body.labels (the match arm handling Some(labels)), map each label to trimmed string and filter out labels that are empty or whitespace-only (e.g., labels.into_iter().map(|s| s.trim().to_string()).filter(|s| !s.is_empty())...), then collect into a HashSet to dedupe and back into the desired collection; keep the None branch using stream.get_dataset_labels() unchanged.
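The suggested trim/filter/dedupe pipeline can be shown as a self-contained function. `normalize_labels` is a hypothetical helper name; a `BTreeSet` is used here instead of the `HashSet` in the diff so the result order is also deterministic, which the ordering comments later in this review ask for.

```rust
use std::collections::BTreeSet;

// Trim each label, drop entries that are empty after trimming, and
// deduplicate. BTreeSet yields a stable sorted order as a side effect.
fn normalize_labels(labels: Vec<String>) -> Vec<String> {
    labels
        .into_iter()
        .map(|label| label.trim().to_string())
        .filter(|label| !label.is_empty())
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect()
}

fn main() {
    let input = vec![
        "  prod ".to_string(),
        "".to_string(),
        "prod".to_string(),
        "   ".to_string(),
        "web".to_string(),
    ];
    println!("{:?}", normalize_labels(input)); // ["prod", "web"]
}
```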
142-172: ⚠️ Potential issue | 🔴 Critical
Concurrent partial metadata updates can still lose data.
Line 142-Line 172 does a read-modify-write using current in-memory values for absent fields, so two concurrent requests (tags-only and labels-only) can overwrite each other with stale counterparts.
Suggested direction
```diff
 pub async fn put_dataset_metadata(...) -> Result<HttpResponse, DatasetsError> {
     ...
     let stream = PARSEABLE
         .get_stream(&dataset_name, &tenant_id)
         .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
+
+    // Serialize metadata updates for this stream (or use atomic storage merge).
+    // let _guard = stream.metadata_lock().await;
     let final_tags = match body.tags { ... };
     let final_labels = match body.labels { ... };
     storage
         .update_dataset_tags_and_labels_in_stream(...)
         .await
         .map_err(DatasetsError::Storage)?;
     stream.set_dataset_tags(final_tags.clone());
     stream.set_dataset_labels(final_labels.clone());
```

If a stream-scoped lock is unavailable, prefer a storage-level atomic patch API that updates only provided fields server-side.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/datasets.rs` around lines 142 - 172, Current code computes final_tags/final_labels from in-memory stream values and then does a read-modify-write, which lets concurrent requests overwrite each other; instead either acquire the stream-scoped lock around the read-modify-write (use stream.get_dataset_tags/get_dataset_labels and stream.set_dataset_tags/set_dataset_labels inside the lock) or, preferably, change the update to an atomic storage-side patch that accepts Option<HashSet<_>> and only updates provided fields (modify the call to storage.update_dataset_tags_and_labels_in_stream to pass Option types and implement server-side merge), removing the reliance on in-memory reads to prevent lost updates.
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)
982-988: Prefer atomic update for tags+labels to avoid transient mixed state.
Line 982 and Line 986 update related metadata in two separate writes. A single setter that updates both fields under one lock would avoid short-lived inconsistent reads.
♻️ Suggested refactor
```diff
+ pub fn set_dataset_metadata(&self, tags: Vec<DatasetTag>, labels: Vec<String>) {
+     let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+     metadata.dataset_tags = tags;
+     metadata.dataset_labels = labels;
+ }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/parseable/streams.rs` around lines 982 - 988, The two setters set_dataset_tags and set_dataset_labels perform separate metadata.write() calls causing transient inconsistent state; add a new atomic setter (e.g., set_dataset_tags_and_labels(&self, tags: Vec<DatasetTag>, labels: Vec<String>)) that acquires metadata.write() once and assigns both dataset_tags and dataset_labels inside the same lock, then update call sites to use the new method (or have the existing setters delegate to it) so modifications never occur in two separate writes; reference the metadata RwLock field and the existing set_dataset_tags/set_dataset_labels method names when making the change.
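A runnable version of the suggested atomic setter, with stand-in `Metadata` and `Stream` types replacing the real ones in streams.rs (the actual struct holds more fields and uses the `LOCK_EXPECT` message):

```rust
use std::sync::RwLock;

// Stand-in for the metadata guarded by the stream's RwLock.
#[derive(Default, Debug)]
struct Metadata {
    dataset_tags: Vec<String>,
    dataset_labels: Vec<String>,
}

struct Stream {
    metadata: RwLock<Metadata>,
}

impl Stream {
    // Both fields change under a single write lock, so a concurrent reader
    // never observes new tags paired with stale labels (or vice versa).
    fn set_dataset_metadata(&self, tags: Vec<String>, labels: Vec<String>) {
        let mut metadata = self.metadata.write().expect("lock poisoned");
        metadata.dataset_tags = tags;
        metadata.dataset_labels = labels;
    }
}

fn main() {
    let stream = Stream { metadata: RwLock::new(Metadata::default()) };
    stream.set_dataset_metadata(vec!["k8s-monitoring".to_string()], vec!["prod".to_string()]);
    let m = stream.metadata.read().unwrap();
    println!("tags={:?} labels={:?}", m.dataset_tags, m.dataset_labels);
}
```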
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/handlers/http/modal/server.rs`:
- Around line 222-227: The route currently mounts only PUT /{name} using
http::datasets::put_dataset_metadata (authorized with Action::CreateStream), but
the API requires distinct replacement endpoints for tags and labels; add two
separate routes for "/{name}/tags" and "/{name}/labels" each using web::put()
and wired to the appropriate handlers (e.g., http::datasets::put_dataset_tags
and http::datasets::put_dataset_labels) with the same
authorize_for_resource(Action::CreateStream) policy, or if those handlers do not
exist, split http::datasets::put_dataset_metadata into two functions and route
them accordingly so clients can call PUT /datasets/{name}/tags and PUT
/datasets/{name}/labels.
In `@src/handlers/mod.rs`:
- Around line 95-114: The TryFrom<&str> for DatasetTag currently only accepts
the new literals (e.g., "agent-monitoring", "k8s-monitoring") causing older
values to fail; update the DatasetTag enum and its impl TryFrom to accept legacy
names by adding serde aliases on the enum variants (e.g., add #[serde(alias =
"agent-observability")] and #[serde(alias = "k8s-observability")] to
DatasetTag::AgentMonitoring and ::K8sMonitoring) and extend the match in impl
TryFrom (the function try_from) to also match the old strings
("agent-observability", "k8s-observability") mapping them to the corresponding
variants, keeping the existing error message unchanged.
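The legacy-alias mapping can be sketched with a stand-in enum covering only the two variants the comment names (the real `DatasetTag` may have more variants and carries serde derives; the legacy literals here are the ones the review cites):

```rust
// Stand-in for the DatasetTag enum. Old literals "agent-observability" and
// "k8s-observability" are accepted alongside the new "-monitoring" names.
#[derive(Debug, PartialEq)]
enum DatasetTag {
    AgentMonitoring,
    K8sMonitoring,
}

impl TryFrom<&str> for DatasetTag {
    type Error = String;

    fn try_from(value: &str) -> Result<Self, Self::Error> {
        match value {
            "agent-monitoring" | "agent-observability" => Ok(DatasetTag::AgentMonitoring),
            "k8s-monitoring" | "k8s-observability" => Ok(DatasetTag::K8sMonitoring),
            other => Err(format!("unknown dataset tag: {}", other)),
        }
    }
}

fn main() {
    println!("{:?}", DatasetTag::try_from("k8s-observability"));
}
```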
In `@src/storage/mod.rs`:
- Around line 134-137: The struct currently only reads dataset_tags so legacy
single-key dataset_tag values are dropped; update deserialization/migration to
map dataset_tag -> dataset_tags by adding a compatibility conversion: in
src/storage/mod.rs ensure deserialization recognizes a legacy dataset_tag (alias
or custom Deserialize) and appends it into the dataset_tags Vec, or add a
migration step that, when bumping
CURRENT_OBJECT_STORE_VERSION/CURRENT_SCHEMA_VERSION, transforms any legacy
dataset_tag value into dataset_tags before validation; target the symbols
dataset_tags, dataset_tag, CURRENT_OBJECT_STORE_VERSION and
CURRENT_SCHEMA_VERSION when making this change so older metadata is preserved
and participates in tag-based APIs.
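The legacy-key migration can be shown as a pure merge function. `merge_legacy_tag` is a hypothetical helper, sketched over plain strings rather than the real `DatasetTag` type; in the codebase this would run during deserialization or the schema-version migration step.

```rust
// Fold a legacy single `dataset_tag` value into the new `dataset_tags`
// vector, skipping it if an equal tag is already present.
fn merge_legacy_tag(legacy: Option<String>, mut tags: Vec<String>) -> Vec<String> {
    if let Some(tag) = legacy {
        if !tags.contains(&tag) {
            tags.push(tag);
        }
    }
    tags
}

fn main() {
    let merged = merge_legacy_tag(
        Some("k8s-monitoring".to_string()),
        vec!["agent-monitoring".to_string()],
    );
    println!("{:?}", merged); // ["agent-monitoring", "k8s-monitoring"]
}
```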
---
Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 150-156: Normalize and filter incoming labels before
deduplication: when building final_labels from body.labels (the match arm
handling Some(labels)), map each label to trimmed string and filter out labels
that are empty or whitespace-only (e.g., labels.into_iter().map(|s|
s.trim().to_string()).filter(|s| !s.is_empty())...), then collect into a HashSet
to dedupe and back into the desired collection; keep the None branch using
stream.get_dataset_labels() unchanged.
- Around line 142-172: Current code computes final_tags/final_labels from
in-memory stream values and then does a read-modify-write, which lets concurrent
requests overwrite each other; instead either acquire the stream-scoped lock
around the read-modify-write (use stream.get_dataset_tags/get_dataset_labels and
stream.set_dataset_tags/set_dataset_labels inside the lock) or, preferably,
change the update to an atomic storage-side patch that accepts
Option<HashSet<_>> and only updates provided fields (modify the call to
storage.update_dataset_tags_and_labels_in_stream to pass Option types and
implement server-side merge), removing the reliance on in-memory reads to
prevent lost updates.
---
Nitpick comments:
In `@src/parseable/streams.rs`:
- Around line 982-988: The two setters set_dataset_tags and set_dataset_labels
perform separate metadata.write() calls causing transient inconsistent state;
add a new atomic setter (e.g., set_dataset_tags_and_labels(&self, tags:
Vec<DatasetTag>, labels: Vec<String>)) that acquires metadata.write() once and
assigns both dataset_tags and dataset_labels inside the same lock, then update
call sites to use the new method (or have the existing setters delegate to it)
so modifications never occur in two separate writes; reference the metadata
RwLock field and the existing set_dataset_tags/set_dataset_labels method names
when making the change.
ℹ️ Review info
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (15)
- src/connectors/kafka/processor.rs
- src/handlers/http/datasets.rs
- src/handlers/http/ingest.rs
- src/handlers/http/mod.rs
- src/handlers/http/modal/server.rs
- src/handlers/http/modal/utils/logstream_utils.rs
- src/handlers/mod.rs
- src/metadata.rs
- src/migration/mod.rs
- src/parseable/mod.rs
- src/parseable/streams.rs
- src/prism/home/mod.rs
- src/storage/field_stats.rs
- src/storage/mod.rs
- src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- src/handlers/http/mod.rs
- src/storage/field_stats.rs
- src/handlers/http/ingest.rs
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/handlers/http/modal/server.rs`:
- Around line 210-220: The two route registrations conflict because
"/tags/{tag}" will match "/tags/correlated" before "/{name}/correlated", causing
get_correlated_datasets to be unreachable for a dataset named "tags"; change the
correlated endpoint path to avoid structural collision (for example replace
"/{name}/correlated" with "/correlated/{name}" in the route registration where
web::get().to(http::datasets::get_correlated_datasets).authorize_for_resource(Action::GetStreamInfo)
is set) and update the path extractor in the get_correlated_datasets handler and
any clients/docs referencing the old path so they use the new
"/correlated/{name}" form.
- Around line 200-227: The post_datasets handler currently derives tenant from
BasicAuth via extract_session_key_from_req; change it to read the tenant using
get_tenant_id_from_request (the same middleware-normalized header used by
get_info and the other dataset handlers) to enforce tenant scoping and prevent
header spoofing: locate the post_datasets function in http::prism_logstream,
remove or stop using extract_session_key_from_req for tenant resolution, call
get_tenant_id_from_request(req) (or equivalent helper used elsewhere) and pass
that tenant downstream to any dataset creation/validation logic so it matches
the behavior of get_datasets_by_tag, get_correlated_datasets, and
put_dataset_metadata.
- PUT /api/v1/logstream/{name} accepts X-P-Dataset-Tags and X-P-Dataset-Labels headers (comma-separated) on stream creation
- PUT /api/prism/v1/datasets/{name} - update tags and labels
- GET /api/prism/v1/datasets/{name}/correlated - find datasets sharing tags or labels
- GET /api/prism/v1/datasets/tags/{tag} - find all datasets with a specific tag
- include tags and labels in home api response
Force-pushed: a77775d to 5d5a265
Actionable comments posted: 1
♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)
197-202: ⚠️ Potential issue | 🟡 Minor
Normalize request-body labels before storing them.
This path still stores label bodies verbatim, so blank labels and whitespace variants can persist and skew correlation results.
Suggested fix
```diff
 Some(labels) => labels
     .into_iter()
+    .map(|label| label.trim().to_string())
+    .filter(|label| !label.is_empty())
     .collect::<HashSet<_>>()
     .into_iter()
     .collect(),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/datasets.rs` around lines 197 - 202, The current final_labels assignment uses body.labels verbatim and can retain blank or whitespace-only variants; update the transformation so that when handling Some(labels) you trim each label (e.g., call trim on the string), discard empty results (filter out labels where trimmed.is_empty()), and then deduplicate by collecting into a HashSet before converting back to the Vec used by final_labels; apply this change to the final_labels computation that consumes body.labels so stored labels are normalized (trimmed and non-empty).
189-216: ⚠️ Potential issue | 🔴 Critical
Partial PUTs still have a lost-update race.
Combining tags and labels into one storage call does not make this atomic because omitted fields are still read from current in-memory state first. A concurrent `{"tags": ...}` request and `{"labels": ...}` request can each write back a stale copy of the other field and clobber one update. Either make PUT a full replacement that requires both arrays, or serialize this read/merge/write per stream.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/datasets.rs` around lines 189 - 216, The current PUT handler builds final_tags and final_labels by reading in-memory via stream.get_dataset_tags()/get_dataset_labels() and then calls storage.update_dataset_tags_and_labels_in_stream(...), which creates a lost-update race when concurrent partial PUTs update only one field; to fix, either require full replacement (validate that body.tags and body.labels are both Some and reject partial PUTs) or serialize read/merge/write per stream (introduce a per-stream mutex/lock when computing final_tags/final_labels and calling update_dataset_tags_and_labels_in_stream) so concurrent handlers cannot interleave; locate and modify the code that constructs final_tags/final_labels and the call to update_dataset_tags_and_labels_in_stream to implement one of these two strategies, using identifiers final_tags, final_labels, stream.get_dataset_tags/get_dataset_labels, and update_dataset_tags_and_labels_in_stream to find the right spot.
🧹 Nitpick comments (2)
src/handlers/mod.rs (2)
153-161: Same non-deterministic ordering concern for `parse_dataset_labels`.
Similar to `parse_dataset_tags`, the label ordering is non-deterministic after `HashSet` deduplication. This may affect API response consistency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/mod.rs` around lines 153 - 161, parse_dataset_labels currently deduplicates via HashSet which yields non-deterministic ordering; change it to produce a deterministic, stable order by using either a BTreeSet for deduplication or collect into a Vec then sort before returning. Update the function parse_dataset_labels (and mirror the same approach used for parse_dataset_tags if present) to trim and filter as now, then dedupe deterministically (BTreeSet::from_iter or dedupe Vec + sort) and return a Vec<String> with a stable sorted order.
131-151: Non-deterministic ordering in `parse_dataset_tags`.
The function collects into a `HashSet` for deduplication, then converts to `Vec`. This results in non-deterministic ordering of the returned tags. If consistent ordering matters for API responses or comparison purposes, consider sorting or using `IndexSet` from the `indexmap` crate.

```rust
// Current: non-deterministic order
.collect::<HashSet<_>>()
.into_iter()
.collect()

// Alternative: deterministic order
.collect::<HashSet<_>>()
.into_iter()
.sorted() // requires Ord impl or sorted_by
.collect()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/mod.rs` around lines 131 - 151, parse_dataset_tags currently deduplicates via a HashSet then collects into a Vec which yields non-deterministic ordering; change it to produce a deterministic order by either (a) collecting into the HashSet for dedupe and then sorting the iterator (requiring DatasetTag: Ord or using a comparator) before collecting into Vec, or (b) replace the HashSet with an IndexSet from the indexmap crate to preserve insertion order; update the function parse_dataset_tags (and any uses) accordingly to ensure stable, deterministic ordering of the returned Vec<DatasetTag>.
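The sort-based alternative takes only a few lines of std Rust; `dedupe_deterministic` is a hypothetical helper name standing in for the dedupe step inside `parse_dataset_tags`/`parse_dataset_labels`.

```rust
// Deduplicate with a stable, sorted output order: sort first so duplicates
// are adjacent, then Vec::dedup removes consecutive repeats.
fn dedupe_deterministic(mut values: Vec<String>) -> Vec<String> {
    values.sort();
    values.dedup();
    values
}

fn main() {
    let tags = vec!["web".to_string(), "prod".to_string(), "web".to_string()];
    println!("{:?}", dedupe_deterministic(tags)); // ["prod", "web"]
}
```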
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 68-70: The 404 is returned too early because
PARSEABLE.get_stream(&dataset_name, &tenant_id) only checks the in-memory
registry; before mapping to DatasetsError::DatasetNotFound you must attempt the
same storage-hydration used by Parseable's logstream handlers to load the
dataset from object storage and populate the in-memory registry, then retry
get_stream; apply this change to the current occurrence and the similar one at
the later block (around lines 185-187) so that you only return
DatasetsError::DatasetNotFound if the stream is still absent after trying the
storage-load path.
---
Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 197-202: The current final_labels assignment uses body.labels
verbatim and can retain blank or whitespace-only variants; update the
transformation so that when handling Some(labels) you trim each label (e.g.,
call trim on the string), discard empty results (filter out labels where
trimmed.is_empty()), and then deduplicate by collecting into a HashSet before
converting back to the Vec used by final_labels; apply this change to the
final_labels computation that consumes body.labels so stored labels are
normalized (trimmed and non-empty).
- Around line 189-216: The current PUT handler builds final_tags and
final_labels by reading in-memory via
stream.get_dataset_tags()/get_dataset_labels() and then calls
storage.update_dataset_tags_and_labels_in_stream(...), which creates a
lost-update race when concurrent partial PUTs update only one field; to fix,
either require full replacement (validate that body.tags and body.labels are
both Some and reject partial PUTs) or serialize read/merge/write per stream
(introduce a per-stream mutex/lock when computing final_tags/final_labels and
calling update_dataset_tags_and_labels_in_stream) so concurrent handlers cannot
interleave; locate and modify the code that constructs final_tags/final_labels
and the call to update_dataset_tags_and_labels_in_stream to implement one of
these two strategies, using identifiers final_tags, final_labels,
stream.get_dataset_tags/get_dataset_labels, and
update_dataset_tags_and_labels_in_stream to find the right spot.
---
Nitpick comments:
In `@src/handlers/mod.rs`:
- Around line 153-161: parse_dataset_labels currently deduplicates via HashSet
which yields non-deterministic ordering; change it to produce a deterministic,
stable order by using either a BTreeSet for deduplication or collect into a Vec
then sort before returning. Update the function parse_dataset_labels (and mirror
the same approach used for parse_dataset_tags if present) to trim and filter as
now, then dedupe deterministically (BTreeSet::from_iter or dedupe Vec + sort)
and return a Vec<String> with a stable sorted order.
- Around line 131-151: parse_dataset_tags currently deduplicates via a HashSet
then collects into a Vec which yields non-deterministic ordering; change it to
produce a deterministic order by either (a) collecting into the HashSet for
dedupe and then sorting the iterator (requiring DatasetTag: Ord or using a
comparator) before collecting into Vec, or (b) replace the HashSet with an
IndexSet from the indexmap crate to preserve insertion order; update the
function parse_dataset_tags (and any uses) accordingly to ensure stable,
deterministic ordering of the returned Vec<DatasetTag>.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 9baefa2e-2d8a-4e4b-8fe1-ab8ef9c04ab3
📒 Files selected for processing (17)
- src/connectors/kafka/processor.rs
- src/handlers/http/datasets.rs
- src/handlers/http/ingest.rs
- src/handlers/http/mod.rs
- src/handlers/http/modal/server.rs
- src/handlers/http/modal/utils/logstream_utils.rs
- src/handlers/http/prism_logstream.rs
- src/handlers/mod.rs
- src/metadata.rs
- src/migration/mod.rs
- src/parseable/mod.rs
- src/parseable/streams.rs
- src/prism/home/mod.rs
- src/prism/logstream/mod.rs
- src/storage/field_stats.rs
- src/storage/mod.rs
- src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (7)
- src/handlers/http/mod.rs
- src/storage/field_stats.rs
- src/connectors/kafka/processor.rs
- src/handlers/http/modal/server.rs
- src/handlers/http/ingest.rs
- src/parseable/streams.rs
- src/handlers/http/modal/utils/logstream_utils.rs
```rust
let stream = PARSEABLE
    .get_stream(&dataset_name, &tenant_id)
    .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
```
Load storage-backed datasets before returning DatasetNotFound.
These get_stream(...) calls only see the in-memory registry. On query/prism nodes, a real dataset can still live only in object storage until something else hydrates it, so both endpoints can 404 an existing stream. Reuse the same storage-load path used elsewhere before this lookup. Based on learnings: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes by loading from storage when the stream is absent from memory.
Also applies to: 185-187
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/handlers/http/datasets.rs` around lines 68 - 70, The 404 is returned too
early because PARSEABLE.get_stream(&dataset_name, &tenant_id) only checks the
in-memory registry; before mapping to DatasetsError::DatasetNotFound you must
attempt the same storage-hydration used by Parseable's logstream handlers to
load the dataset from object storage and populate the in-memory registry, then
retry get_stream; apply this change to the current occurrence and the similar
one at the later block (around lines 185-187) so that you only return
DatasetsError::DatasetNotFound if the stream is still absent after trying the
storage-load path.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/prism/home/mod.rs (1)
205-209: ⚠️ Potential issue | 🟠 Major

Pick a canonical metadata copy for tags and labels.

`get_all_stream_jsons` can yield multiple `ObjectStoreFormat` entries for one dataset, but this response copies `dataset_tags` and `dataset_labels` from `stream_jsons[0]` only. Since tags/labels are now mutable through `PUT /datasets/{name}`, the home API can return stale metadata depending on which copy the metastore returns first. Read an explicit authoritative stream.json here instead of the first entry.
get_all_stream_jsonscan yield multipleObjectStoreFormatentries for one dataset, but this response copiesdataset_tagsanddataset_labelsfromstream_jsons[0]only. Since tags/labels are now mutable throughPUT /datasets/{name}, the home API can return stale metadata depending on which copy the metastore returns first. Read an explicit authoritative stream.json here instead of the first entry.Also applies to: 240-251
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/prism/home/mod.rs` around lines 205 - 209, The code currently calls PARSEABLE.metastore.get_all_stream_jsons and then copies dataset_tags/dataset_labels from stream_jsons[0], which can be stale; update get_stream_metadata to instead fetch the canonical/authoritative stream.json (not rely on the first entry) and extract dataset_tags and dataset_labels from that single authoritative result (e.g., use a metastore method that returns the canonical stream.json or a get_stream_json/get_authoritative_stream_json variant), and apply the same fix to the other block that currently reads stream_jsons[0] (the second occurrence mentioned at lines 240-251) so both places use the explicit authoritative stream.json when building StreamMetadataResponse.

src/prism/logstream/mod.rs (1)
231-245: ⚠️ Potential issue | 🟠 Major

Enforce the full per-stream permission set before building `PrismDatasetResponse`.

This path eventually returns `schema`, `stats`, `retention`, and `counts`, but the only dataset-level gate in `process_stream` is `Action::ListStream`. A user who can list a dataset can therefore still receive data that should sit behind `GetSchema`/`Query` or other stronger actions. Validate the full action set per stream before calling `build_dataset_response`, or trim the response to only fields covered by the existing check.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/prism/logstream/mod.rs` around lines 231 - 245, The code currently only gates datasets by Action::ListStream in get_datasets/process_stream but still builds full PrismDatasetResponse (schema, stats, retention, counts); fix by enforcing per-stream permission checks for the full action set (e.g., Action::GetSchema, Action::Query, Action::GetRetention, Action::GetCounts, etc.) before calling build_dataset_response — either (A) perform explicit permission checks for those actions inside get_datasets (or have process_stream return the set of allowed actions) and only include fields allowed by those actions, or (B) change build_dataset_response to accept an allowed_actions flag and trim out schema/stats/retention/counts when the caller lacks the corresponding permission; apply this change around get_datasets, process_stream, and build_dataset_response so responses never expose data beyond the allowed per-stream actions.
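A minimal sketch of the trim-the-response option described above. `Action`, `user_can`, and `GatedDatasetResponse` are illustrative stand-ins, not Parseable's real types; the idea is that each heavier field is gated on its own stronger action.

```rust
#[derive(Clone, Copy, PartialEq)]
enum Action {
    ListStream,
    GetSchema,
    Query,
    GetRetention,
}

struct GatedDatasetResponse {
    name: String,
    schema: Option<String>,
    stats: Option<String>,
    retention: Option<String>,
}

// Stand-in for a real RBAC check.
fn user_can(allowed: &[Action], action: Action) -> bool {
    allowed.contains(&action)
}

fn build_gated_response(name: &str, allowed: &[Action]) -> Option<GatedDatasetResponse> {
    // ListStream is the minimum bar for the dataset to appear at all.
    if !user_can(allowed, Action::ListStream) {
        return None;
    }
    // Each field beyond the name requires its own stronger action.
    Some(GatedDatasetResponse {
        name: name.to_string(),
        schema: user_can(allowed, Action::GetSchema).then(|| "schema...".to_string()),
        stats: user_can(allowed, Action::Query).then(|| "stats...".to_string()),
        retention: user_can(allowed, Action::GetRetention).then(|| "7d".to_string()),
    })
}
```

With this shape, a caller holding only `ListStream` still sees the dataset name, but never receives schema, stats, or retention data it could not fetch through the dedicated endpoints.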
♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)
197-203: ⚠️ Potential issue | 🟡 Minor

Normalize labels before deduplication and persistence.
Line 197-203 dedupes labels but does not trim/filter empty values, so whitespace-only labels can still be stored and matched inconsistently.
💡 Proposed fix
```diff
-let final_labels = match body.labels {
-    Some(labels) => labels
-        .into_iter()
-        .collect::<HashSet<_>>()
-        .into_iter()
-        .collect(),
+let final_labels = match body.labels {
+    Some(labels) => labels
+        .into_iter()
+        .map(|label| label.trim().to_string())
+        .filter(|label| !label.is_empty())
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .collect(),
     None => stream.get_dataset_labels(),
 };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/datasets.rs` around lines 197 - 203, final_labels is deduped but not normalized, allowing whitespace-only or untrimmed labels to persist; update the handling of body.labels (and the branch that falls back to stream.get_dataset_labels() if needed) to map each label through trim(), filter out labels that are empty after trimming, then collect into a HashSet to deduplicate and finally into the expected collection type used later (use the same symbol final_labels). Ensure both the Some(labels) branch and any code path that constructs labels from stream.get_dataset_labels() apply the same trim+filter+dedupe normalization.
68-70: ⚠️ Potential issue | 🟠 Major

Defer `DatasetNotFound` until storage hydration is attempted.

Line 68-70 and Line 185-187 map in-memory `get_stream` misses straight to 404. In query/standalone flows, this can incorrectly 404 datasets that exist in object storage but are not hydrated yet. Please apply the same storage-hydration existence pattern used in logstream handlers before returning not found.

Based on learnings: stream existence checks should attempt storage-backed hydration in query/standalone mode before returning not found.
Also applies to: 185-187
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/datasets.rs` around lines 68 - 70, The current code calls PARSEABLE.get_stream(&dataset_name, &tenant_id) and immediately maps any error to DatasetsError::DatasetNotFound (e.g., in the block around PARSEABLE.get_stream and the similar check at lines 185-187), which causes a premature 404 for datasets that exist in object storage but aren't hydrated; instead, follow the logstream handler pattern: on get_stream error, attempt storage-backed hydration (invoke the same hydrate/storage-check routine used by logstream handlers for query/standalone flows) and only return DatasetsError::DatasetNotFound if hydration confirms the dataset is absent or hydration fails definitively; update the code paths around PARSEABLE.get_stream and the duplicated check to defer mapping to DatasetNotFound until after the hydration attempt completes.
🧹 Nitpick comments (2)
src/prism/home/mod.rs (1)
63-66: Keep `tags` and `labels` in the serialized shape even when empty.

With `skip_serializing_if = "Vec::is_empty"`, datasets that simply have no metadata look the same as responses from servers that do not support these fields at all. Returning `[]` keeps the home-response schema stable for clients.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/prism/home/mod.rs` around lines 63 - 66, The struct fields `tags: Vec<DatasetTag>` and `labels: Vec<String>` in src/prism/home/mod.rs should always be present in serialized output even when empty; remove the `skip_serializing_if = "Vec::is_empty"` attribute while keeping `#[serde(default)]` so empty vectors serialize as `[]`. Locate the `tags` and `labels` field definitions in the struct (the lines with `tags: Vec<DatasetTag>` and `labels: Vec<String>`) and delete only the `skip_serializing_if` portion of their serde attributes, leaving `default` intact to still provide an empty Vec when missing during deserialization.

src/parseable/streams.rs (1)
982-988: Add a single setter for dataset metadata.

The new dataset update flow has to call `set_dataset_tags` and `set_dataset_labels` separately, so readers can observe a mixed state between the two writes. A combined setter would keep the in-memory view coherent and avoid the second lock acquisition.

♻️ Suggested refactor

```diff
+pub fn set_dataset_metadata(&self, tags: Vec<DatasetTag>, labels: Vec<String>) {
+    let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+    metadata.dataset_tags = tags;
+    metadata.dataset_labels = labels;
+}
+
 pub fn set_dataset_tags(&self, tags: Vec<DatasetTag>) {
     self.metadata.write().expect(LOCK_EXPECT).dataset_tags = tags;
 }

 pub fn set_dataset_labels(&self, labels: Vec<String>) {
     self.metadata.write().expect(LOCK_EXPECT).dataset_labels = labels;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/parseable/streams.rs` around lines 982 - 988, Introduce a single atomic setter that updates both tags and labels under one write lock to avoid intermediate inconsistent reads: add a new method (e.g. set_dataset_metadata or set_dataset_tags_and_labels) that takes (tags: Vec<DatasetTag>, labels: Vec<String>) and performs one self.metadata.write().expect(LOCK_EXPECT) assignment setting both dataset_tags and dataset_labels in the same critical section, and update call sites to use this combined setter instead of calling set_dataset_tags and set_dataset_labels separately; keep the existing individual setters only if backward compatibility is required but prefer the combined setter to prevent two separate lock acquisitions and mixed-state visibility.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: e591f47e-21dd-4410-8761-13a3ec03a40b
📒 Files selected for processing (17)
- src/connectors/kafka/processor.rs
- src/handlers/http/datasets.rs
- src/handlers/http/ingest.rs
- src/handlers/http/mod.rs
- src/handlers/http/modal/server.rs
- src/handlers/http/modal/utils/logstream_utils.rs
- src/handlers/http/prism_logstream.rs
- src/handlers/mod.rs
- src/metadata.rs
- src/migration/mod.rs
- src/parseable/mod.rs
- src/parseable/streams.rs
- src/prism/home/mod.rs
- src/prism/logstream/mod.rs
- src/storage/field_stats.rs
- src/storage/mod.rs
- src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (7)
- src/handlers/http/mod.rs
- src/storage/field_stats.rs
- src/migration/mod.rs
- src/connectors/kafka/processor.rs
- src/handlers/http/ingest.rs
- src/parseable/mod.rs
- src/handlers/mod.rs
🧹 Nitpick comments (1)
src/handlers/mod.rs (1)
131-151: Non-deterministic ordering in parsed tags/labels.

The `parse_dataset_tags` and `parse_dataset_labels` functions collect into a `HashSet` for deduplication then convert to `Vec`. Since `HashSet` iteration order is non-deterministic, the resulting vector order may vary across calls with the same input. This could cause issues with:
- Consistent API responses
- Testing
- Diff-based change detection
If ordering matters for any downstream use case (e.g., serialization stability), consider sorting the result or using `IndexSet` from the `indexmap` crate to preserve insertion order while deduplicating.

♻️ Optional fix to ensure deterministic ordering

```diff
 pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
-    header_value
+    let mut tags: Vec<DatasetTag> = header_value
         .split(',')
         .filter_map(|s| {
             let trimmed = s.trim();
             if trimmed.is_empty() {
                 None
             } else {
                 match DatasetTag::try_from(trimmed) {
                     Ok(tag) => Some(tag),
                     Err(err) => {
                         warn!("Invalid dataset tag '{trimmed}': {err}");
                         None
                     }
                 }
             }
         })
         .collect::<HashSet<_>>()
         .into_iter()
-        .collect()
+        .collect();
+    tags.sort_by(|a, b| a.to_string().cmp(&b.to_string()));
+    tags
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/mod.rs` around lines 131 - 151, The functions parse_dataset_tags (and parse_dataset_labels) deduplicate via HashSet then collect into Vec, which yields non-deterministic ordering; update these functions to produce deterministic output by either (a) replacing the HashSet dedupe with an IndexSet from the indexmap crate to preserve insertion order while removing duplicates, or (b) after collecting into a Vec, sort the Vec deterministically (e.g., lexicographically by tag/label string or by a stable Ord implementation) before returning; apply the same change to both parse_dataset_tags and parse_dataset_labels so repeated calls with the same input yield a stable ordering.
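As an alternative to sorting, the insertion-order behavior of `IndexSet` can be approximated with plain `std` (a sketch, not the project's code):

```rust
use std::collections::HashSet;

// Deduplicate parsed labels while keeping first-seen order, instead of
// round-tripping through a bare HashSet whose iteration order is arbitrary.
fn dedupe_preserving_order(labels: Vec<String>) -> Vec<String> {
    let mut seen = HashSet::new();
    labels
        .into_iter()
        // `HashSet::insert` returns true only for values not seen before.
        .filter(|label| seen.insert(label.clone()))
        .collect()
}
```

Repeated calls with the same input always yield the same vector, so API responses and diff-based change detection stay stable without pulling in `indexmap`.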
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 9baefa2e-2d8a-4e4b-8fe1-ab8ef9c04ab3
📒 Files selected for processing (17)
- src/connectors/kafka/processor.rs
- src/handlers/http/datasets.rs
- src/handlers/http/ingest.rs
- src/handlers/http/mod.rs
- src/handlers/http/modal/server.rs
- src/handlers/http/modal/utils/logstream_utils.rs
- src/handlers/http/prism_logstream.rs
- src/handlers/mod.rs
- src/metadata.rs
- src/migration/mod.rs
- src/parseable/mod.rs
- src/parseable/streams.rs
- src/prism/home/mod.rs
- src/prism/logstream/mod.rs
- src/storage/field_stats.rs
- src/storage/mod.rs
- src/storage/object_storage.rs
✅ Files skipped from review due to trivial changes (2)
- src/handlers/http/mod.rs
- src/parseable/streams.rs
🚧 Files skipped from review as they are similar to previous changes (4)
- src/connectors/kafka/processor.rs
- src/storage/field_stats.rs
- src/storage/object_storage.rs
- src/handlers/http/modal/server.rs
Summary by CodeRabbit
New Features
User-facing Data Model
Refactor