-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcopy_folders.py
More file actions
190 lines (148 loc) · 5.83 KB
/
copy_folders.py
File metadata and controls
190 lines (148 loc) · 5.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""
# Copy Folders Script
Description:
- Periodically copies multiple folders between local paths and remote paths using rclone
- Runs copy operations in separate threads for each source/destination pair
- Logs copy operations to individual log files with truncation if logs exceed LOG_MAX_MB
- Gracefully stops all threads on Ctrl+C or termination signal
Requirements:
- Python 3.x
- rclone (configured, e.g., dropbox:)
Usage:
python copy_folders.py
Config File (`folders.conf`):
Each line should define a copy job in the format:
source_path=dest_path|ext1,ext2,...
- The left-hand side (`source_path`) is always the **copy source**.
- The right-hand side (`dest_path`) is always the **copy destination**.
- An optional `|ext1,ext2,...` restricts copying to specific file extensions
(without leading dots). Example: `jpg,png,pdf`.
- If no `|` is provided, all files are copied.
Notes:
- Lines starting with `#` are ignored (comments).
- If a local source path does not exist, it will be skipped.
"""
import os
import time
import subprocess
import signal
import sys
import threading
import logging
# Safe log filename sanitiser
import re
def safe_name(path: str) -> str:
    """Return *path* with every run of filesystem-unfriendly characters collapsed to '_'.

    Used to turn source/destination paths (which may contain spaces, slashes,
    or remote colons) into a single safe log-file name component.
    """
    unsafe = re.compile(r'[^A-Za-z0-9_.-]+')
    return unsafe.sub('_', path)
# Configuration
CONFIG_FILE = "folders.conf"  # one copy job per line: source=dest|ext1,ext2
LOG_DIR = "copy_logs"         # per-job rclone logs are written here
LOG_MAX_MB = 10  # Max log size in MB; logs beyond this are truncated to the tail
COPY_INTERVAL = 60  # Seconds between rclone copy runs
threads = []  # worker threads, one per configured copy job
# Ensure log directory exists
os.makedirs(LOG_DIR, exist_ok=True)
# Helper: Truncate log file if too large
def check_log_size(logfile, max_mb=None):
    """Trim *logfile* in place so it never grows past a size cap.

    When the file exceeds ``max_mb`` megabytes, only the most recent
    ``max_mb`` MB of data (the tail) are kept; otherwise the file is
    left untouched.

    Args:
        logfile: Path to the log file to inspect.
        max_mb: Size cap in megabytes. Defaults to the module-wide
            LOG_MAX_MB when None, preserving the original behavior.
    """
    if max_mb is None:
        max_mb = LOG_MAX_MB
    limit = max_mb * 1024 * 1024  # compute the byte cap once
    if os.path.exists(logfile) and os.path.getsize(logfile) > limit:
        # Keep only the tail: seek back `limit` bytes from EOF, read the
        # remainder, then rewrite the file with just that data.
        with open(logfile, 'rb') as f:
            f.seek(-limit, os.SEEK_END)
            tail = f.read()
        with open(logfile, 'wb') as f:
            f.write(tail)
# Function to periodically copy a folder
def copy_folder(source_path, dest_path, include_exts=None):
    """Endlessly copy source_path -> dest_path with rclone every COPY_INTERVAL seconds.

    Intended to run in its own daemon thread. Each rclone run's output is
    streamed line-by-line into a per-job log file under LOG_DIR and echoed
    to the console.

    Args:
        source_path: rclone copy source (local path or remote like ``dropbox:/x``).
        dest_path: rclone copy destination.
        include_exts: Optional list of file extensions (without dots) to
            restrict the copy to; None/empty copies everything.
    """
    include_exts = include_exts or []  # default: copy everything
    # Use safe_name to handle spaces safely in log filename
    safe_source = safe_name(source_path)
    safe_dest = safe_name(dest_path)
    log_file = os.path.join(LOG_DIR, f"rclone_{safe_source}_TO_{safe_dest}.log")
    # BUG FIX: the previous logging.basicConfig(...) call only configures the
    # root logger on its *first* invocation; every thread after the first got
    # no handler of its own and all jobs wrote into the first job's log file.
    # Build a dedicated per-job logger with its own handlers instead.
    logger = logging.getLogger(f"{safe_source}->{safe_dest}")
    if not logger.handlers:  # guard: don't stack handlers if called twice
        fmt = logging.Formatter("%(asctime)s - %(message)s")
        for handler in (logging.FileHandler(log_file), logging.StreamHandler()):
            handler.setFormatter(fmt)
            logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep per-job records out of the root logger
    logger.info(f"Started monitoring: SOURCE={source_path} DEST={dest_path} EXTENSIONS={include_exts or 'ALL'}")
    while True:
        logger.info(f"Starting copy cycle: SOURCE={source_path} DEST={dest_path}")
        try:
            # Base rclone command
            cmd = [
                "rclone", "copy",
                "-v",
                "--stats=5s",
                "--stats-one-line",
                "--retries", "10",
                "--timeout", "30s",
                "--ignore-checksum",
            ]
            # Add include filters if needed
            for ext in include_exts:
                cmd.extend(["--include", f"*.{ext}"])
            cmd.extend([source_path, dest_path])
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,  # line-buffered so rclone output streams live
            )
            # Relay rclone's merged stdout/stderr into our log as it arrives.
            for line in proc.stdout:
                if line.strip():
                    logger.info(line.rstrip())
            ret = proc.wait()
            # NOTE(review): check_log_size truncates the file while our
            # FileHandler still holds it open with its old write offset, so
            # the file can be sparsely re-extended afterwards; a
            # logging.handlers.RotatingFileHandler would be the robust fix.
            check_log_size(log_file)
            if ret == 0:
                logger.info(f"Copy successful: SOURCE={source_path} DEST={dest_path}")
            else:
                logger.error(f"Copy failed: SOURCE={source_path} DEST={dest_path} (exit {ret})")
        except Exception as e:
            # Best-effort loop: log and keep trying on the next cycle.
            logger.error(f"Error during copy: {e}")
        logger.info(f"Waiting {COPY_INTERVAL} seconds until next copy cycle")
        time.sleep(COPY_INTERVAL)
# Cleanup function for graceful shutdown
def cleanup(signum, frame):
    """Signal handler: announce shutdown and exit the process.

    The copy workers are daemon threads, so interpreter exit ends them.
    """
    print("Caught interrupt. Stopping all folder copy threads...")
    raise SystemExit(0)
# Register signal handlers so Ctrl+C / SIGTERM trigger a clean exit
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
# Read config and start copy threads
# Use rstrip("\n") instead of strip() to preserve spaces in paths.
if not os.path.exists(CONFIG_FILE):
    print(f"Config file {CONFIG_FILE} not found. Please create it with source=dest|ext1,ext2 pairs.")
    sys.exit(1)
with open(CONFIG_FILE, 'r') as f:
    for line in f:
        line = line.rstrip("\n")  # keep interior spaces in paths (not .strip())
        # Skip blanks, comment lines, and lines without a source=dest separator
        if not line or line.lstrip().startswith("#") or '=' not in line:
            continue
        # Split into main part and optional extensions
        path_part, *ext_part = line.split("|", 1)
        source_path, dest_path = path_part.split("=", 1)
        # Preserve internal spaces, only trim external accidental whitespace
        source_path = source_path.strip()
        dest_path = dest_path.strip()
        include_exts = []
        if ext_part:
            # Normalize extensions: trim, lowercase, drop empties
            include_exts = [e.strip().lower() for e in ext_part[0].split(",") if e.strip()]
        # Skip missing local sources (but allow remotes like dropbox:/path)
        # NOTE(review): the ":" test treats any colon as "remote", which also
        # matches Windows drive paths like C:\data — confirm target platform.
        if not os.path.exists(source_path) and ":" not in source_path:
            print(f"Source path {source_path} does not exist. Skipping.")
            continue
        print(f"Monitoring: SOURCE={source_path} DEST={dest_path} EXTENSIONS={include_exts or 'ALL'}")
        # Start a thread for each copy job
        thread = threading.Thread(target=copy_folder, args=(source_path, dest_path, include_exts))
        thread.daemon = True  # daemon: don't block interpreter exit
        thread.start()
        threads.append(thread)
# Keep the script running; signal handlers fire in this main thread
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    cleanup(None, None)