From 2f55a8deeeb251e4b57780f0d60bed7eec4850ff Mon Sep 17 00:00:00 2001 From: Alessandro Date: Tue, 24 Feb 2026 15:04:42 +0100 Subject: [PATCH 1/5] feat: get apps from R1FS --- extensions/business/deeploy/deeploy_const.py | 1 + .../business/deeploy/deeploy_manager_api.py | 3 +- extensions/business/deeploy/deeploy_mixin.py | 127 ++++++++++++++++++ 3 files changed, 130 insertions(+), 1 deletion(-) diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index 69ae69fd..33d0100f 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -43,6 +43,7 @@ class DEEPLOY_KEYS: PIPELINE_PARAMS = "pipeline_params" PIPELINE = "pipeline" PIPELINE_CID = "pipeline_cid" + ONLINE = "online" JOB_CONFIG = "job_config" # App params keys APP_PARAMS = "app_params" diff --git a/extensions/business/deeploy/deeploy_manager_api.py b/extensions/business/deeploy/deeploy_manager_api.py index eeffa430..1070cb3e 100644 --- a/extensions/business/deeploy/deeploy_manager_api.py +++ b/extensions/business/deeploy/deeploy_manager_api.py @@ -184,7 +184,8 @@ def get_apps( sender, inputs = self.deeploy_verify_and_get_inputs(request) auth_result = self.deeploy_get_auth_result(inputs) - apps = self._get_online_apps( + apps = self._get_apps_by_escrow_active_jobs( + sender_escrow=auth_result[DEEPLOY_KEYS.SENDER_ESCROW], owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER], project_id=inputs.get(DEEPLOY_KEYS.PROJECT_ID, None) ) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 9eac1679..65164df5 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2570,6 +2570,133 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i app_data["node_alias"] = node_alias return result + def _pipeline_matches_project_id(self, pipeline, project_id): + """ + Validate whether a raw pipeline payload belongs to a given project. + """ + if project_id is None: + return True + if not isinstance(pipeline, dict): + return False + + deeploy_specs = ( + pipeline.get(ct.CONFIG_STREAM.DEEPLOY_SPECS, None) or + pipeline.get(NetMonCt.DEEPLOY_SPECS, None) or + pipeline.get("DEEPLOY_SPECS", None) or + pipeline.get("deeploy_specs", None) + ) + if not isinstance(deeploy_specs, dict): + return False + return deeploy_specs.get(DEEPLOY_KEYS.PROJECT_ID, None) == project_id + + def _normalize_active_job_ids(self, active_job_ids): + """ + Normalize and deduplicate active job IDs returned by blockchain calls. + """ + normalized_job_ids = [] + seen = set() + + if not isinstance(active_job_ids, list): + return normalized_job_ids + + for raw_job_id in active_job_ids: + try: + parsed_job_id = int(raw_job_id) + except Exception: + self.Pd(f"Skipping invalid active job id '{raw_job_id}' returned by blockchain.", color='y') + continue + + if parsed_job_id in seen: + continue + seen.add(parsed_job_id) + normalized_job_ids.append(parsed_job_id) + + return normalized_job_ids + + def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, project_id=None): + """ + Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots. + + Response shape: + { + "": { + "job_id": , + "pipeline_cid": , + "pipeline": , # raw R1FS payload + "online": , # online apps snapshot keyed by node + } + } + """ + result = {} + if not sender_escrow: + return result + + raw_active_job_ids = self.bc.get_escrow_active_job_ids(sender_escrow) + active_job_ids = self._normalize_active_job_ids(raw_active_job_ids) + self.Pd(f"Escrow {sender_escrow} active job ids: {active_job_ids}") + + for job_id in active_job_ids: + online_apps = self._get_online_apps( + owner=owner, + job_id=job_id, + project_id=project_id + ) + if not isinstance(online_apps, dict): + online_apps = {} + else: + normalized_online_apps = {} + for node, apps in online_apps.items(): + if not isinstance(apps, dict): + self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') + continue + normalized_online_apps[node] = dict(apps) + online_apps = normalized_online_apps + + pipeline_cid = None + pipeline = None + try: + pipeline_cid = self._get_pipeline_from_cstore(job_id) + if pipeline_cid: + pipeline = self.get_pipeline_from_r1fs(pipeline_cid) + except Exception as exc: + self.Pd(f"Failed to load R1FS payload for job {job_id}: {exc}", color='y') + pipeline = None + pipeline_cid = None + + if pipeline is not None and owner is not None: + pipeline_owner = ( + pipeline.get(ct.CONFIG_STREAM.K_OWNER, None) or + pipeline.get(NetMonCt.OWNER, None) or + pipeline.get("OWNER", None) or + pipeline.get("owner", None) + ) + if pipeline_owner is not None and pipeline_owner != owner: + self.Pd( + f"Skipping R1FS payload for job {job_id}: owner mismatch " + f"(expected {owner}, got {pipeline_owner}).", + color='y' + ) + pipeline = None + pipeline_cid = None + + if pipeline is not None and project_id is not None and not self._pipeline_matches_project_id(pipeline, project_id): + self.Pd(f"Skipping R1FS payload for job {job_id}: project_id mismatch.", color='y') + pipeline = None + pipeline_cid = None + + if pipeline is None and len(online_apps) == 0 and project_id is not None: + # If a project filter is requested and neither source matches, skip this job. + continue + + result[str(job_id)] = { + DEEPLOY_KEYS.JOB_ID: job_id, + DEEPLOY_KEYS.PIPELINE_CID: pipeline_cid if pipeline is not None else None, + DEEPLOY_KEYS.PIPELINE: pipeline if pipeline is not None else None, + DEEPLOY_KEYS.ONLINE: online_apps, + } + + return result + # TODO: REMOVE THIS, once instance_id is coming from ui for instances that have to be updated # Maybe add is_new_instance:bool for native apps, that want to add an extra plugin def _ensure_plugin_instance_ids(self, inputs, discovered_plugin_instances, owner=None, app_id=None, job_id=None): From e25af513469051937e5e58c53d5a9e0fb10b2484 Mon Sep 17 00:00:00 2001 From: Alessandro Date: Thu, 26 Feb 2026 14:52:31 +0100 Subject: [PATCH 2/5] feat: include chain job details in get_apps payload --- extensions/business/deeploy/deeploy_const.py | 1 + extensions/business/deeploy/deeploy_mixin.py | 91 ++++++++++++++++++-- 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index 33d0100f..02ea4de4 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -44,6 +44,7 @@ class DEEPLOY_KEYS: PIPELINE = "pipeline" PIPELINE_CID = "pipeline_cid" ONLINE = "online" + CHAIN_JOB = "chain_job" JOB_CONFIG = "job_config" # App params keys APP_PARAMS = "app_params" diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 65164df5..e30ee222 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2613,6 +2613,82 @@ def _normalize_active_job_ids(self, active_job_ids): return normalized_job_ids + def _normalize_active_jobs(self, active_jobs): + """ + Normalize and deduplicate active job details returned by blockchain calls. + """ + normalized_active_jobs = [] + seen = set() + + if not isinstance(active_jobs, list): + return normalized_active_jobs + + for raw_job in active_jobs: + if not isinstance(raw_job, dict): + self.Pd(f"Skipping invalid active job payload '{raw_job}'.", color='y') + continue + + raw_job_id = raw_job.get("jobId", raw_job.get("id", None)) + try: + parsed_job_id = int(raw_job_id) + except Exception: + self.Pd(f"Skipping active job with invalid id '{raw_job_id}'.", color='y') + continue + + if parsed_job_id in seen: + continue + seen.add(parsed_job_id) + + normalized_active_jobs.append({ + "job_id": parsed_job_id, + "raw": raw_job, + }) + + return normalized_active_jobs + + def _serialize_chain_job(self, raw_job): + """ + Serialize chain job details for JSON APIs, keeping bigint-like values as strings. + """ + if not isinstance(raw_job, dict): + return None + + active_nodes = raw_job.get("activeNodes", []) + if not isinstance(active_nodes, list): + active_nodes = [] + + serialized = { + "id": str(raw_job.get("jobId", raw_job.get("id", ""))), + "projectHash": raw_job.get("projectHash"), + "requestTimestamp": str(raw_job.get("requestTimestamp", 0)), + "startTimestamp": str(raw_job.get("startTimestamp", 0)), + "lastNodesChangeTimestamp": str(raw_job.get("lastNodesChangeTimestamp", 0)), + "jobType": str(raw_job.get("jobType", 0)), + "pricePerEpoch": str(raw_job.get("pricePerEpoch", 0)), + "lastExecutionEpoch": str(raw_job.get("lastExecutionEpoch", 0)), + "numberOfNodesRequested": str(raw_job.get("numberOfNodesRequested", 0)), + "balance": str(raw_job.get("balance", 0)), + "lastAllocatedEpoch": str(raw_job.get("lastAllocatedEpoch", 0)), + "activeNodes": [str(node) for node in active_nodes], + "network": raw_job.get("network"), + "escrowAddress": raw_job.get("escrowAddress"), + } + return serialized + + def _chain_job_matches_project_id(self, chain_job, project_id): + """ + Validate whether serialized chain job details belong to the provided project. + """ + if project_id is None: + return True + if not isinstance(chain_job, dict): + return False + + chain_project_id = chain_job.get("projectHash", None) + if chain_project_id is None: + return False + return str(chain_project_id).lower() == str(project_id).lower() + def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, project_id=None): """ Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots. @@ -2631,11 +2707,15 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec if not sender_escrow: return result - raw_active_job_ids = self.bc.get_escrow_active_job_ids(sender_escrow) - active_job_ids = self._normalize_active_job_ids(raw_active_job_ids) - self.Pd(f"Escrow {sender_escrow} active job ids: {active_job_ids}") + raw_active_jobs = self.bc.get_escrow_active_jobs(sender_escrow) + active_jobs = self._normalize_active_jobs(raw_active_jobs) + self.Pd(f"Escrow {sender_escrow} active jobs: {[job['job_id'] for job in active_jobs]}") + + for active_job in active_jobs: + job_id = active_job["job_id"] + chain_job = self._serialize_chain_job(active_job.get("raw", {})) + chain_matches_project = self._chain_job_matches_project_id(chain_job, project_id) - for job_id in active_job_ids: online_apps = self._get_online_apps( owner=owner, job_id=job_id, @@ -2684,7 +2764,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec pipeline = None pipeline_cid = None - if pipeline is None and len(online_apps) == 0 and project_id is not None: + if pipeline is None and len(online_apps) == 0 and project_id is not None and not chain_matches_project: # If a project filter is requested and neither source matches, skip this job. continue @@ -2693,6 +2773,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec DEEPLOY_KEYS.PIPELINE_CID: pipeline_cid if pipeline is not None else None, DEEPLOY_KEYS.PIPELINE: pipeline if pipeline is not None else None, DEEPLOY_KEYS.ONLINE: online_apps, + DEEPLOY_KEYS.CHAIN_JOB: chain_job, } return result From b3b68e442374b1ee5d1d8fc1ac0994989406fe22 Mon Sep 17 00:00:00 2001 From: Alessandro Date: Thu, 26 Feb 2026 15:51:47 +0100 Subject: [PATCH 3/5] feat: enhance job ID handling in online apps retrieval --- extensions/business/deeploy/deeploy_mixin.py | 48 +++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index e30ee222..0f4259b1 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2548,10 +2548,16 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i filtered_result[node][app_name] = app_data result = filtered_result if job_id is not None: + if isinstance(job_id, int): + job_id = [job_id] + unique_job_ids = set() + for raw_value in job_id: + unique_job_ids.add(raw_value) filtered_result = self.defaultdict(dict) for node, apps in result.items(): for app_name, app_data in apps.items(): - if app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) != job_id: + app_job_id = app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) + if app_job_id not in unique_job_ids: continue filtered_result[node][app_name] = app_data result = filtered_result @@ -2711,26 +2717,36 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec active_jobs = self._normalize_active_jobs(raw_active_jobs) self.Pd(f"Escrow {sender_escrow} active jobs: {[job['job_id'] for job in active_jobs]}") + active_job_ids = [job["job_id"] for job in active_jobs] + + # Fetch online apps once, then reuse grouped entries per job_id. + all_online_apps = self._get_online_apps( + owner=owner, + job_id=active_job_ids, + project_id=project_id + ) + + online_apps_by_job_id = self.defaultdict(lambda: self.defaultdict(dict)) + if isinstance(all_online_apps, dict): + for node, apps in all_online_apps.items(): + if not isinstance(apps, dict): + self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') + continue + for app_name, app_data in apps.items(): + app_job_id = self._extract_app_job_id(app_data) + if app_job_id is None: + continue + online_apps_by_job_id[app_job_id][node][app_name] = app_data + for active_job in active_jobs: job_id = active_job["job_id"] chain_job = self._serialize_chain_job(active_job.get("raw", {})) chain_matches_project = self._chain_job_matches_project_id(chain_job, project_id) - online_apps = self._get_online_apps( - owner=owner, - job_id=job_id, - project_id=project_id - ) - if not isinstance(online_apps, dict): - online_apps = {} - else: - normalized_online_apps = {} - for node, apps in online_apps.items(): - if not isinstance(apps, dict): - self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') - continue - normalized_online_apps[node] = dict(apps) - online_apps = normalized_online_apps + online_apps = {} + grouped_online_apps = online_apps_by_job_id.get(job_id, {}) + if isinstance(grouped_online_apps, dict): + online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items() if isinstance(apps, dict)} pipeline_cid = None pipeline = None From db19ca46211bdcf0fbb7b181c29861c21ece2f1c Mon Sep 17 00:00:00 2001 From: Alessandro Date: Thu, 26 Feb 2026 16:04:22 +0100 Subject: [PATCH 4/5] fix --- extensions/business/deeploy/deeploy_mixin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 0f4259b1..66ede3b6 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2733,7 +2733,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') continue for app_name, app_data in apps.items(): - app_job_id = self._extract_app_job_id(app_data) + app_job_id = app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) if app_job_id is None: continue online_apps_by_job_id[app_job_id][node][app_name] = app_data From a0c6e467244e4a683ab2bfecdd4fd553a8a3f018 Mon Sep 17 00:00:00 2001 From: Alessandro Date: Thu, 26 Feb 2026 17:55:50 +0100 Subject: [PATCH 5/5] feat: simplify job handling and improve pipeline validation in deeploy mixin --- extensions/business/deeploy/deeploy_mixin.py | 161 +++---------------- 1 file changed, 24 insertions(+), 137 deletions(-) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 66ede3b6..3c61b829 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2576,95 +2576,12 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i app_data["node_alias"] = node_alias return result - def _pipeline_matches_project_id(self, pipeline, project_id): - """ - Validate whether a raw pipeline payload belongs to a given project. - """ - if project_id is None: - return True - if not isinstance(pipeline, dict): - return False - - deeploy_specs = ( - pipeline.get(ct.CONFIG_STREAM.DEEPLOY_SPECS, None) or - pipeline.get(NetMonCt.DEEPLOY_SPECS, None) or - pipeline.get("DEEPLOY_SPECS", None) or - pipeline.get("deeploy_specs", None) - ) - if not isinstance(deeploy_specs, dict): - return False - return deeploy_specs.get(DEEPLOY_KEYS.PROJECT_ID, None) == project_id - - def _normalize_active_job_ids(self, active_job_ids): - """ - Normalize and deduplicate active job IDs returned by blockchain calls. - """ - normalized_job_ids = [] - seen = set() - - if not isinstance(active_job_ids, list): - return normalized_job_ids - - for raw_job_id in active_job_ids: - try: - parsed_job_id = int(raw_job_id) - except Exception: - self.Pd(f"Skipping invalid active job id '{raw_job_id}' returned by blockchain.", color='y') - continue - - if parsed_job_id in seen: - continue - seen.add(parsed_job_id) - normalized_job_ids.append(parsed_job_id) - - return normalized_job_ids - - def _normalize_active_jobs(self, active_jobs): - """ - Normalize and deduplicate active job details returned by blockchain calls. - """ - normalized_active_jobs = [] - seen = set() - - if not isinstance(active_jobs, list): - return normalized_active_jobs - - for raw_job in active_jobs: - if not isinstance(raw_job, dict): - self.Pd(f"Skipping invalid active job payload '{raw_job}'.", color='y') - continue - - raw_job_id = raw_job.get("jobId", raw_job.get("id", None)) - try: - parsed_job_id = int(raw_job_id) - except Exception: - self.Pd(f"Skipping active job with invalid id '{raw_job_id}'.", color='y') - continue - - if parsed_job_id in seen: - continue - seen.add(parsed_job_id) - - normalized_active_jobs.append({ - "job_id": parsed_job_id, - "raw": raw_job, - }) - - return normalized_active_jobs - def _serialize_chain_job(self, raw_job): """ Serialize chain job details for JSON APIs, keeping bigint-like values as strings. """ - if not isinstance(raw_job, dict): - return None - - active_nodes = raw_job.get("activeNodes", []) - if not isinstance(active_nodes, list): - active_nodes = [] - serialized = { - "id": str(raw_job.get("jobId", raw_job.get("id", ""))), + "id": str(raw_job.get("jobId", "")), "projectHash": raw_job.get("projectHash"), "requestTimestamp": str(raw_job.get("requestTimestamp", 0)), "startTimestamp": str(raw_job.get("startTimestamp", 0)), @@ -2675,7 +2592,7 @@ def _serialize_chain_job(self, raw_job): "numberOfNodesRequested": str(raw_job.get("numberOfNodesRequested", 0)), "balance": str(raw_job.get("balance", 0)), "lastAllocatedEpoch": str(raw_job.get("lastAllocatedEpoch", 0)), - "activeNodes": [str(node) for node in active_nodes], + "activeNodes": [str(node) for node in raw_job.get("activeNodes", [])], "network": raw_job.get("network"), "escrowAddress": raw_job.get("escrowAddress"), } @@ -2687,15 +2604,9 @@ def _chain_job_matches_project_id(self, chain_job, project_id): """ if project_id is None: return True - if not isinstance(chain_job, dict): - return False - - chain_project_id = chain_job.get("projectHash", None) - if chain_project_id is None: - return False - return str(chain_project_id).lower() == str(project_id).lower() + return str(chain_job["projectHash"]).lower() == str(project_id).lower() - def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, project_id=None): + def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None): """ Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots. @@ -2703,21 +2614,17 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec { "": { "job_id": , - "pipeline_cid": , - "pipeline": , # raw R1FS payload - "online": , # online apps snapshot keyed by node + "pipeline": , # raw R1FS payload + "chain_job": , # serialized chain job details + "online": , # online apps snapshot keyed by node } } """ result = {} - if not sender_escrow: - return result - - raw_active_jobs = self.bc.get_escrow_active_jobs(sender_escrow) - active_jobs = self._normalize_active_jobs(raw_active_jobs) - self.Pd(f"Escrow {sender_escrow} active jobs: {[job['job_id'] for job in active_jobs]}") - active_job_ids = [job["job_id"] for job in active_jobs] + active_jobs = self.bc.get_escrow_active_jobs(sender_escrow) + active_job_ids = [int(job["jobId"]) for job in active_jobs] + self.Pd(f"Fetched {len(active_job_ids)} active jobs from escrow {sender_escrow}, fetching details") # Fetch online apps once, then reuse grouped entries per job_id. all_online_apps = self._get_online_apps( @@ -2727,58 +2634,39 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec ) online_apps_by_job_id = self.defaultdict(lambda: self.defaultdict(dict)) - if isinstance(all_online_apps, dict): - for node, apps in all_online_apps.items(): - if not isinstance(apps, dict): - self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') - continue - for app_name, app_data in apps.items(): - app_job_id = app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) - if app_job_id is None: - continue - online_apps_by_job_id[app_job_id][node][app_name] = app_data + for node, apps in all_online_apps.items(): + for app_name, app_data in apps.items(): + app_job_id = int(app_data[NetMonCt.DEEPLOY_SPECS][DEEPLOY_KEYS.JOB_ID]) + online_apps_by_job_id[app_job_id][node][app_name] = app_data for active_job in active_jobs: - job_id = active_job["job_id"] - chain_job = self._serialize_chain_job(active_job.get("raw", {})) + job_id = int(active_job["jobId"]) + chain_job = self._serialize_chain_job(active_job) chain_matches_project = self._chain_job_matches_project_id(chain_job, project_id) - online_apps = {} grouped_online_apps = online_apps_by_job_id.get(job_id, {}) - if isinstance(grouped_online_apps, dict): - online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items() if isinstance(apps, dict)} + online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items()} - pipeline_cid = None pipeline = None try: - pipeline_cid = self._get_pipeline_from_cstore(job_id) - if pipeline_cid: - pipeline = self.get_pipeline_from_r1fs(pipeline_cid) + pipeline = self.get_job_pipeline_from_cstore(job_id) except Exception as exc: self.Pd(f"Failed to load R1FS payload for job {job_id}: {exc}", color='y') pipeline = None - pipeline_cid = None - - if pipeline is not None and owner is not None: - pipeline_owner = ( - pipeline.get(ct.CONFIG_STREAM.K_OWNER, None) or - pipeline.get(NetMonCt.OWNER, None) or - pipeline.get("OWNER", None) or - pipeline.get("owner", None) - ) - if pipeline_owner is not None and pipeline_owner != owner: + + if pipeline is not None: + pipeline_owner = pipeline[NetMonCt.OWNER] + if pipeline_owner != owner: self.Pd( f"Skipping R1FS payload for job {job_id}: owner mismatch " f"(expected {owner}, got {pipeline_owner}).", color='y' ) pipeline = None - pipeline_cid = None - if pipeline is not None and project_id is not None and not self._pipeline_matches_project_id(pipeline, project_id): + if pipeline.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.PROJECT_ID) != project_id: self.Pd(f"Skipping R1FS payload for job {job_id}: project_id mismatch.", color='y') pipeline = None - pipeline_cid = None if pipeline is None and len(online_apps) == 0 and project_id is not None and not chain_matches_project: # If a project filter is requested and neither source matches, skip this job. @@ -2786,8 +2674,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec result[str(job_id)] = { DEEPLOY_KEYS.JOB_ID: job_id, - DEEPLOY_KEYS.PIPELINE_CID: pipeline_cid if pipeline is not None else None, - DEEPLOY_KEYS.PIPELINE: pipeline if pipeline is not None else None, + DEEPLOY_KEYS.PIPELINE: pipeline, DEEPLOY_KEYS.ONLINE: online_apps, DEEPLOY_KEYS.CHAIN_JOB: chain_job, }