diff --git a/Dockerfile b/Dockerfile index 1456b15..c300c4d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -131,6 +131,8 @@ RUN sed -i 's|nobody:/|nobody:/home|' /etc/passwd && chown nobody:nobody /home ENV POSTGRES_URL=postgresql://postgres@localhost/postgres?host=/tmp +EXPOSE 5432 + # Development command - starts both PostgreSQL and the analyzer CMD ["/bin/bash", "-c", "\ su-exec postgres initdb -D $PGDATA || true && \ diff --git a/pg14.Dockerfile b/pg14.Dockerfile index 2b83951..8a184b8 100644 --- a/pg14.Dockerfile +++ b/pg14.Dockerfile @@ -145,6 +145,8 @@ RUN su-exec postgres initdb -D $PGDATA || true && \ USER postgres +EXPOSE 2345 + CMD ["/bin/bash", "-c", "\ pg_ctl -D $PGDATA -l $PGDATA/logfile start || (cat $PGDATA/logfile && exit 1) && \ until pg_isready -h /tmp; do sleep 0.5; done && \ diff --git a/src/remote/optimization.ts b/src/remote/optimization.ts index f2d7d46..812a7c5 100644 --- a/src/remote/optimization.ts +++ b/src/remote/optimization.ts @@ -19,7 +19,10 @@ export const LiveQueryOptimization = z.discriminatedUnion("state", [ z.object({ state: z.literal("waiting"), }), - z.object({ state: z.literal("optimizing") }), + z.object({ + state: z.literal("optimizing"), + retries: z.number().nonnegative(), + }), z.object({ state: z.literal("not_supported"), reason: z.string() }), z.object({ state: z.literal("improvements_available"), @@ -37,7 +40,11 @@ export const LiveQueryOptimization = z.discriminatedUnion("state", [ indexesUsed: z.array(z.string()), explainPlan: z.custom(), }), - z.object({ state: z.literal("timeout") }), + z.object({ + state: z.literal("timeout"), + waitedMs: z.number(), + retries: z.number().nonnegative(), + }), z.object({ state: z.literal("error"), error: z.string(), diff --git a/src/remote/query-optimizer.test.ts b/src/remote/query-optimizer.test.ts index 630d761..59b2056 100644 --- a/src/remote/query-optimizer.test.ts +++ b/src/remote/query-optimizer.test.ts @@ -4,7 +4,7 @@ import { ConnectionManager } from "../sync/connection-manager.ts"; import { Connectable } from "../sync/connectable.ts"; import { setTimeout } from "node:timers/promises"; import { assertArrayIncludes } from "@std/assert/array-includes"; -import { assert, assertGreater } from "@std/assert"; +import { assert, assertEquals, assertGreater } from "@std/assert"; import { type OptimizedQuery, RecentQuery } from "../sql/recent-query.ts"; Deno.test({ @@ -446,3 +446,101 @@ Deno.test({ } }, }); + +Deno.test({ + name: + "timed out queries are retried with exponential backoff up to maxRetries", + sanitizeOps: false, + sanitizeResources: false, + fn: async () => { + const pg = await new PostgreSqlContainer("postgres:17") + .withCopyContentToContainer([ + { + content: ` + create table slow_table(id int, data text); + insert into slow_table (id, data) select i, repeat('x', 1000) from generate_series(1, 100) i; + create extension pg_stat_statements; + select * from slow_table where id = 1; + `, + target: "/docker-entrypoint-initdb.d/init.sql", + }, + ]) + .withCommand([ + "-c", + "shared_preload_libraries=pg_stat_statements", + "-c", + "autovacuum=off", + "-c", + "track_counts=off", + "-c", + "track_io_timing=off", + "-c", + "track_activities=off", + ]) + .start(); + + const maxRetries = 2; + const queryTimeoutMs = 1; + + const manager = ConnectionManager.forLocalDatabase(); + const conn = Connectable.fromString(pg.getConnectionUri()); + const optimizer = new QueryOptimizer(manager, conn, { + maxRetries, + queryTimeoutMs, + queryTimeoutMaxMs: 100, + }); + + const timeoutEvents: { query: OptimizedQuery; 
waitedMs: number }[] = []; + optimizer.addListener("timeout", (query, waitedMs) => { + timeoutEvents.push({ query, waitedMs }); + }); + + const connector = manager.getConnectorFor(conn); + try { + const recentQueries = await connector.getRecentQueries(); + const slowQuery = recentQueries.find((q) => + q.query.includes("slow_table") && q.query.startsWith("select") + ); + assert(slowQuery, "Expected to find slow_table query"); + + await optimizer.start([slowQuery], { + kind: "fromStatisticsExport", + source: { kind: "inline" }, + stats: [{ + tableName: "slow_table", + schemaName: "public", + relpages: 1000, + reltuples: 1_000_000, + relallvisible: 1, + columns: [ + { columnName: "id", stats: null }, + { columnName: "data", stats: null }, + ], + indexes: [], + }], + }); + + await optimizer.finish; + + const queries = optimizer.getQueries(); + const resultQuery = queries.find((q) => q.query.includes("slow_table")); + assert(resultQuery, "Expected slow_table query in results"); + + assertEquals( + resultQuery.optimization.state, + "timeout", + "Expected query to be in timeout state", + ); + + if (resultQuery.optimization.state === "timeout") { + assertEquals( + resultQuery.optimization.retries, + maxRetries, + `Expected ${maxRetries} retries`, + ); + } + } finally { + await pg.stop(); + } + }, +}); diff --git a/src/remote/query-optimizer.ts b/src/remote/query-optimizer.ts index 39e29db..a54f3be 100644 --- a/src/remote/query-optimizer.ts +++ b/src/remote/query-optimizer.ts @@ -23,7 +23,6 @@ import { parse } from "@libpg-query/parser"; import { DisabledIndexes } from "./disabled-indexes.ts"; const MINIMUM_COST_CHANGE_PERCENTAGE = 5; -const QUERY_TIMEOUT_MS = 10000; type EventMap = { error: [Error, OptimizedQuery]; @@ -61,12 +60,23 @@ export class QueryOptimizer extends EventEmitter { private queriedSinceVacuum = 0; private static readonly vacuumThreshold = 5; + private readonly maxRetries: number; + private readonly queryTimeoutMs: number; + private readonly queryTimeoutMaxMs: number; constructor( private readonly manager: ConnectionManager, private readonly connectable: Connectable, + config?: { + maxRetries?: number; + queryTimeoutMs?: number; + queryTimeoutMaxMs?: number; + }, ) { super(); + this.maxRetries = config?.maxRetries ?? 5; + this.queryTimeoutMs = config?.queryTimeoutMs ?? 5_000; + this.queryTimeoutMaxMs = config?.queryTimeoutMaxMs ?? 
120_000; } get validQueriesProcessed() { @@ -241,16 +251,15 @@ export class QueryOptimizer extends EventEmitter { let optimized: OptimizedQuery | undefined; const token = await this.semaphore.acquire(); try { - for (const [hash, entry] of this.queries.entries()) { - if (entry.optimization.state !== "waiting") { - continue; - } - this.queries.set( - hash, - entry.withOptimization({ state: "optimizing" }), + optimized = this.findAndMarkFirstQueryWhere((entry) => + entry.optimization.state === "waiting" + ); + // if nothing is in queue, start working through timed-out queries + if (!optimized) { + optimized = this.findAndMarkFirstQueryWhere((entry) => + entry.optimization.state === "timeout" && + this.isEligibleForTimeoutRetry(entry) ); - optimized = entry; - break; } } finally { this.semaphore.release(token); @@ -263,6 +272,7 @@ export class QueryOptimizer extends EventEmitter { const optimization = await this.optimizeQuery( optimized, this.target, + { timeoutMs: this.calculateTimeoutRetryDelay(optimized) }, ); this.queriedSinceVacuum++; if (this.queriedSinceVacuum > QueryOptimizer.vacuumThreshold) { @@ -287,6 +297,46 @@ export class QueryOptimizer extends EventEmitter { return Array.from(this.queries.values()); } + private findAndMarkFirstQueryWhere( + filter: (query: OptimizedQuery) => boolean, + ): OptimizedQuery | undefined { + for (const [hash, entry] of this.queries.entries()) { + if (!filter(entry)) { + continue; + } + let retries = 0; + if (entry.optimization.state === "timeout") { + retries = entry.optimization.retries; + } + this.queries.set( + hash, + entry.withOptimization({ state: "optimizing", retries }), + ); + return entry; + } + } + + private calculateTimeoutRetryDelay(entry: OptimizedQuery): number { + const baseDelay = this.queryTimeoutMs; + let delay = baseDelay; + if (entry.optimization.state === "timeout") { + delay = Math.min( + baseDelay * Math.pow(2, entry.optimization.retries), + this.queryTimeoutMaxMs, + ); + } + return delay; + } + + private isEligibleForTimeoutRetry( + entry: OptimizedQuery, + ): boolean { + if (entry.optimization.state !== "timeout") { + return false; + } + return entry.optimization.retries < this.maxRetries; + } + private async vacuum() { const connector = this.manager.getConnectorFor(this.connectable); try { @@ -329,7 +379,7 @@ export class QueryOptimizer extends EventEmitter { private async optimizeQuery( recent: OptimizedQuery, target: Target, - timeoutMs = QUERY_TIMEOUT_MS, + options: { timeoutMs: number }, ): Promise { const builder = new PostgresQueryBuilder(recent.query); let cost: number; @@ -337,14 +387,14 @@ export class QueryOptimizer extends EventEmitter { try { const explain = await withTimeout( target.optimizer.testQueryWithStats(builder), - timeoutMs, + options.timeoutMs, ); cost = explain.Plan["Total Cost"]; explainPlan = explain.Plan; } catch (error) { console.error("Error with baseline run", error); if (error instanceof TimeoutError) { - return this.onTimeout(recent, timeoutMs); + return this.onTimeout(recent, options.timeoutMs); } else if (error instanceof Error) { return this.onError(recent, error.message); } else { @@ -366,12 +416,12 @@ export class QueryOptimizer extends EventEmitter { indexes, (tx) => this.dropDisabledIndexes(tx), ), - timeoutMs, + options.timeoutMs, ); } catch (error) { console.error("Error with optimization", error); if (error instanceof TimeoutError) { - return this.onTimeout(recent, timeoutMs); + return this.onTimeout(recent, options.timeoutMs); } else if (error instanceof Error) { return 
this.onError(recent, error.message, explainPlan); } else { @@ -544,8 +594,12 @@ export class QueryOptimizer extends EventEmitter { recent: OptimizedQuery, waitedMs: number, ): LiveQueryOptimization { + let retries = 0; + if ("retries" in recent.optimization) { + retries = recent.optimization.retries + 1; + } this.emit("timeout", recent, waitedMs); - return { state: "timeout" }; + return { state: "timeout", waitedMs, retries }; } }
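
For reference, a standalone sketch of the timeout schedule this change introduces in `calculateTimeoutRetryDelay`, using the new constructor defaults from the diff (`queryTimeoutMs = 5_000`, `queryTimeoutMaxMs = 120_000`, `maxRetries = 5`); the `timeoutForAttempt` helper and the loop are illustrative only, not part of the patch:

```ts
// Illustrative sketch only: the per-attempt timeout grows as base * 2^retries,
// capped at queryTimeoutMaxMs, where `retries` is the count recorded on the
// timed-out entry.
const queryTimeoutMs = 5_000;
const queryTimeoutMaxMs = 120_000;
const maxRetries = 5;

const timeoutForAttempt = (retries: number): number =>
  Math.min(queryTimeoutMs * Math.pow(2, retries), queryTimeoutMaxMs);

for (let retries = 0; retries <= maxRetries; retries++) {
  console.log(`${retries} prior timeout(s) -> ${timeoutForAttempt(retries)} ms`);
}
// 0 -> 5000, 1 -> 10000, 2 -> 20000, 3 -> 40000, 4 -> 80000, 5 -> 120000 (capped)
```

With the defaults as written, an entry stops being eligible once its recorded retries reaches `maxRetries`, so the largest attempt actually scheduled is 80 s; the 120 s cap only comes into play if `maxRetries` or the base timeout is raised.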
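The work-selection rule added via `findAndMarkFirstQueryWhere` and `isEligibleForTimeoutRetry` boils down to: drain the waiting queue first, then fall back to timed-out queries that still have retry budget. A minimal sketch of that ordering, using a stripped-down state type instead of the real `OptimizedQuery` (`State` and `pickNext` are illustrative names):

```ts
// Illustrative only: mirrors the selection order in the optimizer loop.
type State =
  | { state: "waiting" }
  | { state: "optimizing"; retries: number }
  | { state: "timeout"; waitedMs: number; retries: number };

// Waiting entries win; otherwise retry the first timed-out entry that has
// not yet exhausted its retry budget.
const pickNext = (entries: State[], maxRetries: number): State | undefined =>
  entries.find((e) => e.state === "waiting") ??
  entries.find((e) => e.state === "timeout" && e.retries < maxRetries);

// Example: no waiting entries, one exhausted timeout, one retryable timeout.
console.log(pickNext(
  [
    { state: "timeout", waitedMs: 5_000, retries: 2 },
    { state: "timeout", waitedMs: 10_000, retries: 1 },
  ],
  2,
)); // -> the second entry, since its retries (1) are below maxRetries (2)
```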
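Because the `timeout` branch of `LiveQueryOptimization` now carries `waitedMs` and `retries`, anything that deserializes an optimization result can report both. A minimal consumer sketch, assuming a relative import of `src/remote/optimization.ts`; the payload literal is made up for illustration:

```ts
import { LiveQueryOptimization } from "./optimization.ts";

// Hypothetical wire payload; its shape matches the widened "timeout" branch.
const result = LiveQueryOptimization.parse({
  state: "timeout",
  waitedMs: 20_000,
  retries: 2,
});

// Narrowing on the discriminant exposes the new fields.
if (result.state === "timeout") {
  console.log(
    `optimization gave up after ${result.waitedMs} ms (${result.retries} retries)`,
  );
}
```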