diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 00000000..d53ecaf3
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,4 @@
+{
+ "java.compile.nullAnalysis.mode": "automatic",
+ "java.configuration.updateBuildConfiguration": "automatic"
+}
\ No newline at end of file
diff --git a/java/pom.xml b/java/pom.xml
index c3e40039..3f4f8fb8 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -18,9 +18,9 @@
UTF-8
- [2.7, 3.0)
+ [2.18.2, 2.18.3)
2.55
- 1.18.36
+ 1.18.42
[2.0.4, 3.0)
@@ -42,6 +42,7 @@
17
17
+
@@ -83,7 +84,7 @@
com.google.cloud
libraries-bom
- 26.53.0
+ 26.73.0
pom
import
@@ -143,13 +144,13 @@
org.json
json
- 20231013
+ 20251224
it.unimi.dsi
fastutil-core
- [8.5.1, 8.6.0)
+ [8.5.15, 8.7.0)
org.projectlombok
@@ -247,7 +248,7 @@
org.mockito
mockito-core
- 5.15.2
+ 5.21.0
test
diff --git a/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java b/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java
index 6f9ce650..ae3512ca 100644
--- a/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java
+++ b/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java
@@ -1,29 +1,44 @@
package com.google.appengine.tools.txn;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.stream.Collectors;
import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue;
import com.google.appengine.tools.pipeline.impl.tasks.PipelineTask;
-import com.google.cloud.datastore.*;
+import com.google.cloud.datastore.AggregationQuery;
+import com.google.cloud.datastore.AggregationResults;
+import com.google.cloud.datastore.Datastore;
+import com.google.cloud.datastore.Entity;
+import com.google.cloud.datastore.FullEntity;
+import com.google.cloud.datastore.Key;
+import com.google.cloud.datastore.Query;
+import com.google.cloud.datastore.QueryResults;
+import com.google.cloud.datastore.Transaction;
import com.google.cloud.datastore.models.ExplainOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import com.google.protobuf.ByteString;
+
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.java.Log;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.stream.Collectors;
-
/**
- * Transaction wrapper class that aims to mimic cross-services transactions. In this case datastore-cloud tasks.
+ * Transaction wrapper class that aims to mimic cross-services transactions. In
+ * this case datastore-cloud tasks.
*/
@Log
public class PipelineBackendTransactionImpl implements PipelineBackendTransaction {
@@ -64,11 +79,13 @@ public PipelineBackendTransactionImpl(@NonNull Datastore datastore, @NonNull Pip
/** Transaction interface delegate **/
public Response commit() {
- //noinspection unchecked
+ // noinspection unchecked
try {
Response dsResponse = getDsTransaction().commit();
- // log.info("commit transaction for " + Arrays.stream(Thread.currentThread().getStackTrace()).toList().get(2));
- // we see more Datastore errors (contention / ) than cloud tasks enqueue errors (barely none)
+ // log.info("commit transaction for " +
+ // Arrays.stream(Thread.currentThread().getStackTrace()).toList().get(2));
+ // we see more Datastore errors (contention / ) than cloud tasks enqueue errors
+ // (barely none)
// let's only commit if the datastore txn went through
taskReferences.addAll(this.commitTasks());
return dsResponse;
@@ -76,7 +93,8 @@ public Response commit() {
rollbackTasks();
throw t;
} finally {
- log.log(Level.FINE, String.format("Transaction commit %s- opened for %s", dsTransaction.getTransactionId().toStringUtf8(), stopwatch.elapsed()));
+ log.log(Level.FINE, String.format("Transaction commit %s- opened for %s",
+ dsTransaction.getTransactionId().toStringUtf8(), stopwatch.elapsed()));
}
}
@@ -99,7 +117,8 @@ public boolean rollbackIfActive() {
log.log(Level.WARNING, "Rollback of transaction failed: ", e);
} finally {
if (shouldLog) {
- log.log(Level.WARNING, String.format("Transaction rollback bc still active - opened for %s", stopwatch.elapsed()));
+ log.log(Level.WARNING,
+ String.format("Transaction rollback bc still active - opened for %s", stopwatch.elapsed()));
}
}
return shouldLog;
@@ -150,6 +169,16 @@ public QueryResults run(Query query, ExplainOptions explainOptions) {
return getDsTransaction().run(query, explainOptions);
}
+ @Override
+ public AggregationResults runAggregation(AggregationQuery query) {
+ return getDsTransaction().runAggregation(query);
+ }
+
+ @Override
+ public AggregationResults runAggregation(AggregationQuery query, ExplainOptions explainOptions) {
+ return getDsTransaction().runAggregation(query, explainOptions);
+ }
+
@Override
public void addWithDeferredIdAllocation(FullEntity>... fullEntities) {
getDsTransaction().addWithDeferredIdAllocation(fullEntities);
@@ -208,32 +237,36 @@ private synchronized Transaction getDsTransaction() {
private Collection commitTasks() {
if (!pendingTaskSpecsByQueue.isEmpty()) {
- //noinspection unchecked
+ // noinspection unchecked
// pipeline specs
List taskReferences = new ArrayList<>();
pendingTaskSpecsByQueue.asMap()
- .forEach((queue, tasks) -> {
- // PoC: we can deal with the delay here prior to commit
- Instant fixedNow = Instant.now();
- Collection delayedTasks = tasks.stream()
- .map(task -> task.withScheduledExecutionTime(Optional.ofNullable(task.getScheduledExecutionTime()).orElse(fixedNow).plus(ENQUEUE_DELAY_FOR_SAFER_ROLLBACK)))
- .collect(Collectors.toSet());
-
- if (delayedTasks.size() != tasks.size()) {
- HashSet distinctTasks = new HashSet<>(tasks);
- List duplicatedTasks = tasks.stream()
- .filter(task -> !distinctTasks.add(task))
- .collect(Collectors.toList());
- String message = String.format("Some identical pipeline tasks were enqueued. Duplicates are %s", duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", ")));
- if (isCloud) {
- log.log(Level.WARNING, message);
- } else {
- throw new IllegalStateException(String.format("Some identical pipeline tasks were enqueued. Duplicates are %s", duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", "))));
+ .forEach((queue, tasks) -> {
+ // PoC: we can deal with the delay here prior to commit
+ Instant fixedNow = Instant.now();
+ Collection delayedTasks = tasks.stream()
+ .map(task -> task.withScheduledExecutionTime(Optional.ofNullable(task.getScheduledExecutionTime())
+ .orElse(fixedNow).plus(ENQUEUE_DELAY_FOR_SAFER_ROLLBACK)))
+ .collect(Collectors.toSet());
+
+ if (delayedTasks.size() != tasks.size()) {
+ HashSet distinctTasks = new HashSet<>(tasks);
+ List duplicatedTasks = tasks.stream()
+ .filter(task -> !distinctTasks.add(task))
+ .collect(Collectors.toList());
+ String message = String.format("Some identical pipeline tasks were enqueued. Duplicates are %s",
+ duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", ")));
+ if (isCloud) {
+ log.log(Level.WARNING, message);
+ } else {
+ throw new IllegalStateException(
+ String.format("Some identical pipeline tasks were enqueued. Duplicates are %s",
+ duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", "))));
+ }
}
- }
- taskReferences.addAll(taskQueue.enqueue(queue, delayedTasks));
- });
+ taskReferences.addAll(taskQueue.enqueue(queue, delayedTasks));
+ });
pendingTaskSpecsByQueue.clear();
return taskReferences;
} else {
@@ -242,16 +275,18 @@ private Collection commitTasks() {
}
private void rollbackTasks() {
- // two cases here that should be mutually exclusive, but deal together for simplicity:
+ // two cases here that should be mutually exclusive, but deal together for
+ // simplicity:
// 1. if it was never enqueued, just clear the tasks
if (!pendingTaskSpecsByQueue.isEmpty()) {
- log.log(Level.WARNING, String.format("Rollback never enqueued %d tasks", pendingTaskSpecsByQueue.asMap().values().stream().map(Collection::size).reduce(Integer::sum).orElse(-1)));
+ log.log(Level.WARNING, String.format("Rollback never enqueued %d tasks",
+ pendingTaskSpecsByQueue.asMap().values().stream().map(Collection::size).reduce(Integer::sum).orElse(-1)));
pendingTaskSpecsByQueue.clear();
}
// 2. if anything was enqueued, delete it,
if (!taskReferences.isEmpty()) {
log.log(Level.WARNING, String.format("Rollback already enqueued %d tasks: %s", taskReferences.size(),
- taskReferences.stream().map(PipelineTaskQueue.TaskReference::getTaskName).collect(Collectors.joining(","))));
+ taskReferences.stream().map(PipelineTaskQueue.TaskReference::getTaskName).collect(Collectors.joining(","))));
taskQueue.deleteTasks(taskReferences);
taskReferences.clear();
}
@@ -266,8 +301,10 @@ private void rollbackAllServices() {
protected void finalize() throws Throwable {
try {
if (this.getDsTransaction().isActive()) {
- // shouldn't happen, unless opening tnx just for read, just is kind of absurd in a strong consistency model
- log.log(Level.WARNING, String.format("Finalizing PipelineBackendTransactionImpl transaction open for %s", stopwatch.elapsed(TimeUnit.MILLISECONDS)));
+ // shouldn't happen, unless opening tnx just for read, just is kind of absurd in
+ // a strong consistency model
+ log.log(Level.WARNING, String.format("Finalizing PipelineBackendTransactionImpl transaction open for %s",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS)));
}
} finally {
super.finalize();