Closed
4 changes: 4 additions & 0 deletions README.md
@@ -67,6 +67,10 @@ packages/
# Install dependencies
pnpm install

# Login to wrangler (needed to parse SQL w/ AI)
# Or set a CLOUDFLARE_API_TOKEN in the project's .env file
wrangler login

# Run locally
pnpm dev

7 changes: 5 additions & 2 deletions packages/big-daddy/llm-resources/conductor-crud.md
@@ -20,12 +20,14 @@
4. Execute per-shard INSERTs with remapped parameters
5. Dispatch index maintenance events
6. Invalidate cache
7. **Bump per-shard row counts via `batchBumpTableShardRowCounts()`**

**Key Functions:**
- `groupInsertByShards()` - Groups rows by target shard, remaps placeholder indices
- `extractInsertedRows()` - Builds row data for index maintenance

**Important:** Multi-row INSERTs are split by shard key hash.
**Important:**
- Multi-row INSERTs are split by shard key hash
- Row counts use VALUES.length (not SQLite's rowsWritten which includes index writes)

---

@@ -50,6 +52,7 @@
4. Execute on shards
5. Dispatch index maintenance with old rows
6. Invalidate cache
7. **Decrement per-shard row counts via `batchBumpTableShardRowCounts()`**

---

20 changes: 20 additions & 0 deletions packages/big-daddy/llm-resources/conductor-key-patterns.md
@@ -28,6 +28,26 @@ Every table has a hidden `_virtualShard` column:
- INSERT: Specific key invalidation
- UPDATE/DELETE: Full index invalidation (conservative)

## Per-Shard Row Count Tracking

Row counts are tracked per-shard in `table_shards.row_count`:

| Operation | Action |
|-----------|--------|
| INSERT | Increment by VALUES.length per shard |
| DELETE | Decrement by rowsAffected per shard |
| UPDATE | No change (row count unchanged) |
| Resharding | Set exact counts after atomic switch |

**Why VALUES.length for INSERT?**
SQLite's `rowsWritten` counts B-tree operations (table + indexes), which inflates the count 2x for tables with composite primary keys. Using VALUES.length gives the logical row count.

**Topology Methods:**
- `getTableShardRowCounts(table)` → get all shard counts
- `bumpTableShardRowCount(table, shard, delta)` → single shard
- `batchBumpTableShardRowCounts(table, deltaMap)` → multiple shards
- `setTableShardRowCounts(table, countsMap)` → set exact (resharding)
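
The table above can be read as a rule that maps each operation to a row count delta for one shard. The following is a minimal sketch of that rule, not code from this PR: the `Operation` type and the `rowCountDelta` helper are hypothetical names introduced only for illustration. Resharding is left out because it sets exact counts rather than applying a delta.

```typescript
// Hypothetical type and helper (assumptions, not part of the PR).
type Operation =
  | { kind: "INSERT"; valuesLength: number } // rows in the VALUES clause routed to this shard
  | { kind: "DELETE"; rowsAffected: number } // rows the shard reported as deleted
  | { kind: "UPDATE" };

// Returns the change that would be applied to table_shards.row_count.
function rowCountDelta(op: Operation): number {
  switch (op.kind) {
    case "INSERT":
      return op.valuesLength; // logical rows, not SQLite's rowsWritten
    case "DELETE":
      return -op.rowsAffected;
    case "UPDATE":
      return 0; // row count unchanged
  }
}
```

For example, a three-row INSERT routed to one shard would yield a delta of 3 for that shard, while any UPDATE yields 0.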

## Effect Usage
Table operations (`create-drop.ts`) use Effect for:
- Typed errors (`TableAlreadyExistsError`, `TopologyFetchError`, etc.)
6 changes: 4 additions & 2 deletions packages/big-daddy/src/dashboard/database.tsx
@@ -38,6 +38,7 @@ export const DashboardPage = ({
</h2>
<form method="post" action={`/dash/${databaseId}/sql`} class="space-y-3">
<textarea
id="sql-query"
name="query"
placeholder="Enter SQL query..."
class="w-full px-3 py-2 border-2 border-black text-sm font-mono focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-black"
@@ -48,12 +49,13 @@ export const DashboardPage = ({
<div class="flex gap-3">
<button
type="submit"
class="flex-1 bg-black text-white font-medium py-2 hover:bg-white hover:text-black hover:border-2 hover:border-black border-2 border-black transition"
class="flex-1 bg-black text-white font-medium py-2 border-2 border-black hover:bg-white hover:text-black transition"
>
Execute
</button>
<button
type="reset"
type="button"
onclick="document.getElementById('sql-query').value = ''"
class="flex-1 bg-white text-black font-medium py-2 border-2 border-black hover:bg-black hover:text-white transition"
>
Clear
13 changes: 13 additions & 0 deletions packages/big-daddy/src/engine/async-jobs/reshard-table.ts
@@ -257,6 +257,19 @@ export async function processReshardTableJob(
await topologyStub.atomicStatusSwitch(tableName);
logger.info`Phase 4: Atomic status switch completed ${{ source }} ${{ table: tableName }}`;

// Phase 4B: Set row counts for new shards using the verified distribution
const rowCountsByShardId = new Map<number, number>();
for (const targetShardId of targetShardIds) {
// Use the breakdown from verification if available, otherwise use copy distribution
const rowCount =
verifyStats.verificationDetails?.targets.breakdown[targetShardId] ??
copyStats.distribution[targetShardId] ??
0;
rowCountsByShardId.set(targetShardId, rowCount);
}
await topologyStub.setTableShardRowCounts(tableName, rowCountsByShardId);
logger.info`Phase 4B: Row counts set for new shards ${{ source }} ${{ table: tableName }} ${{ rowCounts: Object.fromEntries(rowCountsByShardId) }}`;

// Phase 5: Delete old shard data
await deleteOldShardData(env, job.database_id, sourceShardId, tableName);
const shardId = sourceShardId;
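
The Phase 4B fallback chain in the hunk above can be isolated as a small pure function. This is only a sketch under assumptions: the `VerifyStats` and `CopyStats` interfaces and the `resolveRowCounts` name are hypothetical, loosely modelled on the fields the diff reads, and the real types may differ.

```typescript
// Hypothetical shapes (assumptions), based on the fields accessed in the diff.
interface VerifyStats {
  verificationDetails?: {
    targets: { breakdown: Record<number, number | undefined> };
  };
}
interface CopyStats {
  distribution: Record<number, number | undefined>;
}

// Prefer the verified count, then the copy distribution, then 0.
function resolveRowCounts(
  targetShardIds: number[],
  verifyStats: VerifyStats,
  copyStats: CopyStats,
): Map<number, number> {
  const counts = new Map<number, number>();
  for (const id of targetShardIds) {
    // ?? only falls through on null/undefined, so a verified count of 0 is kept.
    const count =
      verifyStats.verificationDetails?.targets.breakdown[id] ??
      copyStats.distribution[id] ??
      0;
    counts.set(id, count);
  }
  return counts;
}
```

Using `??` rather than `||` matters here: a shard that was verified as empty keeps its count of 0 instead of falling back to the copy distribution.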
22 changes: 22 additions & 0 deletions packages/big-daddy/src/engine/conductor/crud/delete.ts
@@ -153,6 +153,28 @@ export async function handleDelete(
duration: 0,
}));

// STEP 8: Decrement row counts for each shard
if (result.rowsAffected && result.rowsAffected > 0) {
const { databaseId, topology } = context;
const topologyId = topology.idFromName(databaseId);
const topologyStub = topology.get(topologyId);

// Build delta map from per-shard results (negative for deletions)
const deltaByShard = new Map<number, number>();
for (let i = 0; i < shardsToQuery.length; i++) {
const shard = shardsToQuery[i];
const shardResult = results[i];
const rowsAffected = shardResult?.rowsAffected ?? 0;
if (shard && rowsAffected > 0) {
deltaByShard.set(shard.shard_id, -rowsAffected);
}
}

if (deltaByShard.size > 0) {
await topologyStub.batchBumpTableShardRowCounts(tableName, deltaByShard);
}
}

logger.info`DELETE query completed ${{ shardsQueried: shardsToQuery.length }} ${{ rowsAffected: result.rowsAffected }}`;

return result;
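
The delta map built in STEP 8 above can be sketched as a standalone function. The `Shard` and `ShardResult` interfaces and the `buildDeleteDeltas` name are hypothetical, and the sketch assumes, as the diff does, that results are positionally aligned with the queried shards.

```typescript
// Hypothetical minimal shapes (assumptions), reduced to the fields the diff uses.
interface Shard {
  shard_id: number;
}
interface ShardResult {
  rowsAffected?: number;
}

// Builds negative deltas for shards that actually deleted rows.
// Assumes results[i] belongs to shards[i].
function buildDeleteDeltas(
  shards: Shard[],
  results: (ShardResult | undefined)[],
): Map<number, number> {
  const deltaByShard = new Map<number, number>();
  shards.forEach((shard, i) => {
    const rowsAffected = results[i]?.rowsAffected ?? 0;
    if (rowsAffected > 0) {
      deltaByShard.set(shard.shard_id, -rowsAffected);
    }
  });
  return deltaByShard;
}
```

Shards that deleted nothing, or that returned no result, are omitted, so an empty map means no topology call would be needed.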
45 changes: 42 additions & 3 deletions packages/big-daddy/src/engine/conductor/crud/insert.ts
@@ -193,7 +193,9 @@ export async function handleInsert(
params,
);

logger.info`Query plan determined for INSERT ${{ shardsSelected: planData.shardsToQuery.length }} ${{ indexesUsed: planData.virtualIndexes.length }} ${{ shardKey: planData.shardKey }}`;
logger.info`Query plan determined for INSERT ${{ shardsSelected: planData.shardsToQuery.length }} ${{
indexesUsed: planData.virtualIndexes.length,
}} ${{ shardKey: planData.shardKey }}`;

const allShards = planData.shardsToQuery;

@@ -214,12 +216,16 @@ export async function handleInsert(
? allShards
: allShards.filter((s) => perShardStatements.has(s.shard_id));

logger.info`INSERT rows grouped by shard ${{ shardsWithRows: perShardStatements.size }} ${{ totalShards: allShards.length }} ${{ shardsToQueryLength: shardsToQuery.length }}`;
logger.info`INSERT rows grouped by shard ${{ shardsWithRows: perShardStatements.size }} ${{ totalShards: allShards.length }} ${{
shardsToQueryLength: shardsToQuery.length,
}}`;

// STEP 3: Log write if resharding is in progress
await logWriteIfResharding(tableName, statement.type, query, params, context);

// STEP 4: Execute per-shard INSERTs
// Track per-shard results for row count bumping (declared here for scope)
const resultsPerShard = new Map<number, QueryResult>();
let execResult:
| {
results:
@@ -232,7 +238,6 @@ export async function handleInsert(
| undefined;
if (perShardStatements.size > 0) {
// Execute only the shards that have rows
const resultsPerShard = new Map<number, QueryResult>();
const shardStatsPerShard: ShardStats[] = [];

for (const [
@@ -358,6 +363,40 @@ export async function handleInsert(
// Add shard statistics
result.shardStats = execResult.shardStats;

// STEP 8: Bump row counts for each shard
// Use VALUES clause length (logical rows), not rowsAffected from SQLite
// SQLite's rowsWritten includes B-tree writes for indexes (e.g., composite PK),
// which would inflate counts by 2x for tables with indexes
const { databaseId, topology } = context;
const topologyId = topology.idFromName(databaseId);
const topologyStub = topology.get(topologyId);

if (perShardStatements.size > 0) {
// Build delta map from VALUES clause length per shard
const deltaByShard = new Map<number, number>();
for (const [shardId, { statement: shardStatement }] of perShardStatements) {
const rowsInserted = shardStatement.values?.length ?? 0;
if (rowsInserted > 0) {
deltaByShard.set(shardId, rowsInserted);
}
}

if (deltaByShard.size > 0) {
await topologyStub.batchBumpTableShardRowCounts(tableName, deltaByShard);
Owner

I'm not too sure we want to go back and track this in the topology. It adds an extra DO operation, which increases cost. The topology is also a single point of failure, and increasing writes to it will lower our total possible throughput.

Contributor Author

Ahh, I didn't realise about the new DO storage costs. Let's scrap this then.

}
} else {
// Fallback case: shard key not in INSERT, rows were inserted on all shards
// Each shard gets all the rows from the original statement
const rowsInserted = statement.values?.length ?? 0;
if (rowsInserted > 0) {
const deltaByShard = new Map<number, number>();
for (const shard of shardsToQuery) {
deltaByShard.set(shard.shard_id, rowsInserted);
}
await topologyStub.batchBumpTableShardRowCounts(tableName, deltaByShard);
}
}

logger.info`INSERT query completed ${{ shardsQueried: shardsToQuery.length }} ${{ rowsAffected: result.rowsAffected }}`;

return result;
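
The two INSERT branches above (per-shard VALUES length, and the fallback where every shard receives all rows) can be sketched together. This is an illustrative sketch only: `buildInsertDeltas` and its parameters are hypothetical, and rows are reduced to plain arrays rather than the parsed statements used in the diff.

```typescript
// Hypothetical helper (assumption): rows are modelled as arrays of values.
function buildInsertDeltas(
  perShardValues: Map<number, unknown[][]>, // rows grouped by target shard
  allShardIds: number[], // shards queried in the fallback case
  originalValues: unknown[][], // VALUES rows of the original statement
): Map<number, number> {
  const deltaByShard = new Map<number, number>();
  if (perShardValues.size > 0) {
    // Normal case: each shard is bumped by the rows routed to it.
    perShardValues.forEach((rows, shardId) => {
      if (rows.length > 0) {
        deltaByShard.set(shardId, rows.length);
      }
    });
  } else if (originalValues.length > 0) {
    // Fallback case: no shard key in the INSERT, so every shard got every row.
    for (const shardId of allShardIds) {
      deltaByShard.set(shardId, originalValues.length);
    }
  }
  return deltaByShard;
}
```

In the fallback case the counts sum to more than the number of logical rows, since each row is stored on every shard.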
98 changes: 98 additions & 0 deletions packages/big-daddy/src/engine/topology/crud.ts
@@ -10,6 +10,7 @@
import type {
AsyncJob,
ReshardingState,
ShardRowCount,
SqlParam,
StorageNode,
TableMetadata,
@@ -41,7 +42,7 @@
}[];

if (result.length > 0 && result[0]?.value === "true") {
throw new Error(
"Topology already created. Storage nodes cannot be modified after creation.",
);
}

Check failure on line 45 in packages/big-daddy/src/engine/topology/crud.ts
GitHub Actions / Test
Unhandled error

Error: Topology already created. Storage nodes cannot be modified after creation.
❯ CRUDOperations.create src/engine/topology/crud.ts:45:10
❯ Topology.create src/engine/topology/index.ts:134:20
❯ DurableObject.fn home/runner/work/big-daddy/big-daddy/node_modules/.pnpm/@cloudflare+vitest-pool-workers@0.10.3_@cloudflare+workers-types@4.20251224.0_@vitest+runner@_ou7hbo3ptxjlyssnkvrfwqdwby/node_modules/@cloudflare/vitest-pool-workers/dist/worker/lib/cloudflare/test-internal.mjs:358:14

This error originated in "test/engine/topology.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running.
The latest test that might've caused the error is "should throw if create is called twice". It might mean one of the following:
- The error was thrown, while Vitest was running this test.
- If the error occurred after the test had been completed, this was the last documented test before it was thrown.
@@ -224,4 +225,101 @@

return { success: true };
}

// ============================================================================
// Row Count Operations
// ============================================================================

/**
* Get row counts for all shards of a table
*
* @param tableName - Name of the table
* @returns Array of per-shard row counts
*/
getTableShardRowCounts(tableName: string): ShardRowCount[] {
return this.storage.sql
.exec(
`SELECT table_name, shard_id, row_count, updated_at
FROM table_shards
WHERE table_name = ? AND status = 'active'
ORDER BY shard_id`,
tableName,
)
.toArray() as unknown as ShardRowCount[];
}

/**
* Bump (increment or decrement) the row count for a specific shard
* Clamps at 0 to prevent negative counts
*
* @param tableName - Name of the table
* @param shardId - ID of the shard
* @param delta - Amount to change (positive for inserts, negative for deletes)
*/
bumpTableShardRowCount(
tableName: string,
shardId: number,
delta: number,
): void {
const now = Date.now();
// Use MAX(0, ...) to clamp at 0
this.storage.sql.exec(
`UPDATE table_shards
SET row_count = MAX(0, row_count + ?), updated_at = ?
WHERE table_name = ? AND shard_id = ?`,
delta,
now,
tableName,
shardId,
);
}

/**
* Batch bump row counts for multiple shards of a table
* More efficient than calling bumpTableShardRowCount multiple times
*
* @param tableName - Name of the table
* @param deltaByShard - Map of shard_id to delta (amount to change)
*/
batchBumpTableShardRowCounts(
tableName: string,
deltaByShard: Map<number, number>,
): void {
const now = Date.now();
for (const [shardId, delta] of deltaByShard) {
this.storage.sql.exec(
`UPDATE table_shards
SET row_count = MAX(0, row_count + ?), updated_at = ?
WHERE table_name = ? AND shard_id = ?`,
delta,
now,
tableName,
shardId,
);
}
}

/**
* Set row counts for shards (used after resharding to set exact counts)
*
* @param tableName - Name of the table
* @param countsByShardId - Map of shard_id to row_count
*/
setTableShardRowCounts(
tableName: string,
countsByShardId: Map<number, number>,
): void {
const now = Date.now();
for (const [shardId, rowCount] of countsByShardId) {
this.storage.sql.exec(
`UPDATE table_shards
SET row_count = ?, updated_at = ?
WHERE table_name = ? AND shard_id = ?`,
rowCount,
now,
tableName,
shardId,
);
}
}
}
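
The clamping behaviour of `MAX(0, row_count + ?)` in the bump methods above can be modelled in memory. This is a sketch under assumptions, not the PR's implementation: `applyDeltas` is a hypothetical name, and a plain `Map` stands in for the `table_shards` rows of one table.

```typescript
// Hypothetical in-memory model (assumption): shard_id -> row_count for one table.
function applyDeltas(
  counts: Map<number, number>,
  deltas: Map<number, number>,
): void {
  deltas.forEach((delta, shardId) => {
    const current = counts.get(shardId);
    // Like an UPDATE whose WHERE clause matches no row, unknown shards are a no-op.
    if (current === undefined) return;
    // Mirrors MAX(0, row_count + delta): the count never goes negative.
    counts.set(shardId, Math.max(0, current + delta));
  });
}
```

One consequence of clamping is that an over-large decrement is silently absorbed, so a count that has drifted is not corrected by later bumps, only by setting exact counts as the resharding path does.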