Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 87 additions & 69 deletions src/opower/utilities/scl.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,78 @@ def timezone() -> str:
"""Return the timezone."""
return "America/Los_Angeles"

async def _async_sso_login(
self,
session: aiohttp.ClientSession,
username: str,
password: str,
hidden_inputs: dict[str, str],
) -> tuple[str, dict[str, str]]:
"""Perform SSO login flow and return SAML response action URL and hidden inputs."""
if set(hidden_inputs.keys()) != {"signature", "state", "loginCtx"}:
raise InvalidAuth("Unexpected SSO login form fields")

# POST to https://login.seattle.gov/#/login?appName=EPORTAL_PROD with signature, state, loginCtx
# need to parse signinAT, initialState from html sessionStorage.setItem
async with session.post(
"https://login.seattle.gov/#/login?appName=EPORTAL_PROD",
data=hidden_inputs,
headers={"User-Agent": USER_AGENT},
raise_for_status=True,
) as resp:
login_result = await resp.text()
session_items = _get_session_storage_values(login_result)
if not {"initialState", "signinAT"}.issubset(set(session_items.keys())):
raise InvalidAuth("Missing session storage values from login page")

# POST to https://login.seattle.gov/authenticate with credentials, initialState, signinAT
# response has authnToken in JSON response if initialState and signinAT present
async with session.post(
"https://login.seattle.gov/authenticate",
json={
"credentials": {"username": username, "password": password},
"initialState": json.loads(session_items.get("initialState", "{}")),
"signinAT": session_items.get("signinAT"),
},
headers={"User-Agent": USER_AGENT},
raise_for_status=False,
) as resp:
if resp.status == 400:
raise InvalidAuth("Username and password failed")
authenticate_result = await resp.json()
if "error_description" in authenticate_result:
raise InvalidAuth(authenticate_result["error_description"])
authnToken = authenticate_result.get("authnToken")
if not authnToken:
raise InvalidAuth("Authentication failed: no authnToken received")

# POST to https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/sso/v1/sdk/session with authnToken
# response has OCIS_REQ in HTML form
async with session.post(
"https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/sso/v1/sdk/session",
data={"authnToken": authnToken},
headers={"User-Agent": USER_AGENT},
raise_for_status=True,
) as resp:
session_result = await resp.text()
action_url, hidden_inputs = get_form_action_url_and_hidden_inputs(session_result)
if action_url != "https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/fed/v1/user/response/login":
raise InvalidAuth("Unexpected Oracle IDCS session URL")
if set(hidden_inputs.keys()) != {"OCIS_REQ"}:
raise InvalidAuth("Unexpected Oracle IDCS session form fields")

# POST to https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/fed/v1/user/response/login
# with OCIS_REQ (form data)
# response has SAMLResponse in HTML form
async with session.post(
action_url,
data=hidden_inputs,
headers={"User-Agent": USER_AGENT},
raise_for_status=True,
) as resp:
idcs_login_result = await resp.text()
return get_form_action_url_and_hidden_inputs(idcs_login_result)

