@@ -39,7 +39,7 @@ class ChaskiStreamer(ChaskiNode):
3939 # ----------------------------------------------------------------------
4040 def __init__ (
4141 self ,
42- destination_folder : str = '.' ,
42+ destination_folder : str = "." ,
4343 chunk_size : int = 8192 ,
4444 file_handling_callback : callable = None ,
4545 allow_incoming_files : bool = False ,
@@ -48,6 +48,8 @@ def __init__(
4848 * args : tuple ,
4949 ** kwargs : dict ,
5050 ):
51+ # Initialize lock for file transfer to prevent concurrent transfers
52+ self ._file_transfer_lock = asyncio .Lock ()
5153 """
5254 Initialize a new instance of ChaskiStreamer.
5355
@@ -86,8 +88,8 @@ def __init__(
8688 self .terminate_stream_flag = False
8789
8890 self .enable_message_propagation ()
89- self .add_propagation_command (' ChaskiMessage' )
90- self .add_propagation_command (' ChaskiStorageRequest' )
91+ self .add_propagation_command (" ChaskiMessage" )
92+ self .add_propagation_command (" ChaskiStorageRequest" )
9193
9294 if persistent_storage :
9395 self .persistent_storage = PersistentStorage ()
@@ -101,7 +103,7 @@ def __repr__(self):
101103 such as the IP address and port. If the instance is a root node, it prepends an
102104 asterisk (*) to the string.
103105 """
104- h = '*' if self .paired else ''
106+ h = "*" if self .paired else ""
105107 return h + self .address
106108
107109 # ----------------------------------------------------------------------
@@ -122,7 +124,7 @@ def address(self) -> str:
122124
123125 # ----------------------------------------------------------------------
124126 @classmethod
125- def get_hash (cls , file : str , algorithm : str = ' sha256' ) -> str :
127+ def get_hash (cls , file : str , algorithm : str = " sha256" ) -> str :
126128 """
127129 Compute the hash of a file using the specified algorithm.
128130
@@ -143,7 +145,7 @@ def get_hash(cls, file: str, algorithm: str = 'sha256') -> str:
143145 The hexadecimal hash digest of the file.
144146 """
145147 hash_func = hashlib .new (algorithm )
146- with open (file , 'rb' ) as f :
148+ with open (file , "rb" ) as f :
147149 while chunk := f .read (8192 ): # Read the file in 8192 byte blocks
148150 hash_func .update (chunk )
149151 return hash_func .hexdigest ()
@@ -188,7 +190,7 @@ def _get_status(self, **kwargs) -> dict:
188190 }
189191
190192 # ----------------------------------------------------------------------
191- async def __aenter__ (self ) -> Generator [' Message' , None , None ]:
193+ async def __aenter__ (self ) -> Generator [" Message" , None , None ]:
192194 """
193195 Enter the asynchronous context for streaming messages.
194196
@@ -208,7 +210,7 @@ async def __aexit__(
208210 self ,
209211 exception_type : type ,
210212 exception_value : BaseException ,
211- exception_traceback : ' TracebackType' ,
213+ exception_traceback : " TracebackType" ,
212214 ) -> None :
213215 """
214216 Exit the runtime context related to this object and stop the streamer.
@@ -250,10 +252,10 @@ async def push(self, topic: str, data: bytes = None) -> None:
250252 data : bytes, optional
251253 The byte-encoded data to be sent with the message. This could be any binary payload that subscribers are expected to process.
252254 """
253- await self ._write (' ChaskiMessage' , data = data , topic = topic )
255+ await self ._write (" ChaskiMessage" , data = data , topic = topic )
254256
255257 # ----------------------------------------------------------------------
256- async def _process_ChaskiMessage (self , message : ' Message' , edge : ' Edge' ) -> None :
258+ async def _process_ChaskiMessage (self , message : " Message" , edge : " Edge" ) -> None :
257259 """
258260 Process an incoming Chaski message and place it onto the message queue.
259261
@@ -307,7 +309,7 @@ def deactivate_file_transfer(self) -> None:
307309 self .allow_incoming_files = False
308310
309311 # ----------------------------------------------------------------------
310- async def message_stream (self ) -> Generator [' Message' , None , None ]:
312+ async def message_stream (self ) -> Generator [" Message" , None , None ]:
311313 """
312314 Asynchronously generate messages from the message queue.
313315
@@ -358,7 +360,7 @@ def terminate_stream(self) -> None:
358360 async def push_file (
359361 self ,
360362 topic : str ,
361- file : ' IOBase' ,
363+ file : " IOBase" ,
362364 filename : str = None ,
363365 data : dict = {},
364366 ):
@@ -386,37 +388,44 @@ async def push_file(
386388 This method uses asynchronous I/O to read the file in chunks and send each chunk
387389 without blocking the event loop. It ensures that the entire file is processed and sent
388390 even if the process involves multiple chunks.
391+
392+ This method uses a lock to ensure only one file transfer happens at a time.
389393 """
390- size = 0
391- # Initialize a SHA-256 hash function for computing the hash digest of the file chunks
392- hash_func = hashlib .new ('sha256' )
393- while True :
394- # Read the next chunk of data from the file up to the specified chunk size
395- chunk = file .read (self .chunk_size )
396- # Increment the size by the length of the current chunk
397- size += len (chunk )
398- # Update the hash function with the current chunk of data.
399- hash_func .update (chunk )
400- # Package the chunked file data along with metadata such as filename, hash, and chunk size
401- package_data = {
402- 'filename' : (filename if filename else os .path .split (file .name )[- 1 ]),
403- 'chunk' : chunk ,
404- 'hash' : hash_func .hexdigest (),
405- 'data' : data ,
406- 'chunk_size' : self .chunk_size ,
407- 'size' : size ,
408- }
409-
410- # Send the chunked file data as a message to the specified topic and yield control to the event loop
411- await self ._write ('ChaskiFile' , data = package_data , topic = topic )
412- await asyncio .sleep (0 ) # very important sleep
413-
414- # If no more chunks are available to read, the file transfer is complete
415- if not chunk :
416- break
394+
395+ # Acquire the lock to ensure only one file transfer at a time
396+ async with self ._file_transfer_lock :
397+ size = 0
398+ # Initialize a SHA-256 hash function for computing the hash digest of the file chunks
399+ hash_func = hashlib .new ("sha256" )
400+
401+ while True :
402+ # Read the next chunk of data from the file up to the specified chunk size
403+ chunk = file .read (self .chunk_size )
404+ # If no more chunks are available to read, the file transfer is complete
405+ if not chunk :
406+ break
407+ # Increment the size by the length of the current chunk
408+ size += len (chunk )
409+ # Update the hash function with the current chunk of data.
410+ hash_func .update (chunk )
411+ # Package the chunked file data along with metadata such as filename, hash, and chunk size
412+ package_data = {
413+ "filename" : (
414+ filename if filename else os .path .split (file .name )[- 1 ]
415+ ),
416+ "chunk" : chunk ,
417+ "hash" : hash_func .hexdigest (),
418+ "data" : data ,
419+ "chunk_size" : self .chunk_size ,
420+ "size" : size ,
421+ }
422+
423+ # Send the chunked file data as a message to the specified topic and yield control to the event loop
424+ await self ._write ("ChaskiFile" , data = package_data , topic = topic )
425+ await asyncio .sleep (0 ) # very important sleep
417426
418427 # ----------------------------------------------------------------------
419- async def _process_ChaskiFile (self , message : ' Message' , edge : ' Edge' ) -> None :
428+ async def _process_ChaskiFile (self , message : " Message" , edge : " Edge" ) -> None :
420429 """
421430 Process an incoming ChaskiFile message and append each chunk of data to the target file.
422431
@@ -444,10 +453,10 @@ async def _process_ChaskiFile(self, message: 'Message', edge: 'Edge') -> None:
444453 return
445454
446455 # Append incoming file chunk data to the target file in append-binary mode
447- if chunk := message .data .pop (' chunk' ):
456+ if chunk := message .data .pop (" chunk" ):
448457 with open (
449- os .path .join (self .destination_folder , message .data [' filename' ]),
450- 'ab' ,
458+ os .path .join (self .destination_folder , message .data [" filename" ]),
459+ "ab" ,
451460 ) as file :
452461 # Write the current chunk to the target file in append-binary mode
453462 file .write (chunk )
@@ -459,7 +468,7 @@ async def _process_ChaskiFile(self, message: 'Message', edge: 'Edge') -> None:
459468 self .file_handling_callback (
460469 ** {
461470 ** message .data ,
462- ' destiny_folder' : self .destination_folder ,
471+ " destiny_folder" : self .destination_folder ,
463472 }
464473 )
465474
0 commit comments