feat: Adding support for python 3.13 #1666
Conversation
…into gaaguiar/proxy_worker
* build: recognize collection_model_binding_data for batch inputs (#1655)
  * add cmbd
  * Add
  * Add
  * Rm newline
  * Add tests
  * Fix cmbd
  * Fix test
  * Lint
  * Rm
  * Rm
  * Add back newline
  * rm ws
  * Rm list
  * Rm cmbd from cache
  * Avoid caching
  * Keep cmbd check
  * Add comment
  * Lint

  Co-authored-by: Evan Roman <evanroman@microsoft.com>
  Co-authored-by: hallvictoria <59299039+hallvictoria@users.noreply.github.com>
* build: update Python Worker Version to 4.36.1 (#1660)
  Co-authored-by: AzureFunctionsPython <funcdisc@microsoft.com>
* initial changes
* Update Python SDK Version to 1.23.0 (#1663)
  Co-authored-by: AzureFunctionsPython <azfunc@microsoft.com>
* merges from ADO
* merge fixes
* merge fixes
* merge fixes
* merge fixes
* don't run 313 unit tests yet
* changes for builds

Co-authored-by: Evan <66287338+EvanR-Dev@users.noreply.github.com>
Co-authored-by: Evan Roman <evanroman@microsoft.com>
Co-authored-by: AzureFunctionsPython <funcdisc@microsoft.com>
Co-authored-by: AzureFunctionsPython <azfunc@microsoft.com>
```python
@unittest.skipIf(sys.version_info.minor < 13, "For python 3.13+,"
                                              "this logic is in the"
                                              "library worker.")
@unittest.skipIf(sys.version_info.minor >= 13, "For python 3.13+,"
```
Use a constant to define the supported Python version, to minimize changes in the future.
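A sketch of that suggestion; the constant name is an assumption, not something the PR defines:

```python
import sys
import unittest

# Hypothetical constant name; the diff itself hardcodes the number 13.
MIN_PROXY_WORKER_MINOR = 13


class TestLegacyDispatcher(unittest.TestCase):
    @unittest.skipIf(sys.version_info.minor >= MIN_PROXY_WORKER_MINOR,
                     "For python 3.13+, this logic is in the library worker.")
    def test_legacy_path(self):
        # Placeholder body; the real tests exercise azure_functions_worker.
        self.assertTrue(True)
```

When 3.14 support lands, only the constant changes instead of every decorator.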
```shell
# Extract minor version as an integer
PY_MINOR="${PY_VER#*.}"

if [ "$PY_MINOR" -ge 13 ]; then
```
NIT: Consider having a file that contains the supported Python versions and is used by both the ci-unit-tests and ci-e2e-tests.yml files.
```python
@unittest.skipIf(sys.version_info.minor < 13, "For python 3.13+,"
                                              "this logic is in the"
                                              "library worker.")
@unittest.skipIf(sys.version_info.minor >= 13, "For python 3.13+,"
```
[Checking] You are skipping tests for 3.13; is that intended?
```yaml
    workerPath: 'python/prodV4/worker.py'
  Python313V4:
    pythonVersion: '3.13'
    workerPath: 'python/proxyV4/worker.py'
```
Can the workerPath be a variable at the top?
```yaml
    EVENTGRID_URI: $(LinuxEventGridTopicUriString312)
    EVENTGRID_CONNECTION: $(LinuxEventGridConnectionKeyString312)
  Python313:
    PYTHON_VERSION: '3.13'
```
E2E tests are re-using 312 resources for now until the appropriate 313 resources are created (cosmosdb, sql, eventgrid).
For this: is there a helper script to smooth the process of creation? If not, please add it to the 3.13 backlog.
```diff
@@ -100,6 +106,9 @@ jobs:
   Python312V4:
```
In the long run we should simplify these blocks; the YAML is unnecessarily long.
```yaml
        Write-Host "##vso[task.setvariable variable=skipTest;]false"
      }
    displayName: 'Set skipTest variable'
    condition: or(eq(variables.isSdkRelease, true), eq(variables['USETESTPYTHONSDK'], true))
```
```shell
if [[ $version_minor -lt 13 ]]; then
    cp -r azure_functions_worker/protos "$BUILD_SOURCESDIRECTORY/deps/azure_functions_worker"
else
    cp -r proxy_worker/protos "$BUILD_SOURCESDIRECTORY/deps/proxy_worker"
```
In the long run the name shouldn't change; can that be done in this iteration itself?
```python
    UNIX_SHARED_MEMORY_DIRECTORIES,
)
from azure_functions_worker.utils.common import get_app_setting, is_envvar_true
if sys.version_info.minor < 13:
```
I really don't like this :( Better to create a 3.13-specific util for now.
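One way to avoid scattering the version check across import sites is to decide the utils module path once; the module paths below are illustrative, not the repo's actual layout:

```python
import sys

# Illustrative dotted paths; the real names would come from the worker layout.
_LEGACY_UTILS = "azure_functions_worker.utils.common"
_PROXY_UTILS = "proxy_worker.utils.common"


def select_utils_module(version_info=sys.version_info):
    """Pick the utils module path once, instead of guarding each import site."""
    return _PROXY_UTILS if tuple(version_info[:2]) >= (3, 13) else _LEGACY_UTILS
```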
```diff
 subprocess.run(
-    [sys.executable, '-m', 'flake8', '--config', str(config_path)],
+    [sys.executable, '-m', 'flake8', '--config', str(config_path),
+     'azure_functions_worker',],
```
Would it not "flake" other folders?
```python
if __name__ == '__main__':
    add_script_root_to_sys_path()
    main.main()

minor_version = sys.version_info[1]
```
We wanted to get away from the whole minor-version check; please create a different test folder if needed.
```python
# third-party user packages over worker packages in PYTHONPATH
user_pkg_paths = determine_user_pkg_paths()
joined_pkg_paths = os.pathsep.join(user_pkg_paths)
env['PYTHONPATH'] = f'{joined_pkg_paths}:{func_worker_dir}'
```
This needs a documentation update
```python
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

VERSION = "4.36.1"
```
Any specific reason it should be in a Python file, or could it live in pyproject.toml itself?
```python
from azure_functions_worker import protos
from azure_functions_worker.bindings import datumdef, meta
if sys.version_info.minor < 13:
```
```python
logger.info("Args: %s", args)
logger.info('Starting proxy worker.')
logger.info('Worker ID: %s, Request ID: %s, Host Address: %s:%s',
            args.worker_id, args.request_id, args.host, args.port)
```
```python
    return getattr(_invocation_id_local, 'invocation_id', None)


class AsyncLoggingHandler(logging.Handler):
```
Exploration for later - to move to logging
```python
    return int(max_workers) if max_workers else None


async def _handle__worker_init_request(self, request):
    logger.info('Received WorkerInitRequest, '
```
the new logs added are intended to help confirm that the library worker is receiving requests appropriately.
No new info logs; make them debug. Adding a log during cold start should be fine, but only after Evan's validation.
```python
global _library_worker
directory = request.worker_init_request.function_app_directory
v2_directory = os.path.join(directory, get_script_file_name())
```
Rename: make it explicit that it's the function_app entry file for the v2 model.
```python
v2_directory = os.path.join(directory, get_script_file_name())
if os.path.exists(v2_directory):
    try:
        import azure_functions_worker_v2  # NoQA
```
Explore importing this into a separate variable above, e.g.:
`x = importlib.import_module('azure_functions_worker_v2')`
and then call `x.handle_dispatch_init` in your code.
Decide if you want to run importlib again based on the return of `DependencyManager.prioritize_customer_dependencies(directory)`, if it tells you that the customer did bring in a runtime in reqs.txt.
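The importlib pattern described above could be sketched like this; the re-import-after-prioritization behavior is an assumption from the comment, not the PR's implementation:

```python
import importlib
import sys


def load_library_worker(module_name):
    """Import (or re-import) the library worker and return the module object.

    Dropping any cached copy first means that, once customer dependencies
    have been prioritized on sys.path, a runtime pinned in requirements.txt
    wins over the bundled one.
    """
    sys.modules.pop(module_name, None)
    return importlib.import_module(module_name)
```

In the dispatcher this might be called with `'azure_functions_worker_v2'` after `prioritize_customer_dependencies` reports that the customer shipped their own runtime.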
```python
def on_logging(self, record: logging.LogRecord,
               formatted_msg: str) -> None:
    if record.levelno >= logging.CRITICAL:
```
That's a long if-elif chain to map log levels; consider defining a map to clean up the code.
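A sketch of the suggested level map; the mapped level names here are assumed for illustration, the real values would come from the RPC protos:

```python
import logging

# Hypothetical target level names; the real ones come from the protos.
_LEVEL_TO_RPC = {
    logging.CRITICAL: "Critical",
    logging.ERROR: "Error",
    logging.WARNING: "Warning",
    logging.INFO: "Information",
    logging.DEBUG: "Debug",
}


def to_rpc_level(levelno: int) -> str:
    """Return the mapping for the highest threshold at or below levelno."""
    for threshold in sorted(_LEVEL_TO_RPC, reverse=True):
        if levelno >= threshold:
            return _LEVEL_TO_RPC[threshold]
    return "Trace"
```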
```python
async def _dispatch_grpc_request(self, request):
    content_type = request.WhichOneof("content")

    match content_type:
```
Instead of if-else, use a dictionary
So, something like:
`handler = handlers[content_type]`
where handlers is a dict mapping `<content_type: str>` to `<handler function>`.
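A minimal sketch of that dictionary-based dispatch; the handler names follow the gRPC request field names visible in the diff, and the message shape is simplified to plain strings:

```python
import asyncio


class Dispatcher:
    """Table-driven dispatch instead of a match/if-elif chain."""

    def __init__(self):
        # Map each WhichOneof("content") value to its coroutine handler.
        self._handlers = {
            "worker_init_request": self._handle_worker_init_request,
            "invocation_request": self._handle_invocation_request,
        }

    async def dispatch_grpc_request(self, content_type, request):
        handler = self._handlers.get(content_type)
        if handler is None:
            raise ValueError(f"unknown content type: {content_type!r}")
        return await handler(request)

    async def _handle_worker_init_request(self, request):
        return f"init:{request}"

    async def _handle_invocation_request(self, request):
        return f"invoke:{request}"
```

Adding a new request type then only touches the table, not the dispatch logic.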
```python
self._grpc_resp_queue.put_nowait(self._GRPC_STOP_RESPONSE)
self._grpc_thread.join()
self._grpc_thread = None
```
[Blocker] You are joining the thread and cleaning up; an exception from these could leave resources open (or in an undefined state). I would encourage using try/except/finally to ensure the cleanup runs.
> "which could leave resources open (or in an undefined state)"

This is already at the end of the worker's lifetime, and we can discard any state.
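For reference, a try/finally shutdown along the lines the blocker asks for could look like this; class and sentinel names are stand-ins, not the PR's:

```python
import queue
import threading

_STOP = object()  # stand-in for the PR's _GRPC_STOP_RESPONSE sentinel


class GrpcResponder:
    """Response thread whose stop() cleans up even if join() raises."""

    def __init__(self):
        self._resp_queue = queue.Queue()
        self._thread = threading.Thread(target=self._drain, daemon=True)
        self._thread.start()

    def _drain(self):
        # Consume responses until the stop sentinel arrives.
        while self._resp_queue.get() is not _STOP:
            pass

    def stop(self):
        if self._thread is None:
            return
        try:
            self._resp_queue.put_nowait(_STOP)
            self._thread.join()
        finally:
            # Reset the reference even on failure, so stop() stays idempotent.
            self._thread = None
```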
```python
try:
    forever = self._loop.create_future()

    self._grpc_resp_queue.put_nowait(
```
[Checking, unaware of Python data structures] If multiple threads attempt to put into the queue, is that thread-safe? We use something called BlockingQueue or ConcurrentQueue in Java.
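To the question: yes, Python's `queue.Queue` is internally locked and safe for multiple producers, much like Java's BlockingQueue. A small demonstration:

```python
import queue
import threading

# queue.Queue is the stdlib thread-safe FIFO; put_nowait()/get() may be
# called from many threads concurrently without extra synchronization.
q = queue.Queue()


def producer(worker_id):
    for i in range(100):
        q.put_nowait((worker_id, i))


threads = [threading.Thread(target=producer, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```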
```python
def stop(self) -> None:
    if self._grpc_thread is not None:
        self._grpc_resp_queue.put_nowait(self._GRPC_STOP_RESPONSE)
```
Also, you should consider having a lock; if two threads access this code simultaneously, it could lead to a race condition.
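A sketch of a lock-guarded stop(), with the gRPC specifics elided and the class name invented for illustration:

```python
import threading


class StoppableChannel:
    """Serialize stop() with a lock so concurrent callers cannot race."""

    def __init__(self):
        self._stop_lock = threading.Lock()
        self._stopped = False

    def stop(self) -> bool:
        with self._stop_lock:
            if self._stopped:
                return False  # another caller already stopped the channel
            # ... enqueue the stop sentinel and join the gRPC thread here ...
            self._stopped = True
            return True
```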
```python
try:
    for req in grpc_req_stream:
        self._loop.call_soon_threadsafe(
            self._loop.create_task, self._dispatch_grpc_request(req))
```
[Checking] Is _dispatch_grpc_request itself thread-safe? You might need to synchronize it.
```python
if logger and handler:
    handler.flush()
    logger.removeHandler(handler)
```
[Checking] Should you also close the handler?
…ons-python-worker into gaaguiar/proxy_worker
```python
logger.info("Args: %s", args)
logger.info(
    'Starting proxy worker. Worker ID: %s, Request ID: %s, Host Address: %s:%s',
    args.worker_id, args.request_id, args.host, args.port)
```
```python
cls._add_to_sys_path(cls.cx_deps_path, True)
cls._add_to_sys_path(working_directory, False)

logger.info(f'Finished prioritize_customer_dependencies: {sys.path}')
```
Please convert to debug (when GA)
```python
cls._add_to_sys_path(cls.worker_deps_path, True)
cls._add_to_sys_path(cls.cx_deps_path, True)
cls._add_to_sys_path(working_directory, False)
```
Can adding to first or last be more explicit and readable?
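One more explicit variant, sketched with a hypothetical rename of the boolean flag to a named position:

```python
import sys


def add_to_sys_path(path: str, position: str) -> None:
    """Sketch: 'first' / 'last' instead of an add_to_first boolean."""
    if position not in ("first", "last"):
        raise ValueError(position)
    if path in sys.path:
        sys.path.remove(path)  # avoid duplicates before re-inserting
    if position == "first":
        sys.path.insert(0, path)
    else:
        sys.path.append(path)
```

Call sites then read as `add_to_sys_path(cx_deps_path, "first")`, which documents the precedence decision inline.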
```python
logger.info(f'Finished prioritize_customer_dependencies: {sys.path}')


@classmethod
def _add_to_sys_path(cls, path: str, add_to_first: bool):
```
Move private methods above public methods.
```python
@staticmethod
def _get_cx_deps_path() -> str:
    """Get the directory storing the customer's third-party libraries.

    Returns
    -------
    str
        Core Tools: path to customer's site packages
        Linux Dedicated/Premium: path to customer's site packages
        Linux Consumption: empty string
    """
    prefix: Optional[str] = os.getenv(AZURE_WEBJOBS_SCRIPT_ROOT)
    cx_paths: List[str] = [
        p for p in sys.path
        if prefix and p.startswith(prefix) and ('site-packages' in p)
    ]
    # Return first or default of customer path
    return (cx_paths or [''])[0]
```
Why is "AZURE_WEBJOBS_SCRIPT_ROOT" showing up here?
First, explore using the directory from the gRPC request itself, or use cls.Cx_working_directory.
```python
    The worker packages path
    """
    # 1. Try to parse the absolute path python/3.13/LINUX/X64 in sys.path
    r = re.compile(r'.*python(\/|\\)\d+\.\d+(\/|\\)(WINDOWS|LINUX|OSX).*')
```
This regex should be at the top as a constant.
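Hoisting it as suggested; the pattern is copied verbatim from the diff, the constant name is an assumption:

```python
import re

# Module-level constant so the pattern is compiled once and easy to find.
WORKER_PATH_PATTERN = re.compile(
    r'.*python(\/|\\)\d+\.\d+(\/|\\)(WINDOWS|LINUX|OSX).*')
```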
```python
# Don't reload proxy_worker
to_be_cleared_from_cache = set([
    module_name for module_name in not_builtin
    if not module_name.startswith('proxy_worker')
```
The name shouldn't be hardcoded; it should be moved to a constant.
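A sketch with the literal hoisted into a constant; the constant name is an assumption:

```python
# Hypothetical constant; the diff hardcodes the 'proxy_worker' literal inline.
PROXY_WORKER_PACKAGE = "proxy_worker"


def modules_to_clear(not_builtin):
    """Cached module names to reload, keeping the proxy worker itself loaded."""
    return {
        module_name for module_name in not_builtin
        if not module_name.startswith(PROXY_WORKER_PACKAGE)
    }
```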
```python
# App Setting constants
PYTHON_ENABLE_DEBUG_LOGGING = "PYTHON_ENABLE_DEBUG_LOGGING"
PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"
```
Proxy worker shouldn't create the TP. Thus this constant shouldn't be here.
```python
self._sync_call_tp: Optional[concurrent.futures.Executor] = (
    self._create_sync_call_tp(self._get_sync_tp_max_workers()))
```
We should explore moving this logic into the lib_worker rather than putting it in the proxy worker.
```python
@classmethod
async def connect(cls, host: str, port: int, worker_id: str,
                  request_id: str, connect_timeout: float):
    loop = asyncio.events.get_event_loop()
```
Add a task: an app setting for enabling "uvloop". Add this feature, comparing uvloop.new_event_loop() vs asyncio.events.get_event_loop().
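A sketch of such an app-setting gated loop factory; uvloop is an optional third-party dependency here, and the function name is invented for illustration:

```python
import asyncio


def new_event_loop(enable_uvloop: bool = False):
    """Create an event loop, preferring uvloop when the setting is enabled.

    Falls back to the stdlib loop when uvloop is not installed.
    """
    if enable_uvloop:
        try:
            import uvloop  # third-party, optional
            return uvloop.new_event_loop()
        except ImportError:
            pass
    return asyncio.new_event_loop()
```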
vrdmr
left a comment
Looks good to go in;
follow-up tasks to be created, plus minor tests.
Description
Proxy Worker Changes (merge to dev)
Fixes #
PR information
Quality of Code and Contribution Guidelines