Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions import-automation/executor/app/executor/import_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ def _invoke_import_tool(self, absolute_import_dir: str,
f'Error reading report.json {report_json} file: {e}')
if os.path.exists(output_path):
# Upload output to GCS.
gcs_output = f'{relative_import_dir}/{import_spec["import_name"]}/{version}/{import_prefix}/validation'
gcs_output = f'{relative_import_dir}/{import_spec["import_name"]}/{version}/{import_prefix}/genmcf'
Comment thread
vish-cs marked this conversation as resolved.
logging.info(
f'Uploading genmcf output to GCS path: {gcs_output}')
for filename in os.listdir(output_path):
Expand Down Expand Up @@ -587,7 +587,10 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
import_prefix, 'validation')
os.makedirs(validation_output_path, exist_ok=True)
current_data_path = os.path.join(genmcf_output_path, '*.mcf')
previous_data_path = latest_version + f'/{import_prefix}/validation/*.mcf'
previous_data_path = latest_version + f'/{import_prefix}/genmcf/*.mcf'
if latest_version and not file_util.file_get_matching(
previous_data_path):
previous_data_path = latest_version + f'/{import_prefix}/validation/*.mcf'
Comment thread
vish-cs marked this conversation as resolved.
summary_stats = os.path.join(genmcf_output_path,
'summary_report.csv')
report_json = os.path.join(genmcf_output_path, 'report.json')
Expand Down Expand Up @@ -623,7 +626,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
"previous_version": latest_version,
"current_version": version
})
differ_output_file = differ_output
differ_output_file = validation_output_path
# Save the previous version being compared to
with open(
os.path.join(validation_output_path,
Expand Down
13 changes: 13 additions & 0 deletions tools/import_differ/differ_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import glob
import fnmatch
import json
import os
import pandas as pd
import re
Expand Down Expand Up @@ -71,6 +72,18 @@ def write_csv_data(df: pd.DataFrame, dest: str, file: str, tmp_dir: str):
upload_output_data(path, dest)


def write_json_data(data, dest: str, file: str, tmp_dir: str):
    """Serializes `data` as an indented JSON file named `file`.

    Local destinations are written directly under `dest`; GCS destinations
    (`gs://...`) are first staged under `tmp_dir` and then uploaded.
    """
    is_gcs = dest.startswith('gs://')
    # Stage GCS-bound output locally; write local output in place.
    target_dir = tmp_dir if is_gcs else dest
    out_path = os.path.join(target_dir, file)
    with open(out_path, mode='w', encoding='utf-8') as handle:
        json.dump(data, handle, indent=4)
    if is_gcs:
        upload_output_data(out_path, dest)


def upload_output_data(src: str, dest: str):
client = storage.Client()
bucket_name = dest.split('/')[2]
Expand Down
13 changes: 13 additions & 0 deletions tools/import_differ/import_differ.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,22 @@ def run_differ(self):
'obs_diff_log.csv', tmp_path)
differ_utils.write_csv_data(schema_diff, self.output_path,
'schema_diff_log.csv', tmp_path)
differ_summary = {
'current_version': self.current_data,
'previous_version': self.previous_data,
'current_obs_size': current_df_obs.shape[0],
'previous_obs_size': previous_df_obs.shape[0],
'current_schema_size': current_df_schema.shape[0],
'previous_schema_size': previous_df_schema.shape[0],
'obs_diff_size': obs_diff.shape[0],
'schema_diff_size': schema_diff.shape[0]
}
differ_utils.write_json_data(differ_summary, self.output_path,
'differ_summary.json', tmp_path)

logging.info(f'Generated observation diff of size {obs_diff.shape[0]}')
logging.info(f'Generated schema diff of size {schema_diff.shape[0]}')
logging.info(f'Differ summary: {differ_summary}')

logging.info(f'Performing schema diff analysis')
schema_diff_summary = self.schema_diff_analysis(schema_diff)
Expand Down
1 change: 1 addition & 0 deletions tools/import_validation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ The following validations are currently supported:
| `MISSING_REFS_COUNT` | Checks that the total number of missing references is within a threshold. | `lint` | `threshold` (integer, defaults to 0) |
| `LINT_ERROR_COUNT` | Checks that the total number of lint errors is within a threshold. | `lint` | `threshold` (integer, defaults to 0) |
| `DELETED_RECORDS_COUNT` | Checks that the total number of deleted points is within a threshold. | `differ` | `threshold` (integer, defaults to 0) |
| `DELETED_RECORDS_PERCENT` | Checks that the percentage of deleted points is within a threshold. | `differ` | `threshold` (integer, defaults to 0) |
| `MODIFIED_RECORDS_COUNT` | Checks that the number of modified points is the same for all StatVars. | `differ` | None |
| `ADDED_RECORDS_COUNT` | Checks that the number of added points is the same for all StatVars. | `differ` | None |
| `NUM_PLACES_CONSISTENT` | Checks that the number of places is the same for all StatVars. | `stats` | None |
Expand Down
51 changes: 42 additions & 9 deletions tools/import_validation/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self, validation_config_path: str, differ_output: str,
self.data_sources = {
'stats': pd.DataFrame(),
'differ': pd.DataFrame(),
'differ_summary': {},
'lint': {}
}

Expand All @@ -59,6 +60,8 @@ def __init__(self, validation_config_path: str, differ_output: str,
(self.validator.validate_max_date_consistent, 'stats'),
'DELETED_RECORDS_COUNT':
(self.validator.validate_deleted_records_count, 'differ'),
'DELETED_RECORDS_PERCENT':
(self.validator.validate_deleted_records_percent, 'differ'),
'MISSING_REFS_COUNT':
(self.validator.validate_missing_refs_count, 'lint'),
'LINT_ERROR_COUNT':
Expand Down Expand Up @@ -99,7 +102,7 @@ def _initialize_data_sources(self, stats_summary: str, lint_report: str,
if 'differ' in req_sources:
if not differ_output or not os.path.exists(differ_output):
logging.warning(
f"A validation rule requires the 'differ' data source, but the --differ_output file was not provided or does not exist. Path: {differ_output}"
f"A validation rule requires the 'differ' data source, but the --differ_output path was not provided or does not exist. Path: {differ_output}"
)

if 'lint' in req_sources and (not lint_report or
Expand All @@ -115,12 +118,38 @@ def _initialize_data_sources(self, stats_summary: str, lint_report: str,
logging.warning("stats_summary file exists but is empty: %s",
stats_summary)

if differ_output and os.path.exists(differ_output) and os.path.getsize(
differ_output) > 0:
self.data_sources['differ'] = pd.read_csv(differ_output)
elif differ_output and os.path.exists(differ_output):
logging.warning("differ_output file exists but is empty: %s",
differ_output)
# Handle differ output (folder or file)
differ_csv_path = None
differ_json_path = None

if differ_output and os.path.exists(differ_output):
if os.path.isdir(differ_output):
differ_csv_path = os.path.join(differ_output,
'obs_diff_summary.csv')
differ_json_path = os.path.join(differ_output,
'differ_summary.json')
else:
differ_csv_path = differ_output

if differ_csv_path and os.path.exists(
differ_csv_path) and os.path.getsize(differ_csv_path) > 0:
self.data_sources['differ'] = pd.read_csv(differ_csv_path)
elif differ_csv_path and os.path.exists(differ_csv_path):
logging.warning("differ csv file exists but is empty: %s",
differ_csv_path)

if differ_json_path and os.path.exists(
differ_json_path) and os.path.getsize(differ_json_path) > 0:
try:
with open(differ_json_path, 'r') as f:
self.data_sources['differ_summary'] = json.load(f)
except Exception as e:
logging.error(
f"JSON parse error while reading differ summary at {differ_json_path}: {e}"
)
elif differ_json_path and os.path.exists(differ_json_path):
logging.warning("differ summary file exists but is empty: %s",
differ_json_path)

if lint_report and os.path.exists(lint_report) and os.path.getsize(
lint_report) > 0:
Expand All @@ -129,7 +158,7 @@ def _initialize_data_sources(self, stats_summary: str, lint_report: str,
self.data_sources['lint'] = json.load(f)
except Exception as e:
logging.error(
"JSON parse error while reading lint report at {lint_report}: {e}"
f"JSON parse error while reading lint report at {lint_report}: {e}"
)
elif lint_report and os.path.exists(lint_report):
logging.warning("lint_report file exists but is empty: %s",
Expand Down Expand Up @@ -174,6 +203,10 @@ def run_validations(self) -> tuple[bool, list[ValidationResult]]:
result = validation_func(self.data_sources['stats'],
self.data_sources['differ'],
rule['params'])
elif validator_name == 'DELETED_RECORDS_PERCENT':
result = validation_func(
self.data_sources['differ'],
self.data_sources.get('differ_summary'), rule['params'])
else:
scope = rule.get('scope', {})
if isinstance(scope, str):
Expand Down Expand Up @@ -230,7 +263,7 @@ def main(_):
flags.DEFINE_string('validation_config', 'validation_config.json',
'Path to the validation config file.')
flags.DEFINE_string('differ_output', None,
'Path to the differ output data file.')
'Path to the differ output directory or data file.')
flags.DEFINE_string('stats_summary', None,
'Path to the stats summary report file.')
flags.DEFINE_string('lint_report', None,
Expand Down
55 changes: 55 additions & 0 deletions tools/import_validation/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,61 @@ def test_runner_uses_custom_name(self, MockValidator):
self.assertEqual(output_df.iloc[0]['ValidationName'],
'My_Custom_Test_Name')

@patch('tools.import_validation.runner.Validator')
def test_runner_deleted_records_percent(self, MockValidator):
    """DELETED_RECORDS_PERCENT: runner accepts a differ output directory.

    The runner should locate obs_diff_summary.csv and differ_summary.json
    inside the directory and call
    Validator.validate_deleted_records_percent(differ_df, summary, params).
    """
    # 1. Setup mock
    mock_validator_instance = MockValidator.return_value
    expected_result = ValidationResult(ValidationStatus.PASSED,
                                       'DELETED_RECORDS_PERCENT',
                                       details={'percent': 5.0})
    mock_validator_instance.validate_deleted_records_percent.return_value = expected_result

    # 2. Setup files in a directory for differ output
    differ_dir = os.path.join(self.test_dir.name, 'differ_out')
    os.makedirs(differ_dir, exist_ok=True)

    differ_csv = os.path.join(differ_dir, 'obs_diff_summary.csv')
    pd.DataFrame({'DELETED': [5]}).to_csv(differ_csv, index=False)

    # Use the key the production differ actually emits and the validator
    # requires ('previous_obs_size'); an invented key would be silently
    # accepted here because the validator is mocked, masking contract drift.
    differ_json = os.path.join(differ_dir, 'differ_summary.json')
    with open(differ_json, 'w') as f:
        json.dump({'previous_obs_size': 100}, f)

    with open(self.config_path, 'w') as f:
        json.dump(
            {
                'rules': [{
                    'rule_id': 'check_percent',
                    'validator': 'DELETED_RECORDS_PERCENT',
                    'params': {
                        'threshold': 10
                    }
                }]
            }, f)

    # 3. Run runner
    runner = ValidationRunner(
        validation_config_path=self.config_path,
        stats_summary=self.stats_path,
        differ_output=differ_dir,  # Pass directory
        lint_report=self.report_path,
        validation_output=self.output_path)
    runner.run_validations()

    # 4. Verify the validator was called with (differ_df, summary, params).
    call_args, _ = mock_validator_instance.validate_deleted_records_percent.call_args
    self.assertIsInstance(call_args[0], pd.DataFrame)
    self.assertEqual(call_args[1]['previous_obs_size'], 100)
    self.assertEqual(call_args[2]['threshold'], 10)

    # Check the persisted result row.
    output_df = pd.read_csv(self.output_path)
    self.assertEqual(len(output_df), 1)
    self.assertEqual(output_df.iloc[0]['ValidationName'], 'check_percent')
    self.assertEqual(output_df.iloc[0]['Status'], 'PASSED')

@patch('tools.import_validation.runner.logging')
@patch('tools.import_validation.runner.Validator')
def test_runner_handles_unknown_validation(self, MockValidator,
Expand Down
6 changes: 3 additions & 3 deletions tools/import_validation/validation_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
"schema_version": "1.0",
"rules": [
{
"rule_id": "check_deleted_records_count",
"description": "Checks that the number of deleted points is within the threshold.",
"validator": "DELETED_RECORDS_COUNT",
"rule_id": "check_deleted_records_percent",
"description": "Checks that the percentage of deleted points is within the threshold.",
"validator": "DELETED_RECORDS_PERCENT",
"params": {
"threshold": 0
}
Expand Down
77 changes: 77 additions & 0 deletions tools/import_validation/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,83 @@ def validate_deleted_records_count(self, differ_df: pd.DataFrame,
'rows_failed': 0
})

def validate_deleted_records_percent(self, differ_df: pd.DataFrame,
                                     summary: dict,
                                     params: dict) -> ValidationResult:
    """Checks if the percentage of deleted records is within a threshold.

    The percentage is the sum of the 'DELETED' column in the differ output
    divided by 'previous_obs_size' from the differ summary, times 100.

    Args:
        differ_df: A DataFrame containing the differ output.
        summary: A dictionary containing the differ summary.
        params: A dictionary containing the validation parameters, which may
          have a 'threshold' key (defaults to 0).

    Returns:
        A ValidationResult object.
    """
    # Guard clauses: every required input must exist before any arithmetic.
    if differ_df is None:
        return ValidationResult(ValidationStatus.DATA_ERROR,
                                'DELETED_RECORDS_PERCENT',
                                message="Differ DataFrame is missing.")
    if summary is None:
        return ValidationResult(ValidationStatus.DATA_ERROR,
                                'DELETED_RECORDS_PERCENT',
                                message="Differ summary is missing.")
    if 'previous_obs_size' not in summary:
        return ValidationResult(
            ValidationStatus.DATA_ERROR,
            'DELETED_RECORDS_PERCENT',
            message=
            "Differ summary is missing required field: 'previous_obs_size'."
        )

    total_previous = summary['previous_obs_size']

    # An empty diff means nothing was deleted; otherwise the 'DELETED'
    # column is mandatory.
    if differ_df.empty:
        num_deleted = 0
    elif 'DELETED' not in differ_df.columns:
        return ValidationResult(
            ValidationStatus.DATA_ERROR,
            'DELETED_RECORDS_PERCENT',
            message="Input data is missing required column: 'DELETED'.")
    else:
        num_deleted = differ_df['DELETED'].sum()

    # With no previous observations, any deletion counts as 100%.
    if total_previous == 0:
        percent = 100.0 if num_deleted > 0 else 0.0
    else:
        percent = (num_deleted / total_previous) * 100

    threshold = params.get('threshold', 0)
    details = {
        'deleted_records_count': int(num_deleted),
        'previous_obs_size': int(total_previous),
        'percent': percent,
        'threshold': threshold
    }

    if percent > threshold:
        return ValidationResult(
            ValidationStatus.FAILED,
            'DELETED_RECORDS_PERCENT',
            message=
            f"Found {percent:.2f}% deleted records, which is over the threshold of {threshold}%.",
            details=details)

    return ValidationResult(ValidationStatus.PASSED,
                            'DELETED_RECORDS_PERCENT',
                            details=details)

def validate_missing_refs_count(self, report: dict,
params: dict) -> ValidationResult:
"""Checks if the total number of missing references is within a threshold.
Expand Down
Loading
Loading