diff --git a/AGENTS.md b/AGENTS.md index 838e827a..6dcbe063 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -167,3 +167,30 @@ Entry format: - Details: Updated Hard Rules and BUILDER-CRITIC Step 4 to enforce critical/horizontal-only logging; removed non-critical historical entries (`ML-20260211-001`, `ML-20260211-004`, `ML-20260211-005`, `ML-20260211-006`, `ML-20260211-007`, `ML-20260211-008`) per owner request. - Verification: `rg -n "critical-only|ballast|Criticality|ML-20260211-00[1245678]|ML-20260212-009" AGENTS.md`; `sed -n '1,260p' AGENTS.md` - Links: `AGENTS.md` + +- ID: `ML-20260224-001` +- Timestamp: `2026-02-24T00:50:16Z` +- Type: `change` +- Summary: Refactored Deeploy manager endpoints to use PostponedRequest polling instead of blocking waits. +- Criticality: Operational/runtime behavior change; long-running deploy endpoints no longer block the main plugin loop. +- Details: Added non-blocking response checks in deeploy mixin, deferred blockchain confirmations to postponed solver, and converted create/update/scale-up endpoints to return PostponedRequest while tracking pending state and timeouts. +- Verification: Not run (not requested). +- Links: `extensions/business/deeploy/deeploy_manager_api.py`, `extensions/business/deeploy/deeploy_mixin.py` + +- ID: `ML-20260224-002` +- Timestamp: `2026-02-24T13:22:52Z` +- Type: `change` +- Summary: Restored blockchain update submissions for non-confirmable deeploy operations in async path. +- Criticality: Operational correctness for blockchain state updates when chainstore confirmations are disabled. +- Details: When response keys are absent, async create/update now submits node updates for non-confirmable jobs; scale-up submits confirmations using combined new/update nodes. Tests adjusted to override the mangled balance-check method without touching production behavior. +- Verification: Not run (not requested). 
+- Links: `extensions/business/deeploy/deeploy_manager_api.py`, `extensions/business/deeploy/test_deeploy.py` + +- ID: `ML-20260224-003` +- Timestamp: `2026-02-24T14:01:46Z` +- Type: `change` +- Summary: Fixed pending deeploy timeout cleanup and scale-up confirmation node extraction. +- Criticality: Correctness in deferred deploy processing and confirmation logic. +- Details: Timeout handler now uses pending_id from state and handles missing `now`; scale-up finalization extracts nodes from status entries safely. +- Verification: Not run (not requested). +- Links: `extensions/business/deeploy/deeploy_manager_api.py` diff --git a/extensions/business/container_apps/container_app_runner.py b/extensions/business/container_apps/container_app_runner.py index 7a1dcc8c..a96afc0a 100644 --- a/extensions/business/container_apps/container_app_runner.py +++ b/extensions/business/container_apps/container_app_runner.py @@ -3298,6 +3298,7 @@ def _handle_initial_launch(self): self._start_container_log_stream() self._maybe_execute_build_and_run() + self._last_image_check = self.time() self.P("Container launched successfully") self.P(self.container) if self.current_image_hash: diff --git a/extensions/business/container_apps/worker_app_runner.py b/extensions/business/container_apps/worker_app_runner.py index 097016ed..8d5b50bd 100644 --- a/extensions/business/container_apps/worker_app_runner.py +++ b/extensions/business/container_apps/worker_app_runner.py @@ -43,6 +43,7 @@ "REPO_URL": None, "BRANCH": "main", "POLL_INTERVAL": 60, + "RATE_LIMIT_BACKOFF": 5 * 60, }, # Disable image auto-update; Git monitoring drives restarts @@ -97,6 +98,7 @@ def _after_reset(self): self.branch = None self.repo_url = None self._last_git_check = 0 + self._git_backoff_until = 0 self._repo_configured = False self._repo_owner = None self._repo_name = None @@ -279,21 +281,19 @@ def _check_git_updates(self, current_time=None): return None self._last_git_check = current_time + previous_commit = 
self.current_commit latest_commit = self._get_latest_commit() if not latest_commit: return None - if self.current_commit and latest_commit != self.current_commit: + if previous_commit and latest_commit != previous_commit: self.P( - f"New commit detected ({latest_commit[:7]} != {self.current_commit[:7]}). Restart required.", + f"New commit detected ({latest_commit[:7]} != {previous_commit[:7]}). Restart required.", color='y', ) - self.current_commit = latest_commit return StopReason.EXTERNAL_UPDATE # Git update triggers external update restart else: - if not self.current_commit: - self.current_commit = latest_commit self.P(f"Commit check ({self.branch}): {latest_commit}", color='d') return None @@ -338,8 +338,9 @@ def _ensure_repo_state(self, initial=False): self._configure_repo_url() latest_commit = self._get_latest_commit() + self._last_git_check = self.time() + if latest_commit: - self.current_commit = latest_commit self.P(f"Latest commit on {self.branch}: {latest_commit}", color='d') else: self.P("Unable to determine latest commit during initialization", color='y') @@ -444,6 +445,9 @@ def _get_latest_commit(self, return_data=False): headers = {"Authorization": f"token {token}"} if token else {} + if self.time() < self._git_backoff_until: + return (None, None) if return_data else None + try: self.Pd(f"Commit check URL: {api_url}", score=5, color='b') resp = requests.get(api_url, headers=headers, timeout=10) @@ -451,13 +455,17 @@ def _get_latest_commit(self, return_data=False): if resp.status_code == 200: data = resp.json() latest_sha = data.get("commit", {}).get("sha", None) + if latest_sha: + self.current_commit = latest_sha if return_data: return latest_sha, data return latest_sha if resp.status_code == 404: self.P(f"Repository or branch not found: {api_url}", color='r') elif resp.status_code == 403: - self.P("GitHub API rate limit exceeded or access denied", color='r') + vcs_backoff = (getattr(self, 'cfg_vcs_data', {}) or {}).get('RATE_LIMIT_BACKOFF', 300) + 
self._git_backoff_until = self.time() + vcs_backoff + self.P(f"GitHub API rate limit exceeded or access denied. Backing off for {vcs_backoff}s.", color='r') else: self.P(f"Failed to fetch latest commit (HTTP {resp.status_code}): {resp.text}", color='r') except requests.RequestException as exc: diff --git a/extensions/business/dauth/dauth_manager.py b/extensions/business/dauth/dauth_manager.py index eff1a2d9..4dc88bbc 100644 --- a/extensions/business/dauth/dauth_manager.py +++ b/extensions/business/dauth/dauth_manager.py @@ -22,6 +22,7 @@ """ from extensions.business.mixins.node_tags_mixin import _NodeTagsMixin from naeural_core.business.default.web_app.supervisor_fast_api_web_app import SupervisorFastApiWebApp as BasePlugin +from extensions.business.mixins.request_tracking_mixin import _RequestTrackingMixin from extensions.business.dauth.dauth_mixin import _DauthMixin __VER__ = '0.2.2' @@ -35,7 +36,12 @@ 'DAUTH_VERBOSE' : False, 'DAUTH_LOG_RESPONSE' : True, + 'LOG_REQUESTS' : True, + 'REQUESTS_CSTORE_HKEY': 'DAUTH_REQUESTS', + 'REQUESTS_MAX_RECORDS': 2, + 'REQUESTS_LOG_INTERVAL': 5 * 60, + 'SUPRESS_LOGS_AFTER_INTERVAL' : 300, # required ENV keys are defined in plugin template and should be added here @@ -83,7 +89,8 @@ class DauthManagerPlugin( BasePlugin, _DauthMixin, - _NodeTagsMixin + _NodeTagsMixin, + _RequestTrackingMixin, ): """ This plugin is the dAuth FastAPI web app that provides an endpoints for decentralized authentication. 
@@ -103,10 +110,23 @@ def on_init(self): self.P("Started {} plugin on {} / {}\n - Auth keys: {}\n - Predefined keys: {}".format( self.__class__.__name__, my_address, my_eth_address, self.cfg_auth_env_keys, self.cfg_auth_predefined_keys) - ) + ) + self._init_request_tracking() return + def on_request(self, request): + self._track_request(request) + return + + def on_response(self, method, response): + self._track_response(method, response) + return + + def process(self): + self._maybe_log_and_save_tracked_requests() + return + def __get_current_epoch(self): """ Get the current epoch of the node. diff --git a/extensions/business/deeploy/deeploy_manager_api.py b/extensions/business/deeploy/deeploy_manager_api.py index b6ee3e11..eeffa430 100644 --- a/extensions/business/deeploy/deeploy_manager_api.py +++ b/extensions/business/deeploy/deeploy_manager_api.py @@ -10,6 +10,7 @@ from .deeploy_mixin import _DeeployMixin from .deeploy_target_nodes_mixin import _DeeployTargetNodesMixin from extensions.business.mixins.node_tags_mixin import _NodeTagsMixin +from extensions.business.mixins.request_tracking_mixin import _RequestTrackingMixin from .deeploy_const import ( DEEPLOY_CREATE_REQUEST, DEEPLOY_CREATE_REQUEST_MULTI_PLUGIN, DEEPLOY_GET_APPS_REQUEST, DEEPLOY_DELETE_REQUEST, DEEPLOY_ERRORS, DEEPLOY_KEYS, DEEPLOY_SCALE_UP_JOB_WORKERS_REQUEST, DEEPLOY_STATUS, DEEPLOY_INSTANCE_COMMAND_REQUEST, @@ -20,9 +21,6 @@ from naeural_core.business.default.web_app.supervisor_fast_api_web_app import SupervisorFastApiWebApp as BasePlugin -DEEPLOY_REQUESTS_CSTORE_HKEY = "DEEPLOY_REQUESTS" -DEEPLOY_REQUESTS_MAX_RECORDS = 5 - __VER__ = '0.6.0' @@ -33,14 +31,19 @@ 'ASSETS' : 'nothing', # TODO: this should not be required in future 'REQUEST_TIMEOUT': 300, + 'POSTPONED_POLL_INTERVAL': 0.5, 'DEEPLOY_VERBOSE' : 10, - + 'LOG_REQUESTS': True, + 'SUPRESS_LOGS_AFTER_INTERVAL' : 300, 'WARMUP_DELAY' : 300, 'PIPELINES_CHECK_DELAY' : 300, 'MIN_ETH_BALANCE' : 0.00005, + + 'REQUESTS_CSTORE_HKEY': 
'DEEPLOY_REQUESTS', 'REQUESTS_LOG_INTERVAL' : 5 * 60, + 'REQUESTS_MAX_RECORDS' : 2, 'VALIDATION_RULES': { **BasePlugin.CONFIG['VALIDATION_RULES'], @@ -55,17 +58,19 @@ class DeeployManagerApiPlugin( _DeeployTargetNodesMixin, _NodeTagsMixin, _DeeployJobMixin, + _RequestTrackingMixin, ): """ This plugin is the dAuth FastAPI web app that provides an endpoints for decentralized authentication. """ CONFIG = _CONFIG - def __init__(self, **kwargs): super(DeeployManagerApiPlugin, self).__init__(**kwargs) return + def check_debug_logging_enabled(self): + return self.cfg_deeploy_verbose or super(DeeployManagerApiPlugin, self).check_debug_logging_enabled() def on_init(self): super(DeeployManagerApiPlugin, self).on_init() @@ -85,38 +90,17 @@ def on_init(self): color='r', boxed=True ) self.maybe_stop_tunnel_engine() - # Request tracking state - self.__recent_requests = self.deque(maxlen=DEEPLOY_REQUESTS_MAX_RECORDS) - self.__last_requests_log_time = 0 + self._init_request_tracking() + self.__pending_deploy_requests = {} return def on_request(self, request): - """ - Hook called when a new request arrives from the FastAPI side. - Captures minimal request metadata and writes the last N records to cstore. + self._track_request(request) + return - Parameters - ---------- - request : dict - Raw request payload pulled from the server queue. 
- Structure: {'id': str, 'value': tuple, 'profile': dict|None} - """ - try: - value = request.get('value') - endpoint = value[0] if isinstance(value, (list, tuple)) and len(value) > 0 else 'unknown' - record = { - 'ts': self.datetime.now(self.timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), - 'endpoint': endpoint, - } - self.__recent_requests.append(record) - self.chainstore_hset( - hkey=DEEPLOY_REQUESTS_CSTORE_HKEY, - key=self.ee_id, - value=list(self.__recent_requests), - ) - except Exception as e: - self.P(f"Error tracking request in cstore: {e}", color='r') + def on_response(self, method, response): + self._track_response(method, response) return @@ -222,7 +206,8 @@ def get_apps( def _process_pipeline_request( self, request: dict, - is_create: bool = True + is_create: bool = True, + async_mode: bool = False, ): """ Common logic for processing pipeline create/update requests. @@ -233,7 +218,9 @@ def _process_pipeline_request( The request dictionary is_create : bool True for create operations, False for update operations - + async_mode : bool + When True, return a pending state for PostponedRequest polling. 
+ Returns ------- dict @@ -410,7 +397,7 @@ def _process_pipeline_request( pipeline_params=pipeline_params, ) - dct_status, str_status = self.check_and_deploy_pipelines( + dct_status, str_status, response_keys = self.check_and_deploy_pipelines( owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER], inputs=inputs, app_id=app_id, @@ -421,18 +408,8 @@ def _process_pipeline_request( discovered_plugin_instances=discovered_plugin_instances, dct_deeploy_specs_create=deeploy_specs_payload, job_app_type=job_app_type, + wait_for_responses=not async_mode, ) - - if nodes_changed and str_status in [DEEPLOY_STATUS.SUCCESS, DEEPLOY_STATUS.COMMAND_DELIVERED]: - if (dct_status is not None and is_confirmable_job and len(confirmation_nodes) == len(dct_status)) or not is_confirmable_job: - eth_nodes = [self.bc.node_addr_to_eth_addr(node) for node in confirmation_nodes] - eth_nodes = sorted(eth_nodes) - self.bc.submit_node_update( - job_id=job_id, - nodes=eth_nodes, - ) - #endif - #endif return_request = request.get(DEEPLOY_KEYS.RETURN_REQUEST, False) if return_request: @@ -454,6 +431,68 @@ def _process_pipeline_request( # if pipeline_params: # dct_request[DEEPLOY_KEYS.PIPELINE_PARAMS] = pipeline_params + if async_mode: + if len(response_keys) == 0: + if nodes_changed and not is_confirmable_job: + eth_nodes = [self.bc.node_addr_to_eth_addr(node) for node in confirmation_nodes] + eth_nodes = sorted(eth_nodes) + try: + self.P("Submitting blockchain update for job {} with nodes: {}".format(job_id, eth_nodes)) + self.bc.submit_node_update( + job_id=job_id, + nodes=eth_nodes, + ) + except Exception as e: + self.P(f"An error occurred while submitting node update for job {job_id}: {e}", color='r') + result = { + DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.COMMAND_DELIVERED, + DEEPLOY_KEYS.STATUS_DETAILS: {}, + DEEPLOY_KEYS.APP_ID: app_id, + DEEPLOY_KEYS.REQUEST: dct_request, + DEEPLOY_KEYS.AUTH: auth_result, + } + response = self._get_response({ + **result + }) + return response + + pending_state = { + 'kind': 
'pipeline', + 'response_keys': response_keys, + 'dct_status': dct_status, + 'start_time': self.time(), + 'timeout': self.cfg_request_timeout, + 'next_check_ts': self.time() + self.cfg_postponed_poll_interval, + 'base_result': { + DEEPLOY_KEYS.APP_ID: app_id, + DEEPLOY_KEYS.REQUEST: dct_request, + DEEPLOY_KEYS.AUTH: auth_result, + }, + 'confirm': { + 'nodes_changed': nodes_changed, + 'confirmation_nodes': confirmation_nodes, + 'is_confirmable_job': is_confirmable_job, + 'job_id': job_id, + }, + } + return {'__pending__': pending_state} + + if nodes_changed and str_status in [DEEPLOY_STATUS.SUCCESS, DEEPLOY_STATUS.COMMAND_DELIVERED]: + if (dct_status is not None and is_confirmable_job and len(confirmation_nodes) == len(dct_status)) or not is_confirmable_job: + eth_nodes = [self.bc.node_addr_to_eth_addr(node) for node in confirmation_nodes] + eth_nodes = sorted(eth_nodes) + try: + self.P("Submitting blockchain update for job {} with nodes: {}".format(job_id, eth_nodes)) + self.bc.submit_node_update( + job_id=job_id, + nodes=eth_nodes, + ) + except Exception as e: + self.P(f"An error occurred while submitting node update for job {job_id}: {e}", color='r') + raise e + #endif + #endif + result = { DEEPLOY_KEYS.STATUS: str_status, DEEPLOY_KEYS.STATUS_DETAILS: dct_status, @@ -473,6 +512,260 @@ def _process_pipeline_request( }) return response + def _register_pending_deploy_request(self, pending_state): + """ + Register a pending deploy request and return a PostponedRequest handle. + + Parameters + ---------- + pending_state : dict + State payload containing response keys, timeout, and metadata. + + Returns + ------- + PostponedRequest + Deferred request handle for polling in the plugin loop. 
+ """ + pending_id = self.uuid() + pending_state['pending_id'] = pending_id + self.__pending_deploy_requests[pending_id] = pending_state + return self.create_postponed_request( + solver_method=self.solve_postponed_deploy_request, + method_kwargs={ + 'pending_id': pending_id + } + ) + + def maybe_mark_timed_out_request(self, pending_id: str, pending, now: float = None): + """ + Check a pending request for timeout and return a response when expired. + + Parameters + ---------- + pending_id : str + Identifier of the pending deeploy request. + pending : dict + State payload containing response keys, timeout, and metadata. + now : float + Current timestamp. + + Returns + ------- + dict or None + Response dictionary if the request timed out, or None if still valid. + + """ + res = None + if now is None: + now = self.time() + if (now - pending['start_time']) > pending['timeout']: + if pending.get('kind') == 'scale_up': + result = { + DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.TIMEOUT, + DEEPLOY_KEYS.STATUS_DETAILS: pending.get('dct_status', {}), + DEEPLOY_KEYS.JOB_ID: pending.get('job_id'), + DEEPLOY_KEYS.REQUEST: pending.get('request'), + DEEPLOY_KEYS.AUTH: pending.get('auth'), + } + else: + result = { + DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.TIMEOUT, + DEEPLOY_KEYS.STATUS_DETAILS: pending.get('dct_status', {}), + **pending.get('base_result', {}) + } + self.__pending_deploy_requests.pop(pending_id, None) + res = self._get_response({ + **result + }) + # endif pending request timed out + return res + + def finalize_pending_request_pipeline( + self, pending, dct_status, str_status + ): + """ + Finalize a pending pipeline request and optionally submit BC updates. + + Parameters + ---------- + pending : dict + Pending request state. + dct_status : dict + Collected response details keyed by response key. + str_status : str + Aggregate status string. + + Returns + ------- + dict + Final response payload for the pipeline request. 
+ """ + confirm = pending.get('confirm', {}) + nodes_changed = confirm.get('nodes_changed', False) + is_confirmable_job = confirm.get('is_confirmable_job', False) + confirmation_nodes = confirm.get('confirmation_nodes', []) + job_id = confirm.get('job_id', None) + if nodes_changed and str_status in [DEEPLOY_STATUS.SUCCESS, DEEPLOY_STATUS.COMMAND_DELIVERED]: + if (dct_status is not None and is_confirmable_job and len(confirmation_nodes) == len( + dct_status)) or not is_confirmable_job: + eth_nodes = [self.bc.node_addr_to_eth_addr(node) for node in confirmation_nodes] + eth_nodes = sorted(eth_nodes) + try: + self.P("Submitting blockchain update for job {} with nodes: {}".format(job_id, eth_nodes)) + self.bc.submit_node_update( + job_id=job_id, + nodes=eth_nodes, + ) + except Exception as e: + self.P(f"An error occurred while submitting node update for job {job_id}: {e}", color='r') + # endif nodes changed and success or delivered + + return { + DEEPLOY_KEYS.STATUS: str_status, + DEEPLOY_KEYS.STATUS_DETAILS: dct_status, + **pending.get('base_result', {}) + } + + def finalize_pending_request_scale_up( + self, pending, dct_status, str_status + ): + """ + Finalize a pending scale-up request and submit BC confirmation. + + Parameters + ---------- + pending : dict + Pending request state. + dct_status : dict + Collected response details keyed by response key. + str_status : str + Aggregate status string. + + Returns + ------- + dict + Final response payload for the scale-up request. 
+ """ + job_id = pending.get('job_id') + is_confirmable_job = pending.get('is_confirmable_job', False) + nodes = list( + cstore_response.get("node") for cstore_response in dct_status.values() + if cstore_response.get("node") is not None + ) + self.Pd(f"Nodes to confirm: {self.json_dumps(nodes, indent=2)}") + self._submit_bc_job_confirmation( + str_status=str_status, + dct_status=dct_status, + nodes=nodes, + job_id=job_id, + is_confirmable_job=is_confirmable_job, + ) + return { + DEEPLOY_KEYS.STATUS: str_status, + DEEPLOY_KEYS.STATUS_DETAILS: dct_status, + DEEPLOY_KEYS.JOB_ID: job_id, + DEEPLOY_KEYS.REQUEST: pending.get('request'), + DEEPLOY_KEYS.AUTH: pending.get('auth'), + } + + def finalize_pending_request( + self, pending, dct_status, str_status, + ): + """ + Finalize a pending request based on its kind. + + Parameters + ---------- + pending : dict + Pending request state. + dct_status : dict + Collected response details keyed by response key. + str_status : str + Aggregate status string. + + Returns + ------- + dict + Final response payload. + """ + # Finalize pending request + if pending.get('kind') == 'pipeline': + result = self.finalize_pending_request_pipeline( + pending=pending, + dct_status=dct_status, + str_status=str_status, + ) + elif pending.get('kind') == 'scale_up': + result = self.finalize_pending_request_scale_up( + pending=pending, + dct_status=dct_status, + str_status=str_status, + ) + else: + result = { + DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.FAIL, + DEEPLOY_KEYS.ERROR: f"Unknown pending request kind: {pending.get('kind')}" + } + return result + + def solve_postponed_deploy_request(self, pending_id: str): + """ + Resolve a pending deploy request by polling for chainstore responses. + + Parameters + ---------- + pending_id : str + Identifier of the pending deploy request. + + Returns + ------- + dict or PostponedRequest + Final response when complete or a PostponedRequest to continue polling. 
+ """ + pending = self.__pending_deploy_requests.get(pending_id) + if not pending: + result = { + DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.FAIL, + DEEPLOY_KEYS.ERROR: f"Pending request {pending_id} not found.", + } + return self._get_response({ + **result + }) + + now = self.time() + # Not all requests are processed every iteration in order to lighten the load on the CPU + postponed_kwargs = { + 'solver_method': self.solve_postponed_deploy_request, + 'method_kwargs': {'pending_id': pending_id}, + } + if now < pending.get('next_check_ts', 0): + return self.create_postponed_request(**postponed_kwargs) + + timeout_response = self.maybe_mark_timed_out_request(pending_id=pending_id, pending=pending, now=now) + if timeout_response: + return timeout_response + + dct_status, str_status, done = self._check_pipeline_responses_once( + response_keys=pending['response_keys'], + dct_status=pending.get('dct_status'), + ) + pending['dct_status'] = dct_status + + if not done: + pending['next_check_ts'] = now + self.cfg_postponed_poll_interval + return self.create_postponed_request(**postponed_kwargs) + + result = self.finalize_pending_request( + pending=pending, + dct_status=dct_status, + str_status=str_status, + ) + + self.__pending_deploy_requests.pop(pending_id, None) + return self._get_response({ + **result + }) + @BasePlugin.endpoint(method="post") # /create_pipeline def create_pipeline( @@ -593,7 +886,10 @@ def create_pipeline( 2. Move while checker for chainstore keys in process. 
""" self.Pd(f"Called Deeploy create_pipeline endpoint") - return self._process_pipeline_request(request, is_create=True) + result = self._process_pipeline_request(request, is_create=True, async_mode=True) + if isinstance(result, dict) and result.get('__pending__') is not None: + return self._register_pending_deploy_request(result['__pending__']) + return result @BasePlugin.endpoint(method="post") # /update_pipeline @@ -668,7 +964,10 @@ def update_pipeline( """ self.P(f"Received an update_pipeline request with body: {self.json_dumps(request)}") - return self._process_pipeline_request(request, is_create=False) + result = self._process_pipeline_request(request, is_create=False, async_mode=True) + if isinstance(result, dict) and result.get('__pending__') is not None: + return self._register_pending_deploy_request(result['__pending__']) + return result @BasePlugin.endpoint(method="post") def scale_up_job_workers(self, @@ -728,20 +1027,14 @@ def scale_up_job_workers(self, update_nodes = list(running_apps_for_job.keys()) new_nodes = self._check_nodes_availability(inputs) - dct_status, str_status = self.scale_up_job(new_nodes=new_nodes, - update_nodes=update_nodes, - owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER], - job_id=job_id, - running_apps_for_job=running_apps_for_job) - - nodes = list(cstore_response["node"] for cstore_response in dct_status.values()) - self.Pd(f"Nodes to confirm: {self.json_dumps(nodes, indent=2)}") - - self._submit_bc_job_confirmation(str_status=str_status, - dct_status=dct_status, - nodes=nodes, - job_id=job_id, - is_confirmable_job=is_confirmable_job) + dct_status, str_status, response_keys = self.scale_up_job( + new_nodes=new_nodes, + update_nodes=update_nodes, + owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER], + job_id=job_id, + running_apps_for_job=running_apps_for_job, + wait_for_responses=False + ) return_request = request.get(DEEPLOY_KEYS.RETURN_REQUEST, False) if return_request: @@ -749,16 +1042,42 @@ def scale_up_job_workers(self, else: 
dct_request = None - result = { - DEEPLOY_KEYS.STATUS: str_status, - DEEPLOY_KEYS.STATUS_DETAILS: dct_status, - DEEPLOY_KEYS.JOB_ID: job_id, - DEEPLOY_KEYS.REQUEST: dct_request, - DEEPLOY_KEYS.AUTH: auth_result, + if len(response_keys) == 0: + if not is_confirmable_job: + nodes = list(set(update_nodes + new_nodes)) + self.Pd(f"Nodes to confirm (non-confirmable job): {self.json_dumps(nodes, indent=2)}") + self._submit_bc_job_confirmation(str_status=DEEPLOY_STATUS.COMMAND_DELIVERED, + dct_status={}, + nodes=nodes, + job_id=job_id, + is_confirmable_job=is_confirmable_job) + result = { + DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.COMMAND_DELIVERED, + DEEPLOY_KEYS.STATUS_DETAILS: dct_status, + DEEPLOY_KEYS.JOB_ID: job_id, + DEEPLOY_KEYS.REQUEST: dct_request, + DEEPLOY_KEYS.AUTH: auth_result, + } + if self.cfg_deeploy_verbose > 1: + self.P(f"Request Result: {result}") + response = self._get_response({ + **result + }) + return response + + pending_state = { + 'kind': 'scale_up', + 'response_keys': response_keys, + 'dct_status': {}, + 'start_time': self.time(), + 'timeout': self.cfg_request_timeout, + 'next_check_ts': self.time() + self.cfg_postponed_poll_interval, + 'job_id': job_id, + 'is_confirmable_job': is_confirmable_job, + 'request': dct_request, + 'auth': auth_result, } - - if self.cfg_deeploy_verbose > 1: - self.P(f"Request Result: {result}") + return self._register_pending_deploy_request(pending_state) except Exception as e: result = self.__handle_error(e, request) @@ -1081,17 +1400,5 @@ def process(self): self.P(f"Error checking running pipelines: {e}", color='r') self.__last_pipelines_check_time = self.time() - # Periodic dump of all nodes' recent requests from cstore - if (self.time() - self.__last_requests_log_time) > self.cfg_requests_log_interval: - try: - all_requests = self.chainstore_hgetall(hkey=DEEPLOY_REQUESTS_CSTORE_HKEY) - if all_requests: - self.P(f"Deeploy requests across all nodes:\n{self.json_dumps(all_requests, indent=2)}") - else: - self.P("Deeploy 
requests across all nodes: no data") - except Exception as e: - self.P(f"Error dumping deeploy requests from cstore: {e}", color='r') - # end try - self.__last_requests_log_time = self.time() - # end if + self._maybe_log_and_save_tracked_requests() return diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 6a8da897..9eac1679 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -194,6 +194,7 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, else: dct_deeploy_specs.pop(DEEPLOY_KEYS.CHAINSTORE_RESPONSE_KEYS, None) + saved_pipeline = None for addr, node_plugins in node_plugins_by_addr.items(): msg = '' if self.cfg_deeploy_verbose > 1: @@ -202,7 +203,7 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, if addr is not None: - pipeline = self.cmdapi_start_pipeline_by_params( + saved_pipeline = self.cmdapi_start_pipeline_by_params( name=app_id, app_alias=app_alias, pipeline_type=app_type, @@ -215,15 +216,16 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, **pipeline_kwargs, ) - self.Pd(f"Pipeline started: {self.json_dumps(pipeline)}") - try: - save_result = self.save_job_pipeline_in_cstore(pipeline, job_id) - self.P(f"Pipeline saved in CSTORE: {save_result}") - except Exception as e: - self.P(f"Error saving pipeline in CSTORE: {e}", color="r") # endif addr is valid # endfor each target node + self.Pd(f"Pipeline started: {self.json_dumps(saved_pipeline)}") + try: + save_result = self.save_job_pipeline_in_cstore(saved_pipeline, job_id) + self.P(f"Pipeline CID saved in CSTORE: {save_result}", color='r' if not save_result else None) + except Exception as e: + self.P(f"Error saving pipeline in CSTORE: {e}", color="r") + cleaned_response_keys = prepared_response_keys if inputs.chainstore_response else {} return cleaned_response_keys @@ -568,6 +570,50 @@ def 
__prepare_plugins_for_update(self, inputs, discovered_plugin_instances): return instances_by_node, base_plugin + def _check_pipeline_responses_once(self, response_keys, dct_status=None): + """ + Check pipeline responses once without blocking. + + Parameters + ---------- + response_keys : dict + Mapping of node address to list of chainstore response keys. + dct_status : dict, optional + Accumulator of responses already received. + + Returns + ------- + tuple + (dct_status, str_status, done) with updated status and completion flag. + """ + if dct_status is None: + dct_status = {} + + if len(response_keys) == 0: + return dct_status, DEEPLOY_STATUS.COMMAND_DELIVERED, True + + for node_addr, response_keys_list in response_keys.items(): + for response_key in response_keys_list: + if response_key in dct_status: + continue + res = self.chainstore_get(response_key) + if res is not None: + self.Pd( + f"Received response for {response_key} from {node_addr}: {self.json_dumps(res)}. Node Addr: {node_addr}" + ) + dct_status[response_key] = { + 'node': node_addr, + 'details': res + } + # endif response key present + # endfor response keys of one node + # endfor nodes + + total_response_keys = sum(len(keys) for keys in response_keys.values()) + done = len(dct_status) == total_response_keys + str_status = DEEPLOY_STATUS.SUCCESS if done else DEEPLOY_STATUS.PENDING + return dct_status, str_status, done + def _get_pipeline_responses(self, response_keys, timeout_seconds=300): """ Wait until all the responses are received via CSTORE and compose status response. 
@@ -600,37 +646,26 @@ def _get_pipeline_responses(self, response_keys, timeout_seconds=300): done = False if len(response_keys) > 0 else True start_time = self.time() - self.Pd("Waiting for responses from nodes...") - self.Pd(f"Response keys to wait for: {self.json_dumps(response_keys, indent=2)}") + self.P(f"Response keys to wait for: {self.json_dumps(response_keys, indent=2)}") if len(response_keys) == 0: str_status = DEEPLOY_STATUS.COMMAND_DELIVERED return dct_status, str_status + self.P(f"Waiting for responses from {len(response_keys)} plugin instances...") + while not done: current_time = self.time() if current_time - start_time > timeout_seconds: str_status = DEEPLOY_STATUS.TIMEOUT - self.P(f"Timeout reached ({timeout_seconds} seconds) while waiting for responses. Current status: {self.json_dumps(dct_status, indent=2)}") - self.P(f"Response keys: {self.json_dumps(response_keys, indent=2)}") + self.P(f"Timeout reached ({timeout_seconds} seconds) while waiting for responses. Current status: {self.json_dumps(dct_status, indent=2)}", color='r') + self.P(f"Response keys: {self.json_dumps(response_keys, indent=2)}", color='r') break - - for node_addr, response_keys_list in response_keys.items(): - for response_key in response_keys_list: - if response_key in dct_status: - continue - res = self.chainstore_get(response_key) - if res is not None: - self.Pd( - f"Received response for {response_key} from {node_addr}: {self.json_dumps(res)}. 
Node Addr: {node_addr}") - dct_status[response_key] = { - 'node': node_addr, - 'details': res - } - total_response_keys = sum(len(keys) for keys in response_keys.values()) - if len(dct_status) == total_response_keys: - str_status = DEEPLOY_STATUS.SUCCESS - done = True + + dct_status, str_status, done = self._check_pipeline_responses_once( + response_keys=response_keys, + dct_status=dct_status, + ) # end for each response key # endwhile cycle until all responses are received return dct_status, str_status @@ -1748,9 +1783,49 @@ def deeploy_prepare_plugins(self, inputs): plugins = [plugin] return plugins - def check_and_deploy_pipelines(self, owner, inputs, app_id, app_alias, app_type, update_nodes, new_nodes, discovered_plugin_instances=[], dct_deeploy_specs=None, job_app_type=None, dct_deeploy_specs_create=None): + def check_and_deploy_pipelines( + self, owner, inputs, app_id, + app_alias, app_type, + update_nodes, new_nodes, + discovered_plugin_instances=[], + dct_deeploy_specs=None, job_app_type=None, + dct_deeploy_specs_create=None, + wait_for_responses=True + ): """ Validate the inputs and deploy the pipeline on the target nodes. + + Parameters + ---------- + owner : str + Escrow owner address. + inputs : dict-like + Normalized request inputs. + app_id : str + Application identifier. + app_alias : str + Application alias for display. + app_type : str + Pipeline type (capture type). + update_nodes : list[str] + Nodes that should receive update operations. + new_nodes : list[str] + Nodes that should receive create operations. + discovered_plugin_instances : list, optional + Discovered plugin instances used for update operations. + dct_deeploy_specs : dict, optional + Deeploy specs used for update operations. + job_app_type : str, optional + Detected or provided job app type. + dct_deeploy_specs_create : dict, optional + Deeploy specs used for create operations. + wait_for_responses : bool, optional + When True, block until responses are collected or timeout. 
+ + Returns + ------- + tuple + (dct_status, str_status, response_keys) """ # Phase 1: Check if nodes are available @@ -1761,23 +1836,46 @@ def check_and_deploy_pipelines(self, owner, inputs, app_id, app_alias, app_type, # Phase 2: Launch the pipeline on each node and set CSTORE `response_key`` for the "callback" action response_keys = {} if len(update_nodes) > 0: - update_response_keys = self.__update_pipeline_on_nodes(update_nodes, inputs, app_id, app_alias, app_type, owner, discovered_plugin_instances, dct_deeploy_specs, job_app_type=job_app_type) + update_response_keys = self.__update_pipeline_on_nodes( + update_nodes, inputs, app_id, app_alias, app_type, + owner, discovered_plugin_instances, dct_deeploy_specs, + job_app_type=job_app_type + ) response_keys.update(update_response_keys) if len(new_nodes) > 0: - new_response_keys = self.__create_pipeline_on_nodes(new_nodes, inputs, app_id, app_alias, app_type, owner, job_app_type=job_app_type, dct_deeploy_specs=dct_deeploy_specs_create) + new_response_keys = self.__create_pipeline_on_nodes( + new_nodes, inputs, app_id, app_alias, app_type, owner, + job_app_type=job_app_type, + dct_deeploy_specs=dct_deeploy_specs_create + ) response_keys.update(new_response_keys) # Phase 3: Wait until all the responses are received via CSTORE and compose status response - dct_status, str_status = self._get_pipeline_responses(response_keys, 300) + # Reset Response Keys + self.P("Resetting response keys in chainstore before waiting for new responses...") + for _, node_response_keys in response_keys.items(): + for response_key in node_response_keys: + try: + self.chainstore_set(response_key, None) + except Exception as e: + self.P(f"Error resetting response key {response_key} in chainstore: {e}", color='r') + # end try + # end for + # end for + + if wait_for_responses: + dct_status, str_status = self._get_pipeline_responses(response_keys, 300) + else: + dct_status, str_status = {}, DEEPLOY_STATUS.PENDING self.P(f"Pipeline responses: 
str_status = {str_status} | dct_status =\n {self.json_dumps(dct_status, indent=2)}") # if pipelines to not use CHAINSTORE_RESPONSE, we can assume nodes reveived the command (BLIND) - to be modified in native plugins # else we consider all good if str_status is SUCCESS - return dct_status, str_status + return dct_status, str_status, response_keys - def scale_up_job(self, new_nodes, update_nodes, job_id, owner, running_apps_for_job): + def scale_up_job(self, new_nodes, update_nodes, job_id, owner, running_apps_for_job, wait_for_responses=True): """ Scale up the job workers. """ @@ -1809,9 +1907,12 @@ def scale_up_job(self, new_nodes, update_nodes, job_id, owner, running_apps_for_ update_pipelines=update_pipelines, owner=owner) - dct_status, str_status = self._get_pipeline_responses(chainstore_response_keys, 300) + if wait_for_responses: + dct_status, str_status = self._get_pipeline_responses(chainstore_response_keys, 300) + else: + dct_status, str_status = {}, DEEPLOY_STATUS.PENDING - return dct_status, str_status + return dct_status, str_status, chainstore_response_keys def _discover_plugin_instances( self, @@ -2365,7 +2466,9 @@ def _submit_bc_job_confirmation(self, str_status, dct_status, nodes, job_id, is_ def check_running_pipelines_and_add_to_r1fs(self): self.P(f"Checking running pipelines and adding them to R1FS...") running_pipelines = self.netmon.network_known_pipelines() + self.P(f"Retrieved pipelines from {len(running_pipelines)} nodes from NetMon.") listed_job_ids = self.list_all_deployed_jobs_from_cstore() + self.P(f"Retrieved {len(listed_job_ids)} listed job IDs from CSTORE.") netmon_job_ids = {} for node, pipelines in running_pipelines.items(): for pipeline in pipelines: @@ -2375,12 +2478,18 @@ def check_running_pipelines_and_add_to_r1fs(self): if job_id in netmon_job_ids or not job_id: continue netmon_job_ids[job_id] = pipeline + # endfor running pipelines + self.P(f"Identified {len(netmon_job_ids)} unique job IDs from running pipelines in NetMon.") 
for netmon_job_id, pipeline in netmon_job_ids.items(): listed_job_cid = listed_job_ids.get(str(netmon_job_id), None) if listed_job_cid and len(listed_job_cid) == 46: continue self.save_job_pipeline_in_cstore(pipeline, netmon_job_id) - + # endfor job IDs + # This should log how many new job pipelines were added to R1FS and how many were already listed, + # but at the moment save_job_pipeline_in_cstore does not return a value to determine that, so we log + # that we checked all job IDs. + self.P(f"Checked all job IDs.") return netmon_job_ids def delete_pipeline_from_nodes(self, app_id=None, job_id=None, owner=None, allow_missing=False, discovered_instances=None): diff --git a/extensions/business/deeploy/test_deeploy.py b/extensions/business/deeploy/test_deeploy.py new file mode 100644 index 00000000..1f15b5c2 --- /dev/null +++ b/extensions/business/deeploy/test_deeploy.py @@ -0,0 +1,524 @@ +import unittest + +from naeural_core.utils.fastapi_utils import PostponedRequest + +from extensions.business.deeploy.deeploy_manager_api import DeeployManagerApiPlugin +from extensions.business.deeploy.deeploy_const import DEEPLOY_KEYS, DEEPLOY_STATUS + + +class _BCStub: + """ + Minimal blockchain stub for deeploy tests. + """ + def __init__(self): + self.submitted = [] + + def node_addr_to_eth_addr(self, node): + """ + Convert internal node address to eth-style address. + + Parameters + ---------- + node : str + Internal node address. + + Returns + ------- + str + Stubbed eth address. + """ + return f"eth_{node}" + + def submit_node_update(self, job_id, nodes): + """ + Record a submit_node_update call. + + Parameters + ---------- + job_id : int + Job identifier. + nodes : list[str] + Target node addresses. + """ + self.submitted.append((job_id, list(nodes))) + + +class _InputsStub(dict): + """ + Dict-like object that exposes keys as attributes. + """ + def __getattr__(self, item): + """ + Fetch values via attribute access. 
+ + Parameters + ---------- + item : str + Attribute name. + + Returns + ------- + Any + Value from the dictionary. + """ + try: + return self[item] + except KeyError: + raise AttributeError(item) + + +class _DeeployStub(DeeployManagerApiPlugin): + """ + Deeploy manager stub with minimal dependencies for unit testing. + """ + def __init__(self): + pass + + def time(self): + """ + Return current mocked time. + + Returns + ------- + float + Current mocked timestamp. + """ + return self._now + + def _get_response(self, dct_data): + """ + Return response payload without wrapping. + + Parameters + ---------- + dct_data : dict + Response data. + + Returns + ------- + dict + Response data passed through. + """ + return dct_data + + def create_postponed_request(self, solver_method, method_kwargs=None): + """ + Create a PostponedRequest instance. + + Parameters + ---------- + solver_method : callable + Solver method to call during polling. + method_kwargs : dict, optional + Solver keyword arguments. + + Returns + ------- + PostponedRequest + Deferred request handle. + """ + if method_kwargs is None: + method_kwargs = {} + return PostponedRequest(solver_method=solver_method, method_kwargs=method_kwargs) + + def chainstore_get(self, key): + """ + Lookup a chainstore key in local stub storage. + + Parameters + ---------- + key : str + Chainstore key. + + Returns + ------- + Any + Stored value or None. + """ + return self._chainstore.get(key) + + def P(self, *args, **kwargs): + """ + No-op logger. + """ + return + + def Pd(self, *args, **kwargs): + """ + No-op debug logger. + """ + return + + def json_dumps(self, obj, **kwargs): + """ + Serialize JSON for debug output. + + Parameters + ---------- + obj : Any + Object to serialize. + + Returns + ------- + str + JSON string. + """ + import json + return json.dumps(obj) + + # Deeploy API stubs for endpoint tests + def _DeeployManagerApiPlugin__ensure_eth_balance(self): + """ + Stub balance check. 
+ + Returns + ------- + bool + Always True in tests. + """ + return True + + def deeploy_verify_and_get_inputs(self, request, require_sender_is_oracle=False, no_hash=True): + """ + Stub request verification. + + Parameters + ---------- + request : dict + Raw request. + require_sender_is_oracle : bool, optional + Oracle enforcement flag. + no_hash : bool, optional + Hashing flag. + + Returns + ------- + tuple + (sender, inputs) tuple for testing. + """ + inputs = _InputsStub(request) + return "sender", inputs + + def deeploy_get_auth_result(self, inputs): + """ + Stub auth result payload. + + Parameters + ---------- + inputs : dict + Request inputs. + + Returns + ------- + dict + Auth result with escrow owner. + """ + return {DEEPLOY_KEYS.ESCROW_OWNER: "owner"} + + def deeploy_check_payment_and_job_owner(self, inputs, owner, is_create=False, debug=False): + """ + Stub payment/ownership check. + + Parameters + ---------- + inputs : dict + Request inputs. + owner : str + Expected owner. + is_create : bool, optional + Create flag. + debug : bool, optional + Debug flag. + + Returns + ------- + bool + Always True in tests. + """ + return True + + def _get_online_apps(self, job_id=None, owner=None): + """ + Stub online app discovery. + + Parameters + ---------- + job_id : int, optional + Job identifier. + owner : str, optional + Owner address. + + Returns + ------- + dict + Minimal app map for tests. + """ + if job_id is None: + return {} + return {"node1": {"app1": {"dummy": True}}} + + def _check_nodes_availability(self, inputs, skip_resource_check=False): + """ + Stub node availability check. + + Parameters + ---------- + inputs : dict + Request inputs. + skip_resource_check : bool, optional + Resource check flag. + + Returns + ------- + list[str] + Single node address for tests. + """ + return ["node2"] + + def scale_up_job(self, new_nodes, update_nodes, job_id, owner, running_apps_for_job, wait_for_responses=True): + """ + Stub scale-up operation. 
+ + Parameters + ---------- + new_nodes : list[str] + New nodes to deploy on. + update_nodes : list[str] + Existing nodes to update. + job_id : int + Job identifier. + owner : str + Owner address. + running_apps_for_job : dict + Running apps mapping. + wait_for_responses : bool, optional + Wait flag (ignored). + + Returns + ------- + tuple + (dct_status, str_status, response_keys) + """ + response_keys = {"node1": ["k1"]} + return {}, DEEPLOY_STATUS.PENDING, response_keys + + +class DeeployPostponedTests(unittest.TestCase): + """ + Unit tests for postponed deeploy flow. + """ + def setUp(self): + """ + Initialize stub plugin instance. + """ + self.plugin = _DeeployStub.__new__(_DeeployStub) + self.plugin._now = 1_000.0 + self.plugin.cfg_postponed_poll_interval = 0.5 + self.plugin.cfg_request_timeout = 10 + self.plugin._chainstore = {} + self.plugin.bc = _BCStub() + self.plugin._DeeployManagerApiPlugin__pending_deploy_requests = {} + + def test_check_pipeline_responses_no_keys(self): + """ + Ensure empty response keys resolve immediately. + """ + dct_status, str_status, done = self.plugin._check_pipeline_responses_once({}) + self.assertEqual(dct_status, {}) + self.assertEqual(str_status, DEEPLOY_STATUS.COMMAND_DELIVERED) + self.assertTrue(done) + + def test_check_pipeline_responses_partial(self): + """ + Ensure partial responses remain pending. + """ + response_keys = {"nodeA": ["k1", "k2"]} + self.plugin._chainstore["k1"] = {"ok": True} + dct_status, str_status, done = self.plugin._check_pipeline_responses_once(response_keys) + self.assertIn("k1", dct_status) + self.assertNotIn("k2", dct_status) + self.assertEqual(str_status, DEEPLOY_STATUS.PENDING) + self.assertFalse(done) + + def test_solve_postponed_pipeline_timeout(self): + """ + Ensure pipeline pending state times out correctly. 
+ """ + pending_id = "pid1" + self.plugin._DeeployManagerApiPlugin__pending_deploy_requests[pending_id] = { + 'kind': 'pipeline', + 'response_keys': {"nodeA": ["k1"]}, + 'dct_status': {}, + 'start_time': 0, + 'timeout': 1, + 'next_check_ts': 0, + 'base_result': { + DEEPLOY_KEYS.APP_ID: "app1", + DEEPLOY_KEYS.REQUEST: {"x": 1}, + DEEPLOY_KEYS.AUTH: {"a": 2}, + }, + 'confirm': {}, + } + self.plugin._now = 5 + res = self.plugin.solve_postponed_deploy_request(pending_id) + self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.TIMEOUT) + self.assertEqual(res[DEEPLOY_KEYS.APP_ID], "app1") + + def test_solve_postponed_scale_up_timeout(self): + """ + Ensure scale-up pending state times out with expected fields. + """ + pending_id = "pid2" + self.plugin._DeeployManagerApiPlugin__pending_deploy_requests[pending_id] = { + 'kind': 'scale_up', + 'response_keys': {"nodeA": ["k1"]}, + 'dct_status': {}, + 'start_time': 0, + 'timeout': 1, + 'next_check_ts': 0, + 'job_id': 123, + 'is_confirmable_job': True, + 'request': {"req": 1}, + 'auth': {"auth": 2}, + } + self.plugin._now = 5 + res = self.plugin.solve_postponed_deploy_request(pending_id) + self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.TIMEOUT) + self.assertEqual(res[DEEPLOY_KEYS.JOB_ID], 123) + self.assertIn(DEEPLOY_KEYS.REQUEST, res) + self.assertIn(DEEPLOY_KEYS.AUTH, res) + + def test_solve_postponed_returns_postponed_when_not_ready(self): + """ + Ensure solver returns PostponedRequest when gating interval not reached. 
+ """ + pending_id = "pid3" + self.plugin._DeeployManagerApiPlugin__pending_deploy_requests[pending_id] = { + 'kind': 'pipeline', + 'response_keys': {"nodeA": ["k1"]}, + 'dct_status': {}, + 'start_time': 0, + 'timeout': 100, + 'next_check_ts': self.plugin._now + 5, + 'base_result': {}, + 'confirm': {}, + } + res = self.plugin.solve_postponed_deploy_request(pending_id) + self.assertIsInstance(res, PostponedRequest) + + def test_solve_postponed_pipeline_success_confirms(self): + """ + Ensure successful pipeline confirms blockchain update. + """ + pending_id = "pid4" + self.plugin._DeeployManagerApiPlugin__pending_deploy_requests[pending_id] = { + 'kind': 'pipeline', + 'response_keys': {"nodeA": ["k1", "k2"]}, + 'dct_status': {}, + 'start_time': 0, + 'timeout': 100, + 'next_check_ts': 0, + 'base_result': { + DEEPLOY_KEYS.APP_ID: "app1", + DEEPLOY_KEYS.REQUEST: {}, + DEEPLOY_KEYS.AUTH: {}, + }, + 'confirm': { + 'nodes_changed': True, + 'confirmation_nodes': ["nodeA"], + 'is_confirmable_job': True, + 'job_id': 77, + }, + } + self.plugin._chainstore["k1"] = {"ok": True} + self.plugin._chainstore["k2"] = {"ok": True} + res = self.plugin.solve_postponed_deploy_request(pending_id) + self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.SUCCESS) + self.assertEqual(self.plugin.bc.submitted, [(77, ["eth_nodeA"])]) + + def test_solve_postponed_missing_pending(self): + """ + Ensure missing pending ID returns failure. + """ + res = self.plugin.solve_postponed_deploy_request("missing") + self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.FAIL) + + +class DeeployEndpointTests(unittest.TestCase): + """ + Endpoint-level tests for deeploy manager API. + """ + def setUp(self): + """ + Initialize stub plugin instance. 
+ """ + self.plugin = _DeeployStub.__new__(_DeeployStub) + self.plugin._now = 1_000.0 + self.plugin.cfg_postponed_poll_interval = 0.5 + self.plugin.cfg_request_timeout = 10 + self.plugin._chainstore = {} + self.plugin.bc = _BCStub() + self.plugin._DeeployManagerApiPlugin__pending_deploy_requests = {} + + def test_create_pipeline_returns_postponed(self): + """ + Ensure create_pipeline returns PostponedRequest on pending state. + """ + def _process_pipeline_request(request, is_create=True, async_mode=False): + return {'__pending__': {'kind': 'pipeline', 'response_keys': {"n": ["k"]}, 'dct_status': {}, + 'start_time': self.plugin._now, 'timeout': 10, 'next_check_ts': 0, + 'base_result': {}, 'confirm': {}}} + self.plugin._process_pipeline_request = _process_pipeline_request + res = self.plugin.create_pipeline({}) + self.assertIsInstance(res, PostponedRequest) + + def test_update_pipeline_returns_postponed(self): + """ + Ensure update_pipeline returns PostponedRequest on pending state. + """ + def _process_pipeline_request(request, is_create=True, async_mode=False): + return {'__pending__': {'kind': 'pipeline', 'response_keys': {"n": ["k"]}, 'dct_status': {}, + 'start_time': self.plugin._now, 'timeout': 10, 'next_check_ts': 0, + 'base_result': {}, 'confirm': {}}} + self.plugin._process_pipeline_request = _process_pipeline_request + res = self.plugin.update_pipeline({}) + self.assertIsInstance(res, PostponedRequest) + + def test_create_pipeline_passthrough(self): + """ + Ensure create_pipeline returns direct result when not pending. + """ + def _process_pipeline_request(request, is_create=True, async_mode=False): + return {"status": "ok"} + self.plugin._process_pipeline_request = _process_pipeline_request + res = self.plugin.create_pipeline({}) + self.assertEqual(res["status"], "ok") + + def test_scale_up_job_workers_returns_postponed(self): + """ + Ensure scale_up_job_workers returns PostponedRequest when response keys exist. 
+ """ + req = { + DEEPLOY_KEYS.JOB_ID: 10, + DEEPLOY_KEYS.CHAINSTORE_RESPONSE: True, + } + res = self.plugin.scale_up_job_workers(req) + self.assertIsInstance(res, PostponedRequest) + + def test_scale_up_job_workers_command_delivered(self): + """ + Ensure scale_up_job_workers returns command-delivered when no keys exist. + """ + def scale_up_job(new_nodes, update_nodes, job_id, owner, running_apps_for_job, wait_for_responses=True): + return {}, DEEPLOY_STATUS.PENDING, {} + self.plugin.scale_up_job = scale_up_job + req = { + DEEPLOY_KEYS.JOB_ID: 10, + DEEPLOY_KEYS.CHAINSTORE_RESPONSE: False, + } + res = self.plugin.scale_up_job_workers(req) + self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.COMMAND_DELIVERED) + + +if __name__ == "__main__": + unittest.main() diff --git a/extensions/business/mixins/request_tracking_mixin.py b/extensions/business/mixins/request_tracking_mixin.py new file mode 100644 index 00000000..9c6e710f --- /dev/null +++ b/extensions/business/mixins/request_tracking_mixin.py @@ -0,0 +1,117 @@ +DEFAULT_REQUESTS_MAX_RECORDS = 2 +DEFAULT_REQUESTS_LOG_INTERVAL = 5 * 60 # 300 seconds + + +class _RequestTrackingMixin(object): + """ + Mixin that adds chainstore-based request/response tracking to FastAPI plugins. + + Opt-in: set REQUESTS_CSTORE_HKEY in plugin config to enable. + When not set (None), all methods are no-ops. 
+ + Config keys: + REQUESTS_CSTORE_HKEY : str or None -- chainstore hash key (None = disabled) + REQUESTS_MAX_RECORDS : int -- max recent requests in deque (default 2) + REQUESTS_LOG_INTERVAL : int -- seconds between cross-node log dumps (default 300) + """ + + @property + def __rt_max_records(self): + val = getattr(self, 'cfg_requests_max_records', None) + if not isinstance(val, int) or val < 1: + return DEFAULT_REQUESTS_MAX_RECORDS + return val + + @property + def __rt_log_interval(self): + val = getattr(self, 'cfg_requests_log_interval', None) + if not isinstance(val, (int, float)) or val <= 0: + return DEFAULT_REQUESTS_LOG_INTERVAL + return val + + @property + def __rt_cstore_hkey(self): + return getattr(self, 'cfg_requests_cstore_hkey', None) + + def _init_request_tracking(self): + """Call from on_init(). Initializes tracking state if enabled.""" + self.__rt_recent_requests = None + self.__rt_last_log_time = 0 + self.__rt_dirty = False + if self.__rt_cstore_hkey: + self.__rt_recent_requests = self.deque(maxlen=self.__rt_max_records) + return + + def _track_request(self, request): + """ + Called from on_request (monitor thread). Records request start. + + NOTE: does NOT write to chainstore — only appends to the in-memory deque + and marks it dirty. The actual chainstore write is deferred to + _track_response / _maybe_log_and_save_tracked_requests which run on the main thread, + avoiding timer corruption from concurrent thread access.
+ """ + if self.__rt_recent_requests is None: + return + try: + value = request.get('value') + request_id = request.get('id') + endpoint = value[0] if isinstance(value, (list, tuple)) and len(value) > 0 else 'unknown' + record = { + 'id': request_id, + 'endpoint': endpoint, + 'date_start': self.datetime.now(self.timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), + 'date_complete': None, + } + self.__rt_recent_requests.append(record) + self.__rt_dirty = True + except Exception as e: + self.P(f"Error tracking request in cstore: {e}", color='r') + return + + def _track_response(self, method, response): + """Called from the response processing flow (main thread). Stamps completion time and flushes to chainstore.""" + if self.__rt_recent_requests is None: + return + try: + request_id = response.get('id') + for record in self.__rt_recent_requests: + if record.get('id') == request_id: + record['date_complete'] = self.datetime.now(self.timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + self.__rt_dirty = True + break + except Exception as e: + self.P(f"Error tracking response in cstore: {e}", color='r') + if self.__rt_dirty: + self.__rt_save() + return + + def __rt_save(self): + """Write recent requests to chainstore (main thread only).""" + self.chainstore_hset( + hkey=self.__rt_cstore_hkey, + key=self.ee_id, + value=list(self.__rt_recent_requests), + debug=True + ) + self.__rt_dirty = False + return + + def _maybe_log_and_save_tracked_requests(self): + """Call from process() (main thread). 
Flushes dirty data and periodically logs cross-node request data.""" + if self.__rt_recent_requests is None: + return + if self.__rt_dirty: + self.__rt_save() + if (self.time() - self.__rt_last_log_time) > self.__rt_log_interval: + try: + hkey = self.__rt_cstore_hkey + all_requests = self.chainstore_hgetall(hkey=hkey) + if all_requests: + self.P(f"{hkey} requests across all nodes:\n{self.json_dumps(all_requests, indent=2)}") + else: + self.P(f"{hkey} requests across all nodes: no data") + except Exception as e: + self.P(f"Error dumping requests from cstore: {e}", color='r') + self.__rt_last_log_time = self.time() + return diff --git a/ver.py b/ver.py index 9a83f5be..a66c2015 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.60' +__VER__ = '2.10.70'