-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_idem.py
More file actions
2936 lines (2447 loc) · 132 KB
/
test_idem.py
File metadata and controls
2936 lines (2447 loc) · 132 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Tests for idem.py"""
import csv
import io
import os
import subprocess
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from PIL import Image
import imagehash
import idem as idem_module
from idem import (
CACHE_FIELDS,
CACHE_FILENAME,
DB_DIR,
N_VIDEO_FRAMES,
VCACHE_FIELDS,
VCACHE_FILENAME,
_FULL_HASH_THRESHOLD,
_TS_TOLERANCE,
_collect_stale,
_file_checksum,
_folder_score,
_get_video_duration,
_name_score,
_open_vcache_for_append,
_resolve_transform,
_smart_defaults,
_valid_hex,
_video_distance,
build_exact_index,
build_hashes,
build_video_hashes,
compute_video_hashes,
ffmpeg_available,
fmt_size,
group_duplicates,
group_exact_duplicates,
group_video_duplicates,
load_cache,
load_vcache,
open_cache_for_append,
parse_size,
path_without_drive,
save_cache,
save_vcache,
scan_files,
)
# ── Helpers ────────────────────────────────────────────────────────────────────
def make_image(color="red", size=(32, 32), fmt="JPEG") -> bytes:
    """Render a solid-color RGB image in memory and return its encoded bytes."""
    stream = io.BytesIO()
    img = Image.new("RGB", size, color)
    img.save(stream, format=fmt)
    return stream.getvalue()
def write_image(path: Path, color="red", size=(32, 32)) -> Path:
    """Write a solid-color test image to *path*; return the path for chaining."""
    encoded = make_image(color=color, size=size)
    path.write_bytes(encoded)
    return path
def fake_hash(hex_str: str) -> imagehash.ImageHash:
    """Build an ImageHash straight from its hex representation.

    Skips all image decoding/I/O, so grouping logic can be unit-tested
    with hand-picked bit patterns.
    """
    return imagehash.hex_to_hash(hex_str)
def fake_entry(phash_hex: str, size: int = 100) -> tuple:
    """Build a (phash_int, dhash_int, size) tuple like build_hashes produces.

    The dhash component is pinned to 0 so the secondary dhash filter in
    group_duplicates always passes, keeping unit tests focused purely on
    the phash-based grouping logic.
    """
    phash_int = int(phash_hex, 16)
    return phash_int, 0, size
# ── path_without_drive ─────────────────────────────────────────────────────────
class TestPathWithoutDrive:
    """path_without_drive strips Windows drive letters and is a no-op otherwise."""

    @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific")
    def test_strips_drive_letter(self):
        """The 'C:' prefix is removed, leaving the rest of the path intact."""
        stripped = path_without_drive("C:\\Users\\foo\\bar.jpg")
        assert stripped == "\\Users\\foo\\bar.jpg"

    @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific")
    def test_strips_various_drive_letters(self):
        """Stripping works for any drive letter (D:, Z:, ...), not just C:."""
        for full, expected in (
            ("D:\\photos\\beach.jpg", "\\photos\\beach.jpg"),
            ("Z:\\archive\\img.png", "\\archive\\img.png"),
        ):
            assert path_without_drive(full) == expected

    @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific")
    def test_already_driveless_unchanged(self):
        """A path with no drive letter comes back unchanged."""
        driveless = "\\Users\\foo\\bar.jpg"
        assert path_without_drive(driveless) == driveless

    @pytest.mark.skipif(sys.platform == "win32", reason="Non-Windows specific")
    def test_unix_path_unchanged(self):
        """Unix-style paths carry no drive letter and are returned as-is."""
        posix = "/Users/foo/bar.jpg"
        assert path_without_drive(posix) == posix

    def test_idempotent(self, tmp_path):
        """Applying the function twice equals applying it once."""
        raw = str(tmp_path / "photo.jpg")
        once = path_without_drive(raw)
        assert path_without_drive(once) == once

    def test_result_has_no_drive_letter(self, tmp_path):
        """On every platform the result never carries an 'X:' prefix."""
        result = path_without_drive(str(tmp_path / "photo.jpg"))
        assert len(result) < 2 or result[1] != ":"
# ── fmt_size ───────────────────────────────────────────────────────────────────
class TestFmtSize:
    """fmt_size selects a unit suffix matching the magnitude of its input."""

    def test_bytes(self):
        """Values below 1 KB are formatted with a 'B' suffix."""
        rendered = fmt_size(512)
        assert "B" in rendered

    def test_kilobytes(self):
        """Values in the KB range are formatted with a 'KB' suffix."""
        rendered = fmt_size(2 * 1024)
        assert "KB" in rendered

    def test_megabytes(self):
        """Values in the MB range are formatted with a 'MB' suffix."""
        rendered = fmt_size(5 * 1024 * 1024)
        assert "MB" in rendered

    def test_gigabytes(self):
        """Values in the GB range are formatted with a 'GB' suffix."""
        rendered = fmt_size(3 * 1024 ** 3)
        assert "GB" in rendered
# ── load_cache / save_cache ────────────────────────────────────────────────────
class TestCacheIO:
    """load_cache / save_cache round-trips, type normalisation and error paths."""

    def test_missing_file_returns_empty(self, tmp_path):
        """A non-existent cache file returns an empty dict rather than raising."""
        assert load_cache(str(tmp_path / "nonexistent.csv")) == {}

    def test_round_trip_preserves_data(self, tmp_path):
        """Data saved with save_cache and reloaded with load_cache is identical."""
        cache_path = str(tmp_path / "cache.csv")
        original = {
            "\\photos\\beach.jpg": {"size": 1000, "mtime": 1708531200.0, "phash": "f8c8e4e2c4e4e8f0", "dhash": ""},
            "\\photos\\other.png": {"size": 2000, "mtime": 1708531300.5, "phash": "a0b0c0d0e0f0a0b0", "dhash": ""},
        }
        save_cache(cache_path, original)
        loaded = load_cache(cache_path)
        assert loaded == original

    def test_load_normalizes_types(self, tmp_path):
        """size must be int, mtime must be float after loading."""
        cache_path = str(tmp_path / "cache.csv")
        # Write the row with string values, as CSV always stores text.
        with open(cache_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=CACHE_FIELDS)
            writer.writeheader()
            writer.writerow({"path": "\\photos\\a.jpg", "size": "1234",
                             "mtime": "1708531200.5", "phash": "0000000000000000"})
        entry = load_cache(cache_path)["\\photos\\a.jpg"]
        assert isinstance(entry["size"], int)
        assert isinstance(entry["mtime"], float)

    @pytest.mark.skipif(sys.platform != "win32", reason="Drive letters only on Windows")
    def test_save_writes_keys_as_is(self, tmp_path):
        """save_cache writes keys verbatim; callers normalise via path_without_drive."""
        cache_path = str(tmp_path / "cache.csv")
        # Keys are always drive-stripped before insertion (see build_hashes /
        # build_exact_index), so save_cache receives a driveless key here.
        cache = {"\\photos\\beach.jpg": {"size": 1000, "mtime": 1.0, "phash": "0" * 16}}
        save_cache(cache_path, cache)
        with open(cache_path, newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
        assert len(rows) == 1
        assert rows[0]["path"] == "\\photos\\beach.jpg"

    @pytest.mark.skipif(sys.platform != "win32", reason="Drive letters only on Windows")
    def test_load_strips_drive_letter_from_csv(self, tmp_path):
        """Drive letters already in the CSV are stripped on read (migration safety)."""
        cache_path = str(tmp_path / "cache.csv")
        with open(cache_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=CACHE_FIELDS)
            writer.writeheader()
            writer.writerow({"path": "C:\\photos\\beach.jpg", "size": 1000,
                             "mtime": 1.0, "phash": "0" * 16})
        cache = load_cache(cache_path)
        assert "C:\\photos\\beach.jpg" not in cache
        key = next(iter(cache))
        assert not key.startswith("C:")
        assert "beach.jpg" in key

    def test_corrupt_file_returns_empty(self, tmp_path, capsys):
        """A file with invalid CSV content returns an empty dict and does not raise."""
        # NOTE(review): capsys is requested but unread — presumably only to keep
        # load_cache's warning output out of the pytest report; confirm.
        cache_path = str(tmp_path / "cache.csv")
        Path(cache_path).write_text("not_a_field\ngarbage_row", encoding="utf-8")
        # Should not raise; returns empty dict and prints a warning
        cache = load_cache(cache_path)
        assert isinstance(cache, dict)
# ── scan_files ─────────────────────────────────────────────────────────────────
class TestScanFiles:
    """scan_files discovery rules: which extensions match, ordering, recursion."""

    def test_finds_common_image_extensions(self, tmp_path):
        """All common raster image formats (.jpg, .png, .gif, etc.) are discovered."""
        for ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif", ".webp"]:
            (tmp_path / f"photo{ext}").write_bytes(b"x")
        found = {Path(f).suffix.lower() for f in scan_files(str(tmp_path))}
        assert found == {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif", ".webp"}

    def test_finds_heic_heif(self, tmp_path):
        """HEIC and HEIF (Apple mobile) formats are also discovered."""
        (tmp_path / "photo.heic").write_bytes(b"x")
        (tmp_path / "photo.heif").write_bytes(b"x")
        assert len(scan_files(str(tmp_path))) == 2

    def test_ignores_raw_extensions(self, tmp_path):
        """Raw camera formats (.cr2, .nef, etc.) are excluded from scan results."""
        for ext in [".cr2", ".nef", ".arw", ".dng", ".orf", ".rw2"]:
            (tmp_path / f"raw{ext}").write_bytes(b"x")
        assert scan_files(str(tmp_path)) == []

    def test_ignores_video_extensions(self, tmp_path):
        """Video files (.mp4, .mov, etc.) are excluded from image scan results."""
        for ext in [".mp4", ".mov", ".avi", ".mkv", ".m4v"]:
            (tmp_path / f"video{ext}").write_bytes(b"x")
        assert scan_files(str(tmp_path)) == []

    def test_ignores_non_media_files(self, tmp_path):
        """Non-media files (.txt, .csv) are excluded from scan results."""
        (tmp_path / "readme.txt").write_bytes(b"x")
        (tmp_path / "data.csv").write_bytes(b"x")
        assert scan_files(str(tmp_path)) == []

    def test_recursive_discovery(self, tmp_path):
        """Files in nested subdirectories are found alongside top-level files."""
        sub = tmp_path / "a" / "b"
        sub.mkdir(parents=True)
        (tmp_path / "top.jpg").write_bytes(b"x")
        (sub / "deep.jpg").write_bytes(b"x")
        assert len(scan_files(str(tmp_path))) == 2

    def test_case_insensitive_extension(self, tmp_path):
        """Extension matching is case-insensitive (.JPG is found just like .jpg)."""
        (tmp_path / "photo.JPG").write_bytes(b"x")
        (tmp_path / "photo.PNG").write_bytes(b"x")
        assert len(scan_files(str(tmp_path))) == 2

    def test_returns_sorted(self, tmp_path):
        """The returned file list is in alphabetical order."""
        for name in ["c.jpg", "a.jpg", "b.jpg"]:
            (tmp_path / name).write_bytes(b"x")
        found = scan_files(str(tmp_path))
        assert found == sorted(found)

    def test_returns_absolute_paths(self, tmp_path):
        """Every returned path is an absolute filesystem path."""
        (tmp_path / "photo.jpg").write_bytes(b"x")
        found = scan_files(str(tmp_path))
        assert all(os.path.isabs(p) for p in found)
# ── build_hashes ───────────────────────────────────────────────────────────────
class TestBuildHashes:
    """build_hashes: hashing, cache hits, staleness detection and error counting.

    build_hashes returns a 4-tuple: (hashes, new_count, rehashed_count, errors).
    """

    def test_computes_hash_for_new_file(self, tmp_path):
        """A file not in the cache is hashed and counted as new."""
        p = write_image(tmp_path / "photo.jpg")
        cache = {}
        hashes, new_c, rehashed_c, errors = build_hashes([str(p)], cache)
        assert len(hashes) == 1
        assert new_c == 1 and rehashed_c == 0
        assert errors == 0

    def test_cache_key_is_driveless(self, tmp_path):
        """Cache keys are stored without drive letters for cross-platform consistency."""
        p = write_image(tmp_path / "photo.jpg")
        cache = {}
        build_hashes([str(p)], cache)
        for key in cache:
            assert not (len(key) >= 2 and key[1] == ":"), f"Drive letter found in key: {key!r}"

    def test_cache_hit_skips_recompute(self, tmp_path):
        """A file with unchanged size/mtime is served from cache with no recompute."""
        p = write_image(tmp_path / "photo.jpg")
        cache = {}
        hashes1, updated1, _, _ = build_hashes([str(p)], cache)
        assert updated1 == 1
        hashes2, updated2, _, _ = build_hashes([str(p)], cache)
        assert updated2 == 0
        assert hashes1[str(p)] == hashes2[str(p)]

    def test_stale_size_triggers_recompute(self, tmp_path):
        """A file that has grown in size is recomputed and counted as re-hashed."""
        p = write_image(tmp_path / "photo.jpg", size=(32, 32))
        cache = {}
        build_hashes([str(p)], cache)
        # Overwrite with a larger image (different size)
        write_image(p, color="blue", size=(64, 64))
        _, _, updated, _ = build_hashes([str(p)], cache)
        assert updated == 1

    def test_stale_mtime_triggers_recompute(self, tmp_path):
        """A file whose cached mtime differs beyond tolerance is recomputed."""
        p = write_image(tmp_path / "photo.jpg")
        cache = {}
        build_hashes([str(p)], cache)
        # Manually corrupt the cached mtime to force staleness
        key = path_without_drive(str(p))
        cache[key]["mtime"] -= _TS_TOLERANCE + 10
        _, _, updated, _ = build_hashes([str(p)], cache)
        assert updated == 1

    def test_non_image_content_counted_as_error(self, tmp_path):
        """A file that cannot be opened as an image is counted as an error, not re-hashed."""
        p = tmp_path / "corrupt.jpg"
        p.write_bytes(b"this is not an image")
        cache = {}
        _, updated, _, errors = build_hashes([str(p)], cache)
        assert errors == 1
        assert updated == 0

    def test_multiple_files(self, tmp_path):
        """Multiple files are all hashed in a single call, each counted independently."""
        files = [write_image(tmp_path / f"photo{i}.jpg", color=("red", "green", "blue")[i])
                 for i in range(3)]
        cache = {}
        hashes, updated, _, errors = build_hashes([str(f) for f in files], cache)
        assert len(hashes) == 3
        assert updated == 3
        assert errors == 0

    def test_cache_populated_after_run(self, tmp_path):
        """After hashing, the cache contains size, mtime, and phash for each file."""
        p = write_image(tmp_path / "photo.jpg")
        cache = {}
        build_hashes([str(p)], cache)
        key = path_without_drive(str(p))
        assert key in cache
        assert "phash" in cache[key]
        assert "size" in cache[key]
        assert "mtime" in cache[key]
# ── group_duplicates ───────────────────────────────────────────────────────────
class TestGroupDuplicates:
    """Uses fake ImageHash objects to test grouping logic without image I/O."""

    def _files(self, tmp_path, *names, size=100):
        """Create dummy files (content doesn't matter for grouping logic)."""
        paths = []
        for name in names:
            p = tmp_path / name
            p.write_bytes(b"x" * size)
            paths.append(str(p))
        return paths

    def test_empty_input(self):
        """An empty hash dict returns no duplicate groups."""
        assert group_duplicates({}, threshold=10) == []

    def test_single_file_not_grouped(self, tmp_path):
        """A single file cannot form a duplicate group."""
        (p,) = self._files(tmp_path, "a.jpg")
        assert group_duplicates({p: fake_entry("0" * 16)}, threshold=10) == []

    def test_identical_hashes_grouped(self, tmp_path):
        """Two files with the same hash are placed in one duplicate group."""
        p1, p2 = self._files(tmp_path, "a.jpg", "b.jpg")
        h = fake_entry("0" * 16)
        groups = group_duplicates({p1: h, p2: h}, threshold=0)
        assert len(groups) == 1
        assert {p for p, _ in groups[0]} == {p1, p2}

    def test_distant_hashes_not_grouped(self, tmp_path):
        """Files with maximum Hamming distance (all bits differ) are not grouped."""
        p1, p2 = self._files(tmp_path, "a.jpg", "b.jpg")
        # Max Hamming distance = 64
        groups = group_duplicates(
            {p1: fake_entry("0" * 16), p2: fake_entry("f" * 16)},
            threshold=10,
        )
        assert groups == []

    def test_near_duplicate_within_threshold(self, tmp_path):
        """Files within the Hamming distance threshold are grouped as near-duplicates."""
        p1, p2 = self._files(tmp_path, "a.jpg", "b.jpg")
        # "000000000000000f" vs "0000000000000000": 4 bits differ → distance = 4
        groups = group_duplicates(
            {p1: fake_entry("000000000000000f"), p2: fake_entry("0000000000000000")},
            threshold=5,
        )
        assert len(groups) == 1

    def test_near_duplicate_outside_threshold_not_grouped(self, tmp_path):
        """Files whose Hamming distance exceeds the threshold are not grouped."""
        p1, p2 = self._files(tmp_path, "a.jpg", "b.jpg")
        # "000000000000000f" vs "0000000000000000": distance = 4
        groups = group_duplicates(
            {p1: fake_entry("000000000000000f"), p2: fake_entry("0000000000000000")},
            threshold=3,
        )
        assert groups == []

    def test_hamming_distance_exact_boundary(self, tmp_path):
        """bit_count() Hamming distance is computed correctly at the threshold boundary."""
        p1, p2 = self._files(tmp_path, "a.jpg", "b.jpg")
        # 0x000000000000000f has exactly 4 set bits → Hamming distance 4 from 0x0
        ph_4bits = 0x000000000000000f
        assert (ph_4bits ^ 0).bit_count() == 4  # verify the bit pattern itself
        hashes = {p1: (ph_4bits, 0, 100), p2: (0, 0, 100)}
        assert group_duplicates(hashes, threshold=4) != []  # 4 <= 4: grouped
        assert group_duplicates(hashes, threshold=3) == []  # 4 > 3: not grouped

    def test_within_group_sorted_largest_first(self, tmp_path):
        """Within a duplicate group, the largest file (by byte size) appears first."""
        large, small = self._files(tmp_path, "large.jpg", "small.jpg")
        h_large = fake_entry("0" * 16, size=1000)
        h_small = fake_entry("0" * 16, size=100)
        groups = group_duplicates({large: h_large, small: h_small}, threshold=0)
        assert len(groups) == 1
        assert groups[0][0][0] == large  # largest first
        assert groups[0][0][1] == 1000
        assert groups[0][1][1] == 100

    def test_multiple_groups_detected(self, tmp_path):
        """Two independent clusters of similar images each produce a separate group."""
        p1, p2 = self._files(tmp_path, "a1.jpg", "a2.jpg")
        p3, p4 = self._files(tmp_path, "b1.jpg", "b2.jpg")
        ha = fake_entry("0" * 16)
        hb = fake_entry("f" * 16)
        groups = group_duplicates({p1: ha, p2: ha, p3: hb, p4: hb}, threshold=0)
        assert len(groups) == 2

    def test_groups_sorted_most_files_first(self, tmp_path):
        """Groups with more files appear before groups with fewer files."""
        trio = self._files(tmp_path, "a1.jpg", "a2.jpg", "a3.jpg")
        pair = self._files(tmp_path, "b1.jpg", "b2.jpg")
        ha = fake_entry("0" * 16)
        hb = fake_entry("f" * 16)
        hashes = {p: ha for p in trio} | {p: hb for p in pair}
        groups = group_duplicates(hashes, threshold=0)
        assert len(groups) == 2
        assert len(groups[0]) == 3  # trio comes first
        assert len(groups[1]) == 2

    def test_no_file_appears_in_two_groups(self, tmp_path):
        """Each file is claimed by at most one group."""
        files = self._files(tmp_path, *[f"f{i}.jpg" for i in range(6)])
        # All files share the same hash → one big group
        h = fake_entry("0" * 16)
        hashes = {p: h for p in files}
        groups = group_duplicates(hashes, threshold=0)
        all_paths = [p for g in groups for p, _ in g]
        assert len(all_paths) == len(set(all_paths))
# ── Cache persistence across runs ──────────────────────────────────────────────
class TestCachePersistence:
    """Cache behaviour across simulated runs: hits, pruning, and staleness.

    Fix: removed a dead assignment in test_second_run_hits_cache —
    ``cache_path = str(tmp_path / CACHE_FIELDS[0])`` was immediately
    overwritten by the real ``cache.csv`` path and never used.
    """

    def test_second_run_hits_cache(self, tmp_path):
        """On the second call with the same cache, nothing is recomputed."""
        p = write_image(tmp_path / "photo.jpg")
        cache_path = str(tmp_path / "cache.csv")
        cache = {}
        _, updated1, _, _ = build_hashes([str(p)], cache)
        save_cache(cache_path, cache)
        assert updated1 == 1
        cache2 = load_cache(cache_path)
        _, updated2, _, _ = build_hashes([str(p)], cache2)
        assert updated2 == 0

    def test_saved_cache_has_no_drive_letters(self, tmp_path):
        """After save_cache, the CSV path column contains no drive letters."""
        p = write_image(tmp_path / "photo.jpg")
        cache_path = str(tmp_path / "cache.csv")
        cache = {}
        build_hashes([str(p)], cache)
        save_cache(cache_path, cache)
        with open(cache_path, newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
        for row in rows:
            assert not (len(row["path"]) >= 2 and row["path"][1] == ":"), \
                f"Drive letter found in saved path: {row['path']!r}"

    def test_hash_survives_round_trip(self, tmp_path):
        """Hash loaded from cache matches the originally computed hash."""
        p = write_image(tmp_path / "photo.jpg")
        cache_path = str(tmp_path / "cache.csv")
        cache1 = {}
        hashes1, _, _, _ = build_hashes([str(p)], cache1)
        save_cache(cache_path, cache1)
        cache2 = load_cache(cache_path)
        hashes2, _, _, _ = build_hashes([str(p)], cache2)
        assert hashes1[str(p)] == hashes2[str(p)]

    def test_prune_removes_missing_files(self, tmp_path):
        """Cache entries for files no longer on disk are pruned."""
        keep = write_image(tmp_path / "keep.jpg", color="red")
        drop = write_image(tmp_path / "drop.jpg", color="blue")
        cache = {}
        build_hashes([str(keep), str(drop)], cache)
        assert len(cache) == 2
        # Simulate next run: only 'keep' is present
        all_files = [str(keep)]
        live_keys = {path_without_drive(p) for p in all_files}
        gone = [k for k in cache if k not in live_keys]
        for k in gone:
            del cache[k]
        assert len(cache) == 1
        assert all("keep.jpg" in k for k in cache)

    def test_modified_file_recomputed_after_cache_reload(self, tmp_path):
        """After save/load cycle, a modified file is detected as stale."""
        p = write_image(tmp_path / "photo.jpg", color="red", size=(32, 32))
        cache_path = str(tmp_path / "cache.csv")
        cache = {}
        build_hashes([str(p)], cache)
        save_cache(cache_path, cache)
        # Modify the file between runs
        write_image(p, color="blue", size=(64, 64))
        cache2 = load_cache(cache_path)
        _, _, updated, _ = build_hashes([str(p)], cache2)
        assert updated == 1
# ── End-to-end: duplicate detection on real images ─────────────────────────────
class TestEndToEnd:
    """End-to-end duplicate detection on real (Pillow-generated) images."""

    def test_same_image_different_sizes_detected(self, tmp_path):
        """Same visual content at two resolutions → reported as duplicates."""
        large = write_image(tmp_path / "large.jpg", color="red", size=(64, 64))
        small = write_image(tmp_path / "small.jpg", color="red", size=(32, 32))
        cache = {}
        hashes, _, _, _ = build_hashes([str(large), str(small)], cache)
        groups = group_duplicates(hashes, threshold=10)
        assert len(groups) == 1
        paths = {p for p, _ in groups[0]}
        assert str(large) in paths
        assert str(small) in paths

    def test_different_images_not_grouped(self, tmp_path):
        """Visually distinct images should not end up in the same group."""
        # Use a real image vs a checkerboard for maximum DCT difference
        solid = write_image(tmp_path / "solid.jpg", color="white", size=(64, 64))
        checker = Image.new("RGB", (64, 64))
        px = checker.load()
        for y in range(64):
            for x in range(64):
                px[x, y] = (0, 0, 0) if (x // 8 + y // 8) % 2 == 0 else (255, 255, 255)
        checker_path = tmp_path / "checker.jpg"
        checker.save(str(checker_path), format="JPEG")
        cache = {}
        hashes, _, _, _ = build_hashes([str(solid), str(checker_path)], cache)
        ph_solid = hashes[str(solid)][0]
        ph_checker = hashes[str(checker_path)][0]
        dist = (ph_solid ^ ph_checker).bit_count()
        # Only assert grouping for threshold below actual distance
        if dist > 0:
            groups = group_duplicates(hashes, threshold=dist - 1)
            assert groups == []
# ── Incremental cache writes ────────────────────────────────────────────────────
class TestIncrementalCache:
    """Verify that hashes are flushed to disk as each file is processed."""

    def test_open_cache_for_append_creates_file_with_header(self, tmp_path):
        """Opening a new cache file for append creates it and writes the CSV header row."""
        cache_path = str(tmp_path / "cache.csv")
        f = open_cache_for_append(cache_path)
        f.close()
        assert Path(cache_path).exists()
        with open(cache_path, newline="", encoding="utf-8") as f:
            header = next(csv.reader(f))
        assert header == CACHE_FIELDS

    def test_open_cache_for_append_no_duplicate_header(self, tmp_path):
        """Opening an existing file a second time must not write a second header."""
        cache_path = str(tmp_path / "cache.csv")
        f = open_cache_for_append(cache_path)
        f.close()
        f = open_cache_for_append(cache_path)
        f.close()
        with open(cache_path, newline="", encoding="utf-8") as f:
            rows = list(csv.reader(f))
        # Only the header row, no duplicate
        assert rows == [CACHE_FIELDS]

    def test_entries_written_during_build_hashes(self, tmp_path):
        """Each newly computed hash appears in the file before build_hashes returns."""
        imgs = [write_image(tmp_path / f"p{i}.jpg", color=("red", "green", "blue")[i])
                for i in range(3)]
        cache_path = str(tmp_path / "cache.csv")
        cache = {}
        cache_out = open_cache_for_append(cache_path)
        try:
            build_hashes([str(p) for p in imgs], cache, cache_out)
        finally:
            cache_out.close()
        loaded = load_cache(cache_path)
        assert len(loaded) == 3

    def test_cache_hit_does_not_append_row(self, tmp_path):
        """Files served from cache (no recompute) must not add a duplicate row."""
        p = write_image(tmp_path / "photo.jpg")
        cache_path = str(tmp_path / "cache.csv")
        # First run: 1 new hash written
        cache = {}
        cache_out = open_cache_for_append(cache_path)
        build_hashes([str(p)], cache, cache_out)
        cache_out.close()
        save_cache(cache_path, cache)  # compact
        # Second run: cache hit — file should still have exactly 1 data row
        cache2 = load_cache(cache_path)
        cache_out2 = open_cache_for_append(cache_path)
        _, updated, _, _ = build_hashes([str(p)], cache2, cache_out2)
        cache_out2.close()
        assert updated == 0
        with open(cache_path, newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
        assert len(rows) == 1  # no extra row appended

    def test_recovery_after_interrupt(self, tmp_path):
        """Hashes written before an interrupt are recoverable on the next run."""
        imgs = [write_image(tmp_path / f"img{i}.jpg") for i in range(4)]
        cache_path = str(tmp_path / "cache.csv")
        # Simulate processing only the first 2 files (interrupt before the rest)
        cache = {}
        cache_out = open_cache_for_append(cache_path)
        build_hashes([str(imgs[0]), str(imgs[1])], cache, cache_out)
        cache_out.close()
        # Do NOT call save_cache — simulates abrupt termination
        # Next run: load the partially written cache
        recovered = load_cache(cache_path)
        assert len(recovered) == 2
        # Resume: remaining 2 files should be computed, first 2 are cache hits
        cache_out2 = open_cache_for_append(cache_path)
        _, updated, _, _ = build_hashes([str(p) for p in imgs], recovered, cache_out2)
        cache_out2.close()
        assert updated == 2  # only the two that weren't processed before

    def test_compaction_removes_duplicate_rows(self, tmp_path):
        """save_cache after append mode produces a clean single-entry-per-file CSV."""
        p = write_image(tmp_path / "photo.jpg")
        cache_path = str(tmp_path / "cache.csv")
        # Two runs without compaction between them → file has 2 rows for the same file
        cache = {}
        cache_out = open_cache_for_append(cache_path)
        build_hashes([str(p)], cache, cache_out)
        cache_out.close()
        # Force a re-hash by clearing the in-memory cache (simulates stale detection)
        cache2 = {}
        # NOTE(review): cache2 above is assigned but unused — presumably left over
        # from an earlier version of this test; the stale-mtime path below is what
        # actually forces the re-hash. Confirm and remove.
        # Corrupt mtime in the on-disk cache so second run sees it as stale
        loaded = load_cache(cache_path)
        key = next(iter(loaded))
        loaded[key]["mtime"] -= _TS_TOLERANCE + 10
        cache_out2 = open_cache_for_append(cache_path)
        build_hashes([str(p)], loaded, cache_out2)
        cache_out2.close()
        # File now has 2 data rows for the same path; compaction should fix that
        save_cache(cache_path, loaded)
        with open(cache_path, newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
        assert len(rows) == 1
# ── CLI: --cache argument resolution ──────────────────────────────────────────
# Absolute path to the script under test, resolved relative to this test file.
SCRIPT = os.path.join(os.path.dirname(__file__), "idem.py")


def _run(args: list, tmp_path) -> subprocess.CompletedProcess:
    """Run idem.py as a subprocess and return the result."""
    cmd = [sys.executable, SCRIPT, *args]
    return subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        cwd=str(tmp_path),
    )
class TestCLICacheArg:
    """Integration tests for --cache directory/file resolution."""

    def test_no_cache_arg_creates_file_in_scanned_dir(self, tmp_path):
        """Default: cache file is created inside the __databases subdirectory."""
        write_image(tmp_path / "photo.jpg")
        result = _run([str(tmp_path)], tmp_path)
        assert result.returncode == 0, result.stderr
        assert (tmp_path / DB_DIR / CACHE_FILENAME).exists()

    def test_cache_as_directory_places_file_inside_it(self, tmp_path):
        """--cache <dir> puts images_perceptual_hash_db.csv inside that directory."""
        write_image(tmp_path / "photo.jpg")
        cache_dir = tmp_path / "my_cache_dir"
        cache_dir.mkdir()
        result = _run([str(tmp_path), "--cache", str(cache_dir)], tmp_path)
        assert result.returncode == 0, result.stderr
        assert (cache_dir / CACHE_FILENAME).exists()

    def test_cache_as_new_file_path_is_created(self, tmp_path):
        """--cache <file> that does not yet exist is created by the script."""
        write_image(tmp_path / "photo.jpg")
        cache_file = tmp_path / "custom_cache.csv"
        assert not cache_file.exists()
        result = _run([str(tmp_path), "--cache", str(cache_file)], tmp_path)
        assert result.returncode == 0, result.stderr
        assert cache_file.exists()

    def test_cache_as_existing_file_path_is_reused(self, tmp_path):
        """--cache <file> that already exists is loaded and updated."""
        p = write_image(tmp_path / "photo.jpg")
        cache_file = tmp_path / "custom_cache.csv"
        # First run — populates the cache
        result1 = _run([str(tmp_path), "--cache", str(cache_file)], tmp_path)
        assert result1.returncode == 0, result1.stderr
        # Second run — should report 0 new/updated hashes (all from cache)
        result2 = _run([str(tmp_path), "--cache", str(cache_file)], tmp_path)
        assert result2.returncode == 0, result2.stderr
        # Exact progress-line format emitted by idem.py's summary output.
        assert "0 new * 0 re-hashed" in result2.stdout
# ── _name_score ─────────────────────────────────────────────────────────────────
class TestNameScoreCameraNames:
    """Camera/app-generated filenames should score 0 (no meaningful English words)."""

    def test_google_pixel(self):
        """A Google Pixel PXL_-prefixed name is pure noise and scores zero."""
        score = _name_score("PXL_20231015_123456789.jpg")
        assert score == 0

    def test_generic_img(self):
        """A generic IMG_-prefixed name is pure noise and scores zero."""
        score = _name_score("IMG_001.jpg")
        assert score == 0

    def test_sony_dsc(self):
        """A Sony DSC_-prefixed name is pure noise and scores zero."""
        score = _name_score("DSC_0001.jpg")
        assert score == 0

    def test_fuji_dscf(self):
        """A Fuji DSCF-prefixed name is pure noise and scores zero."""
        score = _name_score("DSCF0042.jpg")
        assert score == 0

    def test_nikon_dscn(self):
        """A Nikon DSCN-prefixed name is pure noise and scores zero."""
        score = _name_score("DSCN0099.JPG")
        assert score == 0

    def test_whatsapp(self):
        """A WhatsApp WA-prefixed name is pure noise and scores zero."""
        score = _name_score("WA0001234567-1.jpg")
        assert score == 0

    def test_dcim_prefix(self):
        """A DCIM_-prefixed name is pure noise and scores zero."""
        score = _name_score("DCIM_001.jpg")
        assert score == 0

    def test_pure_digits(self):
        """A purely numeric filename (e.g. a bare date) scores zero."""
        score = _name_score("20231015.jpg")
        assert score == 0

    def test_date_with_time(self):
        """A date+time-only filename (YYYYMMDD_HHMMSS) scores zero."""
        score = _name_score("20231015_153022.jpg")
        assert score == 0

    def test_dcf_token(self):
        """A DCF_-prefixed name is recognised as noise and scores zero."""
        score = _name_score("DCF_0001.jpg")
        assert score == 0

    def test_whatsapp_token(self):
        """'WhatsApp' is a noise token; only digits remain, so the score is zero."""
        score = _name_score("WhatsApp_2023-10-15.jpg")
        assert score == 0

    def test_images_token(self):
        """'Images' is a noise token; the remaining digits score zero."""
        score = _name_score("images_001.jpg")
        assert score == 0

    def test_whatsapp_and_images_combined(self):
        """With both 'WhatsApp' and 'Images' stripped, nothing meaningful is left."""
        score = _name_score("WhatsApp_Images_2023-10-15.jpg")
        assert score == 0

    def test_whatsapp_case_insensitive(self):
        """'WhatsApp' noise matching ignores case entirely."""
        for name in ("whatsapp_2023.jpg", "WHATSAPP_2023.jpg", "WhatsApp_2023.jpg"):
            assert _name_score(name) == 0

    def test_images_case_insensitive(self):
        """'Images' noise matching ignores case entirely."""
        for name in ("images_001.jpg", "IMAGES_001.jpg", "Images_001.jpg"):
            assert _name_score(name) == 0
class TestNameScoreMeaningfulNames:
    """Filenames with real English words should score > 0."""

    def test_two_words_underscore(self):
        """Underscore-separated words sum their letter counts."""
        score = _name_score("beach_vacation.jpg")
        assert score == 13  # beach(5) + vacation(8)

    def test_two_words_with_year(self):
        """A trailing year contributes nothing; only the words count."""
        score = _name_score("birthday_party_2023.jpg")
        assert score == 13  # birthday(8) + party(5)

    def test_hyphen_separated(self):
        """Hyphens split the name into individually-scored words."""
        score = _name_score("new-year-eve.jpg")
        assert score == 10  # new(3) + year(4) + eve(3)

    def test_no_separator(self):
        """A single separator-free token scores its full character length."""
        score = _name_score("BeachSunset.jpg")
        assert score == 11  # one CamelCase token, no noise match

    def test_date_prefix_meaningful_suffix(self):
        """A leading date is stripped; the descriptive suffix alone scores."""
        score = _name_score("20231015_birthday.jpg")
        assert score == 8  # only 'birthday' survives the digit split

    def test_long_descriptive_name(self):
        """Every word in a multi-word name adds to the total."""
        score = _name_score("beautiful_sunset_at_beach.jpg")
        assert score == 22  # beautiful(9) + sunset(6) + at(2) + beach(5)

    def test_space_separated(self):
        """Spaces work as separators exactly like underscores."""
        score = _name_score("family reunion.jpg")
        assert score == 13  # family(6) + reunion(7)

    def test_single_meaningful_word(self):
        """One meaningful word scores its own character length."""
        score = _name_score("sunset.jpg")
        assert score == 6

    def test_uppercase_extension_ignored(self):
        """The extension is stripped before scoring, regardless of case."""
        score = _name_score("beach_vacation.JPEG")
        assert score == 13  # beach(5) + vacation(8)
class TestNameScoreComparisons:
    """A meaningful name always outscores a camera name."""

    @pytest.mark.parametrize("camera,meaningful", [
        ("PXL_20231015_123456789.jpg", "beach_vacation.jpg"),
        ("IMG_001.jpg", "birthday_party.jpg"),
        ("DSC_0042.jpg", "christmas_morning.jpg"),
        ("WA0001234567-1.jpg", "holiday_trip.jpg"),
        ("20231015_153022.jpg", "sunset.jpg"),
    ])
    def test_meaningful_beats_camera(self, camera, meaningful):
        """Every descriptive filename ranks above every camera-generated one."""
        camera_score = _name_score(camera)
        meaningful_score = _name_score(meaningful)
        assert meaningful_score > camera_score

    def test_more_words_beats_fewer(self):
        """A longer, more descriptive name ranks above a shorter one."""
        longer = _name_score("beautiful_sunset_at_beach.jpg")
        shorter = _name_score("sunset.jpg")
        assert longer > shorter

    def test_camera_tie(self):
        """Two camera-style names each score zero and therefore tie."""
        first = _name_score("PXL_001.jpg")
        second = _name_score("IMG_002.jpg")
        assert first == second == 0

    def test_noise_token_case_insensitive(self):
        """Lowercase 'pxl' is treated as noise exactly like uppercase 'PXL'."""
        score = _name_score("pxl_20231015_001.jpg")
        assert score == 0
# ── _folder_score ────────────────────────────────────────────────────────────────
class TestFolderScoreDateFolders:
    """Folders that are purely dates or numbers should score 0."""

    def test_iso_date(self):
        """An ISO-format (YYYY-MM-DD) folder name scores zero."""
        score = _folder_score("/Photos/2023-12-25")
        assert score == 0

    def test_year_only(self):
        """A bare-year folder name scores zero."""
        score = _folder_score("/Photos/2023")
        assert score == 0

    def test_year_month(self):
        """A YYYY-MM folder name scores zero."""
        score = _folder_score("/Photos/2023-12")
        assert score == 0

    def test_underscore_date(self):
        """An underscore-separated date (YYYY_MM_DD) folder scores zero."""
        score = _folder_score("/Photos/2023_12_25")
        assert score == 0

    def test_pure_number(self):
        """A purely numeric folder name scores zero."""
        score = _folder_score("/camera/1234")
        assert score == 0
class TestFolderScoreMeaningfulFolders:
    """Folders with English words should score > 0."""

    def test_two_words(self):
        """A two-word folder name scores the combined letter count of its words."""
        score = _folder_score("/Photos/Beach Vacation")
        assert score == 13  # 'BeachVacation'

    def test_word_with_year(self):
        """A trailing year is stripped; only the English word is counted."""
        score = _folder_score("/Photos/Christmas 2023")
        assert score == 9  # 'Christmas'

    def test_underscore_separated(self):
        """Underscores are removed; the joined letters are counted."""
        score = _folder_score("/2023/family_reunion")
        assert score == 13  # 'familyreunion'

    def test_only_last_component_scored(self):
        """Only the immediate parent folder name matters, not path depth."""
        deep = _folder_score("/very/long/path/Christmas Party")
        shallow = _folder_score("/Christmas Party")
        assert deep == shallow

    def test_camera_roll(self):
        """'Camera Roll' is a real name and scores its letter count."""
        score = _folder_score("/Phone/Camera Roll")
        assert score == 10  # 'CameraRoll'

    def test_letters_only_folder(self):
        """Folder scoring does not filter noise tokens, so 'DCIM' counts fully."""
        score = _folder_score("/DCIM")
        assert score == 4

    def test_mixed_alpha_digits(self):
        """Digits are stripped before counting the remaining letters."""
        score = _folder_score("/camera/100MEDIA")
        assert score == 5  # '100MEDIA' → 'MEDIA'
class TestFolderScoreComparisons:
    """Descriptive folders outrank date/numeric ones."""

    @pytest.mark.parametrize("date_folder,named_folder", [
        ("/2023-12-25", "/Christmas Morning"),
        ("/2023", "/Beach Vacation"),
        ("/2023-08", "/Summer Holidays"),
        ("/20231015", "/Birthday Party"),
    ])
    def test_named_beats_date(self, date_folder, named_folder):
        """Every descriptive folder name ranks above every date/numeric one."""
        named_score = _folder_score(named_folder)
        date_score = _folder_score(date_folder)
        assert named_score > date_score
# ── _smart_defaults — helpers ────────────────────────────────────────────────────
def _file(path, name, dir_, size):
return {"path": path, "name": name, "dir": dir_, "size": size}
# ── _smart_defaults — keep selection ─────────────────────────────────────────────
class TestSmartDefaultsKeep: