Skip to content

Commit ec1f13f

Browse files
authored
Merge pull request #4 from prisma/codex/schema-reopen-regression
Handle schema reopen flows in local consumers
2 parents 9251679 + 540ada9 commit ec1f13f

5 files changed

Lines changed: 332 additions & 0 deletions

File tree

docs/releasing.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ into temporary consumers, and verify:
4747

4848
- Node end-to-end usage of `@prisma/streams-local`
4949
- Bun end-to-end usage of `@prisma/streams-local`, including the live `/touch/*` path
50+
- stateful local-runtime reopen flows that must read `/_schema` and skip
51+
duplicate first-schema installs when the registry already matches
5052
- local package exposure of `GET /v1/server/_details` and `GET /v1/stream/{name}/_routing_keys`
5153
- Bun CLI startup for `@prisma/streams-server`
5254

docs/schemas.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ Important rule:
114114
- a search-only update requires an already-installed schema version
115115
- if you are installing the first schema for a stream, install `schema` and
116116
`search` together in the same `_schema` request
117+
- first-schema installation is not idempotent after data exists; stateful
118+
clients that reopen an existing stream must `GET /_schema` first and skip the
119+
install when the current registry already matches the desired schema/search
120+
configuration
117121

118122
Not supported:
119123

scripts/test-bun-local-package.mjs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,45 @@ const server = await startLocalDurableStreamsServer({
7171
7272
const baseUrl = server.exports.http.url;
7373
const stream = "state";
74+
const schemaStream = "schema-reopen";
75+
const schemaUpdate = {
76+
schema: {
77+
type: "object",
78+
additionalProperties: false,
79+
required: ["repo"],
80+
properties: {
81+
repo: { type: "string" },
82+
},
83+
},
84+
};
7485
7586
async function fetchJson(url, init) {
7687
const res = await fetch(url, init);
7788
const text = await res.text();
7889
return { status: res.status, body: text ? JSON.parse(text) : null };
7990
}
8091
92+
async function ensureSchemaInstalled(baseUrl, stream, update) {
93+
const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" });
94+
if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`);
95+
96+
const currentSchema = current.body?.schemas?.["1"] ?? null;
97+
const alreadyMatches =
98+
current.body?.currentVersion === 1 &&
99+
JSON.stringify(currentSchema) === JSON.stringify(update.schema) &&
100+
JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) &&
101+
JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null);
102+
103+
if (alreadyMatches) return;
104+
105+
const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, {
106+
method: "POST",
107+
headers: { "content-type": "application/json" },
108+
body: JSON.stringify(update),
109+
});
110+
if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`);
111+
}
112+
81113
try {
82114
const serverDetails = await fetchJson(\`\${baseUrl}/v1/server/_details\`, { method: "GET" });
83115
if (serverDetails.status !== 200) throw new Error(\`/v1/server/_details failed: \${serverDetails.status}\`);
@@ -144,6 +176,23 @@ try {
144176
}
145177
}
146178
179+
{
180+
const res = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
181+
method: "PUT",
182+
headers: { "content-type": "application/json" },
183+
});
184+
if (res.status !== 201 && res.status !== 200) throw new Error(\`schema stream PUT failed: \${res.status}\`);
185+
186+
await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate);
187+
188+
const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
189+
method: "POST",
190+
headers: { "content-type": "application/json" },
191+
body: JSON.stringify([{ repo: "alpha/repo" }]),
192+
});
193+
if (append.status !== 204) throw new Error(\`schema stream append failed: \${append.status}\`);
194+
}
195+
147196
const activate = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/touch/templates/activate\`, {
148197
method: "POST",
149198
headers: { "content-type": "application/json" },
@@ -213,6 +262,84 @@ try {
213262
);
214263

215264
run("bun", ["consumer.mjs"], consumerDir);
265+
266+
writeFileSync(
267+
join(consumerDir, "consumer-reopen.mjs"),
268+
`
269+
import { startLocalDurableStreamsServer } from "@prisma/streams-local";
270+
271+
const server = await startLocalDurableStreamsServer({
272+
name: "${localServerName}",
273+
port: 0,
274+
hostname: "127.0.0.1",
275+
});
276+
277+
const baseUrl = server.exports.http.url;
278+
const schemaStream = "schema-reopen";
279+
const schemaUpdate = {
280+
schema: {
281+
type: "object",
282+
additionalProperties: false,
283+
required: ["repo"],
284+
properties: {
285+
repo: { type: "string" },
286+
},
287+
},
288+
};
289+
290+
async function fetchJson(url, init) {
291+
const res = await fetch(url, init);
292+
const text = await res.text();
293+
return { status: res.status, body: text ? JSON.parse(text) : null };
294+
}
295+
296+
async function ensureSchemaInstalled(baseUrl, stream, update) {
297+
const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" });
298+
if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`);
299+
300+
const currentSchema = current.body?.schemas?.["1"] ?? null;
301+
const alreadyMatches =
302+
current.body?.currentVersion === 1 &&
303+
JSON.stringify(currentSchema) === JSON.stringify(update.schema) &&
304+
JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) &&
305+
JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null);
306+
307+
if (alreadyMatches) return;
308+
309+
const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, {
310+
method: "POST",
311+
headers: { "content-type": "application/json" },
312+
body: JSON.stringify(update),
313+
});
314+
if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`);
315+
}
316+
317+
try {
318+
await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate);
319+
320+
const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
321+
method: "POST",
322+
headers: { "content-type": "application/json" },
323+
body: JSON.stringify([{ repo: "beta/repo" }]),
324+
});
325+
if (append.status !== 204) throw new Error(\`schema stream reopen append failed: \${append.status}\`);
326+
327+
const read = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}?offset=-1&format=json\`, {
328+
method: "GET",
329+
});
330+
if (read.status !== 200) throw new Error(\`schema stream reopen read failed: \${read.status}\`);
331+
if (JSON.stringify(read.body) !== JSON.stringify([{ repo: "alpha/repo" }, { repo: "beta/repo" }])) {
332+
throw new Error(\`unexpected schema stream reopen read: \${JSON.stringify(read.body)}\`);
333+
}
334+
335+
console.log(JSON.stringify({ ok: true, reopen: true, url: baseUrl }));
336+
} finally {
337+
await server.close();
338+
}
339+
`
340+
);
341+
342+
run("bun", ["consumer-reopen.mjs"], consumerDir);
216343
} finally {
217344
rmSync(tmpRoot, { recursive: true, force: true });
218345
}

