Skip to content
50 changes: 22 additions & 28 deletions dev-resources/utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
[temporal.activity :as a :refer [defactivity]]
[temporal.client.core :as c]
[temporal.client.worker :as worker]
[temporal.converter.default :as default-data-converter]
[temporal.converter.json :as json]
[temporal.workflow :as w :refer [defworkflow]])
(:import [java.time Duration]))

(def client (atom nil))
(def current-worker-thread (atom nil))
(:import [io.temporal.common.converter NullPayloadConverter]
[java.time Duration]))

(def default-client-options {:target "localhost:7233"
:namespace "default"
Expand Down Expand Up @@ -37,35 +37,21 @@
@(w/invoke user-child-workflow args (merge default-workflow-options {:workflow-id "child-workflow"})))

(defn create-temporal-client
"Creates a new temporal client if the old one does not exist"
([] (create-temporal-client nil))

([options]
(when-not @client
(let [options (merge default-client-options options)]
(log/info "creating temporal client" options)
(reset! client (c/create-client options))))))
(let [options (merge default-client-options options)]
(log/info "creating temporal client" options)
(c/create-client options))))

(defn create-temporal-worker
([client] (create-temporal-worker client nil))

(defn worker-loop
([client] (worker-loop client nil))
([client options]
(let [options (merge default-worker-options options)]
(log/info "starting temporal worker" options)
(worker/start client options))))

(defn create-temporal-worker
"Starts a new instance running on another daemon thread,
stops the current temporal worker and thread if they exist"
([client] (create-temporal-worker client nil))
([client options]
(when (and @current-worker-thread (.isAlive @current-worker-thread))
(.interrupt @current-worker-thread)
(reset! current-worker-thread nil))
(let [thread (Thread. (partial worker-loop client options))]
(doto thread
(.setDaemon true)
(.start))
(reset! current-worker-thread thread))))

(defn execute-workflow
([client workflow arguments] (execute-workflow client workflow arguments nil))
([client workflow arguments options]
Expand All @@ -75,7 +61,15 @@
(c/start workflow arguments)
@(c/get-result workflow))))

(def data-converter
(default-data-converter/create
[(NullPayloadConverter.)
(json/create)]))

(comment
(do (create-temporal-client)
(create-temporal-worker @client)
(execute-workflow @client user-parent-workflow {:names ["Hanna" "Bob" "Tracy" "Felix"]})))
(let [client (create-temporal-client {:data-converter data-converter})
worker (create-temporal-worker client)]
(try
(execute-workflow client user-parent-workflow {:names ["Hanna" "Bob" "Tracy" "Felix"]})
(finally
(worker/stop worker)))))
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
:url "https://www.apache.org/licenses/LICENSE-2.0"
:year 2023
:key "apache-2.0"}
:plugins [[lein-cljfmt "0.9.0"]
:plugins [[dev.weavejester/lein-cljfmt "0.15.6"]
[lein-kibit "0.1.8"]
[lein-bikeshed "0.5.2"]
[lein-cloverage "1.2.4"]
Expand All @@ -18,6 +18,7 @@
[com.taoensso/timbre "6.6.1"]
[com.taoensso/nippy "3.4.2"]
[funcool/promesa "11.0.678"]
[metosin/jsonista "0.3.13"]
[medley "1.4.0"]
[slingshot "0.12.2"]]
:repl-options {:init-ns user}
Expand Down
2 changes: 1 addition & 1 deletion samples/mutex/src/temporal/sample/mutex/main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[temporal.sample.mutex.core :as core])
(:gen-class))

(log/set-level! :info)
(log/set-min-level! :info)

(defn -main
[& args]
Expand Down
23 changes: 11 additions & 12 deletions src/temporal/activity.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

(ns temporal.activity
"Methods for defining and invoking activity tasks"
(:require [taoensso.timbre :as log]
[taoensso.nippy :as nippy]
[promesa.core :as p]
[temporal.internal.exceptions :as e]
(:require [promesa.core :as p]
[taoensso.timbre :as log]
[temporal.internal.activity :as a]
[temporal.internal.utils :as u]
[temporal.internal.promise]) ;; needed for IPromise protocol extention
[temporal.internal.exceptions :as e]
[temporal.internal.promise] ;; needed for IPromise protocol extention
[temporal.internal.utils :as u])
(:import [io.temporal.workflow Workflow]
[io.temporal.activity Activity]))

Expand All @@ -23,7 +22,7 @@ Arguments:
[details]
(let [ctx (Activity/getExecutionContext)]
(log/trace "heartbeat:" details)
(.heartbeat ctx (nippy/freeze details))))
(.heartbeat ctx details)))

