From 452772daf3fea91256372e41de26e224c9b34d62 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Fri, 9 May 2025 11:34:28 -0700 Subject: [PATCH 1/4] Only do heavy work on the queue, pipeline state handling should be processed quick --- .../impl/backend/AppEngineTaskQueue.java | 5 ++--- .../impl/backend/CloudTasksTaskQueue.java | 5 ++--- .../impl/backend/PipelineTaskQueue.java | 17 +++++++++++++++++ 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java index 806d97a3..149ee1b6 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java @@ -159,8 +159,7 @@ public Collection enqueue(final Collection pipeline public Multimap asTaskSpecs(Collection pipelineTasks) { Multimap taskSpecs = HashMultimap.create(); pipelineTasks.forEach( pipelineTask -> { - String queueName = Optional.ofNullable(pipelineTask.getQueueSettings().getOnQueue()).orElse("default"); - taskSpecs.put(queueName, pipelineTask.toTaskSpec(servicesService, taskHandlerUrl)); + taskSpecs.put(getQueueForTask(pipelineTask), pipelineTask.toTaskSpec(servicesService, taskHandlerUrl)); }); return taskSpecs; } @@ -175,7 +174,7 @@ List addToQueue(final Collection pipelineTasks) { Map> queueNameToTaskOptions = new HashMap<>(); for (PipelineTask pipelineTask : pipelineTasks) { log.finest("Enqueueing: " + pipelineTask); - String queueName = pipelineTask.getQueueSettings().getOnQueue(); + String queueName = getQueueForTask(pipelineTask); TaskOptions taskOptions = toTaskOptions(pipelineTask); diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java index 6f36dba0..7dead42b 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java @@ -69,7 +69,7 @@ public TaskReference enqueue(PipelineTask pipelineTask) { @Override public Collection enqueue(Collection pipelineTasks) { Map> tasksByQueue = pipelineTasks.stream() - .collect(Collectors.groupingBy(pipelineTask -> Optional.ofNullable(pipelineTask.getQueueSettings().getOnQueue()).orElse(DEFAULT_QUEUE_NAME))); + .collect(Collectors.groupingBy(this::getQueueForTask)); /// probably could use parallelStream here, but in practice don't *really* expect to have multiple queues in the batch return tasksByQueue.entrySet().stream() @@ -89,8 +89,7 @@ public Multimap asTaskSpecs(Collection pipelineT Multimap taskSpecs = HashMultimap.create(); pipelineTasks .forEach(pipelineTask -> { - String queueName = Optional.ofNullable(pipelineTask.getQueueSettings().getOnQueue()).orElse(DEFAULT_QUEUE_NAME); - taskSpecs.put(queueName, pipelineTask.toTaskSpec(appEngineServicesService, TaskHandler.handleTaskUrl())); + taskSpecs.put(getQueueForTask(pipelineTask), pipelineTask.toTaskSpec(appEngineServicesService, TaskHandler.handleTaskUrl())); }); return taskSpecs; } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/PipelineTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/PipelineTaskQueue.java index 8c202e42..4fd8b083 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/PipelineTaskQueue.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/PipelineTaskQueue.java @@ -23,8 +23,11 @@ import java.time.Instant; import java.util.Collection; import java.util.Collections; +import java.util.Optional; import java.util.SortedMap; +import static com.google.appengine.tools.pipeline.impl.PipelineManager.DEFAULT_QUEUE_NAME; + /** * * @author rudominer@google.com (Mitch Rudominer) @@ -148,4 +151,18 @@ default TaskReference enqueue(String queueName, TaskSpec taskSpec) { * @param taskReferences references to the tasks to delete */ void deleteTasks(Collection taskReferences); + + /** + * Rationale about this: all the tasks that deal with the pipeline progress should be short-lived and shouldn't be + * held back by long-running tasks, potentially causing delays in checking slots, finalization, etc. and maybe + * leading to abandon locks + * @param pipelineTask + * @return + */ + default String getQueueForTask(PipelineTask pipelineTask) { + if (pipelineTask.getType() == PipelineTask.Type.RUN_JOB) { + return Optional.ofNullable(pipelineTask.getQueueSettings().getOnQueue()).orElse(DEFAULT_QUEUE_NAME); + } + return DEFAULT_QUEUE_NAME; + } } From bc29614b789d51a69c1a0a2ac8a2f5d7121a444b Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Fri, 9 May 2025 11:34:54 -0700 Subject: [PATCH 2/4] Update pom --- java/appengine-pipeline-0.3+worklytics.11-pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/appengine-pipeline-0.3+worklytics.11-pom.xml b/java/appengine-pipeline-0.3+worklytics.11-pom.xml index d8def549..1f55967a 100644 --- a/java/appengine-pipeline-0.3+worklytics.11-pom.xml +++ b/java/appengine-pipeline-0.3+worklytics.11-pom.xml @@ -10,7 +10,7 @@ - 0.3+worklytics.11 + 0.3+worklytics.12 jar From a9cc06a45fa860c7cbc0d8bf6b89b8d17f9636ee Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Fri, 9 May 2025 12:03:44 -0700 Subject: [PATCH 3/4] Update version to .12 --- ....11-pom.xml => appengine-pipeline-0.3+worklytics.12-pom.xml} | 0 java/pom.xml | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename java/{appengine-pipeline-0.3+worklytics.11-pom.xml => appengine-pipeline-0.3+worklytics.12-pom.xml} (100%) diff --git a/java/appengine-pipeline-0.3+worklytics.11-pom.xml b/java/appengine-pipeline-0.3+worklytics.12-pom.xml similarity index 100% rename from java/appengine-pipeline-0.3+worklytics.11-pom.xml rename to java/appengine-pipeline-0.3+worklytics.12-pom.xml diff --git a/java/pom.xml b/java/pom.xml index f3be4caf..c3e40039 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -8,7 +8,7 @@ https://github.com/Worklytics/appengine-pipelines/ - 0.3+worklytics.11 + 0.3+worklytics.12 jar From 5d19766c2314604bc67f75a7431d51cdd6dae6c5 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Fri, 9 May 2025 13:54:24 -0700 Subject: [PATCH 4/4] Disable flaky test --- .../tools/pipeline/di/JobRunServiceComponentTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/di/JobRunServiceComponentTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/di/JobRunServiceComponentTest.java index 3a532d0d..a144d9b5 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/di/JobRunServiceComponentTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/di/JobRunServiceComponentTest.java @@ -3,6 +3,7 @@ import com.google.appengine.tools.cloudtasktest.FakeHttpServletRequest; import com.google.appengine.tools.pipeline.PipelineService; import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesServiceImpl; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -11,6 +12,9 @@ class JobRunServiceComponentTest { + // flaky, each test should have a fresh object graph, so here is expecting 0 instances, but being static, it + // could be more + @Disabled @Test public void testJobRunServiceComponent() { // basic DI setup that will really be done at runtime