Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# App Engine Pipelines

This project is an advanced, modernized implementation of Google App Engine Pipelines. Originally an execution framework to run complex, multi-step asynchronous algorithms on App Engine, this fork extends the concepts and capabilities to be compatible with newer Java versions, modern GCP Datastore API clients (Cloud Datastore / Firestore in Datastore mode), and Cloud Tasks.

## Project Architecture
- **Datastore Client**: Uses modern `google-cloud-datastore` APIs rather than the legacy `appengine-api-1.0-sdk`.
- **Task Queues**: Abstracts task enqueuing to work across `AppEngineTaskQueue` (legacy standard App Engine push queues) and `CloudTasksTaskQueue` (modern Google Cloud Tasks).
- **Settings Propagation**: Inherits pipeline settings such as retry counts, worker services, worker versions, and custom Datastore boundaries (Namespace and Database ID) down to sub-jobs and async worker callbacks.

## Development Rules
- When modifying datastore interactions, always ensure you respect potential overrides for `databaseId` and `namespace`, which are typically configured at the `JobSetting` level.
- When passing parameters to new Pipeline Tasks, leverage `PipelineTask.toProperties()` and augment `QueueSettings` if those parameters must be inherited across jobs.
- Validate Cloud Datastore constraints carefully (e.g., namespace regex restrictions).
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@

package com.google.appengine.tools.pipeline;

import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import java.io.Serial;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* A setting for specifying to the framework some aspect of a Job's execution.
Expand Down Expand Up @@ -77,7 +75,7 @@ abstract class StringValuedSetting implements JobSetting {
@Serial
private static final long serialVersionUID = 7756646651569386669L;

//NOTE: behavior of Pipeline Framework allows this to be null for some settings
// NOTE: behavior of Pipeline Framework allows this to be null for some settings
// (tests verify this)
private final String value;

Expand Down Expand Up @@ -179,8 +177,9 @@ public OnService(String service) {
*/
final class OnServiceVersion extends StringValuedSetting {

@Serial
@Serial
private static final long serialVersionUID = 3877411731586475273L;

public OnServiceVersion(String version) {
super(version);
}
Expand Down Expand Up @@ -212,20 +211,49 @@ public StatusConsoleUrl(String statusConsoleUrl) {
}
}

/**
* A setting for specifying the datastore database to use for this job;
* otherwise will be the default datastore database.
*
* q: do we want to allow pipelines to mix datastore databases?
*
*/
final class DatastoreDatabase extends StringValuedSetting {
@Serial
private static final long serialVersionUID = -1L;

public DatastoreDatabase(String datastoreDatabase) {
super(datastoreDatabase);
if (datastoreDatabase != null && !datastoreDatabase.isEmpty() && !datastoreDatabase.equals("(default)")) {
if (!datastoreDatabase.matches("^[a-z][a-z0-9-]{1,61}[a-z0-9]$")) {
throw new IllegalArgumentException("Invalid Datastore database ID: " + datastoreDatabase);
}
}
}
}

/**
* A setting for specifying the datastore namespace to use for this job;
* otherwise will be the default datastore namespace.
*/
final class DatastoreNamespace extends StringValuedSetting {
@Serial
private static final long serialVersionUID = -1L;

public DatastoreNamespace(String datastoreNameSpace) {
super(datastoreNameSpace);
if (datastoreNameSpace != null) {
if (!datastoreNameSpace.matches("^[0-9A-Za-z._-]{0,100}$")) {
throw new IllegalArgumentException("Invalid Datastore namespace: " + datastoreNameSpace);
}
}
}
}


static <E extends StringValuedSetting> Optional<String> getSettingValue(Class<E> clazz, JobSetting[] settings) {
return Arrays.stream(settings)
.filter( s -> s.getClass().isAssignableFrom(clazz))
.findAny()
.map(s -> ((StringValuedSetting) s).getValue());
.filter(s -> s.getClass().isAssignableFrom(clazz))
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getSettingValue() uses s.getClass().isAssignableFrom(clazz), which is the reverse of the usual assignability check. This will fail to match if a setting instance is a subclass of clazz (or if clazz is an interface/supertype you want to match broadly). Use clazz.isAssignableFrom(s.getClass()) (or clazz.isInstance(s)) to correctly detect matching settings.

Suggested change
.filter(s -> s.getClass().isAssignableFrom(clazz))
.filter(clazz::isInstance)

Copilot uses AI. Check for mistakes.
.findAny()
.map(s -> ((StringValuedSetting) s).getValue());
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.google.appengine.tools.pipeline.impl;

import lombok.*;

import javax.annotation.Nullable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

/**
* settings for how to asynchronously execute a task via a queue
*
Expand All @@ -12,7 +17,8 @@
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Getter @Setter
@Getter
@Setter
@ToString
public final class QueueSettings implements Cloneable {

Expand All @@ -33,12 +39,26 @@ public final class QueueSettings implements Cloneable {
private String onQueue;

/**
* delay in seconds to set when enqueueing the task (eg, should not execute until *at least* this much time has passed
* delay in seconds to set when enqueueing the task (eg, should not execute
* until *at least* this much time has passed
Comment on lines +42 to +43
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc for delayInSeconds opens a parenthesis (“(eg, …”) but doesn’t close it, which reads like a truncated sentence. Consider closing the parenthesis or rephrasing into a full sentence for clarity.

Suggested change
* delay in seconds to set when enqueueing the task (eg, should not execute
* until *at least* this much time has passed
* Delay in seconds to set when enqueueing the task (for example, the task should not execute
* until at least this much time has passed).

Copilot uses AI. Check for mistakes.
*/
private Long delayInSeconds;

/**
* Merge will override any {@code null} setting with a matching setting from {@code other}.
* datastore database ID to propagate
*/
@Nullable
private String databaseId;

/**
* datastore namespace to propagate
*/
@Nullable
private String namespace;

/**
* Merge will override any {@code null} setting with a matching setting from
* {@code other}.
* Note, delay value is not being merged.
*/
public QueueSettings merge(QueueSettings other) {
Expand All @@ -49,6 +69,12 @@ public QueueSettings merge(QueueSettings other) {
if (onQueue == null) {
onQueue = other.getOnQueue();
}
if (databaseId == null) {
databaseId = other.getDatabaseId();
}
if (namespace == null) {
namespace = other.getNamespace();
}
return this;
}

Expand Down
Loading