Skip to content

Commit 3532a24

Browse files
committed
Fix upload so it does not report completion until server-side ingestion finishes
1 parent be880a9 commit 3532a24

1 file changed

Lines changed: 157 additions & 12 deletions

File tree

src/bloodhound_cli/main.py

Lines changed: 157 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
import configparser
55
from neo4j import GraphDatabase
66
import argparse
7-
from typing import List, Dict, Optional
7+
from typing import List, Dict, Optional, Tuple
88
from datetime import datetime, timezone
99
import re
1010
import requests
1111
import getpass
1212
from pathlib import Path
13+
import time
1314

1415
try:
1516
from rich.console import Console
@@ -1225,6 +1226,109 @@ def upload_files(self, file_paths: List[str], start_path: str = "/api/v2/file-up
12251226

12261227
return results
12271228

1229+
def _get_file_upload_job(self, job_id: int, list_path: str = "/api/v2/file-upload") -> Optional[Dict]:
    """Look up a single file-upload job by id via the list endpoint.

    The OpenAPI spec only defines a list endpoint with filter params, and
    some deployments may not support filtering by id server-side. We first
    request with an ``id`` query param; if the server rejects it, we fetch
    the unfiltered list and match the id on the client.

    Returns the matching job dict, or None when the job cannot be found or
    the request fails.
    """
    endpoint = f"{self.base_url}{list_path}"
    try:
        response = self.session.get(endpoint, params={"id": job_id}, verify=self.verify, timeout=60)
        if response.status_code >= 400:
            # Server rejected the filter param — fall back to the full list
            # and filter client-side below.
            response = self.session.get(endpoint, verify=self.verify, timeout=60)
        is_json = response.headers.get("Content-Type", "").startswith("application/json")
        payload = response.json() if is_json else {}
        jobs = (payload.get("data") or payload.get("items") or []) if isinstance(payload, dict) else []
        for job in jobs:
            try:
                if int(job.get("id")) == int(job_id):
                    return job
            except Exception:
                # Entry with a missing or non-numeric id — skip it.
                continue
    except (requests.RequestException, ValueError):
        # Network failure or unparseable JSON body — treat as "not found".
        return None
    return None
1259+
1260+
def upload_files_and_wait(self, file_paths: List[str], start_path: str = "/api/v2/file-upload/start", upload_path_tpl: str = "/api/v2/file-upload/{job_id}", end_path_tpl: str = "/api/v2/file-upload/{job_id}/end", content_type: Optional[str] = None, tag: Optional[str] = None, poll_interval: int = 5, timeout_seconds: int = 1800, list_path: str = "/api/v2/file-upload") -> Tuple[Dict[str, str], Optional[Dict]]:
    """Upload files and wait until BloodHound finishes processing the job.

    Upload parameters mirror upload_files(). Additionally:
        poll_interval: seconds between status checks (floored at 1).
        timeout_seconds: maximum seconds to wait for a terminal status.
        list_path: list endpoint used to locate and poll the ingestion job.

    Returns:
        A tuple of (per-file results mapping, final job dict or None on
        timeout / job-not-found).
    """
    results = self.upload_files(
        file_paths=file_paths,
        start_path=start_path,
        upload_path_tpl=upload_path_tpl,
        end_path_tpl=end_path_tpl,
        content_type=content_type,
        tag=tag,
    )

    # upload_files() does not expose the job id, so best-effort: assume the
    # most recent job in the list is ours. Resolve the id ONCE and keep
    # polling that same id — re-inferring "latest" on every iteration could
    # silently switch to a different (e.g. concurrent) upload's job.
    start_time = time.time()
    job_id: Optional[int] = None
    while job_id is None:
        job_id = self._infer_latest_file_upload_job_id(list_path=list_path)
        if job_id is not None:
            break
        # Brief grace period right after upload for the job to appear.
        if time.time() - start_time > 15:
            return results, None
        time.sleep(1)

    last_status = None
    job = None
    spinner_shown = False
    while True:
        # Forward list_path so a caller-supplied endpoint is honored.
        job = self._get_file_upload_job(job_id=job_id, list_path=list_path)
        if job is not None:
            status = job.get("status")
            status_message = job.get("status_message")
            # Log only when the status changes to avoid flooding output.
            if status != last_status:
                last_status = status
                if _RICH_AVAILABLE:
                    if not spinner_shown:
                        console.rule("[bold cyan]Waiting for ingestion to complete")
                        spinner_shown = True
                    console.log({"status": status, "message": status_message})
                else:
                    print(f"Job status: {status} - {status_message}")
            # Terminal statuses: -1 invalid, 2 complete, 3 canceled,
            # 4 timed out, 5 failed, 8 partially complete.
            if status in (-1, 2, 3, 4, 5, 8):
                break
        elif time.time() - start_time > 15:
            # Job disappeared (or became unreadable) past the grace window.
            break
        if time.time() - start_time > timeout_seconds:
            break
        time.sleep(max(1, poll_interval))

    return results, job
1308+
1309+
def _infer_latest_file_upload_job_id(self, list_path: str = "/api/v2/file-upload") -> Optional[int]:
    """Best-effort: return the newest file-upload job id for this server.

    The API's list endpoint is paginated with sort options, but for
    simplicity we fetch the list and take the maximum id as "latest".
    Any request or parse failure is swallowed and reported as None,
    since this is only a heuristic.
    """
    try:
        response = self.session.get(f"{self.base_url}{list_path}", verify=self.verify, timeout=60)
        payload = {}
        if response.headers.get("Content-Type", "").startswith("application/json"):
            payload = response.json()
        jobs = (payload.get("data") or payload.get("items") or []) if isinstance(payload, dict) else []
        latest = None
        for entry in jobs:
            try:
                candidate = int(entry.get("id"))
            except Exception:
                # Entry with a missing or non-numeric id — ignore it.
                continue
            if latest is None or candidate > latest:
                latest = candidate
        return latest
    except Exception:
        return None
1331+
12281332
def not_implemented(self, feature: str):
12291333
msg = (
12301334
f"Feature '{feature}' is not yet implemented for BloodHound CE in this CLI. "
@@ -1370,6 +1474,12 @@ def main():
13701474
parser_upload.add_argument("--content-type", help="Force Content-Type (auto-detected from extension if omitted)")
13711475
parser_upload.add_argument("--insecure", action="store_true", help="Disable TLS certificate verification")
13721476
parser_upload.add_argument("--tag", help="Optional tag/label to include with upload (reserved)")
1477+
wait_group = parser_upload.add_mutually_exclusive_group()
1478+
wait_group.add_argument("--wait", dest="wait", action="store_true", help="Wait for ingestion to complete (default)")
1479+
wait_group.add_argument("--no-wait", dest="wait", action="store_false", help="Return immediately after upload is accepted")
1480+
parser_upload.set_defaults(wait=True)
1481+
parser_upload.add_argument("--poll-interval", type=int, default=5, help="Seconds between status checks (default: 5)")
1482+
parser_upload.add_argument("--timeout", type=int, default=1800, help="Max seconds to wait for completion (default: 1800)")
13731483

13741484
# acl subcommand (refactored)
13751485
parser_acl = subparsers.add_parser("acl", help="Query ACLs in BloodHound")
@@ -1515,17 +1625,52 @@ def main():
15151625
if not token_present:
15161626
print("No CE API token found. Run 'bloodhound-cli --edition ce auth --url http://localhost:7474 --username <user>' first.")
15171627
else:
1518-
results = analyzer.upload_files(
1519-
args.files,
1520-
start_path=args.start_path,
1521-
upload_path_tpl=args.upload_path,
1522-
end_path_tpl=args.end_path,
1523-
content_type=args.content_type,
1524-
tag=args.tag,
1525-
)
1526-
print("\nUpload results\n" + "=" * 50)
1527-
for f, msg in results.items():
1528-
print(f"{f}: {msg}")
1628+
if args.wait:
1629+
results, job = analyzer.upload_files_and_wait(
1630+
args.files,
1631+
start_path=args.start_path,
1632+
upload_path_tpl=args.upload_path,
1633+
end_path_tpl=args.end_path,
1634+
content_type=args.content_type,
1635+
tag=args.tag,
1636+
poll_interval=args.poll_interval,
1637+
timeout_seconds=args.timeout,
1638+
)
1639+
print("\nUpload results\n" + "=" * 50)
1640+
for f, msg in results.items():
1641+
print(f"{f}: {msg}")
1642+
# Human-readable status mapping
1643+
status_map = {
1644+
-1: "Invalid",
1645+
0: "Ready",
1646+
1: "Running",
1647+
2: "Complete",
1648+
3: "Canceled",
1649+
4: "Timed Out",
1650+
5: "Failed",
1651+
6: "Ingesting",
1652+
7: "Analyzing",
1653+
8: "Partially Complete",
1654+
}
1655+
if job:
1656+
st = job.get("status")
1657+
st_readable = status_map.get(st, str(st))
1658+
msg = job.get("status_message")
1659+
print("\nIngestion job status: {}{}".format(st_readable, f" - {msg}" if msg else ""))
1660+
else:
1661+
print("\nIngestion job status: Unknown (timeout or not found)")
1662+
else:
1663+
results = analyzer.upload_files(
1664+
args.files,
1665+
start_path=args.start_path,
1666+
upload_path_tpl=args.upload_path,
1667+
end_path_tpl=args.end_path,
1668+
content_type=args.content_type,
1669+
tag=args.tag,
1670+
)
1671+
print("\nUpload results\n" + "=" * 50)
1672+
for f, msg in results.items():
1673+
print(f"{f}: {msg}")
15291674
elif args.subcommand == "user":
15301675
if args.password_last_change:
15311676
analyzer.print_password_last_change(args.domain, user=args.user, output=args.output)

0 commit comments

Comments
 (0)