Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ flagsmith-flag-engine = "0.6"
axum-test = "18"
mockall = "0.14"
tokio-test = "0.4"
tracing-test = { version = "0.2", features = ["no-env-filter"] }
wiremock = "0.6"

[profile.release]
Expand Down
277 changes: 242 additions & 35 deletions src/services/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use chrono::{DateTime, Utc};
use flagsmith_flag_engine::engine::get_evaluation_result;
use flagsmith_flag_engine::engine_eval::{FlagResult, add_identity_to_context};
use flagsmith_flag_engine::identities::Trait as FlagsmithTrait;
use reqwest::Client;
use reqwest::header::HeaderMap;
use reqwest::{Client, Url};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

pub struct EnvironmentService {
pub cache: Arc<dyn EnvironmentsCache>,
Expand Down Expand Up @@ -111,45 +112,82 @@ impl EnvironmentService {
}

async fn fetch_environment(&self, pair: &EnvironmentKeyPair) -> Result<serde_json::Value> {
let url = format!("{}/environment-document/", self.settings.api_url);

let if_modified_since =
if let Some(cached_doc) = self.cache.get_environment(&pair.client_side_key).await {
cached_doc
.get("updated_at")
.and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.to_rfc2822())
} else {
None
};
let if_modified_since = self
.cache
.get_environment(&pair.client_side_key)
.await
.as_deref()
.and_then(compute_if_modified_since);

let mut request = self
.client
.get(&url)
.header("X-Environment-Key", &pair.server_side_key);
let mut next_url = format!("{}/environment-document/", self.settings.api_url);
let mut document: Option<serde_json::Value> = None;
let started_at = Instant::now();
let mut warned_slow = false;

if let Some(if_modified_since_value) = if_modified_since {
request = request.header("If-Modified-Since", if_modified_since_value);
}
loop {
self.warn_if_slow(started_at, &mut warned_slow);

let mut request = self
.client
.get(&next_url)
.header("X-Environment-Key", &pair.server_side_key);
// If-Modified-Since is meaningful only on the first request; the
// upstream pagination cursor (page_id) drives subsequent fetches.
if document.is_none() {
if let Some(ref value) = if_modified_since {
request = request.header("If-Modified-Since", value);
}
}

let response = request.send().await?;
let response = request.send().await?;

if document.is_none() && response.status() == reqwest::StatusCode::NOT_MODIFIED {
return self
.cache
.get_environment(&pair.client_side_key)
.await
.map(|arc| (*arc).clone())
.ok_or_else(|| {
EdgeProxyError::ServiceUnavailable("Cache inconsistency".to_string())
});
}

if response.status() == reqwest::StatusCode::NOT_MODIFIED {
return self
.cache
.get_environment(&pair.client_side_key)
.await
.map(|arc| (*arc).clone())
.ok_or_else(|| {
EdgeProxyError::ServiceUnavailable("Cache inconsistency".to_string())
});
response.error_for_status_ref()?;

let next_link = parse_next_link(response.headers(), &self.settings.api_url);
let body: serde_json::Value = response.json().await?;

match document.as_mut() {
None => document = Some(body),
Some(base) => merge_paginated_overrides(base, body),
}

match next_link {
Some(url) => next_url = url,
None => break,
}
}

response.error_for_status_ref()?;
document.ok_or_else(|| {
EdgeProxyError::ServiceUnavailable("environment-document returned no pages".to_string())
})
}

let document: serde_json::Value = response.json().await?;
Ok(document)
fn warn_if_slow(&self, started_at: Instant, warned: &mut bool) {
if *warned {
return;
}
let poll_interval = Duration::from_secs(self.settings.api_poll_frequency_seconds);
let elapsed = started_at.elapsed();
if elapsed > poll_interval {
warn!(
elapsed_seconds = elapsed.as_secs_f64(),
poll_frequency_seconds = self.settings.api_poll_frequency_seconds,
"environment-document fetch exceeded the configured poll interval; \
raise api_poll_frequency_seconds or trim the environment"
);
*warned = true;
}
}

pub async fn get_environment(&self, environment_key: &str) -> Result<Arc<serde_json::Value>> {
Expand Down Expand Up @@ -396,3 +434,172 @@ impl EnvironmentService {
}
}
}

/// Format the cached document's `updated_at` as an RFC 2822 `If-Modified-Since`
/// value. Returns `None` if the field is missing or unparseable.
fn compute_if_modified_since(cached_doc: &serde_json::Value) -> Option<String> {
cached_doc
.get("updated_at")
.and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.to_rfc2822())
}

/// Append `identity_overrides` from a follow-up page onto the page-1 base.
///
/// Subsequent pages echo back the same `project`, `feature_states`, etc.,
/// so merging is purely an extend of the override array.
fn merge_paginated_overrides(base: &mut serde_json::Value, page: serde_json::Value) {
let serde_json::Value::Object(mut page_obj) = page else {
return;
};
let Some(serde_json::Value::Array(mut new_overrides)) = page_obj.remove("identity_overrides")
else {
return;
};
if new_overrides.is_empty() {
return;
}

let Some(base_obj) = base.as_object_mut() else {
return;
};
let entry = base_obj
.entry("identity_overrides")
.or_insert_with(|| serde_json::Value::Array(Vec::new()));

if let serde_json::Value::Array(arr) = entry {
arr.append(&mut new_overrides);
}
}

