From 9baf4a92d7f5b3c77032e864bc923a9332af904b Mon Sep 17 00:00:00 2001 From: Josh Huang Date: Wed, 4 Mar 2026 20:15:37 +0800 Subject: [PATCH] Standardize _get_scores() to return (np.ndarray, np.ndarray) across all strategies --- libact/base/interfaces.py | 21 +- libact/query_strategies/bald.py | 56 +-- libact/query_strategies/coreset.py | 74 ++- .../query_strategies/density_weighted_meta.py | 21 +- .../epsilon_uncertainty_sampling.py | 12 +- libact/query_strategies/hintsvm.py | 29 +- .../query_strategies/information_density.py | 16 +- libact/query_strategies/query_by_committee.py | 38 +- libact/query_strategies/quire.py | 46 +- libact/query_strategies/random_sampling.py | 16 + libact/query_strategies/tests/meson.build | 1 + libact/query_strategies/tests/test_bald.py | 4 +- libact/query_strategies/tests/test_coreset.py | 9 +- .../query_strategies/tests/test_get_scores.py | 422 ++++++++++++++++++ .../tests/test_information_density.py | 12 +- .../query_strategies/uncertainty_sampling.py | 12 +- libact/query_strategies/variance_reduction.py | 14 + 17 files changed, 645 insertions(+), 158 deletions(-) create mode 100644 libact/query_strategies/tests/test_get_scores.py diff --git a/libact/base/interfaces.py b/libact/base/interfaces.py index ac1b4f09..6924e50a 100644 --- a/libact/base/interfaces.py +++ b/libact/base/interfaces.py @@ -39,16 +39,27 @@ def update(self, entry_id, label): pass def _get_scores(self): - """Return the score used for making query, the larger the better. Read-only. + """Return acquisition scores for all unlabeled samples. - No modification to the internal states. + Subclasses should override this method to enable batch mode queries + and score-based strategy composition. Returns ------- - (ask_id, scores): list of tuple (int, float) - The index of the next unlabeled sample to be queried and the score assigned. + entry_ids : np.ndarray, shape (n_unlabeled,) + Global entry IDs of unlabeled samples. + scores : np.ndarray, shape (n_unlabeled,) + Acquisition scores. Higher = more informative. + + Raises + ------ + NotImplementedError + If the strategy does not support per-sample scoring. """ - pass + raise NotImplementedError( + f"{self.__class__.__name__} does not implement _get_scores(). " + "This is required for batch mode and score-based composition." + ) @abstractmethod def make_query(self): diff --git a/libact/query_strategies/bald.py b/libact/query_strategies/bald.py index 61702043..01eb0e00 100644 --- a/libact/query_strategies/bald.py +++ b/libact/query_strategies/bald.py @@ -186,44 +186,6 @@ def update(self, entry_id, label): # Retrain ensemble with the new labeled data self._train_ensemble() - @inherit_docstring_from(QueryStrategy) - def make_query(self): - dataset = self.dataset - unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries() - X_pool = np.asarray(X_pool) - - if len(unlabeled_entry_ids) == 0: - raise ValueError("No unlabeled samples available") - - # Get predictions from all models - all_proba = [] - for model in self.models: - proba = model.predict_proba(X_pool) - all_proba.append(np.asarray(proba)) - - all_proba = np.array(all_proba) # shape: (n_models, n_samples, n_classes) - - # Calculate BALD score: H[mean(P)] - mean(H[P]) - # Mean probability across ensemble - mean_proba = np.mean(all_proba, axis=0) # shape: (n_samples, n_classes) - - # Entropy of mean predictions (total uncertainty) - entropy_mean = self._entropy(mean_proba) # shape: (n_samples,) - - # Mean entropy across models (expected data uncertainty) - entropies = np.array([self._entropy(p) for p in all_proba]) # shape: (n_models, n_samples) - mean_entropy = np.mean(entropies, axis=0) # shape: (n_samples,) - - # BALD score = mutual information - bald_scores = entropy_mean - mean_entropy # shape: (n_samples,) - - # Select sample with highest BALD score (break ties randomly) - max_score = np.max(bald_scores) - candidates = np.where(np.isclose(bald_scores, max_score))[0] - selected_idx = self.random_state_.choice(candidates) - - return unlabeled_entry_ids[selected_idx] - def _get_scores(self): """Return BALD scores for all unlabeled samples.""" dataset = self.dataset @@ -231,7 +193,7 @@ def _get_scores(self): X_pool = np.asarray(X_pool) if len(unlabeled_entry_ids) == 0: - return [] + return np.array([], dtype=int), np.array([], dtype=float) # Get predictions from all models all_proba = np.array([ @@ -245,4 +207,18 @@ def _get_scores(self): mean_entropy = np.mean(entropies, axis=0) bald_scores = entropy_mean - mean_entropy - return list(zip(unlabeled_entry_ids, bald_scores)) + return np.asarray(unlabeled_entry_ids), bald_scores + + @inherit_docstring_from(QueryStrategy) + def make_query(self): + unlabeled_entry_ids, bald_scores = self._get_scores() + + if len(unlabeled_entry_ids) == 0: + raise ValueError("No unlabeled samples available") + + # Select sample with highest BALD score (break ties randomly) + max_score = np.max(bald_scores) + candidates = np.where(np.isclose(bald_scores, max_score))[0] + selected_idx = self.random_state_.choice(candidates) + + return unlabeled_entry_ids[selected_idx] diff --git a/libact/query_strategies/coreset.py b/libact/query_strategies/coreset.py index eca967b4..80932847 100644 --- a/libact/query_strategies/coreset.py +++ b/libact/query_strategies/coreset.py @@ -79,67 +79,30 @@ def __init__(self, dataset, **kwargs): random_state = kwargs.pop('random_state', None) self.random_state_ = seed_random_state(random_state) - @inherit_docstring_from(QueryStrategy) - def make_query(self): - dataset = self.dataset - unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries() - X_pool = np.asarray(X_pool) - - if len(unlabeled_entry_ids) == 0: - raise ValueError("No unlabeled samples available") - - # Get labeled data - labeled_entries = dataset.get_labeled_entries() - X_labeled = np.asarray(labeled_entries[0]) - - # Fallback to random if no labeled data - if len(X_labeled) == 0: - idx = self.random_state_.randint(0, len(unlabeled_entry_ids)) - return unlabeled_entry_ids[idx] - - # Transform features if transformer is provided - if self.transformer is not None: - X_pool_t = np.asarray(self.transformer.transform(X_pool)) - X_labeled_t = np.asarray(self.transformer.transform(X_labeled)) - else: - X_pool_t = X_pool - X_labeled_t = X_labeled - - # Compute pairwise distances: (n_unlabeled, n_labeled) - dist_matrix = cdist(X_pool_t, X_labeled_t, metric=self.metric) - - # For each unlabeled point, find minimum distance to any labeled point - min_distances = np.min(dist_matrix, axis=1) - - # Select the unlabeled point with maximum min-distance (farthest) - max_dist = np.max(min_distances) - candidates = np.where(np.isclose(min_distances, max_dist))[0] - selected_idx = self.random_state_.choice(candidates) - - return unlabeled_entry_ids[selected_idx] - def _get_scores(self): """Return min-distances to labeled set for all unlabeled samples. Returns ------- - scores : list of (entry_id, score) tuples - Each score is the minimum distance from that unlabeled point - to any labeled point. Higher score means more informative. + entry_ids : np.ndarray, shape (n_unlabeled,) + Global entry IDs of unlabeled samples. + scores : np.ndarray, shape (n_unlabeled,) + Min-distance from each unlabeled point to any labeled point. + Higher score means more informative. """ dataset = self.dataset unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries() X_pool = np.asarray(X_pool) if len(unlabeled_entry_ids) == 0: - return [] + return np.array([], dtype=int), np.array([], dtype=float) labeled_entries = dataset.get_labeled_entries() X_labeled = np.asarray(labeled_entries[0]) if len(X_labeled) == 0: - return list(zip(unlabeled_entry_ids, - [float('inf')] * len(unlabeled_entry_ids))) + return np.asarray(unlabeled_entry_ids), \ + np.full(len(unlabeled_entry_ids), float('inf')) if self.transformer is not None: X_pool_t = np.asarray(self.transformer.transform(X_pool)) @@ -151,4 +114,23 @@ def _get_scores(self): dist_matrix = cdist(X_pool_t, X_labeled_t, metric=self.metric) min_distances = np.min(dist_matrix, axis=1) - return list(zip(unlabeled_entry_ids, min_distances)) + return np.asarray(unlabeled_entry_ids), min_distances + + @inherit_docstring_from(QueryStrategy) + def make_query(self): + unlabeled_entry_ids, min_distances = self._get_scores() + + if len(unlabeled_entry_ids) == 0: + raise ValueError("No unlabeled samples available") + + # Fallback to random if no labeled data (scores are all inf) + if np.all(np.isinf(min_distances)): + idx = self.random_state_.randint(0, len(unlabeled_entry_ids)) + return unlabeled_entry_ids[idx] + + # Select the unlabeled point with maximum min-distance (farthest) + max_dist = np.max(min_distances) + candidates = np.where(np.isclose(min_distances, max_dist))[0] + selected_idx = self.random_state_.choice(candidates) + + return unlabeled_entry_ids[selected_idx] diff --git a/libact/query_strategies/density_weighted_meta.py b/libact/query_strategies/density_weighted_meta.py index de3fdafc..4302348d 100644 --- a/libact/query_strategies/density_weighted_meta.py +++ b/libact/query_strategies/density_weighted_meta.py @@ -99,10 +99,12 @@ def update(self, entry_id, label): @inherit_docstring_from(QueryStrategy) def _get_scores(self): dataset = self.dataset - X, _ = zip(*dataset.data) - scores = self.base_query_strategy._get_scores() - _, X_pool = dataset.get_unlabeled_entries() - unlabeled_entry_ids, base_scores = zip(*scores) + X, _ = dataset.get_entries() + unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries() + + if len(unlabeled_entry_ids) == 0: + return np.array([], dtype=int), np.array([], dtype=float) + _, base_scores = self.base_query_strategy._get_scores() self.clustering_method.fit(X) pool_cluster = self.clustering_method.predict(X_pool) @@ -118,13 +120,16 @@ def _get_scores(self): similarity = np.asarray(similarity) scores = base_scores * similarity**self.beta - return zip(unlabeled_entry_ids, scores) + return np.asarray(unlabeled_entry_ids), np.asarray(scores) @inherit_docstring_from(QueryStrategy) def make_query(self): - dataset = self.dataset + unlabeled_entry_ids, scores = self._get_scores() + + if len(unlabeled_entry_ids) == 0: + raise ValueError("No unlabeled samples available") - unlabeled_entry_ids, scores = zip(*self._get_scores()) - ask_id = self.random_state_.choice(np.where(scores == np.max(scores))[0]) + ask_id = self.random_state_.choice( + np.where(np.isclose(scores, np.max(scores)))[0]) return unlabeled_entry_ids[ask_id] diff --git a/libact/query_strategies/epsilon_uncertainty_sampling.py b/libact/query_strategies/epsilon_uncertainty_sampling.py index 5527bcb2..9baf455c 100644 --- a/libact/query_strategies/epsilon_uncertainty_sampling.py +++ b/libact/query_strategies/epsilon_uncertainty_sampling.py @@ -170,7 +170,10 @@ def _get_scores(self): Returns ------- - scores : list of (entry_id, score) tuples + entry_ids : np.ndarray, shape (n_unlabeled,) + Global entry IDs of unlabeled samples. + scores : np.ndarray, shape (n_unlabeled,) + Uncertainty scores. Higher = more uncertain. """ dataset = self.dataset self.model.train(dataset) @@ -178,10 +181,10 @@ def _get_scores(self): X_pool = np.asarray(X_pool) if len(unlabeled_entry_ids) == 0: - return [] + return np.array([], dtype=int), np.array([], dtype=float) scores = self._get_uncertainty_scores(X_pool) - return list(zip(unlabeled_entry_ids, scores)) + return np.asarray(unlabeled_entry_ids), np.asarray(scores) @inherit_docstring_from(QueryStrategy) def make_query(self, return_score=False): @@ -207,7 +210,8 @@ def make_query(self, return_score=False): ask_id = unlabeled_entry_ids[selected_idx] if return_score: - return ask_id, self._get_scores() + entry_ids, scores = self._get_scores() + return ask_id, list(zip(entry_ids, scores)) else: return ask_id diff --git a/libact/query_strategies/hintsvm.py b/libact/query_strategies/hintsvm.py index f6252cc7..71c4ab7b 100644 --- a/libact/query_strategies/hintsvm.py +++ b/libact/query_strategies/hintsvm.py @@ -129,10 +129,22 @@ def __init__(self, *args, **kwargs): self.svm_params['C'] = self.cl - @inherit_docstring_from(QueryStrategy) - def make_query(self): + def _get_scores(self): + """Return absolute decision values for all unlabeled samples. + + Returns + ------- + entry_ids : np.ndarray, shape (n_unlabeled,) + Global entry IDs of unlabeled samples. + scores : np.ndarray, shape (n_unlabeled,) + Absolute decision values from HintSVM. Higher = more informative. + """ dataset = self.dataset unlabeled_entry_ids, unlabeled_pool = dataset.get_unlabeled_entries() + + if len(unlabeled_entry_ids) == 0: + return np.array([], dtype=int), np.array([], dtype=float) + labeled_pool, y = dataset.get_labeled_entries() if len(np.unique(y)) > 2: raise ValueError("HintSVM query strategy support binary class " @@ -155,6 +167,15 @@ def make_query(self): np.array(unlabeled_pool, dtype=np.float64), self.svm_params) - p_val = [abs(float(val[0])) for val in p_val] - idx = int(np.argmax(p_val)) + scores = np.array([abs(float(val[0])) for val in p_val]) + return np.asarray(unlabeled_entry_ids), scores + + @inherit_docstring_from(QueryStrategy) + def make_query(self): + unlabeled_entry_ids, scores = self._get_scores() + + if len(unlabeled_entry_ids) == 0: + raise ValueError("No unlabeled samples available") + + idx = int(np.argmax(scores)) return unlabeled_entry_ids[idx] diff --git a/libact/query_strategies/information_density.py b/libact/query_strategies/information_density.py index ba8c8e02..a717dc78 100644 --- a/libact/query_strategies/information_density.py +++ b/libact/query_strategies/information_density.py @@ -197,7 +197,7 @@ def _get_scores(self): X_pool = np.asarray(X_pool) if len(unlabeled_entry_ids) == 0: - return [] + return np.array([], dtype=int), np.array([], dtype=float) uncertainty = self._uncertainty_scores(X_pool) # Ensure non-negative uncertainty (ContinuousModel predict_real can @@ -209,26 +209,22 @@ def _get_scores(self): scores = uncertainty * (density ** self.beta) - return list(zip(unlabeled_entry_ids, scores)) + return np.asarray(unlabeled_entry_ids), scores @inherit_docstring_from(QueryStrategy) def make_query(self, return_score=False): - dataset = self.dataset - unlabeled_entry_ids, _ = dataset.get_unlabeled_entries() + entry_ids, score_values = self._get_scores() - if len(unlabeled_entry_ids) == 0: + if len(entry_ids) == 0: raise ValueError("No unlabeled samples available") - scores = self._get_scores() - entry_ids, score_values = zip(*scores) - score_values = np.asarray(list(score_values)) - max_score = np.max(score_values) candidates = np.where(np.isclose(score_values, max_score))[0] selected_idx = self.random_state_.choice(candidates) if return_score: - return entry_ids[selected_idx], scores + return entry_ids[selected_idx], \ + list(zip(entry_ids, score_values)) else: return entry_ids[selected_idx] diff --git a/libact/query_strategies/query_by_committee.py b/libact/query_strategies/query_by_committee.py index ea8dadab..9dca0ff9 100644 --- a/libact/query_strategies/query_by_committee.py +++ b/libact/query_strategies/query_by_committee.py @@ -181,31 +181,47 @@ def update(self, entry_id, label): # Train each model with newly updated label. self.teach_students() - @inherit_docstring_from(QueryStrategy) - def make_query(self): + def _get_scores(self): + """Return disagreement scores for all unlabeled samples. + + Returns + ------- + entry_ids : np.ndarray, shape (n_unlabeled,) + Global entry IDs of unlabeled samples. + scores : np.ndarray, shape (n_unlabeled,) + Disagreement scores. Higher = more disagreement. + """ dataset = self.dataset unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries() + if len(unlabeled_entry_ids) == 0: + return np.array([], dtype=int), np.array([], dtype=float) + if self.disagreement == 'vote': - # Let the trained students vote for unlabeled data votes = np.zeros((len(X_pool), len(self.students))) for i, student in enumerate(self.students): votes[:, i] = student.predict(X_pool) - - vote_entropy = self._vote_disagreement(votes) - ask_idx = self.random_state_.choice( - np.where(np.isclose(vote_entropy, np.max(vote_entropy)))[0]) + scores = np.array(self._vote_disagreement(votes)) elif self.disagreement == 'kl_divergence': proba = [] for student in self.students: proba.append(student.predict_proba(X_pool)) proba = np.array(proba).transpose(1, 0, 2).astype(float) - - avg_kl = self._kl_divergence_disagreement(proba) - ask_idx = self.random_state_.choice( - np.where(np.isclose(avg_kl, np.max(avg_kl)))[0]) + scores = self._kl_divergence_disagreement(proba) else: raise ValueError("disagreement must be 'vote' or 'kl_divergence'") + return np.asarray(unlabeled_entry_ids), np.asarray(scores) + + @inherit_docstring_from(QueryStrategy) + def make_query(self): + unlabeled_entry_ids, scores = self._get_scores() + + if len(unlabeled_entry_ids) == 0: + raise ValueError("No unlabeled samples available") + + ask_idx = self.random_state_.choice( + np.where(np.isclose(scores, np.max(scores)))[0]) + return unlabeled_entry_ids[ask_idx] diff --git a/libact/query_strategies/quire.py b/libact/query_strategies/quire.py index 544baf1e..c556417f 100644 --- a/libact/query_strategies/quire.py +++ b/libact/query_strategies/quire.py @@ -108,12 +108,26 @@ def update(self, entry_id, label): self.Uindex.remove(entry_id) self.y[entry_id] = label - def make_query(self): + def _get_scores(self): + """Return QUIRE scores for all unlabeled samples. + + The original QUIRE uses min(eva) where lower is better. + Scores are negated so higher = more informative. + + Returns + ------- + entry_ids : np.ndarray, shape (n_unlabeled,) + Global entry IDs of unlabeled samples. + scores : np.ndarray, shape (n_unlabeled,) + Negated evaluation values. Higher = more informative. + """ L = self.L Lindex = self.Lindex Uindex = self.Uindex - query_index = -1 - min_eva = np.inf + + if len(Uindex) == 0: + return np.array([], dtype=int), np.array([], dtype=float) + y_labeled = np.array([label for label in self.y if label is not None]) det_Laa = np.linalg.det(L[np.ix_(Uindex, Uindex)]) # efficient computation of inv(Laa) @@ -123,11 +137,9 @@ def make_query(self): M1 = self.lmbda * np.eye(len(Uindex)) + self.K[np.ix_(Uindex, Uindex)] inv_Laa = M1 - M2 iList = list(range(len(Uindex))) - if len(iList) == 1: - return Uindex[0] + + all_eva = [] for i, each_index in enumerate(Uindex): - # go through all unlabeled instances and compute their evaluation - # values one by one Uindex_r = Uindex[:] Uindex_r.remove(each_index) iList_r = iList[:] @@ -147,8 +159,20 @@ def make_query(self): ) eva = L[each_index][each_index] - \ det_Laa / L[each_index][each_index] + 2 * np.abs(tmp) + all_eva.append(eva) + + # Negate so higher = better (original uses min) + scores = -np.array(all_eva) + return np.array(Uindex, dtype=int), scores + + def make_query(self): + entry_ids, scores = self._get_scores() + + if len(entry_ids) == 0: + raise ValueError("No unlabeled samples available") + + # Single unlabeled sample: return it directly + if len(entry_ids) == 1: + return entry_ids[0] - if eva < min_eva: - query_index = each_index - min_eva = eva - return query_index + return entry_ids[np.argmax(scores)] diff --git a/libact/query_strategies/random_sampling.py b/libact/query_strategies/random_sampling.py index 8feab8e2..5691c786 100644 --- a/libact/query_strategies/random_sampling.py +++ b/libact/query_strategies/random_sampling.py @@ -1,5 +1,7 @@ """Random Sampling """ +import numpy as np + from libact.base.interfaces import QueryStrategy from libact.utils import inherit_docstring_from, seed_random_state, zip @@ -42,6 +44,20 @@ def __init__(self, dataset, **kwargs): random_state = kwargs.pop('random_state', None) self.random_state_ = seed_random_state(random_state) + def _get_scores(self): + """Return uniform scores for all unlabeled samples. + + Returns + ------- + entry_ids : np.ndarray, shape (n_unlabeled,) + Global entry IDs of unlabeled samples. + scores : np.ndarray, shape (n_unlabeled,) + Uniform scores (all ones). + """ + unlabeled_entry_ids, _ = self.dataset.get_unlabeled_entries() + scores = np.ones(len(unlabeled_entry_ids), dtype=float) + return unlabeled_entry_ids, scores + @inherit_docstring_from(QueryStrategy) def make_query(self): dataset = self.dataset diff --git a/libact/query_strategies/tests/meson.build b/libact/query_strategies/tests/meson.build index 5c4378bd..8c6bf3b6 100644 --- a/libact/query_strategies/tests/meson.build +++ b/libact/query_strategies/tests/meson.build @@ -4,6 +4,7 @@ py_src = [ 'test_coreset.py', 'test_density_weighted_meta.py', 'test_epsilon_uncertainty_sampling.py', + 'test_get_scores.py', 'test_hintsvm.py', 'test_information_density.py', 'test_quire.py', diff --git a/libact/query_strategies/tests/test_bald.py b/libact/query_strategies/tests/test_bald.py index 28b539c5..2102ca49 100644 --- a/libact/query_strategies/tests/test_bald.py +++ b/libact/query_strategies/tests/test_bald.py @@ -81,11 +81,11 @@ def test_bald_score_computation(self): qs = BALD(trn_ds, models=models, random_state=42) # Get scores - scores = qs._get_scores() + entry_ids, scores = qs._get_scores() self.assertGreater(len(scores), 0) # All BALD scores should be non-negative (MI is non-negative) - for entry_id, score in scores: + for score in scores: self.assertGreaterEqual(score, -1e-10) # Allow small numerical errors def test_update_retrains_ensemble(self): diff --git a/libact/query_strategies/tests/test_coreset.py b/libact/query_strategies/tests/test_coreset.py index 6dc1ad22..209f32fb 100644 --- a/libact/query_strategies/tests/test_coreset.py +++ b/libact/query_strategies/tests/test_coreset.py @@ -154,20 +154,19 @@ def test_get_scores(self): trn_ds = init_dataset(self.X, self.y, n_labeled=4) qs = CoreSet(trn_ds, random_state=42) - scores = qs._get_scores() + entry_ids, scores = qs._get_scores() # Should have one score per unlabeled point unlabeled_ids = trn_ds.get_unlabeled_entries()[0] self.assertEqual(len(scores), len(unlabeled_ids)) # Scores should be non-negative - for entry_id, score in scores: + for score in scores: self.assertGreaterEqual(score, 0.0) # The farthest point should have the highest score - scores_dict = dict(scores) - max_id = max(scores_dict, key=scores_dict.get) - self.assertEqual(max_id, 9) # [5.0, 5.0] is farthest + max_idx = np.argmax(scores) + self.assertEqual(entry_ids[max_idx], 9) # [5.0, 5.0] is farthest if __name__ == '__main__': diff --git a/libact/query_strategies/tests/test_get_scores.py b/libact/query_strategies/tests/test_get_scores.py new file mode 100644 index 00000000..88e9547e --- /dev/null +++ b/libact/query_strategies/tests/test_get_scores.py @@ -0,0 +1,422 @@ +"""Tests for _get_scores() contract across all query strategies. + +Verifies that every strategy implementing _get_scores() returns a consistent +format: a tuple of two numpy arrays (entry_ids, scores). +""" +import unittest + +import numpy as np +from sklearn.linear_model import LogisticRegression + +from libact.base.dataset import Dataset +from libact.base.interfaces import QueryStrategy +from libact.models import SklearnProbaAdapter +from libact.query_strategies import ( + UncertaintySampling, + BALD, + CoreSet, + EpsilonUncertaintySampling, + InformationDensity, + DensityWeightedMeta, + QueryByCommittee, + QUIRE, + RandomSampling, + ActiveLearningByLearning, +) + +# Try importing C-extension strategies +try: + from libact.query_strategies import HintSVM + HAS_HINTSVM = True +except (ImportError, ModuleNotFoundError): + HAS_HINTSVM = False + +try: + from libact.query_strategies import VarianceReduction + HAS_VARIANCE_REDUCTION = True +except (ImportError, ModuleNotFoundError): + HAS_VARIANCE_REDUCTION = False + + +class TestGetScoresContract(unittest.TestCase): + """Verify _get_scores() contract across all strategies.""" + + def setUp(self): + np.random.seed(1126) + self.X = np.random.randn(30, 5) + self.y = np.random.choice([0, 1], size=30) + # First 10 labeled, rest unlabeled + y_partial = list(self.y[:10]) + [None] * 20 + self.dataset = Dataset(self.X, y_partial) + self.n_unlabeled = 20 + + def _make_dataset(self): + """Create a fresh dataset for strategies that need their own copy.""" + np.random.seed(1126) + X = np.random.randn(30, 5) + y = np.random.choice([0, 1], size=30) + y_partial = list(y[:10]) + [None] * 20 + return Dataset(X, y_partial) + + def _check_contract(self, qs): + """Verify the _get_scores return format contract.""" + result = qs._get_scores() + + # Must return a tuple of two elements + self.assertIsInstance(result, tuple) + self.assertEqual(len(result), 2) + + entry_ids, scores = result + + # Both must be numpy arrays + self.assertIsInstance(entry_ids, np.ndarray) + self.assertIsInstance(scores, np.ndarray) + + # Same length + self.assertEqual(len(entry_ids), len(scores)) + + # Length matches number of unlabeled samples + self.assertEqual(len(entry_ids), self.n_unlabeled) + + # entry_ids should be valid indices into the dataset + for eid in entry_ids: + self.assertTrue(0 <= eid < len(qs.dataset)) + # and they should be unlabeled + self.assertIsNone(qs.dataset[eid][1]) + + # scores should be finite + self.assertTrue(np.all(np.isfinite(scores))) + + # Consistency: make_query should return one of the entry_ids + ask_id = qs.make_query() + self.assertIn(ask_id, entry_ids) + + def test_uncertainty_sampling(self): + qs = UncertaintySampling( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ) + ) + self._check_contract(qs) + + def test_uncertainty_sampling_sm(self): + qs = UncertaintySampling( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ), + method='sm' + ) + self._check_contract(qs) + + def test_uncertainty_sampling_entropy(self): + qs = UncertaintySampling( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ), + method='entropy' + ) + self._check_contract(qs) + + def test_bald(self): + qs = BALD( + self.dataset, + models=[ + SklearnProbaAdapter( + LogisticRegression(C=c, max_iter=200, solver='liblinear') + ) + for c in [0.01, 0.1, 1.0] + ], + random_state=42 + ) + self._check_contract(qs) + + def test_coreset(self): + qs = CoreSet(self.dataset, random_state=42) + self._check_contract(qs) + + def test_coreset_cosine(self): + # Use non-zero data for cosine metric + ds = self._make_dataset() + X_nonzero = np.abs(np.random.randn(30, 5)) + 0.1 + y_partial = list(self.y[:10]) + [None] * 20 + ds = Dataset(X_nonzero, y_partial) + qs = CoreSet(ds, metric='cosine', random_state=42) + result = qs._get_scores() + self.assertIsInstance(result, tuple) + self.assertEqual(len(result), 2) + self.assertIsInstance(result[0], np.ndarray) + self.assertIsInstance(result[1], np.ndarray) + + def test_epsilon_uncertainty_sampling(self): + qs = EpsilonUncertaintySampling( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ), + epsilon=0.2, + random_state=42 + ) + self._check_contract(qs) + + def test_information_density(self): + qs = InformationDensity( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ), + random_state=42 + ) + self._check_contract(qs) + + def test_density_weighted_meta(self): + base_qs = UncertaintySampling( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ) + ) + qs = DensityWeightedMeta(self.dataset, base_qs, beta=1.0, + random_state=42) + self._check_contract(qs) + + def test_query_by_committee_vote(self): + qs = QueryByCommittee( + self.dataset, + models=[ + SklearnProbaAdapter( + LogisticRegression(C=c, max_iter=200, solver='liblinear') + ) + for c in [0.01, 0.1, 1.0] + ], + random_state=42 + ) + self._check_contract(qs) + + def test_query_by_committee_kl(self): + qs = QueryByCommittee( + self.dataset, + models=[ + SklearnProbaAdapter( + LogisticRegression(C=c, max_iter=200, solver='liblinear') + ) + for c in [0.01, 0.1, 1.0] + ], + disagreement='kl_divergence', + random_state=42 + ) + self._check_contract(qs) + + def test_quire(self): + qs = QUIRE(self.dataset) + result = qs._get_scores() + + self.assertIsInstance(result, tuple) + self.assertEqual(len(result), 2) + + entry_ids, scores = result + self.assertIsInstance(entry_ids, np.ndarray) + self.assertIsInstance(scores, np.ndarray) + self.assertEqual(len(entry_ids), len(scores)) + self.assertEqual(len(entry_ids), self.n_unlabeled) + self.assertTrue(np.all(np.isfinite(scores))) + + ask_id = qs.make_query() + self.assertIn(ask_id, entry_ids) + + def test_random_sampling(self): + qs = RandomSampling(self.dataset, random_state=42) + self._check_contract(qs) + # Random sampling should return uniform scores + entry_ids, scores = qs._get_scores() + self.assertTrue(np.allclose(scores, scores[0])) + self.assertTrue(np.allclose(scores, 1.0)) + + @unittest.skipUnless(HAS_HINTSVM, "HintSVM C extension not compiled") + def test_hintsvm(self): + qs = HintSVM(self.dataset, random_state=42) + self._check_contract(qs) + + @unittest.skipUnless(HAS_VARIANCE_REDUCTION, + "VarianceReduction C extension not compiled") + def test_variance_reduction_raises(self): + qs = VarianceReduction( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ) + ) + with self.assertRaises(NotImplementedError): + qs._get_scores() + + def test_albl_raises(self): + """ALBL is a meta-strategy and does not implement _get_scores.""" + ds = self._make_dataset() + qs1 = UncertaintySampling( + ds, + model=SklearnProbaAdapter( + LogisticRegression(C=1., max_iter=200, solver='liblinear') + ) + ) + qs2 = UncertaintySampling( + ds, + model=SklearnProbaAdapter( + LogisticRegression(C=0.01, max_iter=200, solver='liblinear') + ), + method='entropy' + ) + albl = ActiveLearningByLearning( + ds, + query_strategies=[qs1, qs2], + T=20, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ), + random_state=42 + ) + with self.assertRaises(NotImplementedError): + albl._get_scores() + + +class TestGetScoresEmptyPool(unittest.TestCase): + """_get_scores on fully labeled dataset returns empty arrays.""" + + def test_uncertainty_sampling_empty(self): + np.random.seed(1126) + X = np.random.randn(10, 5) + y = np.random.choice([0, 1], size=10) + full_ds = Dataset(X, y) + qs = UncertaintySampling( + full_ds, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ) + ) + entry_ids, scores = qs._get_scores() + self.assertEqual(len(entry_ids), 0) + self.assertEqual(len(scores), 0) + self.assertIsInstance(entry_ids, np.ndarray) + self.assertIsInstance(scores, np.ndarray) + + def test_bald_empty(self): + np.random.seed(1126) + X = np.random.randn(10, 5) + y = np.random.choice([0, 1], size=10) + full_ds = Dataset(X, y) + qs = BALD( + full_ds, + models=[ + SklearnProbaAdapter( + LogisticRegression(C=c, max_iter=200, solver='liblinear') + ) + for c in [0.01, 0.1, 1.0] + ], + random_state=42 + ) + entry_ids, scores = qs._get_scores() + self.assertEqual(len(entry_ids), 0) + self.assertEqual(len(scores), 0) + self.assertIsInstance(entry_ids, np.ndarray) + self.assertIsInstance(scores, np.ndarray) + + def test_coreset_empty(self): + np.random.seed(1126) + X = np.random.randn(10, 5) + y = np.random.choice([0, 1], size=10) + full_ds = Dataset(X, y) + qs = CoreSet(full_ds, random_state=42) + entry_ids, scores = qs._get_scores() + self.assertEqual(len(entry_ids), 0) + self.assertEqual(len(scores), 0) + self.assertIsInstance(entry_ids, np.ndarray) + self.assertIsInstance(scores, np.ndarray) + + def test_random_sampling_empty(self): + np.random.seed(1126) + X = np.random.randn(10, 5) + y = np.random.choice([0, 1], size=10) + full_ds = Dataset(X, y) + qs = RandomSampling(full_ds, random_state=42) + entry_ids, scores = qs._get_scores() + self.assertEqual(len(entry_ids), 0) + self.assertEqual(len(scores), 0) + self.assertIsInstance(entry_ids, np.ndarray) + self.assertIsInstance(scores, np.ndarray) + + def test_density_weighted_meta_empty(self): + np.random.seed(1126) + X = np.random.randn(10, 5) + y = np.random.choice([0, 1], size=10) + full_ds = Dataset(X, y) + base_qs = UncertaintySampling( + full_ds, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ) + ) + qs = DensityWeightedMeta(full_ds, base_qs, beta=1.0, random_state=42) + entry_ids, scores = qs._get_scores() + self.assertEqual(len(entry_ids), 0) + self.assertEqual(len(scores), 0) + self.assertIsInstance(entry_ids, np.ndarray) + self.assertIsInstance(scores, np.ndarray) + + +class TestGetScoresReturnScore(unittest.TestCase): + """Verify return_score=True backward compatibility.""" + + def setUp(self): + np.random.seed(1126) + X = np.random.randn(30, 5) + y = np.random.choice([0, 1], size=30) + y_partial = list(y[:10]) + [None] * 20 + self.dataset = Dataset(X, y_partial) + + def test_uncertainty_sampling_return_score(self): + qs = UncertaintySampling( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ) + ) + ask_id, score_list = qs.make_query(return_score=True) + self.assertIsInstance(ask_id, (int, np.integer)) + self.assertIsInstance(score_list, list) + # Each element should be a tuple of (id, score) + for item in score_list: + self.assertEqual(len(item), 2) + + def test_epsilon_us_return_score(self): + qs = EpsilonUncertaintySampling( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ), + epsilon=0.2, + random_state=42 + ) + ask_id, score_list = qs.make_query(return_score=True) + self.assertIsInstance(ask_id, (int, np.integer)) + self.assertIsInstance(score_list, list) + for item in score_list: + self.assertEqual(len(item), 2) + + def test_information_density_return_score(self): + qs = InformationDensity( + self.dataset, + model=SklearnProbaAdapter( + LogisticRegression(max_iter=200, solver='liblinear') + ), + random_state=42 + ) + ask_id, score_list = qs.make_query(return_score=True) + self.assertIsInstance(ask_id, (int, np.integer)) + self.assertIsInstance(score_list, list) + for item in score_list: + self.assertEqual(len(item), 2) + + +if __name__ == '__main__': + unittest.main() diff --git a/libact/query_strategies/tests/test_information_density.py b/libact/query_strategies/tests/test_information_density.py index 958590c7..d7ddd00b 100644 --- a/libact/query_strategies/tests/test_information_density.py +++ b/libact/query_strategies/tests/test_information_density.py @@ -129,12 +129,10 @@ def test_beta_zero_equals_uncertainty(self): model = MockProbModel() qs = InformationDensity(trn_ds, model=model, beta=0.0, random_state=42) - scores = qs._get_scores() + entry_ids, score_values = qs._get_scores() # With beta=0, density^0 = 1 for all, so scores = uncertainty only # The first unlabeled point (most uncertain in MockProbModel) should score highest - entry_ids, score_values = zip(*scores) - score_values = list(score_values) max_idx = np.argmax(score_values) # First unlabeled has p=0.5 (max entropy) self.assertEqual(entry_ids[max_idx], 4) @@ -260,12 +258,12 @@ def test_get_scores(self): model = MockProbModel() qs = InformationDensity(trn_ds, model=model, random_state=42) - scores = qs._get_scores() + entry_ids, scores = qs._get_scores() unlabeled_ids = trn_ds.get_unlabeled_entries()[0] self.assertEqual(len(scores), len(unlabeled_ids)) # All scores should be non-negative - for entry_id, score in scores: + for score in scores: self.assertGreaterEqual(score, 0.0) def test_empty_pool_error(self): @@ -336,9 +334,9 @@ def predict_real(self, feature): qs = InformationDensity(trn_ds, model=model, method='lc', random_state=42) - scores = qs._get_scores() + entry_ids, scores = qs._get_scores() # All scores should be non-negative (uncertainty clamped to 0) - for entry_id, score in scores: + for score in scores: self.assertGreaterEqual(score, 0.0) def test_density_favors_dense_with_continuous_model(self): diff --git a/libact/query_strategies/uncertainty_sampling.py b/libact/query_strategies/uncertainty_sampling.py index 096fe934..ec6bd37f 100644 --- a/libact/query_strategies/uncertainty_sampling.py +++ b/libact/query_strategies/uncertainty_sampling.py @@ -100,6 +100,9 @@ def _get_scores(self): self.model.train(dataset) unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries() + if len(unlabeled_entry_ids) == 0: + return np.array([], dtype=int), np.array([], dtype=float) + if isinstance(self.model, ProbabilisticModel): dvalue = self.model.predict_proba(X_pool) elif isinstance(self.model, ContinuousModel): @@ -120,7 +123,7 @@ def _get_scores(self): score = np.sum(-dvalue * np.log(dvalue), axis=1) else: raise ValueError("method must be 'lc', 'sm', or 'entropy'") - return zip(unlabeled_entry_ids, score) + return np.asarray(unlabeled_entry_ids), np.asarray(score) def make_query(self, return_score=False): @@ -138,10 +141,9 @@ def make_query(self, return_score=False): Selection score of unlabled entries, the larger the better. """ - dataset = self.dataset - # unlabeled_entry_ids, _ = dataset.get_unlabeled_entries() - - unlabeled_entry_ids, scores = zip(*self._get_scores()) + unlabeled_entry_ids, scores = self._get_scores() + if len(scores) == 0: + raise ValueError("No unlabeled samples available") ask_id = np.argmax(scores) if return_score: diff --git a/libact/query_strategies/variance_reduction.py b/libact/query_strategies/variance_reduction.py index 2e83dbf1..e64d33aa 100644 --- a/libact/query_strategies/variance_reduction.py +++ b/libact/query_strategies/variance_reduction.py @@ -60,6 +60,20 @@ def __init__(self, *args, **kwargs): self.sigma = kwargs.pop('sigma', 1.0) self.n_jobs = kwargs.pop('n_jobs', 1) + def _get_scores(self): + """VarianceReduction does not support per-sample scoring. + + Raises + ------ + NotImplementedError + """ + raise NotImplementedError( + "VarianceReduction does not support _get_scores(). " + "Its computation is tightly coupled to the C extension. " + "Use make_query() directly, or wrap with a different strategy " + "for batch mode." + ) + @inherit_docstring_from(QueryStrategy) def make_query(self): Xlabeled, y = self.dataset.get_labeled_entries()