Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.hugegraph.define.WorkLoad;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
Expand All @@ -43,6 +45,8 @@
@PreMatching
public class LoadDetectFilter implements ContainerRequestFilter {

private static final Logger LOG = Log.logger(LoadDetectFilter.class);

private static final Set<String> WHITE_API_LIST = ImmutableSet.of(
"",
"apis",
Expand All @@ -54,11 +58,40 @@ public class LoadDetectFilter implements ContainerRequestFilter {
private static final RateLimiter GC_RATE_LIMITER =
RateLimiter.create(1.0 / 30);

// Log at most 1 request per second to avoid too many logs when server is under heavy load
private static final RateLimiter REJECT_LOG_RATE_LIMITER = RateLimiter.create(1.0);

@Context
private jakarta.inject.Provider<HugeConfig> configProvider;
@Context
private jakarta.inject.Provider<WorkLoad> loadProvider;

public static boolean isWhiteAPI(ContainerRequestContext context) {
List<PathSegment> segments = context.getUriInfo().getPathSegments();
E.checkArgument(!segments.isEmpty(), "Invalid request uri '%s'",
context.getUriInfo().getPath());
String rootPath = segments.get(0).getPath();
return WHITE_API_LIST.contains(rootPath);
}

protected boolean gcIfNeeded() {
if (GC_RATE_LIMITER.tryAcquire(1)) {
System.gc();
return true;
}
return false;
}

protected boolean allowRejectLog() {
return REJECT_LOG_RATE_LIMITER.tryAcquire();
}

protected void logRejectWarning(String message, Object... args) {
if (this.allowRejectLog()) {
LOG.warn(message, args);
}
}

@Override
public void filter(ContainerRequestContext context) {
if (LoadDetectFilter.isWhiteAPI(context)) {
Expand All @@ -70,7 +103,12 @@ public void filter(ContainerRequestContext context) {
int maxWorkerThreads = config.get(ServerOptions.MAX_WORKER_THREADS);
WorkLoad load = this.loadProvider.get();
// There will be a thread doesn't work, dedicated to statistics
if (load.incrementAndGet() >= maxWorkerThreads) {
int currentLoad = load.incrementAndGet();
if (currentLoad >= maxWorkerThreads) {
this.logRejectWarning("Rejected request due to high worker load, method={}, path={}, " +
"currentLoad={}, maxWorkerThreads={}",
context.getMethod(), context.getUriInfo().getPath(),
currentLoad, maxWorkerThreads);
throw new ServiceUnavailableException(String.format(
"The server is too busy to process the request, " +
"you can config %s to adjust it or try again later",
Expand All @@ -83,7 +121,17 @@ public void filter(ContainerRequestContext context) {
long presumableFreeMem = (Runtime.getRuntime().maxMemory() -
allocatedMem) / Bytes.MB;
if (presumableFreeMem < minFreeMemory) {
gcIfNeeded();
boolean gcTriggered = this.gcIfNeeded();
long allocatedMemAfterCheck = Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory();
long recheckedFreeMem = (Runtime.getRuntime().maxMemory() -
allocatedMemAfterCheck) / Bytes.MB;
this.logRejectWarning("Rejected request due to low free memory, method={}, path={}, " +
"presumableFreeMemMB={}, recheckedFreeMemMB={}, gcTriggered={}, " +
"minFreeMemoryMB={}",
context.getMethod(), context.getUriInfo().getPath(),
presumableFreeMem, recheckedFreeMem, gcTriggered,
minFreeMemory);
Comment on lines +124 to +134
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the low-free-memory rejection path, allocatedMemAfterCheck / recheckedFreeMem are computed on every rejected request, even when the reject log is suppressed by REJECT_LOG_RATE_LIMITER (since allowRejectLog() is only checked inside logRejectWarning()). To keep the rate-limiting effective under overload/low-memory conditions, consider checking allowRejectLog() first (or having logRejectWarning() return a boolean) and only doing the recheck computations when a log will actually be emitted.

Copilot uses AI. Check for mistakes.
throw new ServiceUnavailableException(String.format(
"The server available memory %s(MB) is below than " +
"threshold %s(MB) and can't process the request, " +
Expand All @@ -92,18 +140,4 @@ public void filter(ContainerRequestContext context) {
ServerOptions.MIN_FREE_MEMORY.name()));
}
}

public static boolean isWhiteAPI(ContainerRequestContext context) {
List<PathSegment> segments = context.getUriInfo().getPathSegments();
E.checkArgument(!segments.isEmpty(), "Invalid request uri '%s'",
context.getUriInfo().getPath());
String rootPath = segments.get(0).getPath();
return WHITE_API_LIST.contains(rootPath);
}

private static void gcIfNeeded() {
if (GC_RATE_LIMITER.tryAcquire(1)) {
System.gc();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hugegraph.unit;

import org.apache.hugegraph.core.RoleElectionStateMachineTest;
import org.apache.hugegraph.unit.api.filter.LoadDetectFilterTest;
import org.apache.hugegraph.unit.api.filter.PathFilterTest;
import org.apache.hugegraph.unit.cache.CacheManagerTest;
import org.apache.hugegraph.unit.cache.CacheTest;
Expand Down Expand Up @@ -78,6 +79,7 @@
@RunWith(Suite.class)
@Suite.SuiteClasses({
/* api filter */
LoadDetectFilterTest.class,
PathFilterTest.class,

/* cache */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.unit.api.filter;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.hugegraph.api.filter.LoadDetectFilter;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.ServerOptions;
import org.apache.hugegraph.define.WorkLoad;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.unit.BaseUnitTest;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import jakarta.inject.Provider;
import jakarta.ws.rs.ServiceUnavailableException;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.core.PathSegment;
import jakarta.ws.rs.core.UriInfo;

public class LoadDetectFilterTest extends BaseUnitTest {

private static final Logger TEST_LOGGER =
(Logger) LogManager.getLogger(LoadDetectFilter.class);

Comment on lines +57 to +59
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TEST_LOGGER is obtained by casting LogManager.getLogger(LoadDetectFilter.class) to org.apache.logging.log4j.core.Logger. With the test log4j2.xml using <AsyncLogger name="org.apache.hugegraph" ...>, this logger instance can be an AsyncLogger, so this cast (and addAppender/removeAppender) can fail with ClassCastException or not attach the appender as intended. Consider attaching the TestAppender via LoggerContext/Configuration/LoggerConfig (e.g., config.getLoggerConfig(loggerName).addAppender(...) + ctx.updateLoggers()), and remove it the same way in teardown.

Copilot uses AI. Check for mistakes.
private LoadDetectFilter loadDetectFilter;
private ContainerRequestContext requestContext;
private UriInfo uriInfo;
private WorkLoad workLoad;
private TestAppender testAppender;

@Before
public void setup() {
this.requestContext = Mockito.mock(ContainerRequestContext.class);
this.uriInfo = Mockito.mock(UriInfo.class);
this.workLoad = new WorkLoad();
this.testAppender = new TestAppender();
this.testAppender.start();
TEST_LOGGER.addAppender(this.testAppender);

Mockito.when(this.requestContext.getUriInfo()).thenReturn(this.uriInfo);
Mockito.when(this.requestContext.getMethod()).thenReturn("GET");

this.loadDetectFilter = new TestLoadDetectFilter();
this.setLoadProvider(this.workLoad);
this.setConfigProvider(createConfig(8, 0));
}

@After
public void teardown() {
TEST_LOGGER.removeAppender(this.testAppender);
this.testAppender.stop();
}

@Test
public void testFilter_WhiteListPathIgnored() {
setupPath("", List.of(""));
this.workLoad.incrementAndGet();

this.loadDetectFilter.filter(this.requestContext);

Assert.assertEquals(1, this.workLoad.get().get());
Assert.assertTrue(this.testAppender.events().isEmpty());
}

@Test
public void testFilter_RejectsWhenWorkerLoadIsTooHigh() {
setupPath("graphs/hugegraph/vertices",
List.of("graphs", "hugegraph", "vertices"));
this.setConfigProvider(createConfig(2, 0));
this.workLoad.incrementAndGet();

ServiceUnavailableException exception = (ServiceUnavailableException) Assert.assertThrows(
ServiceUnavailableException.class,
() -> this.loadDetectFilter.filter(this.requestContext));

Assert.assertContains("The server is too busy to process the request",
exception.getMessage());
Assert.assertContains(ServerOptions.MAX_WORKER_THREADS.name(),
exception.getMessage());
Assert.assertEquals(1, this.testAppender.events().size());
this.assertWarnLogContains("Rejected request due to high worker load");
this.assertWarnLogContains("method=GET");
this.assertWarnLogContains("path=graphs/hugegraph/vertices");
this.assertWarnLogContains("currentLoad=2");
}

@Test
public void testFilter_RejectsWhenFreeMemoryIsTooLow() {
setupPath("graphs/hugegraph/vertices",
List.of("graphs", "hugegraph", "vertices"));
this.setConfigProvider(createConfig(8, Integer.MAX_VALUE));
this.setGcTriggered(false);

ServiceUnavailableException exception = (ServiceUnavailableException) Assert.assertThrows(
ServiceUnavailableException.class,
() -> this.loadDetectFilter.filter(this.requestContext));

Assert.assertContains("The server available memory",
exception.getMessage());
Assert.assertContains(ServerOptions.MIN_FREE_MEMORY.name(),
exception.getMessage());
Assert.assertEquals(1, this.testAppender.events().size());
this.assertWarnLogContains("Rejected request due to low free memory");
this.assertWarnLogContains("method=GET");
this.assertWarnLogContains("path=graphs/hugegraph/vertices");
this.assertWarnLogContains("gcTriggered=false");
}

@Test
public void testFilter_AllowsRequestWhenLoadAndMemoryAreHealthy() {
setupPath("graphs/hugegraph/vertices",
List.of("graphs", "hugegraph", "vertices"));
this.setConfigProvider(createConfig(8, 0));

this.loadDetectFilter.filter(this.requestContext);

Assert.assertEquals(1, this.workLoad.get().get());
Assert.assertTrue(this.testAppender.events().isEmpty());
}

@Test
public void testFilter_RejectLogIsRateLimited() {
setupPath("graphs/hugegraph/vertices",
List.of("graphs", "hugegraph", "vertices"));
this.setConfigProvider(createConfig(2, 0));
this.setAllowRejectLogs(true, false);

this.workLoad.incrementAndGet();
Assert.assertThrows(ServiceUnavailableException.class,
() -> this.loadDetectFilter.filter(this.requestContext));

this.workLoad.get().set(1);
Assert.assertThrows(ServiceUnavailableException.class,
() -> this.loadDetectFilter.filter(this.requestContext));

Assert.assertEquals(1, this.testAppender.events().size());
this.assertWarnLogContains("Rejected request due to high worker load");
}

private HugeConfig createConfig(int maxWorkerThreads, int minFreeMemory) {
Configuration conf = new PropertiesConfiguration();
conf.setProperty(ServerOptions.MAX_WORKER_THREADS.name(), maxWorkerThreads);
conf.setProperty(ServerOptions.MIN_FREE_MEMORY.name(), minFreeMemory);
return new HugeConfig(conf);
}

private void setupPath(String path, List<String> segments) {
List<PathSegment> pathSegments = segments.stream()
.map(this::createPathSegment)
.collect(Collectors.toList());
Mockito.when(this.uriInfo.getPath()).thenReturn(path);
Mockito.when(this.uriInfo.getPathSegments()).thenReturn(pathSegments);
}

private PathSegment createPathSegment(String path) {
PathSegment segment = Mockito.mock(PathSegment.class);
Mockito.when(segment.getPath()).thenReturn(path);
return segment;
}

private void setLoadProvider(WorkLoad workLoad) {
Whitebox.setInternalState(this.loadDetectFilter, "loadProvider",
(Provider<WorkLoad>) () -> workLoad);
}

private void setConfigProvider(HugeConfig config) {
Whitebox.setInternalState(this.loadDetectFilter, "configProvider",
(Provider<HugeConfig>) () -> config);
}

private void setGcTriggered(boolean gcTriggered) {
((TestLoadDetectFilter) this.loadDetectFilter).gcTriggered(gcTriggered);
}

private void setAllowRejectLogs(boolean... allowedLogs) {
((TestLoadDetectFilter) this.loadDetectFilter).allowRejectLogs(allowedLogs);
}

private void assertWarnLogContains(String expectedContent) {
Assert.assertFalse(this.testAppender.events().isEmpty());
LogEvent event = this.testAppender.events().get(0);
Assert.assertEquals(Level.WARN, event.getLevel());
Assert.assertContains(expectedContent,
event.getMessage().getFormattedMessage());
}

private static class TestLoadDetectFilter extends LoadDetectFilter {

private boolean gcTriggered;
private final Deque<Boolean> allowRejectLogs = new ArrayDeque<>();

public void gcTriggered(boolean gcTriggered) {
this.gcTriggered = gcTriggered;
}

public void allowRejectLogs(boolean... allowedLogs) {
this.allowRejectLogs.clear();
for (boolean allowedLog : allowedLogs) {
this.allowRejectLogs.addLast(allowedLog);
}
}

@Override
protected boolean gcIfNeeded() {
return this.gcTriggered;
}

@Override
protected boolean allowRejectLog() {
if (this.allowRejectLogs.isEmpty()) {
return true;
}
return this.allowRejectLogs.removeFirst();
}
}

private static class TestAppender extends AbstractAppender {

private final List<LogEvent> events;

protected TestAppender() {
super("LoadDetectFilterTestAppender", (Filter) null,
(Layout<? extends Serializable>) null, false,
Property.EMPTY_ARRAY);
this.events = new ArrayList<>();
}

@Override
public void append(LogEvent event) {
this.events.add(event.toImmutable());
}

public List<LogEvent> events() {
return this.events;
}
}
}
Loading