diff --git a/dpsynth/domain.py b/dpsynth/domain.py index 67458df..ddce1b5 100644 --- a/dpsynth/domain.py +++ b/dpsynth/domain.py @@ -187,6 +187,13 @@ def exclusive_min_value(self) -> float: return self.min_value - 1 return math.nextafter(self.min_value, -math.inf) + @property + def exclusive_max_value(self) -> float: + """Returns the exclusive maximum value for this attribute.""" + if self.dtype == 'int': + return self.max_value + 1 + return math.nextafter(self.max_value, math.inf) + def standardize(self, value: Any) -> int | float | None: """Standardizes a value to one of the possible values.""" if self.clip_to_range: diff --git a/dpsynth/local_mode/initialization.py b/dpsynth/local_mode/initialization.py index cf05f3d..8645dc1 100644 --- a/dpsynth/local_mode/initialization.py +++ b/dpsynth/local_mode/initialization.py @@ -79,25 +79,77 @@ def calibrate(self, *, zcdp_rho: float) -> NumericalInitializer: """Returns a copy calibrated to the given zCDP budget.""" mechanism = primitives.DPQuantiles( lower=self.attribute.min_value, - upper=self.attribute.max_value, + upper=self.attribute.exclusive_max_value, num_partitions=self.num_partitions, + # Infer from attribute, not data.dtype: NaN promotes int to float. + integer_jitter=self.attribute.dtype == 'int', ).calibrate(zcdp_rho=zcdp_rho) return dataclasses.replace(self, mechanism=mechanism) + @property + def _zcdp_rho(self) -> float: + """Total zCDP rho, derived as sum(eps_i^2 / 8) over composed events.""" + event = self.dp_event # raises if not calibrated + assert isinstance(event, dp_accounting.ComposedDpEvent) + return sum(e.epsilon**2 / 8.0 for e in event.events) + @property def dp_event(self) -> dp_accounting.DpEvent: """Returns the composed privacy event for the quantile computation.""" return _validate_mechanism(self.mechanism).dp_event def __call__( - self, rng: np.random.Generator, data: np.ndarray + self, + rng: np.random.Generator, + data: np.ndarray, + *, + estimated_total: float | None = None, ) -> ColumnMeasurement: - """Returns a ColumnMeasurement with the discretization transform.""" + """Returns a ColumnMeasurement with the discretization transform. + + Args: + rng: A numpy random number generator. + data: 1D array of numerical data. + estimated_total: If provided, a heuristic one-way measurement is included + assuming a uniform distribution over the original bins. + + Returns: + A ColumnMeasurement with bin edges and optionally a heuristic measurement. + """ # Dedup: concentrated data can make quantiles return duplicate edges. - edges = _validate_mechanism(self.mechanism)(rng, data) - bin_edges = np.unique(np.asarray(edges, dtype=float)) + raw_edges = _validate_mechanism(self.mechanism)(rng, data) + raw_edges = np.asarray(raw_edges, dtype=float) + if self.attribute.dtype == 'int': + # Snap edges to the integer lattice. Bins are right-closed (left, + # right] and discretize uses searchsorted with side='left', so + # floor preserves the partition: edge 4.7 → floor 4 gives the + # same integer split {≤4} | {≥5} via (…, 4] | (4, …]. + raw_edges = np.floor(raw_edges) + bin_edges, edge_counts = np.unique(raw_edges, return_counts=True) + # For integer data with upper=max_value+1, edges can land at max_value + # after floor. Remove such edges and absorb their count into the last + # bin's weight so categorical_attribute_from_edges doesn't create a + # degenerate (max_value, max_value] tail bin. + max_val = self.attribute.max_value + if len(bin_edges) > 0 and bin_edges[-1] >= max_val: + tail_count = edge_counts[-1] + bin_edges = bin_edges[:-1] + edge_counts = edge_counts[:-1] + bin_weights = np.append(edge_counts, tail_count + 1) + else: + bin_weights = np.append(edge_counts, 1) cat_attr = vtx.categorical_attribute_from_edges(bin_edges, self.attribute) - return ColumnMeasurement(cat_attr, bin_edges) + + measurement = None + if estimated_total is not None: + rho = self._zcdp_rho + uniform_counts = bin_weights * (estimated_total / self.num_partitions) + stddev = 1.0 / np.sqrt(rho) + measurement = mbi.LinearMeasurement( + uniform_counts, (self.name,), stddev=stddev + ) + + return ColumnMeasurement(cat_attr, bin_edges, measurement=measurement) @dataclasses.dataclass diff --git a/dpsynth/local_mode/primitives.py b/dpsynth/local_mode/primitives.py index 37b65eb..5f977ca 100644 --- a/dpsynth/local_mode/primitives.py +++ b/dpsynth/local_mode/primitives.py @@ -110,6 +110,7 @@ def _median( upper: float, epsilon: float, jitter_multiple: float = 1e-4, + integer_jitter: bool = False, ) -> float: """Computes a differentially private median using the exponential mechanism. @@ -124,6 +125,8 @@ def _median( upper: Upper bound for the data. epsilon: Exponential mechanism privacy parameter. jitter_multiple: Multiplier for the jitter scale, relative to upper-lower. + integer_jitter: If True, use positive jitter U(0, 0.5) so that integer data + points are never pushed across an integer boundary before floor. Returns: A differentially private median estimate. @@ -139,10 +142,13 @@ def _median( return (lower + upper) / 2 return float(np.median(clamped_data)) - # Jitter size proportional to range. A small jitter makes duplicates unique - # and gives them non-zero length intervals, allowing them to be sampled. - jitter_scale = (upper - lower) * jitter_multiple - jitter = rng.uniform(-jitter_scale, jitter_scale, size=clamped_data.size) + # Jitter breaks ties, giving duplicate data points non-zero length intervals. + if integer_jitter: + # Positive jitter keeps floor() from shifting integers to a neighbor. + jitter = rng.uniform(0, 0.5, size=clamped_data.size) + else: + jitter_scale = (upper - lower) * jitter_multiple + jitter = rng.uniform(-jitter_scale, jitter_scale, size=clamped_data.size) jittered_data = np.clip(clamped_data + jitter, lower, upper) sorted_data = np.sort(jittered_data) @@ -203,6 +209,7 @@ def _quantiles( lower: float, upper: float, epsilon_levels: np.ndarray, + integer_jitter: bool = False, ) -> list[float]: """Computes uniformly spaced differentially private quantiles. @@ -217,6 +224,7 @@ def _quantiles( upper: Upper bound for the data. epsilon_levels: Per-level exponential mechanism epsilons, as returned by ``_quantile_epsilon_levels``. + integer_jitter: If True, use positive jitter U(0, 0.5) for integer data. Returns: A list of ``2 ** len(epsilon_levels) - 1`` sorted private quantile @@ -231,7 +239,14 @@ def quantiles_rec(current_data, curr_lower, curr_upper, current_depth): return [] eps = epsilon_levels[current_depth - 1] - med = _median(rng, current_data, curr_lower, curr_upper, eps) + med = _median( + rng, + current_data, + curr_lower, + curr_upper, + eps, + integer_jitter=integer_jitter, + ) left_mask = current_data <= med left_data = current_data[left_mask] @@ -471,11 +486,13 @@ class DPQuantiles(DPMechanism): lower: Lower bound for the data domain. upper: Upper bound for the data domain. num_partitions: Number of partitions (must be a power of 2). + integer_jitter: If True, use positive jitter U(0, 0.5) for integer data. """ lower: float upper: float num_partitions: int + integer_jitter: bool = False _epsilon_levels: Sequence[float] | None = dataclasses.field( default=None, repr=False ) @@ -527,7 +544,12 @@ def __call__(self, rng: np.random.Generator, data: np.ndarray) -> list[float]: if self._epsilon_levels is None: raise ValueError(_UNCALIBRATED_MSG.format(param='_epsilon_levels')) return _quantiles( - rng, data, self.lower, self.upper, np.asarray(self._epsilon_levels) + rng, + data, + self.lower, + self.upper, + np.asarray(self._epsilon_levels), + integer_jitter=self.integer_jitter, ) diff --git a/dpsynth/local_mode/vectorized_transformations.py b/dpsynth/local_mode/vectorized_transformations.py index 792ce84..b49445e 100644 --- a/dpsynth/local_mode/vectorized_transformations.py +++ b/dpsynth/local_mode/vectorized_transformations.py @@ -102,7 +102,11 @@ def categorical_attribute_from_edges( """ min_, max_ = attribute_domain.exclusive_min_value, attribute_domain.max_value full_edges = np.r_[min_, bin_edges, max_] - intervals = [f'({l}, {r}]' for l, r in zip(full_edges[:-1], full_edges[1:])] + if attribute_domain.dtype == 'int': + e = full_edges.astype(int) + intervals = [f'[{l+1}, {r}]' for l, r in zip(e[:-1], e[1:])] + else: + intervals = [f'({l}, {r}]' for l, r in zip(full_edges[:-1], full_edges[1:])] if not attribute_domain.clip_to_range: intervals = ['OUT_OF_DOMAIN'] + intervals return domain.CategoricalAttribute(intervals) diff --git a/tests/local_mode/initialization_test.py b/tests/local_mode/initialization_test.py index 8a8c8ab..aa13b65 100644 --- a/tests/local_mode/initialization_test.py +++ b/tests/local_mode/initialization_test.py @@ -13,6 +13,7 @@ # limitations under the License. from absl.testing import absltest +from absl.testing import parameterized import dp_accounting from dpsynth import domain from dpsynth.local_mode import initialization @@ -96,6 +97,369 @@ def test_numerical_initializer_integer_data(self): # Domain size may be < 8 due to dedup, but must be >= 2. self.assertGreaterEqual(result.categorical_attribute.size, 2) + def test_numerical_initializer_integer_edges_are_floored(self): + """Integer attributes should produce integer-valued bin edges.""" + attr = domain.NumericalAttribute(min_value=0, max_value=100, dtype='int') + rng = np.random.default_rng(42) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=4, attribute=attr + ) + data = np.arange(100) + result = initializer.calibrate(zcdp_rho=100.0)(rng, data) + # All edges should be integers (floor was applied). + np.testing.assert_array_equal(result.bin_edges, np.floor(result.bin_edges)) + # Edges must be within [min_value, max_value - 1]. + self.assertGreaterEqual(result.bin_edges[0], 0) + self.assertLess(result.bin_edges[-1], 100) + + def test_numerical_initializer_measurement_with_merged_bins(self): + """When integer edges collapse, merged bins get proportionally more mass.""" + attr = domain.NumericalAttribute(min_value=0, max_value=100, dtype='int') + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=8, attribute=attr + ) + # Concentrated data will cause edge collisions after floor. + data = np.array([50] * 100 + [1, 99]) + result = initializer.calibrate(zcdp_rho=1.0)( + rng, data, estimated_total=100.0 + ) + self.assertIsNotNone(result.measurement) + # Measurement counts should sum to estimated_total. + np.testing.assert_allclose( + result.measurement.noisy_measurement.sum(), 100.0 + ) + + def test_numerical_initializer_measurement_with_estimated_total(self): + attr = domain.NumericalAttribute(min_value=0, max_value=10) + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='num_col', num_partitions=4, attribute=attr + ) + data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]) + result = initializer.calibrate(zcdp_rho=1.0)( + rng, data, estimated_total=100.0 + ) + + self.assertIsNotNone(result.measurement) + # Measurement should be uniform: 100.0 / num_bins for each bin. + num_bins = result.categorical_attribute.size + expected_count = 100.0 / num_bins + np.testing.assert_allclose( + result.measurement.noisy_measurement, + np.full(num_bins, expected_count), + ) + self.assertEqual(result.measurement.clique, ('num_col',)) + # stddev should be 1/sqrt(rho) = 1/sqrt(1.0) = 1.0 + self.assertAlmostEqual(result.measurement.stddev, 1.0) + + def test_numerical_initializer_no_measurement_without_estimated_total(self): + attr = domain.NumericalAttribute(min_value=0, max_value=10) + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=4, attribute=attr + ) + data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]) + result = initializer.calibrate(zcdp_rho=1.0)(rng, data) + self.assertIsNone(result.measurement) + + def test_integer_edges_at_max_value_absorbed_into_last_bin(self): + """Edges at max_value are removed; their count goes to the last bin.""" + attr = domain.NumericalAttribute(min_value=0, max_value=10, dtype='int') + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=8, attribute=attr + ) + # All data at max_value: all edges should land at or near max_value. + data = np.array([10] * 100) + result = initializer.calibrate(zcdp_rho=100.0)( + rng, data, estimated_total=100.0 + ) + # No edge should equal max_value (they get absorbed). + if len(result.bin_edges) > 0: + self.assertLess(result.bin_edges[-1], 10) + # Measurement counts must still sum to estimated_total. + np.testing.assert_allclose( + result.measurement.noisy_measurement.sum(), 100.0 + ) + # The last bin (containing max_value=10) should get the most mass. + counts = result.measurement.noisy_measurement + self.assertEqual(np.argmax(counts), len(counts) - 1) + + def test_bin_weights_sum_to_num_partitions(self): + """bin_weights must always sum to num_partitions regardless of dedup.""" + attr = domain.NumericalAttribute(min_value=0, max_value=20, dtype='int') + for seed in range(10): + rng = np.random.default_rng(seed) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=8, attribute=attr + ) + data = np.array([5] * 50 + [15] * 50) + result = initializer.calibrate(zcdp_rho=1.0)( + rng, data, estimated_total=100.0 + ) + # Sum of measurement counts = estimated_total = num_partitions * per_bin. + np.testing.assert_allclose( + result.measurement.noisy_measurement.sum(), + 100.0, + err_msg=f'seed={seed}: counts do not sum to estimated_total', + ) + + def test_integer_jitter_prevents_spurious_splits(self): + """Positive jitter should prevent edges from splitting across integers.""" + attr = domain.NumericalAttribute(min_value=0, max_value=100, dtype='int') + rng = np.random.default_rng(42) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=4, attribute=attr + ) + # Uniform data: with high budget, edges should land at 25, 50, 75. + data = np.arange(101) + result = initializer.calibrate(zcdp_rho=1000.0)(rng, data) + # With 4 partitions and 3 edges, no dedup should be needed. + self.assertLen(result.bin_edges, 3) + # All edges should be integers. + np.testing.assert_array_equal(result.bin_edges, np.floor(result.bin_edges)) + + def test_integer_heterogeneous_data_buckets(self): + """Heterogeneous integer data produces sensible bucket partitioning.""" + attr = domain.NumericalAttribute(min_value=0, max_value=10, dtype='int') + rng = np.random.default_rng(42) + initializer = initialization.NumericalInitializer( + name='x', num_partitions=4, attribute=attr + ) + # Deliberately lumpy distribution: 45 points across 4 distinct values. + data = np.array([0] * 10 + [3] * 10 + [5] * 17 + [6] * 8) + result = initializer.calibrate(zcdp_rho=np.inf)( + rng, data, estimated_total=len(data) + ) + # Edges should be integers strictly inside [min_value, max_value). + for e in result.bin_edges: + self.assertEqual(e, int(e), f'edge {e} is not an integer') + self.assertGreaterEqual(e, 0) + self.assertLess(e, 10) + + # Discretize and verify every data point is assigned. + encoded = vtx.discretize(data, result.bin_edges, attr) + self.assertEqual(encoded.shape, data.shape) + num_bins = result.categorical_attribute.size + true_counts = np.bincount(encoded, minlength=num_bins) + self.assertLen(data, true_counts.sum()) + # Bins covering the data range should be non-empty; trailing bins + # beyond the data (e.g. [7, 10] when max data is 6) may be empty. + self.assertGreater( + np.count_nonzero(true_counts), + 1, + f'too few occupied bins: counts={true_counts},' + f' edges={result.bin_edges}', + ) + + +class MeasurementApproximationTest(parameterized.TestCase): + """Property test: measurement counts ≈ true histogram. + + The heuristic uniform measurement should satisfy + L1(measurement, true_histogram) < max(3 / sqrt(rho), 2 - 1/K) + where the 3/sqrt(rho) term covers quantile noise and 2-1/K covers the + worst-case uniform-vs-delta misspecification error (K = num_bins). + """ + + @parameterized.named_parameters( + # --- High budget (rho=1000): misspecification-dominated --- + dict( + testcase_name='uniform_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=50, dtype='int' + ), + data=np.arange(51), + num_partitions=4, + rho=1000.0, + ), + dict( + testcase_name='heterogeneous_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=10, dtype='int' + ), + data=np.array([0] * 10 + [3] * 10 + [5] * 17 + [6] * 8 + [9] * 5), + num_partitions=4, + rho=1000.0, + ), + dict( + testcase_name='uniform_float', + attr=domain.NumericalAttribute( + min_value=0, max_value=100, dtype='float' + ), + data=np.linspace(0, 100, 200), + num_partitions=8, + rho=1000.0, + ), + dict( + testcase_name='boundary_heavy_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=100, dtype='int' + ), + data=np.array([0] * 30 + [100] * 30 + [50] * 40), + num_partitions=4, + rho=1000.0, + ), + dict( + testcase_name='bimodal_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=100, dtype='int' + ), + data=np.array([10] * 50 + [90] * 50), + num_partitions=8, + rho=1000.0, + ), + dict( + testcase_name='sparse_int', + attr=domain.NumericalAttribute( + min_value=0, max_value=1000, dtype='int' + ), + data=np.array([1] * 40 + [500] * 10 + [999] * 50), + num_partitions=4, + rho=1000.0, + ), + # --- Low budget (rho=0.5): noise-dominated --- + dict( + testcase_name='uniform_int_low_rho', + attr=domain.NumericalAttribute( + min_value=0, max_value=50, dtype='int' + ), + data=np.arange(51), + num_partitions=4, + rho=0.5, + ), + dict( + testcase_name='uniform_float_low_rho', + attr=domain.NumericalAttribute( + min_value=0, max_value=100, dtype='float' + ), + data=np.linspace(0, 100, 200), + num_partitions=8, + rho=0.5, + ), + dict( + testcase_name='heterogeneous_int_low_rho', + attr=domain.NumericalAttribute( + min_value=0, max_value=10, dtype='int' + ), + data=np.array([0] * 10 + [3] * 10 + [5] * 17 + [6] * 8 + [9] * 5), + num_partitions=4, + rho=0.5, + ), + ) + def test_measurement_approximates_true_histogram( + self, attr, data, num_partitions, rho + ): + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='x', num_partitions=num_partitions, attribute=attr + ) + result = initializer.calibrate(zcdp_rho=rho)( + rng, data, estimated_total=len(data) + ) + # -- Structural checks (must always hold) -- + num_bins = result.categorical_attribute.size + self.assertGreaterEqual(num_bins, 2) + measurement = result.measurement + self.assertIsNotNone(measurement) + # Measurement counts must sum to estimated_total. + np.testing.assert_allclose(measurement.noisy_measurement.sum(), len(data)) + # All measurement counts should be positive. + self.assertTrue( + np.all(measurement.noisy_measurement > 0), + f'non-positive measurement counts: {measurement.noisy_measurement}', + ) + # -- Statistical approximation check -- + encoded = vtx.discretize(data, result.bin_edges, attr) + true_counts = np.bincount(encoded, minlength=num_bins).astype(float) + meas_counts = measurement.noisy_measurement + true_prob = true_counts / true_counts.sum() + meas_prob = meas_counts / meas_counts.sum() + l1_dist = np.abs(true_prob - meas_prob).sum() + # 3/sqrt(rho) covers quantile noise; 2-1/K covers uniform-vs-delta. + max_l1 = max(3.0 / np.sqrt(rho), 2.0 - 1.0 / num_bins) + self.assertLess( + l1_dist, + max_l1, + f'Measurement too far from true histogram (L1={l1_dist:.3f},' + f' bound={max_l1:.3f}, rho={rho}):\n' + f' true_counts = {true_counts}\n' + f' meas_counts = {meas_counts}', + ) + + def test_measurement_property_random_configs(self): + """Randomized property test: max(3/sqrt(rho), 2-1/K) over many configs.""" + master_rng = np.random.default_rng(20260619) + num_trials = 50 + + for trial in range(num_trials): + with self.subTest(trial=trial): + # -- Random configuration -- + rho = float(10 ** master_rng.uniform(-1, 3)) # 0.1 to 1000 + num_partitions = int(master_rng.choice([2, 4, 8, 16])) + min_val = int(master_rng.integers(0, 50)) + max_val = min_val + int(master_rng.integers(5, 200)) + is_int = bool(master_rng.random() < 0.5) + + # -- Random data as a mixture of 1-5 point masses -- + num_modes = int(master_rng.integers(1, 6)) + modes = master_rng.integers(min_val, max_val + 1, size=num_modes) + weights = master_rng.dirichlet(np.ones(num_modes)) + n = int(master_rng.integers(50, 300)) + counts = np.round(weights * n).astype(int) + counts[-1] = n - counts[:-1].sum() # ensure exact total + data = np.concatenate([np.full(c, m) for c, m in zip(counts, modes)]) + if not is_int: + data = data.astype(float) + + dtype = 'int' if is_int else 'float' + attr = domain.NumericalAttribute( + min_value=min_val, max_value=max_val, dtype=dtype + ) + + # -- Run initializer -- + rng = np.random.default_rng(trial) + initializer = initialization.NumericalInitializer( + name='x', num_partitions=num_partitions, attribute=attr + ) + result = initializer.calibrate(zcdp_rho=rho)( + rng, data, estimated_total=len(data) + ) + + # -- Structural checks -- + measurement = result.measurement + self.assertIsNotNone(measurement) + np.testing.assert_allclose( + measurement.noisy_measurement.sum(), + len(data), + err_msg=f'trial={trial}: counts do not sum to N', + ) + self.assertTrue( + np.all(measurement.noisy_measurement > 0), + f'trial={trial}: non-positive counts ' + f'{measurement.noisy_measurement}', + ) + + # -- L1 property check -- + num_bins = result.categorical_attribute.size + encoded = vtx.discretize(data, result.bin_edges, attr) + true_counts = np.bincount(encoded, minlength=num_bins).astype(float) + meas_counts = measurement.noisy_measurement + true_prob = true_counts / true_counts.sum() + meas_prob = meas_counts / meas_counts.sum() + l1_dist = np.abs(true_prob - meas_prob).sum() + max_l1 = max(3.0 / np.sqrt(rho), 2.0 - 1.0 / num_bins) + self.assertLess( + l1_dist, + max_l1, + f'trial={trial} (rho={rho:.2f}, K={num_partitions},' + f' modes={modes}, is_int={is_int}):\n' + f' L1={l1_dist:.3f}, bound={max_l1:.3f}\n' + f' true_counts={true_counts}\n' + f' meas_counts={meas_counts}', + ) + class CategoricalInitializerTest(absltest.TestCase):