From aca26716a5f23ccf96701050568e2ba3dfe1c74f Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Tue, 23 Dec 2025 20:07:53 +0530 Subject: [PATCH 1/6] fix: remove associated filters during stream deletion --- src/handlers/http/logstream.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 604ab9c3a..c8e59c6f9 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -29,6 +29,7 @@ use crate::rbac::role::Action; use crate::stats::{Stats, event_labels_date, storage_size_labels_date}; use crate::storage::retention::Retention; use crate::storage::{ObjectStoreFormat, StreamInfo, StreamType}; +use crate::users::filters::{FILTERS, Filter}; use crate::utils::actix::extract_session_key_from_req; use crate::utils::json::flatten::{ self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels, @@ -56,6 +57,21 @@ pub async fn delete(stream_name: Path) -> Result = all_filters + .into_iter() + .filter(|filter| filter.stream_name == stream_name) + .collect(); + + for filter in filters_for_stream.iter() { + PARSEABLE.metastore.delete_filter(filter).await?; + + if let Some(filter_id) = filter.filter_id.as_ref() { + FILTERS.delete_filter(filter_id).await; + } + } + // Delete from storage objectstore.delete_stream(&stream_name).await?; // Delete from staging From 19704d9e4a374c4ffdc3a2545ca9d09bc313a9da Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Tue, 23 Dec 2025 20:56:34 +0530 Subject: [PATCH 2/6] chore: safe await --- src/handlers/http/logstream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index c8e59c6f9..28d1870f5 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -57,7 +57,7 @@ pub async fn delete(stream_name: Path) -> Result = all_filters .into_iter() From d812994ec62ecd30d1cda15fb029c2489799155f Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Wed, 24 Dec 2025 19:03:14 +0530 Subject: [PATCH 3/6] chore: refactor logic for deleting zombie filters --- src/handlers/http/logstream.rs | 22 ++++-------- src/metastore/metastore_traits.rs | 1 + .../metastores/object_store_metastore.rs | 34 ++++++++++++++++++- src/parseable/mod.rs | 9 +++++ 4 files changed, 49 insertions(+), 17 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 28d1870f5..fa15310c9 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -29,7 +29,6 @@ use crate::rbac::role::Action; use crate::stats::{Stats, event_labels_date, storage_size_labels_date}; use crate::storage::retention::Retention; use crate::storage::{ObjectStoreFormat, StreamInfo, StreamType}; -use crate::users::filters::{FILTERS, Filter}; use crate::utils::actix::extract_session_key_from_req; use crate::utils::json::flatten::{ self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels, @@ -57,21 +56,6 @@ pub async fn delete(stream_name: Path) -> Result = all_filters - .into_iter() - .filter(|filter| filter.stream_name == stream_name) - .collect(); - - for filter in filters_for_stream.iter() { - PARSEABLE.metastore.delete_filter(filter).await?; - - if let Some(filter_id) = filter.filter_id.as_ref() { - FILTERS.delete_filter(filter_id).await; - } - } - // Delete from storage objectstore.delete_stream(&stream_name).await?; // Delete from staging @@ -95,6 +79,12 @@ pub async fn delete(stream_name: Path) -> Result Result, MetastoreError>; async fn put_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; async fn delete_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + async fn delete_zombie_filters(&self, stream_name: &str) -> Result; /// correlations async fn get_correlations(&self) -> Result, MetastoreError>; diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index d509bdfcc..79e408c12 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -56,7 +56,7 @@ use crate::{ parseable_json_path, schema_path, stream_json_path, to_bytes, }, }, - users::filters::{Filter, migrate_v1_v2}, + users::filters::{FILTERS, Filter, migrate_v1_v2}, }; /// Using PARSEABLE's storage as a metastore (default) @@ -546,6 +546,38 @@ impl Metastore for ObjectStoreMetastore { .await?) } + // clear filters associated to a deleted stream + async fn delete_zombie_filters(&self, stream_name: &str) -> Result { + // stream should not exist in order to have zombie filters + if PARSEABLE.check_stream_exists(stream_name) { + warn!("no zombie filters cleared for [undeleted] stream {}", stream_name); + return Ok(false); + } + + let all_filters = match PARSEABLE.metastore.get_filters().await { + Ok(all_f) => all_f, + Err(e) => { + return Err(e); + } + }; + + // collect filters associated with the logstream being deleted + let filters_for_stream: Vec = all_filters + .into_iter() + .filter(|filter| filter.stream_name == stream_name) + .collect(); + + for filter in filters_for_stream.iter() { + PARSEABLE.metastore.delete_filter(filter).await?; + + if let Some(filter_id) = filter.filter_id.as_ref() { + FILTERS.delete_filter(filter_id).await; + } + } + + return Ok(true); + } + /// Get all correlations async fn get_correlations(&self) -> Result, MetastoreError> { let mut correlations = Vec::new(); diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index d8cba4aea..80713c443 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -235,6 +235,15 @@ impl Parseable { .unwrap_or_default() } + // check if a stream exists + pub fn check_stream_exists(&self, stream_name: &str) -> bool { + if self.streams.contains(stream_name) { + return true; + } else { + return false; + } + } + // validate the storage, if the proper path for staging directory is provided // if the proper data directory is provided, or s3 bucket is provided etc pub async fn validate_storage(&self) -> Result, ObjectStorageError> { From 6e96d09de32080784d5cfbd3a37d8c365dba04a0 Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Wed, 24 Dec 2025 20:19:47 +0530 Subject: [PATCH 4/6] chore: simplification --- src/metastore/metastores/object_store_metastore.rs | 4 ++-- src/parseable/mod.rs | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 79e408c12..5ef9dd67a 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -554,7 +554,7 @@ impl Metastore for ObjectStoreMetastore { return Ok(false); } - let all_filters = match PARSEABLE.metastore.get_filters().await { + let all_filters = match self.get_filters().await { Ok(all_f) => all_f, Err(e) => { return Err(e); @@ -568,7 +568,7 @@ impl Metastore for ObjectStoreMetastore { .collect(); for filter in filters_for_stream.iter() { - PARSEABLE.metastore.delete_filter(filter).await?; + self.delete_filter(filter).await?; if let Some(filter_id) = filter.filter_id.as_ref() { FILTERS.delete_filter(filter_id).await; diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 80713c443..cc969a98c 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -237,11 +237,7 @@ impl Parseable { // check if a stream exists pub fn check_stream_exists(&self, stream_name: &str) -> bool { - if self.streams.contains(stream_name) { - return true; - } else { - return false; - } + self.streams.contains(stream_name) } // validate the storage, if the proper path for staging directory is provided From ffd81aa429a713e0138682de4e61cd69f39fb7f2 Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Mon, 29 Dec 2025 01:57:48 +0530 Subject: [PATCH 5/6] chore: refactored zombie filter deletion into a general utlity func --- Cargo.lock | 170 ++++++++---------- src/handlers/http/logstream.rs | 9 +- .../http/modal/utils/logstream_utils.rs | 58 +++++- src/metastore/metastore_traits.rs | 1 - .../metastores/object_store_metastore.rs | 34 +--- src/parseable/mod.rs | 5 - 6 files changed, 132 insertions(+), 145 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43cda6fc4..fb0d3373f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,7 +81,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -218,7 +218,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -281,7 +281,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dd80fa0bd6217e482112d9d87a05af8e0f8dec9e3aa51f34816f761c5cf7da7" dependencies = [ "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -747,7 +747,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -900,7 +900,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.108", + "syn", ] [[package]] @@ -1231,7 +1231,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -1510,7 +1510,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.108", + "syn", ] [[package]] @@ -1521,7 +1521,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2019,7 +2019,7 @@ checksum = "1063ad4c9e094b3f798acee16d9a47bd7372d9699be2de21b05c3bd3f34ab848" dependencies = [ "datafusion-doc", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2209,7 +2209,7 @@ checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2230,7 +2230,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2240,7 +2240,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" dependencies = [ "derive_builder_core", - "syn 2.0.108", + "syn", ] [[package]] @@ -2253,7 +2253,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.108", + "syn", ] [[package]] @@ -2283,7 +2283,7 @@ dependencies = [ "convert_case 0.6.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn", "unicode-xid", ] @@ -2297,7 +2297,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.108", + "syn", "unicode-xid", ] @@ -2320,7 +2320,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2527,7 +2527,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2706,6 +2706,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac-sha256" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad6880c8d4a9ebf39c6e8b77007ce223f646a4d21ce29d99f70cb16420545425" + [[package]] name = "hostname" version = "0.4.0" @@ -3064,7 +3070,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -3073,16 +3079,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" -[[package]] -name = "idna" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" -dependencies = [ - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "1.1.0" @@ -3585,7 +3581,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -3652,14 +3648,15 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "openid" -version = "0.15.0" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "627898ab5b3fff5e5f1dc0e404bafdbb87a4337d815e86149f53640380946ccc" +checksum = "c0a9d93c04da2d5e11578af6207f163c0816698b24c25a7aefae06a71e2d07bb" dependencies = [ "base64 0.22.1", "biscuit", "chrono", - "lazy_static", + "getrandom 0.3.1", + "hmac-sha256", "mime", "reqwest 0.12.12", "serde", @@ -4041,7 +4038,7 @@ checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4084,7 +4081,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.108", + "syn", ] [[package]] @@ -4097,27 +4094,25 @@ dependencies = [ ] [[package]] -name = "proc-macro-error" -version = "1.0.4" +name = "proc-macro-error-attr2" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" dependencies = [ - "proc-macro-error-attr", "proc-macro2", "quote", - "syn 1.0.109", - "version_check", ] [[package]] -name = "proc-macro-error-attr" -version = "1.0.4" +name = "proc-macro-error2" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" dependencies = [ + "proc-macro-error-attr2", "proc-macro2", "quote", - "version_check", + "syn", ] [[package]] @@ -4227,7 +4222,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4240,7 +4235,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4502,7 +4497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" dependencies = [ "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4700,7 +4695,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.108", + "syn", "unicode-ident", ] @@ -4718,7 +4713,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.108", + "syn", "unicode-ident", ] @@ -4995,7 +4990,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5043,7 +5038,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5228,7 +5223,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5289,7 +5284,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.108", + "syn", ] [[package]] @@ -5298,16 +5293,6 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.108" @@ -5342,7 +5327,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5425,7 +5410,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5436,7 +5421,7 @@ checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5552,7 +5537,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5771,7 +5756,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5855,27 +5840,12 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" -[[package]] -name = "unicode-bidi" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" - [[package]] name = "unicode-ident" version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" -[[package]] -name = "unicode-normalization" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" -dependencies = [ - "tinyvec", -] - [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -5934,7 +5904,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" dependencies = [ "form_urlencoded", - "idna 1.1.0", + "idna", "percent-encoding", "serde", ] @@ -5970,11 +5940,11 @@ dependencies = [ [[package]] name = "validator" -version = "0.18.1" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db79c75af171630a3148bd3e6d7c4f42b6a9a014c2945bc5ed0020cbb8d9478e" +checksum = "d0b4a29d8709210980a09379f27ee31549b73292c87ab9899beee1c0d3be6303" dependencies = [ - "idna 0.5.0", + "idna", "once_cell", "regex", "serde", @@ -5986,16 +5956,16 @@ dependencies = [ [[package]] name = "validator_derive" -version = "0.18.2" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df0bcf92720c40105ac4b2dda2a4ea3aa717d4d6a862cc217da653a4bd5c6b10" +checksum = "bac855a2ce6f843beb229757e6e570a42e837bcb15e5f449dd48d5747d41bf77" dependencies = [ "darling", "once_cell", - "proc-macro-error", + "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6126,7 +6096,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.108", + "syn", "wasm-bindgen-shared", ] @@ -6161,7 +6131,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6303,7 +6273,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6314,7 +6284,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6668,7 +6638,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", "synstructure", ] @@ -6699,7 +6669,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6710,7 +6680,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6730,7 +6700,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", "synstructure", ] @@ -6759,7 +6729,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index fa15310c9..5839bf57a 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -20,6 +20,7 @@ use self::error::StreamError; use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats}; use super::query::update_schema_when_distributed; use crate::event::format::override_data_type; +use crate::handlers::http::modal::utils::logstream_utils::delete_zombie_filters; use crate::hottier::{CURRENT_HOT_TIER_VERSION, HotTierManager, StreamHotTier}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; @@ -81,9 +82,11 @@ pub async fn delete(stream_name: Path) -> Result for PutStreamHeaders { } } } + + +#[derive(thiserror::Error, Debug)] +pub enum ZombieFiltersDeletionError { + #[error("{0}")] + StreamStillExists(#[from] StreamFoundForZombieFilters), + + #[error("Metastore error: {0}")] + GetFromMetastore(#[from] MetastoreError) +} + +#[derive(Debug, thiserror::Error)] +#[error("Stream still exists for zombie filters: {0}")] +pub struct StreamFoundForZombieFilters(String); + + +pub async fn delete_zombie_filters(stream_name: &str) -> Result<(), ZombieFiltersDeletionError> { + // stream should not exist in order to have zombie filters + if PARSEABLE.streams.contains(stream_name) { + return Err(ZombieFiltersDeletionError::StreamStillExists( + StreamFoundForZombieFilters(stream_name.to_string()) + )); + } + + let all_filters = PARSEABLE.metastore.get_filters().await?; + + // collect filters associated with the logstream being deleted + let filters_for_stream: Vec = all_filters + .into_iter() + .filter(|filter| filter.stream_name == stream_name) + .collect(); + + for filter in filters_for_stream.iter() { + if let Err(err) = PARSEABLE.metastore.delete_filter(filter).await { + tracing::warn!( + "failed to delete the zombie filter: {} \nfrom storage. For logstream: {}\nError: {:#?}", + filter.filter_name, + stream_name, + err + ); + } + + if let Some(filter_id) = filter.filter_id.as_ref() { + FILTERS.delete_filter(filter_id).await; + } + } + + return Ok(()); +} \ No newline at end of file diff --git a/src/metastore/metastore_traits.rs b/src/metastore/metastore_traits.rs index 88badfe52..ed4ea6f20 100644 --- a/src/metastore/metastore_traits.rs +++ b/src/metastore/metastore_traits.rs @@ -108,7 +108,6 @@ pub trait Metastore: std::fmt::Debug + Send + Sync { async fn get_filters(&self) -> Result, MetastoreError>; async fn put_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; async fn delete_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; - async fn delete_zombie_filters(&self, stream_name: &str) -> Result; /// correlations async fn get_correlations(&self) -> Result, MetastoreError>; diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 5ef9dd67a..d509bdfcc 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -56,7 +56,7 @@ use crate::{ parseable_json_path, schema_path, stream_json_path, to_bytes, }, }, - users::filters::{FILTERS, Filter, migrate_v1_v2}, + users::filters::{Filter, migrate_v1_v2}, }; /// Using PARSEABLE's storage as a metastore (default) @@ -546,38 +546,6 @@ impl Metastore for ObjectStoreMetastore { .await?) } - // clear filters associated to a deleted stream - async fn delete_zombie_filters(&self, stream_name: &str) -> Result { - // stream should not exist in order to have zombie filters - if PARSEABLE.check_stream_exists(stream_name) { - warn!("no zombie filters cleared for [undeleted] stream {}", stream_name); - return Ok(false); - } - - let all_filters = match self.get_filters().await { - Ok(all_f) => all_f, - Err(e) => { - return Err(e); - } - }; - - // collect filters associated with the logstream being deleted - let filters_for_stream: Vec = all_filters - .into_iter() - .filter(|filter| filter.stream_name == stream_name) - .collect(); - - for filter in filters_for_stream.iter() { - self.delete_filter(filter).await?; - - if let Some(filter_id) = filter.filter_id.as_ref() { - FILTERS.delete_filter(filter_id).await; - } - } - - return Ok(true); - } - /// Get all correlations async fn get_correlations(&self) -> Result, MetastoreError> { let mut correlations = Vec::new(); diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index cc969a98c..d8cba4aea 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -235,11 +235,6 @@ impl Parseable { .unwrap_or_default() } - // check if a stream exists - pub fn check_stream_exists(&self, stream_name: &str) -> bool { - self.streams.contains(stream_name) - } - // validate the storage, if the proper path for staging directory is provided // if the proper data directory is provided, or s3 bucket is provided etc pub async fn validate_storage(&self) -> Result, ObjectStorageError> { From fbc3b5250f6b790f6d9d25f01a83b5f3116b2310 Mon Sep 17 00:00:00 2001 From: spuckhafte Date: Mon, 29 Dec 2025 02:26:47 +0530 Subject: [PATCH 6/6] chore: cleanup, simplification --- src/handlers/http/logstream.rs | 4 +--- src/handlers/http/modal/utils/logstream_utils.rs | 10 +++++----- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 5839bf57a..06a1249d3 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -84,9 +84,7 @@ pub async fn delete(stream_name: Path) -> Result Result<(), ZombieFilter stream_name, err ); - } - - if let Some(filter_id) = filter.filter_id.as_ref() { - FILTERS.delete_filter(filter_id).await; + } else { // ok: have the filter removed from memory only when the storage deletion succeeds + if let Some(filter_id) = filter.filter_id.as_ref() { + FILTERS.delete_filter(filter_id).await; + } } } - return Ok(()); + Ok(()) } \ No newline at end of file