(defn get-heartbeat-details
"
Expand All @@ -36,11 +35,11 @@ along with the Activity Task for the next retry attempt and can be extracted by
[]

(let [ctx (Activity/getExecutionContext)
details (.getHeartbeatDetails ctx u/bytes-type)]
(let [v (when (.isPresent details)
(nippy/thaw (.get details)))]
(log/trace "get-heartbeat-details:" v)
v)))
details (.getHeartbeatDetails ctx u/bytes-type)
v (when (.isPresent details)
(.get details))]
(log/trace "get-heartbeat-details:" v)
v))

(defn get-info
"Returns information about the Activity execution"
Expand Down
60 changes: 34 additions & 26 deletions src/temporal/client/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@

(ns temporal.client.core
"Methods for client interaction with Temporal"
(:require [taoensso.timbre :as log]
[taoensso.nippy :as nippy]
[promesa.core :as p]
(:refer-clojure :exclude [update])
(:require [promesa.core :as p]
[taoensso.timbre :as log]
[temporal.client.options :as copts]
[temporal.internal.workflow :as w]
[temporal.internal.exceptions :as e]
[temporal.internal.utils :as u]
[temporal.internal.exceptions :as e])
(:import [java.time Duration]
[io.temporal.client WorkflowClient WorkflowStub WorkflowUpdateHandle WorkflowUpdateStage UpdateOptions]))
[temporal.internal.workflow :as w])
(:import [io.temporal.client
UpdateOptions
WorkflowClient
WorkflowStub
WorkflowUpdateHandle
WorkflowUpdateStage]
[java.time Duration]))

(defn- ^:no-doc create-client*
[service-stub options]
Expand Down Expand Up @@ -93,18 +98,22 @@ Workflow ID Conflict Policies (`:workflow-id-conflict-policy`):
(let [stub (.newUntypedWorkflowStub client (u/namify workflow-id))]
(log/trace "create-workflow id:" workflow-id)
{:client client :stub stub}))

([^WorkflowClient client workflow options]
(let [wf-name (w/get-annotated-name workflow)
options (w/wf-options-> options)
stub (.newUntypedWorkflowStub client wf-name options)]
(log/trace "create-workflow:" wf-name options)
{:client client :stub stub})))

(defn encode [^WorkflowClient client value]
(.. client (getOptions) (getDataConverter) (toPayloads (into-array Object value))))

(defn start
"
Starts 'worklow' with 'params'"
[{:keys [^WorkflowStub stub] :as workflow} params]
(log/trace "start:" params)
[{:keys [^WorkflowStub stub ^WorkflowClient client] :as _workflow} params]
(log/trace "start:" params "client:" client)
(.start stub (u/->objarray params)))

