diff --git a/README.md b/README.md index 73fa6b6..83cf520 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ Redis 只负责协调(通知谁该重试),数据库的 fencing token + ver 代理插件不是必须的。后端插件可独立运行。安装后获得: - `/fastsync status`:代理端聚合查看所有后端健康状态 - `/fastsync players`:查看所有玩家当前所在子服 -- 玩家切服通知 +- handoff 通知:代理记录切服关系并通知新后端;锁等待与最终放行由后端登录门禁完成 ### 4. 基本配置 @@ -121,6 +121,7 @@ Redis 只负责协调(通知谁该重试),数据库的 fencing token + ver ```yaml server-name: "survival-1" # 每个子服唯一标识 +cluster-id: "survival-main" # 同一逻辑集群保持一致,不能为空 database: host: "mysql.example.com" @@ -171,7 +172,6 @@ redis: | `ssl` | `false` | SSL 加密 | | `timeout` | `5000` | 连接超时(ms) | | `channel-prefix` | `fastsync:lock:` | Pub/Sub 频道前缀 | -| `cache-enabled` | `false` | Redis 数据缓存 | | `streams-enabled` | `true` | Redis Streams 可靠事件 | | `stream-maxlen` | `100000` | Stream 最大条数,0 = 不裁剪 | | `stream-trim-approx` | `true` | 近似裁剪(~MAXLEN),性能更好 | @@ -199,7 +199,6 @@ redis: | `heartbeat-interval-seconds` | `10` | 心跳间隔(自动校正 ≤ lock-timeout/3) | | `lock-retry-interval-ms` | `300` | 锁重试间隔 | | `lock-max-retries` | `15` | 锁最大重试次数 | -| `save-delay` | `0` | 保存延迟(tick) | | `clear-before-apply` | `true` | 应用前清空(防复制) | | `periodic-save` | `false` | 周期保存开关 | | `periodic-save-interval-seconds` | `300` | 周期保存间隔 | @@ -244,16 +243,16 @@ redis: |--------|--------|------| | `enabled` | `true` | 冲突快照开关 | | `max-snapshots` | `16` | 每玩家最大快照数 | -| `backup-frequency-ms` | `14400000` | 定期备份间隔(4 小时) | +| `backup-frequency-ms` | `14400000` | `save-trigger` 命中后的每玩家最小快照间隔(4 小时;设为 0 不限频) | | `save-trigger` | `never` | `never` / `always` / 逗号分隔原因列表 | ### 集群 (`cluster-id`) ```yaml -cluster-id: "" +cluster-id: "survival-main" ``` -cluster-id 只隔离 Redis 消息(topic/stream/consumer group),**不隔离数据库行**。非空 cluster-id + 默认 `fastsync_` table-prefix = **拒绝启动**。多集群必须使用不同 table-prefix 或不同 database。 +`cluster-id` 是必填的集群身份:它同时进入数据库复合主键,并隔离 Redis topic、stream 与 consumer group。同一逻辑集群的所有后端必须使用相同值;不同集群可在同一数据库与默认 `fastsync_` table-prefix 下安全共存。 ### 操作日志 (`operation-log:`) @@ -273,9 +272,9 @@ cluster-id 只隔离 Redis 消息(topic/stream/consumer group),**不隔离 | 命令 | 说明 | |------|------| -| `/fastsync reload` | 重载配置(重置心跳任务、保护模式) | +| `/fastsync reload` | 事务式热重载;心跳/周期保存会重排,DB、cluster、Redis、线程池等启动期参数变化会拒绝并提示重启 | | `/fastsync status` | 查看 DB/Redis 状态、在线玩家、pending 数、final-save 队列/fallback 计数、OpLog 状态、HikariCP 池、延迟百分位 | -| `/fastsync debug` | 开关调试模式 | +| `/fastsync debug` | 开关本次运行期调试模式(持久化请编辑 config.yml) | | `/fastsync saveall` | 强制保存所有在线玩家(Folia 安全两阶段) | | `/fastsync log [n]` | 查看玩家操作日志(默认 20 条,最多 50) | diff --git a/build.gradle.kts b/build.gradle.kts index b66fcfa..19874cc 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -224,8 +224,12 @@ tasks.named("build") { val ci = tasks.register("ci") { group = "ci" - description = "Full CI pipeline: clean, build, and test." - dependsOn("clean", "build", "check") + description = "Full CI pipeline: build, package, and test." + // Java's build lifecycle already depends on both assemble and check. Do not + // add clean as a sibling dependency: with org.gradle.parallel=true Gradle + // may delete class outputs while compileTestJava/shadowJar are reading them. + // Call `./gradlew clean ci` when an explicitly clean local build is needed. + dependsOn("build") } val testGroup = tasks.register("testGroup") { @@ -273,6 +277,15 @@ dependencies { } sourceSets["velocity"].compileClasspath += velocityOnly +// Let the standard JUnit suite exercise protocol/config code from the custom +// Velocity source set. Without this bridge, proxy behavior compiled but had no +// executable regression tests at all. +sourceSets["test"].compileClasspath += sourceSets["velocity"].output + velocityOnly +sourceSets["test"].runtimeClasspath += sourceSets["velocity"].output + velocityOnly +tasks.named("compileTestJava") { + dependsOn("velocityClasses") +} + val velocityJar = tasks.register("velocityJar") { group = "build" description = "Packages the Velocity proxy plugin JAR." diff --git a/src/main/java/com/fastsync/FastSync.java b/src/main/java/com/fastsync/FastSync.java index 18781c1..98d7329 100644 --- a/src/main/java/com/fastsync/FastSync.java +++ b/src/main/java/com/fastsync/FastSync.java @@ -95,6 +95,37 @@ private void restartHeartbeatTask() { + " (lock-timeout=" + configManager.getLockTimeout() + "s)."); } + /** Start, stop, or reschedule periodic saves after a validated reload. */ + private void restartPeriodicSaveTask() { + if (periodicSaveTask != null) { + SchedulerUtil.cancel(periodicSaveTask); + periodicSaveTask = null; + } + if (!configManager.isPeriodicSave()) { + getLogger().info("Periodic save disabled."); + return; + } + + long intervalTicks = configManager.getPeriodicSaveIntervalSeconds() * 20L; + periodicSaveTask = SchedulerUtil.runGlobalTimer(this, () -> { + List players = new ArrayList<>(Bukkit.getOnlinePlayers()); + final int batchSize = configManager.getPeriodicSaveBatchSize(); + for (int i = 0; i < players.size(); i += batchSize) { + final int start = i; + final int end = Math.min(i + batchSize, players.size()); + long delayTicks = i / batchSize; + SchedulerUtil.runGlobalDelayed(this, () -> { + for (int j = start; j < end; j++) { + syncManager.savePlayerAsync(players.get(j)); + } + }, delayTicks); + } + }, intervalTicks, intervalTicks); + getLogger().info("Periodic save scheduled: every " + + configManager.getPeriodicSaveIntervalSeconds() + " seconds, batch=" + + configManager.getPeriodicSaveBatchSize() + "."); + } + @Override public void onEnable() { instance = this; @@ -128,15 +159,44 @@ public void onEnable() { databaseManager = new DatabaseManager(getLogger(), configManager); try { databaseManager.initialize(); - } catch (SQLException e) { + } catch (SQLException | RuntimeException e) { getLogger().log(Level.SEVERE, "Failed to initialize database! Check your config.yml.", e); + try { + databaseManager.close(); + } catch (RuntimeException cleanupError) { + e.addSuppressed(cleanupError); + getLogger().log(Level.WARNING, "Database cleanup also failed", cleanupError); + } + databaseManager = null; getServer().getPluginManager().disablePlugin(this); return; } // Initialize sync manager (creates thread pool + optional Redis) syncManager = new SyncManager(this, configManager, databaseManager); - syncManager.initialize(); + try { + syncManager.initialize(); + } catch (RuntimeException e) { + getLogger().log(Level.SEVERE, "Failed to initialize synchronization services — refusing to start.", e); + try { + syncManager.beginShutdown(); + syncManager.shutdown(); + } catch (RuntimeException cleanupError) { + e.addSuppressed(cleanupError); + getLogger().log(Level.WARNING, "Synchronization cleanup also failed", cleanupError); + } finally { + syncManager = null; + try { + databaseManager.close(); + } catch (RuntimeException cleanupError) { + e.addSuppressed(cleanupError); + getLogger().log(Level.WARNING, "Database cleanup also failed", cleanupError); + } + databaseManager = null; + } + getServer().getPluginManager().disablePlugin(this); + return; + } // Register plugin messaging channel for proxy handoff communication // This is optional — if no Velocity proxy with FastSync Proxy is installed, @@ -179,45 +239,14 @@ public void onEnable() { // Runs on async thread (DB I/O only, no Bukkit API calls). restartHeartbeatTask(); - // Start periodic save task (if enabled) - if (configManager.isPeriodicSave()) { - long intervalTicks = configManager.getPeriodicSaveIntervalSeconds() * 20L; - periodicSaveTask = SchedulerUtil.runGlobalTimer(this, () -> { - // Snapshot online players on the global region thread, then save them - // in small batches spread across successive ticks to avoid a lag spike - // when many players are online (process at most 10 players per tick). - // - // CRITICAL (Folia-safety): the batched dispatch MUST run on the - // global region (runGlobalDelayed), NOT on an async thread - // (runAsyncDelayed). savePlayerAsync(player) reads - // player.getUniqueId() and calls SchedulerUtil.runAtEntity(plugin, - // player, ...) — both touch the Player object. On Folia, async - // threads must not touch Player entities; the global region is - // the safe context for these reads. The actual DB write still - // happens on the async executor (dispatched from inside - // runAtEntity), so the global region is only used for the - // brief per-player dispatch, not for the DB wait. - List players = new ArrayList<>(Bukkit.getOnlinePlayers()); - final int batchSize = configManager.getPeriodicSaveBatchSize(); - for (int i = 0; i < players.size(); i += batchSize) { - final int start = i; - final int end = Math.min(i + batchSize, players.size()); - long delayTicks = i / batchSize; - SchedulerUtil.runGlobalDelayed(this, () -> { - for (int j = start; j < end; j++) { - syncManager.savePlayerAsync(players.get(j)); - } - }, delayTicks); - } - }, intervalTicks, intervalTicks); - getLogger().info("Periodic save enabled: every " + - configManager.getPeriodicSaveIntervalSeconds() + " seconds"); - } + // Start periodic save task (the same helper safely reschedules it on reload). + restartPeriodicSaveTask(); getLogger().info("FastSync v" + getPluginMeta().getVersion() + " enabled!"); getLogger().info("Server ID: " + configManager.getServerName()); getLogger().info("Serialization: Paper native ItemStack byte serialization"); - getLogger().info("Compression: " + (configManager.isCompressionEnabled() ? "LZ4" : "Disabled")); + getLogger().info("Compression: " + (configManager.isCompressionEnabled() + ? configManager.getCompressionType() : "Disabled")); getLogger().info("Redis: " + (configManager.isRedisEnabled() ? "Enabled" : "Disabled (DB polling)")); } @@ -276,13 +305,27 @@ public boolean onCommand(@NotNull CommandSender sender, @NotNull Command command switch (args[0].toLowerCase()) { case "reload" -> { - configManager.reload(); + ConfigManager.ReloadResult reloadResult; + try { + reloadResult = configManager.reloadSafely(); + } catch (RuntimeException e) { + getLogger().log(Level.WARNING, "Configuration reload rejected", e); + sendMessage(sender, RED + "[FastSync] Reload rejected: " + e.getMessage()); + return true; + } + if (!reloadResult.applied()) { + sendMessage(sender, YELLOW + "[FastSync] Reload not applied; restart required for: " + + String.join(", ", reloadResult.restartRequiredChanges())); + return true; + } // Refresh SyncManager caches that depend on config (e.g. snapshot trigger set) if (syncManager != null) { syncManager.refreshConfigCache(); } // Restart heartbeat task — interval may have changed restartHeartbeatTask(); + // Periodic-save enable/interval/batch are live-reloadable. + restartPeriodicSaveTask(); // Reset protection mode on reload — only if DB is healthy boolean resetOk = syncManager.resetProtectionMode(); if (resetOk) { @@ -293,7 +336,8 @@ public boolean onCommand(@NotNull CommandSender sender, @NotNull Command command sendMessage(sender, GREEN + "[FastSync] Configuration reloaded."); sendMessage(sender, GRAY + "Server: " + configManager.getServerName()); sendMessage(sender, GRAY + "Compression: " + - (configManager.isCompressionEnabled() ? "LZ4" : "Disabled")); + (configManager.isCompressionEnabled() + ? configManager.getCompressionType() : "Disabled")); sendMessage(sender, GRAY + "Redis: " + (configManager.isRedisEnabled() ? "Enabled" : "Disabled")); sendMessage(sender, GRAY + "Heartbeat: every " + @@ -302,11 +346,14 @@ public boolean onCommand(@NotNull CommandSender sender, @NotNull Command command case "status" -> sendStatus(sender); case "debug" -> { boolean newDebug = !configManager.isDebug(); - getConfig().set("debug", newDebug); - saveConfig(); - configManager.reload(); + // Runtime-only by design. Re-saving Bukkit's cached config here + // used to overwrite external edits and Sparrow migrations with + // a stale in-memory copy of the whole YAML document. + configManager.setDebug(newDebug); + syncManager.refreshConfigCache(); sendMessage(sender, GREEN + "[FastSync] Debug mode: " + (newDebug ? GREEN + "ON" : RED + "OFF")); + sendMessage(sender, GRAY + "Runtime only; edit config.yml to persist across restarts."); } case "saveall" -> { sendMessage(sender, YELLOW + "[FastSync] Saving all online players..."); diff --git a/src/main/java/com/fastsync/config/ConfigManager.java b/src/main/java/com/fastsync/config/ConfigManager.java index 07b35bd..810e835 100644 --- a/src/main/java/com/fastsync/config/ConfigManager.java +++ b/src/main/java/com/fastsync/config/ConfigManager.java @@ -14,7 +14,7 @@ * *

