Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 97 additions & 16 deletions dpsynth/data_generation_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from dpsynth.local_mode import primitives
from dpsynth.local_mode import vectorized_transformations as vtx
import mbi
from mbi import estimation as mbi_estimation
import numpy as np
import pandas as pd

Expand All @@ -36,7 +37,10 @@ def _create_initializers(
domains: Mapping[str, domain.AttributeType],
numerical_bins: int,
init_delta: float,
) -> dict[str, primitives.DPMechanism]:
) -> tuple[
dict[str, primitives.DPMechanism],
dict[str, primitives.DPMechanism],
]:
"""Creates per-column initializers from the domain specification.

Args:
Expand All @@ -45,30 +49,31 @@ def _create_initializers(
init_delta: Delta for open-set categorical partition selection.

Returns:
A dictionary mapping column names to uncalibrated initializer instances.
A (closed_domain_initializers, numerical_initializers) tuple.

Raises:
ValueError: If a column has an unsupported attribute type.
"""
initializers = {}
closed_inits = {}
numerical_inits = {}
for col, attr in domains.items():
if isinstance(attr, domain.NumericalAttribute):
initializers[col] = initialization.NumericalInitializer(
numerical_inits[col] = initialization.NumericalInitializer(
name=col, num_partitions=numerical_bins, attribute=attr
)
elif isinstance(attr, domain.CategoricalAttribute):
initializers[col] = initialization.CategoricalInitializer(
closed_inits[col] = initialization.CategoricalInitializer(
name=col, attribute=attr
)
elif isinstance(attr, domain.OpenSetCategoricalAttribute):
initializers[col] = initialization.OpenSetCategoricalInitializer(
closed_inits[col] = initialization.OpenSetCategoricalInitializer(
name=col, attribute=attr, delta=init_delta
)
else:
raise ValueError(
f'Unsupported attribute type for column {col!r}: {type(attr)}'
)
return initializers
return closed_inits, numerical_inits


@dataclasses.dataclass
Expand Down Expand Up @@ -100,6 +105,7 @@ class DataGenerationV3(primitives.DPMechanism):
default_factory=discrete_mechanisms.MSTMechanism
)
initializers: dict[str, primitives.DPMechanism] | None = None
total_count_mechanism: primitives.DPGaussianCount | None = None
cross_attribute_constraints: Sequence[constraints.Constraint] = ()

def calibrate(
Expand Down Expand Up @@ -178,22 +184,49 @@ def _calibrate_zcdp(
self, zcdp_rho, numerical_bins, init_delta, init_budget_fraction
):
"""Simple additive zCDP budget split."""
inits = self.initializers or _create_initializers(
self.domains, numerical_bins, init_delta
if self.initializers is not None:
all_inits = dict(self.initializers)
has_closed_domain = any(
not isinstance(init, initialization.NumericalInitializer)
for init in all_inits.values()
)
else:
closed_inits, numerical_inits = _create_initializers(
self.domains, numerical_bins, init_delta
)
all_inits = {**closed_inits, **numerical_inits}
has_closed_domain = bool(closed_inits)

has_numerical = any(
isinstance(init, initialization.NumericalInitializer)
for init in all_inits.values()
)
needs_total_count = has_numerical and not has_closed_domain

init_rho = init_budget_fraction * zcdp_rho
per_col_rho = init_rho / len(inits)
# If we need a separate total count and have no closed-domain columns to
# estimate it from, allocate one extra share of init budget for it.
num_shares = len(all_inits) + (1 if needs_total_count else 0)
per_col_rho = init_rho / num_shares
discrete_rho = zcdp_rho - init_rho

calibrated_inits = {
col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items()
col: init.calibrate(zcdp_rho=per_col_rho)
for col, init in all_inits.items()
}
calibrated_total = (
primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho)
if needs_total_count
else None
)
calibrated_discrete = self.discrete_mechanism.calibrate(
zcdp_rho=discrete_rho
)
return dataclasses.replace(
self,
initializers=calibrated_inits,
discrete_mechanism=calibrated_discrete,
total_count_mechanism=calibrated_total,
)

def _calibrate_approx_dp(
Expand Down Expand Up @@ -226,10 +259,24 @@ def _calibrate_approx_dp(
Returns:
A new DataGenerationV3 instance with calibrated sub-mechanisms.
"""
inits = self.initializers or _create_initializers(
self.domains, numerical_bins, init_delta
if self.initializers is not None:
inits = dict(self.initializers)
has_closed_domain = any(
not isinstance(init, initialization.NumericalInitializer)
for init in inits.values()
)
else:
closed_inits, numerical_inits = _create_initializers(
self.domains, numerical_bins, init_delta
)
inits = {**closed_inits, **numerical_inits}
has_closed_domain = bool(closed_inits)
has_numerical = any(
isinstance(init, initialization.NumericalInitializer)
for init in inits.values()
)
num_columns = len(inits)
needs_total_count = has_numerical and not has_closed_domain
num_columns = len(inits) + (1 if needs_total_count else 0)

# Stage 1: Convert (epsilon, remaining_delta) to zCDP and calibrate
# initializers with init_budget_fraction of that budget.
Expand All @@ -244,11 +291,17 @@ def _calibrate_approx_dp(
calibrated_inits = {
col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items()
}

calibrated_total = (
primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho)
if needs_total_count
else None
)
# Stage 2: With init dp_events fixed, find the tightest discrete budget.
# The accountant handles ApproximateDpEvent deltas from open-set
# initializers automatically.
init_events = [init.dp_event for init in calibrated_inits.values()]
if calibrated_total is not None:
init_events.append(calibrated_total.dp_event)

# Determine accountant type based on discrete mechanism's dp_event.
probe_event = self.discrete_mechanism.calibrate(zcdp_rho=1.0).dp_event
Expand Down Expand Up @@ -277,6 +330,7 @@ def make_event_from_param(discrete_rho):
self,
initializers=calibrated_inits,
discrete_mechanism=calibrated_discrete,
total_count_mechanism=calibrated_total,
)

@property
Expand All @@ -292,6 +346,8 @@ def dp_event(self) -> dp_accounting.DpEvent:
if self.initializers is None:
raise ValueError('Must call calibrate() before accessing dp_event.')
events = [init.dp_event for init in self.initializers.values()]
if self.total_count_mechanism is not None:
events.append(self.total_count_mechanism.dp_event)
events.append(self.discrete_mechanism.dp_event)
return dp_accounting.ComposedDpEvent(events)

Expand Down Expand Up @@ -321,9 +377,34 @@ def __call__(
)

# Phase 1: Per-column initialization.
# Initialize closed-domain columns first to estimate the total count,
# then pass it to numerical initializers for heuristic measurements.
col_results: dict[str, initialization.ColumnMeasurement] = {}
closed_domain_measurements = []
for col, init in self.initializers.items():
col_results[col] = init(rng, data[col].values)
if not isinstance(init, initialization.NumericalInitializer):
col_results[col] = init(rng, data[col].values)
if col_results[col].measurement is not None:
closed_domain_measurements.append(col_results[col].measurement)

# Estimate total from closed-domain measurements or DPGaussianCount.
estimated_total = None
if closed_domain_measurements:
estimated_total = mbi_estimation.minimum_variance_unbiased_total(
closed_domain_measurements
)
elif self.total_count_mechanism is not None:
# Pick an arbitrary column to count records.
any_col = next(iter(self.domains))
estimated_total = max(
1.0, self.total_count_mechanism(rng, data[any_col].values)
)

for col, init in self.initializers.items():
if isinstance(init, initialization.NumericalInitializer):
col_results[col] = init(
rng, data[col].values, estimated_total=estimated_total
)

# Phase 2: Encode data to discrete domain.
discrete_domains = {}
Expand Down
7 changes: 7 additions & 0 deletions dpsynth/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ def exclusive_min_value(self) -> float:
return self.min_value - 1
return math.nextafter(self.min_value, -math.inf)

@property
def exclusive_max_value(self) -> float:
"""Returns the exclusive maximum value for this attribute."""
if self.dtype == 'int':
return self.max_value + 1
return math.nextafter(self.max_value, math.inf)

def standardize(self, value: Any) -> int | float | None:
"""Standardizes a value to one of the possible values."""
if self.clip_to_range:
Expand Down
99 changes: 77 additions & 22 deletions dpsynth/local_mode/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from __future__ import annotations

import dataclasses
from typing import TypeVar

import dp_accounting
from dpsynth import domain
Expand All @@ -27,9 +26,6 @@
import numpy as np


_M = TypeVar('_M')


@dataclasses.dataclass
class ColumnMeasurement:
"""Result of running a column initializer on raw data.
Expand All @@ -48,13 +44,6 @@ class ColumnMeasurement:
measurement: mbi.LinearMeasurement | None = None


def _validate_mechanism(mechanism: _M | None) -> _M:
"""Validates that the mechanism has been calibrated and returns it."""
if mechanism is None:
raise ValueError('Must call calibrate() before using the mechanism.')
return mechanism


@dataclasses.dataclass
class NumericalInitializer(primitives.DPMechanism):
"""Mechanism that creates the data encoding transform for numerical data.
Expand All @@ -79,25 +68,81 @@ def calibrate(self, *, zcdp_rho: float) -> NumericalInitializer:
"""Returns a copy calibrated to the given zCDP budget."""
mechanism = primitives.DPQuantiles(
lower=self.attribute.min_value,
upper=self.attribute.max_value,
upper=self.attribute.exclusive_max_value,
num_partitions=self.num_partitions,
# Infer from attribute, not data.dtype: NaN promotes int to float.
integer_jitter=self.attribute.dtype == 'int',
).calibrate(zcdp_rho=zcdp_rho)
return dataclasses.replace(self, mechanism=mechanism)

@property
def _zcdp_rho(self) -> float:
"""Total zCDP rho, derived as sum(eps_i^2 / 8) over composed events."""
event = self.dp_event # raises if not calibrated
assert isinstance(event, dp_accounting.ComposedDpEvent)
return sum(e.epsilon**2 / 8.0 for e in event.events)

@property
def dp_event(self) -> dp_accounting.DpEvent:
"""Returns the composed privacy event for the quantile computation."""
return _validate_mechanism(self.mechanism).dp_event
mechanism = self.mechanism
if mechanism is None:
raise ValueError('Must call calibrate() before using the mechanism.')
return mechanism.dp_event

def __call__(
self, rng: np.random.Generator, data: np.ndarray
self,
rng: np.random.Generator,
data: np.ndarray,
*,
estimated_total: float | None = None,
) -> ColumnMeasurement:
"""Returns a ColumnMeasurement with the discretization transform."""
"""Returns a ColumnMeasurement with the discretization transform.

Args:
rng: A numpy random number generator.
data: 1D array of numerical data.
estimated_total: If provided, a heuristic one-way measurement is included
assuming a uniform distribution over the original bins.

Returns:
A ColumnMeasurement with bin edges and optionally a heuristic measurement.
"""
# Dedup: concentrated data can make quantiles return duplicate edges.
edges = _validate_mechanism(self.mechanism)(rng, data)
bin_edges = np.unique(np.asarray(edges, dtype=float))
mechanism = self.mechanism
if mechanism is None:
raise ValueError('Must call calibrate() before using the mechanism.')
raw_edges = np.asarray(mechanism(rng, data), dtype=float)
if self.attribute.dtype == 'int':
# Snap edges to the integer lattice. Bins are right-closed (left,
# right] and discretize uses searchsorted with side='left', so
# floor preserves the partition: edge 4.7 → floor 4 gives the
# same integer split {≤4} | {≥5} via (…, 4] | (4, …].
raw_edges = np.floor(raw_edges)
bin_edges, edge_counts = np.unique(raw_edges, return_counts=True)
# Edge handling for integers (see initialization_test.py for coverage):
# For integer data with upper=max_value+1, edges can land at max_value
# after floor. Remove such edges and absorb their count into the last
# bin's weight so categorical_attribute_from_edges doesn't create a
# degenerate (max_value, max_value] tail bin.
max_val = self.attribute.max_value
if len(bin_edges) > 0 and bin_edges[-1] >= max_val:
tail_count = edge_counts[-1]
bin_edges = bin_edges[:-1]
edge_counts = edge_counts[:-1]
bin_weights = np.append(edge_counts, tail_count + 1)
else:
bin_weights = np.append(edge_counts, 1)
cat_attr = vtx.categorical_attribute_from_edges(bin_edges, self.attribute)
return ColumnMeasurement(cat_attr, bin_edges)

measurement = None
if estimated_total is not None:
uniform_counts = bin_weights * (estimated_total / self.num_partitions)
measurement = mbi.LinearMeasurement(
uniform_counts, (self.name,), stddev=1.0 / np.sqrt(self._zcdp_rho)
)

return ColumnMeasurement(cat_attr, bin_edges, measurement=measurement)


@dataclasses.dataclass
Expand Down Expand Up @@ -128,13 +173,18 @@ def calibrate(self, *, zcdp_rho: float) -> CategoricalInitializer:
@property
def dp_event(self) -> dp_accounting.DpEvent:
"""Returns the Gaussian privacy event for this mechanism."""
return _validate_mechanism(self.mechanism).dp_event
mechanism = self.mechanism
if mechanism is None:
raise ValueError('Must call calibrate() before using the mechanism.')
return mechanism.dp_event

def __call__(
self, rng: np.random.Generator, data: np.ndarray
) -> ColumnMeasurement:
"""Returns a ColumnMeasurement with the noisy histogram."""
mechanism = _validate_mechanism(self.mechanism)
mechanism = self.mechanism
if mechanism is None:
raise ValueError('Must call calibrate() before using the mechanism.')
encoded = vtx.discrete_encode(data, self.attribute)
noisy_counts = mechanism(rng, encoded)
measurement = mbi.LinearMeasurement(
Expand Down Expand Up @@ -176,13 +226,18 @@ def calibrate(self, *, zcdp_rho: float) -> OpenSetCategoricalInitializer:
@property
def dp_event(self) -> dp_accounting.DpEvent:
"""Returns the privacy event including thresholding delta."""
return _validate_mechanism(self.mechanism).dp_event
mechanism = self.mechanism
if mechanism is None:
raise ValueError('Must call calibrate() before using the mechanism.')
return mechanism.dp_event

def __call__(
self, rng: np.random.Generator, data: np.ndarray
) -> ColumnMeasurement:
"""Returns a differentially private measurement of the given data."""
mechanism = _validate_mechanism(self.mechanism)
mechanism = self.mechanism
if mechanism is None:
raise ValueError('Must call calibrate() before using the mechanism.')
# Map raw values to integer partition IDs for thresholding.
unique_values, inverse = np.unique(data, return_inverse=True)
selected_ids, counts, _ = mechanism(rng, inverse)
Expand Down
Loading
Loading