From 061653907482476d902475c28f6075fbc654e113 Mon Sep 17 00:00:00 2001 From: Erik Schultink Date: Fri, 2 Jan 2026 12:34:49 -0500 Subject: [PATCH 1/2] modernize some deps --- java/pom.xml | 13 +- .../txn/PipelineBackendTransactionImpl.java | 117 ++++++++++++------ 2 files changed, 84 insertions(+), 46 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index c3e40039..3f4f8fb8 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -18,9 +18,9 @@ UTF-8 - [2.7, 3.0) + [2.18.2, 2.18.3) 2.55 - 1.18.36 + 1.18.42 [2.0.4, 3.0) @@ -42,6 +42,7 @@ 17 17 + @@ -83,7 +84,7 @@ com.google.cloud libraries-bom - 26.53.0 + 26.73.0 pom import @@ -143,13 +144,13 @@ org.json json - 20231013 + 20251224 it.unimi.dsi fastutil-core - [8.5.1, 8.6.0) + [8.5.15, 8.7.0) org.projectlombok @@ -247,7 +248,7 @@ org.mockito mockito-core - 5.15.2 + 5.21.0 test diff --git a/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java b/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java index 6f9ce650..ae3512ca 100644 --- a/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java +++ b/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java @@ -1,29 +1,44 @@ package com.google.appengine.tools.txn; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.stream.Collectors; import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; import com.google.appengine.tools.pipeline.impl.tasks.PipelineTask; -import com.google.cloud.datastore.*; +import com.google.cloud.datastore.AggregationQuery; +import com.google.cloud.datastore.AggregationResults; +import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.Entity; +import com.google.cloud.datastore.FullEntity; +import com.google.cloud.datastore.Key; +import com.google.cloud.datastore.Query; +import com.google.cloud.datastore.QueryResults; +import com.google.cloud.datastore.Transaction; import com.google.cloud.datastore.models.ExplainOptions; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Multimap; import com.google.protobuf.ByteString; + import lombok.AccessLevel; import lombok.Getter; import lombok.NonNull; import lombok.extern.java.Log; -import java.time.Duration; -import java.time.Instant; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.stream.Collectors; - /** - * Transaction wrapper class that aims to mimic cross-services transactions. In this case datastore-cloud tasks. + * Transaction wrapper class that aims to mimic cross-services transactions. In + * this case datastore-cloud tasks. */ @Log public class PipelineBackendTransactionImpl implements PipelineBackendTransaction { @@ -64,11 +79,13 @@ public PipelineBackendTransactionImpl(@NonNull Datastore datastore, @NonNull Pip /** Transaction interface delegate **/ public Response commit() { - //noinspection unchecked + // noinspection unchecked try { Response dsResponse = getDsTransaction().commit(); - // log.info("commit transaction for " + Arrays.stream(Thread.currentThread().getStackTrace()).toList().get(2)); - // we see more Datastore errors (contention / ) than cloud tasks enqueue errors (barely none) + // log.info("commit transaction for " + + // Arrays.stream(Thread.currentThread().getStackTrace()).toList().get(2)); + // we see more Datastore errors (contention / ) than cloud tasks enqueue errors + // (barely none) // let's only commit if the datastore txn went through taskReferences.addAll(this.commitTasks()); return dsResponse; @@ -76,7 +93,8 @@ public Response commit() { rollbackTasks(); throw t; } finally { - log.log(Level.FINE, String.format("Transaction commit %s- opened for %s", dsTransaction.getTransactionId().toStringUtf8(), stopwatch.elapsed())); + log.log(Level.FINE, String.format("Transaction commit %s- opened for %s", + dsTransaction.getTransactionId().toStringUtf8(), stopwatch.elapsed())); } } @@ -99,7 +117,8 @@ public boolean rollbackIfActive() { log.log(Level.WARNING, "Rollback of transaction failed: ", e); } finally { if (shouldLog) { - log.log(Level.WARNING, String.format("Transaction rollback bc still active - opened for %s", stopwatch.elapsed())); + log.log(Level.WARNING, + String.format("Transaction rollback bc still active - opened for %s", stopwatch.elapsed())); } } return shouldLog; @@ -150,6 +169,16 @@ public QueryResults run(Query query, ExplainOptions explainOptions) { return getDsTransaction().run(query, explainOptions); } + @Override + public AggregationResults runAggregation(AggregationQuery query) { + return getDsTransaction().runAggregation(query); + } + + @Override + public AggregationResults runAggregation(AggregationQuery query, ExplainOptions explainOptions) { + return getDsTransaction().runAggregation(query, explainOptions); + } + @Override public void addWithDeferredIdAllocation(FullEntity... fullEntities) { getDsTransaction().addWithDeferredIdAllocation(fullEntities); @@ -208,32 +237,36 @@ private synchronized Transaction getDsTransaction() { private Collection commitTasks() { if (!pendingTaskSpecsByQueue.isEmpty()) { - //noinspection unchecked + // noinspection unchecked // pipeline specs List taskReferences = new ArrayList<>(); pendingTaskSpecsByQueue.asMap() - .forEach((queue, tasks) -> { - // PoC: we can deal with the delay here prior to commit - Instant fixedNow = Instant.now(); - Collection delayedTasks = tasks.stream() - .map(task -> task.withScheduledExecutionTime(Optional.ofNullable(task.getScheduledExecutionTime()).orElse(fixedNow).plus(ENQUEUE_DELAY_FOR_SAFER_ROLLBACK))) - .collect(Collectors.toSet()); - - if (delayedTasks.size() != tasks.size()) { - HashSet distinctTasks = new HashSet<>(tasks); - List duplicatedTasks = tasks.stream() - .filter(task -> !distinctTasks.add(task)) - .collect(Collectors.toList()); - String message = String.format("Some identical pipeline tasks were enqueued. Duplicates are %s", duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", "))); - if (isCloud) { - log.log(Level.WARNING, message); - } else { - throw new IllegalStateException(String.format("Some identical pipeline tasks were enqueued. Duplicates are %s", duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", ")))); + .forEach((queue, tasks) -> { + // PoC: we can deal with the delay here prior to commit + Instant fixedNow = Instant.now(); + Collection delayedTasks = tasks.stream() + .map(task -> task.withScheduledExecutionTime(Optional.ofNullable(task.getScheduledExecutionTime()) + .orElse(fixedNow).plus(ENQUEUE_DELAY_FOR_SAFER_ROLLBACK))) + .collect(Collectors.toSet()); + + if (delayedTasks.size() != tasks.size()) { + HashSet distinctTasks = new HashSet<>(tasks); + List duplicatedTasks = tasks.stream() + .filter(task -> !distinctTasks.add(task)) + .collect(Collectors.toList()); + String message = String.format("Some identical pipeline tasks were enqueued. Duplicates are %s", + duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", "))); + if (isCloud) { + log.log(Level.WARNING, message); + } else { + throw new IllegalStateException( + String.format("Some identical pipeline tasks were enqueued. Duplicates are %s", + duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", ")))); + } } - } - taskReferences.addAll(taskQueue.enqueue(queue, delayedTasks)); - }); + taskReferences.addAll(taskQueue.enqueue(queue, delayedTasks)); + }); pendingTaskSpecsByQueue.clear(); return taskReferences; } else { @@ -242,16 +275,18 @@ private Collection commitTasks() { } private void rollbackTasks() { - // two cases here that should be mutually exclusive, but deal together for simplicity: + // two cases here that should be mutually exclusive, but deal together for + // simplicity: // 1. if it was never enqueued, just clear the tasks if (!pendingTaskSpecsByQueue.isEmpty()) { - log.log(Level.WARNING, String.format("Rollback never enqueued %d tasks", pendingTaskSpecsByQueue.asMap().values().stream().map(Collection::size).reduce(Integer::sum).orElse(-1))); + log.log(Level.WARNING, String.format("Rollback never enqueued %d tasks", + pendingTaskSpecsByQueue.asMap().values().stream().map(Collection::size).reduce(Integer::sum).orElse(-1))); pendingTaskSpecsByQueue.clear(); } // 2. if anything was enqueued, delete it, if (!taskReferences.isEmpty()) { log.log(Level.WARNING, String.format("Rollback already enqueued %d tasks: %s", taskReferences.size(), - taskReferences.stream().map(PipelineTaskQueue.TaskReference::getTaskName).collect(Collectors.joining(",")))); + taskReferences.stream().map(PipelineTaskQueue.TaskReference::getTaskName).collect(Collectors.joining(",")))); taskQueue.deleteTasks(taskReferences); taskReferences.clear(); } @@ -266,8 +301,10 @@ private void rollbackAllServices() { protected void finalize() throws Throwable { try { if (this.getDsTransaction().isActive()) { - // shouldn't happen, unless opening tnx just for read, just is kind of absurd in a strong consistency model - log.log(Level.WARNING, String.format("Finalizing PipelineBackendTransactionImpl transaction open for %s", stopwatch.elapsed(TimeUnit.MILLISECONDS))); + // shouldn't happen, unless opening tnx just for read, just is kind of absurd in + // a strong consistency model + log.log(Level.WARNING, String.format("Finalizing PipelineBackendTransactionImpl transaction open for %s", + stopwatch.elapsed(TimeUnit.MILLISECONDS))); } } finally { super.finalize(); From a0e95a812be64d065f53a318d6b85e7fdcf4442b Mon Sep 17 00:00:00 2001 From: Erik Schultink Date: Sat, 3 Jan 2026 12:19:41 -0500 Subject: [PATCH 2/2] add vscode settings; cursor/antigravity should also respect these --- .vscode/settings.json | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..d53ecaf3 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "java.compile.nullAnalysis.mode": "automatic", + "java.configuration.updateBuildConfiguration": "automatic" +} \ No newline at end of file