# model_utils.py
import torch
import torch.nn as nn
from torchvision import transforms
import h5py
from pathlib import Path
import torch.nn.functional as F

LABEL_DICT = {
    0: 'Acute leukemia',
    1: 'MDS',
    2: 'MDS/MPN',
    3: 'MPN',
    4: 'Lymphoma',
    5: 'Plasma cell neoplasm',
    6: 'Reactive changes',
    7: 'Healthy',
}

def get_models(modelname, model_path=None, image_size=224, device='cpu'):
    if modelname.lower() in ["dinobloom_s", "dinobloom_b", "dinobloom_l", "dinobloom_g"]:
        modelname_dict = {
            "dinobloom_s": "dinov2_vits14",
            "dinobloom_b": "dinov2_vitb14",
            "dinobloom_l": "dinov2_vitl14",
            "dinobloom_g": "dinov2_vitg14",
        }
        modelname = modelname_dict[modelname]
        model = _load_dinobloom_weights(model_path, modelname, image_size)
    else:
        raise ValueError(f"Model {modelname} not found")
    model = model.to(device)
    model.eval()
    return model

def _load_dinobloom_weights(model_path, modelname, image_size=224):
    input_dims = {
        "dinov2_vits14": 384,
        "dinov2_vitb14": 768,
        "dinov2_vitl14": 1024,
        "dinov2_vitg14": 1536,
    }
    model = torch.hub.load("facebookresearch/dinov2", modelname)
    ckpt = torch.load(model_path, map_location="cpu")
    # Keep only the teacher backbone weights; drop the DINO/iBOT projection heads.
    new_state = {}
    for k, v in ckpt["teacher"].items():
        if "dino_head" in k or "ibot_head" in k:
            continue
        new_state[k.replace("backbone.", "")] = v
    # Re-create the positional embedding for the target resolution
    # (1 CLS token + one token per 14x14 patch); load_state_dict then
    # overwrites it with the checkpoint values.
    num_tokens = int(1 + (image_size / 14) ** 2)
    model.pos_embed = nn.Parameter(torch.zeros(1, num_tokens, input_dims[modelname]))
    model.load_state_dict(new_state, strict=True)
    return model

def get_transforms(model_name, image_size=224):
    if model_name.lower().replace("_reg", "") in [
        "dinobloom_s",
        "dinobloom_b",
        "dinobloom_l",
        "dinobloom_g",
    ]:
        # ImageNet normalization statistics.
        mean = (0.485, 0.456, 0.406)
        std = (0.229, 0.224, 0.225)
        size = image_size
    else:
        raise ValueError("Model name not found")
    size = (size, size)
    transforms_list = [
        transforms.Resize(size),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
    preprocess_transforms = transforms.Compose(transforms_list)
    return preprocess_transforms
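
# Usage sketch for get_models/get_transforms: extract an embedding for a single
# image. A minimal example only; the checkpoint path "DinoBloom-S.pth" and the
# image "cell.png" are placeholders, not files shipped with this module.
def _example_embed_image():
    from PIL import Image
    model = get_models("dinobloom_s", model_path="DinoBloom-S.pth", device="cpu")
    preprocess = get_transforms("dinobloom_s")
    img = preprocess(Image.open("cell.png").convert("RGB")).unsqueeze(0)
    with torch.no_grad():
        embedding = model(img)  # (1, 384) for dinov2_vits14
    return embedding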

def save_features(save_filename, embeddings, label, img_paths_list, output_ext='.h5'):
    if output_ext == '.h5':
        with h5py.File(save_filename, "w") as hf:
            hf.create_dataset("features", data=embeddings)
            hf.create_dataset("labels", data=label)
            # Store paths as variable-length UTF-8 strings.
            dt = h5py.string_dtype(encoding='utf-8')
            hf.create_dataset("img_paths", data=[str(p) for p in img_paths_list], dtype=dt)
    else:
        torch.save({
            "features": embeddings,
            "labels": label,
            "img_paths": img_paths_list,
        }, save_filename)

def load_embeddings(embedding_dir):
    ext = Path(embedding_dir).suffix
    if ext == '.h5':
        features, label, img_paths = read_h5file(embedding_dir)
    elif ext == '.pt':
        features, label, img_paths = read_ptfile(embedding_dir)
    else:
        raise ValueError(f"Unknown embedding extension {ext}")
    return features, label, img_paths

def read_h5file(file_name):
    with h5py.File(file_name, 'r') as hf:
        features = hf['features'][()]  # numpy array
        label = hf['labels'][()]
        if "img_paths" in hf:
            img_paths_raw = hf['img_paths'][()]
            # h5py returns bytes for variable-length strings; decode if needed.
            if isinstance(img_paths_raw[0], bytes):
                img_paths = [p.decode('utf-8') for p in img_paths_raw]
            else:
                img_paths = [str(p) for p in img_paths_raw]
        else:
            img_paths = [f"idx_{i}" for i in range(len(features))]
    return features, label, img_paths

def read_ptfile(file_name):
    data = torch.load(file_name, map_location=torch.device("cpu"))
    features = data['features']
    label = data['labels']
    # Accept either key name for the image paths.
    if 'img_paths' in data:
        img_paths = data['img_paths']
    elif 'image_paths' in data:
        img_paths = data['image_paths']
    else:
        img_paths = [f"idx_{i}" for i in range(len(features))]
    return features, label, img_paths
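
# Round-trip sketch for save_features/load_embeddings. The file name and the
# toy arrays below are illustrative only.
def _example_feature_io():
    import numpy as np
    feats = np.random.rand(4, 384).astype(np.float32)
    labels = np.array([0, 1, 7, 3])
    paths = [f"img_{i}.png" for i in range(4)]
    save_features("embeddings.h5", feats, labels, paths, output_ext='.h5')
    features, label, img_paths = load_embeddings("embeddings.h5")
    assert features.shape == (4, 384) and len(img_paths) == 4
    return features, label, img_paths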

def compute_rollout_attention(all_layer_matrices, start_layer=0):
    # Attention rollout with residual connections: A_hat_l = norm(A_l + I),
    # rollout = A_hat_L @ ... @ A_hat_start.
    # Code adapted from https://github.com/samiraabnar/attention_flow
    num_tokens = all_layer_matrices[0].shape[1]
    batch_size = all_layer_matrices[0].shape[0]
    eye = torch.eye(num_tokens).expand(batch_size, num_tokens, num_tokens).to(all_layer_matrices[0].device)
    all_layer_matrices = [all_layer_matrices[i] + eye for i in range(len(all_layer_matrices))]
    matrices_aug = [all_layer_matrices[i] / all_layer_matrices[i].sum(dim=-1, keepdim=True)
                    for i in range(len(all_layer_matrices))]
    joint_attention = matrices_aug[start_layer]
    for i in range(start_layer + 1, len(matrices_aug)):
        joint_attention = matrices_aug[i].bmm(joint_attention)
    return joint_attention
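
# Tiny sanity check for compute_rollout_attention on random attention maps:
# each normalized factor is row-stochastic, so the rolled-out product is too.
def _example_rollout_sanity():
    layers = [torch.rand(2, 5, 5).softmax(dim=-1) for _ in range(4)]
    joint = compute_rollout_attention(layers)
    assert torch.allclose(joint.sum(dim=-1), torch.ones(2, 5), atol=1e-5)
    return joint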

def generate_rollout(model, input, start_layer=0):
    # The forward pass populates the per-block attention maps via hooks.
    model(input)
    blocks = model.transformer.layers
    all_layer_attentions = []
    for blk in blocks:
        attn_heads = blk[0].fn.get_attention_map()
        avg_heads = (attn_heads.sum(dim=1) / attn_heads.shape[1]).detach()  # mean over heads
        all_layer_attentions.append(avg_heads)
    rollout = compute_rollout_attention(all_layer_attentions, start_layer=start_layer)
    # CLS-token row, excluding the CLS column: relevance of each patch token.
    return rollout[:, 0, 1:]
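
# Sketch: turn the (B, N-1) rollout vector into a 2D patch heatmap. Assumes a
# square token grid (e.g. 16x16 for a 224x224 input with 14x14 patches) and a
# model that exposes the attention-map hooks used above.
def _example_rollout_heatmap(model, img_batch):
    rollout = generate_rollout(model, img_batch)  # (B, 256) for 224x224 inputs
    grid = int(rollout.shape[1] ** 0.5)
    heatmap = rollout.reshape(-1, grid, grid)
    heatmap = heatmap / heatmap.amax(dim=(1, 2), keepdim=True)  # per-image scale to [0, 1]
    return heatmap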

@torch.no_grad()
def _row_norm_with_residual(A, eps=1e-6):
    # A: (B, N, N); average with the identity (residual path), then row-normalize.
    B, N, _ = A.shape
    eye = torch.eye(N, device=A.device).expand(B, N, N)
    A = (A + eye) / 2
    return A / (A.sum(dim=-1, keepdim=True) + eps)

def grad_rollout(model, x, y=None, class_idx=None, start_layer=0, use_relu=True, eps=1e-6):
    """
    Gradient-weighted attention rollout. Returns CLS->token relevance of shape (B, N-1).
    - If y is given: backpropagates a cross-entropy loss against y.
    - Else: targets class_idx (scalar or per-sample), or argmax(logits) if class_idx is None.
    """
    model.zero_grad()
    model.eval()  # disable dropout noise
    with torch.enable_grad():
        logits = model(x, register_hook=True)
        if y is not None:
            loss = F.cross_entropy(logits, y)
        else:
            # Target a specific class per sample, or fall back to the argmax.
            if class_idx is None:
                idx = logits.argmax(dim=1)
            else:
                if isinstance(class_idx, (list, tuple)):
                    idx = torch.tensor(class_idx, device=logits.device, dtype=torch.long)
                elif isinstance(class_idx, torch.Tensor):
                    idx = class_idx.to(logits.device)
                else:
                    idx = torch.full((logits.size(0),), int(class_idx), device=logits.device, dtype=torch.long)
            loss = logits[torch.arange(logits.size(0), device=logits.device), idx].sum()
        loss.backward()

    # Collect per-layer attention maps and their gradients.
    mats = []
    for blk in model.model.transformer.layers:
        attn = blk[0].fn.get_attention_map()   # (B, H, N, N), detached by the saving hook
        grad = blk[0].fn.get_attn_gradients()  # (B, H, N, N), captured by the backward hook
        # Combine heads: elementwise grad * attn, then mean over heads.
        A = (attn * grad).mean(dim=1)  # (B, N, N)
        # Grad-CAM-style gating: keep only positive contributions.
        if use_relu:
            A = A.clamp(min=0)
        # Row-normalize with the residual connection folded in.
        A = _row_norm_with_residual(A, eps=eps)
        mats.append(A)

    # Roll out from start_layer upward.
    joint = mats[start_layer]
    for k in range(start_layer + 1, len(mats)):
        joint = mats[k].bmm(joint)

    # CLS -> tokens (exclude the CLS column).
    return joint[:, 0, 1:]  # (B, N-1)
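
# Sketch: class-specific relevance map with grad_rollout. Assumes `model` wraps
# the backbone as model.model (matching the loop above) and that its forward
# accepts register_hook=True; LABEL_DICT index 7 ("Healthy") is only an
# illustrative target class.
def _example_grad_rollout(model, img_batch):
    relevance = grad_rollout(model, img_batch, class_idx=7)  # (B, N-1)
    grid = int(relevance.shape[1] ** 0.5)
    return relevance.reshape(-1, grid, grid)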