diff --git a/src/dve/core_engine/backends/utilities.py b/src/dve/core_engine/backends/utilities.py
index bfa6f90..015ae7d 100644
--- a/src/dve/core_engine/backends/utilities.py
+++ b/src/dve/core_engine/backends/utilities.py
@@ -176,3 +176,8 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
     if polars_type:
         return polars_type
     raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")
+
+
+def pl_row_count(df: pl.DataFrame) -> int:  # type: ignore
+    """Return row count from a polars DataFrame object."""
+    return df.select(pl.len()).to_dicts()[0]["len"]  # type: ignore
diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index 819656a..72bb0d3 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -34,7 +34,7 @@
 from dve.parser.file_handling.service import _get_implementation
 from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader
 from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates
-from dve.reporting.utils import dump_feedback_errors, dump_processing_errors
+from dve.reporting.utils import dump_feedback_errors, dump_processing_errors, extract_and_pivot_keys
 
 PERMISSIBLE_EXCEPTIONS: tuple[type[Exception]] = (
     FileNotFoundError,  # type: ignore
@@ -718,6 +718,7 @@ def _get_error_dataframes(self, submission_id: str):
             .otherwise(pl.lit("Warning"))  # type: ignore
             .alias("error_type")
         )
+        df = extract_and_pivot_keys(df)
         df = df.select(
             pl.col("Entity").alias("Table"),  # type: ignore
             pl.col("error_type").alias("Type"),  # type: ignore
@@ -729,7 +730,7 @@
             pl.col("Category"),  # type: ignore
         )
         df = df.select(
-            pl.col(column).cast(ERROR_SCHEMA[column])  # type: ignore
+            pl.col(column).cast(ERROR_SCHEMA.get(column, pl.Utf8()))  # type: ignore
             for column in df.columns
         )
         df = df.sort("Type", descending=False)  # type: ignore
diff --git a/src/dve/reporting/utils.py b/src/dve/reporting/utils.py
index 8832b6a..81788d0 100644
--- a/src/dve/reporting/utils.py
+++ b/src/dve/reporting/utils.py
@@ -3,7 +3,10 @@
 import json
 from typing import Optional
 
+import polars as pl
+
 import dve.parser.file_handling as fh
+from dve.core_engine.backends.utilities import pl_row_count
 from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.type_hints import URI, Messages
 from dve.reporting.error_report import conditional_cast
@@ -80,3 +83,56 @@ def dump_processing_errors(
         f,
         default=str,
     )
+
+
+def extract_and_pivot_keys(
+    df: pl.DataFrame, key_field: str = "Key"  # type: ignore
+) -> pl.DataFrame:  # type: ignore
+    """
+    Extract key/value pairs from a key field column (str) and pivot the keys into new columns.
+
+    Where no keys exist in the column, the unmodified dataframe is returned. Where the column
+    holds a mixture of actual keys and invalid values (null, None or ""), no new column is
+    generated for the invalid values.
+
+    Args:
+        df (pl.DataFrame): dataframe to manipulate
+        key_field (str): name of the column to extract key/value pairs from
+
+    Returns:
+        pl.DataFrame: Polars DataFrame with pivoted keys
+    """
+    original_columns = df.columns
+    index_columns = [c for c in original_columns if c != key_field]
+
+    if pl_row_count(
+        df.select(key_field)
+        .filter(
+            (pl.col(key_field).str.lengths() > 0)  # type: ignore
+            & ~(pl.col(key_field).eq("None"))  # type: ignore
+        )
+    ) == 0:
+        return df
+
+    return (
+        df
+        .with_columns(pl.col(key_field).str.extract_all(r"(\w+): (\w+)"))  # type: ignore
+        .explode(key_field)
+        .with_columns(
+            pl.col(key_field).str.split_exact(":", 1)  # type: ignore
+            .struct.rename_fields(["pivot_key", "pivot_values"])
+            .alias("ids")
+        )
+        .unnest("ids")
+        .select(
+            *[pl.col(c) for c in original_columns],  # type: ignore
+            (pl.col("pivot_key") + pl.lit("_Identifier")).alias("pivot_key"),  # type: ignore
+            (pl.col("pivot_values").str.strip(" ")).alias("pivot_values"),  # type: ignore
+        )
+        .pivot(
+            values="pivot_values",
+            index=index_columns,
+            columns="pivot_key"
+        )
+        .drop(["null"])
+    )
diff --git a/tests/test_error_reporting/test_utils.py b/tests/test_error_reporting/test_utils.py
new file mode 100644
index 0000000..92b0082
--- /dev/null
+++ b/tests/test_error_reporting/test_utils.py
@@ -0,0 +1,43 @@
+"""Test objects in dve.reporting.utils"""
+# pylint: disable=missing-function-docstring
+
+import polars as pl
+
+from dve.core_engine.backends.utilities import pl_row_count
+from dve.reporting.utils import extract_and_pivot_keys
+
+
+def test_extract_and_pivot_keys():
+    df = pl.DataFrame({
+        "entity": ["test1", "test2", "test3", "test4"],
+        "FailureType": ["submission1", "submission2", "submission3", "submission4"],
+        "id": [
+            "Key1: Value1 -- Key2: Value2 -- Key3: Value3",
+            "Key1: Value1 -- Key2: Value2",
+            "",
+            None,
+        ]
+    })
+    result_df = extract_and_pivot_keys(df, key_field="id")
+    expected_df = pl.DataFrame({
+        "entity": ["test1", "test2", "test3", "test4"],
+        "FailureType": ["submission1", "submission2", "submission3", "submission4"],
+        "Key1_Identifier": ["Value1", "Value1", None, None],
+        "Key2_Identifier": ["Value2", "Value2", None, None],
+        "Key3_Identifier": ["Value3", None, None, None],
+    })
+
+    assert pl_row_count(result_df) == pl_row_count(df)
+    assert result_df.equals(expected_df)
+
+
+def test_extract_and_pivot_keys_with_empty_key_field():
+    df = pl.DataFrame({
+        "entity": ["test1", "test2", "test3"],
+        "FailureType": ["submission1", "submission2", "submission3"],
+        "Key": ["", "None", None]
+    })
+    result_df = extract_and_pivot_keys(df)
+
+    assert pl_row_count(result_df) == pl_row_count(df)
+    assert result_df.equals(df)
diff --git a/tests/test_pipeline/pipeline_helpers.py b/tests/test_pipeline/pipeline_helpers.py
index 1518ccf..ce60a81 100644
--- a/tests/test_pipeline/pipeline_helpers.py
+++ b/tests/test_pipeline/pipeline_helpers.py
@@ -403,7 +403,3 @@ def error_data_after_business_rules() -> Iterator[Tuple[SubmissionInfo, str]]:
         json.dump(error_data, f)
 
     yield submitted_file_info, tdir
-
-
-def pl_row_count(df: pl.DataFrame) -> int:
-    return df.select(pl.len()).to_dicts()[0]["len"]
diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py
index 58eb4ac..d575392 100644
--- a/tests/test_pipeline/test_duckdb_pipeline.py
+++ b/tests/test_pipeline/test_duckdb_pipeline.py
@@ -15,6 +15,7 @@
 from duckdb import DuckDBPyConnection
 
 from dve.core_engine.backends.base.auditing import FilterCriteria
+from dve.core_engine.backends.utilities import pl_row_count
 from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
 from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
 from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord
@@ -26,7 +27,6 @@
 from ..fixtures import temp_ddb_conn  # pylint: disable=unused-import
 from .pipeline_helpers import (  # pylint: disable=unused-import
     PLANETS_RULES_PATH,
-    pl_row_count,
     planet_data_after_file_transformation,
     planet_test_files,
     planets_data_after_business_rules,
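
Reviewer note (not part of the patch): the snippet below is a minimal usage sketch of the two helpers this diff introduces, based on the behaviour exercised in tests/test_error_reporting/test_utils.py. The column names and key strings are invented for illustration, and it assumes the polars version already used by the repo (the implementation relies on the older str.lengths / str.strip API).

import polars as pl

from dve.core_engine.backends.utilities import pl_row_count
from dve.reporting.utils import extract_and_pivot_keys

# Hypothetical error rows: each row carries its business keys as a single "Key" string.
errors = pl.DataFrame({
    "Entity": ["Planets", "Planets"],
    "Error": ["missing value", "bad type"],
    "Key": ["PlanetId: 4 -- MoonId: 2", ""],
})

pivoted = extract_and_pivot_keys(errors)  # key_field defaults to "Key"

# The "Key" column is replaced by one "<name>_Identifier" column per extracted key;
# rows without keys keep their other columns and simply get nulls in the new ones.
print(pivoted.columns)        # ['Entity', 'Error', 'PlanetId_Identifier', 'MoonId_Identifier']
print(pl_row_count(pivoted))  # 2 -- the pivot preserves the row count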