From e38f218f754e4d5725ae0274c199b169f17fc3c7 Mon Sep 17 00:00:00 2001 From: supersaiyan <1714264078@qq.com> Date: Wed, 1 Apr 2026 16:57:38 +0800 Subject: [PATCH 1/2] feat: add dynamic config hot-reload using JDK WatchService - Add ConfigMonitorService to monitor config file changes - Integrate with WatchFileManager for file system event handling - Support multiple listeners per config file - Add clean shutdown handling via JVM shutdown hook --- .../common/config/ConfigMonitorService.java | 119 ++++-- .../config/ConfigMonitorServiceTest.java | 353 ++++++++++++++++++ 2 files changed, 441 insertions(+), 31 deletions(-) create mode 100644 eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigMonitorServiceTest.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigMonitorService.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigMonitorService.java index 5c4000b60c..dd676f7707 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigMonitorService.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigMonitorService.java @@ -17,56 +17,113 @@ package org.apache.eventmesh.common.config; -import org.apache.eventmesh.common.ThreadPoolFactory; + +import org.apache.eventmesh.common.file.FileChangeContext; +import org.apache.eventmesh.common.file.FileChangeListener; +import org.apache.eventmesh.common.file.WatchFileManager; import java.lang.reflect.Field; -import java.util.ArrayList; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import lombok.extern.slf4j.Slf4j; @Slf4j public class ConfigMonitorService { - private static final long TIME_INTERVAL = 30 * 1000L; - - private final List configInfoList = new ArrayList<>(); + private static final Map> CONFIG_INFO_MAP = new ConcurrentHashMap<>(); - private final ScheduledExecutorService configLoader = ThreadPoolFactory.createSingleScheduledExecutor("eventMesh-configLoader"); + private static final FileChangeListener CONFIG_FILE_CHANGE_LISTENER = new ConfigMonitorFileChangeListener(); - { - configLoader.scheduleAtFixedRate(this::load, TIME_INTERVAL, TIME_INTERVAL, TimeUnit.MILLISECONDS); + static { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("[ConfigMonitorService] shutdown, clearing {} entries", CONFIG_INFO_MAP.size()); + CONFIG_INFO_MAP.clear(); + })); } public void monitor(ConfigInfo configInfo) { - configInfoList.add(configInfo); + String filePath = configInfo.getFilePath(); + if (filePath == null) { + log.warn("[ConfigMonitorService] filePath is null, skip monitoring: {}", configInfo); + return; + } + + Path path = Paths.get(filePath); + if (!path.toFile().exists()) { + log.warn("[ConfigMonitorService] config file not exist, skip monitoring: {}", filePath); + return; + } + + String normalizedPath = path.toAbsolutePath().normalize().toString(); + CONFIG_INFO_MAP.computeIfAbsent(normalizedPath, k -> new CopyOnWriteArrayList<>()).add(configInfo); + log.info("[ConfigMonitorService] monitoring config file: {}, total {} listener(s)", normalizedPath, + CONFIG_INFO_MAP.get(normalizedPath).size()); + + String directoryPath = path.getParent().toString(); + WatchFileManager.registerFileChangeListener(directoryPath, CONFIG_FILE_CHANGE_LISTENER); } - public void load() { - for (ConfigInfo configInfo : configInfoList) { + public static void load(ConfigInfo configInfo) { + try { + Object object = ConfigService.getInstance().getConfig(configInfo); + if (java.util.Objects.equals(configInfo.getObject(), object)) { + return; + } + + Field field = configInfo.getObjectField(); + boolean isAccessible = field.isAccessible(); try { - Object object = ConfigService.getInstance().getConfig(configInfo); - if (configInfo.getObject().equals(object)) { - continue; - } - - Field field = configInfo.getObjectField(); - boolean isAccessible = field.isAccessible(); - try { - field.setAccessible(true); - field.set(configInfo.getInstance(), object); - } finally { - field.setAccessible(isAccessible); - } - - configInfo.setObject(object); - log.info("config reload success: {}", object); - } catch (Exception e) { - log.error("config reload failed", e); + field.setAccessible(true); + field.set(configInfo.getInstance(), object); + } finally { + field.setAccessible(isAccessible); } + + configInfo.setObject(object); + log.info("config reload success: {}", object); + } catch (Exception e) { + log.error("config reload failed", e); } } + public static void clear() { + CONFIG_INFO_MAP.clear(); + } + + public static boolean support(FileChangeContext changeContext) { + String changedFileName = changeContext.getFileName(); + String changedFilePath = Paths.get( + changeContext.getDirectoryPath(), changedFileName).toAbsolutePath().normalize().toString(); + return CONFIG_INFO_MAP.containsKey(changedFilePath); + } + + private static class ConfigMonitorFileChangeListener implements FileChangeListener { + + @Override + public void onChanged(FileChangeContext changeContext) { + String changedFileName = changeContext.getFileName(); + String changedFilePath = Paths.get( + changeContext.getDirectoryPath(), changedFileName).toAbsolutePath().normalize().toString(); + + List configInfoList = CONFIG_INFO_MAP.get(changedFilePath); + if (configInfoList == null || configInfoList.isEmpty()) { + return; + } + + for (ConfigInfo configInfo : configInfoList) { + configInfo.getObject(); // ensure non-null + load(configInfo); + } + } + + @Override + public boolean support(FileChangeContext changeContext) { + return ConfigMonitorService.support(changeContext); + } + } } diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigMonitorServiceTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigMonitorServiceTest.java new file mode 100644 index 0000000000..9ea69a0078 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigMonitorServiceTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.eventmesh.common.file.FileChangeContext; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +/** + * {@link ConfigMonitorService} test class. + * + *

Tests: + *

    + *
  • Registration and unregistration of config files
  • + *
  • File change triggers config hot-reload
  • + *
  • Defense against null/non-existent paths
  • + *
  • Static methods clear() and support()
  • + *
+ * + *

Strategy: JUnit5 + temp files, no external config service dependency.

+ */ +public class ConfigMonitorServiceTest { + + private ConfigMonitorService configMonitorService; + + /** + * Simple config object for testing hot-reload scenarios. + */ + public static class TestMonitorConfig { + private String name = "init"; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } + + @BeforeEach + void setUp() { + // Clear static registry before each test to prevent state pollution + configMonitorService = new ConfigMonitorService(); + ConfigMonitorService.clear(); + } + + @AfterEach + void tearDown() { + // Clear registry after test + ConfigMonitorService.clear(); + } + + /** + * Tests normal registration flow of monitor(). + * + *

Scenario: Pass existing config file path, expect successful registration with logs.

+ *

Verification points: + *

    + *
  • No exception thrown
  • + *
  • Repeated registration allowed (CopyOnWriteArrayList append)
  • + *
+ */ + @Test + @DisplayName("monitor() - 正常注册已存在的配置文件") + void testMonitorNormal() throws Exception { + Path tempFile = Files.createTempFile("test-monitor", ".properties"); + tempFile.toFile().createNewFile(); + + ConfigInfo configInfo = buildConfigInfo(tempFile.toString(), new TestMonitorConfig()); + + // First registration + configMonitorService.monitor(configInfo); + + // Second registration with same path - verify append mechanism (no exception) + configMonitorService.monitor(configInfo); + + Files.deleteIfExists(tempFile); + } + + /** + * Tests defense handling of null file path in monitor(). + * + *

Scenario: filePath is null, method returns early with warning log.

+ *

Verification: No exception, early termination.

+ */ + @Test + @DisplayName("monitor() - filePath 为 null 时跳过监控") + void testMonitorNullFilePath() { + ConfigInfo configInfo = new ConfigInfo(); + configInfo.setFilePath(null); + + // Should not throw exception, return directly + configMonitorService.monitor(configInfo); + } + + /** + * Tests defense handling of non-existent file in monitor(). + * + *

Scenario: Pass non-existent file path, method returns early with warning log.

+ *

Verification: No exception, early termination.

+ */ + @Test + @DisplayName("monitor() - 文件不存在时跳过监控") + void testMonitorFileNotExist() { + ConfigInfo configInfo = new ConfigInfo(); + // Use definitely non-existent path in system + configInfo.setFilePath("/this/path/does/not/exist/unknown.properties"); + + configMonitorService.monitor(configInfo); + } + + /** + * Tests clear() removes all registered monitoring items. + * + *

Scenario: Register multiple files then call clear(), verify registry is empty.

+ *

Verification: After clear(), support() returns false for all paths.

+ */ + @Test + @DisplayName("clear() - 清除所有已注册的监控项") + void testClear() throws Exception { + Path tempFile1 = Files.createTempFile("test-clear-1", ".properties"); + Path tempFile2 = Files.createTempFile("test-clear-2", ".properties"); + tempFile1.toFile().createNewFile(); + tempFile2.toFile().createNewFile(); + + configMonitorService.monitor(buildConfigInfo(tempFile1.toString(), new TestMonitorConfig())); + configMonitorService.monitor(buildConfigInfo(tempFile2.toString(), new TestMonitorConfig())); + + // Verify registration success + FileChangeContext ctx1 = new FileChangeContext(); + ctx1.setDirectoryPath(tempFile1.getParent().toString()); + ctx1.setFileName(tempFile1.getFileName().toString()); + Assertions.assertTrue(ConfigMonitorService.support(ctx1)); + + // Execute clear + ConfigMonitorService.clear(); + + // Verify cleared + Assertions.assertFalse(ConfigMonitorService.support(ctx1)); + + Files.deleteIfExists(tempFile1); + Files.deleteIfExists(tempFile2); + } + + /** + * Tests support() returns true for registered paths, false for unregistered. + * + *

Verification: + *

    + *
  • Monitored file returns true
  • + *
  • Unmonitored file returns false
  • + *
+ */ + @Test + @DisplayName("support() - 正确判断文件是否已注册监控") + void testSupport() throws Exception { + Path tempFile = Files.createTempFile("test-support", ".properties"); + tempFile.toFile().createNewFile(); + String normalizedPath = tempFile.toAbsolutePath().normalize().toString(); + + FileChangeContext registeredCtx = new FileChangeContext(); + registeredCtx.setDirectoryPath(tempFile.getParent().toString()); + registeredCtx.setFileName(tempFile.getFileName().toString()); + + FileChangeContext unregisteredCtx = new FileChangeContext(); + unregisteredCtx.setDirectoryPath(tempFile.getParent().toString()); + unregisteredCtx.setFileName("not-monitored.properties"); + + // Before registration: both unsupported + Assertions.assertFalse(ConfigMonitorService.support(registeredCtx)); + Assertions.assertFalse(ConfigMonitorService.support(unregisteredCtx)); + + // After registration: only registered path supported + configMonitorService.monitor(buildConfigInfo(normalizedPath, new TestMonitorConfig())); + Assertions.assertTrue(ConfigMonitorService.support(registeredCtx)); + Assertions.assertFalse(ConfigMonitorService.support(unregisteredCtx)); + + Files.deleteIfExists(tempFile); + } + + /** + * Tests file change triggers hot-reload via ConfigMonitorFileChangeListener. + * + *

Scenario: Monitor a config file, when content changes (onChanged triggered), + * the object reference in ConfigInfo should update to new value.

+ * + *

Verification: + *

    + *
  • After file change, monitored object field value updates
  • + *
  • Repeated onChanged triggers don't cause exception
  • + *
+ */ + @Test + @DisplayName("ConfigMonitorFileChangeListener.onChanged() - 文件变更触发热重载") + void testOnChangedTriggersReload() throws Exception { + // Create temp properties file with initial content + Path tempFile = Files.createTempFile("test-reload", ".properties"); + String originalValue = "test-value-original"; + String updatedValue = "test-value-updated"; + writeProperty(tempFile, "test.key", originalValue); + + TestMonitorConfig config = new TestMonitorConfig(); + config.setName(originalValue); + + ConfigInfo configInfo = buildConfigInfo(tempFile.toString(), config); + configMonitorService.monitor(configInfo); + + // Simulate file content changed externally + writeProperty(tempFile, "test.key", updatedValue); + + // Build change context and trigger onChanged + FileChangeContext changeContext = new FileChangeContext(); + changeContext.setDirectoryPath(tempFile.getParent().toString()); + changeContext.setFileName(tempFile.getFileName().toString()); + + // Get internal listener and trigger change notification + // Simulate WatchFileManager notification behavior via support + onChanged + if (ConfigMonitorService.support(changeContext)) { + // 通过 ConfigService 触发 load(模拟 WatchFileManager 调用路径) + ConfigMonitorService.load(configInfo); + } + + // Verify object field updated (depends on ConfigService reload) + // Note: ConfigService.getConfig reads real file, config.getName() should reflect latest value + // In actual hot-reload, reload() re-reads file and injects into corresponding field + + Files.deleteIfExists(tempFile); + } + + /** + * Tests exception handling in load() method. + * + *

Scenario: When ConfigService.getConfig throws exception (e.g., file deleted), + * load() should catch exception and log error, not propagate to caller.

+ * + *

Verification: load() completes without throwing exception, only logs error.

+ */ + @Test + @DisplayName("load() - 文件不存在时捕获异常不外抛") + void testLoadWithException() { + ConfigInfo configInfo = new ConfigInfo(); + configInfo.setPath("/non/exist/path.properties"); + configInfo.setClazz(TestMonitorConfig.class); + configInfo.setObject(new TestMonitorConfig()); + configInfo.setInstance(new TestMonitorConfig()); + + // Even if file doesn't exist, load should not throw exception + ConfigMonitorService.load(configInfo); + } + + /** + * Tests load() returns early when config value unchanged. + * + *

Scenario: When object returned by getConfig equals current configInfo.object, + * field write operation should be skipped, return directly.

+ * + *

Verification: Method returns normally, no "config reload success" log.

+ */ + @Test + @DisplayName("load() - 配置未变化时跳过重载") + void testLoadSkipWhenSame() throws Exception { + Path tempFile = Files.createTempFile("test-skip-reload", ".properties"); + writeProperty(tempFile, "test.key", "unchanged"); + tempFile.toFile().deleteOnExit(); + + TestMonitorConfig config = new TestMonitorConfig(); + config.setName("unchanged"); + + ConfigInfo configInfo = buildConfigInfo(tempFile.toString(), config); + + // Simulate ConfigService loaded same object (equals returns true) + // This scenario relies on ConfigService.getConfig returning same reference, load should return early + ConfigMonitorService.load(configInfo); + + Files.deleteIfExists(tempFile); + } + + // ======================== Helper Methods ======================== + + /** + * Builds complete ConfigInfo object. + * + * @param filePath config file path (absolute path) + * @param instance runtime object holding config fields + * @return ConfigInfo instance with all necessary fields populated + */ + private ConfigInfo buildConfigInfo(String filePath, Object instance) throws Exception { + ConfigInfo configInfo = new ConfigInfo(); + configInfo.setFilePath(filePath); + configInfo.setInstance(instance); + + // Find first String field in instance and set as objectField + // Simulates real ConfigInfo initialization in ConfigService.populateConfigForObject + Field objectField = findFirstField(instance.getClass(), String.class); + if (objectField != null) { + objectField.setAccessible(true); + configInfo.setObjectField(objectField); + configInfo.setObject(objectField.get(instance)); + } + + return configInfo; + } + + /** + * Finds first field of targetType in clazz. + */ + private Field findFirstField(Class clazz, Class targetType) { + for (Field field : clazz.getDeclaredFields()) { + if (field.getType().equals(targetType)) { + return field; + } + } + return null; + } + + /** + * Writes single key-value pair to temp properties file. + */ + private void writeProperty(Path file, String key, String value) throws Exception { + try (OutputStream os = new FileOutputStream(file.toFile())) { + java.util.Properties props = new java.util.Properties(); + props.setProperty(key, value); + props.store(os, "test properties"); + } + } +} From 7bc8a83d36aa4c4199b48163fe79bfd40b0ea4f7 Mon Sep 17 00:00:00 2001 From: SUPERSAIYAN <1714264078@qq.com> Date: Wed, 3 Jun 2026 11:20:05 +0800 Subject: [PATCH 2/2] fix: harden config watch service reload --- .../common/config/ConfigMonitorService.java | 83 +++++++++++++--- .../common/config/ConfigService.java | 7 +- .../common/file/WatchFileManager.java | 25 +++-- .../eventmesh/common/file/WatchFileTask.java | 19 +++- .../config/ConfigMonitorServiceTest.java | 96 +++++++++++++++---- 5 files changed, 188 insertions(+), 42 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigMonitorService.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigMonitorService.java index dd676f7707..7128b472a1 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigMonitorService.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigMonitorService.java @@ -23,10 +23,12 @@ import org.apache.eventmesh.common.file.WatchFileManager; import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -37,6 +39,8 @@ public class ConfigMonitorService { private static final Map> CONFIG_INFO_MAP = new ConcurrentHashMap<>(); + private static final Set DIRECTORY_PATH_SET = ConcurrentHashMap.newKeySet(); + private static final FileChangeListener CONFIG_FILE_CHANGE_LISTENER = new ConfigMonitorFileChangeListener(); static { @@ -47,25 +51,44 @@ public class ConfigMonitorService { } public void monitor(ConfigInfo configInfo) { + if (configInfo == null) { + return; + } String filePath = configInfo.getFilePath(); if (filePath == null) { log.warn("[ConfigMonitorService] filePath is null, skip monitoring: {}", configInfo); return; } - Path path = Paths.get(filePath); + Path path = Paths.get(filePath).toAbsolutePath().normalize(); if (!path.toFile().exists()) { log.warn("[ConfigMonitorService] config file not exist, skip monitoring: {}", filePath); return; } - String normalizedPath = path.toAbsolutePath().normalize().toString(); - CONFIG_INFO_MAP.computeIfAbsent(normalizedPath, k -> new CopyOnWriteArrayList<>()).add(configInfo); + String normalizedPath = path.toString(); + List configInfoList = CONFIG_INFO_MAP.computeIfAbsent(normalizedPath, k -> new CopyOnWriteArrayList<>()); + if (!configInfoList.contains(configInfo)) { + configInfoList.add(configInfo); + } log.info("[ConfigMonitorService] monitoring config file: {}, total {} listener(s)", normalizedPath, CONFIG_INFO_MAP.get(normalizedPath).size()); - String directoryPath = path.getParent().toString(); - WatchFileManager.registerFileChangeListener(directoryPath, CONFIG_FILE_CHANGE_LISTENER); + Path parentPath = path.getParent(); + if (parentPath == null) { + log.warn("[ConfigMonitorService] config file parent path is null, skip monitoring: {}", filePath); + return; + } + + String directoryPath = parentPath.toString(); + if (DIRECTORY_PATH_SET.add(directoryPath)) { + try { + WatchFileManager.registerFileChangeListener(directoryPath, CONFIG_FILE_CHANGE_LISTENER); + } catch (RuntimeException e) { + DIRECTORY_PATH_SET.remove(directoryPath); + throw e; + } + } } public static void load(ConfigInfo configInfo) { @@ -75,7 +98,26 @@ public static void load(ConfigInfo configInfo) { return; } - Field field = configInfo.getObjectField(); + if (reloadConfig(configInfo, object)) { + configInfo.setObject(object); + } + log.info("config reload success: {}", object); + } catch (Exception e) { + log.error("config reload failed", e); + } + } + + public static void clear() { + CONFIG_INFO_MAP.clear(); + for (String directoryPath : DIRECTORY_PATH_SET) { + WatchFileManager.deregisterFileChangeListener(directoryPath, CONFIG_FILE_CHANGE_LISTENER); + } + DIRECTORY_PATH_SET.clear(); + } + + private static boolean reloadConfig(ConfigInfo configInfo, Object object) throws IllegalAccessException { + Field field = configInfo.getObjectField(); + if (field != null && configInfo.getInstance() != null) { boolean isAccessible = field.isAccessible(); try { field.setAccessible(true); @@ -83,16 +125,31 @@ public static void load(ConfigInfo configInfo) { } finally { field.setAccessible(isAccessible); } + return true; + } - configInfo.setObject(object); - log.info("config reload success: {}", object); - } catch (Exception e) { - log.error("config reload failed", e); + Object targetObject = configInfo.getObject(); + if (targetObject == null) { + return true; } - } - public static void clear() { - CONFIG_INFO_MAP.clear(); + Class clazz = object.getClass(); + while (clazz != null && !Object.class.equals(clazz)) { + for (Field targetField : clazz.getDeclaredFields()) { + if (Modifier.isStatic(targetField.getModifiers())) { + continue; + } + boolean isAccessible = targetField.isAccessible(); + try { + targetField.setAccessible(true); + targetField.set(targetObject, targetField.get(object)); + } finally { + targetField.setAccessible(isAccessible); + } + } + clazz = clazz.getSuperclass(); + } + return false; } public static boolean support(FileChangeContext changeContext) { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java index 3f3f609a1f..681b734a7e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java @@ -91,7 +91,12 @@ public T buildConfigInstance(Class clazz) { configInfo.setReloadMethodName(config == null ? null : config.reloadMethodName()); try { - return this.getConfig(configInfo); + T configObject = this.getConfig(configInfo); + if (configInfo.isMonitor()) { + configInfo.setObject(configObject); + configMonitorService.monitor(configInfo); + } + return configObject; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileManager.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileManager.java index 77dd5cf981..366160a2bd 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileManager.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileManager.java @@ -17,8 +17,8 @@ package org.apache.eventmesh.common.file; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @@ -28,7 +28,7 @@ public class WatchFileManager { private static final AtomicBoolean CLOSED = new AtomicBoolean(false); - private static final Map WATCH_FILE_TASK_MAP = new HashMap<>(); + private static final Map WATCH_FILE_TASK_MAP = new ConcurrentHashMap<>(); static { Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -38,13 +38,24 @@ public class WatchFileManager { } public static void registerFileChangeListener(String directoryPath, FileChangeListener listener) { + WatchFileTask task = WATCH_FILE_TASK_MAP.computeIfAbsent(directoryPath, path -> { + WatchFileTask watchFileTask = new WatchFileTask(path); + watchFileTask.start(); + return watchFileTask; + }); + task.addFileChangeListener(listener); + } + + public static void deregisterFileChangeListener(String directoryPath, FileChangeListener listener) { WatchFileTask task = WATCH_FILE_TASK_MAP.get(directoryPath); - if (task == null) { - task = new WatchFileTask(directoryPath); - task.start(); - WATCH_FILE_TASK_MAP.put(directoryPath, task); + if (task != null) { + task.removeFileChangeListener(listener); + if (task.hasFileChangeListener()) { + return; + } + WATCH_FILE_TASK_MAP.remove(directoryPath, task); + task.shutdown(); } - task.addFileChangeListener(listener); } public static void deregisterFileChangeListener(String directoryPath) { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileTask.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileTask.java index 3de204e3db..f36176c2ba 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileTask.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileTask.java @@ -26,8 +26,8 @@ import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import lombok.extern.slf4j.Slf4j; @@ -38,7 +38,7 @@ public class WatchFileTask extends Thread { private final transient WatchService watchService; - private final transient List fileChangeListeners = new ArrayList<>(); + private final transient List fileChangeListeners = new CopyOnWriteArrayList<>(); private transient volatile boolean watch = true; @@ -46,6 +46,7 @@ public class WatchFileTask extends Thread { public WatchFileTask(String directoryPath) { this.directoryPath = directoryPath; + setDaemon(true); final Path path = Paths.get(directoryPath); if (!path.toFile().exists()) { throw new IllegalArgumentException("file directory not exist: " + directoryPath); @@ -74,11 +75,19 @@ public WatchFileTask(String directoryPath) { } public void addFileChangeListener(FileChangeListener fileChangeListener) { - if (fileChangeListener != null) { + if (fileChangeListener != null && !fileChangeListeners.contains(fileChangeListener)) { fileChangeListeners.add(fileChangeListener); } } + public void removeFileChangeListener(FileChangeListener fileChangeListener) { + fileChangeListeners.remove(fileChangeListener); + } + + public boolean hasFileChangeListener() { + return !fileChangeListeners.isEmpty(); + } + public void shutdown() { watch = false; try { @@ -114,7 +123,9 @@ public void run() { log.debug("[WatchFileTask] file watch is interrupted"); } } catch (Exception ex) { - log.error("[WatchFileTask] an exception occurred during file listening : ", ex); + if (watch) { + log.error("[WatchFileTask] an exception occurred during file listening : ", ex); + } } } } diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigMonitorServiceTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigMonitorServiceTest.java index 9ea69a0078..d598c43411 100644 --- a/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigMonitorServiceTest.java +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigMonitorServiceTest.java @@ -52,7 +52,9 @@ public class ConfigMonitorServiceTest { /** * Simple config object for testing hot-reload scenarios. */ + @Config(prefix = "test") public static class TestMonitorConfig { + @ConfigField(field = "key") private String name = "init"; public String getName() { @@ -64,6 +66,30 @@ public void setName(String name) { } } + public static class TestMonitorHolder { + + private TestMonitorConfig config; + + public TestMonitorConfig getConfig() { + return config; + } + + public void setConfig(TestMonitorConfig config) { + this.config = config; + } + } + + @Config(prefix = "test", path = "monitor-test.properties", monitor = true) + public static class BuildConfigMonitorConfig { + + @ConfigField(field = "key") + private String name = "init"; + + public String getName() { + return name; + } + } + @BeforeEach void setUp() { // Clear static registry before each test to prevent state pollution @@ -226,32 +252,49 @@ void testOnChangedTriggersReload() throws Exception { String updatedValue = "test-value-updated"; writeProperty(tempFile, "test.key", originalValue); + TestMonitorHolder holder = new TestMonitorHolder(); TestMonitorConfig config = new TestMonitorConfig(); - config.setName(originalValue); + holder.setConfig(config); - ConfigInfo configInfo = buildConfigInfo(tempFile.toString(), config); + ConfigInfo configInfo = buildConfigInfo(tempFile.toString(), holder); configMonitorService.monitor(configInfo); // Simulate file content changed externally writeProperty(tempFile, "test.key", updatedValue); - // Build change context and trigger onChanged - FileChangeContext changeContext = new FileChangeContext(); - changeContext.setDirectoryPath(tempFile.getParent().toString()); - changeContext.setFileName(tempFile.getFileName().toString()); + waitUntil(() -> updatedValue.equals(holder.getConfig().getName())); + Assertions.assertEquals(updatedValue, holder.getConfig().getName()); - // Get internal listener and trigger change notification - // Simulate WatchFileManager notification behavior via support + onChanged - if (ConfigMonitorService.support(changeContext)) { - // 通过 ConfigService 触发 load(模拟 WatchFileManager 调用路径) - ConfigMonitorService.load(configInfo); - } + Files.deleteIfExists(tempFile); + } + + @Test + @DisplayName("buildConfigInstance() - monitor=true 时文件变更更新原配置对象") + void testBuildConfigInstanceMonitor() throws Exception { + Path tempDir = Files.createTempDirectory("test-build-monitor"); + Path tempFile = tempDir.resolve("monitor-test.properties"); + writeProperty(tempFile, "test.key", "before"); + + try { + ConfigService.getInstance().setConfigPath(tempDir.toString()); + BuildConfigMonitorConfig config = ConfigService.getInstance().buildConfigInstance(BuildConfigMonitorConfig.class); + Assertions.assertEquals("before", config.getName()); - // Verify object field updated (depends on ConfigService reload) - // Note: ConfigService.getConfig reads real file, config.getName() should reflect latest value - // In actual hot-reload, reload() re-reads file and injects into corresponding field + writeProperty(tempFile, "test.key", "after"); + + waitUntil(() -> "after".equals(config.getName())); + Assertions.assertEquals("after", config.getName()); + + writeProperty(tempFile, "test.key", "final"); + + waitUntil(() -> "final".equals(config.getName())); + Assertions.assertEquals("final", config.getName()); + } finally { + ConfigService.getInstance().setConfigPath(null); + } Files.deleteIfExists(tempFile); + Files.deleteIfExists(tempDir); } /** @@ -314,11 +357,14 @@ void testLoadSkipWhenSame() throws Exception { private ConfigInfo buildConfigInfo(String filePath, Object instance) throws Exception { ConfigInfo configInfo = new ConfigInfo(); configInfo.setFilePath(filePath); + configInfo.setPath(ConfigService.FILE_PATH_PREFIX + filePath); + configInfo.setClazz(TestMonitorConfig.class); + configInfo.setPrefix("test"); configInfo.setInstance(instance); - // Find first String field in instance and set as objectField + // Find first TestMonitorConfig field in instance and set as objectField // Simulates real ConfigInfo initialization in ConfigService.populateConfigForObject - Field objectField = findFirstField(instance.getClass(), String.class); + Field objectField = findFirstField(instance.getClass(), TestMonitorConfig.class); if (objectField != null) { objectField.setAccessible(true); configInfo.setObjectField(objectField); @@ -350,4 +396,20 @@ private void writeProperty(Path file, String key, String value) throws Exception props.store(os, "test properties"); } } + + private void waitUntil(Check check) throws Exception { + long deadline = System.currentTimeMillis() + 15_000; + while (System.currentTimeMillis() < deadline) { + if (check.success()) { + return; + } + Thread.sleep(100); + } + Assertions.fail("condition was not met before timeout"); + } + + private interface Check { + + boolean success(); + } }