Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 47 additions & 22 deletions santander_sdk/api_client/auth.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import abc
from datetime import datetime, timedelta

from requests import HTTPError, JSONDecodeError
Expand All @@ -8,19 +9,55 @@
from santander_sdk.api_client.exceptions import SantanderRequestError


class TokenStore(abc.ABC):
@abc.abstractmethod
def get(self) -> str | None: ...

@abc.abstractmethod
def set(self, token: str, expires_in: timedelta) -> None: ...


class InMemoryTokenStore(TokenStore):
def __init__(self, offset=timedelta(seconds=60)):
self._token = None
self._expires_at = None
self._offset = offset

def get(self):
if self._is_expired():
return None

return self._token

def _is_expired(self):
if self._expires_at is None:
return True

return self._expires_at - self._offset < datetime.now()

def set(self, token: str, expires_in: timedelta):
self._token = token
self._expires_at = datetime.now() + expires_in


class SantanderAuth(AuthBase):
TOKEN_ENDPOINT = "/auth/oauth/v2/token"
TIMEOUT_SECS = 60
BEFORE_EXPIRE_TOKEN = timedelta(seconds=60)

def __init__(self, base_url, client_id, client_secret, cert_path):
def __init__(
self,
base_url,
client_id,
client_secret,
cert_path,
token_store=InMemoryTokenStore(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Qual seria a utilidade de passar um token_store nessa função já que tem um defautl?

):
self.base_url = base_url
self.client_id = client_id
self.client_secret = client_secret
self.cert_path = cert_path

self._token = None
self.expires_at = None
self.token_store = token_store

@classmethod
def from_config(cls, config: SantanderClientConfiguration):
Expand All @@ -38,14 +75,12 @@ def __call__(self, r):

@property
def token(self):
if self.is_expired:
self.renew()

return self._token
token = self.token_store.get()
if not token:
token, expires_in = self.renew()
self.token_store.set(token, expires_in)

@token.setter
def token(self, values):
self._token, self.expires_at = values
return token

def renew(self):
session = BaseURLSession(base_url=self.base_url)
Expand Down Expand Up @@ -76,14 +111,4 @@ def renew(self):
) from e

data = response.json()
self.token = (
data["access_token"],
datetime.now() + timedelta(seconds=data["expires_in"]),
)

@property
def is_expired(self):
if not self.expires_at:
return True

return datetime.now() > self.expires_at - self.BEFORE_EXPIRE_TOKEN
return data["access_token"], timedelta(seconds=data["expires_in"])
30 changes: 8 additions & 22 deletions tests/test_auth_unit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from re import compile as regex
from datetime import datetime
from datetime import timedelta

import pytest
from freezegun import freeze_time
Expand Down Expand Up @@ -51,34 +51,20 @@ def test_renew_when_token_empty(auth, responses):
assert req.headers["X-Application-Key"] == auth.client_id


@freeze_time("2025-02-13 10:05")
def test_renew_token_when_expired(auth, responses):
responses.add(
responses.POST,
regex(".+/auth/oauth/v2/token"),
json={"access_token": "NEW_VALID_TOKEN", "expires_in": 120},
json={"access_token": "FRESH_TOKEN", "expires_in": 120},
)
auth.token = "VALID_TOKEN", datetime(2025, 2, 13, 10)
with freeze_time("2025-02-13 10:00"):
auth.token_store.set("EXPIRED_TOKEN", timedelta(0))

req = PreparedRequest()
req.prepare("GET", "https://api.santander.com.br/orders", auth=auth)
with freeze_time("2025-02-13 10:01"):
req = PreparedRequest()
req.prepare("GET", "https://api.santander.com.br/orders", auth=auth)

assert req.headers["Authorization"] == "Bearer NEW_VALID_TOKEN"
assert auth.expires_at == datetime(2025, 2, 13, 10, 7)


@freeze_time("2025-02-13 10:00")
@pytest.mark.parametrize(
"expires_at,expected",
[
(None, True),
(datetime(2025, 2, 13, 10, 1), False),
(datetime(2025, 2, 13, 10, 0, 59), True),
],
)
def test_is_expired(auth, expires_at, expected):
auth.expires_at = expires_at
assert auth.is_expired is expected
assert req.headers["Authorization"] == "Bearer FRESH_TOKEN"


def test_invalid_credentials(auth, responses):
Expand Down
Loading