Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ processResources {
}
}

def targetJavaVersion = 21
def targetJavaVersion = 25
tasks.withType(JavaCompile).configureEach {
it.options.encoding = "UTF-8"
it.options.release = targetJavaVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@ public class NotEnoughBandwidthConfig implements TConfig {
add("minecraft:player_info_remove");
}};
public boolean debugLog = false;
public int compressionLevel = 6;
public int contextLevel = 23;
public int compressionLevel = 10;
public int contextLevel = 25;
public int minCompressionBytes = 24;
public int minCompressionSavingsBytes = 12;
public int minCompressionSavingsPercent = 5;
public int aggregationMinBatchPackets = 6;
public int aggregationMaxExtraCycles = 1;
public int dccSizeLimit = 60;
public int dccDistance = 5;
public int dccTimeout = 60;
public boolean chunkCacheEnabled = true;
public int chunkCacheMaxSizeMB = 2048;
public int chunkCacheCompressionLevel = 8;

@Expose(serialize = false, deserialize = false)
public static final HashSet<String> COMMON_BLOCK_LIST = new HashSet<>() {{
Expand Down Expand Up @@ -62,4 +68,28 @@ public int getCompressionLevel() {
/**
 * Returns the configured Zstd context (window log) level, forced into the
 * supported 21..25 range. Out-of-range values from the config file are
 * silently pulled back to the nearest bound.
 */
public int getContextLevel() {
    if (contextLevel < 21) {
        return 21;
    }
    // Equivalent to MathHelper.clamp(contextLevel, 21, 25) for 21 <= value.
    return Math.min(contextLevel, 25);
}

/**
 * Returns the minimum payload size (in bytes) before compression is even
 * attempted, constrained to 1..512 so a bad config value cannot disable
 * or over-trigger compression.
 */
public int getMinCompressionBytes() {
    final int configured = minCompressionBytes;
    if (configured < 1) {
        return 1;
    }
    if (configured > 512) {
        return 512;
    }
    return configured;
}

/**
 * Returns how many bytes compression must save before the compressed form
 * is preferred over the raw payload, limited to the 0..1024 range.
 */
public int getMinCompressionSavingsBytes() {
    final int configured = minCompressionSavingsBytes;
    if (configured < 0) {
        return 0;
    }
    return Math.min(configured, 1024);
}

/**
 * Returns the minimum percentage saving required to keep a compressed
 * payload, bounded to 0..95 (demanding more than 95% would effectively
 * reject all compression).
 */
public int getMinCompressionSavingsPercent() {
    final int configured = minCompressionSavingsPercent;
    if (configured < 0) {
        return 0;
    }
    return Math.min(configured, 95);
}

/**
 * Returns the minimum number of buffered packets that triggers an
 * aggregated flush, kept within 1..64 so flushing can never stall
 * indefinitely nor batch unreasonably large.
 */
public int getAggregationMinBatchPackets() {
    final int configured = aggregationMinBatchPackets;
    if (configured < 1) {
        return 1;
    }
    return Math.min(configured, 64);
}

/**
 * Returns how many additional flush cycles a small batch may wait for more
 * packets before being sent anyway, limited to 0..20 to cap added latency.
 */
public int getAggregationMaxExtraCycles() {
    final int configured = aggregationMaxExtraCycles;
    if (configured < 0) {
        return 0;
    }
    return Math.min(configured, 20);
}

/**
 * Returns the Zstd level used when storing chunk-cache entries, constrained
 * to 1..19 (the standard Zstd level range without ultra levels).
 */
public int getChunkCacheCompressionLevel() {
    final int configured = chunkCacheCompressionLevel;
    if (configured < 1) {
        return 1;
    }
    return Math.min(configured, 19);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cn.ussshenzhou.notenoughbandwidth.aggregation;

import cn.ussshenzhou.notenoughbandwidth.NotEnoughBandwidthConfig;
import cn.ussshenzhou.notenoughbandwidth.network.NebConnectionRegistry;
import cn.ussshenzhou.notenoughbandwidth.util.DefaultChannelPipelineHelper;
import cn.ussshenzhou.notenoughbandwidth.util.PacketUtil;
Expand All @@ -20,8 +21,6 @@

public class AggregationManager {
private static final Logger LOGGER = LoggerFactory.getLogger("NEB-Aggregation");
private static final int MIN_BATCH_PACKETS = 4;
private static final int MAX_EXTRA_CYCLES = 2;
private static final ConcurrentHashMap<ClientConnection, ArrayList<AggregatedEncodePacket>> PACKET_BUFFER = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<ClientConnection, Integer> FLUSH_WAIT = new ConcurrentHashMap<>();
private static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -50,6 +49,9 @@ public static void takeOver(Packet<?> packet, ClientConnection connection) {
}

private static void flush() {
var cfg = NotEnoughBandwidthConfig.get();
int minBatchPackets = cfg.getAggregationMinBatchPackets();
int maxExtraCycles = cfg.getAggregationMaxExtraCycles();
// Purge dead connections without holding a global lock.
PACKET_BUFFER.keySet().removeIf(c -> !c.isOpen());
FLUSH_WAIT.keySet().removeIf(c -> !c.isOpen());
Expand All @@ -63,9 +65,9 @@ private static void flush() {
if (packets.isEmpty()) {
continue;
}
if (packets.size() < MIN_BATCH_PACKETS) {
if (packets.size() < minBatchPackets) {
int waited = FLUSH_WAIT.getOrDefault(connection, 0);
if (waited < MAX_EXTRA_CYCLES) {
if (waited < maxExtraCycles) {
FLUSH_WAIT.put(connection, waited + 1);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public PacketAggregationPacket(ArrayList<AggregatedEncodePacket> packetsToEncode
}

public void write(RegistryByteBuf buffer) {
var cfg = NotEnoughBandwidthConfig.get();
var rawBuf = new RegistryByteBuf(ByteBufAllocator.DEFAULT.buffer(), buffer.getRegistryManager());
try {
packetsToEncode.forEach(p -> encodeSubPacket(rawBuf, p));
Expand All @@ -63,23 +64,41 @@ public void write(RegistryByteBuf buffer) {
rawBuf.getBytes(rawBuf.readerIndex(), sample);
DictionaryManager.collectSample(sample);
}
boolean compress = rawSize >= 32;
buffer.writeBoolean(compress);
if (compress) {
buffer.writeVarInt(rawSize);
boolean shouldTryCompress = rawSize >= cfg.getMinCompressionBytes();
if (shouldTryCompress) {
var compressedBuf = new PacketByteBuf(ZstdHelper.compress(connection, rawBuf));
try {
if (ConfigHelper.getConfigRead(NotEnoughBandwidthConfig.class).debugLog) {
LOGGER.debug("Aggregated and compressed: {} -> {} bytes ({} %)",
rawSize, compressedBuf.readableBytes(),
String.format("%.2f", 100f * compressedBuf.readableBytes() / rawSize));
int compressedSize = compressedBuf.readableBytes();
int minSavingsBytes = cfg.getMinCompressionSavingsBytes();
int minSavingsPercent = cfg.getMinCompressionSavingsPercent();
int savedBytes = rawSize - compressedSize;
boolean enoughByteSavings = savedBytes >= minSavingsBytes;
boolean enoughPercentSavings =
compressedSize * 100L <= rawSize * (100L - minSavingsPercent);
boolean useCompressed = enoughByteSavings && enoughPercentSavings;
buffer.writeBoolean(useCompressed);
if (useCompressed) {
buffer.writeVarInt(rawSize);
if (ConfigHelper.getConfigRead(NotEnoughBandwidthConfig.class).debugLog) {
LOGGER.debug("Aggregated and compressed: {} -> {} bytes ({} %)",
rawSize, compressedSize,
String.format("%.2f", 100f * compressedSize / rawSize));
}
buffer.writeBytes(compressedBuf);
this.bakedSize = compressedSize;
} else {
if (ConfigHelper.getConfigRead(NotEnoughBandwidthConfig.class).debugLog) {
LOGGER.debug("Skip compression (insufficient gain): {} -> {} bytes, saved {} bytes",
rawSize, compressedSize, savedBytes);
}
buffer.writeBytes(rawBuf);
this.bakedSize = rawSize;
}
buffer.writeBytes(compressedBuf);
this.bakedSize = compressedBuf.readableBytes();
} finally {
compressedBuf.release();
}
} else {
buffer.writeBoolean(false);
buffer.writeBytes(rawBuf);
this.bakedSize = rawSize;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cn.ussshenzhou.notenoughbandwidth.chunkcache;

import cn.ussshenzhou.notenoughbandwidth.NotEnoughBandwidthConfig;
import com.github.luben.zstd.Zstd;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
Expand Down Expand Up @@ -78,7 +79,8 @@ private void migrateIfNeeded() {
}

public void put(long hash, byte[] data) {
byte[] compressed = Zstd.compress(data, 6);
int compressionLevel = NotEnoughBandwidthConfig.get().getChunkCacheCompressionLevel();
byte[] compressed = Zstd.compress(data, compressionLevel);
byte[] value = new byte[TIMESTAMP_BYTES + compressed.length];
ByteBuffer.wrap(value).putLong(System.currentTimeMillis());
System.arraycopy(compressed, 0, value, TIMESTAMP_BYTES, compressed.length);
Expand Down