Merged
28 commits
7e46e5e
bump version number; delete old pom
eschultink Feb 25, 2026
c4a0933
bump to java21
eschultink Feb 25, 2026
44771b0
cleanup workflow
eschultink Feb 25, 2026
8b20580
organize dependencies
eschultink Feb 25, 2026
a6375f0
Merge pull request #93 from Worklytics/s219-prep-next-version
jlorper Feb 27, 2026
5125223
bump jackson to solve https://github.com/Worklytics/appengine-pipelin…
eschultink May 5, 2026
7584c38
Init claude on project
jlorper May 27, 2026
29cc5ae
Plan to fix DatastoreOptions.toBuilder() dropping host
jlorper May 27, 2026
b3e5c52
Apply fix, update version to 0.3+worklytics.14
jlorper May 27, 2026
a2bb7e1
NoCredentials on tests
jlorper May 27, 2026
0cff9ea
Simplify DatastoreOptions usage. Bound jackson version to 2.9+
jlorper May 27, 2026
8ba00b8
Use NoCredentials on tests / local
jlorper May 27, 2026
a02db65
Helper method to build DatastoreOptions.Builder from default
jlorper May 27, 2026
1de653d
Utils class to unify local/test checks handling.
jlorper May 28, 2026
db2c790
Do not set credentials upfront, as if null provokes NPE
jlorper May 28, 2026
913f653
Deal with datastoreOptions coming from serialization cases
jlorper May 28, 2026
bae8dba
Reuse DatastoreOptions.Builder from settings
jlorper May 28, 2026
5e719f6
Split regex patterns for mapreduce impl packages tests
jlorper May 28, 2026
7bed34b
Fix CI setting to use matrix
jlorper May 28, 2026
541dabc
Fix regex patterns
jlorper May 28, 2026
71858da
pass host property for datastore tests
jlorper May 28, 2026
5e95be8
Fix PipelineBackendTransactionImplTest to match Datastore-first commi…
jlorper May 28, 2026
ff4cfe5
Merge remote-tracking branch 'origin/main' into s226-local-dev-fix
jlorper May 28, 2026
775955a
Compile to Java 21 bytecode
jlorper May 28, 2026
72b4d65
Remove deprecated finalize() from PipelineBackendTransactionImpl
jlorper May 28, 2026
8b52a84
Merge remote-tracking branch 'refs/remotes/origin/rc-0.13' into s226-…
jlorper May 28, 2026
52d210f
Fix tests
jlorper May 28, 2026
27dd669
Better naming for test matrix entries
jlorper May 28, 2026
99 changes: 99 additions & 0 deletions .agents/CLAUDE.md
@@ -0,0 +1,99 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Overview

Worklytics' fork of Google's App Engine Pipelines Framework — a Java library for orchestrating asynchronous workflows on Google App Engine. All source lives under `java/`; the root contains only docs and the GitHub workflow.

## Commands

All Maven commands must be run from `java/`:

```shell
# Run all tests
cd java && mvn test

# Run a single test class
cd java && mvn test -Dtest=PipelineTest

# Run a test by package pattern (as CI does)
cd java && mvn test -Dtest="com.google.appengine.tools.pipeline.**"

# Build JAR
cd java && mvn package

# Publish to GitHub Packages (requires ~/.m2/settings.xml with GitHub token)
cd java && mvn deploy
```

Tests require a Datastore emulator; `DatastoreExtension` starts it automatically. The env var `GOOGLE_CLOUD_PROJECT=test-project` is injected by `maven-surefire-plugin` in `pom.xml`. Failing tests are retried up to 3 times.

## Architecture

The library has three distinct roles mixed into one artifact:

- **Client** — `PipelineService` / `PipelineOrchestrator` / `PipelineRunner`: API surface for starting pipelines, polling status, submitting promised values, and cancelling jobs.
- **Runner** — `PipelineServlet` + `TaskHandler` + `AppEngineBackEnd`: servlet-based execution engine that processes task-queue callbacks to advance pipeline state.
- **Admin UI** — JSON handlers (`JsonTreeHandler`, `JsonListHandler`, `JsonClassFilterHandler`) that serve pipeline state to the bundled web console.

### Job model

Users define pipelines by subclassing `Job0`…`Job6` (or `Job<E>` directly) and implementing `run()`. Jobs are `Serializable`; instances are stored in Datastore and reconstituted on each execution attempt.

Inside `run()`, child jobs are scheduled via `futureCall(new ChildJob(), arg1, arg2, ...)`, which returns a `FutureValue<T>`. Passing a `FutureValue` as an argument to another `futureCall` declares a data dependency — the child won't run until its inputs are ready. `PromisedValue` is the mechanism for values provided by external agents; filled via `PipelineService.submitPromisedValue()`.

`JobRunId` is a tuple `(project, database, namespace, name)` encoded as a colon-delimited string. `:` was chosen over `/` to avoid URL-encoding issues in some clients.
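
As a rough illustration of the colon-delimited encoding described above, the sketch below round-trips a four-part identifier. `RunIdSketch` is a hypothetical stand-in, not the library's `JobRunId`; rejecting `:` inside a part and allowing an empty database are assumptions made for the example.

```java
/** Hypothetical stand-in for JobRunId; validation rules here are assumptions. */
record RunIdSketch(String project, String database, String namespace, String name) {

  RunIdSketch {
    // A part containing the delimiter could not be parsed back unambiguously.
    for (String part : new String[] {project, database, namespace, name}) {
      if (part == null || part.contains(":")) {
        throw new IllegalArgumentException("parts must be non-null and must not contain ':'");
      }
    }
  }

  /** Joins the four parts with ':' in tuple order. */
  String encode() {
    return String.join(":", project, database, namespace, name);
  }

  /** Splits on ':'; the -1 limit keeps empty parts such as an unset database. */
  static RunIdSketch parse(String encoded) {
    String[] parts = encoded.split(":", -1);
    if (parts.length != 4) {
      throw new IllegalArgumentException("expected 4 colon-delimited parts: " + encoded);
    }
    return new RunIdSketch(parts[0], parts[1], parts[2], parts[3]);
  }

  public static void main(String[] args) {
    RunIdSketch id = new RunIdSketch("test-project", "", "tenant-a", "job-123");
    String encoded = id.encode();
    System.out.println(encoded);                   // test-project::tenant-a:job-123
    System.out.println(parse(encoded).equals(id)); // true
  }
}
```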

### Persistence

`AppEngineBackEnd` is the only production backend. It stores pipeline state in Google Cloud Datastore as `pipeline-job`, `pipeline-slot`, `pipeline-barrier`, and related entity kinds. All entities include a TTL field set 90 days in the future; you must create matching [Datastore TTL policies](https://cloud.google.com/datastore/docs/ttl) per entity kind for automatic cleanup.
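
The 90-day TTL value can be pictured with the small sketch below. It only shows the date arithmetic; `TtlSketch` and `expiresAt` are hypothetical names, and how the library actually computes and stores the field is not shown in this document.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

/** Hypothetical helper: computes an expiry timestamp 90 days after "now". */
final class TtlSketch {
  static final Duration RETENTION = Duration.ofDays(90);

  /** Takes a Clock so the computation can be checked against a fixed time. */
  static Instant expiresAt(Clock clock) {
    return clock.instant().plus(RETENTION);
  }

  public static void main(String[] args) {
    Clock fixed = Clock.fixed(Instant.parse("2026-01-01T00:00:00Z"), ZoneOffset.UTC);
    System.out.println(expiresAt(fixed)); // 2026-04-01T00:00:00Z
  }
}
```

A Datastore TTL policy on the matching property is still what deletes the entity; the timestamp alone does nothing.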

Datastore operations are retried with exponential backoff (up to 5 attempts) on `DatastoreException` or `IOException`.
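
A minimal sketch of a retry loop with exponential backoff follows, assuming a doubling delay and a caller-supplied attempt limit. `BackoffRetrySketch` and `callWithRetry` are hypothetical; the sketch catches only `IOException` to stay dependency-free, whereas the library is described as also retrying on `DatastoreException`.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical retry helper; delays and names are assumptions, not the library's code. */
final class BackoffRetrySketch {

  static <T> T callWithRetry(Callable<T> operation, int maxAttempts, long initialDelayMillis)
      throws Exception {
    long delay = initialDelayMillis;
    for (int attempt = 1; ; attempt++) {
      try {
        return operation.call();
      } catch (IOException e) {
        // Give up once the attempt budget is spent; otherwise wait and double the delay.
        if (attempt >= maxAttempts) {
          throw e;
        }
        Thread.sleep(delay);
        delay *= 2;
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    String result = callWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new IOException("transient failure");
      }
      return "ok";
    }, 5, 10);
    System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
  }
}
```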

### Task queues

Two implementations of `PipelineTaskQueue`:
- `CloudTasksTaskQueue` (default in production) — uses the Cloud Tasks v2 API. Caches queue location lookups.
- `AppEngineTaskQueue` (legacy, used in tests and when `USE_LEGACY_QUEUES` is set) — uses the old GAE SDK task queue API.

Pipeline state-management tasks (`HandleSlotFilled`, `FinalizeJob`, etc.) are always sent to the default queue; only `RunJob` tasks respect `JobSetting.OnQueue`.
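
The routing rule above can be sketched as a single function. Everything here is hypothetical: the enum, the method, and the literal queue name `"default"` are assumptions for illustration, not the library's types.

```java
import java.util.Optional;

/** Hypothetical model of the queue routing rule; not the library's implementation. */
final class QueueRoutingSketch {
  enum TaskKind { RUN_JOB, HANDLE_SLOT_FILLED, FINALIZE_JOB }

  static final String DEFAULT_QUEUE = "default"; // assumed name of the default queue

  /** Only RUN_JOB honours the requested queue; all other kinds use the default queue. */
  static String queueFor(TaskKind kind, Optional<String> onQueue) {
    if (kind == TaskKind.RUN_JOB) {
      return onQueue.orElse(DEFAULT_QUEUE);
    }
    return DEFAULT_QUEUE;
  }

  public static void main(String[] args) {
    Optional<String> requested = Optional.of("heavy-jobs");
    System.out.println(queueFor(TaskKind.RUN_JOB, requested));      // heavy-jobs
    System.out.println(queueFor(TaskKind.FINALIZE_JOB, requested)); // default
  }
}
```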

### Dependency injection

The framework uses Dagger 2 with custom scopes:

- `JobRunServiceComponent` (`@Singleton`) — one per process; top-level component for the runner. Built from `AppEngineHostModule`.
- `StepExecutionComponent` (`@StepExecutionScoped`) — one per task-queue callback invocation. Created via `JobRunServiceComponent.stepExecutionComponent(StepExecutionModule)`. Provides `PipelineManager`, `PipelineService`, `PipelineRunner`, `ShardedJobRunner`.
- `MultiTenantComponent` / `TenantComponent` — used by the *client* side to obtain a `PipelineService` scoped to specific Datastore options (project/namespace/credentials).

Job classes that need injected dependencies are annotated `@Injectable(DaggerMyContainer.class)`; `PipelineManager` calls the container's `inject()` via reflection before invoking `run()`.

Servlets use `@AllArgsConstructor(onConstructor_ = @Inject)` so Dagger can construct them with dependencies — see `docs/servlet-di.md` for the rationale.

### Multi-tenancy

`AppEngineBackEnd.Options` carries `projectId`, `credentials`, and `DatastoreOptions` (which includes namespace). Pass tenant-specific options to `PipelineService.getInstance(options)` or via `JobSetting` to isolate pipeline data per tenant.

### MapReduce / ShardedJob

`MapReduceJob` and `MapJob` are `Job` subclasses that orchestrate sharded parallel work via `ShardedJobRunner`. Each shard runs as an `IncrementalTask`; state is stored as `IncrementalTaskState` in Datastore. `ShardedJobRunId` extends the pipeline `JobRunId` concept.

## Testing patterns

Test infrastructure is wired with JUnit 5 extensions:

- `DatastoreExtension` — starts a `LocalDatastoreHelper` emulator before all tests, resets it between each test.
- `PipelineComponentsExtension` — builds `AppEngineBackEnd` + `StepExecutionComponent` against the emulator.
- `@PipelineSetupExtensions` — meta-annotation that applies both extensions; the standard setup for pipeline integration tests.

Tests that extend `PipelineTest` get `pipelineService`, `pipelineManager`, and `appEngineBackend` injected. The `AppEngineTaskQueue` (not `CloudTasksTaskQueue`) is used in tests, backed by a `LocalTaskQueue`.

## Local development environment variables

| Variable | Effect |
|---|---|
| `USE_LEGACY_QUEUES` | Use GAE SDK task queues instead of Cloud Tasks API |
| `USE_LOCAL_SERVICE` | Treat all services as running on `localhost` as `default/v1` |
| `GAE_SERVICE_HOST_SUFFIX` | Override dynamic AppEngine service host lookup |
| `CLOUDTASKS_QUEUE_LOCATION` | Override dynamic Cloud Tasks queue location lookup |
242 changes: 242 additions & 0 deletions .agents/plans/FIX-DATASTORE-TOBUILDER.md
@@ -0,0 +1,242 @@
# Fix: DatastoreOptions.toBuilder() drops host in google-cloud-datastore 2.40.0+

## Root Cause

`DatastoreOptions` in `google-cloud-datastore` 2.40.0 (shipped in Google Cloud Java BOM 26.83.0)
introduced a **duplicate `host` field** inside `DatastoreOptions.Builder` that is separate from the
inherited `ServiceOptions.Builder.host` field.

The `DatastoreOptions$Builder` setHost() correctly writes to **both** fields:

```java
// Builder.setHost() bytecode:
putfield host // DatastoreOptions.Builder-own host (#69)
invokespecial ServiceOptions$Builder.setHost() // parent field
```

But the copy constructor used by `toBuilder()` only copies the DatastoreOptions-specific state
(namespace, databaseId, openTelemetryOptions, channelProvider) and **does NOT transfer the local
`host` field**:

```java
// Builder copy-constructor references: access$000 (namespace), access$100 (databaseId),
// access$200 (openTelemetryOptions), access$300 (channelProvider) — no host access method.
```

Additionally, `Builder.build()` checks the local `host` field first:

```java
// build() bytecode:
getfield host // local field #69
ifnonnull → use it
// else: fall back to GrpcTransportOptions.getDefaultEndpoint() → real Cloud Datastore
```

**Result:** When `DATASTORE_EMULATOR_HOST` is set, `getDefaultInstance()` correctly configures the
emulator host. Calling `.toBuilder()` on that instance silently loses the host. The rebuilt options
then fall back to the real Cloud Datastore gRPC endpoint — so any entity written via the emulator
cannot be found by code that went through `toBuilder()`.

This is exactly why pipeline slot lookups fail: the slot is written (job submission path) using
correct emulator `DatastoreOptions`, but the task-handler read path rebuilds options via
`getDefaultInstance().toBuilder()`, loses the host, and queries the real Cloud Datastore instead.
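
The mechanism can be reproduced in isolation with a toy model. The classes below are hypothetical stand-ins written for this note, not the real `google-cloud-datastore` types, and the default endpoint string is a placeholder. They mirror the three behaviours described above: `setHost()` writes both fields, the copy constructor skips the local one, and `build()` reads the local one first.

```java
/** Toy model of the shadowed-host bug; NOT the real google-cloud-datastore classes. */
final class ShadowedHostSketch {

  static final String DEFAULT_ENDPOINT = "datastore.googleapis.com:443"; // placeholder value

  /** Stand-in for ServiceOptions.Builder: owns the inherited host field. */
  static class BaseBuilder {
    String host;

    BaseBuilder setHost(String host) {
      this.host = host;
      return this;
    }
  }

  /** Stand-in for DatastoreOptions.Builder: declares a second host field that hides the parent's. */
  static final class OptionsBuilder extends BaseBuilder {
    private String host;
    private String namespace;

    OptionsBuilder() {}

    /** Copy constructor: restores the parent host and namespace, but never the local host. */
    OptionsBuilder(Options source) {
      super.setHost(source.host);
      this.namespace = source.namespace;
    }

    @Override
    OptionsBuilder setHost(String host) {
      this.host = host;    // local field
      super.setHost(host); // parent field
      return this;
    }

    Options build() {
      // Only the local field is consulted; a null value falls back to the default endpoint.
      String effective = (this.host != null) ? this.host : DEFAULT_ENDPOINT;
      return new Options(effective, namespace);
    }
  }

  static final class Options {
    final String host;
    final String namespace;

    Options(String host, String namespace) {
      this.host = host;
      this.namespace = namespace;
    }

    OptionsBuilder toBuilder() {
      return new OptionsBuilder(this);
    }
  }

  public static void main(String[] args) {
    Options emulator = new OptionsBuilder().setHost("localhost:8081").build();
    Options rebuilt = emulator.toBuilder().build();
    Options fixed = new OptionsBuilder().setHost(emulator.host).build();

    System.out.println(emulator.host); // localhost:8081
    System.out.println(rebuilt.host);  // datastore.googleapis.com:443
    System.out.println(fixed.host);    // localhost:8081
  }
}
```

The last line of `main` corresponds to the fix pattern: start from a fresh builder and set the host explicitly instead of relying on the copy constructor.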

---

## Pattern of the Fix

Replace every `DatastoreOptions.getDefaultInstance().toBuilder()` (and any `.toBuilder()` call on
an existing `DatastoreOptions`) with an explicit `DatastoreOptions.newBuilder()` that copies each
field manually, including the host:

```java
// BEFORE (broken in 2.40.0+):
DatastoreOptions.Builder builder = DatastoreOptions.getDefaultInstance().toBuilder();

// AFTER:
DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance();
DatastoreOptions.Builder builder = DatastoreOptions.newBuilder()
    .setProjectId(defaultInstance.getProjectId())
    .setCredentials(defaultInstance.getCredentials())
    .setTransportOptions(defaultInstance.getTransportOptions());
Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost);
```

For calls where the source is an existing `DatastoreOptions` instance (not `getDefaultInstance()`):

```java
// BEFORE:
DatastoreOptions rebuilt = someOptions.toBuilder().build();

// AFTER:
DatastoreOptions rebuilt = DatastoreOptions.newBuilder()
    .setProjectId(someOptions.getProjectId())
    .setCredentials(someOptions.getCredentials())
    .setTransportOptions(someOptions.getTransportOptions())
    .setHost(someOptions.getHost()) // must always be set explicitly
    // namespace and databaseId are not copied here; carry them over too if the source sets them
    .build();
```

---

## Files to Fix

### 1. `java/src/main/java/com/google/appengine/tools/mapreduce/impl/util/RequestUtils.java` — **MOST CRITICAL**

Lines 70–72. This is the task-handler entry point: every pipeline task call rebuilds `DatastoreOptions`
here. Losing the host means the task handler hits the real Cloud Datastore instead of the emulator.

```java
// BEFORE (lines 70-72):
DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance();
DatastoreOptions.Builder builder = defaultInstance.toBuilder();

// AFTER:
DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance();
DatastoreOptions.Builder builder = DatastoreOptions.newBuilder()
    .setProjectId(defaultInstance.getProjectId())
    .setCredentials(defaultInstance.getCredentials())
    .setTransportOptions(defaultInstance.getTransportOptions());
Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost);
```

`java.util.Optional` is already imported in this file, so no new import is needed.

---

### 2. `java/src/main/java/com/google/appengine/tools/mapreduce/ShardedJobAbstractSettings.java`

Line 53. `getDatastoreOptions()` already overrides host with `getDatastoreHost()` afterwards, so
this only manifests when `getDatastoreHost()` returns null — but the base copy is still wrong.

```java
// BEFORE (line 53):
DatastoreOptions.Builder optionsBuilder = DatastoreOptions.getDefaultInstance().toBuilder();

// AFTER:
DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance();
DatastoreOptions.Builder optionsBuilder = DatastoreOptions.newBuilder()
    .setProjectId(defaultInstance.getProjectId())
    .setCredentials(defaultInstance.getCredentials())
    .setTransportOptions(defaultInstance.getTransportOptions());
Optional.ofNullable(defaultInstance.getHost()).ifPresent(optionsBuilder::setHost);
```

---

### 3. `java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java`

Four locations, all the same pattern. Lines 244, 348, and 483 are identical `getDatastore()` methods
in different inner classes; line 610 is `getDatastoreOptions()`.

**Lines 244, 348, 483** (identical pattern in three inner classes):

```java
// BEFORE:
datastore = DatastoreOptions.getDefaultInstance().toBuilder()
    .setNamespace(settings.getNamespace())
    .build().getService();

// AFTER:
DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance();
DatastoreOptions.Builder b = DatastoreOptions.newBuilder()
    .setProjectId(defaultInstance.getProjectId())
    .setCredentials(defaultInstance.getCredentials())
    .setTransportOptions(defaultInstance.getTransportOptions());
Optional.ofNullable(defaultInstance.getHost()).ifPresent(b::setHost);
Optional.ofNullable(settings.getNamespace()).ifPresent(b::setNamespace);
datastore = b.build().getService();
```

**Line 610** (`getDatastoreOptions(MapReduceSettings)`):

```java
// BEFORE:
DatastoreOptions.Builder builder = DatastoreOptions.getDefaultInstance().toBuilder();

// AFTER:
DatastoreOptions defaultInstance = DatastoreOptions.getDefaultInstance();
DatastoreOptions.Builder builder = DatastoreOptions.newBuilder()
    .setProjectId(defaultInstance.getProjectId())
    .setCredentials(defaultInstance.getCredentials())
    .setTransportOptions(defaultInstance.getTransportOptions());
Optional.ofNullable(defaultInstance.getHost()).ifPresent(builder::setHost);
```

---

### 4. `java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java`

Line 34. The `datastoreOptions` field here is passed in from the caller. If the caller constructed
it correctly (using the explicit copy pattern), this is safe. Even so, `.toBuilder().build()` is
meant to be a no-op round-trip, yet it can lose the host when the incoming options hold it only in
the `DatastoreOptions`-level field.

```java
// BEFORE (line 34):
Datastore datastore = datastoreOptions.toBuilder().build().getService();

// AFTER:
Datastore datastore = DatastoreOptions.newBuilder()
    .setProjectId(datastoreOptions.getProjectId())
    .setCredentials(datastoreOptions.getCredentials())
    .setTransportOptions(datastoreOptions.getTransportOptions())
    .setHost(datastoreOptions.getHost())
    .build().getService();
```

Or, since the round-trip changes nothing, use the instance directly, which also keeps the namespace and database ID that the explicit copy above omits:

```java
Datastore datastore = datastoreOptions.getService();
```

---

### 5. `java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardedJob.java`

Line 24. Same concern as #4: the `toBuilder().build()` round-trip is used only to pass a
"fresh copy" to `DeleteShardsInfos`. Since `DatastoreOptions` is immutable and serializable,
the copy is unnecessary; pass the instance directly:

```java
// BEFORE (line 24):
return new DeleteShardsInfos(datastoreOptions.toBuilder().build(), getJobId(), start, end);

// AFTER:
return new DeleteShardsInfos(datastoreOptions, getJobId(), start, end);
```

---

## Suggested Helper (Optional)

To avoid repeating the copy pattern, consider adding a package-private utility:

```java
// e.g., in RequestUtils or a new DatastoreUtils class
static DatastoreOptions.Builder copyOf(DatastoreOptions source) {
  DatastoreOptions.Builder b = DatastoreOptions.newBuilder()
      .setProjectId(source.getProjectId())
      .setCredentials(source.getCredentials())
      .setTransportOptions(source.getTransportOptions());
  Optional.ofNullable(source.getHost()).ifPresent(b::setHost);
  Optional.ofNullable(source.getNamespace()).ifPresent(b::setNamespace);
  Optional.ofNullable(source.getDatabaseId()).ifPresent(b::setDatabaseId);
  return b;
}
```

Then each call site becomes:
```java
DatastoreOptions.Builder builder = copyOf(DatastoreOptions.getDefaultInstance());
```

---

## Context

This fix is required when consuming this library from a project using
**Google Cloud Java BOM ≥ 26.83.0** (which brings in `google-cloud-datastore` ≥ 2.40.0).
The library's own BOM (26.53.0 at time of writing) is not affected, but the host project's
BOM takes precedence at runtime.