88
99
1010def authentication_middleware (func : Any ) -> Any :
11+ """Decorator to enforce bearer token authentication on flask routes."""
12+
1113 def wrapper (* args : Any , ** kwargs : Any ) -> Any :
1214 token = AuthenticationMiddleware .get_token_from_auth_header (request )
1315 # Check if bearer token exists and validate it
@@ -23,32 +25,26 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
2325class AuthenticationMiddleware :
2426 @classmethod
2527 def validate_bearer_token (cls , token : str | None ) -> bool :
28+ """Validate the provided bearer token against the database."""
2629 try :
2730 if token is None :
2831 current_app .logger .error ("Authentication failed. Empty bearer token" )
2932 return False
3033 platform_key_table = f'"{ Env .DB_SCHEMA } ".{ DBTable .PLATFORM_KEY } '
31- query = f"SELECT * FROM { platform_key_table } WHERE key = ' { token } ' "
32- cursor = be_db .execute_sql (query )
34+ query = f"SELECT * FROM { platform_key_table } WHERE key = %s "
35+ cursor = be_db .execute_sql (query , ( token ,) )
3336 result_row = cursor .fetchone ()
3437 cursor .close ()
3538 if not result_row or len (result_row ) == 0 :
36- current_app .logger .error (
37- f"Authentication failed. bearer token not found { token } "
38- )
39+ current_app .logger .error ("Authentication failed. bearer token not found" )
3940 return False
4041 platform_key = str (result_row [1 ])
4142 is_active = bool (result_row [2 ])
4243 if not is_active :
43- current_app .logger .error (
44- f"Token is not active. Activate \
45- before using it. token { token } "
46- )
44+ current_app .logger .error ("Token is not active. Activate before using it." )
4745 return False
4846 if platform_key != token :
49- current_app .logger .error (
50- f"Authentication failed. Invalid bearer token: { token } "
51- )
47+ current_app .logger .error ("Authentication failed. Invalid bearer token" )
5248 return False
5349
5450 except Exception as e :
@@ -62,6 +58,7 @@ def validate_bearer_token(cls, token: str | None) -> bool:
6258
6359 @classmethod
6460 def get_token_from_auth_header (cls , request : Request ) -> str | None :
61+ """Extract the bearer token from the Authorization header."""
6562 try :
6663 bearer_token = request .headers .get ("Authorization" )
6764 if not bearer_token :
@@ -99,6 +96,7 @@ def get_organization_from_bearer_token(cls, token: str) -> tuple[int | None, str
9996
10097 @classmethod
10198 def execute_query (cls , query : str , params : tuple = ()) -> Any :
99+ """Execute a SQL query and return the first result."""
102100 cursor = be_db .execute_sql (query , params )
103101 result_row = cursor .fetchone ()
104102 cursor .close ()
0 commit comments