From fa08a1afaefddefb0fa2790cc083871afc1d41a2 Mon Sep 17 00:00:00 2001
From: Gabriel Mechali
Date: Mon, 27 Apr 2026 18:44:54 -0400
Subject: [PATCH] Successfully tested Data Bridge between the CDC Data job
 running locally and the Ingestion workflow.

---
 simple/stats/runner.py                    | 163 +++++++++++++++++++++-
 simple/stats/variable_per_row_importer.py |  55 +++++++-
 2 files changed, 216 insertions(+), 2 deletions(-)

diff --git a/simple/stats/runner.py b/simple/stats/runner.py
index 793542ed..2f27ed9d 100644
--- a/simple/stats/runner.py
+++ b/simple/stats/runner.py
@@ -10,6 +10,7 @@
 from stats.config import Config
 from stats.data import ImportType
 from stats.data import InputFileFormat
+from stats.data import McfNode
 from stats.data import ParentSVG2ChildSpecializedNames
 from stats.data import Triple
 from stats.data import VerticalSpec
@@ -42,6 +43,7 @@ class RunMode(StrEnum):
   CUSTOM_DC = "customdc"
   SCHEMA_UPDATE = "schemaupdate"
   MAIN_DC = "maindc"
+  DCP_BRIDGE = "dcpbridge"
 
 
 class Runner:
@@ -127,7 +129,7 @@ def run(self):
 
     if self.mode == RunMode.SCHEMA_UPDATE:
       logging.info("Skipping imports because run mode is schema update.")
-    elif self.mode == RunMode.CUSTOM_DC or self.mode == RunMode.MAIN_DC:
+    elif self.mode == RunMode.CUSTOM_DC or self.mode == RunMode.MAIN_DC or self.mode == RunMode.DCP_BRIDGE:
       self._run_imports_and_do_post_import_work()
     else:
@@ -213,6 +215,9 @@ def _run_imports_and_do_post_import_work(self):
     # Generate NL artifacts (sentences, embeddings, topic cache).
     self._generate_nl_artifacts()
 
+    if self.mode == RunMode.DCP_BRIDGE:
+      self._export_data_to_files_for_dcp()
+
     # Write import info to DB.
     self.db.insert_import_info(status=ImportStatus.SUCCESS)
@@ -238,6 +243,161 @@ def _generate_nl_artifacts(self):
     topic_cache_triples = topic_triples + sv_peer_group_triples
     nl.generate_topic_cache(topic_cache_triples, self.nl_dir)
 
+  def _export_data_to_files_for_dcp(self):
+    logging.info("Exporting data to files for DCP Bridge.")
+    import pandas as pd
+
+    # 1. Export observations to MCF
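+    # Illustrative sketch of one node emitted by this section (hypothetical
+    # values, not taken from a real import):
+    #
+    #   Node: E:obs->0
+    #   typeOf: dcid:StatVarObservation
+    #   observationAbout: dcid:country/USA
+    #   variableMeasured: dcid:Count_Person
+    #   observationDate: "2020"
+    #   value: 331449281
+    #   dcid: "dc/o/<md5 of entity_variable_date_value>"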
+    obs_tuples = self.db.engine.fetch_all("select * from observations")
+    if obs_tuples:
+      columns = [
+          "entity", "variable", "date", "value", "provenance", "unit",
+          "scaling_factor", "measurement_method", "observation_period",
+          "properties"
+      ]
+      df = pd.DataFrame(obs_tuples, columns=columns)
+
+      import hashlib
+
+      mcf_nodes = []
+      for i, row in df.iterrows():
+        node_lines = []
+        node_lines.append(f"Node: E:obs->{i}")
+        node_lines.append("typeOf: dcid:StatVarObservation")
+        node_lines.append(f"observationAbout: dcid:{row['entity']}")
+        node_lines.append(f"variableMeasured: dcid:{row['variable']}")
+        node_lines.append(f"observationDate: \"{row['date']}\"")
+        node_lines.append(f"value: {row['value']}")
+
+        # Generate a unique dcid for the observation.
+        content_str = f"{row['entity']}_{row['variable']}_{row['date']}_{row['value']}"
+        obs_hash = hashlib.md5(content_str.encode()).hexdigest()
+        node_lines.append(f"dcid: \"dc/o/{obs_hash}\"")
+
+        if pd.notna(row['unit']) and row['unit']:
+          node_lines.append(f"unit: dcid:{row['unit']}")
+        if pd.notna(row['measurement_method']) and row['measurement_method']:
+          node_lines.append(f"measurementMethod: dcid:{row['measurement_method']}")
+        if pd.notna(row['observation_period']) and row['observation_period']:
+          node_lines.append(f"observationPeriod: \"{row['observation_period']}\"")
+        if pd.notna(row['scaling_factor']) and row['scaling_factor']:
+          node_lines.append(f"scalingFactor: {row['scaling_factor']}")
+
+        mcf_nodes.append("\n".join(node_lines))
+
+      mcf = "\n\n".join(mcf_nodes)
+      obs_file = self.output_dir.open_file("observations.mcf")
+      obs_file.write(mcf)
+      logging.info("Exported %s observations to %s", len(df),
+                   obs_file.full_path())
+
+    # 2. Export triples to MCF
+    triples_tuples = self.db.engine.fetch_all("select * from triples")
+    if triples_tuples:
+      nodes = {}
+      # Group triples by subject so each subject becomes one MCF node.
+      for triple_tuple in triples_tuples:
+        subject_id, predicate, object_id, object_value = triple_tuple
+        node = nodes.get(subject_id)
+        if not node:
+          node = McfNode(subject_id)
+          nodes[subject_id] = node
+
+        if object_id:
+          node.add_triple(Triple(subject_id, predicate, object_id=object_id))
+        else:
+          node.add_triple(
+              Triple(subject_id, predicate, object_value=object_value))
+
+      mcf = "\n\n".join(map(lambda node: node.to_mcf(), nodes.values()))
+      mcf_file = self.output_dir.open_file("schema.mcf")
+      mcf_file.write(mcf)
+      logging.info("Exported %s nodes to %s", len(nodes), mcf_file.full_path())
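+      # Roughly, a node in schema.mcf might look like the sketch below
+      # (hypothetical StatisticalVariable; the exact layout comes from
+      # McfNode.to_mcf()):
+      #
+      #   Node: dcid:Count_Household_FoodInsecure
+      #   typeOf: dcid:StatisticalVariable
+      #   name: "Count of Food Insecure Households"
+      #   populationType: dcid:Household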
Skipping upload and trigger.") + logging.error("Required: DCP_INGESTION_BUCKET, SPANNER_INSTANCE_ID, SPANNER_DATABASE_ID, GCP_PROJECT, WORKFLOW_NAME") + return + + logging.info("Uploading artifacts to GCS bucket: %s", bucket) + + import_gcs_dir = f"gs://{bucket}/imports/dcpbridge_testing/" + files_uploaded = False + + # Upload observations if file exists + obs_file = self.output_dir.open_file("observations.mcf") + obs_local_path = obs_file.syspath() if hasattr(obs_file, 'syspath') else obs_file.path + if obs_local_path and os.path.exists(obs_local_path): + obs_gcs_path = f"{import_gcs_dir}observations.mcf" + logging.info("Uploading %s to %s", obs_local_path, obs_gcs_path) + subprocess.run(["gcloud", "storage", "cp", obs_local_path, obs_gcs_path], check=True) + files_uploaded = True + + # Upload schema if file exists + mcf_file = self.output_dir.open_file("schema.mcf") + mcf_local_path = mcf_file.syspath() if hasattr(mcf_file, 'syspath') else mcf_file.path + if mcf_local_path and os.path.exists(mcf_local_path): + schema_gcs_path = f"{import_gcs_dir}schema.mcf" + logging.info("Uploading %s to %s", mcf_local_path, schema_gcs_path) + subprocess.run(["gcloud", "storage", "cp", mcf_local_path, schema_gcs_path], check=True) + files_uploaded = True + + # Upload NL artifacts + nl_local_path = self.nl_dir.syspath() if hasattr(self.nl_dir, 'syspath') else self.nl_dir.path + if nl_local_path and os.path.exists(nl_local_path): + nl_gcs_path = f"gs://{bucket}/nl/" + logging.info("Uploading NL artifacts from %s to %s", nl_local_path, nl_gcs_path) + subprocess.run(["gcloud", "storage", "cp", "-r", nl_local_path, nl_gcs_path], check=True) + + if not files_uploaded: + logging.warning("No artifacts to import. Skipping workflow trigger.") + return + + # Construct import list with single directory entry + prov_names = list(self.config.provenances.keys()) + logical_import_name = prov_names[0] if prov_names else "DCP_Bridge_Import" + + import_list = [{"importName": logical_import_name, "graphPath": import_gcs_dir}] + + # Construct payload + import_name = f"dcp_bridge_{datetime.now().strftime('%Y%m%d%H%M%S')}" + + payload = { + "spannerInstanceId": spanner_instance, + "spannerDatabaseId": spanner_database, + "importName": import_name, + "importList": json.dumps(import_list), + "tempLocation": f"gs://{bucket}/temp", + "region": region + } + + payload_str = json.dumps(payload) + + # Trigger workflow + logging.info("Triggering Cloud Workflow: %s", workflow_name) + cmd = [ + "gcloud", "workflows", "run", workflow_name, + f"--project={project}", + f"--location={region}", + f"--data={payload_str}" + ] + + logging.info("Running command: %s", " ".join(cmd)) + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode == 0: + logging.info("Successfully triggered workflow. Output: %s", result.stdout) + else: + logging.error("Failed to trigger workflow. 
+
+    # Trigger the workflow.
+    logging.info("Triggering Cloud Workflow: %s", workflow_name)
+    cmd = [
+        "gcloud", "workflows", "run", workflow_name,
+        f"--project={project}",
+        f"--location={region}",
+        f"--data={payload_str}",
+    ]
+
+    logging.info("Running command: %s", " ".join(cmd))
+    result = subprocess.run(cmd, capture_output=True, text=True)
+    if result.returncode == 0:
+      logging.info("Successfully triggered workflow. Output: %s", result.stdout)
+    else:
+      logging.error("Failed to trigger workflow. Error: %s", result.stderr)
+
   def _generate_svg_hierarchy(self):
     if self.mode == RunMode.MAIN_DC:
       logging.info("Hierarchy generation not supported for main dc, skipping.")
@@ -385,6 +545,7 @@ def _create_importer(self, input_file: File) -> Importer:
           db=self.db,
           reporter=reporter,
           nodes=self.nodes,
+          mode=self.mode,
       )
     return ObservationsImporter(
         input_file=input_file,
diff --git a/simple/stats/variable_per_row_importer.py b/simple/stats/variable_per_row_importer.py
index 3c36b4bb..442725ee 100644
--- a/simple/stats/variable_per_row_importer.py
+++ b/simple/stats/variable_per_row_importer.py
@@ -20,6 +20,7 @@
 from stats.data import Observation
 from stats.data import ObservationProperties
 from stats.data import strip_namespace
+from stats.data import Triple
 from stats.db import Db
 from stats.importer import Importer
 from stats.nodes import Nodes
@@ -54,12 +55,13 @@ class VariablePerRowImporter(Importer):
   """
 
   def __init__(self, input_file: File, db: Db, reporter: FileImportReporter,
-               nodes: Nodes) -> None:
+               nodes: Nodes, mode: str = "customdc") -> None:
     self.input_file = input_file
     self.db = db
     self.reporter = reporter
     self.nodes = nodes
     self.config = nodes.config
+    self.mode = mode
     # Reassign after reading CSV.
     self.column_mappings = dict(_DEFAULT_COLUMN_MAPPINGS)
     self.reader: DictReader = None
@@ -160,3 +162,54 @@ def _add_entity_nodes(self) -> None:
     logging.info("Importing %s of %s entities.", len(dcid2type),
                  len(new_entity_dcids))
     self.nodes.entities_with_types(dcid2type)
+
+    if self.mode == "dcpbridge":
+      # Get the parent places chain and their details.
+      logging.info("Getting parent places chain from DC for %s entities.",
+                   len(new_entity_dcids))
+
+      resolved_dcids = set()
+      current_level_dcids = set(new_entity_dcids)
+
+      while current_level_dcids:
+        # Filter out already resolved ones to avoid redundant API calls.
+        to_resolve = list(current_level_dcids - resolved_dcids)
+        if not to_resolve:
+          break
+
+        logging.info("Resolving parents for %s entities.", len(to_resolve))
+        dcid2parents = dc.get_property_of_entities(to_resolve,
+                                                   "containedInPlace")
+        # Mark the current level as resolved even when it has no parents, so
+        # that top-of-chain places still get their details fetched below.
+        resolved_dcids.update(to_resolve)
+
+        if not dcid2parents:
+          break
+
+        parent_triples = []
+        next_level_dcids = set()
+
+        for child_dcid, parent_dcid in dcid2parents.items():
+          parent_triples.append(
+              Triple(child_dcid, "containedInPlace", object_id=parent_dcid))
+          next_level_dcids.add(parent_dcid)
+
+        self.db.insert_triples(parent_triples)
+
+        # Prepare for the next level.
+        current_level_dcids = next_level_dcids
+
+      # Resolve details for all unique parent places found in the chain
+      # (excluding the original entities).
+      all_parents = resolved_dcids - set(new_entity_dcids)
+      if all_parents:
+        logging.info("Resolving details for %s unique parent places found in "
+                     "the chain.", len(all_parents))
+        parent_dcids_list = list(all_parents)
+
+        parent_dcid2type = dc.get_property_of_entities(parent_dcids_list,
+                                                       sc.PREDICATE_TYPE_OF)
+        parent_dcid2name = dc.get_property_of_entities(parent_dcids_list,
+                                                       sc.PREDICATE_NAME)
+
+        parent_info_triples = []
+        for dcid, type_str in parent_dcid2type.items():
+          parent_info_triples.append(
+              Triple(dcid, sc.PREDICATE_TYPE_OF, object_id=type_str))
+        for dcid, name_str in parent_dcid2name.items():
+          parent_info_triples.append(
+              Triple(dcid, sc.PREDICATE_NAME, object_value=name_str))
+
+        self.db.insert_triples(parent_info_triples)
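+      # Illustrative example (hypothetical dcids): importing data about
+      # geoId/06085 would walk containedInPlace up the chain
+      # (geoId/06085 -> geoId/06 -> country/USA -> ...) and also insert
+      # typeOf and name triples for each parent, so the exported graph is
+      # self-contained.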