;
@@ -14,4 +13,7 @@ export interface Compiler {
compile: (ctx: Context, files: File[]) => Promise
;
}
-export type CompilerFactory
= (ctx: Context, streams: Streams) => Promise>;
+export type CompilerFactory = (
+ ctx: Context,
+ options: O
+) => Promise>;
diff --git a/packages/libs/src/io.ts b/packages/libs/src/io.ts
index e9e85d4e..75b3f73a 100644
--- a/packages/libs/src/io.ts
+++ b/packages/libs/src/io.ts
@@ -1,14 +1,21 @@
export interface Writer {
- write(data: Uint8Array): void
+ write(data: Uint8Array): void;
}
export interface Reader {
read(): Uint8Array;
- onData(handler: (data: Uint8Array) => void): Disposable
}
export interface Streams {
- in: Reader
- out: Writer
- err: Writer
+ in: Reader;
+ out: Writer;
+ err: Writer;
}
+
+export type ReadableStreamOfBytes = ReadableStream;
+
+export type WritableStreamOfBytes = WritableStream;
+
+export type BytesStreamWriter = WritableStreamDefaultWriter<
+ Uint8Array
+>;
diff --git a/packages/libs/src/sync/io.ts b/packages/libs/src/sync/io.ts
index dee26b82..34ba12d3 100644
--- a/packages/libs/src/sync/io.ts
+++ b/packages/libs/src/sync/io.ts
@@ -1,60 +1,55 @@
-import type { Streams, Writer } from "../io.js";
+import type { ReadableStreamOfBytes, Streams, Writer } from "libs/io";
+import { makeErrorWriter } from "libs/logger";
+import { noop } from "libs/function";
import type { SharedQueue } from "./shared-queue.js";
-export enum StreamType {
- Out = 1,
- Err = 2,
+export function readFromQueue(inputQueue: SharedQueue) {
+ const input = new ReadableStream({
+ pull(controller) {
+ const r = inputQueue.blockingRead();
+ const bytes = r.next().value.bytes;
+ // NOTE: The generator must be exhausted
+ r.next();
+ controller.enqueue(bytes);
+ },
+ });
+ return input;
+}
+
+export function writeToQueue(input: ReadableStreamOfBytes, queue: SharedQueue) {
+ const w = new WritableStream({
+ write(bytes) {
+ queue.pushBytes(bytes);
+ queue.commit();
+ },
+ });
+ const controller = new AbortController();
+ input
+ .pipeTo(w, { signal: controller.signal, preventCancel: true })
+ .catch(noop);
+ return {
+ [Symbol.dispose]() {
+ controller.abort();
+ },
+ };
}
-export function createSharedStreamsClient(
+export function createStreamsClient(
inputQueue: SharedQueue,
- beforeRead: () => void,
- write: (stream: StreamType, data: Uint8Array) => void
+ writer: Writer
): Streams {
return {
in: {
read() {
- beforeRead()
const r = inputQueue.blockingRead();
const bytes = r.next().value.bytes;
// NOTE: The generator must be exhausted
r.next();
return bytes;
},
- onData() {
- throw new Error("Not implemented");
- },
- },
- out: {
- write(data) {
- write(StreamType.Out, data);
- },
- },
- err: {
- write(data) {
- write(StreamType.Err, data);
- },
- },
- };
-}
-
-export function createSharedStreamsServer(
- inputQueue: SharedQueue,
- streams: Streams
-) {
- const writers: Record = {
- [StreamType.Out]: streams.out,
- [StreamType.Err]: streams.err,
- };
- return {
- onClientWrite(type: StreamType, data: Uint8Array) {
- writers[type].write(data);
- },
- write(size: number) {
- const bytes = streams.in.read()
- inputQueue.pushBytes(bytes.subarray(bytes.length - size));
- inputQueue.commit();
},
+ out: writer,
+ err: makeErrorWriter(writer),
};
}
diff --git a/packages/libs/src/testing/actor.ts b/packages/libs/src/testing/actor.ts
index b509d198..445ed381 100644
--- a/packages/libs/src/testing/actor.ts
+++ b/packages/libs/src/testing/actor.ts
@@ -4,11 +4,11 @@ import {
createRecoverableContext,
withCancel,
type Context,
-} from "../context.js";
-import { BACKSPACE, createLogger } from "../logger.js";
-import { stringifyError } from "../error.js";
-import { compileJsModule } from "../js.js";
-import type { Streams } from "../io.js";
+} from "libs/context";
+import { createLogger } from "libs/logger";
+import { stringifyError } from "libs/error";
+import { compileJsModule } from "libs/js";
+import type { Streams } from "libs/io";
import {
Actor,
MessageType,
@@ -18,14 +18,10 @@ import {
type EventMessage,
type IncomingMessage,
type OutgoingMessage,
-} from "../actor/index.js";
-import {
- createSharedStreamsClient,
- createSharedStreamsServer,
- SharedQueue,
- StreamType
-} from '../sync/index.js';
-import type { File } from "../compiler/index.js";
+} from "libs/actor";
+import { writeToQueue, SharedQueue, createStreamsClient } from "libs/sync";
+import type { File } from "libs/compiler";
+import type { RemoteCompilerFactoryOptions } from 'libs/compiler/actor';
import type { TestProgram, TestCompiler } from "./testing.js";
@@ -36,29 +32,27 @@ export interface InitConfig {
interface Handlers {
[key: string]: any;
- initialize(config: InitConfig): Promise;
- destroy(): void;
+ initialize (config: InitConfig): Promise;
+ destroy (): void;
- compile(files: File[]): Promise;
- stopCompile(): void;
+ compile (files: File[]): Promise;
+ stopCompile (): void;
- test(data: I): Promise;
- stopTest(): void;
+ test (data: I): Promise;
+ stopTest (): void;
}
type Incoming = IncomingMessage>;
-interface ReadEventMessage extends EventMessage<"read", undefined> {}
-
-interface WriteEventMessage extends EventMessage<"write", { type: StreamType, data: Uint8Array }> {}
+interface WriteEventMessage extends EventMessage<"write", Uint8Array> { }
-type TestingActorEvent = WriteEventMessage | ReadEventMessage;
+type TestingActorEvent = WriteEventMessage;
type Outgoing =
| OutgoingMessage, string>
| TestingActorEvent;
-async function evalEntity(functionStr: string) {
+async function evalEntity (functionStr: string) {
const moduleStr = `export default ${functionStr}`;
const mod = await compileJsModule<{ default: T }>(moduleStr);
return mod.default;
@@ -75,65 +69,60 @@ export type TestCompilerSuperFactory = (
universalFactory: UniversalFactory
) => Promise>;
-class TestCompilerActor extends Actor, string> implements Disposable {
+class TestCompilerActor
+ extends Actor, string>
+ implements Disposable {
protected compiler: TestCompiler | null = null;
protected compilerCtx = createRecoverableContext(() => {
- this.compiler = null
- return withCancel(createContext())
- })
- protected program: TestProgram | null = null
+ this.compiler = null;
+ return withCancel(createContext());
+ });
+ protected program: TestProgram | null = null;
protected programCtx = createRecoverableContext(() => {
- this.program = null
- return withCancel(this.compilerCtx.ref)
- })
- protected runCtx = createRecoverableContext(() => withCancel(this.programCtx.ref))
+ this.program = null;
+ return withCancel(this.compilerCtx.ref);
+ });
+ protected runCtx = createRecoverableContext(() =>
+ withCancel(this.programCtx.ref)
+ );
constructor(
connection: Connection, Outgoing>,
superFactory: TestCompilerSuperFactory
) {
const handlers: Handlers = {
- initialize: async({ universalFactoryFunction, buffer }) => {
+ initialize: async ({ universalFactoryFunction, buffer }) => {
const universalFactory = await evalEntity>(
universalFactoryFunction
);
- const sharedQueue = new SharedQueue(buffer)
- const client = createSharedStreamsClient(
- sharedQueue,
- () => connection.send({
- type: MessageType.Event,
- event: "read",
- payload: undefined,
- }),
- (type, data) => connection.send({
- type: MessageType.Event,
- event: "write",
- payload: {
- type,
- data
- }
- })
- )
this.compiler = await superFactory(
this.compilerCtx.ref,
- client,
+ createStreamsClient(
+ new SharedQueue(buffer),
+ {
+ write (data) {
+ connection.send({
+ type: MessageType.Event,
+ event: "write",
+ payload: data,
+ });
+ },
+ }
+ ),
universalFactory
);
},
destroy: () => {
- this.compilerCtx.cancel()
+ this.compilerCtx.cancel();
},
compile: async (files) => {
if (this.compiler === null) {
throw new Error("Test runner not initialized");
}
- this.program = await this.compiler.compile(
- this.programCtx.ref,
- files
- );
+ this.program = await this.compiler.compile(this.programCtx.ref, files);
},
stopCompile: () => {
- this.programCtx.cancel()
+ this.programCtx.cancel();
},
test: async (input) => {
if (this.program === null) {
@@ -148,11 +137,11 @@ class TestCompilerActor extends Actor, string> implement
try {
return await this.program.run(this.runCtx.ref, input);
} finally {
- this.runCtx.cancel()
+ this.runCtx.cancel();
}
},
stopTest: () => {
- this.runCtx.cancel()
+ this.runCtx.cancel();
},
};
super(connection, handlers, stringifyError);
@@ -160,14 +149,14 @@ class TestCompilerActor extends Actor, string> implement
[Symbol.dispose] (): void {
this.compiler = null;
- this.compilerCtx[Symbol.dispose]()
+ this.compilerCtx[Symbol.dispose]();
this.program = null;
- this.programCtx[Symbol.dispose]()
- this.runCtx[Symbol.dispose]()
+ this.programCtx[Symbol.dispose]();
+ this.runCtx[Symbol.dispose]();
}
}
-export function startTestCompilerActor(
+export function startTestCompilerActor (
ctx: Context,
superFactory: TestCompilerSuperFactory
) {
@@ -177,84 +166,66 @@ export function startTestCompilerActor(
connection.start(ctx);
const actor = new TestCompilerActor(connection, superFactory);
ctx.onCancel(() => {
- actor[Symbol.dispose]()
- })
+ actor[Symbol.dispose]();
+ });
actor.start(ctx);
}
interface WorkerConstructor {
- new (): Worker;
+ new(): Worker;
}
-export function makeRemoteTestCompilerFactory(
+export function makeRemoteTestCompilerFactory (
Worker: WorkerConstructor,
universalFactory: UniversalFactory
) {
- return async (ctx: Context, streams: Streams): Promise> => {
+ return async (
+ ctx: Context,
+ { input, output }: RemoteCompilerFactoryOptions
+ ): Promise> => {
const worker = new Worker();
- let read = -1
- const sub = streams.in.onData((data) => {
- if (read === -1) {
- return
- }
- if (data.length === 0) {
- server.write(read)
- // EOF
- server.write(0)
- read = -1
- }
- if (data === BACKSPACE) {
- // TODO: What to do with non-ASCII characters?
- read -= 1;
- } else {
- read += data.length
- }
- })
ctx.onCancel(() => {
- sub[Symbol.dispose]()
- worker.terminate()
- })
+ worker.terminate();
+ });
const connection = new WorkerConnection, Incoming>(
worker
);
connection.start(ctx);
- const log = createLogger(streams.out);
- const Buffer = window.SharedArrayBuffer
- ? SharedArrayBuffer
- : ArrayBuffer;
- const buffer = new Buffer(1024 * 1024 * 10)
- const server = createSharedStreamsServer(new SharedQueue(buffer), streams)
+ const log = createLogger(output);
+ const Buffer = window.SharedArrayBuffer ? SharedArrayBuffer : ArrayBuffer;
+ const buffer = new Buffer(1024 * 1024 * 10);
+ const sharedWriter = writeToQueue(input, new SharedQueue(buffer));
+ ctx.onCancel(() => {
+ sharedWriter[Symbol.dispose]();
+ });
const remote = startRemote, string, TestingActorEvent>(
ctx,
log,
connection,
{
- read() {
- read = 0;
+ write (data) {
+ output.write(data);
},
- write({ type, data }) {
- server.onClientWrite(type, data)
- },
- error(err) {
+ error (err) {
log.error(err instanceof CanceledError ? err.message : err);
},
}
);
- using _ = ctx.onCancel(() => remote.destroy())
+ using _ = ctx.onCancel(() => remote.destroy());
await remote.initialize({
universalFactoryFunction: universalFactory.toString(),
- buffer
+ buffer,
});
return {
- async compile(ctx, files) {
- using _ = ctx.onCancel(() => remote.stopCompile())
+ async compile (ctx, files) {
+ using _ = ctx.onCancel(() => remote.stopCompile());
await remote.compile(files);
return {
- async run(ctx, input) {
- using _ = ctx.onCancel(() => remote.stopTest())
+ async run (ctx, input) {
+ using _ = ctx.onCancel(() => remote.stopTest());
return await remote.test(input);
- }
- }
+ },
+ };
},
};
};
diff --git a/packages/libs/src/testing/testing.ts b/packages/libs/src/testing/testing.ts
index 09d64b4d..e121a737 100644
--- a/packages/libs/src/testing/testing.ts
+++ b/packages/libs/src/testing/testing.ts
@@ -1,7 +1,7 @@
-import type { Logger } from "../logger.js";
-import type { Context } from "../context.js";
-import type { Compiler, CompilerFactory } from "../compiler/index.js";
-import { isDeepEqual } from '../deep-equal.js';
+import type { Logger } from "libs/logger";
+import type { Context } from "libs/context";
+import type { Compiler, CompilerFactory } from "libs/compiler";
+import { isDeepEqual } from "libs/deep-equal";
export interface TestCase {
input: I;
@@ -21,9 +21,12 @@ export interface TestProgram {
run: (ctx: Context, input: I) => Promise;
}
-export type TestCompiler = Compiler>
+export type TestCompiler = Compiler>;
-export type TestCompilerFactory = CompilerFactory>
+export type TestCompilerFactory = CompilerFactory<
+ Options,
+ TestProgram
+>;
export async function runTests(
ctx: Context,