From d3bf73bb63b01fca8291fd5e973bd2618ddba990 Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Sun, 3 May 2026 22:58:31 +0100 Subject: [PATCH] feat(PoC): Segment membership inspection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the `segment_membership` Django app — a Roaring-bitmap-backed index that lets us count, list, sample, and stream the identities matching a segment. Atoms are unary predicates over (property, operator, operand); each unique atom gets one bitmap of `Identity.id` values, and segment membership composes via Boolean ops on those bitmaps. Async maintenance runs via flagsmith-task-processor. Includes smoke tests covering the operator vocabulary end-to-end with a differential check against `is_context_in_segment`. Maintenance fires only when `settings.SEGMENT_MEMBERSHIP_ENABLED = True`, so this is dark-no-op in self-hosted and SaaS until enabled. beep boop --- api/app/settings/common.py | 1 + api/environments/identities/models.py | 19 + api/environments/identities/signals.py | 18 + api/poetry.lock | 97 ++- api/pyproject.toml | 1 + api/segment_membership/__init__.py | 0 api/segment_membership/apps.py | 15 + api/segment_membership/constants.py | 17 + api/segment_membership/dataclasses.py | 45 ++ api/segment_membership/management/__init__.py | 0 .../management/commands/__init__.py | 0 .../commands/backfill_segment_membership.py | 92 +++ .../stress_test_segment_membership.py | 737 ++++++++++++++++++ .../commands/verify_segment_membership.py | 143 ++++ api/segment_membership/mappers.py | 167 ++++ .../0001_add_segment_membership_index.py | 101 +++ api/segment_membership/migrations/__init__.py | 0 api/segment_membership/models.py | 65 ++ api/segment_membership/services.py | 434 +++++++++++ api/segment_membership/signals.py | 132 ++++ api/segment_membership/tasks.py | 71 ++ api/tests/unit/segment_membership/__init__.py | 0 .../test_unit_segment_membership_smoke.py | 363 +++++++++ 
.../observability/_events-catalogue.md | 37 + 24 files changed, 2554 insertions(+), 1 deletion(-) create mode 100644 api/environments/identities/signals.py create mode 100644 api/segment_membership/__init__.py create mode 100644 api/segment_membership/apps.py create mode 100644 api/segment_membership/constants.py create mode 100644 api/segment_membership/dataclasses.py create mode 100644 api/segment_membership/management/__init__.py create mode 100644 api/segment_membership/management/commands/__init__.py create mode 100644 api/segment_membership/management/commands/backfill_segment_membership.py create mode 100644 api/segment_membership/management/commands/stress_test_segment_membership.py create mode 100644 api/segment_membership/management/commands/verify_segment_membership.py create mode 100644 api/segment_membership/mappers.py create mode 100644 api/segment_membership/migrations/0001_add_segment_membership_index.py create mode 100644 api/segment_membership/migrations/__init__.py create mode 100644 api/segment_membership/models.py create mode 100644 api/segment_membership/services.py create mode 100644 api/segment_membership/signals.py create mode 100644 api/segment_membership/tasks.py create mode 100644 api/tests/unit/segment_membership/__init__.py create mode 100644 api/tests/unit/segment_membership/test_unit_segment_membership_smoke.py diff --git a/api/app/settings/common.py b/api/app/settings/common.py index 634c27cf9bd5..03f3b3585087 100644 --- a/api/app/settings/common.py +++ b/api/app/settings/common.py @@ -118,6 +118,7 @@ "features.workflows.core", "features.release_pipelines.core", "segments", + "segment_membership", "app", "e2etests", "simple_history", diff --git a/api/environments/identities/models.py b/api/environments/identities/models.py index fd931e79b232..8c5f3fd64892 100644 --- a/api/environments/identities/models.py +++ b/api/environments/identities/models.py @@ -5,6 +5,7 @@ from flag_engine.engine import get_evaluation_result from 
environments.identities.managers import IdentityManager +from environments.identities.signals import traits_changed from environments.identities.traits.models import Trait from environments.models import Environment from environments.sdk.types import SDKTraitData @@ -211,6 +212,12 @@ def generate_traits( if persist: Trait.objects.bulk_create(trait_models_to_persist) + if trait_models_to_persist: + traits_changed.send( + sender=type(self), + instance=self, + changed_keys={t.trait_key for t in trait_models_to_persist}, + ) return trait_models @@ -290,6 +297,18 @@ def update_traits( # See: https://github.com/Flagsmith/flagsmith/issues/370 Trait.objects.bulk_create(new_traits, ignore_conflicts=True) + changed_keys = ( + keys_to_delete + | {t.trait_key for t in updated_traits} + | {t.trait_key for t in new_traits} + ) + if changed_keys: + traits_changed.send( + sender=type(self), + instance=self, + changed_keys=changed_keys, + ) + # return the full list of traits for this identity # override persisted traits by transient traits in case of key collisions return [ diff --git a/api/environments/identities/signals.py b/api/environments/identities/signals.py new file mode 100644 index 000000000000..55b70bfa58c4 --- /dev/null +++ b/api/environments/identities/signals.py @@ -0,0 +1,18 @@ +"""Signals emitted from the identity write paths. + +`traits_changed` fires after the bulk trait write paths +(`Identity.update_traits`, `Identity.generate_traits(persist=True)`) complete. +This is necessary because Django's `post_save` is not emitted by +`bulk_create` / `bulk_update`, so any consumer that needs to react to trait +changes from the SDK ingestion path has to subscribe to this signal instead. + +Provides: + + * `sender` — the `Identity` model class. + * `instance` — the identity whose traits changed. + * `changed_keys` — set[str] of trait keys created, updated, or deleted. 
+""" + +from django.dispatch import Signal + +traits_changed = Signal() diff --git a/api/poetry.lock b/api/poetry.lock index a10b9de29e30..09e9575ac031 100644 --- a/api/poetry.lock +++ b/api/poetry.lock @@ -4233,6 +4233,101 @@ files = [ {file = "pyrepl-0.9.0.tar.gz", hash = "sha256:292570f34b5502e871bbb966d639474f2b57fbfcd3373c2d6a2f3d56e681a775"}, ] +[[package]] +name = "pyroaring" +version = "1.1.0" +description = "Library for handling efficiently sorted integer sets." +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "pyroaring-1.1.0-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:19c9bb8b32f17fe63e54e78f88a7a67fe2dca352bf79d7d3b43f0a901db5ea3b"}, + {file = "pyroaring-1.1.0-cp310-cp310-macosx_14_0_universal2.whl", hash = "sha256:212f471da113eba24a0fedbd1ef078bfed34b792e8f48dba3f7301f1bca2678a"}, + {file = "pyroaring-1.1.0-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:a31276323fdd355505883162f1dca4d7acc3d6ec2159c85ddef863f1fe9e8f05"}, + {file = "pyroaring-1.1.0-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:55c9d0b708a65308eaa95c53d9de56a2b4425812395f6e57a4d9ad8efb081bf9"}, + {file = "pyroaring-1.1.0-cp310-cp310-manylinux_2_24_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:5599b691e5e993f7fdd9880429d1c485eae03dc392ba5b2dbe2eef0434bea1a5"}, + {file = "pyroaring-1.1.0-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8376c62e60a49e8fe51ffce569f74beed6e53fb58e5d6739f954c8c51ec0bec3"}, + {file = "pyroaring-1.1.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:cc998e81748b72659802a7b434b39292308a5f4ab8dd4bf4739d65de1d4ed2bf"}, + {file = "pyroaring-1.1.0-cp310-cp310-musllinux_1_2_armv7l.whl", hash = "sha256:fe0b9f0baf190663df37571cbb8a77370e7bd3a66068276d48f813c53ceba530"}, + {file = "pyroaring-1.1.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:83815b3a4cb9c3b17096c70833373610d5c93e89c134548cf43436c90326bfb3"}, + {file = 
"pyroaring-1.1.0-cp310-cp310-win32.whl", hash = "sha256:e9caf67c0c05d27bab0d88cf193e73023f624c6714f1ff807c7d4b135b19d36a"}, + {file = "pyroaring-1.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:a6343249ce9401dd90c3b934fd9a4a618b6fc455f27a9aa11d198eebd4743137"}, + {file = "pyroaring-1.1.0-cp310-cp310-win_arm64.whl", hash = "sha256:8f5194c0544b08e56e108dad096c468c67f8ac60eec42763aed36ccbd3565346"}, + {file = "pyroaring-1.1.0-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:462b0952277d3100a90ae890ae641d3fb3561b10cfea542e02468f0bef7700a5"}, + {file = "pyroaring-1.1.0-cp311-cp311-macosx_14_0_universal2.whl", hash = "sha256:9cf8608c9d6cb6bff9c624744f7a2ba8ab12276f097a07930490fe0e2219e9be"}, + {file = "pyroaring-1.1.0-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:44a9f9719ed18e86627286f90057f9b7e22f6ba1d952c9793f9600b5c14e8680"}, + {file = "pyroaring-1.1.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c14fc6bd65e5624f76b90297b081222261476978f795f60d48745553617ddceb"}, + {file = "pyroaring-1.1.0-cp311-cp311-manylinux_2_24_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:01212da3752d6486adcca98c9d353f8fb8e36513e05062cdd0feebc4211dbe70"}, + {file = "pyroaring-1.1.0-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:100585f438b293112e2c52e45a442835837c8a0267dd1e513bafec35628f8ecb"}, + {file = "pyroaring-1.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:532e53191d8dd29dedfc5202cbb45632f7df751b207a7f6d6860fb7067c7fe11"}, + {file = "pyroaring-1.1.0-cp311-cp311-musllinux_1_2_armv7l.whl", hash = "sha256:731a7a9e050758986d5757eea10f9ccb08f9c3ef514ce0335f4a90e126f81131"}, + {file = "pyroaring-1.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:e9864e19109e76111befc75d799d334e7365eb4189607aa734053c12e7840fa5"}, + {file = "pyroaring-1.1.0-cp311-cp311-win32.whl", hash = "sha256:7caf95de39ce869ea0978068521cf6faa7350574fd1734ad6c63e5ed8cd06baa"}, + {file = "pyroaring-1.1.0-cp311-cp311-win_amd64.whl", 
hash = "sha256:a23cd023985b5f2ba23e84e1fadaeacde3c8a59e1d2adb3fe782e99db1e22387"}, + {file = "pyroaring-1.1.0-cp311-cp311-win_arm64.whl", hash = "sha256:a98d1147fe1d3195053b67b474bccc0be5021506765d27f613a943c8c99f9e4c"}, + {file = "pyroaring-1.1.0-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:fdf484d26016e0c016f23f2b635d2899daec034565fdcc062ed6b10f3b26a3f4"}, + {file = "pyroaring-1.1.0-cp312-cp312-macosx_14_0_universal2.whl", hash = "sha256:e9c2b9aa8decdcf40ed8f4c887092c20a272f8c32215c3fee65e9db92ecf418e"}, + {file = "pyroaring-1.1.0-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:5eb031237e9d39cbdfc9276facacdd88e27aefb58940bd8b56b878dfd38d6022"}, + {file = "pyroaring-1.1.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:66aa6a321fbc598f26d5e66050a7f145c2253f3fe5737b589841ff0cbe5cb177"}, + {file = "pyroaring-1.1.0-cp312-cp312-manylinux_2_24_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:8cff06d18c9a30f8547a92757078aa345db1ba5b22e3082a05f64e50b384e27a"}, + {file = "pyroaring-1.1.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:043dbbaa905f7288c515ac06a96b67a3763f35e9ae06f0c0278c0d9964d16760"}, + {file = "pyroaring-1.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:71fec09bd42f8c33ac3b762cd00c5db842eb583ffd0e361739ce1c17ad078a6a"}, + {file = "pyroaring-1.1.0-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:c9f30ca28b991a920b446ed3ee19c7ecafcc49c46db592abf89cf239a7bb45f4"}, + {file = "pyroaring-1.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:67650460c65bdd7b4f5078d9c955aa38f64627d02cb48f9cfb24eae84bca2aba"}, + {file = "pyroaring-1.1.0-cp312-cp312-win32.whl", hash = "sha256:61a8eabee99104ca197b6e7cce05dc4f27f503be52881800cd370eb5a5152d3f"}, + {file = "pyroaring-1.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:51ebe5e6f48e3dc9df91a4cb62137ef72e1469acd6f37479abd9991f6d945cc9"}, + {file = "pyroaring-1.1.0-cp312-cp312-win_arm64.whl", hash = 
"sha256:9882a204178cc8c915e0ce30abb4bdd1668e383c571b06649d5ed272d9625877"}, + {file = "pyroaring-1.1.0-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:72f68a16b00b35481d9b3bfe897ecd8a1f7da69efd92ba5b17347ca11c21cb0d"}, + {file = "pyroaring-1.1.0-cp313-cp313-macosx_14_0_universal2.whl", hash = "sha256:4c443e9f942b6089efe8c9b264576e9d116f90be28a315679375bba2d8a915d6"}, + {file = "pyroaring-1.1.0-cp313-cp313-macosx_14_0_x86_64.whl", hash = "sha256:3beb40eb1220d1ce4fb3661bb019e9a21857e5bb294fe8c1c5016aeb6e82318c"}, + {file = "pyroaring-1.1.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8f1f56004e8f1c1489bf279c25f1fa4764252cd9af5fb35675774268a4a615ba"}, + {file = "pyroaring-1.1.0-cp313-cp313-manylinux_2_24_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:13660386ea8905ee4d42c21a6275463e2dc7d31e0b5d65eec210aa7043ad96f4"}, + {file = "pyroaring-1.1.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0dfb6cf50fd8898179e460e699a6b8326ca508c627d083f7bf62f769fe1717d5"}, + {file = "pyroaring-1.1.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:81ebbc0c880c8a10f13118632e5c0d59159ceada8b651bba18f2e6dc70efdeda"}, + {file = "pyroaring-1.1.0-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:370d191b0d1b32bbd99452ef5f0485f22fcc4bf7404d33b821d0ce2459951152"}, + {file = "pyroaring-1.1.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:e8b3bfad0ae3ef0e67b40c193863dce8b7d79de545dadbe53c19acc3ace38f66"}, + {file = "pyroaring-1.1.0-cp313-cp313-win32.whl", hash = "sha256:eead129046822cb0fd47c78740b81bdaffd0515c0bb0306a2318acf0f0540b58"}, + {file = "pyroaring-1.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:90ab2f00c09eed5bd986a80c8641e2dc10e7aca1a2d892d89a44b396e39c08ea"}, + {file = "pyroaring-1.1.0-cp313-cp313-win_arm64.whl", hash = "sha256:51dd2490a64ad4ed53c4fb58ef1ee3f84f6cbd97cdb47abd9065c9f714ab72ef"}, + {file = "pyroaring-1.1.0-cp314-cp314-macosx_14_0_arm64.whl", hash = 
"sha256:5e337f8c5b3c2e0c27da83fc2cb702684a47eee907a960cfee964fcb5344515b"}, + {file = "pyroaring-1.1.0-cp314-cp314-macosx_14_0_universal2.whl", hash = "sha256:53acecba8f898e96b84d4139356e30719c70358177e270055901d3ec1cb0e34c"}, + {file = "pyroaring-1.1.0-cp314-cp314-macosx_14_0_x86_64.whl", hash = "sha256:986efb3aec7655d69c14db2309a2072dbf181bdb906091fede83ad18e316cdaf"}, + {file = "pyroaring-1.1.0-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:92643c9dd303de8960c3dbed93a28b8d87da5ed0a7776568979f379d7bc8a885"}, + {file = "pyroaring-1.1.0-cp314-cp314-manylinux_2_24_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:6a1d4c59d5b23c01d62f86d57ceefd0c0977de0425aafa7069f2d70563fed3b8"}, + {file = "pyroaring-1.1.0-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b8ac1bc26223befbca986551521f37f4c1670dfe26fccb2f0fc2775e75be99c1"}, + {file = "pyroaring-1.1.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:b490f2d22df30affbfdcbe4f7896f321edb72a8dc0cbe5f38adec3de5b947c25"}, + {file = "pyroaring-1.1.0-cp314-cp314-musllinux_1_2_armv7l.whl", hash = "sha256:56a67794188275f8897a8f1fa64d6313c48241bebbdef38833063e7281b29ef8"}, + {file = "pyroaring-1.1.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:9d9f196007f0b15ea19c21732faacaea83cbf5946b6db4949b3b98cf871c93f0"}, + {file = "pyroaring-1.1.0-cp314-cp314-win32.whl", hash = "sha256:abc0f0ce22464864fea208315d25e999e45cb5ee646ac1ca11d314a6a51dbe4a"}, + {file = "pyroaring-1.1.0-cp314-cp314-win_amd64.whl", hash = "sha256:532ae6bb1d3431d9956ef07589dd5c8dd918301a83d937c7dc6e511b1364d76a"}, + {file = "pyroaring-1.1.0-cp314-cp314-win_arm64.whl", hash = "sha256:d2706a89242a347be20805147d58a38f4f4d8f6846228c4ee8dfd3587113719c"}, + {file = "pyroaring-1.1.0-cp314-cp314t-macosx_14_0_arm64.whl", hash = "sha256:39eff7dd06c163c22d0a9f9fd72d27e671457bea8cdb71215382a10512539e1d"}, + {file = "pyroaring-1.1.0-cp314-cp314t-macosx_14_0_universal2.whl", hash = 
"sha256:562fa04bbfd41144d1276ed79505007557c161371450d68a1d71fc83dc01d083"}, + {file = "pyroaring-1.1.0-cp314-cp314t-macosx_14_0_x86_64.whl", hash = "sha256:591e2ed4d60443dafd9075c1f72e9aaf359ccf5120e32a8c340c2b2ae3da45e7"}, + {file = "pyroaring-1.1.0-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:381eda673442c389993f8b0db2dbf5d02ea8ea9aac6ba736f64cc1ffb6c96885"}, + {file = "pyroaring-1.1.0-cp314-cp314t-manylinux_2_24_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:d9127feb5356ba3a92bdffa04c1bf6bcbc8d436369f78badf441018c3029dd63"}, + {file = "pyroaring-1.1.0-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:650db21c10f42ff2b09ef02c10a779a3d59d0c7512552f3844738b30adbcb8a5"}, + {file = "pyroaring-1.1.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:8a5fbcb86e44f1c0c9c052917eee67a04cbac9de7392fb4bc77c140ff4a7e471"}, + {file = "pyroaring-1.1.0-cp314-cp314t-musllinux_1_2_armv7l.whl", hash = "sha256:0f1d76ef29034017eb2cceebd5fa0504d6ced218ce6432f99da5adecbe038269"}, + {file = "pyroaring-1.1.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:19d0c81c865d63791fe20e5b38733b66f4f962e677ae7e8b3d3c4947ac6e752f"}, + {file = "pyroaring-1.1.0-cp314-cp314t-win32.whl", hash = "sha256:1fc112b9a9890f89cc645a16604783ed7fa25299f149b0ef7b45a5e2e3c1f31f"}, + {file = "pyroaring-1.1.0-cp314-cp314t-win_amd64.whl", hash = "sha256:d92a0f4c7e6bb7deeafac68c79c92ef9340895fe825cf1a31078443753ab6756"}, + {file = "pyroaring-1.1.0-cp314-cp314t-win_arm64.whl", hash = "sha256:99c42fe1449acfbf130da65e66b4d5b2726aba4497be359bae7672e38a15fc62"}, + {file = "pyroaring-1.1.0-cp39-cp39-macosx_14_0_arm64.whl", hash = "sha256:c967ddb5e137c604ec94c7120d2f420ac9a9ce27b0e44987c7134b0eb5fe7a60"}, + {file = "pyroaring-1.1.0-cp39-cp39-macosx_14_0_universal2.whl", hash = "sha256:4ae81a0353af50d6885720b9119783b8be7bcd079e727ea1c4fca3f452fe95ba"}, + {file = "pyroaring-1.1.0-cp39-cp39-macosx_14_0_x86_64.whl", hash = 
"sha256:f742ebdd879063d2a92f4d57708886c720e854bc84ca240733f220936be0cb7e"}, + {file = "pyroaring-1.1.0-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0183b26c6e25a14e32b2aed7095a19491d6c835096032b33bc00b4ae995495bf"}, + {file = "pyroaring-1.1.0-cp39-cp39-manylinux_2_24_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:cc2d113bf6236aa873dc710b38a856808c6054efd3a1a403c5acfceaf87138f1"}, + {file = "pyroaring-1.1.0-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:499f0a5d03d64e7b846ee749e87721fb8516da7233baafb24ab69aa63bcf080b"}, + {file = "pyroaring-1.1.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:974fb6fa4c4a682c633c4d7fa6087ccd717804726f9db1f771f1c2ea4f3990b7"}, + {file = "pyroaring-1.1.0-cp39-cp39-musllinux_1_2_armv7l.whl", hash = "sha256:25d65257258f7a3df8b872ddfe18f3681f20b3bc2a3093f08fc80c66338051f4"}, + {file = "pyroaring-1.1.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:9dc61b7e38aca2ef384ac22b87a9c2b1e8769a1e15d916f93ea6a9a4551fc1d7"}, + {file = "pyroaring-1.1.0-cp39-cp39-win32.whl", hash = "sha256:cdf2b8ab0effc335a9e61cf2a1e14c2295b999a656569b35f35ea3dc55a8931f"}, + {file = "pyroaring-1.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:4a2c7ecd74e7b3ded54ad771a2b7c0aa08aa98f78e43500fa970d68d684bef0c"}, + {file = "pyroaring-1.1.0-cp39-cp39-win_arm64.whl", hash = "sha256:3d04c3eb039cf2295e4d375ef81fd6e3cae7cefe3b5c457ccaf78d7d6f963a5d"}, + {file = "pyroaring-1.1.0.tar.gz", hash = "sha256:f02e4021397ae02a139defdc6813b9942ab163de90affddd4ce4efbac299f619"}, +] + [[package]] name = "pysaml2" version = "7.4.2" @@ -6022,4 +6117,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">3.11,<3.14" -content-hash = "3887d5db2a2c09afcc53b6bdda606a15226230b49c0e1712346099b7a3a71d52" +content-hash = "70c98c96a6c29cd3902154601c6c087da3f02fe9e370856f865671c7d684657f" diff --git a/api/pyproject.toml b/api/pyproject.toml index 91a41d6d1a2e..6b77b4d5c74c 100644 --- a/api/pyproject.toml 
+++ b/api/pyproject.toml @@ -190,6 +190,7 @@ structlog = "^24.4.0" prometheus-client = "^0.21.1" django_cockroachdb = "~4.2" django-oauth-toolkit = "^3.0.1" +pyroaring = ">=1.0" [tool.poetry.group.auth-controller] optional = true diff --git a/api/segment_membership/__init__.py b/api/segment_membership/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/api/segment_membership/apps.py b/api/segment_membership/apps.py new file mode 100644 index 000000000000..e58be1c6c6c2 --- /dev/null +++ b/api/segment_membership/apps.py @@ -0,0 +1,15 @@ +from core.apps import BaseAppConfig + + +class SegmentMembershipConfig(BaseAppConfig): + name = "segment_membership" + default = True + + def ready(self) -> None: + super().ready() # type: ignore[no-untyped-call] + # Import order matters — tasks register with the task_processor at + # import time, and signals enqueue those tasks. + from segment_membership import ( + signals, # noqa: F401 + tasks, # noqa: F401 + ) diff --git a/api/segment_membership/constants.py b/api/segment_membership/constants.py new file mode 100644 index 000000000000..b54a42860c1d --- /dev/null +++ b/api/segment_membership/constants.py @@ -0,0 +1,17 @@ +"""Constants for the segment membership index.""" + +from typing import Literal + +# Atom kinds — partition by the property class. Set by +# `map_segment_condition_to_atom_kind` in `mappers.py`. +AtomKind = Literal["trait", "identifier", "identity_key", "environment_name"] + +KIND_TRAIT: AtomKind = "trait" +KIND_IDENTIFIER: AtomKind = "identifier" +KIND_IDENTITY_KEY: AtomKind = "identity_key" +KIND_ENVIRONMENT_NAME: AtomKind = "environment_name" + +# JSONPath properties recognised by the engine for non-trait context values. 
+PROPERTY_IDENTITY_IDENTIFIER = "$.identity.identifier" +PROPERTY_IDENTITY_KEY = "$.identity.key" +PROPERTY_ENVIRONMENT_NAME = "$.environment.name" diff --git a/api/segment_membership/dataclasses.py b/api/segment_membership/dataclasses.py new file mode 100644 index 000000000000..4019a6976dbd --- /dev/null +++ b/api/segment_membership/dataclasses.py @@ -0,0 +1,45 @@ +from dataclasses import dataclass, field +from typing import Optional, Union + +from segment_membership.constants import AtomKind + + +@dataclass(frozen=True) +class AtomKey: + """Canonical key identifying an atom within an environment.""" + + kind: AtomKind + property: str + operator: str + operand_canonical: Optional[str] + segment_key: Optional[str] + + +@dataclass(frozen=True) +class AtomNode: + key: AtomKey + negated: bool = False + + +@dataclass +class AndNode: + children: list["PredicateTree"] = field(default_factory=list) + + +@dataclass +class OrNode: + children: list["PredicateTree"] = field(default_factory=list) + + +@dataclass +class TrueNode: + """Universe — all ordinals in the env. 
Lets nested rules with empty + conditions short-circuit cleanly.""" + + +@dataclass +class FalseNode: + """Empty set — never a member.""" + + +PredicateTree = Union[AtomNode, AndNode, OrNode, TrueNode, FalseNode] diff --git a/api/segment_membership/management/__init__.py b/api/segment_membership/management/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/api/segment_membership/management/commands/__init__.py b/api/segment_membership/management/commands/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/api/segment_membership/management/commands/backfill_segment_membership.py b/api/segment_membership/management/commands/backfill_segment_membership.py new file mode 100644 index 000000000000..086790a0632f --- /dev/null +++ b/api/segment_membership/management/commands/backfill_segment_membership.py @@ -0,0 +1,92 @@ +"""Backfill the segment membership index for a single environment. + +Usage:: + + ./manage.py backfill_segment_membership --environment --segment + +The command extracts the atom catalogue from the segment definition(s), +allocates ordinals for any identities missing one, streams identities, and +writes Roaring bitmaps for every atom. +""" + +import time +from argparse import ArgumentParser +from typing import Any + +from django.core.management.base import BaseCommand, CommandError + +from environments.models import Environment +from segment_membership import services +from segments.models import Segment + + +class Command(BaseCommand): + help = "Backfill the segment membership index for one or more segments." 
+ + def add_arguments(self, parser: ArgumentParser) -> None: + parser.add_argument( + "--environment", + type=int, + required=True, + help="Environment ID.", + ) + parser.add_argument( + "--segment", + type=str, + default="all", + help="Segment ID, or 'all' for every segment in the env's project.", + ) + parser.add_argument( + "--rebuild", + action="store_true", + help="Rebuild bitmaps even when already present.", + ) + + def handle(self, *args: Any, **options: Any) -> None: + env_id: int = options["environment"] + segment_arg: str = options["segment"] + rebuild: bool = options["rebuild"] + + try: + environment = Environment.objects.get(id=env_id) + except Environment.DoesNotExist as exc: + raise CommandError(f"Environment {env_id} not found") from exc + + if segment_arg == "all": + segments = list(Segment.live_objects.filter(project=environment.project)) + else: + try: + segments = [Segment.live_objects.get(id=int(segment_arg))] + except (Segment.DoesNotExist, ValueError) as exc: + raise CommandError(f"Segment {segment_arg!r} not found") from exc + + self.stdout.write( + f"Backfilling {len(segments)} segment(s) in environment {env_id}…" + ) + + total_atoms = 0 + total_set_bits = 0 + started = time.perf_counter() + + for segment in segments: + seg_started = time.perf_counter() + cardinalities = services.backfill_segment( + environment, segment, rebuild=rebuild + ) + seg_elapsed = time.perf_counter() - seg_started + atom_count = len(cardinalities) + set_bits = sum(cardinalities.values()) + total_atoms += atom_count + total_set_bits += set_bits + self.stdout.write( + f" segment {segment.id} ({segment.name!r}): " + f"{atom_count} atoms, {set_bits} set bits, {seg_elapsed:.2f}s" + ) + + elapsed = time.perf_counter() - started + self.stdout.write( + self.style.SUCCESS( + f"Done. {len(segments)} segments, {total_atoms} atoms, " + f"{total_set_bits} set bits, {elapsed:.2f}s total." 
+ ) + ) diff --git a/api/segment_membership/management/commands/stress_test_segment_membership.py b/api/segment_membership/management/commands/stress_test_segment_membership.py new file mode 100644 index 000000000000..c74fe8697a57 --- /dev/null +++ b/api/segment_membership/management/commands/stress_test_segment_membership.py @@ -0,0 +1,737 @@ +"""Stress test the segment membership index on synthetic data. + +Seeds an isolated organisation/project/environment, bulk-creates N identities +with five traits each, defines a battery of segments covering the operator +vocabulary, then reports: + + * Seeding time. + * Backfill time and bitmap bytes per atom. + * Read latency: count, sample(100), page(0..200) — best of 5 runs. + * Write overhead: `update_traits` p50/p95 with the index disabled vs + enabled, over 200 calls. + +Usage:: + + ./manage.py stress_test_segment_membership --identities 100000 + +Re-run with `--reset` to drop and recreate the test env. Otherwise the +command reuses the previous run's data and skips seeding. 
+""" + +import math +import os +import random +import statistics +import time +from argparse import ArgumentParser +from typing import Any, cast + +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError +from django.db import transaction +from flag_engine.segments import constants as op + +from core.constants import FLOAT, INTEGER, STRING +from environments.identities.models import Identity +from environments.identities.traits.models import Trait +from environments.models import Environment +from environments.sdk.types import SDKTraitData +from organisations.models import Organisation +from projects.models import Project +from segment_membership import services +from segment_membership.constants import PROPERTY_IDENTITY_IDENTIFIER +from segment_membership.models import Atom, AtomBitmap +from segments.models import Condition, Segment, SegmentRule + +STRESS_ORG_NAME = "Segment membership stress org" +STRESS_PROJECT_NAME = "Segment membership stress project" +STRESS_ENV_NAME = "stress" + +COUNTRIES = ["US", "GB", "DE", "FR", "ES", "JP", "BR", "IN"] +PLANS = ["free", "pro", "enterprise"] + + +class Command(BaseCommand): + help = "Stress test the segment membership index on synthetic data." + + def add_arguments(self, parser: ArgumentParser) -> None: + parser.add_argument("--identities", type=int, default=10_000) + parser.add_argument("--seed", type=int, default=42) + parser.add_argument("--reset", action="store_true") + parser.add_argument( + "--write-overhead-iterations", + type=int, + default=200, + help="Number of update_traits calls per write-overhead phase.", + ) + parser.add_argument( + "--skip-backfill", + action="store_true", + help="Reuse existing bitmaps; skip the backfill phase.", + ) + parser.add_argument( + "--num-segments", + type=int, + default=8, + help=( + "Total number of segments to define. 
With <= 8 the original " + "hand-curated battery is used; above that, segments are " + "generated programmatically with diverse operator shapes." + ), + ) + + def handle(self, *args: Any, **options: Any) -> None: + n: int = options["identities"] + rng = random.Random(options["seed"]) + reset: bool = options["reset"] + write_iters: int = options["write_overhead_iterations"] + skip_backfill: bool = options["skip_backfill"] + num_segments: int = options["num_segments"] + + organisation = self._ensure_org() + project = self._ensure_project(organisation) + environment = self._ensure_env(project, reset=reset) + + existing_identities = Identity.objects.filter(environment=environment).count() + if existing_identities and existing_identities != n: + raise CommandError( + f"Environment already has {existing_identities} identities, " + f"expected {n}. Re-run with --reset." + ) + if existing_identities == 0: + self._seed(environment, n, rng) + else: + self.stdout.write( + f"Reusing existing {existing_identities} identities (skip --reset to keep)." 
+ ) + + segments = self._ensure_segments(project, num_segments=num_segments, rng=rng) + + self._report_environment(environment) + if skip_backfill: + backfill_results = self._collect_existing_backfill_stats( + environment, segments + ) + else: + backfill_results = self._phase_backfill(environment, segments) + read_results = self._phase_read(environment, segments) + write_results = self._phase_write_overhead(environment, write_iters, rng) + bitmap_total = self._bitmap_bytes(environment) + + self._print_summary( + n=n, + backfill=backfill_results, + reads=read_results, + write_results=write_results, + bitmap_total_bytes=bitmap_total, + ) + + # ------------------------------------------------------------------ env + + def _ensure_org(self) -> Organisation: + org: Organisation + org, _ = Organisation.objects.get_or_create(name=STRESS_ORG_NAME) + return org + + def _ensure_project(self, organisation: Organisation) -> Project: + project: Project + project, _ = Project.objects.get_or_create( + name=STRESS_PROJECT_NAME, + organisation=organisation, + ) + return project + + def _ensure_env(self, project: Project, *, reset: bool) -> Environment: + existing = Environment.objects.filter( + project=project, name=STRESS_ENV_NAME + ).first() + if existing and reset: + self.stdout.write("Resetting stress environment…") + with transaction.atomic(): + AtomBitmap.objects.filter(atom__environment=existing).delete() + Atom.objects.filter(environment=existing).delete() + # Segments are project-scoped, so wipe them too. + Segment.objects.filter(project=project).delete() + # Cascade deletes traits and identities. 
+ existing.delete() + existing = None + if existing is None: + existing = Environment.objects.create(project=project, name=STRESS_ENV_NAME) + return cast(Environment, existing) + + def _ensure_segments( + self, + project: Project, + *, + num_segments: int, + rng: random.Random, + ) -> list[Segment]: + existing = list(Segment.live_objects.filter(project=project)) + if existing: + return existing + if num_segments <= 8: + return self._curated_segments(project) + return self._generate_segments(project, n=num_segments, rng=rng) + + def _curated_segments(self, project: Project) -> list[Segment]: + return [ + self._make_segment(project, "country=US", [("country", op.EQUAL, "US")]), + self._make_segment( + project, "plan in pro,enterprise", [("plan", op.IN, "pro,enterprise")] + ), + self._make_segment( + project, "age >= 30", [("age", op.GREATER_THAN_INCLUSIVE, "30")] + ), + self._make_segment(project, "email is_set", [("email", op.IS_SET, "")]), + self._make_segment( + project, "score modulo 10|0", [("score", op.MODULO, "10|0")] + ), + self._make_segment( + project, + "country US AND plan pro", + [("country", op.EQUAL, "US"), ("plan", op.EQUAL, "pro")], + ), + self._make_segment( + project, + "identifier % split 50", + [(PROPERTY_IDENTITY_IDENTIFIER, op.PERCENTAGE_SPLIT, "50")], + ), + self._make_segment( + project, "email regex", [("email", op.REGEX, r"^u\d+@example\.com$")] + ), + ] + + def _generate_segments( + self, + project: Project, + *, + n: int, + rng: random.Random, + ) -> list[Segment]: + """Generate `n` segments with a realistic mix of operator shapes. + + The mix is calibrated so that atom dedup mirrors what we expect in + production: country/plan/age atoms collapse heavily, % Split and + regex atoms grow ~linearly with segment count (segment-salted or + unique-operand). 
+ """ + segments: list[Segment] = [] + countries = COUNTRIES + plans = PLANS + for i in range(n): + bucket = i % 7 + if bucket == 0: + country = countries[i % len(countries)] + segments.append( + self._make_segment( + project, + f"country={country} ({i})", + [("country", op.EQUAL, country)], + ) + ) + elif bucket == 1: + pair = rng.sample(plans, k=2) + segments.append( + self._make_segment( + project, + f"plan IN {pair} ({i})", + [("plan", op.IN, ",".join(pair))], + ) + ) + elif bucket == 2: + age = 18 + (i % 70) + segments.append( + self._make_segment( + project, + f"age >= {age} ({i})", + [("age", op.GREATER_THAN_INCLUSIVE, str(age))], + ) + ) + elif bucket == 3: + pct = (i * 7) % 99 + 1 + segments.append( + self._make_segment( + project, + f"identifier % split {pct} ({i})", + [(PROPERTY_IDENTITY_IDENTIFIER, op.PERCENTAGE_SPLIT, str(pct))], + ) + ) + elif bucket == 4: + # Unique regex per segment. + pattern = rf"^u{(i * 13) % 1000}\d*@example\.com$" + segments.append( + self._make_segment( + project, + f"email regex {i}", + [("email", op.REGEX, pattern)], + ) + ) + elif bucket == 5: + country = countries[i % len(countries)] + plan = plans[i % len(plans)] + segments.append( + self._make_segment( + project, + f"country={country} AND plan={plan} ({i})", + [ + ("country", op.EQUAL, country), + ("plan", op.EQUAL, plan), + ], + ) + ) + else: + divisor = (i % 7) + 2 + segments.append( + self._make_segment( + project, + f"score % {divisor}=0 ({i})", + [("score", op.MODULO, f"{divisor}|0")], + ) + ) + return segments + + def _make_segment( + self, + project: Project, + name: str, + conditions: list[tuple[str | None, str, str]], + ) -> Segment: + segment: Segment = Segment.objects.create(name=name, project=project) + rule = SegmentRule.objects.create(segment=segment, type=SegmentRule.ALL_RULE) + for prop, oper, value in conditions: + Condition.objects.create( + rule=rule, property=prop, operator=oper, value=value + ) + return segment + + # 
----------------------------------------------------------------- seed + + def _seed(self, environment: Environment, n: int, rng: random.Random) -> None: + self.stdout.write(f"Seeding {n} identities + 5 traits each…") + started = time.perf_counter() + chunk = 5_000 + for offset in range(0, n, chunk): + batch_size = min(chunk, n - offset) + identities = [ + Identity( + identifier=f"u{offset + i:08d}", + environment=environment, + ) + for i in range(batch_size) + ] + # Postgres populates `id` on the returned objects, no extra SELECT needed. + created = Identity.objects.bulk_create(identities) + traits: list[Trait] = [] + for identity in created: + identity_id = identity.id + idx = int(identity.identifier[1:]) + traits.append( + Trait( + identity_id=identity_id, + trait_key="country", + value_type=STRING, + string_value=COUNTRIES[idx % len(COUNTRIES)], + ) + ) + traits.append( + Trait( + identity_id=identity_id, + trait_key="plan", + value_type=STRING, + string_value=PLANS[idx % len(PLANS)], + ) + ) + traits.append( + Trait( + identity_id=identity_id, + trait_key="age", + value_type=INTEGER, + integer_value=18 + (idx % 70), + ) + ) + traits.append( + Trait( + identity_id=identity_id, + trait_key="score", + value_type=FLOAT, + float_value=round(rng.uniform(0, 100), 2), + ) + ) + if idx % 4 == 0: + traits.append( + Trait( + identity_id=identity_id, + trait_key="email", + value_type=STRING, + string_value=f"u{idx}@example.com", + ) + ) + Trait.objects.bulk_create(traits, batch_size=2_000) + self.stdout.write(f" {min(offset + batch_size, n)}/{n} identities seeded") + elapsed = time.perf_counter() - started + self.stdout.write( + self.style.SUCCESS(f"Seeded in {elapsed:.1f}s ({n / elapsed:,.0f} ids/s).") + ) + + # ----------------------------------------------------------- diagnostics + + def _report_environment(self, environment: Environment) -> None: + identities = Identity.objects.filter(environment=environment).count() + traits = 
    def _phase_backfill(
        self,
        environment: Environment,
        segments: list[Segment],
    ) -> dict[str, dict[str, float | int]]:
        """Rebuild all atom bitmaps from scratch and report per-segment counts.

        Returns a mapping with a synthetic "__single_pass__" entry (overall
        single-pass timing and atom stats) plus one entry per segment name.
        """
        # Reset bitmaps so we always measure full backfill cost.
        AtomBitmap.objects.filter(atom__environment=environment).delete()

        # Single-pass backfill across all segments.
        started = time.perf_counter()
        cardinalities = services.backfill_segments(environment, segments, rebuild=True)
        elapsed_total = time.perf_counter() - started
        # `cardinalities` is keyed per atom — presumably atom → set-bit count;
        # confirm against services.backfill_segments.
        self.stdout.write(
            f"  backfill[ALL {len(segments)} segments, single pass]: "
            f"{elapsed_total:.2f}s, {len(cardinalities)} atoms, "
            f"{sum(cardinalities.values())} set bits"
        )

        results: dict[str, dict[str, float | int]] = {
            "__single_pass__": {
                "elapsed_s": elapsed_total,
                "atoms": len(cardinalities),
                "set_bits": sum(cardinalities.values()),
            }
        }
        for segment in segments:
            seg_card = services.count(segment, environment)
            results[segment.name] = {
                # Amortised share of the single-pass time — not a measured
                # per-segment cost.
                "elapsed_s": elapsed_total / max(len(segments), 1),
                "atoms": 0,
                "set_bits": seg_card,
            }
            self.stdout.write(f"    {segment.name!r}: members={seg_card:,}")
        return results
services.count(segment, environment) + results[segment.name] = { + "elapsed_s": 0.0, + "atoms": 0, + "set_bits": seg_card, + } + return results + + def _phase_read( + self, + environment: Environment, + segments: list[Segment], + ) -> dict[str, dict[str, float]]: + results: dict[str, dict[str, float]] = {} + verbose = len(segments) <= 16 + for segment in segments: + count_ms = self._best_of(lambda: services.count(segment, environment), 5) + sample_ms = self._best_of( + lambda: services.sample(segment, environment, 100), 5 + ) + page_ms = self._best_of( + lambda: services.iter_members( + segment, environment, cursor=0, limit=200 + ), + 5, + ) + results[segment.name] = { + "count_ms": count_ms, + "sample100_ms": sample_ms, + "page200_ms": page_ms, + } + if verbose: + self.stdout.write( + f" read[{segment.name!r}]: count={count_ms:.1f}ms, " + f"sample100={sample_ms:.1f}ms, page200={page_ms:.1f}ms" + ) + return results + + def _phase_write_overhead( + self, + environment: Environment, + iterations: int, + rng: random.Random, + ) -> dict[str, dict[str, float]]: + from task_processor.models import Task + from task_processor.task_registry import get_task + from task_processor.task_run_method import TaskRunMethod + + identity_ids = list( + Identity.objects.filter(environment=environment).values_list( + "id", flat=True + ) + ) + sample_ids = rng.sample(identity_ids, k=min(iterations, len(identity_ids))) + # Halve the workload across phases so each phase touches a disjoint + # slice. Avoids cross-phase interference (e.g. threads from a prior + # phase still running while the next phase times its calls). + phase_size = max(1, len(sample_ids) // 4) + phase_slices = [ + sample_ids[i * phase_size : (i + 1) * phase_size] for i in range(4) + ] + + prev_run_method = settings.TASK_RUN_METHOD + prev_enabled = getattr(settings, "SEGMENT_MEMBERSHIP_ENABLED", False) + + results: dict[str, dict[str, float]] = {} + + try: + # Phase A: index disabled (signal handlers early-return). 
Baseline + # cost of update_traits with no segment-membership work. + settings.SEGMENT_MEMBERSHIP_ENABLED = False + Task.objects.all().delete() + results["disabled"] = self._summarise( + self._time_update_traits(phase_slices[0], rng) + ) + + # Phase B–D: index enabled, vary TASK_RUN_METHOD. + settings.SEGMENT_MEMBERSHIP_ENABLED = True + + settings.TASK_RUN_METHOD = TaskRunMethod.SYNCHRONOUSLY + Task.objects.all().delete() + results["sync"] = self._summarise( + self._time_update_traits(phase_slices[1], rng) + ) + + settings.TASK_RUN_METHOD = TaskRunMethod.SEPARATE_THREAD + Task.objects.all().delete() + results["thread"] = self._summarise( + self._time_update_traits(phase_slices[2], rng) + ) + # Let detached threads finish before measuring next phase. + time.sleep(2.0) + + # Phase D: TASK_PROCESSOR mode — caller cost is one DB INSERT per + # delay() call. Then drain the queue and measure throughput. + Task.objects.all().delete() + settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR + timings_processor = self._time_update_traits(phase_slices[3], rng) + results["processor_enqueue"] = self._summarise(timings_processor) + + queued = Task.objects.filter(completed=False).count() + # We can't use task_processor.processor.run_tasks here because its + # metrics objects are only registered when TASK_PROCESSOR_MODE was + # truthy at import time. Drain by invoking the registered handlers + # directly — same effect, no metrics dependency. 
+ drain_started = time.perf_counter() + drained = 0 + queued_tasks = Task.objects.filter(completed=False).order_by( + "scheduled_for" + ) + for task_row in queued_tasks.iterator(chunk_size=500): + registered = get_task(task_row.task_identifier) + registered.task_function(*task_row.args, **task_row.kwargs) + drained += 1 + Task.objects.filter(completed=False).update(completed=True) + drain_elapsed = time.perf_counter() - drain_started + results["processor_drain"] = { + "queued": float(queued), + "drained": float(drained), + "elapsed_s": drain_elapsed, + "per_task_ms": ( + (drain_elapsed / drained) * 1000 if drained else float("nan") + ), + } + finally: + settings.TASK_RUN_METHOD = prev_run_method + settings.SEGMENT_MEMBERSHIP_ENABLED = prev_enabled + + return results + + def _time_update_traits( + self, + identity_ids: list[int], + rng: random.Random, + ) -> list[float]: + timings_ms: list[float] = [] + # Salt by current process PID + monotonic nanos so values never + # collide with prior runs — otherwise update_traits short-circuits + # the "value unchanged" branch and the maintenance signal silently + # never fires. 
+ run_salt = f"{os.getpid()}-{time.time_ns()}" + for i, ident_id in enumerate(identity_ids): + new_country = f"COUNTRY_{run_salt}_{rng.randint(0, 1_000_000_000)}_{i}" + identity = Identity.objects.get(id=ident_id) + started = time.perf_counter() + identity.update_traits( + [ + cast( + SDKTraitData, + {"trait_key": "country", "trait_value": new_country}, + ) + ] + ) + timings_ms.append((time.perf_counter() - started) * 1000) + return timings_ms + + # ------------------------------------------------------------- helpers + + def _best_of(self, fn: Any, repeats: int) -> float: + timings: list[float] = [] + for _ in range(repeats): + started = time.perf_counter() + fn() + timings.append((time.perf_counter() - started) * 1000) + return min(timings) + + def _summarise(self, timings_ms: list[float]) -> dict[str, float]: + if not timings_ms: + return {"p50": math.nan, "p95": math.nan, "mean": math.nan} + sorted_t = sorted(timings_ms) + return { + "p50": statistics.median(sorted_t), + "p95": sorted_t[max(0, int(len(sorted_t) * 0.95) - 1)], + "mean": statistics.fmean(sorted_t), + } + + def _bitmap_bytes(self, environment: Environment) -> int: + rows = ( + AtomBitmap.objects.filter(atom__environment=environment) + .extra(select={"size": "octet_length(blob)"}) + .values_list("size", flat=True) + ) + return sum(rows) + + # -------------------------------------------------------------- output + + def _print_summary( + self, + *, + n: int, + backfill: dict[str, dict[str, float | int]], + reads: dict[str, dict[str, float]], + write_results: dict[str, dict[str, float]], + bitmap_total_bytes: int, + ) -> None: + self.stdout.write("") + self.stdout.write(self.style.SUCCESS("=== STRESS TEST SUMMARY ===")) + self.stdout.write(f"Identities: {n:,}") + self.stdout.write(f"Bitmap bytes total: {bitmap_total_bytes:,}") + self.stdout.write("") + self._print_backfill_summary(backfill) + self.stdout.write("") + self._print_read_summary(reads) + self.stdout.write("") + 
self._print_write_summary(write_results) + drain = write_results.get("processor_drain") + if drain is not None: + self.stdout.write("") + self.stdout.write("Task processor backlog drain:") + self.stdout.write( + f" enqueued={int(drain['queued'])}, drained={int(drain['drained'])}, " + f"elapsed={drain['elapsed_s']:.2f}s, " + f"per-task={drain['per_task_ms']:.2f}ms" + ) + + def _print_backfill_summary( + self, backfill: dict[str, dict[str, float | int]] + ) -> None: + single = backfill.pop("__single_pass__", None) + if single is not None: + self.stdout.write( + f"Backfill (single pass over identities, all segments): " + f"{single['elapsed_s']:.2f}s, " + f"{single['atoms']} atoms, " + f"{single['set_bits']:,} set bits" + ) + self.stdout.write("Per-segment membership counts:") + self.stdout.write(f" {'segment':<32} {'members':>10}") + for name, row in backfill.items(): + self.stdout.write(f" {name:<32} {row['set_bits']:>10,}") + + def _print_read_summary(self, reads: dict[str, dict[str, float]]) -> None: + self.stdout.write("Read latency (best of 5, ms):") + if len(reads) <= 16: + self._print_read_per_segment(reads) + else: + self._print_read_aggregated(reads) + + def _print_read_per_segment(self, reads: dict[str, dict[str, float]]) -> None: + self.stdout.write( + f" {'segment':<32} {'count':>8} {'sample100':>10} {'page200':>10}" + ) + for name, row in reads.items(): + self.stdout.write( + f" {name:<48} {row['count_ms']:>8.1f} " + f"{row['sample100_ms']:>10.1f} {row['page200_ms']:>10.1f}" + ) + + def _print_read_aggregated(self, reads: dict[str, dict[str, float]]) -> None: + counts = sorted(r["count_ms"] for r in reads.values()) + samples = sorted(r["sample100_ms"] for r in reads.values()) + pages = sorted(r["page200_ms"] for r in reads.values()) + + def pct(s: list[float], q: float) -> float: + return s[max(0, int(len(s) * q) - 1)] + + self.stdout.write( + f" Aggregated across {len(reads)} segments — best of 5 each:" + ) + self.stdout.write( + f" {'metric':<14} 
{'min':>8} {'p50':>8} {'p95':>8} {'max':>8}" + ) + for label, series in ( + ("count", counts), + ("sample100", samples), + ("page200", pages), + ): + self.stdout.write( + f" {label:<14} {min(series):>8.1f} {pct(series, 0.50):>8.1f} " + f"{pct(series, 0.95):>8.1f} {max(series):>8.1f}" + ) + + def _print_write_summary(self, write_results: dict[str, dict[str, float]]) -> None: + self.stdout.write("update_traits write overhead (ms, per call from caller):") + self.stdout.write(f" {'phase':<22} {'p50':>8} {'p95':>8} {'mean':>8}") + ordered = [ + ("disabled", "index disabled"), + ("sync", "SYNCHRONOUSLY"), + ("thread", "SEPARATE_THREAD"), + ("processor_enqueue", "TASK_PROCESSOR (enq)"), + ] + baseline = write_results["disabled"]["p50"] + for key, label in ordered: + row = write_results[key] + self.stdout.write( + f" {label:<22} {row['p50']:>8.2f} {row['p95']:>8.2f} " + f"{row['mean']:>8.2f}" + ) + for key, label in ordered: + if key == "disabled": + continue + delta = write_results[key]["p50"] - baseline + self.stdout.write( + f" {('Δ vs disabled (' + label + ')'):<32} {delta:>+8.2f} ms p50" + ) + + +# pyflakes: silence unused-import warning for the os import (kept for environment debugging). +_ = os diff --git a/api/segment_membership/management/commands/verify_segment_membership.py b/api/segment_membership/management/commands/verify_segment_membership.py new file mode 100644 index 000000000000..8a26a3b830df --- /dev/null +++ b/api/segment_membership/management/commands/verify_segment_membership.py @@ -0,0 +1,143 @@ +"""Differentially verify the segment membership index against the live engine. + +For each identity (or a random sample), compare: + + * `engine.is_context_in_segment(context, segment_context)` — the source of truth. + * `identity.ord ∈ B_S` — the index's answer. + +Any mismatch is a PoC failure. Also reports rough timings: backfilled count, +sample evaluation time, bitmap query time. 
+""" + +import random +import time +from argparse import ArgumentParser +from typing import Any + +from django.core.management.base import BaseCommand, CommandError +from flag_engine.segments.evaluator import is_context_in_segment + +from environments.identities.models import Identity +from environments.models import Environment +from segment_membership import services +from segments.models import Segment +from util.mappers.engine import ( + map_environment_to_evaluation_context, + map_segment_to_segment_context, +) + + +class Command(BaseCommand): + help = ( + "Differentially verify the segment membership index against the " + "live flag engine." + ) + + def add_arguments(self, parser: ArgumentParser) -> None: + parser.add_argument("--environment", type=int, required=True) + parser.add_argument("--segment", type=int, required=True) + parser.add_argument( + "--sample", + type=int, + default=0, + help="Number of identities to verify. 0 means every identity.", + ) + + def handle(self, *args: Any, **options: Any) -> None: + environment = self._get_environment(options["environment"]) + segment = self._get_segment(options["segment"]) + + bitmap_started = time.perf_counter() + bitmap = services.compute_membership_bitmap(segment, environment) + bitmap_elapsed = time.perf_counter() - bitmap_started + self.stdout.write( + f"Bitmap composed in {bitmap_elapsed * 1000:.1f}ms ({len(bitmap)} members)." + ) + + identities = self._select_identities(environment, options["sample"]) + if not identities: + raise CommandError("No identities to verify against.") + + mismatches, engine_elapsed = self._compare( + environment, segment, identities, bitmap + ) + total = len(identities) + self.stdout.write( + f"Evaluated {total} identities in {engine_elapsed:.2f}s " + f"({engine_elapsed / total * 1000:.2f}ms / identity)." 
+ ) + + if mismatches: + self._report_mismatches(mismatches, total) + raise CommandError("Differential verification failed.") + + self.stdout.write( + self.style.SUCCESS( + f"OK: {total} identities, 0 mismatches. Bitmap size {len(bitmap)}." + ) + ) + + def _get_environment(self, env_id: int) -> Environment: + try: + env: Environment = Environment.objects.get(id=env_id) + except Environment.DoesNotExist as exc: + raise CommandError(f"Environment {env_id} not found") from exc + return env + + def _get_segment(self, seg_id: int) -> Segment: + try: + segment: Segment = Segment.live_objects.get(id=seg_id) + except Segment.DoesNotExist as exc: + raise CommandError(f"Segment {seg_id} not found") from exc + return segment + + def _select_identities( + self, environment: Environment, sample_n: int + ) -> list[Identity]: + identities_qs = Identity.objects.filter(environment=environment) + if sample_n: + ids = list(identities_qs.values_list("id", flat=True)) + if len(ids) > sample_n: + ids = random.sample(ids, sample_n) + identities_qs = Identity.objects.filter(id__in=ids) + return list(identities_qs.prefetch_related("identity_traits")) + + def _compare( + self, + environment: Environment, + segment: Segment, + identities: list[Identity], + bitmap: Any, + ) -> tuple[list[tuple[int, str, bool, bool]], float]: + segment_context = map_segment_to_segment_context(segment) + mismatches: list[tuple[int, str, bool, bool]] = [] + engine_started = time.perf_counter() + for identity in identities: + ctx = map_environment_to_evaluation_context( + environment=environment, + identity=identity, + traits=list(identity.identity_traits.all()), + ) + engine_match = is_context_in_segment(ctx, segment_context) + index_match = identity.id in bitmap + if engine_match != index_match: + mismatches.append( + (identity.id, identity.identifier, engine_match, index_match) + ) + return mismatches, time.perf_counter() - engine_started + + def _report_mismatches( + self, + mismatches: list[tuple[int, str, 
bool, bool]], + total: int, + ) -> None: + self.stderr.write( + self.style.ERROR(f"FAILED: {len(mismatches)} mismatches out of {total}.") + ) + for ident_id, identifier, engine_match, index_match in mismatches[:20]: + self.stderr.write( + f" identity={ident_id} {identifier!r} " + f"engine={engine_match} index={index_match}" + ) + if len(mismatches) > 20: + self.stderr.write(f" ... and {len(mismatches) - 20} more") diff --git a/api/segment_membership/mappers.py b/api/segment_membership/mappers.py new file mode 100644 index 000000000000..c528c99a7206 --- /dev/null +++ b/api/segment_membership/mappers.py @@ -0,0 +1,167 @@ +"""Map ORM Segment objects to atom catalogues and predicate trees. + +A segment evaluates as `is_context_in_segment` in the flag engine: + - Top-level rules are AND-combined. + - Each rule is `conditions_matcher(type)(conditions) AND all(nested_rules)`. + - `type=ALL` → AND of conditions; `ANY` → OR; `NONE` → AND of negated conditions. + +We translate this to a `PredicateTree` whose leaves are `AtomNode`s, and we +collect the set of unique atom keys referenced by the segment. 
+""" + +from typing import Iterable, cast + +from flag_engine.context.types import StrValueSegmentCondition +from flag_engine.segments import constants as engine_constants + +from segment_membership.constants import ( + KIND_ENVIRONMENT_NAME, + KIND_IDENTIFIER, + KIND_IDENTITY_KEY, + KIND_TRAIT, + PROPERTY_ENVIRONMENT_NAME, + PROPERTY_IDENTITY_IDENTIFIER, + PROPERTY_IDENTITY_KEY, + AtomKind, +) +from segment_membership.dataclasses import ( + AndNode, + AtomKey, + AtomNode, + FalseNode, + OrNode, + PredicateTree, + TrueNode, +) +from segment_membership.models import Atom +from segments.models import Condition, Segment, SegmentRule + + +def map_segment_condition_to_atom_kind(condition: Condition) -> AtomKind: + """Return the atom kind for a segment condition based on its property + and operator.""" + property_value = condition.property + if not property_value: + # The only legitimate empty-property case is legacy PERCENTAGE_SPLIT, + # which the engine evaluates against identity.key. + if condition.operator == engine_constants.PERCENTAGE_SPLIT: + return KIND_IDENTITY_KEY + # Empty property with any other operator is a malformed condition; + # treat it as a trait atom with an empty key so the engine returns + # False for everyone (mirrors current production behaviour). + return KIND_TRAIT + if property_value == PROPERTY_IDENTITY_IDENTIFIER: + return KIND_IDENTIFIER + if property_value == PROPERTY_IDENTITY_KEY: + return KIND_IDENTITY_KEY + if property_value == PROPERTY_ENVIRONMENT_NAME: + return KIND_ENVIRONMENT_NAME + return KIND_TRAIT + + +def map_segment_condition_to_operand_canonical( + condition: Condition, +) -> str | None: + """Return a canonical string form of a condition's operand so equivalent + declarations collapse to the same atom. 
PoC: trim whitespace, leave the + rest alone.""" + if condition.value is None: + return None + return condition.value.strip() + + +def map_atom_to_engine_condition(atom: Atom) -> StrValueSegmentCondition: + """Project an Atom row into the flag-engine's `SegmentCondition` shape so + it can be evaluated by `context_matches_condition`.""" + return cast( + StrValueSegmentCondition, + { + "property": atom.property, + "operator": atom.operator, + "value": atom.operand_canonical or "", + }, + ) + + +def map_segment_condition_to_atom_key( + condition: Condition, + segment: Segment, +) -> AtomKey: + salted = condition.operator == engine_constants.PERCENTAGE_SPLIT + return AtomKey( + kind=map_segment_condition_to_atom_kind(condition), + property=condition.property or "", + operator=condition.operator, + operand_canonical=map_segment_condition_to_operand_canonical(condition), + segment_key=str(segment.pk) if salted else None, + ) + + +def _map_conditions_to_predicate_tree( + conditions: Iterable[Condition], + rule_type: str, + segment: Segment, +) -> PredicateTree: + cond_list = list(conditions) + if not cond_list: + return TrueNode() + nodes: list[PredicateTree] = [ + AtomNode(key=map_segment_condition_to_atom_key(c, segment)) for c in cond_list + ] + if rule_type == SegmentRule.ALL_RULE: + return AndNode(children=nodes) + if rule_type == SegmentRule.ANY_RULE: + return OrNode(children=nodes) + if rule_type == SegmentRule.NONE_RULE: + # NONE = none of the conditions match = AND of negations. + return AndNode( + children=[ + AtomNode(key=n.key, negated=True) # type: ignore[union-attr] + for n in nodes + ] + ) + # Defensive — unknown rule type collapses to False. 
def _collect_atom_keys(tree: PredicateTree, keys: set[AtomKey]) -> None:
    """Accumulate every AtomKey reachable in `tree` into `keys` (iteratively)."""
    pending = [tree]
    while pending:
        node = pending.pop()
        if isinstance(node, AtomNode):
            keys.add(node.key)
        elif isinstance(node, (AndNode, OrNode)):
            pending.extend(node.children)
), + ), + ( + "kind", + models.CharField( + choices=[ + ("trait", "trait"), + ("identifier", "identifier"), + ("identity_key", "identity_key"), + ("environment_name", "environment_name"), + ], + max_length=32, + ), + ), + ("property", models.CharField(max_length=1000)), + ("operator", models.CharField(max_length=64)), + ("operand_canonical", models.TextField(blank=True, null=True)), + ("segment_key", models.CharField(blank=True, max_length=64, null=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "environment", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="segment_membership_atoms", + to="environments.environment", + ), + ), + ], + ), + migrations.CreateModel( + name="AtomBitmap", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("blob", models.BinaryField()), + ("cardinality", models.PositiveBigIntegerField(default=0)), + ("updated_at", models.DateTimeField(auto_now=True)), + ( + "atom", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + related_name="bitmap", + to="segment_membership.atom", + ), + ), + ], + ), + migrations.AddIndex( + model_name="atom", + index=models.Index( + fields=["environment", "kind", "property"], + name="segment_mem_environ_347049_idx", + ), + ), + migrations.AddConstraint( + model_name="atom", + constraint=models.UniqueConstraint( + fields=( + "environment", + "kind", + "property", + "operator", + "operand_canonical", + "segment_key", + ), + name="segment_membership_atom_uniq", + ), + ), + ] diff --git a/api/segment_membership/migrations/__init__.py b/api/segment_membership/migrations/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/api/segment_membership/models.py b/api/segment_membership/models.py new file mode 100644 index 000000000000..7f64ffcc0d21 --- /dev/null +++ b/api/segment_membership/models.py @@ -0,0 +1,65 @@ +from typing import 
from typing import get_args

