302 changes: 204 additions & 98 deletions fair_mango/metrics/superset.py
@@ -66,42 +66,26 @@ def __init__(
self,
data: Dataset,
) -> None:
sensitive = data.sensitive
real_target = data.real_target
predicted_target = data.predicted_target
positive_target = data.positive_target
df = data.df
if predicted_target == []:
predicted_target = None
if sensitive is None:
self.dataset = data
self.sensitive = data.sensitive
self.real_target = data.real_target
self.predicted_target = data.predicted_target
self.positive_target = data.positive_target
self.df = data.df

if self.predicted_target == []:
self.predicted_target = None
if self.sensitive is None:
raise AttributeError(
"'sensitive_group' attribute is required when data is pandas dataframe"
)

pairs = list(
self.pairs = list(
chain.from_iterable(
combinations(sensitive, r) for r in range(1, len(sensitive) + 1)
combinations(self.sensitive, r) for r in range(1, len(self.sensitive) + 1)
)
)

self.df = df
self.sensitive = sensitive
self.real_target = real_target
self.predicted_target = predicted_target
self.positive_target = positive_target
self.pairs = pairs

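As a quick orientation, the pairs built above are every non-empty subset of the sensitive columns; a standalone sketch with illustrative column names:

from itertools import chain, combinations

sensitive = ["gender", "race"]  # hypothetical sensitive columns
pairs = list(
    chain.from_iterable(
        combinations(sensitive, r) for r in range(1, len(sensitive) + 1)
    )
)
print(pairs)  # [('gender',), ('race',), ('gender', 'race')]
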
def _create_dataset_for_pair(self, pair: tuple[str, ...]) -> Dataset:
"""Create a Dataset instance for a given pair of sensitive attributes."""
return Dataset(
self.df,
pair,
self.real_target,
self.predicted_target,
self.positive_target,
)


class SupersetFairnessMetrics(Superset):
"""Calculate fairness metrics score for all combinations of sensitive
attributes and ranks them. This class computes all applicable fairness metrics across different
@@ -183,6 +167,34 @@ def __init__(
"false_positive_rate_ratio": FalsePositiveRateRatio,
}

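For orientation, a minimal usage sketch of this class; the Dataset import path, column names, and example data are assumptions, not taken from this diff:

import pandas as pd

from fair_mango.dataset import Dataset  # assumed import path
from fair_mango.metrics.superset import SupersetFairnessMetrics

# Hypothetical toy data: one sensitive column, binary real and predicted targets.
df = pd.DataFrame(
    {
        "gender": ["m", "f", "f", "m"],
        "hired": [1, 0, 1, 1],
        "hired_pred": [1, 0, 0, 1],
    }
)
data = Dataset(
    df=df,
    sensitive=["gender"],
    real_target="hired",  # argument shapes here are assumptions
    predicted_target="hired_pred",
    positive_target=1,
)
metrics = SupersetFairnessMetrics(data)
rankings = metrics.rank()  # one result per combination of sensitive attributes
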
def _rank_binary(self, dataset: Dataset, positive_outcome: str | None = None) -> list[SupersetFairnessRankingResult]:
"""Helper to run rank on a binary dataset."""
results = []
for pair in self.pairs:
pair_dataset = Dataset(
dataset.df,
pair,
dataset.real_target,
dataset.predicted_target,
dataset.positive_target,
)
rankings = {}
for metric_name, metric_class in self._dataset_metrics.items():
metric = metric_class(pair_dataset)
rankings[metric_name] = metric.rank()
if self.predicted_target is not None:
for metric_name, metric_class in self._model_metrics.items():
metric = metric_class(pair_dataset)
rankings[metric_name] = metric.rank()
results.append(
SupersetFairnessRankingResult(
sensitive_attributes=pair,
rankings=rankings,
positive_outcome=positive_outcome,
)
)
return results

def rank(self) -> list[SupersetFairnessRankingResult]:
"""Calculate fairness metrics rankings for all combinations of sensitive
attributes and all applicable fairness metrics.
@@ -194,26 +206,59 @@ def rank(self) -> list[SupersetFairnessRankingResult]:
- sensitive_attributes: List of sensitive attribute names for this combination
- rankings: Dictionary mapping metric names to their ranking results
"""
results = []
y_true = self.dataset.df[self.dataset.real_target]
is_multiclass = y_true.nunique() > 2

for pair in self.pairs:
dataset = self._create_dataset_for_pair(pair)
if not is_multiclass:
return self._rank_binary(self.dataset)

rankings = {}
results = []

for metric_name, metric_class in self._dataset_metrics.items():
metric = metric_class(dataset)
rankings[metric_name] = metric.rank()
classes = sorted(y_true.unique())
original_df = self.dataset.df.copy()

for positive_outcome in classes:
temp_df = original_df.copy()
real_target_col = self.dataset.real_target
pred_target_col = self.dataset.predicted_target
temp_df[real_target_col] = (temp_df[real_target_col] == positive_outcome).astype(int)
if pred_target_col and pred_target_col in temp_df.columns:
temp_df[pred_target_col] = (temp_df[pred_target_col] == positive_outcome).astype(int)
binary_dataset = Dataset(
df=temp_df,
sensitive=self.dataset.sensitive,
real_target=self.dataset.real_target,
predicted_target=self.dataset.predicted_target,
positive_target=1,
)
class_results = self._rank_binary(binary_dataset, str(positive_outcome))
results.extend(class_results)
return results

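To make the multiclass branch above concrete, here is a standalone sketch of the per-class (one-vs-rest) binarization it applies; the column name and values are illustrative:

import pandas as pd

y = pd.Series(["A", "B", "C", "A"], name="grade")  # hypothetical 3-class target
for positive_outcome in sorted(y.unique()):
    # The chosen class becomes 1 (the positive target), every other class 0,
    # so each class takes a turn as the positive outcome.
    binarized = (y == positive_outcome).astype(int)
    print(positive_outcome, binarized.tolist())
# A [1, 0, 0, 1]
# B [0, 1, 0, 0]
# C [0, 0, 1, 0]
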
def _summary_binary(self, dataset: Dataset, positive_outcome: str | None = None) -> list[SupersetFairnessSummaryResult]:
"""Helper to run summary on a binary dataset."""
results = []
for pair in self.pairs:
pair_dataset = Dataset(
dataset.df,
pair,
dataset.real_target,
dataset.predicted_target,
dataset.positive_target,
)
summaries: dict[str, FairnessSummaryDifferenceResult | FairnessSummaryDifferenceFairResult | FairnessSummaryRatioResult | FairnessSummaryRatioFairResult] = {}
for metric_name, metric_class in self._dataset_metrics.items():
metric = metric_class(pair_dataset)
summaries[metric_name] = metric.summary()
if self.predicted_target is not None:
for metric_name, metric_class in self._model_metrics.items():
metric = metric_class(dataset)
rankings[metric_name] = metric.rank()

metric = metric_class(pair_dataset)
summaries[metric_name] = metric.summary()
results.append(
SupersetFairnessRankingResult(
sensitive_attributes=list(pair),
rankings=rankings,
SupersetFairnessSummaryResult(
sensitive_attributes=pair,
summaries=summaries,
positive_outcome=positive_outcome,
)
)

@@ -230,40 +275,68 @@ def summary(self) -> list[SupersetFairnessSummaryResult]:
- sensitive_attributes: List of sensitive attribute names for this combination
- summaries: Dictionary mapping metric names to their summary results
"""
results = []
y_true = self.dataset.df[self.dataset.real_target]
is_multiclass = y_true.nunique() > 2

for pair in self.pairs:
dataset = self._create_dataset_for_pair(pair)
if not is_multiclass:
return self._summary_binary(self.dataset)

summaries: dict[
str,
FairnessSummaryDifferenceResult
| FairnessSummaryDifferenceFairResult
| FairnessSummaryRatioResult
| FairnessSummaryRatioFairResult,
] = {}
results = []

for metric_name, metric_class in self._dataset_metrics.items():
metric = metric_class(dataset)
summaries[metric_name] = metric.summary()
classes = sorted(y_true.unique())
original_df = self.dataset.df.copy()

for positive_outcome in classes:
temp_df = original_df.copy()
real_target_col = self.dataset.real_target
pred_target_col = self.dataset.predicted_target
temp_df[real_target_col] = (temp_df[real_target_col] == positive_outcome).astype(int)
if pred_target_col and pred_target_col in temp_df.columns:
temp_df[pred_target_col] = (temp_df[pred_target_col] == positive_outcome).astype(int)
binary_dataset = Dataset(
df=temp_df,
sensitive=self.dataset.sensitive,
real_target=self.dataset.real_target,
predicted_target=self.dataset.predicted_target,
positive_target=1,
)
class_results = self._summary_binary(binary_dataset, str(positive_outcome))
results.extend(class_results)
return results

def _is_biased_binary(self, dataset: Dataset, thresholds: dict[str, float], positive_outcome: str | None = None) -> list[SupersetBiasResult]:
"""Helper to run is_biased on a binary dataset."""
effective_thresholds = DEFAULT_BIAS_THRESHOLDS | thresholds
results = []
for pair in self.pairs:
pair_dataset = Dataset(
dataset.df,
pair,
dataset.real_target,
dataset.predicted_target,
dataset.positive_target,
)
bias_results = {}
for metric_name, metric_class in self._dataset_metrics.items():
metric = metric_class(pair_dataset)
threshold = effective_thresholds[metric_name]
bias_results[metric_name] = metric.is_biased(threshold)
if self.predicted_target is not None:
for metric_name, metric_class in self._model_metrics.items():
metric = metric_class(dataset)
summaries[metric_name] = metric.summary()

metric = metric_class(pair_dataset)
threshold = effective_thresholds[metric_name]
bias_results[metric_name] = metric.is_biased(threshold)
results.append(
SupersetFairnessSummaryResult(
sensitive_attributes=list(pair),
summaries=summaries,
SupersetBiasResult(
sensitive_attributes=pair,
bias_results=bias_results,
positive_outcome=positive_outcome,
)
)

return results

def is_biased(
self, thresholds: dict[str, float] | None = None
) -> list[SupersetBiasResult]:
def is_biased(self, thresholds: dict[str, float] | None = None) -> list[SupersetBiasResult]:
"""Determine bias for all combinations of sensitive attributes and all
applicable fairness metrics.

@@ -282,34 +355,33 @@ def is_biased(
"""
if thresholds is None:
thresholds = {}

y_true = self.dataset.df[self.dataset.real_target]
is_multiclass = y_true.nunique() > 2

effective_thresholds = DEFAULT_BIAS_THRESHOLDS | thresholds
if not is_multiclass:
return self._is_biased_binary(self.dataset, thresholds)

results = []

for pair in self.pairs:
dataset = self._create_dataset_for_pair(pair)

bias_results = {}

for metric_name, metric_class in self._dataset_metrics.items():
metric = metric_class(dataset)
threshold = effective_thresholds[metric_name]
bias_results[metric_name] = metric.is_biased(threshold)

if self.predicted_target is not None:
for metric_name, metric_class in self._model_metrics.items():
metric = metric_class(dataset)
threshold = effective_thresholds[metric_name]
bias_results[metric_name] = metric.is_biased(threshold)

results.append(
SupersetBiasResult(
sensitive_attributes=list(pair),
bias_results=bias_results,
)
classes = sorted(y_true.unique())
original_df = self.dataset.df.copy()

for positive_outcome in classes:
temp_df = original_df.copy()
real_target_col = self.dataset.real_target
pred_target_col = self.dataset.predicted_target
temp_df[real_target_col] = (temp_df[real_target_col] == positive_outcome).astype(int)
if pred_target_col and pred_target_col in temp_df.columns:
temp_df[pred_target_col] = (temp_df[pred_target_col] == positive_outcome).astype(int)
binary_dataset = Dataset(
df=temp_df,
sensitive=self.dataset.sensitive,
real_target=self.dataset.real_target,
predicted_target=self.dataset.predicted_target,
positive_target=1,
)

class_results = self._is_biased_binary(binary_dataset, thresholds, str(positive_outcome))
results.extend(class_results)
return results

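A standalone sketch of the threshold handling used above: user-supplied thresholds are merged over DEFAULT_BIAS_THRESHOLDS with the dict union operator, so any metric not named keeps its default. The first metric name below comes from the registry above; the second name and all values are illustrative:

defaults = {"false_positive_rate_ratio": 1.2, "another_metric": 0.1}  # stand-in for DEFAULT_BIAS_THRESHOLDS
user = {"false_positive_rate_ratio": 1.1}
effective = defaults | user
print(effective)  # {'false_positive_rate_ratio': 1.1, 'another_metric': 0.1}

Calling metrics.is_biased({"false_positive_rate_ratio": 1.1}) on the instance from the earlier sketch would apply the same merge.
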

@@ -344,6 +416,28 @@ def __init__(
super().__init__(data)
self.metrics = [SelectionRate, PerformanceMetric, ConfusionMatrix]

def _evaluate_binary(self, dataset: Dataset, positive_outcome: str | None = None) -> list[SupersetPerformanceMetricsResult]:
"""Helper to run evaluate on a binary dataset."""
results = []
for pair in self.pairs:
pair_dataset = Dataset(
dataset.df,
pair,
dataset.real_target,
dataset.predicted_target,
dataset.positive_target,
)
combined_results = self._initialize_base_results(pair_dataset)
self._process_metrics_for_dataset(pair_dataset, combined_results)
results.append(
SupersetPerformanceMetricsResult(
sensitive_attributes=pair,
data=combined_results,
positive_outcome=positive_outcome,
)
)
return results

def evaluate(self) -> list[SupersetPerformanceMetricsResult]:
"""Calculate performance evaluation metrics for different subsets of
sensitive attributes. Ex:
@@ -403,20 +497,32 @@ def evaluate(self) -> list[SupersetPerformanceMetricsResult]:
}
]
"""
results = []
y_true = self.dataset.df[self.dataset.real_target]
is_multiclass = y_true.nunique() > 2

for pair in self.pairs:
dataset = self._create_dataset_for_pair(pair)
combined_results = self._initialize_base_results(dataset)
self._process_metrics_for_dataset(dataset, combined_results)
if not is_multiclass:
return self._evaluate_binary(self.dataset)

results.append(
SupersetPerformanceMetricsResult(
sensitive_attributes=pair,
data=combined_results,
)
results = []
classes = sorted(y_true.unique())
original_df = self.dataset.df.copy()

for positive_outcome in classes:
temp_df = original_df.copy()
real_target_col = self.dataset.real_target
pred_target_col = self.dataset.predicted_target
temp_df[real_target_col] = (temp_df[real_target_col] == positive_outcome).astype(int)
if pred_target_col and pred_target_col in temp_df.columns:
temp_df[pred_target_col] = (temp_df[pred_target_col] == positive_outcome).astype(int)
binary_dataset = Dataset(
df=temp_df,
sensitive=self.dataset.sensitive,
real_target=self.dataset.real_target,
predicted_target=self.dataset.predicted_target,
positive_target=1,
)

class_results = self._evaluate_binary(binary_dataset, str(positive_outcome))
results.extend(class_results)
return results

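A brief usage sketch of this class, reusing the Dataset built in the earlier SupersetFairnessMetrics example (so not fully standalone):

from fair_mango.metrics.superset import SupersetPerformanceMetrics

perf = SupersetPerformanceMetrics(data)  # `data` is the Dataset from the earlier sketch
results = perf.evaluate()  # one SupersetPerformanceMetricsResult per sensitive-attribute subset,
                           # and one per class when the real target is multiclass
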
def _initialize_base_results(