From 228e12fb6206f5090f0bee28d1e5192234fccf4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 15:32:26 +0100 Subject: [PATCH 01/14] Add custom data-converter implementations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- project.clj | 1 + src/temporal/converter/byte_string.clj | 27 +++++ src/temporal/converter/default.clj | 19 ++++ src/temporal/converter/json.clj | 40 ++++++++ src/temporal/converter/metadata.clj | 37 +++++++ src/temporal/converter/nippy.clj | 27 +++++ src/temporal/converter/optional.clj | 18 ++++ src/temporal/converter/payload.clj | 17 ++++ test/temporal/converter/byte_string_test.clj | 26 +++++ test/temporal/converter/default_test.clj | 102 +++++++++++++++++++ test/temporal/converter/json_test.clj | 51 ++++++++++ test/temporal/converter/metadata_test.clj | 22 ++++ test/temporal/converter/nippy_test.clj | 53 ++++++++++ test/temporal/converter/optional_test.clj | 39 +++++++ test/temporal/converter/payload_test.clj | 15 +++ 15 files changed, 494 insertions(+) create mode 100644 src/temporal/converter/byte_string.clj create mode 100644 src/temporal/converter/default.clj create mode 100644 src/temporal/converter/json.clj create mode 100644 src/temporal/converter/metadata.clj create mode 100644 src/temporal/converter/nippy.clj create mode 100644 src/temporal/converter/optional.clj create mode 100644 src/temporal/converter/payload.clj create mode 100644 test/temporal/converter/byte_string_test.clj create mode 100644 test/temporal/converter/default_test.clj create mode 100644 test/temporal/converter/json_test.clj create mode 100644 test/temporal/converter/metadata_test.clj create mode 100644 test/temporal/converter/nippy_test.clj create mode 100644 test/temporal/converter/optional_test.clj create mode 100644 test/temporal/converter/payload_test.clj diff --git a/project.clj b/project.clj index 20d7212..80a7f29 100644 --- a/project.clj +++ b/project.clj @@ -18,6 +18,7 @@ [com.taoensso/timbre "6.6.1"] [com.taoensso/nippy "3.4.2"] [funcool/promesa "11.0.678"] + [metosin/jsonista "0.3.13"] [medley "1.4.0"] [slingshot "0.12.2"]] :repl-options {:init-ns user} diff --git a/src/temporal/converter/byte_string.clj b/src/temporal/converter/byte_string.clj new file mode 100644 index 0000000..dba14ef --- /dev/null +++ b/src/temporal/converter/byte_string.clj @@ -0,0 +1,27 @@ +(ns temporal.converter.byte-string + (:import + [io.temporal.shaded.com.google.protobuf ByteString] + [java.nio.charset StandardCharsets])) + +(defprotocol ToByteString + (->byte-string ^ByteString [value])) + +(extend-protocol ToByteString + nil + (->byte-string [_] + ByteString/EMPTY) + + byte/1 + (->byte-string [value] + (ByteString/copyFrom value)) + + String + (->byte-string [value] + (ByteString/copyFrom value StandardCharsets/UTF_8)) + + ByteString + (->byte-string [value] + value)) + +(defn byte-string? [input] + (instance? ByteString input)) diff --git a/src/temporal/converter/default.clj b/src/temporal/converter/default.clj new file mode 100644 index 0000000..a3b88aa --- /dev/null +++ b/src/temporal/converter/default.clj @@ -0,0 +1,19 @@ +(ns temporal.converter.default + (:require + [temporal.converter.json :as json] + [temporal.converter.nippy :as nippy]) + (:import + [io.temporal.common.converter DefaultDataConverter NullPayloadConverter PayloadConverter])) + +(def standard-converters + [(NullPayloadConverter.) + (nippy/create) + (json/create)]) + +(defn create + (^DefaultDataConverter [] + (create standard-converters)) + + (^DefaultDataConverter [converters] + (DefaultDataConverter. + (into-array PayloadConverter converters)))) diff --git a/src/temporal/converter/json.clj b/src/temporal/converter/json.clj new file mode 100644 index 0000000..3cc28a6 --- /dev/null +++ b/src/temporal/converter/json.clj @@ -0,0 +1,40 @@ +(ns temporal.converter.json + (:require + [jsonista.core :as json] + [temporal.converter.metadata :refer [->metadata]] + [temporal.converter.optional :refer [->optional]] + [temporal.converter.payload :refer [->payload]]) + (:import + [io.temporal.api.common.v1 Payload] + [io.temporal.common.converter PayloadConverter] + [java.lang.reflect Type])) + +(def ^String json-plain + "json/plain") + +(def metadata + (->metadata json-plain)) + +(def default-object-mapper + json/keyword-keys-object-mapper) + +(defn create ^PayloadConverter + ([] + (create default-object-mapper)) + + ([object-mapper] + (reify PayloadConverter + (getEncodingType [_] + json-plain) + + (toData [_ value] + (-> value + (json/write-value-as-bytes object-mapper) + (->payload metadata) + (->optional))) + + (fromData [_ ^Payload content ^Class _value-class ^Type _value-type] + (-> content + (.getData) + (.toByteArray) + (json/read-value object-mapper)))))) diff --git a/src/temporal/converter/metadata.clj b/src/temporal/converter/metadata.clj new file mode 100644 index 0000000..63ada90 --- /dev/null +++ b/src/temporal/converter/metadata.clj @@ -0,0 +1,37 @@ +(ns temporal.converter.metadata + (:require + [temporal.converter.byte-string :refer [->byte-string byte-string?]]) + (:import + [clojure.lang MapEntry] + [io.temporal.common.converter EncodingKeys] + [java.util Map])) + +(def empty {}) + +(defn metadata-entry? [input] + (and (instance? MapEntry input) + (instance? String (key input)) + (byte-string? (val input)))) + +(defn metadata? [input] + (and (instance? Map input) + (every? metadata-entry? input))) + +(defprotocol ToMetadata + (->metadata ^Map [value])) + +(extend-protocol ToMetadata + nil + (->metadata [_] + empty) + + String + (->metadata [encoding] + (->metadata {EncodingKeys/METADATA_ENCODING_KEY encoding})) + + Map + (->metadata [map] + (reduce-kv (fn [m k v] + (assoc m (str k) (->byte-string v))) + {} + map))) diff --git a/src/temporal/converter/nippy.clj b/src/temporal/converter/nippy.clj new file mode 100644 index 0000000..7255a8d --- /dev/null +++ b/src/temporal/converter/nippy.clj @@ -0,0 +1,27 @@ +(ns temporal.converter.nippy + (:require + [taoensso.nippy :as nippy] + [temporal.converter.metadata :refer [->metadata]] + [temporal.converter.optional :refer [->optional]] + [temporal.converter.payload :refer [->payload]]) + (:import + [io.temporal.api.common.v1 Payload] + [io.temporal.common.converter PayloadConverter] + [java.lang.reflect Type])) + +(def ^String binary-plain + "binary/plain") + +(def metadata + (->metadata binary-plain)) + +(defn create ^PayloadConverter [] + (reify PayloadConverter + (getEncodingType [_] + binary-plain) + + (toData [_ value] + (-> value (nippy/freeze) (->payload metadata) (->optional))) + + (fromData [_ ^Payload content ^Class _value-class ^Type _value-type] + (-> content (.getData) (.toByteArray) (nippy/thaw))))) diff --git a/src/temporal/converter/optional.clj b/src/temporal/converter/optional.clj new file mode 100644 index 0000000..5e99ecc --- /dev/null +++ b/src/temporal/converter/optional.clj @@ -0,0 +1,18 @@ +(ns temporal.converter.optional + (:import + [java.util Optional])) + +(def empty + (Optional/empty)) + +(defn optional? [value] + (instance? Optional value)) + +;; NOTE: do we need this? +(defn present? [value] + (and (optional? value) (.isPresent ^Optional value))) + +(defn ->optional [value] + (if value + (Optional/of value) + empty)) diff --git a/src/temporal/converter/payload.clj b/src/temporal/converter/payload.clj new file mode 100644 index 0000000..8dd1c43 --- /dev/null +++ b/src/temporal/converter/payload.clj @@ -0,0 +1,17 @@ +(ns temporal.converter.payload + (:require + [temporal.converter.byte-string :refer [->byte-string]]) + (:import + [io.temporal.api.common.v1 Payload])) + +(def empty-metadata {}) + +(defn ->payload + (^Payload [value] + (->payload value empty-metadata)) + + (^Payload [value metadata] + (.. (Payload/newBuilder) + (putAllMetadata metadata) + (setData (->byte-string value)) + (build)))) diff --git a/test/temporal/converter/byte_string_test.clj b/test/temporal/converter/byte_string_test.clj new file mode 100644 index 0000000..a97d852 --- /dev/null +++ b/test/temporal/converter/byte_string_test.clj @@ -0,0 +1,26 @@ +(ns temporal.converter.byte-string-test + (:require + [clojure.test :refer [are deftest]] + [temporal.converter.byte-string :as sut]) + (:import + [io.temporal.shaded.com.google.protobuf ByteString] + [java.nio.charset StandardCharsets])) + +(deftest ->byte-string + (are [input] (->> input (sut/->byte-string) (instance? ByteString)) + nil + "Something" + (byte-array 10) + ByteString/EMPTY)) + +(deftest byte-string? + (are [input] (sut/byte-string? input) + ByteString/EMPTY + (ByteString/copyFrom "" StandardCharsets/UTF_8) + (ByteString/copyFrom (byte-array 0)) + (ByteString/copyFrom (byte-array 10))) + + (are [input] (not (sut/byte-string? input)) + nil + "" + (byte-array 0))) diff --git a/test/temporal/converter/default_test.clj b/test/temporal/converter/default_test.clj new file mode 100644 index 0000000..2e8626d --- /dev/null +++ b/test/temporal/converter/default_test.clj @@ -0,0 +1,102 @@ +(ns temporal.converter.default-test + (:require + [clojure.test :refer [are deftest is]] + [taoensso.nippy :as nippy] + [temporal.converter.default :as sut] + [temporal.converter.metadata :refer [->metadata]] + [temporal.converter.payload :refer [->payload]]) + (:import + [io.temporal.api.common.v1 Payload] + [io.temporal.common.converter DataConverterException EncodingKeys NullPayloadConverter])) + +(defn payload->encoding [^Payload payload] + (.. payload + (getMetadataOrThrow EncodingKeys/METADATA_ENCODING_KEY) + (toStringUtf8))) + +(defn null-payload? [^Payload payload] + (and (= "binary/null" (payload->encoding payload)) + (.. payload (getData) (isEmpty)))) + +(defn nippy-payload? [expected] + (fn [^Payload payload] + (and (= "binary/plain" (payload->encoding payload)) + (= expected (-> payload + (.. (getData) (toByteArray)) + nippy/thaw))))) + +(deftest to-payload-with-default-converters + (let [data-converter (sut/create)] + (are [predicate input-value] (-> data-converter + (.toPayload input-value) + (.get) + (predicate)) + null-payload? nil + (nippy-payload? :something) :something + (nippy-payload? "Something") "Something" + (nippy-payload? [1 2 3 4]) [1 2 3 4] + (nippy-payload? {:foo "bar"}) {:foo "bar"}))) + +(deftest to-payload-with-custom-converters + (let [data-converter (sut/create [(NullPayloadConverter.)])] + (is (null-payload? (.. data-converter (toPayload nil) (get)))) + + (are [input-value] (thrown-with-msg? DataConverterException + #"No PayloadConverter is registered with this DataConverter" + (.toPayload data-converter input-value)) + :something + "Something" + [1 2 3 4] + {:foo "bar"}))) + +(def metadata-null + (->metadata "binary/null")) + +(def metadata-plain + (->metadata "binary/plain")) + +(def metadata-json + (->metadata "json/plain")) + +(defn ->payload-json [input] + (->payload input metadata-json)) + +(defn ->payload-nippy [input] + (-> input (nippy/freeze) (->payload metadata-plain))) + +(deftest from-payload-with-default-converters + (are [expected payload] (-> (sut/create) + (.fromPayload payload Object Object) + (= expected)) + nil + (->payload nil metadata-null) + + nil + (->payload-nippy nil) + + :something + (->payload-nippy :something) + + 1234 + (->payload-nippy 1234) + + [1 2 3 4] + (->payload-nippy [1 2 3 4]) + + {:foo :bar} + (->payload-nippy {:foo :bar}) + + nil + (->payload-json "null") + + 1234 + (->payload-json "1234") + + (hash-map :foo "bar") + (->payload-json "{\"foo\":\"bar\"}") + + [1 2 3 4] + (->payload-json "[1,2,3,4]") + + "Raw JSON string" + (->payload-json "\"Raw JSON string\"\""))) diff --git a/test/temporal/converter/json_test.clj b/test/temporal/converter/json_test.clj new file mode 100644 index 0000000..d13817a --- /dev/null +++ b/test/temporal/converter/json_test.clj @@ -0,0 +1,51 @@ +(ns temporal.converter.json-test + (:require + [jsonista.core :as json] + [clojure.template :refer [do-template]] + [clojure.test :refer [deftest is testing]] + [temporal.converter.byte-string :refer [byte-string?]] + [temporal.converter.json :as sut] + [temporal.converter.payload :refer [->payload]]) + (:import + [io.temporal.api.common.v1 Payload] + [java.util Optional])) + +(def data-converter (sut/create)) + +(deftest getEncodingType + (is (= "json/plain" (.getEncodingType data-converter)))) + +(deftest toData + (do-template + [input-value] + + (testing (str "encoding: " (class input-value)) + (let [value (.toData data-converter input-value) + payload (.get value) + encoding (.getMetadataOrDefault payload "encoding" nil) + data (.getData payload) + bytes (.toByteArray data)] + (is (instance? Optional value)) + (is (.isPresent value)) + (is (instance? Payload payload)) + (is (byte-string? encoding)) + (is (= "json/plain" (.toStringUtf8 encoding))) + (is (byte-string? data)) + (is (bytes? bytes)) + (is (= input-value (json/read-value bytes json/keyword-keys-object-mapper))))) + nil + "string" + {:key "value"} + ["a" "b" "c"])) + +(deftest fromData + (do-template + [input-value] + + (testing (str "decoding: " (class input-value)) + (let [content (-> input-value (json/write-value-as-bytes) (->payload sut/metadata))] + (is (= input-value (.fromData data-converter content (class input-value) Object))))) + nil + "string" + {:key "value"} + ["a" 0 "b" 1 "c" 2])) diff --git a/test/temporal/converter/metadata_test.clj b/test/temporal/converter/metadata_test.clj new file mode 100644 index 0000000..761048a --- /dev/null +++ b/test/temporal/converter/metadata_test.clj @@ -0,0 +1,22 @@ +(ns temporal.converter.metadata-test + (:require + [clojure.test :refer [are deftest is]] + [temporal.converter.byte-string :refer [->byte-string]] + [temporal.converter.metadata :as sut])) + +(deftest ->metadata-test + (are [input] (is (sut/metadata? (sut/->metadata input))) + nil + {} + "binary/null")) + +(deftest metadata? + (are [input] (is (sut/metadata? input)) + {} + sut/empty + {"custom" (->byte-string "data")}) + + (are [input] (is (not (sut/metadata? input))) + nil + "Something" + {:foo :bar})) diff --git a/test/temporal/converter/nippy_test.clj b/test/temporal/converter/nippy_test.clj new file mode 100644 index 0000000..ba2d1bd --- /dev/null +++ b/test/temporal/converter/nippy_test.clj @@ -0,0 +1,53 @@ +(ns temporal.converter.nippy-test + (:require + [clojure.template :refer [do-template]] + [clojure.test :refer [deftest is testing]] + [taoensso.nippy :as nippy] + [temporal.converter.byte-string :refer [byte-string?]] + [temporal.converter.nippy :as sut] + [temporal.converter.payload :refer [->payload]]) + (:import + [io.temporal.api.common.v1 Payload] + [java.util Optional])) + +(def data-converter (sut/create)) + +(deftest getEncodingType + (is (= "binary/plain" (.getEncodingType data-converter)))) + +(deftest toData + (do-template + [input-value] + + (testing (str "encoding: " (class input-value)) + (let [value (.toData data-converter input-value) + payload (.get value) + encoding (.getMetadataOrDefault payload "encoding" nil) + data (.getData payload) + bytes (.toByteArray data)] + (is (instance? Optional value)) + (is (.isPresent value)) + (is (instance? Payload payload)) + (is (byte-string? encoding)) + (is (= "binary/plain" (.toStringUtf8 encoding))) + (is (byte-string? data)) + (is (bytes? bytes)) + (is (= input-value (nippy/thaw bytes))))) + nil + "string" + {:key "value"} + [:a :b :c] + #{:a :b :c})) + +(deftest fromData + (do-template + [input-value] + + (testing (str "decoding: " (class input-value)) + (let [content (-> input-value (nippy/freeze) (->payload sut/metadata))] + (is (= input-value (.fromData data-converter content (class input-value) Object))))) + nil + "string" + {:key "value"} + [:a 0 :b 1 :c 2] + #{:a :b :c})) diff --git a/test/temporal/converter/optional_test.clj b/test/temporal/converter/optional_test.clj new file mode 100644 index 0000000..67b08e7 --- /dev/null +++ b/test/temporal/converter/optional_test.clj @@ -0,0 +1,39 @@ +(ns temporal.converter.optional-test + (:require + [clojure.template :refer [do-template]] + [clojure.test :refer [are deftest is testing]] + [temporal.converter.optional :as sut]) + (:import + [java.util Optional])) + +(deftest ->optional-with-present-values + (do-template + [value] + + (testing (str "encoding: " (class value)) + (let [optional (sut/->optional value)] + (is (instance? Optional optional)) + (is (.isPresent optional)) + (is (= value (.get optional))))) + + "Something" + :foo + [:a :b :c])) + +(deftest ->optional-with-nil + (let [optional (sut/->optional nil)] + (is (instance? Optional optional)) + (is (not (.isPresent optional))))) + +(deftest present? + (are [value] (sut/present? value) + (Optional/of "test") + (Optional/of :something) + (Optional/of [:a :b :c]) + (sut/->optional "test") + (sut/->optional :something) + (sut/->optional [:a :b :c])) + + (are [value] (not (sut/present? value)) + (Optional/empty) + (sut/->optional nil))) diff --git a/test/temporal/converter/payload_test.clj b/test/temporal/converter/payload_test.clj new file mode 100644 index 0000000..d1a8cf2 --- /dev/null +++ b/test/temporal/converter/payload_test.clj @@ -0,0 +1,15 @@ +(ns temporal.converter.payload-test + (:require + [clojure.test :refer [deftest is]] + [temporal.converter.payload :as sut]) + (:import + [java.nio.charset StandardCharsets])) + +(deftest ->payload-without-metadata + (let [payload (sut/->payload (.getBytes "Something" StandardCharsets/UTF_8))] + (is (.. payload getMetadataMap isEmpty)))) + +(deftest ->payload-with-metadata + (let [payload (sut/->payload (.getBytes "Something" StandardCharsets/UTF_8) + {"something" "bar"})] + (is (not (.. payload getMetadataMap isEmpty))))) From 39ae2e82414e866d29c84ee66ec67898a11942f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 15:39:11 +0100 Subject: [PATCH 02/14] Fix obsolete function call MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- samples/mutex/src/temporal/sample/mutex/main.clj | 2 +- test/temporal/test/utils.clj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/mutex/src/temporal/sample/mutex/main.clj b/samples/mutex/src/temporal/sample/mutex/main.clj index 73673b4..2a19943 100644 --- a/samples/mutex/src/temporal/sample/mutex/main.clj +++ b/samples/mutex/src/temporal/sample/mutex/main.clj @@ -3,7 +3,7 @@ [temporal.sample.mutex.core :as core]) (:gen-class)) -(log/set-level! :info) +(log/set-min-level! :info) (defn -main [& args] diff --git a/test/temporal/test/utils.clj b/test/temporal/test/utils.clj index d4f83cb..c3e0d26 100644 --- a/test/temporal/test/utils.clj +++ b/test/temporal/test/utils.clj @@ -7,7 +7,7 @@ [temporal.client.core :as c]) (:import [java.time Duration])) -(log/set-level! :trace) +(log/set-min-level! :trace) ;;----------------------------------------------------------------------------- ;; Data From d53f2d2d0da7e792f22b53be4b7492f00e0866b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 15:42:44 +0100 Subject: [PATCH 03/14] Remove unnecessary and confusing atom usage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- dev-resources/utils.clj | 40 +++++++++++++--------------------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/dev-resources/utils.clj b/dev-resources/utils.clj index fed4261..dcd30e8 100644 --- a/dev-resources/utils.clj +++ b/dev-resources/utils.clj @@ -6,9 +6,6 @@ [temporal.workflow :as w :refer [defworkflow]]) (:import [java.time Duration])) -(def client (atom nil)) -(def current-worker-thread (atom nil)) - (def default-client-options {:target "localhost:7233" :namespace "default" :enable-https false}) @@ -37,35 +34,21 @@ @(w/invoke user-child-workflow args (merge default-workflow-options {:workflow-id "child-workflow"}))) (defn create-temporal-client - "Creates a new temporal client if the old one does not exist" ([] (create-temporal-client nil)) + ([options] - (when-not @client - (let [options (merge default-client-options options)] - (log/info "creating temporal client" options) - (reset! client (c/create-client options)))))) + (let [options (merge default-client-options options)] + (log/info "creating temporal client" options) + (c/create-client options)))) + +(defn create-temporal-worker + ([client] (create-temporal-worker client nil)) -(defn worker-loop - ([client] (worker-loop client nil)) ([client options] (let [options (merge default-worker-options options)] (log/info "starting temporal worker" options) (worker/start client options)))) -(defn create-temporal-worker - "Starts a new instance running on another daemon thread, - stops the current temporal worker and thread if they exist" - ([client] (create-temporal-worker client nil)) - ([client options] - (when (and @current-worker-thread (.isAlive @current-worker-thread)) - (.interrupt @current-worker-thread) - (reset! current-worker-thread nil)) - (let [thread (Thread. (partial worker-loop client options))] - (doto thread - (.setDaemon true) - (.start)) - (reset! current-worker-thread thread)))) - (defn execute-workflow ([client workflow arguments] (execute-workflow client workflow arguments nil)) ([client workflow arguments options] @@ -76,6 +59,9 @@ @(c/get-result workflow)))) (comment - (do (create-temporal-client) - (create-temporal-worker @client) - (execute-workflow @client user-parent-workflow {:names ["Hanna" "Bob" "Tracy" "Felix"]}))) + (let [client (create-temporal-client) + worker (create-temporal-worker client)] + (try + (execute-workflow client user-parent-workflow {:names ["Hanna" "Bob" "Tracy" "Felix"]}) + (finally + (worker/stop worker))))) From 522377dc8177cbc8199da289529eb2054aaaf353 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 23:00:49 +0100 Subject: [PATCH 04/14] Use the custom data-converter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- dev-resources/utils.clj | 12 ++++++-- src/temporal/activity.clj | 15 +++++----- src/temporal/client/core.clj | 41 ++++++++++++++++----------- src/temporal/client/options.clj | 30 ++++++++++++++------ src/temporal/client/worker.clj | 12 ++++---- src/temporal/codec.clj | 30 ++++++++++++-------- src/temporal/internal/activity.clj | 13 ++++----- src/temporal/internal/utils.clj | 16 +++++------ src/temporal/internal/workflow.clj | 5 ++-- src/temporal/side_effect.clj | 9 ++---- src/temporal/testing/env.clj | 9 ++++-- src/temporal/workflow.clj | 13 ++++----- test/temporal/client/options_test.clj | 27 ++++++++++++++++++ test/temporal/test/utils.clj | 41 ++++++++++++++++++--------- 14 files changed, 172 insertions(+), 101 deletions(-) create mode 100644 test/temporal/client/options_test.clj diff --git a/dev-resources/utils.clj b/dev-resources/utils.clj index dcd30e8..73d800d 100644 --- a/dev-resources/utils.clj +++ b/dev-resources/utils.clj @@ -3,8 +3,11 @@ [temporal.activity :as a :refer [defactivity]] [temporal.client.core :as c] [temporal.client.worker :as worker] + [temporal.converter.default :as default-data-converter] + [temporal.converter.json :as json] [temporal.workflow :as w :refer [defworkflow]]) - (:import [java.time Duration])) + (:import [io.temporal.common.converter NullPayloadConverter] + [java.time Duration])) (def default-client-options {:target "localhost:7233" :namespace "default" @@ -58,8 +61,13 @@ (c/start workflow arguments) @(c/get-result workflow)))) +(def data-converter + (default-data-converter/create + [(NullPayloadConverter.) + (json/create)])) + (comment - (let [client (create-temporal-client) + (let [client (create-temporal-client {:data-converter data-converter}) worker (create-temporal-worker client)] (try (execute-workflow client user-parent-workflow {:names ["Hanna" "Bob" "Tracy" "Felix"]}) diff --git a/src/temporal/activity.clj b/src/temporal/activity.clj index 0a57d55..e943354 100644 --- a/src/temporal/activity.clj +++ b/src/temporal/activity.clj @@ -2,13 +2,12 @@ (ns temporal.activity "Methods for defining and invoking activity tasks" - (:require [taoensso.timbre :as log] - [taoensso.nippy :as nippy] - [promesa.core :as p] - [temporal.internal.exceptions :as e] + (:require [promesa.core :as p] + [taoensso.timbre :as log] [temporal.internal.activity :as a] - [temporal.internal.utils :as u] - [temporal.internal.promise]) ;; needed for IPromise protocol extention + [temporal.internal.exceptions :as e] + [temporal.internal.promise] ;; needed for IPromise protocol extention + [temporal.internal.utils :as u]) (:import [io.temporal.workflow Workflow] [io.temporal.activity Activity])) @@ -23,7 +22,7 @@ Arguments: [details] (let [ctx (Activity/getExecutionContext)] (log/trace "heartbeat:" details) - (.heartbeat ctx (nippy/freeze details)))) + (.heartbeat ctx details))) (defn get-heartbeat-details " @@ -38,7 +37,7 @@ along with the Activity Task for the next retry attempt and can be extracted by (let [ctx (Activity/getExecutionContext) details (.getHeartbeatDetails ctx u/bytes-type)] (let [v (when (.isPresent details) - (nippy/thaw (.get details)))] + (.get details))] (log/trace "get-heartbeat-details:" v) v))) diff --git a/src/temporal/client/core.clj b/src/temporal/client/core.clj index f57fe6a..a18d02d 100644 --- a/src/temporal/client/core.clj +++ b/src/temporal/client/core.clj @@ -2,15 +2,19 @@ (ns temporal.client.core "Methods for client interaction with Temporal" - (:require [taoensso.timbre :as log] - [taoensso.nippy :as nippy] - [promesa.core :as p] + (:require [promesa.core :as p] + [taoensso.timbre :as log] [temporal.client.options :as copts] - [temporal.internal.workflow :as w] + [temporal.internal.exceptions :as e] [temporal.internal.utils :as u] - [temporal.internal.exceptions :as e]) - (:import [java.time Duration] - [io.temporal.client WorkflowClient WorkflowStub WorkflowUpdateHandle WorkflowUpdateStage UpdateOptions])) + [temporal.internal.workflow :as w]) + (:import [io.temporal.client + UpdateOptions + WorkflowClient + WorkflowStub + WorkflowUpdateHandle + WorkflowUpdateStage] + [java.time Duration])) (defn- ^:no-doc create-client* [service-stub options] @@ -93,6 +97,7 @@ Workflow ID Conflict Policies (`:workflow-id-conflict-policy`): (let [stub (.newUntypedWorkflowStub client (u/namify workflow-id))] (log/trace "create-workflow id:" workflow-id) {:client client :stub stub})) + ([^WorkflowClient client workflow options] (let [wf-name (w/get-annotated-name workflow) options (w/wf-options-> options) @@ -100,11 +105,14 @@ Workflow ID Conflict Policies (`:workflow-id-conflict-policy`): (log/trace "create-workflow:" wf-name options) {:client client :stub stub}))) +(defn encode [^WorkflowClient client value] + (.. client (getOptions) (getDataConverter) (toPayloads (into-array Object value)))) + (defn start " Starts 'worklow' with 'params'" - [{:keys [^WorkflowStub stub] :as workflow} params] - (log/trace "start:" params) + [{:keys [^WorkflowStub stub ^WorkflowClient client] :as workflow} params] + (log/trace "start:" params "client:" client) (.start stub (u/->objarray params))) (defn signal-with-start @@ -159,9 +167,10 @@ defworkflow once the workflow concludes. @(get-result w)) ``` " - [{:keys [^WorkflowStub stub] :as workflow}] - (-> (.getResultAsync stub u/bytes-type) - (p/then nippy/thaw) + [{:keys [^WorkflowStub stub ^WorkflowClient client] :as workflow}] + (log/trace "get-result client:" client "stub:" stub) + (-> (.getResultAsync stub Object) + (p/then identity) (p/catch e/slingshot? e/recast-stone) (p/catch (fn [e] (log/error e) @@ -182,8 +191,7 @@ Arguments: ``` " [{:keys [^WorkflowStub stub] :as workflow} query-type args] - (-> (.query stub (u/namify query-type) u/bytes-type (u/->objarray args)) - (nippy/thaw))) + (-> (.query stub (u/namify query-type) u/bytes-type (u/->objarray args)))) (defn update " @@ -207,8 +215,7 @@ Arguments: " [{:keys [^WorkflowStub stub] :as workflow} update-type args] (log/trace "update:" update-type args) - (-> (.update stub (u/namify update-type) u/bytes-type (u/->objarray args)) - (nippy/thaw))) + (-> (.update stub (u/namify update-type) u/bytes-type (u/->objarray args)))) (def ^:private stage->enum "Maps keyword stage to WorkflowUpdateStage enum" @@ -222,7 +229,7 @@ Arguments: {:id (.getId handle) :execution (.getExecution handle) :result (-> (.getResultAsync handle) - (p/then nippy/thaw) + (p/then identity) (p/catch e/slingshot? e/recast-stone) (p/catch (fn [e] (log/error e) diff --git a/src/temporal/client/options.clj b/src/temporal/client/options.clj index cc757f9..a769231 100644 --- a/src/temporal/client/options.clj +++ b/src/temporal/client/options.clj @@ -1,10 +1,18 @@ (ns temporal.client.options - (:require [temporal.internal.utils :as u]) - (:import [io.temporal.client WorkflowClientOptions WorkflowClientOptions$Builder] - [io.temporal.common.interceptors WorkflowClientInterceptorBase] - [io.temporal.client.schedules ScheduleClientOptions ScheduleClientOptions$Builder] - [io.temporal.serviceclient WorkflowServiceStubs WorkflowServiceStubsOptions WorkflowServiceStubsOptions$Builder] - [io.temporal.authorization AuthorizationTokenSupplier])) + (:require + [temporal.converter.default :as default-data-converter] + [temporal.internal.utils :as u]) + (:import + [io.temporal.authorization AuthorizationTokenSupplier] + [io.temporal.client WorkflowClientOptions WorkflowClientOptions$Builder] + [io.temporal.client.schedules ScheduleClientOptions ScheduleClientOptions$Builder] + [io.temporal.common.interceptors WorkflowClientInterceptorBase] + [io.temporal.serviceclient WorkflowServiceStubs WorkflowServiceStubsOptions WorkflowServiceStubsOptions$Builder])) + +(defn assoc-default-data-converter [{:keys [data-converter] :as params}] + (cond-> params + (not data-converter) + (assoc :data-converter (default-data-converter/create)))) (def workflow-client-options " @@ -25,7 +33,10 @@ (defn ^:no-doc workflow-client-options-> ^WorkflowClientOptions [params] - (u/build (WorkflowClientOptions/newBuilder (WorkflowClientOptions/getDefaultInstance)) workflow-client-options params)) + (->> params + (assoc-default-data-converter) + (u/build (WorkflowClientOptions/newBuilder (WorkflowClientOptions/getDefaultInstance)) + workflow-client-options))) (def schedule-client-options " @@ -45,7 +56,10 @@ (defn ^:no-doc schedule-client-options-> ^ScheduleClientOptions [params] - (u/build (ScheduleClientOptions/newBuilder (ScheduleClientOptions/getDefaultInstance)) schedule-client-options params)) + (->> params + (assoc-default-data-converter) + (u/build (ScheduleClientOptions/newBuilder (ScheduleClientOptions/getDefaultInstance)) + schedule-client-options))) (defn ^:no-doc apikey-auth-fn-> ^AuthorizationTokenSupplier [f] diff --git a/src/temporal/client/worker.clj b/src/temporal/client/worker.clj index 8a6ba81..50eace7 100644 --- a/src/temporal/client/worker.clj +++ b/src/temporal/client/worker.clj @@ -4,15 +4,15 @@ "Methods for managing a Temporal worker instance" (:require [taoensso.timbre :as log] [temporal.internal.activity :as a] - [temporal.internal.workflow :as w] - [temporal.internal.utils :as u]) - (:import [io.temporal.worker Worker WorkerFactory WorkerFactoryOptions WorkerFactoryOptions$Builder WorkerOptions WorkerOptions$Builder] + [temporal.internal.utils :as u] + [temporal.internal.workflow :as w]) + (:import [io.temporal.common.interceptors WorkerInterceptor] + [io.temporal.worker Worker WorkerFactory WorkerFactoryOptions WorkerFactoryOptions$Builder WorkerOptions WorkerOptions$Builder] [io.temporal.worker.tuning PollerBehavior PollerBehaviorAutoscaling] - [temporal.internal.dispatcher DynamicWorkflowProxy] [io.temporal.workflow DynamicWorkflow] - [io.temporal.common.interceptors WorkerInterceptor])) + [temporal.internal.dispatcher DynamicWorkflowProxy])) -(defn ^:no-doc init +(defn ^:no-doc init " Initializes a worker instance, suitable for real connections or unit-testing with temporal.testing.env " diff --git a/src/temporal/codec.clj b/src/temporal/codec.clj index b8cc255..f5fcdd8 100644 --- a/src/temporal/codec.clj +++ b/src/temporal/codec.clj @@ -4,10 +4,11 @@ "Methods for managing codecs between a client and the Temporal backend" (:require [clojure.core.protocols :as p] [clojure.datafy :as d] - [medley.core :as m]) - (:import [io.temporal.common.converter DefaultDataConverter CodecDataConverter] + [medley.core :as m] + [temporal.converter.default :as default-data-converter]) + (:import [io.temporal.api.common.v1 Payload] + [io.temporal.common.converter CodecDataConverter] [io.temporal.payload.codec PayloadCodec] - [io.temporal.api.common.v1 Payload] [io.temporal.shaded.com.google.protobuf ByteString] [java.util Collections])) @@ -59,12 +60,17 @@ (defn create "Creates an instance of a [DataConverter](https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/converter/DataConverter.html) that accepts a [[Codec]]" - ^CodecDataConverter [codec] - (CodecDataConverter. - (DefaultDataConverter/newDefaultInstance) - (Collections/singletonList - (reify PayloadCodec - (encode [_ payloads] - (-encode codec payloads)) - (decode [_ payloads] - (-decode codec payloads)))))) + ^CodecDataConverter + ([codec] + (create (default-data-converter/create) codec)) + + ^CodecDataConverter + ([data-converter codec] + (CodecDataConverter. + data-converter + (Collections/singletonList + (reify PayloadCodec + (encode [_ payloads] + (-encode codec payloads)) + (decode [_ payloads] + (-decode codec payloads))))))) diff --git a/src/temporal/internal/activity.clj b/src/temporal/internal/activity.clj index 9e9bb4f..63065cd 100644 --- a/src/temporal/internal/activity.clj +++ b/src/temporal/internal/activity.clj @@ -1,15 +1,14 @@ ;; Copyright © Manetu, Inc. All rights reserved (ns ^:no-doc temporal.internal.activity - (:require [clojure.core.protocols :as p] + (:require [clojure.core.async :refer [ (fn [_ r] (type r))) diff --git a/src/temporal/internal/utils.clj b/src/temporal/internal/utils.clj index 002f4e6..f0f5a9f 100644 --- a/src/temporal/internal/utils.clj +++ b/src/temporal/internal/utils.clj @@ -3,8 +3,7 @@ (ns ^:no-doc temporal.internal.utils (:require [clojure.string :as string] [medley.core :as m] - [taoensso.timbre :as log] - [taoensso.nippy :as nippy]) + [taoensso.timbre :as log]) (:import [io.temporal.common.converter EncodedValues] [io.temporal.workflow Functions$Func Functions$Func1 @@ -17,6 +16,7 @@ (def ^Class bytes-type (Class/forName "[B")) (defn build [builder spec params] + (log/trace "building" builder "with" params) (try (doseq [[key value] params] (if-let [f (get spec key)] @@ -105,12 +105,13 @@ (defn ->objarray "Serializes x to an array of Objects, suitable for many Temporal APIs" [x] - (into-array Object [(nippy/freeze x)])) + (into-array Object [x])) (defn ->args "Decodes EncodedValues to native clojure data type. Assumes all data is in the first element" [^EncodedValues args] - (nippy/thaw (.get args (int 0) bytes-type))) + (log/trace "args:" args) + (.get args (int 0) bytes-type)) (def namify "Converts strings or keywords to strings, preserving fully qualified keywords when applicable" @@ -120,10 +121,9 @@ (defn complete-invoke [stub result] - (log/trace stub "completed with" (count result) "bytes") - (let [r (nippy/thaw result)] - (log/trace stub "results:" r) - r)) + (log/trace stub "completed with" result) + + result) (defn ->Func [f] diff --git a/src/temporal/internal/workflow.clj b/src/temporal/internal/workflow.clj index 50b4b72..97f8ea2 100644 --- a/src/temporal/internal/workflow.clj +++ b/src/temporal/internal/workflow.clj @@ -4,13 +4,12 @@ (:require [clojure.core.protocols :as p] [clojure.datafy :as d] [slingshot.slingshot :refer [try+]] - [taoensso.nippy :as nippy] [taoensso.timbre :as log] [temporal.common :as common] [temporal.internal.exceptions :as e] [temporal.internal.signals :as s] [temporal.internal.utils :as u]) - (:import [io.temporal.api.enums.v1 WorkflowIdReusePolicy WorkflowIdConflictPolicy] + (:import [io.temporal.api.enums.v1 WorkflowIdConflictPolicy WorkflowIdReusePolicy] [io.temporal.client WorkflowOptions WorkflowOptions$Builder] [io.temporal.internal.sync DestroyWorkflowThreadError] [io.temporal.workflow Workflow WorkflowInfo])) @@ -105,7 +104,7 @@ (f ctx {:args a :signals (s/create-signal-chan)}) (f a))] (log/trace workflow-id "result:" r) - (nippy/freeze r)) + r) (catch DestroyWorkflowThreadError ex (log/debug workflow-id "thread evicted") (throw ex)) diff --git a/src/temporal/side_effect.clj b/src/temporal/side_effect.clj index a4c984f..2912d93 100644 --- a/src/temporal/side_effect.clj +++ b/src/temporal/side_effect.clj @@ -2,8 +2,7 @@ (ns temporal.side-effect "Methods for managing side-effects from within workflows" - (:require [taoensso.nippy :as nippy] - [temporal.internal.utils :refer [->Func] :as u]) + (:require [temporal.internal.utils :refer [->Func] :as u]) (:import [io.temporal.workflow Workflow] [java.time Instant])) @@ -15,11 +14,9 @@ (defn invoke "Invokes 'f' via a Temporal [SideEffect](https://docs.temporal.io/concepts/what-is-a-side-effect/)" [f] - (nippy/thaw - (Workflow/sideEffect u/bytes-type - (->Func (fn [] (nippy/freeze (f))))))) + (Workflow/sideEffect u/bytes-type (->Func f))) (defn now "Returns the java.time.Instant as a SideEffect" [] - (invoke #(Instant/now))) \ No newline at end of file + (invoke #(Instant/now))) diff --git a/src/temporal/testing/env.clj b/src/temporal/testing/env.clj index 15810a4..ea8e696 100644 --- a/src/temporal/testing/env.clj +++ b/src/temporal/testing/env.clj @@ -23,7 +23,8 @@ (defn ^:no-doc test-env-options-> ^TestEnvironmentOptions [params] - (u/build (TestEnvironmentOptions/newBuilder) test-env-options params)) + (->> (merge {:workflow-client-options nil} params) + (u/build (TestEnvironmentOptions/newBuilder) test-env-options))) (defn create " @@ -47,7 +48,8 @@ Arguments: " ([] - (TestWorkflowEnvironment/newInstance)) + (create {})) + ([options] (TestWorkflowEnvironment/newInstance (test-env-options-> options)))) @@ -67,7 +69,8 @@ Arguments: ``` " [env {:keys [task-queue] :as options}] - (let [worker (.newWorker env (u/namify task-queue) (worker/worker-options-> options))] + (let [options (dissoc options :task-queue) + worker (.newWorker env (u/namify task-queue) (worker/worker-options-> options))] (worker/init worker options) (.start env) worker)) diff --git a/src/temporal/workflow.clj b/src/temporal/workflow.clj index 24a2387..eefe626 100644 --- a/src/temporal/workflow.clj +++ b/src/temporal/workflow.clj @@ -3,15 +3,14 @@ (ns temporal.workflow "Methods for defining and implementing Temporal workflows" (:require - [taoensso.nippy :as nippy] - [taoensso.timbre :as log] [promesa.core :as p] + [taoensso.timbre :as log] [temporal.common :as common] + [temporal.internal.child-workflow :as cw] [temporal.internal.exceptions :as e] [temporal.internal.search-attributes :as sa] [temporal.internal.utils :as u] - [temporal.internal.workflow :as w] - [temporal.internal.child-workflow :as cw]) + [temporal.internal.workflow :as w]) (:import [io.temporal.api.common.v1 WorkflowExecution] [io.temporal.workflow ContinueAsNewOptions ContinueAsNewOptions$Builder DynamicQueryHandler DynamicUpdateHandler ExternalWorkflowStub Workflow WorkflowLock] [java.util.function Supplier] @@ -79,8 +78,7 @@ Arguments: (let [query-type (keyword query-type) args (u/->args args)] (log/trace "handling query->" "query-type:" query-type "args:" args) - (-> (f query-type args) - (nippy/freeze))))))) + (f query-type args)))))) (defn register-update-handler! " @@ -138,8 +136,7 @@ if the update request is invalid. Validators must not mutate state or perform bl (let [update-type (keyword update-type) args (u/->args args)] (log/trace "handling update->" "update-type:" update-type "args:" args) - (-> (f update-type args) - (nippy/freeze)))) + (f update-type args))) (handleValidate [_ update-type args] (when validator (let [update-type (keyword update-type) diff --git a/test/temporal/client/options_test.clj b/test/temporal/client/options_test.clj new file mode 100644 index 0000000..3eb1b61 --- /dev/null +++ b/test/temporal/client/options_test.clj @@ -0,0 +1,27 @@ +(ns temporal.client.options-test + (:require + [clojure.test :refer [are deftest]] + [temporal.client.options :as sut]) + (:import + [io.temporal.common.converter DataConverter DefaultDataConverter])) + +(def MockDataConverter + (reify DataConverter)) + +(deftest workflow-client-options->without-data-converter + (are [data-converter-class options] (->> options + (sut/workflow-client-options->) + (.getDataConverter) + (instance? data-converter-class)) + DefaultDataConverter {} + DefaultDataConverter nil + DataConverter {:data-converter MockDataConverter})) + +(deftest schedule-client-options->without-data-converter + (are [data-converter-class options] (->> options + (sut/schedule-client-options->) + (.getDataConverter) + (instance? data-converter-class)) + DefaultDataConverter {} + DefaultDataConverter nil + DataConverter {:data-converter MockDataConverter})) diff --git a/test/temporal/test/utils.clj b/test/temporal/test/utils.clj index c3e0d26..ec8663a 100644 --- a/test/temporal/test/utils.clj +++ b/test/temporal/test/utils.clj @@ -2,10 +2,15 @@ (ns temporal.test.utils "Utilities common to all tests" - (:require [taoensso.timbre :as log] - [temporal.testing.env :as e] - [temporal.client.core :as c]) - (:import [java.time Duration])) + (:require + [taoensso.timbre :as log] + [temporal.client.core :as c] + [temporal.converter.default :as default-data-converter] + [temporal.converter.json :as json] + [temporal.testing.env :as e]) + (:import + [io.temporal.common.converter NullPayloadConverter] + [java.time Duration])) (log/set-min-level! :trace) @@ -31,14 +36,19 @@ ;;----------------------------------------------------------------------------- ;; Fixtures ;;----------------------------------------------------------------------------- -(defn create-service [] - (let [env (e/create) - client (e/get-client env) - worker (e/start env {:task-queue task-queue})] - (swap! state assoc - :env env - :worker worker - :client client))) +(defn create-service + ([] + (create-service {})) + + ([options] + (let [env (e/create {:workflow-client-options options}) + client (e/get-client env) + worker (e/start env {:task-queue task-queue})] + (log/trace "options:" (.getOptions client)) + (swap! state assoc + :env env + :worker worker + :client client)))) (defn destroy-service [] (swap! state @@ -46,7 +56,12 @@ (e/synchronized-stop env) (dissoc s :env :worker :client)))) +(def data-converter + (default-data-converter/create + [(NullPayloadConverter.) + (json/create)])) + (defn wrap-service [test-fn] - (create-service) + (create-service #_{:data-converter data-converter}) (test-fn) (destroy-service)) From 26a695f73f3ee40d0e7f3ad4b0940da3ab34aa66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 23:01:26 +0100 Subject: [PATCH 05/14] Fix typo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- test/temporal/test/concurrency.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/temporal/test/concurrency.clj b/test/temporal/test/concurrency.clj index 784a83c..e9acb0c 100644 --- a/test/temporal/test/concurrency.clj +++ b/test/temporal/test/concurrency.clj @@ -102,7 +102,7 @@ (p/then (fn [r] (log/info "r:" r) r)))) (deftest child-workflow-concurrency-test - (testing "Using a child workflow instead of an ctivity works with the promise api" + (testing "Using a child workflow instead of an activity works with the promise api" (let [workflow (t/create-workflow concurrent-parent-workflow)] (c/start workflow {}) (is (-> workflow c/get-result deref count (= 10))) From 9d5593f8be1e98e30f4d2d8cc289797635d5b40d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 23:07:01 +0100 Subject: [PATCH 06/14] Exclude empty MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- src/temporal/converter/metadata.clj | 1 + src/temporal/converter/optional.clj | 1 + 2 files changed, 2 insertions(+) diff --git a/src/temporal/converter/metadata.clj b/src/temporal/converter/metadata.clj index 63ada90..b2fbe4c 100644 --- a/src/temporal/converter/metadata.clj +++ b/src/temporal/converter/metadata.clj @@ -1,4 +1,5 @@ (ns temporal.converter.metadata + (:refer-clojure :exclude [empty]) (:require [temporal.converter.byte-string :refer [->byte-string byte-string?]]) (:import diff --git a/src/temporal/converter/optional.clj b/src/temporal/converter/optional.clj index 5e99ecc..67f9859 100644 --- a/src/temporal/converter/optional.clj +++ b/src/temporal/converter/optional.clj @@ -1,4 +1,5 @@ (ns temporal.converter.optional + (:refer-clojure :exclude [empty]) (:import [java.util Optional])) From d1af04b431e5127de7343e7f1522c0829b9ab541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 23:07:43 +0100 Subject: [PATCH 07/14] Remove unused import MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- src/temporal/internal/child_workflow.clj | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/temporal/internal/child_workflow.clj b/src/temporal/internal/child_workflow.clj index 3335dae..80f2fba 100644 --- a/src/temporal/internal/child_workflow.clj +++ b/src/temporal/internal/child_workflow.clj @@ -2,8 +2,7 @@ (:require [temporal.common :as common] [temporal.internal.utils :as u] [temporal.internal.workflow :as w]) - (:import [java.time Duration] - [io.temporal.api.enums.v1 ParentClosePolicy] + (:import [io.temporal.api.enums.v1 ParentClosePolicy] [io.temporal.workflow ChildWorkflowOptions ChildWorkflowOptions$Builder ChildWorkflowCancellationType])) (def cancellation-type-> From eb1a39824f3179c3d4d5f2ed5e6392ee75d4f045 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 23:08:15 +0100 Subject: [PATCH 08/14] Exclude update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- src/temporal/client/core.clj | 1 + 1 file changed, 1 insertion(+) diff --git a/src/temporal/client/core.clj b/src/temporal/client/core.clj index a18d02d..4ff930b 100644 --- a/src/temporal/client/core.clj +++ b/src/temporal/client/core.clj @@ -2,6 +2,7 @@ (ns temporal.client.core "Methods for client interaction with Temporal" + (:refer-clojure :exclude [update]) (:require [promesa.core :as p] [taoensso.timbre :as log] [temporal.client.options :as copts] From 2f08a33dc1db4231ac14e71c2f20dc3532664418 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 23:14:40 +0100 Subject: [PATCH 09/14] Prefix unused bindings with _ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- src/temporal/client/core.clj | 22 +++++++++++----------- src/temporal/client/worker.clj | 2 +- src/temporal/codec.clj | 2 +- src/temporal/internal/activity.clj | 2 +- src/temporal/internal/promise.clj | 14 +++++++------- src/temporal/tls.clj | 2 +- 6 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/temporal/client/core.clj b/src/temporal/client/core.clj index 4ff930b..2fc676f 100644 --- a/src/temporal/client/core.clj +++ b/src/temporal/client/core.clj @@ -112,7 +112,7 @@ Workflow ID Conflict Policies (`:workflow-id-conflict-policy`): (defn start " Starts 'worklow' with 'params'" - [{:keys [^WorkflowStub stub ^WorkflowClient client] :as workflow} params] + [{:keys [^WorkflowStub stub ^WorkflowClient client] :as _workflow} params] (log/trace "start:" params "client:" client) (.start stub (u/->objarray params))) @@ -137,7 +137,7 @@ set `:workflow-id-conflict-policy` to `:use-existing` when calling [[create-work (signal-with-start w :my-signal {:data \"value\"} {:initial \"args\"})) ``` " - [{:keys [^WorkflowStub stub] :as workflow} signal-name signal-params wf-params] + [{:keys [^WorkflowStub stub] :as _workflow} signal-name signal-params wf-params] (log/trace "signal-with-start->" "signal:" signal-name signal-params "workflow-params:" wf-params) (.signalWithStart stub (u/namify signal-name) (u/->objarray signal-params) (u/->objarray wf-params))) @@ -149,7 +149,7 @@ Sends 'params' as a signal 'signal-name' to 'workflow' (>! workflow ::my-signal {:msg \"Hi\"}) ``` " - [{:keys [^WorkflowStub stub] :as workflow} signal-name params] + [{:keys [^WorkflowStub stub] :as _workflow} signal-name params] (log/trace ">!" signal-name params) (.signal stub (u/namify signal-name) (u/->objarray params))) @@ -168,7 +168,7 @@ defworkflow once the workflow concludes. @(get-result w)) ``` " - [{:keys [^WorkflowStub stub ^WorkflowClient client] :as workflow}] + [{:keys [^WorkflowStub stub ^WorkflowClient client] :as _workflow}] (log/trace "get-result client:" client "stub:" stub) (-> (.getResultAsync stub Object) (p/then identity) @@ -191,7 +191,7 @@ Arguments: (query workflow ::my-query {:foo \"bar\"}) ``` " - [{:keys [^WorkflowStub stub] :as workflow} query-type args] + [{:keys [^WorkflowStub stub] :as _workflow} query-type args] (-> (.query stub (u/namify query-type) u/bytes-type (u/->objarray args)))) (defn update @@ -214,7 +214,7 @@ Arguments: (update workflow :increment {:amount 5}) ``` " - [{:keys [^WorkflowStub stub] :as workflow} update-type args] + [{:keys [^WorkflowStub stub] :as _workflow} update-type args] (log/trace "update:" update-type args) (-> (.update stub (u/namify update-type) u/bytes-type (u/->objarray args)))) @@ -279,7 +279,7 @@ Returns a map with: " ([workflow update-type args] (start-update workflow update-type args {})) - ([{:keys [^WorkflowStub stub] :as workflow} update-type args {:keys [wait-for-stage update-id] :as options}] + ([{:keys [^WorkflowStub stub] :as _workflow} update-type args {:keys [wait-for-stage update-id] :as options}] (log/trace "start-update:" update-type args options) (let [handle (if update-id ;; Use UpdateOptions when custom update-id is specified @@ -310,7 +310,7 @@ Returns a map with: @(:result handle)) ``` " - [{:keys [^WorkflowStub stub] :as workflow} update-id] + [{:keys [^WorkflowStub stub] :as _workflow} update-id] (log/trace "get-update-handle:" update-id) (let [handle (.getUpdateHandle stub update-id u/bytes-type)] (wrap-update-handle handle))) @@ -341,7 +341,7 @@ Returns a map with: " ([workflow update-type update-args start-args] (update-with-start workflow update-type update-args start-args {})) - ([{:keys [^WorkflowStub stub] :as workflow} update-type update-args start-args options] + ([{:keys [^WorkflowStub stub] :as _workflow} update-type update-args start-args options] (log/trace "update-with-start:" update-type update-args start-args options) (let [update-options (build-update-options (merge {:update-name update-type :wait-for-stage (or (:wait-for-stage options) :accepted)} @@ -357,7 +357,7 @@ Gracefully cancels 'workflow' (cancel workflow) ``` " - [{:keys [^WorkflowStub stub] :as workflow}] + [{:keys [^WorkflowStub stub] :as _workflow}] (.cancel stub)) (defn terminate @@ -368,5 +368,5 @@ Forcefully terminates 'workflow' (terminate workflow \"unresponsive\", {}) ``` " - [{:keys [^WorkflowStub stub] :as workflow} reason params] + [{:keys [^WorkflowStub stub] :as _workflow} reason params] (.terminate stub reason (u/->objarray params))) diff --git a/src/temporal/client/worker.clj b/src/temporal/client/worker.clj index 50eace7..2d908f5 100644 --- a/src/temporal/client/worker.clj +++ b/src/temporal/client/worker.clj @@ -184,5 +184,5 @@ Arguments: (stop instance)) ``` " - [{:keys [^WorkerFactory factory] :as instance}] + [{:keys [^WorkerFactory factory] :as _instance}] (.shutdown factory)) diff --git a/src/temporal/codec.clj b/src/temporal/codec.clj index f5fcdd8..d8ba758 100644 --- a/src/temporal/codec.clj +++ b/src/temporal/codec.clj @@ -35,7 +35,7 @@ (d/datafy x)) (defn- ^:no-doc ->payload - [{:keys [metadata data] :as payload}] + [{:keys [metadata data] :as _payload}] (let [builder (Payload/newBuilder)] (run! (fn [[k v]] (.putMetadata builder k (ByteString/copyFrom (bytes v)))) metadata) (when (some? data) diff --git a/src/temporal/internal/activity.clj b/src/temporal/internal/activity.clj index 63065cd..9348e68 100644 --- a/src/temporal/internal/activity.clj +++ b/src/temporal/internal/activity.clj @@ -106,7 +106,7 @@ (defn- -execute [ctx dispatch args] - (let [{:keys [activity-type activity-id] :as info} (get-info) + (let [{:keys [activity-type activity-id] :as _info} (get-info) f (u/find-dispatch-fn dispatch activity-type) a (u/->args args)] (log/trace activity-id "calling" f "with args:" a) diff --git a/src/temporal/internal/promise.clj b/src/temporal/internal/promise.clj index cab1ed3..a594f47 100644 --- a/src/temporal/internal/promise.clj +++ b/src/temporal/internal/promise.clj @@ -53,7 +53,7 @@ (-fmap ([it f] (pt/-promise (.thenApply ^Promise (.p it) (->Func (comp ->temporal f))))) - ([it f executor] + ([it f _executor] (pt/-fmap it f))) ;; -mcat: Monadic bind with one-level flatten @@ -61,21 +61,21 @@ (-mcat ([it f] (pt/-promise (.thenCompose ^Promise (.p it) (->Func (comp ->temporal f))))) - ([it f executor] + ([it f _executor] (pt/-mcat it f))) ;; -then: Apply with multi-level flatten (same as v9) (-then ([it f] (pt/-promise (.thenCompose ^Promise (.p it) (->Func (comp ->temporal f))))) - ([it f executor] + ([it f _executor] (pt/-then it f))) ;; -merr: Error mapping with flatten ;; Combines old v9 -mapErr and -thenErr behavior (-merr ([it f] - (letfn [(handler [v e] + (letfn [(handler [_v e] (if e (let [cause (if (instance? Promise e) (.getFailure e) @@ -86,7 +86,7 @@ (.handle $$ (->Func handler)) (.thenCompose $$ fw-identity) (pt/-promise $$)))) - ([it f executor] + ([it f _executor] (pt/-merr it f))) ;; -hmap: Handle both success and failure @@ -97,7 +97,7 @@ (.handle $$ (->Func (comp ->temporal f))) (.thenCompose $$ fw-identity) (pt/-promise $$))) - ([it f executor] + ([it f _executor] (pt/-hmap it f))) ;; -fnly: Finally handler (return value ignored) @@ -108,7 +108,7 @@ (.handle $$ (->Func (fn [v e] (f v e) (if e (throw e) v)))) (.thenCompose $$ fw-identity) (pt/-promise $$))) - ([it f executor] + ([it f _executor] (pt/-fnly it f)))) (extend-protocol pt/IPromiseFactory diff --git a/src/temporal/tls.clj b/src/temporal/tls.clj index e92aafd..cda23c3 100644 --- a/src/temporal/tls.clj +++ b/src/temporal/tls.clj @@ -41,7 +41,7 @@ Arguments: - 'key-path': The path to a PEM encoded private key representing this client's identity, used for mutual TLS authentication. " - ^SslContext [{:keys [ca-path cert-path key-path] :as args}] + ^SslContext [{:keys [ca-path cert-path key-path] :as _args}] (-> (GrpcSslContexts/forClient) (cond-> (some? ca-path) (.trustManager (new-trustmanagerfactory ca-path)) From 6beb3121246ab2b05145c11076897c229fe5c464 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 23:17:20 +0100 Subject: [PATCH 10/14] Remove unnecessary let MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- src/temporal/activity.clj | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/temporal/activity.clj b/src/temporal/activity.clj index e943354..5ca22e0 100644 --- a/src/temporal/activity.clj +++ b/src/temporal/activity.clj @@ -35,11 +35,11 @@ along with the Activity Task for the next retry attempt and can be extracted by [] (let [ctx (Activity/getExecutionContext) - details (.getHeartbeatDetails ctx u/bytes-type)] - (let [v (when (.isPresent details) - (.get details))] - (log/trace "get-heartbeat-details:" v) - v))) + details (.getHeartbeatDetails ctx u/bytes-type) + v (when (.isPresent details) + (.get details))] + (log/trace "get-heartbeat-details:" v) + v)) (defn get-info "Returns information about the Activity execution" From 360dd582916ea089acddcd212bce737188f8e17c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Mon, 26 Jan 2026 23:47:13 +0100 Subject: [PATCH 11/14] Update lein-cljfmt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 80a7f29..b9181fb 100644 --- a/project.clj +++ b/project.clj @@ -5,7 +5,7 @@ :url "https://www.apache.org/licenses/LICENSE-2.0" :year 2023 :key "apache-2.0"} - :plugins [[lein-cljfmt "0.9.0"] + :plugins [[dev.weavejester/lein-cljfmt "0.15.6"] [lein-kibit "0.1.8"] [lein-bikeshed "0.5.2"] [lein-cloverage "1.2.4"] From 04cb1efadcc119e89a7d45382dc0cd10e790085a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Tue, 27 Jan 2026 00:00:00 +0100 Subject: [PATCH 12/14] Fix extend-protocol for lein-cloverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- src/temporal/converter/byte_string.clj | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/temporal/converter/byte_string.clj b/src/temporal/converter/byte_string.clj index dba14ef..f059456 100644 --- a/src/temporal/converter/byte_string.clj +++ b/src/temporal/converter/byte_string.clj @@ -7,14 +7,15 @@ (->byte-string ^ByteString [value])) (extend-protocol ToByteString + ;; byte/1 - FIXME: this is not supported by lein-cloverage + (Class/forName "[B") + (->byte-string [value] + (ByteString/copyFrom value)) + nil (->byte-string [_] ByteString/EMPTY) - byte/1 - (->byte-string [value] - (ByteString/copyFrom value)) - String (->byte-string [value] (ByteString/copyFrom value StandardCharsets/UTF_8)) From 17904e06184b4334e6f92f52230d1e2774b4878c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Tue, 27 Jan 2026 10:35:28 +0100 Subject: [PATCH 13/14] Switch from reify to defrecord MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- src/temporal/converter/json.clj | 35 +++++++++++++++++--------------- src/temporal/converter/nippy.clj | 19 +++++++++-------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/src/temporal/converter/json.clj b/src/temporal/converter/json.clj index 3cc28a6..87c52be 100644 --- a/src/temporal/converter/json.clj +++ b/src/temporal/converter/json.clj @@ -18,23 +18,26 @@ (def default-object-mapper json/keyword-keys-object-mapper) -(defn create ^PayloadConverter +(defrecord JSONConverter [object-mapper] + PayloadConverter + (getEncodingType [_] + json-plain) + + (toData [_ value] + (-> value + (json/write-value-as-bytes object-mapper) + (->payload metadata) + (->optional))) + + (fromData [_ ^Payload content ^Class _value-class ^Type _value-type] + (-> content + (.getData) + (.toByteArray) + (json/read-value object-mapper)))) + +(defn create ([] (create default-object-mapper)) ([object-mapper] - (reify PayloadConverter - (getEncodingType [_] - json-plain) - - (toData [_ value] - (-> value - (json/write-value-as-bytes object-mapper) - (->payload metadata) - (->optional))) - - (fromData [_ ^Payload content ^Class _value-class ^Type _value-type] - (-> content - (.getData) - (.toByteArray) - (json/read-value object-mapper)))))) + (JSONConverter. object-mapper))) diff --git a/src/temporal/converter/nippy.clj b/src/temporal/converter/nippy.clj index 7255a8d..1fbc163 100644 --- a/src/temporal/converter/nippy.clj +++ b/src/temporal/converter/nippy.clj @@ -15,13 +15,16 @@ (def metadata (->metadata binary-plain)) -(defn create ^PayloadConverter [] - (reify PayloadConverter - (getEncodingType [_] - binary-plain) +(defrecord NippyConverter [] + PayloadConverter + (getEncodingType [_] + binary-plain) - (toData [_ value] - (-> value (nippy/freeze) (->payload metadata) (->optional))) + (toData [_ value] + (-> value (nippy/freeze) (->payload metadata) (->optional))) - (fromData [_ ^Payload content ^Class _value-class ^Type _value-type] - (-> content (.getData) (.toByteArray) (nippy/thaw))))) + (fromData [_ ^Payload content ^Class _value-class ^Type _value-type] + (-> content (.getData) (.toByteArray) (nippy/thaw)))) + +(defn create [] + (NippyConverter.)) From 29cb3d74a076c70e416bd0db01e7d81cb4ead4e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?KARASZI=20Istv=C3=A1n?= Date: Tue, 27 Jan 2026 12:54:54 +0100 Subject: [PATCH 14/14] Flip the argument order MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: KARASZI István --- test/temporal/test/versioning.clj | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/temporal/test/versioning.clj b/test/temporal/test/versioning.clj index a6c8d3a..d4054f7 100644 --- a/test/temporal/test/versioning.clj +++ b/test/temporal/test/versioning.clj @@ -44,8 +44,8 @@ wf (c/create-workflow client versioned-workflow {:task-queue t/task-queue :workflow-id "test-1"})] (testing "Invoke our v1 workflow" (c/start wf {}) - (is (= @(c/get-result wf) :v1)) - (is (= @mailbox :v1))) + (is (= :v1 @(c/get-result wf))) + (is (= :v1 @mailbox))) (with-mock _ {:target ::workflow-v1 :return workflow-v2} ;; emulates a code update by dynamically substituting v2 for v1 @@ -53,10 +53,10 @@ (reset! mailbox :slug) (let [history (.fetchHistory client "test-1")] (.replayWorkflowExecution worker history)) - (is (= @mailbox :slug))) ;; activity is not re-executed in replay, so the :slug should remain + (is (= :slug @mailbox))) ;; activity is not re-executed in replay, so the :slug should remain (testing "Invoke our workflow fresh and verify that it takes the v2 path" (reset! mailbox nil) (let [wf2 (c/create-workflow client versioned-workflow {:task-queue t/task-queue :workflow-id "test-2"})] (c/start wf2 {}) - (is (= @(c/get-result wf2) :v2)) - (is (= @mailbox :v2))))))) + (is (= :v2 @(c/get-result wf2))) + (is (= :v2 @mailbox)))))))