Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/implementation/Client/DaprClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import IClientPubSub from "../../interfaces/Client/IClientPubSub";
import IClientSecret from "../../interfaces/Client/IClientSecret";
import IClientSidecar from "../../interfaces/Client/IClientSidecar";
import IClientState from "../../interfaces/Client/IClientState";
import IClientJobs from "../../interfaces/Client/IClientJobs";
import IClientWorkflow from "../../interfaces/Client/IClientWorkflow";

import GRPCClient from "./GRPCClient/GRPCClient";
Expand All @@ -40,6 +41,7 @@ import GRPCClientPubSub from "./GRPCClient/pubsub";
import GRPCClientSecret from "./GRPCClient/secret";
import GRPCClientSidecar from "./GRPCClient/sidecar";
import GRPCClientState from "./GRPCClient/state";
import GRPCClientJobs from "./GRPCClient/jobs";
import GRPCClientWorkflow from "./GRPCClient/workflow";

import HTTPClient from "./HTTPClient/HTTPClient";
Expand All @@ -56,6 +58,7 @@ import HTTPClientPubSub from "./HTTPClient/pubsub";
import HTTPClientSecret from "./HTTPClient/secret";
import HTTPClientSidecar from "./HTTPClient/sidecar";
import HTTPClientState from "./HTTPClient/state";
import HTTPClientJobs from "./HTTPClient/jobs";
import HTTPClientWorkflow from "./HTTPClient/workflow";

