From 438c74a6144e240ed2624cf93c66a42de7ad4d73 Mon Sep 17 00:00:00 2001 From: Mehmet TOSUN <93265833+middt@users.noreply.github.com> Date: Tue, 24 Feb 2026 09:25:01 +0300 Subject: [PATCH] feat(state): add ClickHouse state store component Add a new state store component for ClickHouse with: - Full CRUD operations via database/sql with clickhouse-go driver - ETag support for optimistic concurrency (UUID-based) - TTL support with DateTime64 expiration column - ReplacingMergeTree engine for efficient updates - Username/password authentication - Bulk operations (BulkGet, BulkSet, BulkDelete) - Identifier length validation (max 250 for filesystem safety) - Auto-creation of database and table if not exists - Conformance test configuration and Docker Compose infrastructure - Certification test with Docker Compose setup Addresses review feedback: - Copyright year set to 2025 - Metadata fields use camelCase (clickhouseUrl, databaseName, tableName) - maxIdentifierLength reduced to 250 (from 256) for filesystem suffix room - Clean rebase on main (no accidental reverts of unrelated files) Signed-off-by: Mehmet TOSUN <93265833+middt@users.noreply.github.com> --- .../docker-compose-clickhouse.yml | 21 + go.mod | 15 +- go.sum | 41 +- state/clickhouse/clickhouse.go | 421 ++++++++++++++ .../clickhouse/clickhouse_integration_test.go | 543 ++++++++++++++++++ state/clickhouse/clickhouse_test.go | 222 +++++++ state/clickhouse/metadata.yaml | 50 ++ .../state/clickhouse/clickhouse_test.go | 138 +++++ .../state/clickhouse/docker-compose.yml | 21 + tests/config/state/clickhouse/clickhouse.yml | 18 + tests/config/state/tests.yml | 4 + tests/conformance/state_test.go | 3 + 12 files changed, 1483 insertions(+), 14 deletions(-) create mode 100644 .github/infrastructure/docker-compose-clickhouse.yml create mode 100644 state/clickhouse/clickhouse.go create mode 100644 state/clickhouse/clickhouse_integration_test.go create mode 100644 state/clickhouse/clickhouse_test.go create mode 100644 state/clickhouse/metadata.yaml create mode 100644 tests/certification/state/clickhouse/clickhouse_test.go create mode 100644 tests/certification/state/clickhouse/docker-compose.yml create mode 100644 tests/config/state/clickhouse/clickhouse.yml diff --git a/.github/infrastructure/docker-compose-clickhouse.yml b/.github/infrastructure/docker-compose-clickhouse.yml new file mode 100644 index 0000000000..cf61af77fe --- /dev/null +++ b/.github/infrastructure/docker-compose-clickhouse.yml @@ -0,0 +1,21 @@ +version: '3.8' + +services: + clickhouse: + image: clickhouse/clickhouse-server:latest + ports: + - "9000:9000" + - "8123:8123" + environment: + - CLICKHOUSE_USER=default + - CLICKHOUSE_PASSWORD=clickhouse_password + - CLICKHOUSE_DB=dapr_test + ulimits: + nofile: + soft: 262144 + hard: 262144 + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://localhost:8123/ping"] + interval: 5s + timeout: 5s + retries: 10 diff --git a/go.mod b/go.mod index b6c41ebff2..da2fd9c1a0 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 github.com/Azure/go-amqp v1.0.5 + github.com/ClickHouse/clickhouse-go/v2 v2.43.0 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/IBM/sarama v1.45.2 github.com/aerospike/aerospike-client-go/v6 v6.12.0 @@ -175,6 +176,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.0.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect + github.com/ClickHouse/ch-go v0.71.0 // indirect github.com/Code-Hex/go-generics-cache v1.3.1 // indirect github.com/DataDog/zstd v1.5.2 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 // indirect @@ -194,7 +196,7 @@ require ( github.com/alibabacloud-go/tea-xml v1.1.2 // indirect github.com/aliyun/credentials-go v1.1.2 // indirect github.com/aliyunmq/mq-http-go-sdk v1.0.3 // indirect - github.com/andybalholm/brotli v1.1.0 // indirect + github.com/andybalholm/brotli v1.2.0 // indirect github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc // indirect github.com/apache/rocketmq-client-go v1.2.5 // indirect github.com/apache/thrift v0.13.0 // indirect @@ -273,6 +275,8 @@ require ( github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/gage-technologies/mistral-go v1.1.0 // indirect github.com/gavv/httpexpect v2.0.0+incompatible // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.7.1 // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/go-jose/go-jose/v4 v4.1.3 // indirect github.com/go-kit/kit v0.10.0 // indirect @@ -385,10 +389,11 @@ require ( github.com/opencontainers/image-spec v1.1.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/panjf2000/ants/v2 v2.11.3 // indirect + github.com/paulmach/orb v0.12.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect - github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/pierrec/lz4/v4 v4.1.25 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pkoukk/tiktoken-go v0.1.6 // indirect @@ -405,11 +410,12 @@ require ( github.com/rs/zerolog v1.31.0 // indirect github.com/sagikazarmark/locafero v0.9.0 // indirect github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect - github.com/segmentio/asm v1.2.0 // indirect + github.com/segmentio/asm v1.2.1 // indirect github.com/sendgrid/rest v2.6.9+incompatible // indirect github.com/shirou/gopsutil/v3 v3.24.5 // indirect github.com/shirou/gopsutil/v4 v4.25.12 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/shopspring/decimal v1.4.0 // indirect github.com/sirupsen/logrus v1.9.4 // indirect github.com/sony/gobreaker v0.5.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect @@ -452,7 +458,8 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect go.opentelemetry.io/otel/trace v1.40.0 // indirect go.uber.org/atomic v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect + go.uber.org/zap v1.27.1 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/arch v0.14.0 // indirect golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect golang.org/x/sync v0.19.0 // indirect diff --git a/go.sum b/go.sum index 392ac19299..cb289f2319 100644 --- a/go.sum +++ b/go.sum @@ -125,6 +125,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/ClickHouse/ch-go v0.71.0 h1:bUdZ/EZj/LcVHsMqaRUP2holqygrPWQKeMjc6nZoyRM= +github.com/ClickHouse/ch-go v0.71.0/go.mod h1:NwbNc+7jaqfY58dmdDUbG4Jl22vThgx1cYjBw0vtgXw= +github.com/ClickHouse/clickhouse-go/v2 v2.43.0 h1:fUR05TrF1GyvLDa/mAQjkx7KbgwdLRffs2n9O3WobtE= +github.com/ClickHouse/clickhouse-go/v2 v2.43.0/go.mod h1:o6jf7JM/zveWC/PP277BLxjHy5KjnGX/jfljhM4s34g= github.com/Code-Hex/go-generics-cache v1.3.1 h1:i8rLwyhoyhaerr7JpjtYjJZUcCbWOdiYO3fZXLiEC4g= github.com/Code-Hex/go-generics-cache v1.3.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= @@ -221,8 +225,8 @@ github.com/aliyun/credentials-go v1.1.2/go.mod h1:ozcZaMR5kLM7pwtCMEpVmQ242suV6q github.com/aliyunmq/mq-http-go-sdk v1.0.3 h1:/uhH7DUoaw9XTtsPgDp7zdPUyG5FBKj2GmJJph9z+6o= github.com/aliyunmq/mq-http-go-sdk v1.0.3/go.mod h1:JYfRMQoPexERvnNNBcal0ZQ2TVQ5ialDiW9ScjaadEM= github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= -github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= -github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= +github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc h1:NZRon3MDqT4vddR3UIRBnwbbhEerghAimCSBsiESs3g= github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc/go.mod h1:cPJlbcHUTNTpiboMQjMHhE9XBni11LiBiG8FdrDuVzk= @@ -681,6 +685,10 @@ github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkPro github.com/go-chi/chi/v5 v5.0.7/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-co-op/gocron v1.9.0/go.mod h1:DbJm9kdgr1sEvWpHCA7dFFs/PGHPMil9/97EXCRPr4k= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= +github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -1003,8 +1011,8 @@ github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/C github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.1.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI= -github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= +github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -1418,6 +1426,9 @@ github.com/pashagolub/pgxmock/v2 v2.12.0 h1:IVRmQtVFNCoq7NOZ+PdfvB6fwnLJmEuWDhnc github.com/pashagolub/pgxmock/v2 v2.12.0/go.mod h1:D3YslkN/nJ4+umVqWmbwfSXugJIjPMChkGBG47OJpNw= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/paulmach/orb v0.12.0 h1:z+zOwjmG3MyEEqzv92UN49Lg1JFYx0L9GpGKNVDKk1s= +github.com/paulmach/orb v0.12.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= @@ -1433,8 +1444,8 @@ github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= +github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -1555,8 +1566,8 @@ github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPO github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= -github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= +github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/sendgrid/rest v2.6.9+incompatible h1:1EyIcsNdn9KIisLW50MKwmSRSK+ekueiEMJ7NEoxJo0= github.com/sendgrid/rest v2.6.9+incompatible/go.mod h1:kXX7q3jZtJXK5c5qK83bSGMdV6tsOE70KbHoqJls4lE= github.com/sendgrid/sendgrid-go v3.13.0+incompatible h1:HZrzc06/QfBGesY9o3n1lvBrRONA+57rbDRKet7plos= @@ -1574,6 +1585,8 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sijms/go-ora/v2 v2.8.22 h1:3ABgRzVKxS439cEgSLjFKutIwOyhnyi4oOSBywEdOlU= github.com/sijms/go-ora/v2 v2.8.22/go.mod h1:QgFInVi3ZWyqAiJwzBQA+nbKYKH77tdp1PYoCqhR2dU= @@ -1683,6 +1696,7 @@ github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0 github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= @@ -1732,8 +1746,10 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= @@ -1744,6 +1760,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1: github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 h1:6fRhSjgLCkTD3JnJxvaJ4Sj+TYblw757bqYgZaOq5ZY= github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI= github.com/yashtewari/glob-intersection v0.2.0 h1:8iuHdN88yYuCzCdjt0gDe+6bAhUwBeEWqThExu54RFg= @@ -1792,6 +1810,7 @@ go.etcd.io/etcd/client/v3 v3.5.21/go.mod h1:mFYy67IOqmbRf/kRUvsHixzo3iG+1OF2W2+j go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY= go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0/go.mod h1:FAwse6Zlm5v4tEWZaTjmNhe17Int4Oxbu7+2r0DiD3w= go.etcd.io/etcd/server/v3 v3.5.0-alpha.0/go.mod h1:tsKetYpt980ZTpzl/gb+UOJj9RkIyCb1u4wjzMg90BQ= +go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.mongodb.org/mongo-driver v1.12.0/go.mod h1:AZkxhPnFJUoH7kZlFkVKucV20K387miPfm7oimrSmK0= go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= @@ -1890,8 +1909,10 @@ go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= -go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= -go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= goji.io v2.0.2+incompatible h1:uIssv/elbKRLznFUy3Xj4+2Mz/qKhek/9aZQDUMae7c= goji.io v2.0.2+incompatible/go.mod h1:sbqFwrtqZACxLBTQcdgVjFh54yGVCvwq8+w49MVMMIk= golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4= diff --git a/state/clickhouse/clickhouse.go b/state/clickhouse/clickhouse.go new file mode 100644 index 0000000000..26dbeeacb8 --- /dev/null +++ b/state/clickhouse/clickhouse.go @@ -0,0 +1,421 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clickhouse + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "reflect" + "strings" + "time" + + _ "github.com/ClickHouse/clickhouse-go/v2" + "github.com/google/uuid" + + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/utils" + "github.com/dapr/kit/logger" + kitmd "github.com/dapr/kit/metadata" +) + +type StateStore struct { + db *sql.DB + logger logger.Logger + config clickhouseMetadata +} + +// maxIdentifierLength is the maximum length for database and table names in ClickHouse. +// ClickHouse itself doesn't enforce a strict limit, but identifier names map to filesystem +// filenames. On ext4 (and most Linux filesystems), the max filename length is 255 bytes. +// ClickHouse also appends suffixes like ".sql", ".detached", etc., so we use 250 to leave +// room for these suffixes and avoid "filename too long" errors. +// See: https://github.com/ClickHouse/ClickHouse/issues/36485 +const maxIdentifierLength = 250 + +type clickhouseMetadata struct { + ClickhouseURL string `mapstructure:"clickhouseUrl"` + DatabaseName string `mapstructure:"databaseName"` + TableName string `mapstructure:"tableName"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` +} + +func NewClickHouseStateStore(logger logger.Logger) state.Store { + return &StateStore{ + logger: logger, + } +} + +func (c *StateStore) Init(ctx context.Context, metadata state.Metadata) error { + config, err := parseAndValidateMetadata(metadata) + if err != nil { + return err + } + c.config = config + + dsn := c.config.ClickhouseURL + if c.config.Username != "" && !strings.Contains(dsn, "username=") { + if !strings.Contains(dsn, "?") { + dsn += "?" + } else { + dsn += "&" + } + dsn += "username=" + c.config.Username + } + + if c.config.Password != "" && !strings.Contains(dsn, "password=") { + if !strings.Contains(dsn, "?") { + dsn += "?" + } else if !strings.HasSuffix(dsn, "&") { + dsn += "&" + } + dsn += "password=" + c.config.Password + } + + db, err := sql.Open("clickhouse", dsn) + if err != nil { + return fmt.Errorf("error opening connection: %v", err) + } + + if err := db.Ping(); err != nil { + return fmt.Errorf("error connecting to database: %v", err) + } + + // Database and table names are validated during metadata parsing + // and come from trusted configuration, so direct string concatenation is acceptable here. + createDBQuery := "CREATE DATABASE IF NOT EXISTS " + c.config.DatabaseName + if _, err := db.ExecContext(ctx, createDBQuery); err != nil { + return fmt.Errorf("error creating database: %v", err) + } + + //nolint:gosec + createTableQuery := ` + CREATE TABLE IF NOT EXISTS ` + c.config.DatabaseName + `.` + c.config.TableName + ` ( + key String, + value String, + etag String, + expire DateTime64(3) NULL, + PRIMARY KEY(key) + ) ENGINE = ReplacingMergeTree() + ORDER BY key + ` + + if _, err := db.ExecContext(ctx, createTableQuery); err != nil { + return fmt.Errorf("error creating table: %v", err) + } + + c.db = db + return nil +} + +func (c *StateStore) Features() []state.Feature { + return []state.Feature{ + state.FeatureETag, + state.FeatureTTL, + } +} + +func (c *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) { + if req.Key == "" { + return nil, errors.New("key is empty") + } + + query := ` + SELECT value, etag, expire + FROM ` + c.config.DatabaseName + `.` + c.config.TableName + ` FINAL + WHERE key = ? AND (expire IS NULL OR expire > now64()) + ` + + var value, etag string + var expire *time.Time + err := c.db.QueryRowContext(ctx, query, req.Key).Scan(&value, &etag, &expire) + if err == sql.ErrNoRows { + return &state.GetResponse{}, nil + } + if err != nil { + return nil, err + } + + var metadata map[string]string + if expire != nil { + metadata = map[string]string{ + state.GetRespMetaKeyTTLExpireTime: expire.UTC().Format(time.RFC3339), + } + } + + return &state.GetResponse{ + Data: []byte(value), + ETag: &etag, + Metadata: metadata, + }, nil +} + +func (c *StateStore) Set(ctx context.Context, req *state.SetRequest) error { + if req.Key == "" { + return errors.New("key is empty") + } + + ttlInSeconds := 0 + if req.Metadata != nil { + var parseErr error + ttlInSeconds, parseErr = parseTTL(req.Metadata) + if parseErr != nil { + return parseErr + } + } + + value, err := c.marshal(req.Value) + if err != nil { + return err + } + + var expireTime *time.Time + if ttlInSeconds > 0 { + t := time.Now().Add(time.Duration(ttlInSeconds) * time.Second) + expireTime = &t + } + + if req.ETag != nil && *req.ETag != "" { + currentETag, etagErr := c.getETag(ctx, req.Key) + if etagErr != nil { + return etagErr + } + + if currentETag != "" && currentETag != *req.ETag { + return state.NewETagError(state.ETagMismatch, nil) + } + } else if req.Options.Concurrency == state.FirstWrite { + exists, existsErr := c.keyExists(ctx, req.Key) + if existsErr != nil { + return existsErr + } + if exists { + return state.NewETagError(state.ETagMismatch, nil) + } + } + + etag := uuid.New().String() + + //nolint:gosec + insertQuery := ` + INSERT INTO ` + c.config.DatabaseName + `.` + c.config.TableName + ` (key, value, etag, expire) + VALUES (?, ?, ?, ?) + ` + + _, err = c.db.ExecContext(ctx, insertQuery, req.Key, value, etag, expireTime) + if err != nil { + //nolint:gosec + updateQuery := ` + ALTER TABLE ` + c.config.DatabaseName + `.` + c.config.TableName + ` + UPDATE value = ?, etag = ?, expire = ? + WHERE key = ? + ` + + _, updateErr := c.db.ExecContext(ctx, updateQuery, value, etag, expireTime, req.Key) + if updateErr != nil { + return fmt.Errorf("error updating value: %v", updateErr) + } + } + + return nil +} + +func (c *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error { + if req.Key == "" { + return errors.New("key is empty") + } + + if req.ETag != nil && *req.ETag != "" { + currentETag, err := c.getETag(ctx, req.Key) + if err != nil { + return err + } + + if currentETag != "" && currentETag != *req.ETag { + return state.NewETagError(state.ETagMismatch, nil) + } + } + + //nolint:gosec + query := "DELETE FROM " + c.config.DatabaseName + "." + c.config.TableName + " WHERE key = ?" + _, err := c.db.ExecContext(ctx, query, req.Key) + return err +} + +func (c *StateStore) marshal(v any) (string, error) { + var value string + switch v := v.(type) { + case []byte: + value = string(v) + case string: + value = v + default: + bt, err := utils.Marshal(v, json.Marshal) + if err != nil { + return "", err + } + value = string(bt) + } + return value, nil +} + +func parseTTL(metadata map[string]string) (int, error) { + if metadata == nil { + return 0, nil + } + ttl, ok := metadata["ttlInSeconds"] + if !ok || ttl == "" { + return 0, nil + } + + ttlMetadata := map[string]string{ + "ttlInSeconds": ttl, + } + + ttlPtr, err := utils.ParseTTL(ttlMetadata) + if err != nil { + return 0, fmt.Errorf("error parsing TTL: %v", err) + } + + if ttlPtr == nil { + return 0, nil + } + + return *ttlPtr, nil +} + +func parseAndValidateMetadata(metadata state.Metadata) (clickhouseMetadata, error) { + config := clickhouseMetadata{} + + err := kitmd.DecodeMetadata(metadata.Properties, &config) + if err != nil { + return config, err + } + + if config.ClickhouseURL == "" { + return config, errors.New("ClickHouse URL is missing") + } + + if config.DatabaseName == "" { + return config, errors.New("ClickHouse database name is missing") + } + + if len(config.DatabaseName) > maxIdentifierLength { + return config, fmt.Errorf("ClickHouse database name exceeds maximum length of %d characters", maxIdentifierLength) + } + + if config.TableName == "" { + return config, errors.New("ClickHouse table name is missing") + } + + if len(config.TableName) > maxIdentifierLength { + return config, fmt.Errorf("ClickHouse table name exceeds maximum length of %d characters", maxIdentifierLength) + } + + return config, nil +} + +func (c *StateStore) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { + metadataStruct := clickhouseMetadata{} + metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType) + return +} + +func (c *StateStore) BulkGet(ctx context.Context, reqs []state.GetRequest, opts state.BulkGetOpts) ([]state.BulkGetResponse, error) { + responses := make([]state.BulkGetResponse, len(reqs)) + for i, req := range reqs { + response, err := c.Get(ctx, &req) + if err != nil { + return nil, err + } + responses[i] = state.BulkGetResponse{ + Key: req.Key, + Data: response.Data, + ETag: response.ETag, + Metadata: response.Metadata, + Error: "", + } + } + return responses, nil +} + +func (c *StateStore) BulkSet(ctx context.Context, reqs []state.SetRequest, opts state.BulkStoreOpts) error { + for _, req := range reqs { + err := c.Set(ctx, &req) + if err != nil { + return err + } + } + return nil +} + +func (c *StateStore) BulkDelete(ctx context.Context, reqs []state.DeleteRequest, opts state.BulkStoreOpts) error { + for _, req := range reqs { + err := c.Delete(ctx, &req) + if err != nil { + return err + } + } + return nil +} + +func (c *StateStore) Close() error { + if c.db != nil { + return c.db.Close() + } + return nil +} + +func (c *StateStore) getETag(ctx context.Context, key string) (string, error) { + query := ` + SELECT etag + FROM ` + c.config.DatabaseName + `.` + c.config.TableName + ` FINAL + WHERE key = ? AND (expire IS NULL OR expire > now64()) + ` + + var etag string + err := c.db.QueryRowContext(ctx, query, key).Scan(&etag) + if err == sql.ErrNoRows { + return "", nil + } + if err != nil { + return "", fmt.Errorf("error getting etag: %v", err) + } + + return etag, nil +} + +func (c *StateStore) keyExists(ctx context.Context, key string) (bool, error) { + query := ` + SELECT 1 + FROM ` + c.config.DatabaseName + `.` + c.config.TableName + ` FINAL + WHERE key = ? AND (expire IS NULL OR expire > now64()) + LIMIT 1 + ` + + var exists int + err := c.db.QueryRowContext(ctx, query, key).Scan(&exists) + if err == sql.ErrNoRows { + return false, nil + } + if err != nil { + return false, fmt.Errorf("error checking key existence: %v", err) + } + + return true, nil +} diff --git a/state/clickhouse/clickhouse_integration_test.go b/state/clickhouse/clickhouse_integration_test.go new file mode 100644 index 0000000000..cb1897ff13 --- /dev/null +++ b/state/clickhouse/clickhouse_integration_test.go @@ -0,0 +1,543 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clickhouse + +import ( + "encoding/json" + "os" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + "github.com/dapr/kit/logger" +) + +const ( + connectionStringEnvKey = "DAPR_TEST_CLICKHOUSE_CONNSTRING" +) + +type fakeItem struct { + Color string +} + +func TestClickHouseIntegrationFull(t *testing.T) { + connectionString := getConnectionString() + if connectionString == "" { + t.Skipf("ClickHouse state integration tests skipped. To enable define the connection string using environment variable '%s' (example 'export %s=\"tcp://localhost:9000\")", connectionStringEnvKey, connectionStringEnvKey) + } + + t.Run("Test init configurations", func(t *testing.T) { + testInitConfiguration(t) + }) + + metadata := state.Metadata{ + Base: metadata.Base{Properties: map[string]string{ + "clickhouseUrl": connectionString, + "databaseName": "dapr_test", + "tableName": "state_test", + }}, + } + + chs := NewClickHouseStateStore(logger.NewLogger("test")).(*StateStore) + t.Cleanup(func() { + defer chs.Close() + }) + + err := chs.Init(t.Context(), metadata) + if err != nil { + t.Fatal(err) + } + + t.Run("Get Set Delete one item", func(t *testing.T) { + t.Parallel() + setGetUpdateDeleteOneItem(t, chs) + }) + + t.Run("Get item that does not exist", func(t *testing.T) { + t.Parallel() + getItemThatDoesNotExist(t, chs) + }) + + t.Run("Get item with no key fails", func(t *testing.T) { + t.Parallel() + getItemWithNoKey(t, chs) + }) + + t.Run("Set item with no key fails", func(t *testing.T) { + t.Parallel() + setItemWithNoKey(t, chs) + }) + + t.Run("Update and delete with etag succeeds", func(t *testing.T) { + t.Parallel() + updateAndDeleteWithEtagSucceeds(t, chs) + }) + + t.Run("Update with old etag fails", func(t *testing.T) { + t.Parallel() + updateWithOldEtagFails(t, chs) + }) + + t.Run("Insert with etag fails", func(t *testing.T) { + t.Parallel() + newItemWithEtagFails(t, chs) + }) + + t.Run("Delete with invalid etag fails", func(t *testing.T) { + t.Parallel() + deleteWithInvalidEtagFails(t, chs) + }) + + t.Run("Delete item with no key fails", func(t *testing.T) { + t.Parallel() + deleteItemWithNoKey(t, chs) + }) + + t.Run("Delete item that does not exist", func(t *testing.T) { + t.Parallel() + deleteItemThatDoesNotExist(t, chs) + }) + + t.Run("Bulk set and bulk delete", func(t *testing.T) { + t.Parallel() + testBulkSetAndBulkDelete(t, chs) + }) + + t.Run("Set item with invalid TTL", func(t *testing.T) { + t.Parallel() + testSetItemWithInvalidTTL(t, chs) + }) + + t.Run("Set item with negative TTL", func(t *testing.T) { + t.Parallel() + testSetItemWithNegativeTTL(t, chs) + }) + + t.Run("Set with TTL updates the expiration field", func(t *testing.T) { + t.Parallel() + setTTLUpdatesExpiry(t, chs) + }) + + t.Run("Expired item cannot be read", func(t *testing.T) { + t.Parallel() + expiredStateCannotBeRead(t, chs) + }) + + t.Run("Unexpired item can be read", func(t *testing.T) { + t.Parallel() + unexpiredStateCanBeRead(t, chs) + }) + + t.Run("Test Features", func(t *testing.T) { + t.Parallel() + testFeatures(t, chs) + }) +} + +func getConnectionString() string { + return os.Getenv(connectionStringEnvKey) +} + +func testInitConfiguration(t *testing.T) { + logger := logger.NewLogger("test") + tests := []struct { + name string + properties map[string]string + expectedErr string + }{ + { + name: "Empty", + properties: map[string]string{}, + expectedErr: "ClickHouse URL is missing", + }, + { + name: "Missing database name", + properties: map[string]string{ + "clickhouseUrl": "tcp://localhost:9000", + }, + expectedErr: "ClickHouse database name is missing", + }, + { + name: "Missing table name", + properties: map[string]string{ + "clickhouseUrl": "tcp://localhost:9000", + "databaseName": "test", + }, + expectedErr: "ClickHouse table name is missing", + }, + { + name: "Valid configuration", + properties: map[string]string{ + "clickhouseUrl": "tcp://localhost:9000", + "databaseName": "test", + "tableName": "state", + }, + expectedErr: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + store := NewClickHouseStateStore(logger) + metadata := state.Metadata{ + Base: metadata.Base{Properties: tt.properties}, + } + + err := store.Init(t.Context(), metadata) + if tt.expectedErr == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedErr) + } + }) + } +} + +func setGetUpdateDeleteOneItem(t *testing.T, s state.Store) { + key := randomKey() + value := &fakeItem{Color: "yellow"} + + setItem(t, s, key, value, nil) + + getResponse, outputObject := getItem(t, s, key) + assert.Equal(t, value, outputObject) + + newValue := &fakeItem{Color: "green"} + setItem(t, s, key, newValue, getResponse.ETag) + getResponse, outputObject = getItem(t, s, key) + + assert.Equal(t, newValue, outputObject) + + deleteItem(t, s, key, getResponse.ETag) +} + +func getItemThatDoesNotExist(t *testing.T, s state.Store) { + key := randomKey() + response, outputObject := getItem(t, s, key) + assert.Nil(t, response.Data) + assert.Equal(t, "", outputObject.Color) +} + +func getItemWithNoKey(t *testing.T, s state.Store) { + getReq := &state.GetRequest{ + Key: "", + } + + response, err := s.Get(t.Context(), getReq) + require.Error(t, err) + assert.Nil(t, response) +} + +func setItemWithNoKey(t *testing.T, s state.Store) { + setReq := &state.SetRequest{ + Key: "", + } + + err := s.Set(t.Context(), setReq) + require.Error(t, err) +} + +func newItemWithEtagFails(t *testing.T, s state.Store) { + value := &fakeItem{Color: "teal"} + invalidEtag := "12345" + + setReq := &state.SetRequest{ + Key: randomKey(), + ETag: &invalidEtag, + Value: value, + Options: state.SetStateOption{ + Concurrency: state.FirstWrite, + }, + } + + err := s.Set(t.Context(), setReq) + require.Error(t, err) +} + +func updateWithOldEtagFails(t *testing.T, s state.Store) { + key := randomKey() + value := &fakeItem{Color: "gray"} + setItem(t, s, key, value, nil) + getResponse, _ := getItem(t, s, key) + assert.NotNil(t, getResponse.ETag) + originalEtag := getResponse.ETag + + newValue := &fakeItem{Color: "silver"} + setItem(t, s, key, newValue, originalEtag) + getResponse, _ = getItem(t, s, key) + assert.NotNil(t, getResponse.ETag) + currentEtag := getResponse.ETag + + newValue = &fakeItem{Color: "maroon"} + setReq := &state.SetRequest{ + Key: key, + ETag: originalEtag, + Value: newValue, + } + err := s.Set(t.Context(), setReq) + require.Error(t, err) + + deleteItem(t, s, key, currentEtag) +} + +func updateAndDeleteWithEtagSucceeds(t *testing.T, s state.Store) { + key := randomKey() + value := &fakeItem{Color: "hazel"} + setItem(t, s, key, value, nil) + getResponse, outputObject := getItem(t, s, key) + assert.Equal(t, value, outputObject) + assert.NotNil(t, getResponse.ETag) + + value.Color = "purple" + setItem(t, s, key, value, getResponse.ETag) + updateResponse, outputObject := getItem(t, s, key) + assert.Equal(t, value, outputObject) + assert.NotNil(t, updateResponse.ETag) + assert.NotEqual(t, getResponse.ETag, updateResponse.ETag) + + deleteItem(t, s, key, updateResponse.ETag) +} + +func deleteWithInvalidEtagFails(t *testing.T, s state.Store) { + key := randomKey() + value := &fakeItem{Color: "mauve"} + setItem(t, s, key, value, nil) + getResponse, _ := getItem(t, s, key) + assert.NotNil(t, getResponse.ETag) + + invalidEtag := "12345" + deleteReq := &state.DeleteRequest{ + Key: key, + ETag: &invalidEtag, + } + err := s.Delete(t.Context(), deleteReq) + require.Error(t, err) + + deleteItem(t, s, key, getResponse.ETag) +} + +func deleteItemWithNoKey(t *testing.T, s state.Store) { + deleteReq := &state.DeleteRequest{ + Key: "", + } + err := s.Delete(t.Context(), deleteReq) + require.Error(t, err) +} + +func deleteItemThatDoesNotExist(t *testing.T, s state.Store) { + deleteReq := &state.DeleteRequest{ + Key: randomKey(), + } + err := s.Delete(t.Context(), deleteReq) + require.NoError(t, err) +} + +func testBulkSetAndBulkDelete(t *testing.T, s state.Store) { + setReq := []state.SetRequest{ + { + Key: randomKey(), + Value: &fakeItem{Color: "oceanblue"}, + }, + { + Key: randomKey(), + Value: &fakeItem{Color: "livingwhite"}, + }, + } + + err := s.BulkSet(t.Context(), setReq, state.BulkStoreOpts{}) + require.NoError(t, err) + assert.True(t, storeItemExists(t, s, setReq[0].Key)) + assert.True(t, storeItemExists(t, s, setReq[1].Key)) + + deleteReq := []state.DeleteRequest{ + { + Key: setReq[0].Key, + }, + { + Key: setReq[1].Key, + }, + } + + err = s.BulkDelete(t.Context(), deleteReq, state.BulkStoreOpts{}) + require.NoError(t, err) + assert.False(t, storeItemExists(t, s, setReq[0].Key)) + assert.False(t, storeItemExists(t, s, setReq[1].Key)) +} + +func testSetItemWithInvalidTTL(t *testing.T, s state.Store) { + setReq := &state.SetRequest{ + Key: randomKey(), + Value: &fakeItem{Color: "red"}, + Metadata: map[string]string{ + "ttlInSeconds": "XX", + }, + } + + err := s.Set(t.Context(), setReq) + require.Error(t, err) +} + +func testSetItemWithNegativeTTL(t *testing.T, s state.Store) { + setReq := &state.SetRequest{ + Key: randomKey(), + Value: &fakeItem{Color: "red"}, + Metadata: map[string]string{ + "ttlInSeconds": "-1", + }, + } + + err := s.Set(t.Context(), setReq) + require.Error(t, err) +} + +func setTTLUpdatesExpiry(t *testing.T, s state.Store) { + key := randomKey() + setReq := &state.SetRequest{ + Key: key, + Value: &fakeItem{Color: "red"}, + Metadata: map[string]string{ + "ttlInSeconds": "1000", + }, + } + + err := s.Set(t.Context(), setReq) + require.NoError(t, err) + + getReq := &state.GetRequest{Key: key} + resp, err := s.Get(t.Context(), getReq) + require.NoError(t, err) + assert.NotNil(t, resp.Data) + assert.NotEmpty(t, resp.Metadata[state.GetRespMetaKeyTTLExpireTime]) + + deleteReq := &state.DeleteRequest{Key: key} + err = s.Delete(t.Context(), deleteReq) + require.NoError(t, err) +} + +func expiredStateCannotBeRead(t *testing.T, s state.Store) { + key := randomKey() + setReq := &state.SetRequest{ + Key: key, + Value: &fakeItem{Color: "red"}, + Metadata: map[string]string{ + "ttlInSeconds": "1", + }, + } + + err := s.Set(t.Context(), setReq) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + getReq := &state.GetRequest{Key: key} + resp, err := s.Get(t.Context(), getReq) + require.NoError(t, err) + assert.Nil(t, resp.Data) +} + +func unexpiredStateCanBeRead(t *testing.T, s state.Store) { + key := randomKey() + value := &fakeItem{Color: "blue"} + setReq := &state.SetRequest{ + Key: key, + Value: value, + Metadata: map[string]string{ + "ttlInSeconds": "1000", + }, + } + + err := s.Set(t.Context(), setReq) + require.NoError(t, err) + + getReq := &state.GetRequest{Key: key} + resp, err := s.Get(t.Context(), getReq) + require.NoError(t, err) + assert.NotNil(t, resp.Data) + + var outputObject fakeItem + err = json.Unmarshal(resp.Data, &outputObject) + require.NoError(t, err) + assert.Equal(t, value, &outputObject) + + deleteReq := &state.DeleteRequest{Key: key} + err = s.Delete(t.Context(), deleteReq) + require.NoError(t, err) +} + +func testFeatures(t *testing.T, s state.Store) { + features := s.Features() + assert.Contains(t, features, state.FeatureETag) + assert.Contains(t, features, state.FeatureTTL) +} + +func setItem(t *testing.T, s state.Store, key string, value interface{}, etag *string) { + setReq := &state.SetRequest{ + Key: key, + ETag: etag, + Value: value, + } + + err := s.Set(t.Context(), setReq) + require.NoError(t, err) +} + +func getItem(t *testing.T, s state.Store, key string) (*state.GetResponse, *fakeItem) { + getReq := &state.GetRequest{ + Key: key, + } + + response, err := s.Get(t.Context(), getReq) + require.NoError(t, err) + assert.NotNil(t, response) + + outputObject := &fakeItem{} + if response.Data != nil { + err = json.Unmarshal(response.Data, outputObject) + require.NoError(t, err) + } + + return response, outputObject +} + +func deleteItem(t *testing.T, s state.Store, key string, etag *string) { + deleteReq := &state.DeleteRequest{ + Key: key, + ETag: etag, + } + + err := s.Delete(t.Context(), deleteReq) + require.NoError(t, err) + + assert.False(t, storeItemExists(t, s, key)) +} + +func storeItemExists(t *testing.T, s state.Store, key string) bool { + getReq := &state.GetRequest{ + Key: key, + } + response, err := s.Get(t.Context(), getReq) + require.NoError(t, err) + + return response.Data != nil +} + +func randomKey() string { + return uuid.New().String() +} diff --git a/state/clickhouse/clickhouse_test.go b/state/clickhouse/clickhouse_test.go new file mode 100644 index 0000000000..635a215a4a --- /dev/null +++ b/state/clickhouse/clickhouse_test.go @@ -0,0 +1,222 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clickhouse + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + "github.com/dapr/kit/logger" +) + +const ( + testURL = "http://localhost:8123" + testDatabase = "dapr_test" + testTable = "state_test" + testUsername = "default" + testPassword = "clickhouse_password" +) + +func TestClickHouseIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test") + } + + if os.Getenv("RUN_CLICKHOUSE_INTEGRATION_TEST") == "" { + t.Skip("Skipping ClickHouse integration test. Set RUN_CLICKHOUSE_INTEGRATION_TEST=1 to run") + } + + store := NewClickHouseStateStore(logger.NewLogger("test")) + ctx := t.Context() + + err := store.Init(ctx, state.Metadata{ + Base: metadata.Base{ + Properties: map[string]string{ + "clickhouseUrl": testURL, + "databaseName": testDatabase, + "tableName": testTable, + "username": testUsername, + "password": testPassword, + }, + }, + }) + require.NoError(t, err) + + t.Cleanup(func() { + if s, ok := store.(*StateStore); ok { + if s.db != nil { + _, _ = s.db.ExecContext(ctx, "DROP TABLE IF EXISTS "+testDatabase+"."+testTable) + _, _ = s.db.ExecContext(ctx, "DROP DATABASE IF EXISTS "+testDatabase) + } + _ = store.Close() + } + }) + + t.Run("Test CRUD operations", func(t *testing.T) { + testKey := "test-key" + testValue := []byte("test-value") + + err := store.Set(ctx, &state.SetRequest{ + Key: testKey, + Value: testValue, + }) + require.NoError(t, err) + + response, err := store.Get(ctx, &state.GetRequest{ + Key: testKey, + }) + require.NoError(t, err) + assert.Equal(t, testValue, response.Data) + + err = store.Delete(ctx, &state.DeleteRequest{ + Key: testKey, + }) + require.NoError(t, err) + + response, err = store.Get(ctx, &state.GetRequest{ + Key: testKey, + }) + require.NoError(t, err) + assert.Nil(t, response.Data) + }) + + t.Run("Test ETag support", func(t *testing.T) { + testKey := "etag-key" + testValue := []byte("etag-value") + + err := store.Set(ctx, &state.SetRequest{ + Key: testKey, + Value: testValue, + }) + require.NoError(t, err) + + response, err := store.Get(ctx, &state.GetRequest{ + Key: testKey, + }) + require.NoError(t, err) + assert.NotEmpty(t, response.ETag) + + err = store.Set(ctx, &state.SetRequest{ + Key: testKey, + Value: []byte("new-value"), + ETag: response.ETag, + }) + require.NoError(t, err) + + err = store.Delete(ctx, &state.DeleteRequest{ + Key: testKey, + }) + require.NoError(t, err) + }) + + t.Run("Test empty key", func(t *testing.T) { + err := store.Set(ctx, &state.SetRequest{ + Key: "", + Value: []byte("test"), + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "key is empty") + + _, err = store.Get(ctx, &state.GetRequest{ + Key: "", + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "key is empty") + + err = store.Delete(ctx, &state.DeleteRequest{ + Key: "", + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "key is empty") + }) + + t.Run("Test non-existent key", func(t *testing.T) { + response, err := store.Get(ctx, &state.GetRequest{ + Key: "non-existent-key", + }) + require.NoError(t, err) + assert.Nil(t, response.Data) + }) + + t.Run("Test Features", func(t *testing.T) { + features := store.Features() + assert.Contains(t, features, state.FeatureETag) + }) +} + +func TestParseAndValidateMetadata(t *testing.T) { + t.Run("With valid metadata", func(t *testing.T) { + properties := map[string]string{ + "clickhouseUrl": "tcp://127.0.0.1:9000", + "databaseName": "default", + "tableName": "statestore", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + + metadata, err := parseAndValidateMetadata(m) + require.NoError(t, err) + assert.Equal(t, properties["clickhouseUrl"], metadata.ClickhouseURL) + assert.Equal(t, properties["databaseName"], metadata.DatabaseName) + assert.Equal(t, properties["tableName"], metadata.TableName) + }) + + t.Run("Missing ClickhouseURL", func(t *testing.T) { + properties := map[string]string{ + "databaseName": "default", + "tableName": "statestore", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + + _, err := parseAndValidateMetadata(m) + require.Error(t, err) + assert.Equal(t, "ClickHouse URL is missing", err.Error()) + }) + + t.Run("Missing DatabaseName", func(t *testing.T) { + properties := map[string]string{ + "clickhouseUrl": "tcp://127.0.0.1:9000", + "tableName": "statestore", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + + _, err := parseAndValidateMetadata(m) + require.Error(t, err) + assert.Equal(t, "ClickHouse database name is missing", err.Error()) + }) + + t.Run("Missing TableName", func(t *testing.T) { + properties := map[string]string{ + "clickhouseUrl": "tcp://127.0.0.1:9000", + "databaseName": "default", + } + m := state.Metadata{ + Base: metadata.Base{Properties: properties}, + } + + _, err := parseAndValidateMetadata(m) + require.Error(t, err) + assert.Equal(t, "ClickHouse table name is missing", err.Error()) + }) +} diff --git a/state/clickhouse/metadata.yaml b/state/clickhouse/metadata.yaml new file mode 100644 index 0000000000..481ea58346 --- /dev/null +++ b/state/clickhouse/metadata.yaml @@ -0,0 +1,50 @@ +# yaml-language-server: $schema=../../component-metadata-schema.json +schemaVersion: v1 +type: state +name: clickhouse +version: v1 +status: stable +title: "ClickHouse" +urls: + - title: Reference + url: https://docs.dapr.io/reference/components-reference/supported-state-stores/setup-clickhouse/ +capabilities: + - crud + - etag + - ttl +authenticationProfiles: + - title: "Username and password" + description: "Authenticate using username and password." + metadata: + - name: username + type: string + required: false + description: | + Username for ClickHouse host. Defaults to empty. + example: "default" + default: "" + - name: password + type: string + required: false + sensitive: true + description: | + Password for ClickHouse host. No default. Use secretKeyRef for + secret reference + example: "mypassword" + default: "" +metadata: + - name: clickhouseUrl + required: true + description: Connection string for the ClickHouse host + example: "tcp://localhost:9000" + type: string + - name: databaseName + required: true + description: The database name to use. Will be created if database does not exist. + example: "dapr_state_store" + type: string + - name: tableName + required: true + description: The table name to use. Will be created if table does not exist. + example: "state" + type: string diff --git a/tests/certification/state/clickhouse/clickhouse_test.go b/tests/certification/state/clickhouse/clickhouse_test.go new file mode 100644 index 0000000000..b4953fae3b --- /dev/null +++ b/tests/certification/state/clickhouse/clickhouse_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clickhouse + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/clickhouse" + "github.com/dapr/components-contrib/tests/certification/flow" + "github.com/dapr/components-contrib/tests/certification/flow/dockercompose" + "github.com/dapr/kit/logger" +) + +const ( + componentName = "clickhouse-store" +) + +func TestClickHouseStateStore(t *testing.T) { + flow.New(t, "Test ClickHouse state store certification"). + Step(dockercompose.Run("clickhouse", "docker-compose.yml")). + Step("verify clickhouse store operations", testClickHouseStateStore()). + Run() +} + +func testClickHouseStateStore() flow.Runnable { + return func(ctx flow.Context) error { + store := clickhouse.NewClickHouseStateStore(logger.NewLogger("clickhouse-store-test")) + + metadata := state.Metadata{} + metadata.Properties = map[string]string{ + "clickhouseUrl": "tcp://localhost:9000", + "databaseName": "dapr_test", + "tableName": "state_test", + "username": "default", + "password": "", + } + + err := store.Init(ctx, metadata) + require.NoError(ctx.T, err) + + setReq := &state.SetRequest{ + Key: "test-key", + Value: []byte("test-value"), + } + err = store.Set(ctx, setReq) + require.NoError(ctx.T, err) + + getReq := &state.GetRequest{ + Key: "test-key", + } + getResp, err := store.Get(ctx, getReq) + require.NoError(ctx.T, err) + assert.Equal(ctx.T, "test-value", string(getResp.Data)) + + delReq := &state.DeleteRequest{ + Key: "test-key", + } + err = store.Delete(ctx, delReq) + require.NoError(ctx.T, err) + + getResp, err = store.Get(ctx, getReq) + require.NoError(ctx.T, err) + assert.Nil(ctx.T, getResp.Data) + + ttlSetReq := &state.SetRequest{ + Key: "test-ttl-key", + Value: []byte("test-ttl-value"), + Metadata: map[string]string{ + "ttlInSeconds": "1", + }, + } + err = store.Set(ctx, ttlSetReq) + require.NoError(ctx.T, err) + + fmt.Println("Waiting for TTL to expire...") + flow.Sleep(2 * time.Second) + + ttlGetReq := &state.GetRequest{ + Key: "test-ttl-key", + } + ttlGetResp, err := store.Get(ctx, ttlGetReq) + require.NoError(ctx.T, err) + assert.Nil(ctx.T, ttlGetResp.Data) + + etagSetReq := &state.SetRequest{ + Key: "test-etag-key", + Value: []byte("test-etag-value"), + } + err = store.Set(ctx, etagSetReq) + require.NoError(ctx.T, err) + + etagGetReq := &state.GetRequest{ + Key: "test-etag-key", + } + etagGetResp, err := store.Get(ctx, etagGetReq) + require.NoError(ctx.T, err) + assert.NotNil(ctx.T, etagGetResp.ETag) + + etagUpdateReq := &state.SetRequest{ + Key: "test-etag-key", + Value: []byte("test-etag-value-updated"), + ETag: etagGetResp.ETag, + } + err = store.Set(ctx, etagUpdateReq) + require.NoError(ctx.T, err) + + badETag := "bad-etag" + etagBadUpdateReq := &state.SetRequest{ + Key: "test-etag-key", + Value: []byte("test-etag-value-updated-again"), + ETag: &badETag, + } + err = store.Set(ctx, etagBadUpdateReq) + require.Error(ctx.T, err) + + err = store.Delete(ctx, &state.DeleteRequest{Key: "test-etag-key"}) + require.NoError(ctx.T, err) + + return nil + } +} diff --git a/tests/certification/state/clickhouse/docker-compose.yml b/tests/certification/state/clickhouse/docker-compose.yml new file mode 100644 index 0000000000..cf61af77fe --- /dev/null +++ b/tests/certification/state/clickhouse/docker-compose.yml @@ -0,0 +1,21 @@ +version: '3.8' + +services: + clickhouse: + image: clickhouse/clickhouse-server:latest + ports: + - "9000:9000" + - "8123:8123" + environment: + - CLICKHOUSE_USER=default + - CLICKHOUSE_PASSWORD=clickhouse_password + - CLICKHOUSE_DB=dapr_test + ulimits: + nofile: + soft: 262144 + hard: 262144 + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://localhost:8123/ping"] + interval: 5s + timeout: 5s + retries: 10 diff --git a/tests/config/state/clickhouse/clickhouse.yml b/tests/config/state/clickhouse/clickhouse.yml new file mode 100644 index 0000000000..775ebabe6c --- /dev/null +++ b/tests/config/state/clickhouse/clickhouse.yml @@ -0,0 +1,18 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.clickhouse + version: v1 + metadata: + - name: clickhouseUrl + value: "tcp://localhost:9000" + - name: databaseName + value: "dapr_test" + - name: tableName + value: "state_test" + - name: username + value: "default" + - name: password + value: "clickhouse_password" diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index f8c7bbfb71..59ded0b35f 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -123,3 +123,7 @@ components: operations: [] - component: ravendb operations: [ "first-write", "etag", "ttl", "transaction" ] + - component: clickhouse + operations: [ "etag", "first-write", "ttl" ] + config: + badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" diff --git a/tests/conformance/state_test.go b/tests/conformance/state_test.go index 78da04f26c..cd0cf0f987 100644 --- a/tests/conformance/state_test.go +++ b/tests/conformance/state_test.go @@ -29,6 +29,7 @@ import ( s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb" s_azuretablestorage "github.com/dapr/components-contrib/state/azure/tablestorage" s_cassandra "github.com/dapr/components-contrib/state/cassandra" + s_clickhouse "github.com/dapr/components-contrib/state/clickhouse" s_cloudflareworkerskv "github.com/dapr/components-contrib/state/cloudflare/workerskv" s_cockroachdb_v1 "github.com/dapr/components-contrib/state/cockroachdb" s_coherence "github.com/dapr/components-contrib/state/coherence" @@ -125,6 +126,8 @@ func loadStateStore(name string) state.Store { return s_cassandra.NewCassandraStateStore(testLogger) case "cloudflare.workerskv": return s_cloudflareworkerskv.NewCFWorkersKV(testLogger) + case "clickhouse": + return s_clickhouse.NewClickHouseStateStore(testLogger) case "cockroachdb.v1": return s_cockroachdb_v1.New(testLogger) case "cockroachdb.v2":