Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions photos-metadata-restore/1_copy_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/env python3
"""
1_copy_files.py - Copy image files to output directory with hash-based naming
"""

import hashlib
import json
import os
import shutil
from pathlib import Path
from typing import List, Dict, Any

from loguru import logger
from alive_progress import alive_bar


def get_all_extensions(directory: Path) -> List[str]:
    """Return every distinct file extension under *directory*, recursively.

    Extensions are lowercased (so ".JPG" and ".jpg" collapse together) and
    returned as a sorted list.
    """
    found = {entry.suffix.lower() for entry in directory.rglob("*") if entry.is_file()}
    return sorted(found)


def get_image_extensions() -> List[str]:
    """Return the recognized image file extensions (lowercase, with dot).

    Order matters only for display; membership tests are the main use.
    """
    common_formats = [
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif",
        ".webp", ".svg", ".ico", ".heic", ".heif",
    ]
    raw_formats = [
        ".raw", ".cr2", ".nef", ".arw", ".dng", ".orf", ".rw2", ".pef", ".srw",
    ]
    return common_formats + raw_formats


def get_video_extensions() -> List[str]:
    """Return the recognized video file extensions (lowercase, with dot)."""
    names = "mp4 avi mov wmv flv webm mkv m4v 3gp ogv mts m2ts ts"
    return [f".{name}" for name in names.split()]


def is_image_file(file_path: Path) -> bool:
    """True when *file_path*'s extension (case-insensitive) is a known image type."""
    extension = file_path.suffix.lower()
    return extension in get_image_extensions()


def calculate_file_hash(file_path: Path) -> str:
    """Return the hex MD5 digest of *file_path*'s contents.

    Read in 4 KiB chunks so arbitrarily large files fit in memory. MD5 is
    used for content-addressed naming/dedup here, not for security.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as handle:
        while chunk := handle.read(4096):
            digest.update(chunk)
    return digest.hexdigest()


def copy_files_with_hash(input_dir: Path, output_dir: Path) -> List[Dict[str, Any]]:
    """Copy every image under *input_dir* into *output_dir*, renamed to <md5><ext>.

    Returns one {"source", "destination", "filename", "hash"} dict per source
    file. Sources with identical content hash to the same destination name:
    the bytes are copied only once (the redundant copy is skipped), but every
    source still gets its own pair entry so the mapping back to originals is
    complete.
    """
    pairs: List[Dict[str, Any]] = []

    # Collect candidates up front so the progress bar has a total.
    image_files = [f for f in input_dir.rglob("*") if f.is_file() and is_image_file(f)]

    logger.info(f"Found {len(image_files)} image files to process")

    with alive_bar(len(image_files), title="Copying files") as bar:
        for file_path in image_files:
            try:
                file_hash = calculate_file_hash(file_path)

                new_filename = f"{file_hash}{file_path.suffix}"
                output_path = output_dir / new_filename

                # Same hash => same content (modulo MD5 collision): the file
                # is already in place, so skip the redundant copy instead of
                # silently overwriting it.
                if output_path.exists():
                    logger.info(f"Duplicate content: {file_path} reuses {new_filename}")
                else:
                    shutil.copy2(file_path, output_path)

                pairs.append({
                    "source": str(file_path),
                    "destination": str(output_path),
                    "filename": new_filename,
                    "hash": file_hash,
                })

            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")
            finally:
                # Advance exactly once per file, success or failure.
                bar()

    return pairs


def main():
    """Copy images from ./input into ./output/images with hash-based names.

    Order of operations matters here:
    1. console logging only (the file sink would live inside ./output);
    2. validate ./input exists BEFORE destroying the previous output tree;
    3. recreate ./output, THEN attach the file log sink — adding it earlier
       would open a log file that shutil.rmtree() immediately deletes.
    """
    # Console sink first; file sink is added after the output tree exists.
    logger.remove()
    logger.add(lambda msg: print(msg, end=""), level="INFO")

    # Setup paths
    input_dir = Path("input")
    output_dir = Path("output")
    images_dir = output_dir / "images"

    # Validate input before wiping any previous results.
    if not input_dir.exists():
        logger.error(f"Input directory {input_dir} does not exist")
        return

    # Start from a clean output tree.
    if output_dir.exists():
        shutil.rmtree(output_dir)
    output_dir.mkdir(parents=True)
    images_dir.mkdir()

    # Safe now: the directory holding the log file will not be removed.
    logger.add("output/copy_files.log", rotation="10 MB", level="INFO")

    logger.info("Starting file copy process")

    # Survey what file types the input actually contains.
    all_extensions = get_all_extensions(input_dir)
    image_extensions = get_image_extensions()
    video_extensions = get_video_extensions()

    logger.info(f"Found {len(all_extensions)} unique file extensions")
    logger.info(f"Image extensions: {image_extensions}")
    logger.info(f"Video extensions: {video_extensions}")

    # Anything that is neither image nor video gets reported (and skipped).
    other_extensions = [ext for ext in all_extensions
                       if ext not in image_extensions and ext not in video_extensions]

    if other_extensions:
        logger.info(f"Non-image, non-video extensions found: {other_extensions}")

    # Copy files
    pairs = copy_files_with_hash(input_dir, images_dir)

    # Persist the source->destination mapping for the later pipeline stages.
    pairs_file = output_dir / "pair.json"
    with open(pairs_file, "w", encoding="utf-8") as f:
        json.dump(pairs, f, indent=2, ensure_ascii=False)

    logger.info(f"Copied {len(pairs)} files to {images_dir}")
    logger.info(f"Pair information saved to {pairs_file}")


# Allow importing this module without triggering the copy pipeline.
if __name__ == "__main__":
    main()
211 changes: 211 additions & 0 deletions photos-metadata-restore/2_filter_missing_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""
2_filter_missing_metadata.py - Check for missing metadata in image files
"""

