diff --git a/api/src/main/java/org/apache/flink/agents/api/EventFilter.java b/api/src/main/java/org/apache/flink/agents/api/EventFilter.java
deleted file mode 100644
index ce97cc59e..000000000
--- a/api/src/main/java/org/apache/flink/agents/api/EventFilter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.agents.api;
-
-/**
- * Interface for filtering events in event logging and listening.
- *
- *
- * <p>EventFilter allows fine-grained control over which events are processed by event logs and
- * event listeners. Implementations can filter based on event type, attributes, or custom logic.
- */
-@FunctionalInterface
-public interface EventFilter {
- /**
- * Determines whether an event should be processed.
- *
- * @param event The event to evaluate
- * @param context The context associated with the event
- * @return true if the event should be processed, false otherwise
- */
- boolean accept(Event event, EventContext context);
-
- /**
- * Creates a filter that accepts events of the specified types.
- *
- * @param eventTypes The event types to accept
- * @return An EventFilter that accepts only the specified event types
- */
- @SafeVarargs
- static EventFilter byEventType(Class<? extends Event>... eventTypes) {
- return (event, context) -> {
- for (Class<? extends Event> eventType : eventTypes) {
- if (eventType.isInstance(event)) {
- return true;
- }
- }
- return false;
- };
- }
-
- /** A filter that accepts all events. */
- EventFilter ACCEPT_ALL = (event, context) -> true;
-
- /** A filter that rejects all events. */
- EventFilter REJECT_ALL = (event, context) -> false;
-}
diff --git a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 724247456..9cdda8bde 100644
--- a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++ b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -17,6 +17,8 @@
*/
package org.apache.flink.agents.api.configuration;
+import org.apache.flink.agents.api.logger.EventLogLevel;
+
/** The set of configuration options for agents parameters. */
public class AgentConfigOptions {
@@ -54,4 +56,32 @@ public class AgentConfigOptions {
/** The config parameter specifies the unique identifier of job. */
public static final ConfigOption<String> JOB_IDENTIFIER =
new ConfigOption<>("job-identifier", String.class, null);
+
+ /**
+ * The global event log level controlling the default verbosity for all event types. Valid
+ * values are "OFF", "STANDARD", and "VERBOSE". Defaults to "STANDARD".
+ */
+ public static final ConfigOption<EventLogLevel> EVENT_LOG_LEVEL =
+ new ConfigOption<>("event-log.level", EventLogLevel.class, EventLogLevel.STANDARD);
+
+ /**
+ * The maximum string length for event payloads when logging at STANDARD level. Strings
+ * exceeding this length will be truncated. Defaults to 2000.
+ */
+ public static final ConfigOption<Integer> EVENT_LOG_MAX_STRING_LENGTH =
+ new ConfigOption<>("event-log.standard.max-string-length", Integer.class, 2000);
+
+ /**
+ * The maximum number of array elements to include in event payloads when logging at STANDARD
+ * level. Arrays exceeding this size will be truncated. Defaults to 20.
+ */
+ public static final ConfigOption<Integer> EVENT_LOG_MAX_ARRAY_ELEMENTS =
+ new ConfigOption<>("event-log.standard.max-array-elements", Integer.class, 20);
+
+ /**
+ * The maximum nesting depth for event payloads when logging at STANDARD level. Objects deeper
+ * than this level will be summarized. Defaults to 5.
+ */
+ public static final ConfigOption<Integer> EVENT_LOG_MAX_DEPTH =
+ new ConfigOption<>("event-log.standard.max-depth", Integer.class, 5);
}
diff --git a/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java b/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java
new file mode 100644
index 000000000..118e265cc
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.logger;
+
+/**
+ * Log level for event logging, controlling the verbosity of event log output.
+ *
+ * <ul>
+ *   <li>{@link #OFF} - No events of this type are logged.
+ *   <li>{@link #STANDARD} - Events are logged with truncated/summarized payloads (default).
+ *   <li>{@link #VERBOSE} - Events are logged with full, untruncated payloads.
+ * </ul>
+ */
+public enum EventLogLevel {
+
+ /** No events of this type are logged. */
+ OFF,
+
+ /** Events are logged with truncated/summarized payloads. This is the default level. */
+ STANDARD,
+
+ /** Events are logged with full, untruncated payloads. */
+ VERBOSE;
+
+ /**
+ * Parses a string value into an {@link EventLogLevel}, case-insensitively.
+ *
+ * @param value the string representation of the log level (e.g., "off", "STANDARD", "Verbose")
+ * @return the corresponding {@link EventLogLevel}
+ * @throws IllegalArgumentException if the value does not match any log level
+ */
+ public static EventLogLevel fromString(String value) {
+ if (value == null) {
+ throw new IllegalArgumentException("EventLogLevel value cannot be null");
+ }
+ try {
+ return valueOf(value.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "Invalid EventLogLevel: '"
+ + value
+ + "'. Valid values are: OFF, STANDARD, VERBOSE");
+ }
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
index f57adef6d..1dedeef02 100644
--- a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
+++ b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
@@ -18,8 +18,6 @@
package org.apache.flink.agents.api.logger;
-import org.apache.flink.agents.api.EventFilter;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -45,14 +43,11 @@
public final class EventLoggerConfig {
private final String loggerType;
- private final EventFilter eventFilter;
private final Map<String, Object> properties;
/** Private constructor - use {@link #builder()} to create instances. */
- private EventLoggerConfig(
- String loggerType, EventFilter eventFilter, Map<String, Object> properties) {
+ private EventLoggerConfig(String loggerType, Map<String, Object> properties) {
this.loggerType = Objects.requireNonNull(loggerType, "Logger type cannot be null");
- this.eventFilter = eventFilter == null ? EventFilter.ACCEPT_ALL : eventFilter;
this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
}
@@ -81,15 +76,6 @@ public String getLoggerType() {
return loggerType;
}
- /**
- * Gets the event filter for this logger configuration.
- *
- * @return the EventFilter to apply, never null
- */
- public EventFilter getEventFilter() {
- return eventFilter;
- }
-
/**
* Gets the implementation-specific properties for this logger configuration.
*
@@ -113,13 +99,12 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
EventLoggerConfig that = (EventLoggerConfig) o;
return Objects.equals(loggerType, that.loggerType)
- && Objects.equals(eventFilter, that.eventFilter)
&& Objects.equals(properties, that.properties);
}
@Override
public int hashCode() {
- return Objects.hash(loggerType, eventFilter, properties);
+ return Objects.hash(loggerType, properties);
}
@Override
@@ -128,8 +113,6 @@ public String toString() {
+ "loggerType='"
+ loggerType
+ '\''
- + ", eventFilter="
- + eventFilter
+ ", properties="
+ properties
+ '}';
@@ -143,7 +126,6 @@ public String toString() {
*/
public static final class Builder {
private String loggerType = "file"; // Default to file logger
- private EventFilter eventFilter = EventFilter.ACCEPT_ALL; // Default to accept all
private final Map<String, Object> properties = new HashMap<>();
private Builder() {}
@@ -163,18 +145,6 @@ public Builder loggerType(String loggerType) {
return this;
}
- /**
- * Sets the event filter for this configuration.
- *
- * @param eventFilter the EventFilter to apply
- * @return this Builder instance for method chaining
- * @throws IllegalArgumentException if eventFilter is null
- */
- public Builder eventFilter(EventFilter eventFilter) {
- this.eventFilter = Objects.requireNonNull(eventFilter, "Event filter cannot be null");
- return this;
- }
-
/**
* Adds a property to the configuration.
*
@@ -213,7 +183,7 @@ public Builder properties(Map properties) {
* @return a new EventLoggerConfig instance
*/
public EventLoggerConfig build() {
- return new EventLoggerConfig(loggerType, eventFilter, properties);
+ return new EventLoggerConfig(loggerType, properties);
}
}
}
diff --git a/python/flink_agents/api/core_options.py b/python/flink_agents/api/core_options.py
index 9245e78c0..98b8b3179 100644
--- a/python/flink_agents/api/core_options.py
+++ b/python/flink_agents/api/core_options.py
@@ -91,6 +91,31 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
default=None,
)
+ # Event log level config options
+ EVENT_LOG_LEVEL = ConfigOption(
+ key="event-log.level",
+ config_type=str,
+ default="STANDARD",
+ )
+
+ EVENT_LOG_MAX_STRING_LENGTH = ConfigOption(
+ key="event-log.standard.max-string-length",
+ config_type=int,
+ default=2000,
+ )
+
+ EVENT_LOG_MAX_ARRAY_ELEMENTS = ConfigOption(
+ key="event-log.standard.max-array-elements",
+ config_type=int,
+ default=20,
+ )
+
+ EVENT_LOG_MAX_DEPTH = ConfigOption(
+ key="event-log.standard.max-depth",
+ config_type=int,
+ default=5,
+ )
+
class AgentExecutionOptions:
"""Execution options for Flink Agents."""
diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
index 2db308042..da41c4706 100644
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
@@ -105,9 +105,7 @@ def test_python_event_logging(tmp_path: Path) -> None:
log_files = list(event_log_dir.glob("events-*.log"))
# At least one log file should exist
- assert len(log_files) > 0, (
- f"Event log files should be created in {event_log_dir}"
- )
+ assert len(log_files) > 0, f"Event log files should be created in {event_log_dir}"
# Check that log files contain structured event content
record = None
@@ -129,6 +127,8 @@ def test_python_event_logging(tmp_path: Path) -> None:
assert record is not None, "Event log file is empty."
assert record_line is not None, "Event log file is empty."
assert "timestamp" in record
+ assert "logLevel" in record
+ assert "eventType" in record
assert "event" in record
assert "eventType" in record["event"]
assert has_processed_review, "Log should contain processed review content"
@@ -140,3 +140,125 @@ def test_python_event_logging(tmp_path: Path) -> None:
assert id_idx != -1
assert attributes_idx != -1
assert event_type_idx < id_idx < attributes_idx
+
+
+def _run_event_logging_pipeline(
+ tmp_path: Path, config_overrides: dict[str, str] | None = None
+) -> Path:
+ """Run the event logging pipeline and return the event log directory.
+
+ Args:
+ tmp_path: Temporary directory for log output.
+ config_overrides: Optional dict of config key-value pairs to set.
+
+ Returns:
+ The event log directory path.
+ """
+ event_log_dir = tmp_path / "event_log"
+ default_log_dir = Path(tempfile.gettempdir()) / "flink-agents"
+ shutil.rmtree(default_log_dir, ignore_errors=True)
+
+ config = Configuration()
+ env = StreamExecutionEnvironment.get_execution_environment(config)
+ env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+ env.set_parallelism(1)
+
+ agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+ agents_env.get_config().set_str("baseLogDir", str(event_log_dir))
+
+ if config_overrides:
+ for key, value in config_overrides.items():
+ agents_env.get_config().set_str(key, value)
+
+ current_dir = Path(__file__).parent
+ input_datastream = env.from_source(
+ source=FileSource.for_record_stream_format(
+ StreamFormat.text_line_format(),
+ f"file:///{current_dir}/../resources/input/input_data.txt",
+ ).build(),
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="python_event_logging_test",
+ )
+
+ deserialize_datastream = input_datastream.map(lambda x: json.loads(x))
+
+ agents_env.from_datastream(
+ input=deserialize_datastream, key_selector=InputKeySelector()
+ ).apply(PythonEventLoggingAgent()).to_datastream()
+
+ agents_env.execute()
+ return event_log_dir
+
+
+def _read_log_records(event_log_dir: Path) -> list[dict]:
+ """Read all JSON records from event log files.
+
+ Args:
+ event_log_dir: Directory containing event log files.
+
+ Returns:
+ List of parsed JSON records.
+ """
+ records: list[dict] = []
+ for log_file in event_log_dir.glob("events-*.log"):
+ with log_file.open(encoding="utf-8") as handle:
+ records.extend(json.loads(line) for line in handle if line.strip())
+ return records
+
+
+def test_event_log_verbose_level(tmp_path: Path) -> None:
+ """Test that VERBOSE log level writes events without truncation."""
+ event_log_dir = _run_event_logging_pipeline(
+ tmp_path, config_overrides={"event-log.level": "VERBOSE"}
+ )
+
+ records = _read_log_records(event_log_dir)
+ assert len(records) > 0, "VERBOSE level should produce event log records"
+
+ for record in records:
+ assert record.get("logLevel") == "VERBOSE", (
+ f"Expected logLevel VERBOSE, got {record.get('logLevel')}"
+ )
+
+ # VERBOSE should not truncate content (no truncatedString wrappers)
+ raw_content = json.dumps(records)
+ assert "truncatedString" not in raw_content, (
+ "VERBOSE level should not truncate any content"
+ )
+
+
+def test_event_log_off_level(tmp_path: Path) -> None:
+ """Test that OFF log level suppresses all event logging."""
+ event_log_dir = _run_event_logging_pipeline(
+ tmp_path, config_overrides={"event-log.level": "OFF"}
+ )
+
+ records = _read_log_records(event_log_dir)
+ assert len(records) == 0, (
+ f"OFF level should not produce any event log records, but found {len(records)}"
+ )
+
+
+def test_event_log_standard_truncation(tmp_path: Path) -> None:
+ """Test that STANDARD level truncates strings exceeding max-string-length."""
+ event_log_dir = _run_event_logging_pipeline(
+ tmp_path,
+ config_overrides={
+ "event-log.level": "STANDARD",
+ "event-log.standard.max-string-length": "10",
+ },
+ )
+
+ records = _read_log_records(event_log_dir)
+ assert len(records) > 0, "STANDARD level should produce event log records"
+
+ for record in records:
+ assert record.get("logLevel") == "STANDARD", (
+ f"Expected logLevel STANDARD, got {record.get('logLevel')}"
+ )
+
+ # With max-string-length=10, any string longer than 10 chars should be truncated
+ raw_content = json.dumps(records)
+ assert "truncatedString" in raw_content, (
+ "STANDARD level with max-string-length=10 should truncate long strings"
+ )
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java
new file mode 100644
index 000000000..f2f4b5027
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLogLevel;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Resolves the effective {@link EventLogLevel} for a given event type string using hierarchical
+ * config key inheritance.
+ *
+ * <p>Resolution order for a given event type (e.g., {@code org.apache.flink.agents.api.event
+ * .ChatRequestEvent}):
+ *
+ * <ol>
+ *   <li>Exact match in explicit per-type configuration
+ *   <li>Walk up dot-separated segments (e.g., {@code org.apache.flink.agents.api.event}, then
+ *       {@code org.apache.flink.agents.api}, etc.)
+ *   <li>Root default from {@code event-log.level} config key
+ *   <li>Built-in default: {@link EventLogLevel#STANDARD}
+ * </ol>
+ *
+ * <p>This mirrors Log4j's hierarchical logger configuration pattern. Resolved levels are cached in
+ * a {@link ConcurrentHashMap} for efficient repeated lookups.
+ *
+ *
+ * <p>Config keys are expected in the form:
+ *
+ * <pre>
+ * event-log.level = STANDARD (root default)
+ * event-log.type.<EVENT_TYPE>.level = OFF (per-type override)
+ * </pre>
+ */
+public class EventLogLevelResolver {
+
+ /** Prefix for per-event-type log level config keys. */
+ static final String TYPE_PREFIX = "event-log.type.";
+
+ /** Suffix for per-event-type log level config keys. */
+ static final String TYPE_SUFFIX = ".level";
+
+ private final EventLogLevel rootDefault;
+ private final Map<String, EventLogLevel> explicitLevels;
+ private final ConcurrentHashMap<String, EventLogLevel> cache;
+
+ /**
+ * Creates a resolver from a configuration data map.
+ *
+ * <p>The map is scanned for keys matching {@code event-log.type.<EVENT_TYPE>.level} to build
+ * the explicit per-type level mappings, and {@code event-log.level} for the root default.
+ *
+ * @param confData the flat configuration key-value map (e.g., from {@code
+ * AgentConfiguration.getConfData()})
+ */
+ public EventLogLevelResolver(Map<String, Object> confData) {
+ Map<String, Object> data = confData != null ? confData : Collections.emptyMap();
+
+ // Parse root default
+ Object rootValue = data.get(AgentConfigOptions.EVENT_LOG_LEVEL.getKey());
+ this.rootDefault =
+ rootValue != null
+ ? EventLogLevel.fromString(rootValue.toString())
+ : AgentConfigOptions.EVENT_LOG_LEVEL.getDefaultValue();
+
+ // Scan for per-type overrides
+ Map<String, EventLogLevel> levels = new HashMap<>();
+ for (Map.Entry<String, Object> entry : data.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(TYPE_PREFIX) && key.endsWith(TYPE_SUFFIX)) {
+ String eventType =
+ key.substring(TYPE_PREFIX.length(), key.length() - TYPE_SUFFIX.length());
+ if (!eventType.isEmpty()) {
+ levels.put(eventType, EventLogLevel.fromString(entry.getValue().toString()));
+ }
+ }
+ }
+ this.explicitLevels = Collections.unmodifiableMap(levels);
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Resolves the effective log level for the given event type string.
+ *
+ * <p>Uses hierarchical inheritance: exact match first, then walks up dot-separated segments,
+ * then falls back to root default, then built-in default ({@link EventLogLevel#STANDARD}).
+ *
+ * @param eventType the fully qualified event type string (e.g., {@code
+ * org.apache.flink.agents.api.event.ChatRequestEvent})
+ * @return the resolved {@link EventLogLevel}, never null
+ */
+ public EventLogLevel resolve(String eventType) {
+ if (eventType == null || eventType.isEmpty()) {
+ return rootDefault;
+ }
+ return cache.computeIfAbsent(eventType, this::doResolve);
+ }
+
+ private EventLogLevel doResolve(String eventType) {
+ // 1. Exact match
+ EventLogLevel level = explicitLevels.get(eventType);
+ if (level != null) {
+ return level;
+ }
+
+ // 2. Walk up dot-separated segments
+ String current = eventType;
+ int lastDot = current.lastIndexOf('.');
+ while (lastDot > 0) {
+ current = current.substring(0, lastDot);
+ level = explicitLevels.get(current);
+ if (level != null) {
+ return level;
+ }
+ lastDot = current.lastIndexOf('.');
+ }
+
+ // 3. Root default (already parsed in constructor, falls through to built-in if not set)
+ return rootDefault;
+ }
+}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
index 5b83452d6..402c3f52a 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
@@ -43,10 +43,12 @@ public EventLogRecord(EventContext context, Event event) {
this.event = event;
}
+ /** Returns the event context. */
public EventContext getContext() {
return context;
}
+ /** Returns the event. */
public Event getEvent() {
return event;
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
index 45c7e6246..a3a330cb3 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
@@ -58,7 +58,8 @@ public EventLogRecord deserialize(JsonParser parser, DeserializationContext cont
throw new IOException("Missing 'timestamp' field in EventLogRecord JSON");
}
- // Deserialize event using eventType from event node
+ // Deserialize event using eventType from event node. Any top-level "logLevel" field
+ // present in older log files is silently ignored — it is no longer part of the record.
JsonNode eventNode = rootNode.get("event");
if (eventNode == null) {
throw new IOException("Missing 'event' field in EventLogRecord JSON");
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
index db7bebd73..166f7424c 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
@@ -26,6 +26,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
+import org.apache.flink.agents.runtime.utils.EventUtil;
import java.io.IOException;
import java.util.Iterator;
@@ -47,9 +48,10 @@
 *
 * <pre>{@code
* {
* "timestamp": "2024-01-15T10:30:00Z",
+ * "eventType": "org.apache.flink.agents.api.InputEvent",
* "event": {
- * "eventType": "org.apache.flink.agents.api.InputEvent"
- * // Event-specific fields serialized normally
+ * "eventType": "org.apache.flink.agents.api.InputEvent",
+ * // Event-specific fields
* }
* }
* }
@@ -67,6 +69,8 @@ public void serialize(EventLogRecord record, JsonGenerator gen, SerializerProvid
gen.writeStartObject();
gen.writeStringField("timestamp", record.getContext().getTimestamp());
+ gen.writeStringField(
+ "eventType", EventUtil.resolveEventType(record.getEvent(), record.getContext()));
gen.writeFieldName("event");
JsonNode eventNode = buildEventNode(record.getEvent(), mapper);
@@ -74,8 +78,10 @@ public void serialize(EventLogRecord record, JsonGenerator gen, SerializerProvid
throw new IllegalStateException(
"Event log payload must be a JSON object, but was: " + eventNode.getNodeType());
}
- eventNode = reorderEventFields((ObjectNode) eventNode, record.getEvent(), mapper);
- gen.writeTree(eventNode);
+ ObjectNode orderedNode =
+ reorderEventFields((ObjectNode) eventNode, record.getEvent(), mapper);
+
+ gen.writeTree(orderedNode);
gen.writeEndObject();
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
index 8e7817276..b09af65d4 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
@@ -18,13 +18,18 @@
package org.apache.flink.agents.runtime.eventlog;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.EventContext;
-import org.apache.flink.agents.api.EventFilter;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLogLevel;
import org.apache.flink.agents.api.logger.EventLogger;
import org.apache.flink.agents.api.logger.EventLoggerConfig;
import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.runtime.utils.EventUtil;
+import org.apache.flink.metrics.Counter;
import java.io.BufferedWriter;
import java.io.FileWriter;
@@ -32,6 +37,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
/**
* A file-based event logger that logs events to files with structured names in a flat directory.
@@ -81,16 +88,20 @@ public class FileEventLogger implements EventLogger {
private static final String DEFAULT_BASE_LOG_DIR =
Paths.get(System.getProperty("java.io.tmpdir"), "flink-agents").toString();
+ /** Property key for passing the full agent config data map into the logger. */
+ public static final String AGENT_CONFIG_PROPERTY_KEY = "agentConfig";
+
private static final ObjectMapper MAPPER = new ObjectMapper();
private final EventLoggerConfig config;
- private final EventFilter eventFilter;
private boolean prettyPrint;
private PrintWriter writer;
+ private EventLogLevelResolver levelResolver;
+ private JsonTruncator truncator;
+ private Counter truncatedEventsCounter;
public FileEventLogger(EventLoggerConfig config) {
this.config = config;
- this.eventFilter = config.getEventFilter();
}
@Override
@@ -105,6 +116,45 @@ public void open(EventLoggerOpenParams params) throws Exception {
writer = new PrintWriter(new BufferedWriter(new FileWriter(logFilePath, true)));
prettyPrint =
(Boolean) config.getProperties().getOrDefault(PRETTY_PRINT_PROPERTY_KEY, false);
+
+ // Initialize level resolver and truncator from agent config
+ @SuppressWarnings("unchecked")
+ Map<String, Object> agentConfig =
+ (Map<String, Object>)
+ config.getProperties()
+ .getOrDefault(AGENT_CONFIG_PROPERTY_KEY, Collections.emptyMap());
+ this.levelResolver = new EventLogLevelResolver(agentConfig);
+ int maxStringLength =
+ getIntFromConfig(
+ agentConfig,
+ AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getKey(),
+ AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getDefaultValue());
+ int maxArrayElements =
+ getIntFromConfig(
+ agentConfig,
+ AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getKey(),
+ AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getDefaultValue());
+ int maxDepth =
+ getIntFromConfig(
+ agentConfig,
+ AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getKey(),
+ AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getDefaultValue());
+ this.truncator = new JsonTruncator(maxStringLength, maxArrayElements, maxDepth);
+ }
+
+ private static int getIntFromConfig(Map<String, Object> config, String key, int defaultValue) {
+ Object value = config.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+ try {
+ return Integer.parseInt(value.toString());
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
}
private String generateSubTaskLogFilePath(EventLoggerOpenParams params) {
@@ -130,18 +180,49 @@ public void append(EventContext context, Event event) throws Exception {
throw new IllegalStateException("FileEventLogger not initialized. Call open() first.");
}
- // Apply event filter
- if (!eventFilter.accept(event, context)) {
- return; // Skip this event
+ String eventTypeStr = EventUtil.resolveEventType(event, context);
+
+ // Resolve log level and skip OFF events.
+ EventLogLevel level =
+ levelResolver != null ? levelResolver.resolve(eventTypeStr) : EventLogLevel.VERBOSE;
+ if (level == EventLogLevel.OFF) {
+ return;
}
+ // All events should be JSON serializable; we already check this when sending events
+ // to context (RunnerContextImpl.sendEvent).
EventLogRecord record = new EventLogRecord(context, event);
- // All events should be JSON serializable, since we check it when sending events to context:
- // RunnerContextImpl.sendEvent
+ JsonNode tree = MAPPER.valueToTree(record);
+ if (!(tree instanceof ObjectNode)) {
+ throw new IllegalStateException(
+ "EventLogRecord must serialize to a JSON object, but was: "
+ + tree.getNodeType());
+ }
+ ObjectNode rootNode = (ObjectNode) tree;
+
+ // Truncate the event subtree at STANDARD level.
+ if (level == EventLogLevel.STANDARD && truncator != null) {
+ JsonNode eventNode = rootNode.get("event");
+ if (eventNode instanceof ObjectNode) {
+ boolean truncated = truncator.truncate((ObjectNode) eventNode);
+ if (truncated && truncatedEventsCounter != null) {
+ truncatedEventsCounter.inc();
+ }
+ }
+ }
+
+ // Rebuild the top-level object so logLevel sits between timestamp and eventType,
+ // matching the documented JSON layout.
+ ObjectNode ordered = MAPPER.createObjectNode();
+ ordered.set("timestamp", rootNode.get("timestamp"));
+ ordered.put("logLevel", level.name());
+ ordered.set("eventType", rootNode.get("eventType"));
+ ordered.set("event", rootNode.get("event"));
+
String json =
prettyPrint
- ? MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(record)
- : MAPPER.writeValueAsString(record);
+ ? MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(ordered)
+ : MAPPER.writeValueAsString(ordered);
writer.println(json);
}
@@ -154,6 +235,16 @@ public void flush() throws Exception {
writer.flush();
}
+ /**
+ * Sets the counter for tracking truncated events. Called by the operator after metrics are
+ * initialized.
+ *
+ * @param counter the counter to increment when events are truncated
+ */
+ public void setTruncatedEventsCounter(Counter counter) {
+ this.truncatedEventsCounter = counter;
+ }
+
@Override
public void close() throws Exception {
if (writer != null) {
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
new file mode 100644
index 000000000..022ca4cf7
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Truncates a Jackson {@link JsonNode} tree per configurable thresholds.
+ *
+ * <p>Three truncation strategies are applied in a single recursive pass:
+ *
+ * <ul>
+ *   <li><b>String truncation</b>: Strings longer than {@code maxStringLength} are replaced with a
+ *       wrapper: {@code {"truncatedString": "first N chars...", "omittedChars": M}}
+ *
+ *   <li><b>Array truncation</b>: Arrays larger than {@code maxArrayElements} are replaced with a
+ *       wrapper: {@code {"truncatedList": [first N elements], "omittedElements": M}}
+ *
+ *   <li><b>Depth truncation</b>: At max depth, object nodes retain only scalar fields; nested
+ *       objects/arrays are dropped: {@code {"truncatedObject": {scalars...}, "omittedFields": N}}
+ * </ul>
+ *
+ * <p>Setting any threshold to {@code 0} disables that specific truncation strategy. If all
+ * thresholds are {@code 0}, no truncation occurs.
+ *
+ *
+ * <p>Protected fields at the top level of the event node ({@code eventType}, {@code id}, {@code
+ * attributes}) are never truncated.
+ */
+public class JsonTruncator {
+
+ private static final Set<String> PROTECTED_FIELDS =
+ new HashSet<>(Arrays.asList("eventType", "id", "attributes"));
+
+ private final int maxStringLength;
+ private final int maxArrayElements;
+ private final int maxDepth;
+
+ /**
+ * Creates a new truncator with the given thresholds.
+ *
+ * @param maxStringLength maximum character length for string values; 0 to disable
+ * @param maxArrayElements maximum number of array elements retained; 0 to disable
+ * @param maxDepth maximum object nesting depth; 0 to disable
+ */
+ public JsonTruncator(int maxStringLength, int maxArrayElements, int maxDepth) {
+ this.maxStringLength = maxStringLength;
+ this.maxArrayElements = maxArrayElements;
+ this.maxDepth = maxDepth;
+ }
+
+ /**
+ * Truncates the given event node in place according to configured thresholds.
+ *
+ * <p>Protected fields ({@code eventType}, {@code id}, {@code attributes}) at the top level of
+ * the event node are never truncated.
+ *
+ * @param eventNode the top-level event JSON object to truncate
+ * @return {@code true} if any field was truncated, {@code false} if the node was unchanged
+ */
+ public boolean truncate(ObjectNode eventNode) {
+ if (eventNode == null) {
+ return false;
+ }
+ return truncateObject(eventNode, 1, true);
+ }
+
+ /**
+ * Recursively truncates an object node.
+ *
+ * @param node the object node to process
+ * @param depth current depth (1 = top-level event node)
+ * @param isTopLevel whether this is the top-level event node (for protected field checks)
+ * @return true if any truncation occurred
+ */
+ private boolean truncateObject(ObjectNode node, int depth, boolean isTopLevel) {
+ boolean truncated = false;
+
+ // At max depth, collapse the entire object to retain only scalars
+ if (maxDepth > 0 && depth >= maxDepth) {
+ return collapseAtMaxDepth(node, isTopLevel);
+ }
+
+ List<String> fieldNames = new ArrayList<>();
+ node.fieldNames().forEachRemaining(fieldNames::add);
+
+ for (String fieldName : fieldNames) {
+ if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) {
+ continue;
+ }
+
+ JsonNode child = node.get(fieldName);
+ if (child == null) {
+ continue;
+ }
+
+ if (child.isTextual()) {
+ JsonNode replacement = truncateString(child.textValue());
+ if (replacement != null) {
+ node.set(fieldName, replacement);
+ truncated = true;
+ }
+ } else if (child.isArray()) {
+ // Truncate the array first so we don't recurse into elements that will be dropped.
+ JsonNode replacement = truncateArray((ArrayNode) child);
+ if (replacement != null) {
+ ArrayNode retained =
+ (ArrayNode) ((ObjectNode) replacement).get("truncatedList");
+ truncateArrayContents(retained, depth + 1);
+ node.set(fieldName, replacement);
+ truncated = true;
+ } else {
+ truncated |= truncateArrayContents((ArrayNode) child, depth + 1);
+ }
+ } else if (child.isObject()) {
+ truncated |= truncateObject((ObjectNode) child, depth + 1, false);
+ }
+ }
+
+ return truncated;
+ }
+
+ /**
+ * Truncates a string if it exceeds maxStringLength.
+ *
+ * @return a wrapper ObjectNode if truncated, or null if no truncation needed
+ */
+ private JsonNode truncateString(String value) {
+ if (maxStringLength <= 0 || value == null || value.length() <= maxStringLength) {
+ return null;
+ }
+ ObjectNode wrapper = JsonNodeFactory.instance.objectNode();
+ wrapper.put("truncatedString", value.substring(0, maxStringLength) + "...");
+ wrapper.put("omittedChars", value.length() - maxStringLength);
+ return wrapper;
+ }
+
+ /**
+ * Truncates an array if it exceeds maxArrayElements.
+ *
+ * @return a wrapper ObjectNode if truncated, or null if no truncation needed
+ */
+ private JsonNode truncateArray(ArrayNode array) {
+ if (maxArrayElements <= 0 || array.size() <= maxArrayElements) {
+ return null;
+ }
+ ObjectNode wrapper = JsonNodeFactory.instance.objectNode();
+ ArrayNode retained = JsonNodeFactory.instance.arrayNode();
+ for (int i = 0; i < maxArrayElements; i++) {
+ retained.add(array.get(i));
+ }
+ wrapper.set("truncatedList", retained);
+ wrapper.put("omittedElements", array.size() - maxArrayElements);
+ return wrapper;
+ }
+
+ /**
+ * Recursively truncates contents within an array (strings and nested structures).
+ *
+ * @return true if any truncation occurred within array elements
+ */
+ private boolean truncateArrayContents(ArrayNode array, int depth) {
+ boolean truncated = false;
+ for (int i = 0; i < array.size(); i++) {
+ JsonNode element = array.get(i);
+ if (element.isTextual()) {
+ JsonNode replacement = truncateString(element.textValue());
+ if (replacement != null) {
+ array.set(i, replacement);
+ truncated = true;
+ }
+ } else if (element.isObject()) {
+ truncated |= truncateObject((ObjectNode) element, depth, false);
+ } else if (element.isArray()) {
+ JsonNode replacement = truncateArray((ArrayNode) element);
+ if (replacement != null) {
+ ArrayNode retained =
+ (ArrayNode) ((ObjectNode) replacement).get("truncatedList");
+ truncateArrayContents(retained, depth + 1);
+ array.set(i, replacement);
+ truncated = true;
+ } else {
+ truncated |= truncateArrayContents((ArrayNode) element, depth + 1);
+ }
+ }
+ }
+ return truncated;
+ }
+
+ /**
+ * At max depth, retain only scalar fields and drop nested objects/arrays.
+ *
+ * @return true if any fields were dropped
+ */
+ private boolean collapseAtMaxDepth(ObjectNode node, boolean isTopLevel) {
+ List<String> fieldNames = new ArrayList<>();
+ node.fieldNames().forEachRemaining(fieldNames::add);
+
+ ObjectNode scalarFields = JsonNodeFactory.instance.objectNode();
+ int omittedCount = 0;
+ boolean hasNonScalar = false;
+
+ for (String fieldName : fieldNames) {
+ if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) {
+ // Protected fields are kept as-is, even if non-scalar
+ scalarFields.set(fieldName, node.get(fieldName));
+ continue;
+ }
+
+ JsonNode child = node.get(fieldName);
+ if (child.isObject() || child.isArray()) {
+ omittedCount++;
+ hasNonScalar = true;
+ } else {
+ // Apply string truncation to scalar string fields even at max depth
+ if (child.isTextual()) {
+ JsonNode replacement = truncateString(child.textValue());
+ if (replacement != null) {
+ scalarFields.set(fieldName, replacement);
+ } else {
+ scalarFields.set(fieldName, child);
+ }
+ } else {
+ scalarFields.set(fieldName, child);
+ }
+ }
+ }
+
+ if (!hasNonScalar) {
+ // No non-scalar fields to drop; but string truncation may have occurred
+ // Check if any scalar field was replaced
+ boolean stringTruncated = false;
+ for (String fieldName : fieldNames) {
+ if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) {
+ continue;
+ }
+ JsonNode original = node.get(fieldName);
+ if (original.isTextual()) {
+ JsonNode replacement = truncateString(original.textValue());
+ if (replacement != null) {
+ node.set(fieldName, replacement);
+ stringTruncated = true;
+ }
+ }
+ }
+ return stringTruncated;
+ }
+
+ // Replace the node contents with the wrapped version
+ node.removeAll();
+ node.set("truncatedObject", scalarFields);
+ node.put("omittedFields", omittedCount);
+ return true;
+ }
+}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
index 0e9657322..59e0daaa1 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
@@ -37,6 +37,8 @@ public class BuiltInMetrics {
private final Meter numOfActionsExecutedPerSec;
+ private final Counter eventLogTruncatedEvents;
+
private final HashMap actionMetricGroups;
public BuiltInMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup, AgentPlan agentPlan) {
@@ -48,6 +50,8 @@ public BuiltInMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup, AgentPlan ag
this.numOfActionsExecutedPerSec =
parentMetricGroup.getMeter("numOfActionsExecutedPerSec", numOfActionsExecuted);
+ this.eventLogTruncatedEvents = parentMetricGroup.getCounter("eventLogTruncatedEvents");
+
this.actionMetricGroups = new HashMap<>();
for (String actionName : agentPlan.getActions().keySet()) {
actionMetricGroups.put(
@@ -69,4 +73,9 @@ public void markActionExecuted(String actionName) {
numOfActionsExecutedPerSec.markEvent();
actionMetricGroups.get(actionName).markActionExecuted();
}
+
+ /** Returns the counter tracking event log truncation occurrences. */
+ public Counter getEventLogTruncatedEventsCounter() {
+ return eventLogTruncatedEvents;
+ }
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index e5015a3a5..6ec16041b 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -342,6 +342,10 @@ private void initEventLogger(StreamingRuntimeContext runtimeContext) throws Exce
return;
}
eventLogger.open(new EventLoggerOpenParams(runtimeContext));
+ if (eventLogger instanceof FileEventLogger) {
+ ((FileEventLogger) eventLogger)
+ .setTruncatedEventsCounter(builtInMetrics.getEventLogTruncatedEventsCounter());
+ }
}
@Override
@@ -1132,6 +1136,8 @@ private EventLogger createEventLogger(AgentPlan agentPlan) {
}
loggerConfigBuilder.property(
FileEventLogger.PRETTY_PRINT_PROPERTY_KEY, agentPlan.getConfig().get(PRETTY_PRINT));
+ loggerConfigBuilder.property(
+ FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentPlan.getConfig().getConfData());
return EventLoggerFactory.createLogger(loggerConfigBuilder.build());
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/utils/EventUtil.java b/runtime/src/main/java/org/apache/flink/agents/runtime/utils/EventUtil.java
index e262a1b75..45a567558 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/utils/EventUtil.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/utils/EventUtil.java
@@ -18,6 +18,7 @@
package org.apache.flink.agents.runtime.utils;
import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.EventContext;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
@@ -38,6 +39,26 @@ public static boolean isInputEvent(Event event) {
&& ((PythonEvent) event).getEventType().equals(PYTHON_INPUT_EVENT_NAME);
}
+ /**
+ * Resolves the event type string for the given event. For {@link PythonEvent}, returns the
+ * Python module path (e.g., {@code flink_agents.api.events.event.OutputEvent}) instead of the
+ * Java wrapper class name. Falls back to {@link EventContext#getEventType()} for all other
+ * events.
+ *
+ * @param event the event
+ * @param context the event context
+ * @return the resolved event type string
+ */
+ public static String resolveEventType(Event event, EventContext context) {
+ if (event instanceof PythonEvent) {
+ String pythonType = ((PythonEvent) event).getEventType();
+ if (pythonType != null) {
+ return pythonType;
+ }
+ }
+ return context.getEventType();
+ }
+
public static boolean isOutputEvent(Event event) {
if (event instanceof OutputEvent) {
return true;
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java
new file mode 100644
index 000000000..a1d2d26e5
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import org.apache.flink.agents.api.logger.EventLogLevel;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class EventLogLevelResolverTest {
+
+ @Test
+ void testExactMatch() {
+ Map<String, Object> config = new HashMap<>();
+ config.put(
+ "event-log.type.org.apache.flink.agents.api.event.ChatRequestEvent.level", "OFF");
+ EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+ assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent"))
+ .isEqualTo(EventLogLevel.OFF);
+ }
+
+ @Test
+ void testParentPackageInheritance() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("event-log.type.org.apache.flink.agents.api.event.level", "VERBOSE");
+ EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+ // ChatRequestEvent is under the configured parent package
+ assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent"))
+ .isEqualTo(EventLogLevel.VERBOSE);
+ }
+
+ @Test
+ void testGrandparentInheritance() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("event-log.type.org.apache.flink.level", "OFF");
+ EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+ // Should walk up multiple levels to find the match
+ assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent"))
+ .isEqualTo(EventLogLevel.OFF);
+ }
+
+ @Test
+ void testRootDefault() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("event-log.level", "VERBOSE");
+ EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+ assertThat(resolver.resolve("some.unknown.EventType")).isEqualTo(EventLogLevel.VERBOSE);
+ }
+
+ @Test
+ void testBuiltInDefault() {
+ EventLogLevelResolver resolver = new EventLogLevelResolver(Collections.emptyMap());
+
+ assertThat(resolver.resolve("some.unknown.EventType")).isEqualTo(EventLogLevel.STANDARD);
+ }
+
+ @Test
+ void testCaseInsensitive() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("event-log.type.my.Event.level", "verbose");
+ EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+ assertThat(resolver.resolve("my.Event")).isEqualTo(EventLogLevel.VERBOSE);
+
+ // Also test root level with mixed case
+ Map<String, Object> config2 = new HashMap<>();
+ config2.put("event-log.level", "Off");
+ EventLogLevelResolver resolver2 = new EventLogLevelResolver(config2);
+
+ assertThat(resolver2.resolve("any.Event")).isEqualTo(EventLogLevel.OFF);
+ }
+
+ @Test
+ void testInvalidLevel() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("event-log.type.my.Event.level", "INVALID_LEVEL");
+
+ assertThatThrownBy(() -> new EventLogLevelResolver(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("INVALID_LEVEL");
+ }
+
+ @Test
+ void testNullConfigData() {
+ EventLogLevelResolver resolver = new EventLogLevelResolver(null);
+
+ assertThat(resolver.resolve("some.Event")).isEqualTo(EventLogLevel.STANDARD);
+ }
+
+ @Test
+ void testNullOrEmptyEventType() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("event-log.level", "VERBOSE");
+ EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+ assertThat(resolver.resolve(null)).isEqualTo(EventLogLevel.VERBOSE);
+ assertThat(resolver.resolve("")).isEqualTo(EventLogLevel.VERBOSE);
+ }
+
+ @Test
+ void testExactMatchTakesPrecedenceOverParent() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("event-log.type.org.apache.flink.agents.api.event.level", "OFF");
+ config.put(
+ "event-log.type.org.apache.flink.agents.api.event.ChatRequestEvent.level",
+ "VERBOSE");
+ EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+ // Exact match should win over parent package
+ assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent"))
+ .isEqualTo(EventLogLevel.VERBOSE);
+ // Sibling should inherit from parent
+ assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatResponseEvent"))
+ .isEqualTo(EventLogLevel.OFF);
+ }
+
+ @Test
+ void testCachingReturnsSameResult() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("event-log.type.my.Event.level", "OFF");
+ EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+ // Call twice — should return cached result
+ EventLogLevel first = resolver.resolve("my.Event");
+ EventLogLevel second = resolver.resolve("my.Event");
+ assertThat(first).isSameAs(second);
+ }
+}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
index b61d8045c..05176e53e 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.EventContext;
-import org.apache.flink.agents.api.EventFilter;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.configuration.AgentConfigOptions;
@@ -41,7 +40,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.when;
@@ -297,234 +298,257 @@ void testMultipleSubTasks() throws Exception {
}
@Test
- void testEventFilterAcceptAll() throws Exception {
- // Given - config with ACCEPT_ALL filter (default behavior)
+ void testPrettyPrintOutputsFormattedJson() throws Exception {
+ // Given - config with prettyPrint enabled
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(EventFilter.ACCEPT_ALL)
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+ .property(AgentConfigOptions.PRETTY_PRINT.getKey(), true)
.build();
logger = new FileEventLogger(config);
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
-
- // When
+ InputEvent inputEvent = new InputEvent("test input");
logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
logger.flush();
- // Then - both events should be logged
+ // Then - output should be valid JSON spanning multiple lines (pretty-printed)
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- assertEquals(2, lines.size(), "Both events should be logged with ACCEPT_ALL filter");
-
- // Verify both events were deserialized correctly
- EventLogRecord inputRecord = objectMapper.readValue(lines.get(0), EventLogRecord.class);
- assertInstanceOf(InputEvent.class, inputRecord.getEvent());
-
- EventLogRecord outputRecord = objectMapper.readValue(lines.get(1), EventLogRecord.class);
- assertInstanceOf(OutputEvent.class, outputRecord.getEvent());
+ // Pretty-printed JSON for a single event record spans multiple lines
+ assertTrue(lines.size() > 1, "Pretty-printed JSON should span multiple lines");
+ // Each line after the first should be indented
+ assertTrue(
+ lines.subList(1, lines.size()).stream().anyMatch(line -> line.startsWith(" ")),
+ "Pretty-printed JSON lines should be indented");
+ // The entire content should still be valid JSON
+ String content = String.join("\n", lines);
+ assertDoesNotThrow(
+ () -> objectMapper.readValue(content, EventLogRecord.class),
+ "Pretty-printed output should be valid JSON deserializable to EventLogRecord");
}
@Test
- void testEventFilterRejectAll() throws Exception {
- // Given - config with REJECT_ALL filter
+ void testStandardLevelTruncation() throws Exception {
+ // Given - config with STANDARD level and a small max-string-length for easy testing
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "STANDARD");
+ agentConfig.put("event-log.standard.max-string-length", 10);
+ agentConfig.put("event-log.standard.max-array-elements", 20);
+ agentConfig.put("event-log.standard.max-depth", 5);
+
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(EventFilter.REJECT_ALL)
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
- // When
- logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
+ // Use a custom event with a very long string field
+ TestCustomEvent event =
+ new TestCustomEvent("this is a very long string that exceeds 10", 1);
+ EventContext context = new EventContext(event);
+
+ logger.append(context, event);
logger.flush();
- // Then - no events should be logged (file should not exist or be empty)
Path logFile = getExpectedLogFilePath();
+ List<String> lines = Files.readAllLines(logFile);
+ assertEquals(1, lines.size());
+
+ JsonNode jsonNode = objectMapper.readTree(lines.get(0));
+ assertEquals("STANDARD", jsonNode.get("logLevel").asText());
+
+ // The customData field should be truncated
+ JsonNode eventNode = jsonNode.get("event");
+ JsonNode customDataNode = eventNode.get("customData");
assertTrue(
- !Files.exists(logFile) || Files.readAllLines(logFile).isEmpty(),
- "No events should be logged with REJECT_ALL filter");
+ customDataNode.has("truncatedString"),
+ "Long string should be truncated at STANDARD level");
+ assertTrue(customDataNode.has("omittedChars"));
}
@Test
- void testEventFilterByEventType() throws Exception {
- // Given - config with filter that only accepts InputEvents
+ void testVerboseLevelNoTruncation() throws Exception {
+ // Given - config with VERBOSE level
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "VERBOSE");
+ agentConfig.put("event-log.standard.max-string-length", 10);
+
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(EventFilter.byEventType(InputEvent.class))
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
- TestCustomEvent customEvent = new TestCustomEvent("custom data", 42);
- // When
- logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
- logger.append(new EventContext(customEvent), customEvent);
+ TestCustomEvent event =
+ new TestCustomEvent("this is a very long string that exceeds 10", 1);
+ EventContext context = new EventContext(event);
+
+ logger.append(context, event);
logger.flush();
- // Then - only InputEvent should be logged
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- assertEquals(1, lines.size(), "Only InputEvent should be logged");
+ assertEquals(1, lines.size());
- EventLogRecord record = objectMapper.readValue(lines.get(0), EventLogRecord.class);
- assertInstanceOf(InputEvent.class, record.getEvent());
- assertEquals("input data", ((InputEvent) record.getEvent()).getInput());
+ JsonNode jsonNode = objectMapper.readTree(lines.get(0));
+ assertEquals("VERBOSE", jsonNode.get("logLevel").asText());
+
+ // The customData field should NOT be truncated
+ JsonNode eventNode = jsonNode.get("event");
+ assertTrue(
+ eventNode.get("customData").isTextual(),
+ "String should be preserved at VERBOSE level");
+ assertEquals(
+ "this is a very long string that exceeds 10", eventNode.get("customData").asText());
}
@Test
- void testEventFilterByMultipleEventTypes() throws Exception {
- // Given - config with filter that accepts InputEvents and OutputEvents
+ void testOffLevelSkipsEvent() throws Exception {
+ // Given - config with OFF level
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "OFF");
+
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(EventFilter.byEventType(InputEvent.class, OutputEvent.class))
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
- TestCustomEvent customEvent = new TestCustomEvent("custom data", 42);
- // When
- logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
- logger.append(new EventContext(customEvent), customEvent);
+ InputEvent event = new InputEvent("should not be logged");
+ EventContext context = new EventContext(event);
+
+ logger.append(context, event);
logger.flush();
- // Then - InputEvent and OutputEvent should be logged, but not TestCustomEvent
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- assertEquals(2, lines.size(), "InputEvent and OutputEvent should be logged");
-
- EventLogRecord inputRecord = objectMapper.readValue(lines.get(0), EventLogRecord.class);
- assertInstanceOf(InputEvent.class, inputRecord.getEvent());
- assertEquals("input data", ((InputEvent) inputRecord.getEvent()).getInput());
-
- EventLogRecord outputRecord = objectMapper.readValue(lines.get(1), EventLogRecord.class);
- assertInstanceOf(OutputEvent.class, outputRecord.getEvent());
- assertEquals("output data", ((OutputEvent) outputRecord.getEvent()).getOutput());
+ assertEquals(0, lines.size(), "OFF level should produce no output");
}
@Test
- void testCustomEventFilter() throws Exception {
- // Given - config with custom filter that only accepts events with specific content
- EventFilter customFilter =
- (event, context) -> {
- if (event instanceof InputEvent) {
- return ((InputEvent) event).getInput().toString().contains("important");
- }
- return false;
- };
+ void testPerTypeLevelOverride() throws Exception {
+ // Given - root is STANDARD but InputEvent is set to VERBOSE
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "STANDARD");
+ agentConfig.put("event-log.standard.max-string-length", 10);
+ agentConfig.put("event-log.type.org.apache.flink.agents.api.InputEvent.level", "VERBOSE");
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(customFilter)
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent importantEvent = new InputEvent("important data");
- InputEvent regularEvent = new InputEvent("regular data");
- OutputEvent outputEvent = new OutputEvent("output data");
- // When
- logger.append(new EventContext(importantEvent), importantEvent);
- logger.append(new EventContext(regularEvent), regularEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
+ // InputEvent should be VERBOSE (no truncation)
+ InputEvent inputEvent = new InputEvent("this is a very long string that exceeds 10");
+ logger.append(new EventContext(inputEvent), inputEvent);
+
+ // TestCustomEvent should be STANDARD (truncated)
+ TestCustomEvent customEvent =
+ new TestCustomEvent("this is a very long string that exceeds 10", 1);
+ logger.append(new EventContext(customEvent), customEvent);
logger.flush();
- // Then - only the "important" InputEvent should be logged
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- assertEquals(1, lines.size(), "Only important InputEvent should be logged");
+ assertEquals(2, lines.size());
- EventLogRecord record = objectMapper.readValue(lines.get(0), EventLogRecord.class);
- assertInstanceOf(InputEvent.class, record.getEvent());
- assertEquals("important data", ((InputEvent) record.getEvent()).getInput());
+ // InputEvent at VERBOSE - no truncation
+ JsonNode inputJson = objectMapper.readTree(lines.get(0));
+ assertEquals("VERBOSE", inputJson.get("logLevel").asText());
+ assertTrue(inputJson.get("event").get("input").isTextual());
+
+ // TestCustomEvent at STANDARD - truncated
+ JsonNode customJson = objectMapper.readTree(lines.get(1));
+ assertEquals("STANDARD", customJson.get("logLevel").asText());
+ assertTrue(customJson.get("event").get("customData").has("truncatedString"));
}
@Test
- void testDefaultEventFilterBehavior() throws Exception {
- // Given - config without explicit eventFilter (should default to ACCEPT_ALL)
- config =
- EventLoggerConfig.builder()
- .loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .build();
- logger = new FileEventLogger(config);
-
+ void testJsonOutputHasNewFields() throws Exception {
+ // Given - default config
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
+ InputEvent event = new InputEvent("test");
+ EventContext context = new EventContext(event);
- // When
- logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
+ logger.append(context, event);
logger.flush();
- // Then - both events should be logged (default ACCEPT_ALL behavior)
Path logFile = getExpectedLogFilePath();
        List<String> lines = Files.readAllLines(logFile);
- assertEquals(2, lines.size(), "Both events should be logged with default filter");
+ JsonNode jsonNode = objectMapper.readTree(lines.get(0));
- EventLogRecord inputRecord = objectMapper.readValue(lines.get(0), EventLogRecord.class);
- assertInstanceOf(InputEvent.class, inputRecord.getEvent());
+ // Verify new top-level fields exist
+ assertTrue(jsonNode.has("logLevel"), "JSON should have logLevel field");
+ assertTrue(jsonNode.has("eventType"), "JSON should have eventType field");
+ assertEquals("org.apache.flink.agents.api.InputEvent", jsonNode.get("eventType").asText());
+ assertNotNull(jsonNode.get("logLevel").asText());
+ }
- EventLogRecord outputRecord = objectMapper.readValue(lines.get(1), EventLogRecord.class);
- assertInstanceOf(OutputEvent.class, outputRecord.getEvent());
+ @Test
+ void testBackwardCompatibleDeserialization() throws Exception {
+ // Simulate old-format JSON without logLevel field
+ String oldFormatJson =
+ "{\"timestamp\":\"2024-01-15T10:30:00Z\","
+ + "\"event\":{\"eventType\":\"org.apache.flink.agents.api.InputEvent\","
+ + "\"id\":null,\"attributes\":{},\"input\":\"test\"}}";
+
+ EventLogRecord record = objectMapper.readValue(oldFormatJson, EventLogRecord.class);
+ assertNotNull(record.getEvent());
+ assertInstanceOf(InputEvent.class, record.getEvent());
+ assertEquals(
+ "test",
+ ((InputEvent) record.getEvent()).getInput(),
+ "Old-format JSON without logLevel should still deserialize the event payload");
}
@Test
- void testPrettyPrintOutputsFormattedJson() throws Exception {
- // Given - config with prettyPrint enabled
+ void testHierarchicalInheritance() throws Exception {
+ // Set package-level OFF, but specific type VERBOSE
+        Map<String, String> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "STANDARD");
+ agentConfig.put("event-log.type.org.apache.flink.agents.api.level", "OFF");
+ agentConfig.put("event-log.type.org.apache.flink.agents.api.InputEvent.level", "VERBOSE");
+
config =
EventLoggerConfig.builder()
.loggerType("file")
.property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
- .property(AgentConfigOptions.PRETTY_PRINT.getKey(), true)
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("test input");
+
+ // InputEvent has explicit VERBOSE override — should be logged
+ InputEvent inputEvent = new InputEvent("should be logged");
logger.append(new EventContext(inputEvent), inputEvent);
+
+ // OutputEvent inherits OFF from package level — should NOT be logged
+ OutputEvent outputEvent = new OutputEvent("should not be logged");
+ logger.append(new EventContext(outputEvent), outputEvent);
logger.flush();
- // Then - output should be valid JSON spanning multiple lines (pretty-printed)
Path logFile = getExpectedLogFilePath();
List lines = Files.readAllLines(logFile);
- // Pretty-printed JSON for a single event record spans multiple lines
- assertTrue(lines.size() > 1, "Pretty-printed JSON should span multiple lines");
- // Each line after the first should be indented
- assertTrue(
- lines.subList(1, lines.size()).stream().anyMatch(line -> line.startsWith(" ")),
- "Pretty-printed JSON lines should be indented");
- // The entire content should still be valid JSON
- String content = String.join("\n", lines);
- assertDoesNotThrow(
- () -> objectMapper.readValue(content, EventLogRecord.class),
- "Pretty-printed output should be valid JSON deserializable to EventLogRecord");
+ assertEquals(1, lines.size(), "Only InputEvent (VERBOSE override) should be logged");
+
+ JsonNode json = objectMapper.readTree(lines.get(0));
+ assertEquals("VERBOSE", json.get("logLevel").asText());
+ assertEquals("org.apache.flink.agents.api.InputEvent", json.get("eventType").asText());
}
private Path getExpectedLogFilePath() {
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
new file mode 100644
index 000000000..bd05b9a10
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class JsonTruncatorTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @Test
+ void testStringTruncation() {
+ JsonTruncator truncator = new JsonTruncator(10, 0, 0);
+ ObjectNode node = MAPPER.createObjectNode();
+ node.put("content", "This is a very long string that exceeds the limit");
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ assertThat(node.get("content").isObject()).isTrue();
+ assertThat(node.get("content").get("truncatedString").asText()).isEqualTo("This is a ...");
+ assertThat(node.get("content").get("omittedChars").asInt())
+ .isEqualTo(49 - 10); // total length minus maxStringLength
+ }
+
+ @Test
+ void testArrayTrimming() {
+ JsonTruncator truncator = new JsonTruncator(0, 3, 0);
+ ObjectNode node = MAPPER.createObjectNode();
+ ArrayNode array = MAPPER.createArrayNode();
+ for (int i = 0; i < 7; i++) {
+ array.add("item" + i);
+ }
+ node.set("items", array);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ assertThat(node.get("items").isObject()).isTrue();
+ assertThat(node.get("items").get("truncatedList").size()).isEqualTo(3);
+ assertThat(node.get("items").get("truncatedList").get(0).asText()).isEqualTo("item0");
+ assertThat(node.get("items").get("truncatedList").get(1).asText()).isEqualTo("item1");
+ assertThat(node.get("items").get("truncatedList").get(2).asText()).isEqualTo("item2");
+ assertThat(node.get("items").get("omittedElements").asInt()).isEqualTo(4);
+ }
+
+ @Test
+ void testDepthCollapsing() {
+ JsonTruncator truncator = new JsonTruncator(0, 0, 2);
+ ObjectNode node = MAPPER.createObjectNode();
+ ObjectNode nested = MAPPER.createObjectNode();
+ nested.put("scalarField", "value");
+ nested.put("numberField", 42);
+ ObjectNode deepNested = MAPPER.createObjectNode();
+ deepNested.put("deep", "data");
+ nested.set("nestedObj", deepNested);
+ ArrayNode nestedArray = MAPPER.createArrayNode();
+ nestedArray.add("a");
+ nested.set("nestedArr", nestedArray);
+ node.set("data", nested);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ // The "data" field's nested object should be collapsed at depth 2
+ assertThat(node.get("data").has("truncatedObject")).isTrue();
+ ObjectNode truncatedObj = (ObjectNode) node.get("data").get("truncatedObject");
+ assertThat(truncatedObj.get("scalarField").asText()).isEqualTo("value");
+ assertThat(truncatedObj.get("numberField").asInt()).isEqualTo(42);
+ assertThat(truncatedObj.has("nestedObj")).isFalse();
+ assertThat(truncatedObj.has("nestedArr")).isFalse();
+ assertThat(node.get("data").get("omittedFields").asInt()).isEqualTo(2);
+ }
+
+ @Test
+ void testNoTruncationUnderLimits() {
+ JsonTruncator truncator = new JsonTruncator(100, 10, 5);
+ ObjectNode node = MAPPER.createObjectNode();
+ node.put("short", "hello");
+ ArrayNode smallArray = MAPPER.createArrayNode();
+ smallArray.add("a");
+ smallArray.add("b");
+ node.set("list", smallArray);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isFalse();
+ assertThat(node.get("short").asText()).isEqualTo("hello");
+ assertThat(node.get("list").size()).isEqualTo(2);
+ }
+
+ @Test
+ void testDisabledThresholds() {
+ // All thresholds set to 0 — no truncation should occur
+ JsonTruncator truncator = new JsonTruncator(0, 0, 0);
+ ObjectNode node = MAPPER.createObjectNode();
+ node.put("content", "A".repeat(10000));
+ ArrayNode bigArray = MAPPER.createArrayNode();
+ for (int i = 0; i < 100; i++) {
+ bigArray.add(i);
+ }
+ node.set("items", bigArray);
+ ObjectNode deep = MAPPER.createObjectNode();
+ deep.set("level2", MAPPER.createObjectNode().put("level3", "deep"));
+ node.set("nested", deep);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isFalse();
+ assertThat(node.get("content").asText()).hasSize(10000);
+ assertThat(node.get("items").size()).isEqualTo(100);
+ }
+
+ @Test
+ void testProtectedFields() {
+ JsonTruncator truncator = new JsonTruncator(5, 2, 2);
+ ObjectNode node = MAPPER.createObjectNode();
+ // Protected fields should never be truncated
+ node.put("eventType", "org.apache.flink.agents.api.event.ChatRequestEvent");
+ node.put("id", "a-very-long-identifier-that-exceeds-the-limit");
+ ObjectNode attributes = MAPPER.createObjectNode();
+ attributes.put("key", "value");
+ attributes.set("nested", MAPPER.createObjectNode().put("deep", "data"));
+ node.set("attributes", attributes);
+ // Non-protected field should be truncated
+ node.put("content", "This should be truncated");
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ // Protected fields remain untouched
+ assertThat(node.get("eventType").asText())
+ .isEqualTo("org.apache.flink.agents.api.event.ChatRequestEvent");
+ assertThat(node.get("id").asText())
+ .isEqualTo("a-very-long-identifier-that-exceeds-the-limit");
+ assertThat(node.get("attributes").isObject()).isTrue();
+ assertThat(node.get("attributes").get("key").asText()).isEqualTo("value");
+ // Non-protected field is truncated
+ assertThat(node.get("content").get("truncatedString")).isNotNull();
+ }
+
+ @Test
+ void testCompositeScenario() {
+ JsonTruncator truncator = new JsonTruncator(10, 2, 3);
+ ObjectNode node = MAPPER.createObjectNode();
+ // Long string — should trigger string truncation
+ node.put("message", "Hello, this is a long message from the LLM");
+ // Large array — should trigger array truncation
+ ArrayNode tools = MAPPER.createArrayNode();
+ for (int i = 0; i < 5; i++) {
+ tools.add("tool" + i);
+ }
+ node.set("tools", tools);
+ // Deep nesting — should trigger depth truncation at level 3
+ ObjectNode level1 = MAPPER.createObjectNode();
+ ObjectNode level2 = MAPPER.createObjectNode();
+ level2.put("scalar", "kept");
+ level2.set("tooDeep", MAPPER.createObjectNode().put("hidden", "gone"));
+ level1.set("inner", level2);
+ node.set("metadata", level1);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ // String truncation applied
+ assertThat(node.get("message").get("truncatedString")).isNotNull();
+ assertThat(node.get("message").get("omittedChars").asInt()).isGreaterThan(0);
+ // Array truncation applied
+ assertThat(node.get("tools").get("truncatedList").size()).isEqualTo(2);
+ assertThat(node.get("tools").get("omittedElements").asInt()).isEqualTo(3);
+ // Depth truncation: level2 is at depth 3, so it should be collapsed
+ ObjectNode metadataInner = (ObjectNode) node.get("metadata").get("inner");
+ assertThat(metadataInner.has("truncatedObject")).isTrue();
+ assertThat(metadataInner.get("truncatedObject").get("scalar").asText()).isEqualTo("kept");
+ assertThat(metadataInner.get("omittedFields").asInt()).isEqualTo(1);
+ }
+
+ @Test
+ void testRecursionSkipsDroppedTailElements() {
+ // Regression test: when an array exceeds maxArrayElements, the elements beyond the cap
+ // must NOT be recursed into. Prior to the fix, truncateArrayContents ran on the full
+ // array before truncateArray dropped the tail, causing wasted CPU/allocations on
+ // elements that were about to be discarded.
+ JsonTruncator truncator = new JsonTruncator(5, 2, 0);
+ ObjectNode node = MAPPER.createObjectNode();
+ ArrayNode items = MAPPER.createArrayNode();
+ // Two short strings that fit under the retained cap and the string limit.
+ items.add("a");
+ items.add("b");
+ // Three tail ObjectNodes whose long-string fields would be truncated if visited.
+ String longValue = "this-string-is-much-longer-than-five-chars";
+ ObjectNode tail0 = MAPPER.createObjectNode();
+ tail0.put("payload", longValue);
+ ObjectNode tail1 = MAPPER.createObjectNode();
+ tail1.put("payload", longValue);
+ ObjectNode tail2 = MAPPER.createObjectNode();
+ tail2.put("payload", longValue);
+ items.add(tail0);
+ items.add(tail1);
+ items.add(tail2);
+ node.set("items", items);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ // Retained list is exactly the first 2 elements; 3 are dropped.
+ assertThat(node.get("items").get("truncatedList").size()).isEqualTo(2);
+ assertThat(node.get("items").get("truncatedList").get(0).asText()).isEqualTo("a");
+ assertThat(node.get("items").get("truncatedList").get(1).asText()).isEqualTo("b");
+ assertThat(node.get("items").get("omittedElements").asInt()).isEqualTo(3);
+ // The dropped tail ObjectNodes were never visited — their payload fields remain
+ // untouched (still raw strings, not wrapped truncatedString objects). Under the old
+ // pre-reorder code path, these would have been mutated before being discarded.
+ assertThat(tail0.get("payload").isTextual()).isTrue();
+ assertThat(tail0.get("payload").asText()).isEqualTo(longValue);
+ assertThat(tail1.get("payload").isTextual()).isTrue();
+ assertThat(tail1.get("payload").asText()).isEqualTo(longValue);
+ assertThat(tail2.get("payload").isTextual()).isTrue();
+ assertThat(tail2.get("payload").asText()).isEqualTo(longValue);
+ }
+
+ @Test
+ void testNullNode() {
+ JsonTruncator truncator = new JsonTruncator(10, 10, 10);
+
+ boolean result = truncator.truncate(null);
+
+ assertThat(result).isFalse();
+ }
+}