diff --git a/.env.example b/.env.example index ef6f26dd..2fa86e9c 100644 --- a/.env.example +++ b/.env.example @@ -29,6 +29,9 @@ # -- Perplexity -- # PERPLEXITY_API_KEY= +# -- Power BI -- +# POWERBI_ACCESS_TOKEN= + # -- Shopify Customer -- # SHOPIFY_CUSTOMER_ACCESS_TOKEN= # SHOPIFY_CUSTOMER_SHOP_URL= diff --git a/powerbi/config.json b/powerbi/config.json index f3b6de31..21917f1d 100644 --- a/powerbi/config.json +++ b/powerbi/config.json @@ -1,6 +1,6 @@ { "name": "Power BI", - "version": "1.0.0", + "version": "2.0.0", "description": "Power BI integration for managing workspaces, datasets, reports, and dashboards", "entry_point": "powerbi.py", "auth": { @@ -60,14 +60,10 @@ } } } - }, - "result": { - "type": "boolean" } }, "required": [ - "workspaces", - "result" + "workspaces" ] } }, @@ -92,14 +88,10 @@ "workspace": { "type": "object", "description": "Workspace details" - }, - "result": { - "type": "boolean" } }, "required": [ - "workspace", - "result" + "workspace" ] } }, @@ -131,7 +123,7 @@ "type": "string" }, "configuredBy": { - "type": "string" + "type": ["string", "null"] }, "isRefreshable": { "type": "boolean" @@ -147,14 +139,10 @@ } } } - }, - "result": { - "type": "boolean" } }, "required": [ - "datasets", - "result" + "datasets" ] } }, @@ -183,14 +171,10 @@ "dataset": { "type": "object", "description": "Dataset details" - }, - "result": { - "type": "boolean" } }, "required": [ - "dataset", - "result" + "dataset" ] } }, @@ -286,18 +270,15 @@ "output_schema": { "type": "object", "properties": { - "result": { - "type": "boolean" - }, "message": { "type": "string" }, "request_id": { - "type": "string" + "type": ["string", "null"] } }, "required": [ - "result" + "message" ] } }, @@ -335,30 +316,26 @@ "type": "object", "properties": { "refreshType": { - "type": "string" + "type": ["string", "null"] }, "startTime": { - "type": "string" + "type": ["string", "null"] }, "endTime": { - "type": "string" + "type": ["string", "null"] }, "status": { - "type": "string" + "type": ["string", "null"] }, "requestId": { - "type": "string" + "type": ["string", "null"] } } } - }, - "result": { - "type": "boolean" } }, "required": [ - "refreshes", - "result" + "refreshes" ] } }, @@ -390,24 +367,20 @@ "type": "string" }, "webUrl": { - "type": "string" + "type": ["string", "null"] }, "embedUrl": { - "type": "string" + "type": ["string", "null"] }, "datasetId": { - "type": "string" + "type": ["string", "null"] } } } - }, - "result": { - "type": "boolean" } }, "required": [ - "reports", - "result" + "reports" ] } }, @@ -436,14 +409,10 @@ "report": { "type": "object", "description": "Report details" - }, - "result": { - "type": "boolean" } }, "required": [ - "report", - "result" + "report" ] } }, @@ -476,19 +445,19 @@ "type": "object", "properties": { "datasourceType": { - "type": "string" + "type": ["string", "null"] }, "datasourceId": { - "type": "string" + "type": ["string", "null"] }, "gatewayId": { - "type": "string" + "type": ["string", "null"] }, "name": { - "type": "string" + "type": ["string", "null"] }, "connectionString": { - "type": "string" + "type": ["string", "null"] }, "connectionDetails": { "type": "object", @@ -496,14 +465,10 @@ } } } - }, - "result": { - "type": "boolean" } }, "required": [ - "datasources", - "result" + "datasources" ] } }, @@ -553,14 +518,8 @@ }, "embedUrl": { "type": "string" - }, - "result": { - "type": "boolean" } - }, - "required": [ - "result" - ] + } } }, "refresh_report": { @@ -594,9 +553,6 @@ "output_schema": { "type": "object", "properties": { - "result": { - "type": "boolean" - }, "message": { "type": "string" }, @@ -605,7 +561,7 @@ } }, "required": [ - "result" + "message" ] } }, @@ -644,15 +600,12 @@ "export_id": { "type": "string" }, - "result": { - "type": "boolean" - }, "message": { "type": "string" } }, "required": [ - "result" + "message" ] } }, @@ -688,14 +641,10 @@ }, "percentComplete": { "type": "integer" - }, - "result": { - "type": "boolean" } }, "required": [ - "status", - "result" + "status" ] } }, @@ -724,24 +673,20 @@ "type": "string" }, "displayName": { - "type": "string" + "type": ["string", "null"] }, "isReadOnly": { "type": "boolean" }, "embedUrl": { - "type": "string" + "type": ["string", "null"] } } } - }, - "result": { - "type": "boolean" } }, "required": [ - "dashboards", - "result" + "dashboards" ] } }, @@ -770,14 +715,10 @@ "dashboard": { "type": "object", "description": "Dashboard details" - }, - "result": { - "type": "boolean" } }, "required": [ - "dashboard", - "result" + "dashboard" ] } }, @@ -813,27 +754,23 @@ "type": "string" }, "title": { - "type": "string" + "type": ["string", "null"] }, "embedUrl": { - "type": "string" + "type": ["string", "null"] }, "datasetId": { - "type": "string" + "type": ["string", "null"] }, "reportId": { - "type": "string" + "type": ["string", "null"] } } } - }, - "result": { - "type": "boolean" } }, "required": [ - "tiles", - "result" + "tiles" ] } }, @@ -876,17 +813,13 @@ "results": { "type": "array", "description": "Query results" - }, - "result": { - "type": "boolean" } }, "required": [ - "results", - "result" + "results" ] } } }, "display_name": "Power BI" -} \ No newline at end of file +} diff --git a/powerbi/powerbi.py b/powerbi/powerbi.py index c19d5cde..bd918f78 100644 --- a/powerbi/powerbi.py +++ b/powerbi/powerbi.py @@ -1,4 +1,4 @@ -from autohive_integrations_sdk import Integration, ExecutionContext, ActionHandler +from autohive_integrations_sdk import Integration, ExecutionContext, ActionHandler, ActionResult, ActionError from typing import Dict, Any # Create the integration using the config.json @@ -25,7 +25,7 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(f"{POWERBI_API_BASE}/groups", params=params) workspaces = [] - for workspace in response.get("value", []): + for workspace in response.data.get("value", []): workspaces.append( { "id": workspace.get("id"), @@ -36,10 +36,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): } ) - return {"workspaces": workspaces, "result": True} + return ActionResult(data={"workspaces": workspaces}, cost_usd=0.0) except Exception as e: - return {"workspaces": [], "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("get_workspace") @@ -50,10 +50,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(f"{POWERBI_API_BASE}/groups/{workspace_id}") - return {"workspace": response, "result": True} + return ActionResult(data={"workspace": response.data}, cost_usd=0.0) except Exception as e: - return {"workspace": {}, "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("list_datasets") @@ -70,7 +70,7 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url) datasets = [] - for dataset in response.get("value", []): + for dataset in response.data.get("value", []): datasets.append( { "id": dataset.get("id"), @@ -83,10 +83,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): } ) - return {"datasets": datasets, "result": True} + return ActionResult(data={"datasets": datasets}, cost_usd=0.0) except Exception as e: - return {"datasets": [], "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("get_dataset") @@ -103,10 +103,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url) - return {"dataset": response, "result": True} + return ActionResult(data={"dataset": response.data}, cost_usd=0.0) except Exception as e: - return {"dataset": {}, "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("refresh_dataset") @@ -161,13 +161,16 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): # Extract request ID from response headers if available request_id = None - if hasattr(response, "headers") and "x-ms-request-id" in response.headers: + if response.headers and "x-ms-request-id" in response.headers: request_id = response.headers["x-ms-request-id"] - return {"result": True, "message": "Dataset refresh initiated successfully", "request_id": request_id} + return ActionResult( + data={"message": "Dataset refresh initiated successfully", "request_id": request_id}, + cost_usd=0.0, + ) except Exception as e: - return {"result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("get_refresh_history") @@ -188,7 +191,7 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url, params=params) refreshes = [] - for refresh in response.get("value", []): + for refresh in response.data.get("value", []): refreshes.append( { "refreshType": refresh.get("refreshType"), @@ -199,10 +202,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): } ) - return {"refreshes": refreshes, "result": True} + return ActionResult(data={"refreshes": refreshes}, cost_usd=0.0) except Exception as e: - return {"refreshes": [], "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("list_reports") @@ -219,7 +222,7 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url) reports = [] - for report in response.get("value", []): + for report in response.data.get("value", []): reports.append( { "id": report.get("id"), @@ -230,10 +233,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): } ) - return {"reports": reports, "result": True} + return ActionResult(data={"reports": reports}, cost_usd=0.0) except Exception as e: - return {"reports": [], "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("get_report") @@ -250,10 +253,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url) - return {"report": response, "result": True} + return ActionResult(data={"report": response.data}, cost_usd=0.0) except Exception as e: - return {"report": {}, "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("get_report_datasources") @@ -271,7 +274,7 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url) datasources = [] - for datasource in response.get("value", []): + for datasource in response.data.get("value", []): ds_data = { "datasourceType": datasource.get("datasourceType"), "datasourceId": datasource.get("datasourceId"), @@ -286,10 +289,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): datasources.append(ds_data) - return {"datasources": datasources, "result": True} + return ActionResult(data={"datasources": datasources}, cost_usd=0.0) except Exception as e: - return {"datasources": [], "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("refresh_report") @@ -307,10 +310,11 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): report_url = f"{POWERBI_API_BASE}/reports/{report_id}" report_response = await context.fetch(report_url) - dataset_id = report_response.get("datasetId") + report_data = report_response.data + dataset_id = report_data.get("datasetId") if not dataset_id: - return {"result": False, "error": "Report does not have an associated dataset"} + return ActionError(message="Report does not have an associated dataset") # Now refresh the dataset if workspace_id: @@ -322,14 +326,16 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): await context.fetch(refresh_url, method="POST", json=refresh_request) - return { - "result": True, - "message": f"Dataset refresh initiated successfully for report '{report_response.get('name')}'", - "dataset_id": dataset_id, - } + return ActionResult( + data={ + "message": f"Dataset refresh initiated successfully for report '{report_data.get('name')}'", + "dataset_id": dataset_id, + }, + cost_usd=0.0, + ) except Exception as e: - return {"result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("clone_report") @@ -357,16 +363,18 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url, method="POST", json=clone_request) - return { - "id": response.get("id"), - "name": response.get("name"), - "webUrl": response.get("webUrl"), - "embedUrl": response.get("embedUrl"), - "result": True, - } + return ActionResult( + data={ + "id": response.data.get("id"), + "name": response.data.get("name"), + "webUrl": response.data.get("webUrl"), + "embedUrl": response.data.get("embedUrl"), + }, + cost_usd=0.0, + ) except Exception as e: - return {"result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("export_report") @@ -386,10 +394,13 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url, method="POST", json=export_request) - return {"export_id": response.get("id"), "result": True, "message": "Export initiated successfully"} + return ActionResult( + data={"export_id": response.data.get("id"), "message": "Export initiated successfully"}, + cost_usd=0.0, + ) except Exception as e: - return {"result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("get_export_status") @@ -407,14 +418,16 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url) - return { - "status": response.get("status"), - "percentComplete": response.get("percentComplete", 0), - "result": True, - } + return ActionResult( + data={ + "status": response.data.get("status"), + "percentComplete": response.data.get("percentComplete", 0), + }, + cost_usd=0.0, + ) except Exception as e: - return {"status": "Failed", "percentComplete": 0, "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("list_dashboards") @@ -431,7 +444,7 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url) dashboards = [] - for dashboard in response.get("value", []): + for dashboard in response.data.get("value", []): dashboards.append( { "id": dashboard.get("id"), @@ -441,10 +454,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): } ) - return {"dashboards": dashboards, "result": True} + return ActionResult(data={"dashboards": dashboards}, cost_usd=0.0) except Exception as e: - return {"dashboards": [], "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("get_dashboard") @@ -461,10 +474,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url) - return {"dashboard": response, "result": True} + return ActionResult(data={"dashboard": response.data}, cost_usd=0.0) except Exception as e: - return {"dashboard": {}, "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("get_dashboard_tiles") @@ -482,7 +495,7 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url) tiles = [] - for tile in response.get("value", []): + for tile in response.data.get("value", []): tiles.append( { "id": tile.get("id"), @@ -493,10 +506,10 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): } ) - return {"tiles": tiles, "result": True} + return ActionResult(data={"tiles": tiles}, cost_usd=0.0) except Exception as e: - return {"tiles": [], "result": False, "error": str(e)} + return ActionError(message=str(e)) @powerbi.action("execute_queries") @@ -516,7 +529,7 @@ async def execute(self, inputs: Dict[str, Any], context: ExecutionContext): response = await context.fetch(url, method="POST", json=query_request) - return {"results": response.get("results", []), "result": True} + return ActionResult(data={"results": response.data.get("results", [])}, cost_usd=0.0) except Exception as e: - return {"results": [], "result": False, "error": str(e)} + return ActionError(message=str(e)) diff --git a/powerbi/requirements.txt b/powerbi/requirements.txt index b56fee2e..1af9591f 100644 --- a/powerbi/requirements.txt +++ b/powerbi/requirements.txt @@ -1 +1 @@ -autohive-integrations-sdk~=1.0.2 +autohive-integrations-sdk~=2.0.0 diff --git a/powerbi/tests/conftest.py b/powerbi/tests/conftest.py new file mode 100644 index 00000000..2e22f0b7 --- /dev/null +++ b/powerbi/tests/conftest.py @@ -0,0 +1,20 @@ +import os +import sys +from unittest.mock import AsyncMock, MagicMock + +import pytest + +# Make powerbi.py importable as a top-level module. +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + + +@pytest.fixture +def mock_context(): + """Mock execution context with the platform OAuth shape Power BI expects.""" + ctx = MagicMock(name="ExecutionContext") + ctx.fetch = AsyncMock(name="fetch") + ctx.auth = { + "auth_type": "PlatformOauth2", + "credentials": {"access_token": "test_token"}, # nosec B105 + } + return ctx diff --git a/powerbi/tests/context.py b/powerbi/tests/context.py deleted file mode 100644 index c37777cf..00000000 --- a/powerbi/tests/context.py +++ /dev/null @@ -1,18 +0,0 @@ -""" -Test Context Module - -This module sets up the necessary paths and imports for testing the Power BI integration. -It ensures that both the integration module and its dependencies are accessible to test files. -""" - -import os -import sys - -# Add parent directory to path so we can import the integration module -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - -# Add dependencies directory to path for third-party packages -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../dependencies"))) - -# Import the Power BI integration instance -from powerbi import powerbi diff --git a/powerbi/tests/test_powerbi_dashboards_unit.py b/powerbi/tests/test_powerbi_dashboards_unit.py new file mode 100644 index 00000000..064c86f4 --- /dev/null +++ b/powerbi/tests/test_powerbi_dashboards_unit.py @@ -0,0 +1,167 @@ +import os +import sys +import importlib.util + +_parent = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +_deps = os.path.abspath(os.path.join(os.path.dirname(__file__), "../dependencies")) +sys.path.insert(0, _parent) +sys.path.insert(0, _deps) + +import pytest # noqa: E402 +from unittest.mock import AsyncMock, MagicMock # noqa: E402 +from autohive_integrations_sdk import FetchResponse # noqa: E402 +from autohive_integrations_sdk.integration import ResultType # noqa: E402 + +_spec = importlib.util.spec_from_file_location("powerbi_mod", os.path.join(_parent, "powerbi.py")) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) + +powerbi = _mod.powerbi + +pytestmark = pytest.mark.unit + +POWERBI_API_BASE = "https://api.powerbi.com/v1.0/myorg" + + +@pytest.fixture +def mock_context(): + ctx = MagicMock(name="ExecutionContext") + ctx.fetch = AsyncMock(name="fetch") + ctx.auth = { + "auth_type": "PlatformOauth2", + "credentials": {"access_token": "test_token"}, # nosec B105 + } + return ctx + + +# ---- Dashboards ---- + + +class TestListDashboards: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={"value": [{"id": "db-1", "displayName": "Sales Dashboard", "isReadOnly": False}]}, + ) + + result = await powerbi.execute_action("list_dashboards", {}, mock_context) + + assert result.result.data["dashboards"][0]["id"] == "db-1" + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("list_dashboards", {"workspace_id": "ws-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/dashboards" + + @pytest.mark.asyncio + async def test_url_without_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("list_dashboards", {}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/dashboards" + + @pytest.mark.asyncio + async def test_empty_list(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + result = await powerbi.execute_action("list_dashboards", {}, mock_context) + + assert result.result.data["dashboards"] == [] + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Unauthorized") + + result = await powerbi.execute_action("list_dashboards", {}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestGetDashboard: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + db = {"id": "db-1", "displayName": "Sales Dashboard"} + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data=db) + + result = await powerbi.execute_action("get_dashboard", {"dashboard_id": "db-1"}, mock_context) + + assert result.result.data["dashboard"]["id"] == "db-1" + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={}) + + await powerbi.execute_action("get_dashboard", {"dashboard_id": "db-1", "workspace_id": "ws-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/dashboards/db-1" + + @pytest.mark.asyncio + async def test_url_without_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={}) + + await powerbi.execute_action("get_dashboard", {"dashboard_id": "db-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/dashboards/db-1" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Not found") + + result = await powerbi.execute_action("get_dashboard", {"dashboard_id": "db-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestGetDashboardTiles: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={"value": [{"id": "tile-1", "title": "Revenue", "datasetId": "ds-1", "reportId": "rpt-1"}]}, + ) + + result = await powerbi.execute_action("get_dashboard_tiles", {"dashboard_id": "db-1"}, mock_context) + + assert result.result.data["tiles"][0]["id"] == "tile-1" + assert result.result.data["tiles"][0]["title"] == "Revenue" + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action( + "get_dashboard_tiles", {"dashboard_id": "db-1", "workspace_id": "ws-1"}, mock_context + ) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/dashboards/db-1/tiles" + + @pytest.mark.asyncio + async def test_url_without_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("get_dashboard_tiles", {"dashboard_id": "db-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/dashboards/db-1/tiles" + + @pytest.mark.asyncio + async def test_empty_tiles(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + result = await powerbi.execute_action("get_dashboard_tiles", {"dashboard_id": "db-1"}, mock_context) + + assert result.result.data["tiles"] == [] + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("API error") + + result = await powerbi.execute_action("get_dashboard_tiles", {"dashboard_id": "db-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR diff --git a/powerbi/tests/test_powerbi_datasets_unit.py b/powerbi/tests/test_powerbi_datasets_unit.py new file mode 100644 index 00000000..597a4a8e --- /dev/null +++ b/powerbi/tests/test_powerbi_datasets_unit.py @@ -0,0 +1,240 @@ +import os +import sys +import importlib.util + +_parent = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +_deps = os.path.abspath(os.path.join(os.path.dirname(__file__), "../dependencies")) +sys.path.insert(0, _parent) +sys.path.insert(0, _deps) + +import pytest # noqa: E402 +from unittest.mock import AsyncMock, MagicMock # noqa: E402 +from autohive_integrations_sdk import FetchResponse # noqa: E402 +from autohive_integrations_sdk.integration import ResultType # noqa: E402 + +_spec = importlib.util.spec_from_file_location("powerbi_mod", os.path.join(_parent, "powerbi.py")) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) + +powerbi = _mod.powerbi + +pytestmark = pytest.mark.unit + +POWERBI_API_BASE = "https://api.powerbi.com/v1.0/myorg" + + +@pytest.fixture +def mock_context(): + ctx = MagicMock(name="ExecutionContext") + ctx.fetch = AsyncMock(name="fetch") + ctx.auth = { + "auth_type": "PlatformOauth2", + "credentials": {"access_token": "test_token"}, # nosec B105 + } + return ctx + + +# ---- Datasets ---- + + +class TestListDatasets: + @pytest.mark.asyncio + async def test_happy_path_no_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={ + "value": [{"id": "ds-1", "name": "Sales DS", "configuredBy": "user@test.com", "isRefreshable": True}] + }, + ) + + result = await powerbi.execute_action("list_datasets", {}, mock_context) + + assert result.result.data["datasets"][0]["id"] == "ds-1" + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("list_datasets", {"workspace_id": "ws-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/datasets" + + @pytest.mark.asyncio + async def test_url_without_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("list_datasets", {}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/datasets" + + @pytest.mark.asyncio + async def test_empty_list(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + result = await powerbi.execute_action("list_datasets", {}, mock_context) + + assert result.result.data["datasets"] == [] + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Unauthorized") + + result = await powerbi.execute_action("list_datasets", {}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestGetDataset: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + ds = {"id": "ds-1", "name": "Sales"} + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data=ds) + + result = await powerbi.execute_action("get_dataset", {"dataset_id": "ds-1"}, mock_context) + + assert result.result.data["dataset"]["id"] == "ds-1" + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={}) + + await powerbi.execute_action("get_dataset", {"dataset_id": "ds-1", "workspace_id": "ws-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/datasets/ds-1" + + @pytest.mark.asyncio + async def test_url_without_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={}) + + await powerbi.execute_action("get_dataset", {"dataset_id": "ds-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/datasets/ds-1" + + @pytest.mark.asyncio + async def test_response_shape(self, mock_context): + ds = {"id": "ds-1", "name": "Sales", "isRefreshable": True} + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data=ds) + + result = await powerbi.execute_action("get_dataset", {"dataset_id": "ds-1"}, mock_context) + + assert "dataset" in result.result.data + assert result.result.data["dataset"]["name"] == "Sales" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Not found") + + result = await powerbi.execute_action("get_dataset", {"dataset_id": "ds-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestRefreshDataset: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=202, headers={}, data=None) + + result = await powerbi.execute_action("refresh_dataset", {"dataset_id": "ds-1"}, mock_context) + + assert result.result.data["message"] == "Dataset refresh initiated successfully" + + @pytest.mark.asyncio + async def test_request_method_is_post(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=202, headers={}, data=None) + + await powerbi.execute_action("refresh_dataset", {"dataset_id": "ds-1"}, mock_context) + + assert mock_context.fetch.call_args.kwargs["method"] == "POST" + + @pytest.mark.asyncio + async def test_default_notify_option(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=202, headers={}, data=None) + + await powerbi.execute_action("refresh_dataset", {"dataset_id": "ds-1"}, mock_context) + + payload = mock_context.fetch.call_args.kwargs["json"] + assert payload["notifyOption"] == "NoNotification" + + @pytest.mark.asyncio + async def test_notify_option_passed(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=202, headers={}, data=None) + + await powerbi.execute_action( + "refresh_dataset", {"dataset_id": "ds-1", "notify_option": "MailOnFailure"}, mock_context + ) + + payload = mock_context.fetch.call_args.kwargs["json"] + assert payload["notifyOption"] == "MailOnFailure" + + @pytest.mark.asyncio + async def test_request_id_from_headers(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=202, headers={"x-ms-request-id": "req-123"}, data=None) + + result = await powerbi.execute_action("refresh_dataset", {"dataset_id": "ds-1"}, mock_context) + + assert result.result.data["request_id"] == "req-123" + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=202, headers={}, data=None) + + await powerbi.execute_action("refresh_dataset", {"dataset_id": "ds-1", "workspace_id": "ws-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/datasets/ds-1/refreshes" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("API error") + + result = await powerbi.execute_action("refresh_dataset", {"dataset_id": "ds-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestGetRefreshHistory: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={"value": [{"refreshType": "Full", "status": "Completed", "startTime": "2024-01-01T00:00:00Z"}]}, + ) + + result = await powerbi.execute_action("get_refresh_history", {"dataset_id": "ds-1"}, mock_context) + + assert result.result.data["refreshes"][0]["refreshType"] == "Full" + + @pytest.mark.asyncio + async def test_top_param_sent(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("get_refresh_history", {"dataset_id": "ds-1", "top": 5}, mock_context) + + assert mock_context.fetch.call_args.kwargs["params"]["$top"] == 5 + + @pytest.mark.asyncio + async def test_default_top_is_10(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("get_refresh_history", {"dataset_id": "ds-1"}, mock_context) + + assert mock_context.fetch.call_args.kwargs["params"]["$top"] == 10 + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action( + "get_refresh_history", {"dataset_id": "ds-1", "workspace_id": "ws-1"}, mock_context + ) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/datasets/ds-1/refreshes" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Timeout") + + result = await powerbi.execute_action("get_refresh_history", {"dataset_id": "ds-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR diff --git a/powerbi/tests/test_powerbi_integration.py b/powerbi/tests/test_powerbi_integration.py index 106042a1..6a0e6d9e 100644 --- a/powerbi/tests/test_powerbi_integration.py +++ b/powerbi/tests/test_powerbi_integration.py @@ -1,590 +1,144 @@ """ -Power BI Integration Test Suite +Live integration tests for the Power BI integration. -This module contains comprehensive unit tests for the Power BI integration, -covering all action handlers and their various edge cases. +Requires POWERBI_ACCESS_TOKEN set in the environment or project .env. -Test Categories: -- Workspace Management: List, Get -- Dataset Operations: List, Get, Refresh, Refresh History -- Report Management: List, Get, Clone, Export, Export Status, Refresh, Datasources -- Dashboard Operations: List, Get, Get Tiles -- Query Execution: Execute DAX/MDX queries -- Error Handling: General exception handling +Token extraction recipe: +1. Authorize the Power BI platform OAuth app for a sandbox/test tenant. +2. Request the scopes configured in powerbi/config.json, including workspace, + dataset, report, dashboard, and query permissions. +3. Copy the resulting short-lived OAuth access token to POWERBI_ACCESS_TOKEN. +4. Add the value to the project .env file or export it in your shell before + running these tests. -Each test uses mocked API responses to verify action handler behavior -without making actual API calls to Power BI. +Safe read-only run: + pytest powerbi/tests/test_powerbi_integration.py -m "integration and not destructive" """ -import unittest -from unittest.mock import AsyncMock, Mock -from context import powerbi - - -class TestPowerBIIntegration(unittest.TestCase): - """ - Test suite for Power BI integration action handlers. - - This test class validates all Power BI integration actions using mocked - ExecutionContext to simulate API responses without external dependencies. - """ - - def setUp(self): - """ - Set up test fixtures before each test method. - - Creates a mock ExecutionContext with an AsyncMock fetch method - to simulate Power BI REST API calls. - """ - self.mock_context = Mock() - self.mock_context.fetch = AsyncMock() - - # ======================================================================== - # WORKSPACE TESTS - # ======================================================================== - - async def test_list_workspaces_success(self): - """ - Test successful workspace listing without filters. - - Verifies that the ListWorkspacesAction correctly processes and returns - workspace data from the Power BI API. - """ - # Mock successful API response - mock_response = { - "value": [ - { - "id": "workspace1", - "name": "Test Workspace", - "isReadOnly": False, - "isOnDedicatedCapacity": False, - "type": "Workspace", - } - ] - } - self.mock_context.fetch.return_value = mock_response - - # Create action handler - handler = powerbi.ListWorkspacesAction() - - # Test inputs - inputs = {} - - # Execute action - result = await handler.execute(inputs, self.mock_context) - - # Verify result - self.assertTrue(result["result"]) - self.assertEqual(len(result["workspaces"]), 1) - self.assertEqual(result["workspaces"][0]["name"], "Test Workspace") - - # Verify API call - self.mock_context.fetch.assert_called_once() - - async def test_list_workspaces_with_filter(self): - """ - Test workspace listing with OData filter and top parameters. - - Verifies that filter and pagination parameters are correctly passed - to the Power BI API when listing workspaces. - """ - mock_response = {"value": []} - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.ListWorkspacesAction() - inputs = {"filter": "name eq 'Test'", "top": 10} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - # Verify params were passed - call_args = self.mock_context.fetch.call_args - self.assertIn("params", call_args[1]) - self.assertEqual(call_args[1]["params"]["$filter"], "name eq 'Test'") - self.assertEqual(call_args[1]["params"]["$top"], 10) - - async def test_get_workspace_success(self): - """ - Test retrieval of a single workspace by ID. - - Verifies that workspace details are correctly fetched and returned. - """ - mock_response = {"id": "workspace1", "name": "Test Workspace", "isReadOnly": False} - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.GetWorkspaceAction() - inputs = {"workspace_id": "workspace1"} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertEqual(result["workspace"]["name"], "Test Workspace") - - # ======================================================================== - # DATASET TESTS - # ======================================================================== - - async def test_list_datasets_success(self): - """ - Test successful dataset listing within a workspace. - - Verifies that dataset information including refresh capabilities - and configuration is correctly returned. - """ - mock_response = { - "value": [ - { - "id": "dataset1", - "name": "Sales Dataset", - "configuredBy": "user@example.com", - "isRefreshable": True, - "isEffectiveIdentityRequired": False, - "isEffectiveIdentityRolesRequired": False, - "isOnPremGatewayRequired": False, - } - ] - } - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.ListDatasetsAction() - inputs = {"workspace_id": "workspace1"} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertEqual(len(result["datasets"]), 1) - self.assertEqual(result["datasets"][0]["name"], "Sales Dataset") - self.assertTrue(result["datasets"][0]["isRefreshable"]) - - async def test_list_datasets_without_workspace(self): - """ - Test dataset listing without specifying a workspace. - - Verifies that datasets can be listed from the user's scope - without targeting a specific workspace. - """ - mock_response = {"value": []} - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.ListDatasetsAction() - inputs = {} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - # Verify the URL doesn't include workspace/groups - call_args = self.mock_context.fetch.call_args - self.assertIn("/datasets", call_args[0][0]) - self.assertNotIn("/groups/", call_args[0][0]) - - async def test_refresh_dataset_success(self): - """ - Test triggering a dataset refresh with notification options. - - Verifies that refresh requests are properly formatted and submitted - with the correct notify_option parameter. - """ - self.mock_context.fetch.return_value = None - - handler = powerbi.RefreshDatasetAction() - inputs = {"dataset_id": "dataset1", "workspace_id": "workspace1", "notify_option": "MailOnFailure"} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertIn("message", result) - - # Verify API call - call_args = self.mock_context.fetch.call_args - self.assertIn("refreshes", call_args[0][0]) - self.assertEqual(call_args[1]["method"], "POST") - self.assertEqual(call_args[1]["json"]["notifyOption"], "MailOnFailure") - - async def test_refresh_dataset_with_enhanced_parameters(self): - """ - Test dataset refresh with advanced parameters. - - Verifies that enhanced refresh options like refresh type, commit mode, - parallelism, and retry count are correctly passed to the API. - """ - self.mock_context.fetch.return_value = None - - handler = powerbi.RefreshDatasetAction() - inputs = { - "dataset_id": "dataset1", - "type": "Full", - "commit_mode": "Transactional", - "max_parallelism": 4, - "retry_count": 3, +from unittest.mock import AsyncMock + +import aiohttp +import pytest +from autohive_integrations_sdk import FetchResponse, ResultType + +from powerbi import powerbi + +pytestmark = pytest.mark.integration + + +@pytest.fixture +def live_context(env_credentials, make_context): + access_token = env_credentials("POWERBI_ACCESS_TOKEN") + if not access_token: + pytest.skip("POWERBI_ACCESS_TOKEN not set — skipping integration tests") + + async def real_fetch(url, *, method="GET", json=None, headers=None, params=None, **kwargs): + merged_headers = dict(headers or {}) + merged_headers["Authorization"] = f"Bearer {access_token}" + async with aiohttp.ClientSession() as session: + async with session.request( + method, + url, + json=json, + headers=merged_headers, + params=params, + **kwargs, + ) as resp: + try: + data = await resp.json(content_type=None) + except Exception: + data = await resp.text() + return FetchResponse(status=resp.status, headers=dict(resp.headers), data=data) + + ctx = make_context( + auth={ + "auth_type": "PlatformOauth2", + "credentials": {"access_token": access_token}, } + ) + ctx.fetch = AsyncMock(side_effect=real_fetch) + return ctx - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - - # Verify enhanced parameters were passed - call_args = self.mock_context.fetch.call_args - json_data = call_args[1]["json"] - self.assertEqual(json_data["type"], "Full") - self.assertEqual(json_data["commitMode"], "Transactional") - self.assertEqual(json_data["maxParallelism"], 4) - self.assertEqual(json_data["retryCount"], 3) - - async def test_get_refresh_history_success(self): - """ - Test retrieving dataset refresh history. - - Verifies that refresh history including status, timestamps, and request IDs - is correctly fetched and formatted. - """ - mock_response = { - "value": [ - { - "refreshType": "ViaApi", - "startTime": "2024-08-01T10:00:00Z", - "endTime": "2024-08-01T10:05:00Z", - "status": "Completed", - "requestId": "req123", - } - ] - } - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.GetRefreshHistoryAction() - inputs = {"dataset_id": "dataset1", "workspace_id": "workspace1", "top": 5} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertEqual(len(result["refreshes"]), 1) - self.assertEqual(result["refreshes"][0]["status"], "Completed") - self.assertEqual(result["refreshes"][0]["refreshType"], "ViaApi") - - # ======================================================================== - # REPORT TESTS - # ======================================================================== - - async def test_list_reports_success(self): - """ - Test successful report listing. - - Verifies that report metadata including web URLs, embed URLs, - and associated dataset IDs are correctly retrieved. - """ - mock_response = { - "value": [ - { - "id": "report1", - "name": "Sales Report", - "webUrl": "https://app.powerbi.com/reports/report1", - "embedUrl": "https://app.powerbi.com/reportEmbed?reportId=report1", - "datasetId": "dataset1", - } - ] - } - self.mock_context.fetch.return_value = mock_response - handler = powerbi.ListReportsAction() - inputs = {"workspace_id": "workspace1"} +async def _first_workspace_id(live_context): + result = await powerbi.execute_action("list_workspaces", {"top": 5}, live_context) + if result.type != ResultType.ACTION: + pytest.skip(f"Unable to list Power BI workspaces: {result.result.message}") - result = await handler.execute(inputs, self.mock_context) + workspaces = result.result.data["workspaces"] + if not workspaces: + pytest.skip("No Power BI workspaces available for workspace-scoped live tests") + return workspaces[0]["id"] - self.assertTrue(result["result"]) - self.assertEqual(len(result["reports"]), 1) - self.assertEqual(result["reports"][0]["name"], "Sales Report") - async def test_get_report_success(self): - """ - Test retrieving a single report by ID. +async def _first_dataset_id(live_context): + result = await powerbi.execute_action("list_datasets", {}, live_context) + if result.type != ResultType.ACTION: + pytest.skip(f"Unable to list Power BI datasets: {result.result.message}") - Verifies that report details including associated dataset ID - are correctly fetched and returned. - """ - mock_response = {"id": "report1", "name": "Sales Report", "datasetId": "dataset1"} - self.mock_context.fetch.return_value = mock_response + datasets = result.result.data["datasets"] + if not datasets: + pytest.skip("No Power BI datasets available for dataset/query live tests") + return datasets[0]["id"] - handler = powerbi.GetReportAction() - inputs = {"report_id": "report1", "workspace_id": "workspace1"} - result = await handler.execute(inputs, self.mock_context) +async def test_list_workspaces_returns_workspaces(live_context): + result = await powerbi.execute_action("list_workspaces", {"top": 5}, live_context) - self.assertTrue(result["result"]) - self.assertEqual(result["report"]["name"], "Sales Report") + assert result.type == ResultType.ACTION + data = result.result.data + assert "workspaces" in data + assert isinstance(data["workspaces"], list) - async def test_clone_report_success(self): - """ - Test cloning a report to another workspace or with a different dataset. - Verifies that report cloning requests include the correct name, - target workspace ID, and target dataset ID parameters. - """ - mock_response = { - "id": "report2", - "name": "Sales Report Copy", - "webUrl": "https://app.powerbi.com/reports/report2", - "embedUrl": "https://app.powerbi.com/reportEmbed?reportId=report2", - } - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.CloneReportAction() - inputs = { - "report_id": "report1", - "name": "Sales Report Copy", - "workspace_id": "workspace1", - "target_workspace_id": "workspace2", - } +async def test_list_datasets_returns_datasets(live_context): + result = await powerbi.execute_action("list_datasets", {}, live_context) - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertEqual(result["name"], "Sales Report Copy") - self.assertIn("webUrl", result) - - # Verify API call - call_args = self.mock_context.fetch.call_args - self.assertIn("Clone", call_args[0][0]) - self.assertEqual(call_args[1]["method"], "POST") - self.assertEqual(call_args[1]["json"]["name"], "Sales Report Copy") - - async def test_export_report_success(self): - """ - Test initiating a report export to PDF or other formats. - - Verifies that export requests are properly formatted with the - desired format and return an export ID for status tracking. - """ - mock_response = {"id": "export123"} - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.ExportReportAction() - inputs = {"report_id": "report1", "workspace_id": "workspace1", "format": "PDF"} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertEqual(result["export_id"], "export123") - - # Verify API call - call_args = self.mock_context.fetch.call_args - self.assertEqual(call_args[1]["json"]["format"], "PDF") - - async def test_get_export_status_success(self): - """ - Test checking the status of an ongoing report export. - - Verifies that export status including completion percentage - and current state are correctly retrieved. - """ - mock_response = {"status": "Succeeded", "percentComplete": 100} - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.GetExportStatusAction() - inputs = {"report_id": "report1", "export_id": "export123", "workspace_id": "workspace1"} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertEqual(result["status"], "Succeeded") - self.assertEqual(result["percentComplete"], 100) - - # ======================================================================== - # DASHBOARD TESTS - # ======================================================================== - - async def test_list_dashboards_success(self): - """ - Test successful dashboard listing. - - Verifies that dashboard metadata including display names, - read-only status, and embed URLs are correctly retrieved. - """ - mock_response = { - "value": [ - { - "id": "dashboard1", - "displayName": "Sales Dashboard", - "isReadOnly": False, - "embedUrl": "https://app.powerbi.com/dashboardEmbed?dashboardId=dashboard1", - } - ] - } - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.ListDashboardsAction() - inputs = {"workspace_id": "workspace1"} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertEqual(len(result["dashboards"]), 1) - self.assertEqual(result["dashboards"][0]["displayName"], "Sales Dashboard") - - async def test_get_dashboard_tiles_success(self): - """ - Test retrieving tiles from a dashboard. - - Verifies that dashboard tile information including embed URLs, - dataset IDs, and report IDs are correctly fetched. - """ - mock_response = { - "value": [ - { - "id": "tile1", - "title": "Revenue Chart", - "embedUrl": "https://app.powerbi.com/tileEmbed?tileId=tile1", - "datasetId": "dataset1", - "reportId": "report1", - } - ] - } - self.mock_context.fetch.return_value = mock_response + assert result.type == ResultType.ACTION + data = result.result.data + assert "datasets" in data + assert isinstance(data["datasets"], list) - handler = powerbi.GetDashboardTilesAction() - inputs = {"dashboard_id": "dashboard1", "workspace_id": "workspace1"} - result = await handler.execute(inputs, self.mock_context) +async def test_list_reports_returns_reports(live_context): + result = await powerbi.execute_action("list_reports", {}, live_context) - self.assertTrue(result["result"]) - self.assertEqual(len(result["tiles"]), 1) - self.assertEqual(result["tiles"][0]["title"], "Revenue Chart") - - # ======================================================================== - # QUERY EXECUTION TESTS - # ======================================================================== - - async def test_execute_queries_success(self): - """ - Test executing DAX/MDX queries on a dataset. - - Verifies that query requests are correctly formatted and query results - are properly returned from the Power BI dataset. - """ - mock_response = {"results": [{"tables": [{"rows": [{"Column1": "Value1", "Column2": 100}]}]}]} - self.mock_context.fetch.return_value = mock_response - - handler = powerbi.ExecuteQueriesAction() - inputs = { - "dataset_id": "dataset1", - "workspace_id": "workspace1", - "queries": [{"query": "EVALUATE VALUES('Table'[Column])"}], - } - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertEqual(len(result["results"]), 1) - - # Verify API call - call_args = self.mock_context.fetch.call_args - self.assertIn("executeQueries", call_args[0][0]) - self.assertEqual(call_args[1]["method"], "POST") - - # ======================================================================== - # ADVANCED REPORT TESTS - # ======================================================================== - - async def test_refresh_report_success(self): - """ - Test refreshing a report by refreshing its underlying dataset. - - Verifies that the action correctly fetches the report's dataset ID - and triggers a refresh operation on that dataset. - """ - # Mock report response - mock_report_response = {"id": "report1", "name": "Sales Report", "datasetId": "dataset1"} - - # Setup mock to return different values for different calls - self.mock_context.fetch.side_effect = [ - mock_report_response, # First call gets the report - None, # Second call triggers the refresh - ] - - handler = powerbi.RefreshReportAction() - inputs = {"report_id": "report1", "workspace_id": "workspace1", "notify_option": "MailOnFailure"} - - result = await handler.execute(inputs, self.mock_context) - - self.assertTrue(result["result"]) - self.assertEqual(result["dataset_id"], "dataset1") - self.assertIn("message", result) - - # Verify both API calls were made - self.assertEqual(self.mock_context.fetch.call_count, 2) - - async def test_refresh_report_no_dataset(self): - """ - Test error handling when a report has no associated dataset. - - Verifies that the action properly handles and returns an error - when attempting to refresh a report without a linked dataset. - """ - # Mock report response without dataset - mock_report_response = {"id": "report1", "name": "Sales Report"} - self.mock_context.fetch.return_value = mock_report_response - - handler = powerbi.RefreshReportAction() - inputs = {"report_id": "report1", "workspace_id": "workspace1"} - - result = await handler.execute(inputs, self.mock_context) - - self.assertFalse(result["result"]) - self.assertIn("error", result) - self.assertIn("dataset", result["error"].lower()) - - async def test_get_report_datasources_success(self): - """ - Test retrieving datasources connected to a report. - - Verifies that datasource information including type, connection strings, - gateway IDs, and connection details are correctly retrieved. - """ - mock_response = { - "value": [ - { - "datasourceType": "Sql", - "datasourceId": "datasource1", - "gatewayId": "gateway1", - "name": "SQL Server", - "connectionString": "Server=localhost;Database=Sales", - "connectionDetails": {"server": "localhost", "database": "Sales"}, - } - ] - } - self.mock_context.fetch.return_value = mock_response + assert result.type == ResultType.ACTION + data = result.result.data + assert "reports" in data + assert isinstance(data["reports"], list) - handler = powerbi.GetReportDatasourcesAction() - inputs = {"report_id": "report1", "workspace_id": "workspace1"} - result = await handler.execute(inputs, self.mock_context) +async def test_list_dashboards_returns_dashboards(live_context): + result = await powerbi.execute_action("list_dashboards", {}, live_context) - self.assertTrue(result["result"]) - self.assertEqual(len(result["datasources"]), 1) - self.assertEqual(result["datasources"][0]["datasourceType"], "Sql") - self.assertIn("connectionDetails", result["datasources"][0]) + assert result.type == ResultType.ACTION + data = result.result.data + assert "dashboards" in data + assert isinstance(data["dashboards"], list) - # ======================================================================== - # ERROR HANDLING TESTS - # ======================================================================== - async def test_error_handling(self): - """ - Test general error handling across all actions. +async def test_execute_queries_returns_results(live_context): + dataset_id = await _first_dataset_id(live_context) - Verifies that exceptions from the Power BI API are properly caught, - formatted, and returned with appropriate error messages. - """ - # Mock API error - self.mock_context.fetch.side_effect = Exception("API Error") + result = await powerbi.execute_action( + "execute_queries", + {"dataset_id": dataset_id, "queries": [{"query": 'EVALUATE ROW("AutohiveIntegrationTest", 1)'}]}, + live_context, + ) - handler = powerbi.ListWorkspacesAction() - inputs = {} + assert result.type == ResultType.ACTION + data = result.result.data + assert "results" in data + assert isinstance(data["results"], list) - result = await handler.execute(inputs, self.mock_context) - self.assertFalse(result["result"]) - self.assertIn("error", result) - self.assertEqual(result["error"], "API Error") +async def test_get_workspace_returns_workspace_shape(live_context): + workspace_id = await _first_workspace_id(live_context) + result = await powerbi.execute_action("get_workspace", {"workspace_id": workspace_id}, live_context) -if __name__ == "__main__": - unittest.main() + assert result.type == ResultType.ACTION + data = result.result.data + assert "workspace" in data + assert data["workspace"]["id"] == workspace_id diff --git a/powerbi/tests/test_powerbi_queries_unit.py b/powerbi/tests/test_powerbi_queries_unit.py new file mode 100644 index 00000000..ee8d0e5f --- /dev/null +++ b/powerbi/tests/test_powerbi_queries_unit.py @@ -0,0 +1,127 @@ +import os +import sys +import importlib.util + +_parent = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +_deps = os.path.abspath(os.path.join(os.path.dirname(__file__), "../dependencies")) +sys.path.insert(0, _parent) +sys.path.insert(0, _deps) + +import pytest # noqa: E402 +from unittest.mock import AsyncMock, MagicMock # noqa: E402 +from autohive_integrations_sdk import FetchResponse # noqa: E402 +from autohive_integrations_sdk.integration import ResultType # noqa: E402 + +_spec = importlib.util.spec_from_file_location("powerbi_mod", os.path.join(_parent, "powerbi.py")) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) + +powerbi = _mod.powerbi + +pytestmark = pytest.mark.unit + +POWERBI_API_BASE = "https://api.powerbi.com/v1.0/myorg" + + +@pytest.fixture +def mock_context(): + ctx = MagicMock(name="ExecutionContext") + ctx.fetch = AsyncMock(name="fetch") + ctx.auth = { + "auth_type": "PlatformOauth2", + "credentials": {"access_token": "test_token"}, # nosec B105 + } + return ctx + + +# ---- Queries ---- + + +class TestExecuteQueries: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={"results": [{"tables": [{"rows": [{"Revenue": 1000}]}]}]}, + ) + + result = await powerbi.execute_action( + "execute_queries", + {"dataset_id": "ds-1", "queries": [{"query": "EVALUATE VALUES(Sales)"}]}, + mock_context, + ) + + assert len(result.result.data["results"]) == 1 + + @pytest.mark.asyncio + async def test_request_method_is_post(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"results": []}) + + await powerbi.execute_action( + "execute_queries", + {"dataset_id": "ds-1", "queries": [{"query": "EVALUATE VALUES(Sales)"}]}, + mock_context, + ) + + assert mock_context.fetch.call_args.kwargs["method"] == "POST" + + @pytest.mark.asyncio + async def test_request_payload(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"results": []}) + + queries = [{"query": "EVALUATE VALUES(Sales)"}] + await powerbi.execute_action("execute_queries", {"dataset_id": "ds-1", "queries": queries}, mock_context) + + payload = mock_context.fetch.call_args.kwargs["json"] + assert payload["queries"] == queries + assert payload["serializerSettings"]["includeNulls"] is True + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"results": []}) + + await powerbi.execute_action( + "execute_queries", + {"dataset_id": "ds-1", "workspace_id": "ws-1", "queries": [{"query": "EVALUATE VALUES(Sales)"}]}, + mock_context, + ) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/datasets/ds-1/executeQueries" + + @pytest.mark.asyncio + async def test_url_without_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"results": []}) + + await powerbi.execute_action( + "execute_queries", + {"dataset_id": "ds-1", "queries": [{"query": "EVALUATE VALUES(Sales)"}]}, + mock_context, + ) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/datasets/ds-1/executeQueries" + + @pytest.mark.asyncio + async def test_empty_results(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"results": []}) + + result = await powerbi.execute_action( + "execute_queries", + {"dataset_id": "ds-1", "queries": [{"query": "EVALUATE VALUES(Sales)"}]}, + mock_context, + ) + + assert result.result.data["results"] == [] + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Query failed") + + result = await powerbi.execute_action( + "execute_queries", + {"dataset_id": "ds-1", "queries": [{"query": "EVALUATE VALUES(Sales)"}]}, + mock_context, + ) + + assert result.type == ResultType.ACTION_ERROR + assert "Query failed" in result.result.message diff --git a/powerbi/tests/test_powerbi_reports_unit.py b/powerbi/tests/test_powerbi_reports_unit.py new file mode 100644 index 00000000..7fe14db5 --- /dev/null +++ b/powerbi/tests/test_powerbi_reports_unit.py @@ -0,0 +1,342 @@ +import os +import sys +import importlib.util + +_parent = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +_deps = os.path.abspath(os.path.join(os.path.dirname(__file__), "../dependencies")) +sys.path.insert(0, _parent) +sys.path.insert(0, _deps) + +import pytest # noqa: E402 +from unittest.mock import AsyncMock, MagicMock # noqa: E402 +from autohive_integrations_sdk import FetchResponse # noqa: E402 +from autohive_integrations_sdk.integration import ResultType # noqa: E402 + +_spec = importlib.util.spec_from_file_location("powerbi_mod", os.path.join(_parent, "powerbi.py")) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) + +powerbi = _mod.powerbi + +pytestmark = pytest.mark.unit + +POWERBI_API_BASE = "https://api.powerbi.com/v1.0/myorg" + + +@pytest.fixture +def mock_context(): + ctx = MagicMock(name="ExecutionContext") + ctx.fetch = AsyncMock(name="fetch") + ctx.auth = { + "auth_type": "PlatformOauth2", + "credentials": {"access_token": "test_token"}, # nosec B105 + } + return ctx + + +# ---- Reports ---- + + +class TestListReports: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={"value": [{"id": "rpt-1", "name": "Sales Report", "datasetId": "ds-1"}]}, + ) + + result = await powerbi.execute_action("list_reports", {}, mock_context) + + assert result.result.data["reports"][0]["id"] == "rpt-1" + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("list_reports", {"workspace_id": "ws-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/reports" + + @pytest.mark.asyncio + async def test_url_without_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("list_reports", {}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/reports" + + @pytest.mark.asyncio + async def test_empty_list(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + result = await powerbi.execute_action("list_reports", {}, mock_context) + + assert result.result.data["reports"] == [] + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Connection error") + + result = await powerbi.execute_action("list_reports", {}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestGetReport: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + rpt = {"id": "rpt-1", "name": "Sales Report", "datasetId": "ds-1"} + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data=rpt) + + result = await powerbi.execute_action("get_report", {"report_id": "rpt-1"}, mock_context) + + assert result.result.data["report"]["id"] == "rpt-1" + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={}) + + await powerbi.execute_action("get_report", {"report_id": "rpt-1", "workspace_id": "ws-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/reports/rpt-1" + + @pytest.mark.asyncio + async def test_url_without_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={}) + + await powerbi.execute_action("get_report", {"report_id": "rpt-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/reports/rpt-1" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Not found") + + result = await powerbi.execute_action("get_report", {"report_id": "rpt-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestGetReportDatasources: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={"value": [{"datasourceType": "Sql", "datasourceId": "dsc-1", "name": "DB Source"}]}, + ) + + result = await powerbi.execute_action("get_report_datasources", {"report_id": "rpt-1"}, mock_context) + + assert result.result.data["datasources"][0]["datasourceType"] == "Sql" + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action( + "get_report_datasources", {"report_id": "rpt-1", "workspace_id": "ws-1"}, mock_context + ) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/reports/rpt-1/datasources" + + @pytest.mark.asyncio + async def test_connection_details_included(self, mock_context): + conn_details = {"server": "myserver", "database": "mydb"} + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={"value": [{"datasourceType": "Sql", "connectionDetails": conn_details}]}, + ) + + result = await powerbi.execute_action("get_report_datasources", {"report_id": "rpt-1"}, mock_context) + + assert result.result.data["datasources"][0]["connectionDetails"]["server"] == "myserver" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("API error") + + result = await powerbi.execute_action("get_report_datasources", {"report_id": "rpt-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestRefreshReport: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.side_effect = [ + FetchResponse(status=200, headers={}, data={"id": "rpt-1", "name": "Sales", "datasetId": "ds-1"}), + FetchResponse(status=202, headers={}, data=None), + ] + + result = await powerbi.execute_action("refresh_report", {"report_id": "rpt-1"}, mock_context) + + assert result.result.data["dataset_id"] == "ds-1" + assert "Sales" in result.result.data["message"] + + @pytest.mark.asyncio + async def test_missing_dataset_returns_error(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"id": "rpt-1", "name": "Report"}) + + result = await powerbi.execute_action("refresh_report", {"report_id": "rpt-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + assert "associated dataset" in result.result.message + + @pytest.mark.asyncio + async def test_makes_two_requests(self, mock_context): + mock_context.fetch.side_effect = [ + FetchResponse(status=200, headers={}, data={"id": "rpt-1", "name": "Sales", "datasetId": "ds-1"}), + FetchResponse(status=202, headers={}, data=None), + ] + + await powerbi.execute_action("refresh_report", {"report_id": "rpt-1"}, mock_context) + + assert mock_context.fetch.call_count == 2 + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Network error") + + result = await powerbi.execute_action("refresh_report", {"report_id": "rpt-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestCloneReport: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={ + "id": "rpt-2", + "name": "Sales Copy", + "webUrl": "https://app.powerbi.com/rpt-2", + "embedUrl": "https://embed/rpt-2", + }, + ) + + result = await powerbi.execute_action( + "clone_report", {"report_id": "rpt-1", "name": "Sales Copy"}, mock_context + ) + + assert result.result.data["id"] == "rpt-2" + assert result.result.data["name"] == "Sales Copy" + + @pytest.mark.asyncio + async def test_request_method_is_post(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"id": "rpt-2"}) + + await powerbi.execute_action("clone_report", {"report_id": "rpt-1", "name": "Clone"}, mock_context) + + assert mock_context.fetch.call_args.kwargs["method"] == "POST" + + @pytest.mark.asyncio + async def test_clone_request_payload(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"id": "rpt-2"}) + + await powerbi.execute_action( + "clone_report", + {"report_id": "rpt-1", "name": "Clone", "target_workspace_id": "ws-2", "target_dataset_id": "ds-2"}, + mock_context, + ) + + payload = mock_context.fetch.call_args.kwargs["json"] + assert payload["name"] == "Clone" + assert payload["targetWorkspaceId"] == "ws-2" + assert payload["targetModelId"] == "ds-2" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Forbidden") + + result = await powerbi.execute_action("clone_report", {"report_id": "rpt-1", "name": "Clone"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestExportReport: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=202, headers={}, data={"id": "exp-1"}) + + result = await powerbi.execute_action("export_report", {"report_id": "rpt-1"}, mock_context) + + assert result.result.data["export_id"] == "exp-1" + assert result.result.data["message"] == "Export initiated successfully" + + @pytest.mark.asyncio + async def test_default_format_is_pdf(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=202, headers={}, data={"id": "exp-1"}) + + await powerbi.execute_action("export_report", {"report_id": "rpt-1"}, mock_context) + + payload = mock_context.fetch.call_args.kwargs["json"] + assert payload["format"] == "PDF" + + @pytest.mark.asyncio + async def test_custom_format(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=202, headers={}, data={"id": "exp-1"}) + + await powerbi.execute_action("export_report", {"report_id": "rpt-1", "format": "PPTX"}, mock_context) + + payload = mock_context.fetch.call_args.kwargs["json"] + assert payload["format"] == "PPTX" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Service unavailable") + + result = await powerbi.execute_action("export_report", {"report_id": "rpt-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + + +class TestGetExportStatus: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, headers={}, data={"status": "Running", "percentComplete": 45} + ) + + result = await powerbi.execute_action( + "get_export_status", {"report_id": "rpt-1", "export_id": "exp-1"}, mock_context + ) + + assert result.result.data["status"] == "Running" + assert result.result.data["percentComplete"] == 45 + + @pytest.mark.asyncio + async def test_url_with_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, headers={}, data={"status": "Succeeded", "percentComplete": 100} + ) + + await powerbi.execute_action( + "get_export_status", {"report_id": "rpt-1", "export_id": "exp-1", "workspace_id": "ws-1"}, mock_context + ) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-1/reports/rpt-1/exports/exp-1" + + @pytest.mark.asyncio + async def test_url_without_workspace(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, headers={}, data={"status": "Succeeded", "percentComplete": 100} + ) + + await powerbi.execute_action("get_export_status", {"report_id": "rpt-1", "export_id": "exp-1"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/reports/rpt-1/exports/exp-1" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Not found") + + result = await powerbi.execute_action( + "get_export_status", {"report_id": "rpt-1", "export_id": "exp-1"}, mock_context + ) + + assert result.type == ResultType.ACTION_ERROR diff --git a/powerbi/tests/test_powerbi_workspaces_unit.py b/powerbi/tests/test_powerbi_workspaces_unit.py new file mode 100644 index 00000000..c73b1643 --- /dev/null +++ b/powerbi/tests/test_powerbi_workspaces_unit.py @@ -0,0 +1,144 @@ +import os +import sys +import importlib.util + +_parent = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +_deps = os.path.abspath(os.path.join(os.path.dirname(__file__), "../dependencies")) +sys.path.insert(0, _parent) +sys.path.insert(0, _deps) + +import pytest # noqa: E402 +from unittest.mock import AsyncMock, MagicMock # noqa: E402 +from autohive_integrations_sdk import FetchResponse # noqa: E402 +from autohive_integrations_sdk.integration import ResultType # noqa: E402 + +_spec = importlib.util.spec_from_file_location("powerbi_mod", os.path.join(_parent, "powerbi.py")) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) + +powerbi = _mod.powerbi + +pytestmark = pytest.mark.unit + +POWERBI_API_BASE = "https://api.powerbi.com/v1.0/myorg" + + +@pytest.fixture +def mock_context(): + ctx = MagicMock(name="ExecutionContext") + ctx.fetch = AsyncMock(name="fetch") + ctx.auth = { + "auth_type": "PlatformOauth2", + "credentials": {"access_token": "test_token"}, # nosec B105 + } + return ctx + + +# ---- Workspaces ---- + + +class TestListWorkspaces: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + mock_context.fetch.return_value = FetchResponse( + status=200, + headers={}, + data={ + "value": [ + { + "id": "ws-1", + "name": "My Workspace", + "isReadOnly": False, + "isOnDedicatedCapacity": False, + "type": "Workspace", + } + ] + }, + ) + + result = await powerbi.execute_action("list_workspaces", {}, mock_context) + + assert result.result.data["workspaces"][0]["id"] == "ws-1" + assert result.result.data["workspaces"][0]["name"] == "My Workspace" + + @pytest.mark.asyncio + async def test_request_url(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("list_workspaces", {}, mock_context) + + call_args = mock_context.fetch.call_args + assert call_args.args[0] == f"{POWERBI_API_BASE}/groups" + + @pytest.mark.asyncio + async def test_filter_param_passed(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("list_workspaces", {"filter": "type eq 'Workspace'"}, mock_context) + + call_args = mock_context.fetch.call_args + assert call_args.kwargs["params"]["$filter"] == "type eq 'Workspace'" + + @pytest.mark.asyncio + async def test_top_param_passed(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + await powerbi.execute_action("list_workspaces", {"top": 50}, mock_context) + + call_args = mock_context.fetch.call_args + assert call_args.kwargs["params"]["$top"] == 50 + + @pytest.mark.asyncio + async def test_empty_list(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={"value": []}) + + result = await powerbi.execute_action("list_workspaces", {}, mock_context) + + assert result.result.data["workspaces"] == [] + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Connection refused") + + result = await powerbi.execute_action("list_workspaces", {}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + assert "Connection refused" in result.result.message + + +class TestGetWorkspace: + @pytest.mark.asyncio + async def test_happy_path(self, mock_context): + ws = {"id": "ws-1", "name": "Sales", "isReadOnly": False} + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data=ws) + + result = await powerbi.execute_action("get_workspace", {"workspace_id": "ws-1"}, mock_context) + + assert result.result.data["workspace"]["id"] == "ws-1" + + @pytest.mark.asyncio + async def test_request_url(self, mock_context): + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data={}) + + await powerbi.execute_action("get_workspace", {"workspace_id": "ws-abc"}, mock_context) + + assert mock_context.fetch.call_args.args[0] == f"{POWERBI_API_BASE}/groups/ws-abc" + + @pytest.mark.asyncio + async def test_response_shape(self, mock_context): + ws = {"id": "ws-1", "name": "Sales", "isReadOnly": True, "type": "Workspace"} + mock_context.fetch.return_value = FetchResponse(status=200, headers={}, data=ws) + + result = await powerbi.execute_action("get_workspace", {"workspace_id": "ws-1"}, mock_context) + + assert "workspace" in result.result.data + assert result.result.data["workspace"]["name"] == "Sales" + + @pytest.mark.asyncio + async def test_exception_returns_action_error(self, mock_context): + mock_context.fetch.side_effect = Exception("Not found") + + result = await powerbi.execute_action("get_workspace", {"workspace_id": "ws-1"}, mock_context) + + assert result.type == ResultType.ACTION_ERROR + assert "Not found" in result.result.message