-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
22 lines (16 loc) · 764 Bytes
/
worker.py
File metadata and controls
22 lines (16 loc) · 764 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time
from camunda.external_task.external_task import ExternalTask, TaskResult
from camunda.external_task.external_task_worker import ExternalTaskWorker
import robot
import listener
import json
def handle_task(task: ExternalTask) -> TaskResult:
    """Run the ``test.robot`` suite for an external task and report the outcome.

    A listener obtained from ``listener.getVariablesListener()`` is passed to
    ``robot.run``; its ``camunda_result`` attribute is read once the run has
    finished.

    Args:
        task: The external task being handled.

    Returns:
        The value of ``task.failure(...)`` when ``robot.run`` returns a
        non-zero value, otherwise the value of ``task.complete(...)`` carrying
        the listener's ``camunda_result`` serialised to JSON under the
        ``"result"`` key.
    """
    variables_listener = listener.getVariablesListener()
    return_code = robot.run("test.robot", listener=variables_listener)

    # Any non-zero value from robot.run is reported as a failure, with
    # max_retries=0 so no further retries are requested.
    if return_code != 0:
        return task.failure(
            error_message="RF-task failed",
            error_details="The RF task was not completed successfully. For more information open the log.html",
            max_retries=0,
            retry_timeout=5000,
        )

    # The collected result is handed over as a JSON string rather than as the
    # raw Python object.
    serialized_result = json.dumps(variables_listener.camunda_result)
    return task.complete({"result": serialized_result})
# Create the worker with id "1" and register handle_task for "test".
# NOTE(review): "test" is presumably the Camunda topic name, and subscribe()
# presumably keeps polling for tasks from here on - confirm against the
# client library.
_worker = ExternalTaskWorker(worker_id="1")
_worker.subscribe("test", handle_task)