fix: remove associated filters during stream deletion #1497
base: main
Conversation
Walkthrough
Invokes a new async filter-cleanup routine as part of stream deletion.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Handler as HTTP Handler
    participant Streams as PARSEABLE.streams
    participant Metastore as PARSEABLE.metastore
    participant Cache as FILTERS Cache
    Handler->>Handler: Delete stream stats
    Handler->>Streams: Check stream existence
    alt Stream still exists
        Streams-->>Handler: StreamFound -> return StreamStillExists error
    else Stream absent
        Handler->>Metastore: List all filters
        Metastore-->>Handler: Filters list
        Handler->>Handler: Filter list by stream_name
        loop For each matching filter
            Handler->>Metastore: delete_filter(filter_id)
            alt Deletion succeeds
                Metastore-->>Handler: OK
            else Deletion fails
                Metastore-->>Handler: Error (logged as warning)
            end
            Handler->>Cache: Remove in-memory entry for filter_id (if present)
            Cache-->>Handler: OK
        end
        Handler-->>Handler: Return Ok(())
    end
```
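The branch logic in the diagram can be modeled with a minimal synchronous sketch. All names here are illustrative stand-ins (a `HashSet` for the live streams, a plain `Vec` for stored filters), not the actual Parseable API, which is async and backed by the metastore:

```rust
use std::collections::HashSet;

// Simplified stand-in for the real Filter struct in src/users/filters.rs.
#[derive(Debug, Clone)]
struct Filter {
    stream_name: String,
    filter_name: String,
}

// Mirrors the diagram: if the stream still exists, bail out with an error;
// otherwise drop every filter whose stream_name matches and report the count.
fn delete_zombie_filters(
    live_streams: &HashSet<String>,
    filters: &mut Vec<Filter>,
    stream_name: &str,
) -> Result<usize, String> {
    if live_streams.contains(stream_name) {
        return Err(format!("stream {stream_name} still exists"));
    }
    let before = filters.len();
    filters.retain(|f| f.stream_name != stream_name);
    Ok(before - filters.len())
}
```

The real implementation additionally evicts each deleted filter from the in-memory `FILTERS` cache, which this sketch omits.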
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Pre-merge checks and finishing touches
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
CLA Assistant Lite bot: I have read the CLA Document and I hereby sign the CLA. You can retrigger this bot by commenting `recheck` in this Pull Request.
Actionable comments posted: 1
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/handlers/http/logstream.rs
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Applied to files:
src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.
Applied to files:
src/handlers/http/logstream.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).
Applied to files:
src/handlers/http/logstream.rs
🔇 Additional comments (1)
src/handlers/http/logstream.rs (1)
32-32: LGTM! The import of `Filter` is necessary for the new filter deletion logic and is correctly placed.
spuckhafte
left a comment
Let `PARSEABLE::metastore::get_filters` be queryable by logstream name or id.
Its current implementation fetches all filters across all existing logstreams at once.
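A sketch of that suggestion: a stream-scoped lookup instead of fetching everything. The `Filter` struct and function name below are hypothetical stand-ins; the real trait only exposes `get_filters()` over the metastore:

```rust
// Hypothetical stream-scoped variant of get_filters: narrow the result set
// to one stream instead of handing back every filter in the metastore.
#[derive(Debug, Clone, PartialEq)]
struct Filter {
    filter_id: Option<String>,
    filter_name: String,
    stream_name: String,
}

fn get_filters_for_stream(all_filters: Vec<Filter>, stream_name: &str) -> Vec<Filter> {
    all_filters
        .into_iter()
        .filter(|f| f.stream_name == stream_name)
        .collect()
}
```

In the real codebase this narrowing would ideally happen inside the metastore implementation, so callers never pay for filters belonging to other streams.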
parmesant
left a comment
Let's move filter deletion so that it happens after stream deletion. Also, moving the filter deletion logic into a utility function would be better.
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/parseable/mod.rs (1)

238-245: Simplify the boolean return expression. The if-else construct is redundant when returning a boolean derived from a condition; it can be collapsed to a single expression.

🔎 Proposed simplification

```diff
-    // check if a stream exists
-    pub fn check_stream_exists(&self, stream_name: &str) -> bool {
-        if self.streams.contains(stream_name) {
-            return true;
-        } else {
-            return false;
-        }
-    }
+    /// Checks if a stream exists in the in-memory streams map.
+    pub fn check_stream_exists(&self, stream_name: &str) -> bool {
+        self.streams.contains(stream_name)
+    }
```

src/metastore/metastores/object_store_metastore.rs (1)

549-579: Use `self` methods instead of global `PARSEABLE.metastore` calls. The implementation calls `PARSEABLE.metastore.get_filters()` and `PARSEABLE.metastore.delete_filter()` instead of using `self.get_filters()` and `self.delete_filter()`. This is inconsistent with other methods in this trait implementation and creates unnecessary indirection through the global static. Using `self` is more idiomatic and avoids the implicit assumption that `PARSEABLE.metastore` is the same instance.

🔎 Proposed fix

```diff
     // clear filters associated to a deleted stream
     async fn delete_zombie_filters(&self, stream_name: &str) -> Result<bool, MetastoreError> {
         // stream should not exist in order to have zombie filters
         if PARSEABLE.check_stream_exists(stream_name) {
             warn!("no zombie filters cleared for [undeleted] stream {}", stream_name);
             return Ok(false);
         }

-        let all_filters = match PARSEABLE.metastore.get_filters().await {
-            Ok(all_f) => all_f,
-            Err(e) => {
-                return Err(e);
-            }
-        };
+        let all_filters = self.get_filters().await?;

         // collect filters associated with the logstream being deleted
         let filters_for_stream: Vec<Filter> = all_filters
             .into_iter()
             .filter(|filter| filter.stream_name == stream_name)
             .collect();

         for filter in filters_for_stream.iter() {
-            PARSEABLE.metastore.delete_filter(filter).await?;
+            self.delete_filter(filter).await?;

             if let Some(filter_id) = filter.filter_id.as_ref() {
                 FILTERS.delete_filter(filter_id).await;
             }
         }

-        return Ok(true);
+        Ok(true)
     }
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/handlers/http/logstream.rs
- src/metastore/metastore_traits.rs
- src/metastore/metastores/object_store_metastore.rs
- src/parseable/mod.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/handlers/http/logstream.rs
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/parseable/mod.rs
src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.
Applied to files:
src/parseable/mod.rs
📚 Learning: 2025-07-28T17:10:39.448Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1392
File: src/migration/stream_metadata_migration.rs:303-322
Timestamp: 2025-07-28T17:10:39.448Z
Learning: In Parseable's migration system (src/migration/stream_metadata_migration.rs), each migration function updates the metadata to the current latest format using CURRENT_OBJECT_STORE_VERSION and CURRENT_SCHEMA_VERSION constants, rather than producing incremental versions. For example, v5_v6 function produces v7 format output when these constants are set to "v7", not v6 format.
Applied to files:
src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).
Applied to files:
src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).
Applied to files:
src/metastore/metastores/object_store_metastore.rs
🧬 Code graph analysis (2)

src/metastore/metastore_traits.rs (2)
- src/metastore/metastores/object_store_metastore.rs (1): `delete_zombie_filters` (550-579)
- src/validator.rs (1): `stream_name` (36-71)

src/metastore/metastores/object_store_metastore.rs (2)
- src/users/filters.rs (1): `migrate_v1_v2` (181-193)
- src/metastore/metastore_traits.rs (1): `delete_zombie_filters` (111-111)
🔇 Additional comments (3)
src/metastore/metastore_traits.rs (1)
111-111: LGTM - trait method signature is appropriate. The new `delete_zombie_filters` method follows the existing trait patterns and is logically grouped with other filter methods. Consider adding a doc comment to clarify the return semantics (`true` = filters deleted, `false` = stream still exists).

src/metastore/metastores/object_store_metastore.rs (2)
59-59: LGTM - import updated correctly. The `FILTERS` import is necessary for in-memory cache cleanup during zombie filter deletion.
570-576: Consider partial failure handling. If deletion fails mid-loop (e.g., a network error on the 3rd of 5 filters), the method returns an error but some filters have already been deleted from storage. The in-memory `FILTERS` cache may become inconsistent with storage state. Consider whether to:
- Continue deleting remaining filters and collect errors
- Log which filters were successfully deleted before the error
This is a minor robustness concern since the operation can be retried.
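The collect-errors-and-continue option from this comment can be sketched as follows, with synchronous stand-ins for the async metastore calls (names like `storage_delete` are hypothetical):

```rust
#[derive(Debug)]
struct Filter {
    filter_name: String,
}

// Stand-in for the metastore delete call; fails for one filter to simulate
// a mid-loop network error.
fn storage_delete(f: &Filter) -> Result<(), String> {
    if f.filter_name == "flaky" {
        Err(format!("network error deleting {}", f.filter_name))
    } else {
        Ok(())
    }
}

// Keep deleting the remaining filters on failure and report all errors at
// the end, instead of aborting with `?` on the first one.
fn delete_all(filters: &[Filter]) -> Result<usize, Vec<String>> {
    let mut errors = Vec::new();
    let mut deleted = 0;
    for f in filters {
        match storage_delete(f) {
            Ok(()) => deleted += 1,
            Err(e) => errors.push(e),
        }
    }
    if errors.is_empty() { Ok(deleted) } else { Err(errors) }
}
```

With this shape, a retry after a partial failure only has the not-yet-deleted filters left to process, and the caller sees exactly which deletions failed.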
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/metastore/metastores/object_store_metastore.rs (1)
549-579: LGTM! Zombie filter cleanup logic is sound. The implementation correctly:
- Guards against cleaning filters for active streams (line 552)
- Fetches all filters and narrows them to stream-specific filters
- Removes entries from both storage and the in-memory cache for consistency
The dual deletion (storage + in-memory) at lines 571-574 is essential to prevent stale filter references.
Optional: Simplify code style

```diff
-        let all_filters = match self.get_filters().await {
-            Ok(all_f) => all_f,
-            Err(e) => {
-                return Err(e);
-            }
-        };
+        let all_filters = self.get_filters().await?;

         // collect filters associated with the logstream being deleted
         let filters_for_stream: Vec<Filter> = all_filters
@@ -569,13 +564,11 @@
         for filter in filters_for_stream.iter() {
             self.delete_filter(filter).await?;
-
             if let Some(filter_id) = filter.filter_id.as_ref() {
                 FILTERS.delete_filter(filter_id).await;
             }
         }

-        return Ok(true);
+        Ok(true)
     }
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/metastore/metastores/object_store_metastore.rs
src/parseable/mod.rs
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).
Applied to files:
src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-07-28T17:10:39.448Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1392
File: src/migration/stream_metadata_migration.rs:303-322
Timestamp: 2025-07-28T17:10:39.448Z
Learning: In Parseable's migration system (src/migration/stream_metadata_migration.rs), each migration function updates the metadata to the current latest format using CURRENT_OBJECT_STORE_VERSION and CURRENT_SCHEMA_VERSION constants, rather than producing incremental versions. For example, v5_v6 function produces v7 format output when these constants are set to "v7", not v6 format.
Applied to files:
src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-09-06T04:26:17.191Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1424
File: src/enterprise/utils.rs:65-72
Timestamp: 2025-09-06T04:26:17.191Z
Learning: In Parseable's metastore implementation, MetastoreError::to_detail() returns a MetastoreErrorDetail struct (not a string), which contains structured error information including operation, message, stream_name, and other contextual fields. This struct is designed to be boxed in ObjectStorageError::MetastoreError(Box<MetastoreErrorDetail>).
Applied to files:
src/metastore/metastores/object_store_metastore.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/metastore/metastores/object_store_metastore.rs
src/parseable/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.
Applied to files:
src/parseable/mod.rs
📚 Learning: 2025-09-05T09:18:44.813Z
Learnt from: parmesant
Repo: parseablehq/parseable PR: 1425
File: src/query/mod.rs:484-495
Timestamp: 2025-09-05T09:18:44.813Z
Learning: In the Parseable system, stream names and column names cannot contain quotes, which eliminates SQL injection concerns when interpolating these identifiers directly into SQL queries in src/query/mod.rs.
Applied to files:
src/parseable/mod.rs
🧬 Code graph analysis (2)
src/metastore/metastores/object_store_metastore.rs (2)
- src/users/filters.rs (1): `migrate_v1_v2` (181-193)
- src/metastore/metastore_traits.rs (1): `delete_zombie_filters` (111-111)

src/parseable/mod.rs (1)
- src/validator.rs (1): `stream_name` (36-71)
🔇 Additional comments (2)
src/parseable/mod.rs (1)
238-241: LGTM! Clean helper method. The synchronous in-memory stream existence check is straightforward and fits the need for the metastore filter cleanup operation. The naming clearly conveys its purpose.
src/metastore/metastores/object_store_metastore.rs (1)
59-59: LGTM! Import supports in-memory cache cleanup. The addition of `FILTERS` to the imports is necessary for the in-memory filter deletion at line 574.
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/handlers/http/modal/utils/logstream_utils.rs (2)
112-125: Consider moving in-memory cache deletion inside the success branch. Currently, if `delete_filter` fails on the metastore, the warning is logged but the in-memory `FILTERS` cache is still cleaned up (lines 122-124). This creates an inconsistency where the filter exists in persistent storage but not in memory; on restart, the filter would reappear. Consider deleting from the cache only when metastore deletion succeeds:

🔎 Proposed fix

```diff
     for filter in filters_for_stream.iter() {
-        if let Err(err) = PARSEABLE.metastore.delete_filter(filter).await {
+        match PARSEABLE.metastore.delete_filter(filter).await {
+            Ok(_) => {
+                if let Some(filter_id) = filter.filter_id.as_ref() {
+                    FILTERS.delete_filter(filter_id).await;
+                }
+            }
+            Err(err) => {
                 tracing::warn!(
                     "failed to delete the zombie filter: {} \nfrom storage. For logstream: {}\nError: {:#?}",
                     filter.filter_name,
                     stream_name,
                     err
                 );
-        }
-
-        if let Some(filter_id) = filter.filter_id.as_ref() {
-            FILTERS.delete_filter(filter_id).await;
+            }
         }
     }
```
127-127: Remove the explicit `return` for idiomatic Rust. The trailing `return` keyword is unnecessary when the expression is the last statement in the function.

🔎 Proposed fix

```diff
-    return Ok(());
+    Ok(())
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (2)
src/handlers/http/logstream.rs
src/handlers/http/modal/utils/logstream_utils.rs
🧰 Additional context used
🧠 Learnings (7)
📚 Learning: 2025-06-16T02:04:58.990Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.
Applied to files:
src/handlers/http/logstream.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/handlers/http/logstream.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.
Applied to files:
src/handlers/http/logstream.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).
Applied to files:
src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Applied to files:
src/handlers/http/logstream.rs
src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.
Applied to files:
src/handlers/http/logstream.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.
Applied to files:
src/handlers/http/logstream.rs
🧬 Code graph analysis (2)
src/handlers/http/logstream.rs (1)
- src/handlers/http/modal/utils/logstream_utils.rs (1): `delete_zombie_filters` (96-128)

src/handlers/http/modal/utils/logstream_utils.rs (2)
- src/parseable/mod.rs (1): `storage` (284-286)
- src/storage/mod.rs (1): `from` (182-188)
🔇 Additional comments (4)
src/handlers/http/logstream.rs (2)
23-23: LGTM! Import correctly added for the new zombie filters cleanup utility.
83-90: LGTM with a note on failure semantics. The zombie filter cleanup is correctly integrated after the stream is removed from memory. The error conversion to `Anyhow` loses type information but is acceptable for this use case.

Note: If this call fails after the stream has already been deleted from storage/memory (lines 61-79), the user receives an error response even though the stream deletion effectively succeeded. This is a reasonable trade-off since filter cleanup errors are logged as warnings and the function continues processing the remaining filters.

src/handlers/http/modal/utils/logstream_utils.rs (2)
21-32: LGTM! Imports are correctly updated to include the necessary types for the zombie filters cleanup feature.

82-93: LGTM! Error types are well-defined using `thiserror`. The `StreamFoundForZombieFilters` wrapper provides clear error messaging, and the enum properly distinguishes between the stream-still-exists case and metastore errors.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/handlers/http/modal/utils/logstream_utils.rs (1)
114-119: Prefer structured logging over embedded newlines. The log message uses `\n` characters, which may not render well in structured logging systems (JSON logs, etc.). Using tracing's field syntax is more portable.

🔎 Suggested structured logging format

```diff
-    tracing::warn!(
-        "failed to delete the zombie filter: {} \nfrom storage. For logstream: {}\nError: {:#?}",
-        filter.filter_name,
-        stream_name,
-        err
-    );
+    tracing::warn!(
+        filter_name = %filter.filter_name,
+        stream_name = %stream_name,
+        error = ?err,
+        "failed to delete zombie filter from storage"
+    );
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/handlers/http/logstream.rssrc/handlers/http/modal/utils/logstream_utils.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/handlers/http/logstream.rs
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Applied to files:
src/handlers/http/modal/utils/logstream_utils.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/handlers/http/modal/utils/logstream_utils.rs
🧬 Code graph analysis (1)
src/handlers/http/modal/utils/logstream_utils.rs (2)
src/parseable/mod.rs (1)
storage(284-286)src/storage/mod.rs (1)
from(182-188)
🔇 Additional comments (3)
src/handlers/http/modal/utils/logstream_utils.rs (3)
21-32: Imports look correct for the new functionality. The added imports support the new `delete_zombie_filters` function and error types appropriately.

82-93: Error types are well-defined. The error enum and wrapper struct are appropriately structured. Using `#[from]` for automatic conversions is idiomatic.

104-110: The metastore trait does not support filtering by stream_name. The only available method is `get_filters()`, which returns all filters regardless of stream. The current client-side filtering approach in lines 107-110 is the only viable implementation given the metastore API design.

Likely an incorrect or invalid review comment.
```rust
for filter in filters_for_stream.iter() {
    if let Err(err) = PARSEABLE.metastore.delete_filter(filter).await {
        tracing::warn!(
            "failed to delete the zombie filter: {} \nfrom storage. For logstream: {}\nError: {:#?}",
            filter.filter_name,
            stream_name,
            err
        );
    } else {
        // ok: have the filter removed from memory only when the storage deletion succeeds
        if let Some(filter_id) = filter.filter_id.as_ref() {
            FILTERS.delete_filter(filter_id).await;
        }
    }
}
```
🧩 Analysis chain

🏁 Script executed:

```shell
# Find Filter struct definition
rg -n "struct Filter" -A 10 --type rust
```

Repository: parseablehq/parseable
Length of output: 4126

🏁 Script executed:

```shell
# Check how delete_zombie_filters is called
rg -n "delete_zombie_filters" --type rust -B 2 -A 2
```

Repository: parseablehq/parseable
Length of output: 1351

🏁 Script executed:

```shell
# Find where filter_id is used and if it's ever None
rg -n "filter_id" --type rust -C 3 | head -150
```

Repository: parseablehq/parseable
Length of output: 10295

🏁 Script executed:

```shell
# Find complete delete_zombie_filters implementation
rg -n "pub async fn delete_zombie_filters" -A 30 --type rust src/handlers/http/modal/utils/logstream_utils.rs
```

Repository: parseablehq/parseable
Length of output: 1442

🏁 Script executed:

```shell
# Find get_filters_for_stream implementation
rg -n "get_filters_for_stream" --type rust -A 10
```

Repository: parseablehq/parseable
Length of output: 47

🏁 Script executed:

```shell
# Check ZombieFiltersDeletionError definition
rg -n "enum ZombieFiltersDeletionError\|struct ZombieFiltersDeletionError" --type rust -A 5
```

Repository: parseablehq/parseable
Length of output: 47

🏁 Script executed:

```shell
# Search for ZombieFiltersDeletionError
rg -n "ZombieFiltersDeletionError" --type rust
```

Repository: parseablehq/parseable
Length of output: 422

🏁 Script executed:

```shell
# Check the file containing delete_zombie_filters for error definitions
head -100 src/handlers/http/modal/utils/logstream_utils.rs
```

Repository: parseablehq/parseable
Length of output: 3705

🏁 Script executed:

```shell
# Check how PARSEABLE.metastore.delete_filter is defined and what it returns
rg -n "fn delete_filter" --type rust -A 5
```

Repository: parseablehq/parseable
Length of output: 1590

🏁 Script executed:

```shell
# Check how filters are loaded from metastore and if filter_id is always set
rg -n "get_filters\(" --type rust -A 10 | head -80
```

Repository: parseablehq/parseable
Length of output: 3834

🏁 Script executed:

```shell
# Check ObjectStoreMetastore implementation of get_filters
rg -n "async fn get_filters" --type rust -A 20
```

Repository: parseablehq/parseable
Length of output: 3827

🏁 Script executed:

```shell
# Get the complete get_filters implementation
sed -n '457,530p' src/metastore/metastores/object_store_metastore.rs
```

Repository: parseablehq/parseable
Length of output: 3410
Partial filter deletion failures are silently swallowed, creating potential cleanup inconsistencies during stream deletion.

The function always returns `Ok(())` even when individual filter deletions fail (lines 113-119). While failures are logged, the caller (logstream.rs:85-87) uses the `?` operator expecting error propagation on critical cleanup failures. This mismatch means incomplete filter cleanup during stream deletion will silently succeed.

Additionally, if a filter has `filter_id = None`, the metastore deletion succeeds (line 113) but the in-memory cache is never updated (lines 121-123 only execute for `Some`), creating a storage/memory inconsistency.

Consider:
- Tracking and propagating deletion failures so the caller can handle incomplete cleanup appropriately
- Adding a warning when `filter_id` is `None` after successful storage deletion
- Using structured logging fields instead of `\n` in the warning message
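The first two suggestions could be combined roughly like this. The snippet uses synchronous stand-ins (`storage_delete` is hypothetical) for what are async metastore and cache operations in the real code:

```rust
#[derive(Debug)]
struct Filter {
    filter_id: Option<String>,
    filter_name: String,
}

// Stand-in for PARSEABLE.metastore.delete_filter; fails for one filter.
fn storage_delete(f: &Filter) -> Result<(), String> {
    if f.filter_name == "bad" { Err("storage error".into()) } else { Ok(()) }
}

// Delete every filter, warn when a deleted filter had no id to evict from
// the in-memory cache, and surface all failures to the caller at the end.
fn cleanup(filters: &[Filter]) -> Result<(), Vec<String>> {
    let mut failures = Vec::new();
    for f in filters {
        match storage_delete(f) {
            Ok(()) => match &f.filter_id {
                Some(_id) => { /* evict `_id` from the in-memory cache here */ }
                None => failures.push(format!(
                    "deleted {} from storage but it has no filter_id; cache not updated",
                    f.filter_name
                )),
            },
            Err(e) => failures.push(e),
        }
    }
    if failures.is_empty() { Ok(()) } else { Err(failures) }
}
```

This keeps the best-effort loop but lets the caller distinguish a fully clean deletion from a partial one.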
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
Bug Fixes
New Features