diff --git a/README.md b/README.md index 1d30bb6b..c9ef9e3e 100644 --- a/README.md +++ b/README.md @@ -15,12 +15,14 @@ Bring the VulnCheck API to your Python applications. - [Resources](#resources) - [Quickstart](#quickstart) - [Examples](#examples) - - [PURL](#purl) - - [CPE](#cpe) + - [Advisory](#advisory) - [Backup](#backup) - - [Indices](#indices) + - [Backup v4](#backup-v4) + - [CPE](#cpe) - [Index](#index) + - [Indices](#indices) - [Pagination](#pagination) + - [PURL](#purl) - [Contributing](#contributing) - [Security](#security) - [Sponsorship](#sponsorship) @@ -167,15 +169,14 @@ if __name__ == "__main__": ## Examples -### PURL +### Advisory -Get the CVE's for a given PURL +List all advisory feeds and query advisories filtered by feed ```python import vulncheck_sdk -from vulncheck_sdk.models.v3controllers_purl_response_data import ( - V3controllersPurlResponseData, -) +from vulncheck_sdk.models.search_v4_advisory_return_value import SearchV4AdvisoryReturnValue +from vulncheck_sdk.models.search_v4_list_feed_return_value import SearchV4ListFeedReturnValue import os TOKEN = os.environ["VULNCHECK_API_TOKEN"] @@ -184,14 +185,20 @@ configuration = vulncheck_sdk.Configuration() configuration.api_key["Bearer"] = TOKEN with vulncheck_sdk.ApiClient(configuration) as api_client: - endpoints_client = vulncheck_sdk.EndpointsApi(api_client) - - purl = "pkg:hex/coherence@0.1.2" - - api_response = endpoints_client.purl_get(purl) - data: V3controllersPurlResponseData = api_response.data - - print(data.cves) + advisory_client = vulncheck_sdk.AdvisoryApi(api_client) + + # List all available advisory feeds (/v4/advisory) + feeds: SearchV4ListFeedReturnValue = advisory_client.v4_list_advisory_feeds() + print("Available feeds:") + for feed in feeds.data: + print(f"name: {feed.name}") + + feed = "wolfi" + # Query advisories filtered by feed=wolfi (/v4/advisory?feed=wolfi) + advisories: SearchV4AdvisoryReturnValue = advisory_client.v4_query_advisories(name=feed) + print(f"{feed.capitalize()} advisories (page 1): {len(advisories.data)} results") + for advisory in advisories.data: + print(f"cve: {advisory.cve_metadata.cve_id}") ``` @@ -201,40 +208,31 @@ with vulncheck_sdk.ApiClient(configuration) as api_client: import asyncio import os import vulncheck_sdk.aio as vcaio -from vulncheck_sdk.aio.models.v3controllers_purl_response_data import ( - V3controllersPurlResponseData, -) +from vulncheck_sdk.aio.models.search_v4_advisory_return_value import SearchV4AdvisoryReturnValue +from vulncheck_sdk.aio.models.search_v4_list_feed_return_value import SearchV4ListFeedReturnValue -# Configuration TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN -async def get_data(client, purl: str): - # Await the client call directly - api_response = await client.purl_get(purl) - - # Access the data attribute from the response object - return api_response.data - - async def main(): async with vcaio.ApiClient(configuration) as api_client: - endpoints_client = vcaio.EndpointsApi(api_client) + advisory_client = vcaio.AdvisoryApi(api_client) - purl = "pkg:hex/coherence@0.1.2" + # List all available advisory feeds (/v4/advisory) + feeds: SearchV4ListFeedReturnValue = await advisory_client.v4_list_advisory_feeds() + print("Available feeds:") + for feed in feeds.data: + print(f"name: {feed.name}") - # 'await' the async function call - data: V3controllersPurlResponseData = await get_data(endpoints_client, purl) - - if data and data.cves: - print(f"Found {len(data.cves)} CVEs:") - for cve in data.cves: - print(f"- {cve}") - else: - print("No CVEs found or data is empty.") + feed = "wolfi" + # Query advisories filtered by feed=wolfi (/v4/advisory?feed=wolfi) + advisories: SearchV4AdvisoryReturnValue = await advisory_client.v4_query_advisories(name=feed) + print(f"{feed.capitalize()} advisories (page 1): {len(advisories.data)} results") + for advisory in advisories.data: + print(f"cve: {advisory.cve_metadata.cve_id}") if __name__ == "__main__": @@ -244,11 +242,12 @@ if __name__ == "__main__": -### CPE +### Backup -Get all CPE's related to a CVE +Download the backup for an index ```python +import urllib.request import vulncheck_sdk import os @@ -260,12 +259,14 @@ configuration.api_key["Bearer"] = TOKEN with vulncheck_sdk.ApiClient(configuration) as api_client: endpoints_client = vulncheck_sdk.EndpointsApi(api_client) - cpe = "cpe:/a:microsoft:internet_explorer:8.0.6001:beta" + index = "initial-access" - api_response = endpoints_client.cpe_get(cpe) + api_response = endpoints_client.backup_index_get(index) - for cve in api_response.data: - print(cve) + file_path = f"{index}.zip" + with urllib.request.urlopen(api_response.data[0].url) as response: + with open(file_path, "wb") as file: + file.write(response.read()) ``` @@ -274,6 +275,7 @@ with vulncheck_sdk.ApiClient(configuration) as api_client: ```python import asyncio import os +import urllib.request import vulncheck_sdk.aio as vcaio # Configuration @@ -283,39 +285,56 @@ configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN -async def get_cpe_vulnerabilities(): - # 'async with' to manage the connection life-cycle +def download_sync(url, file_path): + """ + Standard synchronous download using urllib.request. + This runs in a separate thread to avoid blocking the event loop. + """ + with urllib.request.urlopen(url) as response: + with open(file_path, "wb") as file: + file.write(response.read()) + + +async def main(): + # Use 'async with' to manage the connection life-cycle async with vcaio.ApiClient(configuration) as api_client: endpoints_client = vcaio.EndpointsApi(api_client) + index = "initial-access" - cpe = "cpe:/a:microsoft:internet_explorer:8.0.6001:beta" + # 'await' the coroutine to get the actual response data + api_response = await endpoints_client.backup_index_get(index) + + if not api_response.data: + print("No backup URL found.") + return + + download_url = api_response.data[0].url + file_path = f"{index}.zip" + + print(f"Downloading {index} via urllib (offloaded to thread)...") + # Use asyncio.to_thread to run the blocking call safely # 'await' the coroutine to get the actual response data - api_response = await endpoints_client.cpe_get(cpe) + await asyncio.to_thread(download_sync, download_url, file_path) - # Iterate through the results - if api_response.data: - for cve in api_response.data: - print(cve) - else: - print(f"No vulnerabilities found for CPE: {cpe}") + print(f"Successfully saved to {file_path}") if __name__ == "__main__": - # Run the main async entry point - asyncio.run(get_cpe_vulnerabilities()) + asyncio.run(main()) ``` -### Backup - -Download the backup for an index +### Backup v4 +List available v4 backups and download a backup by feed name ```python import urllib.request import vulncheck_sdk +from vulncheck_sdk.models.backup_list_backups_response import BackupListBackupsResponse +from vulncheck_sdk.models.backup_feed_item import BackupFeedItem import os TOKEN = os.environ["VULNCHECK_API_TOKEN"] @@ -324,16 +343,25 @@ configuration = vulncheck_sdk.Configuration() configuration.api_key["Bearer"] = TOKEN with vulncheck_sdk.ApiClient(configuration) as api_client: - endpoints_client = vulncheck_sdk.EndpointsApi(api_client) + backup_client = vulncheck_sdk.BackupApi(api_client) - index = "initial-access" + # List available backups (/v4/backup) + available: BackupListBackupsResponse = backup_client.v4_list_backups() - api_response = endpoints_client.backup_index_get(index) + for potential_backup in available.data: + print(f"Found backup: {potential_backup.name}") - file_path = f"{index}.zip" - with urllib.request.urlopen(api_response.data[0].url) as response: - with open(file_path, "wb") as file: - file.write(response.read()) + # Get backup for the wolfi feed (/v4/backup/wolfi) + feed = "wolfi" + response: BackupListBackupsResponse = backup_client.v4_get_backup_by_name(feed) + + print(f"Downloading {feed} backup") + file_path = f"{feed}.zip" + with urllib.request.urlopen(response.url) as r: + with open(file_path, "wb") as f: + f.write(r.read()) + + print(f"Successfully saved to {file_path}") ``` @@ -344,8 +372,9 @@ import asyncio import os import urllib.request import vulncheck_sdk.aio as vcaio +from vulncheck_sdk.aio.models.backup_list_backups_response import BackupListBackupsResponse +from vulncheck_sdk.aio.models.backup_backup_response import BackupBackupResponse -# Configuration TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() @@ -363,26 +392,23 @@ def download_sync(url, file_path): async def main(): - # Use 'async with' to manage the connection life-cycle async with vcaio.ApiClient(configuration) as api_client: - endpoints_client = vcaio.EndpointsApi(api_client) - index = "initial-access" + backup_client = vcaio.BackupApi(api_client) - # 'await' the coroutine to get the actual response data - api_response = await endpoints_client.backup_index_get(index) + # List available backups (/v4/backup) + available: BackupListBackupsResponse = await backup_client.v4_list_backups() + for potential_backup in available.data: + print(f"Found backup: {potential_backup.name}") - if not api_response.data: - print("No backup URL found.") - return + # Get backup for the wolfi feed (/v4/backup/wolfi) + feed = "wolfi" + response: BackupBackupResponse = await backup_client.v4_get_backup_by_name(feed) - download_url = api_response.data[0].url - file_path = f"{index}.zip" - print(f"Downloading {index} via urllib (offloaded to thread)...") + file_path = f"{feed}.zip" + print(f"Downloading {feed} backup via urllib (offloaded to thread)...") - # Use asyncio.to_thread to run the blocking call safely - # 'await' the coroutine to get the actual response data - await asyncio.to_thread(download_sync, download_url, file_path) + await asyncio.to_thread(download_sync, response.url, file_path) print(f"Successfully saved to {file_path}") @@ -394,9 +420,9 @@ if __name__ == "__main__": -### Indices +### CPE -Get all available indices +Get all CPE's related to a CVE ```python import vulncheck_sdk @@ -410,10 +436,12 @@ configuration.api_key["Bearer"] = TOKEN with vulncheck_sdk.ApiClient(configuration) as api_client: endpoints_client = vulncheck_sdk.EndpointsApi(api_client) - api_response = endpoints_client.index_get() + cpe = "cpe:/a:microsoft:internet_explorer:8.0.6001:beta" - for index in api_response.data: - print(index.name) + api_response = endpoints_client.cpe_get(cpe) + + for cve in api_response.data: + print(cve) ``` @@ -431,27 +459,27 @@ configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN -async def list_indices(): - # Use 'async with' to manage the connection life-cycle +async def get_cpe_vulnerabilities(): + # 'async with' to manage the connection life-cycle async with vcaio.ApiClient(configuration) as api_client: endpoints_client = vcaio.EndpointsApi(api_client) - # 'await' the coroutine to get the actual response - api_response = await endpoints_client.index_get() + cpe = "cpe:/a:microsoft:internet_explorer:8.0.6001:beta" + + # 'await' the coroutine to get the actual response data + api_response = await endpoints_client.cpe_get(cpe) # Iterate through the results if api_response.data: - print(f"{'Index Name':<30} | {'Description'}") - print("-" * 50) - for index in api_response.data: - print(f"{index.name:<30}") + for cve in api_response.data: + print(cve) else: - print("No indices found.") + print(f"No vulnerabilities found for CPE: {cpe}") if __name__ == "__main__": - # 4. Entry point to run the asynchronous event loop - asyncio.run(list_indices()) + # Run the main async entry point + asyncio.run(get_cpe_vulnerabilities()) ``` @@ -516,6 +544,69 @@ if __name__ == "__main__": ``` + + +### Indices + +Get all available indices + +```python +import vulncheck_sdk +import os + +TOKEN = os.environ["VULNCHECK_API_TOKEN"] + +configuration = vulncheck_sdk.Configuration() +configuration.api_key["Bearer"] = TOKEN + +with vulncheck_sdk.ApiClient(configuration) as api_client: + endpoints_client = vulncheck_sdk.EndpointsApi(api_client) + + api_response = endpoints_client.index_get() + + for index in api_response.data: + print(index.name) +``` + + +
Click to View Async Implementation + +```python +import asyncio +import os +import vulncheck_sdk.aio as vcaio + +# Configuration +TOKEN = os.environ.get("VULNCHECK_API_TOKEN") + +configuration = vcaio.Configuration() +configuration.api_key["Bearer"] = TOKEN + + +async def list_indices(): + # Use 'async with' to manage the connection life-cycle + async with vcaio.ApiClient(configuration) as api_client: + endpoints_client = vcaio.EndpointsApi(api_client) + + # 'await' the coroutine to get the actual response + api_response = await endpoints_client.index_get() + + # Iterate through the results + if api_response.data: + print(f"{'Index Name':<30} | {'Description'}") + print("-" * 50) + for index in api_response.data: + print(f"{index.name:<30}") + else: + print("No indices found.") + + +if __name__ == "__main__": + # 4. Entry point to run the asynchronous event loop + asyncio.run(list_indices()) +``` + +
### Pagination @@ -599,6 +690,83 @@ if __name__ == "__main__": ``` + + +### PURL + +Get the CVE's for a given PURL + +```python +import vulncheck_sdk +from vulncheck_sdk.models.v3controllers_purl_response_data import ( + V3controllersPurlResponseData, +) +import os + +TOKEN = os.environ["VULNCHECK_API_TOKEN"] + +configuration = vulncheck_sdk.Configuration() +configuration.api_key["Bearer"] = TOKEN + +with vulncheck_sdk.ApiClient(configuration) as api_client: + endpoints_client = vulncheck_sdk.EndpointsApi(api_client) + + purl = "pkg:hex/coherence@0.1.2" + + api_response = endpoints_client.purl_get(purl) + data: V3controllersPurlResponseData = api_response.data + + print(data.cves) +``` + + +
Click to View Async Implementation + +```python +import asyncio +import os +import vulncheck_sdk.aio as vcaio +from vulncheck_sdk.aio.models.v3controllers_purl_response_data import ( + V3controllersPurlResponseData, +) + +# Configuration +TOKEN = os.environ.get("VULNCHECK_API_TOKEN") + +configuration = vcaio.Configuration() +configuration.api_key["Bearer"] = TOKEN + + +async def get_data(client, purl: str): + # Await the client call directly + api_response = await client.purl_get(purl) + + # Access the data attribute from the response object + return api_response.data + + +async def main(): + async with vcaio.ApiClient(configuration) as api_client: + endpoints_client = vcaio.EndpointsApi(api_client) + + purl = "pkg:hex/coherence@0.1.2" + + # 'await' the async function call + data: V3controllersPurlResponseData = await get_data(endpoints_client, purl) + + if data and data.cves: + print(f"Found {len(data.cves)} CVEs:") + for cve in data.cves: + print(f"- {cve}") + else: + print("No CVEs found or data is empty.") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +
## Contributing diff --git a/README.template b/README.template index c82e0da1..a22e44b9 100644 --- a/README.template +++ b/README.template @@ -15,12 +15,14 @@ Bring the VulnCheck API to your Python applications. - [Resources](#resources) - [Quickstart](#quickstart) - [Examples](#examples) - - [PURL](#purl) - - [CPE](#cpe) + - [Advisory](#advisory) - [Backup](#backup) - - [Indices](#indices) + - [Backup v4](#backup-v4) + - [CPE](#cpe) - [Index](#index) + - [Indices](#indices) - [Pagination](#pagination) + - [PURL](#purl) - [Contributing](#contributing) - [Security](#security) - [Sponsorship](#sponsorship) @@ -65,13 +67,13 @@ with open("./tests/quickstart_aio.py", 'r') as f: ## Examples -### PURL +### Advisory -Get the CVE's for a given PURL +List all advisory feeds and query advisories filtered by feed @@ -80,7 +82,50 @@ with open("./tests/purl.py", 'r') as f: + + + + +### Backup + +Download the backup for an index + + + + +
Click to View Async Implementation + + + + +
+ +### Backup v4 +List available v4 backups and download a backup by feed name + + + + +
Click to View Async Implementation + + @@ -109,13 +154,13 @@ with open("./tests/cpe_aio.py", 'r') as f:
-### Backup +### Index -Download the backup for an index +Query VulnCheck-NVD2 for `CVE-2019-19781` @@ -124,7 +169,7 @@ with open("./tests/backup.py", 'r') as f: @@ -153,13 +198,13 @@ with open("./tests/indicies_aio.py", 'r') as f: -### Index +### Pagination -Query VulnCheck-NVD2 for `CVE-2019-19781` +Paginate over results for a query to VulnCheck-KEV using `cursor` @@ -168,20 +213,20 @@ with open("./tests/index.py", 'r') as f: -### Pagination +### PURL -Paginate over results for a query to VulnCheck-KEV using `cursor` +Get the CVE's for a given PURL @@ -190,7 +235,7 @@ with open("./tests/pagination.py", 'r') as f: