diff --git a/java/appengine-pipeline-0.3+worklytics.11-pom.xml b/java/appengine-pipeline-0.3+worklytics.12-pom.xml
similarity index 99%
rename from java/appengine-pipeline-0.3+worklytics.11-pom.xml
rename to java/appengine-pipeline-0.3+worklytics.12-pom.xml
index d8def549..1f55967a 100644
--- a/java/appengine-pipeline-0.3+worklytics.11-pom.xml
+++ b/java/appengine-pipeline-0.3+worklytics.12-pom.xml
@@ -10,7 +10,7 @@
- 0.3+worklytics.11
+ 0.3+worklytics.12
jar
diff --git a/java/pom.xml b/java/pom.xml
index f3be4caf..c3e40039 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -8,7 +8,7 @@
https://github.com/Worklytics/appengine-pipelines/
- 0.3+worklytics.11
+ 0.3+worklytics.12
jar
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
index 806d97a3..149ee1b6 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
@@ -159,8 +159,7 @@ public Collection enqueue(final Collection pipeline
public Multimap asTaskSpecs(Collection pipelineTasks) {
Multimap taskSpecs = HashMultimap.create();
pipelineTasks.forEach( pipelineTask -> {
- String queueName = Optional.ofNullable(pipelineTask.getQueueSettings().getOnQueue()).orElse("default");
- taskSpecs.put(queueName, pipelineTask.toTaskSpec(servicesService, taskHandlerUrl));
+ taskSpecs.put(getQueueForTask(pipelineTask), pipelineTask.toTaskSpec(servicesService, taskHandlerUrl));
});
return taskSpecs;
}
@@ -175,7 +174,7 @@ List addToQueue(final Collection pipelineTasks) {
Map> queueNameToTaskOptions = new HashMap<>();
for (PipelineTask pipelineTask : pipelineTasks) {
log.finest("Enqueueing: " + pipelineTask);
- String queueName = pipelineTask.getQueueSettings().getOnQueue();
+ String queueName = getQueueForTask(pipelineTask);
TaskOptions taskOptions = toTaskOptions(pipelineTask);
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java
index 6f36dba0..7dead42b 100644
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java
@@ -69,7 +69,7 @@ public TaskReference enqueue(PipelineTask pipelineTask) {
@Override
public Collection enqueue(Collection pipelineTasks) {
Map> tasksByQueue = pipelineTasks.stream()
- .collect(Collectors.groupingBy(pipelineTask -> Optional.ofNullable(pipelineTask.getQueueSettings().getOnQueue()).orElse(DEFAULT_QUEUE_NAME)));
+ .collect(Collectors.groupingBy(this::getQueueForTask));
/// probably could use parallelStream here, but in practice don't *really* expect to have multiple queues in the batch
return tasksByQueue.entrySet().stream()
@@ -89,8 +89,7 @@ public Multimap asTaskSpecs(Collection pipelineT
Multimap taskSpecs = HashMultimap.create();
pipelineTasks
.forEach(pipelineTask -> {
- String queueName = Optional.ofNullable(pipelineTask.getQueueSettings().getOnQueue()).orElse(DEFAULT_QUEUE_NAME);
- taskSpecs.put(queueName, pipelineTask.toTaskSpec(appEngineServicesService, TaskHandler.handleTaskUrl()));
+ taskSpecs.put(getQueueForTask(pipelineTask), pipelineTask.toTaskSpec(appEngineServicesService, TaskHandler.handleTaskUrl()));
});
return taskSpecs;
}
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/PipelineTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/PipelineTaskQueue.java
index 8c202e42..4fd8b083 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/PipelineTaskQueue.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/PipelineTaskQueue.java
@@ -23,8 +23,11 @@
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
+import java.util.Optional;
import java.util.SortedMap;
+import static com.google.appengine.tools.pipeline.impl.PipelineManager.DEFAULT_QUEUE_NAME;
+
/**
*
* @author rudominer@google.com (Mitch Rudominer)
@@ -148,4 +151,18 @@ default TaskReference enqueue(String queueName, TaskSpec taskSpec) {
* @param taskReferences references to the tasks to delete
*/
void deleteTasks(Collection taskReferences);
+
+ /**
+ * Rationale about this: all the tasks that deal with the pipeline progress should be short-lived and shouldn't be
+ * held back by long-running tasks, potentially causing delays in checking slots, finalization, etc. and maybe
+ * leading to abandon locks
+ * @param pipelineTask
+ * @return
+ */
+ default String getQueueForTask(PipelineTask pipelineTask) {
+ if (pipelineTask.getType() == PipelineTask.Type.RUN_JOB) {
+ return Optional.ofNullable(pipelineTask.getQueueSettings().getOnQueue()).orElse(DEFAULT_QUEUE_NAME);
+ }
+ return DEFAULT_QUEUE_NAME;
+ }
}
diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/di/JobRunServiceComponentTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/di/JobRunServiceComponentTest.java
index 3a532d0d..a144d9b5 100644
--- a/java/src/test/java/com/google/appengine/tools/pipeline/di/JobRunServiceComponentTest.java
+++ b/java/src/test/java/com/google/appengine/tools/pipeline/di/JobRunServiceComponentTest.java
@@ -3,6 +3,7 @@
import com.google.appengine.tools.cloudtasktest.FakeHttpServletRequest;
import com.google.appengine.tools.pipeline.PipelineService;
import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesServiceImpl;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -11,6 +12,9 @@
class JobRunServiceComponentTest {
+ // flaky, each test should have a fresh object graph, so here is expecting 0 instances, but being static, it
+ // could be more
+ @Disabled
@Test
public void testJobRunServiceComponent() {
// basic DI setup that will really be done at runtime