diff --git a/rsworkspace/crates/acp-nats/src/jetstream/provision.rs b/rsworkspace/crates/acp-nats/src/jetstream/provision.rs index 0af7a77b6..16152a134 100644 --- a/rsworkspace/crates/acp-nats/src/jetstream/provision.rs +++ b/rsworkspace/crates/acp-nats/src/jetstream/provision.rs @@ -34,10 +34,10 @@ mod tests { use trogon_nats::jetstream::MockJetStreamContext; #[tokio::test] - async fn provision_creates_four_streams() { + async fn provision_creates_five_streams() { let ctx = MockJetStreamContext::new(); provision_streams(&ctx, "acp").await.unwrap(); - assert_eq!(ctx.created_streams().len(), 4); + assert_eq!(ctx.created_streams().len(), 5); } #[tokio::test] @@ -53,6 +53,7 @@ mod tests { assert!(names.contains(&"ACP_RESPONSES".to_string())); assert!(names.contains(&"ACP_CLIENT_OPS".to_string())); assert!(names.contains(&"ACP_NOTIFICATIONS".to_string())); + assert!(names.contains(&"ACP_GLOBAL".to_string())); } #[tokio::test] @@ -81,6 +82,6 @@ mod tests { let ctx = MockJetStreamContext::new(); provision_streams(&ctx, "acp").await.unwrap(); provision_streams(&ctx, "acp").await.unwrap(); - assert_eq!(ctx.created_streams().len(), 8); + assert_eq!(ctx.created_streams().len(), 10); } } diff --git a/rsworkspace/crates/acp-nats/src/jetstream/streams.rs b/rsworkspace/crates/acp-nats/src/jetstream/streams.rs index 94b6bdc9b..7b5a4b166 100644 --- a/rsworkspace/crates/acp-nats/src/jetstream/streams.rs +++ b/rsworkspace/crates/acp-nats/src/jetstream/streams.rs @@ -18,6 +18,10 @@ pub fn commands_stream_name(prefix: &str) -> String { stream_name(prefix, "COMMANDS") } +pub fn global_stream_name(prefix: &str) -> String { + stream_name(prefix, "GLOBAL") +} + pub fn commands_config(prefix: &str) -> Config { Config { name: stream_name(prefix, "COMMANDS"), @@ -81,12 +85,30 @@ pub fn notifications_config(prefix: &str) -> Config { } } -pub fn all_configs(prefix: &str) -> [Config; 4] { +pub fn global_config(prefix: &str) -> Config { + Config { + name: stream_name(prefix, "GLOBAL"), + subjects: vec![ + format!("{prefix}.agent.initialize"), + format!("{prefix}.agent.authenticate"), + format!("{prefix}.agent.session.new"), + format!("{prefix}.agent.ext.>"), + ], + storage: StorageType::File, + retention: RetentionPolicy::Limits, + max_age: DEFAULT_STREAM_MAX_AGE, + discard: DiscardPolicy::Old, + ..Default::default() + } +} + +pub fn all_configs(prefix: &str) -> [Config; 5] { [ commands_config(prefix), responses_config(prefix), client_ops_config(prefix), notifications_config(prefix), + global_config(prefix), ] } @@ -100,6 +122,7 @@ mod tests { assert_eq!(responses_config("acp").name, "ACP_RESPONSES"); assert_eq!(client_ops_config("acp").name, "ACP_CLIENT_OPS"); assert_eq!(notifications_config("acp").name, "ACP_NOTIFICATIONS"); + assert_eq!(global_config("acp").name, "ACP_GLOBAL"); } #[test] @@ -173,6 +196,7 @@ mod tests { assert_eq!(responses_config("acp").storage, StorageType::File); assert_eq!(client_ops_config("acp").storage, StorageType::File); assert_eq!(notifications_config("acp").storage, StorageType::File); + assert_eq!(global_config("acp").storage, StorageType::File); } #[test] @@ -181,6 +205,7 @@ mod tests { assert_eq!(responses_config("acp").max_age, DEFAULT_STREAM_MAX_AGE); assert_eq!(client_ops_config("acp").max_age, DEFAULT_STREAM_MAX_AGE); assert_eq!(notifications_config("acp").max_age, DEFAULT_STREAM_MAX_AGE); + assert_eq!(global_config("acp").max_age, DEFAULT_STREAM_MAX_AGE); } #[test] @@ -216,8 +241,46 @@ mod tests { } #[test] - fn all_configs_returns_four_streams() { - assert_eq!(all_configs("acp").len(), 4); + fn global_stream_name_formats_correctly() { + assert_eq!(global_stream_name("acp"), "ACP_GLOBAL"); + assert_eq!(global_stream_name("myapp"), "MYAPP_GLOBAL"); + } + + #[test] + fn global_subjects_include_expected() { + let config = global_config("acp"); + assert!( + config + .subjects + .contains(&"acp.agent.initialize".to_string()) + ); + assert!( + config + .subjects + .contains(&"acp.agent.authenticate".to_string()) + ); + assert!( + config + .subjects + .contains(&"acp.agent.session.new".to_string()) + ); + assert!(config.subjects.contains(&"acp.agent.ext.>".to_string())); + } + + #[test] + fn global_excludes_session_list() { + let config = global_config("acp"); + assert!( + !config + .subjects + .contains(&"acp.agent.session.list".to_string()) + ); + assert!(!config.subjects.contains(&"acp.agent.>".to_string())); + } + + #[test] + fn all_configs_returns_five_streams() { + assert_eq!(all_configs("acp").len(), 5); } #[test]