diff --git a/rsworkspace/crates/acp-nats/src/jetstream/provision.rs b/rsworkspace/crates/acp-nats/src/jetstream/provision.rs index 16152a134..e2f644942 100644 --- a/rsworkspace/crates/acp-nats/src/jetstream/provision.rs +++ b/rsworkspace/crates/acp-nats/src/jetstream/provision.rs @@ -34,10 +34,10 @@ mod tests { use trogon_nats::jetstream::MockJetStreamContext; #[tokio::test] - async fn provision_creates_five_streams() { + async fn provision_creates_six_streams() { let ctx = MockJetStreamContext::new(); provision_streams(&ctx, "acp").await.unwrap(); - assert_eq!(ctx.created_streams().len(), 5); + assert_eq!(ctx.created_streams().len(), 6); } #[tokio::test] @@ -54,6 +54,7 @@ mod tests { assert!(names.contains(&"ACP_CLIENT_OPS".to_string())); assert!(names.contains(&"ACP_NOTIFICATIONS".to_string())); assert!(names.contains(&"ACP_GLOBAL".to_string())); + assert!(names.contains(&"ACP_GLOBAL_EXT".to_string())); } #[tokio::test] @@ -82,6 +83,6 @@ mod tests { let ctx = MockJetStreamContext::new(); provision_streams(&ctx, "acp").await.unwrap(); provision_streams(&ctx, "acp").await.unwrap(); - assert_eq!(ctx.created_streams().len(), 10); + assert_eq!(ctx.created_streams().len(), 12); } } diff --git a/rsworkspace/crates/acp-nats/src/jetstream/streams.rs b/rsworkspace/crates/acp-nats/src/jetstream/streams.rs index 7b5a4b166..240d3f315 100644 --- a/rsworkspace/crates/acp-nats/src/jetstream/streams.rs +++ b/rsworkspace/crates/acp-nats/src/jetstream/streams.rs @@ -22,6 +22,10 @@ pub fn global_stream_name(prefix: &str) -> String { stream_name(prefix, "GLOBAL") } +pub fn global_ext_stream_name(prefix: &str) -> String { + stream_name(prefix, "GLOBAL_EXT") +} + pub fn commands_config(prefix: &str) -> Config { Config { name: stream_name(prefix, "COMMANDS"), @@ -92,7 +96,6 @@ pub fn global_config(prefix: &str) -> Config { format!("{prefix}.agent.initialize"), format!("{prefix}.agent.authenticate"), format!("{prefix}.agent.session.new"), - format!("{prefix}.agent.ext.>"), ], storage: StorageType::File, retention: RetentionPolicy::Limits, @@ -102,13 +105,26 @@ pub fn global_config(prefix: &str) -> Config { } } -pub fn all_configs(prefix: &str) -> [Config; 5] { +pub fn global_ext_config(prefix: &str) -> Config { + Config { + name: stream_name(prefix, "GLOBAL_EXT"), + subjects: vec![format!("{prefix}.agent.ext.>")], + storage: StorageType::File, + retention: RetentionPolicy::Limits, + max_age: DEFAULT_STREAM_MAX_AGE, + discard: DiscardPolicy::Old, + ..Default::default() + } +} + +pub fn all_configs(prefix: &str) -> [Config; 6] { [ commands_config(prefix), responses_config(prefix), client_ops_config(prefix), notifications_config(prefix), global_config(prefix), + global_ext_config(prefix), ] } @@ -123,6 +139,7 @@ mod tests { assert_eq!(client_ops_config("acp").name, "ACP_CLIENT_OPS"); assert_eq!(notifications_config("acp").name, "ACP_NOTIFICATIONS"); assert_eq!(global_config("acp").name, "ACP_GLOBAL"); + assert_eq!(global_ext_config("acp").name, "ACP_GLOBAL_EXT"); } #[test] @@ -197,6 +214,7 @@ mod tests { assert_eq!(client_ops_config("acp").storage, StorageType::File); assert_eq!(notifications_config("acp").storage, StorageType::File); assert_eq!(global_config("acp").storage, StorageType::File); + assert_eq!(global_ext_config("acp").storage, StorageType::File); } #[test] @@ -206,6 +224,7 @@ mod tests { assert_eq!(client_ops_config("acp").max_age, DEFAULT_STREAM_MAX_AGE); assert_eq!(notifications_config("acp").max_age, DEFAULT_STREAM_MAX_AGE); assert_eq!(global_config("acp").max_age, DEFAULT_STREAM_MAX_AGE); + assert_eq!(global_ext_config("acp").max_age, DEFAULT_STREAM_MAX_AGE); } #[test] @@ -264,11 +283,10 @@ mod tests { .subjects .contains(&"acp.agent.session.new".to_string()) ); - assert!(config.subjects.contains(&"acp.agent.ext.>".to_string())); } #[test] - fn global_excludes_session_list() { + fn global_excludes_session_list_and_ext() { let config = global_config("acp"); assert!( !config @@ -276,11 +294,24 @@ mod tests { .contains(&"acp.agent.session.list".to_string()) ); assert!(!config.subjects.contains(&"acp.agent.>".to_string())); + assert!(!config.subjects.contains(&"acp.agent.ext.>".to_string())); + } + + #[test] + fn global_ext_subjects() { + let config = global_ext_config("acp"); + assert_eq!(config.subjects, vec!["acp.agent.ext.>"]); + } + + #[test] + fn global_ext_stream_name_formats_correctly() { + assert_eq!(global_ext_stream_name("acp"), "ACP_GLOBAL_EXT"); + assert_eq!(global_ext_stream_name("myapp"), "MYAPP_GLOBAL_EXT"); } #[test] - fn all_configs_returns_five_streams() { - assert_eq!(all_configs("acp").len(), 5); + fn all_configs_returns_six_streams() { + assert_eq!(all_configs("acp").len(), 6); } #[test]