From 6743118f217dd067f2f1a37b84c6d6a7a18c546a Mon Sep 17 00:00:00 2001
From: Rohit Kumar
Date: Thu, 23 Apr 2026 14:45:43 +0000
Subject: [PATCH 1/4] feat: implement Avro timeseries-backfill pipeline for
 importing and validating observation data

---
 pipeline/pom.xml                              |   1 +
 pipeline/timeseries-backfill/README.md        | 241 ++++++++++++++++++
 pipeline/timeseries-backfill/pom.xml          | 114 +++++++++
 .../recreate_timeseries_tables.sh             |  69 +++++
 .../timeseries/LocalProgressTracker.java      | 146 +++++++++++
 .../timeseries/ObservationExportFiles.java    | 128 ++++++++++
 .../timeseries/SeriesIdGenerator.java         |  21 ++
 .../timeseries/SourceObservationRows.java     | 167 ++++++++++++
 .../TimeseriesBackfillAvroPipeline.java       | 113 ++++++++
 .../timeseries/TimeseriesBackfillIO.java      | 103 ++++++++
 .../TimeseriesBackfillOptionValidator.java    |  33 +++
 .../timeseries/TimeseriesBackfillOptions.java | 152 +++++++++++
 .../TimeseriesBackfillPipeline.java           | 170 ++++++++++++
 .../timeseries/TimeseriesBackfillQueries.java |  83 ++++++
 .../TimeseriesBackfillValidator.java          | 144 +++++++++++
 .../timeseries/TimeseriesMutationFactory.java | 125 +++++++++
 .../timeseries/VariableMeasuredFilters.java   |  23 ++
 .../timeseries/LocalProgressTrackerTest.java  |  58 +++++
 .../ObservationExportFilesTest.java           |  72 ++++++
 .../timeseries/SourceObservationRowsTest.java | 123 +++++++++
 .../TimeseriesBackfillAvroPipelineTest.java   |  67 +++++
 .../TimeseriesBackfillPipelineTest.java       | 181 +++++++++++++
 .../TimeseriesBackfillQueriesTest.java        | 101 ++++++++
 .../TimeseriesMutationFactoryTest.java        | 170 ++++++++++++
 24 files changed, 2605 insertions(+)
 create mode 100644 pipeline/timeseries-backfill/README.md
 create mode 100644 pipeline/timeseries-backfill/pom.xml
 create mode 100755 pipeline/timeseries-backfill/recreate_timeseries_tables.sh
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/LocalProgressTracker.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/ObservationExportFiles.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SeriesIdGenerator.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillIO.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptionValidator.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptions.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueries.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillValidator.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java
 create mode 100644 pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/VariableMeasuredFilters.java
 create mode 100644 pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/LocalProgressTrackerTest.java
 create mode 100644 pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/ObservationExportFilesTest.java
 create mode 100644 pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java
 create mode 100644 pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipelineTest.java
 create mode 100644 pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipelineTest.java
 create mode 100644 pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueriesTest.java
 create mode 100644 pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java

diff --git a/pipeline/pom.xml b/pipeline/pom.xml
index cab0e1fa..4ca1da2f 100644
--- a/pipeline/pom.xml
+++ b/pipeline/pom.xml
@@ -22,6 +22,7 @@
     <module>differ</module>
     <module>ingestion</module>
     <module>spanner</module>
+    <module>timeseries-backfill</module>
     <module>util</module>

diff --git a/pipeline/timeseries-backfill/README.md b/pipeline/timeseries-backfill/README.md
new file mode 100644
index 00000000..ddf9569e
--- /dev/null
+++ b/pipeline/timeseries-backfill/README.md
@@ -0,0 +1,241 @@
+# Timeseries Backfill
+
+This module backfills the normalized timeseries tables from the legacy `Observation` table in Spanner.
+
+By default it writes:
+
+- `TimeSeries_rk`
+- `TimeSeriesAttribute_rk`
+- `StatVarObservation_rk`
+
+It does not populate `ObservationAttribute` in v1.
+
+## Source And Destination Shape
+
+Source table:
+
+- `Observation` from [import/pipeline/spanner/src/main/resources/spanner_schema.sql](../spanner/src/main/resources/spanner_schema.sql)
+
+Destination schema:
+
+- [rk-experiments/mixer/spanner/bq_spanner_ingestion/timeseries_schema.sql](/home/rohitrkumar_google_com/Documents/dc/github/rohitkumarbhagat/dc_local/rk-experiments/mixer/spanner/bq_spanner_ingestion/timeseries_schema.sql)
+
+Important assumptions used by this module:
+
+- `TimeSeries.id` reuses the existing `dc/os/<variable_measured>_<observation_about>_<facet_id>` series id shape.
+- `TimeSeries.provenance` is derived from `import_name` as `dc/base/<import_name>`.
+- `TimeSeriesAttribute` stores the current normalized-read fields:
+  `observationAbout`, `facetId`, `importName`, `provenanceUrl`, `observationPeriod`,
+  `measurementMethod`, `unit`, `scalingFactor`, `isDcAggregate`.
+- The backfill should run against a fixed source snapshot via `--readTimestamp` when correctness matters.
+
+## Entry Points
+
+- `org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline`
+  Beam/Dataflow batch job for the full backfill.
+- `org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline`
+  Beam/Dataflow batch job that reads exported `Observation` Avro files and writes the same destination tables.
+- `org.datacommons.ingestion.timeseries.TimeseriesBackfillValidator`
+  Small direct runner for bounded validation on one shard or range.
+
+The Spanner and Avro Beam entrypoints reuse the same downstream mutation and Spanner write path.
+
+## Flag Reference
+
+Core flags:
+
+- `--project`: GCP project that owns the Spanner instance and database. Prefer this for all runs, especially `DataflowRunner`.
+- `--projectId`: Legacy fallback for the same GCP project value. Keep only for compatibility with the older import-pipeline style.
+- `--spannerInstanceId`: Spanner instance to read from and write to.
+- `--spannerDatabaseId`: Spanner database containing both the source `Observation` table and the destination normalized tables. +- `--sourceObservationTableName`: Source table name. Normally `Observation`. +- `--inputExportDir`: Avro-only. Spanner export directory containing `Observation-manifest.json`. +- `--inputFiles`: Avro-only. Comma-separated exact Avro file paths. +- `--destinationTimeSeriesTableName`: Destination parent series table. Normally `TimeSeries_rk`. +- `--destinationTimeSeriesAttributeTableName`: Destination series-attribute table. Normally `TimeSeriesAttribute_rk`. +- `--destinationStatVarObservationTableName`: Destination point table. Normally `StatVarObservation_rk`. +- `--readTimestamp`: Source read snapshot in RFC3339 format, for example `2026-04-22T00:00:00Z`. When set, the job reads from that exact Spanner snapshot for consistent backfill semantics. When empty, the code uses a Spanner strong read instead. +- `--variableMeasured`: Fixed `variable_measured` filter. Use one stat var or a comma-separated list such as `Count_Person,Min_Temperature`. +- `--startObservationAbout`: Inclusive lower bound on `observation_about`. Useful for sharding by place range. +- `--endObservationAboutExclusive`: Exclusive upper bound on `observation_about`. Use together with `--startObservationAbout` to define one shard. +- `--spannerEmulatorHost`: Optional emulator host such as `localhost:9010`. Leave empty for real Cloud Spanner. + +For the Avro entrypoint, exactly one of `--inputExportDir` or `--inputFiles` is required. + +If `--startObservationAbout` or `--endObservationAboutExclusive` is set, `--variableMeasured` is required. It can still be a comma-separated list. + +Beam pipeline row-limit flags: + +- `--maxSeriesRows`: Currently unsupported for Beam/Dataflow because partitioned Spanner queries cannot use the generated bounded SQL shape. Leave unset for Beam runs. +- `--maxPointRows`: Currently unsupported for Beam/Dataflow because partitioned Spanner queries cannot use the generated bounded SQL shape. Leave unset for Beam runs. + +Local progress flags: + +- `--progressEverySourceRows`: Local progress log interval for validator and `DirectRunner`. Default `1000`. Non-positive disables row-progress logs. +- `--heartbeatSeconds`: Local heartbeat log interval for validator and `DirectRunner`. Default `30`. Non-positive disables heartbeat logs. + +Optional Beam/Dataflow Spanner sink flags: + +- `--batchSizeBytes`: Optional `SpannerIO.Write` batch size in bytes. When unset, Beam uses its default. +- `--maxNumRows`: Optional `SpannerIO.Write` max rows per batch. When unset, Beam uses its default. +- `--maxNumMutations`: Optional `SpannerIO.Write` max mutations per batch. When unset, Beam uses its default. +- `--groupingFactor`: Optional `SpannerIO.Write` grouping factor. When unset, Beam uses its default. +- `--commitDeadlineSeconds`: Optional `SpannerIO.Write` commit deadline in seconds. When unset, Beam uses its default. + +Validator-only flags: + +- `--validatorMaxSeriesRows`: Cap on source series rows for `TimeseriesBackfillValidator`. Non-positive means no limit. + +Beam/Dataflow runner flags used in the examples: + +- `--runner`: Beam runner to use. `DirectRunner` runs locally. `DataflowRunner` submits the Beam job to Dataflow. +- `--region`: Dataflow region. Only needed with `DataflowRunner`. +- `--tempLocation`: GCS path used by Dataflow for temporary files. +- `--stagingLocation`: GCS path used by Dataflow for staged job artifacts. 
+- `--numWorkers`: Initial Dataflow worker count.
+- `--maxNumWorkers`: Maximum Dataflow worker count when autoscaling is enabled.
+- `--workerMachineType`: Worker VM shape, for example `n2-custom-4-32768`.
+- `--numberOfWorkerHarnessThreads`: Number of SDK harness threads per worker, for example `2`.
+
+## Flag Guidance
+
+- Always set `--project`, `--spannerInstanceId`, and `--spannerDatabaseId`.
+- Set `--readTimestamp` for any real backfill where you want a stable source snapshot.
+- Use `TimeseriesBackfillPipeline` when the source is live Spanner and `TimeseriesBackfillAvroPipeline` when the source is a Spanner Avro export.
+- Use `--variableMeasured` for every local or early Dataflow run. It accepts one stat var or a comma-separated list.
+- Add `--startObservationAbout` and `--endObservationAboutExclusive` only when you want to shard one stat-var slice by place range (see the example after this list). If either range flag is set, `--variableMeasured` is required.
+- For the Avro entrypoint, use `--inputExportDir` for a full exported snapshot and `--inputFiles` when you want to process only a few exact Avro files for a targeted rerun or debugging.
+- For the Avro entrypoint, `--inputExportDir` reads only `Observation-manifest.json` and only the `Observation.avro-*` files listed there. Other exported table files in the same export directory are ignored.
+- `--inputExportDir` always resolves the full `Observation` file list from the manifest. It is not a partial-file selector.
+- Avro local runs are supported only through `TimeseriesBackfillAvroPipeline` with `DirectRunner`. There is no separate standalone Avro validator.
+- The Avro entrypoint writes directly to Spanner. It does not create new intermediate output files.
+- `--variableMeasured`, `--startObservationAbout`, and `--endObservationAboutExclusive` are row-level filters applied after Avro rows are read. They narrow which rows are written, but they do not prune the Avro file list chosen from `--inputExportDir`.
+- Use `TimeseriesBackfillValidator` first when you want a small bounded write. `--validatorMaxSeriesRows` is the only bounded-row flag that is supported today.
+- Do not use `--maxSeriesRows` or `--maxPointRows` with Beam/Dataflow. They are intentionally blocked for partitioned Spanner reads.
+- For local `DirectRunner`, tune `--progressEverySourceRows` and `--heartbeatSeconds` if you want more visible liveness logs.
+- For Dataflow memory experiments, test worker settings separately from backfill query settings. The main ones are `--workerMachineType` and `--numberOfWorkerHarnessThreads`.
+- Use the optional Spanner sink flags only when you want to override Beam defaults for the Beam/Dataflow write path.
+- The standalone validator accepts those sink flags because it shares the options interface, but its direct `DatabaseClient.writeAtLeastOnce(...)` path does not use them.
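+
+For example, a two-shard split of one stat var by place range could use argument sets like these. The shard bounds here are illustrative, not recommended values; bounds compare lexicographically on `observation_about`, the lower bound is inclusive, and the upper bound is exclusive:
+
+```bash
+# Shard 1: observation_about in [geoId/01, geoId/30).
+--variableMeasured=Count_Person --startObservationAbout=geoId/01 --endObservationAboutExclusive=geoId/30
+
+# Shard 2: observation_about in [geoId/30, ...). Leaving the exclusive upper
+# bound unset keeps the shard open-ended.
+--variableMeasured=Count_Person --startObservationAbout=geoId/30
+```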
+ +## Spanner Sink Tuning Order + +Suggested order to test if the sink is too memory-heavy or too serialized: + +- `groupingFactor` +- `maxNumRows` +- `maxNumMutations` +- `batchSizeBytes` +- `commitDeadlineSeconds` + +## One-Time Prep + +If you see a missing artifact error for `org.datacommons:datacommons-import-util`, install that top-level module once from the `import/pipeline` directory: + +```bash +mvn -f ../pom.xml -pl util -am install -Pgit-worktree -DskipTests +``` + +## Build + +From the `import/pipeline` directory: + +```bash +mvn -pl timeseries-backfill -am package -Pgit-worktree -DskipTests +``` + +## Validate Locally + +Run the standalone validator from the `import/pipeline` directory: + +```bash +mvn -Pgit-worktree compile exec:java \ + -pl timeseries-backfill -am \ + -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillValidator \ + -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --sourceObservationTableName=Observation --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --startObservationAbout=geoId/06 --endObservationAboutExclusive=geoId/07 --validatorMaxSeriesRows=1000" +``` + +The validator uses the same single-read row mapping logic as the Beam pipeline and writes bounded batches directly to Spanner. + +During local validation, the default progress logs print: + +- a heartbeat every `30` seconds +- a row-progress log every `1000` source rows +- a validator flush log on each write batch + +Use the validator first when you want a small bounded write without running Beam. + +## Run With DirectRunner + +```bash +mvn -Pgit-worktree compile exec:java \ + -pl timeseries-backfill -am \ + -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline \ + -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --sourceObservationTableName=Observation --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --startObservationAbout=geoId/06 --endObservationAboutExclusive=geoId/07 --runner=DirectRunner" +``` + +`DirectRunner` still runs the Beam job locally. For a small bounded run, use the validator instead of Beam row-cap flags, and pair any observation_about range with `--variableMeasured`. + +The Beam/Dataflow path reads the raw `observations` proto value from `Observation` and expands it inside Beam. The validator uses that same source-read and expansion path. + +For local `DirectRunner`, the default progress logs print: + +- a heartbeat every `30` seconds +- a row-progress log every `1000` source rows + +Set `--progressEverySourceRows` or `--heartbeatSeconds` if you want a different local signal. These flags are ignored for `DataflowRunner`. 
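+
+For example, appending these overrides to the `DirectRunner` invocation above makes the local logs chattier (the values are illustrative):
+
+```bash
+# Log row progress every 100 source rows and a heartbeat every 10 seconds.
+--progressEverySourceRows=100 --heartbeatSeconds=10
+```
+
+Each progress or heartbeat line reports the cumulative `source_rows`, `timeseries_rows`, `timeseries_attribute_rows`, and `stat_var_observation_rows` counts plus `seconds_since_last_row`, so a stalled run shows up as a growing idle time over flat row counts.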
+
+## Run Avro Export With DirectRunner
+
+```bash
+mvn -Pgit-worktree compile exec:java \
+  -pl timeseries-backfill -am \
+  -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
+  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs://<bucket>/<export-dir>/ --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --runner=DirectRunner"
+```
+
+The Avro entrypoint reads exported `Observation` Avro rows, recomputes `provenance` from `import_name`, parses `observations` from the serialized proto payload, and then reuses the same normalized write path as the Spanner entrypoint.
+
+For Avro input selection:
+
+- `--inputExportDir` should point to the exact export subdirectory that contains `Observation-manifest.json`.
+- `--inputExportDir` processes all `Observation.avro-*` files listed in that manifest.
+- Use `--inputFiles` instead if you want to process only a few exact Avro files.
+- `--variableMeasured` and the observation-about range flags still apply, but only after the selected Avro files are opened and rows are read.
+
+## Run On Dataflow
+
+```bash
+mvn -Pgit-worktree compile exec:java \
+  -pl timeseries-backfill -am \
+  -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline \
+  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --sourceObservationTableName=Observation --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --startObservationAbout=geoId/06 --endObservationAboutExclusive=geoId/07 --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
+```
+
+Use `--variableMeasured` together with `--startObservationAbout` and `--endObservationAboutExclusive` to shard Beam/Dataflow runs safely.
+
+Do not force the `ObservationAboutVariableMeasured` index for the Beam/Dataflow queries in their current form. The backfill selects columns that are not stored in that index, so forcing it can introduce a back join and make the query non-root-partitionable.
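+
+One way to sanity-check a query shape before a large Dataflow run is to ask Spanner for its plan. The sketch below assumes `gcloud` access to the database and uses a hand-written query that mirrors the backfill filters; it is not the exact SQL the pipeline generates:
+
+```bash
+gcloud spanner databases execute-sql dc_graph_2026_01_27 \
+  --project=datcom-store --instance=dc-kg-test \
+  --query-mode=PLAN \
+  --sql="SELECT observation_about, variable_measured, facet_id FROM Observation WHERE variable_measured = 'Count_Person' AND observation_about >= 'geoId/06' AND observation_about < 'geoId/07'"
+```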
+
+## Run Avro Export On Dataflow
+
+```bash
+mvn -Pgit-worktree compile exec:java \
+  -pl timeseries-backfill -am \
+  -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
+  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/ --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
+```
+
+## Recreate Destination Tables
+
+Run this from the `import` repo root to drop any existing experimental normalized `_rk` tables and recreate them from the checked-in schema:
+
+```bash
+./pipeline/timeseries-backfill/recreate_timeseries_tables.sh datcom-store dc-kg-test dc_graph_5
+```
+
+The script reads the current database DDL, drops the normalized `_rk` tables and indexes if they already exist, and then reapplies a suffixed form of [timeseries_schema.sql](/home/rohitrkumar_google_com/Documents/dc/github/rohitkumarbhagat/dc_local/rk-experiments/mixer/spanner/bq_spanner_ingestion/timeseries_schema.sql).
+
+## Notes
+
+- For the Avro export path, do not rely on `Observation.provenance` being populated in the export: it is a stored generated column in the deployed schema, so this module recomputes it from `import_name`.
+- The default experimental destination schema should include `TimeSeriesAttributePropertyValue_rk` and `TimeSeriesAttributeValue_rk` before using the normalized Mixer query path.
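+
+As a final spot check after a bounded run, you can count destination rows directly. This assumes the default `_rk` table names and that `TimeSeries_rk` keeps a `variable_measured` column, as the `TimeSeriesByVariableMeasured_rk` index in the recreate script suggests:
+
+```bash
+gcloud spanner databases execute-sql dc_graph_5 \
+  --project=datcom-store --instance=dc-kg-test \
+  --sql="SELECT COUNT(*) FROM TimeSeries_rk WHERE variable_measured = 'Count_Person'"
+```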
diff --git a/pipeline/timeseries-backfill/pom.xml b/pipeline/timeseries-backfill/pom.xml
new file mode 100644
index 00000000..5c3f1b9d
--- /dev/null
+++ b/pipeline/timeseries-backfill/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.datacommons</groupId>
+    <artifactId>pipeline</artifactId>
+    <version>${revision}</version>
+  </parent>
+
+  <groupId>org.datacommons</groupId>
+  <artifactId>timeseries-backfill</artifactId>
+  <version>${revision}</version>
+  <name>Data Commons - Timeseries Backfill</name>
+
+  <dependencies>
+    <dependency>
+      <artifactId>spanner</artifactId>
+      <groupId>org.datacommons</groupId>
+      <version>${revision}</version>
+    </dependency>
+    <dependency>
+      <artifactId>data</artifactId>
+      <groupId>org.datacommons</groupId>
+      <version>${revision}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-avro</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-spanner</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <configuration>
+          <cleanupDaemonThreads>false</cleanupDaemonThreads>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.5.3</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+              <transformers>
+                <transformer
+                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline</mainClass>
+                </transformer>
+              </transformers>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

diff --git a/pipeline/timeseries-backfill/recreate_timeseries_tables.sh b/pipeline/timeseries-backfill/recreate_timeseries_tables.sh
new file mode 100755
index 00000000..6ee0a86a
--- /dev/null
+++ b/pipeline/timeseries-backfill/recreate_timeseries_tables.sh
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+if [[ $# -ne 3 ]]; then
+  echo "Usage: $0 <project_id> <instance_id> <database_id>" >&2
+  exit 1
+fi
+
+project_id="$1"
+instance_id="$2"
+database_id="$3"
+
+script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+schema_file="${script_dir}/../../../rk-experiments/mixer/spanner/bq_spanner_ingestion/timeseries_schema.sql"
+schema_with_suffix_file="$(mktemp)"
+
+if [[ !
-f "${schema_file}" ]]; then + echo "Schema file not found: ${schema_file}" >&2 + exit 1 +fi + +current_schema_file="$(mktemp)" +drop_ddl_file="$(mktemp)" +trap 'rm -f "${current_schema_file}" "${drop_ddl_file}" "${schema_with_suffix_file}"' EXIT + +perl -0pe ' + s/\bTimeSeriesAttributePropertyValue\b/TimeSeriesAttributePropertyValue_rk/g; + s/\bTimeSeriesAttributeValue\b/TimeSeriesAttributeValue_rk/g; + s/\bTimeSeriesByProvenance\b/TimeSeriesByProvenance_rk/g; + s/\bTimeSeriesByVariableMeasured\b/TimeSeriesByVariableMeasured_rk/g; + s/\bObservationAttribute\b/ObservationAttribute_rk/g; + s/\bStatVarObservation\b/StatVarObservation_rk/g; + s/\bTimeSeriesAttribute\b/TimeSeriesAttribute_rk/g; + s/\bTimeSeries\b/TimeSeries_rk/g; +' "${schema_file}" > "${schema_with_suffix_file}" + +gcloud spanner databases ddl describe "${database_id}" \ + --project="${project_id}" \ + --instance="${instance_id}" \ + > "${current_schema_file}" + +append_if_present() { + local pattern="$1" + local ddl="$2" + if rg -q "${pattern}" "${current_schema_file}"; then + printf '%s\n' "${ddl}" >> "${drop_ddl_file}" + fi +} + +append_if_present "CREATE INDEX TimeSeriesByProvenance_rk " "DROP INDEX TimeSeriesByProvenance_rk" +append_if_present "CREATE INDEX TimeSeriesByVariableMeasured_rk " "DROP INDEX TimeSeriesByVariableMeasured_rk" +append_if_present "CREATE INDEX TimeSeriesAttributePropertyValue_rk " "DROP INDEX TimeSeriesAttributePropertyValue_rk" +append_if_present "CREATE INDEX TimeSeriesAttributeValue_rk " "DROP INDEX TimeSeriesAttributeValue_rk" +append_if_present "CREATE TABLE ObservationAttribute_rk " "DROP TABLE ObservationAttribute_rk" +append_if_present "CREATE TABLE StatVarObservation_rk " "DROP TABLE StatVarObservation_rk" +append_if_present "CREATE TABLE TimeSeriesAttribute_rk " "DROP TABLE TimeSeriesAttribute_rk" +append_if_present "CREATE TABLE TimeSeries_rk " "DROP TABLE TimeSeries_rk" + +if [[ -s "${drop_ddl_file}" ]]; then + gcloud spanner databases ddl update "${database_id}" \ + --project="${project_id}" \ + --instance="${instance_id}" \ + --ddl-file="${drop_ddl_file}" +fi + +gcloud spanner databases ddl update "${database_id}" \ + --project="${project_id}" \ + --instance="${instance_id}" \ + --ddl-file="${schema_with_suffix_file}" diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/LocalProgressTracker.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/LocalProgressTracker.java new file mode 100644 index 00000000..d717b498 --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/LocalProgressTracker.java @@ -0,0 +1,146 @@ +package org.datacommons.ingestion.timeseries; + +import com.google.cloud.Timestamp; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; +import org.slf4j.Logger; + +/** Emits local progress logs for validator and DirectRunner runs. 
*/ +final class LocalProgressTracker implements AutoCloseable { + interface LogSink { + void info(String message); + } + + private final String runnerName; + private final int progressEverySourceRows; + private final int heartbeatSeconds; + private final LogSink logSink; + private final LongSupplier currentTimeMillis; + private final ScheduledExecutorService heartbeatExecutor; + private final AtomicLong sourceRows; + private final AtomicLong timeSeriesRows; + private final AtomicLong timeSeriesAttributeRows; + private final AtomicLong statVarObservationRows; + private final AtomicLong lastRowProgressMillis; + private final AtomicBoolean closed; + + LocalProgressTracker( + String runnerName, int progressEverySourceRows, int heartbeatSeconds, Logger logger) { + this( + runnerName, + progressEverySourceRows, + heartbeatSeconds, + logger::info, + System::currentTimeMillis, + createHeartbeatExecutor(runnerName, heartbeatSeconds)); + } + + LocalProgressTracker( + String runnerName, + int progressEverySourceRows, + int heartbeatSeconds, + LogSink logSink, + LongSupplier currentTimeMillis, + ScheduledExecutorService heartbeatExecutor) { + this.runnerName = runnerName; + this.progressEverySourceRows = progressEverySourceRows; + this.heartbeatSeconds = heartbeatSeconds; + this.logSink = logSink; + this.currentTimeMillis = currentTimeMillis; + this.heartbeatExecutor = heartbeatExecutor; + this.sourceRows = new AtomicLong(); + this.timeSeriesRows = new AtomicLong(); + this.timeSeriesAttributeRows = new AtomicLong(); + this.statVarObservationRows = new AtomicLong(); + this.lastRowProgressMillis = new AtomicLong(currentTimeMillis.getAsLong()); + this.closed = new AtomicBoolean(); + + if (heartbeatExecutor != null) { + heartbeatExecutor.scheduleAtFixedRate( + this::logHeartbeatNow, heartbeatSeconds, heartbeatSeconds, TimeUnit.SECONDS); + } + } + + boolean isEnabled() { + return progressEverySourceRows > 0 || heartbeatSeconds > 0; + } + + void recordRow(int timeSeriesAttributeRowsDelta, int statVarObservationRowsDelta) { + long sourceRowCount = sourceRows.incrementAndGet(); + timeSeriesRows.incrementAndGet(); + timeSeriesAttributeRows.addAndGet(timeSeriesAttributeRowsDelta); + statVarObservationRows.addAndGet(statVarObservationRowsDelta); + lastRowProgressMillis.set(currentTimeMillis.getAsLong()); + + if (progressEverySourceRows > 0 && sourceRowCount % progressEverySourceRows == 0) { + logSink.info(buildProgressMessage("progress")); + } + } + + void recordValidatorFlush(int mutationCount, Timestamp writeTimestamp) { + logSink.info( + runnerName + " flush: mutations=" + mutationCount + ", write_timestamp=" + writeTimestamp); + } + + void logHeartbeatNow() { + if (heartbeatSeconds <= 0 || closed.get()) { + return; + } + logSink.info(buildProgressMessage("heartbeat")); + } + + @Override + public void close() { + if (!closed.compareAndSet(false, true)) { + return; + } + if (heartbeatExecutor != null) { + heartbeatExecutor.shutdownNow(); + } + } + + private String buildProgressMessage(String kind) { + long sourceRowCount = sourceRows.get(); + long idleSeconds = + TimeUnit.MILLISECONDS.toSeconds( + currentTimeMillis.getAsLong() - lastRowProgressMillis.get()); + StringBuilder message = + new StringBuilder() + .append(runnerName) + .append(' ') + .append(kind) + .append(": source_rows=") + .append(sourceRowCount) + .append(", timeseries_rows=") + .append(timeSeriesRows.get()) + .append(", timeseries_attribute_rows=") + .append(timeSeriesAttributeRows.get()) + .append(", stat_var_observation_rows=") + 
.append(statVarObservationRows.get())
+            .append(", seconds_since_last_row=")
+            .append(idleSeconds);
+    if (sourceRowCount == 0) {
+      message.append(", no_source_rows_yet=true");
+    }
+    return message.toString();
+  }
+
+  private static ScheduledExecutorService createHeartbeatExecutor(
+      String runnerName, int heartbeatSeconds) {
+    if (heartbeatSeconds <= 0) {
+      return null;
+    }
+    ThreadFactory threadFactory =
+        runnable -> {
+          Thread thread = new Thread(runnable, runnerName + "-progress-heartbeat");
+          thread.setDaemon(true);
+          return thread;
+        };
+    return Executors.newSingleThreadScheduledExecutor(threadFactory);
+  }
+}

diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/ObservationExportFiles.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/ObservationExportFiles.java
new file mode 100644
index 00000000..02241f0a
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/ObservationExportFiles.java
@@ -0,0 +1,128 @@
+package org.datacommons.ingestion.timeseries;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+
+/** Resolves Avro export files from either an export directory or an explicit file list. */
+final class ObservationExportFiles {
+  private ObservationExportFiles() {}
+
+  static void validateOptions(TimeseriesBackfillOptions options) {
+    boolean hasInputExportDir = !options.getInputExportDir().isEmpty();
+    boolean hasInputFiles = !options.getInputFiles().isEmpty();
+    if (hasInputExportDir == hasInputFiles) {
+      throw new IllegalArgumentException(
+          "Exactly one of inputExportDir or inputFiles must be provided for the Avro pipeline.");
+    }
+  }
+
+  static List<String> resolveInputFiles(TimeseriesBackfillOptions options) {
+    if (!options.getInputFiles().isEmpty()) {
+      return parseCsv(options.getInputFiles());
+    }
+    return readManifest(
+        options, options.getInputExportDir(), options.getSourceObservationTableName());
+  }
+
+  private static List<String> readManifest(
+      TimeseriesBackfillOptions options, String exportDir, String sourceTableName) {
+    FileSystems.setDefaultPipelineOptions(options);
+    String normalizedExportDir = trimTrailingSlash(exportDir);
+    String manifestPath = normalizedExportDir + "/" + sourceTableName + "-manifest.json";
+    ResourceId manifestResource;
+    try {
+      manifestResource = FileSystems.matchSingleFileSpec(manifestPath).resourceId();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to find export manifest at " + manifestPath, e);
+    }
+
+    try (Reader reader =
+        Channels.newReader(FileSystems.open(manifestResource), StandardCharsets.UTF_8.name())) {
+      JsonElement root = JsonParser.parseReader(reader);
+      List<String> files = parseManifest(root, normalizedExportDir, sourceTableName);
+      if (!files.isEmpty()) {
+        return files;
+      }
+      throw new IllegalArgumentException(
+          "No Avro files for " + sourceTableName + " were found in " + manifestPath);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to read export manifest at " + manifestPath, e);
+    }
+  }
+
+  static List<String> parseManifest(
+      JsonElement root, String exportDir, String sourceTableName) {
+    Pattern filePattern =
+        Pattern.compile("(^|.*/)" + Pattern.quote(sourceTableName) + "\\.avro-\\d{5}-of-\\d{5}$");
+    Set<String> files = new LinkedHashSet<>();
+    collectFiles(root, exportDir, filePattern, files);
+    return new ArrayList<>(files);
+  }
+
+  private static void collectFiles(
+      JsonElement element, String exportDir, Pattern filePattern, Set<String> files) {
+    if (element == null || element.isJsonNull()) {
+      return;
+    }
+    if (element.isJsonPrimitive() && element.getAsJsonPrimitive().isString()) {
+      String value = element.getAsString();
+      if (filePattern.matcher(value).find()) {
+        files.add(toAbsolutePath(exportDir, value));
+      }
+      return;
+    }
+    if (element.isJsonArray()) {
+      for (JsonElement child : element.getAsJsonArray()) {
+        collectFiles(child, exportDir, filePattern, files);
+      }
+      return;
+    }
+    JsonObject object = element.getAsJsonObject();
+    for (String key : object.keySet()) {
+      collectFiles(object.get(key), exportDir, filePattern, files);
+    }
+  }
+
+  private static List<String> parseCsv(String csv) {
+    List<String> fileSpecs = new ArrayList<>();
+    for (String part : csv.split(",")) {
+      String trimmed = part.trim();
+      if (!trimmed.isEmpty()) {
+        fileSpecs.add(trimmed);
+      }
+    }
+    if (!fileSpecs.isEmpty()) {
+      return fileSpecs;
+    }
+    throw new IllegalArgumentException("inputFiles must contain at least one Avro file path.");
+  }
+
+  private static String toAbsolutePath(String exportDir, String value) {
+    if (value.contains("://") || value.startsWith("/")) {
+      return value;
+    }
+    String relativePath = value.startsWith("./") ? value.substring(2) : value;
+    while (relativePath.startsWith("/")) {
+      relativePath = relativePath.substring(1);
+    }
+    return exportDir + "/" + relativePath;
+  }
+
+  private static String trimTrailingSlash(String value) {
+    if (value.endsWith("/")) {
+      return value.substring(0, value.length() - 1);
+    }
+    return value;
+  }
+}

diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SeriesIdGenerator.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SeriesIdGenerator.java
new file mode 100644
index 00000000..3bc059c4
--- /dev/null
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SeriesIdGenerator.java
@@ -0,0 +1,21 @@
+package org.datacommons.ingestion.timeseries;
+
+/** Builds the existing dc/os series identifier used as the normalized series id. */
+final class SeriesIdGenerator {
+  private static final String PREFIX = "dc/os/";
+
+  private SeriesIdGenerator() {}
+
+  static String build(String variableMeasured, String observationAbout, String facetId) {
+    return PREFIX
+        + sanitize(variableMeasured)
+        + "_"
+        + sanitize(observationAbout)
+        + "_"
+        + sanitize(facetId);
+  }
+
+  private static String sanitize(String value) {
"" : value.replace('/', '_'); + } +} diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java new file mode 100644 index 00000000..96edfd93 --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java @@ -0,0 +1,167 @@ +package org.datacommons.ingestion.timeseries; + +import com.google.cloud.ByteArray; +import com.google.cloud.spanner.Struct; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.datacommons.Storage.Observations; + +/** Row shapes used by the timeseries backfill. */ +final class SourceObservationRows { + private SourceObservationRows() {} + + static SourceObservationRow toObservationRow(Struct row) { + return buildObservationRow( + row.getString("observation_about"), + row.getString("variable_measured"), + row.getString("facet_id"), + getNullableString(row, "observation_period"), + getNullableString(row, "measurement_method"), + getNullableString(row, "unit"), + getNullableString(row, "scaling_factor"), + getNullableString(row, "import_name"), + getNullableString(row, "provenance_url"), + !row.isNull("is_dc_aggregate") && row.getBoolean("is_dc_aggregate"), + parseObservations(row)); + } + + static SourceObservationRow toObservationRow(GenericRecord row) { + return buildObservationRow( + getNullableString(row, "observation_about"), + getNullableString(row, "variable_measured"), + getNullableString(row, "facet_id"), + getNullableString(row, "observation_period"), + getNullableString(row, "measurement_method"), + getNullableString(row, "unit"), + getNullableString(row, "scaling_factor"), + getNullableString(row, "import_name"), + getNullableString(row, "provenance_url"), + getNullableBoolean(row, "is_dc_aggregate"), + parseObservations(row)); + } + + private static SourceObservationRow buildObservationRow( + String observationAbout, + String variableMeasured, + String facetId, + String observationPeriod, + String measurementMethod, + String unit, + String scalingFactor, + String importName, + String provenanceUrl, + boolean isDcAggregate, + Observations observations) { + SourceSeriesRow seriesRow = + new SourceSeriesRow( + observationAbout, + variableMeasured, + facetId, + observationPeriod, + measurementMethod, + unit, + scalingFactor, + importName, + provenanceUrl, + isDcAggregate, + deriveProvenance(importName)); + List pointRows = new ArrayList<>(); + for (Map.Entry entry : observations.getValuesMap().entrySet()) { + pointRows.add( + new SourcePointRow( + observationAbout, variableMeasured, facetId, entry.getKey(), entry.getValue())); + } + return new SourceObservationRow(seriesRow, pointRows); + } + + private static String getNullableString(Struct row, String columnName) { + return row.isNull(columnName) ? "" : row.getString(columnName); + } + + private static String getNullableString(GenericRecord row, String fieldName) { + Object value = getField(row, fieldName); + return value == null ? 
"" : value.toString(); + } + + private static boolean getNullableBoolean(GenericRecord row, String fieldName) { + Object value = getField(row, fieldName); + return value instanceof Boolean && (Boolean) value; + } + + private static Observations parseObservations(Struct row) { + if (row.isNull("observations")) { + return Observations.getDefaultInstance(); + } + ByteArray protoBytes = row.getBytes("observations"); + return parseObservations(protoBytes.toByteArray()); + } + + private static Observations parseObservations(GenericRecord row) { + Object value = getField(row, "observations"); + if (value == null) { + return Observations.getDefaultInstance(); + } + return parseObservations(toByteArray(value)); + } + + private static Observations parseObservations(byte[] protoBytes) { + try { + return Observations.parseFrom(protoBytes); + } catch (Exception e) { + throw new RuntimeException("Failed to parse observations proto", e); + } + } + + private static byte[] toByteArray(Object value) { + if (value instanceof ByteBuffer byteBuffer) { + ByteBuffer duplicate = byteBuffer.duplicate(); + byte[] bytes = new byte[duplicate.remaining()]; + duplicate.get(bytes); + return bytes; + } + if (value instanceof byte[] bytes) { + return bytes; + } + if (value instanceof GenericData.Fixed fixed) { + return fixed.bytes(); + } + throw new IllegalArgumentException("Unsupported observations Avro type: " + value.getClass()); + } + + private static Object getField(GenericRecord row, String fieldName) { + if (row.getSchema().getField(fieldName) == null) { + return null; + } + return row.get(fieldName); + } + + private static String deriveProvenance(String importName) { + return importName == null || importName.isEmpty() ? "" : "dc/base/" + importName; + } +} + +record SourceObservationRow(SourceSeriesRow seriesRow, List pointRows) + implements Serializable {} + +record SourceSeriesRow( + String observationAbout, + String variableMeasured, + String facetId, + String observationPeriod, + String measurementMethod, + String unit, + String scalingFactor, + String importName, + String provenanceUrl, + boolean isDcAggregate, + String provenance) + implements Serializable {} + +record SourcePointRow( + String observationAbout, String variableMeasured, String facetId, String date, String value) + implements Serializable {} diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java new file mode 100644 index 00000000..a30c62ec --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java @@ -0,0 +1,113 @@ +package org.datacommons.ingestion.timeseries; + +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.extensions.avro.io.AvroIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Beam/Dataflow pipeline that backfills the normalized timeseries tables from Avro exports. 
+/** Beam/Dataflow pipeline that backfills the normalized timeseries tables from Avro exports. */
+public class TimeseriesBackfillAvroPipeline {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TimeseriesBackfillAvroPipeline.class);
+
+  public static void main(String[] args) {
+    TimeseriesBackfillOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(TimeseriesBackfillOptions.class);
+    validateOptions(options);
+
+    List<String> inputFiles = ObservationExportFiles.resolveInputFiles(options);
+    LOGGER.info(
+        "Starting timeseries Avro backfill with runner {} over {} Avro files",
+        TimeseriesBackfillPipeline.runnerName(options),
+        inputFiles.size());
+
+    try (LocalProgressTracker progressTracker =
+        TimeseriesBackfillPipeline.createLocalProgressTracker(options)) {
+      TimeseriesBackfillPipeline.setLocalProgressTracker(progressTracker);
+      Pipeline pipeline = Pipeline.create(options);
+      buildPipeline(pipeline, options, inputFiles);
+      PipelineResult result = pipeline.run();
+      LOGGER.info("Timeseries Avro backfill returned runner {}", result.getClass().getSimpleName());
+    } finally {
+      TimeseriesBackfillPipeline.clearLocalProgressTracker();
+    }
+  }
+
+  static void validateOptions(TimeseriesBackfillOptions options) {
+    TimeseriesBackfillPipeline.validateOptions(options);
+    ObservationExportFiles.validateOptions(options);
+  }
+
+  static void buildPipeline(
+      Pipeline pipeline, TimeseriesBackfillOptions options, List<String> inputFiles) {
+    PCollection<SourceObservationRow> sourceRows =
+        pipeline
+            .apply("CreateAvroFileSpecs", Create.of(inputFiles))
+            .apply(
+                "ReadAvroSourceRows",
+                AvroIO.parseAllGenericRecords(SourceObservationRows::toObservationRow)
+                    .withCoder(SerializableCoder.of(SourceObservationRow.class)))
+            .apply(
+                "FilterSourceRows",
+                ParDo.of(
+                    new FilterSourceObservationRowsFn(
+                        options.getStartObservationAbout(),
+                        options.getEndObservationAboutExclusive(),
+                        VariableMeasuredFilters.parse(options.getVariableMeasured()))));
+    sourceRows.setCoder(SerializableCoder.of(SourceObservationRow.class));
+    TimeseriesBackfillPipeline.buildPipelineFromSourceRows(sourceRows, options);
+  }
+
+  static boolean matchesFilters(
+      SourceSeriesRow seriesRow,
+      String startObservationAbout,
+      String endObservationAboutExclusive,
+      List<String> variableMeasuredFilters) {
+    if (!startObservationAbout.isEmpty()
+        && seriesRow.observationAbout().compareTo(startObservationAbout) < 0) {
+      return false;
+    }
+    if (!endObservationAboutExclusive.isEmpty()
+        && seriesRow.observationAbout().compareTo(endObservationAboutExclusive) >= 0) {
+      return false;
+    }
+    return variableMeasuredFilters.isEmpty()
+        || variableMeasuredFilters.contains(seriesRow.variableMeasured());
+  }
+
+  static final class FilterSourceObservationRowsFn
+      extends DoFn<SourceObservationRow, SourceObservationRow> {
+    private final String startObservationAbout;
+    private final String endObservationAboutExclusive;
+    private final List<String> variableMeasuredFilters;
+
+    FilterSourceObservationRowsFn(
+        String startObservationAbout,
+        String endObservationAboutExclusive,
+        List<String> variableMeasuredFilters) {
+      this.startObservationAbout = startObservationAbout;
+      this.endObservationAboutExclusive = endObservationAboutExclusive;
+      this.variableMeasuredFilters = variableMeasuredFilters;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element SourceObservationRow sourceRow, OutputReceiver<SourceObservationRow> out) {
+      if (matchesFilters(
+          sourceRow.seriesRow(),
+          startObservationAbout,
+          endObservationAboutExclusive,
+          variableMeasuredFilters)) {
+        out.output(sourceRow);
+      }
+    }
+  }
+}

diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillIO.java
b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillIO.java new file mode 100644 index 00000000..d35d2106 --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillIO.java @@ -0,0 +1,103 @@ +package org.datacommons.ingestion.timeseries; + +import com.google.cloud.NoCredentials; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.TimestampBound; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.joda.time.Duration; + +/** Shared Spanner IO configuration for the backfill job and validator. */ +final class TimeseriesBackfillIO { + // Previous sink tuning values kept here for reference only: + // private static final int SPANNER_BATCH_SIZE_BYTES = 500 * 1024; + // private static final int SPANNER_MAX_NUM_ROWS = 2000; + // private static final int SPANNER_MAX_NUM_MUTATIONS = 10000; + // private static final int SPANNER_GROUPING_FACTOR = 3000; + // private static final int SPANNER_COMMIT_DEADLINE_SECONDS = 120; + + private TimeseriesBackfillIO() {} + + static String resolveProjectId(TimeseriesBackfillOptions options) { + if (options.getProject() != null && !options.getProject().isEmpty()) { + return options.getProject(); + } + return options.getProjectId(); + } + + static SpannerIO.CreateTransaction buildReadTransaction(TimeseriesBackfillOptions options) { + SpannerIO.CreateTransaction read = + SpannerIO.createTransaction() + .withProjectId(resolveProjectId(options)) + .withInstanceId(options.getSpannerInstanceId()) + .withDatabaseId(options.getSpannerDatabaseId()) + .withTimestampBound(getTimestampBound(options)); + if (!options.getSpannerEmulatorHost().isEmpty()) { + read = read.withEmulatorHost(options.getSpannerEmulatorHost()); + } + return read; + } + + static SpannerIO.Read buildRead(TimeseriesBackfillOptions options) { + SpannerIO.Read read = + SpannerIO.read() + .withProjectId(resolveProjectId(options)) + .withInstanceId(options.getSpannerInstanceId()) + .withDatabaseId(options.getSpannerDatabaseId()); + if (!options.getSpannerEmulatorHost().isEmpty()) { + read = read.withEmulatorHost(options.getSpannerEmulatorHost()); + } + return read; + } + + static SpannerIO.Write buildWrite(TimeseriesBackfillOptions options) { + SpannerIO.Write write = + SpannerIO.write() + .withProjectId(resolveProjectId(options)) + .withInstanceId(options.getSpannerInstanceId()) + .withDatabaseId(options.getSpannerDatabaseId()); + if (!options.getSpannerEmulatorHost().isEmpty()) { + write = write.withEmulatorHost(options.getSpannerEmulatorHost()); + } + if (options.getBatchSizeBytes() != null) { + write = write.withBatchSizeBytes(options.getBatchSizeBytes()); + } + if (options.getMaxNumRows() != null) { + write = write.withMaxNumRows(options.getMaxNumRows()); + } + if (options.getGroupingFactor() != null) { + write = write.withGroupingFactor(options.getGroupingFactor()); + } + if (options.getMaxNumMutations() != null) { + write = write.withMaxNumMutations(options.getMaxNumMutations()); + } + if (options.getCommitDeadlineSeconds() != null) { + write = + write.withCommitDeadline(Duration.standardSeconds(options.getCommitDeadlineSeconds())); + } + return write; + } + + static SpannerIO.WriteGrouped buildGroupedWrite(TimeseriesBackfillOptions options) { + return new SpannerIO.WriteGrouped(buildWrite(options)); + } + + static TimestampBound getTimestampBound(TimeseriesBackfillOptions options) 
{ + if (options.getReadTimestamp().isEmpty()) { + return TimestampBound.strong(); + } + return TimestampBound.ofReadTimestamp(Timestamp.parseTimestamp(options.getReadTimestamp())); + } + + static Spanner createSpannerService(TimeseriesBackfillOptions options) { + SpannerOptions.Builder builder = + SpannerOptions.newBuilder().setProjectId(resolveProjectId(options)); + if (!options.getSpannerEmulatorHost().isEmpty()) { + builder + .setEmulatorHost(options.getSpannerEmulatorHost()) + .setCredentials(NoCredentials.getInstance()); + } + return builder.build().getService(); + } +} diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptionValidator.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptionValidator.java new file mode 100644 index 00000000..38df97f5 --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptionValidator.java @@ -0,0 +1,33 @@ +package org.datacommons.ingestion.timeseries; + +/** Shared validation for backfill options. */ +final class TimeseriesBackfillOptionValidator { + private TimeseriesBackfillOptionValidator() {} + + static void validateCommonOptions(TimeseriesBackfillOptions options) { + if ((!options.getStartObservationAbout().isEmpty() + || !options.getEndObservationAboutExclusive().isEmpty()) + && VariableMeasuredFilters.parse(options.getVariableMeasured()).isEmpty()) { + throw new IllegalArgumentException( + "variableMeasured is required when startObservationAbout or " + + "endObservationAboutExclusive is provided."); + } + validateOptionalPositiveLong(options.getBatchSizeBytes(), "batchSizeBytes"); + validateOptionalPositiveInteger(options.getMaxNumRows(), "maxNumRows"); + validateOptionalPositiveInteger(options.getMaxNumMutations(), "maxNumMutations"); + validateOptionalPositiveInteger(options.getGroupingFactor(), "groupingFactor"); + validateOptionalPositiveInteger(options.getCommitDeadlineSeconds(), "commitDeadlineSeconds"); + } + + private static void validateOptionalPositiveLong(Long value, String optionName) { + if (value != null && value <= 0) { + throw new IllegalArgumentException(optionName + " must be positive when provided."); + } + } + + private static void validateOptionalPositiveInteger(Integer value, String optionName) { + if (value != null && value <= 0) { + throw new IllegalArgumentException(optionName + " must be positive when provided."); + } + } +} diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptions.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptions.java new file mode 100644 index 00000000..4d3002b2 --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillOptions.java @@ -0,0 +1,152 @@ +package org.datacommons.ingestion.timeseries; + +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; + +/** Options shared by the Beam backfill and the standalone validator. */ +public interface TimeseriesBackfillOptions extends GcpOptions { + @Description("Legacy fallback for GCP project id. 
Prefer --project.") + @Default.String("datcom-store") + String getProjectId(); + + void setProjectId(String projectId); + + @Description("Spanner instance id") + @Default.String("dc-kg-test") + String getSpannerInstanceId(); + + void setSpannerInstanceId(String instanceId); + + @Description("Spanner database id") + @Default.String("dc_graph_5") + String getSpannerDatabaseId(); + + void setSpannerDatabaseId(String databaseId); + + @Description("Spanner emulator host") + @Default.String("") + String getSpannerEmulatorHost(); + + void setSpannerEmulatorHost(String emulatorHost); + + @Description("Source Observation table name") + @Default.String("Observation") + String getSourceObservationTableName(); + + void setSourceObservationTableName(String tableName); + + @Description( + "Input Spanner export directory containing Observation-manifest.json for the Avro entrypoint") + @Default.String("") + String getInputExportDir(); + + void setInputExportDir(String inputExportDir); + + @Description("Comma-separated Avro file paths for the Avro entrypoint") + @Default.String("") + String getInputFiles(); + + void setInputFiles(String inputFiles); + + @Description("Destination TimeSeries table name") + @Default.String("TimeSeries_rk") + String getDestinationTimeSeriesTableName(); + + void setDestinationTimeSeriesTableName(String tableName); + + @Description("Destination TimeSeriesAttribute table name") + @Default.String("TimeSeriesAttribute_rk") + String getDestinationTimeSeriesAttributeTableName(); + + void setDestinationTimeSeriesAttributeTableName(String tableName); + + @Description("Destination StatVarObservation table name") + @Default.String("StatVarObservation_rk") + String getDestinationStatVarObservationTableName(); + + void setDestinationStatVarObservationTableName(String tableName); + + @Description( + "Read timestamp in RFC3339 format for a consistent source snapshot. Empty uses a strong read.") + @Default.String("") + String getReadTimestamp(); + + void setReadTimestamp(String readTimestamp); + + @Description("Inclusive lower bound for source observation_about") + @Default.String("") + String getStartObservationAbout(); + + void setStartObservationAbout(String startObservationAbout); + + @Description("Exclusive upper bound for source observation_about") + @Default.String("") + String getEndObservationAboutExclusive(); + + void setEndObservationAboutExclusive(String endObservationAboutExclusive); + + @Description( + "Fixed source variable_measured filter. Accepts one value or a comma-separated list.") + @Default.String("") + String getVariableMeasured(); + + void setVariableMeasured(String variableMeasured); + + @Description("Log local progress every N source rows. Non-positive disables row-progress logs.") + @Default.Integer(1000) + int getProgressEverySourceRows(); + + void setProgressEverySourceRows(int progressEverySourceRows); + + @Description("Emit a local heartbeat log every N seconds. Non-positive disables heartbeats.") + @Default.Integer(30) + int getHeartbeatSeconds(); + + void setHeartbeatSeconds(int heartbeatSeconds); + + @Description("Optional Spanner sink batch size in bytes. When unset, Beam uses its default.") + Long getBatchSizeBytes(); + + void setBatchSizeBytes(Long batchSizeBytes); + + @Description("Optional Spanner sink max rows per batch. When unset, Beam uses its default.") + Integer getMaxNumRows(); + + void setMaxNumRows(Integer maxNumRows); + + @Description("Optional Spanner sink max mutations per batch. 
When unset, Beam uses its default.") + Integer getMaxNumMutations(); + + void setMaxNumMutations(Integer maxNumMutations); + + @Description("Optional Spanner sink grouping factor. When unset, Beam uses its default.") + Integer getGroupingFactor(); + + void setGroupingFactor(Integer groupingFactor); + + @Description( + "Optional Spanner sink commit deadline in seconds. When unset, Beam uses its default.") + Integer getCommitDeadlineSeconds(); + + void setCommitDeadlineSeconds(Integer commitDeadlineSeconds); + + @Description("Maximum source series rows to process in Beam mode. Non-positive means no limit.") + @Default.Integer(0) + int getMaxSeriesRows(); + + void setMaxSeriesRows(int maxSeriesRows); + + @Description("Maximum source point rows to process in Beam mode. Non-positive means no limit.") + @Default.Integer(0) + int getMaxPointRows(); + + void setMaxPointRows(int maxPointRows); + + @Description( + "Maximum source series rows to process in validator mode. Non-positive means no limit.") + @Default.Integer(1000) + int getValidatorMaxSeriesRows(); + + void setValidatorMaxSeriesRows(int validatorMaxSeriesRows); +} diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java new file mode 100644 index 00000000..64c9977c --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java @@ -0,0 +1,170 @@ +package org.datacommons.ingestion.timeseries; + +import com.google.cloud.spanner.Struct; +import org.apache.beam.runners.direct.DirectRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.gcp.spanner.MutationGroup; +import org.apache.beam.sdk.io.gcp.spanner.Transaction; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Beam/Dataflow pipeline that backfills the normalized timeseries tables from Observation. 
*/ +public class TimeseriesBackfillPipeline { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeseriesBackfillPipeline.class); + private static volatile LocalProgressTracker localProgressTracker; + + public static void main(String[] args) { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(TimeseriesBackfillOptions.class); + validateOptions(options); + LOGGER.info("Starting timeseries backfill with runner {}", runnerName(options)); + try (LocalProgressTracker progressTracker = createLocalProgressTracker(options)) { + setLocalProgressTracker(progressTracker); + Pipeline pipeline = Pipeline.create(options); + buildPipeline(pipeline, options); + PipelineResult result = pipeline.run(); + LOGGER.info("Timeseries backfill run returned result {}", result.getClass().getSimpleName()); + } finally { + clearLocalProgressTracker(); + } + } + + static void buildPipeline(Pipeline pipeline, TimeseriesBackfillOptions options) { + TimeseriesBackfillQueries queries = new TimeseriesBackfillQueries(options); + PCollectionView<Transaction> readTransaction = + pipeline.apply("CreateReadTransaction", TimeseriesBackfillIO.buildReadTransaction(options)); + + PCollection<SourceObservationRow> sourceRows = + pipeline + .apply( + "ReadSourceRows", + TimeseriesBackfillIO.buildRead(options) + .withTransaction(readTransaction) + .withQuery(queries.buildSourceQuery(options.getMaxSeriesRows())) + .withQueryName("ReadSourceRows")) + .apply("NormalizeSourceRows", ParDo.of(new StructToSourceObservationRowsFn())); + sourceRows.setCoder(SerializableCoder.of(SourceObservationRow.class)); + buildPipelineFromSourceRows(sourceRows, options); + } + + static void buildPipelineFromSourceRows( + PCollection<SourceObservationRow> sourceRows, TimeseriesBackfillOptions options) { + PCollection<MutationGroup> mutationGroups = + sourceRows.apply( + "MapMutationGroups", + ParDo.of( + new SourceRowsToMutationGroupsFn( + options.getDestinationTimeSeriesTableName(), + options.getDestinationTimeSeriesAttributeTableName(), + options.getDestinationStatVarObservationTableName()))); + + mutationGroups.apply("WriteMutationGroups", TimeseriesBackfillIO.buildGroupedWrite(options)); + } + + static void validateOptions(TimeseriesBackfillOptions options) { + TimeseriesBackfillOptionValidator.validateCommonOptions(options); + if (options.getMaxSeriesRows() > 0 || options.getMaxPointRows() > 0) { + throw new IllegalArgumentException( + "Beam row caps are not supported in the Beam/Dataflow entrypoints. " + + "Use observation_about sharding or exact Avro file selection, or use the " + + "validator for small bounded runs."); + } + } + + static boolean shouldEnableLocalProgress(TimeseriesBackfillOptions options) { + return options.getRunner() == null || DirectRunner.class.equals(options.getRunner()); + } + + static LocalProgressTracker createLocalProgressTracker(TimeseriesBackfillOptions options) { + if (!shouldEnableLocalProgress(options)) { + return new LocalProgressTracker("DataflowRunner", 0, 0, LOGGER); + } + return new LocalProgressTracker( + runnerName(options), + options.getProgressEverySourceRows(), + options.getHeartbeatSeconds(), + LOGGER); + } + + static String runnerName(TimeseriesBackfillOptions options) { + if (options.getRunner() == null) { + return DirectRunner.class.getSimpleName(); + } + return options.getRunner().getSimpleName(); + } + + static void setLocalProgressTracker(LocalProgressTracker progressTracker) { + localProgressTracker = progressTracker; + } + + static void clearLocalProgressTracker() { + localProgressTracker = null; + } + + static final class StructToSourceObservationRowsFn extends DoFn<Struct, SourceObservationRow> { + @ProcessElement + public void processElement(@Element Struct row, OutputReceiver<SourceObservationRow> out) { + out.output(SourceObservationRows.toObservationRow(row)); + } + } + + static final class SourceRowsToMutationGroupsFn + extends DoFn<SourceObservationRow, MutationGroup> { + private static final Counter SOURCE_ROWS = + Metrics.counter(SourceRowsToMutationGroupsFn.class, "source_rows"); + private static final Counter TIMESERIES_ROWS = + Metrics.counter(SourceRowsToMutationGroupsFn.class, "timeseries_rows_written"); + private static final Counter TIMESERIES_ATTRIBUTE_ROWS = + Metrics.counter(SourceRowsToMutationGroupsFn.class, "timeseries_attribute_rows_written"); + private static final Counter SOURCE_POINT_ROWS = + Metrics.counter(SourceRowsToMutationGroupsFn.class, "source_point_rows"); + private static final Counter STAT_VAR_OBSERVATION_ROWS = + Metrics.counter(SourceRowsToMutationGroupsFn.class, "stat_var_observation_rows_written"); + private final String timeSeriesTableName; + private final String timeSeriesAttributeTableName; + private final String statVarObservationTableName; + + SourceRowsToMutationGroupsFn( + String timeSeriesTableName, + String timeSeriesAttributeTableName, + String statVarObservationTableName) { + this.timeSeriesTableName = timeSeriesTableName; + this.timeSeriesAttributeTableName = timeSeriesAttributeTableName; + this.statVarObservationTableName = statVarObservationTableName; + } + + @ProcessElement + public void processElement( + @Element SourceObservationRow sourceRow, OutputReceiver<MutationGroup> out) { + BackfillMutationGroups mutationGroups = + TimeseriesMutationFactory.toMutationGroups( + sourceRow, + timeSeriesTableName, + timeSeriesAttributeTableName, + statVarObservationTableName); + SOURCE_ROWS.inc(); + TIMESERIES_ROWS.inc(); + TIMESERIES_ATTRIBUTE_ROWS.inc(mutationGroups.timeSeriesAttributeRows()); + SOURCE_POINT_ROWS.inc(mutationGroups.statVarObservationRows()); + STAT_VAR_OBSERVATION_ROWS.inc(mutationGroups.statVarObservationRows()); + LocalProgressTracker progressTracker = localProgressTracker; + if (progressTracker != null) { + progressTracker.recordRow( + mutationGroups.timeSeriesAttributeRows(), mutationGroups.statVarObservationRows()); + } + + for (MutationGroup mutationGroup : mutationGroups.groups()) { + out.output(mutationGroup); + } + } + } +}
b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueries.java new file mode 100644 index 00000000..e7edd50d --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueries.java @@ -0,0 +1,83 @@ +package org.datacommons.ingestion.timeseries; + +import com.google.cloud.spanner.Statement; +import java.util.ArrayList; +import java.util.List; + +/** Builds source Spanner SQL for the timeseries backfill. */ +final class TimeseriesBackfillQueries { + private final TimeseriesBackfillOptions options; + + TimeseriesBackfillQueries(TimeseriesBackfillOptions options) { + this.options = options; + } + + Statement buildSourceQuery() { + return buildSourceQuery(0); + } + + Statement buildSourceQuery(int limit) { + return buildQuery( + """ + SELECT + observation_about, + variable_measured, + facet_id, + observation_period, + measurement_method, + unit, + scaling_factor, + import_name, + provenance_url, + is_dc_aggregate, + provenance, + observations + FROM %s""" + .formatted(options.getSourceObservationTableName()), + "", + limit); + } + + private Statement buildQuery(String selectAndFrom, String columnPrefix, int limit) { + StringBuilder sql = new StringBuilder(selectAndFrom); + Statement.Builder builder = Statement.newBuilder(selectAndFrom); + List<String> filters = new ArrayList<>(); + List<String> variableMeasuredFilters = + VariableMeasuredFilters.parse(options.getVariableMeasured()); + + if (!options.getStartObservationAbout().isEmpty()) { + filters.add(columnPrefix + "observation_about >= @startObservationAbout"); + builder.bind("startObservationAbout").to(options.getStartObservationAbout()); + } + if (!options.getEndObservationAboutExclusive().isEmpty()) { + filters.add(columnPrefix + "observation_about < @endObservationAboutExclusive"); + builder.bind("endObservationAboutExclusive").to(options.getEndObservationAboutExclusive()); + } + if (!variableMeasuredFilters.isEmpty()) { + if (variableMeasuredFilters.size() == 1) { + filters.add(columnPrefix + "variable_measured = @variableMeasured"); + builder.bind("variableMeasured").to(variableMeasuredFilters.get(0)); + } else { + filters.add(columnPrefix + "variable_measured IN UNNEST(@variableMeasuredList)"); + builder.bind("variableMeasuredList").toStringArray(variableMeasuredFilters); + } + } + + if (!filters.isEmpty()) { + sql.append(" WHERE ").append(String.join(" AND ", filters)); + } + + if (limit > 0) { + sql.append(" ORDER BY ") + .append(columnPrefix) + .append("observation_about, ") + .append(columnPrefix) + .append("variable_measured, ") + .append(columnPrefix) + .append("facet_id"); + sql.append(" LIMIT ").append(limit); + } + + return builder.replace(sql.toString()).build(); + } +}
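An editor's aside before the validator: the sketch below is not part of the patch (and assumes it sits in the same package, since `TimeseriesBackfillQueries` is package-private). It shows the statement shape `buildSourceQuery()` produces for a sharded run; the flag values are illustrative.

```java
import com.google.cloud.spanner.Statement;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Reader's sketch: inspect the SQL that a sharded run would execute.
final class SourceQueryShapeDemo {
  public static void main(String[] args) {
    TimeseriesBackfillOptions options =
        PipelineOptionsFactory.fromArgs(
                "--startObservationAbout=geoId/06",
                "--endObservationAboutExclusive=geoId/07",
                "--variableMeasured=Count_Person")
            .as(TimeseriesBackfillOptions.class);
    Statement statement = new TimeseriesBackfillQueries(options).buildSourceQuery();
    // Prints the SELECT above followed by:
    //   WHERE observation_about >= @startObservationAbout
    //     AND observation_about < @endObservationAboutExclusive
    //     AND variable_measured = @variableMeasured
    // A comma-separated --variableMeasured switches the last predicate to
    // variable_measured IN UNNEST(@variableMeasuredList).
    System.out.println(statement.getSql());
  }
}
```

diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillValidator.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillValidator.java new file mode 100644 index 00000000..b8c35a36 --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillValidator.java @@ -0,0 +1,144 @@ +package org.datacommons.ingestion.timeseries; + +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.ReadOnlyTransaction; +import com.google.cloud.spanner.ResultSet; +import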
com.google.cloud.spanner.Spanner; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.io.gcp.spanner.MutationGroup; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Small direct runner for validating the normalized backfill on a bounded shard. */ +public class TimeseriesBackfillValidator { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeseriesBackfillValidator.class); + private static final int MAX_VALIDATOR_WRITE_BATCH_SIZE = 1000; + + public static void main(String[] args) { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(TimeseriesBackfillOptions.class); + TimeseriesBackfillOptionValidator.validateCommonOptions(options); + LOGGER.info("Starting timeseries validator"); + + TimeseriesBackfillQueries queries = new TimeseriesBackfillQueries(options); + DatabaseId databaseId = + DatabaseId.of( + TimeseriesBackfillIO.resolveProjectId(options), + options.getSpannerInstanceId(), + options.getSpannerDatabaseId()); + + try (LocalProgressTracker progressTracker = + new LocalProgressTracker( + "validator", + options.getProgressEverySourceRows(), + options.getHeartbeatSeconds(), + LOGGER); + Spanner spanner = TimeseriesBackfillIO.createSpannerService(options); + ReadOnlyTransaction readOnlyTransaction = + spanner + .getDatabaseClient(databaseId) + .readOnlyTransaction(TimeseriesBackfillIO.getTimestampBound(options))) { + DatabaseClient databaseClient = spanner.getDatabaseClient(databaseId); + Timestamp writeTimestamp = + validate(databaseClient, readOnlyTransaction, queries, options, progressTracker); + LOGGER.info("Validator finished. Final write timestamp: {}", writeTimestamp); + } + } + + private static Timestamp validate( + DatabaseClient databaseClient, + ReadOnlyTransaction readOnlyTransaction, + TimeseriesBackfillQueries queries, + TimeseriesBackfillOptions options, + LocalProgressTracker progressTracker) { + int sourceRows = 0; + int timeSeriesRows = 0; + int timeSeriesAttributeRows = 0; + int statVarObservationRows = 0; + List<String> sampleSeriesIds = new ArrayList<>(); + List<Mutation> pendingMutations = new ArrayList<>(); + Timestamp lastWriteTimestamp = null; + + try (ResultSet resultSet = + readOnlyTransaction.executeQuery( + queries.buildSourceQuery(options.getValidatorMaxSeriesRows()))) { + while (resultSet.next()) { + SourceObservationRow row = + SourceObservationRows.toObservationRow(resultSet.getCurrentRowAsStruct()); + BackfillMutationGroups mutationGroups = + TimeseriesMutationFactory.toMutationGroups( + row, + options.getDestinationTimeSeriesTableName(), + options.getDestinationTimeSeriesAttributeTableName(), + options.getDestinationStatVarObservationTableName()); + sourceRows++; + if (sampleSeriesIds.size() < 5) { + sampleSeriesIds.add(TimeseriesMutationFactory.seriesId(row.seriesRow())); + } + + timeSeriesRows++; + timeSeriesAttributeRows += mutationGroups.timeSeriesAttributeRows(); + statVarObservationRows += mutationGroups.statVarObservationRows(); + progressTracker.recordRow( + mutationGroups.timeSeriesAttributeRows(), mutationGroups.statVarObservationRows()); + for (MutationGroup mutationGroup : mutationGroups.groups()) { + for (Mutation mutation : mutationGroup) { + pendingMutations.add(mutation); + } + } + + lastWriteTimestamp = + flushMutationsIfNeeded( + databaseClient, pendingMutations, lastWriteTimestamp, progressTracker); + } + } + + lastWriteTimestamp = + flushMutationsIfNeeded( + databaseClient, pendingMutations, lastWriteTimestamp, progressTracker, true); + LOGGER.info( + "Validated {} source rows into {} TimeSeries rows, {} TimeSeriesAttribute rows, and {} StatVarObservation rows. Sample series ids: {}", + sourceRows, + timeSeriesRows, + timeSeriesAttributeRows, + statVarObservationRows, + sampleSeriesIds); + return lastWriteTimestamp; + } + + private static Timestamp flushMutationsIfNeeded( + DatabaseClient databaseClient, + List<Mutation> pendingMutations, + Timestamp lastWriteTimestamp, + LocalProgressTracker progressTracker) { + return flushMutationsIfNeeded( + databaseClient, pendingMutations, lastWriteTimestamp, progressTracker, false); + } + + private static Timestamp flushMutationsIfNeeded( + DatabaseClient databaseClient, + List<Mutation> pendingMutations, + Timestamp lastWriteTimestamp, + LocalProgressTracker progressTracker, + boolean flushAll) { + if (pendingMutations.isEmpty()) { + return lastWriteTimestamp; + } + if (!flushAll && pendingMutations.size() < MAX_VALIDATOR_WRITE_BATCH_SIZE) { + return lastWriteTimestamp; + } + + int mutationCount = pendingMutations.size(); + Timestamp writeTimestamp = databaseClient.writeAtLeastOnce(pendingMutations); + if (progressTracker.isEnabled()) { + progressTracker.recordValidatorFlush(mutationCount, writeTimestamp); + } + pendingMutations.clear(); + return writeTimestamp; + } +}
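Another aside: `TimeseriesBackfillIO.getTimestampBound` is outside this diff, so the mapping below is an assumption, but it is the conventional way a `--readTimestamp` flag translates to a Spanner `TimestampBound` (empty means a strong read, per the flag description).

```java
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.TimestampBound;

// Reader's sketch, not part of the patch: the expected flag-to-bound mapping.
final class ReadTimestampBoundDemo {
  static TimestampBound toBound(String readTimestamp) {
    return readTimestamp.isEmpty()
        ? TimestampBound.strong() // "Empty uses a strong read"
        : TimestampBound.ofReadTimestamp(Timestamp.parseTimestamp(readTimestamp));
  }
}
```

diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java new file mode 100644 index 00000000..cba0c681 --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java @@ -0,0 +1,125 @@ +package org.datacommons.ingestion.timeseries; + +import com.google.cloud.spanner.Mutation; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.io.gcp.spanner.MutationGroup; + +/** Converts source rows into normalized timeseries table mutations.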
*/ +final class TimeseriesMutationFactory { + private static final int MAX_GROUP_MUTATIONS = 1000; + + private TimeseriesMutationFactory() {} + + static Mutation toTimeSeriesMutation(SourceSeriesRow row, String tableName) { + return Mutation.newInsertOrUpdateBuilder(tableName) + .set("id") + .to(seriesId(row)) + .set("variable_measured") + .to(row.variableMeasured()) + .set("provenance") + .to(row.provenance()) + .build(); + } + + static List<Mutation> toTimeSeriesAttributeMutations(SourceSeriesRow row, String tableName) { + List<Mutation> mutations = new ArrayList<>(); + addAttribute(mutations, tableName, row, "observationAbout", row.observationAbout()); + addAttribute(mutations, tableName, row, "facetId", row.facetId()); + addAttributeIfPresent(mutations, tableName, row, "importName", row.importName()); + addAttributeIfPresent(mutations, tableName, row, "provenanceUrl", row.provenanceUrl()); + addAttributeIfPresent(mutations, tableName, row, "observationPeriod", row.observationPeriod()); + addAttributeIfPresent(mutations, tableName, row, "measurementMethod", row.measurementMethod()); + addAttributeIfPresent(mutations, tableName, row, "unit", row.unit()); + addAttributeIfPresent(mutations, tableName, row, "scalingFactor", row.scalingFactor()); + addAttribute(mutations, tableName, row, "isDcAggregate", Boolean.toString(row.isDcAggregate())); + return mutations; + } + + static Mutation toStatVarObservationMutation(SourcePointRow row, String tableName) { + return Mutation.newInsertOrUpdateBuilder(tableName) + .set("id") + .to(seriesId(row)) + .set("date") + .to(row.date()) + .set("value") + .to(row.value()) + .build(); + } + + private static void addAttributeIfPresent( + List<Mutation> mutations, + String tableName, + SourceSeriesRow row, + String property, + String value) { + if (value == null || value.isEmpty()) { + return; + } + addAttribute(mutations, tableName, row, property, value); + } + + private static void addAttribute( + List<Mutation> mutations, + String tableName, + SourceSeriesRow row, + String property, + String value) { + mutations.add( + Mutation.newInsertOrUpdateBuilder(tableName) + .set("id") + .to(seriesId(row)) + .set("property") + .to(property) + .set("value") + .to(value) + .build()); + } + + static BackfillMutationGroups toMutationGroups( + SourceObservationRow row, + String timeSeriesTableName, + String timeSeriesAttributeTableName, + String statVarObservationTableName) { + Mutation timeSeriesMutation = toTimeSeriesMutation(row.seriesRow(), timeSeriesTableName); + List<Mutation> attributeMutations = + toTimeSeriesAttributeMutations(row.seriesRow(), timeSeriesAttributeTableName); + List<Mutation> pointMutations = new ArrayList<>(); + for (SourcePointRow pointRow : row.pointRows()) { + pointMutations.add(toStatVarObservationMutation(pointRow, statVarObservationTableName)); + } + + List<MutationGroup> mutationGroups = new ArrayList<>(); + List<Mutation> attached = new ArrayList<>(attributeMutations); + int maxAttachedMutations = MAX_GROUP_MUTATIONS - 1; + int pointIndex = 0; + while (pointIndex < pointMutations.size() && attached.size() < maxAttachedMutations) { + attached.add(pointMutations.get(pointIndex)); + pointIndex++; + } + mutationGroups.add(MutationGroup.create(timeSeriesMutation, attached)); + + while (pointIndex < pointMutations.size()) { + attached = new ArrayList<>(); + while (pointIndex < pointMutations.size() && attached.size() < maxAttachedMutations) { + attached.add(pointMutations.get(pointIndex)); + pointIndex++; + } + mutationGroups.add(MutationGroup.create(timeSeriesMutation, attached)); + } + + return new BackfillMutationGroups( + mutationGroups, attributeMutations.size(), pointMutations.size()); + } + + static String seriesId(SourceSeriesRow row) { + return SeriesIdGenerator.build(row.variableMeasured(), row.observationAbout(), row.facetId()); + } + + static String seriesId(SourcePointRow row) { + return SeriesIdGenerator.build(row.variableMeasured(), row.observationAbout(), row.facetId()); + } +} + +record BackfillMutationGroups( + List<MutationGroup> groups, int timeSeriesAttributeRows, int statVarObservationRows) {} diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/VariableMeasuredFilters.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/VariableMeasuredFilters.java new file mode 100644 index 00000000..a6d8e68b --- /dev/null +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/VariableMeasuredFilters.java @@ -0,0 +1,23 @@ +package org.datacommons.ingestion.timeseries; + +import java.util.ArrayList; +import java.util.List; + +/** Parses the variable_measured option into one or more filter values. */ +final class VariableMeasuredFilters { + private VariableMeasuredFilters() {} + + static List<String> parse(String value) { + List<String> filters = new ArrayList<>(); + if (value == null || value.isEmpty()) { + return filters; + } + for (String part : value.split(",")) { + String trimmed = part.trim(); + if (!trimmed.isEmpty()) { + filters.add(trimmed); + } + } + return filters; + } +} diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/LocalProgressTrackerTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/LocalProgressTrackerTest.java new file mode 100644 index 00000000..ae7748f0 --- /dev/null +++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/LocalProgressTrackerTest.java @@ -0,0 +1,58 @@ +package org.datacommons.ingestion.timeseries; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.Test; + +public class LocalProgressTrackerTest { + @Test + public void recordRow_logsAtConfiguredInterval() { + List<String> messages = new ArrayList<>(); + AtomicLong nowMillis = new AtomicLong(1_000L); + LocalProgressTracker tracker = + new LocalProgressTracker("DirectRunner", 2, 0, messages::add, nowMillis::get, null); + + tracker.recordRow(3, 7); + tracker.recordRow(5, 11); + + assertEquals(1, messages.size()); + assertTrue(messages.get(0).contains("DirectRunner progress")); + assertTrue(messages.get(0).contains("source_rows=2")); + assertTrue(messages.get(0).contains("timeseries_attribute_rows=8")); + assertTrue(messages.get(0).contains("stat_var_observation_rows=18")); + } + + @Test + public void logHeartbeatNow_reportsNoRowsYet() { + List<String> messages = new ArrayList<>(); + AtomicLong nowMillis = new AtomicLong(31_000L); + LocalProgressTracker tracker = + new LocalProgressTracker("validator", 0, 30, messages::add, nowMillis::get, null); + + tracker.logHeartbeatNow(); + + assertEquals(1, messages.size()); + assertTrue(messages.get(0).contains("validator heartbeat")); + assertTrue(messages.get(0).contains("source_rows=0")); + assertTrue(messages.get(0).contains("no_source_rows_yet=true")); + } + + @Test + public void recordValidatorFlush_logsMutationCountAndTimestamp() { + List<String> messages = new ArrayList<>(); + LocalProgressTracker tracker = + new
LocalProgressTracker("validator", 1, 0, messages::add, System::currentTimeMillis, null); + + tracker.recordValidatorFlush(12, Timestamp.parseTimestamp("2026-04-23T00:00:00Z")); + + assertEquals(1, messages.size()); + assertTrue(messages.get(0).contains("validator flush")); + assertTrue(messages.get(0).contains("mutations=12")); + assertTrue(messages.get(0).contains("2026-04-23T00:00:00Z")); + } +} diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/ObservationExportFilesTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/ObservationExportFilesTest.java new file mode 100644 index 00000000..7c5e7a3d --- /dev/null +++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/ObservationExportFilesTest.java @@ -0,0 +1,72 @@ +package org.datacommons.ingestion.timeseries; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.gson.JsonParser; +import java.util.List; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; + +public class ObservationExportFilesTest { + @Test + public void validateOptions_rejectsMissingAndBothSourceModes() { + TimeseriesBackfillOptions missingOptions = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + assertInvalid(missingOptions); + + TimeseriesBackfillOptions bothOptions = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + bothOptions.setInputExportDir("gs://bucket/export"); + bothOptions.setInputFiles("gs://bucket/export/Observation.avro-00000-of-00001"); + assertInvalid(bothOptions); + } + + @Test + public void resolveInputFiles_parsesCsvFileList() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setInputFiles("gs://bucket/a.avro, gs://bucket/b.avro "); + + assertEquals( + List.of("gs://bucket/a.avro", "gs://bucket/b.avro"), + ObservationExportFiles.resolveInputFiles(options)); + } + + @Test + public void parseManifest_resolvesRelativeAndAbsoluteObservationFiles() { + List files = + ObservationExportFiles.parseManifest( + JsonParser.parseString( + """ + { + "files": [ + "Observation.avro-00000-of-00002", + "nested/Observation.avro-00001-of-00002", + "gs://other/export/Observation.avro-00002-of-00002", + "Other.avro-00000-of-00001" + ] + } + """), + "gs://bucket/export", + "Observation"); + + assertEquals( + List.of( + "gs://bucket/export/Observation.avro-00000-of-00002", + "gs://bucket/export/nested/Observation.avro-00001-of-00002", + "gs://other/export/Observation.avro-00002-of-00002"), + files); + } + + private static void assertInvalid(TimeseriesBackfillOptions options) { + try { + ObservationExportFiles.validateOptions(options); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException expected) { + assertEquals( + "Exactly one of inputExportDir or inputFiles must be provided for the Avro pipeline.", + expected.getMessage()); + } + } +} diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java new file mode 100644 index 00000000..bc895627 --- /dev/null +++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java @@ -0,0 +1,123 @@ +package org.datacommons.ingestion.timeseries; + +import static 
org.junit.Assert.assertEquals; + +import com.google.cloud.ByteArray; +import com.google.cloud.spanner.Struct; +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.datacommons.Storage.Observations; +import org.junit.Test; + +public class SourceObservationRowsTest { + @Test + public void toObservationRow_expandsObservationProtoFromStruct() { + Struct row = + Struct.newBuilder() + .set("observation_about") + .to("geoId/06") + .set("variable_measured") + .to("Count_Person") + .set("facet_id") + .to("123") + .set("observation_period") + .to((String) null) + .set("measurement_method") + .to((String) null) + .set("unit") + .to((String) null) + .set("scaling_factor") + .to((String) null) + .set("import_name") + .to("TestImport") + .set("provenance_url") + .to((String) null) + .set("is_dc_aggregate") + .to(false) + .set("provenance") + .to("dc/base/WrongImport") + .set("observations") + .to( + ByteArray.copyFrom( + Observations.newBuilder() + .putValues("2023", "1") + .putValues("2024", "2") + .build() + .toByteArray())) + .build(); + + SourceObservationRow observationRow = SourceObservationRows.toObservationRow(row); + List<SourcePointRow> rows = observationRow.pointRows(); + rows.sort(Comparator.comparing(SourcePointRow::date)); + + assertEquals("geoId/06", observationRow.seriesRow().observationAbout()); + assertEquals("Count_Person", observationRow.seriesRow().variableMeasured()); + assertEquals("dc/base/TestImport", observationRow.seriesRow().provenance()); + assertEquals(2, rows.size()); + assertEquals(new SourcePointRow("geoId/06", "Count_Person", "123", "2023", "1"), rows.get(0)); + assertEquals(new SourcePointRow("geoId/06", "Count_Person", "123", "2024", "2"), rows.get(1)); + } + + @Test + public void toObservationRow_expandsObservationProtoFromGenericRecord() { + GenericRecord row = new GenericData.Record(schemaWithoutGeneratedProvenance()); + row.put("observation_about", "geoId/06"); + row.put("variable_measured", "Count_Person"); + row.put("facet_id", "123"); + row.put("observation_period", null); + row.put("measurement_method", null); + row.put("unit", null); + row.put("scaling_factor", null); + row.put("import_name", "TestImport"); + row.put("provenance_url", "https://example.com"); + row.put("is_dc_aggregate", true); + row.put( + "observations", + ByteBuffer.wrap( + Observations.newBuilder() + .putValues("2023", "1") + .putValues("2024", "2") + .build() + .toByteArray())); + + SourceObservationRow observationRow = SourceObservationRows.toObservationRow(row); + List<SourcePointRow> rows = observationRow.pointRows(); + rows.sort(Comparator.comparing(SourcePointRow::date)); + + assertEquals("geoId/06", observationRow.seriesRow().observationAbout()); + assertEquals("Count_Person", observationRow.seriesRow().variableMeasured()); + assertEquals("dc/base/TestImport", observationRow.seriesRow().provenance()); + assertEquals("https://example.com", observationRow.seriesRow().provenanceUrl()); + assertEquals(2, rows.size()); + assertEquals(new SourcePointRow("geoId/06", "Count_Person", "123", "2023", "1"), rows.get(0)); + assertEquals(new SourcePointRow("geoId/06", "Count_Person", "123", "2024", "2"), rows.get(1)); + } + + private static Schema schemaWithoutGeneratedProvenance() { + return new Schema.Parser() + .parse( + """ + { + "type": "record", + "name": "Observation", + "fields": [ + {"name": "observation_about", "type": "string"}, + {"name":
"variable_measured", "type": "string"}, + {"name": "facet_id", "type": "string"}, + {"name": "observation_period", "type": ["null", "string"], "default": null}, + {"name": "measurement_method", "type": ["null", "string"], "default": null}, + {"name": "unit", "type": ["null", "string"], "default": null}, + {"name": "scaling_factor", "type": ["null", "string"], "default": null}, + {"name": "import_name", "type": "string"}, + {"name": "provenance_url", "type": ["null", "string"], "default": null}, + {"name": "is_dc_aggregate", "type": ["null", "boolean"], "default": null}, + {"name": "observations", "type": "bytes"} + ] + } + """); + } +} diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipelineTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipelineTest.java new file mode 100644 index 00000000..443727ca --- /dev/null +++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipelineTest.java @@ -0,0 +1,67 @@ +package org.datacommons.ingestion.timeseries; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.List; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; + +public class TimeseriesBackfillAvroPipelineTest { + @Test + public void validateOptions_acceptsExportDirAndInputFilesModes() { + TimeseriesBackfillOptions exportDirOptions = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + exportDirOptions.setInputExportDir("gs://bucket/export"); + + TimeseriesBackfillOptions inputFilesOptions = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + inputFilesOptions.setInputFiles("gs://bucket/export/Observation.avro-00000-of-00001"); + + TimeseriesBackfillAvroPipeline.validateOptions(exportDirOptions); + TimeseriesBackfillAvroPipeline.validateOptions(inputFilesOptions); + } + + @Test + public void validateOptions_rejectsBeamRowCapsForAvroPipeline() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setInputExportDir("gs://bucket/export"); + options.setMaxSeriesRows(10); + + try { + TimeseriesBackfillAvroPipeline.validateOptions(options); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Beam row caps are not supported")); + } + } + + @Test + public void matchesFilters_appliesObservationAboutAndVariableMeasuredBounds() { + SourceSeriesRow seriesRow = + new SourceSeriesRow( + "geoId/06", + "Count_Person", + "123", + "", + "", + "", + "", + "TestImport", + "", + false, + "dc/base/TestImport"); + + assertTrue( + TimeseriesBackfillAvroPipeline.matchesFilters( + seriesRow, "geoId/06", "geoId/07", List.of("Count_Person"))); + assertFalse( + TimeseriesBackfillAvroPipeline.matchesFilters( + seriesRow, "geoId/07", "", List.of("Count_Person"))); + assertFalse( + TimeseriesBackfillAvroPipeline.matchesFilters( + seriesRow, "", "", List.of("Min_Temperature"))); + } +} diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipelineTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipelineTest.java new file mode 100644 index 00000000..d230a7ca --- /dev/null +++ 
b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipelineTest.java @@ -0,0 +1,181 @@ +package org.datacommons.ingestion.timeseries; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.lang.reflect.Method; +import java.util.OptionalInt; +import org.apache.beam.runners.dataflow.DataflowRunner; +import org.apache.beam.runners.direct.DirectRunner; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.time.Duration; +import org.junit.Test; + +public class TimeseriesBackfillPipelineTest { + @Test + public void options_haveLocalProgressDefaults() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + + assertEquals(1000, options.getProgressEverySourceRows()); + assertEquals(30, options.getHeartbeatSeconds()); + } + + @Test + public void shouldEnableLocalProgress_trueForDefaultAndDirectRunner() { + TimeseriesBackfillOptions defaultOptions = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + TimeseriesBackfillOptions directOptions = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + directOptions.setRunner(DirectRunner.class); + + assertTrue(TimeseriesBackfillPipeline.shouldEnableLocalProgress(defaultOptions)); + assertTrue(TimeseriesBackfillPipeline.shouldEnableLocalProgress(directOptions)); + } + + @Test + public void shouldEnableLocalProgress_falseForDataflowRunner() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setRunner(DataflowRunner.class); + + assertFalse(TimeseriesBackfillPipeline.shouldEnableLocalProgress(options)); + } + + @Test + public void resolveProjectId_prefersProjectAndFallsBackToProjectId() { + TimeseriesBackfillOptions projectOptions = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + projectOptions.setProject("beam-project"); + projectOptions.setProjectId("legacy-project"); + + TimeseriesBackfillOptions legacyOptions = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + legacyOptions.setProject(""); + legacyOptions.setProjectId("legacy-project"); + + assertEquals("beam-project", TimeseriesBackfillIO.resolveProjectId(projectOptions)); + assertEquals("legacy-project", TimeseriesBackfillIO.resolveProjectId(legacyOptions)); + } + + @Test + public void options_leaveOptionalSpannerSinkFlagsUnsetByDefault() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + + assertNull(options.getBatchSizeBytes()); + assertNull(options.getMaxNumRows()); + assertNull(options.getMaxNumMutations()); + assertNull(options.getGroupingFactor()); + assertNull(options.getCommitDeadlineSeconds()); + } + + @Test + public void options_parseOptionalSpannerSinkFlagsFromArgs() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.fromArgs( + "--batchSizeBytes=1024", + "--maxNumRows=100", + "--maxNumMutations=500", + "--groupingFactor=20", + "--commitDeadlineSeconds=45") + .as(TimeseriesBackfillOptions.class); + + assertEquals(Long.valueOf(1024), options.getBatchSizeBytes()); + assertEquals(Integer.valueOf(100), options.getMaxNumRows()); + 
assertEquals(Integer.valueOf(500), options.getMaxNumMutations()); + assertEquals(Integer.valueOf(20), options.getGroupingFactor()); + assertEquals(Integer.valueOf(45), options.getCommitDeadlineSeconds()); + } + + @Test + public void validateOptions_rejectsNonPositiveOptionalSpannerSinkFlags() { + assertRejectsInvalidWriteOption("batchSizeBytes", "batchSizeBytes must be positive"); + assertRejectsInvalidWriteOption("maxNumRows", "maxNumRows must be positive"); + assertRejectsInvalidWriteOption("maxNumMutations", "maxNumMutations must be positive"); + assertRejectsInvalidWriteOption("groupingFactor", "groupingFactor must be positive"); + assertRejectsInvalidWriteOption( + "commitDeadlineSeconds", "commitDeadlineSeconds must be positive"); + } + + @Test + public void buildWrite_appliesOptionalSpannerSinkFlagsWhenProvided() throws Exception { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setProject("beam-project"); + options.setSpannerInstanceId("instance"); + options.setSpannerDatabaseId("database"); + options.setBatchSizeBytes(1024L); + options.setMaxNumRows(100); + options.setMaxNumMutations(500); + options.setGroupingFactor(20); + options.setCommitDeadlineSeconds(45); + + SpannerIO.Write write = TimeseriesBackfillIO.buildWrite(options); + + assertEquals(1024L, invokeLong(write, "getBatchSizeBytes")); + assertEquals(100L, invokeLong(write, "getMaxNumRows")); + assertEquals(500L, invokeLong(write, "getMaxNumMutations")); + assertEquals(OptionalInt.of(20), invokeGroupingFactor(write)); + assertEquals( + Duration.standardSeconds(45), invokeSpannerConfig(write).getCommitDeadline().get()); + } + + private static void assertRejectsInvalidWriteOption(String optionName, String messageFragment) { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + setInvalidWriteOption(options, optionName); + + try { + TimeseriesBackfillOptionValidator.validateCommonOptions(options); + fail("Expected IllegalArgumentException for " + optionName); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains(messageFragment)); + } + } + + private static void setInvalidWriteOption(TimeseriesBackfillOptions options, String optionName) { + switch (optionName) { + case "batchSizeBytes": + options.setBatchSizeBytes(0L); + return; + case "maxNumRows": + options.setMaxNumRows(0); + return; + case "maxNumMutations": + options.setMaxNumMutations(0); + return; + case "groupingFactor": + options.setGroupingFactor(0); + return; + case "commitDeadlineSeconds": + options.setCommitDeadlineSeconds(0); + return; + default: + throw new IllegalArgumentException("Unexpected option " + optionName); + } + } + + private static long invokeLong(SpannerIO.Write write, String methodName) throws Exception { + Method method = SpannerIO.Write.class.getDeclaredMethod(methodName); + method.setAccessible(true); + return (long) method.invoke(write); + } + + private static OptionalInt invokeGroupingFactor(SpannerIO.Write write) throws Exception { + Method method = SpannerIO.Write.class.getDeclaredMethod("getGroupingFactor"); + method.setAccessible(true); + return (OptionalInt) method.invoke(write); + } + + private static SpannerConfig invokeSpannerConfig(SpannerIO.Write write) throws Exception { + Method method = SpannerIO.Write.class.getDeclaredMethod("getSpannerConfig"); + method.setAccessible(true); + return (SpannerConfig) method.invoke(write); + } +} diff --git 
a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueriesTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueriesTest.java new file mode 100644 index 00000000..f53d9951 --- /dev/null +++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillQueriesTest.java @@ -0,0 +1,101 @@ +package org.datacommons.ingestion.timeseries; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.cloud.spanner.Statement; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; + +public class TimeseriesBackfillQueriesTest { + @Test + public void buildSourceQuery_selectsMetadataObservationsAndShardFilters() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setSourceObservationTableName("Observation"); + options.setVariableMeasured("Count_Person"); + options.setStartObservationAbout("geoId/06"); + options.setEndObservationAboutExclusive("geoId/07"); + + Statement query = new TimeseriesBackfillQueries(options).buildSourceQuery(); + + assertTrue(query.getSql().contains("provenance")); + assertTrue(query.getSql().contains("observations")); + assertTrue(query.getSql().contains("FROM Observation")); + assertTrue(query.getSql().contains("observation_about >= @startObservationAbout")); + assertTrue(query.getSql().contains("observation_about < @endObservationAboutExclusive")); + assertTrue(query.getSql().contains("variable_measured = @variableMeasured")); + assertFalse(query.getSql().contains("UNNEST")); + assertTrue(query.hasBinding("startObservationAbout")); + assertTrue(query.hasBinding("endObservationAboutExclusive")); + assertTrue(query.hasBinding("variableMeasured")); + } + + @Test + public void buildSourceQuery_supportsCommaSeparatedVariableMeasuredFilters() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setSourceObservationTableName("Observation"); + options.setVariableMeasured("Count_Person, Min_Temperature ,Max_Temperature"); + + Statement query = new TimeseriesBackfillQueries(options).buildSourceQuery(); + + assertTrue(query.getSql().contains("variable_measured IN UNNEST(@variableMeasuredList)")); + assertFalse(query.hasBinding("variableMeasured")); + assertTrue(query.hasBinding("variableMeasuredList")); + } + + @Test + public void buildSourceQuery_supportsBoundedValidatorRuns() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setSourceObservationTableName("Observation"); + + Statement query = new TimeseriesBackfillQueries(options).buildSourceQuery(10); + + assertTrue(query.getSql().contains("provenance")); + assertTrue(query.getSql().contains("observations")); + assertTrue(query.getSql().contains("LIMIT 10")); + assertTrue(query.getSql().contains("ORDER BY observation_about")); + } + + @Test + public void validateOptions_rejectsObservationAboutRangeWithoutVariableMeasured() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setStartObservationAbout("geoId/06"); + + try { + TimeseriesBackfillPipeline.validateOptions(options); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + 
assertTrue(e.getMessage().contains("variableMeasured is required")); + } + } + + @Test + public void validateOptions_acceptsObservationAboutRangeWithCommaSeparatedVariableMeasured() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setVariableMeasured("Count_Person, Min_Temperature"); + options.setStartObservationAbout("geoId/06"); + + TimeseriesBackfillPipeline.validateOptions(options); + } + + @Test + public void validateOptions_rejectsBeamRowCaps() { + TimeseriesBackfillOptions options = + PipelineOptionsFactory.create().as(TimeseriesBackfillOptions.class); + options.setVariableMeasured("Count_Person"); + options.setMaxSeriesRows(10); + + try { + TimeseriesBackfillPipeline.validateOptions(options); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Beam row caps are not supported")); + } + } +} diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java new file mode 100644 index 00000000..2618e18e --- /dev/null +++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java @@ -0,0 +1,170 @@ +package org.datacommons.ingestion.timeseries; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.spanner.Mutation; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.gcp.spanner.MutationGroup; +import org.junit.Test; + +public class TimeseriesMutationFactoryTest { + @Test + public void toTimeSeriesMutation_usesExistingSeriesDcidAndSourceProvenance() { + SourceSeriesRow row = + new SourceSeriesRow( + "geoId/06", + "Count_Person", + "12345", + "P1Y", + "CensusACS5yrSurvey", + "Person", + "1", + "CensusACS5YearSurvey", + "https://example.com", + false, + "dc/base/CensusACS5YearSurvey"); + + Mutation mutation = TimeseriesMutationFactory.toTimeSeriesMutation(row, "TimeSeries"); + + assertEquals("TimeSeries", mutation.getTable()); + assertEquals("dc/os/Count_Person_geoId_06_12345", mutation.asMap().get("id").getString()); + assertEquals("dc/base/CensusACS5YearSurvey", mutation.asMap().get("provenance").getString()); + } + + @Test + public void toTimeSeriesAttributeMutations_preservesNormalizedReadAttributes() { + SourceSeriesRow row = + new SourceSeriesRow( + "geoId/06", + "Count_Person", + "12345", + "P1Y", + "CensusACS5yrSurvey", + "Person", + "1", + "CensusACS5YearSurvey", + "", + true, + "dc/base/CensusACS5YearSurvey"); + + List mutations = + TimeseriesMutationFactory.toTimeSeriesAttributeMutations(row, "TimeSeriesAttribute"); + + Map properties = + mutations.stream() + .collect( + Collectors.toMap( + mutation -> mutation.asMap().get("property").getString(), + mutation -> mutation.asMap().get("value").getString())); + + assertEquals("geoId/06", properties.get("observationAbout")); + assertEquals("12345", properties.get("facetId")); + assertEquals("CensusACS5YearSurvey", properties.get("importName")); + assertEquals("P1Y", properties.get("observationPeriod")); + assertEquals("CensusACS5yrSurvey", properties.get("measurementMethod")); + assertEquals("Person", properties.get("unit")); + assertEquals("1", 
properties.get("scalingFactor")); + assertEquals("true", properties.get("isDcAggregate")); + assertFalse(properties.containsKey("provenanceUrl")); + } + + @Test + public void toStatVarObservationMutation_emitsPointMutation() { + SourcePointRow row = new SourcePointRow("geoId/06", "Count_Person", "12345", "2024", "123.4"); + + Mutation mutation = + TimeseriesMutationFactory.toStatVarObservationMutation(row, "StatVarObservation"); + + assertEquals("StatVarObservation", mutation.getTable()); + assertEquals("2024", mutation.asMap().get("date").getString()); + assertEquals("123.4", mutation.asMap().get("value").getString()); + } + + @Test + public void toMutationGroups_groupsParentAttributesAndPointsFromSingleSourceRow() { + SourceSeriesRow seriesRow = + new SourceSeriesRow( + "geoId/06", + "Count_Person", + "12345", + "P1Y", + "CensusACS5yrSurvey", + "Person", + "1", + "CensusACS5YearSurvey", + "", + true, + "dc/base/CensusACS5YearSurvey"); + List pointRows = + List.of( + new SourcePointRow("geoId/06", "Count_Person", "12345", "2023", "1"), + new SourcePointRow("geoId/06", "Count_Person", "12345", "2024", "2")); + + BackfillMutationGroups mutationGroups = + TimeseriesMutationFactory.toMutationGroups( + new SourceObservationRow(seriesRow, pointRows), + "TimeSeries", + "TimeSeriesAttribute", + "StatVarObservation"); + + assertEquals(1, mutationGroups.groups().size()); + assertEquals(8, mutationGroups.timeSeriesAttributeRows()); + assertEquals(2, mutationGroups.statVarObservationRows()); + + MutationGroup mutationGroup = mutationGroups.groups().get(0); + assertEquals("TimeSeries", mutationGroup.primary().getTable()); + assertEquals(10, mutationGroup.attached().size()); + assertTrue( + mutationGroup.attached().stream() + .anyMatch(mutation -> mutation.getTable().equals("TimeSeriesAttribute"))); + assertTrue( + mutationGroup.attached().stream() + .anyMatch(mutation -> mutation.getTable().equals("StatVarObservation"))); + } + + @Test + public void toMutationGroups_splitsLargePointSetsAcrossGroups() { + SourceSeriesRow seriesRow = + new SourceSeriesRow( + "geoId/06", + "Count_Person", + "12345", + "", + "", + "", + "", + "", + "", + false, + "dc/base/CensusACS5YearSurvey"); + List pointRows = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + pointRows.add( + new SourcePointRow("geoId/06", "Count_Person", "12345", Integer.toString(i), "1")); + } + + BackfillMutationGroups mutationGroups = + TimeseriesMutationFactory.toMutationGroups( + new SourceObservationRow(seriesRow, pointRows), + "TimeSeries", + "TimeSeriesAttribute", + "StatVarObservation"); + + assertEquals(2, mutationGroups.groups().size()); + assertEquals("TimeSeries", mutationGroups.groups().get(0).primary().getTable()); + assertEquals("TimeSeries", mutationGroups.groups().get(1).primary().getTable()); + } + + @Test + public void seriesIdGenerator_replacesSlashesWithUnderscores() { + assertEquals( + "dc/os/Count_Person_geoId_06_123", + SeriesIdGenerator.build("Count_Person", "geoId/06", "123")); + assertTrue(SeriesIdGenerator.build("Count/Person", "geoId/06", "123").contains("Count_Person")); + } +} From ca781f10c3fb22882eb92b770aff6d018d69afe5 Mon Sep 17 00:00:00 2001 From: Rohit Kumar Date: Fri, 24 Apr 2026 04:10:07 +0000 Subject: [PATCH 2/4] feat: add CompactSourceObservationRow support to pipeline to optimize processing of serialized observations --- pipeline/timeseries-backfill/README.md | 25 +++++- .../timeseries/SourceObservationRows.java | 39 +++++++-- .../TimeseriesBackfillAvroPipeline.java | 83 ++++++++++++++++--- 
.../TimeseriesBackfillPipeline.java | 4 + .../timeseries/TimeseriesMutationFactory.java | 38 +++++++++ .../timeseries/SourceObservationRowsTest.java | 31 +++++++ .../TimeseriesMutationFactoryTest.java | 34 ++++++++ 7 files changed, 237 insertions(+), 17 deletions(-) diff --git a/pipeline/timeseries-backfill/README.md b/pipeline/timeseries-backfill/README.md index ddf9569e..c6fc1019 100644 --- a/pipeline/timeseries-backfill/README.md +++ b/pipeline/timeseries-backfill/README.md @@ -191,7 +191,14 @@ Set `--progressEverySourceRows` or `--heartbeatSeconds` if you want a different mvn -Pgit-worktree compile exec:java \ -pl timeseries-backfill -am \ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \ - -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs:/// --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --runner=DirectRunner" + -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/dc-kg-test-dc_graph_2026_01_27-2026-04-23_05_47_24-8439747614048276587/Observation.avro-00005-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DirectRunner" + + + mvn -Pgit-worktree compile exec:java \ + -pl timeseries-backfill -am \ + -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \ + -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=/usr/local/google/home/rohitrkumar/Documents/dc/github/rohitkumarbhagat/import/pipeline/Observation.avro-00042-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DirectRunner" + ``` The Avro entrypoint reads exported `Observation` Avro rows, recomputes `provenance` from `import_name`, parses `observations` from the serialized proto payload, and then reuses the same normalized write path as the Spanner entrypoint. 
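To make the per-row work just described concrete, a reader's sketch (not part of the patch; the class name is invented) of the two transformations:

```java
import org.datacommons.Storage.Observations;

// Reader's sketch: provenance is recomputed from import_name via the dc/base/
// prefix rule, and the observations column is decoded from its serialized proto.
final class AvroRowDecodeDemo {
  static String deriveProvenance(String importName) {
    return "dc/base/" + importName; // e.g. TestImport becomes dc/base/TestImport
  }

  static Observations decode(byte[] observationsProtoBytes) throws Exception {
    return Observations.parseFrom(observationsProtoBytes); // standard protobuf parse
  }
}
```
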
@@ -223,6 +230,17 @@ mvn -Pgit-worktree compile exec:java \ -pl timeseries-backfill -am \ -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \ -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/ --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2" + + mvn -Pgit-worktree compile exec:java \ + -pl timeseries-backfill -am \ + -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \ + -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/dc-kg-test-dc_graph_2026_01_27-2026-04-23_05_47_24-8439747614048276587/Observation.avro-00042-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2" + + mvn -Pgit-worktree compile exec:java \ + -pl timeseries-backfill -am \ + -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \ + -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/dc-kg-test-dc_graph_2026_01_27-2026-04-23_05_47_24-8439747614048276587 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2" + ``` ## Recreate Destination Tables diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java
b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java index 96edfd93..82bd43b1 100644 --- a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/SourceObservationRows.java @@ -45,6 +45,23 @@ static SourceObservationRow toObservationRow(GenericRecord row) { parseObservations(row)); } + static CompactSourceObservationRow toCompactObservationRow(GenericRecord row) { + return new CompactSourceObservationRow( + new SourceSeriesRow( + getNullableString(row, "observation_about"), + getNullableString(row, "variable_measured"), + getNullableString(row, "facet_id"), + getNullableString(row, "observation_period"), + getNullableString(row, "measurement_method"), + getNullableString(row, "unit"), + getNullableString(row, "scaling_factor"), + getNullableString(row, "import_name"), + getNullableString(row, "provenance_url"), + getNullableBoolean(row, "is_dc_aggregate"), + deriveProvenance(getNullableString(row, "import_name"))), + getObservationBytes(row)); + } + private static SourceObservationRow buildObservationRow( String observationAbout, String variableMeasured, @@ -102,14 +119,13 @@ private static Observations parseObservations(Struct row) { } private static Observations parseObservations(GenericRecord row) { - Object value = getField(row, "observations"); - if (value == null) { - return Observations.getDefaultInstance(); - } - return parseObservations(toByteArray(value)); + return parseObservations(getObservationBytes(row)); } - private static Observations parseObservations(byte[] protoBytes) { + static Observations parseObservations(byte[] protoBytes) { + if (protoBytes == null || protoBytes.length == 0) { + return Observations.getDefaultInstance(); + } try { return Observations.parseFrom(protoBytes); } catch (Exception e) { @@ -117,6 +133,14 @@ private static Observations parseObservations(byte[] protoBytes) { } } + private static byte[] getObservationBytes(GenericRecord row) { + Object value = getField(row, "observations"); + if (value == null) { + return new byte[0]; + } + return toByteArray(value); + } + private static byte[] toByteArray(Object value) { if (value instanceof ByteBuffer byteBuffer) { ByteBuffer duplicate = byteBuffer.duplicate(); @@ -148,6 +172,9 @@ private static String deriveProvenance(String importName) { record SourceObservationRow(SourceSeriesRow seriesRow, List<SourcePointRow> pointRows) implements Serializable {} +record CompactSourceObservationRow(SourceSeriesRow seriesRow, byte[] observationsProtoBytes) + implements Serializable {} + record SourceSeriesRow( String observationAbout, String variableMeasured, diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java index a30c62ec..a59ef364 100644 --- a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java +++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java @@ -5,6 +5,9 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.extensions.avro.io.AvroIO; +import org.apache.beam.sdk.io.gcp.spanner.MutationGroup; +import org.apache.beam.sdk.metrics.Counter; +import
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java
index a30c62ec..a59ef364 100644
--- a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillAvroPipeline.java
@@ -5,6 +5,9 @@
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.extensions.avro.io.AvroIO;
+import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -48,22 +51,31 @@ static void validateOptions(TimeseriesBackfillOptions options) {
 
   static void buildPipeline(
       Pipeline pipeline, TimeseriesBackfillOptions options, List<String> inputFiles) {
-    PCollection<SourceObservationRow> sourceRows =
+    PCollection<CompactSourceObservationRow> sourceRows =
         pipeline
             .apply("CreateAvroFileSpecs", Create.of(inputFiles))
             .apply(
                 "ReadAvroSourceRows",
-                AvroIO.parseAllGenericRecords(SourceObservationRows::toObservationRow)
-                    .withCoder(SerializableCoder.of(SourceObservationRow.class)))
+                AvroIO.parseAllGenericRecords(SourceObservationRows::toCompactObservationRow)
+                    .withCoder(SerializableCoder.of(CompactSourceObservationRow.class)))
             .apply(
                 "FilterSourceRows",
                 ParDo.of(
-                    new FilterSourceObservationRowsFn(
+                    new FilterCompactSourceObservationRowsFn(
                         options.getStartObservationAbout(),
                         options.getEndObservationAboutExclusive(),
                         VariableMeasuredFilters.parse(options.getVariableMeasured()))));
-    sourceRows.setCoder(SerializableCoder.of(SourceObservationRow.class));
-    TimeseriesBackfillPipeline.buildPipelineFromSourceRows(sourceRows, options);
+    sourceRows.setCoder(SerializableCoder.of(CompactSourceObservationRow.class));
+
+    PCollection<MutationGroup> mutationGroups =
+        sourceRows.apply(
+            "MapMutationGroups",
+            ParDo.of(
+                new CompactRowsToMutationGroupsFn(
+                    options.getDestinationTimeSeriesTableName(),
+                    options.getDestinationTimeSeriesAttributeTableName(),
+                    options.getDestinationStatVarObservationTableName())));
+    mutationGroups.apply("WriteMutationGroups", TimeseriesBackfillIO.buildGroupedWrite(options));
   }
 
   static boolean matchesFilters(
@@ -83,13 +95,13 @@ static boolean matchesFilters(
         || variableMeasuredFilters.contains(seriesRow.variableMeasured());
   }
 
-  static final class FilterSourceObservationRowsFn
-      extends DoFn<SourceObservationRow, SourceObservationRow> {
+  static final class FilterCompactSourceObservationRowsFn
+      extends DoFn<CompactSourceObservationRow, CompactSourceObservationRow> {
     private final String startObservationAbout;
     private final String endObservationAboutExclusive;
     private final List<String> variableMeasuredFilters;
 
-    FilterSourceObservationRowsFn(
+    FilterCompactSourceObservationRowsFn(
         String startObservationAbout,
         String endObservationAboutExclusive,
         List<String> variableMeasuredFilters) {
@@ -100,7 +112,8 @@ static final class FilterSourceObservationRowsFn
 
     @ProcessElement
     public void processElement(
-        @Element SourceObservationRow sourceRow, OutputReceiver<SourceObservationRow> out) {
+        @Element CompactSourceObservationRow sourceRow,
+        OutputReceiver<CompactSourceObservationRow> out) {
       if (matchesFilters(
           sourceRow.seriesRow(),
           startObservationAbout,
@@ -110,4 +123,54 @@ public void processElement(
       }
     }
   }
+
+  static final class CompactRowsToMutationGroupsFn
+      extends DoFn<CompactSourceObservationRow, MutationGroup> {
+    private static final Counter SOURCE_ROWS =
+        Metrics.counter(CompactRowsToMutationGroupsFn.class, "source_rows");
+    private static final Counter TIMESERIES_ROWS =
+        Metrics.counter(CompactRowsToMutationGroupsFn.class, "timeseries_rows_written");
+    private static final Counter TIMESERIES_ATTRIBUTE_ROWS =
+        Metrics.counter(CompactRowsToMutationGroupsFn.class, "timeseries_attribute_rows_written");
+    private static final Counter SOURCE_POINT_ROWS =
+        Metrics.counter(CompactRowsToMutationGroupsFn.class, "source_point_rows");
+    private static final Counter STAT_VAR_OBSERVATION_ROWS =
+        Metrics.counter(CompactRowsToMutationGroupsFn.class, "stat_var_observation_rows_written");
+    private final String timeSeriesTableName;
+    private final String timeSeriesAttributeTableName;
+    private final String statVarObservationTableName;
+
+    CompactRowsToMutationGroupsFn(
+        String timeSeriesTableName,
+        String timeSeriesAttributeTableName,
+        String statVarObservationTableName) {
+      this.timeSeriesTableName = timeSeriesTableName;
+      this.timeSeriesAttributeTableName = timeSeriesAttributeTableName;
+      this.statVarObservationTableName = statVarObservationTableName;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element CompactSourceObservationRow sourceRow, OutputReceiver<MutationGroup> out) {
+      BackfillMutationGroups mutationGroups =
+          TimeseriesMutationFactory.toMutationGroups(
+              sourceRow,
+              timeSeriesTableName,
+              timeSeriesAttributeTableName,
+              statVarObservationTableName);
+      SOURCE_ROWS.inc();
+      TIMESERIES_ROWS.inc();
+      TIMESERIES_ATTRIBUTE_ROWS.inc(mutationGroups.timeSeriesAttributeRows());
+      SOURCE_POINT_ROWS.inc(mutationGroups.statVarObservationRows());
+      STAT_VAR_OBSERVATION_ROWS.inc(mutationGroups.statVarObservationRows());
+      LocalProgressTracker progressTracker = TimeseriesBackfillPipeline.localProgressTracker();
+      if (progressTracker != null) {
+        progressTracker.recordRow(
+            mutationGroups.timeSeriesAttributeRows(), mutationGroups.statVarObservationRows());
+      }
+      for (MutationGroup mutationGroup : mutationGroups.groups()) {
+        out.output(mutationGroup);
+      }
+    }
+  }
 }
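Filtering here runs on already-read rows, so it reduces write volume rather than read volume. The semantics follow the flag reference in the README: the `observation_about` range is inclusive-start, exclusive-end, and an empty variable list passes everything (an assumption implied by the flag being optional). A hedged usage sketch, assuming same-package access to the package-private `matchesFilters`:

```java
import java.util.List;

final class FilterSketch {
  public static void main(String[] args) {
    SourceSeriesRow row =
        new SourceSeriesRow(
            "geoId/06", "Count_Person", "12345", "", "", "", "",
            "TestImport", "", false, "dc/base/TestImport");

    // "geoId/06" sorts inside ["geoId/0", "geoId/1") and the variable is in
    // the allow-list, so the row should pass under the documented semantics.
    boolean kept =
        TimeseriesBackfillAvroPipeline.matchesFilters(
            row, "geoId/0", "geoId/1", List.of("Count_Person"));
    System.out.println(kept); // expected: true
  }
}
```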
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java
index 64c9977c..8da32565 100644
--- a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesBackfillPipeline.java
@@ -110,6 +110,10 @@ static void clearLocalProgressTracker() {
     localProgressTracker = null;
   }
 
+  static LocalProgressTracker localProgressTracker() {
+    return localProgressTracker;
+  }
+
   static final class StructToSourceObservationRowsFn extends DoFn<Struct, SourceObservationRow> {
     @ProcessElement
     public void processElement(@Element Struct row, OutputReceiver<SourceObservationRow> out) {
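The counters defined in `CompactRowsToMutationGroupsFn` are plain Beam metrics, so a driver can query them from the `PipelineResult` after the run. A sketch using the standard Beam metrics API — the wrapper class and same-package access to the nested `DoFn` are assumptions; the counter name is the one defined above:

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

final class BackfillMetricsSketch {
  // Query one of the counters emitted by CompactRowsToMutationGroupsFn.
  static void printStatVarObservationRows(PipelineResult result) {
    MetricQueryResults metrics =
        result
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(
                        MetricNameFilter.named(
                            TimeseriesBackfillAvroPipeline.CompactRowsToMutationGroupsFn.class,
                            "stat_var_observation_rows_written"))
                    .build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
      System.out.println(counter.getName() + " = " + counter.getAttempted());
    }
  }
}
```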
diff --git a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java
index cba0c681..03a03780 100644
--- a/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java
+++ b/pipeline/timeseries-backfill/src/main/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactory.java
@@ -3,7 +3,9 @@
 import com.google.cloud.spanner.Mutation;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
+import org.datacommons.Storage.Observations;
 
 /** Converts source rows into normalized timeseries table mutations. */
 final class TimeseriesMutationFactory {
@@ -112,6 +114,42 @@ static BackfillMutationGroups toMutationGroups(
         mutationGroups, attributeMutations.size(), pointMutations.size());
   }
 
+  static BackfillMutationGroups toMutationGroups(
+      CompactSourceObservationRow row,
+      String timeSeriesTableName,
+      String timeSeriesAttributeTableName,
+      String statVarObservationTableName) {
+    Mutation timeSeriesMutation = toTimeSeriesMutation(row.seriesRow(), timeSeriesTableName);
+    List<Mutation> attributeMutations =
+        toTimeSeriesAttributeMutations(row.seriesRow(), timeSeriesAttributeTableName);
+    Observations observations =
+        SourceObservationRows.parseObservations(row.observationsProtoBytes());
+
+    List<MutationGroup> mutationGroups = new ArrayList<>();
+    List<Mutation> attached = new ArrayList<>(attributeMutations);
+    int maxAttachedMutations = MAX_GROUP_MUTATIONS - 1;
+    int pointMutationCount = 0;
+    for (Map.Entry<String, String> entry : observations.getValuesMap().entrySet()) {
+      if (attached.size() >= maxAttachedMutations) {
+        mutationGroups.add(MutationGroup.create(timeSeriesMutation, attached));
+        attached = new ArrayList<>();
+      }
+      attached.add(
+          toStatVarObservationMutation(
+              new SourcePointRow(
+                  row.seriesRow().observationAbout(),
+                  row.seriesRow().variableMeasured(),
+                  row.seriesRow().facetId(),
+                  entry.getKey(),
+                  entry.getValue()),
+              statVarObservationTableName));
+      pointMutationCount++;
+    }
+    mutationGroups.add(MutationGroup.create(timeSeriesMutation, attached));
+    return new BackfillMutationGroups(
+        mutationGroups, attributeMutations.size(), pointMutationCount);
+  }
+
   static String seriesId(SourceSeriesRow row) {
     return SeriesIdGenerator.build(row.variableMeasured(), row.observationAbout(), row.facetId());
   }
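The compact-row overload reproduces the grouping policy of the expanded-row version: each `MutationGroup` repeats the primary `TimeSeries` mutation and attaches at most `MAX_GROUP_MUTATIONS - 1` attribute and point mutations, flushing a group before the cap would be exceeded. A standalone sketch of the resulting group-count arithmetic (the `100` in the example is an assumed cap, not the module's actual constant):

```java
final class GroupCountSketch {
  // Mirrors the loop's behavior: attributes and points share the attached
  // slots, every group repeats the primary TimeSeries mutation, and an empty
  // series still produces one group.
  static int expectedGroupCount(int attributeMutations, int pointMutations, int maxGroupMutations) {
    int attachedPerGroup = maxGroupMutations - 1;
    int totalAttached = attributeMutations + pointMutations;
    if (totalAttached == 0) {
      return 1;
    }
    return (totalAttached + attachedPerGroup - 1) / attachedPerGroup; // ceiling division
  }

  public static void main(String[] args) {
    // e.g., 9 attribute mutations + 100 points with an assumed cap of 100
    // mutations per group: 99 attached in the first group, 10 in the second.
    System.out.println(expectedGroupCount(9, 100, 100)); // 2
  }
}
```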
diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java
index bc895627..b8f7a51d 100644
--- a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java
+++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/SourceObservationRowsTest.java
@@ -1,5 +1,6 @@
 package org.datacommons.ingestion.timeseries;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 import com.google.cloud.ByteArray;
@@ -97,6 +98,36 @@ public void toObservationRow_expandsObservationProtoFromGenericRecord() {
     assertEquals(new SourcePointRow("geoId/06", "Count_Person", "123", "2024", "2"), rows.get(1));
   }
 
+  @Test
+  public void toCompactObservationRow_keepsSeriesMetadataAndRawProtoBytes() {
+    byte[] observationBytes =
+        Observations.newBuilder()
+            .putValues("2023", "1")
+            .putValues("2024", "2")
+            .build()
+            .toByteArray();
+    GenericRecord row = new GenericData.Record(schemaWithoutGeneratedProvenance());
+    row.put("observation_about", "geoId/06");
+    row.put("variable_measured", "Count_Person");
+    row.put("facet_id", "123");
+    row.put("observation_period", null);
+    row.put("measurement_method", null);
+    row.put("unit", null);
+    row.put("scaling_factor", null);
+    row.put("import_name", "TestImport");
+    row.put("provenance_url", "https://example.com");
+    row.put("is_dc_aggregate", true);
+    row.put("observations", ByteBuffer.wrap(observationBytes));
+
+    CompactSourceObservationRow observationRow = SourceObservationRows.toCompactObservationRow(row);
+
+    assertEquals("geoId/06", observationRow.seriesRow().observationAbout());
+    assertEquals("Count_Person", observationRow.seriesRow().variableMeasured());
+    assertEquals("dc/base/TestImport", observationRow.seriesRow().provenance());
+    assertEquals("https://example.com", observationRow.seriesRow().provenanceUrl());
+    assertArrayEquals(observationBytes, observationRow.observationsProtoBytes());
+  }
+
   private static Schema schemaWithoutGeneratedProvenance() {
     return new Schema.Parser()
         .parse(
diff --git a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java
index 2618e18e..c0e0ddbc 100644
--- a/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java
+++ b/pipeline/timeseries-backfill/src/test/java/org/datacommons/ingestion/timeseries/TimeseriesMutationFactoryTest.java
@@ -10,6 +10,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
+import org.datacommons.Storage.Observations;
 import org.junit.Test;
 
 public class TimeseriesMutationFactoryTest {
@@ -160,6 +161,39 @@ public void toMutationGroups_splitsLargePointSetsAcrossGroups() {
     assertEquals("TimeSeries", mutationGroups.groups().get(1).primary().getTable());
   }
 
+  @Test
+  public void toMutationGroups_compactRowAvoidsExpandedPointListInput() {
+    SourceSeriesRow seriesRow =
+        new SourceSeriesRow(
+            "geoId/06",
+            "Count_Person",
+            "12345",
+            "",
+            "",
+            "",
+            "",
+            "TestImport",
+            "",
+            false,
+            "dc/base/TestImport");
+    CompactSourceObservationRow compactRow =
+        new CompactSourceObservationRow(
+            seriesRow,
+            Observations.newBuilder()
+                .putValues("2023", "1")
+                .putValues("2024", "2")
+                .build()
+                .toByteArray());
+
+    BackfillMutationGroups mutationGroups =
+        TimeseriesMutationFactory.toMutationGroups(
+            compactRow, "TimeSeries", "TimeSeriesAttribute", "StatVarObservation");
+
+    assertEquals(1, mutationGroups.groups().size());
+    assertEquals(2, mutationGroups.statVarObservationRows());
+    assertEquals("TimeSeries", mutationGroups.groups().get(0).primary().getTable());
+  }
+
   @Test
   public void seriesIdGenerator_replacesSlashesWithUnderscores() {
     assertEquals(
From 88e461c108e22dc9ddd8d78493e086e190015f47 Mon Sep 17 00:00:00 2001
From: Rohit Kumar
Date: Fri, 24 Apr 2026 04:52:31 +0000
Subject: [PATCH 3/4] docs: update timeseries-backfill README with execution paths and export instructions, and add sample Avro files

---
 pipeline/timeseries-backfill/README.md | 34 +++++++++++++++++++-------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/pipeline/timeseries-backfill/README.md b/pipeline/timeseries-backfill/README.md
index c6fc1019..a6808afd 100644
--- a/pipeline/timeseries-backfill/README.md
+++ b/pipeline/timeseries-backfill/README.md
@@ -2,11 +2,15 @@
 
 This module backfills the normalized timeseries tables from the legacy `Observation` table in Spanner.
 
+It supports two execution paths:
+- **Spanner to Spanner**: Reads directly from the live `Observation` table in Spanner. This path needs no intermediate export, but it queries the live database, which can affect performance and requires a stable snapshot for consistency. **Note:** A complete migration over this path failed because the queries timed out in Spanner after 2 hours; it is recommended only for targeted small batches (for example, scoped to one variable-measured and geo combination).
+- **Avro to Spanner**: Reads from exported `Observation` Avro files on GCS. Reading is decoupled from the live database, which makes this the recommended path for large-scale backfills and for working from a specific historical export. Filtering still reads every file in the selected directory or list and applies the filters afterwards. Two input modes are supported: all files in a directory, or a single file. **Crucially, the Spanner table export must be produced separately for this path to work (see One-Time Prep).**
+
 By default it writes:
 
-- `TimeSeries_rk`
-- `TimeSeriesAttribute_rk`
-- `StatVarObservation_rk`
+- `TimeSeries_<suffix>`
+- `TimeSeriesAttribute_<suffix>`
+- `StatVarObservation_<suffix>`
 
 It does not populate `ObservationAttribute` in v1.
 
@@ -18,7 +22,7 @@ Source table:
 
 Destination schema:
 
-- [rk-experiments/mixer/spanner/bq_spanner_ingestion/timeseries_schema.sql](/home/rohitrkumar_google_com/Documents/dc/github/rohitkumarbhagat/dc_local/rk-experiments/mixer/spanner/bq_spanner_ingestion/timeseries_schema.sql)
+- [timeseries_schema.sql](/timeseries_schema.sql)
 
 Important assumptions used by this module:
 
@@ -51,9 +55,9 @@ Core flags:
 - `--sourceObservationTableName`: Source table name. Normally `Observation`.
 - `--inputExportDir`: Avro-only. Spanner export directory containing `Observation-manifest.json`.
 - `--inputFiles`: Avro-only. Comma-separated exact Avro file paths.
-- `--destinationTimeSeriesTableName`: Destination parent series table. Normally `TimeSeries_rk`.
-- `--destinationTimeSeriesAttributeTableName`: Destination series-attribute table. Normally `TimeSeriesAttribute_rk`.
-- `--destinationStatVarObservationTableName`: Destination point table. Normally `StatVarObservation_rk`.
+- `--destinationTimeSeriesTableName`: Destination parent series table. Normally `TimeSeries_<suffix>`.
+- `--destinationTimeSeriesAttributeTableName`: Destination series-attribute table. Normally `TimeSeriesAttribute_<suffix>`.
+- `--destinationStatVarObservationTableName`: Destination point table. Normally `StatVarObservation_<suffix>`.
 - `--readTimestamp`: Source read snapshot in RFC3339 format, for example `2026-04-22T00:00:00Z`. When set, the job reads from that exact Spanner snapshot for consistent backfill semantics. When empty, the code uses a Spanner strong read instead.
 - `--variableMeasured`: Fixed `variable_measured` filter. Use one stat var or a comma-separated list such as `Count_Person,Min_Temperature`.
 - `--startObservationAbout`: Inclusive lower bound on `observation_about`. Useful for sharding by place range.
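For the Spanner execution path, the `--readTimestamp` behavior documented above maps onto Beam's SpannerIO timestamp bounds. A hedged sketch of that wiring (illustrative only; the instance, database, and query are example values from the runs in this README, and this is not the module's actual read code):

```java
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.TimestampBound;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;

final class SnapshotReadSketch {
  static SpannerIO.Read snapshotRead(String readTimestamp) {
    SpannerIO.Read read =
        SpannerIO.read()
            .withInstanceId("dc-kg-test")           // example values only
            .withDatabaseId("dc_graph_2026_01_27")
            .withQuery("SELECT * FROM Observation");
    if (readTimestamp == null || readTimestamp.isEmpty()) {
      // Empty flag: fall back to a strong read, as the flag doc describes.
      return read.withTimestampBound(TimestampBound.strong());
    }
    // Set flag: pin every read to the same snapshot for consistent backfill.
    return read.withTimestampBound(
        TimestampBound.ofReadTimestamp(Timestamp.parseTimestamp(readTimestamp)));
  }
}
```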
@@ -130,6 +134,18 @@ Suggested order to test if the sink is too memory-heavy or too serialized:
 
 ## One-Time Prep
 
+### Exporting The Observation Table (Avro Path)
+If you plan to use the Avro execution path, you must first export the `Observation` table from Spanner to GCS as Avro files. Run this command to start the Dataflow export job:
+
+```bash
+gcloud dataflow jobs run observation-export-$(date +%Y%m%d-%H%M%S) \
+  --project=datcom-store \
+  --region=us-central1 \
+  --gcs-location=gs://dataflow-templates-us-central1/latest/Cloud_Spanner_to_GCS_Avro \
+  --max-workers=50 \
+  --parameters=instanceId=dc-kg-test,databaseId=dc_graph_2026_01_27,spannerProjectId=datcom-store,tableNames=Observation,outputDir=gs:///spanner_obs_dump_2026_04_21,dataBoostEnabled=true,spannerPriority=LOW
+```
+
 If you see a missing artifact error for `org.datacommons:datacommons-import-util`, install that top-level module once from the `import/pipeline` directory:
 
 ```bash
@@ -197,7 +213,7 @@ mvn -Pgit-worktree compile exec:java \
 mvn -Pgit-worktree compile exec:java \
   -pl timeseries-backfill -am \
   -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
-  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=/usr/local/google/home/rohitrkumar/Documents/dc/github/rohitkumarbhagat/import/pipeline/Observation.avro-00042-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DirectRunner"
+  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=/import/pipeline/Observation.avro-00042-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DirectRunner"
 ```
 
@@ -256,7 +272,7 @@ Run this from the `import` repo root to drop any existing experimental normalize
 ./pipeline/timeseries-backfill/recreate_timeseries_tables.sh datcom-store dc-kg-test dc_graph_5
 ```
 
-The script reads the current database DDL, drops the normalized `_rk` tables and indexes if they already exist, and then reapplies a suffixed form of [timeseries_schema.sql](/home/rohitrkumar_google_com/Documents/dc/github/rohitkumarbhagat/dc_local/rk-experiments/mixer/spanner/bq_spanner_ingestion/timeseries_schema.sql).
+The script reads the current database DDL, drops the normalized `_rk` tables and indexes if they already exist, and then reapplies a suffixed form of [timeseries_schema.sql](/timeseries_schema.sql).
 
 ## Notes
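After the export job finishes, it can be worth confirming that the dump produced the expected `Observation.avro-NNNNN-of-NNNNN` shards before pointing `--inputExportDir` at it. A sketch using the GCS client library (bucket and prefix are placeholders for the export's `outputDir`):

```java
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public final class VerifyExportShards {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    int shards = 0;
    // Placeholder bucket/prefix; substitute the outputDir used in the export job.
    for (Blob blob :
        storage
            .list("my-bucket", Storage.BlobListOption.prefix("spanner_obs_dump_2026_04_21/"))
            .iterateAll()) {
      if (blob.getName().contains("Observation.avro-")) {
        shards++;
      }
    }
    System.out.println("Observation Avro shards found: " + shards);
  }
}
```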
-f "${schema_file}" ]]; then - echo "Schema file not found: ${schema_file}" >&2 - exit 1 -fi - -current_schema_file="$(mktemp)" -drop_ddl_file="$(mktemp)" -trap 'rm -f "${current_schema_file}" "${drop_ddl_file}" "${schema_with_suffix_file}"' EXIT - -perl -0pe ' - s/\bTimeSeriesAttributePropertyValue\b/TimeSeriesAttributePropertyValue_rk/g; - s/\bTimeSeriesAttributeValue\b/TimeSeriesAttributeValue_rk/g; - s/\bTimeSeriesByProvenance\b/TimeSeriesByProvenance_rk/g; - s/\bTimeSeriesByVariableMeasured\b/TimeSeriesByVariableMeasured_rk/g; - s/\bObservationAttribute\b/ObservationAttribute_rk/g; - s/\bStatVarObservation\b/StatVarObservation_rk/g; - s/\bTimeSeriesAttribute\b/TimeSeriesAttribute_rk/g; - s/\bTimeSeries\b/TimeSeries_rk/g; -' "${schema_file}" > "${schema_with_suffix_file}" - -gcloud spanner databases ddl describe "${database_id}" \ - --project="${project_id}" \ - --instance="${instance_id}" \ - > "${current_schema_file}" - -append_if_present() { - local pattern="$1" - local ddl="$2" - if rg -q "${pattern}" "${current_schema_file}"; then - printf '%s\n' "${ddl}" >> "${drop_ddl_file}" - fi -} - -append_if_present "CREATE INDEX TimeSeriesByProvenance_rk " "DROP INDEX TimeSeriesByProvenance_rk" -append_if_present "CREATE INDEX TimeSeriesByVariableMeasured_rk " "DROP INDEX TimeSeriesByVariableMeasured_rk" -append_if_present "CREATE INDEX TimeSeriesAttributePropertyValue_rk " "DROP INDEX TimeSeriesAttributePropertyValue_rk" -append_if_present "CREATE INDEX TimeSeriesAttributeValue_rk " "DROP INDEX TimeSeriesAttributeValue_rk" -append_if_present "CREATE TABLE ObservationAttribute_rk " "DROP TABLE ObservationAttribute_rk" -append_if_present "CREATE TABLE StatVarObservation_rk " "DROP TABLE StatVarObservation_rk" -append_if_present "CREATE TABLE TimeSeriesAttribute_rk " "DROP TABLE TimeSeriesAttribute_rk" -append_if_present "CREATE TABLE TimeSeries_rk " "DROP TABLE TimeSeries_rk" - -if [[ -s "${drop_ddl_file}" ]]; then - gcloud spanner databases ddl update "${database_id}" \ - --project="${project_id}" \ - --instance="${instance_id}" \ - --ddl-file="${drop_ddl_file}" -fi - -gcloud spanner databases ddl update "${database_id}" \ - --project="${project_id}" \ - --instance="${instance_id}" \ - --ddl-file="${schema_with_suffix_file}"