Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 93 additions & 72 deletions PeerCouchDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@ import { decodeBinary } from "./lib/src/string_and_binary/convert.ts";
import { isPlainText } from "./lib/src/string_and_binary/path.ts";
import { DispatchFun, Peer } from "./Peer.ts";
import { createBinaryBlob, createTextBlob, isDocContentSame, unique } from "./lib/src/common/utils.ts";
import { PouchDB } from "./lib/src/pouchdb/pouchdb-http.ts";
import { promiseWithResolver } from "octagonal-wheels/promises";

// export class PeerInstance()

export class PeerCouchDB extends Peer {
man: DirectFileManipulator;
declare config: PeerCouchDBConf;
private _started = promiseWithResolver<void>();
constructor(conf: PeerCouchDBConf, dispatcher: DispatchFun) {
super(conf, dispatcher);
this.man = new DirectFileManipulator(conf);
// Use Deno's native fetch to bypass node:http shim issues with Traefik/long-polling
this.man.$$createPouchDBInstance = <T extends object>(): PouchDB.Database<T> => {
return new PouchDB(this.man.options.url + "/" + this.man.options.database, {
auth: { username: this.man.options.username, password: this.man.options.password },
fetch: (url: string | Request, opts?: RequestInit) => globalThis.fetch(url, opts),
}) as PouchDB.Database<T>;
};
// Fetch remote since.
this.man.since = this.getSetting("since") || "now";
}
async delete(pathSrc: string): Promise<boolean> {
await this._started.promise;
const path = this.toLocalPath(pathSrc);
if (await this.isRepeating(pathSrc, false)) {
return false;
Expand All @@ -31,6 +42,7 @@ export class PeerCouchDB extends Peer {
return r;
}
async put(pathSrc: string, data: FileData): Promise<boolean> {
await this._started.promise;
const path = this.toLocalPath(pathSrc);
if (await this.isRepeating(pathSrc, data)) {
return false;
Expand Down Expand Up @@ -63,6 +75,7 @@ export class PeerCouchDB extends Peer {
return r;
}
async get(pathSrc: FilePathWithPrefix): Promise<false | FileData> {
await this._started.promise;
const path = this.toLocalPath(pathSrc) as FilePathWithPrefix;
const ret = await this.man.get(path) as false | ReadyEntry;
if (ret === false) {
Expand All @@ -77,6 +90,7 @@ export class PeerCouchDB extends Peer {
};
}
async getMeta(pathSrc: FilePathWithPrefix): Promise<false | FileData> {
await this._started.promise;
const path = this.toLocalPath(pathSrc) as FilePathWithPrefix;
const ret = await this.man.get(path, true) as false | MetaEntry;
if (ret === false) {
Expand All @@ -91,84 +105,91 @@ export class PeerCouchDB extends Peer {
};
}
async start(): Promise<void> {
const baseDir = this.toLocalPath("");
await this.man.ready.promise;
const w = await this.man.rawGet<Record<string, any>>(MILESTONE_DOCID);
if (w && "tweak_values" in w) {
if (this.config.useRemoteTweaks) {
const tweaks = Object.values(w["tweak_values"])[0] as TweakValues;
// console.log(tweaks)
const orgConf = { ...this.config } as Record<string, any>;
this.config.customChunkSize = tweaks.customChunkSize ?? this.config.customChunkSize;
this.config.minimumChunkSize = tweaks.minimumChunkSize ?? this.config.minimumChunkSize;
if (tweaks.encrypt && !this.config.passphrase) {
throw new Error("Remote database is encrypted but no passphrase provided.");
}
if (tweaks.usePathObfuscation && !this.config.obfuscatePassphrase) {
throw new Error("Remote database is obfuscated but no obfuscate passphrase provided.");
}
this.config.hashAlg = tweaks.hashAlg ?? this.config.hashAlg;
this.config.maxAgeInEden = tweaks.maxAgeInEden ?? this.config.maxAgeInEden;
this.config.maxTotalLengthInEden = tweaks.maxTotalLengthInEden ?? this.config.maxTotalLengthInEden;
this.config.maxChunksInEden = tweaks.maxChunksInEden ?? this.config.maxChunksInEden;
this.config.useEden = tweaks.useEden ?? this.config.useEden;
if (!this.config.enableCompression != !tweaks.enableCompression) {
throw new Error("Compression setting mismatched.");
}
this.config.useDynamicIterationCount = tweaks.useDynamicIterationCount ?? this.config.useDynamicIterationCount;
this.config.enableChunkSplitterV2 = tweaks.enableChunkSplitterV2 ?? this.config.enableChunkSplitterV2;
this.config.chunkSplitterVersion = tweaks.chunkSplitterVersion ?? this.config.chunkSplitterVersion;
this.config.E2EEAlgorithm = tweaks.E2EEAlgorithm ?? this.config.E2EEAlgorithm;
this.config.minimumChunkSize = tweaks.minimumChunkSize ?? this.config.minimumChunkSize;
this.config.customChunkSize = tweaks.customChunkSize ?? this.config.customChunkSize;
this.config.doNotUseFixedRevisionForChunks = tweaks.doNotUseFixedRevisionForChunks ?? this.config.doNotUseFixedRevisionForChunks;
this.config.handleFilenameCaseSensitive = tweaks.handleFilenameCaseSensitive ?? this.config.handleFilenameCaseSensitive;
const newConf = { ...this.config } as Record<string, any>;
this.man.options = this.config;
await this.man.liveSyncLocalDB.initializeDatabase()
// await this.man.managers.initManagers();
const diff = unique([...Object.keys(orgConf), ...Object.keys(tweaks)]).filter(k => orgConf[k] != newConf[k]);
if (diff.length > 0) {
this.normalLog(`Remote tweaks changed --->`);
for (const diffKey of diff) {
this.normalLog(`${diffKey}\t: ${orgConf[diffKey]} \t : ${newConf[diffKey]}`);
try {
const baseDir = this.toLocalPath("");
await this.man.ready.promise;
const w = await this.man.rawGet<Record<string, any>>(MILESTONE_DOCID);
if (w && "tweak_values" in w) {
if (this.config.useRemoteTweaks) {
const tweaks = Object.values(w["tweak_values"])[0] as TweakValues;
// console.log(tweaks)
const orgConf = { ...this.config } as Record<string, any>;
this.config.customChunkSize = tweaks.customChunkSize ?? this.config.customChunkSize;
this.config.minimumChunkSize = tweaks.minimumChunkSize ?? this.config.minimumChunkSize;
if (tweaks.encrypt && !this.config.passphrase) {
throw new Error("Remote database is encrypted but no passphrase provided.");
}
if (tweaks.usePathObfuscation && !this.config.obfuscatePassphrase) {
throw new Error("Remote database is obfuscated but no obfuscate passphrase provided.");
}
this.config.hashAlg = tweaks.hashAlg ?? this.config.hashAlg;
this.config.maxAgeInEden = tweaks.maxAgeInEden ?? this.config.maxAgeInEden;
this.config.maxTotalLengthInEden = tweaks.maxTotalLengthInEden ?? this.config.maxTotalLengthInEden;
this.config.maxChunksInEden = tweaks.maxChunksInEden ?? this.config.maxChunksInEden;
this.config.useEden = tweaks.useEden ?? this.config.useEden;
if (!this.config.enableCompression != !tweaks.enableCompression) {
throw new Error("Compression setting mismatched.");
}
this.config.useDynamicIterationCount = tweaks.useDynamicIterationCount ?? this.config.useDynamicIterationCount;
this.config.enableChunkSplitterV2 = tweaks.enableChunkSplitterV2 ?? this.config.enableChunkSplitterV2;
this.config.chunkSplitterVersion = tweaks.chunkSplitterVersion ?? this.config.chunkSplitterVersion;
this.config.E2EEAlgorithm = tweaks.E2EEAlgorithm ?? this.config.E2EEAlgorithm;
this.config.minimumChunkSize = tweaks.minimumChunkSize ?? this.config.minimumChunkSize;
this.config.customChunkSize = tweaks.customChunkSize ?? this.config.customChunkSize;
this.config.doNotUseFixedRevisionForChunks = tweaks.doNotUseFixedRevisionForChunks ?? this.config.doNotUseFixedRevisionForChunks;
this.config.handleFilenameCaseSensitive = tweaks.handleFilenameCaseSensitive ?? this.config.handleFilenameCaseSensitive;
const newConf = { ...this.config } as Record<string, any>;
this.man.options = this.config;
await this.man.liveSyncLocalDB.initializeDatabase()
// await this.man.managers.initManagers();
const diff = unique([...Object.keys(orgConf), ...Object.keys(tweaks)]).filter(k => orgConf[k] != newConf[k]);
if (diff.length > 0) {
this.normalLog(`Remote tweaks changed --->`);
for (const diffKey of diff) {
this.normalLog(`${diffKey}\t: ${orgConf[diffKey]} \t : ${newConf[diffKey]}`);
}
this.normalLog(`<--- Remote tweaks changed`);
}
this.normalLog(`<--- Remote tweaks changed`);
}
}
}
if (!w) {
this.normalLog(`Remote database looks like empty. fetch from the first.`);
this.setSetting("remote-created", "0");
return;
}
const created = w.created;
if (this.getSetting("remote-created") !== `${created}`) {
this.man.since = "";
this.normalLog(`Remote database looks like rebuilt. fetch from the first again.`);
this.setSetting("remote-created", `${created}`);
} else {
this.normalLog(`Watch starting from ${this.man.since}`);
}
this.man.beginWatch(async (entry) => {
const d = entry.type == "plain" ? entry.data : new Uint8Array(decodeBinary(entry.data));
let path = entry.path.substring(baseDir.length);
if (path.startsWith("/")) {
path = path.substring(1);
if (!w) {
this.normalLog(`Remote database looks like empty. fetch from the first.`);
this.setSetting("remote-created", "0");
return;
}
if (entry.deleted || entry._deleted) {
this.sendLog(`${path} delete detected`);
await this.dispatchDeleted(path);
const created = w.created;
if (this.getSetting("remote-created") !== `${created}`) {
this.man.since = "";
this.normalLog(`Remote database looks like rebuilt. fetch from the first again.`);
this.setSetting("remote-created", `${created}`);
} else {
const docData = { ctime: entry.ctime, mtime: entry.mtime, size: entry.size, deleted: entry.deleted || entry._deleted, data: d };
this.sendLog(`${path} change detected`);
await this.dispatch(path, docData);
this.normalLog(`Watch starting from ${this.man.since}`);
}
}, (entry) => {
this.setSetting("since", this.man.since);
if (entry.path.indexOf(":") !== -1) return false;
return entry.path.startsWith(baseDir);
});
this.man.beginWatch(async (entry) => {
const d = entry.type == "plain" ? entry.data : new Uint8Array(decodeBinary(entry.data));
let path = entry.path.substring(baseDir.length);
if (path.startsWith("/")) {
path = path.substring(1);
}
if (entry.deleted || entry._deleted) {
this.sendLog(`${path} delete detected`);
await this.dispatchDeleted(path);
} else {
const docData = { ctime: entry.ctime, mtime: entry.mtime, size: entry.size, deleted: entry.deleted || entry._deleted, data: d };
this.sendLog(`${path} change detected`);
await this.dispatch(path, docData);
}
}, (entry) => {
this.setSetting("since", this.man.since);
if (entry.path.indexOf(":") !== -1) return false;
return entry.path.startsWith(baseDir);
});
} catch (e) {
this._started.reject(e);
throw e;
} finally {
this._started.resolve();
}
}
async dispatch(path: string, data: FileData | false) {
if (data === false) return;
Expand Down
2 changes: 1 addition & 1 deletion deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"pouchdb-mapreduce": "npm:pouchdb-mapreduce@^9.0.0",
"transform-pouch": "npm:transform-pouch@^2.0.0",
"chokidar": "npm:chokidar@^3.5.1",
"trystero": "https://github.com/vrtmrz/vrtmrz/trystero#9e892a93ec14eeb57ce806d272fbb7c3935256d8"
"trystero": "npm:trystero@^0.22.0"
},
"lint": {
"rules": {
Expand Down