Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions packages/cli/skills/dkg-node/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,19 @@ curl -X POST $BASE_URL/api/assertion/climate-report/import-file \
```json
{
"assertionUri": "did:dkg:context-graph:research/assertion/0xAgentAddr/climate-report",
"fileHash": "sha256:a1b2c3...",
"detectedContentType": "text/markdown",
"fileHash": "keccak256:a1b2c3...",
"detectedContentType": "application/pdf",
"extraction": {
"status": "completed",
"tripleCount": 14,
"pipelineUsed": "text/markdown",
"mdIntermediateHash": "sha256:a1b2c3..."
"pipelineUsed": "application/pdf",
"mdIntermediateHash": "keccak256:d4e5f6..."
}
}
```

Both `fileHash` and `mdIntermediateHash` are `keccak256:<hex>` per spec §10.2:603. `mdIntermediateHash` is only present when Phase 1 actually ran (converter-backed imports like PDF/DOCX); pure-markdown imports leave it undefined.

### Extraction statuses

- `completed` — Phase 1 (if needed) and Phase 2 both ran; triples were written to the assertion graph
Expand Down
485 changes: 453 additions & 32 deletions packages/cli/src/daemon.ts

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions packages/cli/src/extraction-status.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
export interface ExtractionStatusRecord {
status: 'in_progress' | 'completed' | 'skipped' | 'failed';
// `keccak256:<hex>` — canonical per spec §10.2:603 / 03 §2.1:658.
fileHash: string;
detectedContentType: string;
pipelineUsed: string | null;
tripleCount: number;
// `keccak256:<hex>` — present only when Phase 1 actually ran (PDF/
// DOCX via MarkItDown). Undefined for pure-markdown imports.
mdIntermediateHash?: string;
error?: string;
startedAt: string;
Expand Down
303 changes: 263 additions & 40 deletions packages/cli/src/extraction/markdown-extractor.ts

Large diffs are not rendered by default.

139 changes: 114 additions & 25 deletions packages/cli/src/file-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@ import { createHash } from 'node:crypto';
import { mkdir, readFile, rename, stat, unlink, writeFile } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { join, resolve } from 'node:path';
import { ethers } from 'ethers';

export interface FileStoreEntry {
/** sha256 hash of the file contents, formatted as `sha256:<hex>`. */
/**
* sha256 hash of the file contents, formatted as `sha256:<hex>`.
* Used as the on-disk storage key for historical compatibility.
*/
hash: string;
/**
* keccak256 hash of the file contents, formatted as `keccak256:<hex>`.
* Used on the wire and in the data/meta graph triples per
* `05_PROTOCOL_EXTENSIONS.md §6.3` and `19_MARKDOWN_CONTENT_TYPE.md §10`.
*/
keccak256: string;
/** Absolute path to the stored file on disk. */
path: string;
/** Size of the file in bytes. */
Expand All @@ -38,17 +48,23 @@ export class FileStore {

/**
* Persist `bytes` to the store and return the resulting entry. Idempotent:
* re-putting the same bytes returns the same hash without rewriting the
* existing blob. The `contentType` metadata is
* attached to the return value but not persisted to disk — callers that
* need durable content-type metadata should store it separately (e.g. in
* an `_meta` triple keyed by hash).
* re-putting the same bytes returns the same hashes without rewriting the
* existing blob. The `contentType` metadata is attached to the return
* value but not persisted to disk — callers that need durable
* content-type metadata should store it separately (e.g. in an `_meta`
* triple keyed by hash).
*
* Content is stored under the sha256 shard layout. A small pointer file
* under `keccak256/<hex>` is also written so the same blob is resolvable
* by keccak256, which is the hash used on the wire and in graph triples.
*/
async put(bytes: Buffer, contentType: string): Promise<FileStoreEntry> {
const hex = createHash('sha256').update(bytes).digest('hex');
const hash = `sha256:${hex}`;
const path = this.resolvePath(hex);
await mkdir(join(this.rootDir, hex.slice(0, 2)), { recursive: true });
const sha256Hex = createHash('sha256').update(bytes).digest('hex');
const keccakHex = ethers.keccak256(bytes).replace(/^0x/, '');
const hash = `sha256:${sha256Hex}`;
const keccak256 = `keccak256:${keccakHex}`;
const path = this.resolvePath(sha256Hex);
await mkdir(join(this.rootDir, sha256Hex.slice(0, 2)), { recursive: true });
if (!existsSync(path)) {
const tempPath = `${path}.tmp-${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}`;
try {
Expand All @@ -66,20 +82,44 @@ export class FileStore {
}
}
}
return { hash, path, size: bytes.length, contentType };
const pointerPath = this.resolveKeccakPointerPath(keccakHex);
if (!existsSync(pointerPath)) {
await mkdir(join(this.rootDir, 'keccak256', keccakHex.slice(0, 2)), { recursive: true });
const tempPointer = `${pointerPath}.tmp-${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}`;
try {
await writeFile(tempPointer, sha256Hex, { flag: 'wx' });
try {
await rename(tempPointer, pointerPath);
} catch (err: any) {
if (!existsSync(pointerPath)) {
throw err;
}
}
} finally {
if (existsSync(tempPointer)) {
await unlink(tempPointer).catch(() => {});
}
}
}
return { hash, keccak256, path, size: bytes.length, contentType };
}

/**
 * Retrieve the raw bytes for a previously-stored hash, or null if absent.
 *
 * Accepts either the `sha256:<hex>` or `keccak256:<hex>` form. For
 * keccak256 inputs the pointer file written at put() time is dereferenced
 * to the underlying sha256 blob via `hashToPath`.
 *
 * @param hash - Prefixed (`sha256:`/`keccak256:`) or bare sha256 hex hash.
 * @returns The stored bytes, or null for malformed/unknown hashes.
 */
async get(hash: string): Promise<Buffer | null> {
  // hashToPath is async because keccak256 resolution reads a pointer file.
  const path = await this.hashToPath(hash);
  if (!path) return null;
  // The pointer may exist while the blob does not (partial cleanup); the
  // existsSync check keeps this a null return rather than an ENOENT throw.
  if (!existsSync(path)) return null;
  return readFile(path);
}

/** Check whether a hash is present in the store. */
async has(hash: string): Promise<boolean> {
const path = this.hashToPath(hash);
const path = await this.hashToPath(hash);
if (!path) return false;
try {
await stat(path);
Expand All @@ -89,11 +129,52 @@ export class FileStore {
}
}

/** Resolve a hash to its on-disk path, or null for malformed hashes. */
hashToPath(hash: string): string | null {
const hex = normalizeHash(hash);
if (!hex) return null;
return this.resolvePath(hex);
/**
* Resolve a hash to the underlying blob's on-disk path. Always returns
* the CONTENT path regardless of which hash algorithm the caller
* supplied:
*
* - `sha256:<hex>` or bare hex → the sharded blob path directly
* - `keccak256:<hex>` → read the pointer file written at `put()` time,
* deref it to the sha256 hex, return the sharded blob path for that
*
* Returns null for malformed hashes, for keccak256 inputs whose
* pointer file does not exist, and for pointer files that contain
* unexpected content.
*
* This is async because the keccak256 path requires a disk read. If
* you specifically want the on-disk location of the keccak pointer
* file (e.g. for integrity checks, debugging, or cleanup), use
* `hashToPointerPath(keccakHash)` instead — that's synchronous and
* returns null for non-keccak inputs.
*/
async hashToPath(hash: string): Promise<string | null> {
const parsed = parseHash(hash);
if (!parsed) return null;
if (parsed.algo === 'sha256') return this.resolvePath(parsed.hex);
const pointerPath = this.resolveKeccakPointerPath(parsed.hex);
if (!existsSync(pointerPath)) return null;
const sha256Hex = (await readFile(pointerPath, 'utf-8')).trim();
if (!/^[0-9a-f]{64}$/i.test(sha256Hex)) return null;
return this.resolvePath(sha256Hex.toLowerCase());
}

/**
 * Resolve a `keccak256:<hex>` hash to its pointer-file path
 * synchronously, without dereferencing. Returns null for malformed
 * keccak256 hashes and for any other algorithm (use `hashToPath` to
 * get the content path for sha256). Intended for callers that want
 * to inspect or manipulate the keccak → sha256 indirection directly.
 *
 * @param hash - Candidate hash string; only `keccak256:<hex>` resolves.
 * @returns Absolute pointer-file path, or null when not applicable.
 */
hashToPointerPath(hash: string): string | null {
  const parsed = parseHash(hash);
  // Single guard: reject both unparseable input and non-keccak algorithms.
  if (parsed === null || parsed.algo !== 'keccak256') {
    return null;
  }
  return this.resolveKeccakPointerPath(parsed.hex);
}

/** Build the sharded pointer-file path for a keccak256 hex digest. */
private resolveKeccakPointerPath(hex: string): string {
  // Same two-char shard scheme as the sha256 blob layout, under its own root.
  const shard = hex.slice(0, 2);
  const remainder = hex.slice(2);
  return join(this.rootDir, 'keccak256', shard, remainder);
}

/** Root directory the store writes into. */
Expand All @@ -107,13 +188,21 @@ export class FileStore {
}

/**
* Normalize a hash string to its 64-char hex form. Accepts either the
* prefixed (`sha256:abcd...`) or bare (`abcd...`) variants. Returns null for
* anything that isn't a valid sha256 hex.
* Parse a hash string and return its algorithm + 64-char hex form. Accepts
* `sha256:<hex>`, `keccak256:<hex>`, or bare `<hex>` (treated as sha256 for
* backwards compatibility). Returns null for anything that isn't a valid
* 64-char hex under a supported algorithm.
*/
function normalizeHash(hash: string): string | null {
function parseHash(hash: string): { algo: 'sha256' | 'keccak256'; hex: string } | null {
if (typeof hash !== 'string') return null;
const hex = hash.startsWith('sha256:') ? hash.slice('sha256:'.length) : hash;
let algo: 'sha256' | 'keccak256' = 'sha256';
let hex = hash;
if (hash.startsWith('sha256:')) {
hex = hash.slice('sha256:'.length);
} else if (hash.startsWith('keccak256:')) {
algo = 'keccak256';
hex = hash.slice('keccak256:'.length);
}
if (!/^[0-9a-f]{64}$/i.test(hex)) return null;
return hex.toLowerCase();
return { algo, hex: hex.toLowerCase() };
}
6 changes: 3 additions & 3 deletions packages/cli/test/document-processor-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,12 @@ describe('Full extraction pipeline simulation', () => {
// Build the import-file response as the daemon would
const importFileResponse = {
assertionUri: 'did:dkg:context-graph:sales/assertion/0xSales/q4-report',
fileHash: 'sha256:abc123',
fileHash: 'keccak256:abc123',
detectedContentType: 'text/html',
extraction: {
status: phase2Triples.length > 0 ? 'completed' as const : 'skipped' as const,
tripleCount: phase2Triples.length,
mdIntermediateHash: 'sha256:def456',
mdIntermediateHash: 'keccak256:def456',
pipelineUsed: 'text/html',
},
};
Expand All @@ -328,7 +328,7 @@ describe('Full extraction pipeline simulation', () => {
// Node would return extraction.status: "skipped"
const importFileResponse = {
assertionUri: 'did:dkg:context-graph:test/assertion/0xAgent/binary-blob',
fileHash: 'sha256:xyz789',
fileHash: 'keccak256:xyz789',
detectedContentType: 'application/octet-stream',
extraction: {
status: 'skipped' as const,
Expand Down
Loading
Loading