Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.dotmarketing.util.UtilMethods;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException;
import io.vavr.control.Try;
import java.io.BufferedInputStream;
Expand All @@ -34,12 +33,13 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import org.apache.commons.io.filefilter.DirectoryFileFilter;

Expand All @@ -53,8 +53,13 @@ public class H22Cache extends CacheProvider {
final boolean shouldAsync=Config.getBooleanProperty("cache_h22_async", true);


final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("H22-ASYNC-COMMIT-%d").build();
final private LinkedBlockingQueue<Runnable> asyncTaskQueue = new LinkedBlockingQueue<>();
// Async cache commits run on virtual threads: they block cheaply on JDBC/disk I/O instead of
// pinning a small pool of platform threads. numberOfAsyncThreads now sizes a Semaphore that caps
// how many commits hit the H2/Hikari pool concurrently (embedded H2 writes don't scale linearly),
// while inFlightTasks tracks the total async backlog that isAllocationWithinTolerance() reads.
final ThreadFactory namedThreadFactory = Thread.ofVirtual().name("H22-ASYNC-COMMIT-", 0).factory();
private final Semaphore dbWorkPermits = new Semaphore(Math.max(1, numberOfAsyncThreads), true);
private final AtomicInteger inFlightTasks = new AtomicInteger(0);
private ExecutorService executorService;


Expand Down Expand Up @@ -115,16 +120,10 @@ public boolean isDistributed() {


private ExecutorService spawnNewThreadPool() {

if (Config.getBooleanProperty("cache.h22.async.caller.runs.policy", true)) {
return new ThreadPoolExecutor(numberOfAsyncThreads, numberOfAsyncThreads, 10,
TimeUnit.SECONDS, asyncTaskQueue, namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy()
);
}

return new ThreadPoolExecutor(numberOfAsyncThreads, numberOfAsyncThreads, 10,
TimeUnit.SECONDS, asyncTaskQueue, namedThreadFactory
);
// One virtual thread per submitted commit. Backpressure is handled up-front by shouldAsync()
// (which falls back to running the commit synchronously on the caller), and dbWorkPermits caps
// how many of these virtual threads touch the H2/Hikari pool at the same time.
return Executors.newThreadPerTaskExecutor(namedThreadFactory);
}


Expand Down Expand Up @@ -183,15 +182,43 @@ public void put(final String group, final String key, final Object content) {


void putAsync(final Fqn fqn, final Object content) {

executorService.submit(()-> {
submitAsync(() -> {
try {
// Add the given content to the group and for a given key
doUpsert(fqn, (Serializable) content);
} catch (Exception e) {
handleError(e, fqn);
}
});
});
}

/**
* Submits a cache commit to run on a virtual thread. The {@link #inFlightTasks} counter is bumped
* before submission and cleared in a {@code finally} so {@link #isAllocationWithinTolerance()} sees
* the real backlog, and the task acquires a {@link #dbWorkPermits} permit so only a bounded number
* of commits hit the H2/Hikari pool at once.
*/
private void submitAsync(final Runnable dbTask) {
inFlightTasks.incrementAndGet();
try {
executorService.submit(() -> {
try {
dbWorkPermits.acquire();
try {
dbTask.run();
} finally {
dbWorkPermits.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
inFlightTasks.decrementAndGet();
}
});
} catch (RejectedExecutionException e) {
// Executor is shutting down; keep the counter balanced.
inFlightTasks.decrementAndGet();
}
}


Expand Down Expand Up @@ -291,7 +318,7 @@ public void remove(final String group, final String key) {
* @return
*/
boolean isAllocationWithinTolerance() {
final int size = asyncTaskQueue.size();
final int size = inFlightTasks.get();
final float allocation = (float) size / (float) asyncTaskQueueSize;
Logger.debug(H22Cache.class,
() -> " size is " + size + ", allocation is " + allocation + ", tolerance is :"
Expand All @@ -309,8 +336,7 @@ boolean shouldAsync() {
}

void removeAsync(final Fqn fqn) {

executorService.submit(()-> {
submitAsync(() -> {
try {
// Invalidates from Cache a key from a given group
doDelete(fqn);
Expand Down
Loading