Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions libact/base/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,27 @@ def update(self, entry_id, label):
pass

def _get_scores(self):
"""Return the score used for making query, the larger the better. Read-only.
"""Return acquisition scores for all unlabeled samples.

No modification to the internal states.
Subclasses should override this method to enable batch mode queries
and score-based strategy composition.

Returns
-------
(ask_id, scores): list of tuple (int, float)
The index of the next unlabeled sample to be queried and the score assigned.
entry_ids : np.ndarray, shape (n_unlabeled,)
Global entry IDs of unlabeled samples.
scores : np.ndarray, shape (n_unlabeled,)
Acquisition scores. Higher = more informative.

Raises
------
NotImplementedError
If the strategy does not support per-sample scoring.
"""
pass
raise NotImplementedError(
f"{self.__class__.__name__} does not implement _get_scores(). "
"This is required for batch mode and score-based composition."
)

@abstractmethod
def make_query(self):
Expand Down
56 changes: 16 additions & 40 deletions libact/query_strategies/bald.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,52 +186,14 @@ def update(self, entry_id, label):
# Retrain ensemble with the new labeled data
self._train_ensemble()

@inherit_docstring_from(QueryStrategy)
def make_query(self):
dataset = self.dataset
unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries()
X_pool = np.asarray(X_pool)

if len(unlabeled_entry_ids) == 0:
raise ValueError("No unlabeled samples available")

# Get predictions from all models
all_proba = []
for model in self.models:
proba = model.predict_proba(X_pool)
all_proba.append(np.asarray(proba))

all_proba = np.array(all_proba) # shape: (n_models, n_samples, n_classes)

# Calculate BALD score: H[mean(P)] - mean(H[P])
# Mean probability across ensemble
mean_proba = np.mean(all_proba, axis=0) # shape: (n_samples, n_classes)

# Entropy of mean predictions (total uncertainty)
entropy_mean = self._entropy(mean_proba) # shape: (n_samples,)

# Mean entropy across models (expected data uncertainty)
entropies = np.array([self._entropy(p) for p in all_proba]) # shape: (n_models, n_samples)
mean_entropy = np.mean(entropies, axis=0) # shape: (n_samples,)

# BALD score = mutual information
bald_scores = entropy_mean - mean_entropy # shape: (n_samples,)

# Select sample with highest BALD score (break ties randomly)
max_score = np.max(bald_scores)
candidates = np.where(np.isclose(bald_scores, max_score))[0]
selected_idx = self.random_state_.choice(candidates)

return unlabeled_entry_ids[selected_idx]

def _get_scores(self):
"""Return BALD scores for all unlabeled samples."""
dataset = self.dataset
unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries()
X_pool = np.asarray(X_pool)

if len(unlabeled_entry_ids) == 0:
return []
return np.array([], dtype=int), np.array([], dtype=float)

# Get predictions from all models
all_proba = np.array([
Expand All @@ -245,4 +207,18 @@ def _get_scores(self):
mean_entropy = np.mean(entropies, axis=0)
bald_scores = entropy_mean - mean_entropy

return list(zip(unlabeled_entry_ids, bald_scores))
return np.asarray(unlabeled_entry_ids), bald_scores

@inherit_docstring_from(QueryStrategy)
def make_query(self):
    # Score every unlabeled sample once, then greedily pick the best.
    entry_ids, scores = self._get_scores()

    if len(entry_ids) == 0:
        raise ValueError("No unlabeled samples available")

    # Break ties at the maximum BALD score uniformly at random so the
    # strategy does not systematically prefer lower entry IDs.
    best_score = np.max(scores)
    tied = np.where(np.isclose(scores, best_score))[0]
    winner = self.random_state_.choice(tied)

    return entry_ids[winner]
74 changes: 28 additions & 46 deletions libact/query_strategies/coreset.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,67 +79,30 @@ def __init__(self, dataset, **kwargs):
random_state = kwargs.pop('random_state', None)
self.random_state_ = seed_random_state(random_state)

@inherit_docstring_from(QueryStrategy)
def make_query(self):
dataset = self.dataset
unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries()
X_pool = np.asarray(X_pool)

if len(unlabeled_entry_ids) == 0:
raise ValueError("No unlabeled samples available")

# Get labeled data
labeled_entries = dataset.get_labeled_entries()
X_labeled = np.asarray(labeled_entries[0])

# Fallback to random if no labeled data
if len(X_labeled) == 0:
idx = self.random_state_.randint(0, len(unlabeled_entry_ids))
return unlabeled_entry_ids[idx]

# Transform features if transformer is provided
if self.transformer is not None:
X_pool_t = np.asarray(self.transformer.transform(X_pool))
X_labeled_t = np.asarray(self.transformer.transform(X_labeled))
else:
X_pool_t = X_pool
X_labeled_t = X_labeled

# Compute pairwise distances: (n_unlabeled, n_labeled)
dist_matrix = cdist(X_pool_t, X_labeled_t, metric=self.metric)

# For each unlabeled point, find minimum distance to any labeled point
min_distances = np.min(dist_matrix, axis=1)

# Select the unlabeled point with maximum min-distance (farthest)
max_dist = np.max(min_distances)
candidates = np.where(np.isclose(min_distances, max_dist))[0]
selected_idx = self.random_state_.choice(candidates)

return unlabeled_entry_ids[selected_idx]

def _get_scores(self):
"""Return min-distances to labeled set for all unlabeled samples.

