Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c032552
improve: filter only own updates for read-after-write-conistency
csviri Jun 7, 2026
b9b2807
wip
csviri Jun 8, 2026
236dc69
wip
csviri Jun 8, 2026
5ec8f1d
wip
csviri Jun 8, 2026
b7ca3a8
Event filtering with recording
csviri Jun 8, 2026
58b98e9
test fix
csviri Jun 8, 2026
5ed32fa
Simplified EventHandling
csviri Jun 9, 2026
52b29f0
unit tests fix
csviri Jun 9, 2026
648ffff
small fix, test repeats
csviri Jun 9, 2026
cef1c32
improvements and releated unit tests
csviri Jun 9, 2026
00ad4e4
cleanup
csviri Jun 9, 2026
3059765
improve: filter only own updates for read-after-write-conistency with…
csviri Jun 9, 2026
c4b5f03
improvements on edge cases
csviri Jun 9, 2026
a200ad5
Potential fix for pull request finding
csviri Jun 10, 2026
0723deb
delete related improvements and unit tests
csviri Jun 10, 2026
9bb59a5
delete handling improvements and test improvements
csviri Jun 10, 2026
1edea32
wip
csviri Jun 10, 2026
9fee806
tests
csviri Jun 10, 2026
8d1b665
test fix
csviri Jun 10, 2026
44691d0
fix typo
csviri Jun 10, 2026
639bf2a
Potential fix for pull request finding
csviri Jun 10, 2026
e567fcd
fixes
csviri Jun 10, 2026
f3595ac
improve: filter only own updates for read-after-write-conistency with…
csviri Jun 9, 2026
3688639
test fixes
csviri Jun 10, 2026
6a1ba58
logging and improvements
csviri Jun 10, 2026
2a6f482
test AI identified cases
csviri Jun 10, 2026
bf3faec
fix: only filter own events
csviri Jun 11, 2026
c954fba
wip
csviri Jun 11, 2026
8af288c
improvements and test fixes
csviri Jun 11, 2026
60c9d5a
improvements
csviri Jun 11, 2026
556234b
fix resource cache read
csviri Jun 11, 2026
7663c2b
support for re-list
csviri Jun 12, 2026
f00eba7
simple algorithm, refined tests
csviri Jun 12, 2026
cf4b7b4
naming fix
csviri Jun 12, 2026
09b03b8
small fixes
csviri Jun 12, 2026
dbc9f0f
cleanup
csviri Jun 12, 2026
4e1b5f2
cleanup
csviri Jun 12, 2026
a7f3836
additional tests
csviri Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.GenericResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*;
Expand Down Expand Up @@ -84,7 +84,7 @@ protected synchronized void handleEvent(
try {
if (log.isDebugEnabled()) {
log.debug("Event received with action: {}", action);
log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource);
log.debug("Event Old resource: {},\n new resource: {}", oldResource, resource);
}
Comment thread
csviri marked this conversation as resolved.
Comment thread
csviri marked this conversation as resolved.
Comment on lines 85 to 88
MDCUtils.addResourceInfo(resource);
controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource);
Expand Down Expand Up @@ -141,11 +141,22 @@ private void handleOnAddOrUpdate(
ResourceAction action, T oldCustomResource, T newCustomResource) {
var handling =
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
if (handling == EventHandling.NEW) {
handleEvent(action, newCustomResource, oldCustomResource, null);
} else if (log.isDebugEnabled()) {
log.debug("{} event propagation for action: {}", handling, action);
}
handling.ifPresentOrElse(
this::handleEvent,
() -> {
if (log.isDebugEnabled()) {
log.debug("Skipping/deferring event propagation for action: {}", action);
}
});
}

@SuppressWarnings("unchecked")
private void handleEvent(GenericResourceEvent r) {
handleEvent(
r.getAction(),
(T) r.getResource().orElseThrow(),
(T) r.getPreviousResource().orElse(null),
r.getLastStateUnknow());
}

@Override
Expand All @@ -154,10 +165,10 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown)
resource,
ResourceAction.DELETED,
() -> {
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
var res = temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
// delete event is quite special here, that requires special care, since we clean up
// caches on delete event.
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
res.ifPresent(this::handleEvent);
});
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Java Operator SDK Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class EventFilterSupport {

private final Map<ResourceID, EventFilterWindow> eventFilterWindows = new HashMap<>();
private boolean ongoingReList = false;

public synchronized void startEventFilteringModify(ResourceID resourceID) {
var ed =
eventFilterWindows.computeIfAbsent(resourceID, id -> new EventFilterWindow(ongoingReList));
ed.increaseActiveUpdates();
}

public synchronized Optional<GenericResourceEvent> doneEventFilterModify(ResourceID resourceID) {
var ed = eventFilterWindows.get(resourceID);
if (ed == null) return Optional.empty();
ed.decreaseActiveUpdates();
return check(ed, resourceID);
}

public synchronized Optional<GenericResourceEvent> processEvent(
ResourceID resourceId, GenericResourceEvent genericResourceEvent) {
var ed = eventFilterWindows.get(resourceId);
if (ed != null) {
ed.addRelatedEvent(genericResourceEvent);
return check(ed, resourceId);
} else {
return Optional.of(genericResourceEvent);
}
}

private Optional<GenericResourceEvent> check(
EventFilterWindow eventFilterWindow, ResourceID resourceID) {
var res = eventFilterWindow.check();
if (eventFilterWindow.canBeRemoved()) {
eventFilterWindows.remove(resourceID);
}
return res;
}

public synchronized void addToOwnResourceVersions(ResourceID resourceId, String resourceVersion) {
Optional.ofNullable(eventFilterWindows.get(resourceId))
.ifPresent(au -> au.addToOwnResourceVersions(resourceVersion));
}

public synchronized void handleGhostResourceRemoval(ResourceID resourceId) {
eventFilterWindows.remove(resourceId);
}

// for testing purposes
synchronized Map<ResourceID, EventFilterWindow> getEventFilterWindows() {
return eventFilterWindows;
}

public synchronized void setStartingReList() {
ongoingReList = true;
eventFilterWindows.values().forEach(EventFilterWindow::setReListStarted);
}

public synchronized void setRelistFinished() {
ongoingReList = false;
eventFilterWindows.values().forEach(EventFilterWindow::setReListFinished);
}

public synchronized boolean isActiveUpdateFor(ResourceID resourceId) {
return eventFilterWindows.containsKey(resourceId);
}
}
Loading
Loading