Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ aind-services = [
"requests",
"pyyaml",
"pykeepass",
"ms-active-directory",
"cryptography",
"winkerberos; sys_platform == 'win32'",
"ldap3; sys_platform == 'win32'",
"msal; sys_platform == 'win32'",
"aind-data-schema>=2",
"aind-data-transfer-service >= 1.22.0",
Expand Down
13 changes: 6 additions & 7 deletions src/clabe/data_transfer/aind_watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,26 +430,25 @@ def _find_modality_candidates(source: PathLike) -> Dict[str, List[Path]]:

@staticmethod
def _get_project_names(
end_point: str = "http://aind-metadata-service/project_names", timeout: int = 5
end_point: str = "http://aind-metadata-service/api/v2/project_names", timeout: int = 5
) -> list[str]:
"""
Fetches the list of valid project names from the metadata service.

Args:
end_point: The endpoint URL for the metadata service
timeout: Timeout for the request in seconds
end_point: The endpoint URL for the metadata service.
timeout: Timeout for the request in seconds.

Returns:
A list of valid project names
A list of valid project names.

Raises:
HTTPError: If the request fails
HTTPError: If the request fails.
"""
response = requests.get(end_point, timeout=timeout)
if response.ok:
return json.loads(response.content)["data"]
return response.json()
else:
response.raise_for_status()
raise HTTPError(f"Failed to fetch project names from endpoint. {response.content.decode('utf-8')}")

def is_running(self) -> bool:
Expand Down
131 changes: 35 additions & 96 deletions src/clabe/utils/aind_auth.py
Original file line number Diff line number Diff line change
@@ -1,101 +1,40 @@
import concurrent.futures
import logging
import platform
from typing import Optional
from urllib.parse import quote

logger = logging.getLogger(__name__)

_ad_logger = logging.getLogger(
"ms_active_directory"
) # This library is annoyingly verbose at the INFO level. We will turn it off unless the root is at DEBUG level


if platform.system() == "Windows":

def validate_aind_username(
username: str,
domain: str = "corp.alleninstitute.org",
domain_username: Optional[str] = None,
timeout: Optional[float] = 2,
) -> bool:
"""
Validates if the given username exists in the AIND Active Directory.

This function authenticates with the corporate Active Directory and searches
for the specified username to verify its existence within the organization.
See https://github.com/AllenNeuralDynamics/aind-watchdog-service/issues/110#issuecomment-2828869619

Args:
username: The username to validate against Active Directory
domain: The Active Directory domain to search. Defaults to Allen Institute domain
domain_username: Username for domain authentication. Defaults to current user
timeout: Timeout in seconds for the validation operation

Returns:
bool: True if the username exists in Active Directory, False otherwise

Raises:
concurrent.futures.TimeoutError: If the validation operation times out
import requests

Example:
```python
# Validate a username in Active Directory
is_valid = validate_aind_username("j.doe")
```
"""
import getpass # type: ignore[import]

import ldap3 # type: ignore[import]
import ms_active_directory # type: ignore[import]

def _helper(username: str, domain: str, domain_username: Optional[str]) -> bool:
"""A function submitted to a thread pool to validate the username."""
if not logger.isEnabledFor(logging.DEBUG):
_ad_logger.disabled = True
if domain_username is None:
domain_username = getpass.getuser()
_domain = ms_active_directory.ADDomain(domain)
session = _domain.create_session_as_user(
domain_username,
authentication_mechanism=ldap3.SASL,
sasl_mechanism=ldap3.GSSAPI,
)
return session.find_user_by_name(username) is not None

try:
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(_helper, username, domain, domain_username)
result = future.result(timeout=timeout)
return result
except concurrent.futures.TimeoutError as e:
logger.error("Timeout occurred while validating username: %s", e)
e.add_note("Timeout occurred while validating username")
raise

else:

def validate_aind_username(
username: str,
domain: str = "corp.alleninstitute.org",
domain_username: Optional[str] = None,
timeout: Optional[float] = 2,
) -> bool:
"""
Validates if the given username is in the AIND Active Directory.

This function is a no-op on non-Windows platforms since Active Directory
authentication is not available.

This function always returns True on non-Windows platforms.

Args:
username: The username to validate (ignored on non-Windows platforms)
domain: The Active Directory domain (ignored on non-Windows platforms)
domain_username: Username for domain authentication (ignored on non-Windows platforms)
timeout: Timeout for the validation operation (ignored on non-Windows platforms)
logger = logging.getLogger(__name__)

Returns:
bool: Always returns True on non-Windows platforms
"""
logger.warning("Active Directory validation is not implemented for non-Windows platforms")
return True
_AD_ENDPOINT = "http://aind-metadata-service/api/v2/active_directory"


def validate_aind_username(
username: str,
timeout: Optional[float] = 2,
) -> bool:
"""
Validates if the given username exists in the AIND Active Directory.

Queries the AIND metadata service to verify the username exists.
Returns False (instead of raising) on network errors so callers can
decide how to handle the degraded state.

Args:
username: The username to validate.
timeout: Timeout in seconds for the HTTP request. Defaults to 2.

Returns:
bool: True if the username was found, False otherwise.

Example:
```python
is_valid = validate_aind_username("j.doe")
```
"""
try:
response = requests.get(f"{_AD_ENDPOINT}/{quote(username, safe='')}", timeout=timeout)
return response.ok
except requests.RequestException as e:
logger.warning("Failed to validate username '%s': %s", username, e)
return False
5 changes: 3 additions & 2 deletions tests/data_transfer/test_data_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,18 @@ def test_is_not_running(self, mock_check_output, watchdog_service):
def test_get_project_names(self, mock_get, watchdog_service):
mock_response = MagicMock()
mock_response.ok = True
mock_response.content = '{"data": ["test_project"]}'
mock_response.json.return_value = ["test_project", "other_project"]
mock_get.return_value = mock_response
project_names = watchdog_service._get_project_names()
assert "test_project" in project_names
mock_get.assert_called_once_with("http://aind-metadata-service/api/v2/project_names", timeout=5)

@patch("clabe.data_transfer.aind_watchdog.requests.get")
def test_get_project_names_fail(self, mock_get, watchdog_service):
mock_response = MagicMock()
mock_response.ok = False
mock_get.return_value = mock_response
with pytest.raises(Exception):
with pytest.raises(HTTPError):
watchdog_service._get_project_names()

@patch(
Expand Down
107 changes: 63 additions & 44 deletions tests/utils/test_aind_auth.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,69 @@
import platform
from unittest.mock import MagicMock, patch

import pytest
import requests

from clabe.utils import aind_auth


@pytest.mark.skipif(platform.system() != "Windows", reason="Windows-only test")
def test_validate_aind_username_windows_valid():
"""Test validate_aind_username on Windows with a valid user."""
with (
patch("ms_active_directory.ADDomain") as mock_ad_domain,
patch("ldap3.SASL", "SASL"),
patch("ldap3.GSSAPI", "GSSAPI"),
):
mock_session = MagicMock()
mock_session.find_user_by_name.return_value = True
mock_ad_domain.return_value.create_session_as_user.return_value = mock_session

assert aind_auth.validate_aind_username("testuser")


@pytest.mark.skipif(platform.system() != "Windows", reason="Windows-only test")
def test_validate_aind_username_windows_invalid():
"""Test validate_aind_username on Windows with an invalid user."""
with (
patch("ms_active_directory.ADDomain") as mock_ad_domain,
patch("ldap3.SASL", "SASL"),
patch("ldap3.GSSAPI", "GSSAPI"),
):
mock_session = MagicMock()
mock_session.find_user_by_name.return_value = None
mock_ad_domain.return_value.create_session_as_user.return_value = mock_session

assert not aind_auth.validate_aind_username("testuser")


@pytest.mark.skipif(platform.system() != "Windows", reason="Windows-only test")
def test_validate_aind_username_windows_timeout():
"""Test validate_aind_username on Windows with a timeout."""
with (
patch("ms_active_directory.ADDomain"),
patch("ldap3.SASL", "SASL"),
patch("ldap3.GSSAPI", "GSSAPI"),
patch("concurrent.futures.ThreadPoolExecutor.submit") as mock_submit,
):
mock_submit.side_effect = TimeoutError
with pytest.raises(TimeoutError):
aind_auth.validate_aind_username("testuser")
def test_validate_aind_username_valid():
"""Returns True when the metadata service finds the user."""
with patch("clabe.utils.aind_auth.requests.get") as mock_get:
mock_response = MagicMock()
mock_response.ok = True
mock_response.json.return_value = {
"username": "j.doe",
"full_name": "Jane Doe",
"email": "j.doe@alleninstitute.org",
}
mock_get.return_value = mock_response

assert aind_auth.validate_aind_username("j.doe") is True
mock_get.assert_called_once_with(
"http://aind-metadata-service/api/v2/active_directory/j.doe",
timeout=2,
)


def test_validate_aind_username_invalid():
"""Returns False when the metadata service does not find the user."""
with patch("clabe.utils.aind_auth.requests.get") as mock_get:
mock_response = MagicMock()
mock_response.ok = False
mock_get.return_value = mock_response

assert aind_auth.validate_aind_username("no.one") is False


def test_validate_aind_username_request_exception():
"""Returns False (with a logged warning) on a network error."""
with patch("clabe.utils.aind_auth.requests.get", side_effect=requests.RequestException("timeout")):
assert aind_auth.validate_aind_username("j.doe") is False


def test_validate_aind_username_custom_timeout():
"""Passes the timeout argument through to requests.get."""
with patch("clabe.utils.aind_auth.requests.get") as mock_get:
mock_response = MagicMock()
mock_response.ok = True
mock_response.json.return_value = {"username": "j.doe"}
mock_get.return_value = mock_response

aind_auth.validate_aind_username("j.doe", timeout=10)
mock_get.assert_called_once_with(
"http://aind-metadata-service/api/v2/active_directory/j.doe",
timeout=10,
)


def test_validate_aind_username_encodes_special_chars():
"""URL-encodes special characters in the username."""
with patch("clabe.utils.aind_auth.requests.get") as mock_get:
mock_response = MagicMock()
mock_response.ok = False
mock_get.return_value = mock_response

aind_auth.validate_aind_username("../admin")
mock_get.assert_called_once_with(
"http://aind-metadata-service/api/v2/active_directory/..%2Fadmin",
timeout=2,
)
Loading
Loading