Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,170 @@

package org.apache.eventmesh.common.config;

import org.apache.eventmesh.common.ThreadPoolFactory;

import org.apache.eventmesh.common.file.FileChangeContext;
import org.apache.eventmesh.common.file.FileChangeListener;
import org.apache.eventmesh.common.file.WatchFileManager;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.lang.reflect.Modifier;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ConfigMonitorService {

private static final long TIME_INTERVAL = 30 * 1000L;
private static final Map<String, List<ConfigInfo>> CONFIG_INFO_MAP = new ConcurrentHashMap<>();

private final List<ConfigInfo> configInfoList = new ArrayList<>();
private static final Set<String> DIRECTORY_PATH_SET = ConcurrentHashMap.newKeySet();

private final ScheduledExecutorService configLoader = ThreadPoolFactory.createSingleScheduledExecutor("eventMesh-configLoader");
private static final FileChangeListener CONFIG_FILE_CHANGE_LISTENER = new ConfigMonitorFileChangeListener();

{
configLoader.scheduleAtFixedRate(this::load, TIME_INTERVAL, TIME_INTERVAL, TimeUnit.MILLISECONDS);
static {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("[ConfigMonitorService] shutdown, clearing {} entries", CONFIG_INFO_MAP.size());
CONFIG_INFO_MAP.clear();
}));
}

public void monitor(ConfigInfo configInfo) {
configInfoList.add(configInfo);
if (configInfo == null) {
return;
}
String filePath = configInfo.getFilePath();
if (filePath == null) {
log.warn("[ConfigMonitorService] filePath is null, skip monitoring: {}", configInfo);
return;
}

Path path = Paths.get(filePath).toAbsolutePath().normalize();
if (!path.toFile().exists()) {
log.warn("[ConfigMonitorService] config file not exist, skip monitoring: {}", filePath);
return;
}

String normalizedPath = path.toString();
List<ConfigInfo> configInfoList = CONFIG_INFO_MAP.computeIfAbsent(normalizedPath, k -> new CopyOnWriteArrayList<>());
if (!configInfoList.contains(configInfo)) {
configInfoList.add(configInfo);
}
log.info("[ConfigMonitorService] monitoring config file: {}, total {} listener(s)", normalizedPath,
CONFIG_INFO_MAP.get(normalizedPath).size());

Path parentPath = path.getParent();
if (parentPath == null) {
log.warn("[ConfigMonitorService] config file parent path is null, skip monitoring: {}", filePath);
return;
}

String directoryPath = parentPath.toString();
if (DIRECTORY_PATH_SET.add(directoryPath)) {
try {
WatchFileManager.registerFileChangeListener(directoryPath, CONFIG_FILE_CHANGE_LISTENER);
} catch (RuntimeException e) {
DIRECTORY_PATH_SET.remove(directoryPath);
throw e;
}
}
}

public static void load(ConfigInfo configInfo) {
try {
Object object = ConfigService.getInstance().getConfig(configInfo);
if (java.util.Objects.equals(configInfo.getObject(), object)) {
return;
}

if (reloadConfig(configInfo, object)) {
configInfo.setObject(object);
}
log.info("config reload success: {}", object);
} catch (Exception e) {
log.error("config reload failed", e);
}
}

public static void clear() {
CONFIG_INFO_MAP.clear();
for (String directoryPath : DIRECTORY_PATH_SET) {
WatchFileManager.deregisterFileChangeListener(directoryPath, CONFIG_FILE_CHANGE_LISTENER);
}
DIRECTORY_PATH_SET.clear();
}

