diff --git a/src/implementation/Client/GRPCClient/state.ts b/src/implementation/Client/GRPCClient/state.ts index 30f1d694c..da20e8c60 100644 --- a/src/implementation/Client/GRPCClient/state.ts +++ b/src/implementation/Client/GRPCClient/state.ts @@ -14,6 +14,7 @@ limitations under the License. import { create } from "@bufbuild/protobuf"; import GRPCClient from "./GRPCClient"; import { + DeleteBulkStateRequestSchema, DeleteStateRequestSchema, ExecuteStateTransactionRequestSchema, GetBulkStateRequestSchema, @@ -35,6 +36,7 @@ import { Settings } from "../../../utils/Settings.util"; import { StateSaveResponseType } from "../../../types/state/StateSaveResponseType"; import { StateSaveOptions } from "../../../types/state/StateSaveOptions.type"; import { StateDeleteOptions } from "../../../types/state/StateDeleteOptions.type"; +import { StateDeleteBulkOptions } from "../../../types/state/StateDeleteBulkOptions.type"; import { StateGetOptions } from "../../../types/state/StateGetOptions.type"; import { IStateOptions } from "../../../types/state/StateOptions.type"; @@ -130,6 +132,29 @@ export default class GRPCClientState implements IClientState { return {}; } + async deleteBulk( + storeName: string, + items: KeyValuePairType[], + _options: StateDeleteBulkOptions = {}, + ): Promise { + const states: StateItem[] = []; + + for (const item of items) { + const si = create(StateItemSchema, { + key: item.key, + etag: item?.etag ? create(EtagSchema, { value: item.etag }) : undefined, + options: this._configureStateOptions(item?.options), + metadata: item.metadata ?? {}, + }); + states.push(si); + } + + const client = await this.client.getClient(); + await client.deleteBulkState(create(DeleteBulkStateRequestSchema, { storeName, states })); + + return {}; + } + async transaction( storeName: string, operations: OperationType[] = [], diff --git a/src/implementation/Client/HTTPClient/state.ts b/src/implementation/Client/HTTPClient/state.ts index fb9c2998b..b93020ff7 100644 --- a/src/implementation/Client/HTTPClient/state.ts +++ b/src/implementation/Client/HTTPClient/state.ts @@ -26,6 +26,7 @@ import { Logger } from "../../../logger/Logger"; import { StateSaveResponseType } from "../../../types/state/StateSaveResponseType"; import { StateSaveOptions } from "../../../types/state/StateSaveOptions.type"; import { StateDeleteOptions } from "../../../types/state/StateDeleteOptions.type"; +import { StateDeleteBulkOptions } from "../../../types/state/StateDeleteBulkOptions.type"; import { THTTPExecuteParams } from "../../../types/http/THTTPExecuteParams.type"; import { StateGetOptions } from "../../../types/state/StateGetOptions.type"; @@ -120,6 +121,31 @@ export default class HTTPClientState implements IClientState { return {}; } + async deleteBulk( + storeName: string, + items: KeyValuePairType[], + _options: StateDeleteBulkOptions = {}, + ): Promise { + // The Dapr HTTP API does not expose a dedicated bulk-delete endpoint. + // Perform individual DELETE requests for each item. + try { + await Promise.all( + items.map((item) => + this.delete(storeName, item.key, { + etag: item.etag, + concurrency: item.options?.concurrency, + consistency: item.options?.consistency, + }), + ), + ); + } catch (e: any) { + this.logger.error(`Error bulk deleting state from store ${storeName}, error: ${e}`); + return { error: e }; + } + + return {}; + } + async transaction( storeName: string, operations: OperationType[] = [], diff --git a/src/index.ts b/src/index.ts index c90de5eb4..50e72957b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -39,6 +39,7 @@ import DaprPubSubStatusEnum from "./enum/DaprPubSubStatus.enum"; import StateConcurrencyEnum from "./enum/StateConcurrency.enum"; import StateConsistencyEnum from "./enum/StateConsistency.enum"; import { StateGetBulkOptions } from "./types/state/StateGetBulkOptions.type"; +import { StateDeleteBulkOptions } from "./types/state/StateDeleteBulkOptions.type"; import DaprWorkflowClient from "./workflow/client/DaprWorkflowClient"; import WorkflowActivityContext from "./workflow/runtime/WorkflowActivityContext"; @@ -79,6 +80,7 @@ export { StateConsistencyEnum, PubSubBulkPublishResponse, StateGetBulkOptions, + StateDeleteBulkOptions, DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, diff --git a/src/interfaces/Client/IClientState.ts b/src/interfaces/Client/IClientState.ts index 5f70f599f..36a9fbbae 100644 --- a/src/interfaces/Client/IClientState.ts +++ b/src/interfaces/Client/IClientState.ts @@ -21,6 +21,7 @@ import { StateGetBulkOptions } from "../../types/state/StateGetBulkOptions.type" import { StateSaveResponseType } from "../../types/state/StateSaveResponseType"; import { StateSaveOptions } from "../../types/state/StateSaveOptions.type"; import { StateDeleteOptions } from "../../types/state/StateDeleteOptions.type"; +import { StateDeleteBulkOptions } from "../../types/state/StateDeleteBulkOptions.type"; import { StateGetOptions } from "../../types/state/StateGetOptions.type"; export default interface IClientState { @@ -28,6 +29,11 @@ export default interface IClientState { get(storeName: string, key: string, options?: Partial): Promise; getBulk(storeName: string, keys: string[], options?: StateGetBulkOptions): Promise; delete(storeName: string, key: string, options?: Partial): Promise; + deleteBulk( + storeName: string, + items: KeyValuePairType[], + options?: StateDeleteBulkOptions, + ): Promise; transaction(storeName: string, operations?: OperationType[], metadata?: IRequestMetadata | null): Promise; query(storeName: string, query: StateQueryType): Promise; } diff --git a/src/types/state/StateDeleteBulkOptions.type.ts b/src/types/state/StateDeleteBulkOptions.type.ts new file mode 100644 index 000000000..1758da32d --- /dev/null +++ b/src/types/state/StateDeleteBulkOptions.type.ts @@ -0,0 +1,21 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { KeyValueType } from "../KeyValue.type"; + +export type StateDeleteBulkOptions = { + /** + * Metadata to be passed to the bulk delete operation. + */ + metadata?: KeyValueType; +}; diff --git a/test/e2e/common/client.test.ts b/test/e2e/common/client.test.ts index a74496cc3..3514ccf0d 100644 --- a/test/e2e/common/client.test.ts +++ b/test/e2e/common/client.test.ts @@ -386,6 +386,35 @@ describe("common/client/http", () => { expect(res).toEqual(""); }); + it("should be able to delete multiple keys in bulk from the state store", async () => { + await client.state.save(stateStoreName, [ + { + key: "key-1", + value: "value-1", + }, + { + key: "key-2", + value: "value-2", + }, + { + key: "key-3", + value: "value-3", + }, + ]); + + await client.state.deleteBulk(stateStoreName, [ + { key: "key-1", value: "" }, + { key: "key-3", value: "" }, + ]); + + const res1 = await client.state.get(stateStoreName, "key-1"); + const res2 = await client.state.get(stateStoreName, "key-2"); + const res3 = await client.state.get(stateStoreName, "key-3"); + expect(res1).toBeFalsy(); + expect(res2).toEqual("value-2"); + expect(res3).toBeFalsy(); + }); + it("should be able to perform a transaction that replaces a key and deletes another", async () => { await client.state.transaction(stateStoreName, [ { @@ -907,6 +936,35 @@ describe("common/client/grpc", () => { expect(res).toEqual(""); }); + it("should be able to delete multiple keys in bulk from the state store", async () => { + await client.state.save(stateStoreName, [ + { + key: "key-1", + value: "value-1", + }, + { + key: "key-2", + value: "value-2", + }, + { + key: "key-3", + value: "value-3", + }, + ]); + + await client.state.deleteBulk(stateStoreName, [ + { key: "key-1", value: "" }, + { key: "key-3", value: "" }, + ]); + + const res1 = await client.state.get(stateStoreName, "key-1"); + const res2 = await client.state.get(stateStoreName, "key-2"); + const res3 = await client.state.get(stateStoreName, "key-3"); + expect(res1).toEqual(""); + expect(res2).toEqual("value-2"); + expect(res3).toEqual(""); + }); + it("should be able to perform a transaction that replaces a key and deletes another", async () => { await client.state.transaction(stateStoreName, [ {