Returns
-------
scores : list of (entry_id, score) tuples
Each score is the minimum distance from that unlabeled point
to any labeled point. Higher score means more informative.
entry_ids : np.ndarray, shape (n_unlabeled,)
Global entry IDs of unlabeled samples.
scores : np.ndarray, shape (n_unlabeled,)
Min-distance from each unlabeled point to any labeled point.
Higher score means more informative.
"""
dataset = self.dataset
unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries()
X_pool = np.asarray(X_pool)

if len(unlabeled_entry_ids) == 0:
return []
return np.array([], dtype=int), np.array([], dtype=float)

labeled_entries = dataset.get_labeled_entries()
X_labeled = np.asarray(labeled_entries[0])

if len(X_labeled) == 0:
return list(zip(unlabeled_entry_ids,
[float('inf')] * len(unlabeled_entry_ids)))
return np.asarray(unlabeled_entry_ids), \
np.full(len(unlabeled_entry_ids), float('inf'))

if self.transformer is not None:
X_pool_t = np.asarray(self.transformer.transform(X_pool))
Expand All @@ -151,4 +114,23 @@ def _get_scores(self):
dist_matrix = cdist(X_pool_t, X_labeled_t, metric=self.metric)
min_distances = np.min(dist_matrix, axis=1)

return list(zip(unlabeled_entry_ids, min_distances))
return np.asarray(unlabeled_entry_ids), min_distances

@inherit_docstring_from(QueryStrategy)
def make_query(self):
    # Each score is the min distance from an unlabeled point to the
    # labeled set; the farthest point is the core-set choice.
    entry_ids, dists = self._get_scores()

    if len(entry_ids) == 0:
        raise ValueError("No unlabeled samples available")

    # An all-inf score vector signals an empty labeled pool, so fall
    # back to a uniformly random pick.
    if np.isinf(dists).all():
        rand_idx = self.random_state_.randint(0, len(entry_ids))
        return entry_ids[rand_idx]

    # Ties at the maximum min-distance are broken at random.
    farthest = np.max(dists)
    tied = np.where(np.isclose(dists, farthest))[0]
    return entry_ids[self.random_state_.choice(tied)]
21 changes: 13 additions & 8 deletions libact/query_strategies/density_weighted_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,12 @@ def update(self, entry_id, label):
@inherit_docstring_from(QueryStrategy)
def _get_scores(self):
dataset = self.dataset
X, _ = zip(*dataset.data)
scores = self.base_query_strategy._get_scores()
_, X_pool = dataset.get_unlabeled_entries()
unlabeled_entry_ids, base_scores = zip(*scores)
X, _ = dataset.get_entries()
unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries()

if len(unlabeled_entry_ids) == 0:
return np.array([], dtype=int), np.array([], dtype=float)
_, base_scores = self.base_query_strategy._get_scores()

self.clustering_method.fit(X)
pool_cluster = self.clustering_method.predict(X_pool)
Expand All @@ -118,13 +120,16 @@ def _get_scores(self):
similarity = np.asarray(similarity)

scores = base_scores * similarity**self.beta
return zip(unlabeled_entry_ids, scores)
return np.asarray(unlabeled_entry_ids), np.asarray(scores)

@inherit_docstring_from(QueryStrategy)
def make_query(self):
dataset = self.dataset
unlabeled_entry_ids, scores = self._get_scores()

if len(unlabeled_entry_ids) == 0:
raise ValueError("No unlabeled samples available")

unlabeled_entry_ids, scores = zip(*self._get_scores())
ask_id = self.random_state_.choice(np.where(scores == np.max(scores))[0])
ask_id = self.random_state_.choice(
np.where(np.isclose(scores, np.max(scores)))[0])

return unlabeled_entry_ids[ask_id]
12 changes: 8 additions & 4 deletions libact/query_strategies/epsilon_uncertainty_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,21 @@ def _get_scores(self):

Returns
-------
scores : list of (entry_id, score) tuples
entry_ids : np.ndarray, shape (n_unlabeled,)
Global entry IDs of unlabeled samples.
scores : np.ndarray, shape (n_unlabeled,)
Uncertainty scores. Higher = more uncertain.
"""
dataset = self.dataset
self.model.train(dataset)
unlabeled_entry_ids, X_pool = dataset.get_unlabeled_entries()
X_pool = np.asarray(X_pool)

