diff --git a/src/implementation/Client/DaprClient.ts b/src/implementation/Client/DaprClient.ts index 3a3b1ad61..23df741a3 100644 --- a/src/implementation/Client/DaprClient.ts +++ b/src/implementation/Client/DaprClient.ts @@ -25,6 +25,7 @@ import IClientPubSub from "../../interfaces/Client/IClientPubSub"; import IClientSecret from "../../interfaces/Client/IClientSecret"; import IClientSidecar from "../../interfaces/Client/IClientSidecar"; import IClientState from "../../interfaces/Client/IClientState"; +import IClientJobs from "../../interfaces/Client/IClientJobs"; import IClientWorkflow from "../../interfaces/Client/IClientWorkflow"; import GRPCClient from "./GRPCClient/GRPCClient"; @@ -40,6 +41,7 @@ import GRPCClientPubSub from "./GRPCClient/pubsub"; import GRPCClientSecret from "./GRPCClient/secret"; import GRPCClientSidecar from "./GRPCClient/sidecar"; import GRPCClientState from "./GRPCClient/state"; +import GRPCClientJobs from "./GRPCClient/jobs"; import GRPCClientWorkflow from "./GRPCClient/workflow"; import HTTPClient from "./HTTPClient/HTTPClient"; @@ -56,6 +58,7 @@ import HTTPClientPubSub from "./HTTPClient/pubsub"; import HTTPClientSecret from "./HTTPClient/secret"; import HTTPClientSidecar from "./HTTPClient/sidecar"; import HTTPClientState from "./HTTPClient/state"; +import HTTPClientJobs from "./HTTPClient/jobs"; import HTTPClientWorkflow from "./HTTPClient/workflow"; import CommunicationProtocolEnum from "../../enum/CommunicationProtocol.enum"; @@ -75,6 +78,7 @@ export default class DaprClient { readonly crypto: IClientCrypto; readonly health: IClientHealth; readonly invoker: IClientInvoker; + readonly jobs: IClientJobs; readonly lock: IClientLock; readonly metadata: IClientMetadata; readonly proxy: IClientProxy; @@ -115,6 +119,7 @@ export default class DaprClient { this.sidecar = new GRPCClientSidecar(client); this.proxy = new GRPCClientProxy(client); this.configuration = new GRPCClientConfiguration(client); + this.jobs = new GRPCClientJobs(client); this.lock = new GRPCClientLock(client); this.crypto = new GRPCClientCrypto(client); this.actor = new GRPCClientActor(client); // we use an abstractor here since we interface through a builder with the Actor Runtime @@ -132,6 +137,7 @@ export default class DaprClient { this.crypto = new HTTPClientCrypto(client); this.health = new HTTPClientHealth(client); this.invoker = new HTTPClientInvoker(client); + this.jobs = new HTTPClientJobs(client); this.lock = new HTTPClientLock(client); this.metadata = new HTTPClientMetadata(client); this.proxy = new HTTPClientProxy(client); diff --git a/src/implementation/Client/GRPCClient/jobs.ts b/src/implementation/Client/GRPCClient/jobs.ts new file mode 100644 index 000000000..69c0f6f43 --- /dev/null +++ b/src/implementation/Client/GRPCClient/jobs.ts @@ -0,0 +1,189 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { create } from "@bufbuild/protobuf"; +import { AnySchema, DurationSchema } from "@bufbuild/protobuf/wkt"; +import GRPCClient from "./GRPCClient"; +import IClientJobs from "../../../interfaces/Client/IClientJobs"; +import { Job } from "../../../types/jobs/Job"; +import { JobFailurePolicy } from "../../../types/jobs/JobFailurePolicy"; +import { ScheduleJobRequest } from "../../../types/jobs/ScheduleJobRequest"; +import { + JobSchema, + ScheduleJobRequestSchema, + GetJobRequestSchema, + DeleteJobRequestSchema, +} from "../../../proto/dapr/proto/runtime/v1/dapr_pb"; +import { + JobFailurePolicySchema, + JobFailurePolicyDropSchema, + JobFailurePolicyConstantSchema, +} from "../../../proto/dapr/proto/common/v1/common_pb"; +import type { JobFailurePolicy as ProtoJobFailurePolicy } from "../../../proto/dapr/proto/common/v1/common_pb"; +import type { Job as ProtoJob } from "../../../proto/dapr/proto/runtime/v1/dapr_pb"; + +/** + * Parse a duration string like "5s", "1m30s", "2h" into total seconds. + * Supports h, m, s units. Throws on unrecognized formats. + */ +function parseDurationSeconds(duration: string): number { + let total = 0; + let matched = false; + for (const match of duration.matchAll(/(\d+)(h|m|s)/g)) { + matched = true; + const value = parseInt(match[1], 10); + switch (match[2]) { + case "h": + total += value * 3600; + break; + case "m": + total += value * 60; + break; + case "s": + total += value; + break; + } + } + if (!matched) { + throw new Error(`Invalid duration format: "${duration}". Expected format like "5s", "1m30s", "2h".`); + } + return total; +} + +/** + * Convert SDK JobFailurePolicy to proto JobFailurePolicy. + */ +function toProtoFailurePolicy(fp: JobFailurePolicy): ProtoJobFailurePolicy { + if (fp.type === "drop") { + return create(JobFailurePolicySchema, { + policy: { case: "drop", value: create(JobFailurePolicyDropSchema) }, + }); + } + const constantPolicy: Record = {}; + if (fp.interval !== undefined) { + constantPolicy.interval = create(DurationSchema, { seconds: BigInt(parseDurationSeconds(fp.interval)) }); + } + if (fp.maxRetries !== undefined) { + constantPolicy.maxRetries = fp.maxRetries; + } + return create(JobFailurePolicySchema, { + policy: { case: "constant", value: create(JobFailurePolicyConstantSchema, constantPolicy) }, + }); +} + +/** + * Convert proto Job to SDK Job type. + */ +function fromProtoJob(protoJob: ProtoJob): Job { + const job: Job = { name: protoJob.name }; + + if (protoJob.schedule !== undefined) { + job.schedule = protoJob.schedule; + } + if (protoJob.dueTime !== undefined) { + job.dueTime = protoJob.dueTime; + } + if (protoJob.repeats !== undefined) { + job.repeats = protoJob.repeats; + } + if (protoJob.ttl !== undefined) { + job.ttl = protoJob.ttl; + } + + if (protoJob.data !== undefined && protoJob.data.value?.length) { + try { + job.data = JSON.parse(Buffer.from(protoJob.data.value).toString("utf-8")); + } catch { + job.data = Buffer.from(protoJob.data.value).toString("utf-8"); + } + } + + if (protoJob.failurePolicy !== undefined) { + const policy = protoJob.failurePolicy.policy; + if (policy.case === "drop") { + job.failurePolicy = { type: "drop" }; + } else if (policy.case === "constant") { + const constant = policy.value; + const fp: JobFailurePolicy & { type: "constant" } = { type: "constant" }; + if (constant.interval !== undefined) { + fp.interval = `${Number(constant.interval.seconds)}s`; + } + if (constant.maxRetries !== undefined) { + fp.maxRetries = constant.maxRetries; + } + job.failurePolicy = fp; + } + } + + return job; +} + +export default class GRPCClientJobs implements IClientJobs { + client: GRPCClient; + + constructor(client: GRPCClient) { + this.client = client; + } + + async schedule(job: ScheduleJobRequest): Promise { + const client = await this.client.getClient(); + + const protoJobFields: Record = { + name: job.name, + }; + + if (job.schedule !== undefined) { + protoJobFields.schedule = job.schedule; + } + if (job.dueTime !== undefined) { + protoJobFields.dueTime = job.dueTime; + } + if (job.repeats !== undefined) { + protoJobFields.repeats = job.repeats; + } + if (job.ttl !== undefined) { + protoJobFields.ttl = job.ttl; + } + if (job.data !== undefined) { + protoJobFields.data = create(AnySchema, { + value: Buffer.from(JSON.stringify(job.data), "utf-8"), + }); + } + if (job.failurePolicy !== undefined) { + protoJobFields.failurePolicy = toProtoFailurePolicy(job.failurePolicy); + } + + const req = create(ScheduleJobRequestSchema, { + job: create(JobSchema, protoJobFields), + overwrite: job.overwrite ?? false, + }); + + await client.scheduleJobAlpha1(req); + } + + async get(name: string): Promise { + const client = await this.client.getClient(); + const res = await client.getJobAlpha1(create(GetJobRequestSchema, { name })); + + if (!res.job) { + throw new Error(`Job '${name}' not found in response`); + } + + return fromProtoJob(res.job); + } + + async delete(name: string): Promise { + const client = await this.client.getClient(); + await client.deleteJobAlpha1(create(DeleteJobRequestSchema, { name })); + } +} diff --git a/src/implementation/Client/HTTPClient/jobs.ts b/src/implementation/Client/HTTPClient/jobs.ts new file mode 100644 index 000000000..a46814745 --- /dev/null +++ b/src/implementation/Client/HTTPClient/jobs.ts @@ -0,0 +1,124 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import HTTPClient from "./HTTPClient"; +import IClientJobs from "../../../interfaces/Client/IClientJobs"; +import { Job } from "../../../types/jobs/Job"; +import { JobFailurePolicy } from "../../../types/jobs/JobFailurePolicy"; +import { ScheduleJobRequest } from "../../../types/jobs/ScheduleJobRequest"; +import { KeyValueType } from "../../../types/KeyValue.type"; + +/** + * Serialize SDK JobFailurePolicy to the HTTP API shape. + */ +function toHTTPFailurePolicy(fp: JobFailurePolicy): KeyValueType { + if (fp.type === "drop") { + return { drop: {} }; + } + const constant: KeyValueType = {}; + if (fp.interval !== undefined) { + constant.interval = fp.interval; + } + if (fp.maxRetries !== undefined) { + constant.maxRetries = fp.maxRetries; + } + return { constant }; +} + +export default class HTTPClientJobs implements IClientJobs { + client: HTTPClient; + + constructor(client: HTTPClient) { + this.client = client; + } + + async schedule(job: ScheduleJobRequest): Promise { + const body: KeyValueType = {}; + + if (job.schedule !== undefined) { + body.schedule = job.schedule; + } + if (job.dueTime !== undefined) { + body.dueTime = job.dueTime; + } + if (job.repeats !== undefined) { + body.repeats = job.repeats; + } + if (job.ttl !== undefined) { + body.ttl = job.ttl; + } + if (job.data !== undefined) { + body.data = job.data; + } + if (job.failurePolicy !== undefined) { + body.failurePolicy = toHTTPFailurePolicy(job.failurePolicy); + } + if (job.overwrite !== undefined) { + body.overwrite = job.overwrite; + } + + await this.client.executeWithApiVersion("v1.0-alpha1", `/jobs/${job.name}`, { + method: "POST", + body, + }); + } + + async get(name: string): Promise { + const result = await this.client.executeWithApiVersion("v1.0-alpha1", `/jobs/${name}`, { + method: "GET", + }); + + const raw = result as KeyValueType; + const job: Job = { name }; + + if (raw.schedule !== undefined) { + job.schedule = raw.schedule as string; + } + if (raw.dueTime !== undefined) { + job.dueTime = raw.dueTime as string; + } + if (raw.repeats !== undefined) { + job.repeats = raw.repeats as number; + } + if (raw.ttl !== undefined) { + job.ttl = raw.ttl as string; + } + if (raw.data !== undefined) { + job.data = raw.data; + } + if (raw.failurePolicy !== undefined) { + const fp = raw.failurePolicy as KeyValueType; + if (fp.drop !== undefined) { + job.failurePolicy = { type: "drop" }; + } else if (fp.constant !== undefined) { + const constant = fp.constant as KeyValueType; + const policy: JobFailurePolicy & { type: "constant" } = { type: "constant" }; + if (constant.interval !== undefined) { + policy.interval = constant.interval as string; + } + if (constant.maxRetries !== undefined) { + policy.maxRetries = constant.maxRetries as number; + } + job.failurePolicy = policy; + } + } + + return job; + } + + async delete(name: string): Promise { + await this.client.executeWithApiVersion("v1.0-alpha1", `/jobs/${name}`, { + method: "DELETE", + }); + } +} diff --git a/src/implementation/Server/DaprServer.ts b/src/implementation/Server/DaprServer.ts index 14326a6fb..68a34eb32 100644 --- a/src/implementation/Server/DaprServer.ts +++ b/src/implementation/Server/DaprServer.ts @@ -16,6 +16,7 @@ import IServerPubSub from "../../interfaces/Server/IServerPubSub"; import IServerBinding from "../../interfaces/Server/IServerBinding"; import IServerInvoker from "../../interfaces/Server/IServerInvoker"; import IServerActor from "../../interfaces/Server/IServerActor"; +import IServerJobs from "../../interfaces/Server/IServerJobs"; import CommunicationProtocolEnum from "../../enum/CommunicationProtocol.enum"; import GRPCServer from "./GRPCServer/GRPCServer"; @@ -23,12 +24,14 @@ import GRPCServerPubSub from "./GRPCServer/pubsub"; import GRPCServerBinding from "./GRPCServer/binding"; import GRPCServerInvoker from "./GRPCServer/invoker"; import GRPCServerActor from "./GRPCServer/actor"; +import GRPCServerJobs from "./GRPCServer/jobs"; import HTTPServer from "./HTTPServer/HTTPServer"; import HTTPServerPubSub from "./HTTPServer/pubsub"; import HTTPServerBinding from "./HTTPServer/binding"; import HTTPServerInvoker from "./HTTPServer/invoker"; import HTTPServerActor from "./HTTPServer/actor"; +import HTTPServerJobs from "./HTTPServer/jobs"; import { Settings } from "../../utils/Settings.util"; import { DaprServerOptions } from "../../types/DaprServerOptions"; import DaprClient from "../Client/DaprClient"; @@ -43,6 +46,7 @@ export default class DaprServer { readonly binding: IServerBinding; readonly invoker: IServerInvoker; readonly actor: IServerActor; + readonly jobs: IServerJobs; readonly client: DaprClient; constructor(serverOptions: Partial = {}) { @@ -90,6 +94,7 @@ export default class DaprServer { this.binding = new GRPCServerBinding(server); this.invoker = new GRPCServerInvoker(server); this.actor = new GRPCServerActor(server); + this.jobs = new GRPCServerJobs(server); break; } case CommunicationProtocolEnum.HTTP: @@ -101,6 +106,7 @@ export default class DaprServer { this.binding = new HTTPServerBinding(server); this.invoker = new HTTPServerInvoker(server); this.actor = new HTTPServerActor(server, this.client); + this.jobs = new HTTPServerJobs(server); break; } } diff --git a/src/implementation/Server/GRPCServer/GRPCServerImpl.ts b/src/implementation/Server/GRPCServer/GRPCServerImpl.ts index 8f2f1a3de..25192c249 100644 --- a/src/implementation/Server/GRPCServer/GRPCServerImpl.ts +++ b/src/implementation/Server/GRPCServer/GRPCServerImpl.ts @@ -69,6 +69,7 @@ export default class GRPCServerImpl { handlersInvoke: { [key: string]: DaprInvokerCallbackFunction }; handlersBindings: { [key: string]: TypeDaprBindingCallback }; + handlersJobs: { [key: string]: (data: unknown) => Promise }; constructor(_server: IServerType, loggerOptions?: LoggerOptions) { this.logger = new Logger("GRPCServer", "GRPCServerImpl", loggerOptions); @@ -76,6 +77,7 @@ export default class GRPCServerImpl { this.handlersInvoke = {}; this.handlersBindings = {}; + this.handlersJobs = {}; } createInputBindingHandlerKey(bindingName: string): string { @@ -115,6 +117,10 @@ export default class GRPCServerImpl { this.handlersBindings[handlerKey] = cb; } + registerJobEventHandler(jobName: string, cb: (data: unknown) => Promise): void { + this.handlersJobs[jobName] = cb; + } + getSubscriptions(): PubSubSubscriptionsType { return this.subscriptionManager.getSubscriptions(); } @@ -362,10 +368,12 @@ export default class GRPCServerImpl { if (daprConfig?.routes?.rules) { for (const ruleItem of daprConfig.routes.rules) { - routes.rules.push(create(TopicRuleSchema, { - match: ruleItem.match, - path: ruleItem.path, - })); + routes.rules.push( + create(TopicRuleSchema, { + match: ruleItem.match, + path: ruleItem.path, + }), + ); } } @@ -393,7 +401,32 @@ export default class GRPCServerImpl { return create(HealthCheckResponseSchema); } - async onJobEventAlpha1(_request: JobEventRequest, _context: HandlerContext): Promise { + async onJobEventAlpha1(request: JobEventRequest, _context: HandlerContext): Promise { + // Prefer method field (always "job/" per Dapr spec); fall back to name field. + const jobName = request.method.startsWith("job/") ? request.method.slice("job/".length) : request.name; + + if (!this.handlersJobs[jobName]) { + this.logger.warn(`Event for job: "${jobName}" was not handled`); + return create(JobEventResponseSchema); + } + + // Deserialize Any data + let data: unknown; + if (request.data?.value?.length) { + try { + data = JSON.parse(Buffer.from(request.data.value).toString()); + } catch { + data = Buffer.from(request.data.value).toString(); + } + } + + try { + await this.handlersJobs[jobName](data); + } catch (e) { + this.logger.error(`Job handler failed for job '${jobName}': ${e}`); + throw e; + } + return create(JobEventResponseSchema); } } diff --git a/src/implementation/Server/GRPCServer/jobs.ts b/src/implementation/Server/GRPCServer/jobs.ts new file mode 100644 index 000000000..1b0b572bd --- /dev/null +++ b/src/implementation/Server/GRPCServer/jobs.ts @@ -0,0 +1,34 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import GRPCServer from "./GRPCServer"; +import IServerJobs, { JobEventHandler } from "../../../interfaces/Server/IServerJobs"; +import { Logger } from "../../../logger/Logger"; + +// https://docs.dapr.io/reference/api/jobs_api/ +export default class GRPCServerJobs implements IServerJobs { + server: GRPCServer; + private readonly logger: Logger; + + constructor(server: GRPCServer) { + this.server = server; + this.logger = new Logger("GRPCServer", "Jobs", server.client.options.logger); + } + + async addJobEventHandler(name: string, handler: JobEventHandler): Promise { + this.logger.info(`Registering gRPC onJobEvent Handler: Job = ${name}`); + this.server.getServerImpl().registerJobEventHandler(name, async (data: unknown) => { + await handler({ name, data }); + }); + } +} diff --git a/src/implementation/Server/HTTPServer/jobs.ts b/src/implementation/Server/HTTPServer/jobs.ts new file mode 100644 index 000000000..320353a43 --- /dev/null +++ b/src/implementation/Server/HTTPServer/jobs.ts @@ -0,0 +1,55 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import HTTPServer from "./HTTPServer"; +import HttpStatusCode from "../../../enum/HttpStatusCode.enum"; +import IServerJobs, { JobEventHandler } from "../../../interfaces/Server/IServerJobs"; +import { Logger } from "../../../logger/Logger"; + +// https://docs.dapr.io/reference/api/jobs_api/ +export default class HTTPServerJobs implements IServerJobs { + private readonly server: HTTPServer; + private readonly logger: Logger; + + constructor(server: HTTPServer) { + this.server = server; + this.logger = new Logger("HTTPServer", "Jobs", server.client.options.logger); + } + + async addJobEventHandler(name: string, handler: JobEventHandler): Promise { + const server = await this.server.getServer(); + + server.options(`/job/${name}`, async (_req, res) => { + return res.end(); + }); + + server.post(`/job/${name}`, async (req, res) => { + req.setTimeout(60 * 1000); + + try { + await handler({ name, data: req?.body }); + res.statusCode = HttpStatusCode.OK; + return res.end(); + } catch (e) { + this.logger.error(`Job handler failed for '${name}': ${e}`); + res.statusCode = HttpStatusCode.INTERNAL_SERVER_ERROR; + return res.end( + JSON.stringify({ + error: "JOB_HANDLER_FAILED", + error_msg: `Error processing job '${name}'`, + }), + ); + } + }); + } +} diff --git a/src/index.ts b/src/index.ts index c90de5eb4..2ac280a23 100644 --- a/src/index.ts +++ b/src/index.ts @@ -40,6 +40,12 @@ import StateConcurrencyEnum from "./enum/StateConcurrency.enum"; import StateConsistencyEnum from "./enum/StateConsistency.enum"; import { StateGetBulkOptions } from "./types/state/StateGetBulkOptions.type"; +import { Job } from "./types/jobs/Job"; +import { ScheduleJobRequest } from "./types/jobs/ScheduleJobRequest"; +import { JobFailurePolicy } from "./types/jobs/JobFailurePolicy"; +import { JobEvent } from "./types/jobs/JobEvent"; +import { JobEventHandler } from "./interfaces/Server/IServerJobs"; + import DaprWorkflowClient from "./workflow/client/DaprWorkflowClient"; import WorkflowActivityContext from "./workflow/runtime/WorkflowActivityContext"; import WorkflowContext from "./workflow/runtime/WorkflowContext"; @@ -79,6 +85,11 @@ export { StateConsistencyEnum, PubSubBulkPublishResponse, StateGetBulkOptions, + Job, + ScheduleJobRequest, + JobFailurePolicy, + JobEvent, + JobEventHandler, DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, diff --git a/src/interfaces/Client/IClientJobs.ts b/src/interfaces/Client/IClientJobs.ts new file mode 100644 index 000000000..1db9479ef --- /dev/null +++ b/src/interfaces/Client/IClientJobs.ts @@ -0,0 +1,38 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { Job } from "../../types/jobs/Job"; +import { ScheduleJobRequest } from "../../types/jobs/ScheduleJobRequest"; + +export default interface IClientJobs { + /** + * Create or update a scheduled job. + * @alpha Jobs API is Alpha1. + * @param job The job definition to schedule. + */ + schedule(job: ScheduleJobRequest): Promise; + + /** + * Retrieve a job definition by name. + * @alpha Jobs API is Alpha1. + * @param name The job name. + */ + get(name: string): Promise; + + /** + * Delete a job by name. + * @alpha Jobs API is Alpha1. + * @param name The job name. + */ + delete(name: string): Promise; +} diff --git a/src/interfaces/Server/IServerJobs.ts b/src/interfaces/Server/IServerJobs.ts new file mode 100644 index 000000000..46d228508 --- /dev/null +++ b/src/interfaces/Server/IServerJobs.ts @@ -0,0 +1,26 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { JobEvent } from "../../types/jobs/JobEvent"; + +export type JobEventHandler = (event: JobEvent) => Promise; + +export default interface IServerJobs { + /** + * Register a handler for a job event triggered by the Dapr sidecar. + * @alpha Jobs API is Alpha1. + * @param name The job name to handle. + * @param handler Callback invoked when the job triggers. + */ + addJobEventHandler(name: string, handler: JobEventHandler): Promise; +} diff --git a/src/types/jobs/Job.ts b/src/types/jobs/Job.ts new file mode 100644 index 000000000..c2bf1b9a6 --- /dev/null +++ b/src/types/jobs/Job.ts @@ -0,0 +1,24 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { JobFailurePolicy } from "./JobFailurePolicy"; + +export interface Job { + name: string; + schedule?: string; + dueTime?: string; + repeats?: number; + ttl?: string; + data?: unknown; + failurePolicy?: JobFailurePolicy; +} diff --git a/src/types/jobs/JobEvent.ts b/src/types/jobs/JobEvent.ts new file mode 100644 index 000000000..9580eccaf --- /dev/null +++ b/src/types/jobs/JobEvent.ts @@ -0,0 +1,25 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/** + * Event delivered to a job event handler when the sidecar triggers a job. + * Mirrors Go SDK's `common.JobEvent`. + * + * @alpha All Jobs APIs are Alpha1. + */ +export interface JobEvent { + /** The job name. */ + name: string; + /** The deserialized job payload. */ + data?: unknown; +} diff --git a/src/types/jobs/JobFailurePolicy.ts b/src/types/jobs/JobFailurePolicy.ts new file mode 100644 index 000000000..14af60bb3 --- /dev/null +++ b/src/types/jobs/JobFailurePolicy.ts @@ -0,0 +1,25 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/** + * Failure policy for a scheduled job. + * + * Maps to proto `common.v1.JobFailurePolicy` oneof: + * - `drop`: discard the job tick on failure + * - `constant`: retry with a fixed interval + * + * @alpha All Jobs APIs are Alpha1. + */ +export type JobFailurePolicy = + | { type: "drop" } + | { type: "constant"; interval?: string; maxRetries?: number }; diff --git a/src/types/jobs/ScheduleJobRequest.ts b/src/types/jobs/ScheduleJobRequest.ts new file mode 100644 index 000000000..7e74f86bb --- /dev/null +++ b/src/types/jobs/ScheduleJobRequest.ts @@ -0,0 +1,18 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { Job } from "./Job"; + +export interface ScheduleJobRequest extends Job { + overwrite?: boolean; +} diff --git a/test/unit/jobs/client.test.ts b/test/unit/jobs/client.test.ts new file mode 100644 index 000000000..2e8662723 --- /dev/null +++ b/test/unit/jobs/client.test.ts @@ -0,0 +1,192 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import HTTPClient from "../../../src/implementation/Client/HTTPClient/HTTPClient"; +import HTTPClientJobs from "../../../src/implementation/Client/HTTPClient/jobs"; +import { ScheduleJobRequest } from "../../../src/types/jobs/ScheduleJobRequest"; + +describe("HTTPClientJobs", () => { + let httpClient: HTTPClient; + let jobs: HTTPClientJobs; + let executeSpy: jest.SpyInstance; + + beforeEach(() => { + httpClient = new HTTPClient({ + daprHost: "127.0.0.1", + daprPort: "3500", + communicationProtocol: 0, + }); + jobs = new HTTPClientJobs(httpClient); + executeSpy = jest.spyOn(httpClient, "executeWithApiVersion").mockResolvedValue({}); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + describe("schedule()", () => { + it("should send a POST to /jobs/:name with the correct body and without name in body", async () => { + const job: ScheduleJobRequest = { + name: "my-job", + schedule: "0 * * * *", + repeats: 10, + }; + + await jobs.schedule(job); + + expect(executeSpy).toHaveBeenCalledTimes(1); + const [apiVersion, url, params] = executeSpy.mock.calls[0]; + expect(apiVersion).toBe("v1.0-alpha1"); + expect(url).toBe("/jobs/my-job"); + expect(params.method).toBe("POST"); + expect(params.body).not.toHaveProperty("name"); + expect(params.body.schedule).toBe("0 * * * *"); + expect(params.body.repeats).toBe(10); + }); + + it("should include overwrite in body when set", async () => { + const job: ScheduleJobRequest = { + name: "my-job", + schedule: "@every 5s", + overwrite: true, + }; + + await jobs.schedule(job); + + const [, , params] = executeSpy.mock.calls[0]; + expect(params.body.overwrite).toBe(true); + }); + + it("should serialize drop failure policy correctly", async () => { + const job: ScheduleJobRequest = { + name: "my-job", + schedule: "@every 1m", + failurePolicy: { type: "drop" }, + }; + + await jobs.schedule(job); + + const [, , params] = executeSpy.mock.calls[0]; + expect(params.body.failurePolicy).toEqual({ drop: {} }); + }); + + it("should serialize constant failure policy with interval and maxRetries", async () => { + const job: ScheduleJobRequest = { + name: "my-job", + schedule: "@every 1m", + failurePolicy: { type: "constant", interval: "5s", maxRetries: 3 }, + }; + + await jobs.schedule(job); + + const [, , params] = executeSpy.mock.calls[0]; + expect(params.body.failurePolicy).toEqual({ constant: { interval: "5s", maxRetries: 3 } }); + }); + + it("should serialize constant failure policy with only interval", async () => { + const job: ScheduleJobRequest = { + name: "my-job", + schedule: "@every 1m", + failurePolicy: { type: "constant", interval: "10s" }, + }; + + await jobs.schedule(job); + + const [, , params] = executeSpy.mock.calls[0]; + expect(params.body.failurePolicy).toEqual({ constant: { interval: "10s" } }); + }); + + it("should not include undefined fields in body", async () => { + const job: ScheduleJobRequest = { + name: "my-job", + dueTime: "2026-01-01T00:00:00Z", + }; + + await jobs.schedule(job); + + const [, , params] = executeSpy.mock.calls[0]; + expect(params.body).not.toHaveProperty("schedule"); + expect(params.body).not.toHaveProperty("repeats"); + expect(params.body).not.toHaveProperty("ttl"); + expect(params.body).not.toHaveProperty("failurePolicy"); + expect(params.body.dueTime).toBe("2026-01-01T00:00:00Z"); + }); + }); + + describe("get()", () => { + it("should send a GET to /jobs/:name and return a deserialized Job", async () => { + executeSpy.mockResolvedValueOnce({ + schedule: "@every 1m", + repeats: 5, + ttl: "1h", + }); + + const result = await jobs.get("my-job"); + + expect(executeSpy).toHaveBeenCalledTimes(1); + const [apiVersion, url, params] = executeSpy.mock.calls[0]; + expect(apiVersion).toBe("v1.0-alpha1"); + expect(url).toBe("/jobs/my-job"); + expect(params.method).toBe("GET"); + + expect(result.name).toBe("my-job"); + expect(result.schedule).toBe("@every 1m"); + expect(result.repeats).toBe(5); + expect(result.ttl).toBe("1h"); + }); + + it("should deserialize drop failure policy from HTTP response", async () => { + executeSpy.mockResolvedValueOnce({ + schedule: "@every 1m", + failurePolicy: { drop: {} }, + }); + + const result = await jobs.get("my-job"); + + expect(result.failurePolicy).toEqual({ type: "drop" }); + }); + + it("should deserialize constant failure policy from HTTP response", async () => { + executeSpy.mockResolvedValueOnce({ + schedule: "@every 1m", + failurePolicy: { constant: { interval: "5s", maxRetries: 3 } }, + }); + + const result = await jobs.get("my-job"); + + expect(result.failurePolicy).toEqual({ type: "constant", interval: "5s", maxRetries: 3 }); + }); + + it("should return job without optional fields when not present in response", async () => { + executeSpy.mockResolvedValueOnce({}); + + const result = await jobs.get("bare-job"); + + expect(result.name).toBe("bare-job"); + expect(result.schedule).toBeUndefined(); + expect(result.failurePolicy).toBeUndefined(); + }); + }); + + describe("delete()", () => { + it("should send a DELETE to /jobs/:name", async () => { + await jobs.delete("my-job"); + + expect(executeSpy).toHaveBeenCalledTimes(1); + const [apiVersion, url, params] = executeSpy.mock.calls[0]; + expect(apiVersion).toBe("v1.0-alpha1"); + expect(url).toBe("/jobs/my-job"); + expect(params.method).toBe("DELETE"); + }); + }); +}); diff --git a/test/unit/jobs/server.test.ts b/test/unit/jobs/server.test.ts new file mode 100644 index 000000000..2fdeeedab --- /dev/null +++ b/test/unit/jobs/server.test.ts @@ -0,0 +1,142 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import HTTPServerJobs from "../../../src/implementation/Server/HTTPServer/jobs"; +import { JobEvent } from "../../../src/types/jobs/JobEvent"; +import { JobEventHandler } from "../../../src/interfaces/Server/IServerJobs"; +import HttpStatusCode from "../../../src/enum/HttpStatusCode.enum"; + +describe("HTTPServerJobs", () => { + /** Build a minimal mock HTTPServer that captures registered express routes. */ + const makeServer = () => { + const routes: { path: string; handler: Function }[] = []; + + const mockExpressApp = { + post: (path: string, handler: Function) => { + routes.push({ path, handler }); + }, + }; + + const mockHTTPServer = { + getServer: jest.fn().mockResolvedValue(mockExpressApp), + client: { + options: { + logger: undefined, + }, + }, + } as any; + + return { mockHTTPServer, routes }; + }; + + /** Simulate an express request/response pair. */ + const makeReqRes = (body: unknown = {}) => { + const req = { + body, + setTimeout: jest.fn(), + }; + const res = { + statusCode: 0, + end: jest.fn().mockReturnValue(undefined), + }; + return { req, res }; + }; + + describe("addJobEventHandler()", () => { + it("registers a POST route at /job/", async () => { + const { mockHTTPServer, routes } = makeServer(); + const jobs = new HTTPServerJobs(mockHTTPServer); + + await jobs.addJobEventHandler("my-job", jest.fn()); + + expect(routes).toHaveLength(1); + expect(routes[0].path).toBe("/job/my-job"); + }); + + it("dispatches a JobEvent with the correct name and body data", async () => { + const { mockHTTPServer, routes } = makeServer(); + const jobs = new HTTPServerJobs(mockHTTPServer); + + const received: JobEvent[] = []; + const handler: JobEventHandler = async (event) => { + received.push(event); + }; + + await jobs.addJobEventHandler("process-order", handler); + + const { req, res } = makeReqRes({ orderId: 42 }); + await routes[0].handler(req, res); + + expect(received).toHaveLength(1); + expect(received[0].name).toBe("process-order"); + expect(received[0].data).toEqual({ orderId: 42 }); + expect(res.statusCode).toBe(HttpStatusCode.OK); + }); + + it("returns 500 and error JSON when the handler throws", async () => { + const { mockHTTPServer, routes } = makeServer(); + const jobs = new HTTPServerJobs(mockHTTPServer); + + const failingHandler: JobEventHandler = async () => { + throw new Error("handler error"); + }; + + await jobs.addJobEventHandler("failing-job", failingHandler); + + const { req, res } = makeReqRes({}); + await routes[0].handler(req, res); + + expect(res.statusCode).toBe(HttpStatusCode.INTERNAL_SERVER_ERROR); + const body = JSON.parse(res.end.mock.calls[0][0]); + expect(body.error).toBe("JOB_HANDLER_FAILED"); + expect(body.error_msg).toContain("failing-job"); + }); + + it("registers distinct routes for different job names", async () => { + const { mockHTTPServer, routes } = makeServer(); + const jobs = new HTTPServerJobs(mockHTTPServer); + + await jobs.addJobEventHandler("job-alpha", jest.fn()); + await jobs.addJobEventHandler("job-beta", jest.fn()); + + const paths = routes.map((r) => r.path); + expect(paths).toContain("/job/job-alpha"); + expect(paths).toContain("/job/job-beta"); + }); + + it("dispatches to the correct handler when multiple jobs are registered", async () => { + const { mockHTTPServer, routes } = makeServer(); + const jobs = new HTTPServerJobs(mockHTTPServer); + + const alphaEvents: JobEvent[] = []; + const betaEvents: JobEvent[] = []; + + await jobs.addJobEventHandler("job-alpha", async (e) => alphaEvents.push(e)); + await jobs.addJobEventHandler("job-beta", async (e) => betaEvents.push(e)); + + const alphaRoute = routes.find((r) => r.path === "/job/job-alpha")!; + const betaRoute = routes.find((r) => r.path === "/job/job-beta")!; + + const { req: reqA, res: resA } = makeReqRes({ src: "alpha" }); + const { req: reqB, res: resB } = makeReqRes({ src: "beta" }); + + await alphaRoute.handler(reqA, resA); + await betaRoute.handler(reqB, resB); + + expect(alphaEvents).toHaveLength(1); + expect(alphaEvents[0].name).toBe("job-alpha"); + expect(betaEvents).toHaveLength(1); + expect(betaEvents[0].name).toBe("job-beta"); + }); + }); +});