diff --git a/pipeline/pom.xml b/pipeline/pom.xml
index cab0e1fa..4ca1da2f 100644
--- a/pipeline/pom.xml
+++ b/pipeline/pom.xml
@@ -22,6 +22,7 @@
differ
ingestion
spanner
+ timeseries-backfill
util
diff --git a/pipeline/timeseries-backfill/README.md b/pipeline/timeseries-backfill/README.md
new file mode 100644
index 00000000..a6808afd
--- /dev/null
+++ b/pipeline/timeseries-backfill/README.md
@@ -0,0 +1,280 @@
+# Timeseries Backfill
+
+This module backfills the normalized timeseries tables from the legacy `Observation` table in Spanner.
+
+It supports two execution paths:
+- **Spanner to Spanner**: Reads directly from the live `Observation` table in Spanner. This path is direct and needs no intermediate export, but it queries the live database, which can impact performance and needs a stable snapshot for consistency. **Note:** A complete migration over this path failed because the queries timed out in Spanner after 2 hours; it is recommended only for targeted small batches (e.g., scoped by variable measured + geo combination).
+- **Avro to Spanner**: Reads exported `Observation` Avro files from GCS. Reading is decoupled from the live database, which makes this path suitable for large-scale backfills or for working from a specific historical export, and it is the recommended path for large backfills. Note that row filters do not prune the input: every Avro file in the selected directory or list is still read. It supports two input modes: all files listed in an export directory, or an explicit list of files. **Crucially, the `Observation` table must be exported from Spanner separately before this path can run (see One-Time Prep).**
+
+By default it writes:
+
+- `TimeSeries_`
+- `TimeSeriesAttribute_`
+- `StatVarObservation_`
+
+It does not populate `ObservationAttribute` in v1.
+
+## Source And Destination Shape
+
+Source table:
+
+- `Observation` from [import/pipeline/spanner/src/main/resources/spanner_schema.sql](../spanner/src/main/resources/spanner_schema.sql)
+
+Destination schema:
+
+- [timeseries_schema.sql](/timeseries_schema.sql)
+
+Important assumptions used by this module:
+
+- `TimeSeries.id` reuses the existing `dc/os/<variableMeasured>_<observationAbout>_<facetId>` series id shape, with `/` in each component replaced by `_`.
+- `TimeSeries.provenance` is derived from `import_name` as `dc/base/<import_name>`.
+- `TimeSeriesAttribute` stores the current normalized-read fields:
+ `observationAbout`, `facetId`, `importName`, `provenanceUrl`, `observationPeriod`,
+ `measurementMethod`, `unit`, `scalingFactor`, `isDcAggregate`.
+- The backfill should run against a fixed source snapshot via `--readTimestamp` when correctness matters.
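+
+For example, under these assumptions a `Count_Person` series about `geoId/06` with a hypothetical facet id `123` would get id `dc/os/Count_Person_geoId_06_123`, and an `import_name` of `CensusACS5YearSurvey` would yield provenance `dc/base/CensusACS5YearSurvey`. A rough spot-check against the experimental destination table used in the examples below (the facet id and import name here are illustrative):
+
+```bash
+# Hypothetical spot-check of one backfilled series row; adjust database and table names to your run.
+gcloud spanner databases execute-sql dc_graph_2026_01_27 \
+  --project=datcom-store --instance=dc-kg-test \
+  --sql="SELECT id, provenance FROM TimeSeries_rk WHERE id = 'dc/os/Count_Person_geoId_06_123'"
+```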
+
+## Entry Points
+
+- `org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline`
+ Beam/Dataflow batch job for the full backfill.
+- `org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline`
+ Beam/Dataflow batch job that reads exported `Observation` Avro files and writes the same destination tables.
+- `org.datacommons.ingestion.timeseries.TimeseriesBackfillValidator`
+ Small direct runner for bounded validation on one shard or range.
+
+The Spanner and Avro Beam entrypoints reuse the same downstream mutation and Spanner write path.
+
+## Flag Reference
+
+Core flags:
+
+- `--project`: GCP project that owns the Spanner instance and database. Prefer this for all runs, especially `DataflowRunner`.
+- `--projectId`: Legacy fallback for the same GCP project value. Keep only for compatibility with the older import-pipeline style.
+- `--spannerInstanceId`: Spanner instance to read from and write to.
+- `--spannerDatabaseId`: Spanner database containing both the source `Observation` table and the destination normalized tables.
+- `--sourceObservationTableName`: Source table name. Normally `Observation`.
+- `--inputExportDir`: Avro-only. Spanner export directory containing `Observation-manifest.json`.
+- `--inputFiles`: Avro-only. Comma-separated exact Avro file paths.
+- `--destinationTimeSeriesTableName`: Destination parent series table. Normally `TimeSeries_`.
+- `--destinationTimeSeriesAttributeTableName`: Destination series-attribute table. Normally `TimeSeriesAttribute_`.
+- `--destinationStatVarObservationTableName`: Destination point table. Normally `StatVarObservation_`.
+- `--readTimestamp`: Source read snapshot in RFC3339 format, for example `2026-04-22T00:00:00Z`. When set, the job reads from that exact Spanner snapshot for consistent backfill semantics. When empty, the code uses a Spanner strong read instead.
+- `--variableMeasured`: Fixed `variable_measured` filter. Use one stat var or a comma-separated list such as `Count_Person,Min_Temperature`.
+- `--startObservationAbout`: Inclusive lower bound on `observation_about`. Useful for sharding by place range.
+- `--endObservationAboutExclusive`: Exclusive upper bound on `observation_about`. Use together with `--startObservationAbout` to define one shard.
+- `--spannerEmulatorHost`: Optional emulator host such as `localhost:9010`. Leave empty for real Cloud Spanner.
+
+For the Avro entrypoint, exactly one of `--inputExportDir` or `--inputFiles` is required.
+
+If `--startObservationAbout` or `--endObservationAboutExclusive` is set, `--variableMeasured` is required. It can still be a comma-separated list.
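+
+As a quick illustration of the two rules above, these flag fragments (placeholders, not full commands) show valid and invalid combinations:
+
+```bash
+# Valid: one stat var sharded by place range (works for both entrypoints).
+#   --variableMeasured=Count_Person --startObservationAbout=geoId/06 --endObservationAboutExclusive=geoId/07
+# Valid: Avro entrypoint with exactly one input selector.
+#   --inputExportDir=gs://<bucket>/<export-subdir>
+# Invalid: a place range without --variableMeasured, or --inputExportDir and --inputFiles together.
+```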
+
+Beam pipeline row-limit flags:
+
+- `--maxSeriesRows`: Currently unsupported for Beam/Dataflow because partitioned Spanner queries cannot use the generated bounded SQL shape. Leave unset for Beam runs.
+- `--maxPointRows`: Currently unsupported for Beam/Dataflow because partitioned Spanner queries cannot use the generated bounded SQL shape. Leave unset for Beam runs.
+
+Local progress flags:
+
+- `--progressEverySourceRows`: Local progress log interval for validator and `DirectRunner`. Default `1000`. Non-positive disables row-progress logs.
+- `--heartbeatSeconds`: Local heartbeat log interval for validator and `DirectRunner`. Default `30`. Non-positive disables heartbeat logs.
+
+Optional Beam/Dataflow Spanner sink flags:
+
+- `--batchSizeBytes`: Optional `SpannerIO.Write` batch size in bytes. When unset, Beam uses its default.
+- `--maxNumRows`: Optional `SpannerIO.Write` max rows per batch. When unset, Beam uses its default.
+- `--maxNumMutations`: Optional `SpannerIO.Write` max mutations per batch. When unset, Beam uses its default.
+- `--groupingFactor`: Optional `SpannerIO.Write` grouping factor. When unset, Beam uses its default.
+- `--commitDeadlineSeconds`: Optional `SpannerIO.Write` commit deadline in seconds. When unset, Beam uses its default.
+
+Validator-only flags:
+
+- `--validatorMaxSeriesRows`: Cap on source series rows for `TimeseriesBackfillValidator`. Non-positive means no limit.
+
+Beam/Dataflow runner flags used in the examples:
+
+- `--runner`: Beam runner to use. `DirectRunner` runs locally. `DataflowRunner` submits the Beam job to Dataflow.
+- `--region`: Dataflow region. Only needed with `DataflowRunner`.
+- `--tempLocation`: GCS path used by Dataflow for temporary files.
+- `--stagingLocation`: GCS path used by Dataflow for staged job artifacts.
+- `--numWorkers`: Initial Dataflow worker count.
+- `--maxNumWorkers`: Maximum Dataflow worker count when autoscaling is enabled.
+- `--workerMachineType`: Worker VM shape, for example `n2-custom-4-32768`.
+- `--numberOfWorkerHarnessThreads`: Number of SDK harness threads per worker, for example `2`.
+
+## Flag Guidance
+
+- Always set `--project`, `--spannerInstanceId`, and `--spannerDatabaseId`.
+- Set `--readTimestamp` for any real backfill where you want a stable source snapshot.
+- Use `TimeseriesBackfillPipeline` when the source is live Spanner and `TimeseriesBackfillAvroPipeline` when the source is a Spanner Avro export.
+- Use `--variableMeasured` for every local or early Dataflow run. It accepts one stat var or a comma-separated list.
+- Add `--startObservationAbout` and `--endObservationAboutExclusive` only when you want to shard one stat-var slice by place range. If either range flag is set, `--variableMeasured` is required.
+- For the Avro entrypoint, use `--inputExportDir` for a full exported snapshot and `--inputFiles` for targeted reruns or debugging.
+- For the Avro entrypoint, `--inputExportDir` reads only `Observation-manifest.json` and only the `Observation.avro-*` files listed there. Other exported table files in the same export directory are ignored.
+- `--inputExportDir` always resolves the full `Observation` file list from the manifest. It is not a partial-file selector.
+- Use `--inputFiles` when you want to process only a few exact Avro files for a targeted rerun or debugging.
+- Avro local runs are supported only through `TimeseriesBackfillAvroPipeline` with `DirectRunner`. There is no separate standalone Avro validator.
+- The Avro entrypoint writes directly to Spanner. It does not create new intermediate output files.
+- `--variableMeasured`, `--startObservationAbout`, and `--endObservationAboutExclusive` are row-level filters after Avro rows are read. They narrow which rows are written, but they do not prune the Avro file list chosen from `--inputExportDir`.
+- Use `TimeseriesBackfillValidator` first when you want a small bounded write. `--validatorMaxSeriesRows` is the only bounded-row flag that is supported today.
+- Do not use `--maxSeriesRows` or `--maxPointRows` with Beam/Dataflow. They are intentionally blocked for partitioned Spanner reads.
+- For local `DirectRunner`, tune `--progressEverySourceRows` and `--heartbeatSeconds` if you want more visible liveness logs.
+- For Dataflow memory experiments, test worker settings separately from backfill query settings. The main ones are `--workerMachineType` and `--numberOfWorkerHarnessThreads`.
+- Use the optional Spanner sink flags only when you want to override Beam defaults for the Beam/Dataflow write path.
+- The standalone validator accepts those sink flags because it shares the options interface, but its direct `DatabaseClient.writeAtLeastOnce(...)` path does not use them.
+
+## Spanner Sink Tuning Order
+
+Suggested order to test if the sink is too memory-heavy or too serialized:
+
+- `groupingFactor`
+- `maxNumRows`
+- `maxNumMutations`
+- `batchSizeBytes`
+- `commitDeadlineSeconds`
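+
+For example, a reasonable first pass is to override only the first two knobs and keep Beam defaults for the rest (the values below are illustrative, not recommendations):
+
+```bash
+# Illustrative first tuning pass, appended to the -Dexec.args value of a DataflowRunner run:
+#   --groupingFactor=1000 --maxNumRows=500
+# Leave batchSizeBytes, maxNumMutations, and commitDeadlineSeconds unset until needed.
+```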
+
+## One-Time Prep
+
+### Exporting Observation Table (for Avro path)
+If you plan to use the Avro execution path, you must first export the `Observation` table from Spanner to GCS as Avro files. Run this command to start the Dataflow export job:
+
+```bash
+gcloud dataflow jobs run observation-export-$(date +%Y%m%d-%H%M%S) \
+ --project=datcom-store \
+ --region=us-central1 \
+ --gcs-location=gs://dataflow-templates-us-central1/latest/Cloud_Spanner_to_GCS_Avro \
+ --max-workers=50 \
+ --parameters=instanceId=dc-kg-test,databaseId=dc_graph_2026_01_27,spannerProjectId=datcom-store,tableNames=Observation,outputDir=gs://<bucket>/spanner_obs_dump_2026_04_21,dataBoostEnabled=true,spannerPriority=LOW
+```
+
+If you see a missing artifact error for `org.datacommons:datacommons-import-util`, install that top-level module once from the `import/pipeline` directory:
+
+```bash
+mvn -f ../pom.xml -pl util -am install -Pgit-worktree -DskipTests
+```
+
+## Build
+
+From the `import/pipeline` directory:
+
+```bash
+mvn -pl timeseries-backfill -am package -Pgit-worktree -DskipTests
+```
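+
+Besides the regular jar, the module's shade plugin also produces a bundled jar (final name `timeseries-backfill-bundled-<version>.jar`), which can be handy if you want to submit the job without going through `mvn exec:java`:
+
+```bash
+# The bundled jar lands under the module's target directory after `package`.
+ls timeseries-backfill/target/timeseries-backfill-bundled-*.jar
+```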
+
+## Validate Locally
+
+Run the standalone validator from the `import/pipeline` directory:
+
+```bash
+mvn -Pgit-worktree compile exec:java \
+ -pl timeseries-backfill -am \
+ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillValidator \
+ -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --sourceObservationTableName=Observation --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --startObservationAbout=geoId/06 --endObservationAboutExclusive=geoId/07 --validatorMaxSeriesRows=1000"
+```
+
+The validator uses the same single-read row mapping logic as the Beam pipeline and writes bounded batches directly to Spanner.
+
+During local validation, the default progress logs print:
+
+- a heartbeat every `30` seconds
+- a row-progress log every `1000` source rows
+- a validator flush log on each write batch
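+
+With the defaults, those lines follow the format below; the prefix and all counts are illustrative, but the field names match the shared progress tracker:
+
+```
+validator heartbeat: source_rows=0, timeseries_rows=0, timeseries_attribute_rows=0, stat_var_observation_rows=0, seconds_since_last_row=30, no_source_rows_yet=true
+validator progress: source_rows=1000, timeseries_rows=1000, timeseries_attribute_rows=1000, stat_var_observation_rows=52000, seconds_since_last_row=0
+validator flush: mutations=500, write_timestamp=2026-04-22T00:10:00Z
+```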
+
+Use the validator first when you want a small bounded write without running Beam.
+
+## Run With DirectRunner
+
+```bash
+mvn -Pgit-worktree compile exec:java \
+ -pl timeseries-backfill -am \
+ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline \
+ -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --sourceObservationTableName=Observation --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --startObservationAbout=geoId/06 --endObservationAboutExclusive=geoId/07 --runner=DirectRunner"
+```
+
+`DirectRunner` still runs the Beam job locally. For a small bounded run, use the validator instead of Beam row-cap flags, and pair any observation_about range with `--variableMeasured`.
+
+The Beam/Dataflow path reads the raw `observations` proto value from `Observation` and expands it inside Beam. The validator uses that same source-read and expansion path.
+
+For local `DirectRunner`, the default progress logs print:
+
+- a heartbeat every `30` seconds
+- a row-progress log every `1000` source rows
+
+Set `--progressEverySourceRows` or `--heartbeatSeconds` if you want a different local signal. These flags are ignored for `DataflowRunner`.
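+
+For example, to get more frequent local signal, or to silence it entirely (values are illustrative):
+
+```bash
+# More frequent local liveness logs, appended to the DirectRunner -Dexec.args value:
+#   --progressEverySourceRows=100 --heartbeatSeconds=10
+# Disable both kinds of local logs:
+#   --progressEverySourceRows=0 --heartbeatSeconds=0
+```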
+
+## Run Avro Export With DirectRunner
+
+```bash
+mvn -Pgit-worktree compile exec:java \
+ -pl timeseries-backfill -am \
+ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
+ -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/dc-kg-test-dc_graph_2026_01_27-2026-04-23_05_47_24-8439747614048276587/Observation.avro-00005-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DirectRunner"
+
+mvn -Pgit-worktree compile exec:java \
+ -pl timeseries-backfill -am \
+ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
+ -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=/import/pipeline/Observation.avro-00042-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DirectRunner"
+
+```
+
+The Avro entrypoint reads exported `Observation` Avro rows, recomputes `provenance` from `import_name`, parses `observations` from the serialized proto payload, and then reuses the same normalized write path as the Spanner entrypoint.
+
+For Avro input selection:
+
+- `--inputExportDir` should point to the exact export subdirectory that contains `Observation-manifest.json`.
+- `--inputExportDir` processes all `Observation.avro-*` files listed in that manifest.
+- Use `--inputFiles` instead if you want to process only a few exact Avro files.
+- `--variableMeasured` and the observation-about range flags still apply, but only after the selected Avro files are opened and rows are read.
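+
+A quick sanity check for `--inputExportDir` is to list the directory and confirm it contains the manifest plus the numbered `Observation` shards (bucket and subdirectory below are placeholders):
+
+```bash
+gsutil ls gs://<bucket>/<export-subdir>/
+# Expect something like:
+#   gs://<bucket>/<export-subdir>/Observation-manifest.json
+#   gs://<bucket>/<export-subdir>/Observation.avro-00000-of-00303
+#   ...
+```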
+
+## Run On Dataflow
+
+```bash
+mvn -Pgit-worktree compile exec:java \
+ -pl timeseries-backfill -am \
+ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline \
+ -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --sourceObservationTableName=Observation --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --startObservationAbout=geoId/06 --endObservationAboutExclusive=geoId/07 --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
+```
+
+Use `--variableMeasured` together with `--startObservationAbout` and `--endObservationAboutExclusive` to shard Beam/Dataflow runs safely.
+
+Do not force the `ObservationAboutVariableMeasured` index for the Beam/Dataflow queries in their current form. The backfill selects columns that are not stored in that index, so forcing it can introduce a back join and make the query non-root-partitionable.
+
+## Run Avro Export On Dataflow
+
+```bash
+mvn -Pgit-worktree compile exec:java \
+ -pl timeseries-backfill -am \
+ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
+ -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/ --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
+
+mvn -Pgit-worktree compile exec:java \
+ -pl timeseries-backfill -am \
+ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
+ -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/dc-kg-test-dc_graph_2026_01_27-2026-04-23_05_47_24-8439747614048276587/Observation.avro-00042-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
+
+mvn -Pgit-worktree compile exec:java \
+ -pl timeseries-backfill -am \
+ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
+ -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/dc-kg-test-dc_graph_2026_01_27-2026-04-23_05_47_24-8439747614048276587 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
+```
+
+## Recreate Destination Tables
+
+Run this from the `import` repo root to drop any existing experimental normalized `_rk` tables and recreate them from the checked-in schema:
+
+```bash
+./pipeline/timeseries-backfill/recreate_timeseries_tables.sh datcom-store dc-kg-test dc_graph_5
+```
+
+The script reads the current database DDL, drops the normalized `_rk` tables and indexes if they already exist, and then reapplies a suffixed form of [timeseries_schema.sql](/timeseries_schema.sql).
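+
+To confirm the recreated tables afterwards, you can inspect the database DDL (assumes `gcloud` access to the same database):
+
+```bash
+gcloud spanner databases ddl describe dc_graph_5 \
+  --project=datcom-store --instance=dc-kg-test | grep "_rk"
+```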
+
+## Notes
+
+- For the Avro export path, do not rely on `Observation.provenance` being populated in the export, because it is a stored generated column in the deployed schema. This module recomputes it from `import_name`.
+- The default experimental destination schema should include `TimeSeriesAttributePropertyValue_rk` and `TimeSeriesAttributeValue_rk` before using the normalized Mixer query path.
diff --git a/pipeline/timeseries-backfill/pom.xml b/pipeline/timeseries-backfill/pom.xml
new file mode 100644
index 00000000..5c3f1b9d
--- /dev/null
+++ b/pipeline/timeseries-backfill/pom.xml
@@ -0,0 +1,114 @@
+
+
+ 4.0.0
+
+
+ org.datacommons
+ pipeline
+ ${revision}
+
+
+ org.datacommons
+ timeseries-backfill
+ ${revision}
+ Data Commons - Timeseries Backfill
+
+
+
+ spanner
+ org.datacommons
+ ${revision}
+
+
+ data
+ org.datacommons
+ ${revision}
+
+
+ org.apache.beam
+ beam-sdks-java-core
+
+
+ org.apache.beam
+ beam-sdks-java-io-google-cloud-platform
+
+
+ org.apache.beam
+ beam-runners-google-cloud-dataflow-java
+
+
+ org.apache.beam
+ beam-runners-direct-java
+
+
+ org.apache.beam
+ beam-sdks-java-extensions-avro
+
+
+ com.google.cloud
+ google-cloud-spanner
+
+
+ com.google.code.gson
+ gson
+
+
+ org.slf4j
+ slf4j-jdk14
+ ${slf4j.version}
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+
+ false
+ false
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.5.3
+
+
+ package
+
+ shade
+
+
+ ${project.artifactId}-bundled-${project.version}
+
+
+ org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/LocalProgressTracker.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/LocalProgressTracker.java
new file mode 100644
index 00000000..d717b498
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/LocalProgressTracker.java
@@ -0,0 +1,146 @@
+package org.datacommons.ingestion.timeseries;
+
+import com.google.cloud.Timestamp;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import org.slf4j.Logger;
+
+/** Emits local progress logs for validator and DirectRunner runs. */
+final class LocalProgressTracker implements AutoCloseable {
+ interface LogSink {
+ void info(String message);
+ }
+
+ private final String runnerName;
+ private final int progressEverySourceRows;
+ private final int heartbeatSeconds;
+ private final LogSink logSink;
+ private final LongSupplier currentTimeMillis;
+ private final ScheduledExecutorService heartbeatExecutor;
+ private final AtomicLong sourceRows;
+ private final AtomicLong timeSeriesRows;
+ private final AtomicLong timeSeriesAttributeRows;
+ private final AtomicLong statVarObservationRows;
+ private final AtomicLong lastRowProgressMillis;
+ private final AtomicBoolean closed;
+
+ LocalProgressTracker(
+ String runnerName, int progressEverySourceRows, int heartbeatSeconds, Logger logger) {
+ this(
+ runnerName,
+ progressEverySourceRows,
+ heartbeatSeconds,
+ logger::info,
+ System::currentTimeMillis,
+ createHeartbeatExecutor(runnerName, heartbeatSeconds));
+ }
+
+ LocalProgressTracker(
+ String runnerName,
+ int progressEverySourceRows,
+ int heartbeatSeconds,
+ LogSink logSink,
+ LongSupplier currentTimeMillis,
+ ScheduledExecutorService heartbeatExecutor) {
+ this.runnerName = runnerName;
+ this.progressEverySourceRows = progressEverySourceRows;
+ this.heartbeatSeconds = heartbeatSeconds;
+ this.logSink = logSink;
+ this.currentTimeMillis = currentTimeMillis;
+ this.heartbeatExecutor = heartbeatExecutor;
+ this.sourceRows = new AtomicLong();
+ this.timeSeriesRows = new AtomicLong();
+ this.timeSeriesAttributeRows = new AtomicLong();
+ this.statVarObservationRows = new AtomicLong();
+ this.lastRowProgressMillis = new AtomicLong(currentTimeMillis.getAsLong());
+ this.closed = new AtomicBoolean();
+
+ if (heartbeatExecutor != null) {
+ heartbeatExecutor.scheduleAtFixedRate(
+ this::logHeartbeatNow, heartbeatSeconds, heartbeatSeconds, TimeUnit.SECONDS);
+ }
+ }
+
+ boolean isEnabled() {
+ return progressEverySourceRows > 0 || heartbeatSeconds > 0;
+ }
+
+ void recordRow(int timeSeriesAttributeRowsDelta, int statVarObservationRowsDelta) {
+ long sourceRowCount = sourceRows.incrementAndGet();
+ timeSeriesRows.incrementAndGet();
+ timeSeriesAttributeRows.addAndGet(timeSeriesAttributeRowsDelta);
+ statVarObservationRows.addAndGet(statVarObservationRowsDelta);
+ lastRowProgressMillis.set(currentTimeMillis.getAsLong());
+
+ if (progressEverySourceRows > 0 && sourceRowCount % progressEverySourceRows == 0) {
+ logSink.info(buildProgressMessage("progress"));
+ }
+ }
+
+ void recordValidatorFlush(int mutationCount, Timestamp writeTimestamp) {
+ logSink.info(
+ runnerName + " flush: mutations=" + mutationCount + ", write_timestamp=" + writeTimestamp);
+ }
+
+ void logHeartbeatNow() {
+ if (heartbeatSeconds <= 0 || closed.get()) {
+ return;
+ }
+ logSink.info(buildProgressMessage("heartbeat"));
+ }
+
+ @Override
+ public void close() {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+ if (heartbeatExecutor != null) {
+ heartbeatExecutor.shutdownNow();
+ }
+ }
+
+ private String buildProgressMessage(String kind) {
+ long sourceRowCount = sourceRows.get();
+ long idleSeconds =
+ TimeUnit.MILLISECONDS.toSeconds(
+ currentTimeMillis.getAsLong() - lastRowProgressMillis.get());
+ StringBuilder message =
+ new StringBuilder()
+ .append(runnerName)
+ .append(' ')
+ .append(kind)
+ .append(": source_rows=")
+ .append(sourceRowCount)
+ .append(", timeseries_rows=")
+ .append(timeSeriesRows.get())
+ .append(", timeseries_attribute_rows=")
+ .append(timeSeriesAttributeRows.get())
+ .append(", stat_var_observation_rows=")
+ .append(statVarObservationRows.get())
+ .append(", seconds_since_last_row=")
+ .append(idleSeconds);
+ if (sourceRowCount == 0) {
+ message.append(", no_source_rows_yet=true");
+ }
+ return message.toString();
+ }
+
+ private static ScheduledExecutorService createHeartbeatExecutor(
+ String runnerName, int heartbeatSeconds) {
+ if (heartbeatSeconds <= 0) {
+ return null;
+ }
+ ThreadFactory threadFactory =
+ runnable -> {
+ Thread thread = new Thread(runnable, runnerName + "-progress-heartbeat");
+ thread.setDaemon(true);
+ return thread;
+ };
+ return Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/ObservationExportFiles.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/ObservationExportFiles.java
new file mode 100644
index 00000000..02241f0a
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/ObservationExportFiles.java
@@ -0,0 +1,128 @@
+package org.datacommons.ingestion.timeseries;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+
+/** Resolves Avro export files from either an export directory or an explicit file list. */
+final class ObservationExportFiles {
+ private ObservationExportFiles() {}
+
+ static void validateOptions(TimeseriesBackfillOptions options) {
+ boolean hasInputExportDir = !options.getInputExportDir().isEmpty();
+ boolean hasInputFiles = !options.getInputFiles().isEmpty();
+ if (hasInputExportDir == hasInputFiles) {
+ throw new IllegalArgumentException(
+ "Exactly one of inputExportDir or inputFiles must be provided for the Avro pipeline.");
+ }
+ }
+
+ static List<String> resolveInputFiles(TimeseriesBackfillOptions options) {
+ if (!options.getInputFiles().isEmpty()) {
+ return parseCsv(options.getInputFiles());
+ }
+ return readManifest(
+ options, options.getInputExportDir(), options.getSourceObservationTableName());
+ }
+
+ private static List<String> readManifest(
+ TimeseriesBackfillOptions options, String exportDir, String sourceTableName) {
+ FileSystems.setDefaultPipelineOptions(options);
+ String normalizedExportDir = trimTrailingSlash(exportDir);
+ String manifestPath = normalizedExportDir + "/" + sourceTableName + "-manifest.json";
+ ResourceId manifestResource;
+ try {
+ manifestResource = FileSystems.matchSingleFileSpec(manifestPath).resourceId();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find export manifest at " + manifestPath, e);
+ }
+
+ try (Reader reader =
+ Channels.newReader(FileSystems.open(manifestResource), StandardCharsets.UTF_8.name())) {
+ JsonElement root = JsonParser.parseReader(reader);
+ List<String> files = parseManifest(root, normalizedExportDir, sourceTableName);
+ if (!files.isEmpty()) {
+ return files;
+ }
+ throw new IllegalArgumentException(
+ "No Avro files for " + sourceTableName + " were found in " + manifestPath);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read export manifest at " + manifestPath, e);
+ }
+ }
+
+ static List<String> parseManifest(JsonElement root, String exportDir, String sourceTableName) {
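+ // Matches exported shard names for the source table, e.g. "Observation.avro-00012-of-00303", optionally path-prefixed.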
+ Pattern filePattern =
+ Pattern.compile("(^|.*/)" + Pattern.quote(sourceTableName) + "\\.avro-\\d{5}-of-\\d{5}$");
+ Set<String> files = new LinkedHashSet<>();
+ collectFiles(root, exportDir, filePattern, files);
+ return new ArrayList<>(files);
+ }
+
+ private static void collectFiles(
+ JsonElement element, String exportDir, Pattern filePattern, Set<String> files) {
+ if (element == null || element.isJsonNull()) {
+ return;
+ }
+ if (element.isJsonPrimitive() && element.getAsJsonPrimitive().isString()) {
+ String value = element.getAsString();
+ if (filePattern.matcher(value).find()) {
+ files.add(toAbsolutePath(exportDir, value));
+ }
+ return;
+ }
+ if (element.isJsonArray()) {
+ for (JsonElement child : element.getAsJsonArray()) {
+ collectFiles(child, exportDir, filePattern, files);
+ }
+ return;
+ }
+ JsonObject object = element.getAsJsonObject();
+ for (String key : object.keySet()) {
+ collectFiles(object.get(key), exportDir, filePattern, files);
+ }
+ }
+
+ private static List<String> parseCsv(String csv) {
+ List<String> fileSpecs = new ArrayList<>();
+ for (String part : csv.split(",")) {
+ String trimmed = part.trim();
+ if (!trimmed.isEmpty()) {
+ fileSpecs.add(trimmed);
+ }
+ }
+ if (!fileSpecs.isEmpty()) {
+ return fileSpecs;
+ }
+ throw new IllegalArgumentException("inputFiles must contain at least one Avro file path.");
+ }
+
+ private static String toAbsolutePath(String exportDir, String value) {
+ if (value.contains("://") || value.startsWith("/")) {
+ return value;
+ }
+ String relativePath = value.startsWith("./") ? value.substring(2) : value;
+ while (relativePath.startsWith("/")) {
+ relativePath = relativePath.substring(1);
+ }
+ return exportDir + "/" + relativePath;
+ }
+
+ private static String trimTrailingSlash(String value) {
+ if (value.endsWith("/")) {
+ return value.substring(0, value.length() - 1);
+ }
+ return value;
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SeriesIdGenerator.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SeriesIdGenerator.java
new file mode 100644
index 00000000..3bc059c4
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SeriesIdGenerator.java
@@ -0,0 +1,21 @@
+package org.datacommons.ingestion.timeseries;
+
+/** Builds the existing dc/os series identifier used as the normalized series id. */
+final class SeriesIdGenerator {
+ private static final String PREFIX = "dc/os/";
+
+ private SeriesIdGenerator() {}
+
+ static String build(String variableMeasured, String observationAbout, String facetId) {
+ return PREFIX
+ + sanitize(variableMeasured)
+ + "_"
+ + sanitize(observationAbout)
+ + "_"
+ + sanitize(facetId);
+ }
+
+ private static String sanitize(String value) {
+ return value == null ? "" : value.replace('/', '_');
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java
new file mode 100644
index 00000000..82bd43b1
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java
@@ -0,0 +1,194 @@
+package org.datacommons.ingestion.timeseries;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.spanner.Struct;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.datacommons.Storage.Observations;
+
+/** Row shapes used by the timeseries backfill. */
+final class SourceObservationRows {
+ private SourceObservationRows() {}
+
+ static SourceObservationRow toObservationRow(Struct row) {
+ return buildObservationRow(
+ row.getString("observation_about"),
+ row.getString("variable_measured"),
+ row.getString("facet_id"),
+ getNullableString(row, "observation_period"),
+ getNullableString(row, "measurement_method"),
+ getNullableString(row, "unit"),
+ getNullableString(row, "scaling_factor"),
+ getNullableString(row, "import_name"),
+ getNullableString(row, "provenance_url"),
+ !row.isNull("is_dc_aggregate") && row.getBoolean("is_dc_aggregate"),
+ parseObservations(row));
+ }
+
+ static SourceObservationRow toObservationRow(GenericRecord row) {
+ return buildObservationRow(
+ getNullableString(row, "observation_about"),
+ getNullableString(row, "variable_measured"),
+ getNullableString(row, "facet_id"),
+ getNullableString(row, "observation_period"),
+ getNullableString(row, "measurement_method"),
+ getNullableString(row, "unit"),
+ getNullableString(row, "scaling_factor"),
+ getNullableString(row, "import_name"),
+ getNullableString(row, "provenance_url"),
+ getNullableBoolean(row, "is_dc_aggregate"),
+ parseObservations(row));
+ }
+
+ static CompactSourceObservationRow toCompactObservationRow(GenericRecord row) {
+ return new CompactSourceObservationRow(
+ new SourceSeriesRow(
+ getNullableString(row, "observation_about"),
+ getNullableString(row, "variable_measured"),
+ getNullableString(row, "facet_id"),
+ getNullableString(row, "observation_period"),
+ getNullableString(row, "measurement_method"),
+ getNullableString(row, "unit"),
+ getNullableString(row, "scaling_factor"),
+ getNullableString(row, "import_name"),
+ getNullableString(row, "provenance_url"),
+ getNullableBoolean(row, "is_dc_aggregate"),
+ deriveProvenance(getNullableString(row, "import_name"))),
+ getObservationBytes(row));
+ }
+
+ private static SourceObservationRow buildObservationRow(
+ String observationAbout,
+ String variableMeasured,
+ String facetId,
+ String observationPeriod,
+ String measurementMethod,
+ String unit,
+ String scalingFactor,
+ String importName,
+ String provenanceUrl,
+ boolean isDcAggregate,
+ Observations observations) {
+ SourceSeriesRow seriesRow =
+ new SourceSeriesRow(
+ observationAbout,
+ variableMeasured,
+ facetId,
+ observationPeriod,
+ measurementMethod,
+ unit,
+ scalingFactor,
+ importName,
+ provenanceUrl,
+ isDcAggregate,
+ deriveProvenance(importName));
+ List<SourcePointRow> pointRows = new ArrayList<>();
+ for (Map.Entry<String, String> entry : observations.getValuesMap().entrySet()) {
+ pointRows.add(
+ new SourcePointRow(
+ observationAbout, variableMeasured, facetId, entry.getKey(), entry.getValue()));
+ }
+ return new SourceObservationRow(seriesRow, pointRows);
+ }
+
+ private static String getNullableString(Struct row, String columnName) {
+ return row.isNull(columnName) ? "" : row.getString(columnName);
+ }
+
+ private static String getNullableString(GenericRecord row, String fieldName) {
+ Object value = getField(row, fieldName);
+ return value == null ? "" : value.toString();
+ }
+
+ private static boolean getNullableBoolean(GenericRecord row, String fieldName) {
+ Object value = getField(row, fieldName);
+ return value instanceof Boolean && (Boolean) value;
+ }
+
+ private static Observations parseObservations(Struct row) {
+ if (row.isNull("observations")) {
+ return Observations.getDefaultInstance();
+ }
+ ByteArray protoBytes = row.getBytes("observations");
+ return parseObservations(protoBytes.toByteArray());
+ }
+
+ private static Observations parseObservations(GenericRecord row) {
+ return parseObservations(getObservationBytes(row));
+ }
+
+ static Observations parseObservations(byte[] protoBytes) {
+ if (protoBytes == null || protoBytes.length == 0) {
+ return Observations.getDefaultInstance();
+ }
+ try {
+ return Observations.parseFrom(protoBytes);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to parse observations proto", e);
+ }
+ }
+
+ private static byte[] getObservationBytes(GenericRecord row) {
+ Object value = getField(row, "observations");
+ if (value == null) {
+ return new byte[0];
+ }
+ return toByteArray(value);
+ }
+
+ private static byte[] toByteArray(Object value) {
+ if (value instanceof ByteBuffer byteBuffer) {
+ ByteBuffer duplicate = byteBuffer.duplicate();
+ byte[] bytes = new byte[duplicate.remaining()];
+ duplicate.get(bytes);
+ return bytes;
+ }
+ if (value instanceof byte[] bytes) {
+ return bytes;
+ }
+ if (value instanceof GenericData.Fixed fixed) {
+ return fixed.bytes();
+ }
+ throw new IllegalArgumentException("Unsupported observations Avro type: " + value.getClass());
+ }
+
+ private static Object getField(GenericRecord row, String fieldName) {
+ if (row.getSchema().getField(fieldName) == null) {
+ return null;
+ }
+ return row.get(fieldName);
+ }
+
+ private static String deriveProvenance(String importName) {
+ return importName == null || importName.isEmpty() ? "" : "dc/base/" + importName;
+ }
+}
+
+record SourceObservationRow(SourceSeriesRow seriesRow, List<SourcePointRow> pointRows)
+ implements Serializable {}
+
+record CompactSourceObservationRow(SourceSeriesRow seriesRow, byte[] observationsProtoBytes)
+ implements Serializable {}
+
+record SourceSeriesRow(
+ String observationAbout,
+ String variableMeasured,
+ String facetId,
+ String observationPeriod,
+ String measurementMethod,
+ String unit,
+ String scalingFactor,
+ String importName,
+ String provenanceUrl,
+ boolean isDcAggregate,
+ String provenance)
+ implements Serializable {}
+
+record SourcePointRow(
+ String observationAbout, String variableMeasured, String facetId, String date, String value)
+ implements Serializable {}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java
new file mode 100644
index 00000000..a59ef364
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java
@@ -0,0 +1,176 @@
+package org.datacommons.ingestion.timeseries;
+
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.extensions.avro.io.AvroIO;
+import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Beam/Dataflow pipeline that backfills the normalized timeseries tables from Avro exports. */
+public class TimeseriesBackfillAvroPipeline {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TimeseriesBackfillAvroPipeline.class);
+
+ public static void main(String[] args) {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(TimeseriesBackfillOptions.class);
+ validateOptions(options);
+
+ List<String> inputFiles = ObservationExportFiles.resolveInputFiles(options);
+ LOGGER.info(
+ "Starting timeseries Avro backfill with runner {} over {} Avro files",
+ TimeseriesBackfillPipeline.runnerName(options),
+ inputFiles.size());
+
+ try (LocalProgressTracker progressTracker =
+ TimeseriesBackfillPipeline.createLocalProgressTracker(options)) {
+ TimeseriesBackfillPipeline.setLocalProgressTracker(progressTracker);
+ Pipeline pipeline = Pipeline.create(options);
+ buildPipeline(pipeline, options, inputFiles);
+ PipelineResult result = pipeline.run();
+ LOGGER.info("Timeseries Avro backfill returned runner {}", result.getClass().getSimpleName());
+ } finally {
+ TimeseriesBackfillPipeline.clearLocalProgressTracker();
+ }
+ }
+
+ static void validateOptions(TimeseriesBackfillOptions options) {
+ TimeseriesBackfillPipeline.validateOptions(options);
+ ObservationExportFiles.validateOptions(options);
+ }
+
+ static void buildPipeline(
+ Pipeline pipeline, TimeseriesBackfillOptions options, List<String> inputFiles) {
+ PCollection<CompactSourceObservationRow> sourceRows =
+ pipeline
+ .apply("CreateAvroFileSpecs", Create.of(inputFiles))
+ .apply(
+ "ReadAvroSourceRows",
+ AvroIO.parseAllGenericRecords(SourceObservationRows::toCompactObservationRow)
+ .withCoder(SerializableCoder.of(CompactSourceObservationRow.class)))
+ .apply(
+ "FilterSourceRows",
+ ParDo.of(
+ new FilterCompactSourceObservationRowsFn(
+ options.getStartObservationAbout(),
+ options.getEndObservationAboutExclusive(),
+ VariableMeasuredFilters.parse(options.getVariableMeasured()))));
+ sourceRows.setCoder(SerializableCoder.of(CompactSourceObservationRow.class));
+
+ PCollection<MutationGroup> mutationGroups =
+ sourceRows.apply(
+ "MapMutationGroups",
+ ParDo.of(
+ new CompactRowsToMutationGroupsFn(
+ options.getDestinationTimeSeriesTableName(),
+ options.getDestinationTimeSeriesAttributeTableName(),
+ options.getDestinationStatVarObservationTableName())));
+ mutationGroups.apply("WriteMutationGroups", TimeseriesBackfillIO.buildGroupedWrite(options));
+ }
+
+ static boolean matchesFilters(
+ SourceSeriesRow seriesRow,
+ String startObservationAbout,
+ String endObservationAboutExclusive,
+ List<String> variableMeasuredFilters) {
+ if (!startObservationAbout.isEmpty()
+ && seriesRow.observationAbout().compareTo(startObservationAbout) < 0) {
+ return false;
+ }
+ if (!endObservationAboutExclusive.isEmpty()
+ && seriesRow.observationAbout().compareTo(endObservationAboutExclusive) >= 0) {
+ return false;
+ }
+ return variableMeasuredFilters.isEmpty()
+ || variableMeasuredFilters.contains(seriesRow.variableMeasured());
+ }
+
+ static final class FilterCompactSourceObservationRowsFn
+ extends DoFn<CompactSourceObservationRow, CompactSourceObservationRow> {
+ private final String startObservationAbout;
+ private final String endObservationAboutExclusive;
+ private final List<String> variableMeasuredFilters;
+
+ FilterCompactSourceObservationRowsFn(
+ String startObservationAbout,
+ String endObservationAboutExclusive,
+ List<String> variableMeasuredFilters) {
+ this.startObservationAbout = startObservationAbout;
+ this.endObservationAboutExclusive = endObservationAboutExclusive;
+ this.variableMeasuredFilters = variableMeasuredFilters;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element CompactSourceObservationRow sourceRow,
+ OutputReceiver<CompactSourceObservationRow> out) {
+ if (matchesFilters(
+ sourceRow.seriesRow(),
+ startObservationAbout,
+ endObservationAboutExclusive,
+ variableMeasuredFilters)) {
+ out.output(sourceRow);
+ }
+ }
+ }
+
+ static final class CompactRowsToMutationGroupsFn
+ extends DoFn<CompactSourceObservationRow, MutationGroup> {
+ private static final Counter SOURCE_ROWS =
+ Metrics.counter(CompactRowsToMutationGroupsFn.class, "source_rows");
+ private static final Counter TIMESERIES_ROWS =
+ Metrics.counter(CompactRowsToMutationGroupsFn.class, "timeseries_rows_written");
+ private static final Counter TIMESERIES_ATTRIBUTE_ROWS =
+ Metrics.counter(CompactRowsToMutationGroupsFn.class, "timeseries_attribute_rows_written");
+ private static final Counter SOURCE_POINT_ROWS =
+ Metrics.counter(CompactRowsToMutationGroupsFn.class, "source_point_rows");
+ private static final Counter STAT_VAR_OBSERVATION_ROWS =
+ Metrics.counter(CompactRowsToMutationGroupsFn.class, "stat_var_observation_rows_written");
+ private final String timeSeriesTableName;
+ private final String timeSeriesAttributeTableName;
+ private final String statVarObservationTableName;
+
+ CompactRowsToMutationGroupsFn(
+ String timeSeriesTableName,
+ String timeSeriesAttributeTableName,
+ String statVarObservationTableName) {
+ this.timeSeriesTableName = timeSeriesTableName;
+ this.timeSeriesAttributeTableName = timeSeriesAttributeTableName;
+ this.statVarObservationTableName = statVarObservationTableName;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element CompactSourceObservationRow sourceRow, OutputReceiver<MutationGroup> out) {
+ BackfillMutationGroups mutationGroups =
+ TimeseriesMutationFactory.toMutationGroups(
+ sourceRow,
+ timeSeriesTableName,
+ timeSeriesAttributeTableName,
+ statVarObservationTableName);
+ SOURCE_ROWS.inc();
+ TIMESERIES_ROWS.inc();
+ TIMESERIES_ATTRIBUTE_ROWS.inc(mutationGroups.timeSeriesAttributeRows());
+ SOURCE_POINT_ROWS.inc(mutationGroups.statVarObservationRows());
+ STAT_VAR_OBSERVATION_ROWS.inc(mutationGroups.statVarObservationRows());
+ LocalProgressTracker progressTracker = TimeseriesBackfillPipeline.localProgressTracker();
+ if (progressTracker != null) {
+ progressTracker.recordRow(
+ mutationGroups.timeSeriesAttributeRows(), mutationGroups.statVarObservationRows());
+ }
+ for (MutationGroup mutationGroup : mutationGroups.groups()) {
+ out.output(mutationGroup);
+ }
+ }
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillIO.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillIO.java
new file mode 100644
index 00000000..d35d2106
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillIO.java
@@ -0,0 +1,103 @@
+package org.datacommons.ingestion.timeseries;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.TimestampBound;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.joda.time.Duration;
+
+/** Shared Spanner IO configuration for the backfill job and validator. */
+final class TimeseriesBackfillIO {
+ // Previous sink tuning values kept here for reference only:
+ // private static final int SPANNER_BATCH_SIZE_BYTES = 500 * 1024;
+ // private static final int SPANNER_MAX_NUM_ROWS = 2000;
+ // private static final int SPANNER_MAX_NUM_MUTATIONS = 10000;
+ // private static final int SPANNER_GROUPING_FACTOR = 3000;
+ // private static final int SPANNER_COMMIT_DEADLINE_SECONDS = 120;
+
+ private TimeseriesBackfillIO() {}
+
+ static String resolveProjectId(TimeseriesBackfillOptions options) {
+ if (options.getProject() != null && !options.getProject().isEmpty()) {
+ return options.getProject();
+ }
+ return options.getProjectId();
+ }
+
+ static SpannerIO.CreateTransaction buildReadTransaction(TimeseriesBackfillOptions options) {
+ SpannerIO.CreateTransaction read =
+ SpannerIO.createTransaction()
+ .withProjectId(resolveProjectId(options))
+ .withInstanceId(options.getSpannerInstanceId())
+ .withDatabaseId(options.getSpannerDatabaseId())
+ .withTimestampBound(getTimestampBound(options));
+ if (!options.getSpannerEmulatorHost().isEmpty()) {
+ read = read.withEmulatorHost(options.getSpannerEmulatorHost());
+ }
+ return read;
+ }
+
+ static SpannerIO.Read buildRead(TimeseriesBackfillOptions options) {
+ SpannerIO.Read read =
+ SpannerIO.read()
+ .withProjectId(resolveProjectId(options))
+ .withInstanceId(options.getSpannerInstanceId())
+ .withDatabaseId(options.getSpannerDatabaseId());
+ if (!options.getSpannerEmulatorHost().isEmpty()) {
+ read = read.withEmulatorHost(options.getSpannerEmulatorHost());
+ }
+ return read;
+ }
+
+ static SpannerIO.Write buildWrite(TimeseriesBackfillOptions options) {
+ SpannerIO.Write write =
+ SpannerIO.write()
+ .withProjectId(resolveProjectId(options))
+ .withInstanceId(options.getSpannerInstanceId())
+ .withDatabaseId(options.getSpannerDatabaseId());
+ if (!options.getSpannerEmulatorHost().isEmpty()) {
+ write = write.withEmulatorHost(options.getSpannerEmulatorHost());
+ }
+ if (options.getBatchSizeBytes() != null) {
+ write = write.withBatchSizeBytes(options.getBatchSizeBytes());
+ }
+ if (options.getMaxNumRows() != null) {
+ write = write.withMaxNumRows(options.getMaxNumRows());
+ }
+ if (options.getGroupingFactor() != null) {
+ write = write.withGroupingFactor(options.getGroupingFactor());
+ }
+ if (options.getMaxNumMutations() != null) {
+ write = write.withMaxNumMutations(options.getMaxNumMutations());
+ }
+ if (options.getCommitDeadlineSeconds() != null) {
+ write =
+ write.withCommitDeadline(Duration.standardSeconds(options.getCommitDeadlineSeconds()));
+ }
+ return write;
+ }
+
+ static SpannerIO.WriteGrouped buildGroupedWrite(TimeseriesBackfillOptions options) {
+ return new SpannerIO.WriteGrouped(buildWrite(options));
+ }
+
+ static TimestampBound getTimestampBound(TimeseriesBackfillOptions options) {
+ if (options.getReadTimestamp().isEmpty()) {
+ return TimestampBound.strong();
+ }
+ return TimestampBound.ofReadTimestamp(Timestamp.parseTimestamp(options.getReadTimestamp()));
+ }
+
+ static Spanner createSpannerService(TimeseriesBackfillOptions options) {
+ SpannerOptions.Builder builder =
+ SpannerOptions.newBuilder().setProjectId(resolveProjectId(options));
+ if (!options.getSpannerEmulatorHost().isEmpty()) {
+ builder
+ .setEmulatorHost(options.getSpannerEmulatorHost())
+ .setCredentials(NoCredentials.getInstance());
+ }
+ return builder.build().getService();
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptionValidator.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptionValidator.java
new file mode 100644
index 00000000..38df97f5
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptionValidator.java
@@ -0,0 +1,33 @@
+package org.datacommons.ingestion.timeseries;
+
+/** Shared validation for backfill options. */
+final class TimeseriesBackfillOptionValidator {
+ private TimeseriesBackfillOptionValidator() {}
+
+ static void validateCommonOptions(TimeseriesBackfillOptions options) {
+ if ((!options.getStartObservationAbout().isEmpty()
+ || !options.getEndObservationAboutExclusive().isEmpty())
+ && VariableMeasuredFilters.parse(options.getVariableMeasured()).isEmpty()) {
+ throw new IllegalArgumentException(
+ "variableMeasured is required when startObservationAbout or "
+ + "endObservationAboutExclusive is provided.");
+ }
+ validateOptionalPositiveLong(options.getBatchSizeBytes(), "batchSizeBytes");
+ validateOptionalPositiveInteger(options.getMaxNumRows(), "maxNumRows");
+ validateOptionalPositiveInteger(options.getMaxNumMutations(), "maxNumMutations");
+ validateOptionalPositiveInteger(options.getGroupingFactor(), "groupingFactor");
+ validateOptionalPositiveInteger(options.getCommitDeadlineSeconds(), "commitDeadlineSeconds");
+ }
+
+ private static void validateOptionalPositiveLong(Long value, String optionName) {
+ if (value != null && value <= 0) {
+ throw new IllegalArgumentException(optionName + " must be positive when provided.");
+ }
+ }
+
+ private static void validateOptionalPositiveInteger(Integer value, String optionName) {
+ if (value != null && value <= 0) {
+ throw new IllegalArgumentException(optionName + " must be positive when provided.");
+ }
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptions.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptions.java
new file mode 100644
index 00000000..4d3002b2
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptions.java
@@ -0,0 +1,152 @@
+package org.datacommons.ingestion.timeseries;
+
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+
+/** Options shared by the Beam backfill and the standalone validator. */
+public interface TimeseriesBackfillOptions extends GcpOptions {
+ @Description("Legacy fallback for GCP project id. Prefer --project.")
+ @Default.String("datcom-store")
+ String getProjectId();
+
+ void setProjectId(String projectId);
+
+ @Description("Spanner instance id")
+ @Default.String("dc-kg-test")
+ String getSpannerInstanceId();
+
+ void setSpannerInstanceId(String instanceId);
+
+ @Description("Spanner database id")
+ @Default.String("dc_graph_5")
+ String getSpannerDatabaseId();
+
+ void setSpannerDatabaseId(String databaseId);
+
+ @Description("Spanner emulator host")
+ @Default.String("")
+ String getSpannerEmulatorHost();
+
+ void setSpannerEmulatorHost(String emulatorHost);
+
+ @Description("Source Observation table name")
+ @Default.String("Observation")
+ String getSourceObservationTableName();
+
+ void setSourceObservationTableName(String tableName);
+
+ @Description(
+ "Input Spanner export directory containing Observation-manifest.json for the Avro entrypoint")
+ @Default.String("")
+ String getInputExportDir();
+
+ void setInputExportDir(String inputExportDir);
+
+ @Description("Comma-separated Avro file paths for the Avro entrypoint")
+ @Default.String("")
+ String getInputFiles();
+
+ void setInputFiles(String inputFiles);
+
+ @Description("Destination TimeSeries table name")
+ @Default.String("TimeSeries_rk")
+ String getDestinationTimeSeriesTableName();
+
+ void setDestinationTimeSeriesTableName(String tableName);
+
+ @Description("Destination TimeSeriesAttribute table name")
+ @Default.String("TimeSeriesAttribute_rk")
+ String getDestinationTimeSeriesAttributeTableName();
+
+ void setDestinationTimeSeriesAttributeTableName(String tableName);
+
+ @Description("Destination StatVarObservation table name")
+ @Default.String("StatVarObservation_rk")
+ String getDestinationStatVarObservationTableName();
+
+ void setDestinationStatVarObservationTableName(String tableName);
+
+ @Description(
+ "Read timestamp in RFC3339 format for a consistent source snapshot. Empty uses a strong read.")
+ @Default.String("")
+ String getReadTimestamp();
+
+ void setReadTimestamp(String readTimestamp);
+
+ @Description("Inclusive lower bound for source observation_about")
+ @Default.String("")
+ String getStartObservationAbout();
+
+ void setStartObservationAbout(String startObservationAbout);
+
+ @Description("Exclusive upper bound for source observation_about")
+ @Default.String("")
+ String getEndObservationAboutExclusive();
+
+ void setEndObservationAboutExclusive(String endObservationAboutExclusive);
+
+ @Description(
+ "Fixed source variable_measured filter. Accepts one value or a comma-separated list.")
+ @Default.String("")
+ String getVariableMeasured();
+
+ void setVariableMeasured(String variableMeasured);
+
+ @Description("Log local progress every N source rows. Non-positive disables row-progress logs.")
+ @Default.Integer(1000)
+ int getProgressEverySourceRows();
+
+ void setProgressEverySourceRows(int progressEverySourceRows);
+
+ @Description("Emit a local heartbeat log every N seconds. Non-positive disables heartbeats.")
+ @Default.Integer(30)
+ int getHeartbeatSeconds();
+
+ void setHeartbeatSeconds(int heartbeatSeconds);
+
+ @Description("Optional Spanner sink batch size in bytes. When unset, Beam uses its default.")
+ Long getBatchSizeBytes();
+
+ void setBatchSizeBytes(Long batchSizeBytes);
+
+ @Description("Optional Spanner sink max rows per batch. When unset, Beam uses its default.")
+ Integer getMaxNumRows();
+
+ void setMaxNumRows(Integer maxNumRows);
+
+ @Description("Optional Spanner sink max mutations per batch. When unset, Beam uses its default.")
+ Integer getMaxNumMutations();
+
+ void setMaxNumMutations(Integer maxNumMutations);
+
+ @Description("Optional Spanner sink grouping factor. When unset, Beam uses its default.")
+ Integer getGroupingFactor();
+
+ void setGroupingFactor(Integer groupingFactor);
+
+ @Description(
+ "Optional Spanner sink commit deadline in seconds. When unset, Beam uses its default.")
+ Integer getCommitDeadlineSeconds();
+
+ void setCommitDeadlineSeconds(Integer commitDeadlineSeconds);
+
+ @Description("Maximum source series rows to process in Beam mode. Non-positive means no limit.")
+ @Default.Integer(0)
+ int getMaxSeriesRows();
+
+ void setMaxSeriesRows(int maxSeriesRows);
+
+ @Description("Maximum source point rows to process in Beam mode. Non-positive means no limit.")
+ @Default.Integer(0)
+ int getMaxPointRows();
+
+ void setMaxPointRows(int maxPointRows);
+
+ @Description(
+ "Maximum source series rows to process in validator mode. Non-positive means no limit.")
+ @Default.Integer(1000)
+ int getValidatorMaxSeriesRows();
+
+ void setValidatorMaxSeriesRows(int validatorMaxSeriesRows);
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java
new file mode 100644
index 00000000..8da32565
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java
@@ -0,0 +1,174 @@
+package org.datacommons.ingestion.timeseries;
+
+import com.google.cloud.spanner.Struct;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
+import org.apache.beam.sdk.io.gcp.spanner.Transaction;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Beam/Dataflow pipeline that backfills the normalized timeseries tables from Observation. */
+public class TimeseriesBackfillPipeline {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TimeseriesBackfillPipeline.class);
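+  // In-process progress tracker; only meaningful on DirectRunner, where DoFns run in this JVM.
+  // For Dataflow, createLocalProgressTracker returns a disabled tracker.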
+ private static volatile LocalProgressTracker localProgressTracker;
+
+ public static void main(String[] args) {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(TimeseriesBackfillOptions.class);
+ validateOptions(options);
+ LOGGER.info("Starting timeseries backfill with runner {}", runnerName(options));
+ try (LocalProgressTracker progressTracker = createLocalProgressTracker(options)) {
+ setLocalProgressTracker(progressTracker);
+ Pipeline pipeline = Pipeline.create(options);
+ buildPipeline(pipeline, options);
+ PipelineResult result = pipeline.run();
+ LOGGER.info("Timeseries backfill returned runner {}", result.getClass().getSimpleName());
+ } finally {
+ clearLocalProgressTracker();
+ }
+ }
+
+ static void buildPipeline(Pipeline pipeline, TimeseriesBackfillOptions options) {
+ TimeseriesBackfillQueries queries = new TimeseriesBackfillQueries(options);
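+    // Pin all reads to a single Spanner snapshot (strong read, or --readTimestamp when set) so
+    // every worker sees a consistent view of the source Observation table.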
+    PCollectionView<Transaction> readTransaction =
+ pipeline.apply("CreateReadTransaction", TimeseriesBackfillIO.buildReadTransaction(options));
+
+    PCollection<SourceObservationRow> sourceRows =
+ pipeline
+ .apply(
+ "ReadSourceRows",
+ TimeseriesBackfillIO.buildRead(options)
+ .withTransaction(readTransaction)
+ .withQuery(queries.buildSourceQuery(options.getMaxSeriesRows()))
+ .withQueryName("ReadSourceRows"))
+ .apply("NormalizeSourceRows", ParDo.of(new StructToSourceObservationRowsFn()));
+ sourceRows.setCoder(SerializableCoder.of(SourceObservationRow.class));
+ buildPipelineFromSourceRows(sourceRows, options);
+ }
+
+ static void buildPipelineFromSourceRows(
+      PCollection<SourceObservationRow> sourceRows, TimeseriesBackfillOptions options) {
+    PCollection<MutationGroup> mutationGroups =
+ sourceRows.apply(
+ "MapMutationGroups",
+ ParDo.of(
+ new SourceRowsToMutationGroupsFn(
+ options.getDestinationTimeSeriesTableName(),
+ options.getDestinationTimeSeriesAttributeTableName(),
+ options.getDestinationStatVarObservationTableName())));
+
+ mutationGroups.apply("WriteMutationGroups", TimeseriesBackfillIO.buildGroupedWrite(options));
+ }
+
+ static void validateOptions(TimeseriesBackfillOptions options) {
+ TimeseriesBackfillOptionValidator.validateCommonOptions(options);
+ if (options.getMaxSeriesRows() > 0 || options.getMaxPointRows() > 0) {
+ throw new IllegalArgumentException(
+ "Beam row caps are not supported in the Beam/Dataflow entrypoints. "
+ + "Use observation_about sharding or exact Avro file selection, or use the "
+ + "validator for small bounded runs.");
+ }
+ }
+
+ static boolean shouldEnableLocalProgress(TimeseriesBackfillOptions options) {
+ return options.getRunner() == null || DirectRunner.class.equals(options.getRunner());
+ }
+
+ static LocalProgressTracker createLocalProgressTracker(TimeseriesBackfillOptions options) {
+ if (!shouldEnableLocalProgress(options)) {
+ return new LocalProgressTracker("DataflowRunner", 0, 0, LOGGER);
+ }
+ return new LocalProgressTracker(
+ runnerName(options),
+ options.getProgressEverySourceRows(),
+ options.getHeartbeatSeconds(),
+ LOGGER);
+ }
+
+ static String runnerName(TimeseriesBackfillOptions options) {
+ if (options.getRunner() == null) {
+ return DirectRunner.class.getSimpleName();
+ }
+ return options.getRunner().getSimpleName();
+ }
+
+ static void setLocalProgressTracker(LocalProgressTracker progressTracker) {
+ localProgressTracker = progressTracker;
+ }
+
+ static void clearLocalProgressTracker() {
+ localProgressTracker = null;
+ }
+
+ static LocalProgressTracker localProgressTracker() {
+ return localProgressTracker;
+ }
+
+  static final class StructToSourceObservationRowsFn extends DoFn<Struct, SourceObservationRow> {
+ @ProcessElement
+    public void processElement(@Element Struct row, OutputReceiver<SourceObservationRow> out) {
+ out.output(SourceObservationRows.toObservationRow(row));
+ }
+ }
+
+ static final class SourceRowsToMutationGroupsFn
+      extends DoFn<SourceObservationRow, MutationGroup> {
+ private static final Counter SOURCE_ROWS =
+ Metrics.counter(SourceRowsToMutationGroupsFn.class, "source_rows");
+ private static final Counter TIMESERIES_ROWS =
+ Metrics.counter(SourceRowsToMutationGroupsFn.class, "timeseries_rows_written");
+ private static final Counter TIMESERIES_ATTRIBUTE_ROWS =
+ Metrics.counter(SourceRowsToMutationGroupsFn.class, "timeseries_attribute_rows_written");
+ private static final Counter SOURCE_POINT_ROWS =
+ Metrics.counter(SourceRowsToMutationGroupsFn.class, "source_point_rows");
+ private static final Counter STAT_VAR_OBSERVATION_ROWS =
+ Metrics.counter(SourceRowsToMutationGroupsFn.class, "stat_var_observation_rows_written");
+ private final String timeSeriesTableName;
+ private final String timeSeriesAttributeTableName;
+ private final String statVarObservationTableName;
+
+ SourceRowsToMutationGroupsFn(
+ String timeSeriesTableName,
+ String timeSeriesAttributeTableName,
+ String statVarObservationTableName) {
+ this.timeSeriesTableName = timeSeriesTableName;
+ this.timeSeriesAttributeTableName = timeSeriesAttributeTableName;
+ this.statVarObservationTableName = statVarObservationTableName;
+ }
+
+ @ProcessElement
+ public void processElement(
+        @Element SourceObservationRow sourceRow, OutputReceiver<MutationGroup> out) {
+ BackfillMutationGroups mutationGroups =
+ TimeseriesMutationFactory.toMutationGroups(
+ sourceRow,
+ timeSeriesTableName,
+ timeSeriesAttributeTableName,
+ statVarObservationTableName);
+ SOURCE_ROWS.inc();
+ TIMESERIES_ROWS.inc();
+ TIMESERIES_ATTRIBUTE_ROWS.inc(mutationGroups.timeSeriesAttributeRows());
+ SOURCE_POINT_ROWS.inc(mutationGroups.statVarObservationRows());
+ STAT_VAR_OBSERVATION_ROWS.inc(mutationGroups.statVarObservationRows());
+ LocalProgressTracker progressTracker = localProgressTracker;
+ if (progressTracker != null) {
+ progressTracker.recordRow(
+ mutationGroups.timeSeriesAttributeRows(), mutationGroups.statVarObservationRows());
+ }
+
+ for (MutationGroup mutationGroup : mutationGroups.groups()) {
+ out.output(mutationGroup);
+ }
+ }
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueries.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueries.java
new file mode 100644
index 00000000..e7edd50d
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueries.java
@@ -0,0 +1,83 @@
+package org.datacommons.ingestion.timeseries;
+
+import com.google.cloud.spanner.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Builds source Spanner SQL for the timeseries backfill. */
+final class TimeseriesBackfillQueries {
+ private final TimeseriesBackfillOptions options;
+
+ TimeseriesBackfillQueries(TimeseriesBackfillOptions options) {
+ this.options = options;
+ }
+
+ Statement buildSourceQuery() {
+ return buildSourceQuery(0);
+ }
+
+ Statement buildSourceQuery(int limit) {
+ return buildQuery(
+ """
+ SELECT
+ observation_about,
+ variable_measured,
+ facet_id,
+ observation_period,
+ measurement_method,
+ unit,
+ scaling_factor,
+ import_name,
+ provenance_url,
+ is_dc_aggregate,
+ provenance,
+ observations
+ FROM %s"""
+ .formatted(options.getSourceObservationTableName()),
+ "",
+ limit);
+ }
+
+ private Statement buildQuery(String selectAndFrom, String columnPrefix, int limit) {
+ StringBuilder sql = new StringBuilder(selectAndFrom);
+ Statement.Builder builder = Statement.newBuilder(selectAndFrom);
+    List<String> filters = new ArrayList<>();
+    List<String> variableMeasuredFilters =
+ VariableMeasuredFilters.parse(options.getVariableMeasured());
+
+ if (!options.getStartObservationAbout().isEmpty()) {
+ filters.add(columnPrefix + "observation_about >= @startObservationAbout");
+ builder.bind("startObservationAbout").to(options.getStartObservationAbout());
+ }
+ if (!options.getEndObservationAboutExclusive().isEmpty()) {
+ filters.add(columnPrefix + "observation_about < @endObservationAboutExclusive");
+ builder.bind("endObservationAboutExclusive").to(options.getEndObservationAboutExclusive());
+ }
+ if (!variableMeasuredFilters.isEmpty()) {
+ if (variableMeasuredFilters.size() == 1) {
+ filters.add(columnPrefix + "variable_measured = @variableMeasured");
+ builder.bind("variableMeasured").to(variableMeasuredFilters.get(0));
+ } else {
+ filters.add(columnPrefix + "variable_measured IN UNNEST(@variableMeasuredList)");
+ builder.bind("variableMeasuredList").toStringArray(variableMeasuredFilters);
+ }
+ }
+
+ if (!filters.isEmpty()) {
+ sql.append(" WHERE ").append(String.join(" AND ", filters));
+ }
+
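+    // ORDER BY and LIMIT are only applied for bounded runs (limit > 0), e.g. the validator;
+    // full backfill reads issue the unordered, unbounded query.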
+ if (limit > 0) {
+ sql.append(" ORDER BY ")
+ .append(columnPrefix)
+ .append("observation_about, ")
+ .append(columnPrefix)
+ .append("variable_measured, ")
+ .append(columnPrefix)
+ .append("facet_id");
+ sql.append(" LIMIT ").append(limit);
+ }
+
+    return builder.replace(sql.toString()).build();
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillValidator.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillValidator.java
new file mode 100644
index 00000000..b8c35a36
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillValidator.java
@@ -0,0 +1,144 @@
+package org.datacommons.ingestion.timeseries;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.ReadOnlyTransaction;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Spanner;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Small direct runner for validating the normalized backfill on a bounded shard. */
+public class TimeseriesBackfillValidator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TimeseriesBackfillValidator.class);
+ private static final int MAX_VALIDATOR_WRITE_BATCH_SIZE = 1000;
+
+ public static void main(String[] args) {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(TimeseriesBackfillOptions.class);
+ TimeseriesBackfillOptionValidator.validateCommonOptions(options);
+ LOGGER.info("Starting timeseries validator");
+
+ TimeseriesBackfillQueries queries = new TimeseriesBackfillQueries(options);
+ DatabaseId databaseId =
+ DatabaseId.of(
+ TimeseriesBackfillIO.resolveProjectId(options),
+ options.getSpannerInstanceId(),
+ options.getSpannerDatabaseId());
+
+ try (LocalProgressTracker progressTracker =
+ new LocalProgressTracker(
+ "validator",
+ options.getProgressEverySourceRows(),
+ options.getHeartbeatSeconds(),
+ LOGGER);
+ Spanner spanner = TimeseriesBackfillIO.createSpannerService(options);
+ ReadOnlyTransaction readOnlyTransaction =
+ spanner
+ .getDatabaseClient(databaseId)
+ .readOnlyTransaction(TimeseriesBackfillIO.getTimestampBound(options))) {
+ DatabaseClient databaseClient = spanner.getDatabaseClient(databaseId);
+ Timestamp writeTimestamp =
+ validate(databaseClient, readOnlyTransaction, queries, options, progressTracker);
+ LOGGER.info("Validator finished. Final write timestamp: {}", writeTimestamp);
+ }
+ }
+
+ private static Timestamp validate(
+ DatabaseClient databaseClient,
+ ReadOnlyTransaction readOnlyTransaction,
+ TimeseriesBackfillQueries queries,
+ TimeseriesBackfillOptions options,
+ LocalProgressTracker progressTracker) {
+ int sourceRows = 0;
+ int timeSeriesRows = 0;
+ int timeSeriesAttributeRows = 0;
+ int statVarObservationRows = 0;
+    List<String> sampleSeriesIds = new ArrayList<>();
+    List<Mutation> pendingMutations = new ArrayList<>();
+ Timestamp lastWriteTimestamp = null;
+
+ try (ResultSet resultSet =
+ readOnlyTransaction.executeQuery(
+ queries.buildSourceQuery(options.getValidatorMaxSeriesRows()))) {
+ while (resultSet.next()) {
+ SourceObservationRow row =
+ SourceObservationRows.toObservationRow(resultSet.getCurrentRowAsStruct());
+ BackfillMutationGroups mutationGroups =
+ TimeseriesMutationFactory.toMutationGroups(
+ row,
+ options.getDestinationTimeSeriesTableName(),
+ options.getDestinationTimeSeriesAttributeTableName(),
+ options.getDestinationStatVarObservationTableName());
+ sourceRows++;
+ if (sampleSeriesIds.size() < 5) {
+ sampleSeriesIds.add(TimeseriesMutationFactory.seriesId(row.seriesRow()));
+ }
+
+ timeSeriesRows++;
+ timeSeriesAttributeRows += mutationGroups.timeSeriesAttributeRows();
+ statVarObservationRows += mutationGroups.statVarObservationRows();
+ progressTracker.recordRow(
+ mutationGroups.timeSeriesAttributeRows(), mutationGroups.statVarObservationRows());
+ for (MutationGroup mutationGroup : mutationGroups.groups()) {
+ for (Mutation mutation : mutationGroup) {
+ pendingMutations.add(mutation);
+ }
+ }
+
+ lastWriteTimestamp =
+ flushMutationsIfNeeded(
+ databaseClient, pendingMutations, lastWriteTimestamp, progressTracker);
+ }
+ }
+
+ lastWriteTimestamp =
+ flushMutationsIfNeeded(
+ databaseClient, pendingMutations, lastWriteTimestamp, progressTracker, true);
+ LOGGER.info(
+ "Validated {} source rows into {} TimeSeries rows, {} TimeSeriesAttribute rows, and {} StatVarObservation rows. Sample series ids: {}",
+ sourceRows,
+ timeSeriesRows,
+ timeSeriesAttributeRows,
+ statVarObservationRows,
+ sampleSeriesIds);
+ return lastWriteTimestamp;
+ }
+
+ private static Timestamp flushMutationsIfNeeded(
+ DatabaseClient databaseClient,
+      List<Mutation> pendingMutations,
+ Timestamp lastWriteTimestamp,
+ LocalProgressTracker progressTracker) {
+ return flushMutationsIfNeeded(
+ databaseClient, pendingMutations, lastWriteTimestamp, progressTracker, false);
+ }
+
+ private static Timestamp flushMutationsIfNeeded(
+ DatabaseClient databaseClient,
+      List<Mutation> pendingMutations,
+ Timestamp lastWriteTimestamp,
+ LocalProgressTracker progressTracker,
+ boolean flushAll) {
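+    // Buffer mutations and commit with writeAtLeastOnce once the batch reaches
+    // MAX_VALIDATOR_WRITE_BATCH_SIZE; flushAll forces a final commit of any remainder.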
+ if (pendingMutations.isEmpty()) {
+ return lastWriteTimestamp;
+ }
+ if (!flushAll && pendingMutations.size() < MAX_VALIDATOR_WRITE_BATCH_SIZE) {
+ return lastWriteTimestamp;
+ }
+
+ int mutationCount = pendingMutations.size();
+ Timestamp writeTimestamp = databaseClient.writeAtLeastOnce(pendingMutations);
+ if (progressTracker.isEnabled()) {
+ progressTracker.recordValidatorFlush(mutationCount, writeTimestamp);
+ }
+ pendingMutations.clear();
+ return writeTimestamp;
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java
new file mode 100644
index 00000000..03a03780
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java
@@ -0,0 +1,163 @@
+package org.datacommons.ingestion.timeseries;
+
+import com.google.cloud.spanner.Mutation;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
+import org.datacommons.Storage.Observations;
+
+/** Converts source rows into normalized timeseries table mutations. */
+final class TimeseriesMutationFactory {
+ private static final int MAX_GROUP_MUTATIONS = 1000;
+
+ private TimeseriesMutationFactory() {}
+
+ static Mutation toTimeSeriesMutation(SourceSeriesRow row, String tableName) {
+ return Mutation.newInsertOrUpdateBuilder(tableName)
+ .set("id")
+ .to(seriesId(row))
+ .set("variable_measured")
+ .to(row.variableMeasured())
+ .set("provenance")
+ .to(row.provenance())
+ .build();
+ }
+
+  static List<Mutation> toTimeSeriesAttributeMutations(SourceSeriesRow row, String tableName) {
+    List<Mutation> mutations = new ArrayList<>();
+ addAttribute(mutations, tableName, row, "observationAbout", row.observationAbout());
+ addAttribute(mutations, tableName, row, "facetId", row.facetId());
+ addAttributeIfPresent(mutations, tableName, row, "importName", row.importName());
+ addAttributeIfPresent(mutations, tableName, row, "provenanceUrl", row.provenanceUrl());
+ addAttributeIfPresent(mutations, tableName, row, "observationPeriod", row.observationPeriod());
+ addAttributeIfPresent(mutations, tableName, row, "measurementMethod", row.measurementMethod());
+ addAttributeIfPresent(mutations, tableName, row, "unit", row.unit());
+ addAttributeIfPresent(mutations, tableName, row, "scalingFactor", row.scalingFactor());
+ addAttribute(mutations, tableName, row, "isDcAggregate", Boolean.toString(row.isDcAggregate()));
+ return mutations;
+ }
+
+ static Mutation toStatVarObservationMutation(SourcePointRow row, String tableName) {
+ return Mutation.newInsertOrUpdateBuilder(tableName)
+ .set("id")
+ .to(seriesId(row))
+ .set("date")
+ .to(row.date())
+ .set("value")
+ .to(row.value())
+ .build();
+ }
+
+ private static void addAttributeIfPresent(
+      List<Mutation> mutations,
+ String tableName,
+ SourceSeriesRow row,
+ String property,
+ String value) {
+ if (value == null || value.isEmpty()) {
+ return;
+ }
+ addAttribute(mutations, tableName, row, property, value);
+ }
+
+ private static void addAttribute(
+      List<Mutation> mutations,
+ String tableName,
+ SourceSeriesRow row,
+ String property,
+ String value) {
+ mutations.add(
+ Mutation.newInsertOrUpdateBuilder(tableName)
+ .set("id")
+ .to(seriesId(row))
+ .set("property")
+ .to(property)
+ .set("value")
+ .to(value)
+ .build());
+ }
+
+ static BackfillMutationGroups toMutationGroups(
+ SourceObservationRow row,
+ String timeSeriesTableName,
+ String timeSeriesAttributeTableName,
+ String statVarObservationTableName) {
+ Mutation timeSeriesMutation = toTimeSeriesMutation(row.seriesRow(), timeSeriesTableName);
+    List<Mutation> attributeMutations =
+ toTimeSeriesAttributeMutations(row.seriesRow(), timeSeriesAttributeTableName);
+    List<Mutation> pointMutations = new ArrayList<>();
+ for (SourcePointRow pointRow : row.pointRows()) {
+ pointMutations.add(toStatVarObservationMutation(pointRow, statVarObservationTableName));
+ }
+
+    List<MutationGroup> mutationGroups = new ArrayList<>();
+    List<Mutation> attached = new ArrayList<>(attributeMutations);
+ int maxAttachedMutations = MAX_GROUP_MUTATIONS - 1;
+ int pointIndex = 0;
+ while (pointIndex < pointMutations.size() && attached.size() < maxAttachedMutations) {
+ attached.add(pointMutations.get(pointIndex));
+ pointIndex++;
+ }
+ mutationGroups.add(MutationGroup.create(timeSeriesMutation, attached));
+
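+    // Points that do not fit in the first group spill into additional groups that repeat the
+    // same primary TimeSeries mutation, keeping each group within MAX_GROUP_MUTATIONS.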
+ while (pointIndex < pointMutations.size()) {
+ attached = new ArrayList<>();
+ while (pointIndex < pointMutations.size() && attached.size() < maxAttachedMutations) {
+ attached.add(pointMutations.get(pointIndex));
+ pointIndex++;
+ }
+ mutationGroups.add(MutationGroup.create(timeSeriesMutation, attached));
+ }
+
+ return new BackfillMutationGroups(
+ mutationGroups, attributeMutations.size(), pointMutations.size());
+ }
+
+ static BackfillMutationGroups toMutationGroups(
+ CompactSourceObservationRow row,
+ String timeSeriesTableName,
+ String timeSeriesAttributeTableName,
+ String statVarObservationTableName) {
+ Mutation timeSeriesMutation = toTimeSeriesMutation(row.seriesRow(), timeSeriesTableName);
+    List<Mutation> attributeMutations =
+ toTimeSeriesAttributeMutations(row.seriesRow(), timeSeriesAttributeTableName);
+ Observations observations =
+ SourceObservationRows.parseObservations(row.observationsProtoBytes());
+
+    List<MutationGroup> mutationGroups = new ArrayList<>();
+    List<Mutation> attached = new ArrayList<>(attributeMutations);
+ int maxAttachedMutations = MAX_GROUP_MUTATIONS - 1;
+ int pointMutationCount = 0;
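+    // Decode points straight from the Observations proto map, spilling into a new group whenever
+    // the attached list hits the per-group cap, without first building an expanded point list.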
+    for (Map.Entry<String, String> entry : observations.getValuesMap().entrySet()) {
+ if (attached.size() >= maxAttachedMutations) {
+ mutationGroups.add(MutationGroup.create(timeSeriesMutation, attached));
+ attached = new ArrayList<>();
+ }
+ attached.add(
+ toStatVarObservationMutation(
+ new SourcePointRow(
+ row.seriesRow().observationAbout(),
+ row.seriesRow().variableMeasured(),
+ row.seriesRow().facetId(),
+ entry.getKey(),
+ entry.getValue()),
+ statVarObservationTableName));
+ pointMutationCount++;
+ }
+ mutationGroups.add(MutationGroup.create(timeSeriesMutation, attached));
+ return new BackfillMutationGroups(
+ mutationGroups, attributeMutations.size(), pointMutationCount);
+ }
+
+ static String seriesId(SourceSeriesRow row) {
+ return SeriesIdGenerator.build(row.variableMeasured(), row.observationAbout(), row.facetId());
+ }
+
+ static String seriesId(SourcePointRow row) {
+ return SeriesIdGenerator.build(row.variableMeasured(), row.observationAbout(), row.facetId());
+ }
+}
+
+record BackfillMutationGroups(
+    List<MutationGroup> groups, int timeSeriesAttributeRows, int statVarObservationRows) {}
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/VariableMeasuredFilters.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/VariableMeasuredFilters.java
new file mode 100644
index 00000000..a6d8e68b
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/VariableMeasuredFilters.java
@@ -0,0 +1,23 @@
+package org.datacommons.ingestion.timeseries;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Parses the variable_measured option into one or more filter values. */
+final class VariableMeasuredFilters {
+ private VariableMeasuredFilters() {}
+
+  static List<String> parse(String value) {
+    List<String> filters = new ArrayList<>();
+ if (value == null || value.isEmpty()) {
+ return filters;
+ }
+ for (String part : value.split(",")) {
+ String trimmed = part.trim();
+ if (!trimmed.isEmpty()) {
+ filters.add(trimmed);
+ }
+ }
+ return filters;
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/LocalProgressTrackerTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/LocalProgressTrackerTest.java
new file mode 100644
index 00000000..ae7748f0
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/LocalProgressTrackerTest.java
@@ -0,0 +1,58 @@
+package org.datacommons.ingestion.timeseries;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.Test;
+
+public class LocalProgressTrackerTest {
+ @Test
+ public void recordRow_logsAtConfiguredInterval() {
+    List<String> messages = new ArrayList<>();
+ AtomicLong nowMillis = new AtomicLong(1_000L);
+ LocalProgressTracker tracker =
+ new LocalProgressTracker("DirectRunner", 2, 0, messages::add, nowMillis::get, null);
+
+ tracker.recordRow(3, 7);
+ tracker.recordRow(5, 11);
+
+ assertEquals(1, messages.size());
+ assertTrue(messages.get(0).contains("DirectRunner progress"));
+ assertTrue(messages.get(0).contains("source_rows=2"));
+ assertTrue(messages.get(0).contains("timeseries_attribute_rows=8"));
+ assertTrue(messages.get(0).contains("stat_var_observation_rows=18"));
+ }
+
+ @Test
+ public void logHeartbeatNow_reportsNoRowsYet() {
+    List<String> messages = new ArrayList<>();
+ AtomicLong nowMillis = new AtomicLong(31_000L);
+ LocalProgressTracker tracker =
+ new LocalProgressTracker("validator", 0, 30, messages::add, nowMillis::get, null);
+
+ tracker.logHeartbeatNow();
+
+ assertEquals(1, messages.size());
+ assertTrue(messages.get(0).contains("validator heartbeat"));
+ assertTrue(messages.get(0).contains("source_rows=0"));
+ assertTrue(messages.get(0).contains("no_source_rows_yet=true"));
+ }
+
+ @Test
+ public void recordValidatorFlush_logsMutationCountAndTimestamp() {
+    List<String> messages = new ArrayList<>();
+ LocalProgressTracker tracker =
+ new LocalProgressTracker("validator", 1, 0, messages::add, System::currentTimeMillis, null);
+
+ tracker.recordValidatorFlush(12, Timestamp.parseTimestamp("2026-04-23T00:00:00Z"));
+
+ assertEquals(1, messages.size());
+ assertTrue(messages.get(0).contains("validator flush"));
+ assertTrue(messages.get(0).contains("mutations=12"));
+ assertTrue(messages.get(0).contains("2026-04-23T00:00:00Z"));
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/ObservationExportFilesTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/ObservationExportFilesTest.java
new file mode 100644
index 00000000..7c5e7a3d
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/ObservationExportFilesTest.java
@@ -0,0 +1,72 @@
+package org.datacommons.ingestion.timeseries;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.gson.JsonParser;
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+public class ObservationExportFilesTest {
+ @Test
+ public void validateOptions_rejectsMissingAndBothSourceModes() {
+ TimeseriesBackfillOptions missingOptions =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ assertInvalid(missingOptions);
+
+ TimeseriesBackfillOptions bothOptions =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ bothOptions.setInputExportDir("gs://bucket/export");
+ bothOptions.setInputFiles("gs://bucket/export/Observation.avro-00000-of-00001");
+ assertInvalid(bothOptions);
+ }
+
+ @Test
+ public void resolveInputFiles_parsesCsvFileList() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setInputFiles("gs://bucket/a.avro, gs://bucket/b.avro ");
+
+ assertEquals(
+ List.of("gs://bucket/a.avro", "gs://bucket/b.avro"),
+ ObservationExportFiles.resolveInputFiles(options));
+ }
+
+ @Test
+ public void parseManifest_resolvesRelativeAndAbsoluteObservationFiles() {
+    List<String> files =
+ ObservationExportFiles.parseManifest(
+ JsonParser.parseString(
+ """
+ {
+ "files": [
+ "Observation.avro-00000-of-00002",
+ "nested/Observation.avro-00001-of-00002",
+ "gs://other/export/Observation.avro-00002-of-00002",
+ "Other.avro-00000-of-00001"
+ ]
+ }
+ """),
+ "gs://bucket/export",
+ "Observation");
+
+ assertEquals(
+ List.of(
+ "gs://bucket/export/Observation.avro-00000-of-00002",
+ "gs://bucket/export/nested/Observation.avro-00001-of-00002",
+ "gs://other/export/Observation.avro-00002-of-00002"),
+ files);
+ }
+
+ private static void assertInvalid(TimeseriesBackfillOptions options) {
+ try {
+ ObservationExportFiles.validateOptions(options);
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException expected) {
+ assertEquals(
+ "Exactly one of inputExportDir or inputFiles must be provided for the Avro pipeline.",
+ expected.getMessage());
+ }
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java
new file mode 100644
index 00000000..b8f7a51d
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java
@@ -0,0 +1,154 @@
+package org.datacommons.ingestion.timeseries;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.spanner.Struct;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.datacommons.Storage.Observations;
+import org.junit.Test;
+
+public class SourceObservationRowsTest {
+ @Test
+ public void toObservationRow_expandsObservationProtoFromStruct() {
+ Struct row =
+ Struct.newBuilder()
+ .set("observation_about")
+ .to("geoId/06")
+ .set("variable_measured")
+ .to("Count_Person")
+ .set("facet_id")
+ .to("123")
+ .set("observation_period")
+ .to((String) null)
+ .set("measurement_method")
+ .to((String) null)
+ .set("unit")
+ .to((String) null)
+ .set("scaling_factor")
+ .to((String) null)
+ .set("import_name")
+ .to("TestImport")
+ .set("provenance_url")
+ .to((String) null)
+ .set("is_dc_aggregate")
+ .to(false)
+ .set("provenance")
+ .to("dc/base/WrongImport")
+ .set("observations")
+ .to(
+ ByteArray.copyFrom(
+ Observations.newBuilder()
+ .putValues("2023", "1")
+ .putValues("2024", "2")
+ .build()
+ .toByteArray()))
+ .build();
+
+ SourceObservationRow observationRow = SourceObservationRows.toObservationRow(row);
+    List<SourcePointRow> rows = observationRow.pointRows();
+ rows.sort(Comparator.comparing(SourcePointRow::date));
+
+ assertEquals("geoId/06", observationRow.seriesRow().observationAbout());
+ assertEquals("Count_Person", observationRow.seriesRow().variableMeasured());
+ assertEquals("dc/base/TestImport", observationRow.seriesRow().provenance());
+ assertEquals(2, rows.size());
+ assertEquals(new SourcePointRow("geoId/06", "Count_Person", "123", "2023", "1"), rows.get(0));
+ assertEquals(new SourcePointRow("geoId/06", "Count_Person", "123", "2024", "2"), rows.get(1));
+ }
+
+ @Test
+ public void toObservationRow_expandsObservationProtoFromGenericRecord() {
+ GenericRecord row = new GenericData.Record(schemaWithoutGeneratedProvenance());
+ row.put("observation_about", "geoId/06");
+ row.put("variable_measured", "Count_Person");
+ row.put("facet_id", "123");
+ row.put("observation_period", null);
+ row.put("measurement_method", null);
+ row.put("unit", null);
+ row.put("scaling_factor", null);
+ row.put("import_name", "TestImport");
+ row.put("provenance_url", "https://example.com");
+ row.put("is_dc_aggregate", true);
+ row.put(
+ "observations",
+ ByteBuffer.wrap(
+ Observations.newBuilder()
+ .putValues("2023", "1")
+ .putValues("2024", "2")
+ .build()
+ .toByteArray()));
+
+ SourceObservationRow observationRow = SourceObservationRows.toObservationRow(row);
+    List<SourcePointRow> rows = observationRow.pointRows();
+ rows.sort(Comparator.comparing(SourcePointRow::date));
+
+ assertEquals("geoId/06", observationRow.seriesRow().observationAbout());
+ assertEquals("Count_Person", observationRow.seriesRow().variableMeasured());
+ assertEquals("dc/base/TestImport", observationRow.seriesRow().provenance());
+ assertEquals("https://example.com", observationRow.seriesRow().provenanceUrl());
+ assertEquals(2, rows.size());
+ assertEquals(new SourcePointRow("geoId/06", "Count_Person", "123", "2023", "1"), rows.get(0));
+ assertEquals(new SourcePointRow("geoId/06", "Count_Person", "123", "2024", "2"), rows.get(1));
+ }
+
+ @Test
+ public void toCompactObservationRow_keepsSeriesMetadataAndRawProtoBytes() {
+ byte[] observationBytes =
+ Observations.newBuilder()
+ .putValues("2023", "1")
+ .putValues("2024", "2")
+ .build()
+ .toByteArray();
+ GenericRecord row = new GenericData.Record(schemaWithoutGeneratedProvenance());
+ row.put("observation_about", "geoId/06");
+ row.put("variable_measured", "Count_Person");
+ row.put("facet_id", "123");
+ row.put("observation_period", null);
+ row.put("measurement_method", null);
+ row.put("unit", null);
+ row.put("scaling_factor", null);
+ row.put("import_name", "TestImport");
+ row.put("provenance_url", "https://example.com");
+ row.put("is_dc_aggregate", true);
+ row.put("observations", ByteBuffer.wrap(observationBytes));
+
+ CompactSourceObservationRow observationRow = SourceObservationRows.toCompactObservationRow(row);
+
+ assertEquals("geoId/06", observationRow.seriesRow().observationAbout());
+ assertEquals("Count_Person", observationRow.seriesRow().variableMeasured());
+ assertEquals("dc/base/TestImport", observationRow.seriesRow().provenance());
+ assertEquals("https://example.com", observationRow.seriesRow().provenanceUrl());
+ assertArrayEquals(observationBytes, observationRow.observationsProtoBytes());
+ }
+
+ private static Schema schemaWithoutGeneratedProvenance() {
+ return new Schema.Parser()
+ .parse(
+ """
+ {
+ "type": "record",
+ "name": "Observation",
+ "fields": [
+ {"name": "observation_about", "type": "string"},
+ {"name": "variable_measured", "type": "string"},
+ {"name": "facet_id", "type": "string"},
+ {"name": "observation_period", "type": ["null", "string"], "default": null},
+ {"name": "measurement_method", "type": ["null", "string"], "default": null},
+ {"name": "unit", "type": ["null", "string"], "default": null},
+ {"name": "scaling_factor", "type": ["null", "string"], "default": null},
+ {"name": "import_name", "type": "string"},
+ {"name": "provenance_url", "type": ["null", "string"], "default": null},
+ {"name": "is_dc_aggregate", "type": ["null", "boolean"], "default": null},
+ {"name": "observations", "type": "bytes"}
+ ]
+ }
+ """);
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipelineTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipelineTest.java
new file mode 100644
index 00000000..443727ca
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipelineTest.java
@@ -0,0 +1,67 @@
+package org.datacommons.ingestion.timeseries;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+public class TimeseriesBackfillAvroPipelineTest {
+ @Test
+ public void validateOptions_acceptsExportDirAndInputFilesModes() {
+ TimeseriesBackfillOptions exportDirOptions =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ exportDirOptions.setInputExportDir("gs://bucket/export");
+
+ TimeseriesBackfillOptions inputFilesOptions =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ inputFilesOptions.setInputFiles("gs://bucket/export/Observation.avro-00000-of-00001");
+
+ TimeseriesBackfillAvroPipeline.validateOptions(exportDirOptions);
+ TimeseriesBackfillAvroPipeline.validateOptions(inputFilesOptions);
+ }
+
+ @Test
+ public void validateOptions_rejectsBeamRowCapsForAvroPipeline() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setInputExportDir("gs://bucket/export");
+ options.setMaxSeriesRows(10);
+
+ try {
+ TimeseriesBackfillAvroPipeline.validateOptions(options);
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Beam row caps are not supported"));
+ }
+ }
+
+ @Test
+ public void matchesFilters_appliesObservationAboutAndVariableMeasuredBounds() {
+ SourceSeriesRow seriesRow =
+ new SourceSeriesRow(
+ "geoId/06",
+ "Count_Person",
+ "123",
+ "",
+ "",
+ "",
+ "",
+ "TestImport",
+ "",
+ false,
+ "dc/base/TestImport");
+
+ assertTrue(
+ TimeseriesBackfillAvroPipeline.matchesFilters(
+ seriesRow, "geoId/06", "geoId/07", List.of("Count_Person")));
+ assertFalse(
+ TimeseriesBackfillAvroPipeline.matchesFilters(
+ seriesRow, "geoId/07", "", List.of("Count_Person")));
+ assertFalse(
+ TimeseriesBackfillAvroPipeline.matchesFilters(
+ seriesRow, "", "", List.of("Min_Temperature")));
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipelineTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipelineTest.java
new file mode 100644
index 00000000..d230a7ca
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipelineTest.java
@@ -0,0 +1,181 @@
+package org.datacommons.ingestion.timeseries;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.Method;
+import java.util.OptionalInt;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.joda.time.Duration;
+import org.junit.Test;
+
+public class TimeseriesBackfillPipelineTest {
+ @Test
+ public void options_haveLocalProgressDefaults() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+
+ assertEquals(1000, options.getProgressEverySourceRows());
+ assertEquals(30, options.getHeartbeatSeconds());
+ }
+
+ @Test
+ public void shouldEnableLocalProgress_trueForDefaultAndDirectRunner() {
+ TimeseriesBackfillOptions defaultOptions =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ TimeseriesBackfillOptions directOptions =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ directOptions.setRunner(DirectRunner.class);
+
+ assertTrue(TimeseriesBackfillPipeline.shouldEnableLocalProgress(defaultOptions));
+ assertTrue(TimeseriesBackfillPipeline.shouldEnableLocalProgress(directOptions));
+ }
+
+ @Test
+ public void shouldEnableLocalProgress_falseForDataflowRunner() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setRunner(DataflowRunner.class);
+
+ assertFalse(TimeseriesBackfillPipeline.shouldEnableLocalProgress(options));
+ }
+
+ @Test
+ public void resolveProjectId_prefersProjectAndFallsBackToProjectId() {
+ TimeseriesBackfillOptions projectOptions =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ projectOptions.setProject("beam-project");
+ projectOptions.setProjectId("legacy-project");
+
+ TimeseriesBackfillOptions legacyOptions =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ legacyOptions.setProject("");
+ legacyOptions.setProjectId("legacy-project");
+
+ assertEquals("beam-project", TimeseriesBackfillIO.resolveProjectId(projectOptions));
+ assertEquals("legacy-project", TimeseriesBackfillIO.resolveProjectId(legacyOptions));
+ }
+
+ @Test
+ public void options_leaveOptionalSpannerSinkFlagsUnsetByDefault() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+
+ assertNull(options.getBatchSizeBytes());
+ assertNull(options.getMaxNumRows());
+ assertNull(options.getMaxNumMutations());
+ assertNull(options.getGroupingFactor());
+ assertNull(options.getCommitDeadlineSeconds());
+ }
+
+ @Test
+ public void options_parseOptionalSpannerSinkFlagsFromArgs() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.fromArgs(
+ "--batchSizeBytes=1024",
+ "--maxNumRows=100",
+ "--maxNumMutations=500",
+ "--groupingFactor=20",
+ "--commitDeadlineSeconds=45")
+ .as(TimeseriesBackfillOptions.class);
+
+ assertEquals(Long.valueOf(1024), options.getBatchSizeBytes());
+ assertEquals(Integer.valueOf(100), options.getMaxNumRows());
+ assertEquals(Integer.valueOf(500), options.getMaxNumMutations());
+ assertEquals(Integer.valueOf(20), options.getGroupingFactor());
+ assertEquals(Integer.valueOf(45), options.getCommitDeadlineSeconds());
+ }
+
+ @Test
+ public void validateOptions_rejectsNonPositiveOptionalSpannerSinkFlags() {
+ assertRejectsInvalidWriteOption("batchSizeBytes", "batchSizeBytes must be positive");
+ assertRejectsInvalidWriteOption("maxNumRows", "maxNumRows must be positive");
+ assertRejectsInvalidWriteOption("maxNumMutations", "maxNumMutations must be positive");
+ assertRejectsInvalidWriteOption("groupingFactor", "groupingFactor must be positive");
+ assertRejectsInvalidWriteOption(
+ "commitDeadlineSeconds", "commitDeadlineSeconds must be positive");
+ }
+
+ @Test
+ public void buildWrite_appliesOptionalSpannerSinkFlagsWhenProvided() throws Exception {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setProject("beam-project");
+ options.setSpannerInstanceId("instance");
+ options.setSpannerDatabaseId("database");
+ options.setBatchSizeBytes(1024L);
+ options.setMaxNumRows(100);
+ options.setMaxNumMutations(500);
+ options.setGroupingFactor(20);
+ options.setCommitDeadlineSeconds(45);
+
+ SpannerIO.Write write = TimeseriesBackfillIO.buildWrite(options);
+
+ assertEquals(1024L, invokeLong(write, "getBatchSizeBytes"));
+ assertEquals(100L, invokeLong(write, "getMaxNumRows"));
+ assertEquals(500L, invokeLong(write, "getMaxNumMutations"));
+ assertEquals(OptionalInt.of(20), invokeGroupingFactor(write));
+ assertEquals(
+ Duration.standardSeconds(45), invokeSpannerConfig(write).getCommitDeadline().get());
+ }
+
+ private static void assertRejectsInvalidWriteOption(String optionName, String messageFragment) {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ setInvalidWriteOption(options, optionName);
+
+ try {
+ TimeseriesBackfillOptionValidator.validateCommonOptions(options);
+ fail("Expected IllegalArgumentException for " + optionName);
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains(messageFragment));
+ }
+ }
+
+ private static void setInvalidWriteOption(TimeseriesBackfillOptions options, String optionName) {
+ switch (optionName) {
+ case "batchSizeBytes":
+ options.setBatchSizeBytes(0L);
+ return;
+ case "maxNumRows":
+ options.setMaxNumRows(0);
+ return;
+ case "maxNumMutations":
+ options.setMaxNumMutations(0);
+ return;
+ case "groupingFactor":
+ options.setGroupingFactor(0);
+ return;
+ case "commitDeadlineSeconds":
+ options.setCommitDeadlineSeconds(0);
+ return;
+ default:
+ throw new IllegalArgumentException("Unexpected option " + optionName);
+ }
+ }
+
+ private static long invokeLong(SpannerIO.Write write, String methodName) throws Exception {
+ Method method = SpannerIO.Write.class.getDeclaredMethod(methodName);
+ method.setAccessible(true);
+ return (long) method.invoke(write);
+ }
+
+ private static OptionalInt invokeGroupingFactor(SpannerIO.Write write) throws Exception {
+ Method method = SpannerIO.Write.class.getDeclaredMethod("getGroupingFactor");
+ method.setAccessible(true);
+ return (OptionalInt) method.invoke(write);
+ }
+
+ private static SpannerConfig invokeSpannerConfig(SpannerIO.Write write) throws Exception {
+ Method method = SpannerIO.Write.class.getDeclaredMethod("getSpannerConfig");
+ method.setAccessible(true);
+ return (SpannerConfig) method.invoke(write);
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueriesTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueriesTest.java
new file mode 100644
index 00000000..f53d9951
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueriesTest.java
@@ -0,0 +1,101 @@
+package org.datacommons.ingestion.timeseries;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.spanner.Statement;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+public class TimeseriesBackfillQueriesTest {
+ @Test
+ public void buildSourceQuery_selectsMetadataObservationsAndShardFilters() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setSourceObservationTableName("Observation");
+ options.setVariableMeasured("Count_Person");
+ options.setStartObservationAbout("geoId/06");
+ options.setEndObservationAboutExclusive("geoId/07");
+
+ Statement query = new TimeseriesBackfillQueries(options).buildSourceQuery();
+
+ assertTrue(query.getSql().contains("provenance"));
+ assertTrue(query.getSql().contains("observations"));
+ assertTrue(query.getSql().contains("FROM Observation"));
+ assertTrue(query.getSql().contains("observation_about >= @startObservationAbout"));
+ assertTrue(query.getSql().contains("observation_about < @endObservationAboutExclusive"));
+ assertTrue(query.getSql().contains("variable_measured = @variableMeasured"));
+ assertFalse(query.getSql().contains("UNNEST"));
+ assertTrue(query.hasBinding("startObservationAbout"));
+ assertTrue(query.hasBinding("endObservationAboutExclusive"));
+ assertTrue(query.hasBinding("variableMeasured"));
+ }
+
+ @Test
+ public void buildSourceQuery_supportsCommaSeparatedVariableMeasuredFilters() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setSourceObservationTableName("Observation");
+ options.setVariableMeasured("Count_Person, Min_Temperature ,Max_Temperature");
+
+ Statement query = new TimeseriesBackfillQueries(options).buildSourceQuery();
+
+ assertTrue(query.getSql().contains("variable_measured IN UNNEST(@variableMeasuredList)"));
+ assertFalse(query.hasBinding("variableMeasured"));
+ assertTrue(query.hasBinding("variableMeasuredList"));
+ }
+
+ @Test
+ public void buildSourceQuery_supportsBoundedValidatorRuns() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setSourceObservationTableName("Observation");
+
+ Statement query = new TimeseriesBackfillQueries(options).buildSourceQuery(10);
+
+ assertTrue(query.getSql().contains("provenance"));
+ assertTrue(query.getSql().contains("observations"));
+ assertTrue(query.getSql().contains("LIMIT 10"));
+ assertTrue(query.getSql().contains("ORDER BY observation_about"));
+ }
+
+ @Test
+ public void validateOptions_rejectsObservationAboutRangeWithoutVariableMeasured() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setStartObservationAbout("geoId/06");
+
+ try {
+ TimeseriesBackfillPipeline.validateOptions(options);
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("variableMeasured is required"));
+ }
+ }
+
+ @Test
+ public void validateOptions_acceptsObservationAboutRangeWithCommaSeparatedVariableMeasured() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setVariableMeasured("Count_Person, Min_Temperature");
+ options.setStartObservationAbout("geoId/06");
+
+ TimeseriesBackfillPipeline.validateOptions(options);
+ }
+
+ @Test
+ public void validateOptions_rejectsBeamRowCaps() {
+ TimeseriesBackfillOptions options =
+ PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class);
+ options.setVariableMeasured("Count_Person");
+ options.setMaxSeriesRows(10);
+
+ try {
+ TimeseriesBackfillPipeline.validateOptions(options);
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Beam row caps are not supported"));
+ }
+ }
+}
diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java
new file mode 100644
index 00000000..c0e0ddbc
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java
@@ -0,0 +1,204 @@
+package org.datacommons.ingestion.timeseries;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.spanner.Mutation;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
+import org.datacommons.Storage.Observations;
+import org.junit.Test;
+
+public class TimeseriesMutationFactoryTest {
+ @Test
+ public void toTimeSeriesMutation_usesExistingSeriesDcidAndSourceProvenance() {
+ SourceSeriesRow row =
+ new SourceSeriesRow(
+ "geoId/06",
+ "Count_Person",
+ "12345",
+ "P1Y",
+ "CensusACS5yrSurvey",
+ "Person",
+ "1",
+ "CensusACS5YearSurvey",
+ "https://example.com",
+ false,
+ "dc/base/CensusACS5YearSurvey");
+
+ Mutation mutation = TimeseriesMutationFactory.toTimeSeriesMutation(row, "TimeSeries");
+
+ assertEquals("TimeSeries", mutation.getTable());
+ assertEquals("dc/os/Count_Person_geoId_06_12345", mutation.asMap().get("id").getString());
+ assertEquals("dc/base/CensusACS5YearSurvey", mutation.asMap().get("provenance").getString());
+ }
+
+ @Test
+ public void toTimeSeriesAttributeMutations_preservesNormalizedReadAttributes() {
+ SourceSeriesRow row =
+ new SourceSeriesRow(
+ "geoId/06",
+ "Count_Person",
+ "12345",
+ "P1Y",
+ "CensusACS5yrSurvey",
+ "Person",
+ "1",
+ "CensusACS5YearSurvey",
+ "",
+ true,
+ "dc/base/CensusACS5YearSurvey");
+
+    List<Mutation> mutations =
+ TimeseriesMutationFactory.toTimeSeriesAttributeMutations(row, "TimeSeriesAttribute");
+
+    Map<String, String> properties =
+ mutations.stream()
+ .collect(
+ Collectors.toMap(
+ mutation -> mutation.asMap().get("property").getString(),
+ mutation -> mutation.asMap().get("value").getString()));
+
+ assertEquals("geoId/06", properties.get("observationAbout"));
+ assertEquals("12345", properties.get("facetId"));
+ assertEquals("CensusACS5YearSurvey", properties.get("importName"));
+ assertEquals("P1Y", properties.get("observationPeriod"));
+ assertEquals("CensusACS5yrSurvey", properties.get("measurementMethod"));
+ assertEquals("Person", properties.get("unit"));
+ assertEquals("1", properties.get("scalingFactor"));
+ assertEquals("true", properties.get("isDcAggregate"));
+ assertFalse(properties.containsKey("provenanceUrl"));
+ }
+
+ @Test
+ public void toStatVarObservationMutation_emitsPointMutation() {
+ SourcePointRow row = new SourcePointRow("geoId/06", "Count_Person", "12345", "2024", "123.4");
+
+ Mutation mutation =
+ TimeseriesMutationFactory.toStatVarObservationMutation(row, "StatVarObservation");
+
+ assertEquals("StatVarObservation", mutation.getTable());
+ assertEquals("2024", mutation.asMap().get("date").getString());
+ assertEquals("123.4", mutation.asMap().get("value").getString());
+ }
+
+ @Test
+ public void toMutationGroups_groupsParentAttributesAndPointsFromSingleSourceRow() {
+ SourceSeriesRow seriesRow =
+ new SourceSeriesRow(
+ "geoId/06",
+ "Count_Person",
+ "12345",
+ "P1Y",
+ "CensusACS5yrSurvey",
+ "Person",
+ "1",
+ "CensusACS5YearSurvey",
+ "",
+ true,
+ "dc/base/CensusACS5YearSurvey");
+    List<SourcePointRow> pointRows =
+ List.of(
+ new SourcePointRow("geoId/06", "Count_Person", "12345", "2023", "1"),
+ new SourcePointRow("geoId/06", "Count_Person", "12345", "2024", "2"));
+
+ BackfillMutationGroups mutationGroups =
+ TimeseriesMutationFactory.toMutationGroups(
+ new SourceObservationRow(seriesRow, pointRows),
+ "TimeSeries",
+ "TimeSeriesAttribute",
+ "StatVarObservation");
+
+ assertEquals(1, mutationGroups.groups().size());
+ assertEquals(8, mutationGroups.timeSeriesAttributeRows());
+ assertEquals(2, mutationGroups.statVarObservationRows());
+
+ MutationGroup mutationGroup = mutationGroups.groups().get(0);
+ assertEquals("TimeSeries", mutationGroup.primary().getTable());
+ assertEquals(10, mutationGroup.attached().size());
+ assertTrue(
+ mutationGroup.attached().stream()
+ .anyMatch(mutation -> mutation.getTable().equals("TimeSeriesAttribute")));
+ assertTrue(
+ mutationGroup.attached().stream()
+ .anyMatch(mutation -> mutation.getTable().equals("StatVarObservation")));
+ }
+
+ @Test
+ public void toMutationGroups_splitsLargePointSetsAcrossGroups() {
+ SourceSeriesRow seriesRow =
+ new SourceSeriesRow(
+ "geoId/06",
+ "Count_Person",
+ "12345",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ false,
+ "dc/base/CensusACS5YearSurvey");
+    List<SourcePointRow> pointRows = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ pointRows.add(
+ new SourcePointRow("geoId/06", "Count_Person", "12345", Integer.toString(i), "1"));
+ }
+
+ BackfillMutationGroups mutationGroups =
+ TimeseriesMutationFactory.toMutationGroups(
+ new SourceObservationRow(seriesRow, pointRows),
+ "TimeSeries",
+ "TimeSeriesAttribute",
+ "StatVarObservation");
+
+ assertEquals(2, mutationGroups.groups().size());
+ assertEquals("TimeSeries", mutationGroups.groups().get(0).primary().getTable());
+ assertEquals("TimeSeries", mutationGroups.groups().get(1).primary().getTable());
+ }
+
+ @Test
+ public void toMutationGroups_compactRowAvoidsExpandedPointListInput() {
+ SourceSeriesRow seriesRow =
+ new SourceSeriesRow(
+ "geoId/06",
+ "Count_Person",
+ "12345",
+ "",
+ "",
+ "",
+ "",
+ "TestImport",
+ "",
+ false,
+ "dc/base/TestImport");
+ CompactSourceObservationRow compactRow =
+ new CompactSourceObservationRow(
+ seriesRow,
+ Observations.newBuilder()
+ .putValues("2023", "1")
+ .putValues("2024", "2")
+ .build()
+ .toByteArray());
+
+ BackfillMutationGroups mutationGroups =
+ TimeseriesMutationFactory.toMutationGroups(
+ compactRow, "TimeSeries", "TimeSeriesAttribute", "StatVarObservation");
+
+ assertEquals(1, mutationGroups.groups().size());
+ assertEquals(2, mutationGroups.statVarObservationRows());
+ assertEquals("TimeSeries", mutationGroups.groups().get(0).primary().getTable());
+ }
+
+ @Test
+ public void seriesIdGenerator_replacesSlashesWithUnderscores() {
+ assertEquals(
+ "dc/os/Count_Person_geoId_06_123",
+ SeriesIdGenerator.build("Count_Person", "geoId/06", "123"));
+ assertTrue(SeriesIdGenerator.build("Count/Person", "geoId/06", "123").contains("Count_Person"));
+ }
+}