From 7a9371b3b7d92239b4b2b8d0ff67fd1e6f614b39 Mon Sep 17 00:00:00 2001 From: Upstash Box Date: Wed, 18 Mar 2026 09:36:08 +0000 Subject: [PATCH 1/6] UPS-9: update workflow-py --- tests/test_redact.py | 140 ++++++++++++++++++ upstash_workflow/asyncio/context/context.py | 3 + upstash_workflow/asyncio/serve/options.py | 4 + upstash_workflow/asyncio/serve/serve.py | 10 +- upstash_workflow/asyncio/workflow_requests.py | 4 +- upstash_workflow/context/context.py | 3 + upstash_workflow/serve/options.py | 4 + upstash_workflow/serve/serve.py | 10 +- upstash_workflow/types.py | 16 ++ upstash_workflow/workflow_requests.py | 23 ++- 10 files changed, 211 insertions(+), 6 deletions(-) create mode 100644 tests/test_redact.py diff --git a/tests/test_redact.py b/tests/test_redact.py new file mode 100644 index 0000000..d4452ea --- /dev/null +++ b/tests/test_redact.py @@ -0,0 +1,140 @@ +"""Tests for redacted fields functionality.""" + +from upstash_workflow.workflow_requests import _get_headers +from upstash_workflow.types import Redact + + +def test_redact_body_only() -> None: + """Test redacting only the body.""" + redact: Redact = {"body": True} + + result = _get_headers( + "true", + "wfr-test-id", + "https://example.com", + None, + None, + 3, + redact=redact, + ) + + assert result.headers["Upstash-Redact-Fields"] == "body" + + +def test_redact_header_all() -> None: + """Test redacting all headers.""" + redact: Redact = {"header": True} + + result = _get_headers( + "true", + "wfr-test-id", + "https://example.com", + None, + None, + 3, + redact=redact, + ) + + assert result.headers["Upstash-Redact-Fields"] == "header" + + +def test_redact_body_and_header_all() -> None: + """Test redacting body and all headers.""" + redact: Redact = {"body": True, "header": True} + + result = _get_headers( + "true", + "wfr-test-id", + "https://example.com", + None, + None, + 3, + redact=redact, + ) + + assert result.headers["Upstash-Redact-Fields"] == "body,header" + + +def test_redact_specific_headers() -> None: + """Test redacting specific headers.""" + redact: Redact = {"header": ["Authorization"]} + + result = _get_headers( + "true", + "wfr-test-id", + "https://example.com", + None, + None, + 3, + redact=redact, + ) + + assert result.headers["Upstash-Redact-Fields"] == "header[Authorization]" + + +def test_redact_body_and_specific_headers() -> None: + """Test redacting body and specific headers.""" + redact: Redact = {"body": True, "header": ["Authorization", "X-API-Key"]} + + result = _get_headers( + "true", + "wfr-test-id", + "https://example.com", + None, + None, + 3, + redact=redact, + ) + + assert result.headers["Upstash-Redact-Fields"] == "body,header[Authorization],header[X-API-Key]" + + +def test_redact_with_failure_url() -> None: + """Test redacting with failure URL sets both headers.""" + redact: Redact = {"body": True, "header": ["Authorization"]} + + result = _get_headers( + "true", + "wfr-test-id", + "https://example.com", + None, + None, + 3, + workflow_failure_url="https://failure.com", + redact=redact, + ) + + assert result.headers["Upstash-Redact-Fields"] == "body,header[Authorization]" + assert result.headers["Upstash-Failure-Callback-Redact-Fields"] == "body,header[Authorization]" + + +def test_no_redact() -> None: + """Test that no redact header is added when redact is None.""" + result = _get_headers( + "true", + "wfr-test-id", + "https://example.com", + None, + None, + 3, + redact=None, + ) + + assert "Upstash-Redact-Fields" not in result.headers + + +def test_redact_empty_header_list() -> None: + """Test that empty header list doesn't add redact parts.""" + redact: Redact = {"header": []} + + result = _get_headers( + "true", + "wfr-test-id", + "https://example.com", + None, + None, + 3, + redact=redact, + ) + + assert "Upstash-Redact-Fields" not in result.headers diff --git a/upstash_workflow/asyncio/context/context.py b/upstash_workflow/asyncio/context/context.py index 68d0d2b..9268133 100644 --- a/upstash_workflow/asyncio/context/context.py +++ b/upstash_workflow/asyncio/context/context.py @@ -27,6 +27,7 @@ HTTPMethods, CallResponse, CallResponseDict, + Redact, ) TInitialPayload = TypeVar("TInitialPayload") @@ -51,6 +52,7 @@ def __init__( initial_payload: TInitialPayload, env: Optional[Dict[str, Optional[str]]] = None, retries: Optional[int] = None, + redact: Optional[Redact] = None, ): self.qstash_client: AsyncQStash = qstash_client self.workflow_run_id: str = workflow_run_id @@ -61,6 +63,7 @@ def __init__( self.request_payload: TInitialPayload = initial_payload self.env: Dict[str, Optional[str]] = env or {} self.retries: int = DEFAULT_RETRIES if retries is None else retries + self.redact: Optional[Redact] = redact self._executor: _AutoExecutor = _AutoExecutor(self, self._steps) async def run( diff --git a/upstash_workflow/asyncio/serve/options.py b/upstash_workflow/asyncio/serve/options.py index f229d84..2996551 100644 --- a/upstash_workflow/asyncio/serve/options.py +++ b/upstash_workflow/asyncio/serve/options.py @@ -11,6 +11,7 @@ ) from upstash_workflow.types import ( _FinishCondition, + Redact, ) from upstash_workflow import AsyncWorkflowContext @@ -34,6 +35,7 @@ class ServeOptions(Generic[TInitialPayload, TResponse]): Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]] ] failure_url: Optional[str] + redact: Optional[Redact] @dataclass @@ -57,6 +59,7 @@ def _process_options( Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]] ] = None, failure_url: Optional[str] = None, + redact: Optional[Redact] = None, ) -> ServeBaseOptions[TInitialPayload, TResponse]: environment = env if env is not None else dict(os.environ) @@ -135,6 +138,7 @@ def _initial_payload_parser(initial_request: str) -> TInitialPayload: url=url, failure_url=failure_url, failure_function=failure_function, + redact=redact, ) diff --git a/upstash_workflow/asyncio/serve/serve.py b/upstash_workflow/asyncio/serve/serve.py index f1d5bc7..75604fc 100644 --- a/upstash_workflow/asyncio/serve/serve.py +++ b/upstash_workflow/asyncio/serve/serve.py @@ -19,7 +19,7 @@ from upstash_workflow.asyncio.serve.options import _process_options from upstash_workflow.error import _format_workflow_error from upstash_workflow import AsyncWorkflowContext -from upstash_workflow.types import _FinishCondition +from upstash_workflow.types import _FinishCondition, Redact from upstash_workflow.asyncio.serve.authorization import _DisabledWorkflowContext _logger = logging.getLogger(__name__) @@ -44,6 +44,7 @@ def _serve_base( Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]] ] = None, failure_url: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Dict[str, Callable[[TRequest], Awaitable[TResponse]]]: processed_options = _process_options( qstash_client=qstash_client, @@ -56,6 +57,7 @@ def _serve_base( url=url, failure_function=failure_function, failure_url=failure_url, + redact=redact, ) qstash_client = processed_options.qstash_client on_step_finish = processed_options.on_step_finish @@ -67,6 +69,7 @@ def _serve_base( url = processed_options.url failure_url = processed_options.failure_url failure_function = processed_options.failure_function + redact = processed_options.redact async def _handler(request: TRequest) -> TResponse: workflow_url, workflow_failure_url = _determine_urls( @@ -119,6 +122,7 @@ async def _handler(request: TRequest) -> TResponse: env=env, retries=retries, failure_url=workflow_failure_url, + redact=redact, ) auth_check = await _DisabledWorkflowContext[Any].try_authentication( @@ -146,7 +150,7 @@ async def _handler(request: TRequest) -> TResponse: if call_return_check == "continue-workflow": if is_first_invocation: - await _trigger_first_invocation(workflow_context, retries) + await _trigger_first_invocation(workflow_context, retries, redact) else: async def on_step() -> None: @@ -188,6 +192,7 @@ def serve( Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]] ] = None, failure_url: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Dict[str, Callable[[TRequest], Awaitable[TResponse]]]: """ Creates a method that handles incoming requests and runs the provided @@ -215,4 +220,5 @@ def serve( url=url, failure_function=failure_function, failure_url=failure_url, + redact=redact, ) diff --git a/upstash_workflow/asyncio/workflow_requests.py b/upstash_workflow/asyncio/workflow_requests.py index 467fbff..e0f1f05 100644 --- a/upstash_workflow/asyncio/workflow_requests.py +++ b/upstash_workflow/asyncio/workflow_requests.py @@ -16,7 +16,7 @@ from upstash_workflow.constants import ( WORKFLOW_ID_HEADER, ) -from upstash_workflow.types import StepTypes +from upstash_workflow.types import StepTypes, Redact from upstash_workflow.workflow_types import _AsyncRequest from upstash_workflow.workflow_requests import _get_headers, _recreate_user_headers @@ -31,6 +31,7 @@ async def _trigger_first_invocation( workflow_context: AsyncWorkflowContext[TInitialPayload], retries: int, + redact: Optional[Redact] = None, ) -> None: headers = _get_headers( "true", @@ -39,6 +40,7 @@ async def _trigger_first_invocation( workflow_context.headers, None, retries, + redact=redact, ).headers await workflow_context.qstash_client.message.publish_json( diff --git a/upstash_workflow/context/context.py b/upstash_workflow/context/context.py index 281b5c1..8cabaff 100644 --- a/upstash_workflow/context/context.py +++ b/upstash_workflow/context/context.py @@ -26,6 +26,7 @@ HTTPMethods, CallResponse, CallResponseDict, + Redact, ) TInitialPayload = TypeVar("TInitialPayload") @@ -50,6 +51,7 @@ def __init__( initial_payload: TInitialPayload, env: Optional[Dict[str, Optional[str]]] = None, retries: Optional[int] = None, + redact: Optional[Redact] = None, ): self.qstash_client: QStash = qstash_client self.workflow_run_id: str = workflow_run_id @@ -60,6 +62,7 @@ def __init__( self.request_payload: TInitialPayload = initial_payload self.env: Dict[str, Optional[str]] = env or {} self.retries: int = DEFAULT_RETRIES if retries is None else retries + self.redact: Optional[Redact] = redact self._executor: _AutoExecutor = _AutoExecutor(self, self._steps) def run( diff --git a/upstash_workflow/serve/options.py b/upstash_workflow/serve/options.py index fb3743b..464baac 100644 --- a/upstash_workflow/serve/options.py +++ b/upstash_workflow/serve/options.py @@ -23,6 +23,7 @@ ) from upstash_workflow.types import ( _FinishCondition, + Redact, ) from upstash_workflow import WorkflowContext from dataclasses import dataclass @@ -46,6 +47,7 @@ class ServeOptions(Generic[TInitialPayload, TResponse]): Callable[[WorkflowContext[TInitialPayload], int, str, Dict[str, str]], Any] ] failure_url: Optional[str] + redact: Optional[Redact] @dataclass @@ -69,6 +71,7 @@ def _process_options( Callable[[WorkflowContext, int, str, Dict[str, str]], Any] ] = None, failure_url: Optional[str] = None, + redact: Optional[Redact] = None, ) -> ServeBaseOptions[TInitialPayload, TResponse]: """ Fills the options with default values if they are not provided. @@ -160,6 +163,7 @@ def _initial_payload_parser(initial_request: str) -> TInitialPayload: url=url, failure_url=failure_url, failure_function=failure_function, + redact=redact, ) diff --git a/upstash_workflow/serve/serve.py b/upstash_workflow/serve/serve.py index 8476ee2..55dd99a 100644 --- a/upstash_workflow/serve/serve.py +++ b/upstash_workflow/serve/serve.py @@ -20,7 +20,7 @@ from upstash_workflow.serve.options import _process_options, _determine_urls from upstash_workflow.error import _format_workflow_error from upstash_workflow import WorkflowContext -from upstash_workflow.types import _FinishCondition +from upstash_workflow.types import _FinishCondition, Redact from upstash_workflow.serve.authorization import _DisabledWorkflowContext _logger = logging.getLogger(__name__) @@ -45,6 +45,7 @@ def _serve_base( Callable[[WorkflowContext, int, str, Dict[str, str]], Any] ] = None, failure_url: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Dict[str, Callable[[TRequest], TResponse]]: processed_options = _process_options( qstash_client=qstash_client, @@ -57,6 +58,7 @@ def _serve_base( url=url, failure_function=failure_function, failure_url=failure_url, + redact=redact, ) qstash_client = processed_options.qstash_client on_step_finish = processed_options.on_step_finish @@ -68,6 +70,7 @@ def _serve_base( url = processed_options.url failure_url = processed_options.failure_url failure_function = processed_options.failure_function + redact = processed_options.redact def _handler(request: TRequest) -> TResponse: """ @@ -129,6 +132,7 @@ def _handler(request: TRequest) -> TResponse: env=env, retries=retries, failure_url=workflow_failure_url, + redact=redact, ) auth_check = _DisabledWorkflowContext[Any].try_authentication( @@ -156,7 +160,7 @@ def _handler(request: TRequest) -> TResponse: if call_return_check == "continue-workflow": if is_first_invocation: - _trigger_first_invocation(workflow_context, retries) + _trigger_first_invocation(workflow_context, retries, redact) else: def on_step() -> None: @@ -198,6 +202,7 @@ def serve( Callable[[WorkflowContext, int, str, Dict[str, str]], Any] ] = None, failure_url: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Dict[str, Callable[[TRequest], TResponse]]: """ Creates a method that handles incoming requests and runs the provided @@ -225,4 +230,5 @@ def serve( url=url, failure_function=failure_function, failure_url=failure_url, + redact=redact, ) diff --git a/upstash_workflow/types.py b/upstash_workflow/types.py index 73efdf8..ba188ec 100644 --- a/upstash_workflow/types.py +++ b/upstash_workflow/types.py @@ -100,3 +100,19 @@ class CallResponseDict(TypedDict): status: int body: Any header: Dict[str, List[str]] + + +class Redact(TypedDict, total=False): + """ + Configuration for redacting fields in logs. + """ + + body: Literal[True] + """Redact the request body in logs.""" + + header: Union[Literal[True], List[str]] + """ + Redact headers in logs. + - `True` to redact all headers + - List of header names to redact specific headers (e.g., ["Authorization"]) + """ diff --git a/upstash_workflow/workflow_requests.py b/upstash_workflow/workflow_requests.py index 8863b0a..82a4cfd 100644 --- a/upstash_workflow/workflow_requests.py +++ b/upstash_workflow/workflow_requests.py @@ -25,7 +25,7 @@ DEFAULT_CONTENT_TYPE, DEFAULT_RETRIES, ) -from upstash_workflow.types import StepTypes, DefaultStep, _HeadersResponse +from upstash_workflow.types import StepTypes, DefaultStep, _HeadersResponse, Redact from upstash_workflow.workflow_types import _SyncRequest if TYPE_CHECKING: @@ -39,6 +39,7 @@ def _trigger_first_invocation( workflow_context: WorkflowContext[TInitialPayload], retries: int, + redact: Optional[Redact] = None, ) -> None: headers = _get_headers( "true", @@ -47,6 +48,7 @@ def _trigger_first_invocation( workflow_context.headers, None, retries, + redact=redact, ).headers workflow_context.qstash_client.message.publish_json( @@ -263,6 +265,7 @@ def _get_headers( call_retries: Optional[int] = None, call_timeout: Optional[Union[int, str]] = None, workflow_failure_url: Optional[str] = None, + redact: Optional[Redact] = None, ) -> _HeadersResponse: """ Gets headers for calling QStash @@ -357,6 +360,24 @@ def _get_headers( header_value ) + # Add redact headers if specified + if redact is not None: + redact_parts = [] + if redact.get("body"): + redact_parts.append("body") + if redact.get("header") is not None: + if redact["header"] is True: + redact_parts.append("header") + elif isinstance(redact["header"], list) and len(redact["header"]) > 0: + for header_name in redact["header"]: + redact_parts.append(f"header[{header_name}]") + if redact_parts: + base_headers["Upstash-Redact-Fields"] = ",".join(redact_parts) + if workflow_failure_url: + base_headers["Upstash-Failure-Callback-Redact-Fields"] = ",".join( + redact_parts + ) + content_type = user_headers.get("Content-Type") if user_headers else None content_type = DEFAULT_CONTENT_TYPE if content_type is None else content_type From 5e175b3215a740724c3b21ed5e6cfc2b13950cba Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 19 Mar 2026 01:17:31 +0300 Subject: [PATCH 2/6] UPS-9: delegate redact header handling to qstash client Instead of manually computing Upstash-Redact-Fields headers in _get_headers, pass the redact parameter directly to qstash_client.message.publish_json which handles it natively. Bump qstash dependency to ^3.4.0 and version to 0.1.5. Co-Authored-By: Claude Opus 4.6 (1M context) --- pyproject.toml | 4 +- tests/test_redact.py | 166 ++++++------------ upstash_workflow/__init__.py | 2 +- upstash_workflow/asyncio/workflow_requests.py | 2 +- upstash_workflow/workflow_requests.py | 21 +-- 5 files changed, 59 insertions(+), 136 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9b69116..859a72c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "upstash-workflow" -version = "0.1.4" +version = "0.1.5" description = "Python SDK for Upstash Workflow" license = "MIT" authors = ["Upstash "] @@ -29,7 +29,7 @@ packages = [{ include = "upstash_workflow" }] [tool.poetry.dependencies] python = "^3.8" -qstash = "^2.0.3" +qstash = "^3.4.0" [tool.poetry.group.fastapi.dependencies] fastapi = "^0.115.0" diff --git a/tests/test_redact.py b/tests/test_redact.py index d4452ea..dd678e3 100644 --- a/tests/test_redact.py +++ b/tests/test_redact.py @@ -1,140 +1,82 @@ -"""Tests for redacted fields functionality.""" +"""Tests for redact parameter being passed to qstash client.""" -from upstash_workflow.workflow_requests import _get_headers +from unittest.mock import MagicMock, patch +from upstash_workflow.workflow_requests import _trigger_first_invocation from upstash_workflow.types import Redact -def test_redact_body_only() -> None: - """Test redacting only the body.""" +def _make_workflow_context(): + ctx = MagicMock() + ctx.workflow_run_id = "wfr-test-id" + ctx.url = "https://example.com" + ctx.headers = {} + ctx.request_payload = '{"test": true}' + ctx.qstash_client.message.publish_json = MagicMock() + return ctx + + +def test_trigger_passes_redact_body() -> None: + """Test that redact with body is passed to publish_json.""" + ctx = _make_workflow_context() redact: Redact = {"body": True} - result = _get_headers( - "true", - "wfr-test-id", - "https://example.com", - None, - None, - 3, - redact=redact, - ) + _trigger_first_invocation(ctx, retries=3, redact=redact) - assert result.headers["Upstash-Redact-Fields"] == "body" + ctx.qstash_client.message.publish_json.assert_called_once() + call_kwargs = ctx.qstash_client.message.publish_json.call_args + assert call_kwargs.kwargs["redact"] == {"body": True} -def test_redact_header_all() -> None: - """Test redacting all headers.""" +def test_trigger_passes_redact_header_all() -> None: + """Test that redact with all headers is passed to publish_json.""" + ctx = _make_workflow_context() redact: Redact = {"header": True} - result = _get_headers( - "true", - "wfr-test-id", - "https://example.com", - None, - None, - 3, - redact=redact, - ) + _trigger_first_invocation(ctx, retries=3, redact=redact) - assert result.headers["Upstash-Redact-Fields"] == "header" + call_kwargs = ctx.qstash_client.message.publish_json.call_args + assert call_kwargs.kwargs["redact"] == {"header": True} -def test_redact_body_and_header_all() -> None: - """Test redacting body and all headers.""" - redact: Redact = {"body": True, "header": True} +def test_trigger_passes_redact_specific_headers() -> None: + """Test that redact with specific headers is passed to publish_json.""" + ctx = _make_workflow_context() + redact: Redact = {"header": ["Authorization", "X-API-Key"]} - result = _get_headers( - "true", - "wfr-test-id", - "https://example.com", - None, - None, - 3, - redact=redact, - ) + _trigger_first_invocation(ctx, retries=3, redact=redact) - assert result.headers["Upstash-Redact-Fields"] == "body,header" + call_kwargs = ctx.qstash_client.message.publish_json.call_args + assert call_kwargs.kwargs["redact"] == {"header": ["Authorization", "X-API-Key"]} -def test_redact_specific_headers() -> None: - """Test redacting specific headers.""" - redact: Redact = {"header": ["Authorization"]} +def test_trigger_passes_redact_body_and_headers() -> None: + """Test that redact with body and specific headers is passed to publish_json.""" + ctx = _make_workflow_context() + redact: Redact = {"body": True, "header": ["Authorization"]} - result = _get_headers( - "true", - "wfr-test-id", - "https://example.com", - None, - None, - 3, - redact=redact, - ) + _trigger_first_invocation(ctx, retries=3, redact=redact) - assert result.headers["Upstash-Redact-Fields"] == "header[Authorization]" + call_kwargs = ctx.qstash_client.message.publish_json.call_args + assert call_kwargs.kwargs["redact"] == {"body": True, "header": ["Authorization"]} -def test_redact_body_and_specific_headers() -> None: - """Test redacting body and specific headers.""" - redact: Redact = {"body": True, "header": ["Authorization", "X-API-Key"]} +def test_trigger_passes_no_redact() -> None: + """Test that redact=None is passed when no redact specified.""" + ctx = _make_workflow_context() - result = _get_headers( - "true", - "wfr-test-id", - "https://example.com", - None, - None, - 3, - redact=redact, - ) + _trigger_first_invocation(ctx, retries=3, redact=None) - assert result.headers["Upstash-Redact-Fields"] == "body,header[Authorization],header[X-API-Key]" + call_kwargs = ctx.qstash_client.message.publish_json.call_args + assert call_kwargs.kwargs["redact"] is None -def test_redact_with_failure_url() -> None: - """Test redacting with failure URL sets both headers.""" +def test_trigger_no_redact_headers_in_headers() -> None: + """Test that Upstash-Redact-Fields is NOT in the headers (qstash client handles it).""" + ctx = _make_workflow_context() redact: Redact = {"body": True, "header": ["Authorization"]} - result = _get_headers( - "true", - "wfr-test-id", - "https://example.com", - None, - None, - 3, - workflow_failure_url="https://failure.com", - redact=redact, - ) - - assert result.headers["Upstash-Redact-Fields"] == "body,header[Authorization]" - assert result.headers["Upstash-Failure-Callback-Redact-Fields"] == "body,header[Authorization]" - - -def test_no_redact() -> None: - """Test that no redact header is added when redact is None.""" - result = _get_headers( - "true", - "wfr-test-id", - "https://example.com", - None, - None, - 3, - redact=None, - ) - - assert "Upstash-Redact-Fields" not in result.headers - - -def test_redact_empty_header_list() -> None: - """Test that empty header list doesn't add redact parts.""" - redact: Redact = {"header": []} - - result = _get_headers( - "true", - "wfr-test-id", - "https://example.com", - None, - None, - 3, - redact=redact, - ) - - assert "Upstash-Redact-Fields" not in result.headers + _trigger_first_invocation(ctx, retries=3, redact=redact) + + call_kwargs = ctx.qstash_client.message.publish_json.call_args + headers = call_kwargs.kwargs["headers"] + assert "Upstash-Redact-Fields" not in headers diff --git a/upstash_workflow/__init__.py b/upstash_workflow/__init__.py index 748b76e..9d752a6 100644 --- a/upstash_workflow/__init__.py +++ b/upstash_workflow/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.1.4" +__version__ = "0.1.5" from upstash_workflow.context.context import WorkflowContext from upstash_workflow.serve.serve import serve diff --git a/upstash_workflow/asyncio/workflow_requests.py b/upstash_workflow/asyncio/workflow_requests.py index e0f1f05..9a0df16 100644 --- a/upstash_workflow/asyncio/workflow_requests.py +++ b/upstash_workflow/asyncio/workflow_requests.py @@ -40,13 +40,13 @@ async def _trigger_first_invocation( workflow_context.headers, None, retries, - redact=redact, ).headers await workflow_context.qstash_client.message.publish_json( url=workflow_context.url, body=workflow_context.request_payload, headers=headers, + redact=redact, ) diff --git a/upstash_workflow/workflow_requests.py b/upstash_workflow/workflow_requests.py index 82a4cfd..3aced72 100644 --- a/upstash_workflow/workflow_requests.py +++ b/upstash_workflow/workflow_requests.py @@ -48,13 +48,13 @@ def _trigger_first_invocation( workflow_context.headers, None, retries, - redact=redact, ).headers workflow_context.qstash_client.message.publish_json( url=workflow_context.url, body=workflow_context.request_payload, headers=headers, + redact=redact, ) @@ -265,7 +265,6 @@ def _get_headers( call_retries: Optional[int] = None, call_timeout: Optional[Union[int, str]] = None, workflow_failure_url: Optional[str] = None, - redact: Optional[Redact] = None, ) -> _HeadersResponse: """ Gets headers for calling QStash @@ -360,24 +359,6 @@ def _get_headers( header_value ) - # Add redact headers if specified - if redact is not None: - redact_parts = [] - if redact.get("body"): - redact_parts.append("body") - if redact.get("header") is not None: - if redact["header"] is True: - redact_parts.append("header") - elif isinstance(redact["header"], list) and len(redact["header"]) > 0: - for header_name in redact["header"]: - redact_parts.append(f"header[{header_name}]") - if redact_parts: - base_headers["Upstash-Redact-Fields"] = ",".join(redact_parts) - if workflow_failure_url: - base_headers["Upstash-Failure-Callback-Redact-Fields"] = ",".join( - redact_parts - ) - content_type = user_headers.get("Content-Type") if user_headers else None content_type = DEFAULT_CONTENT_TYPE if content_type is None else content_type From f1740cfc02d420344e96a34ed5cf0c798886ca2b Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 19 Mar 2026 01:18:46 +0300 Subject: [PATCH 3/6] fix: fmt --- tests/test_redact.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_redact.py b/tests/test_redact.py index dd678e3..6b5fd29 100644 --- a/tests/test_redact.py +++ b/tests/test_redact.py @@ -1,6 +1,6 @@ """Tests for redact parameter being passed to qstash client.""" -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock from upstash_workflow.workflow_requests import _trigger_first_invocation from upstash_workflow.types import Redact From 7558bdb400bc36b40fbd35443ccc454f3391ed14 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 19 Mar 2026 01:23:01 +0300 Subject: [PATCH 4/6] docs: add guidelines for linting and type checking in AGENTS.md --- AGENTS.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..c6c29c2 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,13 @@ +# Agents + +## Linting and type checking + +Before committing any changes, always run: + +``` +poetry run ruff format . +poetry run ruff check . +poetry run mypy --show-error-codes . +``` + +Fix all errors before committing. Do not commit code with unused imports, formatting issues, or type errors. From 3eb6fb8b77765b4af2759e45132623f8e572348a Mon Sep 17 00:00:00 2001 From: CahidArda Date: Tue, 24 Mar 2026 12:12:52 +0300 Subject: [PATCH 5/6] UPS-9: fix batch header prefixing for qstash v3 and add redact tests Bypass qstash v3 SDK's batch_json which auto-prefixes all headers with Upstash-Forward-, corrupting workflow control headers. Construct the batch body manually and send via the HTTP client directly. Also add integration tests for triggering workflows with redact fields. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/asyncio/test_context.py | 37 ++++++++ tests/asyncio/utils.py | 7 +- tests/test_context.py | 36 +++++++ tests/utils.py | 11 ++- .../asyncio/context/auto_executor.py | 95 ++++++++++++------- upstash_workflow/context/auto_executor.py | 95 ++++++++++++------- 6 files changed, 207 insertions(+), 74 deletions(-) diff --git a/tests/asyncio/test_context.py b/tests/asyncio/test_context.py index 922d8a2..96549c8 100644 --- a/tests/asyncio/test_context.py +++ b/tests/asyncio/test_context.py @@ -2,6 +2,8 @@ from qstash import AsyncQStash from upstash_workflow import AsyncWorkflowContext from upstash_workflow.error import WorkflowAbort +from upstash_workflow.types import Redact +from upstash_workflow.asyncio.workflow_requests import _trigger_first_invocation from tests.utils import ( RequestFields, ResponseFields, @@ -103,3 +105,38 @@ async def execute() -> None: ], ), ) + + +@pytest.mark.asyncio +async def test_trigger_workflow_with_redact(qstash_client: AsyncQStash) -> None: + redact: Redact = {"body": True, "header": ["Authorization"]} + + context = AsyncWorkflowContext( + qstash_client=qstash_client, + workflow_run_id="wfr-id", + headers={}, + steps=[], + url=WORKFLOW_ENDPOINT, + initial_payload="my-payload", + env=None, + retries=3, + failure_url=None, + redact=redact, + ) + + async def execute() -> None: + await _trigger_first_invocation(context, retries=3, redact=redact) + + await mock_qstash_server( + execute=execute, + response_fields=ResponseFields(status=200, body="msgId"), + receives_request=RequestFields( + method="POST", + url=f"{MOCK_QSTASH_SERVER_URL}/v2/publish/{WORKFLOW_ENDPOINT}", + token="mock-token", + body="my-payload", + headers={ + "Upstash-Redact-Fields": "body,header[Authorization]", + }, + ), + ) diff --git a/tests/asyncio/utils.py b/tests/asyncio/utils.py index cafebf0..743fa6e 100644 --- a/tests/asyncio/utils.py +++ b/tests/asyncio/utils.py @@ -51,8 +51,13 @@ async def handler(request: web.Request) -> web.Response: text=f"assertion in mock QStash failed: {str(error)}", status=400 ) + if "/v2/batch" in str(request.url): + data = [{"messageId": response_fields.body, "deduplicated": False}] + else: + data = {"messageId": response_fields.body, "deduplicated": False} + return web.json_response( - data=[{"messageId": response_fields.body, "deduplicated": False}], + data=data, status=response_fields.status, ) diff --git a/tests/test_context.py b/tests/test_context.py index b9290f2..fe274f0 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -2,6 +2,8 @@ from qstash import QStash from upstash_workflow import WorkflowContext from upstash_workflow.error import WorkflowAbort +from upstash_workflow.types import Redact +from upstash_workflow.workflow_requests import _trigger_first_invocation from tests.utils import ( mock_qstash_server, RequestFields, @@ -102,3 +104,37 @@ def execute() -> None: ], ), ) + + +def test_trigger_workflow_with_redact(qstash_client: QStash) -> None: + redact: Redact = {"body": True, "header": ["Authorization"]} + + context = WorkflowContext( + qstash_client=qstash_client, + workflow_run_id="wfr-id", + headers={}, + steps=[], + url=WORKFLOW_ENDPOINT, + initial_payload="my-payload", + env=None, + retries=3, + failure_url=None, + redact=redact, + ) + + def execute() -> None: + _trigger_first_invocation(context, retries=3, redact=redact) + + mock_qstash_server( + execute=execute, + response_fields=ResponseFields(status=200, body="msgId"), + receives_request=RequestFields( + method="POST", + url=f"{MOCK_QSTASH_SERVER_URL}/v2/publish/{WORKFLOW_ENDPOINT}", + token="mock-token", + body="my-payload", + headers={ + "Upstash-Redact-Fields": "body,header[Authorization]", + }, + ), + ) diff --git a/tests/utils.py b/tests/utils.py index b2a41a4..0f5e05a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -108,9 +108,14 @@ def handle_request(self) -> None: ) return - response_data = json.dumps( - [{"messageId": response_fields.body, "deduplicated": False}] - ) + if "/v2/batch" in self.path: + response_data = json.dumps( + [{"messageId": response_fields.body, "deduplicated": False}] + ) + else: + response_data = json.dumps( + {"messageId": response_fields.body, "deduplicated": False} + ) self.send_response(response_fields.status) self.send_header("Content-type", "application/json") diff --git a/upstash_workflow/asyncio/context/auto_executor.py b/upstash_workflow/asyncio/context/auto_executor.py index b1b4c13..00a037e 100644 --- a/upstash_workflow/asyncio/context/auto_executor.py +++ b/upstash_workflow/asyncio/context/auto_executor.py @@ -1,11 +1,10 @@ from __future__ import annotations from typing import TYPE_CHECKING, List, Union, Literal, cast, Any, TypeVar import json -from qstash.message import BatchJsonRequest from upstash_workflow.constants import NO_CONCURRENCY from upstash_workflow.error import WorkflowError, WorkflowAbort from upstash_workflow.workflow_requests import _get_headers -from upstash_workflow.types import DefaultStep, HTTPMethods +from upstash_workflow.types import DefaultStep from upstash_workflow.asyncio.context.steps import _BaseLazyStep, _LazyCallStep if TYPE_CHECKING: @@ -72,7 +71,7 @@ async def submit_steps_to_qstash( f"Unable to submit steps to QStash. Provided list is empty. Current step: {self.step_count}" ) - batch_requests = [] + batch_body = [] for index, single_step in enumerate(steps): lazy_step = lazy_steps[index] headers = _get_headers( @@ -93,41 +92,67 @@ async def submit_steps_to_qstash( single_step.out = json.dumps(single_step.out) - batch_requests.append( - BatchJsonRequest( - headers=headers, - method=cast(HTTPMethods, single_step.call_method), - body=single_step.call_body, - url=single_step.call_url, - ) - if single_step.call_url - else ( - BatchJsonRequest( - headers=headers, - body={ - "method": "POST", - "stepId": single_step.step_id, - "stepName": single_step.step_name, - "stepType": single_step.step_type, - "out": single_step.out, - "sleepFor": single_step.sleep_for, - "sleepUntil": single_step.sleep_until, - "concurrent": single_step.concurrent, - "targetStep": single_step.target_step, - "callUrl": single_step.call_url, - "callMethod": single_step.call_method, - "callBody": single_step.call_body, - "callHeaders": single_step.call_headers, + if single_step.call_url: + batch_body.append( + { + "destination": single_step.call_url, + "headers": { + "Content-Type": "application/json", + "Upstash-Method": single_step.call_method, + **headers, }, - url=self.context.url, - not_before=cast( # TODO: Change not_before type in BatchJsonRequest - Any, single_step.sleep_until if will_wait else None + "body": json.dumps(single_step.call_body), + "queue": None, + } + ) + else: + step_headers = { + "Content-Type": "application/json", + **headers, + } + + sleep_until = single_step.sleep_until if will_wait else None + sleep_for = single_step.sleep_for if will_wait else None + + if sleep_until is not None: + step_headers["Upstash-Not-Before"] = str(sleep_until) + if sleep_for is not None: + if isinstance(sleep_for, int): + step_headers["Upstash-Delay"] = f"{sleep_for}s" + else: + step_headers["Upstash-Delay"] = str(sleep_for) + + batch_body.append( + { + "destination": self.context.url, + "headers": step_headers, + "body": json.dumps( + { + "method": "POST", + "stepId": single_step.step_id, + "stepName": single_step.step_name, + "stepType": single_step.step_type, + "out": single_step.out, + "sleepFor": single_step.sleep_for, + "sleepUntil": single_step.sleep_until, + "concurrent": single_step.concurrent, + "targetStep": single_step.target_step, + "callUrl": single_step.call_url, + "callMethod": single_step.call_method, + "callBody": single_step.call_body, + "callHeaders": single_step.call_headers, + } ), - delay=cast(Any, single_step.sleep_for if will_wait else None), - ) + "queue": None, + } ) - ) - await self.context.qstash_client.message.batch_json(batch_requests) + + await self.context.qstash_client.http.request( + path="/v2/batch", + body=json.dumps(batch_body), + headers={"Content-Type": "application/json"}, + method="POST", + ) raise WorkflowAbort(steps[0].step_name, steps[0]) diff --git a/upstash_workflow/context/auto_executor.py b/upstash_workflow/context/auto_executor.py index 63bed04..8c6afb2 100644 --- a/upstash_workflow/context/auto_executor.py +++ b/upstash_workflow/context/auto_executor.py @@ -1,11 +1,10 @@ from __future__ import annotations from typing import TYPE_CHECKING, List, Union, Literal, cast, Any, TypeVar import json -from qstash.message import BatchJsonRequest from upstash_workflow.constants import NO_CONCURRENCY from upstash_workflow.error import WorkflowError, WorkflowAbort from upstash_workflow.workflow_requests import _get_headers -from upstash_workflow.types import DefaultStep, HTTPMethods +from upstash_workflow.types import DefaultStep from upstash_workflow.context.steps import _BaseLazyStep, _LazyCallStep if TYPE_CHECKING: @@ -65,7 +64,7 @@ def submit_steps_to_qstash( f"Unable to submit steps to QStash. Provided list is empty. Current step: {self.step_count}" ) - batch_requests = [] + batch_body = [] for index, single_step in enumerate(steps): lazy_step = lazy_steps[index] headers = _get_headers( @@ -86,41 +85,67 @@ def submit_steps_to_qstash( single_step.out = json.dumps(single_step.out) - batch_requests.append( - BatchJsonRequest( - headers=headers, - method=cast(HTTPMethods, single_step.call_method), - body=single_step.call_body, - url=single_step.call_url, - ) - if single_step.call_url - else ( - BatchJsonRequest( - headers=headers, - body={ - "method": "POST", - "stepId": single_step.step_id, - "stepName": single_step.step_name, - "stepType": single_step.step_type, - "out": single_step.out, - "sleepFor": single_step.sleep_for, - "sleepUntil": single_step.sleep_until, - "concurrent": single_step.concurrent, - "targetStep": single_step.target_step, - "callUrl": single_step.call_url, - "callMethod": single_step.call_method, - "callBody": single_step.call_body, - "callHeaders": single_step.call_headers, + if single_step.call_url: + batch_body.append( + { + "destination": single_step.call_url, + "headers": { + "Content-Type": "application/json", + "Upstash-Method": single_step.call_method, + **headers, }, - url=self.context.url, - not_before=cast( # TODO: Change not_before type in BatchJsonRequest - Any, single_step.sleep_until if will_wait else None + "body": json.dumps(single_step.call_body), + "queue": None, + } + ) + else: + step_headers = { + "Content-Type": "application/json", + **headers, + } + + sleep_until = single_step.sleep_until if will_wait else None + sleep_for = single_step.sleep_for if will_wait else None + + if sleep_until is not None: + step_headers["Upstash-Not-Before"] = str(sleep_until) + if sleep_for is not None: + if isinstance(sleep_for, int): + step_headers["Upstash-Delay"] = f"{sleep_for}s" + else: + step_headers["Upstash-Delay"] = str(sleep_for) + + batch_body.append( + { + "destination": self.context.url, + "headers": step_headers, + "body": json.dumps( + { + "method": "POST", + "stepId": single_step.step_id, + "stepName": single_step.step_name, + "stepType": single_step.step_type, + "out": single_step.out, + "sleepFor": single_step.sleep_for, + "sleepUntil": single_step.sleep_until, + "concurrent": single_step.concurrent, + "targetStep": single_step.target_step, + "callUrl": single_step.call_url, + "callMethod": single_step.call_method, + "callBody": single_step.call_body, + "callHeaders": single_step.call_headers, + } ), - delay=cast(Any, single_step.sleep_for if will_wait else None), - ) + "queue": None, + } ) - ) - self.context.qstash_client.message.batch_json(batch_requests) + + self.context.qstash_client.http.request( + path="/v2/batch", + body=json.dumps(batch_body), + headers={"Content-Type": "application/json"}, + method="POST", + ) raise WorkflowAbort(steps[0].step_name, steps[0]) From 6a2f9277e8bfb2404dc43a26614315cddd9415cc Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 26 Mar 2026 11:52:28 +0300 Subject: [PATCH 6/6] fix: declare data variable in mock_qstash_server for clarity --- tests/asyncio/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/asyncio/utils.py b/tests/asyncio/utils.py index 743fa6e..f53b8ad 100644 --- a/tests/asyncio/utils.py +++ b/tests/asyncio/utils.py @@ -51,6 +51,7 @@ async def handler(request: web.Request) -> web.Response: text=f"assertion in mock QStash failed: {str(error)}", status=400 ) + data: Any if "/v2/batch" in str(request.url): data = [{"messageId": response_fields.body, "deduplicated": False}] else: