Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/releasing.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ into temporary consumers, and verify:

- Node end-to-end usage of `@prisma/streams-local`
- Bun end-to-end usage of `@prisma/streams-local`, including the live `/touch/*` path
- stateful local-runtime reopen flows that must read `/_schema` and skip
duplicate first-schema installs when the registry already matches
- local package exposure of `GET /v1/server/_details` and `GET /v1/stream/{name}/_routing_keys`
- Bun CLI startup for `@prisma/streams-server`

Expand Down
4 changes: 4 additions & 0 deletions docs/schemas.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ Important rule:
- a search-only update requires an already-installed schema version
- if you are installing the first schema for a stream, install `schema` and
`search` together in the same `_schema` request
- first-schema installation is not idempotent after data exists; stateful
clients that reopen an existing stream must `GET /_schema` first and skip the
install when the current registry already matches the desired schema/search
configuration

Not supported:

Expand Down
127 changes: 127 additions & 0 deletions scripts/test-bun-local-package.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,45 @@ const server = await startLocalDurableStreamsServer({

const baseUrl = server.exports.http.url;
const stream = "state";
const schemaStream = "schema-reopen";
const schemaUpdate = {
schema: {
type: "object",
additionalProperties: false,
required: ["repo"],
properties: {
repo: { type: "string" },
},
},
};

async function fetchJson(url, init) {
const res = await fetch(url, init);
const text = await res.text();
return { status: res.status, body: text ? JSON.parse(text) : null };
}

async function ensureSchemaInstalled(baseUrl, stream, update) {
const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" });
if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`);

const currentSchema = current.body?.schemas?.["1"] ?? null;
const alreadyMatches =
current.body?.currentVersion === 1 &&
JSON.stringify(currentSchema) === JSON.stringify(update.schema) &&
JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) &&
JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null);

if (alreadyMatches) return;

const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(update),
});
if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`);
}

try {
const serverDetails = await fetchJson(\`\${baseUrl}/v1/server/_details\`, { method: "GET" });
if (serverDetails.status !== 200) throw new Error(\`/v1/server/_details failed: \${serverDetails.status}\`);
Expand Down Expand Up @@ -144,6 +176,23 @@ try {
}
}

{
const res = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
method: "PUT",
headers: { "content-type": "application/json" },
});
if (res.status !== 201 && res.status !== 200) throw new Error(\`schema stream PUT failed: \${res.status}\`);

await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate);

const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify([{ repo: "alpha/repo" }]),
});
if (append.status !== 204) throw new Error(\`schema stream append failed: \${append.status}\`);
}

const activate = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/touch/templates/activate\`, {
method: "POST",
headers: { "content-type": "application/json" },
Expand Down Expand Up @@ -213,6 +262,84 @@ try {
);

run("bun", ["consumer.mjs"], consumerDir);

writeFileSync(
join(consumerDir, "consumer-reopen.mjs"),
`
import { startLocalDurableStreamsServer } from "@prisma/streams-local";

const server = await startLocalDurableStreamsServer({
name: "${localServerName}",
port: 0,
hostname: "127.0.0.1",
});

const baseUrl = server.exports.http.url;
const schemaStream = "schema-reopen";
const schemaUpdate = {
schema: {
type: "object",
additionalProperties: false,
required: ["repo"],
properties: {
repo: { type: "string" },
},
},
};

async function fetchJson(url, init) {
const res = await fetch(url, init);
const text = await res.text();
return { status: res.status, body: text ? JSON.parse(text) : null };
}

async function ensureSchemaInstalled(baseUrl, stream, update) {
const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" });
if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`);

const currentSchema = current.body?.schemas?.["1"] ?? null;
const alreadyMatches =
current.body?.currentVersion === 1 &&
JSON.stringify(currentSchema) === JSON.stringify(update.schema) &&
JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) &&
JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null);

if (alreadyMatches) return;

const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(update),
});
if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`);
}

try {
await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate);

const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify([{ repo: "beta/repo" }]),
});
if (append.status !== 204) throw new Error(\`schema stream reopen append failed: \${append.status}\`);

const read = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}?offset=-1&format=json\`, {
method: "GET",
});
if (read.status !== 200) throw new Error(\`schema stream reopen read failed: \${read.status}\`);
if (JSON.stringify(read.body) !== JSON.stringify([{ repo: "alpha/repo" }, { repo: "beta/repo" }])) {
throw new Error(\`unexpected schema stream reopen read: \${JSON.stringify(read.body)}\`);
}

console.log(JSON.stringify({ ok: true, reopen: true, url: baseUrl }));
} finally {
await server.close();
}
`
);

