📝 Walkthrough

This pull request introduces a new PATCH endpoint for updating datasource rows, removes a pre-query validation check, adds a payload model for update operations, and implements the corresponding update logic across the BigQuery, Redshift, and base plugin layers.
🎯 Review effort: 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 5
🧹 Nitpick comments (1)
wavefront/server/modules/plugins_module/plugins_module/utils/helper.py (1)
25-28: Consider adding validation for non-empty `data`.

The model allows empty dictionaries for both `filter` and `data`. While the controller validates that `filter` is non-empty, there's no validation for `data`. An empty `data` dict would result in an invalid SQL `UPDATE` statement with no `SET` clause.

♻️ Proposed fix using Pydantic validators

    +from pydantic import field_validator
    +
     class UpdateRowsJsonPayload(BaseModel):
         filter: Dict[str, Any]
         data: Dict[str, Any]
    +
    +    @field_validator('filter', 'data')
    +    @classmethod
    +    def must_not_be_empty(cls, v, info):
    +        if not v:
    +            raise ValueError(f'{info.field_name} must not be empty')
    +        return v

Alternatively, add a validation check in the controller similar to the existing `filter` check.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/modules/plugins_module/plugins_module/utils/helper.py` around lines 25 - 28, The UpdateRowsJsonPayload Pydantic model currently allows an empty data dict which can produce an invalid SQL UPDATE with no SET clause; add a non-empty validation for the data field (mirror the existing filter validation) by implementing a Pydantic validator on UpdateRowsJsonPayload.data that raises a ValueError when data is empty, or alternatively add a controller-side check where filter is validated to reject requests with an empty data dict before building the UPDATE statement; reference UpdateRowsJsonPayload and the controller's existing filter validation logic when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`:
- Around line 546-552: The handler currently only checks
update_rows_json_payload.filter but doesn't validate
update_rows_json_payload.data; add a check (next to the existing filter check in
the same function handling update rows in datasource_controller.py) to ensure
update_rows_json_payload.data is a non-empty dict/iterable and if empty return a
JSONResponse with status_code=status.HTTP_400_BAD_REQUEST and
content=response_formatter.buildErrorResponse('data is required to update rows')
to avoid generating an UPDATE with no SET clause.
- Around line 526-565: The update_rows_json endpoint (function update_rows_json)
is missing authorization checks allowing unrestricted data updates; fetch the
requester’s auth context (same mechanism used by
query_datasource/add_datasource/update_datasource — e.g., the is_admin flag or
current user/role check used elsewhere), call get_datasource_config as before,
then enforce authorization before calling DatasourcePlugin.update_rows_json: if
the user is not is_admin and does not have permitted access under your RLS
rules, return a 403 JSONResponse using response_formatter.buildErrorResponse;
otherwise proceed to call DatasourcePlugin.update_rows_json and return success.
Ensure you reuse the same auth helpers and error responses used by
query_datasource and the other datasource endpoints for consistency.
- Around line 555-559: The call to datasource_plugin.update_rows_json in the
controller should be wrapped in a try/except to handle DB/validation errors
instead of letting exceptions bubble as raw 500s; catch specific DB-related
exceptions (e.g., SQLAlchemyError or the ORM/client-specific exception) and a
general Exception fallback, log the error with context (resource_id, filter) and
the exception message, and raise a controlled HTTP error (e.g., raise
HTTPException with a clear message and appropriate status_code) so the client
gets a meaningful response; modify the controller function containing the
update_rows_json call to implement this error handling and logging around the
update_rows_json invocation.
In `@wavefront/server/plugins/datasource/datasource/bigquery/__init__.py`:
- Around line 75-84: The update_rows_json method is vulnerable because it
interpolates column names from data.keys() and filter.keys() into the SQL; add a
shared identifier validator (e.g., validate_identifier(name: str,
identifier_type: str = 'column') in
wavefront/server/plugins/datasource/datasource/helper.py that enforces a safe
pattern like ^[A-Za-z_][A-Za-z0-9_]*$ and raises on invalid names, then call
validate_identifier for each key in update_rows_json before building set_clause
and where_clause (keep using backticks and the existing parameterized values in
all_params and execute_query).
In `@wavefront/server/plugins/datasource/datasource/redshift/__init__.py`:
- Around line 67-78: The update_rows_json method is vulnerable because it
interpolates table_name and column names from data.keys() and filter.keys()
directly into SQL; validate and sanitize those identifiers (and preferably
table_name) before building set_clause/where_clause—e.g., enforce a strict
whitelist or regex like alphanumeric + underscores only and raise on invalid
identifiers, then construct parameter names from the validated identifiers (use
the validated identifier strings for the f'{col} = :set_{col}' pieces) so only
safe column names are inserted into the query; alternatively implement a
table-name whitelist and reject or escape any name not allowed.
---
Nitpick comments:
In `@wavefront/server/modules/plugins_module/plugins_module/utils/helper.py`:
- Around line 25-28: The UpdateRowsJsonPayload Pydantic model currently allows
an empty data dict which can produce an invalid SQL UPDATE with no SET clause;
add a non-empty validation for the data field (mirror the existing filter
validation) by implementing a Pydantic validator on UpdateRowsJsonPayload.data
that raises a ValueError when data is empty, or alternatively add a
controller-side check where filter is validated to reject requests with an empty
data dict before building the UPDATE statement; reference UpdateRowsJsonPayload
and the controller's existing filter validation logic when making the change.
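To see why the nitpick matters, here is a minimal stdlib-only sketch of the clause construction the plugins use (`build_update_query` is an illustrative name, not a function from the codebase), showing the malformed SQL an empty `data` dict produces:

```python
def build_update_query(table: str, data: dict, filter_: dict) -> str:
    # Mirrors the plugins' pattern: SET clause from data keys, WHERE clause from filter keys.
    set_clause = ', '.join(f'{k} = :set_{k}' for k in data)
    where_clause = ' AND '.join(f'{k} = :where_{k}' for k in filter_)
    return f'UPDATE {table} SET {set_clause} WHERE {where_clause}'

# Well-formed when both dicts are populated:
assert build_update_query('users', {'name': 'x'}, {'id': 1}) == \
    'UPDATE users SET name = :set_name WHERE id = :where_id'

# An empty data dict silently drops the SET clause, yielding invalid SQL:
assert build_update_query('users', {}, {'id': 1}) == 'UPDATE users SET  WHERE id = :where_id'
```

Either a Pydantic validator or a controller check stops the request before this broken statement ever reaches the database.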
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9b8db589-e458-4f11-a85b-9d3c422896a5
📒 Files selected for processing (7)
- wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py
- wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py
- wavefront/server/modules/plugins_module/plugins_module/utils/helper.py
- wavefront/server/plugins/datasource/datasource/__init__.py
- wavefront/server/plugins/datasource/datasource/bigquery/__init__.py
- wavefront/server/plugins/datasource/datasource/redshift/__init__.py
- wavefront/server/plugins/datasource/datasource/types.py
💤 Files with no reviewable changes (1)
- wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py
    @datasource_router.patch('/v1/datasources/{datasource_id}/resources/{resource_id}')
    @inject
    async def update_rows_json(
        _: Request,
        datasource_id: str,
        resource_id: str,
        update_rows_json_payload: UpdateRowsJsonPayload,
        response_formatter: ResponseFormatter = Depends(
            Provide[CommonContainer.response_formatter]
        ),
    ):
        datasource_type, datasource_config = await get_datasource_config(datasource_id)
        if not datasource_config:
            return JSONResponse(
                status_code=status.HTTP_404_NOT_FOUND,
                content=response_formatter.buildErrorResponse(
                    f'Datasource not found: {datasource_id}'
                ),
            )

        if not update_rows_json_payload.filter:
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content=response_formatter.buildErrorResponse(
                    'filter is required to update rows'
                ),
            )

        datasource_plugin = DatasourcePlugin(datasource_type, datasource_config)
        await datasource_plugin.update_rows_json(
            resource_id,
            update_rows_json_payload.filter,
            update_rows_json_payload.data,
        )
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content=response_formatter.buildSuccessResponse(
                {'message': 'Rows updated successfully'}
            ),
        )
Critical: Missing authorization check allows unrestricted data modification.
This endpoint lacks the authorization checks present in all other datasource endpoints. Comparing with:
- `add_datasource` (line 72): checks `is_admin`
- `update_datasource` (line 141): checks `is_admin`
- `query_datasource` (line 437): checks `is_admin` or applies RLS filters
- `insert_rows_json` (line 490): no auth check either (pre-existing issue)
Without authorization, any authenticated user can update any rows in any table, which is a significant security vulnerability for a data modification endpoint.
🔒 Proposed fix to add authorization
     @datasource_router.patch('/v1/datasources/{datasource_id}/resources/{resource_id}')
     @inject
     async def update_rows_json(
    -    _: Request,
    +    request: Request,
         datasource_id: str,
         resource_id: str,
         update_rows_json_payload: UpdateRowsJsonPayload,
         response_formatter: ResponseFormatter = Depends(
             Provide[CommonContainer.response_formatter]
         ),
     ):
    +    role_id = request.state.session.role_id
    +    is_admin = await check_admin(role_id)
    +    if not is_admin:
    +        return JSONResponse(
    +            status_code=status.HTTP_403_FORBIDDEN,
    +            content=response_formatter.buildErrorResponse(
    +                'Data access not set for non-admin user'
    +            ),
    +        )
    +
         datasource_type, datasource_config = await get_datasource_config(datasource_id)

🧰 Tools
🪛 Ruff (0.15.9)
[warning] 533-535: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`
around lines 526 - 565, The update_rows_json endpoint (function
update_rows_json) is missing authorization checks allowing unrestricted data
updates; fetch the requester’s auth context (same mechanism used by
query_datasource/add_datasource/update_datasource — e.g., the is_admin flag or
current user/role check used elsewhere), call get_datasource_config as before,
then enforce authorization before calling DatasourcePlugin.update_rows_json: if
the user is not is_admin and does not have permitted access under your RLS
rules, return a 403 JSONResponse using response_formatter.buildErrorResponse;
otherwise proceed to call DatasourcePlugin.update_rows_json and return success.
Ensure you reuse the same auth helpers and error responses used by
query_datasource and the other datasource endpoints for consistency.
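The authorization decision itself can be kept framework-free and testable. A minimal sketch (the function name and the exact error message mirror the proposed fix above, but this is an assumption, not code from the repository):

```python
from typing import Optional, Tuple


def authorize_row_update(is_admin: bool, has_rls_access: bool) -> Optional[Tuple[int, str]]:
    """Return (status_code, error_message) when the request must be rejected, else None."""
    if is_admin or has_rls_access:
        # Admins and users covered by RLS rules may proceed.
        return None
    return 403, 'Data access not set for non-admin user'
```

The controller would call this after resolving the session's role and translate a non-None result into a `JSONResponse`, keeping the policy logic reusable across `insert_rows_json` and `update_rows_json`.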
    if not update_rows_json_payload.filter:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content=response_formatter.buildErrorResponse(
                'filter is required to update rows'
            ),
        )
Also validate that `data` is not empty.
The `filter` validation is good, but an empty `data` dict would result in an invalid SQL `UPDATE` statement with no `SET` clause.
🐛 Proposed fix
     if not update_rows_json_payload.filter:
         return JSONResponse(
             status_code=status.HTTP_400_BAD_REQUEST,
             content=response_formatter.buildErrorResponse(
                 'filter is required to update rows'
             ),
         )
    +
    +    if not update_rows_json_payload.data:
    +        return JSONResponse(
    +            status_code=status.HTTP_400_BAD_REQUEST,
    +            content=response_formatter.buildErrorResponse(
    +                'data is required to update rows'
    +            ),
    +        )

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`
around lines 546 - 552, The handler currently only checks
update_rows_json_payload.filter but doesn't validate
update_rows_json_payload.data; add a check (next to the existing filter check in
the same function handling update rows in datasource_controller.py) to ensure
update_rows_json_payload.data is a non-empty dict/iterable and if empty return a
JSONResponse with status_code=status.HTTP_400_BAD_REQUEST and
content=response_formatter.buildErrorResponse('data is required to update rows')
to avoid generating an UPDATE with no SET clause.
    await datasource_plugin.update_rows_json(
        resource_id,
        update_rows_json_payload.filter,
        update_rows_json_payload.data,
    )
Add error handling for the update operation.
The update_rows_json call is not wrapped in a try-except block. If the underlying database operation fails (e.g., invalid column names, constraint violations, connection issues), the exception will propagate as a 500 error without a meaningful message.
🛡️ Proposed fix with error handling
     datasource_plugin = DatasourcePlugin(datasource_type, datasource_config)
    -    await datasource_plugin.update_rows_json(
    -        resource_id,
    -        update_rows_json_payload.filter,
    -        update_rows_json_payload.data,
    -    )
    +    try:
    +        await datasource_plugin.update_rows_json(
    +            resource_id,
    +            update_rows_json_payload.filter,
    +            update_rows_json_payload.data,
    +        )
    +    except ValueError as e:
    +        return JSONResponse(
    +            status_code=status.HTTP_400_BAD_REQUEST,
    +            content=response_formatter.buildErrorResponse(str(e)),
    +        )
    +    except Exception as e:
    +        return JSONResponse(
    +            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
    +            content=response_formatter.buildErrorResponse(
    +                f'Failed to update rows: {str(e)}'
    +            ),
    +        )
         return JSONResponse(

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
    try:
        await datasource_plugin.update_rows_json(
            resource_id,
            update_rows_json_payload.filter,
            update_rows_json_payload.data,
        )
    except ValueError as e:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content=response_formatter.buildErrorResponse(str(e)),
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content=response_formatter.buildErrorResponse(
                f'Failed to update rows: {str(e)}'
            ),
        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`
around lines 555 - 559, The call to datasource_plugin.update_rows_json in the
controller should be wrapped in a try/except to handle DB/validation errors
instead of letting exceptions bubble as raw 500s; catch specific DB-related
exceptions (e.g., SQLAlchemyError or the ORM/client-specific exception) and a
general Exception fallback, log the error with context (resource_id, filter) and
the exception message, and raise a controlled HTTP error (e.g., raise
HTTPException with a clear message and appropriate status_code) so the client
gets a meaningful response; modify the controller function containing the
update_rows_json call to implement this error handling and logging around the
update_rows_json invocation.
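The try/except proposed above boils down to a small mapping from exception type to HTTP status. A framework-free sketch of that mapping (the function name is illustrative, and using `ValueError` for validation failures is an assumption; swap in `SQLAlchemyError` or the client library's own error class as appropriate):

```python
def map_update_error(exc: Exception) -> tuple:
    """Map a failed update to an (HTTP status code, client-safe message) pair."""
    if isinstance(exc, ValueError):
        # Validation failures (e.g. rejected identifiers) are the caller's fault.
        return 400, str(exc)
    # Anything else (connection loss, constraint violations) is a server error.
    return 500, f'Failed to update rows: {exc}'
```

The controller then only has to catch the exception, log it with `resource_id` and the filter for context, and build the `JSONResponse` from the returned pair.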
    async def update_rows_json(
        self, table_name: str, filter: Dict[str, Any], data: Dict[str, Any]
    ) -> None:
        set_params = {f'set_{k}': v for k, v in data.items()}
        where_params = {f'where_{k}': v for k, v in filter.items()}
        all_params = {**set_params, **where_params}
        set_clause = ', '.join([f'`{k}` = @set_{k}' for k in data])
        where_clause = ' AND '.join([f'`{k}` = @where_{k}' for k in filter])
        query = f'UPDATE `{self.table_prefix}{table_name}` SET {set_clause} WHERE {where_clause}'
        await self.client.execute_query(query, params=all_params)
Same SQL injection risk as the Redshift implementation.
Column names from data.keys() and filter.keys() are directly interpolated into the SQL query. While BigQuery's backtick quoting provides some protection, it doesn't fully prevent injection attacks.
This implementation has the same vulnerability as the Redshift implementation. Consider extracting the column name validation into a shared utility function in the base module to ensure consistent security across all datasource plugins.
🛡️ Proposed shared validation approach
Add a validation utility in wavefront/server/plugins/datasource/datasource/helper.py:
    import re

    def validate_identifier(name: str, identifier_type: str = 'column') -> str:
        """Validate SQL identifier contains only safe characters."""
        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
            raise ValueError(f'Invalid {identifier_type} name: {name}')
        return name

Then use it in both BigQuery and Redshift implementations:
    +from ..helper import validate_identifier
    +
     async def update_rows_json(
         self, table_name: str, filter: Dict[str, Any], data: Dict[str, Any]
     ) -> None:
    +    validate_identifier(table_name, 'table')
    +    for key in list(data.keys()) + list(filter.keys()):
    +        validate_identifier(key, 'column')
    +
         set_params = {f'set_{k}': v for k, v in data.items()}

🧰 Tools
🪛 Ruff (0.15.9)
[error] 76-76: Function argument filter is shadowing a Python builtin
(A002)
[error] 83-83: Possible SQL injection vector through string-based query construction
(S608)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@wavefront/server/plugins/datasource/datasource/bigquery/__init__.py` around
lines 75 - 84, The update_rows_json method is vulnerable because it interpolates
column names from data.keys() and filter.keys() into the SQL; add a shared
identifier validator (e.g., validate_identifier(name: str, identifier_type: str
= 'column') in wavefront/server/plugins/datasource/datasource/helper.py that
enforces a safe pattern like ^[A-Za-z_][A-Za-z0-9_]*$ and raises on invalid
names, then call validate_identifier for each key in update_rows_json before
building set_clause and where_clause (keep using backticks and the existing
parameterized values in all_params and execute_query).
    async def update_rows_json(
        self, table_name: str, filter: Dict[str, Any], data: Dict[str, Any]
    ) -> None:
        set_params = {f'set_{k}': v for k, v in data.items()}
        where_params = {f'where_{k}': v for k, v in filter.items()}
        all_params = {**set_params, **where_params}
        set_clause = ', '.join([f'{k} = :set_{k}' for k in data])
        where_clause = ' AND '.join([f'{k} = :where_{k}' for k in filter])
        query = (
            f'UPDATE {self.db_name}.{table_name} SET {set_clause} WHERE {where_clause}'
        )
        await asyncio.to_thread(self.client.execute_query_as_dict, query, all_params)
SQL injection risk via unvalidated column names.
While values are properly parameterized, column names from data.keys() and filter.keys() are directly interpolated into the SQL query without sanitization. If an attacker can control the keys in the request payload, they could inject malicious SQL via column names.
For example, a malicious key like "id; DROP TABLE users; --" would be directly embedded in the query.
🛡️ Proposed fix to validate column names
    +import re
    +
    +def _validate_column_name(name: str) -> str:
    +    """Validate column name contains only safe characters."""
    +    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
    +        raise ValueError(f'Invalid column name: {name}')
    +    return name
    +
     async def update_rows_json(
         self, table_name: str, filter: Dict[str, Any], data: Dict[str, Any]
     ) -> None:
    +    # Validate column names to prevent SQL injection
    +    for key in list(data.keys()) + list(filter.keys()):
    +        _validate_column_name(key)
    +
         set_params = {f'set_{k}': v for k, v in data.items()}
         where_params = {f'where_{k}': v for k, v in filter.items()}

Consider also validating `table_name` with the same pattern, or using a whitelist of allowed table names.
🧰 Tools
🪛 Ruff (0.15.9)
[error] 68-68: Function argument filter is shadowing a Python builtin
(A002)
[error] 76-76: Possible SQL injection vector through string-based query construction
(S608)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@wavefront/server/plugins/datasource/datasource/redshift/__init__.py` around
lines 67 - 78, The update_rows_json method is vulnerable because it interpolates
table_name and column names from data.keys() and filter.keys() directly into
SQL; validate and sanitize those identifiers (and preferably table_name) before
building set_clause/where_clause—e.g., enforce a strict whitelist or regex like
alphanumeric + underscores only and raise on invalid identifiers, then construct
parameter names from the validated identifiers (use the validated identifier
strings for the f'{col} = :set_{col}' pieces) so only safe column names are
inserted into the query; alternatively implement a table-name whitelist and
reject or escape any name not allowed.
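Both plugin fixes hinge on the same observation: bound parameters protect values, not identifiers. A self-contained sketch shows a hostile key passing straight into the generated SQL, and the proposed regex check rejecting it before any query is built:

```python
import re


def validate_identifier(name: str, identifier_type: str = 'column') -> str:
    """Raise ValueError unless name matches the safe-identifier pattern."""
    if not re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', name):
        raise ValueError(f'Invalid {identifier_type} name: {name}')
    return name


def build_set_clause(data: dict) -> str:
    # Values are bound via :set_<col> placeholders, but keys are interpolated verbatim.
    return ', '.join(f'{k} = :set_{k}' for k in data)


# A hostile key survives into the generated SQL unchanged:
hostile_clause = build_set_clause({'id; DROP TABLE users; --': 1})
assert 'DROP TABLE users' in hostile_clause

# The validator rejects it before the clause is built:
try:
    validate_identifier('id; DROP TABLE users; --')
    raise AssertionError('expected ValueError')
except ValueError:
    pass

# Ordinary column names pass through untouched:
assert validate_identifier('user_id') == 'user_id'
```

Running the same check on `table_name` closes the remaining gap both reviews point out, at the cost of rejecting quoted or dotted identifiers, which these plugins do not appear to use.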