@@ -2570,6 +2570,133 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i
25702570 app_data ["node_alias" ] = node_alias
25712571 return result
25722572
2573+ def _pipeline_matches_project_id (self , pipeline , project_id ):
2574+ """
2575+ Validate whether a raw pipeline payload belongs to a given project.
2576+ """
2577+ if project_id is None :
2578+ return True
2579+ if not isinstance (pipeline , dict ):
2580+ return False
2581+
2582+ deeploy_specs = (
2583+ pipeline .get (ct .CONFIG_STREAM .DEEPLOY_SPECS , None ) or
2584+ pipeline .get (NetMonCt .DEEPLOY_SPECS , None ) or
2585+ pipeline .get ("DEEPLOY_SPECS" , None ) or
2586+ pipeline .get ("deeploy_specs" , None )
2587+ )
2588+ if not isinstance (deeploy_specs , dict ):
2589+ return False
2590+ return deeploy_specs .get (DEEPLOY_KEYS .PROJECT_ID , None ) == project_id
2591+
2592+ def _normalize_active_job_ids (self , active_job_ids ):
2593+ """
2594+ Normalize and deduplicate active job IDs returned by blockchain calls.
2595+ """
2596+ normalized_job_ids = []
2597+ seen = set ()
2598+
2599+ if not isinstance (active_job_ids , list ):
2600+ return normalized_job_ids
2601+
2602+ for raw_job_id in active_job_ids :
2603+ try :
2604+ parsed_job_id = int (raw_job_id )
2605+ except Exception :
2606+ self .Pd (f"Skipping invalid active job id '{ raw_job_id } ' returned by blockchain." , color = 'y' )
2607+ continue
2608+
2609+ if parsed_job_id in seen :
2610+ continue
2611+ seen .add (parsed_job_id )
2612+ normalized_job_ids .append (parsed_job_id )
2613+
2614+ return normalized_job_ids
2615+
2616+ def _get_apps_by_escrow_active_jobs (self , sender_escrow = None , owner = None , project_id = None ):
2617+ """
2618+ Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots.
2619+
2620+ Response shape:
2621+ {
2622+ "<job_id>": {
2623+ "job_id": <int>,
2624+ "pipeline_cid": <str|None>,
2625+ "pipeline": <dict|None>, # raw R1FS payload
2626+ "online": <dict>, # online apps snapshot keyed by node
2627+ }
2628+ }
2629+ """
2630+ result = {}
2631+ if not sender_escrow :
2632+ return result
2633+
2634+ raw_active_job_ids = self .bc .get_escrow_active_job_ids (sender_escrow )
2635+ active_job_ids = self ._normalize_active_job_ids (raw_active_job_ids )
2636+ self .Pd (f"Escrow { sender_escrow } active job ids: { active_job_ids } " )
2637+
2638+ for job_id in active_job_ids :
2639+ online_apps = self ._get_online_apps (
2640+ owner = owner ,
2641+ job_id = job_id ,
2642+ project_id = project_id
2643+ )
2644+ if not isinstance (online_apps , dict ):
2645+ online_apps = {}
2646+ else :
2647+ normalized_online_apps = {}
2648+ for node , apps in online_apps .items ():
2649+ if not isinstance (apps , dict ):
2650+ self .Pd (f"Skipping malformed online apps payload for node { node } ." , color = 'y' )
2651+ continue
2652+ normalized_online_apps [node ] = dict (apps )
2653+ online_apps = normalized_online_apps
2654+
2655+ pipeline_cid = None
2656+ pipeline = None
2657+ try :
2658+ pipeline_cid = self ._get_pipeline_from_cstore (job_id )
2659+ if pipeline_cid :
2660+ pipeline = self .get_pipeline_from_r1fs (pipeline_cid )
2661+ except Exception as exc :
2662+ self .Pd (f"Failed to load R1FS payload for job { job_id } : { exc } " , color = 'y' )
2663+ pipeline = None
2664+ pipeline_cid = None
2665+
2666+ if pipeline is not None and owner is not None :
2667+ pipeline_owner = (
2668+ pipeline .get (ct .CONFIG_STREAM .K_OWNER , None ) or
2669+ pipeline .get (NetMonCt .OWNER , None ) or
2670+ pipeline .get ("OWNER" , None ) or
2671+ pipeline .get ("owner" , None )
2672+ )
2673+ if pipeline_owner is not None and pipeline_owner != owner :
2674+ self .Pd (
2675+ f"Skipping R1FS payload for job { job_id } : owner mismatch "
2676+ f"(expected { owner } , got { pipeline_owner } )." ,
2677+ color = 'y'
2678+ )
2679+ pipeline = None
2680+ pipeline_cid = None
2681+
2682+ if pipeline is not None and project_id is not None and not self ._pipeline_matches_project_id (pipeline , project_id ):
2683+ self .Pd (f"Skipping R1FS payload for job { job_id } : project_id mismatch." , color = 'y' )
2684+ pipeline = None
2685+ pipeline_cid = None
2686+
2687+ if pipeline is None and len (online_apps ) == 0 and project_id is not None :
2688+ # If a project filter is requested and neither source matches, skip this job.
2689+ continue
2690+
2691+ result [str (job_id )] = {
2692+ DEEPLOY_KEYS .JOB_ID : job_id ,
2693+ DEEPLOY_KEYS .PIPELINE_CID : pipeline_cid if pipeline is not None else None ,
2694+ DEEPLOY_KEYS .PIPELINE : pipeline if pipeline is not None else None ,
2695+ DEEPLOY_KEYS .ONLINE : online_apps ,
2696+ }
2697+
2698+ return result
2699+
25732700 # TODO: REMOVE THIS, once instance_id is coming from ui for instances that have to be updated
25742701 # Maybe add is_new_instance:bool for native apps, that want to add an extra plugin
25752702 def _ensure_plugin_instance_ids (self , inputs , discovered_plugin_instances , owner = None , app_id = None , job_id = None ):
0 commit comments