-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgcs_sycn.py
More file actions
82 lines (64 loc) · 2.47 KB
/
gcs_sycn.py
File metadata and controls
82 lines (64 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# A simple Google Cloud Storage sync application, similar to Dropbox. When a file in the
# watched folder is created, modified, or deleted, the change is synced to Google Cloud
# Storage, mimicking the Dropbox sync feature.
import os
import time

from google.cloud import storage
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
# Path to the service-account JSON key file handed to the GCS client.
json_key = "./key.json"
# Directory that the observer is scheduled to watch.
sync_folder = "." # current folder
class Storage:
    """Small wrapper around one Google Cloud Storage bucket.

    Args:
        bucket_name: Name of the bucket to operate on. Defaults to the
            bucket this script was originally hard-wired to.
        key_path: Path to the service-account JSON key. When ``None`` the
            module-level ``json_key`` is used.

    Raises:
        Exception: Any error raised while creating the client or fetching
            the bucket is printed and then re-raised unchanged.
    """

    def __init__(self, bucket_name="vj_test_report", key_path=None):
        if key_path is None:
            key_path = json_key
        try:
            self.storage_client = storage.Client.from_service_account_json(key_path)
            self.bucket = self.storage_client.get_bucket(bucket_name)
        except Exception as error:
            print(f"There is some error has occurred: {error}")
            # Bare ``raise`` re-raises the active exception as-is.
            raise

    def upload_file(self, file_name, file_path, is_update=False):
        """Upload the local file at *file_path* as the object *file_name*.

        Args:
            file_name: Object name to write inside the bucket.
            file_path: Path of the local file whose contents are uploaded.
            is_update: Only selects which confirmation message is printed
                ("Updated" instead of "Uploaded").
        """
        blob = self.bucket.blob(file_name)
        blob.upload_from_filename(file_path)
        if is_update:
            print(f"Updated the {file_name}")
        else:
            print(f"Uploaded the {file_name}")

    def delete_file(self, file_name):
        """Delete the object named *file_name* from the bucket."""
        blob = self.bucket.blob(file_name)
        blob.delete()
        print(f"File removed {file_name}")
class MyCustomFileHandler(FileSystemEventHandler):
    """Mirror file events from the watched folder into the storage bucket.

    Created and modified files are uploaded, deleted files are removed, and
    a moved file is removed under its old name and uploaded under its new
    one. Directory events are ignored because only files can be uploaded.

    NOTE: the object name is the file's final path component only, so files
    with the same name in different sub-folders map to the same object.
    """

    # Created on first use and then reused, so each event does not build a
    # new client and look the bucket up again.
    _storage = None

    def _get_storage(self):
        """Return this handler's ``Storage``, creating it on first use."""
        if self._storage is None:
            self._storage = Storage()
        return self._storage

    @staticmethod
    def _blob_name(path):
        """Return the object name for *path*: its final path component.

        ``os.path.basename`` honours the running platform's separators,
        unlike splitting on a literal backslash.
        """
        return os.path.basename(path)

    def on_created(self, event):
        """Upload a newly created file."""
        if event.is_directory:
            return
        file_path = event.src_path
        file_name = self._blob_name(file_path)
        print(f"A new file created: {file_name}")
        self._get_storage().upload_file(file_name=file_name,
                                        file_path=file_path)

    def on_deleted(self, event):
        """Remove the object that corresponds to a deleted file."""
        if event.is_directory:
            return
        file_name = self._blob_name(event.src_path)
        print(f"A file deleted: {event.src_path}")
        self._get_storage().delete_file(file_name)

    def on_modified(self, event):
        """Re-upload a file whose contents changed."""
        if event.is_directory:
            return
        file_path = event.src_path
        file_name = self._blob_name(file_path)
        print(f"A File updated: {file_name}")
        self._get_storage().upload_file(file_name=file_name,
                                        file_path=file_path,
                                        is_update=True)

    def on_moved(self, event):
        """Replace the object for the old path with one for the new path."""
        if event.is_directory:
            return
        old_name = self._blob_name(event.src_path)
        new_name = self._blob_name(event.dest_path)
        print(f"A file moved: {event.src_path} -> {event.dest_path}")
        store = self._get_storage()
        # Delete first: when only the directory changed, both names are
        # equal and deleting afterwards would remove the fresh upload.
        store.delete_file(old_name)
        store.upload_file(file_name=new_name,
                          file_path=event.dest_path)
# Script entry point: watch ``sync_folder`` recursively until interrupted.
if __name__ == "__main__":
    event_handler = MyCustomFileHandler()
    observer = Observer()
    observer.schedule(event_handler, sync_folder, recursive=True)
    observer.start()
    try:
        # Keep the main thread alive; the observer runs in its own thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C raises KeyboardInterrupt, which ``except Exception`` does
        # not catch; treat it as the normal way to stop the script.
        pass
    finally:
        # Always stop the observer and wait for its thread to finish.
        observer.stop()
        observer.join()