Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions extensions/business/deeploy/deeploy_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class DEEPLOY_KEYS:
PIPELINE_PARAMS = "pipeline_params"
PIPELINE = "pipeline"
PIPELINE_CID = "pipeline_cid"
ONLINE = "online"
CHAIN_JOB = "chain_job"
JOB_CONFIG = "job_config"
# App params keys
APP_PARAMS = "app_params"
Expand Down
3 changes: 2 additions & 1 deletion extensions/business/deeploy/deeploy_manager_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ def get_apps(
sender, inputs = self.deeploy_verify_and_get_inputs(request)
auth_result = self.deeploy_get_auth_result(inputs)

apps = self._get_online_apps(
apps = self._get_apps_by_escrow_active_jobs(
sender_escrow=auth_result[DEEPLOY_KEYS.SENDER_ESCROW],
owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER],
project_id=inputs.get(DEEPLOY_KEYS.PROJECT_ID, None)
)
Expand Down
113 changes: 112 additions & 1 deletion extensions/business/deeploy/deeploy_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2548,10 +2548,16 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i
filtered_result[node][app_name] = app_data
result = filtered_result
if job_id is not None:
if isinstance(job_id, int):
job_id = [job_id]
unique_job_ids = set()
for raw_value in job_id:
unique_job_ids.add(raw_value)
filtered_result = self.defaultdict(dict)
for node, apps in result.items():
for app_name, app_data in apps.items():
if app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) != job_id:
app_job_id = app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None)
if app_job_id not in unique_job_ids:
continue
filtered_result[node][app_name] = app_data
result = filtered_result
Expand All @@ -2570,6 +2576,111 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i
app_data["node_alias"] = node_alias
return result

def _serialize_chain_job(self, raw_job):
"""
Serialize chain job details for JSON APIs, keeping bigint-like values as strings.
"""
serialized = {
"id": str(raw_job.get("jobId", "")),
"projectHash": raw_job.get("projectHash"),
"requestTimestamp": str(raw_job.get("requestTimestamp", 0)),
"startTimestamp": str(raw_job.get("startTimestamp", 0)),
"lastNodesChangeTimestamp": str(raw_job.get("lastNodesChangeTimestamp", 0)),
"jobType": str(raw_job.get("jobType", 0)),
"pricePerEpoch": str(raw_job.get("pricePerEpoch", 0)),
"lastExecutionEpoch": str(raw_job.get("lastExecutionEpoch", 0)),
"numberOfNodesRequested": str(raw_job.get("numberOfNodesRequested", 0)),
"balance": str(raw_job.get("balance", 0)),
"lastAllocatedEpoch": str(raw_job.get("lastAllocatedEpoch", 0)),
"activeNodes": [str(node) for node in raw_job.get("activeNodes", [])],
"network": raw_job.get("network"),
"escrowAddress": raw_job.get("escrowAddress"),
}
return serialized

def _chain_job_matches_project_id(self, chain_job, project_id):
"""
Validate whether serialized chain job details belong to the provided project.
"""
if project_id is None:
return True
return str(chain_job["projectHash"]).lower() == str(project_id).lower()

def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None):
  """
  Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots.

  Parameters
  ----------
  sender_escrow : escrow address whose active jobs are enumerated on-chain.
  owner : expected owner; R1FS pipeline payloads with a different owner are dropped.
  project_id : optional project filter; when None, no project filtering is applied.

  Response shape:
  {
    "<job_id>": {
      "job_id": <int>,
      "pipeline": <dict|None>,  # raw R1FS payload
      "chain_job": <dict>,      # serialized chain job details
      "online": <dict>,         # online apps snapshot keyed by node
    }
  }
  """
  result = {}

  active_jobs = self.bc.get_escrow_active_jobs(sender_escrow)
  active_job_ids = [int(job["jobId"]) for job in active_jobs]
  self.Pd(f"Fetched {len(active_job_ids)} active jobs from escrow {sender_escrow}, fetching details")

  # Fetch online apps once, then reuse grouped entries per job_id.
  all_online_apps = self._get_online_apps(
    owner=owner,
    job_id=active_job_ids,
    project_id=project_id
  )

  online_apps_by_job_id = self.defaultdict(lambda: self.defaultdict(dict))
  for node, apps in all_online_apps.items():
    for app_name, app_data in apps.items():
      app_job_id = int(app_data[NetMonCt.DEEPLOY_SPECS][DEEPLOY_KEYS.JOB_ID])
      online_apps_by_job_id[app_job_id][node][app_name] = app_data

  for active_job in active_jobs:
    job_id = int(active_job["jobId"])
    chain_job = self._serialize_chain_job(active_job)
    chain_matches_project = self._chain_job_matches_project_id(chain_job, project_id)

    grouped_online_apps = online_apps_by_job_id.get(job_id, {})
    online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items()}

    pipeline = None
    try:
      pipeline = self.get_job_pipeline_from_cstore(job_id)
    except Exception as exc:
      # Best-effort: a missing/broken R1FS payload must not fail the whole listing.
      self.Pd(f"Failed to load R1FS payload for job {job_id}: {exc}", color='y')
      pipeline = None

    if pipeline is not None:
      pipeline_owner = pipeline[NetMonCt.OWNER]
      if pipeline_owner != owner:
        self.Pd(
          f"Skipping R1FS payload for job {job_id}: owner mismatch "
          f"(expected {owner}, got {pipeline_owner}).",
          color='y'
        )
        pipeline = None

    # BUGFIX: re-check `pipeline is not None` — the owner-mismatch branch above may
    # have cleared it, and the original code then crashed with AttributeError on
    # `None.get(...)`. Also only apply the project filter when one was requested;
    # previously a None filter silently dropped pipelines with a non-None PROJECT_ID.
    if (
      pipeline is not None
      and project_id is not None
      and pipeline.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.PROJECT_ID) != project_id
    ):
      self.Pd(f"Skipping R1FS payload for job {job_id}: project_id mismatch.", color='y')
      pipeline = None

    if pipeline is None and len(online_apps) == 0 and project_id is not None and not chain_matches_project:
      # If a project filter is requested and neither source matches, skip this job.
      continue

    result[str(job_id)] = {
      DEEPLOY_KEYS.JOB_ID: job_id,
      DEEPLOY_KEYS.PIPELINE: pipeline,
      DEEPLOY_KEYS.ONLINE: online_apps,
      DEEPLOY_KEYS.CHAIN_JOB: chain_job,
    }

  return result

# TODO: REMOVE THIS, once instance_id is coming from ui for instances that have to be updated
# Maybe add is_new_instance:bool for native apps, that want to add an extra plugin
def _ensure_plugin_instance_ids(self, inputs, discovered_plugin_instances, owner=None, app_id=None, job_id=None):
Expand Down