import CommunicationProtocolEnum from "../../enum/CommunicationProtocol.enum";
Expand All @@ -75,6 +78,7 @@ export default class DaprClient {
readonly crypto: IClientCrypto;
readonly health: IClientHealth;
readonly invoker: IClientInvoker;
readonly jobs: IClientJobs;
readonly lock: IClientLock;
readonly metadata: IClientMetadata;
readonly proxy: IClientProxy;
Expand Down Expand Up @@ -115,6 +119,7 @@ export default class DaprClient {
this.sidecar = new GRPCClientSidecar(client);
this.proxy = new GRPCClientProxy(client);
this.configuration = new GRPCClientConfiguration(client);
this.jobs = new GRPCClientJobs(client);
this.lock = new GRPCClientLock(client);
this.crypto = new GRPCClientCrypto(client);
this.actor = new GRPCClientActor(client); // we use an abstractor here since we interface through a builder with the Actor Runtime
Expand All @@ -132,6 +137,7 @@ export default class DaprClient {
this.crypto = new HTTPClientCrypto(client);
this.health = new HTTPClientHealth(client);
this.invoker = new HTTPClientInvoker(client);
this.jobs = new HTTPClientJobs(client);
this.lock = new HTTPClientLock(client);
this.metadata = new HTTPClientMetadata(client);
this.proxy = new HTTPClientProxy(client);
Expand Down
189 changes: 189 additions & 0 deletions src/implementation/Client/GRPCClient/jobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { create } from "@bufbuild/protobuf";
import { AnySchema, DurationSchema } from "@bufbuild/protobuf/wkt";
import GRPCClient from "./GRPCClient";
import IClientJobs from "../../../interfaces/Client/IClientJobs";
import { Job } from "../../../types/jobs/Job";
import { JobFailurePolicy } from "../../../types/jobs/JobFailurePolicy";
import { ScheduleJobRequest } from "../../../types/jobs/ScheduleJobRequest";
import {
JobSchema,
ScheduleJobRequestSchema,
GetJobRequestSchema,
DeleteJobRequestSchema,
} from "../../../proto/dapr/proto/runtime/v1/dapr_pb";
import {
JobFailurePolicySchema,
JobFailurePolicyDropSchema,
JobFailurePolicyConstantSchema,
} from "../../../proto/dapr/proto/common/v1/common_pb";
import type { JobFailurePolicy as ProtoJobFailurePolicy } from "../../../proto/dapr/proto/common/v1/common_pb";
import type { Job as ProtoJob } from "../../../proto/dapr/proto/runtime/v1/dapr_pb";

/**
* Parse a duration string like "5s", "1m30s", "2h" into total seconds.
* Supports h, m, s units. Throws on unrecognized formats.
*/
function parseDurationSeconds(duration: string): number {
let total = 0;
let matched = false;
for (const match of duration.matchAll(/(\d+)(h|m|s)/g)) {
matched = true;
const value = parseInt(match[1], 10);
switch (match[2]) {
case "h":
total += value * 3600;
break;
case "m":
total += value * 60;
break;
case "s":
total += value;
break;
}
}
if (!matched) {
throw new Error(`Invalid duration format: "${duration}". Expected format like "5s", "1m30s", "2h".`);
}
return total;
}

/**
* Convert SDK JobFailurePolicy to proto JobFailurePolicy.
*/
function toProtoFailurePolicy(fp: JobFailurePolicy): ProtoJobFailurePolicy {
if (fp.type === "drop") {
return create(JobFailurePolicySchema, {
policy: { case: "drop", value: create(JobFailurePolicyDropSchema) },
});
}
const constantPolicy: Record<string, unknown> = {};
if (fp.interval !== undefined) {
constantPolicy.interval = create(DurationSchema, { seconds: BigInt(parseDurationSeconds(fp.interval)) });
}
if (fp.maxRetries !== undefined) {
constantPolicy.maxRetries = fp.maxRetries;
}
return create(JobFailurePolicySchema, {
policy: { case: "constant", value: create(JobFailurePolicyConstantSchema, constantPolicy) },
});
}

/**
* Convert proto Job to SDK Job type.
*/
function fromProtoJob(protoJob: ProtoJob): Job {
const job: Job = { name: protoJob.name };

if (protoJob.schedule !== undefined) {
job.schedule = protoJob.schedule;
}
if (protoJob.dueTime !== undefined) {
job.dueTime = protoJob.dueTime;
}
if (protoJob.repeats !== undefined) {
job.repeats = protoJob.repeats;
}
if (protoJob.ttl !== undefined) {
job.ttl = protoJob.ttl;
}

if (protoJob.data !== undefined && protoJob.data.value?.length) {
try {
job.data = JSON.parse(Buffer.from(protoJob.data.value).toString("utf-8"));
} catch {
job.data = Buffer.from(protoJob.data.value).toString("utf-8");
}
}

if (protoJob.failurePolicy !== undefined) {
const policy = protoJob.failurePolicy.policy;
if (policy.case === "drop") {
job.failurePolicy = { type: "drop" };
} else if (policy.case === "constant") {
const constant = policy.value;
const fp: JobFailurePolicy & { type: "constant" } = { type: "constant" };
if (constant.interval !== undefined) {
fp.interval = `${Number(constant.interval.seconds)}s`;
}
if (constant.maxRetries !== undefined) {
fp.maxRetries = constant.maxRetries;
}
job.failurePolicy = fp;
}
}

return job;
}

export default class GRPCClientJobs implements IClientJobs {
client: GRPCClient;

constructor(client: GRPCClient) {
this.client = client;
}

async schedule(job: ScheduleJobRequest): Promise<void> {
const client = await this.client.getClient();

const protoJobFields: Record<string, unknown> = {
name: job.name,
};

if (job.schedule !== undefined) {
protoJobFields.schedule = job.schedule;
}
if (job.dueTime !== undefined) {
protoJobFields.dueTime = job.dueTime;
}
if (job.repeats !== undefined) {
protoJobFields.repeats = job.repeats;
}
if (job.ttl !== undefined) {
protoJobFields.ttl = job.ttl;
}
if (job.data !== undefined) {
protoJobFields.data = create(AnySchema, {
value: Buffer.from(JSON.stringify(job.data), "utf-8"),
});
}
if (job.failurePolicy !== undefined) {
protoJobFields.failurePolicy = toProtoFailurePolicy(job.failurePolicy);
}

const req = create(ScheduleJobRequestSchema, {
job: create(JobSchema, protoJobFields),
overwrite: job.overwrite ?? false,
});

await client.scheduleJobAlpha1(req);
}

async get(name: string): Promise<Job> {
const client = await this.client.getClient();
const res = await client.getJobAlpha1(create(GetJobRequestSchema, { name }));

if (!res.job) {
throw new Error(`Job '${name}' not found in response`);
}

return fromProtoJob(res.job);
}

async delete(name: string): Promise<void> {
const client = await this.client.getClient();
await client.deleteJobAlpha1(create(DeleteJobRequestSchema, { name }));
}
}
124 changes: 124 additions & 0 deletions src/implementation/Client/HTTPClient/jobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import HTTPClient from "./HTTPClient";
import IClientJobs from "../../../interfaces/Client/IClientJobs";
import { Job } from "../../../types/jobs/Job";
import { JobFailurePolicy } from "../../../types/jobs/JobFailurePolicy";
import { ScheduleJobRequest } from "../../../types/jobs/ScheduleJobRequest";
import { KeyValueType } from "../../../types/KeyValue.type";

/**
* Serialize SDK JobFailurePolicy to the HTTP API shape.
*/
function toHTTPFailurePolicy(fp: JobFailurePolicy): KeyValueType {
if (fp.type === "drop") {
return { drop: {} };
}
const constant: KeyValueType = {};
if (fp.interval !== undefined) {
constant.interval = fp.interval;
}
if (fp.maxRetries !== undefined) {
constant.maxRetries = fp.maxRetries;
}
return { constant };
}

export default class HTTPClientJobs implements IClientJobs {
client: HTTPClient;

constructor(client: HTTPClient) {
this.client = client;
}

async schedule(job: ScheduleJobRequest): Promise<void> {
const body: KeyValueType = {};

if (job.schedule !== undefined) {
body.schedule = job.schedule;
}
if (job.dueTime !== undefined) {
body.dueTime = job.dueTime;
}
if (job.repeats !== undefined) {
body.repeats = job.repeats;
}
if (job.ttl !== undefined) {
body.ttl = job.ttl;
}
if (job.data !== undefined) {
body.data = job.data;
}
if (job.failurePolicy !== undefined) {
body.failurePolicy = toHTTPFailurePolicy(job.failurePolicy);
}
if (job.overwrite !== undefined) {
body.overwrite = job.overwrite;
}

await this.client.executeWithApiVersion("v1.0-alpha1", `/jobs/${job.name}`, {
method: "POST",
body,
});
}

async get(name: string): Promise<Job> {
const result = await this.client.executeWithApiVersion("v1.0-alpha1", `/jobs/${name}`, {
method: "GET",
});

const raw = result as KeyValueType;
const job: Job = { name };

if (raw.schedule !== undefined) {
job.schedule = raw.schedule as string;
}
if (raw.dueTime !== undefined) {
job.dueTime = raw.dueTime as string;
}
if (raw.repeats !== undefined) {
job.repeats = raw.repeats as number;
}
if (raw.ttl !== undefined) {
job.ttl = raw.ttl as string;
}
if (raw.data !== undefined) {
job.data = raw.data;
}
if (raw.failurePolicy !== undefined) {
const fp = raw.failurePolicy as KeyValueType;
if (fp.drop !== undefined) {
job.failurePolicy = { type: "drop" };
} else if (fp.constant !== undefined) {
const constant = fp.constant as KeyValueType;
const policy: JobFailurePolicy & { type: "constant" } = { type: "constant" };
if (constant.interval !== undefined) {
policy.interval = constant.interval as string;
}
if (constant.maxRetries !== undefined) {
policy.maxRetries = constant.maxRetries as number;
}
job.failurePolicy = policy;
}
}

return job;
}

async delete(name: string): Promise<void> {
await this.client.executeWithApiVersion("v1.0-alpha1", `/jobs/${name}`, {
method: "DELETE",
});
}
}
9 changes: 9 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ import StateConcurrencyEnum from "./enum/StateConcurrency.enum";
import StateConsistencyEnum from "./enum/StateConsistency.enum";
import { StateGetBulkOptions } from "./types/state/StateGetBulkOptions.type";

import { Job } from "./types/jobs/Job";
import { ScheduleJobRequest } from "./types/jobs/ScheduleJobRequest";
import { JobFailurePolicy } from "./types/jobs/JobFailurePolicy";
import { JobEvent } from "./types/jobs/JobEvent";

import DaprWorkflowClient from "./workflow/client/DaprWorkflowClient";
import WorkflowActivityContext from "./workflow/runtime/WorkflowActivityContext";
import WorkflowContext from "./workflow/runtime/WorkflowContext";
Expand Down Expand Up @@ -79,6 +84,10 @@ export {
StateConsistencyEnum,
PubSubBulkPublishResponse,
StateGetBulkOptions,
Job,
ScheduleJobRequest,
JobFailurePolicy,
JobEvent,
DaprWorkflowClient,
WorkflowActivityContext,
WorkflowContext,
Expand Down
Loading
Loading