if len(unlabeled_entry_ids) == 0:
return []
return np.array([], dtype=int), np.array([], dtype=float)

scores = self._get_uncertainty_scores(X_pool)
return list(zip(unlabeled_entry_ids, scores))
return np.asarray(unlabeled_entry_ids), np.asarray(scores)

@inherit_docstring_from(QueryStrategy)
def make_query(self, return_score=False):
Expand All @@ -207,7 +210,8 @@ def make_query(self, return_score=False):
ask_id = unlabeled_entry_ids[selected_idx]

if return_score:
return ask_id, self._get_scores()
entry_ids, scores = self._get_scores()
return ask_id, list(zip(entry_ids, scores))
else:
return ask_id

Expand Down
29 changes: 25 additions & 4 deletions libact/query_strategies/hintsvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,22 @@ def __init__(self, *args, **kwargs):

self.svm_params['C'] = self.cl

@inherit_docstring_from(QueryStrategy)
def make_query(self):
def _get_scores(self):
"""Return absolute decision values for all unlabeled samples.

Returns
-------
entry_ids : np.ndarray, shape (n_unlabeled,)
Global entry IDs of unlabeled samples.
scores : np.ndarray, shape (n_unlabeled,)
Absolute decision values from HintSVM. Higher = more informative.
"""
dataset = self.dataset
unlabeled_entry_ids, unlabeled_pool = dataset.get_unlabeled_entries()

if len(unlabeled_entry_ids) == 0:
return np.array([], dtype=int), np.array([], dtype=float)

labeled_pool, y = dataset.get_labeled_entries()
if len(np.unique(y)) > 2:
raise ValueError("HintSVM query strategy support binary class "
Expand All @@ -155,6 +167,15 @@ def make_query(self):
np.array(unlabeled_pool, dtype=np.float64),
self.svm_params)

p_val = [abs(float(val[0])) for val in p_val]
idx = int(np.argmax(p_val))
scores = np.array([abs(float(val[0])) for val in p_val])
return np.asarray(unlabeled_entry_ids), scores

@inherit_docstring_from(QueryStrategy)
def make_query(self):
    # Scores are absolute HintSVM decision values for each unlabeled sample.
    entry_ids, scores = self._get_scores()

    if len(entry_ids) == 0:
        raise ValueError("No unlabeled samples available")

    # np.argmax returns the first maximum, so ties resolve to the
    # lowest-index candidate (deterministic, matching the original).
    best = int(np.argmax(scores))
    return entry_ids[best]
16 changes: 6 additions & 10 deletions libact/query_strategies/information_density.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def _get_scores(self):
X_pool = np.asarray(X_pool)

if len(unlabeled_entry_ids) == 0:
return []
return np.array([], dtype=int), np.array([], dtype=float)

uncertainty = self._uncertainty_scores(X_pool)
# Ensure non-negative uncertainty (ContinuousModel predict_real can
Expand All @@ -209,26 +209,22 @@ def _get_scores(self):

scores = uncertainty * (density ** self.beta)

return list(zip(unlabeled_entry_ids, scores))
return np.asarray(unlabeled_entry_ids), scores

@inherit_docstring_from(QueryStrategy)
def make_query(self, return_score=False):
dataset = self.dataset
unlabeled_entry_ids, _ = dataset.get_unlabeled_entries()
entry_ids, score_values = self._get_scores()

if len(unlabeled_entry_ids) == 0:
if len(entry_ids) == 0:
raise ValueError("No unlabeled samples available")

scores = self._get_scores()
entry_ids, score_values = zip(*scores)
score_values = np.asarray(list(score_values))

max_score = np.max(score_values)
candidates = np.where(np.isclose(score_values, max_score))[0]
selected_idx = self.random_state_.choice(candidates)

if return_score:
return entry_ids[selected_idx], scores
return entry_ids[selected_idx], \
list(zip(entry_ids, score_values))
else:
return entry_ids[selected_idx]

Expand Down
Loading
Loading