From b5533e3cd5529ddcf6c1c6bc18312e50b58ac49b Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Sun, 15 Mar 2026 21:44:58 -0700 Subject: [PATCH 1/4] instrumentation: Use Instrumentation struct for parallel workers This simplifies the DSM allocations a bit since we don't need to separately allocate WAL and buffer usage, and allows the easier future addition of a third stack-based struct being discussed. In passing, adjust InstrAccumParallelQuery to handle multiple workers: All callers currently have a loop to call the accumulation for each worker and a local loop variable defined. To slightly simplify the callers, move the loop into the InstrAccumParallelQuery function and add a new "nworkers" argument to it. Author: Lukas Fittl Reviewed-by: Discussion: --- src/backend/access/brin/brin.c | 46 ++++++------------- src/backend/access/gin/gininsert.c | 46 ++++++------------- src/backend/access/nbtree/nbtsort.c | 46 ++++++------------- src/backend/commands/vacuumparallel.c | 53 ++++++++------------- src/backend/executor/execParallel.c | 66 ++++++++++++--------------- src/backend/executor/instrument.c | 27 +++++++---- src/include/executor/execParallel.h | 5 +- src/include/executor/instrument.h | 4 +- 8 files changed, 114 insertions(+), 179 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index bdb30752e098c..b04028a38581e 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -51,8 +51,7 @@ #define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001) #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xB000000000000004) /* * Status for index builds performed in parallel. 
This is allocated in a @@ -148,8 +147,7 @@ typedef struct BrinLeader BrinShared *brinshared; Sharedsort *sharedsort; Snapshot snapshot; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; } BrinLeader; /* @@ -2387,8 +2385,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, BrinShared *brinshared; Sharedsort *sharedsort; BrinLeader *brinleader = palloc0_object(BrinLeader); - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; bool leaderparticipates = true; int querylen; @@ -2430,18 +2427,14 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, shm_toc_estimate_keys(&pcxt->estimator, 2); /* - * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE - * and PARALLEL_KEY_BUFFER_USAGE. + * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. * * If there are no extensions loaded that care, we could skip this. We * have no way of knowing whether anyone's looking at pgWalUsage or * pgBufferUsage, so do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ @@ -2514,15 +2507,12 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, } /* - * Allocate space for each worker's WalUsage and BufferUsage; no need to + * Allocate space for each worker's Instrumentation; no need to * initialize. 
*/ - walusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); - bufferusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -2533,8 +2523,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, brinleader->brinshared = brinshared; brinleader->sharedsort = sharedsort; brinleader->snapshot = snapshot; - brinleader->walusage = walusage; - brinleader->bufferusage = bufferusage; + brinleader->instr = instr; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -2563,8 +2552,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) { - int i; - /* Shutdown worker processes */ WaitForParallelWorkersToFinish(brinleader->pcxt); @@ -2572,8 +2559,8 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) * Next, accumulate WAL usage. (This must wait for the workers to finish, * or we might get incomplete data.) 
*/ - for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); + InstrAccumParallelQuery(brinleader->instr, + brinleader->pcxt->nworkers_launched); /* Free last reference to MVCC snapshot, if one was used */ if (IsMVCCSnapshot(brinleader->snapshot)) @@ -2887,8 +2874,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *worker_instr; int sortmem; /* @@ -2949,10 +2935,8 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) heapRel, indexRel, sortmem, false); /* Report WAL/buffer usage during parallel execution */ - bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], - &walusage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); index_close(indexRel, indexLockmode); table_close(heapRel, heapLockmode); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index cb9ed3b563c6f..c6d144d12f5ec 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -45,8 +45,7 @@ #define PARALLEL_KEY_GIN_SHARED UINT64CONST(0xB000000000000001) #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xB000000000000004) /* * Status for index builds performed in parallel. 
This is allocated in a @@ -138,8 +137,7 @@ typedef struct GinLeader GinBuildShared *ginshared; Sharedsort *sharedsort; Snapshot snapshot; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; } GinLeader; typedef struct @@ -945,8 +943,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, GinBuildShared *ginshared; Sharedsort *sharedsort; GinLeader *ginleader = palloc0_object(GinLeader); - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; bool leaderparticipates = true; int querylen; @@ -987,18 +984,14 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, shm_toc_estimate_keys(&pcxt->estimator, 2); /* - * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE - * and PARALLEL_KEY_BUFFER_USAGE. + * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. * * If there are no extensions loaded that care, we could skip this. We * have no way of knowing whether anyone's looking at pgWalUsage or * pgBufferUsage, so do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ @@ -1066,15 +1059,12 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, } /* - * Allocate space for each worker's WalUsage and BufferUsage; no need to + * Allocate space for each worker's Instrumentation; no need to * initialize. 
*/ - walusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); - bufferusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -1085,8 +1075,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, ginleader->ginshared = ginshared; ginleader->sharedsort = sharedsort; ginleader->snapshot = snapshot; - ginleader->walusage = walusage; - ginleader->bufferusage = bufferusage; + ginleader->instr = instr; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -1115,8 +1104,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) { - int i; - /* Shutdown worker processes */ WaitForParallelWorkersToFinish(ginleader->pcxt); @@ -1124,8 +1111,8 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) * Next, accumulate WAL usage. (This must wait for the workers to finish, * or we might get incomplete data.) 
*/ - for (i = 0; i < ginleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]); + InstrAccumParallelQuery(ginleader->instr, + ginleader->pcxt->nworkers_launched); /* Free last reference to MVCC snapshot, if one was used */ if (IsMVCCSnapshot(ginleader->snapshot)) @@ -2118,8 +2105,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *worker_instr; int sortmem; /* @@ -2199,10 +2185,8 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) heapRel, indexRel, sortmem, false); /* Report WAL/buffer usage during parallel execution */ - bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], - &walusage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); index_close(indexRel, indexLockmode); table_close(heapRel, heapLockmode); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 756dfa3dcf47e..0e5fa86cf17a7 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -66,8 +66,7 @@ #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002) #define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xA000000000000005) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xA000000000000006) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xA000000000000005) /* * DISABLE_LEADER_PARTICIPATION disables the leader's participation in @@ -195,8 +194,7 @@ typedef struct BTLeader Sharedsort *sharedsort; Sharedsort *sharedsort2; Snapshot snapshot; 
- WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; } BTLeader; /* @@ -1408,8 +1406,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) Sharedsort *sharedsort2; BTSpool *btspool = buildstate->spool; BTLeader *btleader = palloc0_object(BTLeader); - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; bool leaderparticipates = true; int querylen; @@ -1462,18 +1459,14 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) } /* - * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE - * and PARALLEL_KEY_BUFFER_USAGE. + * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. * * If there are no extensions loaded that care, we could skip this. We * have no way of knowing whether anyone's looking at pgWalUsage or * pgBufferUsage, so do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ @@ -1560,15 +1553,12 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) } /* - * Allocate space for each worker's WalUsage and BufferUsage; no need to + * Allocate space for each worker's Instrumentation; no need to * initialize. 
*/ - walusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); - bufferusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -1580,8 +1570,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) btleader->sharedsort = sharedsort; btleader->sharedsort2 = sharedsort2; btleader->snapshot = snapshot; - btleader->walusage = walusage; - btleader->bufferusage = bufferusage; + btleader->instr = instr; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -1610,8 +1599,6 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) static void _bt_end_parallel(BTLeader *btleader) { - int i; - /* Shutdown worker processes */ WaitForParallelWorkersToFinish(btleader->pcxt); @@ -1619,8 +1606,8 @@ _bt_end_parallel(BTLeader *btleader) * Next, accumulate WAL usage. (This must wait for the workers to finish, * or we might get incomplete data.) 
*/ - for (i = 0; i < btleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]); + InstrAccumParallelQuery(btleader->instr, + btleader->pcxt->nworkers_launched); /* Free last reference to MVCC snapshot, if one was used */ if (IsMVCCSnapshot(btleader->snapshot)) @@ -1753,8 +1740,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *worker_instr; int sortmem; #ifdef BTREE_BUILD_STATS @@ -1836,10 +1822,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) sharedsort2, sortmem, false); /* Report WAL/buffer usage during parallel execution */ - bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], - &walusage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); #ifdef BTREE_BUILD_STATS if (log_btree_build_stats) diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 41cefcfde54fe..b2cdab310d638 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -56,9 +56,8 @@ */ #define PARALLEL_VACUUM_KEY_SHARED 1 #define PARALLEL_VACUUM_KEY_QUERY_TEXT 2 -#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3 -#define PARALLEL_VACUUM_KEY_WAL_USAGE 4 -#define PARALLEL_VACUUM_KEY_INDEX_STATS 5 +#define PARALLEL_VACUUM_KEY_INSTRUMENTATION 3 +#define PARALLEL_VACUUM_KEY_INDEX_STATS 4 /* * Struct for cost-based vacuum delay related parameters to share among an @@ -236,11 +235,8 @@ struct ParallelVacuumState /* Shared dead items space among parallel vacuum workers */ TidStore *dead_items; - /* Points to buffer usage area in DSM */ - BufferUsage *buffer_usage; - - /* 
Points to WAL usage area in DSM */ - WalUsage *wal_usage; + /* Points to instrumentation area in DSM */ + Instrumentation *instr; /* * False if the index is totally unsuitable target for all parallel @@ -311,8 +307,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PVShared *shared; TidStore *dead_items; PVIndStats *indstats; - BufferUsage *buffer_usage; - WalUsage *wal_usage; + Instrumentation *instr; bool *will_parallel_vacuum; Size est_indstats_len; Size est_shared_len; @@ -365,18 +360,15 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shm_toc_estimate_keys(&pcxt->estimator, 1); /* - * Estimate space for BufferUsage and WalUsage -- - * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE. + * Estimate space for Instrumentation -- + * PARALLEL_VACUUM_KEY_INSTRUMENTATION. * * If there are no extensions loaded that care, we could skip this. We * have no way of knowing whether anyone's looking at pgBufferUsage or * pgWalUsage, so do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ @@ -474,17 +466,13 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, pvs->shared = shared; /* - * Allocate space for each worker's BufferUsage and WalUsage; no need to - * initialize + * Allocate space for each worker's Instrumentation; no need to + * initialize. 
*/ - buffer_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); - pvs->buffer_usage = buffer_usage; - wal_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); - pvs->wal_usage = wal_usage; + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INSTRUMENTATION, instr); + pvs->instr = instr; /* Store query string for workers */ if (debug_query_string) @@ -944,8 +932,8 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan /* Wait for all vacuum workers to finish */ WaitForParallelWorkersToFinish(pvs->pcxt); - for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); + InstrAccumParallelQuery(pvs->instr, + pvs->pcxt->nworkers_launched); } /* @@ -1202,8 +1190,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) PVIndStats *indstats; PVShared *shared; TidStore *dead_items; - BufferUsage *buffer_usage; - WalUsage *wal_usage; + Instrumentation *worker_instr; int nindexes; char *sharedquery; ErrorContextCallback errcallback; @@ -1311,10 +1298,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) parallel_vacuum_process_safe_indexes(&pvs); /* Report buffer/WAL usage during parallel execution */ - buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); - wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); /* Report any remaining cost-based vacuum delay time */ if (track_cost_delay_timing) diff --git 
a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 81b87d82fab47..89e717a1c507d 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -60,13 +60,12 @@ #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001) #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002) #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000004) #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005) -#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006) +#define PARALLEL_KEY_NODE_INSTRUMENTATION UINT64CONST(0xE000000000000006) #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -662,8 +661,6 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_data; char *pstmt_space; char *paramlistinfo_space; - BufferUsage *bufusage_space; - WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; @@ -727,21 +724,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_keys(&pcxt->estimator, 1); /* - * Estimate space for BufferUsage. + * Estimate space for Instrumentation. * * If EXPLAIN is not in use and there are no extensions loaded that care, * we could skip this. But we have no way of knowing whether anyone's * looking at pgBufferUsage, so do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* - * Same thing for WalUsage. 
- */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Estimate space for tuple queues. */ @@ -827,17 +817,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, &paramlistinfo_space); - /* Allocate space for each worker's BufferUsage; no need to initialize. */ - bufusage_space = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); - pei->buffer_usage = bufusage_space; + /* + * Allocate space for each worker's Instrumentation; no need to + * initialize. + */ + { + Instrumentation *instr; - /* Same for WalUsage. */ - walusage_space = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space); - pei->wal_usage = walusage_space; + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); + pei->instrumentation = instr; + } /* Set up the tuple queues that the workers will write into. 
*/ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); @@ -863,9 +854,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, instrument = GetInstrumentationArray(instrumentation); for (i = 0; i < nworkers * e.nnodes; ++i) InstrInitNode(&instrument[i], estate->es_instrument, false); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, + shm_toc_insert(pcxt->toc, PARALLEL_KEY_NODE_INSTRUMENTATION, instrumentation); - pei->instrumentation = instrumentation; + pei->node_instrumentation = instrumentation; if (estate->es_jit_flags != PGJIT_NONE) { @@ -1258,8 +1249,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei) * Next, accumulate buffer/WAL usage. (This must wait for the workers to * finish, or we might get incomplete data.) */ - for (i = 0; i < nworkers; i++) - InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); + InstrAccumParallelQuery(pei->instrumentation, nworkers); pei->finished = true; } @@ -1273,10 +1263,10 @@ ExecParallelFinish(ParallelExecutorInfo *pei) void ExecParallelCleanup(ParallelExecutorInfo *pei) { - /* Accumulate instrumentation, if any. */ - if (pei->instrumentation) + /* Accumulate node instrumentation, if any. */ + if (pei->node_instrumentation) ExecParallelRetrieveInstrumentation(pei->planstate, - pei->instrumentation); + pei->node_instrumentation); /* Accumulate JIT instrumentation, if any. */ if (pei->jit_instrumentation) @@ -1514,8 +1504,6 @@ void ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { FixedParallelExecutorState *fpes; - BufferUsage *buffer_usage; - WalUsage *wal_usage; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1530,7 +1518,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. 
*/ receiver = ExecParallelGetReceiver(seg, toc); - instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); + instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_NODE_INSTRUMENTATION, true); if (instrumentation != NULL) instrument_options = instrumentation->instrument_options; jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION, @@ -1588,10 +1576,12 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecutorFinish(queryDesc); /* Report buffer/WAL usage during parallel execution. */ - buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); + { + Instrumentation *worker_instr; + + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); + } /* Report instrumentation data if any instrumentation options are set. 
*/ if (instrumentation != NULL) diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index ffbcd57213396..20431e64fb4de 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -284,20 +284,29 @@ InstrStartParallelQuery(void) /* report usage after parallel executor shutdown */ void -InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage) +InstrEndParallelQuery(Instrumentation *instr) { - memset(bufusage, 0, sizeof(BufferUsage)); - BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage); - memset(walusage, 0, sizeof(WalUsage)); - WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage); + memset(&instr->bufusage, 0, sizeof(BufferUsage)); + BufferUsageAccumDiff(&instr->bufusage, &pgBufferUsage, &save_pgBufferUsage); + memset(&instr->walusage, 0, sizeof(WalUsage)); + WalUsageAccumDiff(&instr->walusage, &pgWalUsage, &save_pgWalUsage); } -/* accumulate work done by workers in leader's stats */ +/* + * Accumulate work done by parallel workers in the leader's stats. + * + * instr points to the per-worker Instrumentation array the leader allocated in + * DSM; each of the nworkers launched workers reported into its own slot via + * InstrEndParallelQuery. Must be called only after the workers have finished. 
+ */ void -InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage) +InstrAccumParallelQuery(Instrumentation *instr, int nworkers) { - BufferUsageAdd(&pgBufferUsage, bufusage); - WalUsageAdd(&pgWalUsage, walusage); + for (int i = 0; i < nworkers; i++) + { + BufferUsageAdd(&pgBufferUsage, &instr[i].bufusage); + WalUsageAdd(&pgWalUsage, &instr[i].walusage); + } } /* dst += add */ diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 5a2034811d563..6c8b602d07f98 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -25,9 +25,8 @@ typedef struct ParallelExecutorInfo { PlanState *planstate; /* plan subtree we're running in parallel */ ParallelContext *pcxt; /* parallel context we're using */ - BufferUsage *buffer_usage; /* points to bufusage area in DSM */ - WalUsage *wal_usage; /* walusage area in DSM */ - SharedExecutorInstrumentation *instrumentation; /* optional */ + Instrumentation *instrumentation; /* instrumentation area in DSM */ + SharedExecutorInstrumentation *node_instrumentation; /* optional */ struct SharedJitInstrumentation *jit_instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ dsa_pointer param_exec; /* serialized PARAM_EXEC parameters */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index f093a52aae013..8fab0e17fd0fb 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -148,8 +148,8 @@ extern void InstrStartTrigger(TriggerInstrumentation *tginstr); extern void InstrStopTrigger(TriggerInstrumentation *tginstr, int64 firings); extern void InstrStartParallelQuery(void); -extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage); -extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage); +extern void InstrEndParallelQuery(Instrumentation *instr); +extern void InstrAccumParallelQuery(Instrumentation *instr, int nworkers); extern 
void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add, const BufferUsage *sub); extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, From b284ad73268cafd0c56f85a9ac80b09f7519cc26 Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Sun, 31 May 2026 10:01:32 -0700 Subject: [PATCH 2/4] Unify parallel worker handling for instrumentation Similar to query text, introduce helpers to estimate the shared memory required for instrumentation, and for allocating and storing it in shared memory. Author: Lukas Fittl Reviewed-by: Discussion: --- src/backend/access/brin/brin.c | 21 ++++-------------- src/backend/access/gin/gininsert.c | 21 ++++-------------- src/backend/access/nbtree/nbtsort.c | 21 ++++-------------- src/backend/access/transam/parallel.c | 32 +++++++++++++++++++++++++++ src/backend/commands/vacuumparallel.c | 25 +++++---------------- src/backend/executor/execParallel.c | 27 +++++----------------- src/include/access/parallel.h | 4 ++++ 7 files changed, 58 insertions(+), 93 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index b04028a38581e..8bd032c866993 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -2426,16 +2426,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, shm_toc_estimate_keys(&pcxt->estimator, 2); - /* - * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgWalUsage or - * pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. 
*/ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -2506,13 +2498,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); } - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. - */ - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); + /* Allocate space for each worker's Instrumentation. */ + instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index c6d144d12f5ec..97a8f38be5d5c 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -983,16 +983,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, shm_toc_estimate_keys(&pcxt->estimator, 2); - /* - * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgWalUsage or - * pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -1058,13 +1050,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); } - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. 
- */ - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); + /* Allocate space for each worker's Instrumentation. */ + instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 0e5fa86cf17a7..7f0a1b88062a8 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1458,16 +1458,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) shm_toc_estimate_keys(&pcxt->estimator, 3); } - /* - * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgWalUsage or - * pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -1552,13 +1544,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); } - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. - */ - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); + /* Allocate space for each worker's Instrumentation. 
*/ + instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 89e9d224eec7d..17fb7c15aabc3 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -28,6 +28,7 @@ #include "commands/async.h" #include "commands/vacuum.h" #include "executor/execParallel.h" +#include "executor/instrument.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqmq.h" @@ -1026,6 +1027,37 @@ DestroyParallelContext(ParallelContext *pcxt) pfree(pcxt); } +/* + * Helpers for managing the per-worker Instrumentation array that parallel + * leaders allocate in DSM. The worker side fills in its own slot directly via + * InstrEndParallelQuery. + */ + +/* Reserve DSM space for the per-worker Instrumentation array. */ +void +EstimateParallelInstrumentation(ParallelContext *pcxt) +{ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * Allocate the per-worker Instrumentation array in DSM and publish it under + * the given key. No need to initialize; each worker fills in its own slot. + * Returns the array for the leader's convenience. + */ +Instrumentation * +StoreParallelInstrumentation(ParallelContext *pcxt, uint64 key) +{ + Instrumentation *instr; + + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, key, instr); + return instr; +} + /* * Are there any parallel contexts currently active? 
*/ diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index b2cdab310d638..5ffae66260d97 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -307,7 +307,6 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PVShared *shared; TidStore *dead_items; PVIndStats *indstats; - Instrumentation *instr; bool *will_parallel_vacuum; Size est_indstats_len; Size est_shared_len; @@ -359,17 +358,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len); shm_toc_estimate_keys(&pcxt->estimator, 1); - /* - * Estimate space for Instrumentation -- - * PARALLEL_VACUUM_KEY_INSTRUMENTATION. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgBufferUsage or - * pgWalUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -465,14 +455,9 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); pvs->shared = shared; - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. - */ - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INSTRUMENTATION, instr); - pvs->instr = instr; + /* Allocate space for each worker's Instrumentation. 
*/ + pvs->instr = StoreParallelInstrumentation(pcxt, + PARALLEL_VACUUM_KEY_INSTRUMENTATION); /* Store query string for workers */ if (debug_query_string) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 89e717a1c507d..4f202f544b332 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -723,16 +723,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); shm_toc_estimate_keys(&pcxt->estimator, 1); - /* - * Estimate space for Instrumentation. - * - * If EXPLAIN is not in use and there are no extensions loaded that care, - * we could skip this. But we have no way of knowing whether anyone's - * looking at pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Estimate space for tuple queues. */ shm_toc_estimate_chunk(&pcxt->estimator, @@ -817,18 +809,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, &paramlistinfo_space); - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. - */ - { - Instrumentation *instr; - - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); - pei->instrumentation = instr; - } + /* Allocate space for each worker's Instrumentation. */ + pei->instrumentation = StoreParallelInstrumentation(pcxt, + PARALLEL_KEY_INSTRUMENTATION); /* Set up the tuple queues that the workers will write into.
*/ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 60f857675e05a..3c85924e85d5b 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -72,6 +72,10 @@ extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); +extern void EstimateParallelInstrumentation(ParallelContext *pcxt); +extern struct Instrumentation *StoreParallelInstrumentation(ParallelContext *pcxt, + uint64 key); + extern void HandleParallelMessageInterrupt(void); extern void ProcessParallelMessages(void); extern void AtEOXact_Parallel(bool isCommit); From d9a7af3fb658fa8a201475d51bb101387fd9d626 Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Sun, 31 May 2026 09:59:33 -0700 Subject: [PATCH 3/4] Unify parallel worker handling for query text Multiple callers (parallel index builds, parallel vacuum, parallel query) all had the same implementation for passing query text to the parallel workers. To reduce duplicated code, introduce new helper functions to: (1) estimate the needed shared memory space for the query text, (2) store the query text in shared memory, and (3) restore the query text in the worker. 
Author: Lukas Fittl Reviewed-by: Discussion: --- src/backend/access/brin/brin.c | 28 +++--------------- src/backend/access/gin/gininsert.c | 28 +++--------------- src/backend/access/nbtree/nbtsort.c | 28 +++--------------- src/backend/access/transam/parallel.c | 42 +++++++++++++++++++++++++++ src/backend/commands/vacuumparallel.c | 26 ++--------------- src/include/access/parallel.h | 3 ++ 6 files changed, 60 insertions(+), 95 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 8bd032c866993..c237449123d25 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -2387,7 +2387,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, BrinLeader *brinleader = palloc0_object(BrinLeader); Instrumentation *instr; bool leaderparticipates = true; - int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; @@ -2430,14 +2429,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); /* Everyone's had a chance to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); @@ -2489,14 +2481,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_KEY_QUERY_TEXT); /* 
Allocate space for each worker's Instrumentation. */ instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); @@ -2853,7 +2838,6 @@ _brin_parallel_scan_and_build(BrinBuildState *state, void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) { - char *sharedquery; BrinShared *brinshared; Sharedsort *sharedsort; BrinBuildState *buildstate; @@ -2871,12 +2855,8 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Assert((MyProc->statusFlags == 0) || (MyProc->statusFlags == PROC_IN_SAFE_IC)); - /* Set debug_query_string for individual workers first */ - sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - - /* Report the query string from leader */ - pgstat_report_activity(STATE_RUNNING, debug_query_string); + /* Set debug_query_string and report the query string from leader */ + RestoreParallelQueryText(toc, PARALLEL_KEY_QUERY_TEXT); /* Look up brin shared state */ brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 97a8f38be5d5c..57e51c574b914 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -945,7 +945,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, GinLeader *ginleader = palloc0_object(GinLeader); Instrumentation *instr; bool leaderparticipates = true; - int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; @@ -987,14 +986,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); /* Everyone's had a chance 
to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); @@ -1041,14 +1033,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_KEY_QUERY_TEXT); /* Allocate space for each worker's Instrumentation. */ instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); @@ -2084,7 +2069,6 @@ _gin_parallel_scan_and_build(GinBuildState *state, void _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) { - char *sharedquery; GinBuildShared *ginshared; Sharedsort *sharedsort; GinBuildState buildstate; @@ -2102,12 +2086,8 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Assert((MyProc->statusFlags == 0) || (MyProc->statusFlags == PROC_IN_SAFE_IC)); - /* Set debug_query_string for individual workers first */ - sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - - /* Report the query string from leader */ - pgstat_report_activity(STATE_RUNNING, debug_query_string); + /* Set debug_query_string and report the query string from leader */ + RestoreParallelQueryText(toc, PARALLEL_KEY_QUERY_TEXT); /* Look up gin shared state */ ginshared = shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 7f0a1b88062a8..f43bb939081c3 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1408,7 +1408,6 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) BTLeader *btleader = palloc0_object(BTLeader); Instrumentation *instr; bool 
leaderparticipates = true; - int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; @@ -1462,14 +1461,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); /* Everyone's had a chance to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); @@ -1535,14 +1527,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) } /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_KEY_QUERY_TEXT); /* Allocate space for each worker's Instrumentation. 
*/ instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); @@ -1717,7 +1702,6 @@ _bt_leader_participate_as_worker(BTBuildState *buildstate) void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) { - char *sharedquery; BTSpool *btspool; BTSpool *btspool2; BTShared *btshared; @@ -1742,12 +1726,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) Assert((MyProc->statusFlags == 0) || (MyProc->statusFlags == PROC_IN_SAFE_IC)); - /* Set debug_query_string for individual workers first */ - sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - - /* Report the query string from leader */ - pgstat_report_activity(STATE_RUNNING, debug_query_string); + /* Set debug_query_string and report the query string from leader */ + RestoreParallelQueryText(toc, PARALLEL_KEY_QUERY_TEXT); /* Look up nbtree shared state */ btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 17fb7c15aabc3..f55cfa6853fc6 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -1027,6 +1027,48 @@ DestroyParallelContext(ParallelContext *pcxt) pfree(pcxt); } +/* + * Helpers for passing the current query text down to parallel workers. + */ + +/* Reserve DSM space for the query text, if any. */ +void +EstimateParallelQueryText(ParallelContext *pcxt) +{ + if (debug_query_string) + { + shm_toc_estimate_chunk(&pcxt->estimator, strlen(debug_query_string) + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* Copy the query text into DSM under the given key, if any. 
*/ +void +StoreParallelQueryText(ParallelContext *pcxt, uint64 key) +{ + if (debug_query_string) + { + Size querylen = strlen(debug_query_string); + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + shm_toc_insert(pcxt->toc, key, sharedquery); + } +} + +/* + * Restore the query text in a worker: set debug_query_string and report it as + * the current activity. The key is looked up with missing_ok, so + * debug_query_string is simply set to NULL when the leader stored no text. + */ +void +RestoreParallelQueryText(shm_toc *toc, uint64 key) +{ + debug_query_string = shm_toc_lookup(toc, key, true); + pgstat_report_activity(STATE_RUNNING, debug_query_string); +} + /* * Helpers for managing the per-worker Instrumentation array that parallel * leaders allocate in DSM. The worker side fills in its own slot directly via diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 5ffae66260d97..f0726752f9db1 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -312,7 +312,6 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, Size est_shared_len; int nindexes_mwm = 0; int parallel_workers = 0; - int querylen; /* * A parallel vacuum must be requested and there must be indexes on the @@ -362,14 +361,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); InitializeParallelDSM(pcxt); @@ -460,16 +452,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
PARALLEL_VACUUM_KEY_INSTRUMENTATION); /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - sharedquery[querylen] = '\0'; - shm_toc_insert(pcxt->toc, - PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_VACUUM_KEY_QUERY_TEXT); /* Success -- return parallel vacuum state */ return pvs; @@ -1177,7 +1160,6 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) TidStore *dead_items; Instrumentation *worker_instr; int nindexes; - char *sharedquery; ErrorContextCallback errcallback; /* @@ -1191,9 +1173,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false); /* Set debug_query_string for individual workers */ - sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - pgstat_report_activity(STATE_RUNNING, debug_query_string); + RestoreParallelQueryText(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT); /* Track query ID */ pgstat_report_query_id(shared->queryid, false); diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 3c85924e85d5b..20e7e7673b902 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -72,6 +72,9 @@ extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); +extern void EstimateParallelQueryText(ParallelContext *pcxt); +extern void StoreParallelQueryText(ParallelContext *pcxt, uint64 key); +extern void RestoreParallelQueryText(shm_toc *toc, uint64 key); extern void EstimateParallelInstrumentation(ParallelContext *pcxt); extern struct Instrumentation *StoreParallelInstrumentation(ParallelContext *pcxt, uint64 key); From c943118ed44608561da184967171474581b2f54c Mon Sep 17 
00:00:00 2001 From: Lukas Fittl Date: Sun, 31 May 2026 11:34:29 -0700 Subject: [PATCH 4/4] Unify parallel index build infrastructure The three in-tree index AMs that currently support parallel index builds (nbtree, gin and brin) each share very similar code for initializing parallel workers, allocating shared memory and communicating between the leader and the workers. To reduce duplication, introduce shared helpers for parallel index builds, unifying the existing code. This also fixes an oversight in parallel GIN index builds that forgot to pass the queryid to the parallel workers, like other AMs do. Author: Lukas Fittl Reviewed-by: Discussion: --- src/backend/access/brin/brin.c | 188 +++---------- src/backend/access/common/Makefile | 1 + src/backend/access/common/meson.build | 1 + .../access/common/parallel_index_build.c | 246 ++++++++++++++++++ src/backend/access/gin/gininsert.c | 182 +++---------- src/backend/access/nbtree/nbtsort.c | 202 +++----------- src/include/access/parallel_index_build.h | 119 +++++++++ 7 files changed, 465 insertions(+), 474 deletions(-) create mode 100644 src/backend/access/common/parallel_index_build.c create mode 100644 src/include/access/parallel_index_build.h diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index c237449123d25..dc651383d63c0 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -19,6 +19,7 @@ #include "access/brin_page.h" #include "access/brin_pageops.h" #include "access/brin_xlog.h" +#include "access/parallel_index_build.h" #include "access/relation.h" #include "access/reloptions.h" #include "access/relscan.h" @@ -59,49 +60,13 @@ */ typedef struct BrinShared { - /* - * These fields are not modified during the build. They primarily exist - * for the benefit of worker processes that need to create state - * corresponding to that used by the leader.
- */ - Oid heaprelid; - Oid indexrelid; - bool isconcurrent; - BlockNumber pagesPerRange; - int scantuplesortstates; - - /* Query ID, for report in worker processes */ - int64 queryid; + /* Common parallel index build state (must be first) */ + ParallelIndexBuildShared base; /* - * workersdonecv is used to monitor the progress of workers. All parallel - * participants must indicate that they are done before leader can use - * results built by the workers (and before leader can write the data into - * the index). + * BRIN-specific immutable state, not modified during the build. */ - ConditionVariable workersdonecv; - - /* - * mutex protects all fields before heapdesc. - * - * These fields contain status information of interest to BRIN index - * builds that must work just the same when an index is built in parallel. - */ - slock_t mutex; - - /* - * Mutable state that is maintained by workers, and reported back to - * leader at end of the scans. - * - * nparticipantsdone is number of worker processes finished. - * - * reltuples is the total number of input heap tuples. - * - * indtuples is the total number of tuples that made it into the index. - */ - int nparticipantsdone; - double reltuples; - double indtuples; + BlockNumber pagesPerRange; /* * ParallelTableScanDescData data follows. 
Can't directly embed here, as @@ -233,7 +198,6 @@ static void brin_fill_empty_ranges(BrinBuildState *state, static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state); -static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _brin_parallel_heapscan(BrinBuildState *state); static double _brin_parallel_merge(BrinBuildState *state); static void _brin_leader_participate_as_worker(BrinBuildState *buildstate, @@ -2396,29 +2360,18 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Enter parallel mode, and create context for parallel build of brin * index */ - EnterParallelMode(); - Assert(request > 0); - pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main", - request); + pcxt = ParallelIndexBuildCreateContext("_brin_parallel_build_main", request); scantuplesortstates = leaderparticipates ? request + 1 : request; - /* - * Prepare for scan of the base relation. In a normal index build, we use - * SnapshotAny because we must retrieve all tuples and do our own time - * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. - */ - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + /* Prepare for scan of the base relation. */ + snapshot = ParallelIndexBuildGetSnapshot(isconcurrent); /* * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace. 
*/ - estbrinshared = _brin_parallel_estimate_shared(heap, snapshot); + estbrinshared = ParallelIndexBuildEstimateShared(heap, snapshot, + sizeof(BrinShared)); shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -2446,24 +2399,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, /* Store shared build state, for which we reserved space */ brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared); - /* Initialize immutable state */ - brinshared->heaprelid = RelationGetRelid(heap); - brinshared->indexrelid = RelationGetRelid(index); - brinshared->isconcurrent = isconcurrent; - brinshared->scantuplesortstates = scantuplesortstates; + /* Initialize BRIN-specific immutable state */ brinshared->pagesPerRange = buildstate->bs_pagesPerRange; - brinshared->queryid = pgstat_get_my_query_id(); - ConditionVariableInit(&brinshared->workersdonecv); - SpinLockInit(&brinshared->mutex); - - /* Initialize mutable state */ - brinshared->nparticipantsdone = 0; - brinshared->reltuples = 0.0; - brinshared->indtuples = 0.0; - - table_parallelscan_initialize(heap, - ParallelTableScanFromBrinShared(brinshared), - snapshot); + /* Initialize common state, and the parallel scan that follows the struct */ + ParallelIndexBuildInitShared(&brinshared->base, heap, index, isconcurrent, + scantuplesortstates, + ParallelTableScanFromBrinShared(brinshared), + snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -2524,21 +2466,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) { - /* Shutdown worker processes */ - WaitForParallelWorkersToFinish(brinleader->pcxt); - - /* - * Next, accumulate WAL usage. (This must wait for the workers to finish, - * or we might get incomplete data.) 
- */ - InstrAccumParallelQuery(brinleader->instr, - brinleader->pcxt->nworkers_launched); - - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(brinleader->snapshot)) - UnregisterSnapshot(brinleader->snapshot); - DestroyParallelContext(brinleader->pcxt); - ExitParallelMode(); + ParallelIndexBuildEnd(brinleader->pcxt, brinleader->instr, + brinleader->snapshot); } /* @@ -2554,28 +2483,12 @@ static double _brin_parallel_heapscan(BrinBuildState *state) { BrinShared *brinshared = state->bs_leader->brinshared; - int nparticipanttuplesorts; - nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts; - for (;;) - { - SpinLockAcquire(&brinshared->mutex); - if (brinshared->nparticipantsdone == nparticipanttuplesorts) - { - /* copy the data into leader state */ - state->bs_reltuples = brinshared->reltuples; - state->bs_numtuples = brinshared->indtuples; - - SpinLockRelease(&brinshared->mutex); - break; - } - SpinLockRelease(&brinshared->mutex); - - ConditionVariableSleep(&brinshared->workersdonecv, - WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); - } - - ConditionVariableCancelSleep(); + /* Wait for all participants to finish, and collect their results */ + ParallelIndexBuildWaitForWorkers(&brinshared->base, + state->bs_leader->nparticipanttuplesorts, + &state->bs_reltuples, &state->bs_numtuples, + NULL, NULL); return state->bs_reltuples; } @@ -2734,18 +2647,6 @@ _brin_parallel_merge(BrinBuildState *state) return reltuples; } -/* - * Returns size of shared memory required to store state for a parallel - * brin index build based on the snapshot its parallel scan will use. - */ -static Size -_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) -{ - /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ - return add_size(BUFFERALIGN(sizeof(BrinShared)), - table_parallelscan_estimate(heap, snapshot)); -} - /* * Within leader, participate as a parallel worker. 
*/ @@ -2800,7 +2701,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, /* Join parallel scan */ indexInfo = BuildIndexInfo(index); - indexInfo->ii_Concurrent = brinshared->isconcurrent; + indexInfo->ii_Concurrent = brinshared->base.isconcurrent; scan = table_beginscan_parallel(heap, ParallelTableScanFromBrinShared(brinshared), @@ -2817,17 +2718,9 @@ _brin_parallel_scan_and_build(BrinBuildState *state, state->bs_reltuples += reltuples; - /* - * Done. Record ambuild statistics. - */ - SpinLockAcquire(&brinshared->mutex); - brinshared->nparticipantsdone++; - brinshared->reltuples += state->bs_reltuples; - brinshared->indtuples += state->bs_numtuples; - SpinLockRelease(&brinshared->mutex); - - /* Notify leader */ - ConditionVariableSignal(&brinshared->workersdonecv); + /* Done. Record ambuild statistics, and notify leader. */ + ParallelIndexBuildReportScanDone(&brinshared->base, state->bs_reltuples, + state->bs_numtuples, false, false); tuplesort_end(state->bs_sortstate); } @@ -2861,24 +2754,9 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Look up brin shared state */ brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false); - /* Open relations using lock modes known to be obtained by index.c */ - if (!brinshared->isconcurrent) - { - heapLockmode = ShareLock; - indexLockmode = AccessExclusiveLock; - } - else - { - heapLockmode = ShareUpdateExclusiveLock; - indexLockmode = RowExclusiveLock; - } - - /* Track query ID */ - pgstat_report_query_id(brinshared->queryid, false); - - /* Open relations within worker */ - heapRel = table_open(brinshared->heaprelid, heapLockmode); - indexRel = index_open(brinshared->indexrelid, indexLockmode); + /* Open relations within worker, using the leader's lock modes */ + ParallelIndexBuildOpenRelations(&brinshared->base, &heapRel, &indexRel, + &heapLockmode, &indexLockmode); buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange, @@ -2896,7 +2774,7 @@ 
_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) * (when requested number of workers were not launched, this will be * somewhat higher than it is for other workers). */ - sortmem = maintenance_work_mem / brinshared->scantuplesortstates; + sortmem = maintenance_work_mem / brinshared->base.scantuplesortstates; _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort, heapRel, indexRel, sortmem, false); @@ -2905,8 +2783,8 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); - index_close(indexRel, indexLockmode); - table_close(heapRel, heapLockmode); + ParallelIndexBuildCloseRelations(heapRel, indexRel, heapLockmode, + indexLockmode); } /* diff --git a/src/backend/access/common/Makefile b/src/backend/access/common/Makefile index e78de312659ed..0d4769b96af57 100644 --- a/src/backend/access/common/Makefile +++ b/src/backend/access/common/Makefile @@ -18,6 +18,7 @@ OBJS = \ detoast.o \ heaptuple.o \ indextuple.o \ + parallel_index_build.o \ printsimple.o \ printtup.o \ relation.o \ diff --git a/src/backend/access/common/meson.build b/src/backend/access/common/meson.build index 35e89b5ea67d5..7d6247b97542d 100644 --- a/src/backend/access/common/meson.build +++ b/src/backend/access/common/meson.build @@ -6,6 +6,7 @@ backend_sources += files( 'detoast.c', 'heaptuple.c', 'indextuple.c', + 'parallel_index_build.c', 'printsimple.c', 'printtup.c', 'relation.c', diff --git a/src/backend/access/common/parallel_index_build.c b/src/backend/access/common/parallel_index_build.c new file mode 100644 index 0000000000000..adce8c547e539 --- /dev/null +++ b/src/backend/access/common/parallel_index_build.c @@ -0,0 +1,246 @@ +/*------------------------------------------------------------------------- + * + * parallel_index_build.c + * Shared infrastructure for parallel index builds. 
+ * + * This file contains the access-method-independent parts of the parallel + * index build lifecycle shared by different index access methods: setting up the + * parallel context and shared state, and opening/closing relations and tearing + * everything down again in the worker and leader. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/common/parallel_index_build.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/parallel_index_build.h" +#include "access/genam.h" +#include "access/table.h" +#include "access/tableam.h" +#include "access/xact.h" +#include "executor/instrument.h" +#include "pgstat.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/wait_event.h" + +/* + * Enter parallel mode and create a parallel context for an index build, using + * the named worker entry point. + */ +ParallelContext * +ParallelIndexBuildCreateContext(const char *worker_function, int nworkers) +{ + Assert(nworkers > 0); + + EnterParallelMode(); + + return CreateParallelContext("postgres", worker_function, nworkers); +} + +/* + * Choose the snapshot for the heap scan of an index build. + * + * In a normal index build, we use SnapshotAny because we must retrieve all + * tuples and do our own time qual checks (because we have to index + * RECENTLY_DEAD tuples). In a concurrent build, we take a regular MVCC + * snapshot and index whatever's live according to that. The caller is + * responsible for unregistering an MVCC snapshot by calling ParallelIndexBuildEnd. 
+ */ +Snapshot +ParallelIndexBuildGetSnapshot(bool isconcurrent) +{ + if (!isconcurrent) + return SnapshotAny; + + return RegisterSnapshot(GetTransactionSnapshot()); +} + +/* + * Estimate the DSM space for the shared state struct of a parallel index + * build, including the parallel table scan descriptor that trails it. + * + * am_shared_size is sizeof() the access method's whole shared struct (which + * embeds ParallelIndexBuildShared as its first member). + */ +Size +ParallelIndexBuildEstimateShared(Relation heap, Snapshot snapshot, + Size am_shared_size) +{ + /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ + return add_size(BUFFERALIGN(am_shared_size), + table_parallelscan_estimate(heap, snapshot)); +} + +/* + * Initialize the common shared state for a parallel index build. + * + * The caller has already allocated the embedding shared struct in DSM; this + * fills in the common header and initializes the parallel heap scan + * descriptor that follows the (whole) embedding struct, which the caller + * passes as pscan. AM-specific fields are the caller's responsibility. 
+ */ +void +ParallelIndexBuildInitShared(ParallelIndexBuildShared * shared, + Relation heap, Relation index, + bool isconcurrent, int scantuplesortstates, + ParallelTableScanDesc pscan, Snapshot snapshot) +{ + /* Initialize immutable state */ + shared->heaprelid = RelationGetRelid(heap); + shared->indexrelid = RelationGetRelid(index); + shared->isconcurrent = isconcurrent; + shared->scantuplesortstates = scantuplesortstates; + shared->queryid = pgstat_get_my_query_id(); + ConditionVariableInit(&shared->workersdonecv); + SpinLockInit(&shared->mutex); + + /* Initialize mutable state */ + shared->nparticipantsdone = 0; + shared->reltuples = 0.0; + shared->indtuples = 0.0; + shared->havedead = false; + shared->brokenhotchain = false; + + table_parallelscan_initialize(heap, pscan, snapshot); +} + +/* + * Wait, in the leader, for all participants to finish their portion of the + * scan, then read back the accumulated per-build results. + * + * reltuples and indtuples receive the totals. havedead and brokenhotchain are + * optional (pass NULL when the AM does not care): they report whether any + * worker saw RECENTLY_DEAD tuples or a broken HOT chain. + */ +void +ParallelIndexBuildWaitForWorkers(ParallelIndexBuildShared * shared, + int nparticipants, + double *reltuples, double *indtuples, + bool *havedead, bool *brokenhotchain) +{ + for (;;) + { + SpinLockAcquire(&shared->mutex); + if (shared->nparticipantsdone == nparticipants) + { + *reltuples = shared->reltuples; + *indtuples = shared->indtuples; + if (havedead) + *havedead = shared->havedead; + if (brokenhotchain) + *brokenhotchain = shared->brokenhotchain; + SpinLockRelease(&shared->mutex); + break; + } + SpinLockRelease(&shared->mutex); + + ConditionVariableSleep(&shared->workersdonecv, + WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); + } + + ConditionVariableCancelSleep(); +} + +/* + * Shut down the workers and tear down a parallel index build. 
+ * + * Waits for all workers to finish, accumulates their buffer/WAL usage into the + * leader's stats, releases the MVCC snapshot if one was used, and exits + * parallel mode. instr is the per-worker Instrumentation array previously + * allocated with StoreParallelInstrumentation. + */ +void +ParallelIndexBuildEnd(ParallelContext *pcxt, struct Instrumentation *instr, + Snapshot snapshot) +{ + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(pcxt); + + /* + * Next, accumulate instrumentation. This must wait for the workers to + * finish, or we might get incomplete data. + */ + InstrAccumParallelQuery(instr, pcxt->nworkers_launched); + + /* Free last reference to MVCC snapshot, if one was used */ + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); +} + +/* + * Open the heap and index relations in a parallel index build worker. + * + * Selects the lock modes used by the leader (which differ for concurrent + * builds), reports the query ID, and opens both relations. The chosen lock + * modes are returned so they can be passed to ParallelIndexBuildCloseRelations. + */ +void +ParallelIndexBuildOpenRelations(ParallelIndexBuildShared * shared, + Relation *heapRel, Relation *indexRel, + LOCKMODE *heapLockmode, LOCKMODE *indexLockmode) +{ + /* Open relations using lock modes known to be obtained by index.c */ + if (!shared->isconcurrent) + { + *heapLockmode = ShareLock; + *indexLockmode = AccessExclusiveLock; + } + else + { + *heapLockmode = ShareUpdateExclusiveLock; + *indexLockmode = RowExclusiveLock; + } + + /* Track query ID */ + pgstat_report_query_id(shared->queryid, false); + + /* Open relations within worker */ + *heapRel = table_open(shared->heaprelid, *heapLockmode); + *indexRel = index_open(shared->indexrelid, *indexLockmode); +} + +/* + * Close the relations opened by ParallelIndexBuildOpenRelations. 
+ */ +void +ParallelIndexBuildCloseRelations(Relation heapRel, Relation indexRel, + LOCKMODE heapLockmode, LOCKMODE indexLockmode) +{ + index_close(indexRel, indexLockmode); + table_close(heapRel, heapLockmode); +} + +/* + * Report, from a worker, that it has finished its portion of the scan. + * + * Accumulates the worker's tuple counts into the shared totals, ORs in the + * havedead/brokenhotchain flags (AMs that don't track these pass false), and + * signals the leader. Must be paired with ParallelIndexBuildWaitForWorkers in + * the leader. + */ +void +ParallelIndexBuildReportScanDone(ParallelIndexBuildShared * shared, + double reltuples, double indtuples, + bool havedead, bool brokenhotchain) +{ + SpinLockAcquire(&shared->mutex); + shared->nparticipantsdone++; + shared->reltuples += reltuples; + shared->indtuples += indtuples; + if (havedead) + shared->havedead = true; + if (brokenhotchain) + shared->brokenhotchain = true; + SpinLockRelease(&shared->mutex); + + /* Notify leader */ + ConditionVariableSignal(&shared->workersdonecv); +} diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 57e51c574b914..f0386f6bdd373 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -17,6 +17,7 @@ #include "access/gin_private.h" #include "access/gin_tuple.h" #include "access/parallel.h" +#include "access/parallel_index_build.h" #include "access/table.h" #include "access/tableam.h" #include "access/xloginsert.h" @@ -53,45 +54,8 @@ */ typedef struct GinBuildShared { - /* - * These fields are not modified during the build. They primarily exist - * for the benefit of worker processes that need to create state - * corresponding to that used by the leader. - */ - Oid heaprelid; - Oid indexrelid; - bool isconcurrent; - int scantuplesortstates; - - /* - * workersdonecv is used to monitor the progress of workers. 
All parallel - * participants must indicate that they are done before leader can use - * results built by the workers (and before leader can write the data into - * the index). - */ - ConditionVariable workersdonecv; - - /* - * mutex protects all following fields - * - * These fields contain status information of interest to GIN index builds - * that must work just the same when an index is built in parallel. - */ - slock_t mutex; - - /* - * Mutable state that is maintained by workers, and reported back to - * leader at end of the scans. - * - * nparticipantsdone is number of worker processes finished. - * - * reltuples is the total number of input heap tuples. - * - * indtuples is the total number of tuples that made it into the index. - */ - int nparticipantsdone; - double reltuples; - double indtuples; + /* Common parallel index build state (must be first) */ + ParallelIndexBuildShared base; /* * ParallelTableScanDescData data follows. Can't directly embed here, as @@ -185,7 +149,6 @@ typedef struct static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state); -static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _gin_parallel_heapscan(GinBuildState *state); static double _gin_parallel_merge(GinBuildState *state); static void _gin_leader_participate_as_worker(GinBuildState *buildstate, @@ -953,29 +916,18 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, /* * Enter parallel mode, and create context for parallel build of gin index */ - EnterParallelMode(); - Assert(request > 0); - pcxt = CreateParallelContext("postgres", "_gin_parallel_build_main", - request); + pcxt = ParallelIndexBuildCreateContext("_gin_parallel_build_main", request); scantuplesortstates = leaderparticipates ? request + 1 : request; - /* - * Prepare for scan of the base relation. 
In a normal index build, we use - * SnapshotAny because we must retrieve all tuples and do our own time - * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. - */ - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + /* Prepare for scan of the base relation. */ + snapshot = ParallelIndexBuildGetSnapshot(isconcurrent); /* * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace. */ - estginshared = _gin_parallel_estimate_shared(heap, snapshot); + estginshared = ParallelIndexBuildEstimateShared(heap, snapshot, + sizeof(GinBuildShared)); shm_toc_estimate_chunk(&pcxt->estimator, estginshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -1003,23 +955,11 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, /* Store shared build state, for which we reserved space */ ginshared = (GinBuildShared *) shm_toc_allocate(pcxt->toc, estginshared); - /* Initialize immutable state */ - ginshared->heaprelid = RelationGetRelid(heap); - ginshared->indexrelid = RelationGetRelid(index); - ginshared->isconcurrent = isconcurrent; - ginshared->scantuplesortstates = scantuplesortstates; - - ConditionVariableInit(&ginshared->workersdonecv); - SpinLockInit(&ginshared->mutex); - - /* Initialize mutable state */ - ginshared->nparticipantsdone = 0; - ginshared->reltuples = 0.0; - ginshared->indtuples = 0.0; - - table_parallelscan_initialize(heap, - ParallelTableScanFromGinBuildShared(ginshared), - snapshot); + /* Initialize common state, and the parallel scan that follows the struct */ + ParallelIndexBuildInitShared(&ginshared->base, heap, index, isconcurrent, + scantuplesortstates, + ParallelTableScanFromGinBuildShared(ginshared), + snapshot); /* * Store shared tuplesort-private state, for which we reserved space. 
@@ -1076,21 +1016,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) { - /* Shutdown worker processes */ - WaitForParallelWorkersToFinish(ginleader->pcxt); - - /* - * Next, accumulate WAL usage. (This must wait for the workers to finish, - * or we might get incomplete data.) - */ - InstrAccumParallelQuery(ginleader->instr, - ginleader->pcxt->nworkers_launched); - - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(ginleader->snapshot)) - UnregisterSnapshot(ginleader->snapshot); - DestroyParallelContext(ginleader->pcxt); - ExitParallelMode(); + ParallelIndexBuildEnd(ginleader->pcxt, ginleader->instr, + ginleader->snapshot); } /* @@ -1106,28 +1033,12 @@ static double _gin_parallel_heapscan(GinBuildState *state) { GinBuildShared *ginshared = state->bs_leader->ginshared; - int nparticipanttuplesorts; - - nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts; - for (;;) - { - SpinLockAcquire(&ginshared->mutex); - if (ginshared->nparticipantsdone == nparticipanttuplesorts) - { - /* copy the data into leader state */ - state->bs_reltuples = ginshared->reltuples; - state->bs_numtuples = ginshared->indtuples; - SpinLockRelease(&ginshared->mutex); - break; - } - SpinLockRelease(&ginshared->mutex); - - ConditionVariableSleep(&ginshared->workersdonecv, - WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); - } - - ConditionVariableCancelSleep(); + /* Wait for all participants to finish, and collect their results */ + ParallelIndexBuildWaitForWorkers(&ginshared->base, + state->bs_leader->nparticipanttuplesorts, + &state->bs_reltuples, &state->bs_numtuples, + NULL, NULL); return state->bs_reltuples; } @@ -1761,17 +1672,6 @@ _gin_parallel_merge(GinBuildState *state) return reltuples; } -/* - * Returns size of shared memory required to store state for a parallel - * gin index build based on the snapshot its parallel scan will use. 
- */ -static Size -_gin_parallel_estimate_shared(Relation heap, Snapshot snapshot) -{ - /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ - return add_size(BUFFERALIGN(sizeof(GinBuildShared)), - table_parallelscan_estimate(heap, snapshot)); -} /* * Within leader, participate as a parallel worker. @@ -2008,7 +1908,7 @@ _gin_parallel_scan_and_build(GinBuildState *state, state->work_mem = (sortmem / 2); /* remember how many workers participate in the build */ - state->bs_num_workers = ginshared->scantuplesortstates; + state->bs_num_workers = ginshared->base.scantuplesortstates; /* Begin "partial" tuplesort */ state->bs_sortstate = tuplesort_begin_index_gin(heap, index, @@ -2024,7 +1924,7 @@ _gin_parallel_scan_and_build(GinBuildState *state, /* Join parallel scan */ indexInfo = BuildIndexInfo(index); - indexInfo->ii_Concurrent = ginshared->isconcurrent; + indexInfo->ii_Concurrent = ginshared->base.isconcurrent; scan = table_beginscan_parallel(heap, ParallelTableScanFromGinBuildShared(ginshared), @@ -2048,17 +1948,9 @@ _gin_parallel_scan_and_build(GinBuildState *state, state->bs_reltuples += reltuples; - /* - * Done. Record ambuild statistics. - */ - SpinLockAcquire(&ginshared->mutex); - ginshared->nparticipantsdone++; - ginshared->reltuples += state->bs_reltuples; - ginshared->indtuples += state->bs_numtuples; - SpinLockRelease(&ginshared->mutex); - - /* Notify leader */ - ConditionVariableSignal(&ginshared->workersdonecv); + /* Done. Record ambuild statistics, and notify leader. 
*/ + ParallelIndexBuildReportScanDone(&ginshared->base, state->bs_reltuples, + state->bs_numtuples, false, false); tuplesort_end(state->bs_sortstate); } @@ -2092,21 +1984,9 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Look up gin shared state */ ginshared = shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false); - /* Open relations using lock modes known to be obtained by index.c */ - if (!ginshared->isconcurrent) - { - heapLockmode = ShareLock; - indexLockmode = AccessExclusiveLock; - } - else - { - heapLockmode = ShareUpdateExclusiveLock; - indexLockmode = RowExclusiveLock; - } - - /* Open relations within worker */ - heapRel = table_open(ginshared->heaprelid, heapLockmode); - indexRel = index_open(ginshared->indexrelid, indexLockmode); + /* Open relations within worker, using the leader's lock modes */ + ParallelIndexBuildOpenRelations(&ginshared->base, &heapRel, &indexRel, + &heapLockmode, &indexLockmode); /* initialize the GIN build state */ initGinState(&buildstate.ginstate, indexRel); @@ -2146,7 +2026,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) * (when requested number of workers were not launched, this will be * somewhat higher than it is for other workers). 
*/ - sortmem = maintenance_work_mem / ginshared->scantuplesortstates; + sortmem = maintenance_work_mem / ginshared->base.scantuplesortstates; _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort, heapRel, indexRel, sortmem, false); @@ -2155,8 +2035,8 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); - index_close(indexRel, indexLockmode); - table_close(heapRel, heapLockmode); + ParallelIndexBuildCloseRelations(heapRel, indexRel, heapLockmode, + indexLockmode); } /* diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index f43bb939081c3..c953805576ce3 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -42,6 +42,7 @@ #include "access/nbtree.h" #include "access/parallel.h" +#include "access/parallel_index_build.h" #include "access/relscan.h" #include "access/table.h" #include "access/tableam.h" @@ -95,58 +96,12 @@ typedef struct BTSpool */ typedef struct BTShared { - /* - * These fields are not modified during the sort. They primarily exist - * for the benefit of worker processes that need to create BTSpool state - * corresponding to that used by the leader. - */ - Oid heaprelid; - Oid indexrelid; + /* Common parallel index build state (must be first) */ + ParallelIndexBuildShared base; + + /* nbtree-specific immutable state, not modified during the sort */ bool isunique; bool nulls_not_distinct; - bool isconcurrent; - int scantuplesortstates; - - /* Query ID, for report in worker processes */ - int64 queryid; - - /* - * workersdonecv is used to monitor the progress of workers. All parallel - * participants must indicate that they are done before leader can use - * mutable state that workers maintain during scan (and before leader can - * proceed to tuplesort_performsort()). 
- */ - ConditionVariable workersdonecv; - - /* - * mutex protects all fields before heapdesc. - * - * These fields contain status information of interest to B-Tree index - * builds that must work just the same when an index is built in parallel. - */ - slock_t mutex; - - /* - * Mutable state that is maintained by workers, and reported back to - * leader at end of parallel scan. - * - * nparticipantsdone is number of worker processes finished. - * - * reltuples is the total number of input heap tuples. - * - * havedead indicates if RECENTLY_DEAD tuples were encountered during - * build. - * - * indtuples is the total number of tuples that made it into the index. - * - * brokenhotchain indicates if any worker detected a broken HOT chain - * during build. - */ - int nparticipantsdone; - double reltuples; - bool havedead; - double indtuples; - bool brokenhotchain; /* * ParallelTableScanDescData data follows. Can't directly embed here, as @@ -280,7 +235,6 @@ static void _bt_load(BTWriteState *wstate, static void _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request); static void _bt_end_parallel(BTLeader *btleader); -static Size _bt_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _bt_parallel_heapscan(BTBuildState *buildstate, bool *brokenhotchain); static void _bt_leader_participate_as_worker(BTBuildState *buildstate); @@ -1417,30 +1371,19 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) * Enter parallel mode, and create context for parallel build of btree * index */ - EnterParallelMode(); - Assert(request > 0); - pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main", - request); + pcxt = ParallelIndexBuildCreateContext("_bt_parallel_build_main", request); scantuplesortstates = leaderparticipates ? request + 1 : request; - /* - * Prepare for scan of the base relation. 
In a normal index build, we use - * SnapshotAny because we must retrieve all tuples and do our own time - * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. - */ - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + /* Prepare for scan of the base relation. */ + snapshot = ParallelIndexBuildGetSnapshot(isconcurrent); /* * Estimate size for our own PARALLEL_KEY_BTREE_SHARED workspace, and * PARALLEL_KEY_TUPLESORT tuplesort workspace */ - estbtshared = _bt_parallel_estimate_shared(btspool->heap, snapshot); + estbtshared = ParallelIndexBuildEstimateShared(btspool->heap, snapshot, + sizeof(BTShared)); shm_toc_estimate_chunk(&pcxt->estimator, estbtshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -1478,25 +1421,14 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) /* Store shared build state, for which we reserved space */ btshared = (BTShared *) shm_toc_allocate(pcxt->toc, estbtshared); - /* Initialize immutable state */ - btshared->heaprelid = RelationGetRelid(btspool->heap); - btshared->indexrelid = RelationGetRelid(btspool->index); + /* Initialize nbtree-specific immutable state */ btshared->isunique = btspool->isunique; btshared->nulls_not_distinct = btspool->nulls_not_distinct; - btshared->isconcurrent = isconcurrent; - btshared->scantuplesortstates = scantuplesortstates; - btshared->queryid = pgstat_get_my_query_id(); - ConditionVariableInit(&btshared->workersdonecv); - SpinLockInit(&btshared->mutex); - /* Initialize mutable state */ - btshared->nparticipantsdone = 0; - btshared->reltuples = 0.0; - btshared->havedead = false; - btshared->indtuples = 0.0; - btshared->brokenhotchain = false; - table_parallelscan_initialize(btspool->heap, - ParallelTableScanFromBTShared(btshared), - 
snapshot); + /* Initialize common state, and the parallel scan that follows the struct */ + ParallelIndexBuildInitShared(&btshared->base, btspool->heap, btspool->index, + isconcurrent, scantuplesortstates, + ParallelTableScanFromBTShared(btshared), + snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1571,33 +1503,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) static void _bt_end_parallel(BTLeader *btleader) { - /* Shutdown worker processes */ - WaitForParallelWorkersToFinish(btleader->pcxt); - - /* - * Next, accumulate WAL usage. (This must wait for the workers to finish, - * or we might get incomplete data.) - */ - InstrAccumParallelQuery(btleader->instr, - btleader->pcxt->nworkers_launched); - - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(btleader->snapshot)) - UnregisterSnapshot(btleader->snapshot); - DestroyParallelContext(btleader->pcxt); - ExitParallelMode(); -} - -/* - * Returns size of shared memory required to store state for a parallel - * btree index build based on the snapshot its parallel scan will use. - */ -static Size -_bt_parallel_estimate_shared(Relation heap, Snapshot snapshot) -{ - /* c.f. 
shm_toc_allocate as to why BUFFERALIGN is used */ - return add_size(BUFFERALIGN(sizeof(BTShared)), - table_parallelscan_estimate(heap, snapshot)); + ParallelIndexBuildEnd(btleader->pcxt, btleader->instr, btleader->snapshot); } /* @@ -1616,29 +1522,13 @@ static double _bt_parallel_heapscan(BTBuildState *buildstate, bool *brokenhotchain) { BTShared *btshared = buildstate->btleader->btshared; - int nparticipanttuplesorts; double reltuples; - nparticipanttuplesorts = buildstate->btleader->nparticipanttuplesorts; - for (;;) - { - SpinLockAcquire(&btshared->mutex); - if (btshared->nparticipantsdone == nparticipanttuplesorts) - { - buildstate->havedead = btshared->havedead; - buildstate->indtuples = btshared->indtuples; - *brokenhotchain = btshared->brokenhotchain; - reltuples = btshared->reltuples; - SpinLockRelease(&btshared->mutex); - break; - } - SpinLockRelease(&btshared->mutex); - - ConditionVariableSleep(&btshared->workersdonecv, - WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); - } - - ConditionVariableCancelSleep(); + /* Wait for all participants to finish, and collect their results */ + ParallelIndexBuildWaitForWorkers(&btshared->base, + buildstate->btleader->nparticipanttuplesorts, + &reltuples, &buildstate->indtuples, + &buildstate->havedead, brokenhotchain); return reltuples; } @@ -1732,24 +1622,9 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Look up nbtree shared state */ btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false); - /* Open relations using lock modes known to be obtained by index.c */ - if (!btshared->isconcurrent) - { - heapLockmode = ShareLock; - indexLockmode = AccessExclusiveLock; - } - else - { - heapLockmode = ShareUpdateExclusiveLock; - indexLockmode = RowExclusiveLock; - } - - /* Track query ID */ - pgstat_report_query_id(btshared->queryid, false); - - /* Open relations within worker */ - heapRel = table_open(btshared->heaprelid, heapLockmode); - indexRel = index_open(btshared->indexrelid, indexLockmode); + /* Open 
relations within worker, using the leader's lock modes */ + ParallelIndexBuildOpenRelations(&btshared->base, &heapRel, &indexRel, + &heapLockmode, &indexLockmode); /* Initialize worker's own spool */ btspool = palloc0_object(BTSpool); @@ -1784,7 +1659,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) InstrStartParallelQuery(); /* Perform sorting of spool, and possibly a spool2 */ - sortmem = maintenance_work_mem / btshared->scantuplesortstates; + sortmem = maintenance_work_mem / btshared->base.scantuplesortstates; _bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort, sharedsort2, sortmem, false); @@ -1800,8 +1675,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) } #endif /* BTREE_BUILD_STATS */ - index_close(indexRel, indexLockmode); - table_close(heapRel, heapLockmode); + ParallelIndexBuildCloseRelations(heapRel, indexRel, heapLockmode, + indexLockmode); } /* @@ -1877,7 +1752,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, /* Join parallel scan */ indexInfo = BuildIndexInfo(btspool->index); - indexInfo->ii_Concurrent = btshared->isconcurrent; + indexInfo->ii_Concurrent = btshared->base.isconcurrent; scan = table_beginscan_parallel(btspool->heap, ParallelTableScanFromBTShared(btshared), SO_NONE); @@ -1899,21 +1774,12 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, } /* - * Done. Record ambuild statistics, and whether we encountered a broken - * HOT chain. + * Done. Record ambuild statistics (including whether we encountered a + * broken HOT chain), and notify the leader. 
*/ - SpinLockAcquire(&btshared->mutex); - btshared->nparticipantsdone++; - btshared->reltuples += reltuples; - if (buildstate.havedead) - btshared->havedead = true; - btshared->indtuples += buildstate.indtuples; - if (indexInfo->ii_BrokenHotChain) - btshared->brokenhotchain = true; - SpinLockRelease(&btshared->mutex); - - /* Notify leader */ - ConditionVariableSignal(&btshared->workersdonecv); + ParallelIndexBuildReportScanDone(&btshared->base, reltuples, + buildstate.indtuples, buildstate.havedead, + indexInfo->ii_BrokenHotChain); /* We can end tuplesorts immediately */ tuplesort_end(btspool->sortstate); diff --git a/src/include/access/parallel_index_build.h b/src/include/access/parallel_index_build.h new file mode 100644 index 0000000000000..f6d311218a4c3 --- /dev/null +++ b/src/include/access/parallel_index_build.h @@ -0,0 +1,119 @@ +/*------------------------------------------------------------------------- + * + * parallel_index_build.h + * Shared infrastructure for parallel index builds. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/parallel_index_build.h + * + *------------------------------------------------------------------------- + */ +#ifndef PARALLEL_INDEX_BUILD_H +#define PARALLEL_INDEX_BUILD_H + +#include "access/parallel.h" +#include "access/relscan.h" +#include "storage/condition_variable.h" +#include "storage/lockdefs.h" +#include "storage/spin.h" +#include "utils/relcache.h" +#include "utils/snapshot.h" + +struct Instrumentation; + +/* + * Shared state common to all parallel index builds. + * + * Access methods embed this as the first member of their own shared state + * and append any AM-specific fields, followed by the ParallelTableScanDescData + * for the heap scan. 
The immutable fields are set once by the leader before + * workers are launched; the mutable fields are maintained by the workers + * under mutex and read back by the leader once all workers have signalled + * completion. + */ +typedef struct ParallelIndexBuildShared +{ + /* Immutable state, set by the leader before launching workers */ + Oid heaprelid; + Oid indexrelid; + bool isconcurrent; + int scantuplesortstates; + + /* Query ID, for reporting in worker processes */ + int64 queryid; + + /* + * workersdonecv is used to monitor the progress of workers. All parallel + * participants must indicate that they are done before the leader can use + * the results built by the workers. + */ + ConditionVariable workersdonecv; + + /* + * mutex protects the mutable fields below. + */ + slock_t mutex; + + /* + * Mutable state that is maintained by workers for all index types, and + * reported back to the leader at the end of the scans. + * + * nparticipantsdone is the number of worker processes that have finished. + * + * reltuples is the total number of input heap tuples. + * + * indtuples is the total number of tuples that made it into the index. + */ + int nparticipantsdone; + double reltuples; + double indtuples; + + /* + * Mutable state that is maintained for exact index AMs (e.g. nbtree), and + * unused for lossy AMs. + * + * havedead indicates if RECENTLY_DEAD tuples were encountered during + * build. + * + * brokenhotchain indicates if any worker detected a broken HOT chain + * during build.
+ */ + bool havedead; + bool brokenhotchain; +} ParallelIndexBuildShared; + +/* Leader-side helpers */ +extern ParallelContext *ParallelIndexBuildCreateContext(const char *worker_function, + int nworkers); +extern Snapshot ParallelIndexBuildGetSnapshot(bool isconcurrent); +extern Size ParallelIndexBuildEstimateShared(Relation heap, Snapshot snapshot, + Size am_shared_size); +extern void ParallelIndexBuildInitShared(ParallelIndexBuildShared * shared, + Relation heap, Relation index, + bool isconcurrent, + int scantuplesortstates, + ParallelTableScanDesc pscan, + Snapshot snapshot); +extern void ParallelIndexBuildWaitForWorkers(ParallelIndexBuildShared * shared, + int nparticipants, + double *reltuples, double *indtuples, + bool *havedead, bool *brokenhotchain); +extern void ParallelIndexBuildEnd(ParallelContext *pcxt, + struct Instrumentation *instr, + Snapshot snapshot); + +/* Worker-side helpers */ +extern void ParallelIndexBuildReportScanDone(ParallelIndexBuildShared * shared, + double reltuples, double indtuples, + bool havedead, bool brokenhotchain); +extern void ParallelIndexBuildOpenRelations(ParallelIndexBuildShared * shared, + Relation *heapRel, Relation *indexRel, + LOCKMODE *heapLockmode, + LOCKMODE *indexLockmode); +extern void ParallelIndexBuildCloseRelations(Relation heapRel, Relation indexRel, + LOCKMODE heapLockmode, + LOCKMODE indexLockmode); + +#endif /* PARALLEL_INDEX_BUILD_H */