@@ -60,12 +60,14 @@ class Agent
6060 # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
6161 # returning the value returned from the proc
6262 def initialize ( initial , opts = { } )
63- @value = initial
64- @rescuers = [ ]
65- @validator = Proc . new { |result | true }
66- @timeout = opts . fetch ( :timeout , TIMEOUT ) . freeze
67- self . observers = CopyOnWriteObserverSet . new
68- @executor = OptionsParser ::get_executor_from ( opts )
63+ @value = initial
64+ @rescuers = [ ]
65+ @validator = Proc . new { |result | true }
66+ @timeout = opts . fetch ( :timeout , TIMEOUT ) . freeze
67+ self . observers = CopyOnWriteObserverSet . new
68+ @executor = OptionsParser ::get_executor_from ( opts )
69+ @being_executed = false
70+ @stash = [ ]
6971 init_mutex
7072 set_deref_options ( opts )
7173 end
@@ -111,7 +113,11 @@ def rescue(clazz = StandardError, &block)
111113 # @yieldparam [Object] value the result of the last update operation
112114 # @yieldreturn [Boolean] true if the value is valid else false
113115 def validate ( &block )
114- @validator = block unless block . nil?
116+ unless block . nil?
117+ mutex . lock
118+ @validator = block
119+ mutex . unlock
120+ end
115121 self
116122 end
117123 alias_method :validates , :validate
@@ -124,8 +130,19 @@ def validate(&block)
124130 # the new value
125131 # @yieldparam [Object] value the current value
126132 # @yieldreturn [Object] the new value
133+ # @return [true, nil] nil when no block is given
127134 def post ( &block )
128- @executor . post { work ( &block ) } unless block . nil?
135+ return nil if block . nil?
136+ mutex . lock
137+ post = if @being_executed
138+ @stash << block
139+ false
140+ else
141+ @being_executed = true
142+ end
143+ mutex . unlock
144+ @executor . post { work ( &block ) } if post
145+ true
129146 end
130147
131148 # Update the current value with the result of the given block operation
@@ -157,22 +174,38 @@ def try_rescue(ex) # :nodoc:
157174 # @!visibility private
158175 def work ( &handler ) # :nodoc:
159176 begin
160-
161- should_notify = false
177+ should_notify = false
178+ validator , value = mutex . synchronize { [ @validator , @value ] }
179+
180+ begin
181+ # FIXME creates second thread
182+ result , valid = Concurrent ::timeout ( @timeout ) do
183+ [ result = handler . call ( value ) ,
184+ validator . call ( result ) ]
185+ end
186+ rescue Exception => ex
187+ exception = ex
188+ end
162189
163190 mutex . synchronize do
164- result = Concurrent ::timeout ( @timeout ) do
165- handler . call ( @value )
166- end
167- if @validator . call ( result )
168- @value = result
191+ if !exception && valid
192+ @value = result
169193 should_notify = true
170194 end
195+
196+ if ( stashed = @stash . shift )
197+ @executor . post { work ( &stashed ) }
198+ else
199+ @being_executed = false
200+ end
171201 end
172- time = Time . now
173- observers . notify_observers { [ time , self . value ] } if should_notify
174- rescue Exception => ex
175- try_rescue ( ex )
202+
203+ if should_notify
204+ time = Time . now
205+ observers . notify_observers { [ time , self . value ] }
206+ end
207+
208+ try_rescue ( exception )
176209 end
177210 end
178211 end
0 commit comments