-
Notifications
You must be signed in to change notification settings - Fork 137
[client] Introduce api protocol helper for listening connectors (#850) #851
Conversation
+ Add support for multi env to fix queue protocol var name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for adding this feature
I've left some comments regarding the code.
Do you think we should extend the unit tests in tests/02-integration, as they currently only cover AMQP cases?
I'll be happy to answer your questions and address any remarks.
| @@ -41,7 +45,7 @@ def start_loop(loop): | |||
|
|
|||
|
|
|||
| def get_config_variable( | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand this "OR like" behavior is introduced for the "QUEUE_PROTOCOL" backwards compatibility even though we now recommend "CONNECTOR_QUEUE_PROTOCOL" (addition in line 931).
1/ As the get_config_variable is used in many other place, do not you think we could avoid this change in the get_config_var method and only use the OR behavior for this specific variable loading process
2/ Should we add a deprecation warning ?
self.queue_protocol = get_config_variable(
env_var="CONNECTOR_QUEUE_PROTOCOL",
yaml_path=["connector", "queue_protocol"],
config=config,
)
if not self.queue_protocol: # for backwards compatibility
self.queue_protocol = get_config_variable(
env_var="QUEUE_PROTOCOL",
yaml_path=None,
config=config,
)
if self.queue_protocol:
raise DeprecationWarning("QUEUE_PROTOCOL is deprecated, please use CONNECTOR_QUEUE_PROTOCOL instead.")
if not self.queue_protocol:
self.queue_protocol = "amqp"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
| try: | ||
| authorization: str = request.headers.get("Authorization") | ||
| scheme, token = authorization.split() | ||
| if scheme.lower() != "bearer" or token != self.opencti_token: | ||
| return {"error": "Invalid credentials"} | ||
| except Exception: | ||
| return {"error": "Invalid credentials"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can remove the try-except clause by
- adding value check to avoid :
"authorization.split() => Attribute error NoneType as no attribute 'split'" in case no "Authorization" key in headers dict
or "scheme, token => ValueError not enough/to many values to unpack " in case Authorization is empty or badly formatted string.
| try: | |
| authorization: str = request.headers.get("Authorization") | |
| scheme, token = authorization.split() | |
| if scheme.lower() != "bearer" or token != self.opencti_token: | |
| return {"error": "Invalid credentials"} | |
| except Exception: | |
| return {"error": "Invalid credentials"} | |
| authorization: str = request.headers.get("Authorization", "") # empty string default value for typing | |
| items = authorization.split() if isinstance(authorization, str) else [] | |
| if len(items) != 2 or items[0].lower() != "bearer" or items[1] != self.opencti_token: | |
| return {"error": "Invalid credentials"} | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
| try: | ||
| data = await request.json() # Get the JSON payload | ||
| except Exception as e: | ||
| return {"error": "Invalid JSON payload", "details": str(e)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| try: | |
| data = await request.json() # Get the JSON payload | |
| except Exception as e: | |
| return {"error": "Invalid JSON payload", "details": str(e)} | |
| try: | |
| data = await request.json() # Get the JSON payload | |
| except json.JSONDecodeError as e: | |
| return {"error": "Invalid JSON payload", "details": str(e)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
| "Failing reporting the processing" | ||
| ) | ||
|
|
||
| async def _process_callback(self, request: Request): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend to use JSONResponse with HTTP code rather than dictionnary for easier future debugging
from fastapi.responses import JSONResponse
...
async def _process_callback(self, request: Request) -> JSONResponse:
# 01. Check the authentication
...
return JSONResponse(status_code=401, content={"error": "Invalid credentials"})
...
# 02. Parse the data and execute
return JSONResponse(status_code=400, content={"error": "Invalid JSON payload", "details": str(e)})
...
return JSONResponse(status_code=500, content={"error": "Error processing message", "details": str(e)})
...
# all good
return JSONResponse(status_code=202, content={"message": "Message successfully processed"})
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
See #850
Related to OpenCTI-Platform/opencti#10046