@@ -25,7 +25,7 @@ use uuid::Uuid;
2525
2626use codegraph_core:: { CodeNode , NodeId , Result } ;
2727use codegraph_vector:: ml as vml;
28- use codegraph_vector:: { EmbeddingGenerator } ;
28+ use codegraph_vector:: EmbeddingGenerator ;
2929
3030/// Versioned model metadata
3131#[ derive( Debug , Clone , serde:: Serialize , serde:: Deserialize ) ]
@@ -46,10 +46,18 @@ pub struct ModelRegistry {
4646
4747impl ModelRegistry {
4848 pub fn new < P : Into < PathBuf > > ( root : P ) -> Self {
49- Self { root : root. into ( ) , index : PLRwLock :: new ( HashMap :: new ( ) ) }
49+ Self {
50+ root : root. into ( ) ,
51+ index : PLRwLock :: new ( HashMap :: new ( ) ) ,
52+ }
5053 }
5154
52- pub async fn register ( & self , model_name : & str , version : & str , metrics : HashMap < String , f32 > ) -> Result < ModelVersionMeta > {
55+ pub async fn register (
56+ & self ,
57+ model_name : & str ,
58+ version : & str ,
59+ metrics : HashMap < String , f32 > ,
60+ ) -> Result < ModelVersionMeta > {
5361 let dir = self . root . join ( model_name) . join ( version) ;
5462 tokio:: fs:: create_dir_all ( & dir) . await . ok ( ) ;
5563
@@ -60,7 +68,9 @@ impl ModelRegistry {
6068 metrics,
6169 path : dir. clone ( ) ,
6270 } ;
63- self . index . write ( ) . entry ( model_name. to_string ( ) )
71+ self . index
72+ . write ( )
73+ . entry ( model_name. to_string ( ) )
6474 . or_default ( )
6575 . insert ( version. to_string ( ) , meta. clone ( ) ) ;
6676
@@ -79,7 +89,11 @@ impl ModelRegistry {
7989 }
8090
8191 pub fn get ( & self , model_name : & str , version : & str ) -> Option < ModelVersionMeta > {
82- self . index . read ( ) . get ( model_name) . and_then ( |m| m. get ( version) ) . cloned ( )
92+ self . index
93+ . read ( )
94+ . get ( model_name)
95+ . and_then ( |m| m. get ( version) )
96+ . cloned ( )
8397 }
8498}
8599
@@ -91,11 +105,17 @@ pub struct HotSwapModel {
91105
92106impl HotSwapModel {
93107 pub fn new < S : Into < String > > ( name : S , initial_version : S ) -> Self {
94- Self { active_name : name. into ( ) , active_version : ArcSwap :: from_pointee ( initial_version. into ( ) ) }
108+ Self {
109+ active_name : name. into ( ) ,
110+ active_version : ArcSwap :: from_pointee ( initial_version. into ( ) ) ,
111+ }
95112 }
96113
97114 pub fn active ( & self ) -> ( String , String ) {
98- ( self . active_name . clone ( ) , ( * self . active_version . load ( ) ) . clone ( ) )
115+ (
116+ self . active_name . clone ( ) ,
117+ ( * self . active_version . load ( ) ) . clone ( ) ,
118+ )
99119 }
100120
101121 pub fn swap_version < S : Into < String > > ( & self , new_version : S ) {
@@ -128,7 +148,12 @@ impl Default for AiPipelineBuilder {
128148 hyperparameters : vml:: TrainingHyperparameters :: default ( ) ,
129149 data_config : vml:: DataConfig :: default ( ) ,
130150 validation_config : vml:: ValidationConfig :: default ( ) ,
131- output_config : vml:: OutputConfig { model_path : "models" . into ( ) , save_checkpoints : true , checkpoint_frequency : 10 , export_for_inference : true }
151+ output_config : vml:: OutputConfig {
152+ model_path : "models" . into ( ) ,
153+ save_checkpoints : true ,
154+ checkpoint_frequency : 10 ,
155+ export_for_inference : true ,
156+ } ,
132157 } ,
133158 inference : vml:: InferenceConfig :: default ( ) ,
134159 registry_root : PathBuf :: from ( "models" ) ,
@@ -139,14 +164,34 @@ impl Default for AiPipelineBuilder {
139164}
140165
141166impl AiPipelineBuilder {
142- pub fn new ( ) -> Self { Self :: default ( ) }
167+ pub fn new ( ) -> Self {
168+ Self :: default ( )
169+ }
143170
144- pub fn feature_config ( mut self , cfg : vml:: FeatureConfig ) -> Self { self . feature = cfg; self }
145- pub fn training_config ( mut self , cfg : vml:: TrainingConfig ) -> Self { self . training = cfg; self }
146- pub fn inference_config ( mut self , cfg : vml:: InferenceConfig ) -> Self { self . inference = cfg; self }
147- pub fn registry_root < P : Into < PathBuf > > ( mut self , root : P ) -> Self { self . registry_root = root. into ( ) ; self }
148- pub fn model_name < S : Into < String > > ( mut self , name : S ) -> Self { self . model_name = name. into ( ) ; self }
149- pub fn initial_version < S : Into < String > > ( mut self , v : S ) -> Self { self . initial_version = v. into ( ) ; self }
171+ pub fn feature_config ( mut self , cfg : vml:: FeatureConfig ) -> Self {
172+ self . feature = cfg;
173+ self
174+ }
175+ pub fn training_config ( mut self , cfg : vml:: TrainingConfig ) -> Self {
176+ self . training = cfg;
177+ self
178+ }
179+ pub fn inference_config ( mut self , cfg : vml:: InferenceConfig ) -> Self {
180+ self . inference = cfg;
181+ self
182+ }
183+ pub fn registry_root < P : Into < PathBuf > > ( mut self , root : P ) -> Self {
184+ self . registry_root = root. into ( ) ;
185+ self
186+ }
187+ pub fn model_name < S : Into < String > > ( mut self , name : S ) -> Self {
188+ self . model_name = name. into ( ) ;
189+ self
190+ }
191+ pub fn initial_version < S : Into < String > > ( mut self , v : S ) -> Self {
192+ self . initial_version = v. into ( ) ;
193+ self
194+ }
150195
151196 pub fn build ( self ) -> Result < AiPipeline > {
152197 let embedding_generator = Arc :: new ( EmbeddingGenerator :: default ( ) ) ;
@@ -161,51 +206,120 @@ impl AiPipelineBuilder {
161206 let registry = Arc :: new ( ModelRegistry :: new ( & self . registry_root ) ) ;
162207 let active = Arc :: new ( HotSwapModel :: new ( & self . model_name , & self . initial_version ) ) ;
163208
164- Ok ( AiPipeline { inner, registry, active } )
209+ Ok ( AiPipeline {
210+ inner,
211+ registry,
212+ active,
213+ } )
165214 }
166215}
167216
168217impl AiPipeline {
169- pub fn builder ( ) -> AiPipelineBuilder { AiPipelineBuilder :: new ( ) }
218+ pub fn builder ( ) -> AiPipelineBuilder {
219+ AiPipelineBuilder :: new ( )
220+ }
170221
171222 /// Initialize the inner pipeline
172- pub async fn initialize ( & self ) -> Result < ( ) > { self . inner . initialize ( ) . await }
223+ pub async fn initialize ( & self ) -> Result < ( ) > {
224+ self . inner . initialize ( ) . await
225+ }
173226
174227 /// Train and register a versioned model, then hot-swap as active if requested.
175- pub async fn train_and_deploy ( & self , dataset : & str , nodes : & [ CodeNode ] , targets : Vec < vml:: TrainingTarget > , version : & str , set_active : bool ) -> Result < vml:: TrainingResults > {
176- let results = self . inner . train_model ( dataset, nodes, targets, & self . active_model_name ( ) ) . await ?;
228+ pub async fn train_and_deploy (
229+ & self ,
230+ dataset : & str ,
231+ nodes : & [ CodeNode ] ,
232+ targets : Vec < vml:: TrainingTarget > ,
233+ version : & str ,
234+ set_active : bool ,
235+ ) -> Result < vml:: TrainingResults > {
236+ let results = self
237+ . inner
238+ . train_model ( dataset, nodes, targets, & self . active_model_name ( ) )
239+ . await ?;
177240
178241 // Register version
179- let meta = self . registry . register (
180- & self . active_model_name ( ) ,
181- version,
182- results. validation_metrics . clone ( ) ,
183- ) . await ?;
242+ let meta = self
243+ . registry
244+ . register (
245+ & self . active_model_name ( ) ,
246+ version,
247+ results. validation_metrics . clone ( ) ,
248+ )
249+ . await ?;
184250
185251 // Save model artifact
186252 let path = meta. path . join ( "model.json" ) ;
187- let _ = self . inner . save_model ( & self . active_model_name ( ) , & path) . await ;
253+ let _ = self
254+ . inner
255+ . save_model ( & self . active_model_name ( ) , & path)
256+ . await ;
188257
189258 // Hot swap
190- if set_active { self . active . swap_version ( version. to_string ( ) ) ; }
259+ if set_active {
260+ self . active . swap_version ( version. to_string ( ) ) ;
261+ }
191262 Ok ( results)
192263 }
193264
194265 /// Start an A/B test between two versions.
195- pub async fn start_ab_test ( & self , experiment : & str , version_a : & str , version_b : & str , duration : Duration ) -> Result < String > {
266+ pub async fn start_ab_test (
267+ & self ,
268+ experiment : & str ,
269+ version_a : & str ,
270+ version_b : & str ,
271+ duration : Duration ,
272+ ) -> Result < String > {
196273 // Ensure both versions exist
197- if self . registry . get ( & self . active_model_name ( ) , version_a) . is_none ( ) || self . registry . get ( & self . active_model_name ( ) , version_b) . is_none ( ) {
198- return Err ( codegraph_core:: CodeGraphError :: Training ( "Model versions not found for A/B test" . into ( ) ) ) ;
274+ if self
275+ . registry
276+ . get ( & self . active_model_name ( ) , version_a)
277+ . is_none ( )
278+ || self
279+ . registry
280+ . get ( & self . active_model_name ( ) , version_b)
281+ . is_none ( )
282+ {
283+ return Err ( codegraph_core:: CodeGraphError :: Training (
284+ "Model versions not found for A/B test" . into ( ) ,
285+ ) ) ;
199286 }
200287 let mut alloc = HashMap :: new ( ) ;
201288 alloc. insert ( "A" . to_string ( ) , 0.5 ) ;
202289 alloc. insert ( "B" . to_string ( ) , 0.5 ) ;
203- let traffic = vml:: TrafficAllocation { allocations : alloc, strategy : vml:: AllocationStrategy :: WeightedRandom , sticky_sessions : true } ;
290+ let traffic = vml:: TrafficAllocation {
291+ allocations : alloc,
292+ strategy : vml:: AllocationStrategy :: WeightedRandom ,
293+ sticky_sessions : true ,
294+ } ;
204295 let stats = vml:: StatisticalConfig :: default ( ) ;
205- let metrics = vec ! [ vml:: ExperimentMetric :: Accuracy , vml:: ExperimentMetric :: Latency , vml:: ExperimentMetric :: Throughput ] ;
206- let early = vml:: EarlyStoppingConfig { enabled : true , check_interval : Duration :: from_secs ( 60 ) , min_samples : 500 , futility_boundary : 0.01 , efficacy_boundary : 0.01 } ;
207- let sample = vml:: SampleSizeConfig { min_sample_size : 1000 , max_sample_size : 100_000 , early_stopping : early, calculation_method : vml:: SampleSizeMethod :: Sequential } ;
208- let cfg = vml:: ABTestConfig { name : experiment. into ( ) , description : "Model A/B comparison" . into ( ) , traffic_allocation : traffic, duration, statistical_config : stats, metrics, sample_size : sample } ;
296+ let metrics = vec ! [
297+ vml:: ExperimentMetric :: Accuracy ,
298+ vml:: ExperimentMetric :: Latency ,
299+ vml:: ExperimentMetric :: Throughput ,
300+ ] ;
301+ let early = vml:: EarlyStoppingConfig {
302+ enabled : true ,
303+ check_interval : Duration :: from_secs ( 60 ) ,
304+ min_samples : 500 ,
305+ futility_boundary : 0.01 ,
306+ efficacy_boundary : 0.01 ,
307+ } ;
308+ let sample = vml:: SampleSizeConfig {
309+ min_sample_size : 1000 ,
310+ max_sample_size : 100_000 ,
311+ early_stopping : early,
312+ calculation_method : vml:: SampleSizeMethod :: Sequential ,
313+ } ;
314+ let cfg = vml:: ABTestConfig {
315+ name : experiment. into ( ) ,
316+ description : "Model A/B comparison" . into ( ) ,
317+ traffic_allocation : traffic,
318+ duration,
319+ statistical_config : stats,
320+ metrics,
321+ sample_size : sample,
322+ } ;
209323 let id = self . inner . start_ab_test ( cfg) . await ?;
210324 Ok ( id)
211325 }
@@ -217,30 +331,44 @@ impl AiPipeline {
217331 }
218332
219333 /// High-throughput batch feature extraction (concurrent), returns features in input order.
220- pub async fn extract_features_batch_fast ( & self , nodes : & [ CodeNode ] ) -> Result < Vec < vml:: CodeFeatures > > {
334+ pub async fn extract_features_batch_fast (
335+ & self ,
336+ nodes : & [ CodeNode ] ,
337+ ) -> Result < Vec < vml:: CodeFeatures > > {
221338 // Use the inner feature extractor via pipeline call; shard across tasks for concurrency
222339 let chunk = std:: cmp:: max ( 64 , nodes. len ( ) / std:: cmp:: max ( 1 , num_cpus:: get ( ) ) ) ;
223340 let mut tasks = Vec :: new ( ) ;
224341 for chunk_nodes in nodes. chunks ( chunk) {
225342 let part = chunk_nodes. to_vec ( ) ;
226343 let inner = self . inner . clone ( ) ;
227- tasks. push ( tokio:: spawn ( async move { inner. extract_features_batch ( & part) . await } ) ) ;
344+ tasks. push ( tokio:: spawn ( async move {
345+ inner. extract_features_batch ( & part) . await
346+ } ) ) ;
228347 }
229348 let mut out = Vec :: with_capacity ( nodes. len ( ) ) ;
230- for t in tasks { out. extend ( t. await . unwrap ( ) ?) ; }
349+ for t in tasks {
350+ out. extend ( t. await . unwrap ( ) ?) ;
351+ }
231352 Ok ( out)
232353 }
233354
234355 /// Active model name and version tuple
235- pub fn active ( & self ) -> ( String , String ) { self . active . active ( ) }
236- pub fn active_model_name ( & self ) -> String { self . active . active ( ) . 0 }
356+ pub fn active ( & self ) -> ( String , String ) {
357+ self . active . active ( )
358+ }
359+ pub fn active_model_name ( & self ) -> String {
360+ self . active . active ( ) . 0
361+ }
237362
238363 /// Zero-downtime deploy a new version: warm-up then hot-swap
239364 pub async fn deploy_version ( & self , version : & str , warmup_samples : & [ CodeNode ] ) -> Result < ( ) > {
240365 // Load model artifact if needed (inner keeps in-memory models; ensure present)
241366 if let Some ( meta) = self . registry . get ( & self . active_model_name ( ) , version) {
242367 let path = meta. path . join ( "model.json" ) ;
243- let _ = self . inner . load_model ( & self . active_model_name ( ) , & path) . await ; // best-effort
368+ let _ = self
369+ . inner
370+ . load_model ( & self . active_model_name ( ) , & path)
371+ . await ; // best-effort
244372 }
245373
246374 // Warm-up inference to prime caches and JIT paths
@@ -254,11 +382,17 @@ impl AiPipeline {
254382 }
255383
256384 /// Expose inner metrics for monitoring SLA (latency, throughput, cache hit rate)
257- pub async fn metrics ( & self ) -> vml:: InferenceMetrics { self . inner . get_inference_metrics ( ) . await }
385+ pub async fn metrics ( & self ) -> vml:: InferenceMetrics {
386+ self . inner . get_inference_metrics ( ) . await
387+ }
258388
259389 /// Proxy helpers to inner pipeline for convenience
260- pub async fn save_config ( & self , path : & Path ) -> Result < ( ) > { self . inner . save_config ( path) . await }
261- pub async fn load_config ( & self , path : & Path ) -> Result < ( ) > { self . inner . load_config ( path) . await }
390+ pub async fn save_config ( & self , path : & Path ) -> Result < ( ) > {
391+ self . inner . save_config ( path) . await
392+ }
393+ pub async fn load_config ( & self , path : & Path ) -> Result < ( ) > {
394+ self . inner . load_config ( path) . await
395+ }
262396}
263397
264398// Lightweight proxy methods on inner MLPipeline (implement Clone by arc-wrapping inside inner)
@@ -267,7 +401,8 @@ trait CloneablePipeline {
267401}
268402
269403impl CloneablePipeline for vml:: MLPipeline {
270- fn clone ( & self ) -> Self { // safe shallow rebuild via saved config and shared internals
404+ fn clone ( & self ) -> Self {
405+ // safe shallow rebuild via saved config and shared internals
271406 // Use builder + current config snapshot
272407 // Read-only operations in `build` path; acceptable for proxy clone
273408 let cfg = futures:: executor:: block_on ( async { self . get_context ( ) . await . config . clone ( ) } ) ;
@@ -291,7 +426,14 @@ mod tests {
291426 let p = AiPipeline :: builder ( ) . build ( ) . unwrap ( ) ;
292427 p. initialize ( ) . await . unwrap ( ) ;
293428
294- let node = CodeNode { id : "n1" . into ( ) , name : "foo" . into ( ) , language : Some ( Language :: Rust ) , node_type : Some ( NodeType :: Function ) , content : Some ( "fn foo() { 1 }" . into ( ) ) , children : None } ;
429+ let node = CodeNode {
430+ id : "n1" . into ( ) ,
431+ name : "foo" . into ( ) ,
432+ language : Some ( Language :: Rust ) ,
433+ node_type : Some ( NodeType :: Function ) ,
434+ content : Some ( "fn foo() { 1 }" . into ( ) ) ,
435+ children : None ,
436+ } ;
295437 let _ = p. infer ( & node) . await . unwrap ( ) ;
296438 }
297439}
0 commit comments