Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ RUN sed -i 's|nobody:/|nobody:/home|' /etc/passwd && chown nobody:nobody /home

ENV POSTGRES_URL=postgresql://postgres@localhost/postgres?host=/tmp

EXPOSE 5432

# Development command - starts both PostgreSQL and the analyzer
CMD ["/bin/bash", "-c", "\
su-exec postgres initdb -D $PGDATA || true && \
Expand Down
2 changes: 2 additions & 0 deletions pg14.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ RUN su-exec postgres initdb -D $PGDATA || true && \

USER postgres

EXPOSE 2345

CMD ["/bin/bash", "-c", "\
pg_ctl -D $PGDATA -l $PGDATA/logfile start || (cat $PGDATA/logfile && exit 1) && \
until pg_isready -h /tmp; do sleep 0.5; done && \
Expand Down
11 changes: 9 additions & 2 deletions src/remote/optimization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ export const LiveQueryOptimization = z.discriminatedUnion("state", [
z.object({
state: z.literal("waiting"),
}),
z.object({ state: z.literal("optimizing") }),
z.object({
state: z.literal("optimizing"),
retries: z.number().nonnegative(),
}),
z.object({ state: z.literal("not_supported"), reason: z.string() }),
z.object({
state: z.literal("improvements_available"),
Expand All @@ -37,7 +40,11 @@ export const LiveQueryOptimization = z.discriminatedUnion("state", [
indexesUsed: z.array(z.string()),
explainPlan: z.custom<PostgresExplainStage>(),
}),
z.object({ state: z.literal("timeout") }),
z.object({
state: z.literal("timeout"),
waitedMs: z.number(),
retries: z.number().nonnegative(),
}),
z.object({
state: z.literal("error"),
error: z.string(),
Expand Down
100 changes: 99 additions & 1 deletion src/remote/query-optimizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ConnectionManager } from "../sync/connection-manager.ts";
import { Connectable } from "../sync/connectable.ts";
import { setTimeout } from "node:timers/promises";
import { assertArrayIncludes } from "@std/assert/array-includes";
import { assert, assertGreater } from "@std/assert";
import { assert, assertEquals, assertGreater } from "@std/assert";
import { type OptimizedQuery, RecentQuery } from "../sql/recent-query.ts";

Deno.test({
Expand Down Expand Up @@ -446,3 +446,101 @@ Deno.test({
}
},
});

Deno.test({
name:
"timed out queries are retried with exponential backoff up to maxRetries",
sanitizeOps: false,
sanitizeResources: false,
fn: async () => {
const pg = await new PostgreSqlContainer("postgres:17")
.withCopyContentToContainer([
{
content: `
create table slow_table(id int, data text);
insert into slow_table (id, data) select i, repeat('x', 1000) from generate_series(1, 100) i;
create extension pg_stat_statements;
select * from slow_table where id = 1;
`,
target: "/docker-entrypoint-initdb.d/init.sql",
},
])
.withCommand([
"-c",
"shared_preload_libraries=pg_stat_statements",
"-c",
"autovacuum=off",
"-c",
"track_counts=off",
"-c",
"track_io_timing=off",
"-c",
"track_activities=off",
])
.start();

const maxRetries = 2;
const queryTimeoutMs = 1;

const manager = ConnectionManager.forLocalDatabase();
const conn = Connectable.fromString(pg.getConnectionUri());
const optimizer = new QueryOptimizer(manager, conn, {
maxRetries,
queryTimeoutMs,
queryTimeoutMaxMs: 100,
});

const timeoutEvents: { query: OptimizedQuery; waitedMs: number }[] = [];
optimizer.addListener("timeout", (query, waitedMs) => {
timeoutEvents.push({ query, waitedMs });
});

const connector = manager.getConnectorFor(conn);
try {
const recentQueries = await connector.getRecentQueries();
const slowQuery = recentQueries.find((q) =>
q.query.includes("slow_table") && q.query.startsWith("select")
);
assert(slowQuery, "Expected to find slow_table query");

await optimizer.start([slowQuery], {
kind: "fromStatisticsExport",
source: { kind: "inline" },
stats: [{
tableName: "slow_table",
schemaName: "public",
relpages: 1000,
reltuples: 1_000_000,
relallvisible: 1,
columns: [
{ columnName: "id", stats: null },
{ columnName: "data", stats: null },
],
indexes: [],
}],
});

await optimizer.finish;

const queries = optimizer.getQueries();
const resultQuery = queries.find((q) => q.query.includes("slow_table"));
assert(resultQuery, "Expected slow_table query in results");

assertEquals(
resultQuery.optimization.state,
"timeout",
"Expected query to be in timeout state",
);

if (resultQuery.optimization.state === "timeout") {
assertEquals(
resultQuery.optimization.retries,
maxRetries,
`Expected ${maxRetries} retries`,
);
}
} finally {
await pg.stop();
}
},
});
86 changes: 70 additions & 16 deletions src/remote/query-optimizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import { parse } from "@libpg-query/parser";
import { DisabledIndexes } from "./disabled-indexes.ts";

const MINIMUM_COST_CHANGE_PERCENTAGE = 5;
const QUERY_TIMEOUT_MS = 10000;

type EventMap = {
error: [Error, OptimizedQuery];
Expand Down Expand Up @@ -61,12 +60,23 @@ export class QueryOptimizer extends EventEmitter<EventMap> {

private queriedSinceVacuum = 0;
private static readonly vacuumThreshold = 5;
private readonly maxRetries: number;
private readonly queryTimeoutMs: number;
private readonly queryTimeoutMaxMs: number;

constructor(
private readonly manager: ConnectionManager,
private readonly connectable: Connectable,
config?: {
maxRetries?: number;
queryTimeoutMs?: number;
queryTimeoutMaxMs?: number;
},
) {
super();
this.maxRetries = config?.maxRetries ?? 5;
this.queryTimeoutMs = config?.queryTimeoutMs ?? 5_000;
this.queryTimeoutMaxMs = config?.queryTimeoutMaxMs ?? 120_000;
}

get validQueriesProcessed() {
Expand Down Expand Up @@ -241,16 +251,15 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
let optimized: OptimizedQuery | undefined;
const token = await this.semaphore.acquire();
try {
for (const [hash, entry] of this.queries.entries()) {
if (entry.optimization.state !== "waiting") {
continue;
}
this.queries.set(
hash,
entry.withOptimization({ state: "optimizing" }),
optimized = this.findAndMarkFirstQueryWhere((entry) =>
entry.optimization.state === "waiting"
);
// if nothing is in queue, start working through timed-out queries
if (!optimized) {
optimized = this.findAndMarkFirstQueryWhere((entry) =>
entry.optimization.state === "timeout" &&
this.isEligibleForTimeoutRetry(entry)
);
optimized = entry;
break;
}
} finally {
this.semaphore.release(token);
Expand All @@ -263,6 +272,7 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
const optimization = await this.optimizeQuery(
optimized,
this.target,
{ timeoutMs: this.calculateTimeoutRetryDelay(optimized) },
);
this.queriedSinceVacuum++;
if (this.queriedSinceVacuum > QueryOptimizer.vacuumThreshold) {
Expand All @@ -287,6 +297,46 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
return Array.from(this.queries.values());
}

private findAndMarkFirstQueryWhere(
filter: (query: OptimizedQuery) => boolean,
): OptimizedQuery | undefined {
for (const [hash, entry] of this.queries.entries()) {
if (!filter(entry)) {
continue;
}
let retries = 0;
if (entry.optimization.state === "timeout") {
retries = entry.optimization.retries;
}
this.queries.set(
hash,
entry.withOptimization({ state: "optimizing", retries }),
);
return entry;
}
}

private calculateTimeoutRetryDelay(entry: OptimizedQuery): number {
const baseDelay = this.queryTimeoutMs;
let delay = baseDelay;
if (entry.optimization.state === "timeout") {
delay = Math.min(
baseDelay * Math.pow(2, entry.optimization.retries),
this.queryTimeoutMaxMs,
);
}
return delay;
}

private isEligibleForTimeoutRetry(
entry: OptimizedQuery,
): boolean {
if (entry.optimization.state !== "timeout") {
return false;
}
return entry.optimization.retries < this.maxRetries;
}

private async vacuum() {
const connector = this.manager.getConnectorFor(this.connectable);
try {
Expand Down Expand Up @@ -329,22 +379,22 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
private async optimizeQuery(
recent: OptimizedQuery,
target: Target,
timeoutMs = QUERY_TIMEOUT_MS,
options: { timeoutMs: number },
): Promise<LiveQueryOptimization> {
const builder = new PostgresQueryBuilder(recent.query);
let cost: number;
let explainPlan: PostgresExplainStage | undefined;
try {
const explain = await withTimeout(
target.optimizer.testQueryWithStats(builder),
timeoutMs,
options.timeoutMs,
);
cost = explain.Plan["Total Cost"];
explainPlan = explain.Plan;
} catch (error) {
console.error("Error with baseline run", error);
if (error instanceof TimeoutError) {
return this.onTimeout(recent, timeoutMs);
return this.onTimeout(recent, options.timeoutMs);
} else if (error instanceof Error) {
return this.onError(recent, error.message);
} else {
Expand All @@ -366,12 +416,12 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
indexes,
(tx) => this.dropDisabledIndexes(tx),
),
timeoutMs,
options.timeoutMs,
);
} catch (error) {
console.error("Error with optimization", error);
if (error instanceof TimeoutError) {
return this.onTimeout(recent, timeoutMs);
return this.onTimeout(recent, options.timeoutMs);
} else if (error instanceof Error) {
return this.onError(recent, error.message, explainPlan);
} else {
Expand Down Expand Up @@ -544,8 +594,12 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
recent: OptimizedQuery,
waitedMs: number,
): LiveQueryOptimization {
let retries = 0;
if ("retries" in recent.optimization) {
retries = recent.optimization.retries + 1;
}
this.emit("timeout", recent, waitedMs);
return { state: "timeout" };
return { state: "timeout", waitedMs, retries };
}
}

Expand Down
Loading