diff --git a/doc/locale/fr/LC_MESSAGES/user_guide/features.po b/doc/locale/fr/LC_MESSAGES/user_guide/features.po index a4bfce6..1611882 100644 --- a/doc/locale/fr/LC_MESSAGES/user_guide/features.po +++ b/doc/locale/fr/LC_MESSAGES/user_guide/features.po @@ -453,11 +453,20 @@ msgstr "" msgid "Clip values to specified range" msgstr "Écrêter les valeurs à une plage spécifiée" +msgid ":func:`replace_special_values `" +msgstr ":func:`replace_special_values `" + +msgid ":func:`replace_special_values `" +msgstr ":func:`replace_special_values `" + +msgid "Replace NaN, +Inf and -Inf values with configurable strategies" +msgstr "Remplacer les valeurs NaN, +Inf et -Inf avec des stratégies configurables" + msgid ":func:`offset_correction `" -msgstr "" +msgstr ":func:`offset_correction `" msgid ":func:`offset_correction `" -msgstr "" +msgstr ":func:`offset_correction `" msgid "Remove DC offset/background" msgstr "Supprimer le décalage DC/fond continu" diff --git a/doc/user_guide/features.rst b/doc/user_guide/features.rst index 486b3f9..299aaf1 100644 --- a/doc/user_guide/features.rst +++ b/doc/user_guide/features.rst @@ -249,6 +249,9 @@ Signal Conditioning * - :func:`clip ` - :func:`clip ` - Clip values to specified range + * - :func:`replace_special_values ` + - :func:`replace_special_values ` + - Replace NaN, +Inf and -Inf values with configurable strategies * - :func:`offset_correction ` - :func:`offset_correction ` - Remove DC offset/background diff --git a/sigima/enums.py b/sigima/enums.py index 0f40f8c..c57df24 100644 --- a/sigima/enums.py +++ b/sigima/enums.py @@ -200,3 +200,43 @@ class DetectionROIGeometry(gds.LabeledEnum): CIRCLE = "circle", _("Circle") RECTANGLE = "rectangle", _("Rectangle") + + +class ReplacementStrategySignal(gds.LabeledEnum): + """Replacement strategies for special values (NaN/Inf) in signals.""" + + NONE = "none", _("Do nothing") + ZERO = "zero", _("Replace with zero") + CONSTANT = "constant", _("Replace with constant") + MIN = "min", _("Replace with minimum") + MAX = "max", _("Replace with maximum") + MEAN = "mean", _("Replace with mean") + MEDIAN = "median", _("Replace with median") + DELETE = "delete", _("Delete points") + FORWARD_FILL = "ffill", _("Forward fill (previous value)") + BACKWARD_FILL = "bfill", _("Backward fill (next value)") + INTERP_LINEAR = "interp_linear", _("Interpolation: Linear") + INTERP_SPLINE = "interp_spline", _("Interpolation: Spline") + INTERP_QUADRATIC = "interp_quadratic", _("Interpolation: Quadratic") + INTERP_CUBIC = "interp_cubic", _("Interpolation: Cubic") + INTERP_PCHIP = "interp_pchip", _("Interpolation: PCHIP") + NEIGHBOR_MIN = "neighbor_min", _("N-neighbor minimum") + NEIGHBOR_MAX = "neighbor_max", _("N-neighbor maximum") + NEIGHBOR_MEAN = "neighbor_mean", _("N-neighbor mean") + NEIGHBOR_MEDIAN = "neighbor_median", _("N-neighbor median") + + +class ReplacementStrategyImage(gds.LabeledEnum): + """Replacement strategies for special values (NaN/Inf) in images.""" + + NONE = "none", _("Do nothing") + ZERO = "zero", _("Replace with zero") + CONSTANT = "constant", _("Replace with constant") + MIN = "min", _("Replace with minimum") + MAX = "max", _("Replace with maximum") + MEAN = "mean", _("Replace with mean") + MEDIAN = "median", _("Replace with median") + NEIGHBOR_MIN = "neighbor_min", _("N-neighbor minimum") + NEIGHBOR_MAX = "neighbor_max", _("N-neighbor maximum") + NEIGHBOR_MEAN = "neighbor_mean", _("N-neighbor mean") + NEIGHBOR_MEDIAN = "neighbor_median", _("N-neighbor median") diff --git a/sigima/locale/fr/LC_MESSAGES/sigima.po b/sigima/locale/fr/LC_MESSAGES/sigima.po index 0b05ce7..b300bcb 100644 --- a/sigima/locale/fr/LC_MESSAGES/sigima.po +++ b/sigima/locale/fr/LC_MESSAGES/sigima.po @@ -142,6 +142,63 @@ msgstr "Chaque signal devient une colonne" msgid "Rectangle" msgstr "Rectangle" +msgid "Do nothing" +msgstr "Ne rien faire" + +msgid "Replace with zero" +msgstr "Remplacer par zéro" + +msgid "Replace with constant" +msgstr "Remplacer par une constante" + +msgid "Replace with minimum" +msgstr "Remplacer par le minimum" + +msgid "Replace with maximum" +msgstr "Remplacer par le maximum" + +msgid "Replace with mean" +msgstr "Remplacer par la moyenne" + +msgid "Replace with median" +msgstr "Remplacer par la médiane" + +msgid "Delete points" +msgstr "Supprimer les points" + +msgid "Forward fill (previous value)" +msgstr "Remplissage vers l'avant (valeur précédente)" + +msgid "Backward fill (next value)" +msgstr "Remplissage vers l'arrière (valeur suivante)" + +msgid "Interpolation: Linear" +msgstr "Interpolation linéaire" + +msgid "Interpolation: Spline" +msgstr "Interpolation spline" + +msgid "Interpolation: Quadratic" +msgstr "Interpolation quadratique" + +msgid "Interpolation: Cubic" +msgstr "Interpolation cubique" + +msgid "Interpolation: PCHIP" +msgstr "Interpolation PCHIP" + +msgid "N-neighbor minimum" +msgstr "Plus proche voisin minimum" + +msgid "N-neighbor maximum" +msgstr "Plus proche voisin maximum" + +msgid "N-neighbor mean" +msgstr "Plus proche voisin moyenne" + +msgid "N-neighbor median" +msgstr "Plus proche voisin médiane" + msgid "All supported files" msgstr "Tous les fichiers pris en charge" @@ -757,6 +814,33 @@ msgstr "Méthode de normalisation" msgid "Method used for normalization" msgstr "Méthode utilisée pour la normalisation" +msgid "Number of neighboring points used when a neighbor-based strategy is selected" +msgstr "Nombre de points voisins utilisés lorsqu'une stratégie basée sur les voisins est sélectionnée" + +msgid "Value used when a 'Replace with constant' strategy is selected" +msgstr "Valeur utilisée lorsqu'une stratégie 'Remplacer par une constante' est sélectionnée" + +msgid "Replace special values (signal)" +msgstr "Remplacer les valeurs spéciales (signal)" + +msgid "NaN" +msgstr "NaN" + +msgid "Strategy" +msgstr "Stratégie" + +msgid "Neighbor size" +msgstr "Taille du voisinage" + +msgid "+ Infinity" +msgstr "+ Infini" + +msgid "- Infinity" +msgstr "- Infini" + +msgid "Replace special values (image)" +msgstr "Remplacer les valeurs spéciales (image)" + msgid "rows" msgstr "lignes" @@ -1043,22 +1127,18 @@ msgstr "Translation en X" msgid "Y translation" msgstr "Translation en Y" -#, fuzzy msgid "column size" -msgstr "Colonnes" +msgstr "Nombre de colonnes" -#, fuzzy msgid "row size" -msgstr "Taille" +msgstr "Nombre de lignes" -#, fuzzy msgid "column spacing" msgstr "Espace entre chaque colonne" msgid "Horizontal spacing between ROI centers, as a percentage of the automatically computed cell width (100% = evenly distributed grid)" msgstr "Espacement horizontal entre les centres des ROI, en pourcentage de la largeur de cellule automatiquement calculée (100% = grille uniformément distribuée)" -#, fuzzy msgid "row spacing" msgstr "Espace entre chaque ligne" @@ -1507,9 +1587,6 @@ msgstr "Ajustement polynomial" msgid "Degree" msgstr "Degré" -msgid "Strategy" -msgstr "Stratégie" - msgid "Location" msgstr "Emplacement" @@ -1620,3 +1697,6 @@ msgstr "Barycentre" msgid "Plot dialog" msgstr "Fenêtre de tracé" + +msgid "Replace special values is not applicable to integer images because they cannot contain NaN or infinite values." +msgstr "Le remplacement des valeurs spéciales n'est pas applicable aux images de type entier car elles ne peuvent pas contenir de valeurs NaN ou infinies." diff --git a/sigima/params.py b/sigima/params.py index 295c25c..f6a7582 100644 --- a/sigima/params.py +++ b/sigima/params.py @@ -361,6 +361,8 @@ "PulseFeaturesParam", "ROIGridParam", "RadialProfileParam", + "ReplaceSpecialValuesImageParam", + "ReplaceSpecialValuesSignalParam", "Resampling1DParam", "Resampling2DParam", "RescaleIntensityParam", @@ -393,6 +395,8 @@ MovingMedianParam, NormalizeParam, PhaseParam, + ReplaceSpecialValuesImageParam, + ReplaceSpecialValuesSignalParam, SignalsToImageParam, SpectrumParam, ) diff --git a/sigima/proc/base.py b/sigima/proc/base.py index 294685f..e9899bd 100644 --- a/sigima/proc/base.py +++ b/sigima/proc/base.py @@ -24,6 +24,8 @@ FilterMode, MathOperator, NormalizationMethod, + ReplacementStrategyImage, + ReplacementStrategySignal, SignalsToImageOrientation, ) from sigima.proc.title_formatting import get_default_title_formatter @@ -236,6 +238,193 @@ class SignalsToImageParam(gds.DataSet, title=_("Signals to image")): ).set_prop("display", active=_prop) +_NEIGHBOR_HELP = _( + "Number of neighboring points used when a neighbor-based strategy is selected" +) +_CONSTANT_HELP = _("Value used when a 'Replace with constant' strategy is selected") + +_S_SIG = ReplacementStrategySignal +_S_IMG = ReplacementStrategyImage + + +def _is_signal_constant(x: ReplacementStrategySignal) -> bool: + return x == _S_SIG.CONSTANT + + +def _is_signal_neighbor(x: ReplacementStrategySignal) -> bool: + return x in ( + _S_SIG.NEIGHBOR_MIN, + _S_SIG.NEIGHBOR_MAX, + _S_SIG.NEIGHBOR_MEAN, + _S_SIG.NEIGHBOR_MEDIAN, + ) + + +def _is_image_constant(x: ReplacementStrategyImage) -> bool: + return x == _S_IMG.CONSTANT + + +def _is_image_neighbor(x: ReplacementStrategyImage) -> bool: + return x in ( + _S_IMG.NEIGHBOR_MIN, + _S_IMG.NEIGHBOR_MAX, + _S_IMG.NEIGHBOR_MEAN, + _S_IMG.NEIGHBOR_MEDIAN, + ) + + +class ReplaceSpecialValuesSignalParam( + gds.DataSet, title=_("Replace special values (signal)") +): + """Parameters for replacing NaN, +Inf and -Inf values in signals. + + Each target has its own tab with strategy and related advanced parameters. + """ + + _tabs = gds.BeginTabGroup("targets") + + # --- NaN --- + _g_nan = gds.BeginGroup(_("NaN")) + _prop_nan = gds.GetAttrProp("nan_strategy") + nan_strategy = gds.ChoiceItem( + _("Strategy"), + ReplacementStrategySignal, + default=ReplacementStrategySignal.ZERO, + ).set_prop("display", store=_prop_nan) + nan_constant_value = gds.FloatItem( + _("Constant value"), + default=0.0, + help=_CONSTANT_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_nan, _is_signal_constant)) + nan_neighbor_size = gds.IntItem( + _("Neighbor size"), + default=3, + min=1, + help=_NEIGHBOR_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_nan, _is_signal_neighbor)) + _e_g_nan = gds.EndGroup(_("NaN")) + + # --- +Infinity --- + _g_posinf = gds.BeginGroup(_("+ Infinity")) + _prop_posinf = gds.GetAttrProp("posinf_strategy") + posinf_strategy = gds.ChoiceItem( + _("Strategy"), + ReplacementStrategySignal, + default=ReplacementStrategySignal.ZERO, + ).set_prop("display", store=_prop_posinf) + posinf_constant_value = gds.FloatItem( + _("Constant value"), + default=0.0, + help=_CONSTANT_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_posinf, _is_signal_constant)) + posinf_neighbor_size = gds.IntItem( + _("Neighbor size"), + default=3, + min=1, + help=_NEIGHBOR_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_posinf, _is_signal_neighbor)) + _e_g_posinf = gds.EndGroup(_("+ Infinity")) + + # --- -Infinity --- + _g_neginf = gds.BeginGroup(_("- Infinity")) + _prop_neginf = gds.GetAttrProp("neginf_strategy") + neginf_strategy = gds.ChoiceItem( + _("Strategy"), + ReplacementStrategySignal, + default=ReplacementStrategySignal.ZERO, + ).set_prop("display", store=_prop_neginf) + neginf_constant_value = gds.FloatItem( + _("Constant value"), + default=0.0, + help=_CONSTANT_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_neginf, _is_signal_constant)) + neginf_neighbor_size = gds.IntItem( + _("Neighbor size"), + default=3, + min=1, + help=_NEIGHBOR_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_neginf, _is_signal_neighbor)) + _e_g_neginf = gds.EndGroup(_("- Infinity")) + + _e_tabs = gds.EndTabGroup("targets") + + +class ReplaceSpecialValuesImageParam( + gds.DataSet, title=_("Replace special values (image)") +): + """Parameters for replacing NaN, +Inf and -Inf values in images. + + Each target has its own tab with strategy and related advanced parameters. + """ + + _tabs = gds.BeginTabGroup("targets") + + # --- NaN --- + _g_nan = gds.BeginGroup(_("NaN")) + _prop_nan = gds.GetAttrProp("nan_strategy") + nan_strategy = gds.ChoiceItem( + _("Strategy"), + ReplacementStrategyImage, + default=ReplacementStrategyImage.ZERO, + ).set_prop("display", store=_prop_nan) + nan_constant_value = gds.FloatItem( + _("Constant value"), + default=0.0, + help=_CONSTANT_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_nan, _is_image_constant)) + nan_neighbor_size = gds.IntItem( + _("Neighbor size"), + default=3, + min=1, + help=_NEIGHBOR_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_nan, _is_image_neighbor)) + _e_g_nan = gds.EndGroup(_("NaN")) + + # --- +Infinity --- + _g_posinf = gds.BeginGroup(_("+ Infinity")) + _prop_posinf = gds.GetAttrProp("posinf_strategy") + posinf_strategy = gds.ChoiceItem( + _("Strategy"), + ReplacementStrategyImage, + default=ReplacementStrategyImage.ZERO, + ).set_prop("display", store=_prop_posinf) + posinf_constant_value = gds.FloatItem( + _("Constant value"), + default=0.0, + help=_CONSTANT_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_posinf, _is_image_constant)) + posinf_neighbor_size = gds.IntItem( + _("Neighbor size"), + default=3, + min=1, + help=_NEIGHBOR_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_posinf, _is_image_neighbor)) + _e_g_posinf = gds.EndGroup(_("+ Infinity")) + + # --- -Infinity --- + _g_neginf = gds.BeginGroup(_("- Infinity")) + _prop_neginf = gds.GetAttrProp("neginf_strategy") + neginf_strategy = gds.ChoiceItem( + _("Strategy"), + ReplacementStrategyImage, + default=ReplacementStrategyImage.ZERO, + ).set_prop("display", store=_prop_neginf) + neginf_constant_value = gds.FloatItem( + _("Constant value"), + default=0.0, + help=_CONSTANT_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_neginf, _is_image_constant)) + neginf_neighbor_size = gds.IntItem( + _("Neighbor size"), + default=3, + min=1, + help=_NEIGHBOR_HELP, + ).set_prop("display", active=gds.FuncProp(_prop_neginf, _is_image_neighbor)) + _e_g_neginf = gds.EndGroup(_("- Infinity")) + + _e_tabs = gds.EndTabGroup("targets") + + # MARK: Helper functions for creating result objects ----------------------------------- Obj = TypeVar("Obj", bound="SignalObj | ImageObj") diff --git a/sigima/proc/image/__init__.py b/sigima/proc/image/__init__.py index d197b72..f503f3b 100644 --- a/sigima/proc/image/__init__.py +++ b/sigima/proc/image/__init__.py @@ -210,6 +210,7 @@ histogram, normalize, offset_correction, + replace_special_values, rescale_intensity, ) from sigima.proc.image.extraction import ( @@ -461,6 +462,7 @@ "quadratic_difference", "radial_profile", "real", + "replace_special_values", "resampling", "rescale_intensity", "resize", diff --git a/sigima/proc/image/exposure.py b/sigima/proc/image/exposure.py index 5348331..9293718 100644 --- a/sigima/proc/image/exposure.py +++ b/sigima/proc/image/exposure.py @@ -27,6 +27,8 @@ from __future__ import annotations +import warnings + import guidata.dataset as gds import numpy as np from skimage import exposure @@ -34,12 +36,14 @@ import sigima.enums import sigima.tools.image from sigima.config import _ +from sigima.enums import ReplacementStrategyImage from sigima.objects.image import ImageObj, ROI2DParam from sigima.objects.signal import SignalObj from sigima.proc.base import ( ClipParam, HistogramParam, NormalizeParam, + ReplaceSpecialValuesImageParam, new_signal_result, ) from sigima.proc.decorator import computation_function @@ -49,6 +53,7 @@ dst_2_to_1, restore_data_outside_roi, ) +from sigima.tools.image import replace_values as rv2d # NOTE: Only parameter classes DEFINED in this module should be included in __all__. # Parameter classes imported from other modules (like sigima.proc.base) should NOT @@ -72,6 +77,7 @@ "histogram", "normalize", "offset_correction", + "replace_special_values", "rescale_intensity", ] @@ -402,3 +408,112 @@ def offset_correction(src: ImageObj, p: ROI2DParam) -> ImageObj: dst.data = src.data - np.nanmean(p.get_data(src)) restore_data_outside_roi(dst, src) return dst + + +def _apply_image_strategy( + data: np.ndarray, + mask: np.ndarray, + strategy: ReplacementStrategyImage, + neighbor_size: int, + constant_value: float, +) -> np.ndarray: + """Apply a single replacement strategy to masked positions in an image. + + Args: + data: 2-D data array (may be modified in place). + mask: boolean mask of positions to replace. + strategy: replacement strategy to apply. + neighbor_size: neighborhood radius for neighbor-based strategies. + constant_value: value used for the CONSTANT strategy. + + Returns: + Data array with replacements applied. + """ + s = strategy + if not np.any(mask) or s == ReplacementStrategyImage.NONE: + return data + + if s == ReplacementStrategyImage.ZERO: + rv2d.replace_with_fixed_2d(data, mask, 0.0) + elif s == ReplacementStrategyImage.CONSTANT: + rv2d.replace_with_fixed_2d(data, mask, constant_value) + elif s == ReplacementStrategyImage.MIN: + rv2d.replace_with_stat_2d(data, mask, "min") + elif s == ReplacementStrategyImage.MAX: + rv2d.replace_with_stat_2d(data, mask, "max") + elif s == ReplacementStrategyImage.MEAN: + rv2d.replace_with_stat_2d(data, mask, "mean") + elif s == ReplacementStrategyImage.MEDIAN: + rv2d.replace_with_stat_2d(data, mask, "median") + elif s == ReplacementStrategyImage.NEIGHBOR_MIN: + rv2d.neighbor_replace_2d(data, mask, neighbor_size, "min") + elif s == ReplacementStrategyImage.NEIGHBOR_MAX: + rv2d.neighbor_replace_2d(data, mask, neighbor_size, "max") + elif s == ReplacementStrategyImage.NEIGHBOR_MEAN: + rv2d.neighbor_replace_2d(data, mask, neighbor_size, "mean") + elif s == ReplacementStrategyImage.NEIGHBOR_MEDIAN: + rv2d.neighbor_replace_2d(data, mask, neighbor_size, "median") + else: + raise ValueError(f"Unsupported image replacement strategy: {s}") + return data + + +@computation_function() +def replace_special_values( + src: ImageObj, p: ReplaceSpecialValuesImageParam +) -> ImageObj: + """Replace NaN, +Inf and -Inf values in an image. + + Each target (NaN, +Inf, -Inf) is treated independently with its own strategy. + + Args: + src: input image object. + p: parameters specifying the strategy for each target. + + Returns: + Output image object with special values replaced. + """ + strategies = [] + if p.nan_strategy != ReplacementStrategyImage.NONE: + strategies.append(f"NaN→{p.nan_strategy.value}") + if p.posinf_strategy != ReplacementStrategyImage.NONE: + strategies.append(f"+Inf→{p.posinf_strategy.value}") + if p.neginf_strategy != ReplacementStrategyImage.NONE: + strategies.append(f"-Inf→{p.neginf_strategy.value}") + suffix = ", ".join(strategies) if strategies else "none" + + dst = dst_1_to_1(src, "replace_special_values", suffix) + + if np.issubdtype(src.data.dtype, np.integer): + warnings.warn( + _( + "Replace special values is not applicable to integer images " + "because they cannot contain NaN or infinite values." + ), + stacklevel=2, + ) + return dst + + data = dst.data.copy() + + for target, strategy, const_val, neigh_size in ( + (np.isnan, p.nan_strategy, p.nan_constant_value, p.nan_neighbor_size), + ( + np.isposinf, + p.posinf_strategy, + p.posinf_constant_value, + p.posinf_neighbor_size, + ), + ( + np.isneginf, + p.neginf_strategy, + p.neginf_constant_value, + p.neginf_neighbor_size, + ), + ): + mask = target(data) + data = _apply_image_strategy(data, mask, strategy, neigh_size, const_val) + + dst.data = data + restore_data_outside_roi(dst, src) + return dst diff --git a/sigima/proc/signal/__init__.py b/sigima/proc/signal/__init__.py index 543b634..19feec0 100644 --- a/sigima/proc/signal/__init__.py +++ b/sigima/proc/signal/__init__.py @@ -227,6 +227,7 @@ interpolate, normalize, offset_correction, + replace_special_values, replace_x_by_other_y, resampling, reverse_x, @@ -349,6 +350,7 @@ "psd", "quadratic_difference", "real", + "replace_special_values", "replace_x_by_other_y", "resampling", "restore_data_outside_roi", diff --git a/sigima/proc/signal/processing.py b/sigima/proc/signal/processing.py index 9be20da..e3d905c 100644 --- a/sigima/proc/signal/processing.py +++ b/sigima/proc/signal/processing.py @@ -32,11 +32,22 @@ from sigima.config import _ from sigima.config import options as sigima_options -from sigima.enums import Interpolation1DMethod, NormalizationMethod, WindowingMethod +from sigima.enums import ( + Interpolation1DMethod, + NormalizationMethod, + ReplacementStrategySignal, + WindowingMethod, +) from sigima.objects import ROI1DParam, SignalObj -from sigima.proc.base import ClipParam, NormalizeParam, dst_2_to_1 +from sigima.proc.base import ( + ClipParam, + NormalizeParam, + ReplaceSpecialValuesSignalParam, + dst_2_to_1, +) from sigima.proc.decorator import computation_function from sigima.tools.signal import fourier, interpolation, scaling, windowing +from sigima.tools.signal import replace_values as rv from .base import dst_1_to_1, is_uncertainty_data_available, restore_data_outside_roi @@ -599,3 +610,134 @@ def replace_x_by_other_y(src1: SignalObj, src2: SignalObj) -> SignalObj: # Y label and unit remain from src1 restore_data_outside_roi(dst, src1) return dst + + +def _apply_signal_strategy( + x: np.ndarray, + y: np.ndarray, + mask: np.ndarray, + strategy: ReplacementStrategySignal, + neighbor_size: int, + constant_value: float, +) -> tuple[np.ndarray, np.ndarray, bool]: + """Apply a single replacement strategy to masked positions in a signal. + + Args: + x: abscissa array. + y: ordinate array (may be modified in place). + mask: boolean mask of positions to replace. + strategy: replacement strategy to apply. + neighbor_size: number of neighbors for neighbor-based strategies. + constant_value: value used for the CONSTANT strategy. + + Returns: + Tuple ``(x, y, resized)`` where *resized* is ``True`` when the arrays + changed length (i.e. the ``DELETE`` strategy was applied). + """ + s = strategy + if not np.any(mask) or s == ReplacementStrategySignal.NONE: + return x, y, False + + if s == ReplacementStrategySignal.ZERO: + rv.replace_with_fixed(y, mask, 0.0) + elif s == ReplacementStrategySignal.CONSTANT: + rv.replace_with_fixed(y, mask, constant_value) + elif s == ReplacementStrategySignal.MIN: + rv.replace_with_stat(y, mask, "min") + elif s == ReplacementStrategySignal.MAX: + rv.replace_with_stat(y, mask, "max") + elif s == ReplacementStrategySignal.MEAN: + rv.replace_with_stat(y, mask, "mean") + elif s == ReplacementStrategySignal.MEDIAN: + rv.replace_with_stat(y, mask, "median") + elif s == ReplacementStrategySignal.DELETE: + if rv.check_uniform_sampling(x): + warnings.warn( + "Deleting points from a uniformly sampled signal will break " + "the uniform sampling. Consider using interpolation instead.", + stacklevel=3, + ) + x, y = rv.delete_masked_points(x, y, mask) + return x, y, True + elif s == ReplacementStrategySignal.FORWARD_FILL: + rv.forward_fill(y, mask) + elif s == ReplacementStrategySignal.BACKWARD_FILL: + rv.backward_fill(y, mask) + elif s.value.startswith("interp_"): + rv.interpolate_masked(x, y, mask, s.value) + elif s == ReplacementStrategySignal.NEIGHBOR_MIN: + rv.neighbor_replace(y, mask, neighbor_size, "min") + elif s == ReplacementStrategySignal.NEIGHBOR_MAX: + rv.neighbor_replace(y, mask, neighbor_size, "max") + elif s == ReplacementStrategySignal.NEIGHBOR_MEAN: + rv.neighbor_replace(y, mask, neighbor_size, "mean") + elif s == ReplacementStrategySignal.NEIGHBOR_MEDIAN: + rv.neighbor_replace(y, mask, neighbor_size, "median") + else: + raise ValueError(f"Unsupported signal replacement strategy: {s}") + return x, y, False + + +@computation_function() +def replace_special_values( + src: SignalObj, p: ReplaceSpecialValuesSignalParam +) -> SignalObj: + """Replace NaN, +Inf and -Inf values in a signal. + + Each target (NaN, +Inf, -Inf) is treated independently with its own strategy. + + Args: + src: source signal. + p: parameters specifying the strategy for each target. + + Returns: + Result signal object with special values replaced. + """ + strategies = [] + if p.nan_strategy != ReplacementStrategySignal.NONE: + strategies.append(f"NaN→{p.nan_strategy.value}") + if p.posinf_strategy != ReplacementStrategySignal.NONE: + strategies.append(f"+Inf→{p.posinf_strategy.value}") + if p.neginf_strategy != ReplacementStrategySignal.NONE: + strategies.append(f"-Inf→{p.neginf_strategy.value}") + suffix = ", ".join(strategies) if strategies else "none" + + dst = dst_1_to_1(src, "replace_special_values", suffix) + x, y = dst.get_data() + x = x.copy() + y = y.copy() + + # Apply strategies in order: NaN first, then +Inf, then -Inf + for target, strategy, const_val, neigh_size in ( + (np.isnan, p.nan_strategy, p.nan_constant_value, p.nan_neighbor_size), + ( + np.isposinf, + p.posinf_strategy, + p.posinf_constant_value, + p.posinf_neighbor_size, + ), + ( + np.isneginf, + p.neginf_strategy, + p.neginf_constant_value, + p.neginf_neighbor_size, + ), + ): + mask = target(y) + x, y, resized = _apply_signal_strategy( + x, + y, + mask, + strategy, + neigh_size, + const_val, + ) + + # Rebuild signal data (set_xydata with only x, y strips any error bars; + # do NOT use dst.dy = None / dst.dx = None afterwards because the property + # setters would re-expand xydata to 4 rows filled with NaN) + dst.set_xydata(x, y) + + if not resized: + restore_data_outside_roi(dst, src) + return dst diff --git a/sigima/tests/image/replace_special_values_unit_test.py b/sigima/tests/image/replace_special_values_unit_test.py new file mode 100644 index 0000000..b4eb8e1 --- /dev/null +++ b/sigima/tests/image/replace_special_values_unit_test.py @@ -0,0 +1,373 @@ +# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file. + +""" +Unit tests for replace_special_values (image) +---------------------------------------------- + +Tests for replacing NaN, +Inf and -Inf values in images using the various +strategies provided by :func:`sigima.proc.image.replace_special_values`. +""" + +from __future__ import annotations + +import numpy as np +import pytest + +import sigima.objects +import sigima.proc.image as sipi +from sigima.enums import ReplacementStrategyImage as S +from sigima.proc.base import ReplaceSpecialValuesImageParam +from sigima.tools.image.replace_values import count_special_values_2d + + +def _make_image(data: np.ndarray) -> sigima.objects.ImageObj: + """Helper: create an ImageObj from a 2-D array.""" + return sigima.objects.create_image("test", data) + + +# --------------------------------------------------------------------------- +# Fixed value strategies +# --------------------------------------------------------------------------- + + +class TestFixedValueStrategies: + """Test replacement with fixed values (zero, min, max, mean, median).""" + + @pytest.fixture() + def image_with_nan(self): + """Create a test image containing NaN values.""" + data = np.array([[1.0, np.nan, 3.0], [4.0, 5.0, np.nan], [7.0, 8.0, 9.0]]) + return _make_image(data) + + def test_replace_zero(self, image_with_nan): + """Test replacement of NaN values with zero.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.ZERO, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sipi.replace_special_values(image_with_nan, p) + assert not np.any(np.isnan(dst.data)) + assert dst.data[0, 1] == 0.0 + assert dst.data[1, 2] == 0.0 + + def test_replace_min(self, image_with_nan): + """Test replacement of NaN values with the minimum of valid data.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.MIN, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sipi.replace_special_values(image_with_nan, p) + valid_min = np.nanmin(image_with_nan.data) + assert dst.data[0, 1] == pytest.approx(valid_min) + assert dst.data[1, 2] == pytest.approx(valid_min) + + def test_replace_max(self, image_with_nan): + """Test replacement of NaN values with the maximum of valid data.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.MAX, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sipi.replace_special_values(image_with_nan, p) + valid_max = np.nanmax(image_with_nan.data) + assert dst.data[0, 1] == pytest.approx(valid_max) + assert dst.data[1, 2] == pytest.approx(valid_max) + + def test_replace_mean(self, image_with_nan): + """Test replacement of NaN values with the mean of valid data.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.MEAN, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sipi.replace_special_values(image_with_nan, p) + valid_mean = np.nanmean(image_with_nan.data) + assert dst.data[0, 1] == pytest.approx(valid_mean) + + def test_replace_median(self, image_with_nan): + """Test replacement of NaN values with the median of valid data.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.MEDIAN, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sipi.replace_special_values(image_with_nan, p) + valid_median = np.nanmedian(image_with_nan.data) + assert dst.data[0, 1] == pytest.approx(valid_median) + + def test_replace_constant(self, image_with_nan): + """Test replacement of NaN values with a user-specified constant.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.CONSTANT, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_constant_value=42.0, + ) + dst = sipi.replace_special_values(image_with_nan, p) + assert not np.any(np.isnan(dst.data)) + assert dst.data[0, 1] == 42.0 + assert dst.data[1, 2] == 42.0 + + +# --------------------------------------------------------------------------- +# Neighbor strategies +# --------------------------------------------------------------------------- + + +class TestNeighborStrategies: + """Test N-neighbor replacement strategies.""" + + @pytest.fixture() + def image_with_nan(self): + """Create a test image with a single NaN value surrounded by valid data.""" + data = np.ones((5, 5), dtype=float) * 4.0 + data[2, 2] = np.nan + return _make_image(data) + + def test_neighbor_mean(self, image_with_nan): + """Test replacement of NaN values with the mean of neighboring valid data.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.NEIGHBOR_MEAN, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sipi.replace_special_values(image_with_nan, p) + assert not np.any(np.isnan(dst.data)) + assert dst.data[2, 2] == pytest.approx(4.0) + + def test_neighbor_median(self, image_with_nan): + """Test replacement of NaN values with the median of neighboring valid data.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.NEIGHBOR_MEDIAN, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sipi.replace_special_values(image_with_nan, p) + assert not np.any(np.isnan(dst.data)) + assert dst.data[2, 2] == pytest.approx(4.0) + + def test_neighbor_min(self, image_with_nan): + """Test replacement of NaN values with the minimum of neighboring valid data.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.NEIGHBOR_MIN, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sipi.replace_special_values(image_with_nan, p) + assert not np.any(np.isnan(dst.data)) + assert dst.data[2, 2] == pytest.approx(4.0) + + def test_neighbor_max(self, image_with_nan): + """Test replacement of NaN values with the maximum of neighboring valid data.""" + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.NEIGHBOR_MAX, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sipi.replace_special_values(image_with_nan, p) + assert not np.any(np.isnan(dst.data)) + assert dst.data[2, 2] == pytest.approx(4.0) + + +# --------------------------------------------------------------------------- +# Multiple targets +# --------------------------------------------------------------------------- + + +class TestMultipleTargets: + """Test independent processing of NaN, +Inf and -Inf.""" + + def test_all_three_targets(self): + """Test replacement of NaN, +Inf and -Inf values in the same image.""" + # Strategies are applied sequentially: NaN first, then +inf, then -inf. + # After NaN→ZERO, the data min includes 0.0, so -inf→MIN gives 0.0. + data = np.array([[1.0, np.nan, 3.0], [np.inf, 5.0, -np.inf], [7.0, 8.0, 9.0]]) + src = _make_image(data) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.ZERO, + posinf_strategy=S.MAX, + neginf_strategy=S.MIN, + ) + dst = sipi.replace_special_values(src, p) + assert not np.any(np.isnan(dst.data)) + assert not np.any(np.isinf(dst.data)) + assert dst.data[0, 1] == 0.0 # NaN → zero + assert dst.data[1, 0] == pytest.approx(9.0) # +inf → max(after NaN→0) + assert dst.data[1, 2] == pytest.approx(0.0) # -inf → min(after NaN→0) + + def test_none_leaves_unchanged(self): + """Test that 'none' strategies leave the corresponding values unchanged.""" + data = np.array([[1.0, np.nan, 3.0], [4.0, 5.0, 6.0]]) + src = _make_image(data) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.NONE, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sipi.replace_special_values(src, p) + assert np.isnan(dst.data[0, 1]) + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + """Test edge conditions and special inputs.""" + + def test_no_special_values(self): + """Test that an image with no special values is unchanged.""" + data = np.arange(9, dtype=float).reshape(3, 3) + src = _make_image(data) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.ZERO, + posinf_strategy=S.ZERO, + neginf_strategy=S.ZERO, + ) + dst = sipi.replace_special_values(src, p) + np.testing.assert_array_equal(dst.data, data) + + def test_no_special_values_preserves_float32_dtype(self): + """Test that an image with no special values keeps its dtype (e.g. float32).""" + data = np.arange(9, dtype=np.float32).reshape(3, 3) + src = _make_image(data) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.ZERO, + posinf_strategy=S.ZERO, + neginf_strategy=S.ZERO, + ) + dst = sipi.replace_special_values(src, p) + assert dst.data.dtype == np.float32 + np.testing.assert_array_equal(dst.data, data) + + def test_replace_special_values_rejects_integer_images(self): + """Test that attempting to replace special values in an integer image raises + a warning and leaves data unchanged.""" + data = np.arange(9, dtype=np.uint16).reshape(3, 3) + src = _make_image(data) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.CONSTANT, + nan_constant_value=10.0, + ) + with pytest.warns(UserWarning, match="not applicable to integer images"): + dst = sipi.replace_special_values(src, p) + assert dst is not src + assert dst.data.dtype == np.uint16 + np.testing.assert_array_equal(dst.data, src.data) + + def test_count_special_values_integer_image_is_zero(self): + """Test that counting special values in an integer image returns zero for + all types.""" + data = np.arange(9, dtype=np.uint16).reshape(3, 3) + assert count_special_values_2d(data) == { + "nan": 0, + "posinf": 0, + "neginf": 0, + } + + def test_posinf_only(self): + """Test replacement of +Inf values with zero.""" + data = np.array([[1.0, np.inf], [3.0, 4.0]]) + src = _make_image(data) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.NONE, + posinf_strategy=S.ZERO, + neginf_strategy=S.NONE, + ) + dst = sipi.replace_special_values(src, p) + assert dst.data[0, 1] == 0.0 + assert not np.any(np.isinf(dst.data)) + + +# --------------------------------------------------------------------------- +# Validation test (required by the test framework) +# --------------------------------------------------------------------------- + + +@pytest.mark.validation +def test_image_replace_special_values() -> None: + """Validation test for the image replace_special_values processing.""" + # Use NaN-only data for stat-based strategies (mean of Inf is NaN) + data_nan = np.array([[1.0, np.nan, 3.0], [4.0, 5.0, np.nan], [7.0, 8.0, 9.0]]) + data_all = np.array([[1.0, np.nan, 3.0], [np.inf, 5.0, -np.inf], [7.0, 8.0, 9.0]]) + + # Test fixed strategies on NaN-only data + for strategy in (S.ZERO, S.MIN, S.MAX, S.MEAN, S.MEDIAN): + src = _make_image(data_nan.copy()) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=strategy, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sigima.proc.image.replace_special_values(src, p) + assert not np.any(np.isnan(dst.data)) + + src = _make_image(data_nan.astype(np.float32)) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.CONSTANT, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_constant_value=4.0, + ) + dst = sigima.proc.image.replace_special_values(src, p) + assert dst.data.dtype == np.float32 + + # Test all three targets with non-stat strategies + src = _make_image(data_all.copy()) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.ZERO, + posinf_strategy=S.ZERO, + neginf_strategy=S.ZERO, + ) + dst = sigima.proc.image.replace_special_values(src, p) + assert not np.any(np.isnan(dst.data)) + assert not np.any(np.isinf(dst.data)) + + # Test neighbor strategies + data_smooth = np.arange(25, dtype=float).reshape(5, 5) + data_smooth[2, 2] = np.nan + for strategy in ( + S.NEIGHBOR_MIN, + S.NEIGHBOR_MAX, + S.NEIGHBOR_MEAN, + S.NEIGHBOR_MEDIAN, + ): + src = _make_image(data_smooth.copy()) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=strategy, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sigima.proc.image.replace_special_values(src, p) + assert not np.any(np.isnan(dst.data)) + + # Test constant strategy + src = _make_image(data_nan.copy()) + p = ReplaceSpecialValuesImageParam.create( + nan_strategy=S.CONSTANT, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_constant_value=-999.0, + ) + dst = sigima.proc.image.replace_special_values(src, p) + assert not np.any(np.isnan(dst.data)) + assert dst.data[0, 1] == -999.0 + + +# --------------------------------------------------------------------------- +# Count special values utility +# --------------------------------------------------------------------------- + + +class TestCountSpecialValues2D: + """Test the count_special_values_2d utility.""" + + def test_count_mixed(self): + """Test counting of NaN, +Inf and -Inf values in a mixed array.""" + data = np.array([[1.0, np.nan], [np.inf, -np.inf]]) + counts = count_special_values_2d(data) + assert counts == {"nan": 1, "posinf": 1, "neginf": 1} + + def test_count_none(self): + """Test counting in an array with no special values.""" + data = np.array([[1.0, 2.0], [3.0, 4.0]]) + counts = count_special_values_2d(data) + assert counts == {"nan": 0, "posinf": 0, "neginf": 0} diff --git a/sigima/tests/signal/replace_special_values_unit_test.py b/sigima/tests/signal/replace_special_values_unit_test.py new file mode 100644 index 0000000..a043fc3 --- /dev/null +++ b/sigima/tests/signal/replace_special_values_unit_test.py @@ -0,0 +1,481 @@ +# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file. + +""" +Unit tests for replace_special_values (signal) +----------------------------------------------- + +Tests for replacing NaN, +Inf and -Inf values in signals using the various +strategies provided by :func:`sigima.proc.signal.replace_special_values`. +""" + +from __future__ import annotations + +import numpy as np +import pytest + +import sigima.objects +import sigima.proc.signal as sips +from sigima.enums import ReplacementStrategySignal as S +from sigima.proc.base import ReplaceSpecialValuesSignalParam +from sigima.tools.signal.replace_values import count_special_values + + +def _make_signal( + y: np.ndarray, x: np.ndarray | None = None +) -> sigima.objects.SignalObj: + """Helper: create a SignalObj from x/y arrays.""" + if x is None: + x = np.arange(len(y), dtype=float) + return sigima.objects.create_signal("test", x, y) + + +# --------------------------------------------------------------------------- +# Fixed value strategies +# --------------------------------------------------------------------------- + + +class TestFixedValueStrategies: + """Test replacement with fixed values (zero, min, max, mean, median).""" + + @pytest.fixture() + def signal_with_nan(self): + """Fixture: a signal containing NaN values.""" + y = np.array([1.0, np.nan, 3.0, np.nan, 5.0]) + return _make_signal(y) + + def test_replace_zero(self, signal_with_nan): + """Test replacement of NaN values with zero.""" + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.ZERO, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sips.replace_special_values(signal_with_nan, p) + assert not np.any(np.isnan(dst.y)) + assert dst.y[1] == 0.0 + assert dst.y[3] == 0.0 + + def test_replace_min(self, signal_with_nan): + """Test replacement of NaN values with the minimum of valid data.""" + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.MIN, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sips.replace_special_values(signal_with_nan, p) + assert dst.y[1] == 1.0 + assert dst.y[3] == 1.0 + + def test_replace_max(self, signal_with_nan): + """Test replacement of NaN values with the maximum of valid data.""" + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.MAX, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sips.replace_special_values(signal_with_nan, p) + assert dst.y[1] == 5.0 + assert dst.y[3] == 5.0 + + def test_replace_mean(self, signal_with_nan): + """Test replacement of NaN values with the mean of valid data.""" + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.MEAN, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sips.replace_special_values(signal_with_nan, p) + expected_mean = np.mean([1.0, 3.0, 5.0]) + np.testing.assert_allclose(dst.y[1], expected_mean) + np.testing.assert_allclose(dst.y[3], expected_mean) + + def test_replace_median(self, signal_with_nan): + """Test replacement of NaN values with the median of valid data.""" + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.MEDIAN, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sips.replace_special_values(signal_with_nan, p) + expected_median = np.median([1.0, 3.0, 5.0]) + np.testing.assert_allclose(dst.y[1], expected_median) + + def test_replace_constant(self, signal_with_nan): + """Test replacement of NaN values with a user-specified constant.""" + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.CONSTANT, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_constant_value=42.0, + ) + dst = sips.replace_special_values(signal_with_nan, p) + assert not np.any(np.isnan(dst.y)) + assert dst.y[1] == 42.0 + assert dst.y[3] == 42.0 + + +# --------------------------------------------------------------------------- +# Removal strategies (signal only) +# --------------------------------------------------------------------------- + + +class TestRemovalStrategies: + """Test delete, forward fill, and backward fill.""" + + def test_delete(self): + """Test deletion of NaN values.""" + x = np.array([0.0, 1.0, 2.5, 4.5, 7.0]) + y = np.array([1.0, np.nan, 3.0, 4.0, 5.0]) + src = _make_signal(y, x) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.DELETE, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sips.replace_special_values(src, p) + assert len(dst.y) == 4 + np.testing.assert_array_equal(dst.y, [1.0, 3.0, 4.0, 5.0]) + np.testing.assert_array_equal(dst.x, [0.0, 2.5, 4.5, 7.0]) + + def test_delete_warns_uniform_sampling(self): + """Test that deletion of NaN values emits a warning about uniform sampling.""" + x = np.linspace(0, 10, 100) + y = np.sin(x) + y[50] = np.nan + src = _make_signal(y, x) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.DELETE, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + with pytest.warns(UserWarning, match="uniformly sampled"): + sips.replace_special_values(src, p) + + def test_forward_fill(self): + """Test forward fill of NaN values.""" + y = np.array([1.0, np.nan, np.nan, 4.0, 5.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.FORWARD_FILL, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sips.replace_special_values(src, p) + np.testing.assert_array_equal(dst.y, [1.0, 1.0, 1.0, 4.0, 5.0]) + + def test_forward_fill_leading_nan(self): + """Test forward fill when leading values are NaN.""" + y = np.array([np.nan, np.nan, 3.0, 4.0, 5.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.FORWARD_FILL, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sips.replace_special_values(src, p) + np.testing.assert_array_equal(dst.y, [3.0, 3.0, 3.0, 4.0, 5.0]) + + def test_backward_fill(self): + """Test backward fill of NaN values.""" + y = np.array([1.0, np.nan, np.nan, 4.0, 5.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.BACKWARD_FILL, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sips.replace_special_values(src, p) + np.testing.assert_array_equal(dst.y, [1.0, 4.0, 4.0, 4.0, 5.0]) + + def test_backward_fill_trailing_nan(self): + """Test backward fill when trailing values are NaN.""" + y = np.array([1.0, 2.0, 3.0, np.nan, np.nan]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.BACKWARD_FILL, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sips.replace_special_values(src, p) + np.testing.assert_array_equal(dst.y, [1.0, 2.0, 3.0, 3.0, 3.0]) + + +# --------------------------------------------------------------------------- +# Interpolation strategies +# --------------------------------------------------------------------------- + + +class TestInterpolationStrategies: + """Test interpolation-based replacement.""" + + @pytest.fixture() + def signal_with_gap(self): + """Fixture: a signal containing NaN values with valid data on both sides.""" + x = np.arange(10, dtype=float) + y = 2.0 * x + 1.0 # linear: y = 2x + 1 + y[3] = np.nan + y[7] = np.nan + return _make_signal(y, x) + + def test_interp_linear(self, signal_with_gap): + """Test linear interpolation of NaN values.""" + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.INTERP_LINEAR, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sips.replace_special_values(signal_with_gap, p) + # Linear data → perfect reconstruction + np.testing.assert_allclose(dst.y[3], 7.0, atol=1e-10) + np.testing.assert_allclose(dst.y[7], 15.0, atol=1e-10) + + def test_interp_cubic(self, signal_with_gap): + """Test cubic interpolation of NaN values.""" + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.INTERP_CUBIC, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sips.replace_special_values(signal_with_gap, p) + np.testing.assert_allclose(dst.y[3], 7.0, atol=1e-6) + + def test_interp_pchip(self, signal_with_gap): + """Test PCHIP interpolation of NaN values.""" + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.INTERP_PCHIP, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sips.replace_special_values(signal_with_gap, p) + np.testing.assert_allclose(dst.y[3], 7.0, atol=1e-6) + + +# --------------------------------------------------------------------------- +# Neighbor strategies +# --------------------------------------------------------------------------- + + +class TestNeighborStrategies: + """Test neighbor-based replacement.""" + + def test_neighbor_mean(self): + """Test replacement of NaN values with the mean of neighboring valid data.""" + y = np.array([1.0, 2.0, np.nan, 4.0, 5.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.NEIGHBOR_MEAN, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sips.replace_special_values(src, p) + # Neighbors of index 2 are [2.0, 4.0] → mean = 3.0 + np.testing.assert_allclose(dst.y[2], 3.0) + + def test_neighbor_median(self): + """Test replacement of NaN values with the median of neighboring valid data.""" + y = np.array([1.0, 2.0, np.nan, 8.0, 5.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.NEIGHBOR_MEDIAN, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sips.replace_special_values(src, p) + # Neighbors of index 2 are [2.0, 8.0] → median = 5.0 + np.testing.assert_allclose(dst.y[2], 5.0) + + def test_neighbor_min(self): + """Test replacement of NaN values with the minimum of neighboring valid data.""" + y = np.array([1.0, 2.0, np.nan, 8.0, 5.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.NEIGHBOR_MIN, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sips.replace_special_values(src, p) + # Neighbors of index 2 are [2.0, 8.0] → min = 2.0 + np.testing.assert_allclose(dst.y[2], 2.0) + + def test_neighbor_max(self): + """Test replacement of NaN values with the maximum of neighboring valid data.""" + y = np.array([1.0, 2.0, np.nan, 8.0, 5.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.NEIGHBOR_MAX, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sips.replace_special_values(src, p) + # Neighbors of index 2 are [2.0, 8.0] → max = 8.0 + np.testing.assert_allclose(dst.y[2], 8.0) + + +# --------------------------------------------------------------------------- +# Multiple targets (NaN + Inf) +# --------------------------------------------------------------------------- + + +class TestMultipleTargets: + """Test replacing NaN, +Inf and -Inf simultaneously.""" + + def test_all_three_targets(self): + """Test replacement of NaN, +Inf and -Inf values in the same signal.""" + y = np.array([1.0, np.nan, np.inf, -np.inf, 5.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.ZERO, + posinf_strategy=S.MAX, + neginf_strategy=S.MIN, + ) + dst = sips.replace_special_values(src, p) + assert dst.y[1] == 0.0 # NaN → 0 + assert dst.y[2] == 5.0 # +inf → max of valid data + assert dst.y[3] == 0.0 # -inf → min (0.0 is now min after NaN→0) + + def test_none_strategy_skips(self): + """Test that the NONE strategy skips replacement.""" + y = np.array([1.0, np.nan, 3.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.NONE, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sips.replace_special_values(src, p) + assert np.isnan(dst.y[1]) + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + """Test edge cases.""" + + def test_no_special_values(self): + """Test that a signal with no special values is unchanged.""" + y = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.ZERO, posinf_strategy=S.ZERO, neginf_strategy=S.ZERO + ) + dst = sips.replace_special_values(src, p) + np.testing.assert_array_equal(dst.y, y) + + def test_all_nan(self): + """Test that a signal with all NaN values is replaced correctly.""" + y = np.array([np.nan, np.nan, np.nan]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.ZERO, posinf_strategy=S.NONE, neginf_strategy=S.NONE + ) + dst = sips.replace_special_values(src, p) + np.testing.assert_array_equal(dst.y, [0.0, 0.0, 0.0]) + + def test_posinf_only(self): + """Test replacement of +Inf values with zero.""" + y = np.array([1.0, np.inf, 3.0]) + src = _make_signal(y) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.NONE, posinf_strategy=S.ZERO, neginf_strategy=S.NONE + ) + dst = sips.replace_special_values(src, p) + assert dst.y[1] == 0.0 + assert dst.y[0] == 1.0 + assert dst.y[2] == 3.0 + + +# --------------------------------------------------------------------------- +# Validation test (required by the test framework) +# --------------------------------------------------------------------------- + + +@pytest.mark.validation +def test_signal_replace_special_values() -> None: + """Validation test for the signal replace_special_values processing.""" + # Use separate data per category to avoid cross-contamination + # (e.g. mean of data containing Inf is NaN) + y_nan = np.array([1.0, np.nan, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]) + y_all = np.array([1.0, np.nan, 3.0, np.inf, -np.inf, 6.0, 7.0, 8.0]) + + # Test fixed strategies on NaN-only data + for strategy in (S.ZERO, S.MIN, S.MAX, S.MEAN, S.MEDIAN): + src = _make_signal(y_nan.copy()) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=strategy, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sigima.proc.signal.replace_special_values(src, p) + assert not np.any(np.isnan(dst.y)) + + # Test all three targets with non-stat strategies + src = _make_signal(y_all.copy()) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.ZERO, + posinf_strategy=S.ZERO, + neginf_strategy=S.ZERO, + ) + dst = sigima.proc.signal.replace_special_values(src, p) + assert not np.any(np.isnan(dst.y)) + assert not np.any(np.isinf(dst.y)) + + # Test interpolation strategies + for strategy in (S.INTERP_LINEAR, S.INTERP_CUBIC, S.INTERP_PCHIP): + src = _make_signal(y_nan.copy()) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=strategy, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + ) + dst = sigima.proc.signal.replace_special_values(src, p) + assert not np.any(np.isnan(dst.y)) + + # Test neighbor strategies + for strategy in ( + S.NEIGHBOR_MIN, + S.NEIGHBOR_MAX, + S.NEIGHBOR_MEAN, + S.NEIGHBOR_MEDIAN, + ): + src = _make_signal(y_nan.copy()) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=strategy, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_neighbor_size=1, + ) + dst = sigima.proc.signal.replace_special_values(src, p) + assert not np.any(np.isnan(dst.y)) + + # Test constant strategy + src = _make_signal(y_nan.copy()) + p = ReplaceSpecialValuesSignalParam.create( + nan_strategy=S.CONSTANT, + posinf_strategy=S.NONE, + neginf_strategy=S.NONE, + nan_constant_value=-999.0, + ) + dst = sigima.proc.signal.replace_special_values(src, p) + assert not np.any(np.isnan(dst.y)) + assert dst.y[1] == -999.0 + + +# --------------------------------------------------------------------------- +# Count special values utility +# --------------------------------------------------------------------------- + + +class TestCountSpecialValues: + """Test the count_special_values utility.""" + + def test_count_mixed(self): + """Test counting of NaN, +Inf and -Inf values in a mixed array.""" + y = np.array([1.0, np.nan, np.inf, -np.inf, 5.0, np.nan]) + + counts = count_special_values(y) + assert counts == {"nan": 2, "posinf": 1, "neginf": 1} + + def test_count_none(self): + """Test counting when there are no special values.""" + y = np.array([1.0, 2.0, 3.0]) + + counts = count_special_values(y) + assert counts == {"nan": 0, "posinf": 0, "neginf": 0} + + def test_count_all_nan(self): + """Test counting when all values are NaN.""" + y = np.array([np.nan, np.nan]) + + counts = count_special_values(y) + assert counts == {"nan": 2, "posinf": 0, "neginf": 0} diff --git a/sigima/tools/image/replace_values.py b/sigima/tools/image/replace_values.py new file mode 100644 index 0000000..005cb30 --- /dev/null +++ b/sigima/tools/image/replace_values.py @@ -0,0 +1,213 @@ +# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file. + +""" +Replace special values in 2D images +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Low-level NumPy/OpenCV algorithms for replacing NaN, +Inf and -Inf values in +2D arrays. These functions operate on raw arrays and are called by the +higher-level :mod:`sigima.proc.image.exposure` functions. +""" + +from __future__ import annotations + +import warnings + +import numpy as np + +# Stat function registry and fallback chains. +# When a statistic produces NaN (e.g. nanmean on [+inf, -inf]), we fall back +# to the next statistic in the chain before resorting to 0.0. +_STAT_FUNCS = { + "min": np.nanmin, + "max": np.nanmax, + "mean": np.nanmean, + "median": np.nanmedian, +} + +_STAT_FALLBACKS: dict[str, list[str]] = { + "mean": ["median", "min"], + "median": ["mean", "min"], + "min": ["max"], + "max": ["min"], +} + + +def _compute_stat_with_fallback(valid: np.ndarray, stat: str) -> float: + """Compute *stat* on *valid* data with a fallback chain. + + If the primary statistic yields NaN (can happen when *valid* contains + mixed infinities), the function tries the fallback statistics defined in + :data:`_STAT_FALLBACKS` before returning ``0.0``. + """ + chain = [stat] + _STAT_FALLBACKS.get(stat, []) + for name in chain: + with np.errstate(invalid="ignore"): + value = float(_STAT_FUNCS[name](valid)) + if not np.isnan(value): + if name != stat: + warnings.warn( + f"Statistic '{stat}' produced NaN; falling back to '{name}'.", + stacklevel=3, + ) + return value + warnings.warn( + "All statistics produced NaN; filling with 0.", + stacklevel=3, + ) + return 0.0 + + +def replace_with_fixed_2d( + data: np.ndarray, mask: np.ndarray, value: float +) -> np.ndarray: + """Replace masked positions with a fixed *value*. + + Args: + data: 2-D data array (modified in place). + mask: boolean mask (``True`` where replacement is needed). + value: replacement value. + + Returns: + *data* (modified in place). + """ + data[mask] = value + return data + + +def replace_with_stat_2d( + data: np.ndarray, + mask: np.ndarray, + stat: str, +) -> np.ndarray: + """Replace masked positions with a statistic computed on valid data. + + Only finite values (excluding NaN, +Inf, -Inf) are used to compute the + statistic, so that special values still present for other targets do not + bias the result. + + Args: + data: 2-D data array (modified in place). + mask: boolean mask. + stat: one of ``"min"``, ``"max"``, ``"mean"``, ``"median"``. + + Returns: + *data* (modified in place). + """ + valid = data[~mask & np.isfinite(data)] + if valid.size == 0: + warnings.warn( + "No valid data to compute statistic; filling with 0.", stacklevel=2 + ) + data[mask] = 0.0 + return data + data[mask] = _compute_stat_with_fallback(valid, stat) + return data + + +def neighbor_replace_2d( + data: np.ndarray, + mask: np.ndarray, + n: int, + stat: str, +) -> np.ndarray: + """Replace masked positions using statistics of their neighborhood. + + When no valid neighbor is found within the initial radius *n*, the search + radius is progressively doubled until valid data is found or the full + image has been scanned. If still no valid neighbor exists, the + corresponding global statistic is used as a fallback. As a last resort + the value is set to ``0.0``. + + Args: + data: 2-D data array (modified in place). + mask: boolean mask. + n: neighborhood radius (the window is ``(2*n+1) × (2*n+1)``). + stat: ``"min"``, ``"max"``, ``"mean"`` or ``"median"``. + + Returns: + *data* (modified in place). + """ + funcs = { + "min": np.nanmin, + "max": np.nanmax, + "mean": np.nanmean, + "median": np.nanmedian, + } + func = funcs[stat] + rows, cols = data.shape + max_dim = max(rows, cols) + data_orig = data.copy() + data_orig[mask] = np.nan + + # Pre-compute global fallback (all valid finite values) + all_valid = data_orig[np.isfinite(data_orig)] + if all_valid.size > 0: + global_fallback = float(func(all_valid)) + else: + global_fallback = 0.0 + + for r, c in zip(*np.where(mask)): + value = _neighbor_search_2d(data_orig, r, c, n, rows, cols, max_dim, func) + if value is not None: + data[r, c] = value + else: + if global_fallback != 0.0: + warnings.warn( + f"No valid neighbor found at ({r}, {c}); " + f"using global {stat} as fallback.", + stacklevel=2, + ) + data[r, c] = global_fallback + return data + + +def _neighbor_search_2d( + data_orig: np.ndarray, + r: int, + c: int, + n: int, + rows: int, + cols: int, + max_dim: int, + func, +) -> float | None: + """Search for valid neighbors with progressive radius expansion. + + Returns the computed statistic or ``None`` if no valid neighbor exists. + """ + radius = n + while radius < max_dim: + r_lo, r_hi = max(0, r - radius), min(rows, r + radius + 1) + c_lo, c_hi = max(0, c - radius), min(cols, c + radius + 1) + patch = data_orig[r_lo:r_hi, c_lo:c_hi] + valid = patch[np.isfinite(patch)] + if valid.size > 0: + with np.errstate(invalid="ignore"): + result = float(func(valid)) + if not np.isnan(result): + return result + # Double the radius for the next attempt + radius *= 2 + return None + + +def count_special_values_2d( + data: np.ndarray, +) -> dict[str, int]: + """Count special values in a 2-D array. + + Args: + data: 2-D data array. + + Returns: + Dictionary with keys ``"nan"``, ``"posinf"``, ``"neginf"`` + and integer counts. + """ + if np.issubdtype(data.dtype, np.integer): + return {"nan": 0, "posinf": 0, "neginf": 0} + return { + "nan": int(np.count_nonzero(np.isnan(data))), + "posinf": int(np.count_nonzero(np.isposinf(data))), + "neginf": int(np.count_nonzero(np.isneginf(data))), + } diff --git a/sigima/tools/signal/replace_values.py b/sigima/tools/signal/replace_values.py new file mode 100644 index 0000000..dd7035d --- /dev/null +++ b/sigima/tools/signal/replace_values.py @@ -0,0 +1,342 @@ +# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file. + +""" +Replace special values in 1D signals +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Low-level NumPy algorithms for replacing NaN, +Inf and -Inf values in 1D +arrays. These functions operate on raw arrays and are called by the +higher-level :mod:`sigima.proc.signal.processing` functions. +""" + +from __future__ import annotations + +import warnings + +import numpy as np +import scipy.interpolate + +from sigima.enums import Interpolation1DMethod + +# Stat function registry and fallback chains. +# When a statistic produces NaN (e.g. nanmean on [+inf, -inf]), we fall back +# to the next statistic in the chain before resorting to 0.0. +_STAT_FUNCS = { + "min": np.nanmin, + "max": np.nanmax, + "mean": np.nanmean, + "median": np.nanmedian, +} + +_STAT_FALLBACKS: dict[str, list[str]] = { + "mean": ["median", "min"], + "median": ["mean", "min"], + "min": ["max"], + "max": ["min"], +} + + +def _compute_stat_with_fallback(valid: np.ndarray, stat: str) -> float: + """Compute *stat* on *valid* data with a fallback chain. + + If the primary statistic yields NaN (can happen when *valid* contains + mixed infinities), the function tries the fallback statistics defined in + :data:`_STAT_FALLBACKS` before returning ``0.0``. + """ + chain = [stat] + _STAT_FALLBACKS.get(stat, []) + for name in chain: + with np.errstate(invalid="ignore"): + value = float(_STAT_FUNCS[name](valid)) + if not np.isnan(value): + if name != stat: + warnings.warn( + f"Statistic '{stat}' produced NaN; falling back to '{name}'.", + stacklevel=3, + ) + return value + warnings.warn( + "All statistics produced NaN; filling with 0.", + stacklevel=3, + ) + return 0.0 + + +def check_uniform_sampling(x: np.ndarray, rtol: float = 1e-6) -> bool: + """Check whether *x* is uniformly sampled. + + Args: + x: 1-D array of abscissa values (must be sorted). + rtol: relative tolerance for the spacing comparison. + + Returns: + ``True`` if the spacing between consecutive points is constant + (within *rtol*). + """ + if x.size < 2: + return True + dx = np.diff(x) + return bool(np.allclose(dx, dx[0], rtol=rtol)) + + +def replace_with_fixed(y: np.ndarray, mask: np.ndarray, value: float) -> np.ndarray: + """Replace masked positions with a fixed *value*. + + Args: + y: data array (modified in place). + mask: boolean mask (``True`` where replacement is needed). + value: replacement value. + + Returns: + *y* (modified in place). + """ + y[mask] = value + return y + + +def replace_with_stat( + y: np.ndarray, + mask: np.ndarray, + stat: str, +) -> np.ndarray: + """Replace masked positions with a statistic computed on valid data. + + Only finite values (excluding NaN, +Inf, -Inf) are used to compute the + statistic, so that special values still present for other targets do not + bias the result. + + Args: + y: data array (modified in place). + mask: boolean mask. + stat: one of ``"min"``, ``"max"``, ``"mean"``, ``"median"``. + + Returns: + *y* (modified in place). + """ + valid = y[~mask & np.isfinite(y)] + if valid.size == 0: + warnings.warn( + "No valid data to compute statistic; filling with 0.", stacklevel=2 + ) + y[mask] = 0.0 + return y + y[mask] = _compute_stat_with_fallback(valid, stat) + return y + + +def delete_masked_points( + x: np.ndarray, y: np.ndarray, mask: np.ndarray +) -> tuple[np.ndarray, np.ndarray]: + """Delete points where *mask* is ``True``. + + Args: + x: abscissa array. + y: ordinate array. + mask: boolean mask of points to remove. + + Returns: + Tuple ``(x_new, y_new)`` with masked points removed. + """ + keep = ~mask + return x[keep], y[keep] + + +def forward_fill(y: np.ndarray, mask: np.ndarray) -> np.ndarray: + """Fill masked positions with the previous valid value (forward fill). + + If the first element(s) are masked, they are filled with the first valid value + encountered. + + Args: + y: data array (modified in place). + mask: boolean mask. + + Returns: + *y* (modified in place). + """ + indices = np.where(~mask, np.arange(len(y)), 0) + np.maximum.accumulate(indices, out=indices) + # Handle leading masked values: fill with first valid + first_valid = np.argmax(~mask) + indices[:first_valid] = first_valid + y[:] = y[indices] + return y + + +def backward_fill(y: np.ndarray, mask: np.ndarray) -> np.ndarray: + """Fill masked positions with the next valid value (backward fill). + + If the last element(s) are masked, they are filled with the last valid value + encountered. + + Args: + y: data array (modified in place). + mask: boolean mask. + + Returns: + *y* (modified in place). + """ + n = len(y) + indices = np.where(~mask, np.arange(n), n - 1) + # Reverse accumulate minimum + indices = np.minimum.accumulate(indices[::-1])[::-1] + # Handle trailing masked values + last_valid = n - 1 - np.argmax(~mask[::-1]) + indices[indices > last_valid] = last_valid + y[:] = y[indices] + return y + + +_INTERP_METHOD_MAP = { + "interp_linear": Interpolation1DMethod.LINEAR, + "interp_spline": Interpolation1DMethod.SPLINE, + "interp_quadratic": Interpolation1DMethod.QUADRATIC, + "interp_cubic": Interpolation1DMethod.CUBIC, + "interp_pchip": Interpolation1DMethod.PCHIP, +} + + +def interpolate_masked( + x: np.ndarray, + y: np.ndarray, + mask: np.ndarray, + method: str, +) -> np.ndarray: + """Interpolate values at masked positions using valid neighbors. + + Args: + x: abscissa array. + y: data array (modified in place). + mask: boolean mask. + method: strategy value string (e.g. ``"interp_linear"``). + + Returns: + *y* (modified in place). + """ + valid = ~mask + if valid.sum() < 2: + warnings.warn( + "Not enough valid points for interpolation; filling with 0.", stacklevel=2 + ) + y[mask] = 0.0 + return y + + interp_method = _INTERP_METHOD_MAP[method] + x_valid, y_valid = x[valid], y[valid] + x_masked = x[mask] + + if interp_method == Interpolation1DMethod.LINEAR: + y[mask] = np.interp(x_masked, x_valid, y_valid) + elif interp_method == Interpolation1DMethod.SPLINE: + knots, coeffs, degree = scipy.interpolate.splrep(x_valid, y_valid, s=0) + y[mask] = scipy.interpolate.splev(x_masked, (knots, coeffs, degree), der=0) + elif interp_method == Interpolation1DMethod.QUADRATIC: + coeffs = np.polyfit(x_valid, y_valid, min(2, len(x_valid) - 1)) + y[mask] = np.polyval(coeffs, x_masked) + elif interp_method == Interpolation1DMethod.CUBIC: + interp = scipy.interpolate.Akima1DInterpolator(x_valid, y_valid) + y[mask] = interp(x_masked) + elif interp_method == Interpolation1DMethod.PCHIP: + interp = scipy.interpolate.PchipInterpolator(x_valid, y_valid) + y[mask] = interp(x_masked) + else: + raise ValueError(f"Unknown interpolation method: {method}") + return y + + +def neighbor_replace( + y: np.ndarray, + mask: np.ndarray, + n: int, + stat: str, +) -> np.ndarray: + """Replace masked positions using statistics of their *n* nearest valid neighbors. + + When no valid neighbor is found within the initial radius *n*, the search + radius is progressively doubled until valid data is found or the full array + has been scanned. If still no valid neighbor exists, the corresponding + global statistic is used as a fallback. As a last resort the value is set + to ``0.0``. + + Args: + y: data array (modified in place). + mask: boolean mask. + n: number of neighbors to consider on each side. + stat: ``"min"``, ``"max"``, ``"mean"`` or ``"median"``. + + Returns: + *y* (modified in place). + """ + funcs = { + "min": np.nanmin, + "max": np.nanmax, + "mean": np.nanmean, + "median": np.nanmedian, + } + func = funcs[stat] + size = len(y) + # Work on a copy to avoid using already-replaced values + y_orig = y.copy() + y_orig[mask] = np.nan + + # Pre-compute global fallback (all valid, i.e. non-masked, finite values) + all_valid = y_orig[np.isfinite(y_orig)] + if all_valid.size > 0: + global_fallback = float(func(all_valid)) + else: + global_fallback = 0.0 + + for idx in np.where(mask)[0]: + value = _neighbor_search_1d(y_orig, idx, n, size, func) + if value is not None: + y[idx] = value + else: + if global_fallback != 0.0: + warnings.warn( + f"No valid neighbor found at index {idx}; " + f"using global {stat} as fallback.", + stacklevel=2, + ) + y[idx] = global_fallback + return y + + +def _neighbor_search_1d( + y_orig: np.ndarray, idx: int, n: int, size: int, func +) -> float | None: + """Search for valid neighbors with progressive radius expansion. + + Returns the computed statistic or ``None`` if no valid neighbor exists. + """ + radius = n + while radius < size: + lo = max(0, idx - radius) + hi = min(size, idx + radius + 1) + neighbors = y_orig[lo:hi] + valid = neighbors[np.isfinite(neighbors)] + if valid.size > 0: + with np.errstate(invalid="ignore"): + result = float(func(valid)) + if not np.isnan(result): + return result + # Double the radius for the next attempt + radius *= 2 + return None + + +def count_special_values( + y: np.ndarray, +) -> dict[str, int]: + """Count special values in a 1-D array. + + Args: + y: data array. + + Returns: + Dictionary with keys ``"nan"``, ``"posinf"``, ``"neginf"`` + and integer counts. + """ + return { + "nan": int(np.count_nonzero(np.isnan(y))), + "posinf": int(np.count_nonzero(np.isposinf(y))), + "neginf": int(np.count_nonzero(np.isneginf(y))), + }