Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,15 @@ Redis 只负责协调(通知谁该重试),数据库的 fencing token + ver
代理插件不是必须的。后端插件可独立运行。安装后获得:
- `/fastsync status`:代理端聚合查看所有后端健康状态
- `/fastsync players`:查看所有玩家当前所在子服
- 玩家切服通知
- handoff 通知:代理记录切服关系并通知新后端;锁等待与最终放行由后端登录门禁完成

### 4. 基本配置

首次启动后编辑 `plugins/FastSync/config.yml`:

```yaml
server-name: "survival-1" # 每个子服唯一标识
cluster-id: "survival-main" # 同一逻辑集群保持一致,不能为空

database:
host: "mysql.example.com"
Expand Down Expand Up @@ -171,7 +172,6 @@ redis:
| `ssl` | `false` | SSL 加密 |
| `timeout` | `5000` | 连接超时(ms) |
| `channel-prefix` | `fastsync:lock:` | Pub/Sub 频道前缀 |
| `cache-enabled` | `false` | Redis 数据缓存 |
| `streams-enabled` | `true` | Redis Streams 可靠事件 |
| `stream-maxlen` | `100000` | Stream 最大条数,0 = 不裁剪 |
| `stream-trim-approx` | `true` | 近似裁剪(~MAXLEN),性能更好 |
Expand Down Expand Up @@ -199,7 +199,6 @@ redis:
| `heartbeat-interval-seconds` | `10` | 心跳间隔(自动校正 ≤ lock-timeout/3) |
| `lock-retry-interval-ms` | `300` | 锁重试间隔 |
| `lock-max-retries` | `15` | 锁最大重试次数 |
| `save-delay` | `0` | 保存延迟(tick) |
| `clear-before-apply` | `true` | 应用前清空(防复制) |
| `periodic-save` | `false` | 周期保存开关 |
| `periodic-save-interval-seconds` | `300` | 周期保存间隔 |
Expand Down Expand Up @@ -244,16 +243,16 @@ redis:
|--------|--------|------|
| `enabled` | `true` | 冲突快照开关 |
| `max-snapshots` | `16` | 每玩家最大快照数 |
| `backup-frequency-ms` | `14400000` | 定期备份间隔(4 小时) |
| `backup-frequency-ms` | `14400000` | `save-trigger` 命中后的每玩家最小快照间隔(4 小时;设为 0 不限频) |
| `save-trigger` | `never` | `never` / `always` / 逗号分隔原因列表 |

### 集群 (`cluster-id`)

```yaml
cluster-id: ""
cluster-id: "survival-main"
```

cluster-id 只隔离 Redis 消息(topic/stream/consumer group),**不隔离数据库行**。非空 cluster-id + 默认 `fastsync_` table-prefix = **拒绝启动**。多集群必须使用不同 table-prefix 或不同 database
`cluster-id` 是必填的集群身份:它同时进入数据库复合主键,并隔离 Redis topicstreamconsumer group。同一逻辑集群的所有后端必须使用相同值;不同集群可在同一数据库与默认 `fastsync_` table-prefix 下安全共存

### 操作日志 (`operation-log:`)

Expand All @@ -273,9 +272,9 @@ cluster-id 只隔离 Redis 消息(topic/stream/consumer group),**不隔离

| 命令 | 说明 |
|------|------|
| `/fastsync reload` | 重载配置(重置心跳任务、保护模式) |
| `/fastsync reload` | 事务式热重载;心跳/周期保存会重排,DB、cluster、Redis、线程池等启动期参数变化会拒绝并提示重启 |
| `/fastsync status` | 查看 DB/Redis 状态、在线玩家、pending 数、final-save 队列/fallback 计数、OpLog 状态、HikariCP 池、延迟百分位 |
| `/fastsync debug` | 开关调试模式 |
| `/fastsync debug` | 开关本次运行期调试模式(持久化请编辑 config.yml) |
| `/fastsync saveall` | 强制保存所有在线玩家(Folia 安全两阶段) |
| `/fastsync log <player> [n]` | 查看玩家操作日志(默认 20 条,最多 50) |

Expand Down
17 changes: 15 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,12 @@ tasks.named("build") {

val ci = tasks.register("ci") {
group = "ci"
description = "Full CI pipeline: clean, build, and test."
dependsOn("clean", "build", "check")
description = "Full CI pipeline: build, package, and test."
// Java's build lifecycle already depends on both assemble and check. Do not
// add clean as a sibling dependency: with org.gradle.parallel=true Gradle
// may delete class outputs while compileTestJava/shadowJar are reading them.
// Call `./gradlew clean ci` when an explicitly clean local build is needed.
dependsOn("build")
}

val testGroup = tasks.register("testGroup") {
Expand Down Expand Up @@ -273,6 +277,15 @@ dependencies {
}
sourceSets["velocity"].compileClasspath += velocityOnly

// Let the standard JUnit suite exercise protocol/config code from the custom
// Velocity source set. Without this bridge, proxy behavior compiled but had no
// executable regression tests at all.
sourceSets["test"].compileClasspath += sourceSets["velocity"].output + velocityOnly
sourceSets["test"].runtimeClasspath += sourceSets["velocity"].output + velocityOnly
tasks.named<JavaCompile>("compileTestJava") {
dependsOn("velocityClasses")
}

val velocityJar = tasks.register<Jar>("velocityJar") {
group = "build"
description = "Packages the Velocity proxy plugin JAR."
Expand Down
131 changes: 89 additions & 42 deletions src/main/java/com/fastsync/FastSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,37 @@ private void restartHeartbeatTask() {
+ " (lock-timeout=" + configManager.getLockTimeout() + "s).");
}

/** Start, stop, or reschedule periodic saves after a validated reload. */
private void restartPeriodicSaveTask() {
if (periodicSaveTask != null) {
SchedulerUtil.cancel(periodicSaveTask);
periodicSaveTask = null;
}
if (!configManager.isPeriodicSave()) {
getLogger().info("Periodic save disabled.");
return;
}

long intervalTicks = configManager.getPeriodicSaveIntervalSeconds() * 20L;
periodicSaveTask = SchedulerUtil.runGlobalTimer(this, () -> {
List<Player> players = new ArrayList<>(Bukkit.getOnlinePlayers());
final int batchSize = configManager.getPeriodicSaveBatchSize();
for (int i = 0; i < players.size(); i += batchSize) {
final int start = i;
final int end = Math.min(i + batchSize, players.size());
long delayTicks = i / batchSize;
SchedulerUtil.runGlobalDelayed(this, () -> {
for (int j = start; j < end; j++) {
syncManager.savePlayerAsync(players.get(j));
}
}, delayTicks);
}
}, intervalTicks, intervalTicks);
getLogger().info("Periodic save scheduled: every "
+ configManager.getPeriodicSaveIntervalSeconds() + " seconds, batch="
+ configManager.getPeriodicSaveBatchSize() + ".");
}

@Override
public void onEnable() {
instance = this;
Expand Down Expand Up @@ -128,15 +159,44 @@ public void onEnable() {
databaseManager = new DatabaseManager(getLogger(), configManager);
try {
databaseManager.initialize();
} catch (SQLException e) {
} catch (SQLException | RuntimeException e) {
getLogger().log(Level.SEVERE, "Failed to initialize database! Check your config.yml.", e);
try {
databaseManager.close();
} catch (RuntimeException cleanupError) {
e.addSuppressed(cleanupError);
getLogger().log(Level.WARNING, "Database cleanup also failed", cleanupError);
}
databaseManager = null;
getServer().getPluginManager().disablePlugin(this);
return;
}

// Initialize sync manager (creates thread pool + optional Redis)
syncManager = new SyncManager(this, configManager, databaseManager);
syncManager.initialize();
try {
syncManager.initialize();
} catch (RuntimeException e) {
getLogger().log(Level.SEVERE, "Failed to initialize synchronization services — refusing to start.", e);
try {
syncManager.beginShutdown();
syncManager.shutdown();
} catch (RuntimeException cleanupError) {
e.addSuppressed(cleanupError);
getLogger().log(Level.WARNING, "Synchronization cleanup also failed", cleanupError);
} finally {
syncManager = null;
try {
databaseManager.close();
} catch (RuntimeException cleanupError) {
e.addSuppressed(cleanupError);
getLogger().log(Level.WARNING, "Database cleanup also failed", cleanupError);
}
databaseManager = null;
}
getServer().getPluginManager().disablePlugin(this);
return;
}

// Register plugin messaging channel for proxy handoff communication
// This is optional — if no Velocity proxy with FastSync Proxy is installed,
Expand Down Expand Up @@ -179,45 +239,14 @@ public void onEnable() {
// Runs on async thread (DB I/O only, no Bukkit API calls).
restartHeartbeatTask();

// Start periodic save task (if enabled)
if (configManager.isPeriodicSave()) {
long intervalTicks = configManager.getPeriodicSaveIntervalSeconds() * 20L;
periodicSaveTask = SchedulerUtil.runGlobalTimer(this, () -> {
// Snapshot online players on the global region thread, then save them
// in small batches spread across successive ticks to avoid a lag spike
// when many players are online (process at most 10 players per tick).
//
// CRITICAL (Folia-safety): the batched dispatch MUST run on the
// global region (runGlobalDelayed), NOT on an async thread
// (runAsyncDelayed). savePlayerAsync(player) reads
// player.getUniqueId() and calls SchedulerUtil.runAtEntity(plugin,
// player, ...) — both touch the Player object. On Folia, async
// threads must not touch Player entities; the global region is
// the safe context for these reads. The actual DB write still
// happens on the async executor (dispatched from inside
// runAtEntity), so the global region is only used for the
// brief per-player dispatch, not for the DB wait.
List<Player> players = new ArrayList<>(Bukkit.getOnlinePlayers());
final int batchSize = configManager.getPeriodicSaveBatchSize();
for (int i = 0; i < players.size(); i += batchSize) {
final int start = i;
final int end = Math.min(i + batchSize, players.size());
long delayTicks = i / batchSize;
SchedulerUtil.runGlobalDelayed(this, () -> {
for (int j = start; j < end; j++) {
syncManager.savePlayerAsync(players.get(j));
}
}, delayTicks);
}
}, intervalTicks, intervalTicks);
getLogger().info("Periodic save enabled: every " +
configManager.getPeriodicSaveIntervalSeconds() + " seconds");
}
// Start periodic save task (the same helper safely reschedules it on reload).
restartPeriodicSaveTask();

getLogger().info("FastSync v" + getPluginMeta().getVersion() + " enabled!");
getLogger().info("Server ID: " + configManager.getServerName());
getLogger().info("Serialization: Paper native ItemStack byte serialization");
getLogger().info("Compression: " + (configManager.isCompressionEnabled() ? "LZ4" : "Disabled"));
getLogger().info("Compression: " + (configManager.isCompressionEnabled()
? configManager.getCompressionType() : "Disabled"));
getLogger().info("Redis: " + (configManager.isRedisEnabled() ? "Enabled" : "Disabled (DB polling)"));
}

Expand Down Expand Up @@ -276,13 +305,27 @@ public boolean onCommand(@NotNull CommandSender sender, @NotNull Command command

switch (args[0].toLowerCase()) {
case "reload" -> {
configManager.reload();
ConfigManager.ReloadResult reloadResult;
try {
reloadResult = configManager.reloadSafely();
} catch (RuntimeException e) {
getLogger().log(Level.WARNING, "Configuration reload rejected", e);
sendMessage(sender, RED + "[FastSync] Reload rejected: " + e.getMessage());
return true;
}
if (!reloadResult.applied()) {
sendMessage(sender, YELLOW + "[FastSync] Reload not applied; restart required for: "
+ String.join(", ", reloadResult.restartRequiredChanges()));
return true;
}
// Refresh SyncManager caches that depend on config (e.g. snapshot trigger set)
if (syncManager != null) {
syncManager.refreshConfigCache();
}
// Restart heartbeat task — interval may have changed
restartHeartbeatTask();
// Periodic-save enable/interval/batch are live-reloadable.
restartPeriodicSaveTask();
// Reset protection mode on reload — only if DB is healthy
boolean resetOk = syncManager.resetProtectionMode();
if (resetOk) {
Expand All @@ -293,7 +336,8 @@ public boolean onCommand(@NotNull CommandSender sender, @NotNull Command command
sendMessage(sender, GREEN + "[FastSync] Configuration reloaded.");
sendMessage(sender, GRAY + "Server: " + configManager.getServerName());
sendMessage(sender, GRAY + "Compression: " +
(configManager.isCompressionEnabled() ? "LZ4" : "Disabled"));
(configManager.isCompressionEnabled()
? configManager.getCompressionType() : "Disabled"));
sendMessage(sender, GRAY + "Redis: " +
(configManager.isRedisEnabled() ? "Enabled" : "Disabled"));
sendMessage(sender, GRAY + "Heartbeat: every " +
Expand All @@ -302,11 +346,14 @@ public boolean onCommand(@NotNull CommandSender sender, @NotNull Command command
case "status" -> sendStatus(sender);
case "debug" -> {
boolean newDebug = !configManager.isDebug();
getConfig().set("debug", newDebug);
saveConfig();
configManager.reload();
// Runtime-only by design. Re-saving Bukkit's cached config here
// used to overwrite external edits and Sparrow migrations with
// a stale in-memory copy of the whole YAML document.
configManager.setDebug(newDebug);
syncManager.refreshConfigCache();
sendMessage(sender, GREEN + "[FastSync] Debug mode: " +
(newDebug ? GREEN + "ON" : RED + "OFF"));
sendMessage(sender, GRAY + "Runtime only; edit config.yml to persist across restarts.");
}
case "saveall" -> {
sendMessage(sender, YELLOW + "[FastSync] Saving all online players...");
Expand Down
Loading