Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/implementation/Client/GRPCClient/GRPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ export default class GRPCClient implements IClient {
interceptors.push(this.generateInterceptor());
}

const baseUrl = `${this.daprEndpoint.tls ? "https" : "http"}://${this.daprEndpoint.hostname}:${this.daprEndpoint.port}`;
const baseUrl = `${this.daprEndpoint.tls ? "https" : "http"}://${this.daprEndpoint.hostname}:${
this.daprEndpoint.port
}`;
this.logger.info(`Opening connection to ${this.options.daprHost}:${this.options.daprPort}`);

const transport = createGrpcTransport({
Expand Down
7 changes: 7 additions & 0 deletions src/implementation/Client/GRPCClient/invoker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ export default class GRPCClientInvoker implements IClientInvoker {
});

const { serializedData, contentType } = SerializerUtil.serializeGrpc(data);
const maxBytes = (this.client.options.maxBodySizeMb ?? 4) * 1024 * 1024;

const payloadSize = serializedData.length;

if (payloadSize > maxBytes) {
throw new Error(`Payload size ${payloadSize} exceeds maxBodySizeMb (${maxBytes})`);
}
const msgSerialized = create(AnySchema, { value: serializedData });

const msgInvoke = create(InvokeRequestSchema, {
Expand Down
58 changes: 43 additions & 15 deletions test/e2e/grpc/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
} from "../helpers/containers";

const serverHost = "127.0.0.1";
const serverPort = "3001";
const serverPort = "4001";
const daprAppId = "test-suite-grpc-server";

describe("grpc/server", () => {
Expand All @@ -43,10 +43,7 @@ describe("grpc/server", () => {

beforeAll(async () => {
network = await new Network().start();
[redisContainer, mqttContainer] = await Promise.all([
startRedisContainer(network),
startMqttContainer(network),
]);
[redisContainer, mqttContainer] = await Promise.all([startRedisContainer(network), startMqttContainer(network)]);

// The Dapr container calls back to the gRPC app server on the host.
await TestContainers.exposeHostPorts(parseInt(serverPort));
Expand Down Expand Up @@ -83,7 +80,7 @@ describe("grpc/server", () => {
.withNetwork(network)
.withAppId(daprAppId)
.withAppPort(parseInt(serverPort))
.withAppChannelAddress("host.testcontainers.internal")
.withAppChannelAddress(process.env.CI ? "localhost" : "host.docker.internal")
.withDaprLogLevel("info")
.withMaxRequestSizeMb(10)
.withComponent(buildBindingMqttComponent())
Expand Down Expand Up @@ -167,20 +164,51 @@ describe("grpc/server", () => {
);

it("should be able to receive payloads larger than 4 MB", async () => {
await new Promise((resolve, _reject) => setTimeout(resolve, 1000));
// 5 MB is above Dapr's default 4 MB limit but below our 10 MB sidecar limit.
const payload = new Uint8Array(5 * 1024 * 1024);

const res = await server.client.invoker.invoke(daprAppId, "test-invoker", HttpMethod.POST, payload);

expect(res).toBeDefined();
expect(mockInvoker).toHaveBeenCalled();
});

it("should fail for payload > 4MB when maxBodySizeMb is NOT set (default behavior)", async () => {
// Create a NEW container WITHOUT override
const defaultDaprContainer = await new DaprGrpcAppContainer()
.withNetwork(network)
.withAppId(daprAppId)
.withAppPort(parseInt(serverPort))
.withAppChannelAddress("host.docker.internal")
.withDaprLogLevel("info")
// ❌ NO .withMaxRequestSizeMb()
.withComponent(buildBindingMqttComponent())
.withComponent(buildBindingRedisComponent())
.withComponent(buildConfigRedisComponent())
.start();

const client = new DaprClient({
daprHost: defaultDaprContainer.getHost(),
daprPort: defaultDaprContainer.getGrpcPort().toString(),
communicationProtocol: CommunicationProtocolEnum.GRPC,
});

await client.start();
await new Promise((resolve) => setTimeout(resolve, 2000));

const payload = new Uint8Array(5 * 1024 * 1024); // 5MB (>4MB default)

let error: any;
try {
const res = await server.client.invoker.invoke(daprAppId, "test-invoker", HttpMethod.POST, payload);
console.log(res);
} catch (e) {
console.log(e);
await client.invoker.invoke(daprAppId, "test-invoker", HttpMethod.POST, payload);
} catch (e: any) {
error = e;
}

// Delay a bit for event to arrive
await new Promise((resolve, _reject) => setTimeout(resolve, 250));
});
expect(error.message).toBeDefined();

await client.stop().catch(() => {});
await defaultDaprContainer.stop();
}, 20000);
});

describe("binding", () => {
Expand Down
19 changes: 8 additions & 11 deletions test/e2e/helpers/DaprGrpcAppContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,8 @@ import {
StoppedTestContainer,
Wait,
} from "testcontainers";
import {
Component,
DaprPlacementContainer,
DaprSchedulerContainer,
} from "@dapr/testcontainer-node";
import {
DAPR_TEST_RUNTIME_IMAGE,
DAPR_TEST_PLACEMENT_IMAGE,
DAPR_TEST_SCHEDULER_IMAGE,
} from "./containers";
import { Component, DaprPlacementContainer, DaprSchedulerContainer } from "@dapr/testcontainer-node";
import { DAPR_TEST_RUNTIME_IMAGE, DAPR_TEST_PLACEMENT_IMAGE, DAPR_TEST_SCHEDULER_IMAGE } from "./containers";

const DAPRD_HTTP_PORT = 3500;
const DAPRD_GRPC_PORT = 50001;
Expand Down Expand Up @@ -180,7 +172,12 @@ export class DaprGrpcAppContainer extends GenericContainer {
cmds.push("--app-port", this.appPort.toString());
}
if (this.maxRequestSizeMb !== undefined) {
cmds.push("--dapr-http-max-request-size", this.maxRequestSizeMb.toString());
const size = `${this.maxRequestSizeMb}Mi`;

// Only override when explicitly requested
cmds.push("--max-body-size", size);
} else {
// DO NOTHING → let Dapr default (4MB) remain
}

this.withCommand(cmds);
Expand Down
5 changes: 1 addition & 4 deletions test/e2e/http/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ describe("http/server", () => {

beforeAll(async () => {
network = await new Network().start();
[redisContainer, mqttContainer] = await Promise.all([
startRedisContainer(network),
startMqttContainer(network),
]);
[redisContainer, mqttContainer] = await Promise.all([startRedisContainer(network), startMqttContainer(network)]);

// Allow the Dapr container to call back to the app server on the host.
await TestContainers.exposeHostPorts(parseInt(serverPort));
Expand Down
Loading