
Adding a patch api for datasource #273

Open
vizsatiz wants to merge 1 commit into develop from patch-datasource-api

Conversation


@vizsatiz vizsatiz commented Apr 9, 2026

Summary by CodeRabbit

Release Notes

  • New Features
    • Added new endpoint to update existing datasource rows with customizable filter-based criteria
    • Extended row update functionality to BigQuery and Redshift datasources, enabling consistent update operations across multiple datasource platforms
    • Simplified datasource operations by removing resource name validation restrictions for improved flexibility


coderabbitai bot commented Apr 9, 2026

📝 Walkthrough

This pull request introduces a new PATCH endpoint for updating datasource rows, removes a pre-query validation check, adds a payload model for update operations, and implements the corresponding update logic across the BigQuery, Redshift, and base plugin layers.

Changes

Cohort / File(s) / Summary

  • API Endpoint Implementation: wavefront/server/modules/plugins_module/.../datasource_controller.py
    Added a new update_rows_json PATCH endpoint handler that validates that the datasource config exists and a filter is provided, then delegates to the plugin for update execution. Removed the pre-existing check_is_valid_resource validation call.
  • Data Models: wavefront/server/modules/plugins_module/.../helper.py
    Added a new Pydantic model UpdateRowsJsonPayload with filter and data fields to structure update request payloads.
  • Service Layer: wavefront/server/modules/plugins_module/.../datasource_services.py
    Removed the check_is_valid_resource validation helper function and its hardcoded allowlist logic.
  • Plugin Interface: wavefront/server/plugins/datasource/datasource/types.py
    Added an abstract async method update_rows_json to the DataSourceABC base class, defining the interface for datasource implementations.
  • Plugin Wrapper: wavefront/server/plugins/datasource/datasource/__init__.py
    Added an async wrapper method update_rows_json in DatasourcePlugin that forwards calls to the underlying datasource implementation.
  • BigQuery Implementation: wavefront/server/plugins/datasource/.../bigquery/__init__.py
    Added an update_rows_json implementation for BigQuery that constructs parameterized UPDATE statements with backticked column names and a table prefix, then executes them via the client.
  • Redshift Implementation: wavefront/server/plugins/datasource/.../redshift/__init__.py
    Added an update_rows_json implementation for Redshift that builds parameterized UPDATE statements and executes them asynchronously via a thread pool.

Suggested Reviewers

  • rootflo-hardik

Poem

🐰 A patch so fine to update rows,
From BigQuery to Redshift it flows,
With filter and data combined,
The perfect UPDATE we find,
No more validation to slow,
Just updates that cleanly grow!


Estimated review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Description Check ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed: The title accurately summarizes the main change: adding a PATCH endpoint for datasource updates.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (1)
wavefront/server/modules/plugins_module/plugins_module/utils/helper.py (1)

25-28: Consider adding validation for non-empty data.

The model allows empty dictionaries for both filter and data. While the controller validates that filter is non-empty, there's no validation for data. An empty data dict would result in an invalid SQL UPDATE statement with no SET clause.

♻️ Proposed fix using Pydantic validators
+from pydantic import field_validator
+
 class UpdateRowsJsonPayload(BaseModel):
     filter: Dict[str, Any]
     data: Dict[str, Any]
+
+    @field_validator('filter', 'data')
+    @classmethod
+    def must_not_be_empty(cls, v, info):
+        if not v:
+            raise ValueError(f'{info.field_name} must not be empty')
+        return v

Alternatively, add a validation check in the controller similar to the existing filter check.
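As a standalone sketch of that validator approach (assuming Pydantic v2's field_validator API):

```python
from typing import Any, Dict
from pydantic import BaseModel, ValidationError, field_validator

class UpdateRowsJsonPayload(BaseModel):
    filter: Dict[str, Any]
    data: Dict[str, Any]

    @field_validator('filter', 'data')
    @classmethod
    def must_not_be_empty(cls, v, info):
        # Reject empty dicts so an UPDATE can never be built without a
        # SET clause or a WHERE clause.
        if not v:
            raise ValueError(f'{info.field_name} must not be empty')
        return v

# A valid payload parses; an empty dict for either field fails at parse time.
UpdateRowsJsonPayload(filter={'id': 1}, data={'status': 'active'})
try:
    UpdateRowsJsonPayload(filter={'id': 1}, data={})
except ValidationError:
    empty_data_rejected = True
```

With the model-level check, the controller's explicit filter guard becomes redundant, since FastAPI would return a 422 before the handler runs.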

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/modules/plugins_module/plugins_module/utils/helper.py`
around lines 25 - 28, The UpdateRowsJsonPayload Pydantic model currently allows
an empty data dict which can produce an invalid SQL UPDATE with no SET clause;
add a non-empty validation for the data field (mirror the existing filter
validation) by implementing a Pydantic validator on UpdateRowsJsonPayload.data
that raises a ValueError when data is empty, or alternatively add a
controller-side check where filter is validated to reject requests with an empty
data dict before building the UPDATE statement; reference UpdateRowsJsonPayload
and the controller's existing filter validation logic when making the change.
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9b8db589-e458-4f11-a85b-9d3c422896a5

📥 Commits

Reviewing files that changed from the base of the PR and between a3a7752 and 84256b5.

📒 Files selected for processing (7)
  • wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py
  • wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py
  • wavefront/server/modules/plugins_module/plugins_module/utils/helper.py
  • wavefront/server/plugins/datasource/datasource/__init__.py
  • wavefront/server/plugins/datasource/datasource/bigquery/__init__.py
  • wavefront/server/plugins/datasource/datasource/redshift/__init__.py
  • wavefront/server/plugins/datasource/datasource/types.py
💤 Files with no reviewable changes (1)
  • wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py

Comment on lines +526 to +565
@datasource_router.patch('/v1/datasources/{datasource_id}/resources/{resource_id}')
@inject
async def update_rows_json(
    _: Request,
    datasource_id: str,
    resource_id: str,
    update_rows_json_payload: UpdateRowsJsonPayload,
    response_formatter: ResponseFormatter = Depends(
        Provide[CommonContainer.response_formatter]
    ),
):
    datasource_type, datasource_config = await get_datasource_config(datasource_id)
    if not datasource_config:
        return JSONResponse(
            status_code=status.HTTP_404_NOT_FOUND,
            content=response_formatter.buildErrorResponse(
                f'Datasource not found: {datasource_id}'
            ),
        )

    if not update_rows_json_payload.filter:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content=response_formatter.buildErrorResponse(
                'filter is required to update rows'
            ),
        )

    datasource_plugin = DatasourcePlugin(datasource_type, datasource_config)
    await datasource_plugin.update_rows_json(
        resource_id,
        update_rows_json_payload.filter,
        update_rows_json_payload.data,
    )
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content=response_formatter.buildSuccessResponse(
            {'message': 'Rows updated successfully'}
        ),
    )


⚠️ Potential issue | 🔴 Critical

Critical: Missing authorization check allows unrestricted data modification.

This endpoint lacks the authorization checks present in all other datasource endpoints. Comparing with:

  • add_datasource (line 72): checks is_admin
  • update_datasource (line 141): checks is_admin
  • query_datasource (line 437): checks is_admin or applies RLS filters
  • insert_rows_json (line 490): no auth check either (pre-existing issue)

Without authorization, any authenticated user can update any rows in any table, which is a significant security vulnerability for a data modification endpoint.

🔒 Proposed fix to add authorization
 @datasource_router.patch('/v1/datasources/{datasource_id}/resources/{resource_id}')
 @inject
 async def update_rows_json(
-    _: Request,
+    request: Request,
     datasource_id: str,
     resource_id: str,
     update_rows_json_payload: UpdateRowsJsonPayload,
     response_formatter: ResponseFormatter = Depends(
         Provide[CommonContainer.response_formatter]
     ),
 ):
+    role_id = request.state.session.role_id
+    is_admin = await check_admin(role_id)
+    if not is_admin:
+        return JSONResponse(
+            status_code=status.HTTP_403_FORBIDDEN,
+            content=response_formatter.buildErrorResponse(
+                'Data access not set for non-admin user'
+            ),
+        )
+
     datasource_type, datasource_config = await get_datasource_config(datasource_id)
🧰 Tools
🪛 Ruff (0.15.9)

[warning] 533-535: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`
around lines 526 - 565, The update_rows_json endpoint (function
update_rows_json) is missing authorization checks allowing unrestricted data
updates; fetch the requester’s auth context (same mechanism used by
query_datasource/add_datasource/update_datasource — e.g., the is_admin flag or
current user/role check used elsewhere), call get_datasource_config as before,
then enforce authorization before calling DatasourcePlugin.update_rows_json: if
the user is not is_admin and does not have permitted access under your RLS
rules, return a 403 JSONResponse using response_formatter.buildErrorResponse;
otherwise proceed to call DatasourcePlugin.update_rows_json and return success.
Ensure you reuse the same auth helpers and error responses used by
query_datasource and the other datasource endpoints for consistency.

Comment on lines +546 to +552
    if not update_rows_json_payload.filter:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content=response_formatter.buildErrorResponse(
                'filter is required to update rows'
            ),
        )


⚠️ Potential issue | 🟡 Minor

Also validate that data is not empty.

The filter validation is good, but an empty data dict would result in an invalid SQL UPDATE statement with no SET clause.

🐛 Proposed fix
     if not update_rows_json_payload.filter:
         return JSONResponse(
             status_code=status.HTTP_400_BAD_REQUEST,
             content=response_formatter.buildErrorResponse(
                 'filter is required to update rows'
             ),
         )
+
+    if not update_rows_json_payload.data:
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content=response_formatter.buildErrorResponse(
+                'data is required to update rows'
+            ),
+        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`
around lines 546 - 552, The handler currently only checks
update_rows_json_payload.filter but doesn't validate
update_rows_json_payload.data; add a check (next to the existing filter check in
the same function handling update rows in datasource_controller.py) to ensure
update_rows_json_payload.data is a non-empty dict/iterable and if empty return a
JSONResponse with status_code=status.HTTP_400_BAD_REQUEST and
content=response_formatter.buildErrorResponse('data is required to update rows')
to avoid generating an UPDATE with no SET clause.

Comment on lines +555 to +559
    await datasource_plugin.update_rows_json(
        resource_id,
        update_rows_json_payload.filter,
        update_rows_json_payload.data,
    )


⚠️ Potential issue | 🟡 Minor

Add error handling for the update operation.

The update_rows_json call is not wrapped in a try-except block. If the underlying database operation fails (e.g., invalid column names, constraint violations, connection issues), the exception will propagate as a 500 error without a meaningful message.

🛡️ Proposed fix with error handling
     datasource_plugin = DatasourcePlugin(datasource_type, datasource_config)
-    await datasource_plugin.update_rows_json(
-        resource_id,
-        update_rows_json_payload.filter,
-        update_rows_json_payload.data,
-    )
+    try:
+        await datasource_plugin.update_rows_json(
+            resource_id,
+            update_rows_json_payload.filter,
+            update_rows_json_payload.data,
+        )
+    except ValueError as e:
+        return JSONResponse(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            content=response_formatter.buildErrorResponse(str(e)),
+        )
+    except Exception as e:
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                f'Failed to update rows: {str(e)}'
+            ),
+        )
     return JSONResponse(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py`
around lines 555 - 559, The call to datasource_plugin.update_rows_json in the
controller should be wrapped in a try/except to handle DB/validation errors
instead of letting exceptions bubble as raw 500s; catch specific DB-related
exceptions (e.g., SQLAlchemyError or the ORM/client-specific exception) and a
general Exception fallback, log the error with context (resource_id, filter) and
the exception message, and raise a controlled HTTP error (e.g., raise
HTTPException with a clear message and appropriate status_code) so the client
gets a meaningful response; modify the controller function containing the
update_rows_json call to implement this error handling and logging around the
update_rows_json invocation.

Comment on lines +75 to +84
async def update_rows_json(
    self, table_name: str, filter: Dict[str, Any], data: Dict[str, Any]
) -> None:
    set_params = {f'set_{k}': v for k, v in data.items()}
    where_params = {f'where_{k}': v for k, v in filter.items()}
    all_params = {**set_params, **where_params}
    set_clause = ', '.join([f'`{k}` = @set_{k}' for k in data])
    where_clause = ' AND '.join([f'`{k}` = @where_{k}' for k in filter])
    query = f'UPDATE `{self.table_prefix}{table_name}` SET {set_clause} WHERE {where_clause}'
    await self.client.execute_query(query, params=all_params)


⚠️ Potential issue | 🟠 Major

Same SQL injection risk as Redshift implementation.

Column names from data.keys() and filter.keys() are directly interpolated into the SQL query. While BigQuery's backtick quoting provides some protection, it doesn't fully prevent injection attacks.

This implementation has the same vulnerability as the Redshift implementation. Consider extracting the column name validation into a shared utility function in the base module to ensure consistent security across all datasource plugins.

🛡️ Proposed shared validation approach

Add a validation utility in wavefront/server/plugins/datasource/datasource/helper.py:

import re

def validate_identifier(name: str, identifier_type: str = 'column') -> str:
    """Validate SQL identifier contains only safe characters."""
    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
        raise ValueError(f'Invalid {identifier_type} name: {name}')
    return name

Then use it in both BigQuery and Redshift implementations:

+from ..helper import validate_identifier
+
 async def update_rows_json(
     self, table_name: str, filter: Dict[str, Any], data: Dict[str, Any]
 ) -> None:
+    validate_identifier(table_name, 'table')
+    for key in list(data.keys()) + list(filter.keys()):
+        validate_identifier(key, 'column')
+
     set_params = {f'set_{k}': v for k, v in data.items()}
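Run standalone, the suggested validator accepts ordinary identifiers and rejects injection-shaped names (a sketch of the proposed helper, not committed code):

```python
import re

def validate_identifier(name: str, identifier_type: str = 'column') -> str:
    """Validate that a SQL identifier contains only safe characters."""
    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
        raise ValueError(f'Invalid {identifier_type} name: {name}')
    return name

assert validate_identifier('user_id') == 'user_id'
assert validate_identifier('orders', 'table') == 'orders'

rejected = []
for bad in ('id; DROP TABLE users; --', 'a`b', '1starts_with_digit', ''):
    try:
        validate_identifier(bad)
    except ValueError:
        rejected.append(bad)  # every injection-shaped name ends up here
```

Note the anchored regex: without the `$`, a valid prefix followed by hostile characters would still pass.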
🧰 Tools
🪛 Ruff (0.15.9)

[error] 76-76: Function argument filter is shadowing a Python builtin

(A002)


[error] 83-83: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/plugins/datasource/datasource/bigquery/__init__.py` around
lines 75 - 84, The update_rows_json method is vulnerable because it interpolates
column names from data.keys() and filter.keys() into the SQL; add a shared
identifier validator (e.g., validate_identifier(name: str, identifier_type: str
= 'column') in wavefront/server/plugins/datasource/datasource/helper.py that
enforces a safe pattern like ^[A-Za-z_][A-Za-z0-9_]*$ and raises on invalid
names, then call validate_identifier for each key in update_rows_json before
building set_clause and where_clause (keep using backticks and the existing
paramized values in all_params and execute_query).

Comment on lines +67 to +78
async def update_rows_json(
    self, table_name: str, filter: Dict[str, Any], data: Dict[str, Any]
) -> None:
    set_params = {f'set_{k}': v for k, v in data.items()}
    where_params = {f'where_{k}': v for k, v in filter.items()}
    all_params = {**set_params, **where_params}
    set_clause = ', '.join([f'{k} = :set_{k}' for k in data])
    where_clause = ' AND '.join([f'{k} = :where_{k}' for k in filter])
    query = (
        f'UPDATE {self.db_name}.{table_name} SET {set_clause} WHERE {where_clause}'
    )
    await asyncio.to_thread(self.client.execute_query_as_dict, query, all_params)


⚠️ Potential issue | 🟠 Major

SQL injection risk via unvalidated column names.

While values are properly parameterized, column names from data.keys() and filter.keys() are directly interpolated into the SQL query without sanitization. If an attacker can control the keys in the request payload, they could inject malicious SQL via column names.

For example, a malicious key like "id; DROP TABLE users; --" would be directly embedded in the query.
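The string construction can be reproduced standalone to make the risk concrete (hypothetical table name; the values would still be bound as parameters, but the key lands in the SQL text):

```python
data = {'id; DROP TABLE users; --': 'x'}  # attacker-controlled key
row_filter = {'id': 1}

# Same clause-building pattern as the Redshift implementation above.
set_clause = ', '.join(f'{k} = :set_{k}' for k in data)
where_clause = ' AND '.join(f'{k} = :where_{k}' for k in row_filter)
query = f'UPDATE mydb.users SET {set_clause} WHERE {where_clause}'

# The hostile fragment is embedded verbatim in the statement text:
assert 'DROP TABLE users' in query
```

Parameter binding protects the values only; identifiers bypass it entirely, which is why the review asks for identifier validation.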

🛡️ Proposed fix to validate column names
+import re
+
+def _validate_column_name(name: str) -> str:
+    """Validate column name contains only safe characters."""
+    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
+        raise ValueError(f'Invalid column name: {name}')
+    return name
+
 async def update_rows_json(
     self, table_name: str, filter: Dict[str, Any], data: Dict[str, Any]
 ) -> None:
+    # Validate column names to prevent SQL injection
+    for key in list(data.keys()) + list(filter.keys()):
+        _validate_column_name(key)
+
     set_params = {f'set_{k}': v for k, v in data.items()}
     where_params = {f'where_{k}': v for k, v in filter.items()}

Consider also validating table_name with the same pattern, or using a whitelist of allowed table names.

🧰 Tools
🪛 Ruff (0.15.9)

[error] 68-68: Function argument filter is shadowing a Python builtin

(A002)


[error] 76-76: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/plugins/datasource/datasource/redshift/__init__.py` around
lines 67 - 78, The update_rows_json method is vulnerable because it interpolates
table_name and column names from data.keys() and filter.keys() directly into
SQL; validate and sanitize those identifiers (and preferably table_name) before
building set_clause/where_clause—e.g., enforce a strict whitelist or regex like
alphanumeric + underscores only and raise on invalid identifiers, then construct
parameter names from the validated identifiers (use the validated identifier
strings for the f'{col} = :set_{col}' pieces) so only safe column names are
inserted into the query; alternatively implement a table-name whitelist and
reject or escape any name not allowed.