public void load() {
for (ConfigInfo configInfo : configInfoList) {
private static boolean reloadConfig(ConfigInfo configInfo, Object object) throws IllegalAccessException {
Field field = configInfo.getObjectField();
if (field != null && configInfo.getInstance() != null) {
boolean isAccessible = field.isAccessible();
try {
Object object = ConfigService.getInstance().getConfig(configInfo);
if (configInfo.getObject().equals(object)) {
field.setAccessible(true);
field.set(configInfo.getInstance(), object);
} finally {
field.setAccessible(isAccessible);
}
return true;
}

Object targetObject = configInfo.getObject();
if (targetObject == null) {
return true;
}

Class<?> clazz = object.getClass();
while (clazz != null && !Object.class.equals(clazz)) {
for (Field targetField : clazz.getDeclaredFields()) {
if (Modifier.isStatic(targetField.getModifiers())) {
continue;
}

Field field = configInfo.getObjectField();
boolean isAccessible = field.isAccessible();
boolean isAccessible = targetField.isAccessible();
try {
field.setAccessible(true);
field.set(configInfo.getInstance(), object);
targetField.setAccessible(true);
targetField.set(targetObject, targetField.get(object));
} finally {
field.setAccessible(isAccessible);
targetField.setAccessible(isAccessible);
}

configInfo.setObject(object);
log.info("config reload success: {}", object);
} catch (Exception e) {
log.error("config reload failed", e);
}
clazz = clazz.getSuperclass();
}
return false;
}

public static boolean support(FileChangeContext changeContext) {
String changedFileName = changeContext.getFileName();
String changedFilePath = Paths.get(
changeContext.getDirectoryPath(), changedFileName).toAbsolutePath().normalize().toString();
return CONFIG_INFO_MAP.containsKey(changedFilePath);
}

private static class ConfigMonitorFileChangeListener implements FileChangeListener {

@Override
public void onChanged(FileChangeContext changeContext) {
String changedFileName = changeContext.getFileName();
String changedFilePath = Paths.get(
changeContext.getDirectoryPath(), changedFileName).toAbsolutePath().normalize().toString();

List<ConfigInfo> configInfoList = CONFIG_INFO_MAP.get(changedFilePath);
if (configInfoList == null || configInfoList.isEmpty()) {
return;
}

for (ConfigInfo configInfo : configInfoList) {
configInfo.getObject(); // ensure non-null
load(configInfo);
}
}

@Override
public boolean support(FileChangeContext changeContext) {
return ConfigMonitorService.support(changeContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ public <T> T buildConfigInstance(Class<?> clazz) {
configInfo.setReloadMethodName(config == null ? null : config.reloadMethodName());

try {
return this.getConfig(configInfo);
T configObject = this.getConfig(configInfo);
if (configInfo.isMonitor()) {
configInfo.setObject(configObject);
configMonitorService.monitor(configInfo);
}
return configObject;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.eventmesh.common.file;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,7 +28,7 @@ public class WatchFileManager {

private static final AtomicBoolean CLOSED = new AtomicBoolean(false);

private static final Map<String, WatchFileTask> WATCH_FILE_TASK_MAP = new HashMap<>();
private static final Map<String, WatchFileTask> WATCH_FILE_TASK_MAP = new ConcurrentHashMap<>();

static {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand All @@ -38,13 +38,24 @@ public class WatchFileManager {
}

public static void registerFileChangeListener(String directoryPath, FileChangeListener listener) {
WatchFileTask task = WATCH_FILE_TASK_MAP.computeIfAbsent(directoryPath, path -> {
WatchFileTask watchFileTask = new WatchFileTask(path);
watchFileTask.start();
return watchFileTask;
});
task.addFileChangeListener(listener);
}

public static void deregisterFileChangeListener(String directoryPath, FileChangeListener listener) {
WatchFileTask task = WATCH_FILE_TASK_MAP.get(directoryPath);
if (task == null) {
task = new WatchFileTask(directoryPath);
task.start();
WATCH_FILE_TASK_MAP.put(directoryPath, task);
if (task != null) {
task.removeFileChangeListener(listener);
if (task.hasFileChangeListener()) {
return;
}
WATCH_FILE_TASK_MAP.remove(directoryPath, task);
task.shutdown();
}
task.addFileChangeListener(listener);
}

public static void deregisterFileChangeListener(String directoryPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -38,14 +38,15 @@ public class WatchFileTask extends Thread {

private final transient WatchService watchService;

private final transient List<FileChangeListener> fileChangeListeners = new ArrayList<>();
private final transient List<FileChangeListener> fileChangeListeners = new CopyOnWriteArrayList<>();

private transient volatile boolean watch = true;

private final transient String directoryPath;

public WatchFileTask(String directoryPath) {
this.directoryPath = directoryPath;
setDaemon(true);
final Path path = Paths.get(directoryPath);
if (!path.toFile().exists()) {
throw new IllegalArgumentException("file directory not exist: " + directoryPath);
Expand Down Expand Up @@ -74,11 +75,19 @@ public WatchFileTask(String directoryPath) {
}

public void addFileChangeListener(FileChangeListener fileChangeListener) {
if (fileChangeListener != null) {
if (fileChangeListener != null && !fileChangeListeners.contains(fileChangeListener)) {
fileChangeListeners.add(fileChangeListener);
}
}

public void removeFileChangeListener(FileChangeListener fileChangeListener) {
fileChangeListeners.remove(fileChangeListener);
}

public boolean hasFileChangeListener() {
return !fileChangeListeners.isEmpty();
}

public void shutdown() {
watch = false;
try {
Expand Down Expand Up @@ -114,7 +123,9 @@ public void run() {
log.debug("[WatchFileTask] file watch is interrupted");
}
} catch (Exception ex) {
log.error("[WatchFileTask] an exception occurred during file listening : ", ex);
if (watch) {
log.error("[WatchFileTask] an exception occurred during file listening : ", ex);
}
}
}
}
Expand Down
Loading