Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ dist/cjs
.rdf-test-suite-cache

test/**/*.d.ts
test/**/*.js.map
test/backends/*.js
test/browser/index.js
test/browser/index.bundle.js
Expand Down
35 changes: 27 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "quadstore",
"version": "15.3.0",
"version": "15.4.0-beta.0",
"description": "Quadstore is a LevelDB-backed RDF graph database / triplestore for JavaScript runtimes (browsers, Node.js, Deno, Bun, ...) that implements the RDF/JS interfaces and supports SPARQL queries and querying across named graphs.",
"keywords": [
"node",
Expand Down Expand Up @@ -47,6 +47,7 @@
"@types/chai": "^5.2.2",
"@types/mocha": "^10.0.10",
"@types/n3": "^1.26.0",
"@types/node": "^24.1.0",
"@types/node-static": "^0.7.11",
"browser-level": "^3.0.0",
"chai": "^5.2.0",
Expand Down
17 changes: 9 additions & 8 deletions src/quadstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import type {
Pattern,
StoreOpts,
VoidResult,
TSReadable,
StreamLike,
TermName,
Prefixes,
QuadArrayResultWithInternals,
Expand All @@ -36,6 +36,7 @@ import type {
} from 'abstract-level';
import { EventEmitter } from 'events';
import {
AsyncIterator,
EmptyIterator,
wrap,
} from 'asynciterator';
Expand Down Expand Up @@ -120,7 +121,7 @@ export class Quadstore implements Store {
await this.db.clear();
}

match(subject?: Quad_Subject, predicate?: Quad_Predicate, object?: Quad_Object, graph?: Quad_Graph, opts: GetOpts = emptyObject): Stream<Quad> {
match(subject?: Quad_Subject, predicate?: Quad_Predicate, object?: Quad_Object, graph?: Quad_Graph, opts: GetOpts = emptyObject): AsyncIterator<Quad> {
// This is required due to the fact that Comunica may invoke the `.match()`
// method in generalized RDF mode, under which the subject may be a literal
// term.
Expand All @@ -145,17 +146,17 @@ export class Quadstore implements Store {
return results.approximateSize;
}

import(source: Stream<Quad>): EventEmitter {
import(source: StreamLike<Quad>): EventEmitter {
const emitter = new EventEmitter();
this.putStream(<TSReadable<Quad>>source, {})
this.putStream(source, {})
.then(() => { emitter.emit('end'); })
.catch((err) => { emitter.emit('error', err); });
return emitter;
}

remove(source: Stream<Quad>): EventEmitter {
remove(source: StreamLike<Quad>): EventEmitter {
const emitter = new EventEmitter();
this.delStream(<TSReadable<Quad>>source, {})
this.delStream(source, {})
.then(() => emitter.emit('end'))
.catch((err) => emitter.emit('error', err));
return emitter;
Expand Down Expand Up @@ -294,7 +295,7 @@ export class Quadstore implements Store {
return await getStream(this, pattern, opts);
}

async putStream(source: TSReadable<Quad>, opts: PutStreamOpts = emptyObject): Promise<VoidResult> {
async putStream(source: StreamLike<Quad>, opts: PutStreamOpts = emptyObject): Promise<VoidResult> {
this.ensureReady();
const batchSize = opts.batchSize || 1;
if (batchSize === 1) {
Expand All @@ -305,7 +306,7 @@ export class Quadstore implements Store {
return { type: ResultType.VOID };
}

async delStream(source: TSReadable<Quad>, opts: DelStreamOpts = emptyObject): Promise<VoidResult> {
async delStream(source: StreamLike<Quad>, opts: DelStreamOpts = emptyObject): Promise<VoidResult> {
this.ensureReady();
const batchSize = opts.batchSize || 1;
if (batchSize === 1) {
Expand Down
14 changes: 11 additions & 3 deletions src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

import type { Readable } from 'stream';
import type { AbstractChainedBatch, AbstractLevel } from 'abstract-level'
import type { AsyncIterator } from 'asynciterator';
import type { Literal, DataFactory, Quad_Subject, Quad_Predicate, Quad_Object, Quad_Graph, Quad, Term } from '@rdfjs/types';
import type { Scope } from '../scope/index.js';
import type { AbstractIteratorOptions } from 'abstract-level';
import type { EventEmitter } from 'events';

export interface BatchOpts {
/**
Expand All @@ -27,8 +27,6 @@ export interface PatchOpts extends BatchOpts {

export type TermName = 'subject' | 'predicate' | 'object' | 'graph';

export type TSReadable<T> = Readable | AsyncIterator<T>;

export enum ResultType {
VOID = 'void',
QUADS = 'quads',
Expand Down Expand Up @@ -151,3 +149,13 @@ export type TermWriter<T extends Term, E extends 'T' | 'F'> = E extends 'T'
? { write(node: T, serialized: SerializedTerm, prefixes: Prefixes, rangeMode: boolean, encodedValue: string): void }
: { write(node: T, serialized: SerializedTerm, prefixes: Prefixes): void }
;

export interface StreamLike<T = any> extends EventEmitter {
read(): T | null;
destroy?: () => void;
readable?: boolean;
on(event: 'readable', listener: () => void): this;
on(event: 'end', listener: () => void): this;
on(event: 'error', listener: (error: Error) => void): this;
on(event: 'data', listener: (item: T) => void): this;
}
24 changes: 11 additions & 13 deletions src/utils/consumeinbatches.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

import { TSReadable } from '../types/index.js';
import { StreamLike } from '../types/index.js';

export const consumeInBatches = async <T>(readable: TSReadable<T>, batchSize: number, onEachBatch: (items: T[]) => any): Promise<void> => {
export const consumeInBatches = async <T>(source: StreamLike<T>, batchSize: number, onEachBatch: (items: T[]) => any): Promise<void> => {
return new Promise((resolve, reject) => {
let bufpos = 0;
let looping = false;
Expand Down Expand Up @@ -39,7 +39,7 @@ export const consumeInBatches = async <T>(readable: TSReadable<T>, batchSize: nu
flushAndResolve();
return;
}
while (bufpos < batchSize && (item = readable.read()) !== null) {
while (bufpos < batchSize && (item = source.read()) !== null) {
buffer[bufpos++] = item;
}
if (item === null) {
Expand All @@ -54,17 +54,15 @@ export const consumeInBatches = async <T>(readable: TSReadable<T>, batchSize: nu
}
};
const cleanup = () => {
readable.removeListener('end', onEnd);
readable.removeListener('error', onError);
readable.removeListener('readable', onReadable);
if (typeof readable.destroy === 'function') {
readable.destroy();
}
source.removeListener('end', onEnd);
source.removeListener('error', onError);
source.removeListener('readable', onReadable);
source.destroy?.();
};
readable.on('end', onEnd);
readable.on('error', onError);
readable.on('readable', onReadable);
if (readable.readable !== false) {
source.on('end', onEnd);
source.on('error', onError);
source.on('readable', onReadable);
if ('readable' in source && source.readable) {
loop();
}
});
Expand Down
24 changes: 11 additions & 13 deletions src/utils/consumeonebyone.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@

import { TSReadable } from '../types/index.js';
import { StreamLike } from '../types/index.js';

export const consumeOneByOne = async <T>(iterator: TSReadable<T>, onEachItem: (item: T) => any) => {
export const consumeOneByOne = async <T>(source: StreamLike<T>, onEachItem: (item: T) => any) => {
return new Promise<void>((resolve, reject) => {
let item;
let ended = false;
let looping = false;
const loop = () => {
looping = true;
if ((item = iterator.read()) !== null) {
if ((item = source.read()) !== null) {
Promise.resolve(onEachItem(item))
.then(loop)
.catch(onError);
Expand Down Expand Up @@ -36,18 +36,16 @@ export const consumeOneByOne = async <T>(iterator: TSReadable<T>, onEachItem: (i
}
};
const cleanup = () => {
iterator.removeListener('end', onEnd);
iterator.removeListener('error', onError);
iterator.removeListener('readable', onReadable);
if (typeof iterator.destroy === 'function') {
iterator.destroy();
}
source.removeListener('end', onEnd);
source.removeListener('error', onError);
source.removeListener('readable', onReadable);
source.destroy?.();
};
iterator.on('end', onEnd);
iterator.on('error', onError);
iterator.on('readable', onReadable);
source.on('end', onEnd);
source.on('error', onError);
source.on('readable', onReadable);
// readable might be undefined in older versions of userland readable-stream
if (iterator.readable !== false) {
if ('readable' in source && source.readable) {
loop();
}
});
Expand Down
16 changes: 8 additions & 8 deletions src/utils/stuff.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

import type { EventEmitter } from 'events';
import type { AbstractLevel } from 'abstract-level';
import type { TSReadable, TermName } from '../types/index.js';
import type { TermName, StreamLike } from '../types/index.js';

export const isObject = (o: any): boolean => {
return typeof(o) === 'object' && o !== null;
Expand All @@ -20,16 +20,16 @@ export const ensureAbstractLevel = (o: any, key: string) => {
}
};

export const streamToArray = <T>(readStream: TSReadable<T>): Promise<T[]> => {
export const streamToArray = <T>(source: StreamLike<T>): Promise<T[]> => {
return new Promise((resolve, reject) => {
const chunks: T[] = [];
const onData = (chunk: T) => {
chunks.push(chunk);
};
const cleanup = () => {
readStream.removeListener('data', onData);
readStream.removeListener('error', onError);
readStream.destroy();
source.removeListener('data', onData);
source.removeListener('error', onError);
source.destroy?.();
};
const onEnd = () => {
cleanup();
Expand All @@ -39,9 +39,9 @@ export const streamToArray = <T>(readStream: TSReadable<T>): Promise<T[]> => {
cleanup();
reject(err);
};
readStream.on('error', onError);
readStream.on('end', onEnd);
readStream.on('data', onData);
source.on('error', onError);
source.on('end', onEnd);
source.on('data', onData);
});
}

Expand Down
2 changes: 2 additions & 0 deletions test/browser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import { runMemoryLevelTests } from '../backends/memorylevel.js';
import { runBrowserLevelTests } from '../backends/browserlevel.js';
import { runOtherTests } from '../others/others.js';
import { runTypingsTests } from '../others/typings.js';

runOtherTests();
runTypingsTests(false);
runMemoryLevelTests();
runBrowserLevelTests();
6 changes: 5 additions & 1 deletion test/browser/webpack.config.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ module.exports = {
usedExports: true,
concatenateModules: false
},
resolve: {},
resolve: {
fallback: {
stream: false,
},
},
plugins: []
};
3 changes: 2 additions & 1 deletion test/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import { runMemoryLevelTests } from './backends/memorylevel.js';
import { runClassicLevelTests } from './backends/classiclevel.js';
import { runOtherTests } from './others/others.js';
import { runTypingsTests } from './others/typings.js';

runOtherTests();
runTypingsTests(true);
runMemoryLevelTests();
runClassicLevelTests();

Loading