Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/data_and_terminology.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ attribute_domains = {

## Automatic Schema Deduction & Population

If you are using the [Scalable PipelineBackend API](scalable_beam_api.md) with structured formats like TFRecord, you do not always need to specify an exhaustive YAML file upfront.
If you are using the [Scalable PipelineBackend API](scalable_beam_api.md) with structured formats like TFRecord, you do not always need to specify an exhaustive YAML file upfront. Numerical attributes still require public lower and upper bounds before DP quantile derivation.

1. **Format Deduction**: DPSynth inspects a sample of records
to deduce column names and native types (`INT`, `STR`,
`FLOAT`).
2. **The Population Phase**: The pipeline runs distributed DP transformations
([`pipeline_transformations/`](../pipeline_transformations/README.md))
to privately compute quantiles for numerical columns and discover valid
categories for strings, fully populating the internal `DatasetDescriptor`
automatically.
to privately compute quantiles within public numerical bounds and discover
valid categories for strings, fully populating the internal
`DatasetDescriptor` automatically.
6 changes: 4 additions & 2 deletions docs/processing_lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ also referred to as an undirected probabilistic graphical model (PGM).
## Stage 1: Derivation and Initialization

When raw datasets are ingested without an exhaustive, explicit schema, DPSynth
must privately deduce domain boundaries before any modeling can begin.
must populate discrete domain metadata before any modeling can begin. Numerical
columns require public lower and upper bounds, and DPSynth privately derives
quantile cut points within those bounds.

* **Numerical
Derivation**: [`numerical_values_derivation.py`](../pipeline_transformations/numerical_values_derivation.py)
Expand All @@ -108,7 +110,7 @@ Continuous floating-point columns (like transaction amounts or timestamps)
cannot be modeled directly by discrete graphical models.

1. DPSynth runs distributed Differentially Private Quantile algorithms over the
data distribution.
data distribution using caller-provided public lower and upper bounds.
2. The continuous range is split into equal-frequency buckets (e.g., `K = 32`
bins by default). Bins are clustered where data is dense and expanded where
data is sparse.
Expand Down
7 changes: 4 additions & 3 deletions dpsynth/pipeline_transformations/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ project. It interacts deeply with other core modules:

* **[dataset_descriptors/](../dataset_descriptors/)**: The transformations
here are the primary engine for the **Population Phase** of the
`DatasetDescriptor`. They securely derive missing metadata (like bounds or
categories) from raw data.
`DatasetDescriptor`. They securely derive missing metadata like categories
from raw data and compute DP quantiles within public numerical bounds.
* **[discrete_mechanisms/](../discrete_mechanisms/)**: While this directory
implements Beam-native versions of mechanisms, it frequently reuses the
shared mathematical utilities (e.g., domain compression logic) found in
Expand All @@ -41,7 +41,8 @@ Modules are organized based on their stage in the synthesis pipeline:
### 2. Domain Metadata Derivation (Population Phase)

These transformations securely fill an initially "uninitialized"
`DatasetDescriptor` with real-world data bounds.
`DatasetDescriptor` with categorical domains and numerical quantile cut points.
Numerical lower and upper bounds must be supplied as public metadata.

* **`categorical_values_derivation.py`**: Derives categorical value lists
using DP partition selection.
Expand Down
26 changes: 26 additions & 0 deletions dpsynth/pipeline_transformations/dataset_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def encode_dataset(
dp_engine,
numerical_attr_indices_to_derive,
num_quantiles,
_public_numerical_bounds(
descriptor, numerical_attr_indices_to_derive
),
)
) # (Collection[key, attribute, quantiles]) i.e.
# (Collection[str | int, domain.NumericalAttribute, tuple[float,..]])
Expand All @@ -99,6 +102,29 @@ def encode_fn(row, d: dataset_descriptor.DatasetDescriptor):
return encoded_data, descriptor


def _public_numerical_bounds(
descriptor: dataset_descriptor.DatasetDescriptor,
numerical_attr_indices_to_derive: list[int],
) -> dict[int, domain.NumericalAttribute]:
"""Returns public numerical bounds or raises if any are missing."""
result = {}
missing = []
for index in numerical_attr_indices_to_derive:
attribute = descriptor.attributes[index]
if attribute.numerical_attribute is None:
missing.append(attribute.name)
else:
result[index] = attribute.numerical_attribute

if missing:
raise ValueError(
'Numerical attributes require public lower and upper bounds before DP'
f' quantile derivation. Missing bounds for: {missing}.'
)

return result


