From 9b8c3351ed2c7f3064657eed0b394f76fc0b4950 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 6 Apr 2026 22:12:51 -0700 Subject: [PATCH 01/10] [runtime] Add EventLogLevel enum, config options, and remove EventFilter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Create EventLogLevel enum (OFF, STANDARD, VERBOSE) with case-insensitive fromString() parser in api module - Add 4 new config options to AgentConfigOptions: event-log.level, event-log.standard.max-string-length, event-log.standard.max-array-elements, event-log.standard.max-depth - Delete EventFilter.java — subsumed by the log level system - Remove eventFilter field and references from EventLoggerConfig and FileEventLogger - Remove 6 EventFilter-related tests from FileEventLoggerTest Part of #541: per-event-type configurable log levels. --- .../apache/flink/agents/api/EventFilter.java | 61 ------ .../api/configuration/AgentConfigOptions.java | 28 +++ .../agents/api/logger/EventLogLevel.java | 61 ++++++ .../agents/api/logger/EventLoggerConfig.java | 36 +--- .../runtime/eventlog/FileEventLogger.java | 8 - .../runtime/eventlog/FileEventLoggerTest.java | 200 ------------------ 6 files changed, 92 insertions(+), 302 deletions(-) delete mode 100644 api/src/main/java/org/apache/flink/agents/api/EventFilter.java create mode 100644 api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java diff --git a/api/src/main/java/org/apache/flink/agents/api/EventFilter.java b/api/src/main/java/org/apache/flink/agents/api/EventFilter.java deleted file mode 100644 index ce97cc59e..000000000 --- a/api/src/main/java/org/apache/flink/agents/api/EventFilter.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.agents.api; - -/** - * Interface for filtering events in event logging and listening. - * - *

EventFilter allows fine-grained control over which events are processed by event logs and - * event listeners. Implementations can filter based on event type, attributes, or custom logic. - */ -@FunctionalInterface -public interface EventFilter { - /** - * Determines whether an event should be processed. - * - * @param event The event to evaluate - * @param context The context associated with the event - * @return true if the event should be processed, false otherwise - */ - boolean accept(Event event, EventContext context); - - /** - * Creates a filter that accepts events of the specified types. - * - * @param eventTypes The event types to accept - * @return An EventFilter that accepts only the specified event types - */ - @SafeVarargs - static EventFilter byEventType(Class... eventTypes) { - return (event, context) -> { - for (Class eventType : eventTypes) { - if (eventType.isInstance(event)) { - return true; - } - } - return false; - }; - } - - /** A filter that accepts all events. */ - EventFilter ACCEPT_ALL = (event, context) -> true; - - /** A filter that rejects all events. */ - EventFilter REJECT_ALL = (event, context) -> false; -} diff --git a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java index 724247456..82a63e8f9 100644 --- a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java +++ b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java @@ -54,4 +54,32 @@ public class AgentConfigOptions { /** The config parameter specifies the unique identifier of job. */ public static final ConfigOption JOB_IDENTIFIER = new ConfigOption<>("job-identifier", String.class, null); + + /** + * The global event log level controlling the default verbosity for all event types. Valid + * values are "OFF", "STANDARD", and "VERBOSE". Defaults to "STANDARD". + */ + public static final ConfigOption EVENT_LOG_LEVEL = + new ConfigOption<>("event-log.level", String.class, "STANDARD"); + + /** + * The maximum string length for event payloads when logging at STANDARD level. Strings + * exceeding this length will be truncated. Defaults to 2000. + */ + public static final ConfigOption EVENT_LOG_MAX_STRING_LENGTH = + new ConfigOption<>("event-log.standard.max-string-length", Integer.class, 2000); + + /** + * The maximum number of array elements to include in event payloads when logging at STANDARD + * level. Arrays exceeding this size will be truncated. Defaults to 20. + */ + public static final ConfigOption EVENT_LOG_MAX_ARRAY_ELEMENTS = + new ConfigOption<>("event-log.standard.max-array-elements", Integer.class, 20); + + /** + * The maximum nesting depth for event payloads when logging at STANDARD level. Objects deeper + * than this level will be summarized. Defaults to 5. + */ + public static final ConfigOption EVENT_LOG_MAX_DEPTH = + new ConfigOption<>("event-log.standard.max-depth", Integer.class, 5); } diff --git a/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java b/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java new file mode 100644 index 000000000..118e265cc --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.logger; + +/** + * Log level for event logging, controlling the verbosity of event log output. + * + *

+ */ +public enum EventLogLevel { + + /** No events of this type are logged. */ + OFF, + + /** Events are logged with truncated/summarized payloads. This is the default level. */ + STANDARD, + + /** Events are logged with full, untruncated payloads. */ + VERBOSE; + + /** + * Parses a string value into an {@link EventLogLevel}, case-insensitively. + * + * @param value the string representation of the log level (e.g., "off", "STANDARD", "Verbose") + * @return the corresponding {@link EventLogLevel} + * @throws IllegalArgumentException if the value does not match any log level + */ + public static EventLogLevel fromString(String value) { + if (value == null) { + throw new IllegalArgumentException("EventLogLevel value cannot be null"); + } + try { + return valueOf(value.toUpperCase()); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "Invalid EventLogLevel: '" + + value + + "'. Valid values are: OFF, STANDARD, VERBOSE"); + } + } +} diff --git a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java index f57adef6d..1dedeef02 100644 --- a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java +++ b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java @@ -18,8 +18,6 @@ package org.apache.flink.agents.api.logger; -import org.apache.flink.agents.api.EventFilter; - import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -45,14 +43,11 @@ public final class EventLoggerConfig { private final String loggerType; - private final EventFilter eventFilter; private final Map properties; /** Private constructor - use {@link #builder()} to create instances. */ - private EventLoggerConfig( - String loggerType, EventFilter eventFilter, Map properties) { + private EventLoggerConfig(String loggerType, Map properties) { this.loggerType = Objects.requireNonNull(loggerType, "Logger type cannot be null"); - this.eventFilter = eventFilter == null ? EventFilter.ACCEPT_ALL : eventFilter; this.properties = Collections.unmodifiableMap(new HashMap<>(properties)); } @@ -81,15 +76,6 @@ public String getLoggerType() { return loggerType; } - /** - * Gets the event filter for this logger configuration. - * - * @return the EventFilter to apply, never null - */ - public EventFilter getEventFilter() { - return eventFilter; - } - /** * Gets the implementation-specific properties for this logger configuration. 
* @@ -113,13 +99,12 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; EventLoggerConfig that = (EventLoggerConfig) o; return Objects.equals(loggerType, that.loggerType) - && Objects.equals(eventFilter, that.eventFilter) && Objects.equals(properties, that.properties); } @Override public int hashCode() { - return Objects.hash(loggerType, eventFilter, properties); + return Objects.hash(loggerType, properties); } @Override @@ -128,8 +113,6 @@ public String toString() { + "loggerType='" + loggerType + '\'' - + ", eventFilter=" - + eventFilter + ", properties=" + properties + '}'; @@ -143,7 +126,6 @@ public String toString() { */ public static final class Builder { private String loggerType = "file"; // Default to file logger - private EventFilter eventFilter = EventFilter.ACCEPT_ALL; // Default to accept all private final Map properties = new HashMap<>(); private Builder() {} @@ -163,18 +145,6 @@ public Builder loggerType(String loggerType) { return this; } - /** - * Sets the event filter for this configuration. - * - * @param eventFilter the EventFilter to apply - * @return this Builder instance for method chaining - * @throws IllegalArgumentException if eventFilter is null - */ - public Builder eventFilter(EventFilter eventFilter) { - this.eventFilter = Objects.requireNonNull(eventFilter, "Event filter cannot be null"); - return this; - } - /** * Adds a property to the configuration. * @@ -213,7 +183,7 @@ public Builder properties(Map properties) { * @return a new EventLoggerConfig instance */ public EventLoggerConfig build() { - return new EventLoggerConfig(loggerType, eventFilter, properties); + return new EventLoggerConfig(loggerType, properties); } } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java index 8e7817276..4afc7bd87 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; -import org.apache.flink.agents.api.EventFilter; import org.apache.flink.agents.api.logger.EventLogger; import org.apache.flink.agents.api.logger.EventLoggerConfig; import org.apache.flink.agents.api.logger.EventLoggerOpenParams; @@ -84,13 +83,11 @@ public class FileEventLogger implements EventLogger { private static final ObjectMapper MAPPER = new ObjectMapper(); private final EventLoggerConfig config; - private final EventFilter eventFilter; private boolean prettyPrint; private PrintWriter writer; public FileEventLogger(EventLoggerConfig config) { this.config = config; - this.eventFilter = config.getEventFilter(); } @Override @@ -130,11 +127,6 @@ public void append(EventContext context, Event event) throws Exception { throw new IllegalStateException("FileEventLogger not initialized. 
Call open() first."); } - // Apply event filter - if (!eventFilter.accept(event, context)) { - return; // Skip this event - } - EventLogRecord record = new EventLogRecord(context, event); // All events should be JSON serializable, since we check it when sending events to context: // RunnerContextImpl.sendEvent diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java index b61d8045c..9a6ca82d7 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; -import org.apache.flink.agents.api.EventFilter; import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.OutputEvent; import org.apache.flink.agents.api.configuration.AgentConfigOptions; @@ -296,205 +295,6 @@ void testMultipleSubTasks() throws Exception { assertEquals("subtask 1 event", ((InputEvent) subtask1Record.getEvent()).getInput()); } - @Test - void testEventFilterAcceptAll() throws Exception { - // Given - config with ACCEPT_ALL filter (default behavior) - config = - EventLoggerConfig.builder() - .loggerType("file") - .property("baseLogDir", tempDir.toString()) - .eventFilter(EventFilter.ACCEPT_ALL) - .build(); - logger = new FileEventLogger(config); - - logger.open(openParams); - InputEvent inputEvent = new InputEvent("input data"); - OutputEvent outputEvent = new OutputEvent("output data"); - - // When - logger.append(new EventContext(inputEvent), inputEvent); - logger.append(new EventContext(outputEvent), outputEvent); - logger.flush(); - - // Then - both events should be logged - Path logFile = getExpectedLogFilePath(); - List lines = Files.readAllLines(logFile); - assertEquals(2, lines.size(), "Both events should be logged with ACCEPT_ALL filter"); - - // Verify both events were deserialized correctly - EventLogRecord inputRecord = objectMapper.readValue(lines.get(0), EventLogRecord.class); - assertInstanceOf(InputEvent.class, inputRecord.getEvent()); - - EventLogRecord outputRecord = objectMapper.readValue(lines.get(1), EventLogRecord.class); - assertInstanceOf(OutputEvent.class, outputRecord.getEvent()); - } - - @Test - void testEventFilterRejectAll() throws Exception { - // Given - config with REJECT_ALL filter - config = - EventLoggerConfig.builder() - .loggerType("file") - .property("baseLogDir", tempDir.toString()) - .eventFilter(EventFilter.REJECT_ALL) - .build(); - logger = new FileEventLogger(config); - - logger.open(openParams); - InputEvent inputEvent = new InputEvent("input data"); - OutputEvent outputEvent = new OutputEvent("output data"); - - // When - logger.append(new EventContext(inputEvent), inputEvent); - logger.append(new EventContext(outputEvent), outputEvent); - logger.flush(); - - // Then - no events should be logged (file should not exist or be empty) - Path logFile = getExpectedLogFilePath(); - assertTrue( - !Files.exists(logFile) || Files.readAllLines(logFile).isEmpty(), - "No events should be logged with REJECT_ALL filter"); - } - - @Test - void testEventFilterByEventType() throws Exception { - // Given - config with filter that only accepts InputEvents - config = - EventLoggerConfig.builder() - .loggerType("file") - .property("baseLogDir", 
tempDir.toString()) - .eventFilter(EventFilter.byEventType(InputEvent.class)) - .build(); - logger = new FileEventLogger(config); - - logger.open(openParams); - InputEvent inputEvent = new InputEvent("input data"); - OutputEvent outputEvent = new OutputEvent("output data"); - TestCustomEvent customEvent = new TestCustomEvent("custom data", 42); - - // When - logger.append(new EventContext(inputEvent), inputEvent); - logger.append(new EventContext(outputEvent), outputEvent); - logger.append(new EventContext(customEvent), customEvent); - logger.flush(); - - // Then - only InputEvent should be logged - Path logFile = getExpectedLogFilePath(); - List lines = Files.readAllLines(logFile); - assertEquals(1, lines.size(), "Only InputEvent should be logged"); - - EventLogRecord record = objectMapper.readValue(lines.get(0), EventLogRecord.class); - assertInstanceOf(InputEvent.class, record.getEvent()); - assertEquals("input data", ((InputEvent) record.getEvent()).getInput()); - } - - @Test - void testEventFilterByMultipleEventTypes() throws Exception { - // Given - config with filter that accepts InputEvents and OutputEvents - config = - EventLoggerConfig.builder() - .loggerType("file") - .property("baseLogDir", tempDir.toString()) - .eventFilter(EventFilter.byEventType(InputEvent.class, OutputEvent.class)) - .build(); - logger = new FileEventLogger(config); - - logger.open(openParams); - InputEvent inputEvent = new InputEvent("input data"); - OutputEvent outputEvent = new OutputEvent("output data"); - TestCustomEvent customEvent = new TestCustomEvent("custom data", 42); - - // When - logger.append(new EventContext(inputEvent), inputEvent); - logger.append(new EventContext(outputEvent), outputEvent); - logger.append(new EventContext(customEvent), customEvent); - logger.flush(); - - // Then - InputEvent and OutputEvent should be logged, but not TestCustomEvent - Path logFile = getExpectedLogFilePath(); - List lines = Files.readAllLines(logFile); - assertEquals(2, lines.size(), "InputEvent and OutputEvent should be logged"); - - EventLogRecord inputRecord = objectMapper.readValue(lines.get(0), EventLogRecord.class); - assertInstanceOf(InputEvent.class, inputRecord.getEvent()); - assertEquals("input data", ((InputEvent) inputRecord.getEvent()).getInput()); - - EventLogRecord outputRecord = objectMapper.readValue(lines.get(1), EventLogRecord.class); - assertInstanceOf(OutputEvent.class, outputRecord.getEvent()); - assertEquals("output data", ((OutputEvent) outputRecord.getEvent()).getOutput()); - } - - @Test - void testCustomEventFilter() throws Exception { - // Given - config with custom filter that only accepts events with specific content - EventFilter customFilter = - (event, context) -> { - if (event instanceof InputEvent) { - return ((InputEvent) event).getInput().toString().contains("important"); - } - return false; - }; - - config = - EventLoggerConfig.builder() - .loggerType("file") - .property("baseLogDir", tempDir.toString()) - .eventFilter(customFilter) - .build(); - logger = new FileEventLogger(config); - - logger.open(openParams); - InputEvent importantEvent = new InputEvent("important data"); - InputEvent regularEvent = new InputEvent("regular data"); - OutputEvent outputEvent = new OutputEvent("output data"); - - // When - logger.append(new EventContext(importantEvent), importantEvent); - logger.append(new EventContext(regularEvent), regularEvent); - logger.append(new EventContext(outputEvent), outputEvent); - logger.flush(); - - // Then - only the "important" InputEvent should be logged 
- Path logFile = getExpectedLogFilePath(); - List lines = Files.readAllLines(logFile); - assertEquals(1, lines.size(), "Only important InputEvent should be logged"); - - EventLogRecord record = objectMapper.readValue(lines.get(0), EventLogRecord.class); - assertInstanceOf(InputEvent.class, record.getEvent()); - assertEquals("important data", ((InputEvent) record.getEvent()).getInput()); - } - - @Test - void testDefaultEventFilterBehavior() throws Exception { - // Given - config without explicit eventFilter (should default to ACCEPT_ALL) - config = - EventLoggerConfig.builder() - .loggerType("file") - .property("baseLogDir", tempDir.toString()) - .build(); - logger = new FileEventLogger(config); - - logger.open(openParams); - InputEvent inputEvent = new InputEvent("input data"); - OutputEvent outputEvent = new OutputEvent("output data"); - - // When - logger.append(new EventContext(inputEvent), inputEvent); - logger.append(new EventContext(outputEvent), outputEvent); - logger.flush(); - - // Then - both events should be logged (default ACCEPT_ALL behavior) - Path logFile = getExpectedLogFilePath(); - List lines = Files.readAllLines(logFile); - assertEquals(2, lines.size(), "Both events should be logged with default filter"); - - EventLogRecord inputRecord = objectMapper.readValue(lines.get(0), EventLogRecord.class); - assertInstanceOf(InputEvent.class, inputRecord.getEvent()); - - EventLogRecord outputRecord = objectMapper.readValue(lines.get(1), EventLogRecord.class); - assertInstanceOf(OutputEvent.class, outputRecord.getEvent()); - } - @Test void testPrettyPrintOutputsFormattedJson() throws Exception { // Given - config with prettyPrint enabled From 1f17c548bb03858396e6b96272cb197c8fbf98c6 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 6 Apr 2026 22:24:30 -0700 Subject: [PATCH 02/10] [runtime] Add EventLogLevelResolver and JsonTruncator with unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - EventLogLevelResolver: resolves effective EventLogLevel for event type strings using hierarchical config key inheritance (exact match → parent package walk-up → root default → built-in STANDARD). Caches results in ConcurrentHashMap. Takes Map to avoid runtime→plan module dependency. - JsonTruncator: truncates Jackson JsonNode trees per configurable thresholds (max-string-length, max-array-elements, max-depth). Produces JSON-parseable wrapper objects (truncatedString, truncatedList, truncatedObject). Protected fields (eventType, id, attributes) at top level are never truncated. - Comprehensive unit tests for both classes covering exact match, hierarchy inheritance, edge cases, all truncation types, composition, and disabled thresholds. Part of #541: per-event-type configurable log levels. 
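For illustration, a minimal usage sketch of how the two new classes compose
(imports elided; the event type names and values below are invented for the
example — only the config key forms come from this series):

    Map<String, Object> confData = new HashMap<>();
    confData.put("event-log.level", "STANDARD");                         // root default
    confData.put("event-log.type.com.example.NoisyEvent.level", "OFF");  // per-type override

    EventLogLevelResolver resolver = new EventLogLevelResolver(confData);
    resolver.resolve("com.example.NoisyEvent");   // OFF (exact match)
    resolver.resolve("com.example.OtherEvent");   // STANDARD (falls back to the root default)

    // Thresholds shown here match the new config defaults (2000 / 20 / 5).
    JsonTruncator truncator = new JsonTruncator(2000, 20, 5);
    ObjectNode payload = new ObjectMapper().createObjectNode();
    payload.put("content", String.join("", Collections.nCopies(5000, "x")));
    truncator.truncate(payload);  // returns true; "content" is replaced by a truncatedString wrapper
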
--- .../eventlog/EventLogLevelResolver.java | 145 ++++++++++ .../runtime/eventlog/JsonTruncator.java | 269 ++++++++++++++++++ .../eventlog/EventLogLevelResolverTest.java | 153 ++++++++++ .../runtime/eventlog/JsonTruncatorTest.java | 207 ++++++++++++++ 4 files changed, 774 insertions(+) create mode 100644 runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java create mode 100644 runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java create mode 100644 runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java create mode 100644 runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java new file mode 100644 index 000000000..6db7e4c70 --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.eventlog; + +import org.apache.flink.agents.api.logger.EventLogLevel; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Resolves the effective {@link EventLogLevel} for a given event type string using hierarchical + * config key inheritance. + * + *

Resolution order for a given event type (e.g., {@code org.apache.flink.agents.api.event + * .ChatRequestEvent}): + * + *

    + *
  1. Exact match in explicit per-type configuration + *
  2. Walk up dot-separated segments (e.g., {@code org.apache.flink.agents.api.event}, then + * {@code org.apache.flink.agents.api}, etc.) + *
  3. Root default from {@code event-log.level} config key + *
  4. Built-in default: {@link EventLogLevel#STANDARD} + *
+ * + *

This mirrors Log4j's hierarchical logger configuration pattern. Resolved levels are cached in + * a {@link ConcurrentHashMap} for efficient repeated lookups. + * + *
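 + * <p>For example (illustrative type names, not part of this change): with {@code
 + * event-log.type.com.example.events.level = OFF} configured, every event type under {@code
 + * com.example.events} resolves to OFF, unless a more specific key such as {@code
 + * event-log.type.com.example.events.AuditEvent.level = VERBOSE} overrides it for that exact type.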

Config keys are expected in the form: + * + *

+ *   event-log.level = STANDARD            (root default)
+ *   event-log.type.<EVENT_TYPE>.level = OFF   (per-type override)
+ * 
+ */ +public class EventLogLevelResolver { + + /** Config key for the root default log level. */ + static final String ROOT_LEVEL_KEY = "event-log.level"; + + /** Prefix for per-event-type log level config keys. */ + static final String TYPE_PREFIX = "event-log.type."; + + /** Suffix for per-event-type log level config keys. */ + static final String TYPE_SUFFIX = ".level"; + + /** Built-in default when no configuration is provided. */ + static final EventLogLevel BUILT_IN_DEFAULT = EventLogLevel.STANDARD; + + private final EventLogLevel rootDefault; + private final Map explicitLevels; + private final ConcurrentHashMap cache; + + /** + * Creates a resolver from a configuration data map. + * + *

The map is scanned for keys matching {@code event-log.type..level} to build + * the explicit per-type level mappings, and {@code event-log.level} for the root default. + * + * @param confData the flat configuration key-value map (e.g., from {@code + * AgentConfiguration.getConfData()}) + */ + public EventLogLevelResolver(Map confData) { + Map data = confData != null ? confData : Collections.emptyMap(); + + // Parse root default + Object rootValue = data.get(ROOT_LEVEL_KEY); + this.rootDefault = + rootValue != null + ? EventLogLevel.fromString(rootValue.toString()) + : BUILT_IN_DEFAULT; + + // Scan for per-type overrides + Map levels = new HashMap<>(); + for (Map.Entry entry : data.entrySet()) { + String key = entry.getKey(); + if (key.startsWith(TYPE_PREFIX) && key.endsWith(TYPE_SUFFIX)) { + String eventType = + key.substring(TYPE_PREFIX.length(), key.length() - TYPE_SUFFIX.length()); + if (!eventType.isEmpty()) { + levels.put(eventType, EventLogLevel.fromString(entry.getValue().toString())); + } + } + } + this.explicitLevels = Collections.unmodifiableMap(levels); + this.cache = new ConcurrentHashMap<>(); + } + + /** + * Resolves the effective log level for the given event type string. + * + *

Uses hierarchical inheritance: exact match first, then walks up dot-separated segments, + * then falls back to root default, then built-in default ({@link EventLogLevel#STANDARD}). + * + * @param eventType the fully qualified event type string (e.g., {@code + * org.apache.flink.agents.api.event.ChatRequestEvent}) + * @return the resolved {@link EventLogLevel}, never null + */ + public EventLogLevel resolve(String eventType) { + if (eventType == null || eventType.isEmpty()) { + return rootDefault; + } + return cache.computeIfAbsent(eventType, this::doResolve); + } + + private EventLogLevel doResolve(String eventType) { + // 1. Exact match + EventLogLevel level = explicitLevels.get(eventType); + if (level != null) { + return level; + } + + // 2. Walk up dot-separated segments + String current = eventType; + int lastDot = current.lastIndexOf('.'); + while (lastDot > 0) { + current = current.substring(0, lastDot); + level = explicitLevels.get(current); + if (level != null) { + return level; + } + lastDot = current.lastIndexOf('.'); + } + + // 3. Root default (already parsed in constructor, falls through to built-in if not set) + return rootDefault; + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java new file mode 100644 index 000000000..9002c088b --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.eventlog; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Truncates a Jackson {@link JsonNode} tree per configurable thresholds. + * + *

Three truncation strategies are applied in a single recursive pass: + * + *

    + *
  • String truncation: Strings longer than {@code maxStringLength} are replaced with a + * wrapper: {@code {"truncatedString": "first N chars...", "omittedChars": M}} + *
  • Array truncation: Arrays larger than {@code maxArrayElements} are replaced with a + * wrapper: {@code {"truncatedList": [first N elements], "omittedElements": M}} + *
  • Depth truncation: At max depth, object nodes retain only scalar fields; nested + * objects/arrays are dropped: {@code {"truncatedObject": {scalars...}, "omittedFields": N}} + *
+ * + *

Setting any threshold to {@code 0} disables that specific truncation strategy. If all + * thresholds are {@code 0}, no truncation occurs. + * + *
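 + * <p>For example (illustrative payload), with {@code maxStringLength = 10} the field {@code
 + * {"content": "This is a very long string"}} becomes {@code {"content": {"truncatedString":
 + * "This is a ...", "omittedChars": 16}}}.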

Protected fields at the top level of the event node ({@code eventType}, {@code id}, {@code + * attributes}) are never truncated. + */ +public class JsonTruncator { + + private static final Set PROTECTED_FIELDS = + new HashSet<>(Arrays.asList("eventType", "id", "attributes")); + + private final int maxStringLength; + private final int maxArrayElements; + private final int maxDepth; + + /** + * Creates a new truncator with the given thresholds. + * + * @param maxStringLength maximum character length for string values; 0 to disable + * @param maxArrayElements maximum number of array elements retained; 0 to disable + * @param maxDepth maximum object nesting depth; 0 to disable + */ + public JsonTruncator(int maxStringLength, int maxArrayElements, int maxDepth) { + this.maxStringLength = maxStringLength; + this.maxArrayElements = maxArrayElements; + this.maxDepth = maxDepth; + } + + /** + * Truncates the given event node in place according to configured thresholds. + * + *

Protected fields ({@code eventType}, {@code id}, {@code attributes}) at the top level of + * the event node are never truncated. + * + * @param eventNode the top-level event JSON object to truncate + * @return {@code true} if any field was truncated, {@code false} if the node was unchanged + */ + public boolean truncate(ObjectNode eventNode) { + if (eventNode == null) { + return false; + } + return truncateObject(eventNode, 1, true); + } + + /** + * Recursively truncates an object node. + * + * @param node the object node to process + * @param depth current depth (1 = top-level event node) + * @param isTopLevel whether this is the top-level event node (for protected field checks) + * @return true if any truncation occurred + */ + private boolean truncateObject(ObjectNode node, int depth, boolean isTopLevel) { + boolean truncated = false; + + // At max depth, collapse the entire object to retain only scalars + if (maxDepth > 0 && depth >= maxDepth) { + return collapseAtMaxDepth(node, isTopLevel); + } + + List fieldNames = new ArrayList<>(); + node.fieldNames().forEachRemaining(fieldNames::add); + + for (String fieldName : fieldNames) { + if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) { + continue; + } + + JsonNode child = node.get(fieldName); + if (child == null) { + continue; + } + + if (child.isTextual()) { + JsonNode replacement = truncateString(child.textValue()); + if (replacement != null) { + node.set(fieldName, replacement); + truncated = true; + } + } else if (child.isArray()) { + // First recurse into retained elements, then truncate the array if needed + truncated |= truncateArrayContents((ArrayNode) child, depth + 1); + JsonNode replacement = truncateArray((ArrayNode) child); + if (replacement != null) { + node.set(fieldName, replacement); + truncated = true; + } + } else if (child.isObject()) { + truncated |= truncateObject((ObjectNode) child, depth + 1, false); + } + } + + return truncated; + } + + /** + * Truncates a string if it exceeds maxStringLength. + * + * @return a wrapper ObjectNode if truncated, or null if no truncation needed + */ + private JsonNode truncateString(String value) { + if (maxStringLength <= 0 || value == null || value.length() <= maxStringLength) { + return null; + } + ObjectNode wrapper = JsonNodeFactory.instance.objectNode(); + wrapper.put("truncatedString", value.substring(0, maxStringLength) + "..."); + wrapper.put("omittedChars", value.length() - maxStringLength); + return wrapper; + } + + /** + * Truncates an array if it exceeds maxArrayElements. + * + * @return a wrapper ObjectNode if truncated, or null if no truncation needed + */ + private JsonNode truncateArray(ArrayNode array) { + if (maxArrayElements <= 0 || array.size() <= maxArrayElements) { + return null; + } + ObjectNode wrapper = JsonNodeFactory.instance.objectNode(); + ArrayNode retained = JsonNodeFactory.instance.arrayNode(); + for (int i = 0; i < maxArrayElements; i++) { + retained.add(array.get(i)); + } + wrapper.set("truncatedList", retained); + wrapper.put("omittedElements", array.size() - maxArrayElements); + return wrapper; + } + + /** + * Recursively truncates contents within an array (strings and nested structures). 
+ * + * @return true if any truncation occurred within array elements + */ + private boolean truncateArrayContents(ArrayNode array, int depth) { + boolean truncated = false; + for (int i = 0; i < array.size(); i++) { + JsonNode element = array.get(i); + if (element.isTextual()) { + JsonNode replacement = truncateString(element.textValue()); + if (replacement != null) { + array.set(i, replacement); + truncated = true; + } + } else if (element.isObject()) { + truncated |= truncateObject((ObjectNode) element, depth, false); + } else if (element.isArray()) { + truncated |= truncateArrayContents((ArrayNode) element, depth + 1); + JsonNode replacement = truncateArray((ArrayNode) element); + if (replacement != null) { + array.set(i, replacement); + truncated = true; + } + } + } + return truncated; + } + + /** + * At max depth, retain only scalar fields and drop nested objects/arrays. + * + * @return true if any fields were dropped + */ + private boolean collapseAtMaxDepth(ObjectNode node, boolean isTopLevel) { + List fieldNames = new ArrayList<>(); + node.fieldNames().forEachRemaining(fieldNames::add); + + ObjectNode scalarFields = JsonNodeFactory.instance.objectNode(); + int omittedCount = 0; + boolean hasNonScalar = false; + + for (String fieldName : fieldNames) { + if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) { + // Protected fields are kept as-is, even if non-scalar + scalarFields.set(fieldName, node.get(fieldName)); + continue; + } + + JsonNode child = node.get(fieldName); + if (child.isObject() || child.isArray()) { + omittedCount++; + hasNonScalar = true; + } else { + // Apply string truncation to scalar string fields even at max depth + if (child.isTextual()) { + JsonNode replacement = truncateString(child.textValue()); + if (replacement != null) { + scalarFields.set(fieldName, replacement); + } else { + scalarFields.set(fieldName, child); + } + } else { + scalarFields.set(fieldName, child); + } + } + } + + if (!hasNonScalar) { + // No non-scalar fields to drop; but string truncation may have occurred + // Check if any scalar field was replaced + boolean stringTruncated = false; + for (String fieldName : fieldNames) { + if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) { + continue; + } + JsonNode original = node.get(fieldName); + if (original.isTextual()) { + JsonNode replacement = truncateString(original.textValue()); + if (replacement != null) { + node.set(fieldName, replacement); + stringTruncated = true; + } + } + } + return stringTruncated; + } + + // Replace the node contents with the wrapped version + node.removeAll(); + node.set("truncatedObject", scalarFields); + node.put("omittedFields", omittedCount); + return true; + } +} diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java new file mode 100644 index 000000000..a1d2d26e5 --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.eventlog; + +import org.apache.flink.agents.api.logger.EventLogLevel; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class EventLogLevelResolverTest { + + @Test + void testExactMatch() { + Map config = new HashMap<>(); + config.put( + "event-log.type.org.apache.flink.agents.api.event.ChatRequestEvent.level", "OFF"); + EventLogLevelResolver resolver = new EventLogLevelResolver(config); + + assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent")) + .isEqualTo(EventLogLevel.OFF); + } + + @Test + void testParentPackageInheritance() { + Map config = new HashMap<>(); + config.put("event-log.type.org.apache.flink.agents.api.event.level", "VERBOSE"); + EventLogLevelResolver resolver = new EventLogLevelResolver(config); + + // ChatRequestEvent is under the configured parent package + assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent")) + .isEqualTo(EventLogLevel.VERBOSE); + } + + @Test + void testGrandparentInheritance() { + Map config = new HashMap<>(); + config.put("event-log.type.org.apache.flink.level", "OFF"); + EventLogLevelResolver resolver = new EventLogLevelResolver(config); + + // Should walk up multiple levels to find the match + assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent")) + .isEqualTo(EventLogLevel.OFF); + } + + @Test + void testRootDefault() { + Map config = new HashMap<>(); + config.put("event-log.level", "VERBOSE"); + EventLogLevelResolver resolver = new EventLogLevelResolver(config); + + assertThat(resolver.resolve("some.unknown.EventType")).isEqualTo(EventLogLevel.VERBOSE); + } + + @Test + void testBuiltInDefault() { + EventLogLevelResolver resolver = new EventLogLevelResolver(Collections.emptyMap()); + + assertThat(resolver.resolve("some.unknown.EventType")).isEqualTo(EventLogLevel.STANDARD); + } + + @Test + void testCaseInsensitive() { + Map config = new HashMap<>(); + config.put("event-log.type.my.Event.level", "verbose"); + EventLogLevelResolver resolver = new EventLogLevelResolver(config); + + assertThat(resolver.resolve("my.Event")).isEqualTo(EventLogLevel.VERBOSE); + + // Also test root level with mixed case + Map config2 = new HashMap<>(); + config2.put("event-log.level", "Off"); + EventLogLevelResolver resolver2 = new EventLogLevelResolver(config2); + + assertThat(resolver2.resolve("any.Event")).isEqualTo(EventLogLevel.OFF); + } + + @Test + void testInvalidLevel() { + Map config = new HashMap<>(); + config.put("event-log.type.my.Event.level", "INVALID_LEVEL"); + + assertThatThrownBy(() -> new EventLogLevelResolver(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("INVALID_LEVEL"); + } + + @Test + void testNullConfigData() { + EventLogLevelResolver resolver = new EventLogLevelResolver(null); + + assertThat(resolver.resolve("some.Event")).isEqualTo(EventLogLevel.STANDARD); + } + + @Test + 
void testNullOrEmptyEventType() { + Map config = new HashMap<>(); + config.put("event-log.level", "VERBOSE"); + EventLogLevelResolver resolver = new EventLogLevelResolver(config); + + assertThat(resolver.resolve(null)).isEqualTo(EventLogLevel.VERBOSE); + assertThat(resolver.resolve("")).isEqualTo(EventLogLevel.VERBOSE); + } + + @Test + void testExactMatchTakesPrecedenceOverParent() { + Map config = new HashMap<>(); + config.put("event-log.type.org.apache.flink.agents.api.event.level", "OFF"); + config.put( + "event-log.type.org.apache.flink.agents.api.event.ChatRequestEvent.level", + "VERBOSE"); + EventLogLevelResolver resolver = new EventLogLevelResolver(config); + + // Exact match should win over parent package + assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent")) + .isEqualTo(EventLogLevel.VERBOSE); + // Sibling should inherit from parent + assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatResponseEvent")) + .isEqualTo(EventLogLevel.OFF); + } + + @Test + void testCachingReturnsSameResult() { + Map config = new HashMap<>(); + config.put("event-log.type.my.Event.level", "OFF"); + EventLogLevelResolver resolver = new EventLogLevelResolver(config); + + // Call twice — should return cached result + EventLogLevel first = resolver.resolve("my.Event"); + EventLogLevel second = resolver.resolve("my.Event"); + assertThat(first).isSameAs(second); + } +} diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java new file mode 100644 index 000000000..73696e69d --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.runtime.eventlog; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class JsonTruncatorTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + void testStringTruncation() { + JsonTruncator truncator = new JsonTruncator(10, 0, 0); + ObjectNode node = MAPPER.createObjectNode(); + node.put("content", "This is a very long string that exceeds the limit"); + + boolean result = truncator.truncate(node); + + assertThat(result).isTrue(); + assertThat(node.get("content").isObject()).isTrue(); + assertThat(node.get("content").get("truncatedString").asText()).isEqualTo("This is a ..."); + assertThat(node.get("content").get("omittedChars").asInt()) + .isEqualTo(49 - 10); // total length minus maxStringLength + } + + @Test + void testArrayTrimming() { + JsonTruncator truncator = new JsonTruncator(0, 3, 0); + ObjectNode node = MAPPER.createObjectNode(); + ArrayNode array = MAPPER.createArrayNode(); + for (int i = 0; i < 7; i++) { + array.add("item" + i); + } + node.set("items", array); + + boolean result = truncator.truncate(node); + + assertThat(result).isTrue(); + assertThat(node.get("items").isObject()).isTrue(); + assertThat(node.get("items").get("truncatedList").size()).isEqualTo(3); + assertThat(node.get("items").get("truncatedList").get(0).asText()).isEqualTo("item0"); + assertThat(node.get("items").get("truncatedList").get(1).asText()).isEqualTo("item1"); + assertThat(node.get("items").get("truncatedList").get(2).asText()).isEqualTo("item2"); + assertThat(node.get("items").get("omittedElements").asInt()).isEqualTo(4); + } + + @Test + void testDepthCollapsing() { + JsonTruncator truncator = new JsonTruncator(0, 0, 2); + ObjectNode node = MAPPER.createObjectNode(); + ObjectNode nested = MAPPER.createObjectNode(); + nested.put("scalarField", "value"); + nested.put("numberField", 42); + ObjectNode deepNested = MAPPER.createObjectNode(); + deepNested.put("deep", "data"); + nested.set("nestedObj", deepNested); + ArrayNode nestedArray = MAPPER.createArrayNode(); + nestedArray.add("a"); + nested.set("nestedArr", nestedArray); + node.set("data", nested); + + boolean result = truncator.truncate(node); + + assertThat(result).isTrue(); + // The "data" field's nested object should be collapsed at depth 2 + assertThat(node.get("data").has("truncatedObject")).isTrue(); + ObjectNode truncatedObj = (ObjectNode) node.get("data").get("truncatedObject"); + assertThat(truncatedObj.get("scalarField").asText()).isEqualTo("value"); + assertThat(truncatedObj.get("numberField").asInt()).isEqualTo(42); + assertThat(truncatedObj.has("nestedObj")).isFalse(); + assertThat(truncatedObj.has("nestedArr")).isFalse(); + assertThat(node.get("data").get("omittedFields").asInt()).isEqualTo(2); + } + + @Test + void testNoTruncationUnderLimits() { + JsonTruncator truncator = new JsonTruncator(100, 10, 5); + ObjectNode node = MAPPER.createObjectNode(); + node.put("short", "hello"); + ArrayNode smallArray = MAPPER.createArrayNode(); + smallArray.add("a"); + smallArray.add("b"); + node.set("list", smallArray); + + boolean result = truncator.truncate(node); + + assertThat(result).isFalse(); + assertThat(node.get("short").asText()).isEqualTo("hello"); + assertThat(node.get("list").size()).isEqualTo(2); + } + + @Test + void testDisabledThresholds() { + // All 
thresholds set to 0 — no truncation should occur + JsonTruncator truncator = new JsonTruncator(0, 0, 0); + ObjectNode node = MAPPER.createObjectNode(); + node.put("content", "A".repeat(10000)); + ArrayNode bigArray = MAPPER.createArrayNode(); + for (int i = 0; i < 100; i++) { + bigArray.add(i); + } + node.set("items", bigArray); + ObjectNode deep = MAPPER.createObjectNode(); + deep.set("level2", MAPPER.createObjectNode().put("level3", "deep")); + node.set("nested", deep); + + boolean result = truncator.truncate(node); + + assertThat(result).isFalse(); + assertThat(node.get("content").asText()).hasSize(10000); + assertThat(node.get("items").size()).isEqualTo(100); + } + + @Test + void testProtectedFields() { + JsonTruncator truncator = new JsonTruncator(5, 2, 2); + ObjectNode node = MAPPER.createObjectNode(); + // Protected fields should never be truncated + node.put("eventType", "org.apache.flink.agents.api.event.ChatRequestEvent"); + node.put("id", "a-very-long-identifier-that-exceeds-the-limit"); + ObjectNode attributes = MAPPER.createObjectNode(); + attributes.put("key", "value"); + attributes.set("nested", MAPPER.createObjectNode().put("deep", "data")); + node.set("attributes", attributes); + // Non-protected field should be truncated + node.put("content", "This should be truncated"); + + boolean result = truncator.truncate(node); + + assertThat(result).isTrue(); + // Protected fields remain untouched + assertThat(node.get("eventType").asText()) + .isEqualTo("org.apache.flink.agents.api.event.ChatRequestEvent"); + assertThat(node.get("id").asText()) + .isEqualTo("a-very-long-identifier-that-exceeds-the-limit"); + assertThat(node.get("attributes").isObject()).isTrue(); + assertThat(node.get("attributes").get("key").asText()).isEqualTo("value"); + // Non-protected field is truncated + assertThat(node.get("content").get("truncatedString")).isNotNull(); + } + + @Test + void testCompositeScenario() { + JsonTruncator truncator = new JsonTruncator(10, 2, 3); + ObjectNode node = MAPPER.createObjectNode(); + // Long string — should trigger string truncation + node.put("message", "Hello, this is a long message from the LLM"); + // Large array — should trigger array truncation + ArrayNode tools = MAPPER.createArrayNode(); + for (int i = 0; i < 5; i++) { + tools.add("tool" + i); + } + node.set("tools", tools); + // Deep nesting — should trigger depth truncation at level 3 + ObjectNode level1 = MAPPER.createObjectNode(); + ObjectNode level2 = MAPPER.createObjectNode(); + level2.put("scalar", "kept"); + level2.set("tooDeep", MAPPER.createObjectNode().put("hidden", "gone")); + level1.set("inner", level2); + node.set("metadata", level1); + + boolean result = truncator.truncate(node); + + assertThat(result).isTrue(); + // String truncation applied + assertThat(node.get("message").get("truncatedString")).isNotNull(); + assertThat(node.get("message").get("omittedChars").asInt()).isGreaterThan(0); + // Array truncation applied + assertThat(node.get("tools").get("truncatedList").size()).isEqualTo(2); + assertThat(node.get("tools").get("omittedElements").asInt()).isEqualTo(3); + // Depth truncation: level2 is at depth 3, so it should be collapsed + ObjectNode metadataInner = (ObjectNode) node.get("metadata").get("inner"); + assertThat(metadataInner.has("truncatedObject")).isTrue(); + assertThat(metadataInner.get("truncatedObject").get("scalar").asText()).isEqualTo("kept"); + assertThat(metadataInner.get("omittedFields").asInt()).isEqualTo(1); + } + + @Test + void testNullNode() { + JsonTruncator truncator = 
new JsonTruncator(10, 10, 10); + + boolean result = truncator.truncate(null); + + assertThat(result).isFalse(); + } +} From 239eefb9bba6798731bbda4dd384ab48dd310082 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 6 Apr 2026 22:43:18 -0700 Subject: [PATCH 03/10] [runtime] Integrate log levels into FileEventLogger, serializer, and operator - EventLogRecord: add logLevel, eventType, truncator, and truncatedEventsCounter fields. Old 2-arg constructor defaults to VERBOSE for backward compatibility. - EventLogRecordJsonSerializer: write logLevel and eventType as top-level JSON fields. Apply JsonTruncator when level is STANDARD, increment truncatedEventsCounter when truncation occurs. - EventLogRecordJsonDeserializer: read optional logLevel field, default to VERBOSE for old records (backward compat). - FileEventLogger: resolve log level per event via EventLogLevelResolver, skip OFF events, initialize truncator from agent config. Accept truncation counter via setter. - ActionExecutionOperator: pass full AgentConfiguration.getConfData() to logger via agentConfig property. Wire truncation counter from BuiltInMetrics to FileEventLogger after open(). - BuiltInMetrics: add eventLogTruncatedEvents counter with getter. - FileEventLoggerTest: add 7 new tests for STANDARD truncation, VERBOSE preservation, OFF suppression, per-type override, hierarchical inheritance, new JSON fields, backward deserialization. Part of #541: per-event-type configurable log levels. --- .../runtime/eventlog/EventLogRecord.java | 61 +++++ .../EventLogRecordJsonDeserializer.java | 14 +- .../EventLogRecordJsonSerializer.java | 24 +- .../runtime/eventlog/FileEventLogger.java | 72 +++++- .../runtime/metrics/BuiltInMetrics.java | 14 ++ .../operator/ActionExecutionOperator.java | 6 + .../runtime/eventlog/FileEventLoggerTest.java | 225 ++++++++++++++++++ 7 files changed, 410 insertions(+), 6 deletions(-) diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java index 5b83452d6..fe95cde02 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; +import org.apache.flink.agents.api.logger.EventLogLevel; +import org.apache.flink.metrics.Counter; /** * Represents a record in the event log, containing the event context and the event itself. @@ -31,23 +33,82 @@ * *

The class uses custom JSON serialization/deserialization to handle polymorphic Event types by * leveraging the eventType information stored in the EventContext. + * + *

Each record carries a {@link EventLogLevel} that controls truncation behavior during + * serialization, and a convenience copy of the event type string from the context. */ @JsonSerialize(using = EventLogRecordJsonSerializer.class) @JsonDeserialize(using = EventLogRecordJsonDeserializer.class) public class EventLogRecord { private final EventContext context; private final Event event; + private final EventLogLevel logLevel; + private final String eventType; + private final transient JsonTruncator truncator; + private final transient Counter truncatedEventsCounter; + /** + * Creates a record with default VERBOSE log level and no truncator. Used by the deserializer + * and existing code paths that do not need level-aware serialization. + * + * @param context the event context + * @param event the event + */ public EventLogRecord(EventContext context, Event event) { + this(context, event, EventLogLevel.VERBOSE, null, null); + } + + /** + * Creates a record with the specified log level, truncator, and optional metric counter for + * level-aware serialization. + * + * @param context the event context + * @param event the event + * @param logLevel the resolved log level for this event + * @param truncator the truncator to apply when level is STANDARD, may be null + * @param truncatedEventsCounter counter to increment when truncation occurs, may be null + */ + public EventLogRecord( + EventContext context, + Event event, + EventLogLevel logLevel, + JsonTruncator truncator, + Counter truncatedEventsCounter) { this.context = context; this.event = event; + this.logLevel = logLevel != null ? logLevel : EventLogLevel.VERBOSE; + this.eventType = context != null ? context.getEventType() : null; + this.truncator = truncator; + this.truncatedEventsCounter = truncatedEventsCounter; } + /** Returns the event context. */ public EventContext getContext() { return context; } + /** Returns the event. */ public Event getEvent() { return event; } + + /** Returns the log level for this record. */ + public EventLogLevel getLogLevel() { + return logLevel; + } + + /** Returns the event type string (convenience copy from context). */ + public String getEventType() { + return eventType; + } + + /** Returns the truncator, or null if no truncation should be applied. */ + public JsonTruncator getTruncator() { + return truncator; + } + + /** Returns the counter for tracking truncated events, or null if not set. 
*/ + public Counter getTruncatedEventsCounter() { + return truncatedEventsCounter; + } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java index 45c7e6246..e0cc84bb0 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; +import org.apache.flink.agents.api.logger.EventLogLevel; import java.io.IOException; @@ -58,6 +59,17 @@ public EventLogRecord deserialize(JsonParser parser, DeserializationContext cont throw new IOException("Missing 'timestamp' field in EventLogRecord JSON"); } + // Read optional logLevel field (backward-compatible: default to VERBOSE if absent) + EventLogLevel logLevel = EventLogLevel.VERBOSE; + JsonNode logLevelNode = rootNode.get("logLevel"); + if (logLevelNode != null && logLevelNode.isTextual()) { + try { + logLevel = EventLogLevel.fromString(logLevelNode.asText()); + } catch (IllegalArgumentException ignored) { + // Fall back to VERBOSE for unrecognized values + } + } + // Deserialize event using eventType from event node JsonNode eventNode = rootNode.get("event"); if (eventNode == null) { @@ -68,7 +80,7 @@ public EventLogRecord deserialize(JsonParser parser, DeserializationContext cont Event event = deserializeEvent(mapper, stripEventType(eventNode), eventType); EventContext eventContext = new EventContext(eventType, timestampNode.asText()); - return new EventLogRecord(eventContext, event); + return new EventLogRecord(eventContext, event, logLevel, null, null); } /** diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java index db7bebd73..6b04944d5 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.logger.EventLogLevel; import org.apache.flink.agents.runtime.python.event.PythonEvent; import java.io.IOException; @@ -47,9 +48,11 @@ *

<pre>{@code
  * {
  *   "timestamp": "2024-01-15T10:30:00Z",
+ *   "logLevel": "STANDARD",
+ *   "eventType": "org.apache.flink.agents.api.InputEvent",
  *   "event": {
- *     "eventType": "org.apache.flink.agents.api.InputEvent"
- *     // Event-specific fields serialized normally
+ *     "eventType": "org.apache.flink.agents.api.InputEvent",
+ *     // Event-specific fields (may be truncated at STANDARD level)
  *   }
  * }
  * }</pre>
@@ -67,6 +70,9 @@ public void serialize(EventLogRecord record, JsonGenerator gen, SerializerProvid gen.writeStartObject(); gen.writeStringField("timestamp", record.getContext().getTimestamp()); + gen.writeStringField( + "logLevel", record.getLogLevel() != null ? record.getLogLevel().name() : "VERBOSE"); + gen.writeStringField("eventType", record.getContext().getEventType()); gen.writeFieldName("event"); JsonNode eventNode = buildEventNode(record.getEvent(), mapper); @@ -74,8 +80,18 @@ public void serialize(EventLogRecord record, JsonGenerator gen, SerializerProvid throw new IllegalStateException( "Event log payload must be a JSON object, but was: " + eventNode.getNodeType()); } - eventNode = reorderEventFields((ObjectNode) eventNode, record.getEvent(), mapper); - gen.writeTree(eventNode); + ObjectNode orderedNode = + reorderEventFields((ObjectNode) eventNode, record.getEvent(), mapper); + + // Apply truncation for STANDARD level + if (record.getLogLevel() == EventLogLevel.STANDARD && record.getTruncator() != null) { + boolean truncated = record.getTruncator().truncate(orderedNode); + if (truncated && record.getTruncatedEventsCounter() != null) { + record.getTruncatedEventsCounter().inc(); + } + } + + gen.writeTree(orderedNode); gen.writeEndObject(); } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java index 4afc7bd87..294198528 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java @@ -21,9 +21,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; +import org.apache.flink.agents.api.configuration.AgentConfigOptions; +import org.apache.flink.agents.api.logger.EventLogLevel; import org.apache.flink.agents.api.logger.EventLogger; import org.apache.flink.agents.api.logger.EventLoggerConfig; import org.apache.flink.agents.api.logger.EventLoggerOpenParams; +import org.apache.flink.metrics.Counter; import java.io.BufferedWriter; import java.io.FileWriter; @@ -31,6 +34,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collections; +import java.util.Map; /** * A file-based event logger that logs events to files with structured names in a flat directory. @@ -80,11 +85,17 @@ public class FileEventLogger implements EventLogger { private static final String DEFAULT_BASE_LOG_DIR = Paths.get(System.getProperty("java.io.tmpdir"), "flink-agents").toString(); + /** Property key for passing the full agent config data map into the logger. 
*/ + public static final String AGENT_CONFIG_PROPERTY_KEY = "agentConfig"; + private static final ObjectMapper MAPPER = new ObjectMapper(); private final EventLoggerConfig config; private boolean prettyPrint; private PrintWriter writer; + private EventLogLevelResolver levelResolver; + private JsonTruncator truncator; + private Counter truncatedEventsCounter; public FileEventLogger(EventLoggerConfig config) { this.config = config; @@ -102,6 +113,45 @@ public void open(EventLoggerOpenParams params) throws Exception { writer = new PrintWriter(new BufferedWriter(new FileWriter(logFilePath, true))); prettyPrint = (Boolean) config.getProperties().getOrDefault(PRETTY_PRINT_PROPERTY_KEY, false); + + // Initialize level resolver and truncator from agent config + @SuppressWarnings("unchecked") + Map agentConfig = + (Map) + config.getProperties() + .getOrDefault(AGENT_CONFIG_PROPERTY_KEY, Collections.emptyMap()); + this.levelResolver = new EventLogLevelResolver(agentConfig); + int maxStringLength = + getIntFromConfig( + agentConfig, + AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getKey(), + AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getDefaultValue()); + int maxArrayElements = + getIntFromConfig( + agentConfig, + AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getKey(), + AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getDefaultValue()); + int maxDepth = + getIntFromConfig( + agentConfig, + AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getKey(), + AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getDefaultValue()); + this.truncator = new JsonTruncator(maxStringLength, maxArrayElements, maxDepth); + } + + private static int getIntFromConfig(Map config, String key, int defaultValue) { + Object value = config.get(key); + if (value == null) { + return defaultValue; + } + if (value instanceof Number) { + return ((Number) value).intValue(); + } + try { + return Integer.parseInt(value.toString()); + } catch (NumberFormatException e) { + return defaultValue; + } } private String generateSubTaskLogFilePath(EventLoggerOpenParams params) { @@ -127,7 +177,17 @@ public void append(EventContext context, Event event) throws Exception { throw new IllegalStateException("FileEventLogger not initialized. Call open() first."); } - EventLogRecord record = new EventLogRecord(context, event); + // Resolve log level and skip OFF events + EventLogLevel level = + levelResolver != null + ? levelResolver.resolve(context.getEventType()) + : EventLogLevel.VERBOSE; + if (level == EventLogLevel.OFF) { + return; + } + + EventLogRecord record = + new EventLogRecord(context, event, level, truncator, truncatedEventsCounter); // All events should be JSON serializable, since we check it when sending events to context: // RunnerContextImpl.sendEvent String json = @@ -146,6 +206,16 @@ public void flush() throws Exception { writer.flush(); } + /** + * Sets the counter for tracking truncated events. Called by the operator after metrics are + * initialized. 
+ * + * @param counter the counter to increment when events are truncated + */ + public void setTruncatedEventsCounter(Counter counter) { + this.truncatedEventsCounter = counter; + } + @Override public void close() throws Exception { if (writer != null) { diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java index 0e9657322..17a438600 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java @@ -37,6 +37,8 @@ public class BuiltInMetrics { private final Meter numOfActionsExecutedPerSec; + private final Counter eventLogTruncatedEvents; + private final HashMap actionMetricGroups; public BuiltInMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup, AgentPlan agentPlan) { @@ -48,6 +50,8 @@ public BuiltInMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup, AgentPlan ag this.numOfActionsExecutedPerSec = parentMetricGroup.getMeter("numOfActionsExecutedPerSec", numOfActionsExecuted); + this.eventLogTruncatedEvents = parentMetricGroup.getCounter("eventLogTruncatedEvents"); + this.actionMetricGroups = new HashMap<>(); for (String actionName : agentPlan.getActions().keySet()) { actionMetricGroups.put( @@ -69,4 +73,14 @@ public void markActionExecuted(String actionName) { numOfActionsExecutedPerSec.markEvent(); actionMetricGroups.get(actionName).markActionExecuted(); } + + /** Returns the counter tracking event log truncation occurrences. */ + public Counter getEventLogTruncatedEventsCounter() { + return eventLogTruncatedEvents; + } + + /** Increments the event log truncation counter. */ + public void markEventTruncated() { + eventLogTruncatedEvents.inc(); + } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index e5015a3a5..6ec16041b 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -342,6 +342,10 @@ private void initEventLogger(StreamingRuntimeContext runtimeContext) throws Exce return; } eventLogger.open(new EventLoggerOpenParams(runtimeContext)); + if (eventLogger instanceof FileEventLogger) { + ((FileEventLogger) eventLogger) + .setTruncatedEventsCounter(builtInMetrics.getEventLogTruncatedEventsCounter()); + } } @Override @@ -1132,6 +1136,8 @@ private EventLogger createEventLogger(AgentPlan agentPlan) { } loggerConfigBuilder.property( FileEventLogger.PRETTY_PRINT_PROPERTY_KEY, agentPlan.getConfig().get(PRETTY_PRINT)); + loggerConfigBuilder.property( + FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentPlan.getConfig().getConfData()); return EventLoggerFactory.createLogger(loggerConfigBuilder.build()); } diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java index 9a6ca82d7..fd33eb1d2 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.OutputEvent; import 
org.apache.flink.agents.api.configuration.AgentConfigOptions; +import org.apache.flink.agents.api.logger.EventLogLevel; import org.apache.flink.agents.api.logger.EventLoggerConfig; import org.apache.flink.agents.api.logger.EventLoggerOpenParams; import org.apache.flink.api.common.JobID; @@ -40,7 +41,9 @@ import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.when; @@ -327,6 +330,228 @@ void testPrettyPrintOutputsFormattedJson() throws Exception { "Pretty-printed output should be valid JSON deserializable to EventLogRecord"); } + @Test + void testStandardLevelTruncation() throws Exception { + // Given - config with STANDARD level and a small max-string-length for easy testing + Map agentConfig = new HashMap<>(); + agentConfig.put("event-log.level", "STANDARD"); + agentConfig.put("event-log.standard.max-string-length", 10); + agentConfig.put("event-log.standard.max-array-elements", 20); + agentConfig.put("event-log.standard.max-depth", 5); + + config = + EventLoggerConfig.builder() + .loggerType("file") + .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString()) + .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig) + .build(); + logger = new FileEventLogger(config); + logger.open(openParams); + + // Use a custom event with a very long string field + TestCustomEvent event = + new TestCustomEvent("this is a very long string that exceeds 10", 1); + EventContext context = new EventContext(event); + + logger.append(context, event); + logger.flush(); + + Path logFile = getExpectedLogFilePath(); + List lines = Files.readAllLines(logFile); + assertEquals(1, lines.size()); + + JsonNode jsonNode = objectMapper.readTree(lines.get(0)); + assertEquals("STANDARD", jsonNode.get("logLevel").asText()); + + // The customData field should be truncated + JsonNode eventNode = jsonNode.get("event"); + JsonNode customDataNode = eventNode.get("customData"); + assertTrue( + customDataNode.has("truncatedString"), + "Long string should be truncated at STANDARD level"); + assertTrue(customDataNode.has("omittedChars")); + } + + @Test + void testVerboseLevelNoTruncation() throws Exception { + // Given - config with VERBOSE level + Map agentConfig = new HashMap<>(); + agentConfig.put("event-log.level", "VERBOSE"); + agentConfig.put("event-log.standard.max-string-length", 10); + + config = + EventLoggerConfig.builder() + .loggerType("file") + .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString()) + .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig) + .build(); + logger = new FileEventLogger(config); + logger.open(openParams); + + TestCustomEvent event = + new TestCustomEvent("this is a very long string that exceeds 10", 1); + EventContext context = new EventContext(event); + + logger.append(context, event); + logger.flush(); + + Path logFile = getExpectedLogFilePath(); + List lines = Files.readAllLines(logFile); + assertEquals(1, lines.size()); + + JsonNode jsonNode = objectMapper.readTree(lines.get(0)); + assertEquals("VERBOSE", jsonNode.get("logLevel").asText()); + + // The customData field should NOT be truncated + JsonNode eventNode = jsonNode.get("event"); + assertTrue( + eventNode.get("customData").isTextual(), + "String should be preserved at VERBOSE level"); + assertEquals( + "this is a very long string that exceeds 10", eventNode.get("customData").asText()); + } + + @Test + void 
testOffLevelSkipsEvent() throws Exception { + // Given - config with OFF level + Map agentConfig = new HashMap<>(); + agentConfig.put("event-log.level", "OFF"); + + config = + EventLoggerConfig.builder() + .loggerType("file") + .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString()) + .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig) + .build(); + logger = new FileEventLogger(config); + logger.open(openParams); + + InputEvent event = new InputEvent("should not be logged"); + EventContext context = new EventContext(event); + + logger.append(context, event); + logger.flush(); + + Path logFile = getExpectedLogFilePath(); + List lines = Files.readAllLines(logFile); + assertEquals(0, lines.size(), "OFF level should produce no output"); + } + + @Test + void testPerTypeLevelOverride() throws Exception { + // Given - root is STANDARD but InputEvent is set to VERBOSE + Map agentConfig = new HashMap<>(); + agentConfig.put("event-log.level", "STANDARD"); + agentConfig.put("event-log.standard.max-string-length", 10); + agentConfig.put("event-log.type.org.apache.flink.agents.api.InputEvent.level", "VERBOSE"); + + config = + EventLoggerConfig.builder() + .loggerType("file") + .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString()) + .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig) + .build(); + logger = new FileEventLogger(config); + logger.open(openParams); + + // InputEvent should be VERBOSE (no truncation) + InputEvent inputEvent = new InputEvent("this is a very long string that exceeds 10"); + logger.append(new EventContext(inputEvent), inputEvent); + + // TestCustomEvent should be STANDARD (truncated) + TestCustomEvent customEvent = + new TestCustomEvent("this is a very long string that exceeds 10", 1); + logger.append(new EventContext(customEvent), customEvent); + logger.flush(); + + Path logFile = getExpectedLogFilePath(); + List lines = Files.readAllLines(logFile); + assertEquals(2, lines.size()); + + // InputEvent at VERBOSE - no truncation + JsonNode inputJson = objectMapper.readTree(lines.get(0)); + assertEquals("VERBOSE", inputJson.get("logLevel").asText()); + assertTrue(inputJson.get("event").get("input").isTextual()); + + // TestCustomEvent at STANDARD - truncated + JsonNode customJson = objectMapper.readTree(lines.get(1)); + assertEquals("STANDARD", customJson.get("logLevel").asText()); + assertTrue(customJson.get("event").get("customData").has("truncatedString")); + } + + @Test + void testJsonOutputHasNewFields() throws Exception { + // Given - default config + logger.open(openParams); + InputEvent event = new InputEvent("test"); + EventContext context = new EventContext(event); + + logger.append(context, event); + logger.flush(); + + Path logFile = getExpectedLogFilePath(); + List lines = Files.readAllLines(logFile); + JsonNode jsonNode = objectMapper.readTree(lines.get(0)); + + // Verify new top-level fields exist + assertTrue(jsonNode.has("logLevel"), "JSON should have logLevel field"); + assertTrue(jsonNode.has("eventType"), "JSON should have eventType field"); + assertEquals("org.apache.flink.agents.api.InputEvent", jsonNode.get("eventType").asText()); + assertNotNull(jsonNode.get("logLevel").asText()); + } + + @Test + void testBackwardCompatibleDeserialization() throws Exception { + // Simulate old-format JSON without logLevel field + String oldFormatJson = + "{\"timestamp\":\"2024-01-15T10:30:00Z\"," + + "\"event\":{\"eventType\":\"org.apache.flink.agents.api.InputEvent\"," + + 
"\"id\":null,\"attributes\":{},\"input\":\"test\"}}"; + + EventLogRecord record = objectMapper.readValue(oldFormatJson, EventLogRecord.class); + assertEquals( + EventLogLevel.VERBOSE, + record.getLogLevel(), + "Missing logLevel should default to VERBOSE"); + assertNotNull(record.getEvent()); + assertInstanceOf(InputEvent.class, record.getEvent()); + } + + @Test + void testHierarchicalInheritance() throws Exception { + // Set package-level OFF, but specific type VERBOSE + Map agentConfig = new HashMap<>(); + agentConfig.put("event-log.level", "STANDARD"); + agentConfig.put("event-log.type.org.apache.flink.agents.api.level", "OFF"); + agentConfig.put("event-log.type.org.apache.flink.agents.api.InputEvent.level", "VERBOSE"); + + config = + EventLoggerConfig.builder() + .loggerType("file") + .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString()) + .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig) + .build(); + logger = new FileEventLogger(config); + logger.open(openParams); + + // InputEvent has explicit VERBOSE override — should be logged + InputEvent inputEvent = new InputEvent("should be logged"); + logger.append(new EventContext(inputEvent), inputEvent); + + // OutputEvent inherits OFF from package level — should NOT be logged + OutputEvent outputEvent = new OutputEvent("should not be logged"); + logger.append(new EventContext(outputEvent), outputEvent); + logger.flush(); + + Path logFile = getExpectedLogFilePath(); + List lines = Files.readAllLines(logFile); + assertEquals(1, lines.size(), "Only InputEvent (VERBOSE override) should be logged"); + + JsonNode json = objectMapper.readTree(lines.get(0)); + assertEquals("VERBOSE", json.get("logLevel").asText()); + assertEquals("org.apache.flink.agents.api.InputEvent", json.get("eventType").asText()); + } + private Path getExpectedLogFilePath() { return tempDir.resolve( String.format( From fd7bd373924292d7dc044248ef8546d71df56ab8 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 6 Apr 2026 22:48:39 -0700 Subject: [PATCH 04/10] [python] Add event log level config options and E2E tests - Add EVENT_LOG_LEVEL, EVENT_LOG_MAX_STRING_LENGTH, EVENT_LOG_MAX_ARRAY_ELEMENTS, EVENT_LOG_MAX_DEPTH config options to Python AgentConfigOptions (keys and defaults match Java) - Update existing test_python_event_logging to verify new logLevel and top-level eventType fields in JSON output - Add test_event_log_verbose_level: verifies VERBOSE produces full content without truncation - Add test_event_log_off_level: verifies OFF suppresses all event logging - Add test_event_log_standard_truncation: verifies STANDARD with low max-string-length threshold produces truncatedString wrappers - Extract _run_event_logging_pipeline() and _read_log_records() helpers Part of #541: per-event-type configurable log levels. 
--- python/flink_agents/api/core_options.py | 25 ++++ .../python_event_logging_test.py | 128 +++++++++++++++++- 2 files changed, 150 insertions(+), 3 deletions(-) diff --git a/python/flink_agents/api/core_options.py b/python/flink_agents/api/core_options.py index 9245e78c0..98b8b3179 100644 --- a/python/flink_agents/api/core_options.py +++ b/python/flink_agents/api/core_options.py @@ -91,6 +91,31 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta): default=None, ) + # Event log level config options + EVENT_LOG_LEVEL = ConfigOption( + key="event-log.level", + config_type=str, + default="STANDARD", + ) + + EVENT_LOG_MAX_STRING_LENGTH = ConfigOption( + key="event-log.standard.max-string-length", + config_type=int, + default=2000, + ) + + EVENT_LOG_MAX_ARRAY_ELEMENTS = ConfigOption( + key="event-log.standard.max-array-elements", + config_type=int, + default=20, + ) + + EVENT_LOG_MAX_DEPTH = ConfigOption( + key="event-log.standard.max-depth", + config_type=int, + default=5, + ) + class AgentExecutionOptions: """Execution options for Flink Agents.""" diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py index 2db308042..da41c4706 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py +++ b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py @@ -105,9 +105,7 @@ def test_python_event_logging(tmp_path: Path) -> None: log_files = list(event_log_dir.glob("events-*.log")) # At least one log file should exist - assert len(log_files) > 0, ( - f"Event log files should be created in {event_log_dir}" - ) + assert len(log_files) > 0, f"Event log files should be created in {event_log_dir}" # Check that log files contain structured event content record = None @@ -129,6 +127,8 @@ def test_python_event_logging(tmp_path: Path) -> None: assert record is not None, "Event log file is empty." assert record_line is not None, "Event log file is empty." assert "timestamp" in record + assert "logLevel" in record + assert "eventType" in record assert "event" in record assert "eventType" in record["event"] assert has_processed_review, "Log should contain processed review content" @@ -140,3 +140,125 @@ def test_python_event_logging(tmp_path: Path) -> None: assert id_idx != -1 assert attributes_idx != -1 assert event_type_idx < id_idx < attributes_idx + + +def _run_event_logging_pipeline( + tmp_path: Path, config_overrides: dict[str, str] | None = None +) -> Path: + """Run the event logging pipeline and return the event log directory. + + Args: + tmp_path: Temporary directory for log output. + config_overrides: Optional dict of config key-value pairs to set. + + Returns: + The event log directory path. 
+ """ + event_log_dir = tmp_path / "event_log" + default_log_dir = Path(tempfile.gettempdir()) / "flink-agents" + shutil.rmtree(default_log_dir, ignore_errors=True) + + config = Configuration() + env = StreamExecutionEnvironment.get_execution_environment(config) + env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + env.set_parallelism(1) + + agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + agents_env.get_config().set_str("baseLogDir", str(event_log_dir)) + + if config_overrides: + for key, value in config_overrides.items(): + agents_env.get_config().set_str(key, value) + + current_dir = Path(__file__).parent + input_datastream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), + f"file:///{current_dir}/../resources/input/input_data.txt", + ).build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="python_event_logging_test", + ) + + deserialize_datastream = input_datastream.map(lambda x: json.loads(x)) + + agents_env.from_datastream( + input=deserialize_datastream, key_selector=InputKeySelector() + ).apply(PythonEventLoggingAgent()).to_datastream() + + agents_env.execute() + return event_log_dir + + +def _read_log_records(event_log_dir: Path) -> list[dict]: + """Read all JSON records from event log files. + + Args: + event_log_dir: Directory containing event log files. + + Returns: + List of parsed JSON records. + """ + records: list[dict] = [] + for log_file in event_log_dir.glob("events-*.log"): + with log_file.open(encoding="utf-8") as handle: + records.extend(json.loads(line) for line in handle if line.strip()) + return records + + +def test_event_log_verbose_level(tmp_path: Path) -> None: + """Test that VERBOSE log level writes events without truncation.""" + event_log_dir = _run_event_logging_pipeline( + tmp_path, config_overrides={"event-log.level": "VERBOSE"} + ) + + records = _read_log_records(event_log_dir) + assert len(records) > 0, "VERBOSE level should produce event log records" + + for record in records: + assert record.get("logLevel") == "VERBOSE", ( + f"Expected logLevel VERBOSE, got {record.get('logLevel')}" + ) + + # VERBOSE should not truncate content (no truncatedString wrappers) + raw_content = json.dumps(records) + assert "truncatedString" not in raw_content, ( + "VERBOSE level should not truncate any content" + ) + + +def test_event_log_off_level(tmp_path: Path) -> None: + """Test that OFF log level suppresses all event logging.""" + event_log_dir = _run_event_logging_pipeline( + tmp_path, config_overrides={"event-log.level": "OFF"} + ) + + records = _read_log_records(event_log_dir) + assert len(records) == 0, ( + f"OFF level should not produce any event log records, but found {len(records)}" + ) + + +def test_event_log_standard_truncation(tmp_path: Path) -> None: + """Test that STANDARD level truncates strings exceeding max-string-length.""" + event_log_dir = _run_event_logging_pipeline( + tmp_path, + config_overrides={ + "event-log.level": "STANDARD", + "event-log.standard.max-string-length": "10", + }, + ) + + records = _read_log_records(event_log_dir) + assert len(records) > 0, "STANDARD level should produce event log records" + + for record in records: + assert record.get("logLevel") == "STANDARD", ( + f"Expected logLevel STANDARD, got {record.get('logLevel')}" + ) + + # With max-string-length=10, any string longer than 10 chars should be truncated + raw_content = json.dumps(records) + assert "truncatedString" in raw_content, ( + "STANDARD level with 
max-string-length=10 should truncate long strings" + ) From 96e8d65741ceafae3e18b051b74acdef640cb707 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 6 Apr 2026 23:41:19 -0700 Subject: [PATCH 05/10] [runtime] Fix PythonEvent type resolution and remove dead code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - FileEventLogger: use PythonEvent.getEventType() (Python module path) for level resolution instead of EventContext.getEventType() (Java wrapper class name). Without this, per-type Python config keys like event-log.type.flink_agents.api.events.event.OutputEvent.level would never match. - EventLogRecordJsonSerializer: use Python module path for top-level eventType field for PythonEvents, consistent with the eventType inside the event object. - BuiltInMetrics: remove dead markEventTruncated() method — counter is incremented via serializer path directly. Part of #541: per-event-type configurable log levels. --- .../eventlog/EventLogRecordJsonSerializer.java | 11 ++++++++++- .../agents/runtime/eventlog/FileEventLogger.java | 13 ++++++++++--- .../agents/runtime/metrics/BuiltInMetrics.java | 5 ----- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java index 6b04944d5..3c7b1dcd8 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java @@ -72,7 +72,16 @@ public void serialize(EventLogRecord record, JsonGenerator gen, SerializerProvid gen.writeStringField("timestamp", record.getContext().getTimestamp()); gen.writeStringField( "logLevel", record.getLogLevel() != null ? record.getLogLevel().name() : "VERBOSE"); - gen.writeStringField("eventType", record.getContext().getEventType()); + // For PythonEvent, use the Python module path as the top-level eventType + // (consistent with the eventType inside the event object) + String topLevelEventType = record.getContext().getEventType(); + if (record.getEvent() instanceof PythonEvent) { + String pythonType = ((PythonEvent) record.getEvent()).getEventType(); + if (pythonType != null) { + topLevelEventType = pythonType; + } + } + gen.writeStringField("eventType", topLevelEventType); gen.writeFieldName("event"); JsonNode eventNode = buildEventNode(record.getEvent(), mapper); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java index 294198528..c09a9716b 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java @@ -26,6 +26,7 @@ import org.apache.flink.agents.api.logger.EventLogger; import org.apache.flink.agents.api.logger.EventLoggerConfig; import org.apache.flink.agents.api.logger.EventLoggerOpenParams; +import org.apache.flink.agents.runtime.python.event.PythonEvent; import org.apache.flink.metrics.Counter; import java.io.BufferedWriter; @@ -177,11 +178,17 @@ public void append(EventContext context, Event event) throws Exception { throw new IllegalStateException("FileEventLogger not initialized. 
Call open() first."); } + // For PythonEvent, use the Python module path as the event type for level resolution + // and top-level eventType in the JSON output. EventContext.getEventType() returns the + // Java wrapper class name (PythonEvent), not the actual Python event type. + String eventTypeStr = context.getEventType(); + if (event instanceof PythonEvent && ((PythonEvent) event).getEventType() != null) { + eventTypeStr = ((PythonEvent) event).getEventType(); + } + // Resolve log level and skip OFF events EventLogLevel level = - levelResolver != null - ? levelResolver.resolve(context.getEventType()) - : EventLogLevel.VERBOSE; + levelResolver != null ? levelResolver.resolve(eventTypeStr) : EventLogLevel.VERBOSE; if (level == EventLogLevel.OFF) { return; } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java index 17a438600..59e0daaa1 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java @@ -78,9 +78,4 @@ public void markActionExecuted(String actionName) { public Counter getEventLogTruncatedEventsCounter() { return eventLogTruncatedEvents; } - - /** Increments the event log truncation counter. */ - public void markEventTruncated() { - eventLogTruncatedEvents.inc(); - } } From c3bbc3eb7847fdf8cf7ebc6383873c884d9ccf5f Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Tue, 7 Apr 2026 10:50:47 -0700 Subject: [PATCH 06/10] Retrigger CI From 48ed7dbeeba4d32caab4de04d69ecde6255b2a66 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Tue, 7 Apr 2026 23:57:00 -0700 Subject: [PATCH 07/10] Retrigger CI From 2f9eb2389210b6f606b7c9d958779b0c87ced0f4 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Thu, 16 Apr 2026 09:15:50 -0700 Subject: [PATCH 08/10] [runtime] Address review feedback on event log level feature - Use ConfigOption enum type instead of String - Derive root key and default from EVENT_LOG_LEVEL config constant - Add @Nullable on transient truncator/counter fields in EventLogRecord - Extract EventUtil.resolveEventType() for PythonEvent type resolution --- .../api/configuration/AgentConfigOptions.java | 6 ++++-- .../eventlog/EventLogLevelResolver.java | 11 +++------- .../runtime/eventlog/EventLogRecord.java | 6 ++++-- .../EventLogRecordJsonSerializer.java | 13 +++--------- .../runtime/eventlog/FileEventLogger.java | 10 ++------- .../flink/agents/runtime/utils/EventUtil.java | 21 +++++++++++++++++++ 6 files changed, 37 insertions(+), 30 deletions(-) diff --git a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java index 82a63e8f9..9cdda8bde 100644 --- a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java +++ b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java @@ -17,6 +17,8 @@ */ package org.apache.flink.agents.api.configuration; +import org.apache.flink.agents.api.logger.EventLogLevel; + /** The set of configuration options for agents parameters. */ public class AgentConfigOptions { @@ -59,8 +61,8 @@ public class AgentConfigOptions { * The global event log level controlling the default verbosity for all event types. Valid * values are "OFF", "STANDARD", and "VERBOSE". Defaults to "STANDARD". 
*/ - public static final ConfigOption EVENT_LOG_LEVEL = - new ConfigOption<>("event-log.level", String.class, "STANDARD"); + public static final ConfigOption EVENT_LOG_LEVEL = + new ConfigOption<>("event-log.level", EventLogLevel.class, EventLogLevel.STANDARD); /** * The maximum string length for event payloads when logging at STANDARD level. Strings diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java index 6db7e4c70..f2f4b5027 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java @@ -18,6 +18,7 @@ package org.apache.flink.agents.runtime.eventlog; +import org.apache.flink.agents.api.configuration.AgentConfigOptions; import org.apache.flink.agents.api.logger.EventLogLevel; import java.util.Collections; @@ -52,18 +53,12 @@ */ public class EventLogLevelResolver { - /** Config key for the root default log level. */ - static final String ROOT_LEVEL_KEY = "event-log.level"; - /** Prefix for per-event-type log level config keys. */ static final String TYPE_PREFIX = "event-log.type."; /** Suffix for per-event-type log level config keys. */ static final String TYPE_SUFFIX = ".level"; - /** Built-in default when no configuration is provided. */ - static final EventLogLevel BUILT_IN_DEFAULT = EventLogLevel.STANDARD; - private final EventLogLevel rootDefault; private final Map explicitLevels; private final ConcurrentHashMap cache; @@ -81,11 +76,11 @@ public EventLogLevelResolver(Map confData) { Map data = confData != null ? confData : Collections.emptyMap(); // Parse root default - Object rootValue = data.get(ROOT_LEVEL_KEY); + Object rootValue = data.get(AgentConfigOptions.EVENT_LOG_LEVEL.getKey()); this.rootDefault = rootValue != null ? EventLogLevel.fromString(rootValue.toString()) - : BUILT_IN_DEFAULT; + : AgentConfigOptions.EVENT_LOG_LEVEL.getDefaultValue(); // Scan for per-type overrides Map levels = new HashMap<>(); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java index fe95cde02..eed055426 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java @@ -25,6 +25,8 @@ import org.apache.flink.agents.api.logger.EventLogLevel; import org.apache.flink.metrics.Counter; +import javax.annotation.Nullable; + /** * Represents a record in the event log, containing the event context and the event itself. * @@ -44,8 +46,8 @@ public class EventLogRecord { private final Event event; private final EventLogLevel logLevel; private final String eventType; - private final transient JsonTruncator truncator; - private final transient Counter truncatedEventsCounter; + @Nullable private final transient JsonTruncator truncator; + @Nullable private final transient Counter truncatedEventsCounter; /** * Creates a record with default VERBOSE log level and no truncator. 
Used by the deserializer diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java index 3c7b1dcd8..8a98bde1e 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java @@ -27,6 +27,7 @@ import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.logger.EventLogLevel; import org.apache.flink.agents.runtime.python.event.PythonEvent; +import org.apache.flink.agents.runtime.utils.EventUtil; import java.io.IOException; import java.util.Iterator; @@ -72,16 +73,8 @@ public void serialize(EventLogRecord record, JsonGenerator gen, SerializerProvid gen.writeStringField("timestamp", record.getContext().getTimestamp()); gen.writeStringField( "logLevel", record.getLogLevel() != null ? record.getLogLevel().name() : "VERBOSE"); - // For PythonEvent, use the Python module path as the top-level eventType - // (consistent with the eventType inside the event object) - String topLevelEventType = record.getContext().getEventType(); - if (record.getEvent() instanceof PythonEvent) { - String pythonType = ((PythonEvent) record.getEvent()).getEventType(); - if (pythonType != null) { - topLevelEventType = pythonType; - } - } - gen.writeStringField("eventType", topLevelEventType); + gen.writeStringField( + "eventType", EventUtil.resolveEventType(record.getEvent(), record.getContext())); gen.writeFieldName("event"); JsonNode eventNode = buildEventNode(record.getEvent(), mapper); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java index c09a9716b..d4fbc1b28 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java @@ -26,7 +26,7 @@ import org.apache.flink.agents.api.logger.EventLogger; import org.apache.flink.agents.api.logger.EventLoggerConfig; import org.apache.flink.agents.api.logger.EventLoggerOpenParams; -import org.apache.flink.agents.runtime.python.event.PythonEvent; +import org.apache.flink.agents.runtime.utils.EventUtil; import org.apache.flink.metrics.Counter; import java.io.BufferedWriter; @@ -178,13 +178,7 @@ public void append(EventContext context, Event event) throws Exception { throw new IllegalStateException("FileEventLogger not initialized. Call open() first."); } - // For PythonEvent, use the Python module path as the event type for level resolution - // and top-level eventType in the JSON output. EventContext.getEventType() returns the - // Java wrapper class name (PythonEvent), not the actual Python event type. 
- String eventTypeStr = context.getEventType(); - if (event instanceof PythonEvent && ((PythonEvent) event).getEventType() != null) { - eventTypeStr = ((PythonEvent) event).getEventType(); - } + String eventTypeStr = EventUtil.resolveEventType(event, context); // Resolve log level and skip OFF events EventLogLevel level = diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/utils/EventUtil.java b/runtime/src/main/java/org/apache/flink/agents/runtime/utils/EventUtil.java index e262a1b75..45a567558 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/utils/EventUtil.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/utils/EventUtil.java @@ -18,6 +18,7 @@ package org.apache.flink.agents.runtime.utils; import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.EventContext; import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.OutputEvent; import org.apache.flink.agents.runtime.python.event.PythonEvent; @@ -38,6 +39,26 @@ public static boolean isInputEvent(Event event) { && ((PythonEvent) event).getEventType().equals(PYTHON_INPUT_EVENT_NAME); } + /** + * Resolves the event type string for the given event. For {@link PythonEvent}, returns the + * Python module path (e.g., {@code flink_agents.api.events.event.OutputEvent}) instead of the + * Java wrapper class name. Falls back to {@link EventContext#getEventType()} for all other + * events. + * + * @param event the event + * @param context the event context + * @return the resolved event type string + */ + public static String resolveEventType(Event event, EventContext context) { + if (event instanceof PythonEvent) { + String pythonType = ((PythonEvent) event).getEventType(); + if (pythonType != null) { + return pythonType; + } + } + return context.getEventType(); + } + public static boolean isOutputEvent(Event event) { if (event instanceof OutputEvent) { return true; From c4596bc3b810e7c57a95dce113bcef4d5b5c0c69 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Wed, 29 Apr 2026 07:38:19 -0700 Subject: [PATCH 09/10] [runtime] Move event-log orchestration out of EventLogRecord MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit EventLogRecord no longer carries serialization-time state (logLevel, eventType, truncator, truncatedEventsCounter). It returns to a pure (context, event) data carrier. FileEventLogger.append() now resolves the level, builds the JSON tree, injects logLevel, truncates the event subtree at STANDARD, and writes — keeping the same on-disk JSON layout (timestamp, logLevel, eventType, event). The deserializer silently ignores any logLevel field present in older log files for backward compatibility. Addresses review feedback from @wenjin272 (#609 r3107948831, replying to round-1 r3061919972) and @xintongsong (#609 r3108788804). 
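Illustration (not part of the diff): a log line written before this change, which has no top-level
logLevel field, still round-trips through the record's Jackson deserializer. The sketch below assumes
the same ObjectMapper, EventLogRecord, and InputEvent imports used in FileEventLoggerTest.

    ObjectMapper mapper = new ObjectMapper();
    String oldLine =
            "{\"timestamp\":\"2024-01-15T10:30:00Z\","
                    + "\"event\":{\"eventType\":\"org.apache.flink.agents.api.InputEvent\","
                    + "\"id\":null,\"attributes\":{},\"input\":\"test\"}}";
    // The deserializer only reads "timestamp" and "event"; a top-level "logLevel" field,
    // whether from old or new log files, is skipped without error.
    EventLogRecord record = mapper.readValue(oldLine, EventLogRecord.class);
    // record.getEvent() is an InputEvent whose input is "test".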
--- .../runtime/eventlog/EventLogRecord.java | 61 ------------------- .../EventLogRecordJsonDeserializer.java | 17 +----- .../EventLogRecordJsonSerializer.java | 14 +---- .../runtime/eventlog/FileEventLogger.java | 42 ++++++++++--- .../runtime/eventlog/FileEventLoggerTest.java | 9 ++- 5 files changed, 43 insertions(+), 100 deletions(-) diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java index eed055426..402c3f52a 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java @@ -22,10 +22,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; -import org.apache.flink.agents.api.logger.EventLogLevel; -import org.apache.flink.metrics.Counter; - -import javax.annotation.Nullable; /** * Represents a record in the event log, containing the event context and the event itself. @@ -35,53 +31,16 @@ * *

The class uses custom JSON serialization/deserialization to handle polymorphic Event types by * leveraging the eventType information stored in the EventContext. - * - *

Each record carries a {@link EventLogLevel} that controls truncation behavior during - * serialization, and a convenience copy of the event type string from the context. */ @JsonSerialize(using = EventLogRecordJsonSerializer.class) @JsonDeserialize(using = EventLogRecordJsonDeserializer.class) public class EventLogRecord { private final EventContext context; private final Event event; - private final EventLogLevel logLevel; - private final String eventType; - @Nullable private final transient JsonTruncator truncator; - @Nullable private final transient Counter truncatedEventsCounter; - /** - * Creates a record with default VERBOSE log level and no truncator. Used by the deserializer - * and existing code paths that do not need level-aware serialization. - * - * @param context the event context - * @param event the event - */ public EventLogRecord(EventContext context, Event event) { - this(context, event, EventLogLevel.VERBOSE, null, null); - } - - /** - * Creates a record with the specified log level, truncator, and optional metric counter for - * level-aware serialization. - * - * @param context the event context - * @param event the event - * @param logLevel the resolved log level for this event - * @param truncator the truncator to apply when level is STANDARD, may be null - * @param truncatedEventsCounter counter to increment when truncation occurs, may be null - */ - public EventLogRecord( - EventContext context, - Event event, - EventLogLevel logLevel, - JsonTruncator truncator, - Counter truncatedEventsCounter) { this.context = context; this.event = event; - this.logLevel = logLevel != null ? logLevel : EventLogLevel.VERBOSE; - this.eventType = context != null ? context.getEventType() : null; - this.truncator = truncator; - this.truncatedEventsCounter = truncatedEventsCounter; } /** Returns the event context. */ @@ -93,24 +52,4 @@ public EventContext getContext() { public Event getEvent() { return event; } - - /** Returns the log level for this record. */ - public EventLogLevel getLogLevel() { - return logLevel; - } - - /** Returns the event type string (convenience copy from context). */ - public String getEventType() { - return eventType; - } - - /** Returns the truncator, or null if no truncation should be applied. */ - public JsonTruncator getTruncator() { - return truncator; - } - - /** Returns the counter for tracking truncated events, or null if not set. 
*/ - public Counter getTruncatedEventsCounter() { - return truncatedEventsCounter; - } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java index e0cc84bb0..a3a330cb3 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java @@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; -import org.apache.flink.agents.api.logger.EventLogLevel; import java.io.IOException; @@ -59,18 +58,8 @@ public EventLogRecord deserialize(JsonParser parser, DeserializationContext cont throw new IOException("Missing 'timestamp' field in EventLogRecord JSON"); } - // Read optional logLevel field (backward-compatible: default to VERBOSE if absent) - EventLogLevel logLevel = EventLogLevel.VERBOSE; - JsonNode logLevelNode = rootNode.get("logLevel"); - if (logLevelNode != null && logLevelNode.isTextual()) { - try { - logLevel = EventLogLevel.fromString(logLevelNode.asText()); - } catch (IllegalArgumentException ignored) { - // Fall back to VERBOSE for unrecognized values - } - } - - // Deserialize event using eventType from event node + // Deserialize event using eventType from event node. Any top-level "logLevel" field + // present in older log files is silently ignored — it is no longer part of the record. JsonNode eventNode = rootNode.get("event"); if (eventNode == null) { throw new IOException("Missing 'event' field in EventLogRecord JSON"); @@ -80,7 +69,7 @@ public EventLogRecord deserialize(JsonParser parser, DeserializationContext cont Event event = deserializeEvent(mapper, stripEventType(eventNode), eventType); EventContext eventContext = new EventContext(eventType, timestampNode.asText()); - return new EventLogRecord(eventContext, event, logLevel, null, null); + return new EventLogRecord(eventContext, event); } /** diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java index 8a98bde1e..166f7424c 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.agents.api.Event; -import org.apache.flink.agents.api.logger.EventLogLevel; import org.apache.flink.agents.runtime.python.event.PythonEvent; import org.apache.flink.agents.runtime.utils.EventUtil; @@ -49,11 +48,10 @@ *

<pre>{@code
  * {
  *   "timestamp": "2024-01-15T10:30:00Z",
- *   "logLevel": "STANDARD",
  *   "eventType": "org.apache.flink.agents.api.InputEvent",
  *   "event": {
  *     "eventType": "org.apache.flink.agents.api.InputEvent",
- *     // Event-specific fields (may be truncated at STANDARD level)
+ *     // Event-specific fields
  *   }
  * }
  * }</pre>
@@ -71,8 +69,6 @@ public void serialize(EventLogRecord record, JsonGenerator gen, SerializerProvid gen.writeStartObject(); gen.writeStringField("timestamp", record.getContext().getTimestamp()); - gen.writeStringField( - "logLevel", record.getLogLevel() != null ? record.getLogLevel().name() : "VERBOSE"); gen.writeStringField( "eventType", EventUtil.resolveEventType(record.getEvent(), record.getContext())); @@ -85,14 +81,6 @@ public void serialize(EventLogRecord record, JsonGenerator gen, SerializerProvid ObjectNode orderedNode = reorderEventFields((ObjectNode) eventNode, record.getEvent(), mapper); - // Apply truncation for STANDARD level - if (record.getLogLevel() == EventLogLevel.STANDARD && record.getTruncator() != null) { - boolean truncated = record.getTruncator().truncate(orderedNode); - if (truncated && record.getTruncatedEventsCounter() != null) { - record.getTruncatedEventsCounter().inc(); - } - } - gen.writeTree(orderedNode); gen.writeEndObject(); } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java index d4fbc1b28..b09af65d4 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java @@ -18,7 +18,9 @@ package org.apache.flink.agents.runtime.eventlog; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; import org.apache.flink.agents.api.configuration.AgentConfigOptions; @@ -180,21 +182,47 @@ public void append(EventContext context, Event event) throws Exception { String eventTypeStr = EventUtil.resolveEventType(event, context); - // Resolve log level and skip OFF events + // Resolve log level and skip OFF events. EventLogLevel level = levelResolver != null ? levelResolver.resolve(eventTypeStr) : EventLogLevel.VERBOSE; if (level == EventLogLevel.OFF) { return; } - EventLogRecord record = - new EventLogRecord(context, event, level, truncator, truncatedEventsCounter); - // All events should be JSON serializable, since we check it when sending events to context: - // RunnerContextImpl.sendEvent + // All events should be JSON serializable; we already check this when sending events + // to context (RunnerContextImpl.sendEvent). + EventLogRecord record = new EventLogRecord(context, event); + JsonNode tree = MAPPER.valueToTree(record); + if (!(tree instanceof ObjectNode)) { + throw new IllegalStateException( + "EventLogRecord must serialize to a JSON object, but was: " + + tree.getNodeType()); + } + ObjectNode rootNode = (ObjectNode) tree; + + // Truncate the event subtree at STANDARD level. + if (level == EventLogLevel.STANDARD && truncator != null) { + JsonNode eventNode = rootNode.get("event"); + if (eventNode instanceof ObjectNode) { + boolean truncated = truncator.truncate((ObjectNode) eventNode); + if (truncated && truncatedEventsCounter != null) { + truncatedEventsCounter.inc(); + } + } + } + + // Rebuild the top-level object so logLevel sits between timestamp and eventType, + // matching the documented JSON layout. 
+ ObjectNode ordered = MAPPER.createObjectNode(); + ordered.set("timestamp", rootNode.get("timestamp")); + ordered.put("logLevel", level.name()); + ordered.set("eventType", rootNode.get("eventType")); + ordered.set("event", rootNode.get("event")); + String json = prettyPrint - ? MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(record) - : MAPPER.writeValueAsString(record); + ? MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(ordered) + : MAPPER.writeValueAsString(ordered); writer.println(json); } diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java index fd33eb1d2..05176e53e 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java @@ -25,7 +25,6 @@ import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.OutputEvent; import org.apache.flink.agents.api.configuration.AgentConfigOptions; -import org.apache.flink.agents.api.logger.EventLogLevel; import org.apache.flink.agents.api.logger.EventLoggerConfig; import org.apache.flink.agents.api.logger.EventLoggerOpenParams; import org.apache.flink.api.common.JobID; @@ -509,12 +508,12 @@ void testBackwardCompatibleDeserialization() throws Exception { + "\"id\":null,\"attributes\":{},\"input\":\"test\"}}"; EventLogRecord record = objectMapper.readValue(oldFormatJson, EventLogRecord.class); - assertEquals( - EventLogLevel.VERBOSE, - record.getLogLevel(), - "Missing logLevel should default to VERBOSE"); assertNotNull(record.getEvent()); assertInstanceOf(InputEvent.class, record.getEvent()); + assertEquals( + "test", + ((InputEvent) record.getEvent()).getInput(), + "Old-format JSON without logLevel should still deserialize the event payload"); } @Test From 43569795bf2cc7943e57e17c231d7eb4cd668dbe Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Wed, 29 Apr 2026 07:38:32 -0700 Subject: [PATCH 10/10] [runtime] Truncate arrays before recursing into retained elements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In JsonTruncator, run truncateArray before truncateArrayContents at both call sites (truncateObject array branch and the nested-array branch in truncateArrayContents). When an array is over the size cap, recursion now visits only the retained subset inside the wrapper's truncatedList — elements that will be dropped are never traversed. Adds a regression test that holds references to dropped tail ObjectNodes and asserts their long-string fields remain unmutated after truncation. Addresses review feedback from @xintongsong (#609 r3108615942). 
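For reference, a minimal sketch of the wrapper shape this ordering preserves (constructor argument
order and field names taken from JsonTruncatorTest; purely illustrative, not part of the diff):

    ObjectMapper mapper = new ObjectMapper();
    ObjectNode node = mapper.createObjectNode();
    ArrayNode items = mapper.createArrayNode();
    items.add("a");
    items.add("b");
    items.add("c");
    items.add("d");
    node.set("items", items);

    // maxStringLength=5, maxArrayElements=2, maxDepth=5: the four-element array is over the
    // cap, so it becomes a wrapper node and only the retained head ["a", "b"] is recursed into.
    new JsonTruncator(5, 2, 5).truncate(node);
    // node is now {"items":{"truncatedList":["a","b"],"omittedElements":2}}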
---
 .../runtime/eventlog/JsonTruncator.java      | 14 ++++--
 .../runtime/eventlog/JsonTruncatorTest.java  | 44 +++++++++++++++++++
 2 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
index 9002c088b..022ca4cf7 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
@@ -123,12 +123,16 @@ private boolean truncateObject(ObjectNode node, int depth, boolean isTopLevel) {
                 truncated = true;
             }
         } else if (child.isArray()) {
-            // First recurse into retained elements, then truncate the array if needed
-            truncated |= truncateArrayContents((ArrayNode) child, depth + 1);
+            // Truncate the array first so we don't recurse into elements that will be dropped.
             JsonNode replacement = truncateArray((ArrayNode) child);
             if (replacement != null) {
+                ArrayNode retained =
+                        (ArrayNode) ((ObjectNode) replacement).get("truncatedList");
+                truncateArrayContents(retained, depth + 1);
                 node.set(fieldName, replacement);
                 truncated = true;
+            } else {
+                truncated |= truncateArrayContents((ArrayNode) child, depth + 1);
             }
         } else if (child.isObject()) {
             truncated |= truncateObject((ObjectNode) child, depth + 1, false);
@@ -190,11 +194,15 @@ private boolean truncateArrayContents(ArrayNode array, int depth) {
             } else if (element.isObject()) {
                 truncated |= truncateObject((ObjectNode) element, depth, false);
             } else if (element.isArray()) {
-                truncated |= truncateArrayContents((ArrayNode) element, depth + 1);
                 JsonNode replacement = truncateArray((ArrayNode) element);
                 if (replacement != null) {
+                    ArrayNode retained =
+                            (ArrayNode) ((ObjectNode) replacement).get("truncatedList");
+                    truncateArrayContents(retained, depth + 1);
                     array.set(i, replacement);
                     truncated = true;
+                } else {
+                    truncated |= truncateArrayContents((ArrayNode) element, depth + 1);
                 }
             }
         }
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
index 73696e69d..bd05b9a10 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
@@ -196,6 +196,50 @@ void testCompositeScenario() {
         assertThat(metadataInner.get("omittedFields").asInt()).isEqualTo(1);
     }
 
+    @Test
+    void testRecursionSkipsDroppedTailElements() {
+        // Regression test: when an array exceeds maxArrayElements, the elements beyond the cap
+        // must NOT be recursed into. Prior to the fix, truncateArrayContents ran on the full
+        // array before truncateArray dropped the tail, causing wasted CPU/allocations on
+        // elements that were about to be discarded.
+        JsonTruncator truncator = new JsonTruncator(5, 2, 0);
+        ObjectNode node = MAPPER.createObjectNode();
+        ArrayNode items = MAPPER.createArrayNode();
+        // Two short strings that fit under the retained cap and the string limit.
+        items.add("a");
+        items.add("b");
+        // Three tail ObjectNodes whose long-string fields would be truncated if visited.
+        String longValue = "this-string-is-much-longer-than-five-chars";
+        ObjectNode tail0 = MAPPER.createObjectNode();
+        tail0.put("payload", longValue);
+        ObjectNode tail1 = MAPPER.createObjectNode();
+        tail1.put("payload", longValue);
+        ObjectNode tail2 = MAPPER.createObjectNode();
+        tail2.put("payload", longValue);
+        items.add(tail0);
+        items.add(tail1);
+        items.add(tail2);
+        node.set("items", items);
+
+        boolean result = truncator.truncate(node);
+
+        assertThat(result).isTrue();
+        // Retained list is exactly the first 2 elements; 3 are dropped.
+        assertThat(node.get("items").get("truncatedList").size()).isEqualTo(2);
+        assertThat(node.get("items").get("truncatedList").get(0).asText()).isEqualTo("a");
+        assertThat(node.get("items").get("truncatedList").get(1).asText()).isEqualTo("b");
+        assertThat(node.get("items").get("omittedElements").asInt()).isEqualTo(3);
+        // The dropped tail ObjectNodes were never visited; their payload fields remain
+        // untouched (still raw strings, not wrapped truncatedString objects). Under the old
+        // pre-reorder code path, these would have been mutated before being discarded.
+        assertThat(tail0.get("payload").isTextual()).isTrue();
+        assertThat(tail0.get("payload").asText()).isEqualTo(longValue);
+        assertThat(tail1.get("payload").isTextual()).isTrue();
+        assertThat(tail1.get("payload").asText()).isEqualTo(longValue);
+        assertThat(tail2.get("payload").isTextual()).isTrue();
+        assertThat(tail2.get("payload").asText()).isEqualTo(longValue);
+    }
+
     @Test
     void testNullNode() {
         JsonTruncator truncator = new JsonTruncator(10, 10, 10);