run("bun", ["consumer-reopen.mjs"], consumerDir);
} finally {
rmSync(tmpRoot, { recursive: true, force: true });
}
127 changes: 127 additions & 0 deletions scripts/test-node-local-package.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,45 @@ const server = await startLocalDurableStreamsServer({

const baseUrl = server.exports.http.url;
const stream = "state";
const schemaStream = "schema-reopen";
const schemaUpdate = {
schema: {
type: "object",
additionalProperties: false,
required: ["repo"],
properties: {
repo: { type: "string" },
},
},
};

async function fetchJson(url, init) {
const res = await fetch(url, init);
const text = await res.text();
return { status: res.status, body: text ? JSON.parse(text) : null };
}

async function ensureSchemaInstalled(baseUrl, stream, update) {
const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" });
if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`);

const currentSchema = current.body?.schemas?.["1"] ?? null;
const alreadyMatches =
current.body?.currentVersion === 1 &&
JSON.stringify(currentSchema) === JSON.stringify(update.schema) &&
JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) &&
JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null);

if (alreadyMatches) return;

const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(update),
});
if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`);
}

try {
const serverDetails = await fetchJson(\`\${baseUrl}/v1/server/_details\`, { method: "GET" });
if (serverDetails.status !== 200) throw new Error(\`/v1/server/_details failed: \${serverDetails.status}\`);
Expand Down Expand Up @@ -144,6 +176,23 @@ try {
}
}

{
const res = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
method: "PUT",
headers: { "content-type": "application/json" },
});
if (res.status !== 201 && res.status !== 200) throw new Error(\`schema stream PUT failed: \${res.status}\`);

await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate);

const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify([{ repo: "alpha/repo" }]),
});
if (append.status !== 204) throw new Error(\`schema stream append failed: \${append.status}\`);
}

const activate = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/touch/templates/activate\`, {
method: "POST",
headers: { "content-type": "application/json" },
Expand Down Expand Up @@ -213,6 +262,84 @@ try {
);

run("node", ["consumer.mjs"], consumerDir);

writeFileSync(
join(consumerDir, "consumer-reopen.mjs"),
`
import { startLocalDurableStreamsServer } from "@prisma/streams-local";

const server = await startLocalDurableStreamsServer({
name: "${localServerName}",
port: 0,
hostname: "127.0.0.1",
});

const baseUrl = server.exports.http.url;
const schemaStream = "schema-reopen";
const schemaUpdate = {
schema: {
type: "object",
additionalProperties: false,
required: ["repo"],
properties: {
repo: { type: "string" },
},
},
};

async function fetchJson(url, init) {
const res = await fetch(url, init);
const text = await res.text();
return { status: res.status, body: text ? JSON.parse(text) : null };
}

async function ensureSchemaInstalled(baseUrl, stream, update) {
const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" });
if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`);

const currentSchema = current.body?.schemas?.["1"] ?? null;
const alreadyMatches =
current.body?.currentVersion === 1 &&
JSON.stringify(currentSchema) === JSON.stringify(update.schema) &&
JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) &&
JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null);

if (alreadyMatches) return;

const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(update),
});
if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`);
}

try {
await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate);

const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify([{ repo: "beta/repo" }]),
});
if (append.status !== 204) throw new Error(\`schema stream reopen append failed: \${append.status}\`);

const read = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}?offset=-1&format=json\`, {
method: "GET",
});
if (read.status !== 200) throw new Error(\`schema stream reopen read failed: \${read.status}\`);
if (JSON.stringify(read.body) !== JSON.stringify([{ repo: "alpha/repo" }, { repo: "beta/repo" }])) {
throw new Error(\`unexpected schema stream reopen read: \${JSON.stringify(read.body)}\`);
}

console.log(JSON.stringify({ ok: true, reopen: true, url: baseUrl }));
} finally {
await server.close();
}
`
);

run("node", ["consumer-reopen.mjs"], consumerDir);
} finally {
rmSync(tmpRoot, { recursive: true, force: true });
}
Loading
Loading