diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index 69ae69fd..02ea4de4 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -43,6 +43,8 @@ class DEEPLOY_KEYS: PIPELINE_PARAMS = "pipeline_params" PIPELINE = "pipeline" PIPELINE_CID = "pipeline_cid" + ONLINE = "online" + CHAIN_JOB = "chain_job" JOB_CONFIG = "job_config" # App params keys APP_PARAMS = "app_params" diff --git a/extensions/business/deeploy/deeploy_manager_api.py b/extensions/business/deeploy/deeploy_manager_api.py index eeffa430..1070cb3e 100644 --- a/extensions/business/deeploy/deeploy_manager_api.py +++ b/extensions/business/deeploy/deeploy_manager_api.py @@ -184,7 +184,8 @@ def get_apps( sender, inputs = self.deeploy_verify_and_get_inputs(request) auth_result = self.deeploy_get_auth_result(inputs) - apps = self._get_online_apps( + apps = self._get_apps_by_escrow_active_jobs( + sender_escrow=auth_result[DEEPLOY_KEYS.SENDER_ESCROW], owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER], project_id=inputs.get(DEEPLOY_KEYS.PROJECT_ID, None) ) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 9eac1679..3c61b829 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2548,10 +2548,16 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i filtered_result[node][app_name] = app_data result = filtered_result if job_id is not None: + if isinstance(job_id, int): + job_id = [job_id] + unique_job_ids = set() + for raw_value in job_id: + unique_job_ids.add(raw_value) filtered_result = self.defaultdict(dict) for node, apps in result.items(): for app_name, app_data in apps.items(): - if app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) != job_id: + app_job_id = app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) + if app_job_id not in unique_job_ids: continue filtered_result[node][app_name] = app_data result = filtered_result @@ -2570,6 +2576,111 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i app_data["node_alias"] = node_alias return result + def _serialize_chain_job(self, raw_job): + """ + Serialize chain job details for JSON APIs, keeping bigint-like values as strings. + """ + serialized = { + "id": str(raw_job.get("jobId", "")), + "projectHash": raw_job.get("projectHash"), + "requestTimestamp": str(raw_job.get("requestTimestamp", 0)), + "startTimestamp": str(raw_job.get("startTimestamp", 0)), + "lastNodesChangeTimestamp": str(raw_job.get("lastNodesChangeTimestamp", 0)), + "jobType": str(raw_job.get("jobType", 0)), + "pricePerEpoch": str(raw_job.get("pricePerEpoch", 0)), + "lastExecutionEpoch": str(raw_job.get("lastExecutionEpoch", 0)), + "numberOfNodesRequested": str(raw_job.get("numberOfNodesRequested", 0)), + "balance": str(raw_job.get("balance", 0)), + "lastAllocatedEpoch": str(raw_job.get("lastAllocatedEpoch", 0)), + "activeNodes": [str(node) for node in raw_job.get("activeNodes", [])], + "network": raw_job.get("network"), + "escrowAddress": raw_job.get("escrowAddress"), + } + return serialized + + def _chain_job_matches_project_id(self, chain_job, project_id): + """ + Validate whether serialized chain job details belong to the provided project. + """ + if project_id is None: + return True + return str(chain_job["projectHash"]).lower() == str(project_id).lower() + + def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None): + """ + Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots. + + Response shape: + { + "": { + "job_id": , + "pipeline": , # raw R1FS payload + "chain_job": , # serialized chain job details + "online": , # online apps snapshot keyed by node + } + } + """ + result = {} + + active_jobs = self.bc.get_escrow_active_jobs(sender_escrow) + active_job_ids = [int(job["jobId"]) for job in active_jobs] + self.Pd(f"Fetched {len(active_job_ids)} active jobs from escrow {sender_escrow}, fetching details") + + # Fetch online apps once, then reuse grouped entries per job_id. + all_online_apps = self._get_online_apps( + owner=owner, + job_id=active_job_ids, + project_id=project_id + ) + + online_apps_by_job_id = self.defaultdict(lambda: self.defaultdict(dict)) + for node, apps in all_online_apps.items(): + for app_name, app_data in apps.items(): + app_job_id = int(app_data[NetMonCt.DEEPLOY_SPECS][DEEPLOY_KEYS.JOB_ID]) + online_apps_by_job_id[app_job_id][node][app_name] = app_data + + for active_job in active_jobs: + job_id = int(active_job["jobId"]) + chain_job = self._serialize_chain_job(active_job) + chain_matches_project = self._chain_job_matches_project_id(chain_job, project_id) + + grouped_online_apps = online_apps_by_job_id.get(job_id, {}) + online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items()} + + pipeline = None + try: + pipeline = self.get_job_pipeline_from_cstore(job_id) + except Exception as exc: + self.Pd(f"Failed to load R1FS payload for job {job_id}: {exc}", color='y') + pipeline = None + + if pipeline is not None: + pipeline_owner = pipeline[NetMonCt.OWNER] + if pipeline_owner != owner: + self.Pd( + f"Skipping R1FS payload for job {job_id}: owner mismatch " + f"(expected {owner}, got {pipeline_owner}).", + color='y' + ) + pipeline = None + + if pipeline.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.PROJECT_ID) != project_id: + self.Pd(f"Skipping R1FS payload for job {job_id}: project_id mismatch.", color='y') + pipeline = None + + if pipeline is None and len(online_apps) == 0 and project_id is not None and not chain_matches_project: + # If a project filter is requested and neither source matches, skip this job. + continue + + result[str(job_id)] = { + DEEPLOY_KEYS.JOB_ID: job_id, + DEEPLOY_KEYS.PIPELINE: pipeline, + DEEPLOY_KEYS.ONLINE: online_apps, + DEEPLOY_KEYS.CHAIN_JOB: chain_job, + } + + return result + # TODO: REMOVE THIS, once instance_id is coming from ui for instances that have to be updated # Maybe add is_new_instance:bool for native apps, that want to add an extra plugin def _ensure_plugin_instance_ids(self, inputs, discovered_plugin_instances, owner=None, app_id=None, job_id=None):