from django.db import models

from environments.models import Environment
from segment_membership.constants import AtomKind

# Derive model choices from the AtomKind Literal so constants and schema stay in sync.
ATOM_KIND_CHOICES = [(value, value) for value in get_args(AtomKind)]


class Atom(models.Model):
    """A unary predicate over a single property — the basis we materialise.

    Atoms are env-scoped. Bitmaps for an atom contain the primary keys of the
    identities that satisfy the predicate within that env — `Identity.id` is
    used directly as the Roaring bitmap ordinal. No separate ordinal-mapping
    table is required.
    """

    environment = models.ForeignKey(
        Environment,
        on_delete=models.CASCADE,
        related_name="segment_membership_atoms",
    )
    # What the predicate evaluates against — one of the AtomKind values
    # (trait / identifier / identity_key / environment_name).
    kind = models.CharField(
        max_length=32,
        choices=ATOM_KIND_CHOICES,
    )
    property = models.CharField(max_length=1000)
    operator = models.CharField(max_length=64)
    # Canonicalised operand string so equivalent condition declarations dedupe.
    operand_canonical = models.TextField(null=True, blank=True)
    # Populated only for operators whose hash is salted by segment (PERCENTAGE_SPLIT).
    segment_key = models.CharField(max_length=64, null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        constraints = [
            # NOTE(review): operand_canonical and segment_key are nullable, and
            # Postgres treats NULLs as distinct inside unique constraints — two
            # rows differing only in a NULL column can both insert. Confirm
            # whether nulls_distinct=False is wanted here.
            models.UniqueConstraint(
                fields=[
                    "environment",
                    "kind",
                    "property",
                    "operator",
                    "operand_canonical",
                    "segment_key",
                ],
                name="segment_membership_atom_uniq",
            ),
        ]
        indexes = [
            models.Index(fields=["environment", "kind", "property"]),
        ]


class AtomBitmap(models.Model):
    """Roaring bitmap for an atom, serialised via pyroaring."""

    atom = models.OneToOneField(
        Atom,
        on_delete=models.CASCADE,
        related_name="bitmap",
    )
    # Serialised roaring bitmap bytes.
    blob = models.BinaryField()
    # Cached popcount of `blob` (see services.save_bitmap).
    cardinality = models.PositiveBigIntegerField(default=0)
    updated_at = models.DateTimeField(auto_now=True)
+""" + +import random +from collections.abc import Iterable +from typing import Any + +import structlog +from flag_engine.context.types import EvaluationContext +from flag_engine.segments.evaluator import context_matches_condition +from pyroaring import BitMap + +from environments.identities.models import Identity +from environments.identities.traits.models import Trait +from environments.models import Environment +from segment_membership.constants import KIND_IDENTIFIER, KIND_IDENTITY_KEY +from segment_membership.dataclasses import ( + AndNode, + AtomKey, + AtomNode, + FalseNode, + OrNode, + PredicateTree, + TrueNode, +) +from segment_membership.mappers import ( + map_atom_to_engine_condition, + map_predicate_tree_to_atom_keys, + map_segment_to_predicate_tree, +) +from segment_membership.models import Atom, AtomBitmap +from segments.models import Segment +from util.mappers.engine import map_environment_to_evaluation_context + +logger = structlog.get_logger("segment_membership") + + +# --- atom catalogue ---------------------------------------------------------- + + +def ensure_atoms( + environment: Environment, + segment: Segment, +) -> dict[AtomKey, Atom]: + """Upsert Atom rows for every unique atom key referenced by the segment.""" + tree = map_segment_to_predicate_tree(segment) + keys = map_predicate_tree_to_atom_keys(tree) + return _upsert_atoms(environment, keys) + + +def _upsert_atoms( + environment: Environment, + keys: Iterable[AtomKey], +) -> dict[AtomKey, Atom]: + out: dict[AtomKey, Atom] = {} + for key in keys: + atom, _ = Atom.objects.get_or_create( + environment=environment, + kind=key.kind, + property=key.property, + operator=key.operator, + operand_canonical=key.operand_canonical, + segment_key=key.segment_key, + ) + out[key] = atom + return out + + +# --- engine bridge ----------------------------------------------------------- + + +def evaluate_atom( + atom: Atom, + context: EvaluationContext[Any, Any], +) -> bool: + """Evaluate one atom against a 
pre-built engine context.""" + return context_matches_condition( + context=context, + condition=map_atom_to_engine_condition(atom), + segment_key=atom.segment_key or "", + ) + + +# --- bitmap storage ---------------------------------------------------------- + + +def load_bitmap(atom: Atom) -> BitMap: + try: + blob = atom.bitmap.blob + except AtomBitmap.DoesNotExist: + return BitMap() + return BitMap.deserialize(bytes(blob)) if blob else BitMap() + + +def save_bitmap(atom: Atom, bitmap: BitMap) -> None: + blob = bitmap.serialize() + AtomBitmap.objects.update_or_create( + atom=atom, + defaults={"blob": blob, "cardinality": len(bitmap)}, + ) + + +def universe_bitmap(environment: Environment) -> BitMap: + """All identity PKs in this environment — the universe used for negation.""" + ids = Identity.objects.filter(environment=environment).values_list("id", flat=True) + return BitMap(ids) + + +# --- predicate tree → bitmap ------------------------------------------------- + + +class _UniverseCache: + """Lazy accessor for the env's universe bitmap. + + The universe is `Identity.id` for every identity in the env — only + consulted when a NONE rule (negation) or an empty AND/TrueNode is + encountered. Most segments never need it, so we defer the read. 
+ """ + + def __init__(self, environment: Environment): + self._environment = environment + self._bm: BitMap | None = None + + def get(self) -> BitMap: + if self._bm is None: + self._bm = universe_bitmap(self._environment) + return self._bm + + +def _compose( + tree: PredicateTree, + atoms_by_key: dict[AtomKey, Atom], + universe: _UniverseCache, +) -> BitMap: + if isinstance(tree, TrueNode): + return BitMap(universe.get()) + if isinstance(tree, FalseNode): + return BitMap() + if isinstance(tree, AtomNode): + atom = atoms_by_key.get(tree.key) + bm = load_bitmap(atom) if atom is not None else BitMap() + if tree.negated: + return BitMap(universe.get()) - bm + return bm + if isinstance(tree, AndNode): + if not tree.children: + return BitMap(universe.get()) + children = [_compose(c, atoms_by_key, universe) for c in tree.children] + # Smallest-first to minimise intermediate work. + children.sort(key=len) + result = children[0] + for c in children[1:]: + result = result & c + return result + if isinstance(tree, OrNode): + union: BitMap = BitMap() + for child in tree.children: + union = union | _compose(child, atoms_by_key, universe) + return union + raise ValueError(f"unknown predicate tree node: {tree!r}") + + +def compute_membership_bitmap( + segment: Segment, + environment: Environment, +) -> BitMap: + tree = map_segment_to_predicate_tree(segment) + keys = map_predicate_tree_to_atom_keys(tree) + atoms_by_key = _load_atoms(environment, keys) + return _compose(tree, atoms_by_key, _UniverseCache(environment)) + + +def _load_atoms( + environment: Environment, + keys: Iterable[AtomKey], +) -> dict[AtomKey, Atom]: + out: dict[AtomKey, Atom] = {} + for key in keys: + try: + atom = Atom.objects.get( + environment=environment, + kind=key.kind, + property=key.property, + operator=key.operator, + operand_canonical=key.operand_canonical, + segment_key=key.segment_key, + ) + except Atom.DoesNotExist: + continue + out[key] = atom + return out + + +# --- read API 
# --- read API ----------------------------------------------------------------


def count(segment: Segment, environment: Environment) -> int:
    """Number of identities currently matching `segment` in `environment`."""
    return len(compute_membership_bitmap(segment, environment))


def iter_members(
    segment: Segment,
    environment: Environment,
    cursor: int = 0,
    limit: int = 100,
) -> list[Identity]:
    """Page through members in ascending id order.

    `cursor` is inclusive — pass last-seen id + 1 to fetch the next page.
    """
    bitmap = compute_membership_bitmap(segment, environment)
    ids: list[int] = []
    for ident_id in bitmap:
        if ident_id < cursor:
            continue
        ids.append(ident_id)
        if len(ids) >= limit:
            break
    return _ids_to_identities(ids)


def sample(
    segment: Segment,
    environment: Environment,
    n: int,
) -> list[Identity]:
    """Uniformly sample up to `n` members (everyone when cardinality <= n)."""
    bitmap = compute_membership_bitmap(segment, environment)
    cardinality = len(bitmap)
    if cardinality <= n:
        ids = list(bitmap)
    else:
        # Roaring supports rank-select in O(log N), so we pick n random ranks
        # and resolve them directly. Avoids iterating the whole bitmap.
        indices = random.sample(range(cardinality), n)
        ids = [bitmap[i] for i in indices]
    return _ids_to_identities(ids)


def _ids_to_identities(ids: list[int]) -> list[Identity]:
    """Resolve ids to Identity rows, preserving input order.

    Ids whose row vanished between the bitmap read and this query are
    silently dropped.
    """
    if not ids:
        return []
    rows = Identity.objects.filter(id__in=ids)
    by_id = {ident.id: ident for ident in rows}
    return [by_id[i] for i in ids if i in by_id]


# --- backfill ----------------------------------------------------------------


def backfill_segment(
    environment: Environment,
    segment: Segment,
    *,
    rebuild: bool = False,
) -> dict[int, int]:
    """Ensure atoms exist and backfill any missing bitmaps for `segment` in
    `environment`. Returns `{atom_id: cardinality}`.

    With `rebuild=True`, every atom's bitmap is recomputed even if present.
    """
    atoms_by_key = ensure_atoms(environment, segment)
    atoms = list(atoms_by_key.values())
    return backfill_atoms(environment, atoms, rebuild=rebuild)


def backfill_segments(
    environment: Environment,
    segments: Iterable[Segment],
    *,
    rebuild: bool = False,
) -> dict[int, int]:
    """Ensure atoms across multiple segments and backfill them in one identity
    pass. Each identity is loaded and evaluated against the union of atoms,
    so backfill is `O(|I_env| · |atoms|)` rather than `O(|I_env| · |segments|)`.
    """
    atoms_by_key: dict[AtomKey, Atom] = {}
    for segment in segments:
        # Dict update dedupes atoms shared between segments.
        atoms_by_key.update(ensure_atoms(environment, segment))
    return backfill_atoms(environment, list(atoms_by_key.values()), rebuild=rebuild)


def backfill_atoms(
    environment: Environment,
    atoms: list[Atom],
    *,
    rebuild: bool = False,
) -> dict[int, int]:
    """One pass over the env's identities, evaluating every atom per identity
    and persisting the resulting bitmaps. Returns `{atom_id: cardinality}`
    for the atoms actually (re)computed.
    """
    if not atoms:
        return {}

    if not rebuild:
        # Only fill gaps: drop atoms whose bitmap row already exists.
        existing = set(
            AtomBitmap.objects.filter(atom__in=atoms).values_list("atom_id", flat=True)
        )
        atoms = [a for a in atoms if a.id not in existing]
        if not atoms:
            return {}

    logger.info(
        "backfill.atoms.start",
        environment__id=environment.id,
        atoms__count=len(atoms),
    )

    bitmaps: dict[int, BitMap] = {atom.id: BitMap() for atom in atoms}

    queryset = (
        Identity.objects.filter(environment=environment)
        .only("id", "identifier", "environment_id")
        .prefetch_related("identity_traits")
    )

    processed = 0
    for identity in queryset.iterator(chunk_size=2_000):
        traits = list(identity.identity_traits.all())
        ctx = map_environment_to_evaluation_context(
            environment=environment,
            identity=identity,
            traits=traits,
        )
        for atom in atoms:
            try:
                if evaluate_atom(atom, ctx):
                    bitmaps[atom.id].add(identity.id)
            except Exception:
                # One bad atom/identity pair must not abort the whole pass;
                # log and move on (the bit simply stays unset).
                logger.exception(
                    "backfill.atom.eval_failed",
                    environment__id=environment.id,
                    atom__id=atom.id,
                    identity__id=identity.id,
                )
        processed += 1

    cardinalities: dict[int, int] = {}
    for atom in atoms:
        bm = bitmaps[atom.id]
        save_bitmap(atom, bm)
        cardinalities[atom.id] = len(bm)

    logger.info(
        "backfill.atoms.completed",
        environment__id=environment.id,
        atoms__count=len(atoms),
        identities__count=processed,
    )
    return cardinalities


# --- maintenance hooks -------------------------------------------------------


def on_identity_created(identity: Identity) -> None:
    """Evaluate identifier/identity-key atoms for a newly-created identity.
    With PK-as-ordinal there's nothing to allocate — identity.id is the ord."""
    environment = identity.environment
    atoms = list(
        Atom.objects.filter(
            environment=environment,
            kind__in=[KIND_IDENTIFIER, KIND_IDENTITY_KEY],
        )
    )
    if not atoms:
        return
    # Context built with no traits: trait atoms are handled by
    # on_traits_changed once traits arrive.
    ctx = map_environment_to_evaluation_context(
        environment=environment,
        identity=identity,
        traits=[],
    )
    _apply_atom_bits(atoms, ctx, identity.id)


def on_traits_changed(
    identity: Identity,
    changed_keys: Iterable[str],
) -> None:
    """Re-evaluate every atom whose property is among `changed_keys`."""
    # Dedupe and drop falsy keys before hitting the database.
    keys = list({k for k in changed_keys if k})
    if not keys:
        return
    environment = identity.environment
    atoms = list(Atom.objects.filter(environment=environment, property__in=keys))
    if not atoms:
        return
    # Bypass any prefetched `identity_traits` cache the caller may carry — we
    # need post-write state, not whatever was loaded earlier in the request.
    traits = list(Trait.objects.filter(identity=identity))
    ctx = map_environment_to_evaluation_context(
        environment=environment,
        identity=identity,
        traits=traits,
    )
    _apply_atom_bits(atoms, ctx, identity.id)


def on_identity_deleted(identity_id: int, environment_id: int) -> None:
    """Clear the deleted identity's bit in every atom for the env."""
    bitmaps = AtomBitmap.objects.filter(
        atom__environment_id=environment_id
    ).select_related("atom")
    for ab in bitmaps:
        bm = BitMap.deserialize(bytes(ab.blob)) if ab.blob else BitMap()
        if identity_id in bm:
            bm.discard(identity_id)
            ab.blob = bm.serialize()
            ab.cardinality = len(bm)
            ab.save(update_fields=["blob", "cardinality", "updated_at"])


def _apply_atom_bits(
    atoms: list[Atom],
    context: EvaluationContext[Any, Any],
    ord_: int,
) -> None:
    """Set or clear bit `ord_` on each atom's bitmap to match evaluation.

    Skips the write entirely when the stored bit already agrees, so repeated
    deliveries of the same event are cheap no-ops.
    """
    for atom in atoms:
        try:
            matches = evaluate_atom(atom, context)
        except Exception:
            logger.exception("atom.eval_failed", atom__id=atom.id)
            continue
        bm = load_bitmap(atom)
        present = ord_ in bm
        if matches and not present:
            bm.add(ord_)
        elif not matches and present:
            bm.discard(ord_)
        else:
            continue
        save_bitmap(atom, bm)
+""" + +from typing import Any + +import structlog +from django.conf import settings +from django.db.models.signals import post_delete, post_save +from django.dispatch import receiver + +from environments.identities.models import Identity +from environments.identities.signals import traits_changed +from environments.identities.traits.models import Trait +from segment_membership import tasks +from segments.models import Segment + +logger = structlog.get_logger("segment_membership") + + +def _enabled() -> bool: + return getattr(settings, "SEGMENT_MEMBERSHIP_ENABLED", False) + + +@receiver(post_save, sender=Identity) +def _identity_post_save( + sender: type[Identity], + instance: Identity, + created: bool, + **kwargs: Any, +) -> None: + if not _enabled() or not created: + return + tasks.process_identity_created.delay(kwargs={"identity_id": instance.id}) + + +@receiver(post_delete, sender=Identity) +def _identity_post_delete( + sender: type[Identity], + instance: Identity, + **kwargs: Any, +) -> None: + if not _enabled(): + return + tasks.process_identity_deleted.delay( + kwargs={ + "identity_id": instance.id, + "environment_id": instance.environment_id, + } + ) + + +@receiver(post_save, sender=Trait) +def _trait_post_save( + sender: type[Trait], + instance: Trait, + created: bool, + **kwargs: Any, +) -> None: + if not _enabled(): + return + tasks.process_traits_changed.delay( + kwargs={ + "identity_id": instance.identity_id, + "changed_keys": [instance.trait_key], + } + ) + + +@receiver(post_delete, sender=Trait) +def _trait_post_delete( + sender: type[Trait], + instance: Trait, + **kwargs: Any, +) -> None: + if not _enabled(): + return + tasks.process_traits_changed.delay( + kwargs={ + "identity_id": instance.identity_id, + "changed_keys": [instance.trait_key], + } + ) + + +@receiver(traits_changed) +def _traits_changed( + sender: type[Identity], + instance: Identity, + changed_keys: set[str], + **kwargs: Any, +) -> None: + """Bulk write paths in 
`Identity.update_traits` and + `Identity.generate_traits(persist=True)` bypass `post_save`. This handler + enqueues the maintenance task for the union of keys those paths report.""" + if not _enabled(): + return + if not changed_keys: + return + tasks.process_traits_changed.delay( + kwargs={ + "identity_id": instance.id, + "changed_keys": list(changed_keys), + } + ) + + +@receiver(post_save, sender=Segment) +def _segment_post_save( + sender: type[Segment], + instance: Segment, + created: bool, + **kwargs: Any, +) -> None: + """A canonical segment was created or edited. Edits go through + `serializer.update()` which mutates the canonical row in place after + snapshotting a non-canonical revision; we filter to canonical rows by + `version_of_id == id`, so the snapshot saves don't trigger backfill.""" + if not _enabled(): + return + if instance.version_of_id != instance.id: + return + tasks.process_segment_canonical_changed.delay( + kwargs={"segment_id": instance.id}, + ) diff --git a/api/segment_membership/tasks.py b/api/segment_membership/tasks.py new file mode 100644 index 000000000000..2a0ca0136ede --- /dev/null +++ b/api/segment_membership/tasks.py @@ -0,0 +1,71 @@ +"""Async maintenance tasks for the segment membership index. + +Signal handlers in `signals.py` enqueue these tasks. The actual bitmap work +(ordinal allocation, atom evaluation, bitmap rewrites) runs in the task +processor — off the user-facing write path. Transactional correctness inside +the tasks is fine because the tasks are not on a hot path. + +Tasks are idempotent by construction: each handler reads current state and +recomputes the bit value, so re-runs and out-of-order delivery converge to +the correct answer. 
+""" + +from typing import Iterable + +import structlog +from task_processor.decorators import register_task_handler + +from environments.identities.models import Identity +from environments.models import Environment +from segment_membership import services +from segments.models import Segment + +logger = structlog.get_logger("segment_membership") + + +@register_task_handler() +def process_identity_created(*, identity_id: int) -> None: + """Allocate an ordinal and evaluate identifier/identity-key atoms for a + newly-created identity.""" + try: + identity = Identity.objects.select_related("environment").get(id=identity_id) + except Identity.DoesNotExist: + # Identity was deleted between enqueue and execute. Nothing to do. + return + services.on_identity_created(identity) + + +@register_task_handler() +def process_traits_changed( + *, + identity_id: int, + changed_keys: Iterable[str], +) -> None: + """Re-evaluate every atom whose property is among `changed_keys` against + the identity's current trait state.""" + try: + identity = Identity.objects.select_related("environment").get(id=identity_id) + except Identity.DoesNotExist: + return + services.on_traits_changed(identity, list(changed_keys)) + + +@register_task_handler() +def process_identity_deleted(*, identity_id: int, environment_id: int) -> None: + """Clear the deleted identity's bit from every atom in its environment.""" + services.on_identity_deleted(identity_id, environment_id) + + +@register_task_handler() +def process_segment_canonical_changed(*, segment_id: int) -> None: + """A canonical Segment row was created or its rules edited. Ensure the + segment's atoms exist and backfill any new bitmaps in every env in the + project. 
Atoms that drop out of the segment as a result of an edit are + left in place (orphans); see the RFC's catalogue-maintenance section.""" + try: + segment = Segment.live_objects.select_related("project").get(id=segment_id) + except Segment.DoesNotExist: + # Either deleted before the task ran, or the row is no longer canonical. + return + for environment in Environment.objects.filter(project=segment.project): + services.backfill_segment(environment, segment) diff --git a/api/tests/unit/segment_membership/__init__.py b/api/tests/unit/segment_membership/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_smoke.py b/api/tests/unit/segment_membership/test_unit_segment_membership_smoke.py new file mode 100644 index 000000000000..ffaf2dd91656 --- /dev/null +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_smoke.py @@ -0,0 +1,363 @@ +"""End-to-end smoke / differential test for the segment membership PoC. + +Builds a synthetic environment with a mix of operators, runs +`services.backfill_segment`, then verifies that the bitmap-derived membership +matches the live flag engine for every identity. 
+""" + +from __future__ import annotations + +from collections.abc import Iterator +from typing import Any, cast + +import pytest +from flag_engine.segments import constants as op +from flag_engine.segments.evaluator import is_context_in_segment + +from core.constants import INTEGER, STRING +from environments.identities.models import Identity +from environments.identities.traits.models import Trait +from environments.models import Environment +from environments.sdk.types import SDKTraitData +from segment_membership import services +from segment_membership.constants import ( + PROPERTY_IDENTITY_IDENTIFIER, +) +from segment_membership.models import Atom, AtomBitmap +from segments.models import Condition, Segment, SegmentRule +from util.mappers.engine import ( + map_environment_to_evaluation_context, + map_segment_to_segment_context, +) + + +@pytest.fixture +def segment_membership_enabled(settings: Any) -> None: + """Enable the segment membership maintenance signals for a test.""" + settings.SEGMENT_MEMBERSHIP_ENABLED = True + + +@pytest.fixture +def populated_environment(environment: Environment) -> Environment: + """Seed 30 identities with diverse traits.""" + countries = ["US", "GB", "DE", "FR", "ES"] + plans = ["free", "pro", "enterprise"] + for i in range(30): + identity = Identity.objects.create( + identifier=f"user-{i:03d}", + environment=environment, + ) + Trait.objects.create( + identity=identity, + trait_key="country", + value_type=STRING, + string_value=countries[i % len(countries)], + ) + Trait.objects.create( + identity=identity, + trait_key="plan", + value_type=STRING, + string_value=plans[i % len(plans)], + ) + Trait.objects.create( + identity=identity, + trait_key="age", + value_type=INTEGER, + integer_value=18 + i, + ) + if i % 4 == 0: + Trait.objects.create( + identity=identity, + trait_key="email", + value_type=STRING, + string_value=f"u{i}@example.com", + ) + return environment + + +def _make_segment( + project_id: int, + name: str, + conditions: 
list[tuple[str | None, str, str]], +) -> Segment: + segment: Segment = Segment.objects.create(name=name, project_id=project_id) + rule = SegmentRule.objects.create(segment=segment, type=SegmentRule.ALL_RULE) + for prop, oper, value in conditions: + Condition.objects.create( + rule=rule, + property=prop, + operator=oper, + value=value, + ) + return segment + + +@pytest.fixture +def segments_under_test(populated_environment: Environment) -> list[Segment]: + project_id = populated_environment.project_id + return [ + _make_segment( + project_id, + "country=US", + [("country", op.EQUAL, "US")], + ), + _make_segment( + project_id, + "plan in pro,enterprise", + [("plan", op.IN, "pro,enterprise")], + ), + _make_segment( + project_id, + "age >= 30", + [("age", op.GREATER_THAN_INCLUSIVE, "30")], + ), + _make_segment( + project_id, + "email regex", + [("email", op.REGEX, r"^u\d+@example\.com$")], + ), + _make_segment( + project_id, + "email is set", + [("email", op.IS_SET, "")], + ), + _make_segment( + project_id, + "country US AND plan pro", + [("country", op.EQUAL, "US"), ("plan", op.EQUAL, "pro")], + ), + _make_segment( + project_id, + "identifier % split 50", + [(PROPERTY_IDENTITY_IDENTIFIER, op.PERCENTAGE_SPLIT, "50")], + ), + _make_segment( + project_id, + "age modulo 5|0", + [("age", op.MODULO, "5|0")], + ), + ] + + +def _engine_match( + environment: Environment, + segment: Segment, + identity: Identity, +) -> bool: + traits = list(identity.identity_traits.all()) + ctx = map_environment_to_evaluation_context( + environment=environment, + identity=identity, + traits=traits, + ) + return is_context_in_segment(ctx, map_segment_to_segment_context(segment)) + + +def _index_match(bitmap: object, identity: Identity) -> bool: + return identity.id in bitmap # type: ignore[operator] + + +def _iter_identities(environment: Environment) -> Iterator[Identity]: + yield from Identity.objects.filter(environment=environment).prefetch_related( + "identity_traits" + ) + + +def 
test_backfill__diverse_operator_segments__bitmap_matches_engine_for_every_identity( + populated_environment: Environment, + segments_under_test: list[Segment], +) -> None: + # Given a populated environment and a battery of segments covering the operator vocabulary + environment = populated_environment + + # When we backfill the index for each segment + for segment in segments_under_test: + services.backfill_segment(environment, segment) + + # Then every (segment, identity) pair agrees with the engine + mismatches: list[tuple[str, str, bool, bool]] = [] + for segment in segments_under_test: + bitmap = services.compute_membership_bitmap(segment, environment) + for identity in _iter_identities(environment): + engine = _engine_match(environment, segment, identity) + indexed = _index_match(bitmap, identity) + if engine != indexed: + mismatches.append((segment.name, identity.identifier, engine, indexed)) + + assert mismatches == [], f"Differential mismatches: {mismatches[:10]}" + + +def test_count_and_iter_members__country_us__matches_engine_membership_set( + populated_environment: Environment, + segments_under_test: list[Segment], +) -> None: + # Given the country=US segment from the battery + environment = populated_environment + segment = next(s for s in segments_under_test if s.name == "country=US") + services.backfill_segment(environment, segment) + + # When we ask the index for count and members + count = services.count(segment, environment) + members = services.iter_members(segment, environment, cursor=0, limit=1000) + + # Then they agree with the engine + expected_identifiers = { + identity.identifier + for identity in _iter_identities(environment) + if _engine_match(environment, segment, identity) + } + assert count == len(expected_identifiers) + assert {m.identifier for m in members} == expected_identifiers + + +def test_atom_catalogue__country_us_segment__creates_one_trait_atom( + populated_environment: Environment, + segments_under_test: list[Segment], +) -> 
None: + # Given the country=US segment + environment = populated_environment + segment = next(s for s in segments_under_test if s.name == "country=US") + + # When we ensure atoms + services.ensure_atoms(environment, segment) + + # Then exactly one trait atom is registered + atoms = list(Atom.objects.filter(environment=environment)) + assert len(atoms) == 1 + atom = atoms[0] + assert atom.kind == "trait" + assert atom.property == "country" + assert atom.operator == op.EQUAL + assert atom.operand_canonical == "US" + assert atom.segment_key is None + + +def test_atom_catalogue__percentage_split__atom_carries_segment_key( + populated_environment: Environment, + segments_under_test: list[Segment], +) -> None: + # Given the % Split segment + environment = populated_environment + segment = next(s for s in segments_under_test if "% split" in s.name) + + # When we ensure atoms + services.ensure_atoms(environment, segment) + + # Then the atom is salted by segment id + atom = Atom.objects.get(environment=environment, operator=op.PERCENTAGE_SPLIT) + assert atom.segment_key == str(segment.pk) + + +def test_ordinals__pk_used_directly__bitmap_contains_identity_pks( + populated_environment: Environment, + segments_under_test: list[Segment], +) -> None: + # Given the country=US segment + environment = populated_environment + segment = next(s for s in segments_under_test if s.name == "country=US") + + # When we backfill + services.backfill_segment(environment, segment) + + # Then the bitmap contains exactly the matching identities' primary keys + atom = Atom.objects.get(environment=environment, property="country") + bitmap = services.load_bitmap(atom) + expected_ids = set( + Identity.objects.filter( + environment=environment, identity_traits__string_value="US" + ).values_list("id", flat=True) + ) + assert set(bitmap) == expected_ids + + +def test_update_traits__bulk_write_with_index_enabled__bitmap_reflects_new_value( + populated_environment: Environment, + 
segment_membership_enabled: None, +) -> None: + # Given a country=US segment with a backfilled bitmap + environment = populated_environment + segment = _make_segment( + environment.project_id, + "country=US", + [("country", op.EQUAL, "US")], + ) + services.backfill_segment(environment, segment) + atom = Atom.objects.get(environment=environment, property="country") + + # And an identity that does not currently match the segment + non_matching = next( + identity + for identity in _iter_identities(environment) + if not _engine_match(environment, segment, identity) + ) + initial = AtomBitmap.objects.get(atom=atom).cardinality + + # When the SDK ingestion path bulk-updates the country trait to US + non_matching.update_traits( + [cast(SDKTraitData, {"trait_key": "country", "trait_value": "US"})] + ) + + # Then the bitmap reflects the new membership without an explicit backfill + refreshed = AtomBitmap.objects.get(atom=atom) + assert refreshed.cardinality == initial + 1 + bitmap = services.compute_membership_bitmap(segment, environment) + assert non_matching.id in bitmap + + +def test_update_traits__bulk_delete_with_index_enabled__bitmap_clears_member( + populated_environment: Environment, + segment_membership_enabled: None, +) -> None: + # Given a country=US segment with a backfilled bitmap + environment = populated_environment + segment = _make_segment( + environment.project_id, + "country=US", + [("country", op.EQUAL, "US")], + ) + services.backfill_segment(environment, segment) + atom = Atom.objects.get(environment=environment, property="country") + + # And an identity that currently matches the segment + matching = next( + identity + for identity in _iter_identities(environment) + if _engine_match(environment, segment, identity) + ) + initial = AtomBitmap.objects.get(atom=atom).cardinality + + # When the SDK ingestion path nulls out the country trait + matching.update_traits( + [cast(SDKTraitData, {"trait_key": "country", "trait_value": None})] + ) + + # Then the 
identity is removed from the bitmap + refreshed = AtomBitmap.objects.get(atom=atom) + assert refreshed.cardinality == initial - 1 + bitmap = services.compute_membership_bitmap(segment, environment) + assert matching.id not in bitmap + + +def test_bitmap_storage__after_backfill__atom_bitmaps_persist_with_correct_cardinality( + populated_environment: Environment, + segments_under_test: list[Segment], +) -> None: + # Given the country=US segment + environment = populated_environment + segment = next(s for s in segments_under_test if s.name == "country=US") + + # When we backfill + services.backfill_segment(environment, segment) + + # Then the bitmap row exists with the expected cardinality + atom = Atom.objects.get(environment=environment, property="country") + bitmap = AtomBitmap.objects.get(atom=atom) + expected = sum( + 1 + for identity in _iter_identities(environment) + if any( + t.trait_key == "country" and t.string_value == "US" + for t in identity.identity_traits.all() + ) + ) + assert bitmap.cardinality == expected diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index b83a02aed053..07edd08a445d 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -315,6 +315,43 @@ Logged at `warning` from: Attributes: +### `segment_membership.atom.eval_failed` + +Logged at `exception` from: + - `api/segment_membership/services.py:424` + +Attributes: + - `atom.id` + +### `segment_membership.backfill.atom.eval_failed` + +Logged at `exception` from: + - `api/segment_membership/services.py:332` + +Attributes: + - `atom.id` + - `environment.id` + - `identity.id` + +### `segment_membership.backfill.atoms.completed` + +Logged at `info` from: + - `api/segment_membership/services.py:346` + +Attributes: + - `atoms.count` + - `environment.id` + - `identities.count` + +### 
`segment_membership.backfill.atoms.start` + +Logged at `info` from: + - `api/segment_membership/services.py:305` + +Attributes: + - `atoms.count` + - `environment.id` + ### `segments.serializers.segment_revision_created` Logged at `info` from: