Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ classifiers = [
]
dependencies = [
"beautifulsoup4>=4.11.0",
"cyclonedx-python-lib>=11.6.0",
"datasets>=2.0.0",
"fastapi>=0.104.0",
"flask>=2.3.0",
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ python-dotenv>=1.0.0
PyYAML>=6.0.1
flask>=2.3.0
gunicorn>=21.2.0
cyclonedx-python-lib>=11.6.0
cyclonedx-python-lib>=4.0.0
packageurl-python>=0.17.6
python-multipart
jinja2>=3.0.0
Expand Down
179 changes: 42 additions & 137 deletions src/models/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,6 @@
from typing import Dict, Optional, Any, List, Union
from urllib.parse import urlparse
from packageurl import PackageURL
from cyclonedx.model import ExternalReference, ExternalReferenceType, Property, XsUri
from cyclonedx.model.bom import Bom, BomMetaData, Tool
from cyclonedx.model.bom_ref import BomRef
from cyclonedx.model.component import Component, ComponentType
from cyclonedx.model.contact import OrganizationalContact, OrganizationalEntity
from cyclonedx.model.dependency import Dependency
from cyclonedx.model.license import DisjunctiveLicense
from cyclonedx.output.json import JsonV1Dot6

from huggingface_hub import HfApi, ModelCard
from huggingface_hub.repocard_data import EvalResult
Expand Down Expand Up @@ -199,33 +191,32 @@ def _get_tool_metadata(self) -> Dict[str, Any]:

def _create_minimal_aibom(self, model_id: str, spec_version: str = "1.6") -> Dict[str, Any]:
    """Create a minimal valid AIBOM structure in case of errors.

    The document is assembled as a plain dict so that the requested
    ``spec_version`` is emitted as-is instead of being fixed by a
    version-specific serializer.

    Args:
        model_id: Model identifier; the text after the last "/" is used as
            the component name.
        spec_version: Value written to the "specVersion" field.

    Returns:
        A dict with the CycloneDX envelope fields, a metadata section whose
        root component is an "application", and a single
        "machine-learning-model" component. Both components use the
        placeholder version "1.0".
    """
    placeholder_version = "1.0"
    short_name = model_id.split("/")[-1]

    # Two identifiers on purpose: the metadata (application) component gets a
    # "generic" purl as its bom-ref, the model component gets the default one.
    hf_purl = self._generate_purl(model_id, placeholder_version)
    metadata_purl = self._generate_purl(model_id, placeholder_version, purl_type="generic")

    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')

    return {
        "bomFormat": "CycloneDX",
        "specVersion": spec_version,
        "serialNumber": f"urn:uuid:{uuid.uuid4()}",
        "version": 1,
        "metadata": {
            "timestamp": timestamp,
            "tools": self._get_tool_metadata(),
            "component": {
                "bom-ref": metadata_purl,
                "type": "application",
                "name": short_name,
                "version": placeholder_version,
            },
        },
        "components": [
            {
                "bom-ref": hf_purl,
                "type": "machine-learning-model",
                "name": short_name,
                "version": placeholder_version,
                "purl": hf_purl,
            }
        ],
    }

def _fetch_with_backoff(self, fetch_func, *args, max_retries=3, initial_backoff=1.0, **kwargs):
import time
Expand Down Expand Up @@ -274,111 +265,25 @@ def _create_aibom_structure(self, model_id: str, metadata: Dict[str, Any], spec_
full_commit = metadata.get("commit")
version = full_commit[:8] if full_commit else "1.0"

metadata_section = self._create_metadata_section(model_id, metadata, overrides=metadata_overrides)
component_section = self._create_component_section(model_id, metadata)

bom = Bom()
bom.serial_number = uuid.uuid4()
bom.version = 1
bom.metadata = self._build_cyclonedx_metadata(metadata_section)
model_component = self._build_cyclonedx_component(component_section)
bom.components.add(model_component)
bom.dependencies.add(
Dependency(
ref=BomRef(metadata_section["component"]["bom-ref"]),
dependencies=[Dependency(ref=model_component.bom_ref)]
)
)
aibom = {
"bomFormat": "CycloneDX",
"specVersion": spec_version,
"serialNumber": f"urn:uuid:{str(uuid.uuid4())}",
"version": 1,
"metadata": self._create_metadata_section(model_id, metadata, overrides=metadata_overrides),
"components": [self._create_component_section(model_id, metadata)],
"dependencies": [
{
"ref": self._generate_purl(model_id, version, purl_type="generic"),
"dependsOn": [self._generate_purl(model_id, version)]
}
]
}


aibom = json.loads(JsonV1Dot6(bom).output_as_string())
aibom["metadata"]["component"]["description"] = metadata_section["component"].get("description")
if component_section.get("modelCard"):
aibom["components"][0]["modelCard"] = component_section["modelCard"]

return aibom

def _build_cyclonedx_metadata(self, metadata_section: Dict[str, Any]) -> BomMetaData:
    """Turn the dict-form metadata section into a ``BomMetaData`` object.

    Reads ``metadata_section["component"]``; its "name", "version",
    "bom-ref" and "purl" keys are required, while "description",
    "manufacturer", "supplier" and "authors" are optional. The timestamp is
    the current UTC time and the only tool listed is this generator.
    """
    root = metadata_section["component"]

    created_at = datetime.datetime.now(datetime.timezone.utc)
    generator_tool = Tool(
        vendor="OWASP GenAI Security Project",
        name=AIBOM_GEN_NAME,
        version=AIBOM_GEN_VERSION,
    )
    # Keyword order mirrors the order in which the values are looked up.
    root_component = Component(
        name=root["name"],
        type=ComponentType.APPLICATION,
        version=root["version"],
        description=root.get("description"),
        bom_ref=root["bom-ref"],
        purl=PackageURL.from_string(root["purl"]),
        manufacturer=self._entity_from_dict(root.get("manufacturer")),
        supplier=self._entity_from_dict(root.get("supplier")),
        authors=self._authors_from_dicts(root.get("authors", [])),
    )

    return BomMetaData(
        timestamp=created_at,
        tools=[generator_tool],
        component=root_component,
    )

def _build_cyclonedx_component(self, component_section: Dict[str, Any]) -> Component:
    """Turn the dict-form component section into a machine-learning-model ``Component``.

    "name", "version", "bom-ref" and "purl" are required keys; every other
    field is optional and converted through the matching ``*_from_dict(s)``
    helper. An empty or missing "group" is passed as ``None``.
    """
    section = component_section
    # Dict literal keeps the original evaluation order of the field lookups.
    fields = {
        "name": section["name"],
        "type": ComponentType.MACHINE_LEARNING_MODEL,
        "group": section.get("group") or None,
        "version": section["version"],
        "description": section.get("description"),
        "bom_ref": section["bom-ref"],
        "purl": PackageURL.from_string(section["purl"]),
        "licenses": self._licenses_from_dicts(section.get("licenses", [])),
        "manufacturer": self._entity_from_dict(section.get("manufacturer")),
        "supplier": self._entity_from_dict(section.get("supplier")),
        "authors": self._authors_from_dicts(section.get("authors", [])),
        "properties": self._properties_from_dicts(section.get("properties", [])),
        "external_references": self._external_refs_from_dicts(
            section.get("externalReferences", [])
        ),
    }
    return Component(**fields)

@staticmethod
def _entity_from_dict(entity: Optional[Dict[str, Any]]) -> Optional[OrganizationalEntity]:
    """Build an ``OrganizationalEntity`` from a dict, or return ``None``.

    ``None`` is returned when ``entity`` is empty/``None`` or has no truthy
    "name". Each item under "url" (if any) is wrapped in ``XsUri``.
    """
    entity_name = entity.get("name") if entity else None
    if not entity_name:
        return None

    uris = [XsUri(link) for link in (entity.get("url") or [])]
    return OrganizationalEntity(name=entity_name, urls=uris)

@staticmethod
def _authors_from_dicts(authors: List[Dict[str, Any]]) -> List[OrganizationalContact]:
    """Build one ``OrganizationalContact`` per author dict that has a truthy "name"."""
    contacts: List[OrganizationalContact] = []
    for entry in authors:
        contact_name = entry.get("name")
        if contact_name:
            contacts.append(OrganizationalContact(name=contact_name))
    return contacts

@staticmethod
def _licenses_from_dicts(licenses: List[Dict[str, Any]]) -> List[DisjunctiveLicense]:
    """Convert license dicts into ``DisjunctiveLicense`` objects.

    Each entry is read through its "license" sub-dict. An "id" takes
    precedence; otherwise a "name" (with optional "url") is used. Entries
    with neither are dropped.
    """
    result: List[DisjunctiveLicense] = []
    for entry in licenses:
        details = entry.get("license", {})

        spdx_id = details.get("id")
        if spdx_id:
            result.append(DisjunctiveLicense(id=spdx_id))
            continue

        display_name = details.get("name")
        if display_name:
            result.append(DisjunctiveLicense(name=display_name, url=details.get("url")))
    return result

@staticmethod
def _properties_from_dicts(properties: List[Dict[str, Any]]) -> List[Property]:
    """Build a ``Property`` for every dict that has a truthy "name".

    "value" is read with a plain subscript, so a named entry without a
    "value" key raises ``KeyError``.
    """
    converted: List[Property] = []
    for item in properties:
        if not item.get("name"):
            continue
        converted.append(Property(name=item["name"], value=item["value"]))
    return converted

def _external_refs_from_dicts(self, refs: List[Dict[str, Any]]) -> List[ExternalReference]:
    """Convert external-reference dicts into ``ExternalReference`` objects.

    Entries without a truthy "url" are skipped. A missing "type" is treated
    as "website" before being mapped to the enum; "comment" is optional.
    """
    return [
        ExternalReference(
            type=self._map_external_reference_type(entry.get("type", "website")),
            url=XsUri(entry["url"]),
            comment=entry.get("comment"),
        )
        for entry in refs
        if entry.get("url")
    ]

@staticmethod
def _map_external_reference_type(reference_type: str) -> ExternalReferenceType:
    """Map a type string to an ``ExternalReferenceType`` member.

    The string is upper-cased and "-" becomes "_" to form the member name;
    names that match no member fall back to ``WEBSITE``.
    """
    member_key = reference_type.replace("-", "_").upper()
    try:
        return ExternalReferenceType[member_key]
    except KeyError:
        return ExternalReferenceType.WEBSITE

def _create_metadata_section(self, model_id: str, metadata: Dict[str, Any], overrides: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')

Expand Down
40 changes: 15 additions & 25 deletions tests/test_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import unittest
from unittest.mock import MagicMock, patch
from cyclonedx.model import ExternalReferenceType
from src.models.service import AIBOMService

class TestService(unittest.TestCase):
Expand Down Expand Up @@ -81,7 +80,7 @@ def test_generate_aibom_version_truncation(self, mock_extractor_cls, mock_score)
self.assertIn(f"@{expected_version}", ml_cmp["bom-ref"])

# Verify dependencies
self.assertIn("dependencies", aibom)
self.assertIn(f"@{expected_version}", aibom["dependencies"][0]["ref"])
self.assertIn(f"@{expected_version}", aibom["dependencies"][0]["dependsOn"][0])

def test_infer_io_formats(self):
Expand All @@ -105,29 +104,20 @@ def test_infer_io_formats(self):
self.assertEqual(inputs, [])
self.assertEqual(outputs, [])

def test_create_aibom_structure_uses_cyclonedx_outputter(self):
    """The generated structure carries the CycloneDX 1.6 envelope and an ML-model component."""
    model_metadata = dict(
        name="test-model",
        author="tester",
        commit="1234567890abcdef",
    )

    document = self.service._create_aibom_structure("owner/test-model", model_metadata)

    self.assertEqual(document["bomFormat"], "CycloneDX")
    self.assertEqual(document["specVersion"], "1.6")
    self.assertIn("$schema", document)
    first_component = document["components"][0]
    self.assertEqual(first_component["type"], "machine-learning-model")

def test_external_reference_type_mapping_defaults_to_website(self):
    """Known type names map to their enum member; unknown names map to WEBSITE."""
    map_type = self.service._map_external_reference_type

    self.assertEqual(map_type("documentation"), ExternalReferenceType.DOCUMENTATION)
    self.assertEqual(map_type("totally-unknown-type"), ExternalReferenceType.WEBSITE)
def test_generate_purl_huggingface_default(self):
    """_generate_purl without purl_type yields a huggingface purl."""
    expected = "pkg:huggingface/owner/model@1.0"
    actual = self.service._generate_purl("owner/model", "1.0")
    self.assertEqual(actual, expected)

def test_generate_purl_generic_type(self):
    """_generate_purl with purl_type="generic" yields a generic purl."""
    expected = "pkg:generic/owner/model@1.0"
    actual = self.service._generate_purl("owner/model", "1.0", purl_type="generic")
    self.assertEqual(actual, expected)

def test_generate_purl_no_namespace(self):
    """_generate_purl for a model id with no "/" yields a purl without a namespace segment."""
    expected = "pkg:huggingface/model@1.0"
    actual = self.service._generate_purl("model", "1.0")
    self.assertEqual(actual, expected)

if __name__ == '__main__':
unittest.main()