diff --git a/docs/data_and_terminology.md b/docs/data_and_terminology.md index d4928ac..9016c05 100644 --- a/docs/data_and_terminology.md +++ b/docs/data_and_terminology.md @@ -166,13 +166,13 @@ attribute_domains = { ## Automatic Schema Deduction & Population -If you are using the [Scalable PipelineBackend API](scalable_beam_api.md) with structured formats like TFRecord, you do not always need to specify an exhaustive YAML file upfront. +If you are using the [Scalable PipelineBackend API](scalable_beam_api.md) with structured formats like TFRecord, you do not always need to specify an exhaustive YAML file upfront. Numerical attributes still require public lower and upper bounds before DP quantile derivation. 1. **Format Deduction**: DPSynth inspects a sample of records to deduce column names and native types (`INT`, `STR`, `FLOAT`). 2. **The Population Phase**: The pipeline runs distributed DP transformations ([`pipeline_transformations/`](../pipeline_transformations/README.md)) - to privately compute quantiles for numerical columns and discover valid - categories for strings, fully populating the internal `DatasetDescriptor` - automatically. + to privately compute quantiles within public numerical bounds and discover + valid categories for strings, fully populating the internal + `DatasetDescriptor` automatically. diff --git a/docs/processing_lifecycle.md b/docs/processing_lifecycle.md index d712757..89e7e18 100644 --- a/docs/processing_lifecycle.md +++ b/docs/processing_lifecycle.md @@ -95,7 +95,9 @@ also referred to as an undirected probabilistic graphical model (PGM). ## Stage 1: Derivation and Initialization When raw datasets are ingested without an exhaustive, explicit schema, DPSynth -must privately deduce domain boundaries before any modeling can begin. +must populate discrete domain metadata before any modeling can begin. Numerical +columns require public lower and upper bounds, and DPSynth privately derives +quantile cut points within those bounds. * **Numerical Derivation**: [`numerical_values_derivation.py`](../pipeline_transformations/numerical_values_derivation.py) @@ -108,7 +110,7 @@ Continuous floating-point columns (like transaction amounts or timestamps) cannot be modeled directly by discrete graphical models. 1. DPSynth runs distributed Differentially Private Quantile algorithms over the - data distribution. + data distribution using caller-provided public lower and upper bounds. 2. The continuous range is split into equal-frequency buckets (e.g., `K = 32` bins by default). Bins are clustered where data is dense and expanded where data is sparse. diff --git a/dpsynth/pipeline_transformations/README.md b/dpsynth/pipeline_transformations/README.md index ac865f5..942db40 100644 --- a/dpsynth/pipeline_transformations/README.md +++ b/dpsynth/pipeline_transformations/README.md @@ -17,8 +17,8 @@ project. It interacts deeply with other core modules: * **[dataset_descriptors/](../dataset_descriptors/)**: The transformations here are the primary engine for the **Population Phase** of the - `DatasetDescriptor`. They securely derive missing metadata (like bounds or - categories) from raw data. + `DatasetDescriptor`. They securely derive missing metadata like categories + from raw data and compute DP quantiles within public numerical bounds. * **[discrete_mechanisms/](../discrete_mechanisms/)**: While this directory implements Beam-native versions of mechanisms, it frequently reuses the shared mathematical utilities (e.g., domain compression logic) found in @@ -41,7 +41,8 @@ Modules are organized based on their stage in the synthesis pipeline: ### 2. Domain Metadata Derivation (Population Phase) These transformations securely fill an initially "uninitialized" -`DatasetDescriptor` with real-world data bounds. +`DatasetDescriptor` with categorical domains and numerical quantile cut points. +Numerical lower and upper bounds must be supplied as public metadata. * **`categorical_values_derivation.py`**: Derives categorical value lists using DP partition selection. diff --git a/dpsynth/pipeline_transformations/dataset_encoding.py b/dpsynth/pipeline_transformations/dataset_encoding.py index 75ace8d..bf53ada 100644 --- a/dpsynth/pipeline_transformations/dataset_encoding.py +++ b/dpsynth/pipeline_transformations/dataset_encoding.py @@ -73,6 +73,9 @@ def encode_dataset( dp_engine, numerical_attr_indices_to_derive, num_quantiles, + _public_numerical_bounds( + descriptor, numerical_attr_indices_to_derive + ), ) ) # (Collection[key, attribute, quantiles]) i.e. # (Collection[str | int, domain.NumericalAttribute, tuple[float,..]]) @@ -99,6 +102,29 @@ def encode_fn(row, d: dataset_descriptor.DatasetDescriptor): return encoded_data, descriptor +def _public_numerical_bounds( + descriptor: dataset_descriptor.DatasetDescriptor, + numerical_attr_indices_to_derive: list[int], +) -> dict[int, domain.NumericalAttribute]: + """Returns public numerical bounds or raises if any are missing.""" + result = {} + missing = [] + for index in numerical_attr_indices_to_derive: + attribute = descriptor.attributes[index] + if attribute.numerical_attribute is None: + missing.append(attribute.name) + else: + result[index] = attribute.numerical_attribute + + if missing: + raise ValueError( + 'Numerical attributes require public lower and upper bounds before DP' + f' quantile derivation. Missing bounds for: {missing}.' + ) + + return result + + def decode_dataset( data: types.Collection[tuple[int, ...]], descriptor: types.Collection[dataset_descriptor.DatasetDescriptor], diff --git a/dpsynth/pipeline_transformations/numerical_values_derivation.py b/dpsynth/pipeline_transformations/numerical_values_derivation.py index e1854ac..622b6e3 100644 --- a/dpsynth/pipeline_transformations/numerical_values_derivation.py +++ b/dpsynth/pipeline_transformations/numerical_values_derivation.py @@ -20,6 +20,7 @@ import copy import dataclasses +from collections.abc import Mapping from typing import Any, Generic, TypeAlias, TypeVar from dpsynth import domain @@ -27,7 +28,6 @@ from dpsynth.pipeline_transformations import types import numpy as np import pipeline_dp -from pipeline_dp import pipeline_functions Key: TypeAlias = TypeVar('Key', bound=str | int) Record: TypeAlias = tuple[Any, ...] | dict[Key, Any] @@ -46,6 +46,7 @@ def derive_numerical_attributes( dp_engine: pipeline_dp.DPEngine, attribute_keys_to_derive: list[Key], num_quantile_buckets: int, + public_bounds: Mapping[Key, domain.NumericalAttribute] | None = None, ) -> types.Collection[NumericalAttributeOutput] | None: """Derives new NumericalAttribute objects by computing DP quantiles. @@ -61,6 +62,8 @@ def derive_numerical_attributes( num_quantile_buckets: The number of quantile buckets to use for discretization. This means `num_quantile_buckets - 1` boundaries will be computed. + public_bounds: Public numerical bounds for each attribute. These bounds + must be provided by the caller instead of inferred from sensitive data. Returns: A collection of `NumericalAttributeOutput` objects, where @@ -72,35 +75,19 @@ def derive_numerical_attributes( # No attributes to derive values for. return None - def extract_field_values(row): - for key in attribute_keys_to_derive: - yield key, row[key] + public_bounds = dict(public_bounds or {}) + missing_bounds = [ + key for key in attribute_keys_to_derive if key not in public_bounds + ] + if missing_bounds: + raise ValueError( + 'Public numerical bounds must be provided for every numerical' + f' attribute. Missing bounds for: {missing_bounds}.' + ) - key_val_pairs = backend.flat_map( - input_data, - extract_field_values, - 'Extract key-value pairs for all keys in attribute_keys_to_derive.', - ) # (key, value) - - min_max = pipeline_functions.min_max_per_key( - backend, key_val_pairs, 'Compute min and max per key.' - ) # (key, (min, max)) - - key_to_attr = backend.map_values( - min_max, - lambda min_max: domain.NumericalAttribute( - min_value=min_max[0], max_value=min_max[1] - ), - 'Create numerical attribute from min/max', - ) # (key, domain.NumericalAttribute) - - # Conversion of key_to_attr to a singleton collection - key_to_attr = backend.to_list( - key_to_attr, 'key_to_attr to list' - ) # singleton [(key, domain.NumericalAttribute)] - key_to_attr = backend.map( - key_to_attr, dict, 'key_to_attr list to dict' - ) # singleton {key: domain.NumericalAttribute} + key_to_attr = backend.to_collection( + [public_bounds], input_data, 'Create public numerical attributes' + ) quantiles = _compute_dp_quantiles( input_data, diff --git a/tests/pipeline_transformations/dataset_encoding_test.py b/tests/pipeline_transformations/dataset_encoding_test.py index 3de8bd6..34529e8 100644 --- a/tests/pipeline_transformations/dataset_encoding_test.py +++ b/tests/pipeline_transformations/dataset_encoding_test.py @@ -60,7 +60,11 @@ def test_encode_decode_mixed(self, mock_derive_numerical): ), # numerical attribute dataset_descriptor.AttributeDescriptor( - name="col3", data_type=dataset_descriptor.DataType.FLOAT + name="col3", + data_type=dataset_descriptor.DataType.FLOAT, + numerical_attribute=domain.NumericalAttribute( + min_value=0.0, max_value=12.0, clip_to_range=False + ), ), ], data_record_converter=FakeDataRecordConverter(), @@ -137,9 +141,38 @@ def test_encode_decode_mixed(self, mock_derive_numerical): self.assertEqual(decoded_data, expected_decoded_data) mock_derive_numerical.assert_called_once_with( - data, backend, dp_engine, [2], num_quantiles + data, + backend, + dp_engine, + [2], + num_quantiles, + { + 2: domain.NumericalAttribute( + min_value=0.0, max_value=12.0, clip_to_range=False + ) + }, + ) + + def test_encode_dataset_raises_for_numerical_attribute_missing_bounds(self): + backend = pipeline_dp.LocalBackend() + accountant = pipeline_dp.NaiveBudgetAccountant( + total_epsilon=5.0, total_delta=1e-10 + ) + dp_engine = pipeline_dp.DPEngine(accountant, backend) + descriptors = dataset_descriptor.DatasetDescriptor( + attributes=[ + dataset_descriptor.AttributeDescriptor( + name="col1", data_type=dataset_descriptor.DataType.FLOAT + ), + ], + data_record_converter=FakeDataRecordConverter(), ) + with self.assertRaisesRegex(ValueError, "Missing bounds for"): + dataset_encoding.encode_dataset( + [(1.0,), (2.0,)], backend, dp_engine, descriptors + ) + def test_get_indices_to_discretisize(self): descriptors = dataset_descriptor.DatasetDescriptor( attributes=[ diff --git a/tests/pipeline_transformations/numerical_values_derivation_test.py b/tests/pipeline_transformations/numerical_values_derivation_test.py index ad7e8e0..38394b2 100644 --- a/tests/pipeline_transformations/numerical_values_derivation_test.py +++ b/tests/pipeline_transformations/numerical_values_derivation_test.py @@ -182,6 +182,10 @@ def test_derive_numerical_attributes(self): dp_engine=dp_engine, attribute_keys_to_derive=attribute_keys, num_quantile_buckets=num_buckets, + public_bounds={ + "feat1": domain.NumericalAttribute(min_value=0, max_value=10), + "feat2": domain.NumericalAttribute(min_value=0, max_value=110), + }, ) accountant.compute_budgets() @@ -191,20 +195,20 @@ def test_derive_numerical_attributes(self): self.assertIn("feat1", derived_attrs_dict) self.assertIn("feat2", derived_attrs_dict) - # Assert feat1 (Range [0, 9]) + # Assert feat1 uses the provided public range, not the exact data range. feat1_output = derived_attrs_dict["feat1"] self.assertEqual(feat1_output.attribute.min_value, 0) - self.assertEqual(feat1_output.attribute.max_value, 9) + self.assertEqual(feat1_output.attribute.max_value, 10) self.assertLen(feat1_output.quantiles, 4) expected_quantiles_feat1 = [1.8, 3.6, 5.4, 7.2] self.assertSequenceAlmostEqual( feat1_output.quantiles, expected_quantiles_feat1, delta=10 ) - # Assert F2 (Range [10, 100]) + # Assert F2 uses the provided public range, not the exact data range. f2_output = derived_attrs_dict["feat2"] - self.assertEqual(f2_output.attribute.min_value, 10) - self.assertEqual(f2_output.attribute.max_value, 100) + self.assertEqual(f2_output.attribute.min_value, 0) + self.assertEqual(f2_output.attribute.max_value, 110) self.assertLen(f2_output.quantiles, 4) expected_quantiles_f2 = [28.0, 46.0, 64.0, 82.0] @@ -226,6 +230,9 @@ def test_derive_numerical_attributes_empty_input(self): dp_engine=dp_engine, attribute_keys_to_derive=["field"], num_quantile_buckets=3, + public_bounds={ + "field": domain.NumericalAttribute(min_value=0, max_value=1) + }, ) accountant.compute_budgets() derived_attrs_list = list(derived_attrs) @@ -245,6 +252,9 @@ def test_derive_numerical_attributes_constant_val(self): dp_engine=dp_engine, attribute_keys_to_derive=["field"], num_quantile_buckets=4, + public_bounds={ + "field": domain.NumericalAttribute(min_value=0, max_value=20) + }, ) accountant.compute_budgets() @@ -253,13 +263,30 @@ def test_derive_numerical_attributes_constant_val(self): self.assertLen(derived_attrs_list, 1) output = derived_attrs_list[0] self.assertEqual(output.key, "field") - self.assertEqual(output.attribute.min_value, 10) - self.assertEqual(output.attribute.max_value, 10) + self.assertEqual(output.attribute.min_value, 0) + self.assertEqual(output.attribute.max_value, 20) self.assertLen(output.quantiles, 3) self.assertSequenceAlmostEqual( output.quantiles, [10.0, 10.0, 10.0], delta=1.0 ) + def test_derive_numerical_attributes_requires_public_bounds(self): + input_data = [{"field": i} for i in range(10)] + accountant = pipeline_dp.NaiveBudgetAccountant( + total_epsilon=self._EPSILON, total_delta=self._DELTA + ) + backend = pipeline_dp.LocalBackend() + dp_engine = pipeline_dp.DPEngine(accountant, backend) + + with self.assertRaisesRegex(ValueError, "Public numerical bounds"): + numerical_values_derivation.derive_numerical_attributes( + input_data=input_data, + backend=backend, + dp_engine=dp_engine, + attribute_keys_to_derive=["field"], + num_quantile_buckets=4, + ) + def test_derive_numerical_attributes_no_attributes_to_derive(self): input_data = [{"feat1": i, "feat2": (i + 1) * 10} for i in range(10)] accountant = pipeline_dp.NaiveBudgetAccountant(