async def async_login(
self,
session: aiohttp.ClientSession,
Expand All @@ -59,70 +131,12 @@ async def async_login(
action_url, hidden_inputs = get_form_action_url_and_hidden_inputs(ssologin_result)
if action_url == "https://login.seattle.gov/#/login?appName=EPORTAL_PROD":
# Not logged in to seattle.gov, go through SSO flow
assert set(hidden_inputs.keys()) == {"signature", "state", "loginCtx"}

# POST to https://login.seattle.gov/#/login?appName=EPORTAL_PROD with signature, state, loginCtx
# need to parse signinAT, initialState from html sessionStorage.setItem
async with session.post(
action_url,
data=hidden_inputs,
headers={"User-Agent": USER_AGENT},
raise_for_status=True,
) as resp:
login_result = await resp.text()
session_items = _get_session_storage_values(login_result)
assert {"initialState", "signinAT"}.issubset(set(session_items.keys()))

# POST to https://login.seattle.gov/authenticate with credentials, initialState, signinAT?
# response has authnToken in JSON response if initialState and signinAT present
async with session.post(
"https://login.seattle.gov/authenticate",
json={
"credentials": {"username": username, "password": password},
"initialState": json.loads(session_items.get("initialState", "{}")),
"signinAT": session_items.get("signinAT"),
},
headers={"User-Agent": USER_AGENT},
raise_for_status=False,
) as resp:
if resp.status == 400:
raise InvalidAuth("Username and password failed")
authenticate_result = await resp.json()
if "error_description" in authenticate_result:
raise InvalidAuth(authenticate_result["error_description"])
assert authenticate_result["authnToken"]
authnToken = authenticate_result["authnToken"]

# POST to https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/sso/v1/sdk/session with authnToken
# response has OCIS_REQ in HTML form
async with session.post(
"https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/sso/v1/sdk/session",
data={"authnToken": authnToken},
headers={"User-Agent": USER_AGENT},
raise_for_status=True,
) as resp:
session_result = await resp.text()
action_url, hidden_inputs = get_form_action_url_and_hidden_inputs(session_result)
assert (
action_url
== "https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/fed/v1/user/response/login"
)
assert set(hidden_inputs.keys()) == {"OCIS_REQ"}

# POST to https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/fed/v1/user/response/login
# with OCIS_REQ (form data)
# response has SAMLResponse in HTML form
async with session.post(
action_url,
data=hidden_inputs,
headers={"User-Agent": USER_AGENT},
raise_for_status=True,
) as resp:
idcs_login_result = await resp.text()
action_url, hidden_inputs = get_form_action_url_and_hidden_inputs(idcs_login_result)

assert action_url == "https://myutilities.seattle.gov/rest/auth/samlresp"
assert set(hidden_inputs.keys()) == {"RelayState", "SAMLResponse"}
action_url, hidden_inputs = await self._async_sso_login(session, username, password, hidden_inputs)

if action_url != "https://myutilities.seattle.gov/rest/auth/samlresp":
raise InvalidAuth("Unexpected SAML response URL")
if set(hidden_inputs.keys()) != {"RelayState", "SAMLResponse"}:
raise InvalidAuth("Unexpected SAML response form fields")

# POST to https://myutilities.seattle.gov/rest/auth/samlresp w/ RelayState https://myutilities.seattle.gov/eportal
# and SAMLResponse
Expand All @@ -136,7 +150,8 @@ async def async_login(
) as resp:
url = resp.real_url.human_repr()
user_token = _get_user_token_from_url(url)
assert user_token
if not user_token:
raise InvalidAuth("Failed to extract user token from redirect URL")

# getSSOToken (/auth/token)
async with session.post(
Expand All @@ -150,8 +165,9 @@ async def async_login(
raise_for_status=True,
) as resp:
auth_token_result = await resp.json()
assert auth_token_result["access_token"]
access_token = auth_token_result["access_token"]
access_token = auth_token_result.get("access_token")
if not access_token:
raise InvalidAuth("Failed to retrieve access token")
customer_id = auth_token_result["user"]["customerId"]

# List SCL accounts, required to fetch opower token
Expand Down Expand Up @@ -199,6 +215,8 @@ async def async_login(
raise_for_status=True,
) as resp:
result = await resp.json()
assert result["token"]
token = result.get("token")
if not token:
raise InvalidAuth("Failed to retrieve Opower token")

return str(result["token"])
return str(token)
5 changes: 5 additions & 0 deletions tests/opower/utilities/scl/idcs_session_response.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<html><body>
<form action="https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/fed/v1/user/response/login">
<input type="hidden" name="OCIS_REQ" value="test_ocis_req_value">
</form>
</body></html>
7 changes: 7 additions & 0 deletions tests/opower/utilities/scl/login_page.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<html><head>
<script>
sessionStorage.setItem("initialState", '{"requestState":"abc123"}');
sessionStorage.setItem("signinAT", 'test_signin_at_token');
sessionStorage.setItem("otherItem", 'ignored_value');
</script>
</head><body></body></html>
6 changes: 6 additions & 0 deletions tests/opower/utilities/scl/saml_response.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<html><body>
<form action="https://myutilities.seattle.gov/rest/auth/samlresp">
<input type="hidden" name="RelayState" value="https://myutilities.seattle.gov/eportal">
<input type="hidden" name="SAMLResponse" value="test_saml_response_base64_value">
</form>
</body></html>
7 changes: 7 additions & 0 deletions tests/opower/utilities/scl/ssologin_response.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<html><body>
<form action="https://login.seattle.gov/#/login?appName=EPORTAL_PROD">
<input type="hidden" name="signature" value="test_signature_value">
<input type="hidden" name="state" value="test_state_value">
<input type="hidden" name="loginCtx" value="test_loginCtx_value">
</form>
</body></html>
148 changes: 148 additions & 0 deletions tests/opower/utilities/test_scl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Tests for Seattle City Light (SCL)."""

import os
import unittest

import aiohttp
from dotenv import load_dotenv
from yarl import URL

from opower.utilities.helpers import get_form_action_url_and_hidden_inputs
from opower.utilities.scl import SCL, _get_session_storage_values, _get_user_token_from_url

SSOLOGIN_HTML_FILENAME = os.path.join(os.path.dirname(__file__), "scl/ssologin_response.html")
LOGIN_PAGE_HTML_FILENAME = os.path.join(os.path.dirname(__file__), "scl/login_page.html")
IDCS_SESSION_HTML_FILENAME = os.path.join(os.path.dirname(__file__), "scl/idcs_session_response.html")
SAML_RESPONSE_HTML_FILENAME = os.path.join(os.path.dirname(__file__), "scl/saml_response.html")
ENV_SECRET_PATH = os.path.join(os.path.dirname(__file__), "../../../.env.secret")


class TestSCL(unittest.IsolatedAsyncioTestCase):
"""Test public methods inherited from UtilityBase."""

def test_name(self) -> None:
"""Test name."""
scl = SCL()
self.assertEqual("Seattle City Light (SCL)", scl.name())

def test_subdomain(self) -> None:
"""Test subdomain."""
scl = SCL()
self.assertEqual("scl", scl.subdomain())

def test_timezone(self) -> None:
"""Test timezone."""
scl = SCL()
self.assertEqual("America/Los_Angeles", scl.timezone())

async def test_real_login(self) -> None:
"""Perform a live test against SCL and Opower websites."""
load_dotenv(dotenv_path=ENV_SECRET_PATH)

username = os.getenv("SCL_USERNAME")
password = os.getenv("SCL_PASSWORD")

if username is None or password is None:
self.skipTest("Add `SCL_USERNAME=` and `SCL_PASSWORD=` to `.env.secret` to run live SCL test.")

scl = SCL()
session = aiohttp.ClientSession()
self.addCleanup(session.close)

access_token = await scl.async_login(session, username, password, {})

self.assertIsNotNone(access_token)
self.assertTrue(len(access_token) > 0)

cookies = session.cookie_jar.filter_cookies(URL("https://scl.opower.com"))
self.assertTrue(len(cookies) > 0, "Expected opower cookies to be set")


class TestSCLSessionStorageParser(unittest.TestCase):
"""Test parsing sessionStorage values from the login page."""

def test_parse_session_storage(self) -> None:
"""Test extracting sessionStorage items from HTML."""
with open(LOGIN_PAGE_HTML_FILENAME) as f:
html = f.read()
items = _get_session_storage_values(html)
self.assertIn("initialState", items)
self.assertIn("signinAT", items)
self.assertEqual(items["signinAT"], "test_signin_at_token")

def test_parse_session_storage_empty(self) -> None:
"""Test parsing HTML with no sessionStorage calls."""
items = _get_session_storage_values("<html><body></body></html>")
self.assertEqual(items, {})

def test_parse_session_storage_multiple(self) -> None:
"""Test that all sessionStorage items are captured."""
with open(LOGIN_PAGE_HTML_FILENAME) as f:
html = f.read()
items = _get_session_storage_values(html)
self.assertEqual(len(items), 3)
self.assertIn("otherItem", items)


class TestSCLUserTokenExtraction(unittest.TestCase):
"""Test extracting user token from redirect URL."""

def test_valid_url(self) -> None:
"""Test extracting token from a valid redirect URL."""
url = "https://myutilities.seattle.gov/eportal/#/ssohome/abc123xyz"
token = _get_user_token_from_url(url)
self.assertEqual(token, "abc123xyz")

def test_invalid_url(self) -> None:
"""Test that an unrelated URL returns empty string."""
token = _get_user_token_from_url("https://example.com/other")
self.assertEqual(token, "")

def test_empty_url(self) -> None:
"""Test that an empty string returns empty string."""
token = _get_user_token_from_url("")
self.assertEqual(token, "")

def test_url_with_long_token(self) -> None:
"""Test extracting a long UUID-style token."""
url = "https://myutilities.seattle.gov/eportal/#/ssohome/550e8400-e29b-41d4-a716-446655440000"
token = _get_user_token_from_url(url)
self.assertEqual(token, "550e8400-e29b-41d4-a716-446655440000")


class TestSCLFormParsing(unittest.TestCase):
"""Test parsing HTML forms from SSO flow responses."""

def test_ssologin_form(self) -> None:
"""Test parsing the SSO login form."""
with open(SSOLOGIN_HTML_FILENAME) as f:
html = f.read()
action_url, hidden_inputs = get_form_action_url_and_hidden_inputs(html)
self.assertEqual(
action_url,
"https://login.seattle.gov/#/login?appName=EPORTAL_PROD",
)
self.assertEqual(set(hidden_inputs.keys()), {"signature", "state", "loginCtx"})
self.assertEqual(hidden_inputs["signature"], "test_signature_value")

def test_idcs_session_form(self) -> None:
"""Test parsing the Oracle IDCS session form."""
with open(IDCS_SESSION_HTML_FILENAME) as f:
html = f.read()
action_url, hidden_inputs = get_form_action_url_and_hidden_inputs(html)
self.assertEqual(
action_url,
"https://idcs-3359adb31e35415e8c1729c5c8098c6d.identity.oraclecloud.com/fed/v1/user/response/login",
)
self.assertEqual(set(hidden_inputs.keys()), {"OCIS_REQ"})

def test_saml_response_form(self) -> None:
"""Test parsing the SAML response form."""
with open(SAML_RESPONSE_HTML_FILENAME) as f:
html = f.read()
action_url, hidden_inputs = get_form_action_url_and_hidden_inputs(html)
self.assertEqual(
action_url,
"https://myutilities.seattle.gov/rest/auth/samlresp",
)
self.assertEqual(set(hidden_inputs.keys()), {"RelayState", "SAMLResponse"})