Skip to content

Commit f85bb8e

Browse files
authored
Merge pull request #4 from eccenca/bugfix/updateToken-CMEM-6890
update cmempy user access token each time a workflow execution status…
2 parents 21b05de + 8dd54fd commit f85bb8e

4 files changed

Lines changed: 511 additions & 398 deletions

File tree

CHANGELOG.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,22 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p
77

88
## [Unreleased]
99

10+
TODO: add at least one Added, Changed, Deprecated, Removed, Fixed or Security section
11+
12+
13+
## [0.8.0] 2025-09-10
14+
15+
### Added
16+
17+
- allow to cancel the workflow
18+
19+
### Fixed
20+
21+
- update cmempy user access token each time a workflow execution status is updated
22+
23+
24+
## [0.7.0] 2025-09-04
25+
1026
### Added
1127

1228
- extendend documentation

CLAUDE.md

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# CLAUDE.md
2+
3+
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
4+
5+
## Essential Commands
6+
7+
This project uses **Task** (not Make) for build automation. All commands are defined in `Taskfile.yaml`:
8+
9+
- `task check` - Run all quality checks (lint, format, type-check, test)
10+
- `task lint` - Run ruff linting
11+
- `task format` - Auto-format code with ruff
12+
- `task type-check` - Run mypy type checking
13+
- `task test` - Run pytest unit tests
14+
- `task test:integration` - Run integration tests (requires CMEM instance)
15+
- `task build` - Build wheel package with poetry
16+
- `task install` - Install package in development mode
17+
- `task pre-commit` - Install pre-commit hooks
18+
19+
## Architecture Overview
20+
21+
### Core Components
22+
23+
**cmem_plugin_loopwf/task.py** - Main plugin implementation:
24+
- `StartWorkflow` class extends `WorkflowPlugin` from cmem-plugin-base
25+
- Takes input entities and starts sub-workflow for each entity
26+
- `WorkflowExecution` dataclass manages individual workflow execution status
27+
- `WorkflowExecutionList` handles queue-based execution with configurable parallelism
28+
- Implements polling-based workflow status monitoring with async execution
29+
- Supports both entity metadata and file content processing
30+
31+
**cmem_plugin_loopwf/workflow_type.py** - Workflow type definitions and utilities:
32+
- Custom parameter types for workflow selection
33+
- Dynamic value fetching and autocomplete integration
34+
35+
**cmem_plugin_loopwf/exceptions.py** - Custom exception classes:
36+
- Plugin-specific error handling and exception types
37+
38+
### Key Patterns
39+
40+
- **Dataclass Configuration**: Uses `@dataclass` for parameter models with type hints
41+
- **Decorator-Based Registration**: Plugins use `@Plugin` decorator from cmem-plugin-base
42+
- **Autocomplete Integration**: Custom parameter types with dynamic value fetching
43+
- **Async Workflow Execution**: Non-blocking workflow starts with status polling
44+
45+
### Project Structure
46+
47+
```
48+
cmem_plugin_loopwf/
49+
├── task.py # Main plugin logic
50+
├── workflow_type.py # Workflow type definitions and utilities
51+
├── exceptions.py # Custom exception classes
52+
├── loopwf.svg # Plugin icon
53+
└── __init__.py # Plugin registration
54+
tests/ # Unit and integration tests
55+
├── conftest.py # Test configuration
56+
├── test_task.py # Task plugin tests
57+
├── test_discovery.py # Plugin discovery tests
58+
├── test_workflow_type.py # Workflow type tests
59+
├── test_workflow_execution_list.py # Execution list tests
60+
└── utils.py # Test utilities
61+
```
62+
63+
## Development Workflow
64+
65+
### Dependencies
66+
- **Poetry** for package management (not pip/requirements.txt)
67+
- **Ruff** for linting and formatting (replaces black/isort/flake8)
68+
- **MyPy** for type checking
69+
- **Pre-commit** hooks for quality gates
70+
71+
### Testing
72+
- Unit tests in `tests/` directory using pytest
73+
- Integration tests require running CMEM instance
74+
- Test configuration in `pyproject.toml` under `[tool.pytest.ini_options]`
75+
76+
### Plugin Development
77+
- Extend `WorkflowPlugin` base class from cmem-plugin-base
78+
- Use type hints extensively - this is a strongly typed codebase
79+
- Follow the existing parameter model pattern with dataclasses
80+
- All plugins must be registered in `__init__.py`
81+
82+
### CMEM Integration
83+
- Plugin outputs must be compatible with CMEM's RDF/entity model
84+
- Uses `cmempy` library for CMEM API interactions (config, get_json, execute_workflow_io, get_workflows_io)
85+
- Authentication handled through user context propagation via `setup_cmempy_user_access`
86+
- All operations are project-scoped within CMEM instance
87+
- Supports async workflow execution via `/api/workflow/executeAsync` endpoint
88+
- Entity conversion to JSON for workflow input via replaceable datasets
89+
- File processing support with configurable MIME types for file-to-workflow scenarios

cmem_plugin_loopwf/task.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ def wait_until_finished(self) -> None:
160160

161161
def update(self) -> None:
162162
"""Update the execution status"""
163+
if self.execution_context:
164+
setup_cmempy_user_access(context=self.execution_context.user)
163165
response = get_json(
164166
f"{config.get_di_api_endpoint()}/workspace/activities/status",
165167
params={
@@ -189,11 +191,13 @@ def __init__(self):
189191

190192
def execute(self, parallel_execution: int) -> None:
191193
"""Execute all workflow executions"""
192-
while self.queued > 0:
193-
while self.running < parallel_execution and self.queued > 0:
194+
while self.queued > 0 and not self.is_canceling():
195+
while self.running < parallel_execution and self.queued > 0 and not self.is_canceling():
194196
self.start_next()
195197
self.report()
196198
self.wait_until_finished()
199+
if self.is_canceling():
200+
self.logger.info("Execution canceled - stopping workflow processing")
197201
self.report()
198202

199203
def start_next(self) -> bool:
@@ -206,9 +210,11 @@ def start_next(self) -> bool:
206210

207211
def wait_until_finished(self, polling_time: int = 1) -> None:
208212
"""Wait until all running workflows are finished"""
209-
while self.running > 0:
213+
while self.running > 0 and not self.is_canceling():
210214
sleep(polling_time)
211215
self.update_running_status()
216+
if self.is_canceling():
217+
self.logger.info("Cancellation detected during polling - stopping workflow monitoring")
212218

213219
def update_running_status(self) -> None:
214220
"""Update status of running workflows"""
@@ -247,6 +253,13 @@ def queued(self) -> int:
247253
"""Returns the number of queued workflows"""
248254
return len([_ for _ in self.statuses if _.is_queued])
249255

256+
def is_canceling(self) -> bool:
257+
"""Check if the workflow execution context is in canceling state"""
258+
if self.context and hasattr(self.context, "workflow") and self.context.workflow:
259+
status = self.context.workflow.status()
260+
return str(status) == "Canceling"
261+
return False
262+
250263

251264
@Plugin(
252265
label="Start Workflow per Entity",

0 commit comments

Comments
 (0)