Skip to content

Commit 1c77604

Browse files
committed
support for re-list
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent d36eb7c commit 1c77604

6 files changed

Lines changed: 659 additions & 521 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterSupport.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,26 @@ public class EventFilterSupport {
2828

2929
private static final Logger log = LoggerFactory.getLogger(EventFilterSupport.class);
3030

31-
private final Map<ResourceID, EventingDetail> activeUpdates = new HashMap<>();
31+
private final Map<ResourceID, EventFilterWindow> eventFilterWindows = new HashMap<>();
3232
private Long lastKnownVersionBeforeRelist = null;
3333

3434
public synchronized void startEventFilteringModify(ResourceID resourceID) {
3535
var ed =
36-
activeUpdates.computeIfAbsent(
37-
resourceID, id -> new EventingDetail(lastKnownVersionBeforeRelist));
36+
eventFilterWindows.computeIfAbsent(
37+
resourceID, id -> new EventFilterWindow(lastKnownVersionBeforeRelist));
3838
ed.increaseActiveUpdates();
3939
}
4040

4141
public synchronized Optional<GenericResourceEvent> doneEventFilterModify(ResourceID resourceID) {
42-
var ed = activeUpdates.get(resourceID);
42+
var ed = eventFilterWindows.get(resourceID);
4343
if (ed == null) return Optional.empty();
4444
ed.decreaseActiveUpdates();
4545
return check(ed, resourceID);
4646
}
4747

4848
public synchronized Optional<GenericResourceEvent> processRelevantEvent(
4949
ResourceID resourceId, GenericResourceEvent genericResourceEvent) {
50-
var ed = activeUpdates.get(resourceId);
50+
var ed = eventFilterWindows.get(resourceId);
5151
if (ed != null) {
5252
ed.addRelatedEvent(genericResourceEvent);
5353
return check(ed, resourceId);
@@ -57,37 +57,41 @@ public synchronized Optional<GenericResourceEvent> processRelevantEvent(
5757
}
5858

5959
private Optional<GenericResourceEvent> check(
60-
EventingDetail eventingDetail, ResourceID resourceID) {
61-
var res = eventingDetail.check();
62-
if (eventingDetail.canRemoved()) {
63-
activeUpdates.remove(resourceID);
60+
EventFilterWindow eventFilterWindow, ResourceID resourceID) {
61+
var res = eventFilterWindow.check();
62+
if (eventFilterWindow.canRemoved()) {
63+
eventFilterWindows.remove(resourceID);
6464
}
6565
return res;
6666
}
6767

6868
public synchronized void addToOwnResourceVersions(ResourceID resourceId, String resourceVersion) {
69-
Optional.ofNullable(activeUpdates.get(resourceId))
69+
Optional.ofNullable(eventFilterWindows.get(resourceId))
7070
.ifPresent(au -> au.addToOwnResourceVersions(resourceVersion));
7171
}
7272

7373
public synchronized void handleGhostResourceRemoval(ResourceID resourceId) {
74-
activeUpdates.remove(resourceId);
74+
var ed = eventFilterWindows.get(resourceId);
75+
if (ed != null && !ed.canRemoved()) {
76+
return;
77+
}
78+
eventFilterWindows.remove(resourceId);
7579
}
7680

7781
// for testing purposes
78-
synchronized Map<ResourceID, EventingDetail> getActiveUpdates() {
79-
return activeUpdates;
82+
synchronized Map<ResourceID, EventFilterWindow> getEventFilterWindows() {
83+
return eventFilterWindows;
8084
}
8185

8286
public synchronized void setStartingReList(String lastKnownVersion) {
83-
activeUpdates.values().forEach(au -> au.setReListStartedFrom(lastKnownVersion));
87+
eventFilterWindows.values().forEach(au -> au.setReListStartedFrom(lastKnownVersion));
8488
}
8589

8690
public synchronized void setRelistFinished(String syncResourceVersions) {
87-
activeUpdates.values().forEach(au -> au.setReListFinished(syncResourceVersions));
91+
eventFilterWindows.values().forEach(EventFilterWindow::setReListFinished);
8892
}
8993

9094
public synchronized boolean isActiveUpdateFor(ResourceID resourceId) {
91-
return activeUpdates.containsKey(resourceId);
95+
return eventFilterWindows.containsKey(resourceId);
9296
}
9397
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventingDetail.java renamed to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterWindow.java

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,23 @@
3030
/**
3131
* Contains all the relevant information around the eventing and algorithms of a single resources.
3232
*/
33-
class EventingDetail {
33+
class EventFilterWindow {
3434

35-
private static final Logger log = LoggerFactory.getLogger(EventingDetail.class);
35+
private static final Logger log = LoggerFactory.getLogger(EventFilterWindow.class);
3636

3737
private final SortedMap<Long, GenericResourceEvent> relatedEvents = new TreeMap<>();
3838
private final SortedSet<Long> ownResourceVersions = new TreeSet<>();
3939
private Long lastResourceVersionBeforeReList;
40+
private boolean affectedByReList;
4041
private int activeUpdates = 0;
4142
private boolean ownRvEverAdded = false;
4243
private int ownRvCount = 0;
4344
private Long lastEmittedResourceRv;
4445
private Long lastSeenRelatedRv;
4546

46-
public EventingDetail(Long lastResourceVersionBeforeReList) {
47+
public EventFilterWindow(Long lastResourceVersionBeforeReList) {
4748
this.lastResourceVersionBeforeReList = lastResourceVersionBeforeReList;
49+
this.affectedByReList = lastResourceVersionBeforeReList != null;
4850
}
4951

5052
// Before we run this method
@@ -103,41 +105,83 @@ public synchronized Optional<GenericResourceEvent> check() {
103105

104106
// Emit if there is a foreign event in the window, or if a previously emitted
105107
// event already advanced the reconciler's view and a *new* event (not one we
106-
// already saw at a prior check) now moves it further.
108+
// already saw at a prior check) now moves it further. ReList also forces an
109+
// emit since it may have hidden events while it was running.
107110
boolean shouldEmit =
108-
foundForeign || (lastEmittedResourceRv != null && (prevSeen == null || cutoff > prevSeen));
111+
foundForeign
112+
|| (lastEmittedResourceRv != null && (prevSeen == null || cutoff > prevSeen))
113+
|| affectedByReList;
109114

110115
if (shouldEmit) {
111116
// Synthesize only from events that are *new* since the last check;
112117
// carryover events (RV ≤ prevSeen) were already considered before and
113118
// should not drive the synthesized event's resource versions.
114119
var synthWindow = prevSeen == null ? windowMap : windowMap.tailMap(prevSeen + 1);
115-
if (!synthWindow.isEmpty()) {
116-
var firstEvent = synthWindow.get(synthWindow.firstKey());
117-
var lastEvent = synthWindow.get(synthWindow.lastKey());
120+
121+
// When affected by a reList, treat events at or before the reList boundary
122+
// as captured *during* relist and not informative — only events strictly
123+
// after the boundary drive the synthesized output.
124+
var effectiveWindow =
125+
affectedByReList && lastResourceVersionBeforeReList != null
126+
? synthWindow.tailMap(lastResourceVersionBeforeReList + 1)
127+
: synthWindow;
128+
129+
if (!effectiveWindow.isEmpty()) {
130+
var firstEvent = effectiveWindow.get(effectiveWindow.firstKey());
131+
var lastEvent = effectiveWindow.get(effectiveWindow.lastKey());
118132

119133
// Identify the last DELETE in the synth window; a DELETE marks the
120134
// boundary of the "current life" of the resource — anything before it
121135
// represents a state that no longer exists.
122136
GenericResourceEvent lastDelete = null;
123-
for (var entry : synthWindow.entrySet()) {
137+
boolean hasForeign = false;
138+
boolean allForeignAreDeletes = true;
139+
for (var entry : effectiveWindow.entrySet()) {
124140
var ev = entry.getValue();
125141
if (ev.getAction() == ResourceAction.DELETED) {
126142
lastDelete = ev;
127143
}
144+
if (!isOwnEcho(entry.getKey(), ev)) {
145+
hasForeign = true;
146+
if (ev.getAction() != ResourceAction.DELETED) {
147+
allForeignAreDeletes = false;
148+
}
149+
}
128150
}
151+
boolean lastIsOwnEcho = isOwnEcho(effectiveWindow.lastKey(), lastEvent);
152+
boolean reListBeforeFirstOwn =
153+
affectedByReList
154+
&& !ownResourceVersions.isEmpty()
155+
&& lastResourceVersionBeforeReList != null
156+
&& lastResourceVersionBeforeReList < ownResourceVersions.first();
129157

130-
if (synthWindow.size() == 1) {
158+
if (affectedByReList && (hasForeign || reListBeforeFirstOwn)) {
159+
// ReList obscured part of the timeline AND something happened that
160+
// wasn't purely our own activity — surface a DELETE with
161+
// lastStateUnknown=true so the reconciler knows the latest known
162+
// state is uncertain.
163+
HasMetadata deleted = lastEvent.getResource().orElseThrow();
164+
result =
165+
Optional.of(new GenericResourceEvent(ResourceAction.DELETED, deleted, null, true));
166+
lastEmittedResourceRv = cutoff;
167+
} else if (!affectedByReList && hasForeign && allForeignAreDeletes && lastIsOwnEcho) {
168+
// The synth window represents a delete-then-our-recreate sequence:
169+
// the only foreign activity was DELETE(s) and the resource is back
170+
// under our control. Nothing for the reconciler to know about.
171+
} else if (effectiveWindow.size() == 1) {
131172
result = Optional.of(firstEvent);
173+
lastEmittedResourceRv = cutoff;
132174
} else if (lastEvent.getAction() == ResourceAction.DELETED) {
133175
result = Optional.of(lastEvent);
176+
lastEmittedResourceRv = cutoff;
134177
} else if (lastDelete != null) {
135178
// A DELETE happened in the middle and the resource was recreated/updated
136179
// afterwards. Synth UPDATED with previous = the deleted state.
137180
HasMetadata previous = lastDelete.getResource().orElseThrow();
138181
HasMetadata latest = lastEvent.getResource().orElseThrow();
139182
result =
140183
Optional.of(new GenericResourceEvent(ResourceAction.UPDATED, latest, previous, null));
184+
lastEmittedResourceRv = cutoff;
141185
} else {
142186
HasMetadata previous =
143187
firstEvent
@@ -146,9 +190,14 @@ public synchronized Optional<GenericResourceEvent> check() {
146190
HasMetadata latest = lastEvent.getResource().orElseThrow();
147191
result =
148192
Optional.of(new GenericResourceEvent(ResourceAction.UPDATED, latest, previous, null));
193+
lastEmittedResourceRv = cutoff;
149194
}
150195
}
151-
lastEmittedResourceRv = cutoff;
196+
197+
if (affectedByReList) {
198+
affectedByReList = false;
199+
lastResourceVersionBeforeReList = null;
200+
}
152201
}
153202

154203
lastSeenRelatedRv = prevSeen == null ? maxRelatedRv : Math.max(prevSeen, maxRelatedRv);
@@ -186,10 +235,13 @@ public void addRelatedEvent(GenericResourceEvent event) {
186235

187236
public synchronized void setReListStartedFrom(String lastResourceVersionBeforeReList) {
188237
this.lastResourceVersionBeforeReList = Long.parseLong(lastResourceVersionBeforeReList);
238+
this.affectedByReList = true;
189239
}
190240

191-
public synchronized void setReListFinished(String syncResourceVersion) {
192-
this.lastResourceVersionBeforeReList = null;
241+
public synchronized void setReListFinished() {
242+
// Marker: relist has completed and check() may now process. The relist
243+
// boundary (lastResourceVersionBeforeReList) is consumed by the next check
244+
// and reset there along with affectedByReList.
193245
}
194246

195247
public synchronized void increaseActiveUpdates() {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterSupportTest.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@ void startEventFilteringCreatesEventingDetail() {
3838
support.startEventFilteringModify(RESOURCE_ID);
3939

4040
assertThat(support.isActiveUpdateFor(RESOURCE_ID)).isTrue();
41-
assertThat(support.getActiveUpdates()).containsOnlyKeys(RESOURCE_ID);
41+
assertThat(support.getEventFilterWindows()).containsOnlyKeys(RESOURCE_ID);
4242
}
4343

4444
@Test
4545
void startEventFilteringTwiceReusesEventingDetail() {
4646
support.startEventFilteringModify(RESOURCE_ID);
47-
var first = support.getActiveUpdates().get(RESOURCE_ID);
47+
var first = support.getEventFilterWindows().get(RESOURCE_ID);
4848

4949
support.startEventFilteringModify(RESOURCE_ID);
50-
var second = support.getActiveUpdates().get(RESOURCE_ID);
50+
var second = support.getEventFilterWindows().get(RESOURCE_ID);
5151

5252
assertThat(second).isSameAs(first);
5353
}
@@ -118,23 +118,25 @@ void addToOwnResourceVersionsIsNoOpWithoutEventingDetail() {
118118
}
119119

120120
@Test
121-
void handleGhostResourceRemovalDropsEventingDetail() {
121+
void handleGhostResourceRemovalKeepsWindowWhileUpdateIsOngoing() {
122122
support.startEventFilteringModify(RESOURCE_ID);
123123

124124
support.handleGhostResourceRemoval(RESOURCE_ID);
125125

126-
assertThat(support.isActiveUpdateFor(RESOURCE_ID)).isFalse();
126+
// An in-flight write may still record its own RV; removing the window now
127+
// would lose that filtering. The upcoming doneEventFilterModify will
128+
// clean up the window itself when the write completes.
129+
assertThat(support.isActiveUpdateFor(RESOURCE_ID)).isTrue();
127130
}
128131

129132
@Test
130-
void independentResourcesAreTrackedSeparately() {
133+
void handleGhostResourceRemovalIsNoOpForUnknownResource() {
131134
support.startEventFilteringModify(RESOURCE_ID);
132-
support.startEventFilteringModify(OTHER_RESOURCE_ID);
133135

134-
support.handleGhostResourceRemoval(RESOURCE_ID);
136+
support.handleGhostResourceRemoval(OTHER_RESOURCE_ID);
135137

136-
assertThat(support.isActiveUpdateFor(RESOURCE_ID)).isFalse();
137-
assertThat(support.isActiveUpdateFor(OTHER_RESOURCE_ID)).isTrue();
138+
assertThat(support.isActiveUpdateFor(RESOURCE_ID)).isTrue();
139+
assertThat(support.isActiveUpdateFor(OTHER_RESOURCE_ID)).isFalse();
138140
}
139141

140142
@Test

0 commit comments

Comments
 (0)