Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions fossology/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,78 @@ def generate_report(
description = f"Report generation for upload {upload.uploadname} failed"
raise FossologyApiError(description, response)

def import_report(
self,
upload: Upload,
report_file: str,
report_format: str = "spdxrdf",
add_concluded_as_decisions: bool = False,
group: str | None = None,
) -> int:
"""Import an external report for a given upload.

Uploads the report file and schedules a ``reportImport`` job that
merges the report's license decisions into the upload.

API Endpoint: POST /report/import

:Example:

>>> from fossology import Fossology
>>>
>>> foss = Fossology(FOSS_URL, FOSS_TOKEN) # doctest: +SKIP
>>> job_id = foss.import_report(
... foss.detail_upload(1),
... "report.spdx.rdf",
... ) # doctest: +SKIP

:param upload: the upload the report is imported for
:param report_file: local path to the report file to import
:param report_format: the report format (default: "spdxrdf" — the only format
currently accepted by the Fossology API)
:param add_concluded_as_decisions: treat concluded licenses in the report
as clearing decisions (default: False)
:param group: the group name to act on behalf of (default: None)
Comment thread
Valyrian-Code marked this conversation as resolved.
:type upload: Upload
:type report_file: str
:type report_format: str
:type add_concluded_as_decisions: bool
:type group: str | None
:return: the id of the scheduled reportImport job
:rtype: int
:raises FossologyApiError: if the REST call failed
:raises AuthorizationError: if the REST call is not authorized
"""
params = {
"upload": str(upload.id),
"reportFormat": report_format,
"addConcludedAsDecisions": str(add_concluded_as_decisions).lower(),
}
headers = {}
if group:
headers["groupName"] = group

with open(report_file, "rb") as fp:
response = self.session.post(
f"{self.api}/report/import",
params=params,
headers=headers,
files={"report": fp},
)

upload_ref = f"{upload.uploadname} (id={upload.id})"

if response.status_code == 201:
return int(response.json()["message"])

elif response.status_code == 403:
description = f"Report import for upload {upload_ref} is not authorized"
raise AuthorizationError(description, response)

else:
description = f"Report import for upload {upload_ref} failed"
raise FossologyApiError(description, response)

@retry(retry=retry_if_exception_type(TryAgain), stop=stop_after_attempt(10))
def download_report(
self, report_id: int, group: str | None = None, wait_time: int = 0
Expand Down
51 changes: 51 additions & 0 deletions tests/test_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,57 @@ def test_generate_report(foss: Fossology, upload: Upload):
Path(report_path / report_name).unlink()


def test_import_report(foss: Fossology, upload: Upload, tmp_path: Path):
# `ReportFormat.SPDX2` generates SPDX 2.x in RDF, which is the same on-wire
# format the import endpoint's default "spdxrdf" accepts. Round-trip the
# report file: generate → download → import.
report_id = foss.generate_report(upload, report_format=ReportFormat.SPDX2)
report_content, report_name = foss.download_report(report_id)
report_file = tmp_path / report_name
report_file.write_bytes(report_content)

job_id = foss.import_report(upload, str(report_file))
assert isinstance(job_id, int)
assert job_id > 0


@responses.activate
def test_import_report_nogroup(
foss_server: str, foss: Fossology, upload: Upload, tmp_path: Path
):
responses.add(
responses.POST,
f"{foss_server}/api/v1/report/import",
status=403,
)
report_file = tmp_path / "dummy.rdf"
report_file.write_bytes(b"<rdf></rdf>")
with pytest.raises(AuthorizationError) as excinfo:
foss.import_report(upload, str(report_file), group="test")
assert (
f"Report import for upload {upload.uploadname} (id={upload.id}) "
f"is not authorized"
) in str(excinfo.value)


@responses.activate
def test_import_report_error(
foss_server: str, foss: Fossology, upload: Upload, tmp_path: Path
):
responses.add(
responses.POST,
f"{foss_server}/api/v1/report/import",
status=500,
)
report_file = tmp_path / "dummy.rdf"
report_file.write_bytes(b"<rdf></rdf>")
with pytest.raises(FossologyApiError) as excinfo:
foss.import_report(upload, str(report_file))
assert (
f"Report import for upload {upload.uploadname} (id={upload.id}) failed"
) in str(excinfo.value)


@responses.activate
def test_generate_report_unparseable_message(
foss_server: str, foss: Fossology, upload: Upload
Expand Down