def decode_dataset(
data: types.Collection[tuple[int, ...]],
descriptor: types.Collection[dataset_descriptor.DatasetDescriptor],
Expand Down
45 changes: 16 additions & 29 deletions dpsynth/pipeline_transformations/numerical_values_derivation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@

import copy
import dataclasses
from collections.abc import Mapping
from typing import Any, Generic, TypeAlias, TypeVar

from dpsynth import domain
from dpsynth.dataset_descriptors import dataset_descriptor
from dpsynth.pipeline_transformations import types
import numpy as np
import pipeline_dp
from pipeline_dp import pipeline_functions

Key: TypeAlias = TypeVar('Key', bound=str | int)
Record: TypeAlias = tuple[Any, ...] | dict[Key, Any]
Expand All @@ -46,6 +46,7 @@ def derive_numerical_attributes(
dp_engine: pipeline_dp.DPEngine,
attribute_keys_to_derive: list[Key],
num_quantile_buckets: int,
public_bounds: Mapping[Key, domain.NumericalAttribute] | None = None,
) -> types.Collection[NumericalAttributeOutput] | None:
"""Derives new NumericalAttribute objects by computing DP quantiles.

Expand All @@ -61,6 +62,8 @@ def derive_numerical_attributes(
num_quantile_buckets: The number of quantile buckets to use for
discretization. This means `num_quantile_buckets - 1` boundaries will be
computed.
public_bounds: Public numerical bounds for each attribute. These bounds
must be provided by the caller instead of inferred from sensitive data.

Returns:
A collection of `NumericalAttributeOutput` objects, where
Expand All @@ -72,35 +75,19 @@ def derive_numerical_attributes(
# No attributes to derive values for.
return None

def extract_field_values(row):
for key in attribute_keys_to_derive:
yield key, row[key]
public_bounds = dict(public_bounds or {})
missing_bounds = [
key for key in attribute_keys_to_derive if key not in public_bounds
]
if missing_bounds:
raise ValueError(
'Public numerical bounds must be provided for every numerical'
f' attribute. Missing bounds for: {missing_bounds}.'
)

key_val_pairs = backend.flat_map(
input_data,
extract_field_values,
'Extract key-value pairs for all keys in attribute_keys_to_derive.',
) # (key, value)

min_max = pipeline_functions.min_max_per_key(
backend, key_val_pairs, 'Compute min and max per key.'
) # (key, (min, max))

key_to_attr = backend.map_values(
min_max,
lambda min_max: domain.NumericalAttribute(
min_value=min_max[0], max_value=min_max[1]
),
'Create numerical attribute from min/max',
) # (key, domain.NumericalAttribute)

# Conversion of key_to_attr to a singleton collection
key_to_attr = backend.to_list(
key_to_attr, 'key_to_attr to list'
) # singleton [(key, domain.NumericalAttribute)]
key_to_attr = backend.map(
key_to_attr, dict, 'key_to_attr list to dict'
) # singleton {key: domain.NumericalAttribute}
key_to_attr = backend.to_collection(
[public_bounds], input_data, 'Create public numerical attributes'
)

quantiles = _compute_dp_quantiles(
input_data,
Expand Down
37 changes: 35 additions & 2 deletions tests/pipeline_transformations/dataset_encoding_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ def test_encode_decode_mixed(self, mock_derive_numerical):
),
# numerical attribute
dataset_descriptor.AttributeDescriptor(
name="col3", data_type=dataset_descriptor.DataType.FLOAT
name="col3",
data_type=dataset_descriptor.DataType.FLOAT,
numerical_attribute=domain.NumericalAttribute(
min_value=0.0, max_value=12.0, clip_to_range=False
),
),
],
data_record_converter=FakeDataRecordConverter(),
Expand Down Expand Up @@ -137,9 +141,38 @@ def test_encode_decode_mixed(self, mock_derive_numerical):
self.assertEqual(decoded_data, expected_decoded_data)

mock_derive_numerical.assert_called_once_with(
data, backend, dp_engine, [2], num_quantiles
data,
backend,
dp_engine,
[2],
num_quantiles,
{
2: domain.NumericalAttribute(
min_value=0.0, max_value=12.0, clip_to_range=False
)
},
)

def test_encode_dataset_raises_for_numerical_attribute_missing_bounds(self):
backend = pipeline_dp.LocalBackend()
accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=5.0, total_delta=1e-10
)
dp_engine = pipeline_dp.DPEngine(accountant, backend)
descriptors = dataset_descriptor.DatasetDescriptor(
attributes=[
dataset_descriptor.AttributeDescriptor(
name="col1", data_type=dataset_descriptor.DataType.FLOAT
),
],
data_record_converter=FakeDataRecordConverter(),
)

with self.assertRaisesRegex(ValueError, "Missing bounds for"):
dataset_encoding.encode_dataset(
[(1.0,), (2.0,)], backend, dp_engine, descriptors
)

def test_get_indices_to_discretisize(self):
descriptors = dataset_descriptor.DatasetDescriptor(
attributes=[
Expand Down
41 changes: 34 additions & 7 deletions tests/pipeline_transformations/numerical_values_derivation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ def test_derive_numerical_attributes(self):
dp_engine=dp_engine,
attribute_keys_to_derive=attribute_keys,
num_quantile_buckets=num_buckets,
public_bounds={
"feat1": domain.NumericalAttribute(min_value=0, max_value=10),
"feat2": domain.NumericalAttribute(min_value=0, max_value=110),
},
)

accountant.compute_budgets()
Expand All @@ -191,20 +195,20 @@ def test_derive_numerical_attributes(self):
self.assertIn("feat1", derived_attrs_dict)
self.assertIn("feat2", derived_attrs_dict)

# Assert feat1 (Range [0, 9])
# Assert feat1 uses the provided public range, not the exact data range.
feat1_output = derived_attrs_dict["feat1"]
self.assertEqual(feat1_output.attribute.min_value, 0)
self.assertEqual(feat1_output.attribute.max_value, 9)
self.assertEqual(feat1_output.attribute.max_value, 10)
self.assertLen(feat1_output.quantiles, 4)
expected_quantiles_feat1 = [1.8, 3.6, 5.4, 7.2]
self.assertSequenceAlmostEqual(
feat1_output.quantiles, expected_quantiles_feat1, delta=10
)

# Assert F2 (Range [10, 100])
# Assert F2 uses the provided public range, not the exact data range.
f2_output = derived_attrs_dict["feat2"]
self.assertEqual(f2_output.attribute.min_value, 10)
self.assertEqual(f2_output.attribute.max_value, 100)
self.assertEqual(f2_output.attribute.min_value, 0)
self.assertEqual(f2_output.attribute.max_value, 110)
self.assertLen(f2_output.quantiles, 4)

expected_quantiles_f2 = [28.0, 46.0, 64.0, 82.0]
Expand All @@ -226,6 +230,9 @@ def test_derive_numerical_attributes_empty_input(self):
dp_engine=dp_engine,
attribute_keys_to_derive=["field"],
num_quantile_buckets=3,
public_bounds={
"field": domain.NumericalAttribute(min_value=0, max_value=1)
},
)
accountant.compute_budgets()
derived_attrs_list = list(derived_attrs)
Expand All @@ -245,6 +252,9 @@ def test_derive_numerical_attributes_constant_val(self):
dp_engine=dp_engine,
attribute_keys_to_derive=["field"],
num_quantile_buckets=4,
public_bounds={
"field": domain.NumericalAttribute(min_value=0, max_value=20)
},
)

accountant.compute_budgets()
Expand All @@ -253,13 +263,30 @@ def test_derive_numerical_attributes_constant_val(self):
self.assertLen(derived_attrs_list, 1)
output = derived_attrs_list[0]
self.assertEqual(output.key, "field")
self.assertEqual(output.attribute.min_value, 10)
self.assertEqual(output.attribute.max_value, 10)
self.assertEqual(output.attribute.min_value, 0)
self.assertEqual(output.attribute.max_value, 20)
self.assertLen(output.quantiles, 3)
self.assertSequenceAlmostEqual(
output.quantiles, [10.0, 10.0, 10.0], delta=1.0
)

def test_derive_numerical_attributes_requires_public_bounds(self):
input_data = [{"field": i} for i in range(10)]
accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=self._EPSILON, total_delta=self._DELTA
)
backend = pipeline_dp.LocalBackend()
dp_engine = pipeline_dp.DPEngine(accountant, backend)

with self.assertRaisesRegex(ValueError, "Public numerical bounds"):
numerical_values_derivation.derive_numerical_attributes(
input_data=input_data,
backend=backend,
dp_engine=dp_engine,
attribute_keys_to_derive=["field"],
num_quantile_buckets=4,
)

def test_derive_numerical_attributes_no_attributes_to_derive(self):
input_data = [{"feat1": i, "feat2": (i + 1) * 10} for i in range(10)]
accountant = pipeline_dp.NaiveBudgetAccountant(
Expand Down