import json
import os
from pathlib import Path
from typing import Dict, Any, Optional, Tuple

from loguru import logger
from alive_progress import alive_bar
from PIL import Image
from PIL.ExifTags import TAGS
import exifread


def get_exif_datetime(image_path: Path) -> Optional[str]:
    """Return the first EXIF datetime string found in *image_path*, or None.

    Tags are checked in preference order; any read/parse error is logged at
    debug level and treated as "no datetime".
    """
    preferred_tags = (
        'EXIF DateTime',
        'EXIF DateTimeOriginal',
        'EXIF DateTimeDigitized',
        'Image DateTime',
    )

    try:
        with open(image_path, 'rb') as handle:
            tags = exifread.process_file(handle, details=False)

        for tag_name in preferred_tags:
            if tag_name in tags:
                return str(tags[tag_name])

    except Exception as exc:
        logger.debug(f"Error reading EXIF datetime from {image_path}: {exc}")

    return None


def get_exif_location(image_path: Path) -> Optional[Tuple[float, float, Optional[float]]]:
    """Extract GPS position from EXIF as (lat, lon, alt) in decimal degrees.

    Returns None when no usable GPS data is present. Altitude is optional:
    it is None when absent or unparseable, and must never abort an otherwise
    valid latitude/longitude pair.
    """
    try:
        with open(image_path, 'rb') as f:
            tags = exifread.process_file(f, details=False)

        # Check for GPS tags
        gps_latitude = tags.get('GPS GPSLatitude')
        gps_latitude_ref = tags.get('GPS GPSLatitudeRef')
        gps_longitude = tags.get('GPS GPSLongitude')
        gps_longitude_ref = tags.get('GPS GPSLongitudeRef')
        gps_altitude = tags.get('GPS GPSAltitude')

        if gps_latitude and gps_longitude:
            # Convert to decimal degrees
            lat = convert_to_decimal_degrees(gps_latitude, gps_latitude_ref)
            lon = convert_to_decimal_degrees(gps_longitude, gps_longitude_ref)

            # A failed conversion must not masquerade as a valid location.
            if lat is None or lon is None:
                return None

            # exifread altitude tags are typically a single rational value
            # (e.g. "123/1"); float() on the tag object raises TypeError,
            # which previously aborted the whole function and discarded the
            # valid lat/lon. Parse the string form defensively instead.
            alt = None
            if gps_altitude is not None:
                try:
                    alt_str = str(gps_altitude).strip('[]')
                    if '/' in alt_str:
                        numerator, denominator = alt_str.split('/', 1)
                        alt = float(numerator) / float(denominator)
                    else:
                        alt = float(alt_str)
                except Exception:
                    alt = None

            return (lat, lon, alt)

    except Exception as e:
        logger.debug(f"Error reading EXIF GPS from {image_path}: {e}")

    return None


def convert_to_decimal_degrees(coord, ref):
    """Convert a DMS GPS coordinate (exifread tag or its string form) to decimal degrees.

    *coord* stringifies to e.g. "[40, 26, 46]" — but exifread commonly emits
    rational components such as "[40, 26, 4651/100]", on which plain float()
    raises. fractions.Fraction accepts both plain and rational forms, so
    every real-world tag converts. *ref* is the hemisphere ("N"/"S"/"E"/"W");
    south and west become negative. Returns None on any parse failure.
    """
    from fractions import Fraction

    try:
        parts = str(coord).replace('[', '').replace(']', '').split(', ')
        # Fraction handles "40", "40.5", and "4651/100" uniformly.
        degrees = float(Fraction(parts[0]))
        minutes = float(Fraction(parts[1]))
        seconds = float(Fraction(parts[2]))

        decimal = degrees + minutes / 60 + seconds / 3600

        # Apply hemisphere sign (N/S for latitude, E/W for longitude).
        if str(ref) in ('S', 'W'):
            decimal = -decimal

        return decimal

    except Exception as e:
        logger.debug(f"Error converting coordinates: {e}")
        return None