/// Parse the `Link` response header for the next-page URL (RFC 5988).
///
/// Returns an absolute URL, resolving relative targets against `api_url`.
fn parse_next_link(headers: &HeaderMap, api_url: &str) -> Option<String> {
let base = Url::parse(api_url).ok()?;

for header_value in headers.get_all(reqwest::header::LINK).iter() {
let raw = header_value.to_str().ok()?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: maybe over protective but is it worth to continue instead of ok? so an unrelated malformed link doesn't block the whole operation?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought about this one a bit more. The scenario where it'd actually fire is: an upstream emits a Link header value that isn't visible-ASCII (e.g. a 0x80–0xFF byte), and also emits a separate valid rel="next" Link entry that we'd miss because the bad one short-circuits the parse.

RFC 5988 says Link values must be ASCII, and Flagsmith's Edge API today emits one clean Link header per response — single value, no CDN in front rewriting headers. For this corner to bite, you'd need either an upstream bug or a downstream CDN injecting a malformed Link entry before the upstream's clean one. Neither is something we've seen, and Node behaves the same way (its regex match returns null on the same input class — silently treats it as the last page).

Going to leave as ? for now and revisit if we ever do see it in the wild — adding the continue would be one line, but the failure mode it guards against is hypothetical enough that I'd rather not encode "why is this continue?" into the code without a concrete scenario to point at. Happy to reopen if you'd rather have it preemptively though.

for segment in raw.split(',') {
let segment = segment.trim();
let target = match (segment.find('<'), segment.find('>')) {
(Some(start), Some(end)) if end > start + 1 => &segment[start + 1..end],
_ => continue,
};
let params = &segment[segment.find('>').unwrap() + 1..];
if !is_next_rel(params) {
continue;
}
if let Ok(absolute) = base.join(target) {
return Some(absolute.into());
}
}
}
None
}

fn is_next_rel(params: &str) -> bool {
params.split(';').any(|param| {
let param = param.trim();
let Some(value) = param.strip_prefix("rel") else {
return false;
};
let value = value.trim_start();
let Some(value) = value.strip_prefix('=') else {
return false;
};
let value = value.trim().trim_matches(|c| c == '"' || c == '\'');
value
.split_ascii_whitespace()
.any(|rel| rel.eq_ignore_ascii_case("next"))
})
}

#[cfg(test)]
mod tests {
use super::*;
use reqwest::header::{HeaderMap, HeaderValue, LINK};

fn link(value: &str) -> HeaderMap {
let mut h = HeaderMap::new();
h.insert(LINK, HeaderValue::from_str(value).unwrap());
h
}

#[test]
fn parse_next_link_relative() {
let headers = link(
"</api/v1/environment-document/?page_id=identity_override%3A1%3Aabc>; rel=\"next\"",
);
let next = parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").unwrap();
assert_eq!(
next,
"https://edge.api.flagsmith.com/api/v1/environment-document/?page_id=identity_override%3A1%3Aabc"
);
}

#[test]
fn parse_next_link_absolute() {
let headers =
link("<https://example.test/api/v1/environment-document/?page_id=x>; rel=\"next\"");
let next = parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").unwrap();
assert_eq!(
next,
"https://example.test/api/v1/environment-document/?page_id=x"
);
}

#[test]
fn parse_next_link_picks_next_among_multiple_rels() {
let headers = link("</api/v1/page/prev>; rel=\"prev\", </api/v1/page/next>; rel=\"next\"");
let next = parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").unwrap();
assert_eq!(next, "https://edge.api.flagsmith.com/api/v1/page/next");
}

#[test]
fn parse_next_link_returns_none_when_only_other_rels() {
let headers = link("</prev>; rel=\"prev\", </self>; rel=\"self\"");
assert!(parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").is_none());
}

#[test]
fn parse_next_link_handles_unquoted_rel() {
let headers = link("</api/v1/page/next>; rel=next");
let next = parse_next_link(&headers, "https://edge.api.flagsmith.com/api/v1").unwrap();
assert_eq!(next, "https://edge.api.flagsmith.com/api/v1/page/next");
}

#[test]
fn merge_paginated_overrides_appends() {
let mut base = serde_json::json!({
"identity_overrides": [{"identifier": "a"}],
"feature_states": []
});
let page = serde_json::json!({
"identity_overrides": [{"identifier": "b"}, {"identifier": "c"}]
});
merge_paginated_overrides(&mut base, page);
let arr = base["identity_overrides"].as_array().unwrap();
assert_eq!(arr.len(), 3);
assert_eq!(arr[0]["identifier"], "a");
assert_eq!(arr[1]["identifier"], "b");
assert_eq!(arr[2]["identifier"], "c");
}

#[test]
fn merge_paginated_overrides_creates_array_when_missing() {
let mut base = serde_json::json!({"feature_states": []});
let page = serde_json::json!({"identity_overrides": [{"identifier": "a"}]});
merge_paginated_overrides(&mut base, page);
assert_eq!(base["identity_overrides"].as_array().unwrap().len(), 1);
}

#[test]
fn merge_paginated_overrides_noop_when_page_has_none() {
let mut base = serde_json::json!({"identity_overrides": [{"identifier": "a"}]});
let page = serde_json::json!({"identity_overrides": []});
merge_paginated_overrides(&mut base, page);
assert_eq!(base["identity_overrides"].as_array().unwrap().len(), 1);
}
}
Loading
Loading