scripts/test-node-local-package.mjs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,45 @@ const server = await startLocalDurableStreamsServer({
7171
7272
const baseUrl = server.exports.http.url;
7373
const stream = "state";
74+
const schemaStream = "schema-reopen";
75+
const schemaUpdate = {
76+
schema: {
77+
type: "object",
78+
additionalProperties: false,
79+
required: ["repo"],
80+
properties: {
81+
repo: { type: "string" },
82+
},
83+
},
84+
};
7485
7586
async function fetchJson(url, init) {
7687
const res = await fetch(url, init);
7788
const text = await res.text();
7889
return { status: res.status, body: text ? JSON.parse(text) : null };
7990
}
8091
92+
async function ensureSchemaInstalled(baseUrl, stream, update) {
93+
const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" });
94+
if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`);
95+
96+
const currentSchema = current.body?.schemas?.["1"] ?? null;
97+
const alreadyMatches =
98+
current.body?.currentVersion === 1 &&
99+
JSON.stringify(currentSchema) === JSON.stringify(update.schema) &&
100+
JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) &&
101+
JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null);
102+
103+
if (alreadyMatches) return;
104+
105+
const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, {
106+
method: "POST",
107+
headers: { "content-type": "application/json" },
108+
body: JSON.stringify(update),
109+
});
110+
if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`);
111+
}
112+
81113
try {
82114
const serverDetails = await fetchJson(\`\${baseUrl}/v1/server/_details\`, { method: "GET" });
83115
if (serverDetails.status !== 200) throw new Error(\`/v1/server/_details failed: \${serverDetails.status}\`);
@@ -144,6 +176,23 @@ try {
144176
}
145177
}
146178
179+
{
180+
const res = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
181+
method: "PUT",
182+
headers: { "content-type": "application/json" },
183+
});
184+
if (res.status !== 201 && res.status !== 200) throw new Error(\`schema stream PUT failed: \${res.status}\`);
185+
186+
await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate);
187+
188+
const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
189+
method: "POST",
190+
headers: { "content-type": "application/json" },
191+
body: JSON.stringify([{ repo: "alpha/repo" }]),
192+
});
193+
if (append.status !== 204) throw new Error(\`schema stream append failed: \${append.status}\`);
194+
}
195+
147196
const activate = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/touch/templates/activate\`, {
148197
method: "POST",
149198
headers: { "content-type": "application/json" },
@@ -213,6 +262,84 @@ try {
213262
);
214263

215264
run("node", ["consumer.mjs"], consumerDir);
265+
266+
writeFileSync(
267+
join(consumerDir, "consumer-reopen.mjs"),
268+
`
269+
import { startLocalDurableStreamsServer } from "@prisma/streams-local";
270+
271+
const server = await startLocalDurableStreamsServer({
272+
name: "${localServerName}",
273+
port: 0,
274+
hostname: "127.0.0.1",
275+
});
276+
277+
const baseUrl = server.exports.http.url;
278+
const schemaStream = "schema-reopen";
279+
const schemaUpdate = {
280+
schema: {
281+
type: "object",
282+
additionalProperties: false,
283+
required: ["repo"],
284+
properties: {
285+
repo: { type: "string" },
286+
},
287+
},
288+
};
289+
290+
async function fetchJson(url, init) {
291+
const res = await fetch(url, init);
292+
const text = await res.text();
293+
return { status: res.status, body: text ? JSON.parse(text) : null };
294+
}
295+
296+
async function ensureSchemaInstalled(baseUrl, stream, update) {
297+
const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" });
298+
if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`);
299+
300+
const currentSchema = current.body?.schemas?.["1"] ?? null;
301+
const alreadyMatches =
302+
current.body?.currentVersion === 1 &&
303+
JSON.stringify(currentSchema) === JSON.stringify(update.schema) &&
304+
JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) &&
305+
JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null);
306+
307+
if (alreadyMatches) return;
308+
309+
const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, {
310+
method: "POST",
311+
headers: { "content-type": "application/json" },
312+
body: JSON.stringify(update),
313+
});
314+
if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`);
315+
}
316+
317+
try {
318+
await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate);
319+
320+
const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, {
321+
method: "POST",
322+
headers: { "content-type": "application/json" },
323+
body: JSON.stringify([{ repo: "beta/repo" }]),
324+
});
325+
if (append.status !== 204) throw new Error(\`schema stream reopen append failed: \${append.status}\`);
326+
327+
const read = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}?offset=-1&format=json\`, {
328+
method: "GET",
329+
});
330+
if (read.status !== 200) throw new Error(\`schema stream reopen read failed: \${read.status}\`);
331+
if (JSON.stringify(read.body) !== JSON.stringify([{ repo: "alpha/repo" }, { repo: "beta/repo" }])) {
332+
throw new Error(\`unexpected schema stream reopen read: \${JSON.stringify(read.body)}\`);
333+
}
334+
335+
console.log(JSON.stringify({ ok: true, reopen: true, url: baseUrl }));
336+
} finally {
337+
await server.close();
338+
}
339+
`
340+
);
341+
342+
run("node", ["consumer-reopen.mjs"], consumerDir);
216343
} finally {
217344
rmSync(tmpRoot, { recursive: true, force: true });
218345
}

0 commit comments

Comments
 (0)