diff --git a/dpsynth/data_generation_v3.py b/dpsynth/data_generation_v3.py index d4172f8..85573cc 100644 --- a/dpsynth/data_generation_v3.py +++ b/dpsynth/data_generation_v3.py @@ -28,6 +28,7 @@ from dpsynth.local_mode import primitives from dpsynth.local_mode import vectorized_transformations as vtx import mbi +from mbi import estimation as mbi_estimation import numpy as np import pandas as pd @@ -36,7 +37,10 @@ def _create_initializers( domains: Mapping[str, domain.AttributeType], numerical_bins: int, init_delta: float, -) -> dict[str, primitives.DPMechanism]: +) -> tuple[ + dict[str, primitives.DPMechanism], + dict[str, primitives.DPMechanism], +]: """Creates per-column initializers from the domain specification. Args: @@ -45,30 +49,31 @@ def _create_initializers( init_delta: Delta for open-set categorical partition selection. Returns: - A dictionary mapping column names to uncalibrated initializer instances. + A (closed_domain_initializers, numerical_initializers) tuple. Raises: ValueError: If a column has an unsupported attribute type. """ - initializers = {} + closed_inits = {} + numerical_inits = {} for col, attr in domains.items(): if isinstance(attr, domain.NumericalAttribute): - initializers[col] = initialization.NumericalInitializer( + numerical_inits[col] = initialization.NumericalInitializer( name=col, num_partitions=numerical_bins, attribute=attr ) elif isinstance(attr, domain.CategoricalAttribute): - initializers[col] = initialization.CategoricalInitializer( + closed_inits[col] = initialization.CategoricalInitializer( name=col, attribute=attr ) elif isinstance(attr, domain.OpenSetCategoricalAttribute): - initializers[col] = initialization.OpenSetCategoricalInitializer( + closed_inits[col] = initialization.OpenSetCategoricalInitializer( name=col, attribute=attr, delta=init_delta ) else: raise ValueError( f'Unsupported attribute type for column {col!r}: {type(attr)}' ) - return initializers + return closed_inits, numerical_inits @dataclasses.dataclass @@ -100,6 +105,7 @@ class DataGenerationV3(primitives.DPMechanism): default_factory=discrete_mechanisms.MSTMechanism ) initializers: dict[str, primitives.DPMechanism] | None = None + total_count_mechanism: primitives.DPGaussianCount | None = None cross_attribute_constraints: Sequence[constraints.Constraint] = () def calibrate( @@ -178,15 +184,41 @@ def _calibrate_zcdp( self, zcdp_rho, numerical_bins, init_delta, init_budget_fraction ): """Simple additive zCDP budget split.""" - inits = self.initializers or _create_initializers( - self.domains, numerical_bins, init_delta + if self.initializers is not None: + all_inits = dict(self.initializers) + has_closed_domain = any( + not isinstance(init, initialization.NumericalInitializer) + for init in all_inits.values() + ) + else: + closed_inits, numerical_inits = _create_initializers( + self.domains, numerical_bins, init_delta + ) + all_inits = {**closed_inits, **numerical_inits} + has_closed_domain = bool(closed_inits) + + has_numerical = any( + isinstance(init, initialization.NumericalInitializer) + for init in all_inits.values() ) + needs_total_count = has_numerical and not has_closed_domain + init_rho = init_budget_fraction * zcdp_rho - per_col_rho = init_rho / len(inits) + # If we need a separate total count and have no closed-domain columns to + # estimate it from, allocate one extra share of init budget for it. + num_shares = len(all_inits) + (1 if needs_total_count else 0) + per_col_rho = init_rho / num_shares discrete_rho = zcdp_rho - init_rho + calibrated_inits = { - col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items() + col: init.calibrate(zcdp_rho=per_col_rho) + for col, init in all_inits.items() } + calibrated_total = ( + primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho) + if needs_total_count + else None + ) calibrated_discrete = self.discrete_mechanism.calibrate( zcdp_rho=discrete_rho ) @@ -194,6 +226,7 @@ def _calibrate_zcdp( self, initializers=calibrated_inits, discrete_mechanism=calibrated_discrete, + total_count_mechanism=calibrated_total, ) def _calibrate_approx_dp( @@ -226,10 +259,24 @@ def _calibrate_approx_dp( Returns: A new DataGenerationV3 instance with calibrated sub-mechanisms. """ - inits = self.initializers or _create_initializers( - self.domains, numerical_bins, init_delta + if self.initializers is not None: + inits = dict(self.initializers) + has_closed_domain = any( + not isinstance(init, initialization.NumericalInitializer) + for init in inits.values() + ) + else: + closed_inits, numerical_inits = _create_initializers( + self.domains, numerical_bins, init_delta + ) + inits = {**closed_inits, **numerical_inits} + has_closed_domain = bool(closed_inits) + has_numerical = any( + isinstance(init, initialization.NumericalInitializer) + for init in inits.values() ) - num_columns = len(inits) + needs_total_count = has_numerical and not has_closed_domain + num_columns = len(inits) + (1 if needs_total_count else 0) # Stage 1: Convert (epsilon, remaining_delta) to zCDP and calibrate # initializers with init_budget_fraction of that budget. @@ -244,11 +291,17 @@ def _calibrate_approx_dp( calibrated_inits = { col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items() } - + calibrated_total = ( + primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho) + if needs_total_count + else None + ) # Stage 2: With init dp_events fixed, find the tightest discrete budget. # The accountant handles ApproximateDpEvent deltas from open-set # initializers automatically. init_events = [init.dp_event for init in calibrated_inits.values()] + if calibrated_total is not None: + init_events.append(calibrated_total.dp_event) # Determine accountant type based on discrete mechanism's dp_event. probe_event = self.discrete_mechanism.calibrate(zcdp_rho=1.0).dp_event @@ -277,6 +330,7 @@ def make_event_from_param(discrete_rho): self, initializers=calibrated_inits, discrete_mechanism=calibrated_discrete, + total_count_mechanism=calibrated_total, ) @property @@ -292,6 +346,8 @@ def dp_event(self) -> dp_accounting.DpEvent: if self.initializers is None: raise ValueError('Must call calibrate() before accessing dp_event.') events = [init.dp_event for init in self.initializers.values()] + if self.total_count_mechanism is not None: + events.append(self.total_count_mechanism.dp_event) events.append(self.discrete_mechanism.dp_event) return dp_accounting.ComposedDpEvent(events) @@ -321,9 +377,34 @@ def __call__( ) # Phase 1: Per-column initialization. + # Initialize closed-domain columns first to estimate the total count, + # then pass it to numerical initializers for heuristic measurements. col_results: dict[str, initialization.ColumnMeasurement] = {} + closed_domain_measurements = [] for col, init in self.initializers.items(): - col_results[col] = init(rng, data[col].values) + if not isinstance(init, initialization.NumericalInitializer): + col_results[col] = init(rng, data[col].values) + if col_results[col].measurement is not None: + closed_domain_measurements.append(col_results[col].measurement) + + # Estimate total from closed-domain measurements or DPGaussianCount. + estimated_total = None + if closed_domain_measurements: + estimated_total = mbi_estimation.minimum_variance_unbiased_total( + closed_domain_measurements + ) + elif self.total_count_mechanism is not None: + # Pick an arbitrary column to count records. + any_col = next(iter(self.domains)) + estimated_total = max( + 1.0, self.total_count_mechanism(rng, data[any_col].values) + ) + + for col, init in self.initializers.items(): + if isinstance(init, initialization.NumericalInitializer): + col_results[col] = init( + rng, data[col].values, estimated_total=estimated_total + ) # Phase 2: Encode data to discrete domain. discrete_domains = {} diff --git a/dpsynth/domain.py b/dpsynth/domain.py index 67458df..ddce1b5 100644 --- a/dpsynth/domain.py +++ b/dpsynth/domain.py @@ -187,6 +187,13 @@ def exclusive_min_value(self) -> float: return self.min_value - 1 return math.nextafter(self.min_value, -math.inf) + @property + def exclusive_max_value(self) -> float: + """Returns the exclusive maximum value for this attribute.""" + if self.dtype == 'int': + return self.max_value + 1 + return math.nextafter(self.max_value, math.inf) + def standardize(self, value: Any) -> int | float | None: """Standardizes a value to one of the possible values.""" if self.clip_to_range: diff --git a/dpsynth/local_mode/initialization.py b/dpsynth/local_mode/initialization.py index cf05f3d..1c1ab4c 100644 --- a/dpsynth/local_mode/initialization.py +++ b/dpsynth/local_mode/initialization.py @@ -17,7 +17,6 @@ from __future__ import annotations import dataclasses -from typing import TypeVar import dp_accounting from dpsynth import domain @@ -27,9 +26,6 @@ import numpy as np -_M = TypeVar('_M') - - @dataclasses.dataclass class ColumnMeasurement: """Result of running a column initializer on raw data. @@ -48,13 +44,6 @@ class ColumnMeasurement: measurement: mbi.LinearMeasurement | None = None -def _validate_mechanism(mechanism: _M | None) -> _M: - """Validates that the mechanism has been calibrated and returns it.""" - if mechanism is None: - raise ValueError('Must call calibrate() before using the mechanism.') - return mechanism - - @dataclasses.dataclass class NumericalInitializer(primitives.DPMechanism): """Mechanism that creates the data encoding transform for numerical data. @@ -79,25 +68,81 @@ def calibrate(self, *, zcdp_rho: float) -> NumericalInitializer: """Returns a copy calibrated to the given zCDP budget.""" mechanism = primitives.DPQuantiles( lower=self.attribute.min_value, - upper=self.attribute.max_value, + upper=self.attribute.exclusive_max_value, num_partitions=self.num_partitions, + # Infer from attribute, not data.dtype: NaN promotes int to float. + integer_jitter=self.attribute.dtype == 'int', ).calibrate(zcdp_rho=zcdp_rho) return dataclasses.replace(self, mechanism=mechanism) + @property + def _zcdp_rho(self) -> float: + """Total zCDP rho, derived as sum(eps_i^2 / 8) over composed events.""" + event = self.dp_event # raises if not calibrated + assert isinstance(event, dp_accounting.ComposedDpEvent) + return sum(e.epsilon**2 / 8.0 for e in event.events) + @property def dp_event(self) -> dp_accounting.DpEvent: """Returns the composed privacy event for the quantile computation.""" - return _validate_mechanism(self.mechanism).dp_event + mechanism = self.mechanism + if mechanism is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return mechanism.dp_event def __call__( - self, rng: np.random.Generator, data: np.ndarray + self, + rng: np.random.Generator, + data: np.ndarray, + *, + estimated_total: float | None = None, ) -> ColumnMeasurement: - """Returns a ColumnMeasurement with the discretization transform.""" + """Returns a ColumnMeasurement with the discretization transform. + + Args: + rng: A numpy random number generator. + data: 1D array of numerical data. + estimated_total: If provided, a heuristic one-way measurement is included + assuming a uniform distribution over the original bins. + + Returns: + A ColumnMeasurement with bin edges and optionally a heuristic measurement. + """ # Dedup: concentrated data can make quantiles return duplicate edges. - edges = _validate_mechanism(self.mechanism)(rng, data) - bin_edges = np.unique(np.asarray(edges, dtype=float)) + mechanism = self.mechanism + if mechanism is None: + raise ValueError('Must call calibrate() before using the mechanism.') + raw_edges = np.asarray(mechanism(rng, data), dtype=float) + if self.attribute.dtype == 'int': + # Snap edges to the integer lattice. Bins are right-closed (left, + # right] and discretize uses searchsorted with side='left', so + # floor preserves the partition: edge 4.7 → floor 4 gives the + # same integer split {≤4} | {≥5} via (…, 4] | (4, …]. + raw_edges = np.floor(raw_edges) + bin_edges, edge_counts = np.unique(raw_edges, return_counts=True) + # Edge handling for integers (see initialization_test.py for coverage): + # For integer data with upper=max_value+1, edges can land at max_value + # after floor. Remove such edges and absorb their count into the last + # bin's weight so categorical_attribute_from_edges doesn't create a + # degenerate (max_value, max_value] tail bin. + max_val = self.attribute.max_value + if len(bin_edges) > 0 and bin_edges[-1] >= max_val: + tail_count = edge_counts[-1] + bin_edges = bin_edges[:-1] + edge_counts = edge_counts[:-1] + bin_weights = np.append(edge_counts, tail_count + 1) + else: + bin_weights = np.append(edge_counts, 1) cat_attr = vtx.categorical_attribute_from_edges(bin_edges, self.attribute) - return ColumnMeasurement(cat_attr, bin_edges) + + measurement = None + if estimated_total is not None: + uniform_counts = bin_weights * (estimated_total / self.num_partitions) + measurement = mbi.LinearMeasurement( + uniform_counts, (self.name,), stddev=1.0 / np.sqrt(self._zcdp_rho) + ) + + return ColumnMeasurement(cat_attr, bin_edges, measurement=measurement) @dataclasses.dataclass @@ -128,13 +173,18 @@ def calibrate(self, *, zcdp_rho: float) -> CategoricalInitializer: @property def dp_event(self) -> dp_accounting.DpEvent: """Returns the Gaussian privacy event for this mechanism.""" - return _validate_mechanism(self.mechanism).dp_event + mechanism = self.mechanism + if mechanism is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return mechanism.dp_event def __call__( self, rng: np.random.Generator, data: np.ndarray ) -> ColumnMeasurement: """Returns a ColumnMeasurement with the noisy histogram.""" - mechanism = _validate_mechanism(self.mechanism) + mechanism = self.mechanism + if mechanism is None: + raise ValueError('Must call calibrate() before using the mechanism.') encoded = vtx.discrete_encode(data, self.attribute) noisy_counts = mechanism(rng, encoded) measurement = mbi.LinearMeasurement( @@ -176,13 +226,18 @@ def calibrate(self, *, zcdp_rho: float) -> OpenSetCategoricalInitializer: @property def dp_event(self) -> dp_accounting.DpEvent: """Returns the privacy event including thresholding delta.""" - return _validate_mechanism(self.mechanism).dp_event + mechanism = self.mechanism + if mechanism is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return mechanism.dp_event def __call__( self, rng: np.random.Generator, data: np.ndarray ) -> ColumnMeasurement: """Returns a differentially private measurement of the given data.""" - mechanism = _validate_mechanism(self.mechanism) + mechanism = self.mechanism + if mechanism is None: + raise ValueError('Must call calibrate() before using the mechanism.') # Map raw values to integer partition IDs for thresholding. unique_values, inverse = np.unique(data, return_inverse=True) selected_ids, counts, _ = mechanism(rng, inverse) diff --git a/dpsynth/local_mode/primitives.py b/dpsynth/local_mode/primitives.py index 37b65eb..57e3676 100644 --- a/dpsynth/local_mode/primitives.py +++ b/dpsynth/local_mode/primitives.py @@ -110,6 +110,7 @@ def _median( upper: float, epsilon: float, jitter_multiple: float = 1e-4, + integer_jitter: bool = False, ) -> float: """Computes a differentially private median using the exponential mechanism. @@ -124,6 +125,8 @@ def _median( upper: Upper bound for the data. epsilon: Exponential mechanism privacy parameter. jitter_multiple: Multiplier for the jitter scale, relative to upper-lower. + integer_jitter: If True, use positive jitter U(0, 0.5) so that integer data + points are never pushed across an integer boundary before floor. Returns: A differentially private median estimate. @@ -139,10 +142,13 @@ def _median( return (lower + upper) / 2 return float(np.median(clamped_data)) - # Jitter size proportional to range. A small jitter makes duplicates unique - # and gives them non-zero length intervals, allowing them to be sampled. - jitter_scale = (upper - lower) * jitter_multiple - jitter = rng.uniform(-jitter_scale, jitter_scale, size=clamped_data.size) + # Jitter breaks ties, giving duplicate data points non-zero length intervals. + if integer_jitter: + # Positive jitter keeps floor() from shifting integers to a neighbor. + jitter = rng.uniform(0, 0.5, size=clamped_data.size) + else: + jitter_scale = (upper - lower) * jitter_multiple + jitter = rng.uniform(-jitter_scale, jitter_scale, size=clamped_data.size) jittered_data = np.clip(clamped_data + jitter, lower, upper) sorted_data = np.sort(jittered_data) @@ -203,6 +209,7 @@ def _quantiles( lower: float, upper: float, epsilon_levels: np.ndarray, + integer_jitter: bool = False, ) -> list[float]: """Computes uniformly spaced differentially private quantiles. @@ -217,6 +224,7 @@ def _quantiles( upper: Upper bound for the data. epsilon_levels: Per-level exponential mechanism epsilons, as returned by ``_quantile_epsilon_levels``. + integer_jitter: If True, use positive jitter U(0, 0.5) for integer data. Returns: A list of ``2 ** len(epsilon_levels) - 1`` sorted private quantile @@ -231,7 +239,14 @@ def quantiles_rec(current_data, curr_lower, curr_upper, current_depth): return [] eps = epsilon_levels[current_depth - 1] - med = _median(rng, current_data, curr_lower, curr_upper, eps) + med = _median( + rng, + current_data, + curr_lower, + curr_upper, + eps, + integer_jitter=integer_jitter, + ) left_mask = current_data <= med left_data = current_data[left_mask] @@ -267,62 +282,6 @@ def _get_threshold(delta, sigma, max_part): return thresholds.max() -def select_partitions_gaussian_thresholding( - rng: np.random.Generator, - data: np.ndarray, - gdp_budget: float, - delta: float, -) -> tuple[np.ndarray, np.ndarray, float]: - """Selects partitions using Gaussian Thresholding (Weighted Gaussian). - - This implements Algorithm 2 from the DP-SIPS paper (Swanberg et al., 2023) - under item-level DP. It is the simplest partition selection mechanism: - - 1. Compute the histogram of partition counts. - 2. Add Gaussian noise calibrated to the privacy budget. - 3. Return partitions whose noisy count exceeds a threshold chosen to - bound the false-positive probability per empty partition at delta. - - Under item-level DP each record is treated as a distinct user contributing - to exactly one partition, so the histogram has L2 sensitivity 1. The - threshold is T = 1 + sigma * Phi^{-1}(1 - delta), following the paper's - formula with max_part = 1. - - Args: - rng: A numpy random number generator. - data: 1D array of integers, where each element is a partition ID. - gdp_budget: Privacy budget in terms of squared Gaussian DP mu parameter - (gdp_budget = mu^2 = 1 / sigma^2). - delta: Failure probability (false positive bound per empty partition). - - Returns: - A tuple containing: - - selected_partitions: 1D array of partition IDs that passed the - threshold. - - estimated_counts: 1D array of noisy counts for each selected - partition. - - sigma: The standard deviation of the Gaussian noise added. - """ - if gdp_budget <= 0 or delta <= 0: - raise ValueError(f'{gdp_budget=} and {delta=} must be positive.') - - sigma = 1.0 / np.sqrt(gdp_budget) - - if data.size == 0: - return np.empty(0, dtype=data.dtype), np.empty(0, dtype=float), sigma - - unique_parts, counts = np.unique(data, return_counts=True) - noisy_counts = counts + rng.normal(scale=sigma, size=counts.size) - - # Threshold: ensures that an empty partition (true count 0) passes with - # probability at most delta. For max_part=1 this simplifies to: - # T = 1/sqrt(1) + sigma * Phi^{-1}(1 - delta) = 1 + sigma * ppf(1-delta) - threshold = 1.0 + sigma * scipy.stats.norm.ppf(1.0 - delta) - passed = noisy_counts >= threshold - - return unique_parts[passed], noisy_counts[passed], sigma - - def _select_partitions_sips( rng: np.random.Generator, data: np.ndarray, @@ -429,32 +388,6 @@ def _select_partitions_sips( return selected_partitions, selected_counts, max_sigma -def _gaussian_histogram( - rng: np.random.Generator, - data: np.ndarray, - domain_size: int, - sigma: float, -) -> np.ndarray: - """Computes a noisy histogram over a closed domain using the Gaussian mechanism. - - The histogram query has L2 sensitivity 1 under item-level DP (each record - contributes +1 to exactly one bin). Gaussian noise with the given standard - deviation is added independently to each bin count. - - Args: - rng: A numpy random number generator. - data: 1D array of integer-encoded categorical values in [0, domain_size). - domain_size: Number of categories in the closed domain. - sigma: Standard deviation of the Gaussian noise added to each bin. - - Returns: - A length-`domain_size` array of noisy counts. - """ - return np.bincount(data, minlength=domain_size) + rng.normal( - scale=sigma, size=domain_size - ) - - # --------------------------------------------------------------------------- # DPMechanism subclasses # --------------------------------------------------------------------------- @@ -471,11 +404,13 @@ class DPQuantiles(DPMechanism): lower: Lower bound for the data domain. upper: Upper bound for the data domain. num_partitions: Number of partitions (must be a power of 2). + integer_jitter: If True, use positive jitter U(0, 0.5) for integer data. """ lower: float upper: float num_partitions: int + integer_jitter: bool = False _epsilon_levels: Sequence[float] | None = dataclasses.field( default=None, repr=False ) @@ -527,7 +462,12 @@ def __call__(self, rng: np.random.Generator, data: np.ndarray) -> list[float]: if self._epsilon_levels is None: raise ValueError(_UNCALIBRATED_MSG.format(param='_epsilon_levels')) return _quantiles( - rng, data, self.lower, self.upper, np.asarray(self._epsilon_levels) + rng, + data, + self.lower, + self.upper, + np.asarray(self._epsilon_levels), + integer_jitter=self.integer_jitter, ) @@ -561,7 +501,48 @@ def __call__(self, rng: np.random.Generator, data: np.ndarray) -> np.ndarray: """Computes a differentially private histogram.""" if self.sigma is None: raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) - return _gaussian_histogram(rng, data, self.domain_size, self.sigma) + return np.bincount(data, minlength=self.domain_size) + rng.normal( + scale=self.sigma, size=self.domain_size + ) + + +@dataclasses.dataclass +class DPGaussianCount(DPMechanism): + """Differentially private count via the Gaussian mechanism. + + Returns a noisy count of the number of records in the input data. The + conversion from zCDP is ``sigma = sqrt(0.5 / zcdp_rho)``. + + Attributes: + sigma: Gaussian noise standard deviation. Set directly or via ``calibrate``. + """ + + sigma: float | None = None + + def calibrate(self, *, zcdp_rho: float) -> DPGaussianCount: + """Returns a copy with sigma derived from the zCDP budget.""" + return dataclasses.replace(self, sigma=math.sqrt(0.5 / zcdp_rho)) + + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the Gaussian privacy event for this mechanism.""" + if self.sigma is None: + raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) + return dp_accounting.GaussianDpEvent(noise_multiplier=self.sigma) + + def __call__(self, rng: np.random.Generator, data: np.ndarray) -> float: + """Returns a differentially private count of records. + + Args: + rng: A numpy random number generator. + data: 1D array of data records. + + Returns: + The noisy count (true count + Gaussian noise). + """ + if self.sigma is None: + raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) + return float(len(data) + rng.normal(scale=self.sigma)) @dataclasses.dataclass @@ -607,7 +588,11 @@ def __call__( """ if self.sigma is None: raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) - gdp_budget = np.inf if self.sigma == 0.0 else 1.0 / (self.sigma**2) - return select_partitions_gaussian_thresholding( - rng, data, gdp_budget, self.delta - ) + if data.size == 0: + return np.empty(0, dtype=data.dtype), np.empty(0, dtype=float), self.sigma + unique_parts, counts = np.unique(data, return_counts=True) + noisy_counts = counts + rng.normal(scale=self.sigma, size=counts.size) + # Threshold ensures an empty partition passes with probability <= delta. + threshold = 1.0 + self.sigma * scipy.stats.norm.ppf(1.0 - self.delta) + passed = noisy_counts >= threshold + return unique_parts[passed], noisy_counts[passed], self.sigma diff --git a/dpsynth/local_mode/vectorized_transformations.py b/dpsynth/local_mode/vectorized_transformations.py index 792ce84..b49445e 100644 --- a/dpsynth/local_mode/vectorized_transformations.py +++ b/dpsynth/local_mode/vectorized_transformations.py @@ -102,7 +102,11 @@ def categorical_attribute_from_edges( """ min_, max_ = attribute_domain.exclusive_min_value, attribute_domain.max_value full_edges = np.r_[min_, bin_edges, max_] - intervals = [f'({l}, {r}]' for l, r in zip(full_edges[:-1], full_edges[1:])] + if attribute_domain.dtype == 'int': + e = full_edges.astype(int) + intervals = [f'[{l+1}, {r}]' for l, r in zip(e[:-1], e[1:])] + else: + intervals = [f'({l}, {r}]' for l, r in zip(full_edges[:-1], full_edges[1:])] if not attribute_domain.clip_to_range: intervals = ['OUT_OF_DOMAIN'] + intervals return domain.CategoricalAttribute(intervals) diff --git a/tests/data_generation_v3_test.py b/tests/data_generation_v3_test.py index ba979e7..e1dd803 100644 --- a/tests/data_generation_v3_test.py +++ b/tests/data_generation_v3_test.py @@ -147,6 +147,38 @@ def test_calibrate_small_epsilon(self): self.assertIsInstance(synthetic_df, pd.DataFrame) self.assertListEqual(synthetic_df.columns.tolist(), ['A', 'B']) + def test_numerical_only_uses_dp_count(self): + """Numerical-only domains should allocate a DPGaussianCount for total.""" + domains = { + 'A': domain.NumericalAttribute(min_value=0, max_value=10), + 'B': domain.NumericalAttribute(min_value=-10, max_value=10), + } + df = pd.DataFrame({'A': [5, 5, 0], 'B': [5, -10, -5]}, dtype=float) + rng = np.random.default_rng(0) + calibrated = DataGenerationV3(domains=domains).calibrate(zcdp_rho=100.0) + + # total_count_mechanism should be set for numerical-only domains. + self.assertIsNotNone(calibrated.total_count_mechanism) + synthetic_df = calibrated(rng, df) + self.assertListEqual(synthetic_df.columns.tolist(), ['A', 'B']) + + def test_mixed_domain_no_dp_count(self): + """Mixed domains estimate total from categorical measurements.""" + domains = { + 'A': domain.CategoricalAttribute( + possible_values=['a', 'b', 'c'], out_of_domain_index=0 + ), + 'B': domain.NumericalAttribute(min_value=0, max_value=10), + } + df = pd.DataFrame({'A': ['a', 'b', 'c'], 'B': [1.0, 5.0, 10.0]}) + rng = np.random.default_rng(0) + calibrated = DataGenerationV3(domains=domains).calibrate(zcdp_rho=100.0) + + # No DPGaussianCount needed — total comes from categorical measurement. + self.assertIsNone(calibrated.total_count_mechanism) + synthetic_df = calibrated(rng, df) + self.assertListEqual(synthetic_df.columns.tolist(), ['A', 'B']) + if __name__ == '__main__': absltest.main() diff --git a/tests/local_mode/initialization_test.py b/tests/local_mode/initialization_test.py index 8a8c8ab..e3e9962 100644 --- a/tests/local_mode/initialization_test.py +++ b/tests/local_mode/initialization_test.py @@ -13,6 +13,7 @@ # limitations under the License. from absl.testing import absltest +from absl.testing import parameterized import dp_accounting from dpsynth import domain from dpsynth.local_mode import initialization @@ -96,6 +97,267 @@ def test_numerical_initializer_integer_data(self): # Domain size may be < 8 due to dedup, but must be >= 2. self.assertGreaterEqual(result.categorical_attribute.size, 2) + def test_numerical_initializer_integer_edges_are_floored(self): + """Integer attributes should produce integer-valued bin edges.""" + attr = domain.NumericalAttribute(min_value=0, max_value=100, dtype='int') + rng = np.random.default_rng(42) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=4, attribute=attr + ) + data = np.arange(100) + result = initializer.calibrate(zcdp_rho=100.0)(rng, data) + # All edges should be integers (floor was applied). + np.testing.assert_array_equal(result.bin_edges, np.floor(result.bin_edges)) + # Edges must be within [min_value, max_value - 1]. + self.assertGreaterEqual(result.bin_edges[0], 0) + self.assertLess(result.bin_edges[-1], 100) + + def test_numerical_initializer_measurement_with_merged_bins(self): + """When integer edges collapse, merged bins get proportionally more mass.""" + attr = domain.NumericalAttribute(min_value=0, max_value=100, dtype='int') + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=8, attribute=attr + ) + # Concentrated data will cause edge collisions after floor. + data = np.array([50] * 100 + [1, 99]) + result = initializer.calibrate(zcdp_rho=1.0)( + rng, data, estimated_total=100.0 + ) + self.assertIsNotNone(result.measurement) + # Measurement counts should sum to estimated_total. + np.testing.assert_allclose( + result.measurement.noisy_measurement.sum(), 100.0 + ) + + def test_numerical_initializer_measurement_with_estimated_total(self): + attr = domain.NumericalAttribute(min_value=0, max_value=10) + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='num_col', num_partitions=4, attribute=attr + ) + data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]) + result = initializer.calibrate(zcdp_rho=1.0)( + rng, data, estimated_total=100.0 + ) + + self.assertIsNotNone(result.measurement) + # Measurement should be uniform: 100.0 / num_bins for each bin. + num_bins = result.categorical_attribute.size + expected_count = 100.0 / num_bins + np.testing.assert_allclose( + result.measurement.noisy_measurement, + np.full(num_bins, expected_count), + ) + self.assertEqual(result.measurement.clique, ('num_col',)) + # stddev should be 1/sqrt(rho) = 1/sqrt(1.0) = 1.0 + self.assertAlmostEqual(result.measurement.stddev, 1.0) + + def test_numerical_initializer_no_measurement_without_estimated_total(self): + attr = domain.NumericalAttribute(min_value=0, max_value=10) + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=4, attribute=attr + ) + data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]) + result = initializer.calibrate(zcdp_rho=1.0)(rng, data) + self.assertIsNone(result.measurement) + + def test_integer_edges_at_max_value_absorbed_into_last_bin(self): + """Edges at max_value are removed; their count goes to the last bin.""" + attr = domain.NumericalAttribute(min_value=0, max_value=10, dtype='int') + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=8, attribute=attr + ) + # All data at max_value: all edges should land at or near max_value. + data = np.array([10] * 100) + result = initializer.calibrate(zcdp_rho=100.0)( + rng, data, estimated_total=100.0 + ) + # No edge should equal max_value (they get absorbed). + if len(result.bin_edges) > 0: + self.assertLess(result.bin_edges[-1], 10) + # Measurement counts must still sum to estimated_total. + np.testing.assert_allclose( + result.measurement.noisy_measurement.sum(), 100.0 + ) + # The last bin (containing max_value=10) should get the most mass. + counts = result.measurement.noisy_measurement + self.assertEqual(np.argmax(counts), len(counts) - 1) + + def test_bin_weights_sum_to_num_partitions(self): + """bin_weights must always sum to num_partitions regardless of dedup.""" + attr = domain.NumericalAttribute(min_value=0, max_value=20, dtype='int') + for seed in range(10): + rng = np.random.default_rng(seed) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=8, attribute=attr + ) + data = np.array([5] * 50 + [15] * 50) + result = initializer.calibrate(zcdp_rho=1.0)( + rng, data, estimated_total=100.0 + ) + # Sum of measurement counts = estimated_total = num_partitions * per_bin. + np.testing.assert_allclose( + result.measurement.noisy_measurement.sum(), + 100.0, + err_msg=f'seed={seed}: counts do not sum to estimated_total', + ) + + def test_integer_jitter_prevents_spurious_splits(self): + """Positive jitter should prevent edges from splitting across integers.""" + attr = domain.NumericalAttribute(min_value=0, max_value=100, dtype='int') + rng = np.random.default_rng(42) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=4, attribute=attr + ) + # Uniform data: with high budget, edges should land at 25, 50, 75. + data = np.arange(101) + result = initializer.calibrate(zcdp_rho=1000.0)(rng, data) + # With 4 partitions and 3 edges, no dedup should be needed. + self.assertLen(result.bin_edges, 3) + # All edges should be integers. + np.testing.assert_array_equal(result.bin_edges, np.floor(result.bin_edges)) + + def test_integer_heterogeneous_data_buckets(self): + """Heterogeneous integer data produces sensible bucket partitioning.""" + attr = domain.NumericalAttribute(min_value=0, max_value=10, dtype='int') + rng = np.random.default_rng(42) + initializer = initialization.NumericalInitializer( + name='x', num_partitions=4, attribute=attr + ) + # Deliberately lumpy distribution: 45 points across 4 distinct values. + data = np.array([0] * 10 + [3] * 10 + [5] * 17 + [6] * 8) + result = initializer.calibrate(zcdp_rho=np.inf)( + rng, data, estimated_total=len(data) + ) + # Edges should be integers strictly inside [min_value, max_value). + for e in result.bin_edges: + self.assertEqual(e, int(e), f'edge {e} is not an integer') + self.assertGreaterEqual(e, 0) + self.assertLess(e, 10) + + # Discretize and verify every data point is assigned. + encoded = vtx.discretize(data, result.bin_edges, attr) + self.assertEqual(encoded.shape, data.shape) + num_bins = result.categorical_attribute.size + true_counts = np.bincount(encoded, minlength=num_bins) + self.assertEqual(true_counts.sum(), len(data)) + # Bins covering the data range should be non-empty; trailing bins + # beyond the data (e.g. [7, 10] when max data is 6) may be empty. + self.assertGreater( + np.count_nonzero(true_counts), + 1, + f'too few occupied bins: counts={true_counts},' + f' edges={result.bin_edges}', + ) + + +class MeasurementApproximationTest(parameterized.TestCase): + """Property test: measurement counts ≈ true histogram at high budget. + + With high privacy budget the quantile algorithm creates near-equal-mass + bins, so the heuristic uniform measurement (estimated_total / num_bins per + bin, scaled by bin_weight) should approximate the true discretized counts. + """ + + @parameterized.named_parameters( + # Good cases: measurement should closely approximate true histogram. + dict( + testcase_name='uniform_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=50, dtype='int' + ), + data=np.arange(51), + num_partitions=4, + max_l1=0.5, + ), + dict( + testcase_name='heterogeneous_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=10, dtype='int' + ), + data=np.array([0] * 10 + [3] * 10 + [5] * 17 + [6] * 8 + [9] * 5), + num_partitions=4, + max_l1=1.0, + ), + dict( + testcase_name='uniform_float', + attr=domain.NumericalAttribute(min_value=0, max_value=100), + data=np.linspace(0, 100, 200), + num_partitions=8, + max_l1=0.3, + ), + dict( + testcase_name='boundary_heavy_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=100, dtype='int' + ), + data=np.array([0] * 30 + [100] * 30 + [50] * 40), + num_partitions=4, + max_l1=1.0, + ), + # Hard cases: uniform heuristic is intentionally weak here. + # Empty bins between modes cause high L1, documenting the limitation. + dict( + testcase_name='bimodal_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=100, dtype='int' + ), + data=np.array([10] * 50 + [90] * 50), + num_partitions=8, + max_l1=1.5, + ), + dict( + testcase_name='sparse_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=1000, dtype='int' + ), + data=np.array([1] * 40 + [500] * 10 + [999] * 50), + num_partitions=4, + max_l1=1.5, + ), + ) + def test_measurement_approximates_true_histogram( + self, attr, data, num_partitions, max_l1 + ): + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='x', num_partitions=num_partitions, attribute=attr + ) + result = initializer.calibrate(zcdp_rho=1000.0)( + rng, data, estimated_total=len(data) + ) + # -- Structural checks (must always hold) -- + num_bins = result.categorical_attribute.size + self.assertGreaterEqual(num_bins, 2) + measurement = result.measurement + self.assertIsNotNone(measurement) + # Measurement counts must sum to estimated_total. + np.testing.assert_allclose(measurement.noisy_measurement.sum(), len(data)) + # All measurement counts should be positive. + self.assertTrue( + np.all(measurement.noisy_measurement > 0), + f'non-positive measurement counts: {measurement.noisy_measurement}', + ) + # -- Statistical approximation check -- + # Discretize data with the returned bin edges and compute true counts. + encoded = vtx.discretize(data, result.bin_edges, attr) + true_counts = np.bincount(encoded, minlength=num_bins).astype(float) + meas_counts = measurement.noisy_measurement + # Normalize both to probability vectors and check L1 distance. + true_prob = true_counts / true_counts.sum() + meas_prob = meas_counts / meas_counts.sum() + l1_dist = np.abs(true_prob - meas_prob).sum() + self.assertLess( + l1_dist, + max_l1, + f'Measurement too far from true histogram (L1={l1_dist:.3f}):\n' + f' true_counts = {true_counts}\n' + f' meas_counts = {meas_counts}', + ) + class CategoricalInitializerTest(absltest.TestCase): diff --git a/tests/local_mode/primitives_test.py b/tests/local_mode/primitives_test.py index 5257c68..cd10548 100644 --- a/tests/local_mode/primitives_test.py +++ b/tests/local_mode/primitives_test.py @@ -217,59 +217,43 @@ def setUp(self): def test_basic_operation(self): data = np.array([1] * 50 + [2] * 5) - selected, counts, sigma = ( - primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=10.0, delta=1e-5 - ) + mech = primitives.DPPartitionSelection( + delta=1e-5, sigma=1.0 / np.sqrt(10.0) ) + selected, counts, sigma = mech(self.rng, data) self.assertIn(1, selected) self.assertEqual(sigma, 1.0 / np.sqrt(10.0)) self.assertEqual(selected.size, counts.size) def test_empty_data(self): data = np.array([], dtype=int) - selected, counts, sigma = ( - primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=1.0, delta=1e-5 - ) - ) + mech = primitives.DPPartitionSelection(delta=1e-5, sigma=1.0) + selected, counts, sigma = mech(self.rng, data) self.assertEmpty(selected) self.assertEmpty(counts) self.assertEqual(sigma, 1.0) def test_high_budget_selects_all(self): data = np.array([1, 2, 3, 4, 5]) - selected, _, _ = primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=np.inf, delta=0.1 - ) + mech = primitives.DPPartitionSelection(delta=0.1, sigma=0.0) + selected, _, _ = mech(self.rng, data) self.assertCountEqual(selected, [1, 2, 3, 4, 5]) - def test_zero_budget_raises(self): - data = np.array([1, 2, 3]) - with self.assertRaises(ValueError): - primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=-0.1, delta=1e-5 - ) - with self.assertRaises(ValueError): - primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=1.0, delta=-0.001 - ) - def test_rare_items_not_selected(self): # One item with many occurrences, another with just 1. # With moderate budget and tight delta, the rare item should be dropped. data = np.array([1] * 100 + [2]) - selected, _, _ = primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=0.5, delta=1e-6 - ) + mech = primitives.DPPartitionSelection(delta=1e-6, sigma=1.0 / np.sqrt(0.5)) + selected, _, _ = mech(self.rng, data) self.assertIn(1, selected) self.assertNotIn(2, selected) def test_string_data_type(self): data = np.array(["a", "b", "a", "a", "c", "a", "c"]) - selected, _, _ = primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=10.0, delta=1e-5 + mech = primitives.DPPartitionSelection( + delta=1e-5, sigma=1.0 / np.sqrt(10.0) ) + selected, _, _ = mech(self.rng, data) self.assertTrue(all(isinstance(p, str) for p in selected)) @@ -281,19 +265,22 @@ def setUp(self): def test_basic_operation(self): data = np.array([0, 0, 1, 1, 1, 2]) - result = primitives._gaussian_histogram(self.rng, data, 4, sigma=1.0) + mech = primitives.DPGaussianHistogram(domain_size=4, sigma=1.0) + result = mech(self.rng, data) self.assertLen(result, 4) # Noisy counts should be close to true counts [2, 3, 1, 0]. np.testing.assert_allclose(result, [2, 3, 1, 0], atol=5.0) def test_zero_sigma(self): data = np.array([0, 0, 1, 2, 2, 2]) - result = primitives._gaussian_histogram(self.rng, data, 3, sigma=0.0) + mech = primitives.DPGaussianHistogram(domain_size=3, sigma=0.0) + result = mech(self.rng, data) np.testing.assert_array_equal(result, [2, 1, 3]) def test_empty_data(self): data = np.array([], dtype=int) - result = primitives._gaussian_histogram(self.rng, data, 3, sigma=1.0) + mech = primitives.DPGaussianHistogram(domain_size=3, sigma=1.0) + result = mech(self.rng, data) self.assertLen(result, 3) @@ -391,5 +378,36 @@ def test_dp_event_type(self): self.assertAlmostEqual(event.noise_multiplier, 1.0) +class DPGaussianCountTest(absltest.TestCase): + + def setUp(self): + super().setUp() + self.rng = np.random.default_rng(42) + + def test_calibrate_and_call(self): + mech = primitives.DPGaussianCount() + calibrated = mech.calibrate(zcdp_rho=0.5) + data = np.array([1, 2, 3, 4, 5]) + result = calibrated(self.rng, data) + self.assertIsInstance(result, float) + np.testing.assert_allclose(result, 5.0, atol=5.0) + + def test_zero_sigma_returns_exact_count(self): + mech = primitives.DPGaussianCount(sigma=0.0) + data = np.array([10, 20, 30]) + self.assertEqual(mech(self.rng, data), 3.0) + + def test_dp_event_raises_before_calibration(self): + mech = primitives.DPGaussianCount() + with self.assertRaises(ValueError): + _ = mech.dp_event + + def test_dp_event_type(self): + mech = primitives.DPGaussianCount().calibrate(zcdp_rho=0.5) + event = mech.dp_event + self.assertIsInstance(event, dp_accounting.GaussianDpEvent) + self.assertAlmostEqual(event.noise_multiplier, 1.0) + + if __name__ == "__main__": absltest.main()