From 0db68ae56640d0ec724e55d243814029abdbb9e8 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Fri, 1 May 2026 16:25:00 +0530 Subject: [PATCH] fix: follow environment-document pagination Link headers Walk RFC 5988 Link: rel="next" cursors on /environment-document and merge follow-up pages' identity_overrides into the page-1 base. Without this the proxy silently drops every override past the first ~1 MB page and serves stale flag values for affected identities. A single warn! fires per refresh if total fetch time crosses api_poll_frequency_seconds, naming both durations. Refs Flagsmith/edge-proxy-rs#10 --- Cargo.lock | 22 +++ Cargo.toml | 1 + src/services/environment.rs | 277 ++++++++++++++++++++++---- tests/test_environment_pagination.rs | 279 +++++++++++++++++++++++++++ 4 files changed, 544 insertions(+), 35 deletions(-) create mode 100644 tests/test_environment_pagination.rs diff --git a/Cargo.lock b/Cargo.lock index b3c847c..16dfc9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -534,6 +534,7 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "tracing-test", "validator", "wiremock", ] @@ -2477,6 +2478,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a4c448db514d4f24c5ddb9f73f2ee71bfb24c526cf0c570ba142d1119e0051" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad06847b7afb65c7866a36664b75c40b895e318cea4f71299f013fb22965329d" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 91b955d..f985108 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ flagsmith-flag-engine = "0.6" axum-test = "18" mockall = "0.14" tokio-test = "0.4" +tracing-test = { version = "0.2", features = ["no-env-filter"] } wiremock = "0.6" [profile.release] diff --git a/src/services/environment.rs b/src/services/environment.rs index f79aa86..914d7c0 100644 --- a/src/services/environment.rs +++ b/src/services/environment.rs @@ -7,12 +7,13 @@ use chrono::{DateTime, Utc}; use flagsmith_flag_engine::engine::get_evaluation_result; use flagsmith_flag_engine::engine_eval::{FlagResult, add_identity_to_context}; use flagsmith_flag_engine::identities::Trait as FlagsmithTrait; -use reqwest::Client; +use reqwest::header::HeaderMap; +use reqwest::{Client, Url}; use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::RwLock; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; pub struct EnvironmentService { pub cache: Arc, @@ -111,45 +112,82 @@ impl EnvironmentService { } async fn fetch_environment(&self, pair: &EnvironmentKeyPair) -> Result { - let url = format!("{}/environment-document/", self.settings.api_url); - - let if_modified_since = - if let Some(cached_doc) = self.cache.get_environment(&pair.client_side_key).await { - cached_doc - .get("updated_at") - .and_then(|v| v.as_str()) - .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) - .map(|dt| dt.to_rfc2822()) - } else { - None - }; + let if_modified_since = self + .cache + .get_environment(&pair.client_side_key) + .await + .as_deref() + .and_then(compute_if_modified_since); - let mut request = self - .client - .get(&url) - .header("X-Environment-Key", &pair.server_side_key); + let mut next_url = format!("{}/environment-document/", self.settings.api_url); + let mut document: Option = None; + let started_at = Instant::now(); + let mut warned_slow = false; - if let Some(if_modified_since_value) = if_modified_since { - request = request.header("If-Modified-Since", if_modified_since_value); - } + loop { + self.warn_if_slow(started_at, &mut warned_slow); + + let mut request = self + .client + .get(&next_url) + .header("X-Environment-Key", &pair.server_side_key); + // If-Modified-Since is meaningful only on the first request; the + // upstream pagination cursor (page_id) drives subsequent fetches. + if document.is_none() { + if let Some(ref value) = if_modified_since { + request = request.header("If-Modified-Since", value); + } + } - let response = request.send().await?; + let response = request.send().await?; + + if document.is_none() && response.status() == reqwest::StatusCode::NOT_MODIFIED { + return self + .cache + .get_environment(&pair.client_side_key) + .await + .map(|arc| (*arc).clone()) + .ok_or_else(|| { + EdgeProxyError::ServiceUnavailable("Cache inconsistency".to_string()) + }); + } - if response.status() == reqwest::StatusCode::NOT_MODIFIED { - return self - .cache - .get_environment(&pair.client_side_key) - .await - .map(|arc| (*arc).clone()) - .ok_or_else(|| { - EdgeProxyError::ServiceUnavailable("Cache inconsistency".to_string()) - }); + response.error_for_status_ref()?; + + let next_link = parse_next_link(response.headers(), &self.settings.api_url); + let body: serde_json::Value = response.json().await?; + + match document.as_mut() { + None => document = Some(body), + Some(base) => merge_paginated_overrides(base, body), + } + + match next_link { + Some(url) => next_url = url, + None => break, + } } - response.error_for_status_ref()?; + document.ok_or_else(|| { + EdgeProxyError::ServiceUnavailable("environment-document returned no pages".to_string()) + }) + } - let document: serde_json::Value = response.json().await?; - Ok(document) + fn warn_if_slow(&self, started_at: Instant, warned: &mut bool) { + if *warned { + return; + } + let poll_interval = Duration::from_secs(self.settings.api_poll_frequency_seconds); + let elapsed = started_at.elapsed(); + if elapsed > poll_interval { + warn!( + elapsed_seconds = elapsed.as_secs_f64(), + poll_frequency_seconds = self.settings.api_poll_frequency_seconds, + "environment-document fetch exceeded the configured poll interval; \ + raise api_poll_frequency_seconds or trim the environment" + ); + *warned = true; + } } pub async fn get_environment(&self, environment_key: &str) -> Result> { @@ -396,3 +434,172 @@ impl EnvironmentService { } } } + +/// Format the cached document's `updated_at` as an RFC 2822 `If-Modified-Since` +/// value. Returns `None` if the field is missing or unparseable. +fn compute_if_modified_since(cached_doc: &serde_json::Value) -> Option { + cached_doc + .get("updated_at") + .and_then(|v| v.as_str()) + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.to_rfc2822()) +} + +/// Append `identity_overrides` from a follow-up page onto the page-1 base. +/// +/// Subsequent pages echo back the same `project`, `feature_states`, etc., +/// so merging is purely an extend of the override array. +fn merge_paginated_overrides(base: &mut serde_json::Value, page: serde_json::Value) { + let serde_json::Value::Object(mut page_obj) = page else { + return; + }; + let Some(serde_json::Value::Array(mut new_overrides)) = page_obj.remove("identity_overrides") + else { + return; + }; + if new_overrides.is_empty() { + return; + } + + let Some(base_obj) = base.as_object_mut() else { + return; + }; + let entry = base_obj + .entry("identity_overrides") + .or_insert_with(|| serde_json::Value::Array(Vec::new())); + + if let serde_json::Value::Array(arr) = entry { + arr.append(&mut new_overrides); + } +} + +/// Parse the `Link` response header for the next-page URL (RFC 5988). +/// +/// Returns an absolute URL, resolving relative targets against `api_url`. +fn parse_next_link(headers: &HeaderMap, api_url: &str) -> Option { + let base = Url::parse(api_url).ok()?; + + for header_value in headers.get_all(reqwest::header::LINK).iter() { + let raw = header_value.to_str().ok()?; + for segment in raw.split(',') { + let segment = segment.trim(); + let target = match (segment.find('<'), segment.find('>')) { + (Some(start), Some(end)) if end > start + 1 => &segment[start + 1..end], + _ => continue, + }; + let params = &segment[segment.find('>').unwrap() + 1..]; + if !is_next_rel(params) { + continue; + } + if let Ok(absolute) = base.join(target) { + return Some(absolute.into()); + } + } + } + None +} + +fn is_next_rel(params: &str) -> bool { + params.split(';').any(|param| { + let param = param.trim(); + let Some(value) = param.strip_prefix("rel") else { + return false; + }; + let value = value.trim_start(); + let Some(value) = value.strip_prefix('=') else { + return false; + }; + let value = value.trim().trim_matches(|c| c == '"' || c == '\''); + value + .split_ascii_whitespace() + .any(|rel| rel.eq_ignore_ascii_case("next")) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use reqwest::header::{HeaderMap, HeaderValue, LINK}; + + fn link(value: &str) -> HeaderMap { + let mut h = HeaderMap::new(); + h.insert(LINK, HeaderValue::from_str(value).unwrap()); + h + } + + #[test] + fn parse_next_link_relative() { + let headers = link( + "; rel=\"next\"", + ); + let next = parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").unwrap(); + assert_eq!( + next, + "https://edge.api.flagsmith.com/api/v1/environment-document/?page_id=identity_override%3A1%3Aabc" + ); + } + + #[test] + fn parse_next_link_absolute() { + let headers = + link("; rel=\"next\""); + let next = parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").unwrap(); + assert_eq!( + next, + "https://example.test/api/v1/environment-document/?page_id=x" + ); + } + + #[test] + fn parse_next_link_picks_next_among_multiple_rels() { + let headers = link("; rel=\"prev\", ; rel=\"next\""); + let next = parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").unwrap(); + assert_eq!(next, "https://edge.api.flagsmith.com/api/v1/page/next"); + } + + #[test] + fn parse_next_link_returns_none_when_only_other_rels() { + let headers = link("; rel=\"prev\", ; rel=\"self\""); + assert!(parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").is_none()); + } + + #[test] + fn parse_next_link_handles_unquoted_rel() { + let headers = link("; rel=next"); + let next = parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").unwrap(); + assert_eq!(next, "https://edge.api.flagsmith.com/api/v1/page/next"); + } + + #[test] + fn merge_paginated_overrides_appends() { + let mut base = serde_json::json!({ + "identity_overrides": [{"identifier": "a"}], + "feature_states": [] + }); + let page = serde_json::json!({ + "identity_overrides": [{"identifier": "b"}, {"identifier": "c"}] + }); + merge_paginated_overrides(&mut base, page); + let arr = base["identity_overrides"].as_array().unwrap(); + assert_eq!(arr.len(), 3); + assert_eq!(arr[0]["identifier"], "a"); + assert_eq!(arr[1]["identifier"], "b"); + assert_eq!(arr[2]["identifier"], "c"); + } + + #[test] + fn merge_paginated_overrides_creates_array_when_missing() { + let mut base = serde_json::json!({"feature_states": []}); + let page = serde_json::json!({"identity_overrides": [{"identifier": "a"}]}); + merge_paginated_overrides(&mut base, page); + assert_eq!(base["identity_overrides"].as_array().unwrap().len(), 1); + } + + #[test] + fn merge_paginated_overrides_noop_when_page_has_none() { + let mut base = serde_json::json!({"identity_overrides": [{"identifier": "a"}]}); + let page = serde_json::json!({"identity_overrides": []}); + merge_paginated_overrides(&mut base, page); + assert_eq!(base["identity_overrides"].as_array().unwrap().len(), 1); + } +} diff --git a/tests/test_environment_pagination.rs b/tests/test_environment_pagination.rs new file mode 100644 index 0000000..8b68024 --- /dev/null +++ b/tests/test_environment_pagination.rs @@ -0,0 +1,279 @@ +use edge_proxy::config::settings::{AppSettings, EnvironmentKeyPair}; +use edge_proxy::services::EnvironmentService; +use serde_json::{Value, json}; +use std::time::Duration; +use tracing_test::traced_test; +use wiremock::matchers::{method, path, query_param}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +const SERVER_KEY: &str = "ser.test_pagination_key"; +const CLIENT_KEY: &str = "client_pagination_key"; + +fn settings_for(api_url: &str, poll_seconds: u64) -> AppSettings { + AppSettings { + environment_key_pairs: vec![EnvironmentKeyPair { + server_side_key: SERVER_KEY.to_string(), + client_side_key: CLIENT_KEY.to_string(), + }], + api_url: api_url.to_string(), + api_poll_frequency_seconds: poll_seconds, + api_poll_timeout_seconds: 5, + ..AppSettings::default() + } +} + +fn page_body(overrides: &[&str], updated_at: &str) -> Value { + json!({ + "id": 1, + "api_key": CLIENT_KEY, + "name": "Test", + "updated_at": updated_at, + "allow_client_traits": true, + "hide_sensitive_data": false, + "hide_disabled_flags": null, + "use_identity_composite_key_for_hashing": true, + "use_identity_overrides_in_local_eval": true, + "project": { + "id": 1, + "name": "project-1", + "hide_disabled_flags": false, + "segments": [], + "server_key_only_feature_ids": [], + "organisation": { + "id": 1, + "name": "org-1", + "feature_analytics": false, + "persist_trait_data": true, + "stop_serving_flags": false, + }, + }, + "feature_states": [ + { + "multivariate_feature_state_values": [], + "feature_state_value": "default", + "feature": {"id": 1, "name": "test_flag", "type": "STANDARD"}, + "enabled": false, + "featurestate_uuid": "fs-uuid-1", + } + ], + "identity_overrides": overrides + .iter() + .map(|ident| override_for(ident)) + .collect::>(), + }) +} + +fn override_for(identifier: &str) -> Value { + json!({ + "identifier": identifier, + "identity_uuid": format!("uuid-{}", identifier), + "created_date": "2026-05-01T00:00:00Z", + "environment_api_key": CLIENT_KEY, + "identity_features": [ + { + "django_id": null, + "feature": {"id": 1, "name": "test_flag", "type": "STANDARD"}, + "featurestate_uuid": format!("fsu-{}", identifier), + "feature_state_value": format!("value-{}", identifier), + "enabled": true, + } + ], + "identity_traits": [], + "composite_key": format!("{}_{}", CLIENT_KEY, identifier), + "django_id": null, + "dashboard_alias": null, + }) +} + +#[tokio::test] +async fn test_fetch_environment_follows_next_link_through_three_pages() { + // Given: three pages, first two carry rel="next" Link headers. + let mock = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/api/v1/environment-document/")) + .and(query_param_absent("page_id")) + .and(wiremock::matchers::header("X-Environment-Key", SERVER_KEY)) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(page_body(&["user-1", "user-2"], "2026-05-01T00:00:00Z")) + .insert_header( + "Link", + "; rel=\"next\"", + ), + ) + .expect(1) + .mount(&mock) + .await; + + Mock::given(method("GET")) + .and(path("/api/v1/environment-document/")) + .and(query_param("page_id", "identity_override:1:cursor-2")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(page_body(&["user-3"], "2026-05-01T00:00:00Z")) + .insert_header( + "Link", + "; rel=\"next\"", + ), + ) + .expect(1) + .mount(&mock) + .await; + + Mock::given(method("GET")) + .and(path("/api/v1/environment-document/")) + .and(query_param("page_id", "identity_override:1:cursor-3")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(page_body(&["user-4", "user-5"], "2026-05-01T00:00:00Z")), + ) + .expect(1) + .mount(&mock) + .await; + + let api_url = format!("{}/api/v1", mock.uri()); + let service = EnvironmentService::new(settings_for(&api_url, 60)); + + // When + assert!(service.refresh_environment_caches().await); + + // Then: every override across all three pages is in the cached document. + let document = service.cache.get_environment(CLIENT_KEY).await.unwrap(); + let overrides = document["identity_overrides"].as_array().unwrap(); + let identifiers: Vec<&str> = overrides + .iter() + .map(|o| o["identifier"].as_str().unwrap()) + .collect(); + assert_eq!( + identifiers, + vec!["user-1", "user-2", "user-3", "user-4", "user-5"] + ); + + // Page-1 fields are authoritative (project, feature_states, name). + assert_eq!(document["name"], "Test"); + assert_eq!(document["feature_states"].as_array().unwrap().len(), 1); +} + +#[tokio::test] +async fn test_fetch_environment_single_page_no_link_header() { + let mock = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/api/v1/environment-document/")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(page_body(&["only-user"], "2026-05-01T00:00:00Z")), + ) + .expect(1) // exactly one request — no follow-up + .mount(&mock) + .await; + + let api_url = format!("{}/api/v1", mock.uri()); + let service = EnvironmentService::new(settings_for(&api_url, 60)); + + assert!(service.refresh_environment_caches().await); + + let document = service.cache.get_environment(CLIENT_KEY).await.unwrap(); + let overrides = document["identity_overrides"].as_array().unwrap(); + assert_eq!(overrides.len(), 1); + assert_eq!(overrides[0]["identifier"], "only-user"); +} + +#[tokio::test] +async fn test_fetch_environment_resolves_absolute_next_link() { + let mock = MockServer::start().await; + let absolute_next = format!( + "{}/api/v1/environment-document/?page_id=identity_override%3A1%3Aabsolute", + mock.uri() + ); + + Mock::given(method("GET")) + .and(path("/api/v1/environment-document/")) + .and(query_param_absent("page_id")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(page_body(&["a"], "2026-05-01T00:00:00Z")) + .insert_header("Link", format!("<{}>; rel=\"next\"", absolute_next)), + ) + .expect(1) + .mount(&mock) + .await; + + Mock::given(method("GET")) + .and(path("/api/v1/environment-document/")) + .and(query_param("page_id", "identity_override:1:absolute")) + .respond_with( + ResponseTemplate::new(200).set_body_json(page_body(&["b"], "2026-05-01T00:00:00Z")), + ) + .expect(1) + .mount(&mock) + .await; + + let api_url = format!("{}/api/v1", mock.uri()); + let service = EnvironmentService::new(settings_for(&api_url, 60)); + + assert!(service.refresh_environment_caches().await); + let document = service.cache.get_environment(CLIENT_KEY).await.unwrap(); + let overrides = document["identity_overrides"].as_array().unwrap(); + assert_eq!(overrides.len(), 2); + assert_eq!(overrides[0]["identifier"], "a"); + assert_eq!(overrides[1]["identifier"], "b"); +} + +#[tokio::test] +#[traced_test] +async fn test_fetch_environment_warns_when_exceeds_poll_interval() { + let mock = MockServer::start().await; + + // Page 1 stalls long enough to push elapsed past poll_frequency_seconds=1 + // before iteration 2 begins; the warning fires at the top of iteration 2. + Mock::given(method("GET")) + .and(path("/api/v1/environment-document/")) + .and(query_param_absent("page_id")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(page_body(&["a"], "2026-05-01T00:00:00Z")) + .insert_header( + "Link", + "; rel=\"next\"", + ) + .set_delay(Duration::from_millis(1500)), + ) + .expect(1) + .mount(&mock) + .await; + + Mock::given(method("GET")) + .and(path("/api/v1/environment-document/")) + .and(query_param("page_id", "identity_override:1:slow")) + .respond_with( + ResponseTemplate::new(200).set_body_json(page_body(&["b"], "2026-05-01T00:00:00Z")), + ) + .expect(1) + .mount(&mock) + .await; + + let api_url = format!("{}/api/v1", mock.uri()); + let service = EnvironmentService::new(settings_for(&api_url, 1)); + + assert!(service.refresh_environment_caches().await); + + assert!(logs_contain( + "environment-document fetch exceeded the configured poll interval" + )); + assert!(logs_contain("elapsed_seconds")); + assert!(logs_contain("poll_frequency_seconds=1")); +} + +// ---- Test helpers below ------------------------------------------------------ + +fn query_param_absent(name: &'static str) -> impl wiremock::Match { + struct Absent(&'static str); + impl wiremock::Match for Absent { + fn matches(&self, req: &wiremock::Request) -> bool { + !req.url.query_pairs().any(|(k, _)| k.as_ref() == self.0) + } + } + Absent(name) +}