Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/dve/core_engine/backends/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,8 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
if polars_type:
return polars_type
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")


def pl_row_count(df: pl.DataFrame) -> int:  # type: ignore
    """Return the number of rows in a polars DataFrame.

    Args:
        df (pl.DataFrame): frame to count rows of.

    Returns:
        int: row count.
    """
    # DataFrame.height is the direct row count; the previous
    # select(pl.len()).to_dicts() round-trip built a throwaway
    # one-row frame and list of dicts just to read the same number.
    return df.height
5 changes: 3 additions & 2 deletions src/dve/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from dve.parser.file_handling.service import _get_implementation
from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader
from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates
from dve.reporting.utils import dump_feedback_errors, dump_processing_errors
from dve.reporting.utils import dump_feedback_errors, dump_processing_errors, extract_and_pivot_keys

PERMISSIBLE_EXCEPTIONS: tuple[type[Exception]] = (
FileNotFoundError, # type: ignore
Expand Down Expand Up @@ -718,6 +718,7 @@ def _get_error_dataframes(self, submission_id: str):
.otherwise(pl.lit("Warning")) # type: ignore
.alias("error_type")
)
df = extract_and_pivot_keys(df)
df = df.select(
pl.col("Entity").alias("Table"), # type: ignore
pl.col("error_type").alias("Type"), # type: ignore
Expand All @@ -729,7 +730,7 @@ def _get_error_dataframes(self, submission_id: str):
pl.col("Category"), # type: ignore
)
df = df.select(
pl.col(column).cast(ERROR_SCHEMA[column]) # type: ignore
pl.col(column).cast(ERROR_SCHEMA.get(column, pl.Utf8())) # type: ignore
for column in df.columns
)
df = df.sort("Type", descending=False) # type: ignore
Expand Down
56 changes: 56 additions & 0 deletions src/dve/reporting/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import json
from typing import Optional

import polars as pl

import dve.parser.file_handling as fh
from dve.core_engine.backends.utilities import pl_row_count
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.type_hints import URI, Messages
from dve.reporting.error_report import conditional_cast
Expand Down Expand Up @@ -80,3 +83,56 @@ def dump_processing_errors(
f,
default=str,
)


def extract_and_pivot_keys(
    df: pl.DataFrame, key_field: str = "Key"  # type: ignore
) -> pl.DataFrame:  # type: ignore
    """
    Extract key/value pairs from a string column and pivot the keys into new columns.

    Each matched pair ``"<key>: <value>"`` in ``key_field`` becomes a column named
    ``<key>_Identifier`` holding ``<value>``. If no row contains a usable key string,
    the dataframe is returned unmodified. Rows whose ``key_field`` is null, ``"None"``
    or ``""`` simply get nulls in the generated columns.

    Args:
        df (pl.DataFrame): dataframe to manipulate
        key_field (str): name of column to extract key, value pairs from

    Returns:
        pl.DataFrame: Polars DataFrame with pivoted keys
    """
    original_columns = df.columns
    index_columns = [c for c in original_columns if c != key_field]

    # Short-circuit: nothing to pivot when no row holds a non-empty,
    # non-"None" key string (the column may carry stringified None values).
    usable_keys = pl_row_count(
        df.select(key_field)
        .filter(
            (pl.col(key_field).str.lengths() > 0)  # type: ignore
            & ~(pl.col(key_field).eq("None"))  # type: ignore
        )
    )
    if usable_keys == 0:
        return df

    # NOTE(review): the pattern only matches word characters on both sides of
    # ": ", so hyphenated/spaced values are not captured — confirm intended.
    pivoted = (
        df
        .with_columns(pl.col(key_field).str.extract_all(r"(\w+): (\w+)"))  # type: ignore
        .explode(key_field)  # one row per "key: value" match
        .with_columns(
            pl.col(key_field).str.split_exact(":", 1)  # type: ignore
            .struct.rename_fields(["pivot_key", "pivot_values"])
            .alias("ids")
        )
        .unnest("ids")
        .select(
            *[pl.col(c) for c in original_columns],  # type: ignore
            (pl.col("pivot_key") + pl.lit("_Identifier")).alias("pivot_key"),  # type: ignore
            # values keep the leading space from "key: value"; strip it
            (pl.col("pivot_values").str.strip(" ")).alias("pivot_values"),  # type: ignore
        )
        .pivot(
            values="pivot_values",
            index=index_columns,
            columns="pivot_key"
        )
    )
    # Rows with no extractable pair yield a null pivot_key, which pivot
    # materialises as a column literally named "null". Drop it only when it
    # exists — an unconditional drop raises when every row had valid keys.
    if "null" in pivoted.columns:
        pivoted = pivoted.drop(["null"])
    return pivoted
43 changes: 43 additions & 0 deletions tests/test_error_reporting/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""Test objects in dve.reporting.utility"""
# pylint: disable=missing-function-docstring

import polars as pl

from dve.core_engine.backends.utilities import pl_row_count
from dve.reporting.utils import extract_and_pivot_keys


def test_extract_and_pivot_keys():
    entities = ["test1", "test2", "test3", "test4"]
    failure_types = ["submission1", "submission2", "submission3", "submission4"]
    key_strings = [
        "Key1: Value1 -- Key2: Value2 -- Key3: Value3",
        "Key1: Value1 -- Key2: Value2",
        "",
        None,
    ]
    input_df = pl.DataFrame(
        {"entity": entities, "FailureType": failure_types, "id": key_strings}
    )

    actual = extract_and_pivot_keys(input_df, key_field="id")

    expected = pl.DataFrame(
        {
            "entity": entities,
            "FailureType": failure_types,
            "Key1_Identifier": ["Value1", "Value1", None, None],
            "Key2_Identifier": ["Value2", "Value2", None, None],
            "Key3_Identifier": ["Value3", None, None, None],
        }
    )

    # pivoting must not add or lose rows, and output must match exactly
    assert pl_row_count(actual) == pl_row_count(input_df)
    assert actual.equals(expected)


def test_extract_and_pivot_keys_with_empty_key_field():
    # every Key entry is unusable ("", "None", or null) -> frame returned as-is
    input_df = pl.DataFrame(
        {
            "entity": ["test1", "test2", "test3"],
            "FailureType": ["submission1", "submission2", "submission3"],
            "Key": ["", "None", None],
        }
    )

    actual = extract_and_pivot_keys(input_df)

    assert pl_row_count(actual) == pl_row_count(input_df)
    assert actual.equals(input_df)
4 changes: 0 additions & 4 deletions tests/test_pipeline/pipeline_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,3 @@ def error_data_after_business_rules() -> Iterator[Tuple[SubmissionInfo, str]]:
json.dump(error_data, f)

yield submitted_file_info, tdir


def pl_row_count(df: pl.DataFrame) -> int:
return df.select(pl.len()).to_dicts()[0]["len"]
2 changes: 1 addition & 1 deletion tests/test_pipeline/test_duckdb_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from duckdb import DuckDBPyConnection

from dve.core_engine.backends.base.auditing import FilterCriteria
from dve.core_engine.backends.utilities import pl_row_count
from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord
Expand All @@ -26,7 +27,6 @@
from ..fixtures import temp_ddb_conn # pylint: disable=unused-import
from .pipeline_helpers import ( # pylint: disable=unused-import
PLANETS_RULES_PATH,
pl_row_count,
planet_data_after_file_transformation,
planet_test_files,
planets_data_after_business_rules,
Expand Down