def get_file_creation_time(file_path: Path) -> Optional[str]:
    """Return the file's st_ctime as a string, or None when stat() fails.

    NOTE(review): on Unix st_ctime is the inode *change* time, not creation
    time (on Windows it is creation time) — callers use it only as a
    fallback timestamp, so this is presumably acceptable; confirm intent.
    """
    try:
        return str(file_path.stat().st_ctime)
    except Exception as exc:
        logger.debug(f"Error getting file creation time for {file_path}: {exc}")
        return None


def analyze_image_metadata(image_path: Path) -> Dict[str, Any]:
    """Collect datetime and GPS availability for a single image.

    Returns a dict with "datetime" and "location" sub-dicts (all fields
    default to None/False), plus "has_datetime"/"has_location" flags and the
    list of "metadata_sources" that contributed ("exif" at most once; the
    json_* fields are placeholders filled by a later pipeline stage).
    """
    result: Dict[str, Any] = {
        "datetime": {
            "exif_datetime": None,
            "exif_datetime_original": None,
            "exif_datetime_digitized": None,
            "file_creation_time": None,
            "json_datetime": None,
        },
        "location": {
            "latitude": None,
            "longitude": None,
            "altitude": None,
            "exif_gps": False,
            "json_location": False,
        },
        "has_datetime": False,
        "has_location": False,
        "metadata_sources": [],
    }

    # EXIF timestamp, if any.
    exif_dt = get_exif_datetime(image_path)
    if exif_dt:
        result["datetime"]["exif_datetime"] = exif_dt
        result["has_datetime"] = True
        result["metadata_sources"].append("exif")

    # EXIF GPS position, if any.
    gps = get_exif_location(image_path)
    if gps:
        latitude, longitude, altitude = gps
        location = result["location"]
        location["latitude"] = latitude
        location["longitude"] = longitude
        location["altitude"] = altitude
        location["exif_gps"] = True
        result["has_location"] = True
        if "exif" not in result["metadata_sources"]:
            result["metadata_sources"].append("exif")

    # Filesystem timestamp as a last-resort datetime.
    ctime = get_file_creation_time(image_path)
    if ctime:
        result["datetime"]["file_creation_time"] = ctime
        result["has_datetime"] = True

    return result


def main():
    """Analyze metadata availability for every image in output/images.

    Writes per-file results to output/metadata.json and logs a summary of
    how many images carry datetime / location information.
    """
    # Route logs to both a rotating file and the console.
    logger.remove()
    logger.add("output/filter_metadata.log", rotation="10 MB", level="INFO")
    logger.add(lambda msg: print(msg, end=""), level="INFO")

    # Pipeline paths (produced by the previous copy step).
    output_dir = Path("output")
    images_dir = output_dir / "images"
    metadata_file = output_dir / "metadata.json"

    logger.info("Starting metadata analysis")

    if not images_dir.exists():
        logger.error(f"Images directory {images_dir} does not exist")
        return

    # Only metadata-bearing raster formats are analyzed here.
    analyzable = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif',
                  '.webp', '.heic', '.heif'}
    image_files = [
        entry for entry in images_dir.iterdir()
        if entry.is_file() and entry.suffix.lower() in analyzable
    ]

    logger.info(f"Found {len(image_files)} image files to analyze")

    # filename -> analysis result
    metadata: Dict[str, Any] = {}

    with alive_bar(len(image_files), title="Analyzing metadata") as bar:
        for image_path in image_files:
            try:
                metadata[image_path.name] = analyze_image_metadata(image_path)
            except Exception as e:
                logger.error(f"Error analyzing {image_path}: {e}")
            finally:
                # Tick once per file whether analysis succeeded or not.
                bar()

    # Persist results for the next pipeline stage.
    with open(metadata_file, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)

    # Summary statistics.
    total_images = len(metadata)
    images_with_datetime = sum(1 for m in metadata.values() if m["has_datetime"])
    images_with_location = sum(1 for m in metadata.values() if m["has_location"])

    logger.info(f"Metadata analysis complete")
    logger.info(f"Total images: {total_images}")
    logger.info(f"Images with datetime: {images_with_datetime}")
    logger.info(f"Images with location: {images_with_location}")
    logger.info(f"Metadata saved to {metadata_file}")


# Allow importing this module without triggering the analysis pipeline.
if __name__ == "__main__":
    main()
Loading