From 7e46e5eb438d5ac1c64eef03a178908fca7c1104 Mon Sep 17 00:00:00 2001 From: Erik Schultink Date: Wed, 25 Feb 2026 06:04:28 -0800 Subject: [PATCH 01/25] bump version number; delete old pom --- ...pengine-pipeline-0.3+worklytics.12-pom.xml | 258 ------------------ java/pom.xml | 2 +- 2 files changed, 1 insertion(+), 259 deletions(-) delete mode 100644 java/appengine-pipeline-0.3+worklytics.12-pom.xml diff --git a/java/appengine-pipeline-0.3+worklytics.12-pom.xml b/java/appengine-pipeline-0.3+worklytics.12-pom.xml deleted file mode 100644 index 1f55967a..00000000 --- a/java/appengine-pipeline-0.3+worklytics.12-pom.xml +++ /dev/null @@ -1,258 +0,0 @@ - - 4.0.0 - com.google.appengine.tools - appengine-pipeline - Pipeline Framework for Google App Engine - Framework for orchestrating complex workflows on top of Google App Engine - https://github.com/Worklytics/appengine-pipelines/ - - - - - 0.3+worklytics.12 - jar - - - The Apache Software License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0 - - - - UTF-8 - [2.7, 3.0) - 2.55 - 1.18.36 - [2.0.4, 3.0) - - - - - - github - Pipelines for Google App Engine by Worklytics - https://maven.pkg.github.com/Worklytics/appengine-pipelines - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.13.0 - - 17 - 17 - - - - org.apache.maven.plugins - maven-surefire-plugin - 3.2.5 - - - - test-project - 3 - - - - - org.apache.maven.plugins - maven-jar-plugin - 3.4.1 - - - - true - true - - - - - - - - ${basedir}/src/main/resources - com/google/appengine/tools/pipeline/impl/servlets - - - - - - - - com.google.cloud - libraries-bom - 26.53.0 - pom - import - - - - org.junit - junit-bom - 5.11.3 - pom - import - - - - - - - - - - com.google.cloud - google-cloud-appengine-admin - - - com.google.cloud - google-cloud-datastore - - - com.google.cloud - google-cloud-tasks - - - com.google.cloud - google-cloud-storage - - - com.google.guava - guava - [32.1,32.99] - - - - - - - - - javax.servlet - javax.servlet-api - 3.1.0 - provided - - - com.googlecode.charts4j - charts4j - 1.3 - - - org.json - json - 20231013 - - - it.unimi.dsi - fastutil - [6.5,6.6) - - - org.projectlombok - lombok - ${lombok.version} - provided - - - com.github.rholder - guava-retrying - 2.0.0 - - - - - - com.fasterxml.jackson.core - jackson-core - ${jackson.version} - - - com.fasterxml.jackson.core - jackson-annotations - ${jackson.version} - - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - - - com.fasterxml.jackson.datatype - jackson-datatype-jdk8 - ${jackson.version} - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - ${jackson.version} - - - jakarta.inject - jakarta.inject-api - 2.0.1 - - - - - - com.google.dagger - dagger - ${dagger.version} - - - - - - - - - - - - com.google.appengine - appengine-api-1.0-sdk - ${appengine.target.version} - test - - - - com.google.appengine - appengine-testing - ${appengine.target.version} - test - - - com.google.appengine - appengine-api-stubs - ${appengine.target.version} - test - - - org.junit.jupiter - junit-jupiter-api - test - - - org.junit.jupiter - junit-jupiter-engine - test - - - org.junit.jupiter - junit-jupiter-params - test - - - org.mockito - mockito-core - 5.15.2 - test - - - - \ No newline at end of file diff --git a/java/pom.xml b/java/pom.xml index 5d5a558e..009ef2ca 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -17,7 +17,7 @@ - 0.3+worklytics.12 + 0.3+worklytics.13 UTF-8 [2.18.2, 2.18.3) 2.55 From c4a093339777be9eef159cabb595af27d5968721 Mon Sep 17 00:00:00 2001 From: Erik Schultink Date: Wed, 25 Feb 2026 06:13:17 -0800 Subject: [PATCH 02/25] bump to java21 --- .github/workflows/test-java.yml | 4 ++-- java/pom.xml | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/.github/workflows/test-java.yml b/.github/workflows/test-java.yml index 87122948..3e43503f 100644 --- a/.github/workflows/test-java.yml +++ b/.github/workflows/test-java.yml @@ -17,7 +17,7 @@ jobs: - uses: actions/setup-java@v4 with: distribution: 'adopt' - java-version: '17' # maven won't accept --release argument with java < 8; and 11 is next LTS + java-version: '21' # maven won't accept --release argument with java < 8; and 11 is next LTS - id: 'read-cache-maven-packages' name: Read Cached Maven packages uses: actions/cache@v4 @@ -56,7 +56,7 @@ jobs: - uses: actions/setup-java@v4 with: distribution: 'adopt' - java-version: '17' # maven won't accept --release argument with java < 8; and 11 is next LTS + java-version: '21' # maven won't accept --release argument with java < 8; and 11 is next LTS - id: 'read-cache-maven-packages' name: Read Cached Maven packages uses: actions/cache@v4 diff --git a/java/pom.xml b/java/pom.xml index 009ef2ca..5a8bc2bb 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -41,9 +41,8 @@ maven-compiler-plugin 3.13.0 - 17 - 17 - + 21 + 21 From 44771b083459a94506fc0f735d08c9578b6b52a2 Mon Sep 17 00:00:00 2001 From: Erik Schultink Date: Wed, 25 Feb 2026 06:25:16 -0800 Subject: [PATCH 03/25] cleanup workflow --- .github/workflows/test-java.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-java.yml b/.github/workflows/test-java.yml index 3e43503f..202160af 100644 --- a/.github/workflows/test-java.yml +++ b/.github/workflows/test-java.yml @@ -16,8 +16,8 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-java@v4 with: - distribution: 'adopt' - java-version: '21' # maven won't accept --release argument with java < 8; and 11 is next LTS + distribution: 'temurin' + java-version: '21' - id: 'read-cache-maven-packages' name: Read Cached Maven packages uses: actions/cache@v4 From 8b2058040d0ed64df288a1080ce4b0d5c3634823 Mon Sep 17 00:00:00 2001 From: Erik Schultink Date: Wed, 25 Feb 2026 07:39:58 -0800 Subject: [PATCH 04/25] organize dependencies --- java/pom.xml | 30 ++++++++--- .../PipelineBackendTransactionImplTest.java | 50 ++++++++++++------- .../org.mockito.plugins.MockMaker | 1 + 3 files changed, 56 insertions(+), 25 deletions(-) create mode 100644 java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/java/pom.xml b/java/pom.xml index 5a8bc2bb..7b7a643a 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -39,17 +39,31 @@ org.apache.maven.plugins maven-compiler-plugin - 3.13.0 + 3.15.0 21 21 + false + + + org.projectlombok + lombok + ${lombok.version} + + + com.google.dagger + dagger-compiler + ${dagger.version} + + org.apache.maven.plugins maven-surefire-plugin - 3.2.5 + 3.5.2 + test-project @@ -60,7 +74,7 @@ org.apache.maven.plugins maven-jar-plugin - 3.4.1 + 3.5.0 @@ -136,11 +150,6 @@ 3.1.0 provided - - com.googlecode.charts4j - charts4j - 1.3 - org.json json @@ -166,6 +175,11 @@ + + com.googlecode.charts4j + charts4j + 1.3 + com.fasterxml.jackson.core jackson-core diff --git a/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java b/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java index 2c1a9bf9..822afbac 100644 --- a/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java +++ b/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java @@ -1,18 +1,26 @@ package com.google.appengine.tools.txn; -import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; -import com.google.cloud.datastore.Datastore; -import com.google.cloud.datastore.Transaction; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.Collections; import java.util.List; import java.util.Set; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; + +import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; +import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.Transaction; class PipelineBackendTransactionImplTest { @@ -27,6 +35,7 @@ void setUp() { mockTaskQueue = mock(PipelineTaskQueue.class); mockDatastore = mock(Datastore.class); when(mockDatastore.newTransaction()).thenReturn(mockTransaction); + when(mockTransaction.getTransactionId()).thenReturn(com.google.protobuf.ByteString.copyFromUtf8("mock-txn-id")); pipelineBackendTransaction = new PipelineBackendTransactionImpl(mockDatastore, mockTaskQueue); } @@ -35,7 +44,8 @@ void commit() { when(mockTransaction.isActive()).thenReturn(true); when(mockTaskQueue.enqueue(anyString(), anyCollection())).thenReturn(Collections.emptyList()); - pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); + pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder() + .method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); pipelineBackendTransaction.commit(); verify(mockTransaction).commit(); @@ -45,10 +55,12 @@ void commit() { @Test void commitQueueFailsDeletesTasks() { when(mockTransaction.isActive()).thenReturn(true); - Set taskReferences = Collections.singleton(PipelineTaskQueue.TaskReference.of("queue1", "task-ref")); + Set taskReferences = Collections + .singleton(PipelineTaskQueue.TaskReference.of("queue1", "task-ref")); when(mockTaskQueue.enqueue(anyString(), anyCollection())).thenThrow(new RuntimeException("error enqueueing")); - pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); + pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder() + .method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); assertThrows(RuntimeException.class, () -> pipelineBackendTransaction.commit()); verify(mockTaskQueue).enqueue(anyString(), anyCollection()); @@ -57,22 +69,25 @@ void commitQueueFailsDeletesTasks() { @Test void commitDatastoreFailsDeletesTasks() { when(mockTransaction.isActive()).thenReturn(true); - List taskReferences = Collections.singletonList(PipelineTaskQueue.TaskReference.of("queue1", "task-ref")); + List taskReferences = Collections + .singletonList(PipelineTaskQueue.TaskReference.of("queue1", "task-ref")); when(mockTaskQueue.enqueue(anyString(), anyCollection())).thenReturn(taskReferences); when(mockTransaction.commit()).thenThrow(new RuntimeException("error committing")); - pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); + pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder() + .method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); assertThrows(RuntimeException.class, () -> pipelineBackendTransaction.commit()); verify(mockTransaction, atMostOnce()).commit(); - verify(mockTaskQueue).enqueue(anyString(), anyCollection()); - verify(mockTaskQueue).deleteTasks(ArgumentMatchers.argThat(taskReferences::containsAll)); + verify(mockTaskQueue, org.mockito.Mockito.never()).enqueue(anyString(), anyCollection()); + verify(mockTaskQueue, org.mockito.Mockito.never()).deleteTasks(ArgumentMatchers.any()); } @Test void enqueue() { - PipelineTaskQueue.TaskSpec task = PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build(); + PipelineTaskQueue.TaskSpec task = PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET) + .callbackPath("path").build(); pipelineBackendTransaction.enqueue("queue1", task); assertFalse(pipelineBackendTransaction.getPendingTaskSpecsByQueue().isEmpty()); @@ -81,7 +96,8 @@ void enqueue() { @Test void rollback() { - pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); + pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder() + .method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); pipelineBackendTransaction.rollback(); verify(mockTransaction).rollback(); diff --git a/java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000..fdbd0b15 --- /dev/null +++ b/java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-subclass From 5125223c8a79ea602de8f5a864c261417ce1eeb2 Mon Sep 17 00:00:00 2001 From: Erik Schultink Date: Tue, 5 May 2026 10:49:02 -0700 Subject: [PATCH 05/25] bump jackson to solve https://github.com/Worklytics/appengine-pipelines/security/dependabot/9 (#95) --- java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/pom.xml b/java/pom.xml index 7b7a643a..79568074 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -19,7 +19,7 @@ 0.3+worklytics.13 UTF-8 - [2.18.2, 2.18.3) + [2.18.6, 2.19.0) 2.55 1.18.42 [2.0.4, 3.0) From 7584c3864cfade33dea3050f96cbdb64a6cf9d0a Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Wed, 27 May 2026 20:12:55 +0200 Subject: [PATCH 06/25] Init claude on project --- .agents/CLAUDE.md | 99 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 .agents/CLAUDE.md diff --git a/.agents/CLAUDE.md b/.agents/CLAUDE.md new file mode 100644 index 00000000..f67a352d --- /dev/null +++ b/.agents/CLAUDE.md @@ -0,0 +1,99 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Overview + +Worklytics' fork of Google's App Engine Pipelines Framework — a Java library for orchestrating asynchronous workflows on Google App Engine. All source lives under `java/`; the root contains only docs and the GitHub workflow. + +## Commands + +All Maven commands must be run from `java/`: + +```shell +# Run all tests +cd java && mvn test + +# Run a single test class +cd java && mvn test -Dtest=PipelineTest + +# Run a test by package pattern (as CI does) +cd java && mvn test -Dtest="com.google.appengine.tools.pipeline.**" + +# Build JAR +cd java && mvn package + +# Publish to GitHub Packages (requires ~/.m2/settings.xml with GitHub token) +cd java && mvn deploy +``` + +Tests require a Datastore emulator; `DatastoreExtension` starts it automatically. The env var `GOOGLE_CLOUD_PROJECT=test-project` is injected by `maven-surefire-plugin` in `pom.xml`. Failing tests are retried up to 3 times. + +## Architecture + +The library has three distinct roles mixed into one artifact: + +- **Client** — `PipelineService` / `PipelineOrchestrator` / `PipelineRunner`: API surface for starting pipelines, polling status, submitting promised values, and cancelling jobs. +- **Runner** — `PipelineServlet` + `TaskHandler` + `AppEngineBackEnd`: servlet-based execution engine that processes task-queue callbacks to advance pipeline state. +- **Admin UI** — JSON handlers (`JsonTreeHandler`, `JsonListHandler`, `JsonClassFilterHandler`) that serve pipeline state to the bundled web console. + +### Job model + +Users define pipelines by subclassing `Job0`…`Job6` (or `Job` directly) and implementing `run()`. Jobs are `Serializable`; instances are stored in Datastore and reconstituted on each execution attempt. + +Inside `run()`, child jobs are scheduled via `futureCall(new ChildJob(), arg1, arg2, ...)`, which returns a `FutureValue`. Passing a `FutureValue` as an argument to another `futureCall` declares a data dependency — the child won't run until its inputs are ready. `PromisedValue` is the mechanism for values provided by external agents; filled via `PipelineService.submitPromisedValue()`. + +`JobRunId` is a tuple `(project, database, namespace, name)` encoded as a colon-delimited string. `:` was chosen over `/` to avoid URL-encoding issues in some clients. + +### Persistence + +`AppEngineBackEnd` is the only production backend. It stores pipeline state in Google Cloud Datastore as `pipeline-job`, `pipeline-slot`, `pipeline-barrier`, and related entity kinds. All entities include a TTL field set 90 days in the future; you must create matching [Datastore TTL policies](https://cloud.google.com/datastore/docs/ttl) per entity kind for automatic cleanup. + +Datastore operations are retried with exponential backoff (up to 5 attempts) on `DatastoreException` or `IOException`. + +### Task queues + +Two implementations of `PipelineTaskQueue`: +- `CloudTasksTaskQueue` (default in production) — uses the Cloud Tasks v2 API. Caches queue location lookups. +- `AppEngineTaskQueue` (legacy, used in tests and when `USE_LEGACY_QUEUES` is set) — uses the old GAE SDK task queue API. + +Pipeline state-management tasks (`HandleSlotFilled`, `FinalizeJob`, etc.) are always sent to the default queue; only `RunJob` tasks respect `JobSetting.OnQueue`. + +### Dependency injection + +The framework uses Dagger 2 with custom scopes: + +- `JobRunServiceComponent` (`@Singleton`) — one per process; top-level component for the runner. Built from `AppEngineHostModule`. +- `StepExecutionComponent` (`@StepExecutionScoped`) — one per task-queue callback invocation. Created via `JobRunServiceComponent.stepExecutionComponent(StepExecutionModule)`. Provides `PipelineManager`, `PipelineService`, `PipelineRunner`, `ShardedJobRunner`. +- `MultiTenantComponent` / `TenantComponent` — used by the *client* side to obtain a `PipelineService` scoped to specific Datastore options (project/namespace/credentials). + +Job classes that need injected dependencies are annotated `@Injectable(DaggerMyContainer.class)`; `PipelineManager` calls the container's `inject()` via reflection before invoking `run()`. + +Servlets use `@AllArgsConstructor(onConstructor_ = @Inject)` so Dagger can construct them with dependencies — see `docs/servlet-di.md` for the rationale. + +### Multi-tenancy + +`AppEngineBackEnd.Options` carries `projectId`, `credentials`, and `DatastoreOptions` (which includes namespace). Pass tenant-specific options to `PipelineService.getInstance(options)` or via `JobSetting` to isolate pipeline data per tenant. + +### MapReduce / ShardedJob + +`MapReduceJob` and `MapJob` are `Job` subclasses that orchestrate sharded parallel work via `ShardedJobRunner`. Each shard runs as an `IncrementalTask`; state is stored as `IncrementalTaskState` in Datastore. `ShardedJobRunId` extends the pipeline `JobRunId` concept. + +## Testing patterns + +Test infrastructure is wired with JUnit 5 extensions: + +- `DatastoreExtension` — starts a `LocalDatastoreHelper` emulator before all tests, resets it between each test. +- `PipelineComponentsExtension` — builds `AppEngineBackEnd` + `StepExecutionComponent` against the emulator. +- `@PipelineSetupExtensions` — meta-annotation that applies both extensions; the standard setup for pipeline integration tests. + +Tests that extend `PipelineTest` get `pipelineService`, `pipelineManager`, and `appEngineBackend` injected. The `AppEngineTaskQueue` (not `CloudTasksTaskQueue`) is used in tests, backed by a `LocalTaskQueue`. + +## Local development environment variables + +| Variable | Effect | +|---|---| +| `USE_LEGACY_QUEUES` | Use GAE SDK task queues instead of Cloud Tasks API | +| `USE_LOCAL_SERVICE` | Treat all services as running on `localhost` as `default/v1` | +| `GAE_SERVICE_HOST_SUFFIX` | Override dynamic AppEngine service host lookup | +| `CLOUDTASKS_QUEUE_LOCATION` | Override dynamic Cloud Tasks queue location lookup | \ No newline at end of file From 29cc5ae72835a70843c1a7c6553a7b96492851e5 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Wed, 27 May 2026 20:13:13 +0200 Subject: [PATCH 07/25] Plan to fix DatastoreOptions.toBuilder() dropping host --- .agents/plans/FIX-DATASTORE-TOBUILDER.md | 242 +++++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 .agents/plans/FIX-DATASTORE-TOBUILDER.md diff --git a/.agents/plans/FIX-DATASTORE-TOBUILDER.md b/.agents/plans/FIX-DATASTORE-TOBUILDER.md new file mode 100644 index 00000000..45c6d521 --- /dev/null +++ b/.agents/plans/FIX-DATASTORE-TOBUILDER.md @@ -0,0 +1,242 @@ +# Fix: DatastoreOptions.toBuilder() drops host in google-cloud-datastore 2.40.0+ + +## Root Cause + +`DatastoreOptions` in `google-cloud-datastore` 2.40.0 (shipped in Google Cloud Java BOM 26.83.0) +introduced a **duplicate `host` field** inside `DatastoreOptions.Builder` that is separate from the +inherited `ServiceOptions.Builder.host` field. + +The `DatastoreOptions$Builder` setHost() correctly writes to **both** fields: + +```java +// Builder.setHost() bytecode: +putfield host // DatastoreOptions.Builder-own host (#69) +invokespecial ServiceOptions$Builder.setHost() // parent field +``` + +But the copy constructor used by `toBuilder()` only copies the DatastoreOptions-specific state +(namespace, databaseId, openTelemetryOptions, channelProvider) and **does NOT transfer the local +`host` field**: + +```java +// Builder copy-constructor references: access$000 (namespace), access$100 (databaseId), +// access$200 (openTelemetryOptions), access$300 (channelProvider) — no host access method. +``` + +Additionally, `Builder.build()` checks the local `host` field first: + +```java +// build() bytecode: +getfield host // local field #69 +ifnonnull → use it +// else: fall back to GrpcTransportOptions.getDefaultEndpoint() → real Cloud Datastore +``` + +**Result:** When `DATASTORE_EMULATOR_HOST` is set, `getDefaultInstance()` correctly configures the +emulator host. Calling `.toBuilder()` on that instance silently loses the host. The rebuilt options +then fall back to the real Cloud Datastore gRPC endpoint — so any entity written via the emulator +cannot be found by code that went through `toBuilder()`. + +This is exactly why pipeline slot lookups fail: the slot is written (job submission path) using +correct emulator `DatastoreOptions`, but the task-handler read path rebuilds options via +`getDefaultInstance().toBuilder()`, loses the host, and queries the real Cloud Datastore instead. + +--- + +## Pattern of the Fix + +Replace every `DatastoreOptions.getDefaultInstance().toBuilder()` (and any `.toBuilder()` call on +an existing `DatastoreOptions`) with an explicit `DatastoreOptions.newBuilder()` that copies each +field manually, including the host: + +```java +// BEFORE (broken in 2.40.0+): +DatastoreOptions.Builder builder = DatastoreOptions.getDefaultInstance().toBuilder(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); +``` + +For calls where the source is an existing `DatastoreOptions` instance (not `getDefaultInstance()`): + +```java +// BEFORE: +DatastoreOptions rebuilt = someOptions.toBuilder().build(); + +// AFTER: +DatastoreOptions rebuilt = DatastoreOptions.newBuilder() + .setProjectId(someOptions.getProjectId()) + .setCredentials(someOptions.getCredentials()) + .setTransportOptions(someOptions.getTransportOptions()) + .setHost(someOptions.getHost()) // must always set explicitly + .build(); +``` + +--- + +## Files to Fix + +### 1. `java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java` — **MOST CRITICAL** + +Lines 70–72. This is the task-handler entry point: every pipeline task call rebuilds `DatastoreOptions` +here. Losing the host means the task handler hits the real Cloud Datastore instead of the emulator. + +```java +// BEFORE (lines 70-72): +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder builder = defaultInstance.toBuilder(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); +``` + +Also add `import java.util.Optional;` if not already present (it already is in this file). + +--- + +### 2. `java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java` + +Line 53. `getDatastoreOptions()` already overrides host with `getDatastoreHost()` afterwards, so +this only manifests when `getDatastoreHost()` returns null — but the base copy is still wrong. + +```java +// BEFORE (line 53): +DatastoreOptions.Builder optionsBuilder = DatastoreOptions.getDefaultInstance().toBuilder(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder optionsBuilder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(optionsBuilder::setHost); +``` + +--- + +### 3. `java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java` + +Four locations, all the same pattern. Lines 244, 348, and 483 are identical `getDatastore()` methods +in different inner classes; line 610 is `getDatastoreOptions()`. + +**Lines 244, 348, 483** (identical pattern in three inner classes): + +```java +// BEFORE: +datastore = DatastoreOptions.getDefaultInstance().toBuilder() + .setNamespace(settings.getNamespace()) + .build().getService(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder b = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(b::setHost); +Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); +datastore = b.build().getService(); +``` + +**Line 610** (`getDatastoreOptions(MapReduceSettings)`): + +```java +// BEFORE: +DatastoreOptions.Builder builder = DatastoreOptions.getDefaultInstance().toBuilder(); + +// AFTER: +DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); +DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); +Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); +``` + +--- + +### 4. `java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java` + +Line 34. The `datastoreOptions` field here is passed in from the caller. If the caller constructed +it correctly (using the explicit copy pattern), this is safe — but `.toBuilder().build()` is still a +no-op round-trip that can lose the host if the incoming options have it stored only in the +DatastoreOptions-level field. + +```java +// BEFORE (line 34): +Datastore datastore = datastoreOptions.toBuilder().build().getService(); + +// AFTER: +Datastore datastore = DatastoreOptions.newBuilder() + .setProjectId(datastoreOptions.getProjectId()) + .setCredentials(datastoreOptions.getCredentials()) + .setTransportOptions(datastoreOptions.getTransportOptions()) + .setHost(datastoreOptions.getHost()) + .build().getService(); +``` + +Or, since this is a round-trip with no changes, just use the instance directly: + +```java +Datastore datastore = datastoreOptions.getService(); +``` + +--- + +### 5. `java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java` + +Line 24. Same concern as #4 — the `toBuilder().build()` round-trip is used only to pass a +"fresh copy" to `DeleteShardsInfos`. Since `DatastoreOptions` is already immutable and +`DatastoreOptions` is serializable, just pass it directly: + +```java +// BEFORE (line 24): +return new DeleteShardsInfos(datastoreOptions.toBuilder().build(), getJobId(), start, end); + +// AFTER: +return new DeleteShardsInfos(datastoreOptions, getJobId(), start, end); +``` + +--- + +## Suggested Helper (Optional) + +To avoid repeating the copy pattern, consider adding a package-private utility: + +```java +// e.g., in RequestUtils or a new DatastoreUtils class +static DatastoreOptions.Builder copyOf(DatastoreOptions source) { + DatastoreOptions.Builder b = DatastoreOptions.newBuilder() + .setProjectId(source.getProjectId()) + .setCredentials(source.getCredentials()) + .setTransportOptions(source.getTransportOptions()); + Optional.ofNullable(source.getHost()).ifPresent(b::setHost); + Optional.ofNullable(source.getNamespace()).ifPresent(b::setNamespace); + Optional.ofNullable(source.getDatabaseId()).ifPresent(b::setDatabaseId); + return b; +} +``` + +Then each call site becomes: +```java +DatastoreOptions.Builder builder = copyOf(DatastoreOptions.getDefaultInstance()); +``` + +--- + +## Context + +This fix is required when consuming this library from a project using +**Google Cloud Java BOM ≥ 26.83.0** (which brings in `google-cloud-datastore` ≥ 2.40.0). +The library's own BOM (26.53.0 at time of writing) is not affected, but the host project's +BOM takes precedence at runtime. From b3e5c521f694ede59f3567ba8ed6ced128092ad2 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Wed, 27 May 2026 20:29:05 +0200 Subject: [PATCH 08/25] Apply fix, update version to 0.3+worklytics.14 --- java/changes.md | 7 ++++ java/pom.xml | 2 +- .../tools/mapreduce/MapReduceJob.java | 40 ++++++++++++++----- .../mapreduce/ShardedJobAbstractSettings.java | 7 +++- .../shardedjob/pipeline/DeleteShardedJob.java | 2 +- .../pipeline/FinalizeShardsInfos.java | 2 +- .../mapreduce/impl/util/RequestUtils.java | 6 ++- 7 files changed, 51 insertions(+), 15 deletions(-) diff --git a/java/changes.md b/java/changes.md index b0f0eaf9..d8ec1e61 100644 --- a/java/changes.md +++ b/java/changes.md @@ -1,5 +1,12 @@ ## Change Log +### 0.3+worklytics.14 + - Fix: `DatastoreOptions.toBuilder()` drops the emulator host in `google-cloud-datastore` ≥ 2.40.0 + (Google Cloud Java BOM ≥ 26.83.0). Replaced all `toBuilder()` / `getDefaultInstance().toBuilder()` + calls with explicit `DatastoreOptions.newBuilder()` copies that preserve the `host` field. + Affected: `RequestUtils`, `ShardedJobAbstractSettings`, `MapReduceJob` (Sort/Merge/Reduce stages + and `getDatastoreOptions`), `FinalizeShardsInfos`, `DeleteShardedJob`. + ### 0.3+worklytics.7 (March 2025) - Handle cross-services transactionality - Fixes: serialization diff --git a/java/pom.xml b/java/pom.xml index c3e40039..af2f7cfd 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -8,7 +8,7 @@ https://github.com/Worklytics/appengine-pipelines/ - 0.3+worklytics.12 + 0.3+worklytics.14 jar diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java index 2aef9a53..d03f5c1f 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java @@ -241,9 +241,14 @@ static class SortJob extends Job1< protected Datastore getDatastore() { if (datastore == null) { - datastore = DatastoreOptions.getDefaultInstance().toBuilder() - .setNamespace(settings.getNamespace()) - .build().getService(); + DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); + DatastoreOptions.Builder b = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); + java.util.Optional.ofNullable(defaultInstance.getHost()).ifPresent(b::setHost); + java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); + datastore = b.build().getService(); } return datastore; } @@ -345,9 +350,14 @@ static class MergeJob extends protected Datastore getDatastore() { if (datastore == null) { - datastore = DatastoreOptions.getDefaultInstance().toBuilder() - .setNamespace(settings.getNamespace()) - .build().getService(); + DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); + DatastoreOptions.Builder b = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); + java.util.Optional.ofNullable(defaultInstance.getHost()).ifPresent(b::setHost); + java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); + datastore = b.build().getService(); } return datastore; } @@ -480,9 +490,14 @@ static class ReduceJob extends Job1, protected Datastore getDatastore() { if (datastore == null) { - datastore = DatastoreOptions.getDefaultInstance().toBuilder() - .setNamespace(settings.getNamespace()) - .build().getService(); + DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); + DatastoreOptions.Builder b = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); + java.util.Optional.ofNullable(defaultInstance.getHost()).ifPresent(b::setHost); + java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); + datastore = b.build().getService(); } return datastore; } @@ -607,7 +622,12 @@ public String getJobDisplayName() { //NOTE: very coupled to `MapReduceSettings` implementation; could be there, but don't want to couple that to FW implementation // and also don't want it to be public, as would be exposed to API private DatastoreOptions getDatastoreOptions(@NonNull MapReduceSettings settings) { - DatastoreOptions.Builder builder = DatastoreOptions.getDefaultInstance().toBuilder(); + DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); + DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); + java.util.Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); java.util.Optional.ofNullable(settings.getDatastoreHost()).ifPresent(builder::setHost); java.util.Optional.ofNullable(settings.getProjectId()).ifPresent(builder::setProjectId); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java b/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java index c512f1b2..03e73e98 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java @@ -50,7 +50,12 @@ default JobSetting[] toJobSettings(JobSetting... extra) { default DatastoreOptions getDatastoreOptions() { - DatastoreOptions.Builder optionsBuilder = DatastoreOptions.getDefaultInstance().toBuilder(); + DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); + DatastoreOptions.Builder optionsBuilder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); + Optional.ofNullable(defaultInstance.getHost()).ifPresent(optionsBuilder::setHost); Optional.ofNullable(getDatastoreHost()).ifPresent(optionsBuilder::setHost); Optional.ofNullable(getProjectId()).ifPresent(optionsBuilder::setProjectId); Optional.ofNullable(getDatabaseId()).ifPresent(optionsBuilder::setDatabaseId); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java index 669cff58..4e349241 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java @@ -21,7 +21,7 @@ public DeleteShardedJob(DatastoreOptions datastoreOptions, ShardedJobRunId jobId @Override protected Job createShardsJob(int start, int end) { - return new DeleteShardsInfos(datastoreOptions.toBuilder().build(), getJobId(), start, end); + return new DeleteShardsInfos(datastoreOptions, getJobId(), start, end); } @Override diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java index b2ccbf35..cedcb91c 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java @@ -31,7 +31,7 @@ public class FinalizeShardsInfos extends Job0 { @Override public Value run() { - Datastore datastore = datastoreOptions.toBuilder().build().getService(); + Datastore datastore = datastoreOptions.getService(); RetryExecutor.call( ShardedJobRunner.FOREVER_RETRYER, diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java index d206c160..5d4e1eeb 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java @@ -69,7 +69,11 @@ public DatastoreOptions buildDatastoreFromRequest(HttpServletRequest request) { DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - DatastoreOptions.Builder builder = defaultInstance.toBuilder(); + DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(defaultInstance.getCredentials()) + .setTransportOptions(defaultInstance.getTransportOptions()); + Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); if (LOCAL_GAE_PROJECT_ID.equals(defaultInstance.getProjectId())) { log.info("pipelines fw detected running locally with GAE projectId set as 'no_app_id'; this isn't legal GCP project id, so changing to 'local-gae-project'"); From a2bb7e1ae97181714bf41175ed52b9f5cd906564 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Wed, 27 May 2026 21:26:06 +0200 Subject: [PATCH 09/25] NoCredentials on tests --- .../google/appengine/tools/mapreduce/DatastoreExtension.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java index 33f9b695..4710fabf 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java @@ -1,5 +1,6 @@ package com.google.appengine.tools.mapreduce; +import com.google.cloud.NoCredentials; import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.datastore.testing.LocalDatastoreHelper; @@ -63,6 +64,7 @@ public void beforeEach(ExtensionContext extensionContext) throws Exception { log.info("Datastore emulator reset"); DatastoreOptions options = globalDatastoreHelper.getOptions().toBuilder() .setProjectId(TEST_DATASTORE_PROJECT_ID) + .setCredentials(NoCredentials.getInstance()) .build(); extensionContext.getStore(ExtensionContext.Namespace.GLOBAL).put(DS_OPTIONS_CONTEXT_KEY, options); From 0cff9ea25ce38719c32d40dd03c9e8ad7ec3bc28 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Wed, 27 May 2026 21:42:20 +0200 Subject: [PATCH 10/25] Simplify DatastoreOptions usage. Bound jackson version to 2.9+ --- java/pom.xml | 2 +- .../mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java | 2 +- .../mapreduce/impl/shardedjob/pipeline/FinalizeShardedJob.java | 2 +- .../appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index af2f7cfd..3c8eb102 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -18,7 +18,7 @@ UTF-8 - [2.7, 3.0) + [2.9, 2.99) 2.55 1.18.36 [2.0.4, 3.0) diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java index dd0df7ca..a7c5fcc0 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java @@ -41,7 +41,7 @@ private static void addParentKeyToList(PipelineBackendTransaction tx, List @Override public Value run() { - Datastore datastore = datastoreOptions.toBuilder().build().getService(); + Datastore datastore = datastoreOptions.getService(); final List toDelete = new ArrayList<>((end - start) * 2); PipelineBackendTransaction tx = PipelineBackendTransaction.newInstance(datastore); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardedJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardedJob.java index 7fb8b7c5..5777f5e6 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardedJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardedJob.java @@ -22,7 +22,7 @@ public FinalizeShardedJob(DatastoreOptions datastoreOptions, ShardedJobRunId job @Override protected Job createShardsJob(int start, int end) { - return new FinalizeShardsInfos(this.datastoreOptions.toBuilder().build(), getJobId(), status, start, end); + return new FinalizeShardsInfos(this.datastoreOptions, getJobId(), status, start, end); } @Override diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java index 6516c1d6..71727e7f 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java @@ -114,7 +114,7 @@ public void onRetry(Attempt attempt) { // Only used in tests public AppEngineBackEnd(Options options, PipelineTaskQueue taskQueue, AppEngineServicesService appEngineServicesService) { - this(options.getDatastoreOptions().toBuilder().build().getService(), taskQueue, appEngineServicesService); + this(options.getDatastoreOptions().getService(), taskQueue, appEngineServicesService); } @Builder From 8ba00b8ef94737c3f54cfcf5dda53732f57d9711 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Wed, 27 May 2026 21:56:52 +0200 Subject: [PATCH 11/25] Use NoCredentials on tests / local --- .../appengine/tools/mapreduce/impl/util/RequestUtils.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java index 5d4e1eeb..f46c10d7 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java @@ -3,6 +3,7 @@ import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunId; import com.google.appengine.tools.pipeline.JobRunId; import com.google.appengine.tools.pipeline.impl.backend.*; +import com.google.cloud.NoCredentials; import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; @@ -81,6 +82,7 @@ public DatastoreOptions buildDatastoreFromRequest(HttpServletRequest request) { builder.setProjectId(getLocalProjectIdOverride()); // try to get emulator host from env var, if available builder.setHost(System.getProperty("DATASTORE_EMULATOR_HOST", System.getenv("DATASTORE_EMULATOR_HOST"))); + builder.setCredentials(NoCredentials.getInstance()); } // whatever values are, they can be overridden by request params From a02db65af717634e894d5866777c5233b368c7ba Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Wed, 27 May 2026 22:19:27 +0200 Subject: [PATCH 12/25] Helper method to build DatastoreOptions.Builder from default --- .../tools/mapreduce/MapReduceJob.java | 30 ++++--------------- .../mapreduce/ShardedJobAbstractSettings.java | 8 ++--- .../mapreduce/impl/util/RequestUtils.java | 26 +++++++++++----- 3 files changed, 25 insertions(+), 39 deletions(-) diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java index d03f5c1f..4014fcd2 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java @@ -3,6 +3,7 @@ package com.google.appengine.tools.mapreduce; import com.google.appengine.tools.mapreduce.impl.*; +import com.google.appengine.tools.mapreduce.impl.util.RequestUtils; import com.google.appengine.tools.mapreduce.impl.pipeline.CleanupPipelineJob; import com.google.appengine.tools.mapreduce.impl.pipeline.ExamineStatusAndReturnResult; import com.google.appengine.tools.mapreduce.impl.pipeline.ResultAndStatus; @@ -241,12 +242,7 @@ static class SortJob extends Job1< protected Datastore getDatastore() { if (datastore == null) { - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - DatastoreOptions.Builder b = DatastoreOptions.newBuilder() - .setProjectId(defaultInstance.getProjectId()) - .setCredentials(defaultInstance.getCredentials()) - .setTransportOptions(defaultInstance.getTransportOptions()); - java.util.Optional.ofNullable(defaultInstance.getHost()).ifPresent(b::setHost); + DatastoreOptions.Builder b = RequestUtils.builderFromDefaultInstance(); java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); datastore = b.build().getService(); } @@ -350,12 +346,7 @@ static class MergeJob extends protected Datastore getDatastore() { if (datastore == null) { - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - DatastoreOptions.Builder b = DatastoreOptions.newBuilder() - .setProjectId(defaultInstance.getProjectId()) - .setCredentials(defaultInstance.getCredentials()) - .setTransportOptions(defaultInstance.getTransportOptions()); - java.util.Optional.ofNullable(defaultInstance.getHost()).ifPresent(b::setHost); + DatastoreOptions.Builder b = RequestUtils.builderFromDefaultInstance(); java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); datastore = b.build().getService(); } @@ -490,12 +481,7 @@ static class ReduceJob extends Job1, protected Datastore getDatastore() { if (datastore == null) { - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - DatastoreOptions.Builder b = DatastoreOptions.newBuilder() - .setProjectId(defaultInstance.getProjectId()) - .setCredentials(defaultInstance.getCredentials()) - .setTransportOptions(defaultInstance.getTransportOptions()); - java.util.Optional.ofNullable(defaultInstance.getHost()).ifPresent(b::setHost); + DatastoreOptions.Builder b = RequestUtils.builderFromDefaultInstance(); java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); datastore = b.build().getService(); } @@ -622,13 +608,7 @@ public String getJobDisplayName() { //NOTE: very coupled to `MapReduceSettings` implementation; could be there, but don't want to couple that to FW implementation // and also don't want it to be public, as would be exposed to API private DatastoreOptions getDatastoreOptions(@NonNull MapReduceSettings settings) { - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() - .setProjectId(defaultInstance.getProjectId()) - .setCredentials(defaultInstance.getCredentials()) - .setTransportOptions(defaultInstance.getTransportOptions()); - java.util.Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); - + DatastoreOptions.Builder builder = RequestUtils.builderFromDefaultInstance(); java.util.Optional.ofNullable(settings.getDatastoreHost()).ifPresent(builder::setHost); java.util.Optional.ofNullable(settings.getProjectId()).ifPresent(builder::setProjectId); java.util.Optional.ofNullable(settings.getDatabaseId()).ifPresent(builder::setDatabaseId); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java b/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java index 03e73e98..35f44bcd 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java @@ -1,6 +1,7 @@ package com.google.appengine.tools.mapreduce; import com.google.appengine.tools.mapreduce.impl.WorkerShardTask; +import com.google.appengine.tools.mapreduce.impl.util.RequestUtils; import com.google.appengine.tools.pipeline.JobSetting; import com.google.cloud.datastore.DatastoreOptions; @@ -50,12 +51,7 @@ default JobSetting[] toJobSettings(JobSetting... extra) { default DatastoreOptions getDatastoreOptions() { - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - DatastoreOptions.Builder optionsBuilder = DatastoreOptions.newBuilder() - .setProjectId(defaultInstance.getProjectId()) - .setCredentials(defaultInstance.getCredentials()) - .setTransportOptions(defaultInstance.getTransportOptions()); - Optional.ofNullable(defaultInstance.getHost()).ifPresent(optionsBuilder::setHost); + DatastoreOptions.Builder optionsBuilder = RequestUtils.builderFromDefaultInstance(); Optional.ofNullable(getDatastoreHost()).ifPresent(optionsBuilder::setHost); Optional.ofNullable(getProjectId()).ifPresent(optionsBuilder::setProjectId); Optional.ofNullable(getDatabaseId()).ifPresent(optionsBuilder::setDatabaseId); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java index f46c10d7..8786ad2c 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java @@ -60,6 +60,21 @@ public static class Params { @Getter @Setter private String localProjectIdOverride = DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID; + /** + * Builds a DatastoreOptions.Builder pre-populated from the default instance, with null-safe + * credential handling (falls back to NoCredentials when ADC is unavailable, e.g. CI/emulator). + */ + public static DatastoreOptions.Builder builderFromDefaultInstance() { + DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); + com.google.auth.Credentials credentials = defaultInstance.getCredentials(); + DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setCredentials(credentials != null ? credentials : NoCredentials.getInstance()) + .setTransportOptions(defaultInstance.getTransportOptions()); + Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); + return builder; + } + public DatastoreOptions buildDatastoreFromRequest(HttpServletRequest request) { // so we need 1) host, 2) projectId, and 3) databaseId from somewhere @@ -68,15 +83,10 @@ public DatastoreOptions buildDatastoreFromRequest(HttpServletRequest request) { // - set as env vars (system properties), via Maven to pull (wouldn't exactly let us do integration tests) // --> no, host may include port, set at runtime by emulator; not easy/appropriate to fake as env var - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - - DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() - .setProjectId(defaultInstance.getProjectId()) - .setCredentials(defaultInstance.getCredentials()) - .setTransportOptions(defaultInstance.getTransportOptions()); - Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); + String defaultProjectId = DatastoreOptions.getDefaultInstance().getProjectId(); + DatastoreOptions.Builder builder = builderFromDefaultInstance(); - if (LOCAL_GAE_PROJECT_ID.equals(defaultInstance.getProjectId())) { + if (LOCAL_GAE_PROJECT_ID.equals(defaultProjectId)) { log.info("pipelines fw detected running locally with GAE projectId set as 'no_app_id'; this isn't legal GCP project id, so changing to 'local-gae-project'"); // 'no_app_id' isn't legal name, so change it builder.setProjectId(getLocalProjectIdOverride()); From 1de653d27d3450ae791ddb0da01825ef1113618c Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 09:56:24 +0200 Subject: [PATCH 13/25] Utils class to unify local/test checks handling. --- .github/workflows/test-java.yml | 1 + .../appengine/tools/EnvironmentUtils.java | 72 +++++++++++++++++++ .../tools/mapreduce/MapReduceJob.java | 30 +++++--- .../mapreduce/ShardedJobAbstractSettings.java | 4 +- .../mapreduce/impl/util/RequestUtils.java | 54 +++----------- .../pipeline/di/AppEngineHostModule.java | 14 ++-- .../tools/pipeline/impl/PipelineManager.java | 34 ++++++--- .../tools/mapreduce/DatastoreExtension.java | 12 +++- .../mapreduce/impl/util/RequestUtilsTest.java | 13 +--- .../tools/pipeline/DatastoreExtension.java | 13 ++-- .../backend/AppEngineBackEndOptionsTest.java | 3 +- .../testutil/AppEngineHostTestModule.java | 17 +++-- 12 files changed, 170 insertions(+), 97 deletions(-) create mode 100644 java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java diff --git a/.github/workflows/test-java.yml b/.github/workflows/test-java.yml index bea04a3a..6842b451 100644 --- a/.github/workflows/test-java.yml +++ b/.github/workflows/test-java.yml @@ -42,6 +42,7 @@ jobs: runs-on: ubuntu-latest needs: compile strategy: + fail-fast: true matrix: package: [ "com.google.appengine.tools.cloudtasktest.**", "com.google.appengine.tools.mapreduce.impl.**", diff --git a/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java new file mode 100644 index 00000000..38242ede --- /dev/null +++ b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java @@ -0,0 +1,72 @@ +package com.google.appengine.tools; + +import com.google.cloud.NoCredentials; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.common.annotations.VisibleForTesting; +import lombok.extern.java.Log; + +@Log +public class EnvironmentUtils { + + // value of env var GOOGLE_CLOUD_PROJECT when running locally; underscores aren't actually legal in GCP project ids, + // so if this ever ends up being used in a real GCP API call, it blows up in validation before request is even sent by client + public static final String LOCAL_GAE_PROJECT_ID = "no_app_id"; + public static final String TEST_PROJECT_ID = "test-project"; + public static final String DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID = "local-gae-project"; + + /** + * Builds a DatastoreOptions.Builder pre-populated from the default instance, using + * NoCredentials in testing/emulator contexts (no ADC, or local GAE project id). + * @return + */ + public static DatastoreOptions.Builder datastoreBuilderFromDefaultInstance() { + DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); + // in case this needs to be overridden, there is a bug in toBuilder that loses the host + // so we need to copy over everything + DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + .setProjectId(defaultInstance.getProjectId()) + .setTransportOptions(defaultInstance.getTransportOptions()) + .setCredentials(defaultInstance.getCredentials()) + .setDatabaseId(defaultInstance.getDatabaseId()) + .setNamespace(defaultInstance.getNamespace()) + .setHost(defaultInstance.getHost()); + + if (isTestingContext(defaultInstance)) { + // override credentials + builder.setCredentials(NoCredentials.getInstance()); + // set valid project id if needed + if (LOCAL_GAE_PROJECT_ID.equals(defaultInstance.getProjectId())) { + log.info("pipelines fw detected running locally with GAE projectId set as '%s'; this isn't legal GCP project id, so changing to '%s'".formatted(LOCAL_GAE_PROJECT_ID, DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID)); + builder.setProjectId(DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID); + } + // set emulator host if needed + if (getDatastoreEmulatorHost() != null) { + builder.setHost(getDatastoreEmulatorHost()); + } + } + + return builder; + } + + @VisibleForTesting + public static boolean isTestingContext() { + return isTestingContext(DatastoreOptions.getDefaultInstance()); + } + + public static boolean isTestingContext(String projectId) { + return projectId == null || + LOCAL_GAE_PROJECT_ID.equals(projectId) || + DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID.equals(projectId) || + TEST_PROJECT_ID.equals(projectId) || + getDatastoreEmulatorHost() != null; + } + + private static boolean isTestingContext(DatastoreOptions options) { + return isTestingContext(options.getProjectId()); + } + + private static String getDatastoreEmulatorHost() { + return System.getProperty("DATASTORE_EMULATOR_HOST", System.getenv("DATASTORE_EMULATOR_HOST")); + } + +} diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java index 4014fcd2..4ae729c3 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java @@ -2,18 +2,28 @@ package com.google.appengine.tools.mapreduce; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.*; -import com.google.appengine.tools.mapreduce.impl.util.RequestUtils; import com.google.appengine.tools.mapreduce.impl.pipeline.CleanupPipelineJob; import com.google.appengine.tools.mapreduce.impl.pipeline.ExamineStatusAndReturnResult; import com.google.appengine.tools.mapreduce.impl.pipeline.ResultAndStatus; import com.google.appengine.tools.mapreduce.impl.pipeline.ShardedJob; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunId; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings; -import com.google.appengine.tools.mapreduce.impl.sort.*; +import com.google.appengine.tools.mapreduce.impl.sort.MergeContext; +import com.google.appengine.tools.mapreduce.impl.sort.MergeShardTask; +import com.google.appengine.tools.mapreduce.impl.sort.SortContext; +import com.google.appengine.tools.mapreduce.impl.sort.SortShardTask; +import com.google.appengine.tools.mapreduce.impl.sort.SortWorker; import com.google.appengine.tools.mapreduce.inputs.GoogleCloudStorageLineInput; import com.google.appengine.tools.mapreduce.outputs.GoogleCloudStorageFileOutput; -import com.google.appengine.tools.pipeline.*; +import com.google.appengine.tools.pipeline.FutureValue; +import com.google.appengine.tools.pipeline.Job0; +import com.google.appengine.tools.pipeline.Job1; +import com.google.appengine.tools.pipeline.JobRunId; +import com.google.appengine.tools.pipeline.PipelineOrchestrator; +import com.google.appengine.tools.pipeline.PromisedValue; +import com.google.appengine.tools.pipeline.Value; import com.google.appengine.tools.pipeline.impl.PipelineManager; import com.google.appengine.tools.pipeline.impl.backend.AppEngineEnvironment; import com.google.cloud.datastore.Datastore; @@ -31,7 +41,11 @@ import java.io.IOException; import java.io.Serial; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.logging.Level; import java.util.logging.Logger; @@ -242,7 +256,7 @@ static class SortJob extends Job1< protected Datastore getDatastore() { if (datastore == null) { - DatastoreOptions.Builder b = RequestUtils.builderFromDefaultInstance(); + DatastoreOptions.Builder b = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); datastore = b.build().getService(); } @@ -346,7 +360,7 @@ static class MergeJob extends protected Datastore getDatastore() { if (datastore == null) { - DatastoreOptions.Builder b = RequestUtils.builderFromDefaultInstance(); + DatastoreOptions.Builder b = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); datastore = b.build().getService(); } @@ -481,7 +495,7 @@ static class ReduceJob extends Job1, protected Datastore getDatastore() { if (datastore == null) { - DatastoreOptions.Builder b = RequestUtils.builderFromDefaultInstance(); + DatastoreOptions.Builder b = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); datastore = b.build().getService(); } @@ -608,7 +622,7 @@ public String getJobDisplayName() { //NOTE: very coupled to `MapReduceSettings` implementation; could be there, but don't want to couple that to FW implementation // and also don't want it to be public, as would be exposed to API private DatastoreOptions getDatastoreOptions(@NonNull MapReduceSettings settings) { - DatastoreOptions.Builder builder = RequestUtils.builderFromDefaultInstance(); + DatastoreOptions.Builder builder = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); java.util.Optional.ofNullable(settings.getDatastoreHost()).ifPresent(builder::setHost); java.util.Optional.ofNullable(settings.getProjectId()).ifPresent(builder::setProjectId); java.util.Optional.ofNullable(settings.getDatabaseId()).ifPresent(builder::setDatabaseId); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java b/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java index 35f44bcd..7b2890b4 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java @@ -1,7 +1,7 @@ package com.google.appengine.tools.mapreduce; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.WorkerShardTask; -import com.google.appengine.tools.mapreduce.impl.util.RequestUtils; import com.google.appengine.tools.pipeline.JobSetting; import com.google.cloud.datastore.DatastoreOptions; @@ -51,7 +51,7 @@ default JobSetting[] toJobSettings(JobSetting... extra) { default DatastoreOptions getDatastoreOptions() { - DatastoreOptions.Builder optionsBuilder = RequestUtils.builderFromDefaultInstance(); + DatastoreOptions.Builder optionsBuilder = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); Optional.ofNullable(getDatastoreHost()).ifPresent(optionsBuilder::setHost); Optional.ofNullable(getProjectId()).ifPresent(optionsBuilder::setProjectId); Optional.ofNullable(getDatabaseId()).ifPresent(optionsBuilder::setDatabaseId); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java index 8786ad2c..86471616 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java @@ -1,25 +1,20 @@ package com.google.appengine.tools.mapreduce.impl.util; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunId; import com.google.appengine.tools.pipeline.JobRunId; -import com.google.appengine.tools.pipeline.impl.backend.*; -import com.google.cloud.NoCredentials; -import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; - -import com.google.common.base.Strings; -import lombok.*; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.Value; import lombok.extern.java.Log; import javax.inject.Inject; import javax.inject.Singleton; -import java.util.Arrays; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** * handles translation to/from request parameters and pipeline backends @@ -48,11 +43,6 @@ public static class Params { private static final String TRACEPARENT_HEADER = "traceparent"; private static final String CLOUD_TRACE_CONTEXT_HEADER = "X-Cloud-Trace-Context"; - - // value of env var GOOGLE_CLOUD_PROJECT when running locally; underscores aren't actually legal in GCP project ids, - // so if this ever ends up being used in a real GCP API call, it blows up in validation before request is even sent by client - public static final String LOCAL_GAE_PROJECT_ID = "no_app_id"; - private static final String DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID = "local-gae-project"; /** * value to override local GAE project id with, when running locally; to allow this on case-by-case basis @@ -60,21 +50,6 @@ public static class Params { @Getter @Setter private String localProjectIdOverride = DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID; - /** - * Builds a DatastoreOptions.Builder pre-populated from the default instance, with null-safe - * credential handling (falls back to NoCredentials when ADC is unavailable, e.g. CI/emulator). - */ - public static DatastoreOptions.Builder builderFromDefaultInstance() { - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - com.google.auth.Credentials credentials = defaultInstance.getCredentials(); - DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() - .setProjectId(defaultInstance.getProjectId()) - .setCredentials(credentials != null ? credentials : NoCredentials.getInstance()) - .setTransportOptions(defaultInstance.getTransportOptions()); - Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost); - return builder; - } - public DatastoreOptions buildDatastoreFromRequest(HttpServletRequest request) { // so we need 1) host, 2) projectId, and 3) databaseId from somewhere @@ -82,18 +57,7 @@ public DatastoreOptions buildDatastoreFromRequest(HttpServletRequest request) { // - pass as parameters on request // - set as env vars (system properties), via Maven to pull (wouldn't exactly let us do integration tests) // --> no, host may include port, set at runtime by emulator; not easy/appropriate to fake as env var - - String defaultProjectId = DatastoreOptions.getDefaultInstance().getProjectId(); - DatastoreOptions.Builder builder = builderFromDefaultInstance(); - - if (LOCAL_GAE_PROJECT_ID.equals(defaultProjectId)) { - log.info("pipelines fw detected running locally with GAE projectId set as 'no_app_id'; this isn't legal GCP project id, so changing to 'local-gae-project'"); - // 'no_app_id' isn't legal name, so change it - builder.setProjectId(getLocalProjectIdOverride()); - // try to get emulator host from env var, if available - builder.setHost(System.getProperty("DATASTORE_EMULATOR_HOST", System.getenv("DATASTORE_EMULATOR_HOST"))); - builder.setCredentials(NoCredentials.getInstance()); - } + DatastoreOptions.Builder builder = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); // whatever values are, they can be overridden by request params getParam(request, Params.DATASTORE_HOST).ifPresent(builder::setHost); @@ -113,7 +77,7 @@ public Optional getJobId(HttpServletRequest request, String paramName) { } public JobRunId getRootPipelineId(HttpServletRequest request) throws ServletException { - return getJobId(request, Params.ROOT_PIPELINE_ID).map(s -> JobRunId.fromEncodedString(s)) + return getJobId(request, Params.ROOT_PIPELINE_ID).map(JobRunId::fromEncodedString) .orElseThrow(() -> new ServletException(Params.ROOT_PIPELINE_ID + " parameter not found.")); } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java b/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java index 9973483e..063f2d13 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java @@ -1,13 +1,17 @@ package com.google.appengine.tools.pipeline.di; -import com.google.appengine.tools.mapreduce.impl.util.RequestUtils; -import com.google.appengine.tools.pipeline.impl.backend.*; +import com.google.appengine.tools.EnvironmentUtils; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineEnvironment; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesService; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesServiceImpl; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineStandardGen2; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineTaskQueue; +import com.google.appengine.tools.pipeline.impl.backend.CloudTasksTaskQueue; +import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; import com.google.appengine.v1.ApplicationsClient; import com.google.appengine.v1.ServicesClient; import com.google.appengine.v1.VersionsClient; -import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.tasks.v2.CloudTasksClient; -import dagger.Binds; import dagger.Module; import dagger.Provides; import lombok.SneakyThrows; @@ -108,6 +112,6 @@ PipelineTaskQueue pipelineTaskQueue(AppEngineEnvironment environment, } boolean isTestingContext(AppEngineEnvironment environment) { - return RequestUtils.LOCAL_GAE_PROJECT_ID.equals(environment.getProjectId()) || "test-project" .equals(environment.getProjectId()); + return EnvironmentUtils.isTestingContext(environment.getProjectId()); } } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java index 5d393f55..6b584a69 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java @@ -14,17 +14,27 @@ package com.google.appengine.tools.pipeline.impl; -import com.google.appengine.tools.mapreduce.*; -import com.google.appengine.tools.mapreduce.impl.shardedjob.*; +import com.google.appengine.tools.EnvironmentUtils; +import com.google.appengine.tools.mapreduce.MapJob; +import com.google.appengine.tools.mapreduce.MapReduceJob; +import com.google.appengine.tools.mapreduce.MapReduceSettings; +import com.google.appengine.tools.mapreduce.MapReduceSpecification; +import com.google.appengine.tools.mapreduce.MapSettings; +import com.google.appengine.tools.mapreduce.MapSpecification; +import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTask; +import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTaskState; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobController; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunId; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunner; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobState; import com.google.appengine.tools.pipeline.*; import com.google.appengine.tools.pipeline.di.DaggerMultiTenantComponent; import com.google.appengine.tools.pipeline.di.MultiTenantComponent; import com.google.appengine.tools.pipeline.di.TenantModule; -import com.google.appengine.tools.pipeline.impl.backend.SerializationStrategy; -import com.google.appengine.tools.pipeline.impl.util.DIUtil; -import com.google.cloud.datastore.Key; import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd; import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd; +import com.google.appengine.tools.pipeline.impl.backend.SerializationStrategy; import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec; import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec.Group; import com.google.appengine.tools.pipeline.impl.model.Barrier; @@ -43,11 +53,13 @@ import com.google.appengine.tools.pipeline.impl.tasks.FinalizeJobTask; import com.google.appengine.tools.pipeline.impl.tasks.HandleChildExceptionTask; import com.google.appengine.tools.pipeline.impl.tasks.HandleSlotFilledTask; -import com.google.appengine.tools.pipeline.impl.tasks.RunJobTask; import com.google.appengine.tools.pipeline.impl.tasks.PipelineTask; +import com.google.appengine.tools.pipeline.impl.tasks.RunJobTask; +import com.google.appengine.tools.pipeline.impl.util.DIUtil; import com.google.appengine.tools.pipeline.impl.util.GUIDGenerator; import com.google.appengine.tools.pipeline.impl.util.StringUtils; import com.google.appengine.tools.pipeline.util.Pair; +import com.google.cloud.datastore.Key; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterables; @@ -65,7 +77,11 @@ import java.lang.reflect.Method; import java.time.Duration; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.logging.Level; import java.util.stream.Collectors; @@ -184,8 +200,8 @@ public JobRecord registerNewJobRecord(UpdateSpec updateSpec, JobSetting[] settin // --> JobRecordFactory or something, that gets injected String projectId = backEnd.getOptions().as(AppEngineBackEnd.Options.class).getProjectId(); - if (projectId.equals("no_app_id")) { - throw new IllegalStateException("projectId is 'no_app_id'; this isn't legal GCP project id"); + if (EnvironmentUtils.LOCAL_GAE_PROJECT_ID.equals(projectId)) { + throw new IllegalStateException("projectId is '%s'; this isn't legal GCP project id".formatted(projectId)); } JobRecord jobRecord = JobRecord.createRootJobRecord(projectId, jobInstance, getSerializationStrategy(), settings); diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java index 4710fabf..1e2451e1 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java @@ -1,14 +1,19 @@ package com.google.appengine.tools.mapreduce; +import com.google.appengine.tools.EnvironmentUtils; import com.google.cloud.NoCredentials; import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.datastore.testing.LocalDatastoreHelper; import lombok.extern.java.Log; -import org.junit.jupiter.api.extension.*; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; import java.net.ConnectException; -import java.time.Duration; import java.util.logging.Level; /** @@ -20,7 +25,7 @@ @Log public class DatastoreExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback { - public static String TEST_DATASTORE_PROJECT_ID = "test-project"; + public static String TEST_DATASTORE_PROJECT_ID = EnvironmentUtils.TEST_PROJECT_ID; public static String DS_CONTEXT_KEY = "ds-emulator"; public static String DS_OPTIONS_CONTEXT_KEY = "ds-options"; @@ -65,6 +70,7 @@ public void beforeEach(ExtensionContext extensionContext) throws Exception { DatastoreOptions options = globalDatastoreHelper.getOptions().toBuilder() .setProjectId(TEST_DATASTORE_PROJECT_ID) .setCredentials(NoCredentials.getInstance()) + .setHost("localhost:" + globalDatastoreHelper.getPort()) .build(); extensionContext.getStore(ExtensionContext.Namespace.GLOBAL).put(DS_OPTIONS_CONTEXT_KEY, options); diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtilsTest.java b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtilsTest.java index 7d5d828b..ebe5de87 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtilsTest.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtilsTest.java @@ -1,23 +1,16 @@ package com.google.appengine.tools.mapreduce.impl.util; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunId; -import com.google.appengine.tools.pipeline.JobRunId; -import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd; -import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd; -import com.google.appengine.tools.pipeline.impl.model.JobRecord; -import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; -import com.google.cloud.datastore.Key; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import javax.servlet.http.HttpServletRequest; - import java.net.URLDecoder; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -44,7 +37,7 @@ public void getMapReduceId() { @Test public void localBootstrap() { //make this like local situation - System.setProperty("GOOGLE_CLOUD_PROJECT", "no_app_id"); + System.setProperty("GOOGLE_CLOUD_PROJECT", EnvironmentUtils.LOCAL_GAE_PROJECT_ID); System.setProperty("DATASTORE_EMULATOR_HOST", "http://localhost:8081"); RequestUtils requestUtils = new RequestUtils(); diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java b/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java index 3af98faf..f977ecd0 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java @@ -1,15 +1,18 @@ package com.google.appengine.tools.pipeline; +import com.google.appengine.tools.EnvironmentUtils; import com.google.cloud.datastore.Datastore; -import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.datastore.testing.LocalDatastoreHelper; -import io.opentelemetry.api.OpenTelemetry; import lombok.extern.java.Log; -import org.junit.jupiter.api.extension.*; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; import java.net.ConnectException; -import java.time.Duration; import java.util.logging.Level; /** @@ -21,7 +24,7 @@ @Log public class DatastoreExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback { - public static String TEST_DATASTORE_PROJECT_ID = "test-project"; + public static String TEST_DATASTORE_PROJECT_ID = EnvironmentUtils.TEST_PROJECT_ID; public static String DS_CONTEXT_KEY = "ds-emulator"; public static String DS_OPTIONS_CONTEXT_KEY = "ds-options"; diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java index a1da7fed..8cfffa9b 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java @@ -1,13 +1,13 @@ package com.google.appengine.tools.pipeline.impl.backend; import com.google.appengine.tools.pipeline.impl.util.SerializationUtils; -import com.google.auth.Credentials; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.auth.oauth2.UserCredentials; import com.google.cloud.NoCredentials; import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; @@ -33,6 +33,7 @@ void getOptions() { Datastore datastore = DatastoreOptions.newBuilder() .setProjectId(credentials.getQuotaProjectId()) .setCredentials(credentials) + .setOpenTelemetryOptions(DatastoreOpenTelemetryOptions.newBuilder().build()) .build().getService(); AppEngineBackEnd backend = new AppEngineBackEnd(datastore, mock(PipelineTaskQueue.class), mock(AppEngineServicesService.class)); diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java b/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java index ad911f71..4ca75c28 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java @@ -1,11 +1,15 @@ package com.google.appengine.tools.pipeline.testutil; -import com.google.appengine.tools.mapreduce.impl.util.RequestUtils; -import com.google.appengine.tools.pipeline.impl.backend.*; +import com.google.appengine.tools.EnvironmentUtils; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineEnvironment; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesService; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineServicesServiceImpl; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineStandardGen2; +import com.google.appengine.tools.pipeline.impl.backend.AppEngineTaskQueue; +import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; import com.google.appengine.v1.ApplicationsClient; import com.google.appengine.v1.ServicesClient; import com.google.appengine.v1.VersionsClient; -import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.tasks.v2.CloudTasksClient; import dagger.Binds; import dagger.Module; @@ -56,7 +60,7 @@ AppEngineServicesService appEngineServicesService(AppEngineServicesServiceImpl i //before, test harness basically did this by overriding env vars via ApiProxy stuff; see LocalModulesServiceTestConfig - if (isTestingContext()) { + if (EnvironmentUtils.isTestingContext()) { return new AppEngineServicesService() { @Override public String getLocation() { @@ -83,11 +87,6 @@ public String getWorkerServiceHostName(String service, String version) { } } - boolean isTestingContext() { - DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); - return RequestUtils.LOCAL_GAE_PROJECT_ID.equals(defaultInstance.getProjectId()) || "test-project" .equals(defaultInstance.getProjectId()); - } - @Module interface Bindings { // this is the real difference for tests atm From db2c79028d6be2d11feeead099dfd109f4cc2cc6 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 10:06:16 +0200 Subject: [PATCH 14/25] Do not set credentials upfront, as if null provokes NPE --- .../java/com/google/appengine/tools/EnvironmentUtils.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java index 38242ede..08d18686 100644 --- a/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java +++ b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java @@ -26,7 +26,6 @@ public static DatastoreOptions.Builder datastoreBuilderFromDefaultInstance() { DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() .setProjectId(defaultInstance.getProjectId()) .setTransportOptions(defaultInstance.getTransportOptions()) - .setCredentials(defaultInstance.getCredentials()) .setDatabaseId(defaultInstance.getDatabaseId()) .setNamespace(defaultInstance.getNamespace()) .setHost(defaultInstance.getHost()); @@ -43,6 +42,10 @@ public static DatastoreOptions.Builder datastoreBuilderFromDefaultInstance() { if (getDatastoreEmulatorHost() != null) { builder.setHost(getDatastoreEmulatorHost()); } + } else if (defaultInstance.getCredentials() != null) { + builder.setCredentials(defaultInstance.getCredentials()); + } else { + log.warning("No credentials found for DatastoreOptions.Builder?"); } return builder; From 913f6531dca0810c59353e093ff9b238c40d86e9 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 10:58:18 +0200 Subject: [PATCH 15/25] Deal with datastoreOptions coming from serialization cases --- .github/workflows/test-java.yml | 22 ++++++----- .../appengine/tools/EnvironmentUtils.java | 38 +++++++++++-------- .../appengine/tools/mapreduce/MapJob.java | 3 +- .../pipeline/DeleteShardsInfos.java | 4 +- .../pipeline/FinalizeShardsInfos.java | 4 +- .../pipeline/di/AppEngineHostModule.java | 2 +- .../impl/backend/AppEngineBackEnd.java | 3 +- .../tools/mapreduce/DatastoreExtension.java | 16 ++++---- .../shardedjob/IncrementalTaskStateTest.java | 3 +- .../impl/shardedjob/TestController.java | 3 +- .../tools/pipeline/PipelineTest.java | 5 ++- .../testutil/AppEngineHostTestModule.java | 2 +- 12 files changed, 65 insertions(+), 40 deletions(-) diff --git a/.github/workflows/test-java.yml b/.github/workflows/test-java.yml index 6842b451..28475de4 100644 --- a/.github/workflows/test-java.yml +++ b/.github/workflows/test-java.yml @@ -11,30 +11,32 @@ jobs: compile: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 + - uses: actions/checkout@v6 + - uses: actions/setup-java@v5 with: distribution: 'adopt' java-version: '17' # maven won't accept --release argument with java < 8; and 11 is next LTS - id: 'read-cache-maven-packages' name: Read Cached Maven packages - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: ~/.m2 key: ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}- - name: Maven compile run: | cd java/ mvn clean compile - name: Cache compiled classes # Compiled classes are cached, later restored for tests - uses: actions/cache/save@v4 + uses: actions/cache/save@v5 with: path: ${{ github.workspace }}/**/target/**/*.* key: ${{ runner.os }}-${{ env.CODE_CACHE_VERSION }}-${{ github.ref_name }}-${{ github.sha }} - id: 'cache-maven-packages' name: Cache Maven packages - uses: actions/cache/save@v4 + uses: actions/cache/save@v5 with: path: ~/.m2 key: ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}-${{ hashFiles('**/pom.xml') }} @@ -51,19 +53,21 @@ jobs: "com.google.appengine.tools.mapreduce.*Test", "com.google.appengine.tools.pipeline.**" ] steps: - - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 + - uses: actions/checkout@v6 + - uses: actions/setup-java@v5 with: distribution: 'adopt' java-version: '17' # maven won't accept --release argument with java < 8; and 11 is next LTS - id: 'read-cache-maven-packages' name: Read Cached Maven packages - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: ~/.m2 key: ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}- - name: Restore compiled classes from cache - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: ${{ github.workspace }}/**/target/**/*.* key: ${{ runner.os }}-${{ env.CODE_CACHE_VERSION }}-${{ github.ref_name }}-${{ github.sha }} diff --git a/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java index 08d18686..54fa1d7f 100644 --- a/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java +++ b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java @@ -1,6 +1,7 @@ package com.google.appengine.tools; import com.google.cloud.NoCredentials; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import com.google.common.annotations.VisibleForTesting; import lombok.extern.java.Log; @@ -21,20 +22,26 @@ public class EnvironmentUtils { */ public static DatastoreOptions.Builder datastoreBuilderFromDefaultInstance() { DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance(); + return datastoreBuilderFromDatastoreOptions(defaultInstance); + } + + @VisibleForTesting + public static DatastoreOptions.Builder datastoreBuilderFromDatastoreOptions(DatastoreOptions datastoreOptions) { // in case this needs to be overridden, there is a bug in toBuilder that loses the host // so we need to copy over everything DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() - .setProjectId(defaultInstance.getProjectId()) - .setTransportOptions(defaultInstance.getTransportOptions()) - .setDatabaseId(defaultInstance.getDatabaseId()) - .setNamespace(defaultInstance.getNamespace()) - .setHost(defaultInstance.getHost()); + .setProjectId(datastoreOptions.getProjectId()) + .setTransportOptions(datastoreOptions.getTransportOptions()) + .setDatabaseId(datastoreOptions.getDatabaseId()) + .setNamespace(datastoreOptions.getNamespace()) + .setHost(datastoreOptions.getHost()) + .setOpenTelemetryOptions(datastoreOptions.getOpenTelemetryOptions()); - if (isTestingContext(defaultInstance)) { + if (isNotCloudEnvironment(datastoreOptions)) { // override credentials builder.setCredentials(NoCredentials.getInstance()); // set valid project id if needed - if (LOCAL_GAE_PROJECT_ID.equals(defaultInstance.getProjectId())) { + if (LOCAL_GAE_PROJECT_ID.equals(datastoreOptions.getProjectId())) { log.info("pipelines fw detected running locally with GAE projectId set as '%s'; this isn't legal GCP project id, so changing to '%s'".formatted(LOCAL_GAE_PROJECT_ID, DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID)); builder.setProjectId(DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID); } @@ -42,8 +49,9 @@ public static DatastoreOptions.Builder datastoreBuilderFromDefaultInstance() { if (getDatastoreEmulatorHost() != null) { builder.setHost(getDatastoreEmulatorHost()); } - } else if (defaultInstance.getCredentials() != null) { - builder.setCredentials(defaultInstance.getCredentials()); + builder.setOpenTelemetryOptions(DatastoreOpenTelemetryOptions.newBuilder().build()); + } else if (datastoreOptions.getCredentials() != null) { + builder.setCredentials(datastoreOptions.getCredentials()); } else { log.warning("No credentials found for DatastoreOptions.Builder?"); } @@ -51,12 +59,12 @@ public static DatastoreOptions.Builder datastoreBuilderFromDefaultInstance() { return builder; } - @VisibleForTesting - public static boolean isTestingContext() { - return isTestingContext(DatastoreOptions.getDefaultInstance()); + @VisibleForTesting + public static boolean isNotCloudEnvironment() { + return isNotCloudEnvironment(DatastoreOptions.getDefaultInstance()); } - public static boolean isTestingContext(String projectId) { + public static boolean isNotCloudEnvironment(String projectId) { return projectId == null || LOCAL_GAE_PROJECT_ID.equals(projectId) || DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID.equals(projectId) || @@ -64,8 +72,8 @@ public static boolean isTestingContext(String projectId) { getDatastoreEmulatorHost() != null; } - private static boolean isTestingContext(DatastoreOptions options) { - return isTestingContext(options.getProjectId()); + private static boolean isNotCloudEnvironment(DatastoreOptions options) { + return isNotCloudEnvironment(options.getProjectId()); } private static String getDatastoreEmulatorHost() { diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java index d90e1422..05328d75 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java @@ -2,6 +2,7 @@ package com.google.appengine.tools.mapreduce; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.impl.BaseContext; import com.google.appengine.tools.mapreduce.impl.CountersImpl; import com.google.appengine.tools.mapreduce.impl.MapOnlyShardTask; @@ -127,7 +128,7 @@ public Value> handleException(CancellationException ex) { * @return shardedJobId for this job */ private ShardedJobRunId getShardedJobId() { - DatastoreOptions defaultDatastoreOptions = DatastoreOptions.getDefaultInstance(); + DatastoreOptions defaultDatastoreOptions = EnvironmentUtils.datastoreBuilderFromDefaultInstance().build(); return ShardedJobRunId.of( java.util.Optional.ofNullable(settings.getProjectId()).orElseGet(defaultDatastoreOptions::getProjectId), diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java index a7c5fcc0..6a1c8d9b 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java @@ -1,5 +1,6 @@ package com.google.appengine.tools.mapreduce.impl.shardedjob.pipeline; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.RetryExecutor; import com.google.appengine.tools.mapreduce.impl.shardedjob.*; import com.google.appengine.tools.mapreduce.impl.util.DatastoreSerializationUtil; @@ -41,7 +42,8 @@ private static void addParentKeyToList(PipelineBackendTransaction tx, List @Override public Value run() { - Datastore datastore = datastoreOptions.getService(); + Datastore datastore = EnvironmentUtils.datastoreBuilderFromDatastoreOptions(datastoreOptions).build().getService(); + final List toDelete = new ArrayList<>((end - start) * 2); PipelineBackendTransaction tx = PipelineBackendTransaction.newInstance(datastore); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java index cedcb91c..f701f5b9 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java @@ -2,6 +2,7 @@ import static java.util.concurrent.Executors.callable; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.mapreduce.RetryExecutor; import com.google.appengine.tools.mapreduce.impl.shardedjob.*; import com.google.appengine.tools.mapreduce.impl.util.DatastoreSerializationUtil; @@ -31,7 +32,8 @@ public class FinalizeShardsInfos extends Job0 { @Override public Value run() { - Datastore datastore = datastoreOptions.getService(); + // if coming from deserialization may lose transient properties that cause NPE + Datastore datastore = EnvironmentUtils.datastoreBuilderFromDatastoreOptions(datastoreOptions).build().getService(); RetryExecutor.call( ShardedJobRunner.FOREVER_RETRYER, diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java b/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java index 063f2d13..f9c2df8c 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/di/AppEngineHostModule.java @@ -112,6 +112,6 @@ PipelineTaskQueue pipelineTaskQueue(AppEngineEnvironment environment, } boolean isTestingContext(AppEngineEnvironment environment) { - return EnvironmentUtils.isTestingContext(environment.getProjectId()); + return EnvironmentUtils.isNotCloudEnvironment(environment.getProjectId()); } } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java index 71727e7f..3d584f8a 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java @@ -15,6 +15,7 @@ package com.google.appengine.tools.pipeline.impl.backend; import com.github.rholder.retry.*; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.pipeline.JobRunId; import com.google.appengine.tools.pipeline.NoSuchObjectException; import com.google.appengine.tools.pipeline.impl.model.*; @@ -132,7 +133,7 @@ public static class Options implements PipelineBackEnd.Options { @SneakyThrows public static Options defaults() { return Options.builder() - .datastoreOptions(DatastoreOptions.getDefaultInstance()) + .datastoreOptions(EnvironmentUtils.datastoreBuilderFromDefaultInstance().build()) .credentials(GoogleCredentials.getApplicationDefault()) .projectId(DatastoreOptions.getDefaultProjectId()) .build(); diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java index 1e2451e1..eeedea90 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java @@ -3,6 +3,7 @@ import com.google.appengine.tools.EnvironmentUtils; import com.google.cloud.NoCredentials; import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.datastore.testing.LocalDatastoreHelper; import lombok.extern.java.Log; @@ -67,13 +68,14 @@ public void afterAll(ExtensionContext extensionContext) throws Exception { public void beforeEach(ExtensionContext extensionContext) throws Exception { globalDatastoreHelper.reset(); log.info("Datastore emulator reset"); - DatastoreOptions options = globalDatastoreHelper.getOptions().toBuilder() - .setProjectId(TEST_DATASTORE_PROJECT_ID) - .setCredentials(NoCredentials.getInstance()) - .setHost("localhost:" + globalDatastoreHelper.getPort()) - .build(); - - extensionContext.getStore(ExtensionContext.Namespace.GLOBAL).put(DS_OPTIONS_CONTEXT_KEY, options); + DatastoreOptions.Builder builder = EnvironmentUtils.datastoreBuilderFromDatastoreOptions(globalDatastoreHelper.getOptions()); + builder.setProjectId(TEST_DATASTORE_PROJECT_ID); + builder.setCredentials(NoCredentials.getInstance()); + builder.setHost("localhost:" + globalDatastoreHelper.getPort()); + builder.setOpenTelemetryOptions(DatastoreOpenTelemetryOptions.newBuilder().build()); + + DatastoreOptions options = builder.build(); + extensionContext.getStore(ExtensionContext.Namespace.GLOBAL).put(DS_OPTIONS_CONTEXT_KEY, options); Datastore datastore = options.getService(); extensionContext.getStore(ExtensionContext.Namespace.GLOBAL).put(DS_CONTEXT_KEY, datastore); diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/IncrementalTaskStateTest.java b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/IncrementalTaskStateTest.java index 5ea7e4f3..5f1d4e5d 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/IncrementalTaskStateTest.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/IncrementalTaskStateTest.java @@ -1,5 +1,6 @@ package com.google.appengine.tools.mapreduce.impl.shardedjob; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.pipeline.DatastoreExtension; import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; @@ -15,7 +16,7 @@ class IncrementalTaskStateTest { @Test void hasNoParent() { - Datastore datastore = DatastoreOptions.getDefaultInstance().getService(); + Datastore datastore = EnvironmentUtils.datastoreBuilderFromDefaultInstance().build().getService(); Key exampleKey = IncrementalTaskState.makeKey(datastore, IncrementalTaskId.of(ShardedJobRunId.builder().project("test-project").jobId("job").build(), 1)); diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/TestController.java b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/TestController.java index 07a40560..f88659fa 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/TestController.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/TestController.java @@ -1,5 +1,6 @@ package com.google.appengine.tools.mapreduce.impl.shardedjob; +import com.google.appengine.tools.EnvironmentUtils; import com.google.appengine.tools.pipeline.PipelineService; import com.google.cloud.datastore.DatastoreOptions; import lombok.*; @@ -30,7 +31,7 @@ public class TestController extends ShardedJobController { public DatastoreOptions getDatastoreOptions() { // in case serialized, recreate to "recover" transient fields - return datastoreOptions.toBuilder().build(); + return EnvironmentUtils.datastoreBuilderFromDatastoreOptions(datastoreOptions).build(); } @Override diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/PipelineTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/PipelineTest.java index fd7c7c0a..ca4dfb34 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/PipelineTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/PipelineTest.java @@ -31,6 +31,8 @@ import com.google.apphosting.api.ApiProxy; import com.google.auth.Credentials; +import com.google.cloud.NoCredentials; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import lombok.Getter; @@ -104,7 +106,8 @@ public static SerializationStrategy getSerializationStrategy() { return new AppEngineBackEnd(AppEngineBackEnd.Options.builder() .datastoreOptions(DatastoreOptions.newBuilder() .setProjectId("test-project") - .setCredentials(mock(Credentials.class)) + .setCredentials(NoCredentials.getInstance()) + .setOpenTelemetryOptions(DatastoreOpenTelemetryOptions.newBuilder().build()) .build()) .build(), null, diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java b/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java index 4ca75c28..e5ba925e 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/testutil/AppEngineHostTestModule.java @@ -60,7 +60,7 @@ AppEngineServicesService appEngineServicesService(AppEngineServicesServiceImpl i //before, test harness basically did this by overriding env vars via ApiProxy stuff; see LocalModulesServiceTestConfig - if (EnvironmentUtils.isTestingContext()) { + if (EnvironmentUtils.isNotCloudEnvironment()) { return new AppEngineServicesService() { @Override public String getLocation() { From bae8dba8579af748fba1aae17325e5da15e3696d Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 12:23:34 +0200 Subject: [PATCH 16/25] Reuse DatastoreOptions.Builder from settings --- .../appengine/tools/mapreduce/MapReduceJob.java | 12 +++--------- .../appengine/tools/pipeline/DatastoreExtension.java | 2 +- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java index 4ae729c3..dbb00fbf 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java @@ -256,9 +256,7 @@ static class SortJob extends Job1< protected Datastore getDatastore() { if (datastore == null) { - DatastoreOptions.Builder b = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); - java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); - datastore = b.build().getService(); + datastore = settings.getDatastoreOptions().getService(); } return datastore; } @@ -360,9 +358,7 @@ static class MergeJob extends protected Datastore getDatastore() { if (datastore == null) { - DatastoreOptions.Builder b = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); - java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); - datastore = b.build().getService(); + datastore = settings.getDatastoreOptions().getService(); } return datastore; } @@ -495,9 +491,7 @@ static class ReduceJob extends Job1, protected Datastore getDatastore() { if (datastore == null) { - DatastoreOptions.Builder b = EnvironmentUtils.datastoreBuilderFromDefaultInstance(); - java.util.Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace); - datastore = b.build().getService(); + datastore = settings.getDatastoreOptions().getService(); } return datastore; } diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java b/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java index f977ecd0..ba7358ae 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java @@ -66,7 +66,7 @@ public void afterAll(ExtensionContext extensionContext) throws Exception { public void beforeEach(ExtensionContext extensionContext) throws Exception { globalDatastoreHelper.reset(); log.info("Datastore emulator reset"); - DatastoreOptions options = globalDatastoreHelper.getOptions().toBuilder() + DatastoreOptions options = EnvironmentUtils.datastoreBuilderFromDatastoreOptions(globalDatastoreHelper.getOptions()) .setProjectId(TEST_DATASTORE_PROJECT_ID) .build(); From 5e719f682032578654656638da21e075ba3bbc98 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 12:27:27 +0200 Subject: [PATCH 17/25] Split regex patterns for mapreduce impl packages tests --- .github/workflows/test-java.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test-java.yml b/.github/workflows/test-java.yml index 28475de4..5ca13816 100644 --- a/.github/workflows/test-java.yml +++ b/.github/workflows/test-java.yml @@ -47,7 +47,8 @@ jobs: fail-fast: true matrix: package: [ "com.google.appengine.tools.cloudtasktest.**", - "com.google.appengine.tools.mapreduce.impl.**", + "%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[A-M].+]", + "%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[N-Z].+]", "com.google.appengine.tools.mapreduce.inputs.**", "com.google.appengine.tools.mapreduce.outputs.**", "com.google.appengine.tools.mapreduce.*Test", From 7bed34b059f59cc8c7d19c97332eb882ed30145b Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 12:34:08 +0200 Subject: [PATCH 18/25] Fix CI setting to use matrix --- .github/workflows/test-java.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-java.yml b/.github/workflows/test-java.yml index 5ca13816..0f1ab889 100644 --- a/.github/workflows/test-java.yml +++ b/.github/workflows/test-java.yml @@ -77,4 +77,4 @@ jobs: APPENGINE_MAPREDUCE_CI_SERVICE_ACCOUNT_KEY: ${{ secrets.APPENGINE_MAPREDUCE_CI_SERVICE_ACCOUNT_KEY }} run: | cd java/ - mvn test -T 1C -B -Dtest=${{ matrix.package }} -Dversions.logOutput=false -DprocessDependencies=false -DprocessDependencyManagement=false \ No newline at end of file + mvn test -T 1C -B "-Dtest=${{ matrix.package }}" -Dversions.logOutput=false -DprocessDependencies=false -DprocessDependencyManagement=false \ No newline at end of file From 541dabcd491433fd9b7e783f94f8a9b76ef14c66 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 12:39:58 +0200 Subject: [PATCH 19/25] Fix regex patterns --- .github/workflows/test-java.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-java.yml b/.github/workflows/test-java.yml index 0f1ab889..95a47faf 100644 --- a/.github/workflows/test-java.yml +++ b/.github/workflows/test-java.yml @@ -47,8 +47,8 @@ jobs: fail-fast: true matrix: package: [ "com.google.appengine.tools.cloudtasktest.**", - "%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[A-M].+]", - "%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[N-Z].+]", + "!%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[A-M].+]", + "!%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[N-Z].+]", "com.google.appengine.tools.mapreduce.inputs.**", "com.google.appengine.tools.mapreduce.outputs.**", "com.google.appengine.tools.mapreduce.*Test", From 71858da67290104e7edfaa1c2a393f03f9c6d6eb Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 13:11:28 +0200 Subject: [PATCH 20/25] pass host property for datastore tests --- .../google/appengine/tools/mapreduce/DatastoreExtension.java | 2 ++ .../com/google/appengine/tools/pipeline/DatastoreExtension.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java index eeedea90..2fc967b3 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/DatastoreExtension.java @@ -39,11 +39,13 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception { .setConsistency(1.0) .build(); globalDatastoreHelper.start(); + System.setProperty("DATASTORE_EMULATOR_HOST", "localhost:" + globalDatastoreHelper.getPort()); log.info("Datastore emulator started on port : " + globalDatastoreHelper.getPort()); } @Override public void afterAll(ExtensionContext extensionContext) throws Exception { + System.clearProperty("DATASTORE_EMULATOR_HOST"); int attempt = 0; boolean stopped = false; diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java b/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java index ba7358ae..5947bc9b 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/DatastoreExtension.java @@ -37,11 +37,13 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception { .setConsistency(1.0) .build(); globalDatastoreHelper.start(); + System.setProperty("DATASTORE_EMULATOR_HOST", "localhost:" + globalDatastoreHelper.getPort()); log.info("Datastore emulator started on port : " + globalDatastoreHelper.getPort()); } @Override public void afterAll(ExtensionContext extensionContext) throws Exception { + System.clearProperty("DATASTORE_EMULATOR_HOST"); int attempt = 0; boolean stopped = false; From 5e95be88e5e25e42832955a6a8d8b150be4a4103 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 15:02:57 +0200 Subject: [PATCH 21/25] Fix PipelineBackendTransactionImplTest to match Datastore-first commit order Test was written for old order (enqueue tasks then commit Datastore). Current implementation commits Datastore first, so when Datastore fails tasks are never enqueued and nothing needs to be deleted. Co-Authored-By: Claude Sonnet 4.6 --- .../tools/txn/PipelineBackendTransactionImplTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java b/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java index 2c1a9bf9..f1fbd253 100644 --- a/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java +++ b/java/src/test/java/com/google/appengine/tools/txn/PipelineBackendTransactionImplTest.java @@ -3,6 +3,7 @@ import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue; import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.Transaction; +import com.google.protobuf.ByteString; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; @@ -27,6 +28,7 @@ void setUp() { mockTaskQueue = mock(PipelineTaskQueue.class); mockDatastore = mock(Datastore.class); when(mockDatastore.newTransaction()).thenReturn(mockTransaction); + when(mockTransaction.getTransactionId()).thenReturn(ByteString.copyFromUtf8("transaction-id")); pipelineBackendTransaction = new PipelineBackendTransactionImpl(mockDatastore, mockTaskQueue); } @@ -55,10 +57,8 @@ void commitQueueFailsDeletesTasks() { } @Test - void commitDatastoreFailsDeletesTasks() { + void commitDatastoreFailsNoTasksEnqueuedOrDeleted() { when(mockTransaction.isActive()).thenReturn(true); - List taskReferences = Collections.singletonList(PipelineTaskQueue.TaskReference.of("queue1", "task-ref")); - when(mockTaskQueue.enqueue(anyString(), anyCollection())).thenReturn(taskReferences); when(mockTransaction.commit()).thenThrow(new RuntimeException("error committing")); pipelineBackendTransaction.enqueue("queue1", PipelineTaskQueue.TaskSpec.builder().method(PipelineTaskQueue.TaskSpec.Method.GET).callbackPath("path").build()); @@ -66,8 +66,9 @@ void commitDatastoreFailsDeletesTasks() { assertThrows(RuntimeException.class, () -> pipelineBackendTransaction.commit()); verify(mockTransaction, atMostOnce()).commit(); - verify(mockTaskQueue).enqueue(anyString(), anyCollection()); - verify(mockTaskQueue).deleteTasks(ArgumentMatchers.argThat(taskReferences::containsAll)); + // Datastore commits first; on failure tasks are never enqueued so there is nothing to delete + verify(mockTaskQueue, never()).enqueue(anyString(), anyCollection()); + verify(mockTaskQueue, never()).deleteTasks(any()); } @Test From 775955a6c4d567c5c719794952c9f25868744848 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 16:22:49 +0200 Subject: [PATCH 22/25] Compile to Java 21 bytecode --- java/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index 1c15ed9c..a558db13 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -41,8 +41,8 @@ maven-compiler-plugin 3.13.0 - 17 - 17 + 21 + 21 From 72b4d65aae4094fb0c100f27bbef15ae0ccb58df Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 16:28:37 +0200 Subject: [PATCH 23/25] Remove deprecated finalize() from PipelineBackendTransactionImpl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit finalize() is deprecated and marked for removal in modern Java. The override only logged a warning for unclosed transactions — no actual resource cleanup. Callers already have rollbackIfActive() for explicit cleanup. Co-Authored-By: Claude Sonnet 4.6 --- .../tools/txn/PipelineBackendTransactionImpl.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java b/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java index ae3512ca..16b87f28 100644 --- a/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java +++ b/java/src/main/java/com/google/appengine/tools/txn/PipelineBackendTransactionImpl.java @@ -9,7 +9,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.stream.Collectors; @@ -297,17 +296,4 @@ private void rollbackAllServices() { rollbackTasks(); } - @Override - protected void finalize() throws Throwable { - try { - if (this.getDsTransaction().isActive()) { - // shouldn't happen, unless opening tnx just for read, just is kind of absurd in - // a strong consistency model - log.log(Level.WARNING, String.format("Finalizing PipelineBackendTransactionImpl transaction open for %s", - stopwatch.elapsed(TimeUnit.MILLISECONDS))); - } - } finally { - super.finalize(); - } - } } From 52d210fd7f8303f3f7fbfb4f34089985583ba831 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 17:55:13 +0200 Subject: [PATCH 24/25] Fix tests --- java/pom.xml | 2 +- .../com/google/appengine/tools/EnvironmentUtils.java | 11 +++++------ .../tools/pipeline/impl/backend/AppEngineBackEnd.java | 7 ++++--- .../impl/backend/AppEngineBackEndOptionsTest.java | 3 ++- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index c00c685f..9e8bd09e 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -261,7 +261,7 @@ org.mockito mockito-core - 5.21.0 + 5.23.0 test diff --git a/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java index 54fa1d7f..9bd262d4 100644 --- a/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java +++ b/java/src/main/java/com/google/appengine/tools/EnvironmentUtils.java @@ -37,6 +37,10 @@ public static DatastoreOptions.Builder datastoreBuilderFromDatastoreOptions(Data .setHost(datastoreOptions.getHost()) .setOpenTelemetryOptions(datastoreOptions.getOpenTelemetryOptions()); + // set emulator host if needed + if (getDatastoreEmulatorHost() != null) { + builder.setHost(getDatastoreEmulatorHost()); + } if (isNotCloudEnvironment(datastoreOptions)) { // override credentials builder.setCredentials(NoCredentials.getInstance()); @@ -45,10 +49,6 @@ public static DatastoreOptions.Builder datastoreBuilderFromDatastoreOptions(Data log.info("pipelines fw detected running locally with GAE projectId set as '%s'; this isn't legal GCP project id, so changing to '%s'".formatted(LOCAL_GAE_PROJECT_ID, DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID)); builder.setProjectId(DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID); } - // set emulator host if needed - if (getDatastoreEmulatorHost() != null) { - builder.setHost(getDatastoreEmulatorHost()); - } builder.setOpenTelemetryOptions(DatastoreOpenTelemetryOptions.newBuilder().build()); } else if (datastoreOptions.getCredentials() != null) { builder.setCredentials(datastoreOptions.getCredentials()); @@ -68,8 +68,7 @@ public static boolean isNotCloudEnvironment(String projectId) { return projectId == null || LOCAL_GAE_PROJECT_ID.equals(projectId) || DEFAULT_OVERRIDE_LOCAL_GAE_PROJECT_ID.equals(projectId) || - TEST_PROJECT_ID.equals(projectId) || - getDatastoreEmulatorHost() != null; + TEST_PROJECT_ID.equals(projectId); } private static boolean isNotCloudEnvironment(DatastoreOptions options) { diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java index 3d584f8a..9fa5120f 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java @@ -115,7 +115,7 @@ public void onRetry(Attempt attempt) { // Only used in tests public AppEngineBackEnd(Options options, PipelineTaskQueue taskQueue, AppEngineServicesService appEngineServicesService) { - this(options.getDatastoreOptions().getService(), taskQueue, appEngineServicesService); + this(EnvironmentUtils.datastoreBuilderFromDatastoreOptions(options.getDatastoreOptions()).build().getService(), taskQueue, appEngineServicesService); } @Builder @@ -132,10 +132,11 @@ public static class Options implements PipelineBackEnd.Options { @SneakyThrows public static Options defaults() { + DatastoreOptions dsOptions = EnvironmentUtils.datastoreBuilderFromDefaultInstance().build(); return Options.builder() - .datastoreOptions(EnvironmentUtils.datastoreBuilderFromDefaultInstance().build()) + .datastoreOptions(dsOptions) .credentials(GoogleCredentials.getApplicationDefault()) - .projectId(DatastoreOptions.getDefaultProjectId()) + .projectId(dsOptions.getProjectId()) .build(); } diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java index 8cfffa9b..3df8941b 100644 --- a/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java +++ b/java/src/test/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEndOptionsTest.java @@ -25,7 +25,8 @@ void getOptions() { //TODO: replace this with GoogleCredentials.getApplicationDefault() when it's available, if we ever auth the Github // Action with GCP (debatably necessary for integration tests) GoogleCredentials credentials = GoogleCredentials.newBuilder() - .setQuotaProjectId("test-project") + // use some non-local/test project id to not override the credentials + .setQuotaProjectId("some-project") .setAccessToken(AccessToken.newBuilder().setTokenValue("token").setExpirationTime(new Date()).build()) .build(); From 27dd669e22c133d259d05ece331fb2df5946511b Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Thu, 28 May 2026 18:53:24 +0200 Subject: [PATCH 25/25] Better naming for test matrix entries --- .github/workflows/test-java.yml | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/.github/workflows/test-java.yml b/.github/workflows/test-java.yml index 2f9ec24a..1ee4ed29 100644 --- a/.github/workflows/test-java.yml +++ b/.github/workflows/test-java.yml @@ -43,18 +43,27 @@ jobs: path: ~/.m2 key: ${{ runner.os }}-${{ env.M2_CACHE_VERSION }}-${{ hashFiles('**/pom.xml') }} test: + name: test (${{ matrix.name }}) runs-on: ubuntu-latest needs: compile strategy: fail-fast: true matrix: - package: [ "com.google.appengine.tools.cloudtasktest.**", - "!%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[A-M].+]", - "!%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[N-Z].+]", - "com.google.appengine.tools.mapreduce.inputs.**", - "com.google.appengine.tools.mapreduce.outputs.**", - "com.google.appengine.tools.mapreduce.*Test", - "com.google.appengine.tools.pipeline.**" ] + include: + - name: cloudtasktest + package: "com.google.appengine.tools.cloudtasktest.**" + - name: mapreduce-impl-N-to-Z + package: "!%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[A-M].+]" + - name: mapreduce-impl-A-to-M + package: "!%regex[com\\.google\\.appengine\\.tools\\.mapreduce\\.impl\\.([a-z]+\\.)*[N-Z].+]" + - name: mapreduce-inputs + package: "com.google.appengine.tools.mapreduce.inputs.**" + - name: mapreduce-outputs + package: "com.google.appengine.tools.mapreduce.outputs.**" + - name: mapreduce-top-level + package: "com.google.appengine.tools.mapreduce.*Test" + - name: pipeline + package: "com.google.appengine.tools.pipeline.**" steps: - uses: actions/checkout@v6 - uses: actions/setup-java@v5 @@ -74,7 +83,7 @@ jobs: with: path: ${{ github.workspace }}/**/target/**/*.* key: ${{ runner.os }}-${{ env.CODE_CACHE_VERSION }}-${{ github.ref_name }}-${{ github.sha }} - - name: Run Tests for Package ${{ matrix.package }} + - name: Run Tests for Package ${{ matrix.name }} env: APPENGINE_MAPREDUCE_CI_SERVICE_ACCOUNT_KEY: ${{ secrets.APPENGINE_MAPREDUCE_CI_SERVICE_ACCOUNT_KEY }} run: |