[CDC to DCP Data Bridge] Successful test of a data bridge #502
base: master
Changes from all commits
```diff
@@ -10,6 +10,7 @@
 from stats.config import Config
 from stats.data import ImportType
 from stats.data import InputFileFormat
+from stats.data import McfNode
 from stats.data import ParentSVG2ChildSpecializedNames
 from stats.data import Triple
 from stats.data import VerticalSpec
```
```diff
@@ -42,6 +43,7 @@ class RunMode(StrEnum):
   CUSTOM_DC = "customdc"
   SCHEMA_UPDATE = "schemaupdate"
   MAIN_DC = "maindc"
+  DCP_BRIDGE = "dcpbridge"


 class Runner:
```
```diff
@@ -127,7 +129,7 @@ def run(self):
     if self.mode == RunMode.SCHEMA_UPDATE:
       logging.info("Skipping imports because run mode is schema update.")

-    elif self.mode == RunMode.CUSTOM_DC or self.mode == RunMode.MAIN_DC:
+    elif self.mode == RunMode.CUSTOM_DC or self.mode == RunMode.MAIN_DC or self.mode == RunMode.DCP_BRIDGE:
       self._run_imports_and_do_post_import_work()

     else:
```
```diff
@@ -213,6 +215,9 @@ def _run_imports_and_do_post_import_work(self):
     # Generate NL artifacts (sentences, embeddings, topic cache).
     self._generate_nl_artifacts()

+    if self.mode == RunMode.DCP_BRIDGE:
+      self._export_data_to_files_for_dcp()
+
     # Write import info to DB.
     self.db.insert_import_info(status=ImportStatus.SUCCESS)
```
```diff
@@ -238,6 +243,161 @@ def _generate_nl_artifacts(self):
     topic_cache_triples = topic_triples + sv_peer_group_triples
     nl.generate_topic_cache(topic_cache_triples, self.nl_dir)

+  def _export_data_to_files_for_dcp(self):
+    logging.info("Exporting data to files for DCP Bridge.")
+    import pandas as pd
+
+    # 1. Export observations to MCF
+    obs_tuples = self.db.engine.fetch_all("select * from observations")
+    if obs_tuples:
+      columns = ["entity", "variable", "date", "value", "provenance", "unit", "scaling_factor", "measurement_method", "observation_period", "properties"]
+      df = pd.DataFrame(obs_tuples, columns=columns)
+
+      import hashlib
+
+      mcf_nodes = []
+      for i, row in df.iterrows():
+        node_lines = []
+        node_lines.append(f"Node: E:obs->{i}")
+        node_lines.append("typeOf: dcid:StatVarObservation")
+        node_lines.append(f"observationAbout: dcid:{row['entity']}")
+        node_lines.append(f"variableMeasured: dcid:{row['variable']}")
+        node_lines.append(f"observationDate: \"{row['date']}\"")
+        node_lines.append(f"value: {row['value']}")
+
+        # Generate a unique dcid for the observation
+        content_str = f"{row['entity']}_{row['variable']}_{row['date']}_{row['value']}"
+        obs_hash = hashlib.md5(content_str.encode()).hexdigest()
+        node_lines.append(f"dcid: \"dc/o/{obs_hash}\"")
```

> **Contributor** (on lines +269 to +271): Including the observation …
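The comment above is cut off. One plausible reading, which is an assumption and not the reviewer's confirmed intent, is that hashing the value into the dcid means a restated value mints a brand-new observation node. A minimal sketch that keys the dcid on identifying fields only would be:

```python
# Sketch (assumption): derive the observation dcid from entity, variable and
# date only, so a restated value updates the same node instead of creating a
# new dcid.
content_str = f"{row['entity']}_{row['variable']}_{row['date']}"
obs_hash = hashlib.md5(content_str.encode()).hexdigest()
node_lines.append(f"dcid: \"dc/o/{obs_hash}\"")
```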
```diff
+        if pd.notna(row['unit']) and row['unit']:
+          node_lines.append(f"unit: dcid:{row['unit']}")
+        if pd.notna(row['measurement_method']) and row['measurement_method']:
+          node_lines.append(f"measurementMethod: dcid:{row['measurement_method']}")
+        if pd.notna(row['observation_period']) and row['observation_period']:
+          node_lines.append(f"observationPeriod: \"{row['observation_period']}\"")
+        if pd.notna(row['scaling_factor']) and row['scaling_factor']:
+          node_lines.append(f"scalingFactor: {row['scaling_factor']}")
+
+        mcf_nodes.append("\n".join(node_lines))
+
+      mcf = "\n\n".join(mcf_nodes)
+      obs_file = self.output_dir.open_file("observations.mcf")
+      obs_file.write(mcf)
+      logging.info("Exported %s observations to %s", len(df), obs_file.full_path())
```
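For reference, a single node in the resulting `observations.mcf` takes this shape (the entity, variable, date, and value below are illustrative, and the optional properties appear only when set):

```
Node: E:obs->0
typeOf: dcid:StatVarObservation
observationAbout: dcid:country/USA
variableMeasured: dcid:Count_Person
observationDate: "2020"
value: 331000000
dcid: "dc/o/<md5 hex of entity_variable_date_value>"
unit: dcid:Person
```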
```diff
+    # 2. Export triples to MCF
+    triples_tuples = self.db.engine.fetch_all("select * from triples")
+    if triples_tuples:
+      nodes = {}
+      for tuple in triples_tuples:
+        subject_id, predicate, object_id, object_value = tuple
+        node = nodes.get(subject_id)
+        if not node:
+          node = McfNode(subject_id)
+          nodes[subject_id] = node
+
+        if object_id:
+          node.add_triple(Triple(subject_id, predicate, object_id=object_id))
+        else:
+          node.add_triple(Triple(subject_id, predicate, object_value=object_value))
+
+      mcf = "\n\n".join(map(lambda node: node.to_mcf(), nodes.values()))
+      mcf_file = self.output_dir.open_file("schema.mcf")
+      mcf_file.write(mcf)
+      logging.info("Exported %s nodes to %s", len(nodes), mcf_file.full_path())
+
+    # 3. Upload to GCS and Trigger Workflow
+    self._upload_and_trigger_dcp_workflow()
```
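Assuming `McfNode.to_mcf()` renders one standard MCF block per subject (the property values below are illustrative), `schema.mcf` would then contain nodes like:

```
Node: dcid:Count_Person
typeOf: dcid:StatisticalVariable
name: "Count Person"
```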
```diff
+  def _upload_and_trigger_dcp_workflow(self):
+    import os
+    import subprocess
+    import json
+    from datetime import datetime
+
+    bucket = os.getenv("DCP_INGESTION_BUCKET")
+    spanner_instance = os.getenv("SPANNER_INSTANCE_ID")
+    spanner_database = os.getenv("SPANNER_DATABASE_ID")
+    region = os.getenv("GCP_REGION", "us-central1")
+    project = os.getenv("GCP_PROJECT")
+    workflow_name = os.getenv("WORKFLOW_NAME")
+
+    if not all([bucket, spanner_instance, spanner_database, project, workflow_name]):
+      logging.error("Missing required environment variables for DCP Bridge. Skipping upload and trigger.")
+      logging.error("Required: DCP_INGESTION_BUCKET, SPANNER_INSTANCE_ID, SPANNER_DATABASE_ID, GCP_PROJECT, WORKFLOW_NAME")
+      return
+
+    logging.info("Uploading artifacts to GCS bucket: %s", bucket)
+
+    import_gcs_dir = f"gs://{bucket}/imports/dcpbridge_testing/"
```
```diff
+    files_uploaded = False
+
+    # Upload observations if file exists
+    obs_file = self.output_dir.open_file("observations.mcf")
+    obs_local_path = obs_file.syspath() if hasattr(obs_file, 'syspath') else obs_file.path
+    if obs_local_path and os.path.exists(obs_local_path):
+      obs_gcs_path = f"{import_gcs_dir}observations.mcf"
+      logging.info("Uploading %s to %s", obs_local_path, obs_gcs_path)
+      subprocess.run(["gcloud", "storage", "cp", obs_local_path, obs_gcs_path], check=True)
+      files_uploaded = True
```
```diff
+    # Upload schema if file exists
+    mcf_file = self.output_dir.open_file("schema.mcf")
+    mcf_local_path = mcf_file.syspath() if hasattr(mcf_file, 'syspath') else mcf_file.path
+    if mcf_local_path and os.path.exists(mcf_local_path):
+      schema_gcs_path = f"{import_gcs_dir}schema.mcf"
+      logging.info("Uploading %s to %s", mcf_local_path, schema_gcs_path)
+      subprocess.run(["gcloud", "storage", "cp", mcf_local_path, schema_gcs_path], check=True)
+      files_uploaded = True
+
+    # Upload NL artifacts
+    nl_local_path = self.nl_dir.syspath() if hasattr(self.nl_dir, 'syspath') else self.nl_dir.path
+    if nl_local_path and os.path.exists(nl_local_path):
+      nl_gcs_path = f"gs://{bucket}/nl/"
+      logging.info("Uploading NL artifacts from %s to %s", nl_local_path, nl_gcs_path)
+      subprocess.run(["gcloud", "storage", "cp", "-r", nl_local_path, nl_gcs_path], check=True)
+
+    if not files_uploaded:
+      logging.warning("No artifacts to import. Skipping workflow trigger.")
+      return
+
+    # Construct import list with single directory entry
+    prov_names = list(self.config.provenances.keys())
+    logical_import_name = prov_names[0] if prov_names else "DCP_Bridge_Import"
+
+    import_list = [{"importName": logical_import_name, "graphPath": import_gcs_dir}]
+
+    # Construct payload
+    import_name = f"dcp_bridge_{datetime.now().strftime('%Y%m%d%H%M%S')}"
+
+    payload = {
+        "spannerInstanceId": spanner_instance,
+        "spannerDatabaseId": spanner_database,
+        "importName": import_name,
+        "importList": json.dumps(import_list),
+        "tempLocation": f"gs://{bucket}/temp",
+        "region": region
+    }
+
+    payload_str = json.dumps(payload)
+
+    # Trigger workflow
+    logging.info("Triggering Cloud Workflow: %s", workflow_name)
+    cmd = [
+        "gcloud", "workflows", "run", workflow_name,
+        f"--project={project}",
+        f"--location={region}",
+        f"--data={payload_str}"
+    ]
+
+    logging.info("Running command: %s", " ".join(cmd))
+    result = subprocess.run(cmd, capture_output=True, text=True)
+    if result.returncode == 0:
+      logging.info("Successfully triggered workflow. Output: %s", result.stdout)
+    else:
+      logging.error("Failed to trigger workflow. Error: %s", result.stderr)

   def _generate_svg_hierarchy(self):
     if self.mode == RunMode.MAIN_DC:
       logging.info("Hierarchy generation not supported for main dc, skipping.")
```
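For reference, the `--data` payload assembled above looks like this (all names are placeholders; note that `importList` is itself a JSON-encoded string nested inside the JSON payload, per the `json.dumps(import_list)` call):

```json
{
  "spannerInstanceId": "my-instance",
  "spannerDatabaseId": "my-database",
  "importName": "dcp_bridge_20250101120000",
  "importList": "[{\"importName\": \"my_provenance\", \"graphPath\": \"gs://my-bucket/imports/dcpbridge_testing/\"}]",
  "tempLocation": "gs://my-bucket/temp",
  "region": "us-central1"
}
```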
```diff
@@ -385,6 +545,7 @@ def _create_importer(self, input_file: File) -> Importer:
           db=self.db,
           reporter=reporter,
           nodes=self.nodes,
+          mode=self.mode,
       )
     return ObservationsImporter(
         input_file=input_file,
```
```diff
@@ -20,6 +20,7 @@
 from stats.data import Observation
 from stats.data import ObservationProperties
 from stats.data import strip_namespace
+from stats.data import Triple
 from stats.db import Db
 from stats.importer import Importer
 from stats.nodes import Nodes
```
```diff
@@ -54,12 +55,13 @@ class VariablePerRowImporter(Importer):
   """

   def __init__(self, input_file: File, db: Db, reporter: FileImportReporter,
-               nodes: Nodes) -> None:
+               nodes: Nodes, mode: str = "customdc") -> None:
     self.input_file = input_file
     self.db = db
     self.reporter = reporter
     self.nodes = nodes
     self.config = nodes.config
+    self.mode = mode
     # Reassign after reading CSV.
     self.column_mappings = dict(_DEFAULT_COLUMN_MAPPINGS)
     self.reader: DictReader = None
```
```diff
@@ -160,3 +162,54 @@ def _add_entity_nodes(self) -> None:
     logging.info("Importing %s of %s entities.", len(dcid2type),
                  len(new_entity_dcids))
     self.nodes.entities_with_types(dcid2type)
+
+    if self.mode == "dcpbridge":
+      # Get parent places chain and their details
+      logging.info("Getting parent places chain from DC for %s entities.", len(new_entity_dcids))
+
+      resolved_dcids = set()
+      current_level_dcids = set(new_entity_dcids)
+
+      while current_level_dcids:
+        # Filter out already resolved ones to optimize API calls
+        to_resolve = list(current_level_dcids - resolved_dcids)
+        if not to_resolve:
+          break
+
+        logging.info("Resolving parents for %s entities.", len(to_resolve))
+        dcid2parents = dc.get_property_of_entities(to_resolve, "containedInPlace")
+
+        if not dcid2parents:
+          break
+
+        parent_triples = []
+        next_level_dcids = set()
+
+        for child_dcid, parent_dcid in dcid2parents.items():
+          parent_triples.append(Triple(child_dcid, "containedInPlace", object_id=parent_dcid))
+          next_level_dcids.add(parent_dcid)
+
+        self.db.insert_triples(parent_triples)
+
+        # Mark current as resolved
+        resolved_dcids.update(to_resolve)
+
+        # Prepare for next level
+        current_level_dcids = next_level_dcids
```
> **Contributor** (on lines +173 to +198): This recursive parent resolution logic is executed for every file import. Since …
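The comment above is cut off mid-sentence; the reviewer's actual suggestion is not recoverable. Purely as an illustrative sketch, one way to avoid re-resolving the same chains on every file import would be a cache shared across importer instances (the helper name and module-level placement are hypothetical):

```python
# Hypothetical module-level cache: maps a place dcid to its containedInPlace
# parent, so repeated file imports skip DC API calls for already-resolved
# places.
_PARENT_CACHE: dict[str, str] = {}

def _resolve_parents_cached(dcids: list[str]) -> dict[str, str]:
  # Only query the API for dcids we have not seen before.
  uncached = [d for d in dcids if d not in _PARENT_CACHE]
  if uncached:
    _PARENT_CACHE.update(dc.get_property_of_entities(uncached, "containedInPlace"))
  # Places with no known parent are absent from the cache and the result.
  return {d: _PARENT_CACHE[d] for d in dcids if d in _PARENT_CACHE}
```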
```diff
+      # Resolve details for all unique parent places found in chain (excluding original ones)
+      all_parents = resolved_dcids - set(new_entity_dcids)
+      if all_parents:
+        logging.info("Resolving details for %s unique parent places found in chain.", len(all_parents))
+        parent_dcids_list = list(all_parents)
+
+        parent_dcid2type = dc.get_property_of_entities(parent_dcids_list, sc.PREDICATE_TYPE_OF)
+        parent_dcid2name = dc.get_property_of_entities(parent_dcids_list, sc.PREDICATE_NAME)
+
+        parent_info_triples = []
+        for dcid, type_str in parent_dcid2type.items():
+          parent_info_triples.append(Triple(dcid, sc.PREDICATE_TYPE_OF, object_id=type_str))
+        for dcid, name_str in parent_dcid2name.items():
+          parent_info_triples.append(Triple(dcid, sc.PREDICATE_NAME, object_value=name_str))
+
+        self.db.insert_triples(parent_info_triples)
```
> **Contributor:** Several inline imports were added in the new methods `_export_data_to_files_for_dcp` and `_upload_and_trigger_dcp_workflow`. Per PEP 8, imports should be at the top of the file. This improves readability and ensures dependencies are resolved when the module is loaded.
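A minimal sketch of that change, assuming the runner module's header otherwise stays as shown in the first hunk (exact placement among the existing imports is this editor's guess):

```python
# Top of the runner module, alongside the existing stats.* imports:
import hashlib
import json
import os
import subprocess
from datetime import datetime

import pandas as pd
```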