diff --git a/.agents/CLAUDE.md b/.agents/CLAUDE.md new file mode 100644 index 00000000..f67a352d --- /dev/null +++ b/.agents/CLAUDE.md @@ -0,0 +1,99 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Overview + +Worklytics' fork of Google's App Engine Pipelines Framework — a Java library for orchestrating asynchronous workflows on Google App Engine. All source lives under `java/`; the root contains only docs and the GitHub workflow. + +## Commands + +All Maven commands must be run from `java/`: + +```shell +# Run all tests +cd java && mvn test + +# Run a single test class +cd java && mvn test -Dtest=PipelineTest + +# Run a test by package pattern (as CI does) +cd java && mvn test -Dtest="com.google.appengine.tools.pipeline.**" + +# Build JAR +cd java && mvn package + +# Publish to GitHub Packages (requires ~/.m2/settings.xml with GitHub token) +cd java && mvn deploy +``` + +Tests require a Datastore emulator; `DatastoreExtension` starts it automatically. The env var `GOOGLE_CLOUD_PROJECT=test-project` is injected by `maven-surefire-plugin` in `pom.xml`. Failing tests are retried up to 3 times. + +## Architecture + +The library has three distinct roles mixed into one artifact: + +- **Client** — `PipelineService` / `PipelineOrchestrator` / `PipelineRunner`: API surface for starting pipelines, polling status, submitting promised values, and cancelling jobs. +- **Runner** — `PipelineServlet` + `TaskHandler` + `AppEngineBackEnd`: servlet-based execution engine that processes task-queue callbacks to advance pipeline state. +- **Admin UI** — JSON handlers (`JsonTreeHandler`, `JsonListHandler`, `JsonClassFilterHandler`) that serve pipeline state to the bundled web console. + +### Job model + +Users define pipelines by subclassing `Job0`…`Job6` (or `Job` directly) and implementing `run()`. Jobs are `Serializable`; instances are stored in Datastore and reconstituted on each execution attempt. + +Inside `run()`, child jobs are scheduled via `futureCall(new ChildJob(), arg1, arg2, ...)`, which returns a `FutureValue`. Passing a `FutureValue` as an argument to another `futureCall` declares a data dependency — the child won't run until its inputs are ready. `PromisedValue` is the mechanism for values provided by external agents; filled via `PipelineService.submitPromisedValue()`. + +`JobRunId` is a tuple `(project, database, namespace, name)` encoded as a colon-delimited string. `:` was chosen over `/` to avoid URL-encoding issues in some clients. + +### Persistence + +`AppEngineBackEnd` is the only production backend. It stores pipeline state in Google Cloud Datastore as `pipeline-job`, `pipeline-slot`, `pipeline-barrier`, and related entity kinds. All entities include a TTL field set 90 days in the future; you must create matching [Datastore TTL policies](https://cloud.google.com/datastore/docs/ttl) per entity kind for automatic cleanup. + +Datastore operations are retried with exponential backoff (up to 5 attempts) on `DatastoreException` or `IOException`. + +### Task queues + +Two implementations of `PipelineTaskQueue`: +- `CloudTasksTaskQueue` (default in production) — uses the Cloud Tasks v2 API. Caches queue location lookups. +- `AppEngineTaskQueue` (legacy, used in tests and when `USE_LEGACY_QUEUES` is set) — uses the old GAE SDK task queue API. + +Pipeline state-management tasks (`HandleSlotFilled`, `FinalizeJob`, etc.) are always sent to the default queue; only `RunJob` tasks respect `JobSetting.OnQueue`. + +### Dependency injection + +The framework uses Dagger 2 with custom scopes: + +- `JobRunServiceComponent` (`@Singleton`) — one per process; top-level component for the runner. Built from `AppEngineHostModule`. +- `StepExecutionComponent` (`@StepExecutionScoped`) — one per task-queue callback invocation. Created via `JobRunServiceComponent.stepExecutionComponent(StepExecutionModule)`. Provides `PipelineManager`, `PipelineService`, `PipelineRunner`, `ShardedJobRunner`. +- `MultiTenantComponent` / `TenantComponent` — used by the *client* side to obtain a `PipelineService` scoped to specific Datastore options (project/namespace/credentials). + +Job classes that need injected dependencies are annotated `@Injectable(DaggerMyContainer.class)`; `PipelineManager` calls the container's `inject()` via reflection before invoking `run()`. + +Servlets use `@AllArgsConstructor(onConstructor_ = @Inject)` so Dagger can construct them with dependencies — see `docs/servlet-di.md` for the rationale. + +### Multi-tenancy + +`AppEngineBackEnd.Options` carries `projectId`, `credentials`, and `DatastoreOptions` (which includes namespace). Pass tenant-specific options to `PipelineService.getInstance(options)` or via `JobSetting` to isolate pipeline data per tenant. + +### MapReduce / ShardedJob + +`MapReduceJob` and `MapJob` are `Job` subclasses that orchestrate sharded parallel work via `ShardedJobRunner`. Each shard runs as an `IncrementalTask`; state is stored as `IncrementalTaskState` in Datastore. `ShardedJobRunId` extends the pipeline `JobRunId` concept. + +## Testing patterns + +Test infrastructure is wired with JUnit 5 extensions: + +- `DatastoreExtension` — starts a `LocalDatastoreHelper` emulator before all tests, resets it between each test. +- `PipelineComponentsExtension` — builds `AppEngineBackEnd` + `StepExecutionComponent` against the emulator. +- `@PipelineSetupExtensions` — meta-annotation that applies both extensions; the standard setup for pipeline integration tests. + +Tests that extend `PipelineTest` get `pipelineService`, `pipelineManager`, and `appEngineBackend` injected. The `AppEngineTaskQueue` (not `CloudTasksTaskQueue`) is used in tests, backed by a `LocalTaskQueue`. + +## Local development environment variables + +| Variable | Effect | +|---|---| +| `USE_LEGACY_QUEUES` | Use GAE SDK task queues instead of Cloud Tasks API | +| `USE_LOCAL_SERVICE` | Treat all services as running on `localhost` as `default/v1` | +| `GAE_SERVICE_HOST_SUFFIX` | Override dynamic AppEngine service host lookup | +| `CLOUDTASKS_QUEUE_LOCATION` | Override dynamic Cloud Tasks queue location lookup | \ No newline at end of file diff --git a/.agents/plans/FIX-DATASTORE-TOBUILDER.md b/.agents/plans/FIX-DATASTORE-TOBUILDER.md new file mode 100644 index 00000000..45c6d521 --- /dev/null +++ b/.agents/plans/FIX-DATASTORE-TOBUILDER.md @@ -0,0 +1,242 @@ +# Fix: DatastoreOptions.toBuilder() drops host in google-cloud-datastore 2.40.0+ + +## Root Cause + +`DatastoreOptions` in `google-cloud-datastore` 2.40.0 (shipped in Google Cloud Java BOM 26.83.0) +introduced a **duplicate `host` field** inside `DatastoreOptions.Builder` that is separate from the +inherited `ServiceOptions.Builder.host` field. + +The `DatastoreOptions$Builder` setHost() correctly writes to **both** fields: + +```java +// Builder.setHost() bytecode: +putfield host // DatastoreOptions.Builder-own host (#69) +invokespecial ServiceOptions$Builder.setHost() // parent field +``` + +But the copy constructor used by `toBuilder()` only copies the DatastoreOptions-specific state +(namespace, databaseId, openTelemetryOptions, channelProvider) and **does NOT transfer the local +`host` field**: + +```java +// Builder copy-constructor references: access$000 (namespace), access$100 (databaseId), +// access$200 (openTelemetryOptions), access$300 (channelProvider) — no host access method. +``` + +Additionally, `Builder.build()` checks the local `host` field first: + +```java +// build() bytecode: +getfield host // local field #69 +ifnonnull → use it +// else: fall back to GrpcTransportOptions.getDefaultEndpoint() → real Cloud Datastore +``` + +**Result:** When `DATASTORE_EMULATOR_HOST` is set, `getDefaultInstance()` correctly configures the +emulator host. Calling `.toBuilder()` on that instance silently loses the host. The rebuilt options +then fall back to the real Cloud Datastore gRPC endpoint — so any entity written via the emulator +cannot be found by code that went through `toBuilder()`. + +This is exactly why pipeline slot lookups fail: the slot is written (job submission path) using +correct emulator `DatastoreOptions`, but the task-handler read path rebuilds options via +`getDefaultInstance().toBuilder()`, loses the host, and queries the real Cloud Datastore instead. + +--- + +## Pattern of the Fix + +Replace every `DatastoreOptions.getDefaultInstance().toBuilder()` (and any `.toBuilder()` call on +an existing `DatastoreOptions`) with an explicit `DatastoreOptions.newBuilder()` that copies each +field manually, including the host: + +```java +// BEFORE (broken in 2.40.0+): +DatastoreOptions.Builder builder = DatastoreOptions.getDefaultInstance().toBuilder(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); +``` + +For calls where the source is an existing `DatastoreOptions` instance (not `getDefaultInstance()`): + +```java +// BEFORE: +DatastoreOptions rebuilt = someOptions.toBuilder().build(); + +// AFTER: +DatastoreOptions rebuilt = DatastoreOptions.newBuilder() + .setProjectId(someOptions.getProjectId()) + .setCredentials(someOptions.getCredentials()) + .setTransportOptions(someOptions.getTransportOptions()) + .setHost(someOptions.getHost()) // must always set explicitly + .build(); +``` + +--- + +## Files to Fix + +### 1. `java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java` — **MOST CRITICAL** + +Lines 70–72. This is the task-handler entry point: every pipeline task call rebuilds `DatastoreOptions` +here. Losing the host means the task handler hits the real Cloud Datastore instead of the emulator. + +```java +// BEFORE (lines 70-72): +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder builder = defaultInstance.toBuilder(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); +``` + +Also add `import java.util.Optional;` if not already present (it already is in this file). + +--- + +### 2. `java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java` + +Line 53. `getDatastoreOptions()` already overrides host with `getDatastoreHost()` afterwards, so +this only manifests when `getDatastoreHost()` returns null — but the base copy is still wrong. + +```java +// BEFORE (line 53): +DatastoreOptions.Builder optionsBuilder = DatastoreOptions.getDefaultInstance().toBuilder(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder optionsBuilder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(optionsBuilder::setHost); +``` + +--- + +### 3. `java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java` + +Four locations, all the same pattern. Lines 244, 348, and 483 are identical `getDatastore()` methods +in different inner classes; line 610 is `getDatastoreOptions()`. + +**Lines 244, 348, 483** (identical pattern in three inner classes): + +```java +// BEFORE: +datastore = DatastoreOptions.getDefaultInstance().toBuilder() + .setNamespace(settings.getNamespace()) + .build().getService(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder b = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(b::setHost); +Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); +datastore = b.build().getService(); +``` + +**Line 610** (`getDatastoreOptions(MapReduceSettings)`): + +```java +// BEFORE: +DatastoreOptions.Builder builder = DatastoreOptions.getDefaultInstance().toBuilder(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); +``` + +--- + +### 4. `java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java` + +Line 34. The `datastoreOptions` field here is passed in from the caller. If the caller constructed +it correctly (using the explicit copy pattern), this is safe — but `.toBuilder().build()` is still a +no-op round-trip that can lose the host if the incoming options have it stored only in the +DatastoreOptions-level field. + +```java +// BEFORE (line 34): +Datastore datastore = datastoreOptions.toBuilder().build().getService(); + +// AFTER: +Datastore datastore = DatastoreOptions.newBuilder() + .setProjectId(datastoreOptions.getProjectId()) + .setCredentials(datastoreOptions.getCredentials()) + .setTransportOptions(datastoreOptions.getTransportOptions()) + .setHost(datastoreOptions.getHost()) + .build().getService(); +``` + +Or, since this is a round-trip with no changes, just use the instance directly: + +```java +Datastore datastore = datastoreOptions.getService(); +``` + +--- + +### 5. `java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java` + +Line 24. Same concern as #4 — the `toBuilder().build()` round-trip is used only to pass a +"fresh copy" to `DeleteShardsInfos`. Since `DatastoreOptions` is already immutable and +`DatastoreOptions` is serializable, just pass it directly: + +```java +// BEFORE (line 24): +return new DeleteShardsInfos(datastoreOptions.toBuilder().build(), getJobId(), start, end); + +// AFTER: +return new DeleteShardsInfos(datastoreOptions, getJobId(), start, end); +``` + +--- + +## Suggested Helper (Optional) + +To avoid repeating the copy pattern, consider adding a package-private utility: + +```java +// e.g., in RequestUtils or a new DatastoreUtils class +static DatastoreOptions.Builder copyOf(DatastoreOptions source) { + DatastoreOptions.Builder b = DatastoreOptions.newBuilder() + .setProjectId(source.getProjectId()) + .setCredentials(source.getCredentials()) + .setTransportOptions(source.getTransportOptions()); + Optional.ofNullable(source.getHost()).ifPresent(b::setHost); + Optional.ofNullable(source.getNamespace()).ifPresent(b::setNamespace); + Optional.ofNullable(source.getDatabaseId()).ifPresent(b::setDatabaseId); + return b; +} +``` + +Then each call site becomes: +```java +DatastoreOptions.Builder builder = copyOf(DatastoreOptions.getDefaultInstance()); +``` + +--- + +## Context + +This fix is required when consuming this library from a project using +**Google Cloud Java BOM ≥ 26.83.0** (which brings in `google-cloud-datastore` ≥ 2.40.0). +The library's own BOM (26.53.0 at time of writing) is not affected, but the host project's +BOM takes precedence at runtime. diff --git a/.github/workflows/test-java.yml b/.github/workflows/test-java.yml index 87122948..1ee4ed29 100644 --- a/.github/workflows/test-java.yml +++ b/.github/workflows/test-java.yml @@ -13,64 +13,79 @@ jobs: compile: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 + - uses: actions/checkout@v6 + - uses: actions/setup-java@v5 with: - distribution: 'adopt' - java-version: '17' # maven won't accept --release argument with java < 8; and 11 is next LTS + distribution: 'temurin' + java-version: '21' - id: 'read-cache-maven-packages' name: Read Cached Maven packages - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: ~/.m2 key: ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}- - name: Maven compile run: | cd java/ mvn clean compile - name: Cache compiled classes # Compiled classes are cached, later restored for tests - uses: actions/cache/save@v4 + uses: actions/cache/save@v5 with: path: ${{ github.workspace }}/**/target/**/*.* key: ${{ runner.os }}-${{ env.CODE_CACHE_VERSION }}-${{ github.ref_name }}-${{ github.sha }} - id: 'cache-maven-packages' name: Cache Maven packages - uses: actions/cache/save@v4 + uses: actions/cache/save@v5 with: path: ~/.m2 key: ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}-${{ hashFiles('**/pom.xml') }} test: + name: test (${{ matrix.name }}) runs-on: ubuntu-latest needs: compile strategy: + fail-fast: true matrix: - package: [ "com.google.appengine.tools.cloudtasktest.**", - "com.google.appengine.tools.mapreduce.impl.**", - "com.google.appengine.tools.mapreduce.inputs.**", - "com.google.appengine.tools.mapreduce.outputs.**", - "com.google.appengine.tools.mapreduce.*Test", - "com.google.appengine.tools.pipeline.**" ] + include: + - name: cloudtasktest + package: "com.google.appengine.tools.cloudtasktest.**" + - name: mapreduce-impl-N-to-Z + package: "!%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[A-M].+]" + - name: mapreduce-impl-A-to-M + package: "!%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[N-Z].+]" + - name: mapreduce-inputs + package: "com.google.appengine.tools.mapreduce.inputs.**" + - name: mapreduce-outputs + package: "com.google.appengine.tools.mapreduce.outputs.**" + - name: mapreduce-top-level + package: "com.google.appengine.tools.mapreduce.*Test" + - name: pipeline + package: "com.google.appengine.tools.pipeline.**" steps: - - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 + - uses: actions/checkout@v6 + - uses: actions/setup-java@v5 with: distribution: 'adopt' - java-version: '17' # maven won't accept --release argument with java < 8; and 11 is next LTS + java-version: '21' # maven won't accept --release argument with java < 8; and 11 is next LTS - id: 'read-cache-maven-packages' name: Read Cached Maven packages - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: ~/.m2 key: ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}- - name: Restore compiled classes from cache - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: ${{ github.workspace }}/**/target/**/*.* key: ${{ runner.os }}-${{ env.CODE_CACHE_VERSION }}-${{ github.ref_name }}-${{ github.sha }} - - name: Run Tests for Package ${{ matrix.package }} + - name: Run Tests for Package ${{ matrix.name }} env: APPENGINE_MAPREDUCE_CI_SERVICE_ACCOUNT_KEY: ${{ secrets.APPENGINE_MAPREDUCE_CI_SERVICE_ACCOUNT_KEY }} run: | cd java/ - mvn test -T 1C -B -Dtest=${{ matrix.package }} -Dversions.logOutput=false -DprocessDependencies=false -DprocessDependencyManagement=false \ No newline at end of file + mvn test -T 1C -B "-Dtest=${{ matrix.package }}" -Dversions.logOutput=false -DprocessDependencies=false -DprocessDependencyManagement=false \ No newline at end of file diff --git a/java/appengine-pipeline-0.3+worklytics.12-pom.xml b/java/appengine-pipeline-0.3+worklytics.12-pom.xml deleted file mode 100644 index 1f55967a..00000000 --- a/java/appengine-pipeline-0.3+worklytics.12-pom.xml +++ /dev/null @@ -1,258 +0,0 @@ - - 4.0.0 - com.google.appengine.tools - appengine-pipeline - Pipeline Framework for Google App Engine - Framework for orchestrating complex workflows on top of Google App Engine - https://github.com/Worklytics/appengine-pipelines/ - - - - - 0.3+worklytics.12 - jar - - - The Apache Software License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0 - - - - UTF-8 - [2.7, 3.0) - 2.55 - 1.18.36 - [2.0.4, 3.0) - - - - - - github - Pipelines for Google App Engine by Worklytics - https://maven.pkg.github.com/Worklytics/appengine-pipelines - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.13.0 - - 17 - 17 - - - - org.apache.maven.plugins - maven-surefire-plugin - 3.2.5 - - - - test-project - 3 - - - - - org.apache.maven.plugins - maven-jar-plugin - 3.4.1 - - - - true - true - - - - - - - - ${basedir}/src/main/resources - com/google/appengine/tools/pipeline/impl/servlets - - - - - - - - com.google.cloud - libraries-bom - 26.53.0 - pom - import - - - - org.junit - junit-bom - 5.11.3 - pom - import - - - - - - - - - - com.google.cloud - google-cloud-appengine-admin - - - com.google.cloud - google-cloud-datastore - - - com.google.cloud - google-cloud-tasks - - - com.google.cloud - google-cloud-storage - - - com.google.guava - guava - [32.1,32.99] - - - - - - - - - javax.servlet - javax.servlet-api - 3.1.0 - provided - - - com.googlecode.charts4j - charts4j - 1.3 - - - org.json - json - 20231013 - - - it.unimi.dsi - fastutil - [6.5,6.6) - - - org.projectlombok - lombok - ${lombok.version} - provided - - - com.github.rholder - guava-retrying - 2.0.0 - - - - - - com.fasterxml.jackson.core - jackson-core - ${jackson.version} - - - com.fasterxml.jackson.core - jackson-annotations - ${jackson.version} - - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - - - com.fasterxml.jackson.datatype - jackson-datatype-jdk8 - ${jackson.version} - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - ${jackson.version} - - - jakarta.inject - jakarta.inject-api - 2.0.1 - - - - - - com.google.dagger - dagger - ${dagger.version} - - - - - - - - - - - - com.google.appengine - appengine-api-1.0-sdk - ${appengine.target.version} - test - - - - com.google.appengine - appengine-testing - ${appengine.target.version} - test - - - com.google.appengine - appengine-api-stubs - ${appengine.target.version} - test - - - org.junit.jupiter - junit-jupiter-api - test - - - org.junit.jupiter - junit-jupiter-engine - test - - - org.junit.jupiter - junit-jupiter-params - test - - - org.mockito - mockito-core - 5.15.2 - test - - - - \ No newline at end of file diff --git a/java/changes.md b/java/changes.md index b0f0eaf9..d8ec1e61 100644 --- a/java/changes.md +++ b/java/changes.md @@ -1,5 +1,12 @@ ## Change Log +### 0.3+worklytics.14 + - Fix: `DatastoreOptions.toBuilder()` drops the emulator host in `google-cloud-datastore` ≥ 2.40.0 + (Google Cloud Java BOM ≥ 26.83.0). Replaced all `toBuilder()` / `getDefaultInstance().toBuilder()` + calls with explicit `DatastoreOptions.newBuilder()` copies that preserve the `host` field. + Affected: `RequestUtils`, `ShardedJobAbstractSettings`, `MapReduceJob` (Sort/Merge/Reduce stages + and `getDatastoreOptions`), `FinalizeShardsInfos`, `DeleteShardedJob`. + ### 0.3+worklytics.7 (March 2025) - Handle cross-services transactionality - Fixes: serialization diff --git a/java/pom.xml b/java/pom.xml index 5d5a558e..9e8bd09e 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -17,9 +17,9 @@ - 0.3+worklytics.12 + 0.3+worklytics.14 UTF-8 - [2.18.2, 2.18.3) + [2.18.6, 2.19.0) 2.55 1.18.42 [2.0.4, 3.0) @@ -39,17 +39,29 @@ org.apache.maven.plugins maven-compiler-plugin - 3.13.0 + 3.15.0 - 17 - 17 - + 21 + 21 + false + + + org.projectlombok + lombok + ${lombok.version} + + + com.google.dagger + dagger-compiler + ${dagger.version} + + org.apache.maven.plugins maven-surefire-plugin - 3.2.5 + 3.5.2 @@ -61,7 +73,7 @@ org.apache.maven.plugins maven-jar-plugin - 3.4.1 + 3.5.0 @@ -85,7 +97,7 @@ com.google.cloud libraries-bom - 26.73.0 + 26.83.0 pom import @@ -137,11 +149,6 @@ 3.1.0 provided - - com.googlecode.charts4j - charts4j - 1.3 - org.json json @@ -167,6 +174,11 @@ + + com.googlecode.charts4j + charts4j + 1.3 + com.fasterxml.jackson.core jackson-core @@ -249,7 +261,7 @@ org.mockito mockito-core - 5.21.0 + 5.23.0 test diff --git a/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java new file mode 100644 index 00000000..9bd262d4 --- /dev/null +++ b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java @@ -0,0 +1,82 @@ +package com.google.appengine.tools; + +import com.google.cloud.NoCredentials; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.common.annotations.VisibleForTesting; +import lombok.extern.java.Log; + +@Log +public class EnvironmentUtils { + + // value of env var GOOGLE_CLOUD_PROJECT when running locally; underscores aren't actually legal in GCP project ids, + // so if this ever ends up being used in a real GCP API call, it blows up in validation before request is even sent by client + public static final String LOCAL_GAE_PROJECT_ID = "no_app_id"; + public static final String TEST_PROJECT_ID = "test-project"; + public static final String DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID = "local-gae-project"; + + /** + * Builds a DatastoreOptions.Builder pre-populated from the default instance, using + * NoCredentials in testing/emulator contexts (no ADC, or local GAE project id). + * @return + */ + public static DatastoreOptions.Builder datastoreBuilderFromDefaultInstance() { + DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); + return datastoreBuilderFromDatastoreOptions(defaultInstance); + } + + @VisibleForTesting + public static DatastoreOptions.Builder datastoreBuilderFromDatastoreOptions(DatastoreOptions datastoreOptions) { + // in case this needs to be overridden, there is a bug in toBuilder that loses the host + // so we need to copy over everything + DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(datastoreOptions.getProjectId()) + .setTransportOptions(datastoreOptions.getTransportOptions()) + .setDatabaseId(datastoreOptions.getDatabaseId()) + .setNamespace(datastoreOptions.getNamespace()) + .setHost(datastoreOptions.getHost()) + .setOpenTelemetryOptions(datastoreOptions.getOpenTelemetryOptions()); + + // set emulator host if needed + if (getDatastoreEmulatorHost() != null) { + builder.setHost(getDatastoreEmulatorHost()); + } + if (isNotCloudEnvironment(datastoreOptions)) { + // override credentials + builder.setCredentials(NoCredentials.getInstance()); + // set valid project id if needed + if (LOCAL_GAE_PROJECT_ID.equals(datastoreOptions.getProjectId())) { + log.info("pipelines fw detected running locally with GAE projectId set as '%s'; this isn't legal GCP project id, so changing to '%s'".formatted(LOCAL_GAE_PROJECT_ID, DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID)); + builder.setProjectId(DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID); + } + builder.setOpenTelemetryOptions(DatastoreOpenTelemetryOptions.newBuilder().build()); + } else if (datastoreOptions.getCredentials() != null) { + builder.setCredentials(datastoreOptions.getCredentials()); + } else { + log.warning("No credentials found for DatastoreOptions.Builder?"); + } + + return builder; + } + + @VisibleForTesting + public static boolean isNotCloudEnvironment() { + return isNotCloudEnvironment(DatastoreOptions.getDefaultInstance()); + } + + public static boolean isNotCloudEnvironment(String projectId) { + return projectId == null || + LOCAL_GAE_PROJECT_ID.equals(projectId) || + DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID.equals(projectId) || + TEST_PROJECT_ID.equals(projectId); + } + + private static boolean isNotCloudEnvironment(DatastoreOptions options) { + return isNotCloudEnvironment(options.getProjectId()); + } + + private static String getDatastoreEmulatorHost() { + return System.getProperty("DATASTORE_EMULATOR_HOST", System.getenv("DATASTORE_EMULATOR_HOST")); + } + +} diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java index d90e1422..05328d75 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java @@ -2,6 +2,7 @@ package com.google.appengine.tools.mapreduce; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.BaseContext; import com.google.appengine.tools.mapreduce.impl.CountersImpl; import com.google.appengine.tools.mapreduce.impl.MapOnlyShardTask; @@ -127,7 +128,7 @@ public Value> handleException(CancellationException ex) { * @return shardedJobId for this job */ private ShardedJobRunId getShardedJobId() { - DatastoreOptions defaultDatastoreOptions = DatastoreOptions.getDefaultInstance(); + DatastoreOptions defaultDatastoreOptions = EnvironmentUtils.datastoreBuilderFromDefaultInstance().build(); return ShardedJobRunId.of( java.util.Optional.ofNullable(settings.getProjectId()).orElseGet(defaultDatastoreOptions::getProjectId), diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java index 2aef9a53..dbb00fbf 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java @@ -2,6 +2,7 @@ package com.google.appengine.tools.mapreduce; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.*; import com.google.appengine.tools.mapreduce.impl.pipeline.CleanupPipelineJob; import com.google.appengine.tools.mapreduce.impl.pipeline.ExamineStatusAndReturnResult; @@ -9,10 +10,20 @@ import com.google.appengine.tools.mapreduce.impl.pipeline.ShardedJob; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunId; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings; -import com.google.appengine.tools.mapreduce.impl.sort.*; +import com.google.appengine.tools.mapreduce.impl.sort.MergeContext; +import com.google.appengine.tools.mapreduce.impl.sort.MergeShardTask; +import com.google.appengine.tools.mapreduce.impl.sort.SortContext; +import com.google.appengine.tools.mapreduce.impl.sort.SortShardTask; +import com.google.appengine.tools.mapreduce.impl.sort.SortWorker; import com.google.appengine.tools.mapreduce.inputs.GoogleCloudStorageLineInput; import com.google.appengine.tools.mapreduce.outputs.GoogleCloudStorageFileOutput; -import com.google.appengine.tools.pipeline.*; +import com.google.appengine.tools.pipeline.FutureValue; +import com.google.appengine.tools.pipeline.Job0; +import com.google.appengine.tools.pipeline.Job1; +import com.google.appengine.tools.pipeline.JobRunId; +import com.google.appengine.tools.pipeline.PipelineOrchestrator; +import com.google.appengine.tools.pipeline.PromisedValue; +import com.google.appengine.tools.pipeline.Value; import com.google.appengine.tools.pipeline.impl.PipelineManager; import com.google.appengine.tools.pipeline.impl.backend.AppEngineEnvironment; import com.google.cloud.datastore.Datastore; @@ -30,7 +41,11 @@ import java.io.IOException; import java.io.Serial; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.logging.Level; import java.util.logging.Logger; @@ -241,9 +256,7 @@ static class SortJob extends Job1< protected Datastore getDatastore() { if (datastore == null) { - datastore = DatastoreOptions.getDefaultInstance().toBuilder() - .setNamespace(settings.getNamespace()) - .build().getService(); + datastore = settings.getDatastoreOptions().getService(); } return datastore; } @@ -345,9 +358,7 @@ static class MergeJob extends protected Datastore getDatastore() { if (datastore == null) { - datastore = DatastoreOptions.getDefaultInstance().toBuilder() - .setNamespace(settings.getNamespace()) - .build().getService(); + datastore = settings.getDatastoreOptions().getService(); } return datastore; } @@ -480,9 +491,7 @@ static class ReduceJob extends Job1, protected Datastore getDatastore() { if (datastore == null) { - datastore = DatastoreOptions.getDefaultInstance().toBuilder() - .setNamespace(settings.getNamespace()) - .build().getService(); + datastore = settings.getDatastoreOptions().getService(); } return datastore; } @@ -607,8 +616,7 @@ public String getJobDisplayName() { //NOTE: very coupled to `MapReduceSettings` implementation; could be there, but don't want to couple that to FW implementation // and also don't want it to be public, as would be exposed to API private DatastoreOptions getDatastoreOptions(@NonNull MapReduceSettings settings) { - DatastoreOptions.Builder builder = DatastoreOptions.getDefaultInstance().toBuilder(); - + DatastoreOptions.Builder builder = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); java.util.Optional.ofNullable(settings.getDatastoreHost()).ifPresent(builder::setHost); java.util.Optional.ofNullable(settings.getProjectId()).ifPresent(builder::setProjectId); java.util.Optional.ofNullable(settings.getDatabaseId()).ifPresent(builder::setDatabaseId); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java b/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java index c512f1b2..7b2890b4 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java @@ -1,5 +1,6 @@ package com.google.appengine.tools.mapreduce; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.WorkerShardTask; import com.google.appengine.tools.pipeline.JobSetting; import com.google.cloud.datastore.DatastoreOptions; @@ -50,7 +51,7 @@ default JobSetting[] toJobSettings(JobSetting... extra) { default DatastoreOptions getDatastoreOptions() { - DatastoreOptions.Builder optionsBuilder = DatastoreOptions.getDefaultInstance().toBuilder(); + DatastoreOptions.Builder optionsBuilder = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); Optional.ofNullable(getDatastoreHost()).ifPresent(optionsBuilder::setHost); Optional.ofNullable(getProjectId()).ifPresent(optionsBuilder::setProjectId); Optional.ofNullable(getDatabaseId()).ifPresent(optionsBuilder::setDatabaseId); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java index 669cff58..4e349241 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java @@ -21,7 +21,7 @@ public DeleteShardedJob(DatastoreOptions datastoreOptions, ShardedJobRunId jobId @Override protected Job createShardsJob(int start, int end) { - return new DeleteShardsInfos(datastoreOptions.toBuilder().build(), getJobId(), start, end); + return new DeleteShardsInfos(datastoreOptions, getJobId(), start, end); } @Override diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java index dd0df7ca..6a1c8d9b 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java @@ -1,5 +1,6 @@ package com.google.appengine.tools.mapreduce.impl.shardedjob.pipeline; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.RetryExecutor; import com.google.appengine.tools.mapreduce.impl.shardedjob.*; import com.google.appengine.tools.mapreduce.impl.util.DatastoreSerializationUtil; @@ -41,7 +42,8 @@ private static void addParentKeyToList(PipelineBackendTransaction tx, List @Override public Value run() { - Datastore datastore = datastoreOptions.toBuilder().build().getService(); + Datastore datastore = EnvironmentUtils.datastoreBuilderFromDatastoreOptions(datastoreOptions).build().getService(); + final List toDelete = new ArrayList<>((end - start) * 2); PipelineBackendTransaction tx = PipelineBackendTransaction.newInstance(datastore); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardedJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardedJob.java index 7fb8b7c5..5777f5e6 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardedJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardedJob.java @@ -22,7 +22,7 @@ public FinalizeShardedJob(DatastoreOptions datastoreOptions, ShardedJobRunId job @Override protected Job createShardsJob(int start, int end) { - return new FinalizeShardsInfos(this.datastoreOptions.toBuilder().build(), getJobId(), status, start, end); + return new FinalizeShardsInfos(this.datastoreOptions, getJobId(), status, start, end); } @Override diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java index b2ccbf35..f701f5b9 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java @@ -2,6 +2,7 @@ import static java.util.concurrent.Executors.callable; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.RetryExecutor; import com.google.appengine.tools.mapreduce.impl.shardedjob.*; import com.google.appengine.tools.mapreduce.impl.util.DatastoreSerializationUtil; @@ -31,7 +32,8 @@ public class FinalizeShardsInfos extends Job0 { @Override public Value run() { - Datastore datastore = datastoreOptions.toBuilder().build().getService(); + // if coming from deserialization may lose transient properties that cause NPE + Datastore datastore = EnvironmentUtils.datastoreBuilderFromDatastoreOptions(datastoreOptions).build().getService(); RetryExecutor.call( ShardedJobRunner.FOREVER_RETRYER, diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java index d206c160..86471616 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java @@ -1,24 +1,20 @@ package com.google.appengine.tools.mapreduce.impl.util; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunId; import com.google.appengine.tools.pipeline.JobRunId; -import com.google.appengine.tools.pipeline.impl.backend.*; -import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; - -import com.google.common.base.Strings; -import lombok.*; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.Value; import lombok.extern.java.Log; import javax.inject.Inject; import javax.inject.Singleton; -import java.util.Arrays; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** * handles translation to/from request parameters and pipeline backends @@ -47,11 +43,6 @@ public static class Params { private static final String TRACEPARENT_HEADER = "traceparent"; private static final String CLOUD_TRACE_CONTEXT_HEADER = "X-Cloud-Trace-Context"; - - // value of env var GOOGLE_CLOUD_PROJECT when running locally; underscores aren't actually legal in GCP project ids, - // so if this ever ends up being used in a real GCP API call, it blows up in validation before request is even sent by client - public static final String LOCAL_GAE_PROJECT_ID = "no_app_id"; - private static final String DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID = "local-gae-project"; /** * value to override local GAE project id with, when running locally; to allow this on case-by-case basis @@ -66,18 +57,7 @@ public DatastoreOptions buildDatastoreFromRequest(HttpServletRequest request) { // - pass as parameters on request // - set as env vars (system properties), via Maven to pull (wouldn't exactly let us do integration tests) // --> no, host may include port, set at runtime by emulator; not easy/appropriate to fake as env var - - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - - DatastoreOptions.Builder builder = defaultInstance.toBuilder(); - - if (LOCAL_GAE_PROJECT_ID.equals(defaultInstance.getProjectId())) { - log.info("pipelines fw detected running locally with GAE projectId set as 'no_app_id'; this isn't legal GCP project id, so changing to 'local-gae-project'"); - // 'no_app_id' isn't legal name, so change it - builder.setProjectId(getLocalProjectIdOverride()); - // try to get emulator host from env var, if available - builder.setHost(System.getProperty("DATASTORE_EMULATOR_HOST", System.getenv("DATASTORE_EMULATOR_HOST"))); - } + DatastoreOptions.Builder builder = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); // whatever values are, they can be overridden by request params getParam(request, Params.DATASTORE_HOST).ifPresent(builder::setHost); @@ -97,7 +77,7 @@ public Optional getJobId(HttpServletRequest request, String paramName) { } public JobRunId getRootPipelineId(HttpServletRequest request) throws ServletException { - return getJobId(request, Params.ROOT_PIPELINE_ID).map(s -> JobRunId.fromEncodedString(s)) + return getJobId(request, Params.ROOT_PIPELINE_ID).map(JobRunId::fromEncodedString) .orElseThrow(() -> new ServletException(Params.ROOT_PIPELINE_ID + " parameter not found.")); } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java b/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java index 9973483e..f9c2df8c 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java @@ -1,13 +1,17 @@ package com.google.appengine.tools.pipeline.di; -import com.google.appengine.tools.mapreduce.impl.util.RequestUtils; -import com.google.appengine.tools.pipeline.impl.backend.*; +import com.google.appengine.tools.EnvironmentUtils; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineEnvironment; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesService; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesServiceImpl; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineStandardGen2; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineTaskQueue; +import com.google.appengine.tools.pipeline.impl.backend.CloudTasksTaskQueue; +import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; import com.google.appengine.v1.ApplicationsClient; import com.google.appengine.v1.ServicesClient; import com.google.appengine.v1.VersionsClient; -import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.tasks.v2.CloudTasksClient; -import dagger.Binds; import dagger.Module; import dagger.Provides; import lombok.SneakyThrows; @@ -108,6 +112,6 @@ PipelineTaskQueue pipelineTaskQueue(AppEngineEnvironment environment, } boolean isTestingContext(AppEngineEnvironment environment) { - return RequestUtils.LOCAL_GAE_PROJECT_ID.equals(environment.getProjectId()) || "test-project" .equals(environment.getProjectId()); + return EnvironmentUtils.isNotCloudEnvironment(environment.getProjectId()); } } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java index 5d393f55..6b584a69 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java @@ -14,17 +14,27 @@ package com.google.appengine.tools.pipeline.impl; -import com.google.appengine.tools.mapreduce.*; -import com.google.appengine.tools.mapreduce.impl.shardedjob.*; +import com.google.appengine.tools.EnvironmentUtils; +import com.google.appengine.tools.mapreduce.MapJob; +import com.google.appengine.tools.mapreduce.MapReduceJob; +import com.google.appengine.tools.mapreduce.MapReduceSettings; +import com.google.appengine.tools.mapreduce.MapReduceSpecification; +import com.google.appengine.tools.mapreduce.MapSettings; +import com.google.appengine.tools.mapreduce.MapSpecification; +import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTask; +import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTaskState; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobController; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunId; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunner; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobState; import com.google.appengine.tools.pipeline.*; import com.google.appengine.tools.pipeline.di.DaggerMultiTenantComponent; import com.google.appengine.tools.pipeline.di.MultiTenantComponent; import com.google.appengine.tools.pipeline.di.TenantModule; -import com.google.appengine.tools.pipeline.impl.backend.SerializationStrategy; -import com.google.appengine.tools.pipeline.impl.util.DIUtil; -import com.google.cloud.datastore.Key; import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd; import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd; +import com.google.appengine.tools.pipeline.impl.backend.SerializationStrategy; import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec; import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec.Group; import com.google.appengine.tools.pipeline.impl.model.Barrier; @@ -43,11 +53,13 @@ import com.google.appengine.tools.pipeline.impl.tasks.FinalizeJobTask; import com.google.appengine.tools.pipeline.impl.tasks.HandleChildExceptionTask; import com.google.appengine.tools.pipeline.impl.tasks.HandleSlotFilledTask; -import com.google.appengine.tools.pipeline.impl.tasks.RunJobTask; import com.google.appengine.tools.pipeline.impl.tasks.PipelineTask; +import com.google.appengine.tools.pipeline.impl.tasks.RunJobTask; +import com.google.appengine.tools.pipeline.impl.util.DIUtil; import com.google.appengine.tools.pipeline.impl.util.GUIDGenerator; import com.google.appengine.tools.pipeline.impl.util.StringUtils; import com.google.appengine.tools.pipeline.util.Pair; +import com.google.cloud.datastore.Key; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterables; @@ -65,7 +77,11 @@ import java.lang.reflect.Method; import java.time.Duration; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.logging.Level; import java.util.stream.Collectors; @@ -184,8 +200,8 @@ public JobRecord registerNewJobRecord(UpdateSpec updateSpec, JobSetting[] settin // --> JobRecordFactory or something, that gets injected String projectId = backEnd.getOptions().as(AppEngineBackEnd.Options.class).getProjectId(); - if (projectId.equals("no_app_id")) { - throw new IllegalStateException("projectId is 'no_app_id'; this isn't legal GCP project id"); + if (EnvironmentUtils.LOCAL_GAE_PROJECT_ID.equals(projectId)) { + throw new IllegalStateException("projectId is '%s'; this isn't legal GCP project id".formatted(projectId)); } JobRecord jobRecord = JobRecord.createRootJobRecord(projectId, jobInstance, getSerializationStrategy(), settings); diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java index 6516c1d6..9fa5120f 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java @@ -15,6 +15,7 @@ package com.google.appengine.tools.pipeline.impl.backend; import com.github.rholder.retry.*; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.pipeline.JobRunId; import com.google.appengine.tools.pipeline.NoSuchObjectException; import com.google.appengine.tools.pipeline.impl.model.*; @@ -114,7 +115,7 @@ public void onRetry(Attempt attempt) { // Only used in tests public AppEngineBackEnd(Options options, PipelineTaskQueue taskQueue, AppEngineServicesService appEngineServicesService) { - this(options.getDatastoreOptions().toBuilder().build().getService(), taskQueue, appEngineServicesService); + this(EnvironmentUtils.datastoreBuilderFromDatastoreOptions(options.getDatastoreOptions()).build().getService(), taskQueue, appEngineServicesService); } @Builder @@ -131,10 +132,11 @@ public static class Options implements PipelineBackEnd.Options { @SneakyThrows public static Options defaults() { + DatastoreOptions dsOptions = EnvironmentUtils.datastoreBuilderFromDefaultInstance().build(); return Options.builder() - .datastoreOptions(DatastoreOptions.getDefaultInstance()) + .datastoreOptions(dsOptions) .credentials(GoogleCredentials.getApplicationDefault()) - .projectId(DatastoreOptions.getDefaultProjectId()) + .projectId(dsOptions.getProjectId()) .build(); } diff --git a/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java b/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java index ae3512ca..16b87f28 100644 --- a/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java +++ b/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java @@ -9,7 +9,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.stream.Collectors; @@ -297,17 +296,4 @@ private void rollbackAllServices() { rollbackTasks(); } - @Override - protected void finalize() throws Throwable { - try { - if (this.getDsTransaction().isActive()) { - // shouldn't happen, unless opening tnx just for read, just is kind of absurd in - // a strong consistency model - log.log(Level.WARNING, String.format("Finalizing PipelineBackendTransactionImpl transaction open for %s", - stopwatch.elapsed(TimeUnit.MILLISECONDS))); - } - } finally { - super.finalize(); - } - } } diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java index 33f9b695..2fc967b3 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java @@ -1,13 +1,20 @@ package com.google.appengine.tools.mapreduce; +import com.google.appengine.tools.EnvironmentUtils; +import com.google.cloud.NoCredentials; import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.datastore.testing.LocalDatastoreHelper; import lombok.extern.java.Log; -import org.junit.jupiter.api.extension.*; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; import java.net.ConnectException; -import java.time.Duration; import java.util.logging.Level; /** @@ -19,7 +26,7 @@ @Log public class DatastoreExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback { - public static String TEST_DATASTORE_PROJECT_ID = "test-project"; + public static String TEST_DATASTORE_PROJECT_ID = EnvironmentUtils.TEST_PROJECT_ID; public static String DS_CONTEXT_KEY = "ds-emulator"; public static String DS_OPTIONS_CONTEXT_KEY = "ds-options"; @@ -32,11 +39,13 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception { .setConsistency(1.0) .build(); globalDatastoreHelper.start(); + System.setProperty("DATASTORE_EMULATOR_HOST", "localhost:" + globalDatastoreHelper.getPort()); log.info("Datastore emulator started on port : " + globalDatastoreHelper.getPort()); } @Override public void afterAll(ExtensionContext extensionContext) throws Exception { + System.clearProperty("DATASTORE_EMULATOR_HOST"); int attempt = 0; boolean stopped = false; @@ -61,11 +70,14 @@ public void afterAll(ExtensionContext extensionContext) throws Exception { public void beforeEach(ExtensionContext extensionContext) throws Exception { globalDatastoreHelper.reset(); log.info("Datastore emulator reset"); - DatastoreOptions options = globalDatastoreHelper.getOptions().toBuilder() - .setProjectId(TEST_DATASTORE_PROJECT_ID) - .build(); - - extensionContext.getStore(ExtensionContext.Namespace.GLOBAL).put(DS_OPTIONS_CONTEXT_KEY, options); + DatastoreOptions.Builder builder = EnvironmentUtils.datastoreBuilderFromDatastoreOptions(globalDatastoreHelper.getOptions()); + builder.setProjectId(TEST_DATASTORE_PROJECT_ID); + builder.setCredentials(NoCredentials.getInstance()); + builder.setHost("localhost:" + globalDatastoreHelper.getPort()); + builder.setOpenTelemetryOptions(DatastoreOpenTelemetryOptions.newBuilder().build()); + + DatastoreOptions options = builder.build(); + extensionContext.getStore(ExtensionContext.Namespace.GLOBAL).put(DS_OPTIONS_CONTEXT_KEY, options); Datastore datastore = options.getService(); extensionContext.getStore(ExtensionContext.Namespace.GLOBAL).put(DS_CONTEXT_KEY, datastore); diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/IncrementalTaskStateTest.java b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/IncrementalTaskStateTest.java index 5ea7e4f3..5f1d4e5d 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/IncrementalTaskStateTest.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/IncrementalTaskStateTest.java @@ -1,5 +1,6 @@ package com.google.appengine.tools.mapreduce.impl.shardedjob; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.pipeline.DatastoreExtension; import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; @@ -15,7 +16,7 @@ class IncrementalTaskStateTest { @Test void hasNoParent() { - Datastore datastore = DatastoreOptions.getDefaultInstance().getService(); + Datastore datastore = EnvironmentUtils.datastoreBuilderFromDefaultInstance().build().getService(); Key exampleKey = IncrementalTaskState.makeKey(datastore, IncrementalTaskId.of(ShardedJobRunId.builder().project("test-project").jobId("job").build(), 1)); diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/TestController.java b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/TestController.java index 07a40560..f88659fa 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/TestController.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/TestController.java @@ -1,5 +1,6 @@ package com.google.appengine.tools.mapreduce.impl.shardedjob; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.pipeline.PipelineService; import com.google.cloud.datastore.DatastoreOptions; import lombok.*; @@ -30,7 +31,7 @@ public class TestController extends ShardedJobController { public DatastoreOptions getDatastoreOptions() { // in case serialized, recreate to "recover" transient fields - return datastoreOptions.toBuilder().build(); + return EnvironmentUtils.datastoreBuilderFromDatastoreOptions(datastoreOptions).build(); } @Override diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtilsTest.java b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtilsTest.java index 7d5d828b..ebe5de87 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtilsTest.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtilsTest.java @@ -1,23 +1,16 @@ package com.google.appengine.tools.mapreduce.impl.util; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunId; -import com.google.appengine.tools.pipeline.JobRunId; -import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd; -import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd; -import com.google.appengine.tools.pipeline.impl.model.JobRecord; -import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; -import com.google.cloud.datastore.Key; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import javax.servlet.http.HttpServletRequest; - import java.net.URLDecoder; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -44,7 +37,7 @@ public void getMapReduceId() { @Test public void localBootstrap() { //make this like local situation - System.setProperty("GOOGLE_CLOUD_PROJECT", "no_app_id"); + System.setProperty("GOOGLE_CLOUD_PROJECT", EnvironmentUtils.LOCAL_GAE_PROJECT_ID); System.setProperty("DATASTORE_EMULATOR_HOST", "http://localhost:8081"); RequestUtils requestUtils = new RequestUtils(); diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java b/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java index 3af98faf..5947bc9b 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java @@ -1,15 +1,18 @@ package com.google.appengine.tools.pipeline; +import com.google.appengine.tools.EnvironmentUtils; import com.google.cloud.datastore.Datastore; -import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.datastore.testing.LocalDatastoreHelper; -import io.opentelemetry.api.OpenTelemetry; import lombok.extern.java.Log; -import org.junit.jupiter.api.extension.*; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; import java.net.ConnectException; -import java.time.Duration; import java.util.logging.Level; /** @@ -21,7 +24,7 @@ @Log public class DatastoreExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback { - public static String TEST_DATASTORE_PROJECT_ID = "test-project"; + public static String TEST_DATASTORE_PROJECT_ID = EnvironmentUtils.TEST_PROJECT_ID; public static String DS_CONTEXT_KEY = "ds-emulator"; public static String DS_OPTIONS_CONTEXT_KEY = "ds-options"; @@ -34,11 +37,13 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception { .setConsistency(1.0) .build(); globalDatastoreHelper.start(); + System.setProperty("DATASTORE_EMULATOR_HOST", "localhost:" + globalDatastoreHelper.getPort()); log.info("Datastore emulator started on port : " + globalDatastoreHelper.getPort()); } @Override public void afterAll(ExtensionContext extensionContext) throws Exception { + System.clearProperty("DATASTORE_EMULATOR_HOST"); int attempt = 0; boolean stopped = false; @@ -63,7 +68,7 @@ public void afterAll(ExtensionContext extensionContext) throws Exception { public void beforeEach(ExtensionContext extensionContext) throws Exception { globalDatastoreHelper.reset(); log.info("Datastore emulator reset"); - DatastoreOptions options = globalDatastoreHelper.getOptions().toBuilder() + DatastoreOptions options = EnvironmentUtils.datastoreBuilderFromDatastoreOptions(globalDatastoreHelper.getOptions()) .setProjectId(TEST_DATASTORE_PROJECT_ID) .build(); diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/PipelineTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/PipelineTest.java index fd7c7c0a..ca4dfb34 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/PipelineTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/PipelineTest.java @@ -31,6 +31,8 @@ import com.google.apphosting.api.ApiProxy; import com.google.auth.Credentials; +import com.google.cloud.NoCredentials; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import lombok.Getter; @@ -104,7 +106,8 @@ public static SerializationStrategy getSerializationStrategy() { return new AppEngineBackEnd(AppEngineBackEnd.Options.builder() .datastoreOptions(DatastoreOptions.newBuilder() .setProjectId("test-project") - .setCredentials(mock(Credentials.class)) + .setCredentials(NoCredentials.getInstance()) + .setOpenTelemetryOptions(DatastoreOpenTelemetryOptions.newBuilder().build()) .build()) .build(), null, diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java index a1da7fed..3df8941b 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java @@ -1,13 +1,13 @@ package com.google.appengine.tools.pipeline.impl.backend; import com.google.appengine.tools.pipeline.impl.util.SerializationUtils; -import com.google.auth.Credentials; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.auth.oauth2.UserCredentials; import com.google.cloud.NoCredentials; import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; @@ -25,7 +25,8 @@ void getOptions() { //TODO: replace this with GoogleCredentials.getApplicationDefault() when it's available, if we ever auth the Github // Action with GCP (debatably necessary for integration tests) GoogleCredentials credentials = GoogleCredentials.newBuilder() - .setQuotaProjectId("test-project") + // use some non-local/test project id to not override the credentials + .setQuotaProjectId("some-project") .setAccessToken(AccessToken.newBuilder().setTokenValue("token").setExpirationTime(new Date()).build()) .build(); @@ -33,6 +34,7 @@ void getOptions() { Datastore datastore = DatastoreOptions.newBuilder() .setProjectId(credentials.getQuotaProjectId()) .setCredentials(credentials) + .setOpenTelemetryOptions(DatastoreOpenTelemetryOptions.newBuilder().build()) .build().getService(); AppEngineBackEnd backend = new AppEngineBackEnd(datastore, mock(PipelineTaskQueue.class), mock(AppEngineServicesService.class)); diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java b/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java index ad911f71..e5ba925e 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java @@ -1,11 +1,15 @@ package com.google.appengine.tools.pipeline.testutil; -import com.google.appengine.tools.mapreduce.impl.util.RequestUtils; -import com.google.appengine.tools.pipeline.impl.backend.*; +import com.google.appengine.tools.EnvironmentUtils; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineEnvironment; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesService; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesServiceImpl; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineStandardGen2; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineTaskQueue; +import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; import com.google.appengine.v1.ApplicationsClient; import com.google.appengine.v1.ServicesClient; import com.google.appengine.v1.VersionsClient; -import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.tasks.v2.CloudTasksClient; import dagger.Binds; import dagger.Module; @@ -56,7 +60,7 @@ AppEngineServicesService appEngineServicesService(AppEngineServicesServiceImpl i //before, test harness basically did this by overriding env vars via ApiProxy stuff; see LocalModulesServiceTestConfig - if (isTestingContext()) { + if (EnvironmentUtils.isNotCloudEnvironment()) { return new AppEngineServicesService() { @Override public String getLocation() { @@ -83,11 +87,6 @@ public String getWorkerServiceHostName(String service, String version) { } } - boolean isTestingContext() { - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - return RequestUtils.LOCAL_GAE_PROJECT_ID.equals(defaultInstance.getProjectId()) || "test-project" .equals(defaultInstance.getProjectId()); - } - @Module interface Bindings { // this is the real difference for tests atm diff --git a/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java b/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java index 2c1a9bf9..5cddbc76 100644 --- a/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java +++ b/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java @@ -3,15 +3,16 @@ import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.Transaction; +import com.google.protobuf.ByteString; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; import java.util.Collections; -import java.util.List; import java.util.Set; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; class PipelineBackendTransactionImplTest { @@ -27,6 +28,7 @@ void setUp() { mockTaskQueue = mock(PipelineTaskQueue.class); mockDatastore = mock(Datastore.class); when(mockDatastore.newTransaction()).thenReturn(mockTransaction); + when(mockTransaction.getTransactionId()).thenReturn(ByteString.copyFromUtf8("transaction-id")); pipelineBackendTransaction = new PipelineBackendTransactionImpl(mockDatastore, mockTaskQueue); } @@ -35,7 +37,8 @@ void commit() { when(mockTransaction.isActive()).thenReturn(true); when(mockTaskQueue.enqueue(anyString(), anyCollection())).thenReturn(Collections.emptyList()); - pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); + pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder() + .method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); pipelineBackendTransaction.commit(); verify(mockTransaction).commit(); @@ -45,20 +48,20 @@ void commit() { @Test void commitQueueFailsDeletesTasks() { when(mockTransaction.isActive()).thenReturn(true); - Set taskReferences = Collections.singleton(PipelineTaskQueue.TaskReference.of("queue1", "task-ref")); + Set taskReferences = Collections + .singleton(PipelineTaskQueue.TaskReference.of("queue1", "task-ref")); when(mockTaskQueue.enqueue(anyString(), anyCollection())).thenThrow(new RuntimeException("error enqueueing")); - pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); + pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder() + .method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); assertThrows(RuntimeException.class, () -> pipelineBackendTransaction.commit()); verify(mockTaskQueue).enqueue(anyString(), anyCollection()); } @Test - void commitDatastoreFailsDeletesTasks() { + void commitDatastoreFailsNoTasksEnqueuedOrDeleted() { when(mockTransaction.isActive()).thenReturn(true); - List taskReferences = Collections.singletonList(PipelineTaskQueue.TaskReference.of("queue1", "task-ref")); - when(mockTaskQueue.enqueue(anyString(), anyCollection())).thenReturn(taskReferences); when(mockTransaction.commit()).thenThrow(new RuntimeException("error committing")); pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); @@ -66,13 +69,15 @@ void commitDatastoreFailsDeletesTasks() { assertThrows(RuntimeException.class, () -> pipelineBackendTransaction.commit()); verify(mockTransaction, atMostOnce()).commit(); - verify(mockTaskQueue).enqueue(anyString(), anyCollection()); - verify(mockTaskQueue).deleteTasks(ArgumentMatchers.argThat(taskReferences::containsAll)); + // Datastore commits first; on failure tasks are never enqueued so there is nothing to delete + verify(mockTaskQueue, never()).enqueue(anyString(), anyCollection()); + verify(mockTaskQueue, never()).deleteTasks(any()); } @Test void enqueue() { - PipelineTaskQueue.TaskSpec task = PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build(); + PipelineTaskQueue.TaskSpec task = PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET) + .callbackPath("path").build(); pipelineBackendTransaction.enqueue("queue1", task); assertFalse(pipelineBackendTransaction.getPendingTaskSpecsByQueue().isEmpty()); @@ -81,7 +86,8 @@ void enqueue() { @Test void rollback() { - pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); + pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder() + .method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); pipelineBackendTransaction.rollback(); verify(mockTransaction).rollback(); diff --git a/java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000..fdbd0b15 --- /dev/null +++ b/java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-subclass