diff --git a/fossology/report.py b/fossology/report.py index 3217d51..5a3f21e 100644 --- a/fossology/report.py +++ b/fossology/report.py @@ -85,6 +85,78 @@ def generate_report( description = f"Report generation for upload {upload.uploadname} failed" raise FossologyApiError(description, response) + def import_report( + self, + upload: Upload, + report_file: str, + report_format: str = "spdxrdf", + add_concluded_as_decisions: bool = False, + group: str | None = None, + ) -> int: + """Import an external report for a given upload. + + Uploads the report file and schedules a ``reportImport`` job that + merges the report's license decisions into the upload. + + API Endpoint: POST /report/import + + :Example: + + >>> from fossology import Fossology + >>> + >>> foss = Fossology(FOSS_URL, FOSS_TOKEN) # doctest: +SKIP + >>> job_id = foss.import_report( + ... foss.detail_upload(1), + ... "report.spdx.rdf", + ... ) # doctest: +SKIP + + :param upload: the upload the report is imported for + :param report_file: local path to the report file to import + :param report_format: the report format (default: "spdxrdf" — the only format + currently accepted by the Fossology API) + :param add_concluded_as_decisions: treat concluded licenses in the report + as clearing decisions (default: False) + :param group: the group name to act on behalf of (default: None) + :type upload: Upload + :type report_file: str + :type report_format: str + :type add_concluded_as_decisions: bool + :type group: str | None + :return: the id of the scheduled reportImport job + :rtype: int + :raises FossologyApiError: if the REST call failed + :raises AuthorizationError: if the REST call is not authorized + """ + params = { + "upload": str(upload.id), + "reportFormat": report_format, + "addConcludedAsDecisions": str(add_concluded_as_decisions).lower(), + } + headers = {} + if group: + headers["groupName"] = group + + with open(report_file, "rb") as fp: + response = self.session.post( + f"{self.api}/report/import", + params=params, + headers=headers, + files={"report": fp}, + ) + + upload_ref = f"{upload.uploadname} (id={upload.id})" + + if response.status_code == 201: + return int(response.json()["message"]) + + elif response.status_code == 403: + description = f"Report import for upload {upload_ref} is not authorized" + raise AuthorizationError(description, response) + + else: + description = f"Report import for upload {upload_ref} failed" + raise FossologyApiError(description, response) + @retry(retry=retry_if_exception_type(TryAgain), stop=stop_after_attempt(10)) def download_report( self, report_id: int, group: str | None = None, wait_time: int = 0 diff --git a/tests/test_report.py b/tests/test_report.py index 97e21d3..1eb8f3a 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -47,6 +47,57 @@ def test_generate_report(foss: Fossology, upload: Upload): Path(report_path / report_name).unlink() +def test_import_report(foss: Fossology, upload: Upload, tmp_path: Path): + # `ReportFormat.SPDX2` generates SPDX 2.x in RDF, which is the same on-wire + # format the import endpoint's default "spdxrdf" accepts. Round-trip the + # report file: generate → download → import. + report_id = foss.generate_report(upload, report_format=ReportFormat.SPDX2) + report_content, report_name = foss.download_report(report_id) + report_file = tmp_path / report_name + report_file.write_bytes(report_content) + + job_id = foss.import_report(upload, str(report_file)) + assert isinstance(job_id, int) + assert job_id > 0 + + +@responses.activate +def test_import_report_nogroup( + foss_server: str, foss: Fossology, upload: Upload, tmp_path: Path +): + responses.add( + responses.POST, + f"{foss_server}/api/v1/report/import", + status=403, + ) + report_file = tmp_path / "dummy.rdf" + report_file.write_bytes(b"") + with pytest.raises(AuthorizationError) as excinfo: + foss.import_report(upload, str(report_file), group="test") + assert ( + f"Report import for upload {upload.uploadname} (id={upload.id}) " + f"is not authorized" + ) in str(excinfo.value) + + +@responses.activate +def test_import_report_error( + foss_server: str, foss: Fossology, upload: Upload, tmp_path: Path +): + responses.add( + responses.POST, + f"{foss_server}/api/v1/report/import", + status=500, + ) + report_file = tmp_path / "dummy.rdf" + report_file.write_bytes(b"") + with pytest.raises(FossologyApiError) as excinfo: + foss.import_report(upload, str(report_file)) + assert ( + f"Report import for upload {upload.uploadname} (id={upload.id}) failed" + ) in str(excinfo.value) + + @responses.activate def test_generate_report_unparseable_message( foss_server: str, foss: Fossology, upload: Upload