Configuration is loaded from {@code config.yml} using the sparrow-yaml * library (which preserves comments). Clean-slate: there is no Bukkit - * {@link FileConfiguration} fallback — a parse failure is a hard error and the + * {@link org.bukkit.configuration.file.FileConfiguration} fallback — a parse failure is a hard error and the * plugin refuses to start. The bundled sample values are additionally vetted by * {@link #validateProductionSafety()} before the database is opened.

*/ @@ -29,7 +29,6 @@ public class ConfigManager { private String serverName; // Database - private String dbType; private String dbHost; private int dbPort; private String dbDatabase; @@ -58,7 +57,6 @@ public class ConfigManager { private boolean redisSsl; private int redisTimeout; private String redisChannelPrefix; - private boolean redisCacheEnabled; // Sync private boolean syncInventory; @@ -73,7 +71,6 @@ public class ConfigManager { private int lockTimeout; private long lockRetryIntervalMs; private int lockMaxRetries; - private int saveDelay; private boolean clearBeforeApply; private String loadFailKickMessage; private String lockTimeoutKickMessage; @@ -138,12 +135,10 @@ public enum ApiMutationSafetyMode { STRICT, BALANCED, API_ONLY } // Production mode private boolean productionEnabled; private boolean productionRequireRedis; - private boolean productionRequireClusterId; // Final-save executor private int finalSaveThreads; private int finalSaveQueueCapacity; - private int finalSaveShutdownTimeoutSeconds; private boolean finalSaveAllowSyncFallback; // Final-save spool (WAL for queue-full events) @@ -252,9 +247,163 @@ public void load() { assignValues(new SparrowConfigSource(loaded)); } + /** + * Compatibility wrapper for callers that cannot consume a ReloadResult. + * Restart-only changes fail explicitly instead of being half-applied. + */ public void reload() { - // Re-read the config file (sparrow-yaml re-parses it inside load()) - load(); + ReloadResult result = reloadSafely(); + if (!result.applied()) { + throw new IllegalStateException("Restart required for configuration changes: " + + String.join(", ", result.restartRequiredChanges())); + } + } + + /** Result of a transactional live-reload attempt. */ + public record ReloadResult(boolean applied, java.util.List restartRequiredChanges) { + public ReloadResult { + restartRequiredChanges = java.util.List.copyOf(restartRequiredChanges); + } + } + + /** + * Parse and validate a candidate configuration without mutating the active + * ConfigManager. Settings whose owners are constructed only during startup + * (DB pool, Redis client, executors, spool, etc.) are rejected as a group so + * the running process can never enter a half-old/half-new configuration. + * + *

Previously {@link #reload()} assigned every field immediately. Changing + * {@code server-name}, for example, made heartbeats and saves use the new + * name while the DB lock and Redis consumer still belonged to the old name. + * That reliably led to lock loss. This method either applies the whole safe + * subset or leaves the active object untouched and asks for a restart.

+ */ + public ReloadResult reloadSafely() { + ConfigManager candidate = new ConfigManager(plugin); + try { + candidate.load(); + candidate.validateProductionSafety(); + } catch (RuntimeException e) { + // candidate.load() configures the process-wide compression codec. + // Restore the active settings before surfacing a parse/validation + // failure so a rejected reload has zero runtime side effects. + applyCompressionRuntimeSettings(); + throw e; + } + + java.util.List restartRequired = restartRequiredChanges(this, candidate); + if (!restartRequired.isEmpty()) { + applyCompressionRuntimeSettings(); + return new ReloadResult(false, restartRequired); + } + + copyLoadedStateFrom(candidate); + // Re-apply after the copy for clarity and to make this method robust if + // candidate loading ever stops configuring the global codec itself. + applyCompressionRuntimeSettings(); + return new ReloadResult(true, java.util.List.of()); + } + + static java.util.List restartRequiredChanges( + ConfigManager active, ConfigManager candidate) { + java.util.List changes = new java.util.ArrayList<>(); + if (!java.util.Objects.equals(active.serverName, candidate.serverName)) { + changes.add("server-name"); + } + if (!java.util.Objects.equals(active.clusterId, candidate.clusterId)) { + changes.add("cluster-id"); + } + if (!sameDatabaseRuntime(active, candidate)) { + changes.add("database.*"); + } + if (!sameRedisRuntime(active, candidate)) { + changes.add("redis connection/stream settings"); + } + if (active.poolSize != candidate.poolSize + || active.queueCapacity != candidate.queueCapacity + || active.maxConcurrentLoads != candidate.maxConcurrentLoads) { + changes.add("executor/login concurrency settings"); + } + if (active.dirtyTrackingEnabled != candidate.dirtyTrackingEnabled + || active.dirtyValidationInterval != candidate.dirtyValidationInterval) { + changes.add("sync.dirty-tracking enabled/validation-interval"); + } + if (active.snapshotEnabled != candidate.snapshotEnabled) { + changes.add("snapshot.enabled"); + } + if (active.finalSaveThreads != candidate.finalSaveThreads + || active.finalSaveQueueCapacity != candidate.finalSaveQueueCapacity + || active.finalSaveSpoolEnabled != candidate.finalSaveSpoolEnabled + || !java.util.Objects.equals(active.finalSaveSpoolDir, candidate.finalSaveSpoolDir) + || active.finalSaveSpoolMaxFiles != candidate.finalSaveSpoolMaxFiles + || active.finalSaveSpoolMaxBytes != candidate.finalSaveSpoolMaxBytes + || active.finalSaveSpoolFsync != candidate.finalSaveSpoolFsync + || active.finalSaveSpoolReplayOnStartup != candidate.finalSaveSpoolReplayOnStartup + || active.finalSaveSpoolReplayIntervalTicks != candidate.finalSaveSpoolReplayIntervalTicks + || active.finalSaveSpoolReplayBatchSize != candidate.finalSaveSpoolReplayBatchSize + || active.finalSaveSpoolRetainFailedDays != candidate.finalSaveSpoolRetainFailedDays) { + changes.add("final-save executor/spool settings"); + } + if (active.operationLogEnabled != candidate.operationLogEnabled) { + changes.add("operation-log.enabled"); + } + if (active.latencyTrackingEnabled != candidate.latencyTrackingEnabled + || active.latencyWindowSize != candidate.latencyWindowSize) { + changes.add("latency enabled/window-size"); + } + // Production mode changes startup failure policy. Applying it after a + // non-production startup could bless a failed Redis connection or spool + // initialization without rebuilding either subsystem. + if (active.productionEnabled != candidate.productionEnabled + || active.productionRequireRedis != candidate.productionRequireRedis) { + changes.add("production.*"); + } + return changes; + } + + private static boolean sameDatabaseRuntime(ConfigManager a, ConfigManager b) { + return java.util.Objects.equals(a.dbHost, b.dbHost) + && a.dbPort == b.dbPort + && java.util.Objects.equals(a.dbDatabase, b.dbDatabase) + && java.util.Objects.equals(a.dbUsername, b.dbUsername) + && java.util.Objects.equals(a.dbPassword, b.dbPassword) + && java.util.Objects.equals(a.tablePrefix, b.tablePrefix) + && a.connectionTimeout == b.connectionTimeout + && a.idleTimeout == b.idleTimeout + && a.maxLifetime == b.maxLifetime + && a.leakDetectionThreshold == b.leakDetectionThreshold + && java.util.Objects.equals(a.dbParameters, b.dbParameters) + && a.allowInsecureRemote == b.allowInsecureRemote; + } + + private static boolean sameRedisRuntime(ConfigManager a, ConfigManager b) { + return a.redisEnabled == b.redisEnabled + && java.util.Objects.equals(a.redisHost, b.redisHost) + && a.redisPort == b.redisPort + && java.util.Objects.equals(a.redisPassword, b.redisPassword) + && a.redisDatabase == b.redisDatabase + && a.redisSsl == b.redisSsl + && a.redisTimeout == b.redisTimeout + && java.util.Objects.equals(a.redisChannelPrefix, b.redisChannelPrefix) + && a.streamsEnabled == b.streamsEnabled + && a.redisStreamMaxLen == b.redisStreamMaxLen + && a.redisStreamTrimApprox == b.redisStreamTrimApprox; + } + + private void copyLoadedStateFrom(ConfigManager source) { + try { + for (java.lang.reflect.Field field : ConfigManager.class.getDeclaredFields()) { + int modifiers = field.getModifiers(); + if (java.lang.reflect.Modifier.isStatic(modifiers) + || java.lang.reflect.Modifier.isFinal(modifiers)) { + continue; + } + field.setAccessible(true); + field.set(this, field.get(source)); + } + } catch (IllegalAccessException e) { + throw new IllegalStateException("Failed to commit validated configuration", e); + } } /** @@ -314,22 +463,18 @@ public void validateProductionSafety() { } } - // 3) Multi-cluster DB isolation. v2 schema uses (cluster_id, uuid) PK, - // so different clusters sharing the same table-prefix is now safe - // at the DB level. But Redis pub/sub still needs distinct cluster-ids - // for namespace isolation. Warn if cluster-id is empty in a multi-server setup. + // 3) Cluster identity is part of every DB primary key and Redis namespace. + // A blank value is never a safe live configuration; assignValues() + // rejects it too, but keep this guard so programmatic/test callers + // cannot bypass the invariant. if (clusterId == null || clusterId.isBlank()) { - logger.warning("[Config] cluster-id is empty. Redis pub/sub will use the default namespace. " - + "If running multiple server clusters on the same Redis, set a distinct cluster-id per cluster."); + throw new RuntimeException( + "cluster-id must be explicitly set. All servers in the same FastSync " + + "cluster must use the same non-empty value."); } // 4) Production mode checks (round 14) if (productionEnabled) { - if (productionRequireClusterId && (clusterId == null || clusterId.isBlank())) { - throw new RuntimeException( - "production.require-cluster-id=true but cluster-id is empty. " - + "Set a non-empty cluster-id in config.yml."); - } if (productionRequireRedis && !redisEnabled) { throw new RuntimeException( "production.require-redis=true but redis.enabled=false. " @@ -356,7 +501,6 @@ public void validateProductionSafety() { + "rely on the spool + replay service."); } logger.info("[Config] Production mode enabled. Redis required=" + productionRequireRedis - + ", cluster-id required=" + productionRequireClusterId + ", final-save spool=" + finalSaveSpoolEnabled + ", final-save sync fallback allowed=" + finalSaveAllowSyncFallback); } @@ -391,9 +535,14 @@ private static boolean hasPlaintextSsl(String parameters) { private void assignValues(ConfigSource source) { // Server serverName = source.getString("server-name", "survival-1"); + if (serverName == null || !serverName.trim().matches("[A-Za-z0-9_.-]{1,64}")) { + throw new RuntimeException( + "Invalid server-name '" + serverName + "'. " + + "Allowed characters: A-Z a-z 0-9 _ . - , max 64 characters."); + } + serverName = serverName.trim(); // Database - dbType = source.getString("database.type", "mysql"); dbHost = source.getString("database.host", "localhost"); dbPort = source.getInt("database.port", 3306); dbDatabase = source.getString("database.database", "minecraft"); @@ -427,7 +576,6 @@ private void assignValues(ConfigSource source) { redisSsl = source.getBoolean("redis.ssl", false); redisTimeout = source.getInt("redis.timeout", 5000); redisChannelPrefix = source.getString("redis.channel-prefix", "fastsync:lock:"); - redisCacheEnabled = source.getBoolean("redis.cache-enabled", false); // Sync syncInventory = source.getBoolean("sync.sync-inventory", true); @@ -443,10 +591,10 @@ private void assignValues(ConfigSource source) { // Round 16 (P1 #5): tightened defaults. Previously 1000ms x 30 = 30s // worst-case pre-login block, which caused severe UX under login storms // and DB hiccups. New defaults: 300ms x 15 = 4.5s worst-case. The - // Velocity handoff path is faster and does not wait the full window. + // Redis release notifications usually make the backend gate return + // earlier without weakening this bounded fallback window. lockRetryIntervalMs = source.getLong("sync.lock-retry-interval-ms", 300); lockMaxRetries = source.getInt("sync.lock-max-retries", 15); - saveDelay = source.getInt("sync.save-delay", 0); clearBeforeApply = source.getBoolean("sync.clear-before-apply", true); loadFailKickMessage = source.getString("sync.load-fail-kick-message", "&c[FastSync] Failed to load your player data. Please try reconnecting."); @@ -596,12 +744,10 @@ private void assignValues(ConfigSource source) { // Production mode productionEnabled = source.getBoolean("production.enabled", false); productionRequireRedis = source.getBoolean("production.require-redis", true); - productionRequireClusterId = source.getBoolean("production.require-cluster-id", true); // Final-save executor (single source of truth for sync-fallback gate) finalSaveThreads = source.getInt("final-save.threads", 4); finalSaveQueueCapacity = source.getInt("final-save.queue-capacity", 1024); - finalSaveShutdownTimeoutSeconds = source.getInt("final-save.shutdown-timeout-seconds", 60); finalSaveAllowSyncFallback = source.getBoolean("final-save.allow-sync-fallback", false); // Final-save spool (WAL for queue-full events) @@ -614,18 +760,6 @@ private void assignValues(ConfigSource source) { finalSaveSpoolReplayIntervalTicks = source.getLong("final-save.spool.replay-interval-ticks", 100); finalSaveSpoolReplayBatchSize = source.getInt("final-save.spool.replay-batch-size", 64); finalSaveSpoolRetainFailedDays = source.getInt("final-save.spool.retain-failed-days", 7); - if (finalSaveThreads < 1) { - logger.warning("[Config] final-save.threads must be >= 1. Using 1."); - finalSaveThreads = 1; - } - if (finalSaveQueueCapacity < 1) { - logger.warning("[Config] final-save.queue-capacity must be >= 1. Using 1."); - finalSaveQueueCapacity = 1; - } - if (finalSaveShutdownTimeoutSeconds < 1) { - logger.warning("[Config] final-save.shutdown-timeout-seconds must be >= 1. Using 1."); - finalSaveShutdownTimeoutSeconds = 1; - } if (finalSaveSpoolDir == null || finalSaveSpoolDir.isBlank()) { finalSaveSpoolDir = "final-save-spool"; } @@ -694,21 +828,6 @@ private void assignValues(ConfigSource source) { serializationMaxWrappedBytes = source.getInt("serialization.max-wrapped-bytes", 5 * (1 << 19)); // 2.5 MiB if (serializationMaxRawBytes <= 0) serializationMaxRawBytes = 1 << 20; if (serializationMaxWrappedBytes <= 0) serializationMaxWrappedBytes = 5 * (1 << 19); - com.fastsync.serialization.CompressionUtil.configureLimits( - serializationMaxRawBytes, serializationMaxWrappedBytes); - - // Configure compression, including the disabled state on reload. - com.fastsync.serialization.CompressionUtil.setEnabled(compressionEnabled); - if (isZstdCompression()) { - com.fastsync.serialization.CompressionUtil.setAlgorithm( - com.fastsync.serialization.CompressionUtil.CompressionAlgorithm.ZSTD); - com.fastsync.serialization.CompressionUtil.setZstdLevel(zstdLevel); - } else { - com.fastsync.serialization.CompressionUtil.setAlgorithm( - com.fastsync.serialization.CompressionUtil.CompressionAlgorithm.LZ4); - } - com.fastsync.serialization.CompressionUtil.verifyConfiguredCodec(); - // Debug debug = source.getBoolean("debug", false); logTiming = source.getBoolean("log-timing", false); @@ -726,13 +845,142 @@ private void assignValues(ConfigSource source) { streamsEnabled = source.getBoolean("redis.streams-enabled", true); redisStreamMaxLen = source.getInt("redis.stream-maxlen", 100000); redisStreamTrimApprox = source.getBoolean("redis.stream-trim-approx", true); + + validateNumericRanges(); + applyCompressionRuntimeSettings(); + } + + /** + * Clamp numeric settings that feed schedulers, bounded queues and network + * clients. These checks deliberately live in one final pass: several + * defaults depend on values parsed earlier (for example login concurrency + * depends on the DB pool size), and validating only at individual read sites + * previously left zero/negative values able to reach runtime code. + * + *

In particular, {@code periodic-save-batch-size=0} made FastSync's + * batching loop advance by zero forever, while non-positive retry/period + * values caused {@code Thread.sleep} or the Paper/Folia schedulers to throw + * on live login/save paths.

+ */ + void validateNumericRanges() { + if (dbPort < 1 || dbPort > 65_535) { + logger.warning("[Config] database.port must be in 1-65535. Using 3306."); + dbPort = 3306; + } + // These values feed Hikari and ArrayBlockingQueue constructors during + // startup. Bound both ends so a typo cannot create thousands of DB + // connections or allocate a multi-gigabyte queue before startup fails. + poolSize = clampWithWarning("database.pool-size", poolSize, 3, 256, 10); + queueCapacity = clampWithWarning( + "database.queue-capacity", queueCapacity, 1, 1_000_000, 256); + finalSaveThreads = clampWithWarning( + "final-save.threads", finalSaveThreads, 2, 128, 4); + finalSaveQueueCapacity = clampWithWarning( + "final-save.queue-capacity", finalSaveQueueCapacity, 1, 1_000_000, 1_024); + if (connectionTimeout != 0 && connectionTimeout < 250) { + logger.warning("[Config] database.connection-timeout must be 0 or >= 250ms. Using 250ms."); + connectionTimeout = 250; + } + if (idleTimeout != 0 && idleTimeout < 10_000) { + logger.warning("[Config] database.idle-timeout must be 0 or >= 10000ms. Using 10000ms."); + idleTimeout = 10_000; + } + if (maxLifetime != 0 && maxLifetime < 30_000) { + logger.warning("[Config] database.max-lifetime must be 0 or >= 30000ms. Using 30000ms."); + maxLifetime = 30_000; + } + if (leakDetectionThreshold != 0 && leakDetectionThreshold < 2_000) { + logger.warning("[Config] database.leak-detection-threshold must be 0 or >= 2000ms. Using 2000ms."); + leakDetectionThreshold = 2_000; + } + + if (redisPort < 1 || redisPort > 65_535) { + logger.warning("[Config] redis.port must be in 1-65535. Using 6379."); + redisPort = 6379; + } + if (redisDatabase < 0) { + logger.warning("[Config] redis.database must be >= 0. Using 0."); + redisDatabase = 0; + } + if (redisTimeout < 100) { + logger.warning("[Config] redis.timeout must be >= 100ms. Using 100ms."); + redisTimeout = 100; + } + + if (lockRetryIntervalMs < 1) { + logger.warning("[Config] sync.lock-retry-interval-ms must be >= 1. Using 1."); + lockRetryIntervalMs = 1; + } + if (lockMaxRetries < 1) { + logger.warning("[Config] sync.lock-max-retries must be >= 1. Using 1."); + lockMaxRetries = 1; + } + if (periodicSaveIntervalSeconds < 1) { + logger.warning("[Config] sync.periodic-save-interval-seconds must be >= 1. Using 1."); + periodicSaveIntervalSeconds = 1; + } + if (periodicSaveBatchSize < 1) { + logger.warning("[Config] sync.periodic-save-batch-size must be >= 1. Using 1."); + periodicSaveBatchSize = 1; + } + int safeLoadLimit = Math.max(1, poolSize - 2); + if (maxConcurrentLoads > safeLoadLimit) { + logger.warning("[Config] sync.max-concurrent-loads (" + maxConcurrentLoads + + ") leaves no DB capacity for saves/heartbeats. Using " + safeLoadLimit + "."); + maxConcurrentLoads = safeLoadLimit; + } + + maxSnapshots = clampWithWarning( + "snapshot.max-snapshots", maxSnapshots, 1, 10_000, 16); + if (snapshotBackupFrequencyMs < 0) { + logger.warning("[Config] snapshot.backup-frequency-ms must be >= 0. Using 0."); + snapshotBackupFrequencyMs = 0; + } + latencyWindowSize = clampWithWarning( + "latency.window-size", latencyWindowSize, 16, 1_000_000, 1_000); + operationLogRetention = clampWithWarning( + "operation-log.retention", operationLogRetention, 1, 100_000, 100); + if (redisStreamMaxLen < 0) { + logger.warning("[Config] redis.stream-maxlen must be >= 0. Using 0 (no trimming)."); + redisStreamMaxLen = 0; + } + + // Bounds are themselves allocation limits. Cap them so a typo cannot + // turn a poisoned DB header into a multi-gigabyte allocation attempt. + serializationMaxRawBytes = clampWithWarning( + "serialization.max-raw-bytes", serializationMaxRawBytes, + 1_024, 64 * 1_024 * 1_024, 1 << 20); + serializationMaxWrappedBytes = clampWithWarning( + "serialization.max-wrapped-bytes", serializationMaxWrappedBytes, + 1_024, 64 * 1_024 * 1_024, 5 * (1 << 19)); + } + + private int clampWithWarning(String key, int value, int min, int max, int fallback) { + if (value >= min && value <= max) return value; + logger.warning("[Config] " + key + " must be in " + min + "-" + max + + ". Using " + fallback + "."); + return fallback; + } + + private void applyCompressionRuntimeSettings() { + com.fastsync.serialization.CompressionUtil.configureLimits( + serializationMaxRawBytes, serializationMaxWrappedBytes); + com.fastsync.serialization.CompressionUtil.setEnabled(compressionEnabled); + if (isZstdCompression()) { + com.fastsync.serialization.CompressionUtil.setAlgorithm( + com.fastsync.serialization.CompressionUtil.CompressionAlgorithm.ZSTD); + com.fastsync.serialization.CompressionUtil.setZstdLevel(zstdLevel); + } else { + com.fastsync.serialization.CompressionUtil.setAlgorithm( + com.fastsync.serialization.CompressionUtil.CompressionAlgorithm.LZ4); + } + com.fastsync.serialization.CompressionUtil.verifyConfiguredCodec(); } // ==================== Getters ==================== public String getServerName() { return serverName; } - public String getDbType() { return dbType; } public String getDbHost() { return dbHost; } public int getDbPort() { return dbPort; } public String getDbDatabase() { return dbDatabase; } @@ -755,7 +1003,6 @@ private void assignValues(ConfigSource source) { public boolean isRedisSsl() { return redisSsl; } public int getRedisTimeout() { return redisTimeout; } public String getRedisChannelPrefix() { return redisChannelPrefix; } - public boolean isRedisCacheEnabled() { return redisCacheEnabled; } public boolean isSyncInventory() { return syncInventory; } public boolean isSyncEnderChest() { return syncEnderChest; } @@ -769,7 +1016,6 @@ private void assignValues(ConfigSource source) { public int getLockTimeout() { return lockTimeout; } public long getLockRetryIntervalMs() { return lockRetryIntervalMs; } public int getLockMaxRetries() { return lockMaxRetries; } - public int getSaveDelay() { return saveDelay; } public boolean isClearBeforeApply() { return clearBeforeApply; } public String getLoadFailKickMessage() { return loadFailKickMessage; } public String getLockTimeoutKickMessage() { return lockTimeoutKickMessage; } @@ -827,12 +1073,10 @@ private void assignValues(ConfigSource source) { // Production mode public boolean isProductionEnabled() { return productionEnabled; } public boolean isProductionRequireRedis() { return productionRequireRedis; } - public boolean isProductionRequireClusterId() { return productionRequireClusterId; } // Final-save executor public int getFinalSaveThreads() { return finalSaveThreads; } public int getFinalSaveQueueCapacity() { return finalSaveQueueCapacity; } - public int getFinalSaveShutdownTimeoutSeconds() { return finalSaveShutdownTimeoutSeconds; } public boolean isFinalSaveAllowSyncFallback() { return finalSaveAllowSyncFallback; } // Final-save spool @@ -859,6 +1103,8 @@ private void assignValues(ConfigSource source) { public int getSerializationMaxWrappedBytes() { return serializationMaxWrappedBytes; } public boolean isDebug() { return debug; } + /** Toggle diagnostics for the current process without rewriting config.yml. */ + public void setDebug(boolean debug) { this.debug = debug; } public boolean isLogTiming() { return logTiming; } public String getLanguage() { return language; } diff --git a/src/main/java/com/fastsync/conflict/ConflictManager.java b/src/main/java/com/fastsync/conflict/ConflictManager.java index f0ef087..167028e 100644 --- a/src/main/java/com/fastsync/conflict/ConflictManager.java +++ b/src/main/java/com/fastsync/conflict/ConflictManager.java @@ -25,7 +25,9 @@ * case optimistic concurrency exists to prevent. Manual recovery of stale data * should go through the offline snapshot tool, not the live save path. * - * In all cases, the lock is released so other servers can proceed. + * Lock ownership is intentionally outside this class. The caller keeps or + * expires the lock according to save kind; a failed final save must never + * release it and expose stale DB state as current. */ public class ConflictManager { diff --git a/src/main/java/com/fastsync/log/FileOperationLogManager.java b/src/main/java/com/fastsync/log/FileOperationLogManager.java index 8397b28..198daf0 100644 --- a/src/main/java/com/fastsync/log/FileOperationLogManager.java +++ b/src/main/java/com/fastsync/log/FileOperationLogManager.java @@ -67,10 +67,9 @@ * * *

Thread-safety: the manager is safe to call concurrently from multiple - * threads. Appends for the same UUID are serialized by a per-UUID lock to - * ensure append ordering. {@link #queryHistory} is read-only and never blocks - * appends. {@link #prune} should not run concurrently with an in-flight - * {@link #append} for the same UUID. + * threads. Appends, reads and pruning for the same UUID are serialized by a + * per-UUID lock so readers never observe a partially-written record and prune + * cannot replace a file while an append is in flight. */ public class FileOperationLogManager { @@ -96,10 +95,11 @@ public class FileOperationLogManager { * *

This executor is single-threaded (log writes are sequential per * process; per-UUID locks handle ordering across threads) with a large - * bounded queue (4096) and {@code DiscardOldestPolicy} — under extreme + * bounded queue (4096) and a discard-oldest policy — under extreme * load we prefer to drop the oldest queued log entry rather than block - * the save/load path or throw. The SQL DB remains the source of truth; - * the operation log is an audit aid. + * the save/load path or throw. Dropped tasks explicitly complete their + * futures, avoiding callers waiting forever on work that left the queue. + * The SQL DB remains the source of truth; the operation log is an audit aid. */ private final ThreadPoolExecutor appendExecutor; @@ -140,33 +140,24 @@ private ThreadPoolExecutor createAppendExecutor() { factory, (task, executor) -> { if (closed || executor.isShutdown()) { - throw new java.util.concurrent.RejectedExecutionException( - "FileOperationLogManager append executor is shutting down"); + discardTask(task); + return; } Runnable dropped = executor.getQueue().poll(); if (dropped != null) { - long drops = droppedCount.incrementAndGet(); - lastDropTimestamp.set(System.currentTimeMillis()); - if (drops == 1 || drops % 100 == 0) { - logger.warning("[OpLog] Append queue full; dropped " + drops - + " queued log entr" + (drops == 1 ? "y" : "ies") - + " so far. This log is best-effort only."); - } + discardTask(dropped); } if (!executor.getQueue().offer(task)) { - throw new java.util.concurrent.RejectedExecutionException( - "FileOperationLogManager append queue remained full after dropping oldest entry"); + discardTask(task); } }); } - public void initialize() { - try { - Files.createDirectories(playerLogRoot); - } catch (IOException e) { - logger.log(Level.WARNING, - "[OpLog] Failed to create player-log directory: " + playerLogRoot, e); - } + public void initialize() throws IOException { + // Fail initialization instead of advertising an enabled audit log whose + // directory could not be created. OperationLogDelegate catches this and + // disables the optional subsystem cleanly. + Files.createDirectories(playerLogRoot); initialized = true; logger.info("[OpLog] File operation log enabled (dir=" + playerLogRoot + ", retention=" + retention + ", executor=dedicated single-thread bounded)."); @@ -189,24 +180,77 @@ private Object getLock(UUID uuid) { return appendLocks.computeIfAbsent(uuid, id -> new Object()); } + /** Runnable with an observable completion even when queue policy drops it. */ + private final class AppendTask implements Runnable { + private final OperationLog entry; + private final CompletableFuture completion = new CompletableFuture<>(); + + private AppendTask(OperationLog entry) { + this.entry = entry; + } + + @Override + public void run() { + try { + appendSync(entry); + } catch (Throwable t) { + logger.log(Level.WARNING, + "[OpLog] Failed to append operation log for " + entry.uuid(), t); + } finally { + completion.complete(null); + } + } + + private void completeAsDropped() { + completion.complete(null); + } + } + + private void discardTask(Runnable task) { + if (task instanceof FileOperationLogManager.AppendTask appendTask) { + appendTask.completeAsDropped(); + } + long drops = droppedCount.incrementAndGet(); + lastDropTimestamp.set(System.currentTimeMillis()); + if (drops == 1 || drops % 100 == 0) { + logger.warning("[OpLog] Append queue full; dropped " + drops + + " queued log entr" + (drops == 1 ? "y" : "ies") + + " so far. This log is best-effort only."); + } + } + + private void discardTasks(java.util.List tasks) { + if (tasks.isEmpty()) return; + for (Runnable task : tasks) { + if (task instanceof FileOperationLogManager.AppendTask appendTask) { + appendTask.completeAsDropped(); + } + } + droppedCount.addAndGet(tasks.size()); + lastDropTimestamp.set(System.currentTimeMillis()); + } + public CompletableFuture append(OperationLog entry) { if (!isEnabled()) { return CompletableFuture.completedFuture(null); } // Round 16 (P0 #2): use the dedicated bounded appendExecutor instead // of ForkJoinPool.commonPool(). This keeps log writes under FastSync's - // thread-governance and shutdown ordering. DiscardOldestPolicy ensures + // thread-governance and shutdown ordering. The discard-oldest policy ensures // this never blocks the save/load path — under extreme load the oldest // queued log entry is silently dropped (the DB remains the source of - // truth; this log is an audit aid only). - return CompletableFuture.runAsync(() -> { - try { - appendSync(entry); - } catch (Throwable t) { - logger.log(Level.WARNING, - "[OpLog] Failed to append operation log for " + entry.uuid(), t); - } - }, appendExecutor); + // truth; this log is an audit aid only). A custom task is used instead + // of CompletableFuture.runAsync so the rejection policy can complete a + // future whose queued task it discards. + AppendTask task = new AppendTask(entry); + try { + appendExecutor.execute(task); + } catch (java.util.concurrent.RejectedExecutionException e) { + // Defensive fallback for executor implementations/policies that may + // still throw during a close race. Logging remains best-effort. + discardTask(task); + } + return task.completion; } private void appendSync(OperationLog entry) throws IOException { @@ -236,23 +280,25 @@ public List queryHistory(UUID uuid, int limit) { if (!isEnabled() || limit <= 0 || uuid == null) { return List.of(); } - Path path = getLogPath(uuid); - if (!Files.exists(path)) { - return List.of(); - } + synchronized (getLock(uuid)) { + Path path = getLogPath(uuid); + if (!Files.exists(path)) { + return List.of(); + } - List all = readAll(path); - if (all.isEmpty()) { - return List.of(); - } + List all = readAll(path); + if (all.isEmpty()) { + return List.of(); + } - int from = Math.max(0, all.size() - limit); - List tail = all.subList(from, all.size()); - List result = new ArrayList<>(tail.size()); - for (int i = tail.size() - 1; i >= 0; i--) { - result.add(tail.get(i)); + int from = Math.max(0, all.size() - limit); + List tail = all.subList(from, all.size()); + List result = new ArrayList<>(tail.size()); + for (int i = tail.size() - 1; i >= 0; i--) { + result.add(tail.get(i)); + } + return result; } - return result; } public void prune(UUID uuid, int keepCount) { @@ -267,28 +313,25 @@ public void prune(UUID uuid, int keepCount) { } private void pruneSync(UUID uuid, int keepCount) throws IOException { - Path path = getLogPath(uuid); - if (!Files.exists(path)) { - return; - } + synchronized (getLock(uuid)) { + Path path = getLogPath(uuid); + if (!Files.exists(path)) { + return; + } - List all = readAll(path); - if (all.size() <= keepCount) { - return; - } + List all = readAll(path); + if (all.size() <= keepCount) { + return; + } - if (keepCount == 0) { - synchronized (getLock(uuid)) { + if (keepCount == 0) { Files.deleteIfExists(path); + return; } - return; - } - - List keep = new ArrayList<>( - all.subList(all.size() - keepCount, all.size())); - Path tmp = path.resolveSibling(uuid.toString() + ".tmp"); - synchronized (getLock(uuid)) { + List keep = new ArrayList<>( + all.subList(all.size() - keepCount, all.size())); + Path tmp = path.resolveSibling(uuid.toString() + ".tmp"); try (var out = new DataOutputStream(new BufferedOutputStream( Files.newOutputStream(tmp, StandardOpenOption.CREATE, @@ -298,15 +341,19 @@ private void pruneSync(UUID uuid, int keepCount) throws IOException { } out.flush(); } - // Atomic replace - Files.move(tmp, path, - java.nio.file.StandardCopyOption.REPLACE_EXISTING, - java.nio.file.StandardCopyOption.ATOMIC_MOVE); - } + try { + Files.move(tmp, path, + java.nio.file.StandardCopyOption.REPLACE_EXISTING, + java.nio.file.StandardCopyOption.ATOMIC_MOVE); + } catch (java.nio.file.AtomicMoveNotSupportedException e) { + Files.move(tmp, path, + java.nio.file.StandardCopyOption.REPLACE_EXISTING); + } - if (logger.isLoggable(Level.FINE)) { - logger.fine("[OpLog] Compacted " + uuid + ": kept " + keep.size() - + " of " + all.size() + " entries."); + if (logger.isLoggable(Level.FINE)) { + logger.fine("[OpLog] Compacted " + uuid + ": kept " + keep.size() + + " of " + all.size() + " entries."); + } } } @@ -395,7 +442,9 @@ public void close() { appendExecutor.shutdown(); try { if (!appendExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - int remaining = appendExecutor.shutdownNow().size(); + java.util.List abandoned = appendExecutor.shutdownNow(); + int remaining = abandoned.size(); + discardTasks(abandoned); if (remaining > 0) { logger.warning("[OpLog] Forced shutdown of append executor; " + remaining + " pending log entries were discarded."); @@ -405,7 +454,7 @@ public void close() { } } } catch (InterruptedException e) { - appendExecutor.shutdownNow(); + discardTasks(appendExecutor.shutdownNow()); Thread.currentThread().interrupt(); } diff --git a/src/main/java/com/fastsync/snapshot/SnapshotManager.java b/src/main/java/com/fastsync/snapshot/SnapshotManager.java index da8c76e..6a5d53e 100644 --- a/src/main/java/com/fastsync/snapshot/SnapshotManager.java +++ b/src/main/java/com/fastsync/snapshot/SnapshotManager.java @@ -47,6 +47,13 @@ public class SnapshotManager { private ThreadPoolExecutor snapshotExecutor; private final AtomicLong rejectedSnapshotTasks = new AtomicLong(); private java.util.concurrent.Semaphore dbWorkSemaphore; + /** Last accepted snapshot time per player (also acts as an in-flight reservation). */ + private final java.util.concurrent.ConcurrentHashMap snapshotReservations = + new java.util.concurrent.ConcurrentHashMap<>(); + /** Unique tokens prevent an older failure callback from releasing a newer reservation. */ + private final AtomicLong snapshotReservationSequence = new AtomicLong(); + + private record SnapshotReservation(long token, long createdAtNanos) {} /** Hard cap on the snapshot work queue. */ private static final int SNAPSHOT_QUEUE_CAPACITY = 4096; @@ -149,6 +156,7 @@ private CompletableFuture supplySnapshotAsync(Supplier supplier) { * and in-flight tasks to complete. Safe to call multiple times. */ public void close() { + snapshotReservations.clear(); if (snapshotExecutor == null) { return; } @@ -181,6 +189,45 @@ public long getRejectedCount() { return rejectedSnapshotTasks.get(); } + /** + * Reserve a snapshot slot if the configured minimum interval has elapsed. + * The atomic map update prevents concurrent save completions from creating + * duplicate snapshots for the same player. + * + * @return unique reservation token, or {@code -1} when rate-limited + */ + public long reserveSnapshot(UUID uuid, long minimumIntervalMs) { + long now = System.nanoTime(); + long interval = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos( + Math.max(0L, minimumIntervalMs)); + long token = snapshotReservationSequence.incrementAndGet(); + java.util.concurrent.atomic.AtomicBoolean accepted = + new java.util.concurrent.atomic.AtomicBoolean(false); + snapshotReservations.compute(uuid, (ignored, previous) -> { + if (previous == null || interval == 0L || now - previous.createdAtNanos() >= interval) { + accepted.set(true); + return new SnapshotReservation(token, now); + } + return previous; + }); + return accepted.get() ? token : -1L; + } + + /** Release a reservation after asynchronous snapshot creation fails. */ + public void releaseSnapshotReservation(UUID uuid, long reservationToken) { + snapshotReservations.computeIfPresent(uuid, (ignored, current) -> + current.token() == reservationToken ? null : current); + } + + /** Bound the in-memory rate-limit map by monotonic reservation age. */ + public void cleanupSnapshotReservations(long maximumAgeMs) { + long now = System.nanoTime(); + long maximumAge = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos( + Math.max(0L, maximumAgeMs)); + snapshotReservations.entrySet().removeIf( + entry -> now - entry.getValue().createdAtNanos() >= maximumAge); + } + /** * Create the snapshots table if it does not exist. * diff --git a/src/main/java/com/fastsync/sync/SyncManager.java b/src/main/java/com/fastsync/sync/SyncManager.java index 8cd3744..99bcf92 100644 --- a/src/main/java/com/fastsync/sync/SyncManager.java +++ b/src/main/java/com/fastsync/sync/SyncManager.java @@ -342,11 +342,9 @@ public void initialize() { // Round 16 (P0 #3): dedicated final-save executor. 2 threads so a // stuck save on one thread does not head-of-line block the next QUIT - // save; queue is 4x the main queue so QUIT saves rarely fall back to - // synchronous execution on the event thread. - // Round 14: final-save executor uses configurable threads + queue capacity. - int finalThreads = Math.max(2, config.getFinalSaveThreads()); - int finalQueue = Math.max(1024, config.getFinalSaveQueueCapacity()); + // save. Both its thread count and bounded queue are operator-configurable. + int finalThreads = config.getFinalSaveThreads(); + int finalQueue = config.getFinalSaveQueueCapacity(); finalSaveExecutor = new AsyncExecutor(logger, "FastSync-FinalSave", finalThreads, finalQueue); // Login backpressure semaphore — limits concurrent pre-login loads @@ -366,6 +364,13 @@ public void initialize() { // Initialize snapshot system if enabled if (config.isSnapshotEnabled()) { snapshotManager = new SnapshotManager(logger, config); + // Login loads can consume at most poolSize-2 connections. Bound + // snapshot work to the remaining non-critical budget while always + // leaving at least one connection for final saves/heartbeats. + int snapshotDbPermits = Math.max(1, Math.min(2, + config.getPoolSize() - config.getMaxConcurrentLoads() - 1)); + snapshotManager.setDbWorkSemaphore( + new java.util.concurrent.Semaphore(snapshotDbPermits, true)); snapshotManager.initialize(databaseManager); logger.info("Snapshot/backup system enabled (max " + config.getMaxSnapshots() + " per player)."); } @@ -379,6 +384,7 @@ public void initialize() { config.getServerName(), config.getClusterId(), config.getRedisChannelPrefix(), config.isRedisSsl(), config.getRedisTimeout(), config.isStreamsEnabled(), config.getRedisStreamMaxLen(), config.isRedisStreamTrimApprox()); + redissonManager.setDebug(config.isDebug()); // Register listener BEFORE initialize() — initialize() starts the // consumer loop and recovers pending entries, which can dispatch // stream events immediately. If the listener isn't registered yet, @@ -460,6 +466,9 @@ public void refreshConfigCache() { // attributes / advancements are picked up on the next collect. cachedAttributes = null; cachedAdvancements = null; + if (redissonManager != null) { + redissonManager.setDebug(config.isDebug()); + } } /** @@ -3119,6 +3128,12 @@ public void cleanupStaleEntries() { && !pendingEmptyData.contains(uuid) && !pendingBypass.contains(uuid); }); + + if (snapshotManager != null) { + long retention = Math.max(5 * 60 * 1000L, + config.getSnapshotBackupFrequencyMs()); + snapshotManager.cleanupSnapshotReservations(retention); + } } // ==================== Shutdown ==================== @@ -4055,15 +4070,36 @@ private SaveResult persistCollectedData(UUID uuid, PlayerData data, SaveKind kin } } - if (snapshotManager != null && shouldCreateSnapshot(data.getSaveCause())) { - try { - snapshotManager.createSnapshot(uuid, compressed, data.getSaveCause()) - .thenRun(() -> snapshotManager.pruneSnapshots(uuid, config.getMaxSnapshots())); - } catch (Exception snapshotEx) { - // Post-commit side effect failure must NOT turn a - // successful DB save into a retry/failure. The DB - // commit is authoritative — the snapshot is best-effort. - logger.log(Level.WARNING, kind + " save succeeded but snapshot creation failed for " + uuid, snapshotEx); + if (snapshotManager != null && shouldTriggerSnapshot(data.getSaveCause())) { + long reservation = snapshotManager.reserveSnapshot( + uuid, config.getSnapshotBackupFrequencyMs()); + if (reservation >= 0) { + try { + snapshotManager.createSnapshot(uuid, compressed, data.getSaveCause()) + .whenComplete((snapshotId, snapshotError) -> { + if (snapshotError == null) { + snapshotManager.pruneSnapshots(uuid, config.getMaxSnapshots()) + .whenComplete((ignored, pruneError) -> { + if (pruneError != null) { + logger.log(Level.WARNING, kind + + " save and snapshot succeeded but pruning failed for " + + uuid, pruneError); + } + }); + } else { + snapshotManager.releaseSnapshotReservation(uuid, reservation); + logger.log(Level.WARNING, kind + + " save succeeded but snapshot creation failed for " + + uuid, snapshotError); + } + }); + } catch (Exception snapshotEx) { + snapshotManager.releaseSnapshotReservation(uuid, reservation); + // Post-commit side effect failure must NOT turn a + // successful DB save into a retry/failure. The DB + // commit is authoritative — the snapshot is best-effort. + logger.log(Level.WARNING, kind + " save succeeded but snapshot creation failed for " + uuid, snapshotEx); + } } } @@ -4145,7 +4181,7 @@ private SaveResult persistCollectedData(UUID uuid, PlayerData data, SaveKind kin *

The trigger string is parsed into a {@link Set} once at * {@link #initialize()}/{@link #reload()} time, so this method is O(1). */ - private boolean shouldCreateSnapshot(String saveCause) { + private boolean shouldTriggerSnapshot(String saveCause) { java.util.Set triggers = snapshotTriggerSet; if (triggers.isEmpty()) return false; if (triggers.contains("always")) return true; diff --git a/src/main/resources/config.yml b/src/main/resources/config.yml index c7f5e0b..1765691 100644 --- a/src/main/resources/config.yml +++ b/src/main/resources/config.yml @@ -28,11 +28,9 @@ cluster-id: "survival-main" # ------------------------------------------------------------ # When enabled, enforces strict safety checks: # - require-redis: Redis must be enabled and healthy (no DB polling fallback) -# - require-cluster-id: cluster-id must be non-empty production: enabled: false require-redis: true - require-cluster-id: true # ------------------------------------------------------------ # Final-Save Executor @@ -43,7 +41,6 @@ production: final-save: threads: 4 queue-capacity: 1024 - shutdown-timeout-seconds: 60 # false: queue full → spool to disk for replay (safe, no tick blocking) # true: queue full → synchronous fallback on event thread (blocks tick, risky) allow-sync-fallback: false @@ -137,9 +134,6 @@ redis: # Channel prefix for pub/sub messages channel-prefix: "fastsync:lock:" - # Whether to use Redis for data caching (reduces DB load for frequent reads) - cache-enabled: false - # Redis Streams for critical, recoverable event delivery. # Unlike Pub/Sub (fire-and-forget), Streams persist events so they survive # consumer crashes and can be recovered via XAUTOCLAIM. @@ -186,13 +180,11 @@ sync: # Round 16 (P1 #5): defaults tightened from 1000ms x 30 (30s worst-case # login wait) to 300ms x 15 (4.5s worst-case). A 30s pre-login block on # every retry slot causes severe UX under login storms and DB hiccups. - # Velocity handoff path is faster and does not wait the full window. + # Redis release notifications usually wake the backend gate before the + # bounded DB-polling fallback window is exhausted. lock-retry-interval-ms: 300 lock-max-retries: 15 - # Save delay on quit in ticks (0 = immediate, 20 = 1 second delay) - save-delay: 0 - # Whether to clear inventory before applying synced data (prevents item duplication) clear-before-apply: true @@ -359,7 +351,8 @@ snapshot: # Maximum number of snapshots to keep per player (oldest non-pinned are deleted) max-snapshots: 16 - # Minimum time between snapshot creation in milliseconds (default: 4 hours) + # Per-player minimum interval between snapshots whose save-trigger matched + # (default: 4 hours). Set to 0 to disable rate limiting. backup-frequency-ms: 14400000 # When to create a snapshot on a successful save. diff --git a/src/test/java/com/fastsync/config/ConfigManagerNumericValidationTest.java b/src/test/java/com/fastsync/config/ConfigManagerNumericValidationTest.java new file mode 100644 index 0000000..938cb68 --- /dev/null +++ b/src/test/java/com/fastsync/config/ConfigManagerNumericValidationTest.java @@ -0,0 +1,75 @@ +package com.fastsync.config; + +import com.fastsync.testutil.TestConfigBuilder; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class ConfigManagerNumericValidationTest { + + @Test + void clampsSchedulerAndRetryValuesThatWouldFailAtRuntime() throws Exception { + ConfigManager config = new TestConfigBuilder().defaults().build(); + set(config, "poolSize", 4); + set(config, "lockRetryIntervalMs", 0L); + set(config, "lockMaxRetries", 0); + set(config, "periodicSaveIntervalSeconds", 0); + set(config, "periodicSaveBatchSize", 0); + set(config, "maxConcurrentLoads", 99); + + config.validateNumericRanges(); + + assertEquals(1L, config.getLockRetryIntervalMs()); + assertEquals(1, config.getLockMaxRetries()); + assertEquals(1, config.getPeriodicSaveIntervalSeconds()); + assertEquals(1, config.getPeriodicSaveBatchSize()); + assertEquals(2, config.getMaxConcurrentLoads(), + "two DB connections must remain available for save/heartbeat work"); + } + + @Test + void clampsConnectionAndAllocationLimits() throws Exception { + ConfigManager config = new TestConfigBuilder().defaults().build(); + set(config, "dbPort", 70_000); + set(config, "redisPort", 0); + set(config, "poolSize", Integer.MAX_VALUE); + set(config, "queueCapacity", Integer.MAX_VALUE); + set(config, "finalSaveThreads", Integer.MAX_VALUE); + set(config, "finalSaveQueueCapacity", Integer.MAX_VALUE); + set(config, "connectionTimeout", 1L); + set(config, "idleTimeout", 1L); + set(config, "maxLifetime", 1L); + set(config, "leakDetectionThreshold", 1L); + set(config, "redisTimeout", 1); + set(config, "serializationMaxRawBytes", Integer.MAX_VALUE); + set(config, "serializationMaxWrappedBytes", Integer.MAX_VALUE); + set(config, "latencyWindowSize", Integer.MAX_VALUE); + set(config, "operationLogRetention", -1); + + config.validateNumericRanges(); + + assertEquals(3306, config.getDbPort()); + assertEquals(6379, config.getRedisPort()); + assertEquals(10, config.getPoolSize()); + assertEquals(256, config.getQueueCapacity()); + assertEquals(4, config.getFinalSaveThreads()); + assertEquals(1_024, config.getFinalSaveQueueCapacity()); + assertEquals(250L, config.getConnectionTimeout()); + assertEquals(10_000L, config.getIdleTimeout()); + assertEquals(30_000L, config.getMaxLifetime()); + assertEquals(2_000L, config.getLeakDetectionThreshold()); + assertEquals(100, config.getRedisTimeout()); + assertEquals(1 << 20, config.getSerializationMaxRawBytes()); + assertEquals(5 * (1 << 19), config.getSerializationMaxWrappedBytes()); + assertEquals(1_000, config.getLatencyWindowSize()); + assertEquals(100, config.getOperationLogRetention()); + } + + private static void set(ConfigManager config, String fieldName, Object value) throws Exception { + Field field = ConfigManager.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(config, value); + } +} diff --git a/src/test/java/com/fastsync/config/ConfigManagerProductionSafetyTest.java b/src/test/java/com/fastsync/config/ConfigManagerProductionSafetyTest.java index 731a633..e63a29a 100644 --- a/src/test/java/com/fastsync/config/ConfigManagerProductionSafetyTest.java +++ b/src/test/java/com/fastsync/config/ConfigManagerProductionSafetyTest.java @@ -99,17 +99,9 @@ void allowsNonLocalhostWithSslModeRequired() throws Exception { // ==================== Round 16: cluster-id DB isolation ==================== - /** - * Round 16 (P0 #1): a non-empty cluster-id paired with the default - * table-prefix must refuse startup. cluster-id only isolates Redis - * messaging, NOT database rows — two clusters sharing the same MySQL - * database + table-prefix would silently overwrite each other's data. - */ + /** The v2 composite primary key isolates clusters even with the default prefix. */ @Test - void rejectsClusterIdWithDefaultTablePrefix() throws Exception { - // v2 schema has (cluster_id, uuid) PK, so cluster-id + default - // table-prefix is now safe — different clusters are isolated at - // the DB row level. This test is no longer applicable. + void allowsClusterIdWithDefaultTablePrefix() throws Exception { ConfigManager config = base(); set(config, "clusterId", "production"); set(config, "tablePrefix", "fastsync_"); @@ -117,10 +109,7 @@ void rejectsClusterIdWithDefaultTablePrefix() throws Exception { "v2 schema isolates by cluster_id in PK, so default table-prefix is safe"); } - /** - * Round 16 (P0 #1): a non-empty cluster-id with a DISTINCT table-prefix - * is allowed — the operator has explicitly isolated DB rows per cluster. - */ + /** A distinct table prefix remains a valid optional isolation layer. */ @Test void allowsClusterIdWithDistinctTablePrefix() throws Exception { ConfigManager config = base(); @@ -130,30 +119,24 @@ void allowsClusterIdWithDistinctTablePrefix() throws Exception { "cluster-id + distinct table-prefix is allowed"); } - /** - * Round 16 (P0 #1): empty cluster-id (single-cluster deploy) with the - * default table-prefix is allowed — no multi-cluster collision risk. - */ + /** Cluster identity is mandatory even for a single logical cluster. */ @Test - void allowsEmptyClusterIdWithDefaultTablePrefix() throws Exception { + void rejectsEmptyClusterId() throws Exception { ConfigManager config = base(); set(config, "clusterId", ""); // single cluster set(config, "tablePrefix", "fastsync_"); // default is fine for single cluster - assertDoesNotThrow(config::validateProductionSafety, - "empty cluster-id + default table-prefix is allowed for single-cluster deploys"); + assertThrows(RuntimeException.class, config::validateProductionSafety, + "cluster-id is part of every DB key and must always be explicit"); } - /** - * Round 16 (P0 #1): blank cluster-id (whitespace only) is treated as - * empty — must NOT trigger the rejection. - */ + /** Whitespace-only identity is equivalent to missing identity. */ @Test - void allowsBlankClusterIdWithDefaultTablePrefix() throws Exception { + void rejectsBlankClusterId() throws Exception { ConfigManager config = base(); set(config, "clusterId", " "); set(config, "tablePrefix", "fastsync_"); - assertDoesNotThrow(config::validateProductionSafety, - "blank cluster-id must be treated as empty (no multi-cluster intent)"); + assertThrows(RuntimeException.class, config::validateProductionSafety, + "whitespace-only cluster-id must be rejected"); } // ==================== Production spool requirement ==================== diff --git a/src/test/java/com/fastsync/config/ConfigManagerReloadSafetyTest.java b/src/test/java/com/fastsync/config/ConfigManagerReloadSafetyTest.java new file mode 100644 index 0000000..2aa9186 --- /dev/null +++ b/src/test/java/com/fastsync/config/ConfigManagerReloadSafetyTest.java @@ -0,0 +1,62 @@ +package com.fastsync.config; + +import com.fastsync.testutil.TestConfigBuilder; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ConfigManagerReloadSafetyTest { + + @Test + void detectsIdentityAndConnectionChangesThatNeedRestart() throws Exception { + ConfigManager active = config(); + ConfigManager candidate = config(); + set(candidate, "serverName", "other-server"); + set(candidate, "clusterId", "other-cluster"); + set(candidate, "dbHost", "db.internal"); + set(candidate, "redisEnabled", true); + set(candidate, "productionEnabled", true); + + List changes = ConfigManager.restartRequiredChanges(active, candidate); + + assertTrue(changes.contains("server-name")); + assertTrue(changes.contains("cluster-id")); + assertTrue(changes.contains("database.*")); + assertTrue(changes.contains("redis connection/stream settings")); + assertTrue(changes.contains("production.*")); + } + + @Test + void permitsSettingsWhoseConsumersRefreshLive() throws Exception { + ConfigManager active = config(); + ConfigManager candidate = config(); + set(candidate, "syncInventory", false); + set(candidate, "periodicSave", true); + set(candidate, "periodicSaveIntervalSeconds", 60); + set(candidate, "periodicSaveBatchSize", 5); + set(candidate, "heartbeatIntervalSeconds", 3); + set(candidate, "compressionMinSize", 512); + set(candidate, "operationLogRetention", 250); + set(candidate, "debug", true); + + assertEquals(List.of(), ConfigManager.restartRequiredChanges(active, candidate)); + } + + private static ConfigManager config() throws Exception { + ConfigManager config = new TestConfigBuilder().defaults().build(); + // Structural fields not populated by TestConfigBuilder must match a + // realistic initialized instance on both sides. + set(config, "finalSaveSpoolDir", "final-save-spool"); + return config; + } + + private static void set(ConfigManager config, String fieldName, Object value) throws Exception { + Field field = ConfigManager.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(config, value); + } +} diff --git a/src/test/java/com/fastsync/log/FileOperationLogManagerExecutorTest.java b/src/test/java/com/fastsync/log/FileOperationLogManagerExecutorTest.java index a6d1962..32ae385 100644 --- a/src/test/java/com/fastsync/log/FileOperationLogManagerExecutorTest.java +++ b/src/test/java/com/fastsync/log/FileOperationLogManagerExecutorTest.java @@ -5,6 +5,7 @@ import org.junit.jupiter.api.io.TempDir; import java.nio.file.Path; +import java.nio.file.Files; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -90,6 +91,60 @@ void appendRunsOnDedicatedOpLogThreadNotCommonPool() throws Exception { "append() must NOT run on ForkJoinPool.commonPool, but ran on: " + threadName); } + @Test + void failedDirectoryCreationDoesNotAdvertiseEnabledAuditLog() throws Exception { + Path regularFile = tempDir.resolve("not-a-directory"); + Files.writeString(regularFile, "x"); + manager = new FileOperationLogManager(regularFile, 100); + + assertThrows(java.io.IOException.class, manager::initialize); + assertFalse(manager.isEnabled()); + } + + @Test + void discardedAppendCompletesItsFuture() throws Exception { + manager = new FileOperationLogManager(tempDir, 100); + manager.initialize(); + + java.lang.reflect.Field field = + FileOperationLogManager.class.getDeclaredField("appendExecutor"); + field.setAccessible(true); + java.util.concurrent.ThreadPoolExecutor executor = + (java.util.concurrent.ThreadPoolExecutor) field.get(manager); + + CountDownLatch blockerStarted = new CountDownLatch(1); + CountDownLatch releaseBlocker = new CountDownLatch(1); + executor.execute(() -> { + blockerStarted.countDown(); + try { + releaseBlocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertTrue(blockerStarted.await(5, TimeUnit.SECONDS)); + + OperationLog entry = new OperationLog( + 1L, java.util.UUID.randomUUID(), OperationType.SAVE, "test-server", + 1L, 1L, 100, "queue-saturation", System.currentTimeMillis()); + + try { + java.util.concurrent.CompletableFuture first = + manager.append(entry).toCompletableFuture(); + for (int i = 1; i <= manager.getQueueCapacity(); i++) { + manager.append(entry); + } + + assertTrue(first.isDone(), + "discard-oldest must complete the future for the evicted append"); + assertEquals(1L, manager.getDroppedCount()); + } finally { + // Avoid turning this queue-policy test into thousands of file writes. + executor.getQueue().clear(); + releaseBlocker.countDown(); + } + } + /** * Round 16 (P0 #2): close() must wait for in-flight appends to finish. * We submit an append whose appendSync sleeps briefly, then call close() diff --git a/src/test/java/com/fastsync/snapshot/SnapshotManagerRateLimitTest.java b/src/test/java/com/fastsync/snapshot/SnapshotManagerRateLimitTest.java new file mode 100644 index 0000000..0ca601c --- /dev/null +++ b/src/test/java/com/fastsync/snapshot/SnapshotManagerRateLimitTest.java @@ -0,0 +1,64 @@ +package com.fastsync.snapshot; + +import com.fastsync.config.ConfigManager; +import org.junit.jupiter.api.Test; + +import java.util.UUID; +import java.util.logging.Logger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class SnapshotManagerRateLimitTest { + + @Test + void reservesOnceWithinConfiguredIntervalAndCanReleaseFailure() { + SnapshotManager manager = new SnapshotManager( + Logger.getLogger("SnapshotManagerRateLimitTest"), new ConfigManager(true)); + UUID uuid = UUID.randomUUID(); + + long first = manager.reserveSnapshot(uuid, 60_000L); + assertTrue(first > 0); + assertEquals(-1L, manager.reserveSnapshot(uuid, 60_000L)); + + manager.releaseSnapshotReservation(uuid, first); + assertTrue(manager.reserveSnapshot(uuid, 60_000L) > 0, + "a failed async snapshot must not consume the whole backup interval"); + } + + @Test + void zeroIntervalAllowsEveryTriggeredSnapshot() { + SnapshotManager manager = new SnapshotManager( + Logger.getLogger("SnapshotManagerRateLimitTest"), new ConfigManager(true)); + UUID uuid = UUID.randomUUID(); + + assertTrue(manager.reserveSnapshot(uuid, 0L) > 0); + assertTrue(manager.reserveSnapshot(uuid, 0L) > 0); + } + + @Test + void staleFailureCannotReleaseNewerReservation() { + SnapshotManager manager = new SnapshotManager( + Logger.getLogger("SnapshotManagerRateLimitTest"), new ConfigManager(true)); + UUID uuid = UUID.randomUUID(); + + long staleToken = manager.reserveSnapshot(uuid, 0L); + long currentToken = manager.reserveSnapshot(uuid, 0L); + assertTrue(currentToken > staleToken); + + manager.releaseSnapshotReservation(uuid, staleToken); + assertEquals(-1L, manager.reserveSnapshot(uuid, 60_000L), + "an older failure callback must not remove the current reservation"); + } + + @Test + void freshReservationSurvivesAgeBasedCleanup() { + SnapshotManager manager = new SnapshotManager( + Logger.getLogger("SnapshotManagerRateLimitTest"), new ConfigManager(true)); + UUID uuid = UUID.randomUUID(); + + assertTrue(manager.reserveSnapshot(uuid, 60_000L) > 0); + manager.cleanupSnapshotReservations(60_000L); + assertEquals(-1L, manager.reserveSnapshot(uuid, 60_000L)); + } +} diff --git a/src/test/java/com/fastsync/testutil/TestConfigBuilder.java b/src/test/java/com/fastsync/testutil/TestConfigBuilder.java index 617bc5a..30afbf2 100644 --- a/src/test/java/com/fastsync/testutil/TestConfigBuilder.java +++ b/src/test/java/com/fastsync/testutil/TestConfigBuilder.java @@ -39,7 +39,7 @@ public TestConfigBuilder defaults() throws ReflectiveOperationException { set("redisPassword", ""); set("redisDatabase", 0); set("redisSsl", false); - set("dbType", "mysql"); + set("redisTimeout", 5000); set("dbHost", "localhost"); set("dbPort", 3306); set("dbDatabase", "fastsync"); @@ -47,6 +47,7 @@ public TestConfigBuilder defaults() throws ReflectiveOperationException { set("dbPassword", "root"); set("dbParameters", ""); set("poolSize", 10); + set("queueCapacity", 256); set("connectionTimeout", 30000L); set("idleTimeout", 600000L); set("maxLifetime", 1800000L); @@ -54,7 +55,16 @@ public TestConfigBuilder defaults() throws ReflectiveOperationException { set("saveOnDeath", false); set("saveOnWorldSave", false); set("periodicSave", false); + set("periodicSaveIntervalSeconds", 300); + set("periodicSaveBatchSize", 10); + set("maxConcurrentLoads", 6); + set("finalSaveThreads", 4); + set("finalSaveQueueCapacity", 1024); set("snapshotEnabled", false); + set("snapshotBackupFrequencyMs", 14_400_000L); + set("latencyWindowSize", 1_000); + set("serializationMaxRawBytes", 1 << 20); + set("serializationMaxWrappedBytes", 5 * (1 << 19)); return this; } diff --git a/src/test/java/com/fastsync/velocity/HandoffProtocolTest.java b/src/test/java/com/fastsync/velocity/HandoffProtocolTest.java new file mode 100644 index 0000000..eb449e6 --- /dev/null +++ b/src/test/java/com/fastsync/velocity/HandoffProtocolTest.java @@ -0,0 +1,60 @@ +package com.fastsync.velocity; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.util.Arrays; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class HandoffProtocolTest { + + @Test + void roundTripsEveryMessageWithPayload() { + UUID uuid = UUID.randomUUID(); + + assertEquals(new HandoffProtocol.QueryLockData(uuid, "target"), + HandoffProtocol.decodeQueryLock(HandoffProtocol.encodeQueryLock(uuid, "target"))); + assertEquals(new HandoffProtocol.LockStatusData(uuid, true, "source"), + HandoffProtocol.decodeLockStatus(HandoffProtocol.encodeLockStatus(uuid, true, "source"))); + assertEquals(new HandoffProtocol.HandoffNotifyData(uuid, "source", "target"), + HandoffProtocol.decodeHandoffNotify( + HandoffProtocol.encodeHandoffNotify(uuid, "source", "target"))); + assertEquals(new HandoffProtocol.StatusResponseData("source", true, false, 5, 2, 1), + HandoffProtocol.decodeStatusResponse( + HandoffProtocol.encodeStatusResponse("source", true, false, 5, 2, 1))); + } + + @Test + void rejectsWrongTypeAndTrailingBytes() { + UUID uuid = UUID.randomUUID(); + byte[] lockStatus = HandoffProtocol.encodeLockStatus(uuid, false, "source"); + assertThrows(IllegalArgumentException.class, + () -> HandoffProtocol.decodeQueryLock(lockStatus)); + + byte[] valid = HandoffProtocol.encodeQueryLock(uuid, "target"); + byte[] withTrailingByte = Arrays.copyOf(valid, valid.length + 1); + assertThrows(IllegalArgumentException.class, + () -> HandoffProtocol.decodeQueryLock(withTrailingByte)); + } + + @Test + void rejectsNegativeStatusCounters() throws Exception { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (DataOutputStream out = new DataOutputStream(bytes)) { + out.writeByte(HandoffProtocol.STATUS_RESPONSE); + out.writeUTF("source"); + out.writeBoolean(true); + out.writeBoolean(true); + out.writeInt(-1); + out.writeInt(0); + out.writeInt(0); + } + + assertThrows(IllegalArgumentException.class, + () -> HandoffProtocol.decodeStatusResponse(bytes.toByteArray())); + } +} diff --git a/src/test/java/com/fastsync/velocity/ProxyConfigValidationTest.java b/src/test/java/com/fastsync/velocity/ProxyConfigValidationTest.java new file mode 100644 index 0000000..90d2a17 --- /dev/null +++ b/src/test/java/com/fastsync/velocity/ProxyConfigValidationTest.java @@ -0,0 +1,29 @@ +package com.fastsync.velocity; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; + +import java.lang.reflect.Field; +import java.nio.file.Path; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; + +class ProxyConfigValidationTest { + + @Test + void clampsInvalidStatusQueryTimeout() throws Exception { + ProxyConfig config = new ProxyConfig(Path.of("."), mock(Logger.class)); + set(config, "statusQueryTimeoutMs", Long.MAX_VALUE); + + config.validateRanges(); + + assertEquals(60_000L, config.getStatusQueryTimeoutMs()); + } + + private static void set(ProxyConfig config, String fieldName, Object value) throws Exception { + Field field = ProxyConfig.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(config, value); + } +} diff --git a/src/velocity/java/com/fastsync/velocity/FastSyncProxy.java b/src/velocity/java/com/fastsync/velocity/FastSyncProxy.java index 55862c4..bc8f0b4 100644 --- a/src/velocity/java/com/fastsync/velocity/FastSyncProxy.java +++ b/src/velocity/java/com/fastsync/velocity/FastSyncProxy.java @@ -34,14 +34,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + /** * FastSync Velocity proxy plugin. * *

Acts as a cross-server coordinator on the proxy layer: *

    *
  • Tracks which backend server each player is connected to
  • - *
  • Queries the old backend for lock status before allowing a server switch
  • *
  • Notifies the new backend that a player is arriving from another server
  • *
  • Aggregates FastSync status from all backends
  • *
  • Provides a /fastsync command for status and management
  • @@ -76,9 +76,6 @@ public class FastSyncProxy { /** Track player -> last server switch timestamp */ private final ConcurrentHashMap lastSwitchTime = new ConcurrentHashMap<>(); - /** Pending lock status responses: uuid -> future that completes when LOCK_STATUS arrives */ - private final ConcurrentHashMap> pendingLockQueries = new ConcurrentHashMap<>(); - /** Pending status responses: serverName -> future */ private final ConcurrentHashMap> pendingStatusQueries = new ConcurrentHashMap<>(); @@ -113,8 +110,7 @@ public void onProxyInit(ProxyInitializeEvent event) { .repeat(5, TimeUnit.MINUTES) .schedule(); - logger.info("FastSync Proxy initialized. Channel: fastsync:handoff, Smart handoff: {}", - config.isSmartHandoffEnabled()); + logger.info("FastSync Proxy initialized. Channel: fastsync:handoff"); } @Subscribe @@ -122,7 +118,8 @@ public void onProxyShutdown(ProxyShutdownEvent event) { logger.info("FastSync Proxy shutting down. Tracked players: {}", playerServerMap.size()); playerServerMap.clear(); lastSwitchTime.clear(); - pendingLockQueries.clear(); + pendingStatusQueries.values().forEach(future -> + future.completeExceptionally(new IllegalStateException("Proxy is shutting down"))); pendingStatusQueries.clear(); } @@ -166,7 +163,6 @@ public void onDisconnect(DisconnectEvent event) { UUID uuid = player.getUniqueId(); String server = playerServerMap.remove(uuid); lastSwitchTime.remove(uuid); - pendingLockQueries.remove(uuid); if (server != null) { logger.info("Player {} disconnected from {}", player.getUsername(), server); @@ -175,32 +171,6 @@ public void onDisconnect(DisconnectEvent event) { // ==================== Plugin Messaging ==================== - /** - * Query a backend server for the lock status of a player. - * Returns a future that completes when the LOCK_STATUS response arrives. - */ - private CompletableFuture queryLockStatus( - String serverName, UUID uuid) { - CompletableFuture future = new CompletableFuture<>(); - pendingLockQueries.put(uuid, future); - - byte[] data = HandoffProtocol.encodeQueryLock(uuid, playerServerMap.get(uuid)); - proxy.getServer(serverName).ifPresent(server -> { - server.sendPluginMessage(HANDOFF_CHANNEL, data); - }); - - // Timeout fallback - proxy.getScheduler().buildTask(this, () -> { - CompletableFuture pending = pendingLockQueries.get(uuid); - if (pending != null && !pending.isDone()) { - pending.completeExceptionally(new TimeoutException("Lock query timed out")); - pendingLockQueries.remove(uuid); - } - }).delay((int) (config.getWaitTimeoutMs() / 2), TimeUnit.MILLISECONDS).schedule(); - - return future; - } - /** * Send a HANDOFF_NOTIFY to the new backend server. */ @@ -222,8 +192,16 @@ public CompletableFuture> queryAllStatu for (RegisteredServer server : proxy.getAllServers()) { String name = server.getServerInfo().getName(); - CompletableFuture raw = new CompletableFuture<>(); - pendingStatusQueries.put(name, raw); + // The wire protocol has no request ID. Coalesce concurrent admin + // requests per backend instead of replacing the first future and + // forcing its caller to time out. + AtomicBoolean created = new AtomicBoolean(false); + CompletableFuture raw = + pendingStatusQueries.compute(name, (ignored, existing) -> { + if (existing != null && !existing.isDone()) return existing; + created.set(true); + return new CompletableFuture<>(); + }); // Wrap with timeout + exceptionally so one slow backend // doesn't cause allOf to throw. @@ -238,9 +216,12 @@ public CompletableFuture> queryAllStatu .whenComplete((result, ex) -> pendingStatusQueries.remove(name, raw)); futures.add(safe); - boolean sent = server.sendPluginMessage(HANDOFF_CHANNEL, HandoffProtocol.encodeStatusQuery()); - if (!sent) { - raw.complete(null); + if (created.get()) { + boolean sent = server.sendPluginMessage( + HANDOFF_CHANNEL, HandoffProtocol.encodeStatusQuery()); + if (!sent) { + raw.complete(null); + } } } @@ -272,7 +253,7 @@ public void onPluginMessage(PluginMessageEvent event) { event.setResult(PluginMessageEvent.ForwardResult.handled()); // Only accept messages from backend servers, not from player clients. - if (!(event.getSource() instanceof com.velocitypowered.api.proxy.ServerConnection)) { + if (!(event.getSource() instanceof com.velocitypowered.api.proxy.ServerConnection source)) { if (config != null && config.isDebug()) { logger.warn("[Handoff] Rejected plugin message from non-server source: {}", event.getSource().getClass().getName()); @@ -287,22 +268,26 @@ public void onPluginMessage(PluginMessageEvent event) { switch (type) { case HandoffProtocol.LOCK_STATUS -> { HandoffProtocol.LockStatusData status = HandoffProtocol.decodeLockStatus(data); - CompletableFuture future = - pendingLockQueries.remove(status.uuid()); - if (future != null) { - future.complete(status); - } + String sourceName = source.getServerInfo().getName(); if (config.isDebug()) { - logger.info("[Handoff] Lock status for {}: locked={} by {}", - status.uuid(), status.locked(), status.serverName()); + logger.info("[Handoff] Unsolicited lock status from {} for {}: locked={} by {}", + sourceName, status.uuid(), status.locked(), status.serverName()); } } case HandoffProtocol.STATUS_RESPONSE -> { - HandoffProtocol.StatusResponseData status = HandoffProtocol.decodeStatusResponse(data); + HandoffProtocol.StatusResponseData decoded = HandoffProtocol.decodeStatusResponse(data); + String sourceName = source.getServerInfo().getName(); + // Treat the authenticated Velocity connection as identity; + // the backend's self-reported name is display metadata and + // must not be able to impersonate another registered server. + HandoffProtocol.StatusResponseData status = + new HandoffProtocol.StatusResponseData( + sourceName, decoded.dbHealthy(), decoded.redisHealthy(), + decoded.playerCount(), decoded.pendingSaves(), decoded.pendingLoads()); CompletableFuture future = - pendingStatusQueries.remove(status.serverName()); - if (future != null) { - future.complete(status); + pendingStatusQueries.get(sourceName); + if (future != null && future.complete(status)) { + pendingStatusQueries.remove(sourceName, future); } if (config.isDebug()) { logger.info("[Handoff] Status from {}: db={} redis={} players={}", @@ -499,8 +484,7 @@ private void handleReload(CommandSource source) { } private void handleDebug(CommandSource source) { - // Toggle debug by reloading config with debug inverted - // Since we can't easily mutate the loaded config, just log + config.setDebug(!config.isDebug()); source.sendMessage(msg("proxy.debug.state", config.isDebug())); source.sendMessage(msg("proxy.debug.hint")); } diff --git a/src/velocity/java/com/fastsync/velocity/HandoffProtocol.java b/src/velocity/java/com/fastsync/velocity/HandoffProtocol.java index 9ca4884..293dda5 100644 --- a/src/velocity/java/com/fastsync/velocity/HandoffProtocol.java +++ b/src/velocity/java/com/fastsync/velocity/HandoffProtocol.java @@ -32,6 +32,29 @@ public final class HandoffProtocol { private HandoffProtocol() {} + private static DataInputStream decoder(byte[] data, int expectedType) { + if (data == null || data.length == 0) { + throw new IllegalArgumentException("Empty handoff message"); + } + DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); + try { + int actualType = in.readUnsignedByte(); + if (actualType != expectedType) { + throw new IllegalArgumentException("Unexpected handoff message type " + + actualType + " (expected " + expectedType + ")"); + } + return in; + } catch (IOException e) { + throw new IllegalArgumentException("Truncated handoff message", e); + } + } + + private static void requireEnd(DataInputStream in) throws IOException { + if (in.read() != -1) { + throw new IllegalArgumentException("Trailing bytes in handoff message"); + } + } + // ==================== QUERY_LOCK ==================== // [type=1][uuid][newServer] @@ -51,13 +74,13 @@ public static byte[] encodeQueryLock(UUID uuid, String newServer) { public static QueryLockData decodeQueryLock(byte[] data) { try { - DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); - in.readByte(); // type + DataInputStream in = decoder(data, QUERY_LOCK); UUID uuid = UUID.fromString(in.readUTF()); String newServer = in.readUTF(); + requireEnd(in); return new QueryLockData(uuid, newServer); } catch (IOException e) { - throw new RuntimeException(e); + throw new IllegalArgumentException("Invalid QUERY_LOCK message", e); } } @@ -83,14 +106,14 @@ public static byte[] encodeLockStatus(UUID uuid, boolean locked, String serverNa public static LockStatusData decodeLockStatus(byte[] data) { try { - DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); - in.readByte(); // type + DataInputStream in = decoder(data, LOCK_STATUS); UUID uuid = UUID.fromString(in.readUTF()); boolean locked = in.readBoolean(); String serverName = in.readUTF(); + requireEnd(in); return new LockStatusData(uuid, locked, serverName); } catch (IOException e) { - throw new RuntimeException(e); + throw new IllegalArgumentException("Invalid LOCK_STATUS message", e); } } @@ -116,14 +139,14 @@ public static byte[] encodeHandoffNotify(UUID uuid, String oldServer, String new public static HandoffNotifyData decodeHandoffNotify(byte[] data) { try { - DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); - in.readByte(); // type + DataInputStream in = decoder(data, HANDOFF_NOTIFY); UUID uuid = UUID.fromString(in.readUTF()); String oldServer = in.readUTF(); String newServer = in.readUTF(); + requireEnd(in); return new HandoffNotifyData(uuid, oldServer, newServer); } catch (IOException e) { - throw new RuntimeException(e); + throw new IllegalArgumentException("Invalid HANDOFF_NOTIFY message", e); } } @@ -154,7 +177,7 @@ public static byte[] encodeStatusResponse( ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(baos); out.writeByte(STATUS_RESPONSE); - out.writeUTF(serverName); + out.writeUTF(serverName != null ? serverName : ""); out.writeBoolean(dbHealthy); out.writeBoolean(redisHealthy); out.writeInt(playerCount); @@ -169,18 +192,21 @@ public static byte[] encodeStatusResponse( public static StatusResponseData decodeStatusResponse(byte[] data) { try { - DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); - in.readByte(); // type + DataInputStream in = decoder(data, STATUS_RESPONSE); String serverName = in.readUTF(); boolean dbHealthy = in.readBoolean(); boolean redisHealthy = in.readBoolean(); int playerCount = in.readInt(); int pendingSaves = in.readInt(); int pendingLoads = in.readInt(); + if (playerCount < 0 || pendingSaves < 0 || pendingLoads < 0) { + throw new IllegalArgumentException("Negative status counter in handoff message"); + } + requireEnd(in); return new StatusResponseData(serverName, dbHealthy, redisHealthy, playerCount, pendingSaves, pendingLoads); } catch (IOException e) { - throw new RuntimeException(e); + throw new IllegalArgumentException("Invalid STATUS_RESPONSE message", e); } } diff --git a/src/velocity/java/com/fastsync/velocity/ProxyConfig.java b/src/velocity/java/com/fastsync/velocity/ProxyConfig.java index 8135cfb..eb8bc40 100644 --- a/src/velocity/java/com/fastsync/velocity/ProxyConfig.java +++ b/src/velocity/java/com/fastsync/velocity/ProxyConfig.java @@ -17,13 +17,6 @@ public class ProxyConfig { private final Path dataDirectory; private final Logger logger; - // Smart handoff - private boolean smartHandoffEnabled = true; - private long waitTimeoutMs = 5000; - private long pollIntervalMs = 500; - private String waitingMessage = "&e[FastSync] Waiting for your data to be saved..."; - private String timeoutMessage = "&c[FastSync] Data save timed out, transferring anyway."; - // Status private long statusQueryTimeoutMs = 3000; @@ -62,15 +55,6 @@ public void load() { if (root == null) return; - Map handoff = getMap(root, "smart-handoff"); - if (handoff != null) { - smartHandoffEnabled = getBool(handoff, "enabled", true); - waitTimeoutMs = getLong(handoff, "wait-timeout-ms", 5000); - pollIntervalMs = getLong(handoff, "poll-interval-ms", 500); - waitingMessage = getString(handoff, "waiting-message", waitingMessage); - timeoutMessage = getString(handoff, "timeout-message", timeoutMessage); - } - Map status = getMap(root, "status"); if (status != null) { statusQueryTimeoutMs = getLong(status, "query-timeout-ms", 3000); @@ -78,14 +62,25 @@ public void load() { debug = getBool(root, "debug", false); - logger.info("Proxy config loaded: smart-handoff={}, wait-timeout={}ms, debug={}", - smartHandoffEnabled, waitTimeoutMs, debug); + validateRanges(); + + logger.info("Proxy config loaded: status-timeout={}ms, debug={}", + statusQueryTimeoutMs, debug); } catch (Exception e) { logger.warn("Failed to load proxy-config.yml, using defaults", e); } } + /** Keep the status-query scheduler delay bounded. */ + void validateRanges() { + long configuredStatus = statusQueryTimeoutMs; + statusQueryTimeoutMs = Math.max(100L, Math.min(statusQueryTimeoutMs, 60_000L)); + if (statusQueryTimeoutMs != configuredStatus) { + logger.warn("status.query-timeout-ms must be 100-60000; using {}", statusQueryTimeoutMs); + } + } + @SuppressWarnings("unchecked") private Map getMap(Map parent, String key) { Object val = parent.get(key); @@ -103,17 +98,8 @@ private long getLong(Map map, String key, long def) { return def; } - private String getString(Map map, String key, String def) { - Object val = map.get(key); - return val instanceof String ? (String) val : def; - } - // Getters - public boolean isSmartHandoffEnabled() { return smartHandoffEnabled; } - public long getWaitTimeoutMs() { return waitTimeoutMs; } - public long getPollIntervalMs() { return pollIntervalMs; } - public String getWaitingMessage() { return waitingMessage; } - public String getTimeoutMessage() { return timeoutMessage; } public long getStatusQueryTimeoutMs() { return statusQueryTimeoutMs; } public boolean isDebug() { return debug; } + public void setDebug(boolean debug) { this.debug = debug; } } diff --git a/src/velocity/resources/proxy-config.yml b/src/velocity/resources/proxy-config.yml index e4d66d2..44381c3 100644 --- a/src/velocity/resources/proxy-config.yml +++ b/src/velocity/resources/proxy-config.yml @@ -5,30 +5,6 @@ # Backend Paper/Folia servers use their own config.yml. # ============================================================ -# ------------------------------------------------------------ -# Smart Handoff Settings -# ------------------------------------------------------------ -# When a player switches servers, the proxy asks the old backend -# if it has finished saving the player's data. If not, the proxy -# delays the transfer until the save completes or the timeout fires. -smart-handoff: - # Enable smart handoff coordination - enabled: true - - # Maximum time (ms) to wait for the old server to release the lock - # before allowing the transfer anyway (assume stale lock) - wait-timeout-ms: 5000 - - # Polling interval (ms) for re-checking lock status - poll-interval-ms: 500 - - # Message shown to the player while waiting for data save to complete - # (set to empty string to show no message) - waiting-message: "&e[FastSync] Waiting for your data to be saved..." - - # Message shown when the wait times out - timeout-message: "&c[FastSync] Data save timed out, transferring anyway." - # ------------------------------------------------------------ # Status Aggregation Settings # ------------------------------------------------------------ diff --git a/src/velocity/resources/proxy-messages.yml b/src/velocity/resources/proxy-messages.yml index d64628a..bf836a1 100644 --- a/src/velocity/resources/proxy-messages.yml +++ b/src/velocity/resources/proxy-messages.yml @@ -41,9 +41,4 @@ proxy: # ==================== /fastsync debug ==================== debug: state: "Debug mode: {0}" - hint: "(Edit proxy-config.yml and run /fastsync reload to change)" - - # ==================== Handoff messages ==================== - handoff: - waiting: "[FastSync] Waiting for your data to be saved..." - timeout: "[FastSync] Data save timed out, transferring anyway." + hint: "(Runtime only; edit proxy-config.yml to persist across restarts)"