diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..7dc66302 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,13 @@ +# App Engine Pipelines + +This project is an advanced, modernized implementation of Google App Engine Pipelines. The original was an execution framework for running complex, multi-step asynchronous algorithms on App Engine; this fork extends its concepts and capabilities to be compatible with newer Java versions, modern GCP Datastore API clients (Cloud Datastore / Firestore in Datastore mode), and Cloud Tasks. + +## Project Architecture +- **Datastore Client**: Uses modern `google-cloud-datastore` APIs rather than the legacy `appengine-api-1.0-sdk`. +- **Task Queues**: Abstracts task enqueuing to work across `AppEngineTaskQueue` (legacy standard App Engine push queues) and `CloudTasksTaskQueue` (modern Google Cloud Tasks). +- **Settings Propagation**: Propagates pipeline settings such as retry counts, worker services, worker versions, and custom Datastore boundaries (Namespace and Database ID) down to sub-jobs and async worker callbacks. + +## Development Rules +- When modifying datastore interactions, always ensure you respect potential overrides for `databaseId` and `namespace`, which are typically configured at the `JobSetting` level. +- When passing parameters to new Pipeline Tasks, leverage `PipelineTask.toProperties()` and augment `QueueSettings` if those parameters must be inherited across jobs. +- Validate Cloud Datastore constraints carefully (e.g., namespace regex restrictions). 
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/JobSetting.java b/java/src/main/java/com/google/appengine/tools/pipeline/JobSetting.java index 1e60a406..6702a9dd 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/JobSetting.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/JobSetting.java @@ -14,15 +14,13 @@ package com.google.appengine.tools.pipeline; -import lombok.Getter; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; - import java.io.Serial; import java.io.Serializable; import java.util.Arrays; import java.util.Optional; +import lombok.Getter; +import lombok.RequiredArgsConstructor; /** * A setting for specifying to the framework some aspect of a Job's execution. @@ -77,7 +75,7 @@ abstract class StringValuedSetting implements JobSetting { @Serial private static final long serialVersionUID = 7756646651569386669L; - //NOTE: behavior of Pipeline Framework allows this to be null for some settings + // NOTE: behavior of Pipeline Framework allows this to be null for some settings // (tests verify this) private final String value; @@ -179,8 +177,9 @@ public OnService(String service) { */ final class OnServiceVersion extends StringValuedSetting { - @Serial + @Serial private static final long serialVersionUID = 3877411731586475273L; + public OnServiceVersion(String version) { super(version); } @@ -212,20 +211,49 @@ public StatusConsoleUrl(String statusConsoleUrl) { } } + /** + * A setting for specifying the datastore database to use for this job; + * otherwise will be the default datastore database. + * + * q: do we want to allow pipelines to mix datastore databases? 
+ * Non-default IDs must be 4-63 characters of lowercase letters, digits, or hyphens, starting with a letter and ending with a letter or digit; {@code null}, empty, and {@code "(default)"} are accepted without validation. + */ + final class DatastoreDatabase extends StringValuedSetting { + @Serial + private static final long serialVersionUID = -1L; + + public DatastoreDatabase(String datastoreDatabase) { + super(datastoreDatabase); + if (datastoreDatabase != null && !datastoreDatabase.isEmpty() && !datastoreDatabase.equals("(default)")) { + if (!datastoreDatabase.matches("^[a-z][a-z0-9-]{2,61}[a-z0-9]$")) { // 1 + {2,61} + 1 = 4..63 characters in total + throw new IllegalArgumentException("Invalid Datastore database ID: " + datastoreDatabase); + } + } + } + } + + /** + * A setting for specifying the datastore namespace to use for this job; + * otherwise will be the default datastore namespace. + */ final class DatastoreNamespace extends StringValuedSetting { @Serial private static final long serialVersionUID = -1L; public DatastoreNamespace(String datastoreNameSpace) { super(datastoreNameSpace); + if (datastoreNameSpace != null) { + if (!datastoreNameSpace.matches("^[0-9A-Za-z._-]{0,100}$")) { + throw new IllegalArgumentException("Invalid Datastore namespace: " + datastoreNameSpace); + } + } } } - static Optional getSettingValue(Class clazz, JobSetting[] settings) { return Arrays.stream(settings) - .filter( s -> s.getClass().isAssignableFrom(clazz)) - .findAny() - .map(s -> ((StringValuedSetting) s).getValue()); + .filter(s -> s.getClass().isAssignableFrom(clazz)) + .findAny() + .map(s -> ((StringValuedSetting) s).getValue()); } } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/QueueSettings.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/QueueSettings.java index db18805b..3e57101e 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/QueueSettings.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/QueueSettings.java @@ -1,9 +1,14 @@ package com.google.appengine.tools.pipeline.impl; -import lombok.*; - import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import 
lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + /** * settings for how to asynchronously execute a task via a queue * @@ -12,7 +17,8 @@ @AllArgsConstructor @NoArgsConstructor @Builder -@Getter @Setter +@Getter +@Setter @ToString public final class QueueSettings implements Cloneable { @@ -33,12 +39,26 @@ public final class QueueSettings implements Cloneable { private String onQueue; /** - * delay in seconds to set when enqueueing the task (eg, should not execute until *at least* this much time has passed + * delay in seconds to set when enqueueing the task (eg, should not execute + * until *at least* this much time has passed */ private Long delayInSeconds; /** - * Merge will override any {@code null} setting with a matching setting from {@code other}. + * datastore database ID to propagate + */ + @Nullable + private String databaseId; + + /** + * datastore namespace to propagate + */ + @Nullable + private String namespace; + + /** + * Merge will override any {@code null} setting with a matching setting from + * {@code other}. * Note, delay value is not being merged. 
*/ public QueueSettings merge(QueueSettings other) { @@ -49,6 +69,12 @@ public QueueSettings merge(QueueSettings other) { if (onQueue == null) { onQueue = other.getOnQueue(); } + if (databaseId == null) { + databaseId = other.getDatabaseId(); + } + if (namespace == null) { + namespace = other.getNamespace(); + } return this; } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java index 6516c1d6..d829d05d 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java @@ -14,10 +14,55 @@ package com.google.appengine.tools.pipeline.impl.backend; -import com.github.rholder.retry.*; +import static com.google.appengine.tools.pipeline.impl.model.JobRecord.IS_ROOT_JOB_PROPERTY; +import static com.google.appengine.tools.pipeline.impl.model.JobRecord.ROOT_JOB_DISPLAY_NAME; +import static com.google.appengine.tools.pipeline.impl.model.PipelineModelObject.ROOT_JOB_KEY_PROPERTY; +import static com.google.appengine.tools.pipeline.impl.util.TestUtils.throwHereForTesting; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import 
java.util.logging.Level; +import java.util.stream.Collectors; + +import javax.inject.Inject; + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; import com.google.appengine.tools.pipeline.JobRunId; import com.google.appengine.tools.pipeline.NoSuchObjectException; -import com.google.appengine.tools.pipeline.impl.model.*; +import com.google.appengine.tools.pipeline.impl.model.Barrier; +import com.google.appengine.tools.pipeline.impl.model.ExceptionRecord; +import com.google.appengine.tools.pipeline.impl.model.JobInstanceRecord; +import com.google.appengine.tools.pipeline.impl.model.JobRecord; +import com.google.appengine.tools.pipeline.impl.model.PipelineModelObject; +import com.google.appengine.tools.pipeline.impl.model.PipelineObjects; +import com.google.appengine.tools.pipeline.impl.model.ShardedValue; +import com.google.appengine.tools.pipeline.impl.model.Slot; import com.google.appengine.tools.pipeline.impl.tasks.PipelineTask; import com.google.appengine.tools.pipeline.impl.util.SerializationUtils; import com.google.appengine.tools.pipeline.impl.util.TestUtils; @@ -25,38 +70,42 @@ import com.google.appengine.tools.txn.PipelineBackendTransaction; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; -import com.google.cloud.datastore.*; +import com.google.cloud.datastore.Batch; +import com.google.cloud.datastore.Blob; +import com.google.cloud.datastore.Cursor; +import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreBatchWriter; +import com.google.cloud.datastore.DatastoreException; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.cloud.datastore.Entity; +import com.google.cloud.datastore.EntityQuery; 
+import com.google.cloud.datastore.Key; +import com.google.cloud.datastore.KeyQuery; +import com.google.cloud.datastore.ProjectionEntity; +import com.google.cloud.datastore.ProjectionEntityQuery; +import com.google.cloud.datastore.Query; +import com.google.cloud.datastore.QueryResults; +import com.google.cloud.datastore.StructuredQuery; +import com.google.cloud.datastore.Transaction; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Streams; import com.google.datastore.v1.QueryResultBatch; -import lombok.*; + +import lombok.Builder; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.Value; import lombok.extern.java.Log; -import javax.inject.Inject; -import java.io.IOException; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; -import java.util.logging.Level; -import java.util.stream.Collectors; - -import static com.google.appengine.tools.pipeline.impl.model.JobRecord.IS_ROOT_JOB_PROPERTY; -import static com.google.appengine.tools.pipeline.impl.model.JobRecord.ROOT_JOB_DISPLAY_NAME; -import static com.google.appengine.tools.pipeline.impl.model.PipelineModelObject.ROOT_JOB_KEY_PROPERTY; -import static com.google.appengine.tools.pipeline.impl.util.TestUtils.throwHereForTesting; - /** * @author rudominer@google.com (Mitch Rudominer) * */ @Log -@RequiredArgsConstructor(onConstructor_ = {@Inject}) public class AppEngineBackEnd implements PipelineBackEnd, SerializationStrategy { public static final int MAX_RETRY_ATTEMPTS = 5; @@ -65,42 +114,45 @@ public class AppEngineBackEnd implements PipelineBackEnd, SerializationStrategy // TODO: RetryUtils is in mapreduce package, so duplicated to not mix on purpose // TODO: consider moving to a shared package - // 
TODO: possibly we should inspect error code in more detail? see https://cloud.google.com/datastore/docs/concepts/errors#Error_Codes + // TODO: possibly we should inspect error code in more detail? see + // https://cloud.google.com/datastore/docs/concepts/errors#Error_Codes @SuppressWarnings("DuplicatedCode") public static Predicate handleDatastoreExceptionRetry() { return t -> { - Iterator datastoreExceptionIterator = Iterables.filter(Throwables.getCausalChain(t), DatastoreException.class).iterator(); + Iterator datastoreExceptionIterator = Iterables + .filter(Throwables.getCausalChain(t), DatastoreException.class).iterator(); if (datastoreExceptionIterator.hasNext()) { DatastoreException de = datastoreExceptionIterator.next(); return de.isRetryable() || - (de.getMessage() != null && de.getMessage().toLowerCase().contains("retry the transaction")); + (de.getMessage() != null && de.getMessage().toLowerCase().contains("retry the transaction")); } return false; }; } private Retryer withDefaults(RetryerBuilder builder) { - return builder - .withWaitStrategy( - WaitStrategies.exponentialWait(RETRY_BACKOFF_MULTIPLIER, RETRY_MAX_BACKOFF_MS, TimeUnit.MILLISECONDS)) - .retryIfException(handleDatastoreExceptionRetry()) - .retryIfExceptionOfType(IOException.class) //q: can this happen? - .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_ATTEMPTS)) - .withRetryListener(new RetryListener() { - @Override - public void onRetry(Attempt attempt) { - if (attempt.getAttemptNumber() > 1 || attempt.hasException()) { - String className = AppEngineBackEnd.class.getName(); - if (attempt.hasException()) { - log.log(Level.WARNING, "%s, Attempt #%d. 
Retrying...".formatted(className, attempt.getAttemptNumber()), attempt.getExceptionCause()); - } else { - log.log(Level.WARNING, "%s, Attempt #%d OK, wait: %s".formatted(className, attempt.getAttemptNumber(), Duration.ofMillis(attempt.getDelaySinceFirstAttempt()))); - } - } - } + return builder + .withWaitStrategy( + WaitStrategies.exponentialWait(RETRY_BACKOFF_MULTIPLIER, RETRY_MAX_BACKOFF_MS, TimeUnit.MILLISECONDS)) + .retryIfException(handleDatastoreExceptionRetry()) + .retryIfExceptionOfType(IOException.class) // q: can this happen? + .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_ATTEMPTS)) + .withRetryListener(new RetryListener() { + @Override + public void onRetry(Attempt attempt) { + if (attempt.getAttemptNumber() > 1 || attempt.hasException()) { + String className = AppEngineBackEnd.class.getName(); + if (attempt.hasException()) { + log.log(Level.WARNING, "%s, Attempt #%d. Retrying...".formatted(className, attempt.getAttemptNumber()), + attempt.getExceptionCause()); + } else { + log.log(Level.WARNING, "%s, Attempt #%d OK, wait: %s".formatted(className, attempt.getAttemptNumber(), + Duration.ofMillis(attempt.getDelaySinceFirstAttempt()))); } - ) - .build(); + } + } + }) + .build(); } @@ -112,8 +164,17 @@ public void onRetry(Attempt attempt) { private final PipelineTaskQueue taskQueue; private final AppEngineServicesService servicesService; + @Inject + public AppEngineBackEnd(Datastore datastore, PipelineTaskQueue taskQueue, + AppEngineServicesService appEngineServicesService) { + this.datastore = datastore; + this.taskQueue = taskQueue; + this.servicesService = appEngineServicesService; + } + // Only used in tests - public AppEngineBackEnd(Options options, PipelineTaskQueue taskQueue, AppEngineServicesService appEngineServicesService) { + public AppEngineBackEnd(Options options, PipelineTaskQueue taskQueue, + AppEngineServicesService appEngineServicesService) { this(options.getDatastoreOptions().toBuilder().build().getService(), taskQueue, 
appEngineServicesService); } @@ -123,8 +184,10 @@ public static class Options implements PipelineBackEnd.Options { String projectId; - //q: good idea? risk here that we're copying / passing around sensitive info; although really - // in prod ppl should depend on application-default credentials and I think this will be null + // q: good idea? risk here that we're copying / passing around sensitive info; + // although really + // in prod ppl should depend on application-default credentials and I think this + // will be null Credentials credentials; DatastoreOptions datastoreOptions; @@ -132,10 +195,10 @@ public static class Options implements PipelineBackEnd.Options { @SneakyThrows public static Options defaults() { return Options.builder() - .datastoreOptions(DatastoreOptions.getDefaultInstance()) - .credentials(GoogleCredentials.getApplicationDefault()) - .projectId(DatastoreOptions.getDefaultProjectId()) - .build(); + .datastoreOptions(DatastoreOptions.getDefaultInstance()) + .credentials(GoogleCredentials.getApplicationDefault()) + .projectId(DatastoreOptions.getDefaultProjectId()) + .build(); } } @@ -143,10 +206,10 @@ public static Options defaults() { @Override public PipelineBackEnd.Options getOptions() { return Options.builder() - .datastoreOptions(datastore.getOptions()) - .projectId(datastore.getOptions().getProjectId()) - .credentials(datastore.getOptions().getCredentials()) - .build(); + .datastoreOptions(datastore.getOptions()) + .projectId(datastore.getOptions().getProjectId()) + .credentials(datastore.getOptions().getCredentials()) + .build(); } @Override @@ -164,17 +227,14 @@ public String getDefaultVersion(String service) { return servicesService.getDefaultVersion(service); } - private void putAll(DatastoreBatchWriter batchWriter, Collection objects) { objects.stream() - .map(PipelineModelObject::toEntity) - //extra logging for debug - //.peek(e -> logger.info("putting entity: " + e.getKey().toString())) - 
.forEach(batchWriter::putWithDeferredIdAllocation); + .map(PipelineModelObject::toEntity) + // extra logging for debug + // .peek(e -> logger.info("putting entity: " + e.getKey().toString())) + .forEach(batchWriter::putWithDeferredIdAllocation); } - - // transactional save all private void saveAll(PipelineBackendTransaction txn, UpdateSpec.Group group) { putAll(txn, group.getBarriers()); @@ -194,12 +254,11 @@ private void saveAll(PipelineBackendTransaction txn, UpdateSpec.Group group) { private List saveAll(UpdateSpec.Group group) { // collect into batches of 500 List toSave = Streams.concat( - group.getBarriers().stream(), - group.getJobs().stream(), - group.getSlots().stream(), - group.getJobInstanceRecords().stream(), - group.getFailureRecords().stream() - ).toList(); + group.getBarriers().stream(), + group.getJobs().stream(), + group.getSlots().stream(), + group.getJobInstanceRecords().stream(), + group.getFailureRecords().stream()).toList(); List keys = new ArrayList<>(toSave.size()); final int MAX_BATCH_SIZE = 500; // limit from Datastore API @@ -210,7 +269,8 @@ private List saveAll(UpdateSpec.Group group) { @Override public List call() throws Exception { Batch batch = datastore.newBatch(); - putAll(batch, toSave.subList(batchOffset, batchOffset +Math.min(MAX_BATCH_SIZE, toSave.size() -batchOffset))); + putAll(batch, + toSave.subList(batchOffset, batchOffset + Math.min(MAX_BATCH_SIZE, toSave.size() - batchOffset))); return batch.submit().getGeneratedKeys(); } })); @@ -219,7 +279,8 @@ public List call() throws Exception { return keys; } - private boolean transactionallySaveAll(UpdateSpec.Transaction transactionSpec, Key jobKey, JobRecord.State... expectedStates) { + private boolean transactionallySaveAll(UpdateSpec.Transaction transactionSpec, Key jobKey, + JobRecord.State... 
expectedStates) { PipelineBackendTransaction transaction = PipelineBackendTransaction.newInstance(datastore, taskQueue); try { @@ -230,15 +291,15 @@ private boolean transactionallySaveAll(UpdateSpec.Transaction transactionSpec, K } catch (DatastoreException e) { if (e.getCode() == 404) { throw new RuntimeException( - "Fatal Pipeline corruption error. No JobRecord found with key = " + jobKey); + "Fatal Pipeline corruption error. No JobRecord found with key = " + jobKey); } else { throw e; } } if (entity == null) { - //don't believe new datastore lib throws exceptions here anymore + // don't believe new datastore lib throws exceptions here anymore throw new RuntimeException( - "Fatal Pipeline corruption error. No JobRecord found with key = " + jobKey); + "Fatal Pipeline corruption error. No JobRecord found with key = " + jobKey); } JobRecord jobRecord = new JobRecord(entity); @@ -259,14 +320,16 @@ private boolean transactionallySaveAll(UpdateSpec.Transaction transactionSpec, K } saveAll(transaction, transactionSpec); - if (transactionSpec instanceof UpdateSpec.TransactionWithTasks transactionWithTasks) { transaction.enqueue(transactionWithTasks.getTasks()); } // commit is AFTER enqueue, so if enqueuing fails, we don't commit - // then in 'finally' block, if we have to roll back the txn, we ALSO attempt to delete the tasks from the queue - // concern is what if enqueued tasks had names and already ran, then we might get into a bad state if those depended on stuff done elsewhere in the transaction + // then in 'finally' block, if we have to roll back the txn, we ALSO attempt to + // delete the tasks from the queue + // concern is what if enqueued tasks had names and already ran, then we might + // get into a bad state if those depended on stuff done elsewhere in the + // transaction // 1) is there such a problematic case? 
perhaps nothing transaction.commit(); } finally { @@ -282,8 +345,6 @@ private abstract static class Operation implements Callable { private final String name; } - - @Override public PipelineTaskQueue.TaskReference enqueue(PipelineTask pipelineTask) { return taskQueue.enqueue(pipelineTask); @@ -291,10 +352,10 @@ public PipelineTaskQueue.TaskReference enqueue(PipelineTask pipelineTask) { @Override public boolean saveWithJobStateCheck(final UpdateSpec updateSpec, - final Key jobKey, + final Key jobKey, final JobRecord.State... expectedStates) { - //q: do this in a thread, so parallel with the other datastore saves?? + // q: do this in a thread, so parallel with the other datastore saves?? saveAll(updateSpec.getNonTransactionalGroup()); // TODO(user): Replace this with plug-able hooks that could be used by tests, @@ -342,8 +403,8 @@ public JobRecord queryJob(final Key jobKey, final JobRecord.InflationType inflat case FOR_RUN: runBarrier = queryBarrier(jobRecord.getRunBarrierKey(), true); finalizeBarrier = queryBarrier(jobRecord.getFinalizeBarrierKey(), false); - jobInstanceRecord = - new JobInstanceRecord(getEntity("queryJob", jobRecord.getJobInstanceKey()), getSerializationStrategy()); + jobInstanceRecord = new JobInstanceRecord(getEntity("queryJob", jobRecord.getJobInstanceKey()), + getSerializationStrategy()); outputSlot = querySlot(jobRecord.getOutputSlotKey(), false); break; case FOR_FINALIZE: @@ -431,7 +492,7 @@ public ExceptionRecord queryFailure(Key failureKey) throws NoSuchObjectException return new ExceptionRecord(entity); } - //TODO: change return value to some sort of DatastoreValue type? + // TODO: change return value to some sort of DatastoreValue type? 
@Override public Object serializeValue(PipelineModelObject model, Object value) throws IOException { byte[] bytes = SerializationUtils.serialize(value); @@ -473,7 +534,7 @@ public List call() { @Override public Object deserializeValue(PipelineModelObject model, Object serializedVersion) - throws IOException, ClassNotFoundException { + throws IOException, ClassNotFoundException { if (serializedVersion instanceof Blob) { return SerializationUtils.deserialize(((Blob) serializedVersion).toByteArray()); } else { @@ -504,12 +565,13 @@ private Map getEntities(String logString, final Collection key Map result = attemptWithRetries(withDefaults(RetryerBuilder.newBuilder()), new Operation<>(logString) { @Override public Map call() { - //NOTE: this read is strongly consistent now, bc backed by Firestore in Datastore-mode; this library was + // NOTE: this read is strongly consistent now, bc backed by Firestore in + // Datastore-mode; this library was // designed thinking this read was only event return datastore.fetch(keys) - .stream() - .filter(Objects::nonNull) - .collect(Collectors.toMap(Entity::getKey, Function.identity())); + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toMap(Entity::getKey, Function.identity())); } }); if (keys.size() != result.size()) { @@ -521,12 +583,13 @@ public Map call() { } private Entity getEntity(String logString, final Key key) throws NoSuchObjectException { - Entity entity = attemptWithRetries(withDefaults(RetryerBuilder.newBuilder()), new Operation<>("getEntity_" + logString) { - @Override - public Entity call() throws Exception { - return datastore.get(key); - } - }); + Entity entity = attemptWithRetries(withDefaults(RetryerBuilder.newBuilder()), + new Operation<>("getEntity_" + logString) { + @Override + public Entity call() throws Exception { + return datastore.get(key); + } + }); if (entity == null) { throw new NoSuchObjectException(key.toString()); @@ -539,22 +602,22 @@ public List queryAll(final String kind, final Key 
rootJobKey) { @Override public List call() { EntityQuery.Builder query = Query.newEntityQueryBuilder() - .setKind(kind) - .setFilter(StructuredQuery.PropertyFilter.eq(ROOT_JOB_KEY_PROPERTY, rootJobKey)); + .setKind(kind) + .setFilter(StructuredQuery.PropertyFilter.eq(ROOT_JOB_KEY_PROPERTY, rootJobKey)); List entities = new ArrayList<>(); QueryResults queryResults; long lastPageCount; do { - //TODO: set chunkSize? does concept exist in this API client library? + // TODO: set chunkSize? does concept exist in this API client library? queryResults = datastore.run(query.build()); List page = Streams.stream(queryResults).toList(); lastPageCount = page.size(); entities.addAll(page); query = query.setStartCursor(queryResults.getCursorAfter()); - } while ( - queryResults.getMoreResults() != QueryResultBatch.MoreResultsType.NO_MORE_RESULTS - && lastPageCount > 0 // unclear why, but at least in tests prev check doesn't work as moreResults is always MORE_RESULTS_AFTER_LIMIT + } while (queryResults.getMoreResults() != QueryResultBatch.MoreResultsType.NO_MORE_RESULTS + && lastPageCount > 0 // unclear why, but at least in tests prev check doesn't work as moreResults is + // always MORE_RESULTS_AFTER_LIMIT ); return entities; @@ -566,7 +629,7 @@ public List call() { public Pair, String> queryRootPipelines(String classFilter, String cursor, final int limit) { EntityQuery.Builder query = Query.newEntityQueryBuilder() - .setKind(JobRecord.DATA_STORE_KIND); + .setKind(JobRecord.DATA_STORE_KIND); if (Strings.isNullOrEmpty(classFilter)) { query.setFilter(StructuredQuery.PropertyFilter.eq(IS_ROOT_JOB_PROPERTY, true)); @@ -581,88 +644,89 @@ public Pair, String> queryRootPipelines(String cla query.setStartCursor(Cursor.fromUrlSafe(cursor)); } return attemptWithRetries(withDefaults(RetryerBuilder.newBuilder()), new Operation<>("queryRootPipelines") { - @Override - public Pair, String> call() { - QueryResults entities = datastore.run(query.build()); - Cursor dsCursor = null; - List roots = 
new LinkedList<>(); - while (entities.hasNext()) { - if (limit > 0 && roots.size() >= limit) { - dsCursor = entities.getCursorAfter(); - break; - } - JobRecord jobRecord = new JobRecord(entities.next()); - roots.add(jobRecord); - } - return Pair.of(roots, dsCursor == null ? null : dsCursor.toUrlSafe()); + @Override + public Pair, String> call() { + QueryResults entities = datastore.run(query.build()); + Cursor dsCursor = null; + List roots = new LinkedList<>(); + while (entities.hasNext()) { + if (limit > 0 && roots.size() >= limit) { + dsCursor = entities.getCursorAfter(); + break; } - }); + JobRecord jobRecord = new JobRecord(entities.next()); + roots.add(jobRecord); + } + return Pair.of(roots, dsCursor == null ? null : dsCursor.toUrlSafe()); + } + }); } @Override public Set getRootPipelinesDisplayName() { - return attemptWithRetries(withDefaults(RetryerBuilder.newBuilder()), new Operation<>("getRootPipelinesDisplayName") { - @Override - public Set call() { - ProjectionEntityQuery.Builder query = Query.newProjectionEntityQueryBuilder() - .setKind(JobRecord.DATA_STORE_KIND) - .addProjection(ROOT_JOB_DISPLAY_NAME) - .addDistinctOn(ROOT_JOB_DISPLAY_NAME); - - QueryResults queryResults; - Set pipelines = new LinkedHashSet<>(); - List page; - do { - //TODO: set chunkSize? does concept exist in this API client library? 
- queryResults = datastore.run(query.build()); - page = Streams.stream(queryResults) - .map(entity -> entity.getString(ROOT_JOB_DISPLAY_NAME)) - .toList(); - pipelines.addAll(page); - query = query.setStartCursor(queryResults.getCursorAfter()); - } while ( - !page.isEmpty() && // unclear why, but at least in tests prev check doesn't work as moreResults is always MORE_RESULTS_AFTER_LIMIT - queryResults.getMoreResults() != QueryResultBatch.MoreResultsType.NO_MORE_RESULTS); - - return pipelines; - } - }); + return attemptWithRetries(withDefaults(RetryerBuilder.newBuilder()), + new Operation<>("getRootPipelinesDisplayName") { + @Override + public Set call() { + ProjectionEntityQuery.Builder query = Query.newProjectionEntityQueryBuilder() + .setKind(JobRecord.DATA_STORE_KIND) + .addProjection(ROOT_JOB_DISPLAY_NAME) + .addDistinctOn(ROOT_JOB_DISPLAY_NAME); + + QueryResults queryResults; + Set pipelines = new LinkedHashSet<>(); + List page; + do { + // TODO: set chunkSize? does concept exist in this API client library? 
+ queryResults = datastore.run(query.build()); + page = Streams.stream(queryResults) + .map(entity -> entity.getString(ROOT_JOB_DISPLAY_NAME)) + .toList(); + pipelines.addAll(page); + query = query.setStartCursor(queryResults.getCursorAfter()); + } while (!page.isEmpty() && // unclear why, but at least in tests prev check doesn't work as moreResults is + // always MORE_RESULTS_AFTER_LIMIT + queryResults.getMoreResults() != QueryResultBatch.MoreResultsType.NO_MORE_RESULTS); + + return pipelines; + } + }); } - //NOTE: just for the pipelines UX + // NOTE: just for the pipelines UX @Override public PipelineObjects queryFullPipeline(final Key rootJobKey) { ExecutorService executor = Executors.newFixedThreadPool(5); - CompletableFuture> jobs = CompletableFuture.supplyAsync(() -> - queryAll(JobRecord.DATA_STORE_KIND, rootJobKey).stream() - .map(entity -> new JobRecord(entity)) - .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); + CompletableFuture> jobs = CompletableFuture + .supplyAsync(() -> queryAll(JobRecord.DATA_STORE_KIND, rootJobKey).stream() + .map(entity -> new JobRecord(entity)) + .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); - CompletableFuture> barriers = CompletableFuture.supplyAsync(() -> - queryAll(Barrier.DATA_STORE_KIND, rootJobKey).stream() - .map(entity -> new Barrier(entity)) - .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); + CompletableFuture> barriers = CompletableFuture + .supplyAsync(() -> queryAll(Barrier.DATA_STORE_KIND, rootJobKey).stream() + .map(entity -> new Barrier(entity)) + .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); - CompletableFuture> slots = CompletableFuture.supplyAsync(() -> - queryAll(Slot.DATA_STORE_KIND, rootJobKey).stream() - .map(entity -> new Slot(entity, this, true)) - .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); + 
CompletableFuture> slots = CompletableFuture + .supplyAsync(() -> queryAll(Slot.DATA_STORE_KIND, rootJobKey).stream() + .map(entity -> new Slot(entity, this, true)) + .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); - CompletableFuture> jobInstances = CompletableFuture.supplyAsync(() -> - queryAll(JobInstanceRecord.DATA_STORE_KIND, rootJobKey).stream() - .map(entity -> new JobInstanceRecord(entity, getSerializationStrategy())) - .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); + CompletableFuture> jobInstances = CompletableFuture + .supplyAsync(() -> queryAll(JobInstanceRecord.DATA_STORE_KIND, rootJobKey).stream() + .map(entity -> new JobInstanceRecord(entity, getSerializationStrategy())) + .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); - CompletableFuture> exceptions = CompletableFuture.supplyAsync(() -> - queryAll(ExceptionRecord.DATA_STORE_KIND, rootJobKey).stream() - .map(entity -> new ExceptionRecord(entity)) - .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); + CompletableFuture> exceptions = CompletableFuture + .supplyAsync(() -> queryAll(ExceptionRecord.DATA_STORE_KIND, rootJobKey).stream() + .map(entity -> new ExceptionRecord(entity)) + .collect(Collectors.toMap(PipelineModelObject::getKey, Function.identity())), executor); PipelineObjects objects = new PipelineObjects( - rootJobKey, jobs.join(), slots.join(), barriers.join(), jobInstances.join(), exceptions.join()); + rootJobKey, jobs.join(), slots.join(), barriers.join(), jobInstances.join(), exceptions.join()); executor.shutdown(); return objects; @@ -676,9 +740,9 @@ public Void call() { int batchesToAttempt = 5; int batchSize = 100; KeyQuery.Builder queryBuilder = Query.newKeyQueryBuilder() - .setKind(kind) - .setFilter(StructuredQuery.PropertyFilter.eq(ROOT_JOB_KEY_PROPERTY, rootJobKey)) - .setLimit(batchSize); + .setKind(kind) + 
.setFilter(StructuredQuery.PropertyFilter.eq(ROOT_JOB_KEY_PROPERTY, rootJobKey)) + .setLimit(batchSize); QueryResults queryResults; List keys; @@ -687,7 +751,7 @@ public Void call() { Query query = queryBuilder.build(); queryResults = datastore.run(query); keys = Streams.stream(queryResults) - .toList(); + .toList(); if (!keys.isEmpty()) { log.info("Deleting " + keys.size() + " " + kind + "s with rootJobKey=" + rootJobKey); Batch batch = datastore.newBatch(); @@ -695,10 +759,10 @@ public Void call() { batch.submit(); } queryBuilder = queryBuilder.setStartCursor(queryResults.getCursorAfter()); - } while ( - queryResults.getMoreResults() != QueryResultBatch.MoreResultsType.NO_MORE_RESULTS - && !keys.isEmpty() // unclear why, but in tests prev check doesn't work as moreResults is always MORE_RESULTS_AFTER_LIMIT - && batchesToAttempt-- > 0 // avoid infinite loop + } while (queryResults.getMoreResults() != QueryResultBatch.MoreResultsType.NO_MORE_RESULTS + && !keys.isEmpty() // unclear why, but in tests prev check doesn't work as moreResults is always + // MORE_RESULTS_AFTER_LIMIT + && batchesToAttempt-- > 0 // avoid infinite loop ); return null; } @@ -709,8 +773,10 @@ public Void call() { * Delete all datastore entities corresponding to the given pipeline. * * @param pipelineRunId The root job key identifying the pipeline - * @param force If this parameter is not {@code true} then this method will - * throw an {@link IllegalStateException} if the specified pipeline is not in the + * @param force If this parameter is not {@code true} then this method + * will + * throw an {@link IllegalStateException} if the specified + * pipeline is not in the * {@link JobRecord.State#FINALIZED} or * {@link JobRecord.State#STOPPED} state. 
* @throws IllegalStateException If {@code force = false} and the specified @@ -755,7 +821,7 @@ private R attemptWithRetries(Retryer retryer, final Operation operatio } catch (RetryException e) { if (e.getCause() instanceof RuntimeException) { log.warning(e.getCause().getMessage() + " during " + operation.getName() - + " throwing after " + e.getNumberOfFailedAttempts() + " multiple attempts "); + + " throwing after " + e.getNumberOfFailedAttempts() + " multiple attempts "); throw (RuntimeException) e.getCause(); } else { throw new RuntimeException(e); diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/model/JobRecord.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/model/JobRecord.java index 4f8ef5c6..c4a87f3a 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/model/JobRecord.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/model/JobRecord.java @@ -14,16 +14,25 @@ package com.google.appengine.tools.pipeline.impl.model; +import java.lang.reflect.Method; +import java.time.Instant; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import com.google.appengine.tools.pipeline.Job; -import com.google.appengine.tools.pipeline.JobRunId; import com.google.appengine.tools.pipeline.JobInfo; +import com.google.appengine.tools.pipeline.JobRunId; import com.google.appengine.tools.pipeline.JobSetting; import com.google.appengine.tools.pipeline.JobSetting.BackoffFactor; import com.google.appengine.tools.pipeline.JobSetting.BackoffSeconds; import com.google.appengine.tools.pipeline.JobSetting.IntValuedSetting; import com.google.appengine.tools.pipeline.JobSetting.MaxAttempts; -import com.google.appengine.tools.pipeline.JobSetting.OnService; import com.google.appengine.tools.pipeline.JobSetting.OnQueue; +import com.google.appengine.tools.pipeline.JobSetting.OnService; import 
com.google.appengine.tools.pipeline.JobSetting.StatusConsoleUrl; import com.google.appengine.tools.pipeline.JobSetting.WaitForSetting; import com.google.appengine.tools.pipeline.impl.FutureValueImpl; @@ -34,17 +43,20 @@ import com.google.appengine.tools.pipeline.impl.util.EntityUtils; import com.google.appengine.tools.pipeline.impl.util.StringUtils; import com.google.cloud.Timestamp; -import com.google.cloud.datastore.*; +import com.google.cloud.datastore.BooleanValue; +import com.google.cloud.datastore.Entity; +import com.google.cloud.datastore.Key; +import com.google.cloud.datastore.KeyFactory; +import com.google.cloud.datastore.KeyValue; +import com.google.cloud.datastore.LongValue; +import com.google.cloud.datastore.StringValue; +import com.google.cloud.datastore.TimestampValue; import com.google.common.annotations.VisibleForTesting; + import lombok.Getter; import lombok.NonNull; import lombok.Setter; -import java.lang.reflect.Method; -import java.time.Instant; -import java.util.*; -import java.util.stream.Collectors; - /** * The Pipeline model object corresponding to a job. * @@ -56,7 +68,8 @@ public class JobRecord extends PipelineModelObject implements JobInfo, ExpiringD public static final String DATA_STORE_KIND = "pipeline-job"; - //TODO: very hacky, probably need to have a factory that builds these, and extend there + // TODO: very hacky, probably need to have a factory that builds these, and + // extend there @VisibleForTesting public static AppEngineEnvironment environment = new AppEngineStandardGen2(); @@ -64,13 +77,13 @@ public JobRunId getPipelineRunId() { return JobRunId.of(getRootJobKey()); } - /** * The state of the job. * */ public enum State { - // TODO(user): document states (including valid transitions) and relation to JobInfo.State + // TODO(user): document states (including valid transitions) and relation to + // JobInfo.State WAITING_TO_RUN, WAITING_TO_FINALIZE, FINALIZED, @@ -103,9 +116,9 @@ public enum InflationType { * {@code null}; and *
  • for the returned {@link Barrier} * {@link Barrier#getWaitingOnInflated()} will not return {@code null}; and - *
  • {@link JobRecord#getOutputSlotInflated()} will not return + *
  • {@link JobRecord#getOutputSlotInflated()} will not return * {@code null}; and - *
  • {@link JobRecord#getFinalizeBarrierInflated()} will not return + *
  • {@link JobRecord#getFinalizeBarrierInflated()} will not return * {@code null} * */ @@ -114,9 +127,9 @@ public enum InflationType { /** * Inflate as necessary to finalize the job. In particular: *
      - *
    • {@link JobRecord#getOutputSlotInflated()} will not return + *
    • {@link JobRecord#getOutputSlotInflated()} will not return * {@code null}; and - *
    • {@link JobRecord#getFinalizeBarrierInflated()} will not return + *
    • {@link JobRecord#getFinalizeBarrierInflated()} will not return * {@code null}; and *
    • for the returned {@link Barrier} the method * {@link Barrier#getWaitingOnInflated()} will not return {@code null}. @@ -131,18 +144,15 @@ public enum InflationType { FOR_OUTPUT; } - // Data store entity property names private static final String JOB_INSTANCE_PROPERTY = "jobInstance"; private static final String RUN_BARRIER_PROPERTY = "runBarrier"; private static final String FINALIZE_BARRIER_PROPERTY = "finalizeBarrier"; private static final String STATE_PROPERTY = "state"; - private static final String EXCEPTION_HANDLING_ANCESTOR_KEY_PROPERTY = - "exceptionHandlingAncestorKey"; + private static final String EXCEPTION_HANDLING_ANCESTOR_KEY_PROPERTY = "exceptionHandlingAncestorKey"; private static final String EXCEPTION_HANDLER_SPECIFIED_PROPERTY = "hasExceptionHandler"; private static final String EXCEPTION_HANDLER_JOB_KEY_PROPERTY = "exceptionHandlerJobKey"; - private static final String EXCEPTION_HANDLER_JOB_GRAPH_GUID_PROPERTY = - "exceptionHandlerJobGraphGuid"; + private static final String EXCEPTION_HANDLER_JOB_GRAPH_GUID_PROPERTY = "exceptionHandlerJobGraphGuid"; private static final String CALL_EXCEPTION_HANDLER_PROPERTY = "callExceptionHandler"; private static final String IGNORE_EXCEPTION_PROPERTY = "ignoreException"; private static final String OUTPUT_SLOT_PROPERTY = "outputSlot"; @@ -166,7 +176,8 @@ public enum InflationType { /** * projectId for job; must be set */ - @Getter @NonNull + @Getter + @NonNull private final String projectId; /** @@ -175,7 +186,11 @@ public enum InflationType { @Getter private final String namespace; - + /** + * databaseId for Job, if any (otherwise default "") + */ + @Getter + private final String databaseId; @Getter private final Key jobInstanceKey; @@ -185,7 +200,8 @@ public enum InflationType { private final Key finalizeBarrierKey; @Getter private Key outputSlotKey; - @Getter @Setter + @Getter + @Setter private State state; /** * Returns key of the nearest ancestor that has exceptionHandler method @@ -196,7 +212,8 @@ 
public enum InflationType { private boolean exceptionHandlerSpecified; @Getter private Key exceptionHandlerJobKey; - @Getter @Setter + @Getter + @Setter private String exceptionHandlerJobGraphGuid; /** * If true then this job is exception handler and @@ -206,19 +223,26 @@ public enum InflationType { @Getter private boolean callExceptionHandler; /** - * If true then an exception during a job execution is ignored. It is - * expected to be set to true for jobs that execute error handler due + * If true then an exception during a job execution is ignored. It + * is + * expected to be set to true for jobs that execute error handler + * due * to cancellation. */ - @Getter @Setter + @Getter + @Setter private boolean ignoreException; - @Getter @Setter + @Getter + @Setter private Key exceptionKey; - @Getter @Setter + @Getter + @Setter private Instant startTime; - @Getter @Setter + @Getter + @Setter private Instant endTime; - @Getter @Setter + @Getter + @Setter private String childGraphGuid; @Getter private List childKeys; @@ -232,14 +256,14 @@ public enum InflationType { private long backoffFactor = JobSetting.BackoffFactor.DEFAULT; @Getter private final QueueSettings queueSettings = new QueueSettings(); - @Getter @Setter + @Getter + @Setter private String statusConsoleUrl; @Getter private String rootJobDisplayName; @Getter private Boolean isRootJob; - // transient fields @Getter private Barrier runBarrierInflated; @@ -260,7 +284,8 @@ public enum InflationType { public JobRecord(Entity entity) { super(entity); - //TODO: new lib throws DatastoreException if any of these are undefined, rather than returning 'null + // TODO: new lib throws DatastoreException if any of these are undefined, rather + // than returning 'null // wrap with EntityUtils.getKey(entity, propertyName) ...? // something else? 
jobInstanceKey = entity.getKey(JOB_INSTANCE_PROPERTY); @@ -269,7 +294,9 @@ public JobRecord(Entity entity) { outputSlotKey = entity.getKey(OUTPUT_SLOT_PROPERTY); state = State.valueOf(entity.getString(STATE_PROPERTY)); exceptionHandlingAncestorKey = EntityUtils.getKey(entity, EXCEPTION_HANDLING_ANCESTOR_KEY_PROPERTY); - exceptionHandlerSpecified = entity.contains(EXCEPTION_HANDLER_SPECIFIED_PROPERTY) ? entity.getBoolean(EXCEPTION_HANDLER_SPECIFIED_PROPERTY) : false; + exceptionHandlerSpecified = entity.contains(EXCEPTION_HANDLER_SPECIFIED_PROPERTY) + ? entity.getBoolean(EXCEPTION_HANDLER_SPECIFIED_PROPERTY) + : false; exceptionHandlerJobKey = EntityUtils.getKey(entity, EXCEPTION_HANDLER_JOB_KEY_PROPERTY); exceptionHandlerJobGraphGuid = EntityUtils.getString(entity, EXCEPTION_HANDLER_JOB_GRAPH_GUID_PROPERTY); @@ -295,12 +322,14 @@ public JobRecord(Entity entity) { } projectId = entity.getKey().getProjectId(); namespace = entity.getKey().getNamespace(); + databaseId = entity.getKey().getDatabaseId() == null || entity.getKey().getDatabaseId().isEmpty() ? 
null + : entity.getKey().getDatabaseId(); } - public JobRunId getJobRunId() { return JobRunId.of(getKey()); } + /** * Constructs and returns a Data Store Entity that represents this model * object @@ -327,18 +356,22 @@ public Entity toEntity() { } if (null != exceptionHandlerJobGraphGuid) { builder.set(EXCEPTION_HANDLER_JOB_GRAPH_GUID_PROPERTY, - StringValue.newBuilder(exceptionHandlerJobGraphGuid).setExcludeFromIndexes(true).build()); + StringValue.newBuilder(exceptionHandlerJobGraphGuid).setExcludeFromIndexes(true).build()); } - builder.set(CALL_EXCEPTION_HANDLER_PROPERTY, BooleanValue.newBuilder(callExceptionHandler).setExcludeFromIndexes(true).build()); - builder.set(IGNORE_EXCEPTION_PROPERTY, BooleanValue.newBuilder(ignoreException).setExcludeFromIndexes(true).build()); + builder.set(CALL_EXCEPTION_HANDLER_PROPERTY, + BooleanValue.newBuilder(callExceptionHandler).setExcludeFromIndexes(true).build()); + builder.set(IGNORE_EXCEPTION_PROPERTY, + BooleanValue.newBuilder(ignoreException).setExcludeFromIndexes(true).build()); if (childGraphGuid != null) { - builder.set(CHILD_GRAPH_GUID_PROPERTY, StringValue.newBuilder(childGraphGuid).setExcludeFromIndexes(true).build()); + builder.set(CHILD_GRAPH_GUID_PROPERTY, + StringValue.newBuilder(childGraphGuid).setExcludeFromIndexes(true).build()); } if (startTime != null) { builder.set(START_TIME_PROPERTY, Timestamp.of(Date.from(startTime))); } if (endTime != null) { - builder.set(END_TIME_PROPERTY, TimestampValue.newBuilder(Timestamp.of(Date.from(endTime))).setExcludeFromIndexes(true).build()); + builder.set(END_TIME_PROPERTY, + TimestampValue.newBuilder(Timestamp.of(Date.from(endTime))).setExcludeFromIndexes(true).build()); } builder.set(CHILD_KEYS_PROPERTY, childKeys.stream().map(KeyValue::of).collect(Collectors.toList())); builder.set(ATTEMPT_NUM_PROPERTY, LongValue.newBuilder(attemptNumber).setExcludeFromIndexes(true).build()); @@ -346,15 +379,19 @@ public Entity toEntity() { builder.set(BACKOFF_SECONDS_PROPERTY, 
LongValue.newBuilder(backoffSeconds).setExcludeFromIndexes(true).build()); builder.set(BACKOFF_FACTOR_PROPERTY, LongValue.newBuilder(backoffFactor).setExcludeFromIndexes(true).build()); - // good idea? or should we force jobs to have these values (take defaults, if nothing else?) + // good idea? or should we force jobs to have these values (take defaults, if + // nothing else?) if (queueSettings.getOnService() != null) { - builder.set(ON_SERVICE_PROPERTY, StringValue.newBuilder(queueSettings.getOnService()).setExcludeFromIndexes(true).build()); + builder.set(ON_SERVICE_PROPERTY, + StringValue.newBuilder(queueSettings.getOnService()).setExcludeFromIndexes(true).build()); } if (queueSettings.getOnServiceVersion() != null) { - builder.set(ON_SERVICE_VERSION_PROPERTY, StringValue.newBuilder(queueSettings.getOnServiceVersion()).setExcludeFromIndexes(true).build()); + builder.set(ON_SERVICE_VERSION_PROPERTY, + StringValue.newBuilder(queueSettings.getOnServiceVersion()).setExcludeFromIndexes(true).build()); } if (queueSettings.getOnQueue() != null) { - builder.set(ON_QUEUE_PROPERTY, StringValue.newBuilder(queueSettings.getOnQueue()).setExcludeFromIndexes(true).build()); + builder.set(ON_QUEUE_PROPERTY, + StringValue.newBuilder(queueSettings.getOnQueue()).setExcludeFromIndexes(true).build()); } if (statusConsoleUrl != null) { @@ -376,22 +413,25 @@ public Entity toEntity() { * is created during the run() method of a parent job. The parent job is also * known as the generator job. * - * @param generatorJob The parent generator job of this job. - * @param graphGUIDParam The GUID of the local graph of this job. - * @param jobInstance The non-null user-supplied instance of {@code Job} that - * implements the Job that the newly created JobRecord represents. + * @param generatorJob The parent generator job of this job. + * @param graphGUIDParam The GUID of the local graph of this job. 
+ * @param jobInstance The non-null user-supplied instance of + * {@code Job} that + * implements the Job that the newly created + * JobRecord represents. * @param callExceptionHandler The flag that indicates that this job should call - * {@code Job#handleException(Throwable)} instead of {@code run}. - * @param settings Array of {@code JobSetting} to apply to the newly created - * JobRecord. + * {@code Job#handleException(Throwable)} instead of + * {@code run}. + * @param settings Array of {@code JobSetting} to apply to the newly + * created + * JobRecord. */ public JobRecord(@NonNull JobRecord generatorJob, - String graphGUIDParam, - Job jobInstance, - boolean callExceptionHandler, - JobSetting[] settings, - @NonNull SerializationStrategy serializationStrategy - ) { + String graphGUIDParam, + Job jobInstance, + boolean callExceptionHandler, + JobSetting[] settings, + @NonNull SerializationStrategy serializationStrategy) { this(generatorJob.getRootJobKey(), null, generatorJob.getKey(), graphGUIDParam, jobInstance, callExceptionHandler, settings, generatorJob.getQueueSettings(), serializationStrategy); // If generator job has exception handler then it should be called in case @@ -441,12 +481,22 @@ private JobRecord(Key rootJobKey, Key thisKey, Key generatorJobKey, String graph } projectId = rootJobKey.getProjectId(); namespace = JobSetting.getSettingValue(JobSetting.DatastoreNamespace.class, settings) - .orElse(null); + .orElse(generatorJobKey != null ? generatorJobKey.getNamespace() : null); + + // Look up database setting, falling back to generator job's database, or null + String defaultDbId = null; + if (generatorJobKey != null) { + defaultDbId = generatorJobKey.getDatabaseId() == null || generatorJobKey.getDatabaseId().isEmpty() ? null + : generatorJobKey.getDatabaseId(); + } + databaseId = JobSetting.getSettingValue(JobSetting.DatastoreDatabase.class, settings) + .orElse(defaultDbId); } // Constructor for Root Jobs (called by {@link #createRootJobRecord}). 
private JobRecord(Key key, Job jobInstance, JobSetting[] settings, SerializationStrategy serializationStrategy) { - // Root Jobs have their rootJobKey the same as their keys and provide null for generatorKey + // Root Jobs have their rootJobKey the same as their keys and provide null for + // generatorKey // and graphGUID. Also, callExceptionHandler is always false. this(key, key, null, null, jobInstance, false, settings, null, serializationStrategy); rootJobDisplayName = jobInstance.getJobDisplayName(); @@ -456,23 +506,27 @@ private JobRecord(Key key, Job jobInstance, JobSetting[] settings, Serializat * A factory method for root jobs. * * @param projectId The project id of the pipeline - * @param jobInstance The non-null user-supplied instance of {@code Job} that - * implements the Job that the newly created JobRecord represents. + * @param jobInstance The non-null user-supplied instance of + * {@code Job} that + * implements the Job that the newly created + * JobRecord represents. * @param serializationStrategy - * @param settings Array of {@code JobSetting} to apply to the newly created + * @param settings Array of {@code JobSetting} to apply to the + * newly created * JobRecord. 
*/ public static JobRecord createRootJobRecord(String projectId, - Job jobInstance, - SerializationStrategy serializationStrategy, - JobSetting[] settings) { + Job jobInstance, + SerializationStrategy serializationStrategy, + JobSetting[] settings) { String namespace = JobSetting.getSettingValue(JobSetting.DatastoreNamespace.class, settings) - .orElse(null); - Key key = generateKey(projectId, namespace, DATA_STORE_KIND); + .orElse(null); + String databaseId = JobSetting.getSettingValue(JobSetting.DatastoreDatabase.class, settings) + .orElse(null); + Key key = generateKey(projectId, databaseId, namespace, DATA_STORE_KIND); return new JobRecord(key, jobInstance, settings, serializationStrategy); } - public static boolean isExceptionHandlerSpecified(Job jobInstance) { boolean result = false; Class clazz = jobInstance.getClass(); @@ -516,8 +570,9 @@ private void applySetting(JobSetting setting) { queueSettings.setOnQueue(((OnQueue) setting).getValue()); } else if (setting instanceof StatusConsoleUrl) { statusConsoleUrl = ((StatusConsoleUrl) setting).getValue(); - } else if (setting instanceof JobSetting.DatastoreNamespace) { - //ignore; applied in constructor, bc it's final + } else if (setting instanceof JobSetting.DatastoreNamespace || + setting instanceof JobSetting.DatastoreDatabase) { + // ignore; applied in constructor, bc they are final } else { throw new RuntimeException("Unrecognized JobSetting class " + setting.getClass().getName()); } @@ -559,7 +614,8 @@ public void inflate(Barrier runBarrier, Barrier finalizeBarrier, Slot outputSlot } /** - * Used to set exceptionHandling Job output to the same slot as the protected job. + * Used to set exceptionHandling Job output to the same slot as the protected + * job. 
*/ public void setOutputSlotInflated(Slot outputSlot) { outputSlotInflated = outputSlot; @@ -659,6 +715,7 @@ public static Key key(String projectId, String databaseId, String namespace, Str @VisibleForTesting public static Key keyFromPipelineHandle(JobRunId pipelineHandle) { - return key(pipelineHandle.getProject(), pipelineHandle.getDatabaseId(), pipelineHandle.getNamespace(), pipelineHandle.getJobId()); + return key(pipelineHandle.getProject(), pipelineHandle.getDatabaseId(), pipelineHandle.getNamespace(), + pipelineHandle.getJobId()); } } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/model/PipelineModelObject.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/model/PipelineModelObject.java index a5c3b4fb..f6ad181b 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/model/PipelineModelObject.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/model/PipelineModelObject.java @@ -14,18 +14,27 @@ package com.google.appengine.tools.pipeline.impl.model; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import com.google.appengine.tools.pipeline.impl.util.EntityUtils; -import com.google.cloud.datastore.*; import com.google.appengine.tools.pipeline.impl.util.GUIDGenerator; +import com.google.cloud.datastore.Entity; +import com.google.cloud.datastore.Key; +import com.google.cloud.datastore.KeyFactory; +import com.google.cloud.datastore.PathElement; +import com.google.cloud.datastore.Value; + import lombok.Getter; import lombok.NonNull; import lombok.Setter; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.*; -import java.util.stream.Collectors; - /** * The parent class of all Pipeline model objects. 
* @@ -70,40 +79,58 @@ public abstract class PipelineModelObject implements ExpiringDatastoreEntity { private final String graphGUID; /** - * time at which this entity should expire, and be cleaned up from the datastore, if any. + * time at which this entity should expire, and be cleaned up from the + * datastore, if any. * * null implies no expiration (entity will not be automatically cleaned up) * - * NOTE: will NOT be enforced unless you also configure retention TTL policy on your datastore instance. + * NOTE: will NOT be enforced unless you also configure retention TTL policy on + * your datastore instance. * * see: https://cloud.google.com/datastore/docs/ttl */ - @Getter @Setter + @Getter + @Setter private Instant expireAt; /** * Construct a new PipelineModelObject from the provided data. * - * @param rootJobKey The key of the root job for this pipeline. This must be - * non-null, except in the case that we are currently constructing the - * root job. In that case {@code thisKey} and {@code egParentKey} must - * both be null and this must be a {@link JobRecord}. - * @param egParentKey The entity group parent key. This must be null unless - * {@code thisKey} is null. If {@code thisKey} is null then - * {@code parentKey} will be used to construct {@code thisKey}. - * {@code parentKey} and {@code thisKey} are both allowed to be null, - * in which case {@code thisKey} will be constructed without a parent. - * @param thisKey The key for the object being constructed. If this is null - * then a new key will be constructed. + * @param rootJobKey The key of the root job for this pipeline. This must + * be + * non-null, except in the case that we are currently + * constructing the + * root job. In that case {@code thisKey} and + * {@code egParentKey} must + * both be null and this must be a {@link JobRecord}. + * @param egParentKey The entity group parent key. This must be null unless + * {@code thisKey} is null. 
If {@code thisKey} is null + * then + * {@code parentKey} will be used to construct + * {@code thisKey}. + * {@code parentKey} and {@code thisKey} are both allowed + * to be null, + * in which case {@code thisKey} will be constructed + * without a parent. + * @param thisKey The key for the object being constructed. If this is + * null + * then a new key will be constructed. * @param generatorJobKey The key of the job whose run() method created this - * object. This must be non-null unless this object is part of the root - * job graph---i.e. the root job, or one of its barriers or slots. - * @param graphGUID The unique GUID of the local graph of this object. This is - * used to determine whether or not this object is orphaned. The object - * is defined to be non-orphaned if its graphGUID is equal to the - * childGraphGUID of its parent job. This must be non-null unless this - * object is part of the root job graph---i.e. the root job, or one of - * its barriers or slots. + * object. This must be non-null unless this object is + * part of the root + * job graph---i.e. the root job, or one of its barriers + * or slots. + * @param graphGUID The unique GUID of the local graph of this object. + * This is + * used to determine whether or not this object is + * orphaned. The object + * is defined to be non-orphaned if its graphGUID is + * equal to the + * childGraphGUID of its parent job. This must be + * non-null unless this + * object is part of the root job graph---i.e. the root + * job, or one of + * its barriers or slots. 
*/ protected PipelineModelObject( @NonNull Key rootJobKey, Key egParentKey, Key thisKey, Key generatorJobKey, String graphGUID) { @@ -119,7 +146,8 @@ protected PipelineModelObject( if (null == thisKey) { if (egParentKey == null) { - key = generateKey(rootJobKey.getProjectId(), rootJobKey.getNamespace(), getDatastoreKind()); + key = generateKey(rootJobKey.getProjectId(), rootJobKey.getDatabaseId(), rootJobKey.getNamespace(), + getDatastoreKind()); } else { key = generateKey(egParentKey, getDatastoreKind()); } @@ -138,18 +166,28 @@ protected PipelineModelObject( * generatorJobKey, and graphGUID, a newly generated key, and no entity group * parent. * - * @param rootJobKey The key of the root job for this pipeline. This must be - * non-null, except in the case that we are currently constructing the - * root job. In that case this must be a {@link JobRecord}. + * @param rootJobKey The key of the root job for this pipeline. This must + * be + * non-null, except in the case that we are currently + * constructing the + * root job. In that case this must be a + * {@link JobRecord}. * @param generatorJobKey The key of the job whose run() method created this - * object. This must be non-null unless this object is part of the root - * job graph---i.e. the root job, or one of its barriers or slots. - * @param graphGUID The unique GUID of the local graph of this object. This is - * used to determine whether or not this object is orphaned. The object - * is defined to be non-orphaned if its graphGUID is equal to the - * childGraphGUID of its parent job. This must be non-null unless this - * object is part of the root job graph---i.e. the root job, or one of - * its barriers or slots. + * object. This must be non-null unless this object is + * part of the root + * job graph---i.e. the root job, or one of its barriers + * or slots. + * @param graphGUID The unique GUID of the local graph of this object. + * This is + * used to determine whether or not this object is + * orphaned. 
The object + * is defined to be non-orphaned if its graphGUID is + * equal to the + * childGraphGUID of its parent job. This must be + * non-null unless this + * object is part of the root job graph---i.e. the root + * job, or one of + * its barriers or slots. */ protected PipelineModelObject(Key rootJobKey, Key generatorJobKey, String graphGUID) { this(rootJobKey, null, null, generatorJobKey, graphGUID); @@ -159,7 +197,7 @@ protected PipelineModelObject(Key rootJobKey, Key generatorJobKey, String graphG * Construct a new PipelineModelObject from the previously saved Entity. * * @param entity An Entity obtained previously from a call to - * {@link #toEntity()}. + * {@link #toEntity()}. */ protected PipelineModelObject(Entity entity) { this(extractRootJobKey(entity), null, extractKey(entity), extractGeneratorJobKey(entity), @@ -176,36 +214,44 @@ protected static Key generateKey(Key parentKey, String kind) { String name = GUIDGenerator.nextGUID(); KeyFactory keyFactory = new KeyFactory(parentKey.getProjectId(), parentKey.getNamespace()); + if (parentKey.getDatabaseId() != null && !parentKey.getDatabaseId().isEmpty()) { + keyFactory.setDatabaseId(parentKey.getDatabaseId()); + } keyFactory.addAncestors(parentKey.getAncestors()); keyFactory.addAncestor(PathElement.of(parentKey.getKind(), parentKey.getName())); keyFactory.setKind(kind); return keyFactory.newKey(name); } + public static Key generateKey(@NonNull String projectId, String databaseId, String namespace, + @NonNull String dataStoreKind) { - public static Key generateKey(@NonNull String projectId, String namespace, @NonNull String dataStoreKind) { - - //ISO date + time (to second) as a suffix, to aid human readability/traceability; any place we log job id, we know when it was triggered + // ISO date + time (to second) as a suffix, to aid human + // readability/traceability; any place we log job id, we know when it was + // triggered // q: why not swap it to a prefix? 
// pro: - // - even easier to read - // - free index by time + // - even easier to read + // - free index by time // con: - // - index will be hot - - // TODO: swap this once have per-tenant database/namespace, which should limit the index overheat issue - String name = - GUIDGenerator.nextGUID().replace("-", "") //avoid collision - + "_" + - Instant.now().truncatedTo(ChronoUnit.SECONDS).toString() - .replace(":", "") - .replace("T", "_") - .replace("Z", "") - .replace("-", ""); + // - index will be hot + + // TODO: swap this once have per-tenant database/namespace, which should limit + // the index overheat issue + String name = GUIDGenerator.nextGUID().replace("-", "") // avoid collision + + "_" + + Instant.now().truncatedTo(ChronoUnit.SECONDS).toString() + .replace(":", "") + .replace("T", "_") + .replace("Z", "") + .replace("-", ""); KeyFactory keyFactory = new KeyFactory(projectId); - if (namespace != null ) { //null implies default + if (databaseId != null && !databaseId.isEmpty()) { + keyFactory.setDatabaseId(databaseId); + } + if (namespace != null) { // null implies default keyFactory.setNamespace(namespace); } keyFactory.setKind(dataStoreKind); @@ -266,8 +312,8 @@ protected static List buildInflated(Collection listOfIds, Map List getListProperty(String propertyName, Entity entity) { if (entity.contains(propertyName)) { return (List) entity.getList(propertyName).stream() - .map(Value::get) - .collect(Collectors.toCollection(ArrayList::new)); + .map(Value::get) + .collect(Collectors.toCollection(ArrayList::new)); } else { return new LinkedList<>(); } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/PipelineTask.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/PipelineTask.java index 014c1921..6e344df9 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/PipelineTask.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/PipelineTask.java @@ -14,12 +14,6 @@ 
package com.google.appengine.tools.pipeline.impl.tasks; -import com.google.appengine.tools.pipeline.impl.QueueSettings; -import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesService; -import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; -import lombok.Getter; -import lombok.NonNull; - import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.time.Instant; @@ -28,12 +22,21 @@ import java.util.Properties; import java.util.Set; +import com.google.appengine.tools.pipeline.impl.QueueSettings; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesService; +import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; + +import lombok.Getter; +import lombok.NonNull; + /** - * A Pipeline Framework task to be executed asynchronously This is the abstract base class for all + * A Pipeline Framework task to be executed asynchronously This is the abstract + * base class for all * Pipeline task types. * * q: kinda analogous to a StepExecution in Spring Batch? - * --> yeah, think so; not *really* coupled to GAE Task queue ... that's more of implementation detail of runner + * --> yeah, think so; not *really* coupled to GAE Task queue ... that's more of + * implementation detail of runner * *

      * This class represents both a task to be enqueued and a task being handled. @@ -41,7 +44,8 @@ * When enqueueing a task, construct a concrete subclass with the appropriate * data, and then add the task to an * {@link com.google.appengine.tools.pipeline.impl.backend.UpdateSpec} and - * {@link com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd#save save}. + * {@link com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd#save + * save}. * Alternatively the task may be enqueued directly using * {@link com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd#enqueue(PipelineTask)}. *

      @@ -56,11 +60,14 @@ public abstract class PipelineTask { protected static final String TASK_TYPE_PARAMETER = "taskType"; - @Getter @NonNull + @Getter + @NonNull private final Type type; - @Getter // nullable, if not a deterministically 'named' task (will get a name upon enqueue) + @Getter // nullable, if not a deterministically 'named' task (will get a name upon + // enqueue) private final String taskName; - @Getter @NonNull + @Getter + @NonNull private final QueueSettings queueSettings; private enum TaskProperty { @@ -111,15 +118,52 @@ String getProperty(PipelineTask pipelineTask) { Long delay = pipelineTask.getQueueSettings().getDelayInSeconds(); return delay == null ? null : delay.toString(); } + }, + DATASTORE_DATABASE_ID { + @Override + String getPropertyName() { + return "dsDatabaseId"; + } + + @Override + void setProperty(PipelineTask pipelineTask, String value) { + pipelineTask.getQueueSettings().setDatabaseId(value); + } + + @Override + String getProperty(PipelineTask pipelineTask) { + return pipelineTask.getQueueSettings().getDatabaseId(); + } + }, + DATASTORE_NAMESPACE { + @Override + String getPropertyName() { + return "dsNamespace"; + } + + @Override + void setProperty(PipelineTask pipelineTask, String value) { + pipelineTask.getQueueSettings().setNamespace(value); + } + + @Override + String getProperty(PipelineTask pipelineTask) { + return pipelineTask.getQueueSettings().getNamespace(); + } }; static final Set ALL = EnumSet.allOf(TaskProperty.class); + String getPropertyName() { + return name(); + } + abstract void setProperty(PipelineTask pipelineTask, String value); + abstract String getProperty(PipelineTask pipelineTask); void applyFrom(PipelineTask pipelineTask, Properties properties) { - String value = properties.getProperty(name()); + String value = properties.getProperty(getPropertyName()); if (value != null) { setProperty(pipelineTask, value); } @@ -128,7 +172,7 @@ void applyFrom(PipelineTask pipelineTask, Properties properties) { void 
addTo(PipelineTask pipelineTask, Properties properties) { String value = getProperty(pipelineTask); if (value != null) { - properties.setProperty(name(), value); + properties.setProperty(getPropertyName(), value); } } } @@ -187,8 +231,6 @@ protected PipelineTask(Type type, String taskName, Properties properties) { } } - - public final Properties toProperties() { Properties properties = new Properties(); properties.setProperty(TASK_TYPE_PARAMETER, type.toString()); @@ -199,22 +241,29 @@ public final Properties toProperties() { return properties; } - public PipelineTaskQueue.TaskSpec toTaskSpec(AppEngineServicesService appEngineServicesService, String callback) { PipelineTaskQueue.TaskSpec.TaskSpecBuilder spec = PipelineTaskQueue.TaskSpec.builder() - .name(this.getTaskName()) - .callbackPath(callback) - .method(PipelineTaskQueue.TaskSpec.Method.POST); + .name(this.getTaskName()) + .callbackPath(callback) + .method(PipelineTaskQueue.TaskSpec.Method.POST); this.toProperties().entrySet() - .forEach(p -> spec.param((String) p.getKey(), (String) p.getValue())); + .forEach(p -> spec.param((String) p.getKey(), (String) p.getValue())); if (this.getQueueSettings().getDelayInSeconds() != null) { spec.scheduledExecutionTime(Instant.now().plusSeconds(this.getQueueSettings().getDelayInSeconds())); } + if (this.getQueueSettings().getDatabaseId() != null) { + spec.param("dsDatabaseId", this.getQueueSettings().getDatabaseId()); + } + + if (this.getQueueSettings().getNamespace() != null) { + spec.param("dsNamespace", this.getQueueSettings().getNamespace()); + } + String service = Optional.ofNullable(this.getQueueSettings().getOnService()) - .orElseGet(appEngineServicesService::getDefaultService); + .orElseGet(appEngineServicesService::getDefaultService); spec.service(service); String version = this.getQueueSettings().getOnServiceVersion(); @@ -234,7 +283,7 @@ public PipelineTaskQueue.TaskSpec toTaskSpec(AppEngineServicesService appEngineS public static PipelineTask 
fromProperties(String taskName, @NonNull Properties properties) { String taskTypeString = properties.getProperty(TASK_TYPE_PARAMETER); if (null == taskTypeString) { - throw new IllegalArgumentException(TASK_TYPE_PARAMETER + " property is missing: " + properties); + throw new IllegalArgumentException(TASK_TYPE_PARAMETER + " property is missing: " + properties); } Type type = Type.valueOf(taskTypeString); return type.createInstance(taskName, properties); diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/JobSettingTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/JobSettingTest.java new file mode 100644 index 00000000..563e29cc --- /dev/null +++ b/java/src/test/java/com/google/appengine/tools/pipeline/JobSettingTest.java @@ -0,0 +1,43 @@ +package com.google.appengine.tools.pipeline; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +class JobSettingTest { + + @Test + void testDatastoreDatabaseValidation() { + assertDoesNotThrow(() -> new JobSetting.DatastoreDatabase(null)); + assertDoesNotThrow(() -> new JobSetting.DatastoreDatabase("")); + assertDoesNotThrow(() -> new JobSetting.DatastoreDatabase("(default)")); + assertDoesNotThrow(() -> new JobSetting.DatastoreDatabase("my-database-123")); + + IllegalArgumentException e1 = assertThrows(IllegalArgumentException.class, + () -> new JobSetting.DatastoreDatabase("123-starts-with-number")); + assertEquals("Invalid Datastore database ID: 123-starts-with-number", e1.getMessage()); + + assertThrows(IllegalArgumentException.class, () -> new JobSetting.DatastoreDatabase("UPPERCASE")); + assertThrows(IllegalArgumentException.class, () -> new JobSetting.DatastoreDatabase("contains space")); + assertThrows(IllegalArgumentException.class, () -> new JobSetting.DatastoreDatabase("a".repeat(64))); // max + // length + 
// 63 + } + + @Test + void testDatastoreNamespaceValidation() { + assertDoesNotThrow(() -> new JobSetting.DatastoreNamespace(null)); + assertDoesNotThrow(() -> new JobSetting.DatastoreNamespace("")); + assertDoesNotThrow(() -> new JobSetting.DatastoreNamespace("my-namespace_1.0")); + + IllegalArgumentException e1 = assertThrows(IllegalArgumentException.class, + () -> new JobSetting.DatastoreNamespace("invalid space")); + assertEquals("Invalid Datastore namespace: invalid space", e1.getMessage()); + + assertThrows(IllegalArgumentException.class, () -> new JobSetting.DatastoreNamespace("a".repeat(101))); // max + // length + // 100 + } +} diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/impl/model/JobRecordTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/impl/model/JobRecordTest.java new file mode 100644 index 00000000..3fd302f4 --- /dev/null +++ b/java/src/test/java/com/google/appengine/tools/pipeline/impl/model/JobRecordTest.java @@ -0,0 +1,79 @@ +package com.google.appengine.tools.pipeline.impl.model; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.Test; + +import com.google.appengine.tools.pipeline.Job; +import com.google.appengine.tools.pipeline.JobSetting; +import com.google.appengine.tools.pipeline.impl.backend.SerializationStrategy; +import com.google.cloud.datastore.Key; + +class JobRecordTest { + + @Test + void testRootJobRecordSettings() { + Job jobInstance = mock(Job.class); + SerializationStrategy serializationStrategy = mock(SerializationStrategy.class); + + JobSetting.DatastoreDatabase dbSetting = new JobSetting.DatastoreDatabase("my-db"); + JobSetting.DatastoreNamespace nsSetting = new JobSetting.DatastoreNamespace("my-ns"); + + JobSetting[] settings = new JobSetting[] { dbSetting, nsSetting }; + + JobRecord rootJob = JobRecord.createRootJobRecord("my-project", jobInstance, 
serializationStrategy, settings); + + assertEquals("my-db", rootJob.getDatabaseId()); + assertEquals("my-ns", rootJob.getNamespace()); + } + + @Test + void testSubJobRecordInheritsGeneratorSettings() { + Job jobInstance = mock(Job.class); + SerializationStrategy serializationStrategy = mock(SerializationStrategy.class); + + Key rootJobKey = Key.newBuilder("my-project", "JobRecord", "root-job").build(); + Key generatorJobKey = Key.newBuilder("my-project", "JobRecord", "gen-job") + .setDatabaseId("gen-db") + .setNamespace("gen-ns") + .build(); + + JobRecord mockGenerator = mock(JobRecord.class); + when(mockGenerator.getRootJobKey()).thenReturn(rootJobKey); + when(mockGenerator.getKey()).thenReturn(generatorJobKey); + + JobRecord subJob = new JobRecord(mockGenerator, "graph-id", + jobInstance, false, new JobSetting[0], serializationStrategy); + + assertEquals("gen-db", subJob.getDatabaseId()); + assertEquals("gen-ns", subJob.getNamespace()); + } + + @Test + void testSubJobRecordOverridesGeneratorSettings() { + Job jobInstance = mock(Job.class); + SerializationStrategy serializationStrategy = mock(SerializationStrategy.class); + + Key rootJobKey = Key.newBuilder("my-project", "JobRecord", "root-job").build(); + Key generatorJobKey = Key.newBuilder("my-project", "JobRecord", "gen-job") + .setDatabaseId("gen-db") + .setNamespace("gen-ns") + .build(); + + JobRecord mockGenerator = mock(JobRecord.class); + when(mockGenerator.getRootJobKey()).thenReturn(rootJobKey); + when(mockGenerator.getKey()).thenReturn(generatorJobKey); + + JobSetting.DatastoreDatabase dbSetting = new JobSetting.DatastoreDatabase("new-db"); + JobSetting.DatastoreNamespace nsSetting = new JobSetting.DatastoreNamespace("new-ns"); + JobSetting[] settings = new JobSetting[] { dbSetting, nsSetting }; + + JobRecord subJob = new JobRecord(mockGenerator, "graph-id", + jobInstance, false, settings, serializationStrategy); + + assertEquals("new-db", subJob.getDatabaseId()); + assertEquals("new-ns", 
subJob.getNamespace()); + } +} diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/impl/model/PipelineModelObjectTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/impl/model/PipelineModelObjectTest.java index 09b0969e..50da6159 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/impl/model/PipelineModelObjectTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/impl/model/PipelineModelObjectTest.java @@ -1,22 +1,24 @@ package com.google.appengine.tools.pipeline.impl.model; -import com.google.cloud.datastore.Key; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; +import com.google.cloud.datastore.Key; class PipelineModelObjectTest { @Test void generateKey() { - Key key = PipelineModelObject.generateKey("project", "ns", "Kind"); + Key key = PipelineModelObject.generateKey("project", null, "ns", "Kind"); assertEquals("project", key.getProjectId()); assertEquals("ns", key.getNamespace()); assertEquals("Kind", key.getKind()); - //validate key.getName() is legal for GCP cloud datastore + // validate key.getName() is legal for GCP cloud datastore assertTrue(key.getName().matches("^[a-zA-Z0-9\\-_.~]+$")); } } \ No newline at end of file diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/impl/tasks/PipelineTaskTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/impl/tasks/PipelineTaskTest.java new file mode 100644 index 00000000..689918fd --- /dev/null +++ b/java/src/test/java/com/google/appengine/tools/pipeline/impl/tasks/PipelineTaskTest.java @@ -0,0 +1,54 @@ +package com.google.appengine.tools.pipeline.impl.tasks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import 
java.util.Properties; + +import org.junit.jupiter.api.Test; + +import com.google.appengine.tools.pipeline.impl.QueueSettings; +import com.google.cloud.datastore.Key; + +class PipelineTaskTest { + + @Test + void testToAndFromPropertiesWithDatastoreSettings() { + QueueSettings queueSettings = new QueueSettings(); + queueSettings.setDatabaseId("my-database"); + queueSettings.setNamespace("my-namespace"); + + Key dummyKey = Key.newBuilder("my-project", "MyKind", "my-key").build(); + RunJobTask task = new RunJobTask(dummyKey, queueSettings); + + Properties properties = task.toProperties(); + + assertEquals("my-database", properties.getProperty("dsDatabaseId")); + assertEquals("my-namespace", properties.getProperty("dsNamespace")); + assertEquals(PipelineTask.Type.RUN_JOB.name(), properties.getProperty(PipelineTask.TASK_TYPE_PARAMETER)); + + PipelineTask reconstructed = PipelineTask.fromProperties(task.getTaskName(), properties); + assertNotNull(reconstructed); + assertEquals("my-database", reconstructed.getQueueSettings().getDatabaseId()); + assertEquals("my-namespace", reconstructed.getQueueSettings().getNamespace()); + } + + @Test + void testToAndFromPropertiesWithoutDatastoreSettings() { + QueueSettings queueSettings = new QueueSettings(); + + Key dummyKey = Key.newBuilder("my-project", "MyKind", "my-key").build(); + RunJobTask task = new RunJobTask(dummyKey, queueSettings); + + Properties properties = task.toProperties(); + + assertNull(properties.getProperty("dsDatabaseId")); + assertNull(properties.getProperty("dsNamespace")); + + PipelineTask reconstructed = PipelineTask.fromProperties(task.getTaskName(), properties); + assertNotNull(reconstructed); + assertNull(reconstructed.getQueueSettings().getDatabaseId()); + assertNull(reconstructed.getQueueSettings().getNamespace()); + } +} diff --git a/java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 
index 00000000..fdbd0b15 --- /dev/null +++ b/java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-subclass