(defn signal-with-start
Expand All @@ -128,7 +137,7 @@ set `:workflow-id-conflict-policy` to `:use-existing` when calling [[create-work
(signal-with-start w :my-signal {:data \"value\"} {:initial \"args\"}))
```
"
[{:keys [^WorkflowStub stub] :as workflow} signal-name signal-params wf-params]
[{:keys [^WorkflowStub stub] :as _workflow} signal-name signal-params wf-params]
(log/trace "signal-with-start->" "signal:" signal-name signal-params "workflow-params:" wf-params)
(.signalWithStart stub (u/namify signal-name) (u/->objarray signal-params) (u/->objarray wf-params)))

Expand All @@ -140,7 +149,7 @@ Sends 'params' as a signal 'signal-name' to 'workflow'
(>! workflow ::my-signal {:msg \"Hi\"})
```
"
[{:keys [^WorkflowStub stub] :as workflow} signal-name params]
[{:keys [^WorkflowStub stub] :as _workflow} signal-name params]
(log/trace ">!" signal-name params)
(.signal stub (u/namify signal-name) (u/->objarray params)))

Expand All @@ -159,9 +168,10 @@ defworkflow once the workflow concludes.
@(get-result w))
```
"
[{:keys [^WorkflowStub stub] :as workflow}]
(-> (.getResultAsync stub u/bytes-type)
(p/then nippy/thaw)
[{:keys [^WorkflowStub stub ^WorkflowClient client] :as _workflow}]
(log/trace "get-result client:" client "stub:" stub)
(-> (.getResultAsync stub Object)
(p/then identity)
(p/catch e/slingshot? e/recast-stone)
(p/catch (fn [e]
(log/error e)
Expand All @@ -181,9 +191,8 @@ Arguments:
(query workflow ::my-query {:foo \"bar\"})
```
"
[{:keys [^WorkflowStub stub] :as workflow} query-type args]
(-> (.query stub (u/namify query-type) u/bytes-type (u/->objarray args))
(nippy/thaw)))
[{:keys [^WorkflowStub stub] :as _workflow} query-type args]
(-> (.query stub (u/namify query-type) u/bytes-type (u/->objarray args))))

(defn update
"
Expand All @@ -205,10 +214,9 @@ Arguments:
(update workflow :increment {:amount 5})
```
"
[{:keys [^WorkflowStub stub] :as workflow} update-type args]
[{:keys [^WorkflowStub stub] :as _workflow} update-type args]
(log/trace "update:" update-type args)
(-> (.update stub (u/namify update-type) u/bytes-type (u/->objarray args))
(nippy/thaw)))
(-> (.update stub (u/namify update-type) u/bytes-type (u/->objarray args))))

(def ^:private stage->enum
"Maps keyword stage to WorkflowUpdateStage enum"
Expand All @@ -222,7 +230,7 @@ Arguments:
{:id (.getId handle)
:execution (.getExecution handle)
:result (-> (.getResultAsync handle)
(p/then nippy/thaw)
(p/then identity)
(p/catch e/slingshot? e/recast-stone)
(p/catch (fn [e]
(log/error e)
Expand Down Expand Up @@ -271,7 +279,7 @@ Returns a map with:
"
([workflow update-type args]
(start-update workflow update-type args {}))
([{:keys [^WorkflowStub stub] :as workflow} update-type args {:keys [wait-for-stage update-id] :as options}]
([{:keys [^WorkflowStub stub] :as _workflow} update-type args {:keys [wait-for-stage update-id] :as options}]
(log/trace "start-update:" update-type args options)
(let [handle (if update-id
;; Use UpdateOptions when custom update-id is specified
Expand Down Expand Up @@ -302,7 +310,7 @@ Returns a map with:
@(:result handle))
```
"
[{:keys [^WorkflowStub stub] :as workflow} update-id]
[{:keys [^WorkflowStub stub] :as _workflow} update-id]
(log/trace "get-update-handle:" update-id)
(let [handle (.getUpdateHandle stub update-id u/bytes-type)]
(wrap-update-handle handle)))
Expand Down Expand Up @@ -333,7 +341,7 @@ Returns a map with:
"
([workflow update-type update-args start-args]
(update-with-start workflow update-type update-args start-args {}))
([{:keys [^WorkflowStub stub] :as workflow} update-type update-args start-args options]
([{:keys [^WorkflowStub stub] :as _workflow} update-type update-args start-args options]
(log/trace "update-with-start:" update-type update-args start-args options)
(let [update-options (build-update-options (merge {:update-name update-type
:wait-for-stage (or (:wait-for-stage options) :accepted)}
Expand All @@ -349,7 +357,7 @@ Gracefully cancels 'workflow'
(cancel workflow)
```
"
[{:keys [^WorkflowStub stub] :as workflow}]
[{:keys [^WorkflowStub stub] :as _workflow}]
(.cancel stub))

(defn terminate
Expand All @@ -360,5 +368,5 @@ Forcefully terminates 'workflow'
(terminate workflow \"unresponsive\", {})
```
"
[{:keys [^WorkflowStub stub] :as workflow} reason params]
[{:keys [^WorkflowStub stub] :as _workflow} reason params]
(.terminate stub reason (u/->objarray params)))
30 changes: 22 additions & 8 deletions src/temporal/client/options.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
(ns temporal.client.options
(:require [temporal.internal.utils :as u])
(:import [io.temporal.client WorkflowClientOptions WorkflowClientOptions$Builder]
[io.temporal.common.interceptors WorkflowClientInterceptorBase]
[io.temporal.client.schedules ScheduleClientOptions ScheduleClientOptions$Builder]
[io.temporal.serviceclient WorkflowServiceStubs WorkflowServiceStubsOptions WorkflowServiceStubsOptions$Builder]
[io.temporal.authorization AuthorizationTokenSupplier]))
(:require
[temporal.converter.default :as default-data-converter]
[temporal.internal.utils :as u])
(:import
[io.temporal.authorization AuthorizationTokenSupplier]
[io.temporal.client WorkflowClientOptions WorkflowClientOptions$Builder]
[io.temporal.client.schedules ScheduleClientOptions ScheduleClientOptions$Builder]
[io.temporal.common.interceptors WorkflowClientInterceptorBase]
[io.temporal.serviceclient WorkflowServiceStubs WorkflowServiceStubsOptions WorkflowServiceStubsOptions$Builder]))

(defn assoc-default-data-converter [{:keys [data-converter] :as params}]
(cond-> params
(not data-converter)
(assoc :data-converter (default-data-converter/create))))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not a singleton instance via a plain def? Why recreate it every call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about it, but I followed the Java SDK logic there. I can definitely create that, I can also create a namespace called temporal.converter and create it there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this shouldn't matter much since this function won't be called often.


(def workflow-client-options
"
Expand All @@ -25,7 +33,10 @@

(defn ^:no-doc workflow-client-options->
^WorkflowClientOptions [params]
(u/build (WorkflowClientOptions/newBuilder (WorkflowClientOptions/getDefaultInstance)) workflow-client-options params))
(->> params
(assoc-default-data-converter)
(u/build (WorkflowClientOptions/newBuilder (WorkflowClientOptions/getDefaultInstance))
workflow-client-options)))

(def schedule-client-options
"
Expand All @@ -45,7 +56,10 @@

(defn ^:no-doc schedule-client-options->
^ScheduleClientOptions [params]
(u/build (ScheduleClientOptions/newBuilder (ScheduleClientOptions/getDefaultInstance)) schedule-client-options params))
(->> params
(assoc-default-data-converter)
(u/build (ScheduleClientOptions/newBuilder (ScheduleClientOptions/getDefaultInstance))
schedule-client-options)))

(defn ^:no-doc apikey-auth-fn->
^AuthorizationTokenSupplier [f]
Expand Down
14 changes: 7 additions & 7 deletions src/temporal/client/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
"Methods for managing a Temporal worker instance"
(:require [taoensso.timbre :as log]
[temporal.internal.activity :as a]
[temporal.internal.workflow :as w]
[temporal.internal.utils :as u])
(:import [io.temporal.worker Worker WorkerFactory WorkerFactoryOptions WorkerFactoryOptions$Builder WorkerOptions WorkerOptions$Builder]
[temporal.internal.utils :as u]
[temporal.internal.workflow :as w])
(:import [io.temporal.common.interceptors WorkerInterceptor]
[io.temporal.worker Worker WorkerFactory WorkerFactoryOptions WorkerFactoryOptions$Builder WorkerOptions WorkerOptions$Builder]
[io.temporal.worker.tuning PollerBehavior PollerBehaviorAutoscaling]
[temporal.internal.dispatcher DynamicWorkflowProxy]
[io.temporal.workflow DynamicWorkflow]
[io.temporal.common.interceptors WorkerInterceptor]))
[temporal.internal.dispatcher DynamicWorkflowProxy]))

(defn ^:no-doc init
(defn ^:no-doc init
"
Initializes a worker instance, suitable for real connections or unit-testing with temporal.testing.env
"
Expand Down Expand Up @@ -184,5 +184,5 @@ Arguments:
(stop instance))
```
"
[{:keys [^WorkerFactory factory] :as instance}]
[{:keys [^WorkerFactory factory] :as _instance}]
(.shutdown factory))
Loading