diff --git a/tests/test_executors.py b/tests/test_executors.py index 88197ad..f3481be 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -2,7 +2,7 @@ import concurrent.futures import multiprocessing import unittest - +import time from winloop import _testbase as tb @@ -37,9 +37,52 @@ def test_executors_process_pool_01(self): def test_executors_process_pool_02(self): self.run_pool_test(concurrent.futures.ThreadPoolExecutor) + class TestUVExecutors(_TestExecutors, tb.UVTestCase): - pass + # Only libuv can feasabily do this. + # this was implemented to help combat resource problems + + def test_libuv_threadpool(self): + self.loop.set_default_executor(None) + async def run(): + coros = [] + for i in range(0, 10): + coros.append(self.loop.run_in_executor(None, fib, i)) + res = await asyncio.gather(*coros) + self.assertEqual(res, fib10) + await asyncio.sleep(0.01) + fib10 = [fib(i) for i in range(10)] + self.loop.run_until_complete(run()) + + def test_libuv_threadpool_exception(self): + self.loop.set_default_executor(None) + async def run(): + class TestException(Exception): + pass + + def execption(): + raise TestException("Hello") + + with self.assertRaises(TestException): + await self.loop.run_in_executor(None, execption) + + self.loop.run_until_complete(run()) + + def test_libuv_threadpool_cancellation(self): + self.loop.set_default_executor(None) + + async def run(): + + def eternity(): + time(3600) + + fut = self.loop.run_in_executor(None, eternity) + fut.cancel() + with self.assertRaises(asyncio.CancelledError): + await fut + self.loop.run_until_complete(run()) + class TestAIOExecutors(_TestExecutors, tb.AIOTestCase): diff --git a/winloop/includes/uv.pxd b/winloop/includes/uv.pxd index edd6a38..58cb847 100644 --- a/winloop/includes/uv.pxd +++ b/winloop/includes/uv.pxd @@ -518,3 +518,12 @@ cdef extern from "uv.h" nogil: unsigned int uv_version() int uv_pipe(uv_file fds[2], int read_flags, int write_flags) + + + ctypedef struct uv_work_t: + void* data + + ctypedef void (*uv_work_cb)(uv_work_t* req) + ctypedef void (*uv_after_work_cb)(uv_work_t* req, int status) + int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb); + diff --git a/winloop/loop.pyx b/winloop/loop.pyx index 5f2a7c0..8bd2c8d 100644 --- a/winloop/loop.pyx +++ b/winloop/loop.pyx @@ -128,74 +128,7 @@ cdef inline run_in_context2(context, method, arg1, arg2): Py_DECREF(method) -def list2cmdline(seq): - """ - Translate a sequence of arguments into a command line - string, using the same rules as the MS C runtime: - - 1) Arguments are delimited by white space, which is either a - space or a tab. - - 2) A string surrounded by double quotation marks is - interpreted as a single argument, regardless of white space - contained within. A quoted string can be embedded in an - argument. - - 3) A double quotation mark preceded by a backslash is - interpreted as a literal double quotation mark. - - 4) Backslashes are interpreted literally, unless they - immediately precede a double quotation mark. - - 5) If backslashes immediately precede a double quotation mark, - every pair of backslashes is interpreted as a literal - backslash. If the number of backslashes is odd, the last - backslash escapes the next double quotation mark as - described in rule 3. - """ - - # See - # http://msdn.microsoft.com/en-us/library/17w5ykft.aspx - # or search http://msdn.microsoft.com for - # "Parsing C++ Command-Line Arguments" - result = [] - needquote = False - for arg in map(os.fsdecode, seq): - bs_buf = [] - - # Add a space to separate this argument from the others - if result: - result.append(' ') - - needquote = (" " in arg) or ("\t" in arg) or not arg - if needquote: - result.append('"') - - for c in arg: - if c == '\\': - # Don't know if we need to double yet. - bs_buf.append(c) - elif c == '"': - # Double backslashes. - result.append('\\' * len(bs_buf)*2) - bs_buf = [] - result.append('\\"') - else: - # Normal char - if bs_buf: - result.extend(bs_buf) - bs_buf = [] - result.append(c) - - # Add remaining backslashes, if any. - if bs_buf: - result.extend(bs_buf) - if needquote: - result.extend(bs_buf) - result.append('"') - - return ''.join(result).replace('\\"', '"') # Used for deprecation and removal of `loop.create_datagram_endpoint()`'s # *reuse_address* parameter @@ -2849,12 +2782,17 @@ cdef class Loop: if executor is None: executor = self._default_executor - # Only check when the default executor is being used + # Only check when a default executor is set + # we self._check_default_executor() if executor is None: - executor = cc_ThreadPoolExecutor() - self._default_executor = executor - + # if we do not have a default executor + # we should use libuv's threadpool + # to eliminate some bulkier payloads + # such as when running anything from a + # pure python executor for instance. + return _ExecutorFuture(self, func, args) + return aio_wrap_future(executor.submit(func, *args), loop=self) def set_default_executor(self, executor): @@ -3480,6 +3418,27 @@ class _SyncSocketWriterFuture(aio_Future): self.__remove_writer() aio_Future.cancel(self) +# used when no executor was passed and we want to +# utilize upon libuv's threadpool instead +class _ExecutorFuture(aio_Future): + def __init__(self, loop, func, args) -> None: + aio_Future.__init__(self, loop=loop) + self.__work = UVWork(loop, self, func, args) + + def __remove_worker(self): + if self.__work is not None: + self.__work._cancel() + self.__work = None + + if PY39: + def cancel(self, msg=None): + self.__remove_worker() + aio_Future.cancel(self, msg=msg) + + else: + def cancel(self): + self.__remove_worker() + aio_Future.cancel(self) include "cbhandles.pyx" include "pseudosock.pyx" diff --git a/winloop/request.pxd b/winloop/request.pxd index bafa51d..2185128 100644 --- a/winloop/request.pxd +++ b/winloop/request.pxd @@ -6,3 +6,23 @@ cdef class UVRequest: cdef on_done(self) cdef cancel(self) + + + + + +# Based off concurrent.futures.thread._WorkItem and Future +# this is a backup alternative so that some resources from +# using a default executor are eliminated. +cdef class UVWork(UVRequest): + cdef: + object fut # asyncio.Future[...] + object fn + object args + object result + object exc + + cdef void run(self) noexcept with gil + + + \ No newline at end of file diff --git a/winloop/request.pyx b/winloop/request.pyx index f15103a..d2fe5be 100644 --- a/winloop/request.pyx +++ b/winloop/request.pyx @@ -63,3 +63,85 @@ cdef class UVRequest: else: ex = convert_error(err) self.loop._handle_exception(ex) + + +cdef class UVWork(UVRequest): + cdef cancel(self): + UVRequest.cancel(self) + # if successful do the same on the future object's end. + if not self.fut.cancelled(): + self.fut.cancel() + + # shortcut + def _cancel(self): + return self.cancel() + + cdef void run(self) noexcept with gil: + cdef object result + try: + # if the eventloop fires the task but we cancelled previously + # it's best to try exiting now instead of later... + if self.fut.cancelled(): + return + try: + result = self.fn(*self.args) + except BaseException as exc: + self.exc = exc + else: + self.result = result + except BaseException as ex: + # if anything else fails. We don't have to be a sitting duck + # and let it slide. we can handle it as soon as possible... + self.exc = exc + + def __cinit__(self, Loop loop, object fut, object fn, object args): + self.request = PyMem_RawMalloc(sizeof(uv.uv_work_t)) + if self.request == NULL: + raise MemoryError + + self.loop = loop + self.done = 0 + self.fn = fn + self.args = args + self.result = None + self.exc = None + self.fut = fut + + self.request.data = self + + # UV_EINVAL will never happen because our callback exists... + uv.uv_queue_work(self.loop.uvloop, self.request, __on_work_cb, __on_after_work_cb) + Py_INCREF(self) + + def __dealloc__(self): + if self.request != NULL: + PyMem_RawFree(self.request) + + + +cdef void __on_work_cb(uv.uv_work_t* req) noexcept with gil: + (req.data).run() + +cdef void __on_after_work_cb(uv.uv_work_t* req, int err) noexcept with gil: + cdef object ex + cdef UVWork work = (req.data) + try: + if err == uv.UV_ECANCELED: + if not work.fut.cancelled(): + work.fut.cancel() + + elif err != 0: + ex = convert_error(err) + work.fut.set_exception(ex) + + if work.exc is not None: + work.fut.set_exception(work.exc) + else: + work.fut.set_result(work.result) + except BaseException as e: + work.loop._handle_exception(e) + finally: + work.on_done() + + +