163 changes: 162 additions & 1 deletion simple/stats/runner.py
@@ -10,6 +10,7 @@
from stats.config import Config
from stats.data import ImportType
from stats.data import InputFileFormat
from stats.data import McfNode
from stats.data import ParentSVG2ChildSpecializedNames
from stats.data import Triple
from stats.data import VerticalSpec
Comment on lines 10 to 16
Contributor

medium

Several inline imports were added in the new methods _export_data_to_files_for_dcp and _upload_and_trigger_dcp_workflow. Per PEP 8, imports should be at the top of the file. This improves readability and ensures dependencies are resolved when the module is loaded.

Suggested change
from stats.config import Config
from stats.data import ImportType
from stats.data import InputFileFormat
from stats.data import McfNode
from stats.data import ParentSVG2ChildSpecializedNames
from stats.data import Triple
from stats.data import VerticalSpec
import hashlib
import json
import os
import subprocess
from datetime import datetime
import pandas as pd
from stats.config import Config
from stats.data import ImportType
from stats.data import InputFileFormat
from stats.data import McfNode
from stats.data import ParentSVG2ChildSpecializedNames
from stats.data import Triple
from stats.data import VerticalSpec
References
  1. Imports are always put at the top of the file, just after any module comments and docstrings, and before module globals and constants. (PEP 8)

@@ -42,6 +43,7 @@ class RunMode(StrEnum):
  CUSTOM_DC = "customdc"
  SCHEMA_UPDATE = "schemaupdate"
  MAIN_DC = "maindc"
  DCP_BRIDGE = "dcpbridge"


class Runner:
@@ -127,7 +129,7 @@ def run(self):
    if self.mode == RunMode.SCHEMA_UPDATE:
      logging.info("Skipping imports because run mode is schema update.")

    elif self.mode == RunMode.CUSTOM_DC or self.mode == RunMode.MAIN_DC:
    elif self.mode == RunMode.CUSTOM_DC or self.mode == RunMode.MAIN_DC or self.mode == RunMode.DCP_BRIDGE:
      self._run_imports_and_do_post_import_work()

    else:
@@ -213,6 +215,9 @@ def _run_imports_and_do_post_import_work(self):
    # Generate NL artifacts (sentences, embeddings, topic cache).
    self._generate_nl_artifacts()

    if self.mode == RunMode.DCP_BRIDGE:
      self._export_data_to_files_for_dcp()

    # Write import info to DB.
    self.db.insert_import_info(status=ImportStatus.SUCCESS)

@@ -238,6 +243,161 @@ def _generate_nl_artifacts(self):
    topic_cache_triples = topic_triples + sv_peer_group_triples
    nl.generate_topic_cache(topic_cache_triples, self.nl_dir)

  def _export_data_to_files_for_dcp(self):
    logging.info("Exporting data to files for DCP Bridge.")
    import pandas as pd

    # 1. Export observations to MCF
    obs_tuples = self.db.engine.fetch_all("select * from observations")
    if obs_tuples:
      columns = ["entity", "variable", "date", "value", "provenance", "unit", "scaling_factor", "measurement_method", "observation_period", "properties"]
      df = pd.DataFrame(obs_tuples, columns=columns)
Comment on lines +251 to +254
Contributor

high

Fetching all observations from the database into memory at once using fetch_all can lead to memory exhaustion (OOM) if the dataset is large. It is safer to process the data in chunks or stream the results from the database.
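For example, a minimal sketch of chunked processing, assuming the underlying engine can expose a DB-API style cursor (the cursor() accessor and the per-chunk writer below are hypothetical; adapt them to the actual Db interface):

CHUNK_SIZE = 50_000  # rows per batch; tune to available memory

cursor = self.db.engine.cursor()  # hypothetical DB-API cursor accessor
cursor.execute("select * from observations")
while True:
  rows = cursor.fetchmany(CHUNK_SIZE)  # at most CHUNK_SIZE rows in memory at once
  if not rows:
    break
  df = pd.DataFrame(rows, columns=columns)
  _append_mcf_chunk(df)  # hypothetical helper: appends this chunk's MCF nodes to the output file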


      import hashlib

      mcf_nodes = []
      for i, row in df.iterrows():
        node_lines = []
        node_lines.append(f"Node: E:obs->{i}")
        node_lines.append("typeOf: dcid:StatVarObservation")
        node_lines.append(f"observationAbout: dcid:{row['entity']}")
        node_lines.append(f"variableMeasured: dcid:{row['variable']}")
        node_lines.append(f"observationDate: \"{row['date']}\"")
        node_lines.append(f"value: {row['value']}")

        # Generate a unique dcid for the observation
        content_str = f"{row['entity']}_{row['variable']}_{row['date']}_{row['value']}"
        obs_hash = hashlib.md5(content_str.encode()).hexdigest()
        node_lines.append(f"dcid: \"dc/o/{obs_hash}\"")
Comment on lines +269 to +271
Contributor

high

Including the observation value in the DCID hash is problematic. If a value is corrected or updated for the same entity, variable, and date, this logic will generate a new DCID, resulting in a duplicate observation node in Data Commons instead of an update. Typically, a StatVarObservation DCID should be derived from its identity-defining properties (entity, variable, date, and measurement properties) but exclude the value itself.

Suggested change
        content_str = f"{row['entity']}_{row['variable']}_{row['date']}_{row['value']}"
        obs_hash = hashlib.md5(content_str.encode()).hexdigest()
        node_lines.append(f"dcid: \"dc/o/{obs_hash}\"")
        # Generate a unique dcid for the observation
        content_str = f"{row['entity']}_{row['variable']}_{row['date']}"
        obs_hash = hashlib.md5(content_str.encode()).hexdigest()
        node_lines.append(f"dcid: \"dc/o/{obs_hash}\"")


        if pd.notna(row['unit']) and row['unit']:
          node_lines.append(f"unit: dcid:{row['unit']}")
        if pd.notna(row['measurement_method']) and row['measurement_method']:
          node_lines.append(f"measurementMethod: dcid:{row['measurement_method']}")
        if pd.notna(row['observation_period']) and row['observation_period']:
          node_lines.append(f"observationPeriod: \"{row['observation_period']}\"")
        if pd.notna(row['scaling_factor']) and row['scaling_factor']:
          node_lines.append(f"scalingFactor: {row['scaling_factor']}")

        mcf_nodes.append("\n".join(node_lines))

      mcf = "\n\n".join(mcf_nodes)
      obs_file = self.output_dir.open_file("observations.mcf")
      obs_file.write(mcf)
      logging.info("Exported %s observations to %s", len(df), obs_file.full_path())

    # 2. Export triples to MCF
    triples_tuples = self.db.engine.fetch_all("select * from triples")
    if triples_tuples:
      nodes = {}
      for tuple in triples_tuples:
        subject_id, predicate, object_id, object_value = tuple
        node = nodes.get(subject_id)
        if not node:
          node = McfNode(subject_id)
          nodes[subject_id] = node

        if object_id:
          node.add_triple(Triple(subject_id, predicate, object_id=object_id))
        else:
          node.add_triple(Triple(subject_id, predicate, object_value=object_value))

      mcf = "\n\n".join(map(lambda node: node.to_mcf(), nodes.values()))
      mcf_file = self.output_dir.open_file("schema.mcf")
      mcf_file.write(mcf)
      logging.info("Exported %s nodes to %s", len(nodes), mcf_file.full_path())

    # 3. Upload to GCS and Trigger Workflow
    self._upload_and_trigger_dcp_workflow()

  def _upload_and_trigger_dcp_workflow(self):
    import os
    import subprocess
    import json
    from datetime import datetime

    bucket = os.getenv("DCP_INGESTION_BUCKET")
    spanner_instance = os.getenv("SPANNER_INSTANCE_ID")
    spanner_database = os.getenv("SPANNER_DATABASE_ID")
    region = os.getenv("GCP_REGION", "us-central1")
    project = os.getenv("GCP_PROJECT")
    workflow_name = os.getenv("WORKFLOW_NAME")

    if not all([bucket, spanner_instance, spanner_database, project, workflow_name]):
      logging.error("Missing required environment variables for DCP Bridge. Skipping upload and trigger.")
      logging.error("Required: DCP_INGESTION_BUCKET, SPANNER_INSTANCE_ID, SPANNER_DATABASE_ID, GCP_PROJECT, WORKFLOW_NAME")
      return

    logging.info("Uploading artifacts to GCS bucket: %s", bucket)

    import_gcs_dir = f"gs://{bucket}/imports/dcpbridge_testing/"
Contributor

medium

The GCS path contains dcpbridge_testing, which suggests this might be a temporary path used during development. If this is intended for production, consider making the path configurable or using a more permanent directory name.
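For example, the prefix could be read from the environment with a stable default (DCP_IMPORT_GCS_PREFIX is a hypothetical variable name):

gcs_prefix = os.getenv("DCP_IMPORT_GCS_PREFIX", "imports/dcpbridge")  # hypothetical env var with a permanent default
import_gcs_dir = f"gs://{bucket}/{gcs_prefix.strip('/')}/"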

    files_uploaded = False

    # Upload observations if file exists
    obs_file = self.output_dir.open_file("observations.mcf")
    obs_local_path = obs_file.syspath() if hasattr(obs_file, 'syspath') else obs_file.path
    if obs_local_path and os.path.exists(obs_local_path):
      obs_gcs_path = f"{import_gcs_dir}observations.mcf"
      logging.info("Uploading %s to %s", obs_local_path, obs_gcs_path)
      subprocess.run(["gcloud", "storage", "cp", obs_local_path, obs_gcs_path], check=True)
Contributor

medium

Using subprocess to call gcloud commands is less robust than using the official Google Cloud Python client libraries (e.g., google-cloud-storage and google-cloud-workflows). Client libraries provide better error handling, are more efficient, and remove the dependency on the gcloud CLI being installed in the runtime environment.
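For illustration, a minimal sketch of the same upload via google-cloud-storage (bucket and obs_local_path come from the surrounding code; error handling elided):

from google.cloud import storage

def upload_to_gcs(bucket_name: str, local_path: str, object_name: str) -> None:
  # Uses Application Default Credentials, the same auth the gcloud CLI uses.
  client = storage.Client()
  client.bucket(bucket_name).blob(object_name).upload_from_filename(local_path)

# Equivalent of: gcloud storage cp <local> gs://<bucket>/<object>
upload_to_gcs(bucket, obs_local_path, "imports/dcpbridge_testing/observations.mcf")

The workflow trigger further down could similarly go through google.cloud.workflows.executions_v1.ExecutionsClient instead of shelling out.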

      files_uploaded = True

    # Upload schema if file exists
    mcf_file = self.output_dir.open_file("schema.mcf")
    mcf_local_path = mcf_file.syspath() if hasattr(mcf_file, 'syspath') else mcf_file.path
    if mcf_local_path and os.path.exists(mcf_local_path):
      schema_gcs_path = f"{import_gcs_dir}schema.mcf"
      logging.info("Uploading %s to %s", mcf_local_path, schema_gcs_path)
      subprocess.run(["gcloud", "storage", "cp", mcf_local_path, schema_gcs_path], check=True)
      files_uploaded = True

    # Upload NL artifacts
    nl_local_path = self.nl_dir.syspath() if hasattr(self.nl_dir, 'syspath') else self.nl_dir.path
    if nl_local_path and os.path.exists(nl_local_path):
      nl_gcs_path = f"gs://{bucket}/nl/"
      logging.info("Uploading NL artifacts from %s to %s", nl_local_path, nl_gcs_path)
      subprocess.run(["gcloud", "storage", "cp", "-r", nl_local_path, nl_gcs_path], check=True)

    if not files_uploaded:
      logging.warning("No artifacts to import. Skipping workflow trigger.")
      return

    # Construct import list with single directory entry
    prov_names = list(self.config.provenances.keys())
    logical_import_name = prov_names[0] if prov_names else "DCP_Bridge_Import"

    import_list = [{"importName": logical_import_name, "graphPath": import_gcs_dir}]

    # Construct payload
    import_name = f"dcp_bridge_{datetime.now().strftime('%Y%m%d%H%M%S')}"

    payload = {
        "spannerInstanceId": spanner_instance,
        "spannerDatabaseId": spanner_database,
        "importName": import_name,
        "importList": json.dumps(import_list),
        "tempLocation": f"gs://{bucket}/temp",
        "region": region
    }

    payload_str = json.dumps(payload)

    # Trigger workflow
    logging.info("Triggering Cloud Workflow: %s", workflow_name)
    cmd = [
        "gcloud", "workflows", "run", workflow_name,
        f"--project={project}",
        f"--location={region}",
        f"--data={payload_str}"
    ]

    logging.info("Running command: %s", " ".join(cmd))
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 0:
      logging.info("Successfully triggered workflow. Output: %s", result.stdout)
    else:
      logging.error("Failed to trigger workflow. Error: %s", result.stderr)

  def _generate_svg_hierarchy(self):
    if self.mode == RunMode.MAIN_DC:
      logging.info("Hierarchy generation not supported for main dc, skipping.")
@@ -385,6 +545,7 @@ def _create_importer(self, input_file: File) -> Importer:
        db=self.db,
        reporter=reporter,
        nodes=self.nodes,
        mode=self.mode,
    )
    return ObservationsImporter(
        input_file=input_file,
55 changes: 54 additions & 1 deletion simple/stats/variable_per_row_importer.py
@@ -20,6 +20,7 @@
from stats.data import Observation
from stats.data import ObservationProperties
from stats.data import strip_namespace
from stats.data import Triple
from stats.db import Db
from stats.importer import Importer
from stats.nodes import Nodes
@@ -54,12 +55,13 @@ class VariablePerRowImporter(Importer):
"""

  def __init__(self, input_file: File, db: Db, reporter: FileImportReporter,
               nodes: Nodes) -> None:
               nodes: Nodes, mode: str = "customdc") -> None:
    self.input_file = input_file
    self.db = db
    self.reporter = reporter
    self.nodes = nodes
    self.config = nodes.config
    self.mode = mode
    # Reassign after reading CSV.
    self.column_mappings = dict(_DEFAULT_COLUMN_MAPPINGS)
    self.reader: DictReader = None
@@ -160,3 +162,54 @@ def _add_entity_nodes(self) -> None:
logging.info("Importing %s of %s entities.", len(dcid2type),
len(new_entity_dcids))
self.nodes.entities_with_types(dcid2type)

if self.mode == "dcpbridge":
# Get parent places chain and their details
logging.info("Getting parent places chain from DC for %s entities.", len(new_entity_dcids))

resolved_dcids = set()
current_level_dcids = set(new_entity_dcids)

while current_level_dcids:
# Filter out already resolved ones to optimize API calls
to_resolve = list(current_level_dcids - resolved_dcids)
if not to_resolve:
break

logging.info("Resolving parents for %s entities.", len(to_resolve))
dcid2parents = dc.get_property_of_entities(to_resolve, "containedInPlace")

if not dcid2parents:
break

parent_triples = []
next_level_dcids = set()

for child_dcid, parent_dcid in dcid2parents.items():
parent_triples.append(Triple(child_dcid, "containedInPlace", object_id=parent_dcid))
next_level_dcids.add(parent_dcid)

self.db.insert_triples(parent_triples)

# Mark current as resolved
resolved_dcids.update(to_resolve)

# Prepare for next level
current_level_dcids = next_level_dcids
Comment on lines +173 to +198
Contributor

high

This recursive parent resolution logic is executed for every file import. Since resolved_dcids is local to this method, the same parent hierarchies (e.g., Country, State) will be re-fetched from the Data Commons API repeatedly if they appear across different files. This is inefficient and could lead to performance bottlenecks or hitting API rate limits. Consider caching these resolutions in a shared state, such as within the Nodes object or a dedicated cache.
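A minimal sketch of one option, assuming a dict cache is added to the shared Nodes object (the parent_place_cache attribute and the helper below are hypothetical):

def _get_parents_cached(self, dcids: list[str]) -> dict[str, str]:
  cache = self.nodes.parent_place_cache  # hypothetical dict shared across file imports
  missing = [dcid for dcid in dcids if dcid not in cache]
  if missing:
    # Only cache misses hit the DC API; previously seen dcids are served locally.
    cache.update(dc.get_property_of_entities(missing, "containedInPlace"))
  return {dcid: cache[dcid] for dcid in dcids if dcid in cache}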


      # Resolve details for all unique parent places found in chain (excluding original ones)
      all_parents = resolved_dcids - set(new_entity_dcids)
      if all_parents:
        logging.info("Resolving details for %s unique parent places found in chain.", len(all_parents))
        parent_dcids_list = list(all_parents)

        parent_dcid2type = dc.get_property_of_entities(parent_dcids_list, sc.PREDICATE_TYPE_OF)
        parent_dcid2name = dc.get_property_of_entities(parent_dcids_list, sc.PREDICATE_NAME)

        parent_info_triples = []
        for dcid, type_str in parent_dcid2type.items():
          parent_info_triples.append(Triple(dcid, sc.PREDICATE_TYPE_OF, object_id=type_str))
        for dcid, name_str in parent_dcid2name.items():
          parent_info_triples.append(Triple(dcid, sc.PREDICATE_NAME, object_value=name_str))

        self.db.insert_triples(parent_info_triples)