100 changes: 100 additions & 0 deletions README.md
@@ -312,6 +312,106 @@ will spawn a number of threads in order to optimize performance for
iRODS server versions 4.2.9+ and file sizes larger than a default
threshold value of 32 Megabytes.

Because multithreaded processes under Unix-type operating systems sometimes
need special handling, it is recommended that any put or get of a large file
be prepared for the case in which a terminating signal aborts the transfer:

```python
from signal import signal, SIGTERM

from irods.parallel import abort_parallel_transfers

def handler(signum, _):
    abort_parallel_transfers()

signal(SIGTERM, handler)

try:
    # A multi-1247 put or get can leave non-daemon threads running if not treated with care.
    session.data_objects.put(...)
except KeyboardInterrupt:
    # Internally, the library has likely already started a shutdown for the
    # present put operation, but we can non-destructively issue the following
    # call to ensure any other ongoing transfers are aborted, prior to re-raising:
    abort_parallel_transfers()

    print('Due to a SIGINT or Control-C, the put failed.')
    # Re-raise, as is customary when catching an exception derived directly from BaseException.
    raise
except RuntimeError:
    print('The put failed.')
    # ...
```

In general, an application that wants to gracefully handle the interruption
of a lengthy data transfer to or from an iRODS data object should anticipate
Control-C by handling both `KeyboardInterrupt` and `RuntimeError`, as shown
above.

Of course, had we also intercepted SIGINT (that is, assigned it a custom
handler), we could avoid the need to handle `KeyboardInterrupt` in an
exception clause.
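
For example (a minimal sketch only, reusing the `session` from the example
above; `logical_path` stands in for a hypothetical destination path), trapping
SIGINT ourselves leaves `RuntimeError` as the only exception the application
needs to catch:

```python
from signal import signal, SIGINT

from irods.parallel import abort_parallel_transfers

def handler(signum, _):
    # With SIGINT trapped here, a Control-C no longer surfaces as KeyboardInterrupt.
    abort_parallel_transfers()

signal(SIGINT, handler)

try:
    session.data_objects.put('my_large_file', logical_path)  # logical_path is hypothetical
except RuntimeError:
    print('The put was aborted (e.g. by a Control-C) or otherwise failed.')
```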

Internally, of course, the PRC must handle all eventualities, including
`KeyboardInterrupt`, by closing down the transfer currently being set up or
waited on, in consideration of very simple applications that do no signal or
exception handling of their own. Otherwise, some non-daemon threads might
never finish, which could prevent a prompt and orderly exit from the main
program.

When a signal or exception handler calls `abort_parallel_transfers()`, all
parallel transfers are aborted immediately. Upon return to the normal flow
of the main program, the affected transfers will raise `RuntimeError` to
indicate the PUT or GET operation has failed.

Note that `abort_parallel_transfers()` is designed to be safe for inclusion
in signal handlers (e.g. it may be called several times without detrimental
effects); and that it returns promptly after having initiated the shutdown of
the data transfer threads, rather than waiting for them to terminate first.
This follows the best practice of minimizing time spent in signal handlers.
However, if desired, `abort_parallel_transfers()` may subsequently be called,
repeatedly if need be, with `dry_run=True` to track the progress of the
shutdown. The object returned by default (a dictionary whose keys are weak
references to the thread managers) will have a boolean value of `False` once
all transfer threads have exited.
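
As a rough sketch of that pattern (assuming only the `dry_run` behavior just
described), a program that has already initiated an abort could later poll
for completion of the shutdown:

```python
import time

from irods.parallel import abort_parallel_transfers

# Initiate the abort (e.g. from a signal or exception handler); this returns
# promptly, without waiting for the transfer threads to exit.
abort_parallel_transfers()

# Later, back in the normal flow of the program, poll non-destructively. The
# returned mapping (keyed by weak references to the thread managers) evaluates
# as False once all transfer threads have exited.
while abort_parallel_transfers(dry_run=True):
    time.sleep(0.1)
```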

The following example shows how to abort all synchronous ("foreground") puts
while leaving background transfers alone:

```python
import irods.helpers, threading
from irods.parallel import abort_parallel_transfers, FILTER_FUNCTIONS, io_main, Oper

session = irods.helpers.make_session()
hc = irods.helpers.home_collection(session)

sessions_for_threads = [session.clone() for _ in range(3)]

# Launch an asynchronous (i.e. "background") transfer.
io_main(session, f'{hc}/target_0', Oper.PUT|Oper.NONBLOCKING, 'my_large_file')

Threads = [
    threading.Thread(
        target=lambda _sess, put_args: _sess.data_objects.put(*put_args, num_threads=2),
        args=(sess, ["my_large_file", f"{hc}/target_" + str(i + 1)]),
    )
    for i, sess in enumerate(sessions_for_threads)
]

try:
    # Launch transfer threads.
    for t in Threads:
        t.start()
    # Wait on transfer threads.
    for t in Threads:
        t.join()
except KeyboardInterrupt:
    # Trapping Control-C, stop the transfers launched synchronously, i.e. in the
    # foreground of each transfer thread explicitly launched above.
    abort_parallel_transfers(filter_function=FILTER_FUNCTIONS.foreground)

# [...]
# As the main application continues (or exits), the asynchronous transfer will be the only one to
# finish successfully after this call to `abort_parallel_transfers`. Note: Although care is taken
# not to leave replicas in a locked state, these relics of unfinished transfers can still remain
# in the object catalog.
```

Progress bars
-------------

56 changes: 35 additions & 21 deletions irods/manager/data_object_manager.py
@@ -132,11 +132,13 @@ def __init__(self, *a, **kwd):
         super(ManagedBufferedRandom, self).__init__(*a, **kwd)
         import irods.session
 
+        self.do_close = True
+
         with irods.session._fds_lock:
             irods.session._fds[self] = None
 
     def __del__(self):
-        if not self.closed:
+        if self.do_close and not self.closed:
             self.close()
         call___del__if_exists(super(ManagedBufferedRandom, self))
 
@@ -245,15 +247,21 @@ def _download(self, obj, local_path, num_threads, updatables=(), **options):
             if self.should_parallelize_transfer(
                 num_threads, o, open_options=options.items()
             ):
-                if not self.parallel_get(
-                    (obj, o),
-                    local_file,
-                    num_threads=num_threads,
-                    target_resource_name=options.get(kw.RESC_NAME_KW, ""),
-                    data_open_returned_values=data_open_returned_values_,
-                    updatables=updatables,
-                ):
-                    raise RuntimeError("parallel get failed")
+                error = RuntimeError("parallel get failed")
+                try:
+                    if not self.parallel_get(
+                        (obj, o),
+                        local_file,
+                        num_threads=num_threads,
+                        target_resource_name=options.get(kw.RESC_NAME_KW, ""),
+                        data_open_returned_values=data_open_returned_values_,
+                        updatables=updatables,
+                    ):
+                        raise error
+                except ex.iRODSException as e:
+                    raise e
+                except BaseException as e:
+                    raise error from e
             else:
                 with open(local_file, "wb") as f:
                     for chunk in chunks(o, self.READ_BUFFER_SIZE):
@@ -353,17 +361,23 @@ def put(
             ):
                 o = deferred_call(self.open, (obj, "w"), options)
                 f.close()
-                if not self.parallel_put(
-                    local_path,
-                    (obj, o),
-                    total_bytes=sizelist[0],
-                    num_threads=num_threads,
-                    target_resource_name=options.get(kw.RESC_NAME_KW, "")
-                    or options.get(kw.DEST_RESC_NAME_KW, ""),
-                    open_options=options,
-                    updatables=updatables,
-                ):
-                    raise RuntimeError("parallel put failed")
+                error = RuntimeError("parallel put failed")
+                try:
+                    if not self.parallel_put(
+                        local_path,
+                        (obj, o),
+                        total_bytes=sizelist[0],
+                        num_threads=num_threads,
+                        target_resource_name=options.get(kw.RESC_NAME_KW, "")
+                        or options.get(kw.DEST_RESC_NAME_KW, ""),
+                        open_options=options,
+                        updatables=updatables,
+                    ):
+                        raise error
+                except ex.iRODSException as e:
+                    raise e
+                except BaseException as e:
+                    raise error from e
             else:
                 with self.open(obj, "w", **options) as o:
                     # Set operation type to trigger acPostProcForPut