-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimage_processing.py
More file actions
282 lines (218 loc) · 10.4 KB
/
image_processing.py
File metadata and controls
282 lines (218 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""
Image Processing for VAS File Architect
Handles image grouping and cropping. Utilises a breadth-first search to chain corner
points together into clusters if they fall within a defined tolerance distance.
"""
import logging
from collections import defaultdict
from pathlib import Path
from PIL import Image
def extract_corners(filepath):
"""
Extract the top-left and bottom-right corners of an image's bounding box.
:param Path filepath: Path to the image file.
:return: Top-left coords, bottom-right coords, and directory of the image.
:rtype: tuple[tuple[int,int], tuple[int,int], Path] or None
:raise IOError: if unable to access filepath.
"""
try:
with Image.open(filepath) as img:
if img.mode != 'RGBA':
logging.info(f"Converting {filepath.stem} from {img.mode} to RGBA.")
img = img.convert('RGBA')
bbox = img.getbbox()
if bbox:
tl_corner = (bbox[0], bbox[1])
br_corner = (bbox[2], bbox[3])
directory = filepath.parent
logging.info(f"{filepath.stem}: tl: {tl_corner}, br: {br_corner}, dir: {directory}")
return tl_corner, br_corner, directory
logging.warning("No non-transparent area found in image.")
return None, None, None
except IOError as e:
logging.error(f"IOError while accessing {filepath}: {e}")
return None, None, None
except Exception as e:
logging.error(f"Unexpected error while processing file {filepath}: {e}")
return None, None, None
def is_within_tolerance(corner, other_corner, tolerance):
"""
Calculate distance between two points on a plane,
check whether the result falls within the tolerance distance.
:param tuple[int, int] corner: x,y coords of a corner.
:param tuple[int, int] other_corner: x,y coords of the other corner.
:param int tolerance: maximum distance in pixels to be considered neighbours.
:return: True if distance <= tolerance.
:rtype: bool
"""
distance = ((corner[0] - other_corner[0]) ** 2 + (corner[1] - other_corner[1]) ** 2) ** 0.5
return distance <= tolerance
def bfs_cluster_corners(corners, tolerance):
"""
Breadth-first search to label every corner point with a cluster ID.
Each point within tolerance is considered a neighbour and added to the queue.
This function is called separately for the lists of tl and br corners.
:param list of (tuple[int, int]) corners: list of x,y coords of corners.
:param int tolerance: maximum distance in pixels between neighbours.
:return: Mapped list of cluster labels to filepaths.
:rtype: list
"""
cluster_id = 0
clusters = [-1] * len(corners)
for i in range(len(corners)):
if clusters[i] != -1:
continue
queue = [i]
while queue:
current = queue.pop(0)
clusters[current] = cluster_id
for j, other_corner in enumerate(corners):
if clusters[j] == -1 and is_within_tolerance(corners[current], corners[j], tolerance):
queue.append(j)
cluster_id += 1
return clusters
def calculate_mbr_for_group(bboxes):
"""
Calculate the minimum bounding rectangle for a group of images.
:param list of tuple bboxes: Bounding boxes of all images in a group.
:return: min_x0, min_y0, max_x1, max_y1
:rtype: tuple[int, int, int, int]
"""
min_x0 = min(bbox[0][0] for bbox in bboxes)
min_y0 = min(bbox[0][1] for bbox in bboxes)
max_x1 = max(bbox[1][0] for bbox in bboxes)
max_y1 = max(bbox[1][1] for bbox in bboxes)
logging.info(f"Calculated MBR: {min_x0, min_y0, max_x1, max_y1}")
return min_x0, min_y0, max_x1, max_y1
def crop_image(filepath, bbox=None):
"""
Crop a given image based on the specified bounding box.
:param Path filepath: File path to the image.
:param tuple[int,int,int,int] bbox: Bounding box coordinates (x0, y0, x1, y1).
:return: Bbox coords and cropped image.
:rtype: tuple[tuple[int,int,int,int],Image.Image]
:raise IOError: if unable to access filepath.
:raise Exception:
"""
try:
with Image.open(filepath) as img:
if img.mode != 'RGBA':
logging.info(f"Converting {filepath.stem} from {img.mode} to RGBA for processing.")
img = img.convert('RGBA')
if not bbox:
bbox = img.getbbox()
if bbox:
cropped_img = img.crop(bbox)
logging.info(f"Image cropped: {filepath.stem}")
return bbox, cropped_img
else:
logging.warning(f"No non-transparent area found in {filepath}")
return None
except IOError as e:
logging.error(f"IOError while accessing {filepath}: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error while processing file {filepath}: {e}")
return None
def group_images_by_clusters(dir_filepaths, filepath_to_corners, tl_clusters, br_clusters):
"""
Assigns images to groups by combining the cluster IDs of top-left corners and bottom-right corners.
Images which share both tl and br clusters are grouped together.
:param list of Path dir_filepaths: File paths in a directory.
:param dict filepath_to_corners: Mapping file paths to their corners.
:param list tl_clusters: Cluster labels for top-left corners.
:param list br_clusters: Cluster labels for bottom-right corners.
:return: Dict of created groups
:rtype: defaultdict
"""
grouped_images = defaultdict(list)
logging.info("Grouping images as per clusters.")
for i, filepath in enumerate(dir_filepaths):
tl_cluster = tl_clusters[i]
br_cluster = br_clusters[i]
directory = filepath_to_corners[filepath][2]
grouped_images[(tl_cluster, br_cluster, directory)].append(filepath)
logging.info(f"{filepath.stem} added to group: ({tl_cluster},{br_cluster}).")
logging.info(f"Formed {len(grouped_images)} image groups.")
return grouped_images
def enforce_unique_name(base_name, existing_names):
"""
Enforce unique names for image masks.
:param str base_name: Original name for the image file.
:param set of str existing_names: Set of existing names.
:return: A unique name for the image mask.
:rtype: str
"""
if base_name not in existing_names:
return base_name
count = 1
new_name = f"{base_name}_{count}"
while new_name in existing_names:
count += 1
new_name = f"{base_name}_{count}"
logging.info(f"{base_name} relabelled as {new_name}")
return new_name
def process_all_images(filepaths):
"""
Extracts a list of all top-left corner coords, a list of all bottom-right corner coords, and performs a
breadth-first search on these to create groups of all corners which fall within a tolerance distance of another.
Images are grouped if they share both a top-left and bottom-right cluster ID.The minimum
bounding rectangle for each group is calculated, which is to replace the original bounding box of each
image in the group.
Images are cropped to their updated bbox and checked to ensure each name is unique.
:param list of Path filepaths: A list of file paths to the images.
:return: Tuple of all processed image data and all skipped image files.
:raise Exception:
"""
filepath_to_corners = {}
directory_to_filepaths = defaultdict(list)
for filepath in filepaths:
tl, br, directory = extract_corners(filepath)
if tl and br:
filepath_to_corners[filepath] = (tl, br, directory)
directory_to_filepaths[directory].append(filepath)
all_grouped_images = {}
tolerance = 10 # Maximum distance in pixels between neighbouring points. Lower this value for tighter clusters.
for directory, dir_filepaths in directory_to_filepaths.items():
logging.info(f"Processing: {directory.parts[-1]}")
try:
logging.info("Beginning BFS.")
tl_corners = [filepath_to_corners[fp][0] for fp in dir_filepaths]
br_corners = [filepath_to_corners[fp][1] for fp in dir_filepaths]
tl_clusters = bfs_cluster_corners(tl_corners, tolerance)
br_clusters = bfs_cluster_corners(br_corners, tolerance)
grouped_images = group_images_by_clusters(dir_filepaths, filepath_to_corners, tl_clusters, br_clusters)
all_grouped_images.update(grouped_images)
except Exception as e:
logging.error(f"Error processing images in directory {directory}: {e}")
logging.info("Completed image grouping. Updating bounding boxes.")
for group, group_filepaths in all_grouped_images.items():
try:
bboxes = [filepath_to_corners[filepath][:2] for filepath in group_filepaths]
mbr = calculate_mbr_for_group(bboxes)
for filepath in group_filepaths:
filepath_to_corners[filepath] = mbr
except Exception as e:
logging.error(f"Error updating bounding boxes for group {group}: {e}")
all_image_data, skipped_files = [], []
existing_names = set()
for filepath in filepaths:
try:
updated_bbox = filepath_to_corners.get(filepath, None)
if updated_bbox:
result = crop_image(filepath, updated_bbox)
if result:
bbox, cropped_img = result
unique_name = enforce_unique_name(filepath.stem, existing_names)
existing_names.add(unique_name)
filepath = filepath.with_stem(str(unique_name))
image_data = {'filepath': filepath, 'bbox': bbox, 'cropped_img': cropped_img}
all_image_data.append(image_data)
else:
logging.warning(f"Skipping image: {filepath.stem}")
skipped_files.append(filepath)
except Exception as e:
logging.error(f"Error processing image {filepath}: {e}")
skipped_files.append(filepath)
logging.info("Image processing completed.")
return all_image_data, skipped_files