@@ -3,6 +3,7 @@ mod record_batch;
33
44use std:: {
55 pin:: Pin ,
6+ sync:: Arc ,
67 task:: { self , Poll } ,
78} ;
89
@@ -12,7 +13,10 @@ use futures03::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
1213use slog:: { debug, info, Logger } ;
1314
1415use self :: record_batch:: Buffer ;
15- use crate :: amp:: { client:: ResponseBatch , error:: IsDeterministic , log:: Logger as _} ;
16+ use crate :: {
17+ amp:: { client:: ResponseBatch , error:: IsDeterministic , log:: Logger as _} ,
18+ cheap_clone:: CheapClone ,
19+ } ;
1620
1721pub use self :: {
1822 error:: Error ,
@@ -38,54 +42,72 @@ pub use self::{
3842/// To ensure data consistency and ordered output, the aggregator waits for slower streams
3943/// to catch up with faster streams. The output stream speed matches the slowest input stream.
4044pub struct StreamAggregator {
41- streams : Vec < BoxStream < ' static , Result < RecordBatch , Error > > > ,
45+ named_streams : Vec < ( Arc < str > , BoxStream < ' static , Result < RecordBatch , Error > > ) > ,
4246 buffer : Buffer ,
4347 logger : Logger ,
48+
49+ /// Indicates whether all streams are fully consumed.
4450 is_finalized : bool ,
51+
52+ /// Indicates whether any stream has produced an error.
53+ ///
54+ /// When `true`, the stream aggregator stops polling all other streams.
4555 is_failed : bool ,
4656}
4757
4858impl StreamAggregator {
4959 /// Creates a new stream aggregator from the `streams` with a bounded buffer.
5060 pub fn new < E > (
5161 logger : & Logger ,
52- streams : impl IntoIterator < Item = BoxStream < ' static , Result < ResponseBatch , E > > > ,
62+ named_streams : impl IntoIterator < Item = ( String , BoxStream < ' static , Result < ResponseBatch , E > > ) > ,
5363 max_buffer_size : usize ,
5464 ) -> Self
5565 where
5666 E : std:: error:: Error + IsDeterministic + Send + Sync + ' static ,
5767 {
5868 let logger = logger. component ( "AmpStreamAggregator" ) ;
5969
60- let streams = streams
70+ let named_streams = named_streams
6171 . into_iter ( )
62- . enumerate ( )
63- . map ( |( stream_index, stream) | {
64- stream
65- . map_err ( move |e| Error :: stream ( stream_index, e) )
66- . try_filter_map ( move |response_batch| async move {
67- match response_batch {
68- ResponseBatch :: Batch { data } => Ok ( Some ( data) ) ,
69- ResponseBatch :: Reorg ( _) => Err ( Error :: Stream {
70- stream_index,
71- source : anyhow ! ( "chain reorg" ) ,
72- is_deterministic : false ,
73- } ) ,
74- }
75- } )
76- . boxed ( )
72+ . map ( |( stream_name, stream) | {
73+ let stream_name: Arc < str > = stream_name. into ( ) ;
74+ (
75+ stream_name. cheap_clone ( ) ,
76+ stream
77+ . map_err ( {
78+ let stream_name = stream_name. cheap_clone ( ) ;
79+ move |e| Error :: stream ( stream_name. cheap_clone ( ) , e)
80+ } )
81+ . try_filter_map ( {
82+ let stream_name = stream_name. cheap_clone ( ) ;
83+ move |response_batch| {
84+ let stream_name = stream_name. cheap_clone ( ) ;
85+ async move {
86+ match response_batch {
87+ ResponseBatch :: Batch { data } => Ok ( Some ( data) ) ,
88+ ResponseBatch :: Reorg ( _) => Err ( Error :: Stream {
89+ stream_name : stream_name. cheap_clone ( ) ,
90+ source : anyhow ! ( "chain reorg" ) ,
91+ is_deterministic : false ,
92+ } ) ,
93+ }
94+ }
95+ }
96+ } )
97+ . boxed ( ) ,
98+ )
7799 } )
78100 . collect :: < Vec < _ > > ( ) ;
79101
80- let num_streams = streams . len ( ) ;
102+ let num_streams = named_streams . len ( ) ;
81103
82104 info ! ( logger, "Initializing stream aggregator" ;
83105 "num_streams" => num_streams,
84106 "max_buffer_size" => max_buffer_size
85107 ) ;
86108
87109 Self {
88- streams ,
110+ named_streams ,
89111 buffer : Buffer :: new ( num_streams, max_buffer_size) ,
90112 logger,
91113 is_finalized : false ,
@@ -99,7 +121,12 @@ impl StreamAggregator {
99121 ) -> Poll < Option < Result < RecordBatchGroups , Error > > > {
100122 let mut made_progress = false ;
101123
102- for ( stream_index, stream) in self . streams . iter_mut ( ) . enumerate ( ) {
124+ for ( stream_index, ( stream_name, stream) ) in self . named_streams . iter_mut ( ) . enumerate ( ) {
125+ let logger = self . logger . new ( slog:: o!(
126+ "stream_index" => stream_index,
127+ "stream_name" => stream_name. cheap_clone( )
128+ ) ) ;
129+
103130 if self . buffer . is_finalized ( stream_index) {
104131 continue ;
105132 }
@@ -108,7 +135,7 @@ impl StreamAggregator {
108135 self . is_failed = true ;
109136
110137 return Poll :: Ready ( Some ( Err ( Error :: Buffer {
111- stream_index ,
138+ stream_name : stream_name . cheap_clone ( ) ,
112139 source : anyhow ! ( "buffer is blocked" ) ,
113140 } ) ) ) ;
114141 }
@@ -123,16 +150,15 @@ impl StreamAggregator {
123150 self . buffer
124151 . extend ( stream_index, record_batch)
125152 . map_err ( |e| Error :: Buffer {
126- stream_index ,
153+ stream_name : stream_name . cheap_clone ( ) ,
127154 source : e,
128155 } ) ;
129156
130157 match buffer_result {
131158 Ok ( ( ) ) => {
132159 made_progress = true ;
133160
134- debug ! ( self . logger, "Buffered record batch" ;
135- "stream_index" => stream_index,
161+ debug ! ( logger, "Buffered record batch" ;
136162 "buffer_size" => self . buffer. size( stream_index) ,
137163 "has_capacity" => self . buffer. has_capacity( stream_index)
138164 ) ;
@@ -145,9 +171,7 @@ impl StreamAggregator {
145171 }
146172 }
147173 Poll :: Ready ( Some ( Ok ( _empty_record_batch) ) ) => {
148- debug ! ( self . logger, "Received an empty record batch" ;
149- "stream_index" => stream_index
150- ) ;
174+ debug ! ( logger, "Received an empty record batch" ) ;
151175 }
152176 Poll :: Ready ( Some ( Err ( e) ) ) => {
153177 self . is_failed = true ;
@@ -163,8 +187,7 @@ impl StreamAggregator {
163187
164188 made_progress = true ;
165189
166- info ! ( self . logger, "Stream completed" ;
167- "stream_index" => stream_index,
190+ info ! ( logger, "Stream completed" ;
168191 "buffer_size" => self . buffer. size( stream_index)
169192 ) ;
170193 }
0 commit comments