diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index bdb30752e098c..dc651383d63c0 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -19,6 +19,7 @@ #include "access/brin_page.h" #include "access/brin_pageops.h" #include "access/brin_xlog.h" +#include "access/parallel_index_build.h" #include "access/relation.h" #include "access/reloptions.h" #include "access/relscan.h" @@ -51,8 +52,7 @@ #define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001) #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xB000000000000004) /* * Status for index builds performed in parallel. This is allocated in a @@ -60,49 +60,13 @@ */ typedef struct BrinShared { - /* - * These fields are not modified during the build. They primarily exist - * for the benefit of worker processes that need to create state - * corresponding to that used by the leader. - */ - Oid heaprelid; - Oid indexrelid; - bool isconcurrent; - BlockNumber pagesPerRange; - int scantuplesortstates; - - /* Query ID, for report in worker processes */ - int64 queryid; - - /* - * workersdonecv is used to monitor the progress of workers. All parallel - * participants must indicate that they are done before leader can use - * results built by the workers (and before leader can write the data into - * the index). - */ - ConditionVariable workersdonecv; + /* Common parallel index build state (must be first) */ + ParallelIndexBuildShared base; /* - * mutex protects all fields before heapdesc. - * - * These fields contain status information of interest to BRIN index - * builds that must work just the same when an index is built in parallel. - */ - slock_t mutex; - - /* - * Mutable state that is maintained by workers, and reported back to - * leader at end of the scans. - * - * nparticipantsdone is number of worker processes finished. - * - * reltuples is the total number of input heap tuples. - * - * indtuples is the total number of tuples that made it into the index. + * BRIN-specific immutable state, not modified during the build. */ - int nparticipantsdone; - double reltuples; - double indtuples; + BlockNumber pagesPerRange; /* * ParallelTableScanDescData data follows. Can't directly embed here, as @@ -148,8 +112,7 @@ typedef struct BrinLeader BrinShared *brinshared; Sharedsort *sharedsort; Snapshot snapshot; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; } BrinLeader; /* @@ -235,7 +198,6 @@ static void brin_fill_empty_ranges(BrinBuildState *state, static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state); -static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _brin_parallel_heapscan(BrinBuildState *state); static double _brin_parallel_merge(BrinBuildState *state); static void _brin_leader_participate_as_worker(BrinBuildState *buildstate, @@ -2387,10 +2349,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, BrinShared *brinshared; Sharedsort *sharedsort; BrinLeader *brinleader = palloc0_object(BrinLeader); - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; bool leaderparticipates = true; - int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; @@ -2400,59 +2360,29 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Enter parallel mode, and create context for parallel build of brin * index */ - EnterParallelMode(); - Assert(request > 0); - pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main", - request); + pcxt = ParallelIndexBuildCreateContext("_brin_parallel_build_main", request); scantuplesortstates = leaderparticipates ? request + 1 : request; - /* - * Prepare for scan of the base relation. In a normal index build, we use - * SnapshotAny because we must retrieve all tuples and do our own time - * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. - */ - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + /* Prepare for scan of the base relation. */ + snapshot = ParallelIndexBuildGetSnapshot(isconcurrent); /* * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace. */ - estbrinshared = _brin_parallel_estimate_shared(heap, snapshot); + estbrinshared = ParallelIndexBuildEstimateShared(heap, snapshot, + sizeof(BrinShared)); shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); shm_toc_estimate_keys(&pcxt->estimator, 2); - /* - * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE - * and PARALLEL_KEY_BUFFER_USAGE. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgWalUsage or - * pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); /* Everyone's had a chance to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); @@ -2469,24 +2399,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, /* Store shared build state, for which we reserved space */ brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared); - /* Initialize immutable state */ - brinshared->heaprelid = RelationGetRelid(heap); - brinshared->indexrelid = RelationGetRelid(index); - brinshared->isconcurrent = isconcurrent; - brinshared->scantuplesortstates = scantuplesortstates; + /* Initialize BRIN-specific immutable state */ brinshared->pagesPerRange = buildstate->bs_pagesPerRange; - brinshared->queryid = pgstat_get_my_query_id(); - ConditionVariableInit(&brinshared->workersdonecv); - SpinLockInit(&brinshared->mutex); - - /* Initialize mutable state */ - brinshared->nparticipantsdone = 0; - brinshared->reltuples = 0.0; - brinshared->indtuples = 0.0; - - table_parallelscan_initialize(heap, - ParallelTableScanFromBrinShared(brinshared), - snapshot); + /* Initialize common state, and the parallel scan that follows the struct */ + ParallelIndexBuildInitShared(&brinshared->base, heap, index, isconcurrent, + scantuplesortstates, + ParallelTableScanFromBrinShared(brinshared), + snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -2504,25 +2423,10 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; + StoreParallelQueryText(pcxt, PARALLEL_KEY_QUERY_TEXT); - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - } - - /* - * Allocate space for each worker's WalUsage and BufferUsage; no need to - * initialize. - */ - walusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); - bufferusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + /* Allocate space for each worker's Instrumentation. */ + instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -2533,8 +2437,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, brinleader->brinshared = brinshared; brinleader->sharedsort = sharedsort; brinleader->snapshot = snapshot; - brinleader->walusage = walusage; - brinleader->bufferusage = bufferusage; + brinleader->instr = instr; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -2563,23 +2466,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) { - int i; - - /* Shutdown worker processes */ - WaitForParallelWorkersToFinish(brinleader->pcxt); - - /* - * Next, accumulate WAL usage. (This must wait for the workers to finish, - * or we might get incomplete data.) - */ - for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); - - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(brinleader->snapshot)) - UnregisterSnapshot(brinleader->snapshot); - DestroyParallelContext(brinleader->pcxt); - ExitParallelMode(); + ParallelIndexBuildEnd(brinleader->pcxt, brinleader->instr, + brinleader->snapshot); } /* @@ -2595,28 +2483,12 @@ static double _brin_parallel_heapscan(BrinBuildState *state) { BrinShared *brinshared = state->bs_leader->brinshared; - int nparticipanttuplesorts; - - nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts; - for (;;) - { - SpinLockAcquire(&brinshared->mutex); - if (brinshared->nparticipantsdone == nparticipanttuplesorts) - { - /* copy the data into leader state */ - state->bs_reltuples = brinshared->reltuples; - state->bs_numtuples = brinshared->indtuples; - - SpinLockRelease(&brinshared->mutex); - break; - } - SpinLockRelease(&brinshared->mutex); - - ConditionVariableSleep(&brinshared->workersdonecv, - WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); - } - ConditionVariableCancelSleep(); + /* Wait for all participants to finish, and collect their results */ + ParallelIndexBuildWaitForWorkers(&brinshared->base, + state->bs_leader->nparticipanttuplesorts, + &state->bs_reltuples, &state->bs_numtuples, + NULL, NULL); return state->bs_reltuples; } @@ -2775,18 +2647,6 @@ _brin_parallel_merge(BrinBuildState *state) return reltuples; } -/* - * Returns size of shared memory required to store state for a parallel - * brin index build based on the snapshot its parallel scan will use. - */ -static Size -_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) -{ - /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ - return add_size(BUFFERALIGN(sizeof(BrinShared)), - table_parallelscan_estimate(heap, snapshot)); -} - /* * Within leader, participate as a parallel worker. */ @@ -2841,7 +2701,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, /* Join parallel scan */ indexInfo = BuildIndexInfo(index); - indexInfo->ii_Concurrent = brinshared->isconcurrent; + indexInfo->ii_Concurrent = brinshared->base.isconcurrent; scan = table_beginscan_parallel(heap, ParallelTableScanFromBrinShared(brinshared), @@ -2858,17 +2718,9 @@ _brin_parallel_scan_and_build(BrinBuildState *state, state->bs_reltuples += reltuples; - /* - * Done. Record ambuild statistics. - */ - SpinLockAcquire(&brinshared->mutex); - brinshared->nparticipantsdone++; - brinshared->reltuples += state->bs_reltuples; - brinshared->indtuples += state->bs_numtuples; - SpinLockRelease(&brinshared->mutex); - - /* Notify leader */ - ConditionVariableSignal(&brinshared->workersdonecv); + /* Done. Record ambuild statistics, and notify leader. */ + ParallelIndexBuildReportScanDone(&brinshared->base, state->bs_reltuples, + state->bs_numtuples, false, false); tuplesort_end(state->bs_sortstate); } @@ -2879,7 +2731,6 @@ _brin_parallel_scan_and_build(BrinBuildState *state, void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) { - char *sharedquery; BrinShared *brinshared; Sharedsort *sharedsort; BrinBuildState *buildstate; @@ -2887,8 +2738,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *worker_instr; int sortmem; /* @@ -2898,34 +2748,15 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Assert((MyProc->statusFlags == 0) || (MyProc->statusFlags == PROC_IN_SAFE_IC)); - /* Set debug_query_string for individual workers first */ - sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - - /* Report the query string from leader */ - pgstat_report_activity(STATE_RUNNING, debug_query_string); + /* Set debug_query_string and report the query string from leader */ + RestoreParallelQueryText(toc, PARALLEL_KEY_QUERY_TEXT); /* Look up brin shared state */ brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false); - /* Open relations using lock modes known to be obtained by index.c */ - if (!brinshared->isconcurrent) - { - heapLockmode = ShareLock; - indexLockmode = AccessExclusiveLock; - } - else - { - heapLockmode = ShareUpdateExclusiveLock; - indexLockmode = RowExclusiveLock; - } - - /* Track query ID */ - pgstat_report_query_id(brinshared->queryid, false); - - /* Open relations within worker */ - heapRel = table_open(brinshared->heaprelid, heapLockmode); - indexRel = index_open(brinshared->indexrelid, indexLockmode); + /* Open relations within worker, using the leader's lock modes */ + ParallelIndexBuildOpenRelations(&brinshared->base, &heapRel, &indexRel, + &heapLockmode, &indexLockmode); buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange, @@ -2943,19 +2774,17 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) * (when requested number of workers were not launched, this will be * somewhat higher than it is for other workers). */ - sortmem = maintenance_work_mem / brinshared->scantuplesortstates; + sortmem = maintenance_work_mem / brinshared->base.scantuplesortstates; _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort, heapRel, indexRel, sortmem, false); /* Report WAL/buffer usage during parallel execution */ - bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], - &walusage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); - index_close(indexRel, indexLockmode); - table_close(heapRel, heapLockmode); + ParallelIndexBuildCloseRelations(heapRel, indexRel, heapLockmode, + indexLockmode); } /* diff --git a/src/backend/access/common/Makefile b/src/backend/access/common/Makefile index e78de312659ed..0d4769b96af57 100644 --- a/src/backend/access/common/Makefile +++ b/src/backend/access/common/Makefile @@ -18,6 +18,7 @@ OBJS = \ detoast.o \ heaptuple.o \ indextuple.o \ + parallel_index_build.o \ printsimple.o \ printtup.o \ relation.o \ diff --git a/src/backend/access/common/meson.build b/src/backend/access/common/meson.build index 35e89b5ea67d5..7d6247b97542d 100644 --- a/src/backend/access/common/meson.build +++ b/src/backend/access/common/meson.build @@ -6,6 +6,7 @@ backend_sources += files( 'detoast.c', 'heaptuple.c', 'indextuple.c', + 'parallel_index_build.c', 'printsimple.c', 'printtup.c', 'relation.c', diff --git a/src/backend/access/common/parallel_index_build.c b/src/backend/access/common/parallel_index_build.c new file mode 100644 index 0000000000000..adce8c547e539 --- /dev/null +++ b/src/backend/access/common/parallel_index_build.c @@ -0,0 +1,246 @@ +/*------------------------------------------------------------------------- + * + * parallel_index_build.c + * Shared infrastructure for parallel index builds. + * + * This file contains the access-method-independent parts of the parallel + * index build lifecycle shared by different index access methods: setting up the + * parallel context and shared state, and opening/closing relations and tearing + * everything down again in the worker and leader. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/common/parallel_index_build.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/parallel_index_build.h" +#include "access/genam.h" +#include "access/table.h" +#include "access/tableam.h" +#include "access/xact.h" +#include "executor/instrument.h" +#include "pgstat.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/wait_event.h" + +/* + * Enter parallel mode and create a parallel context for an index build, using + * the named worker entry point. + */ +ParallelContext * +ParallelIndexBuildCreateContext(const char *worker_function, int nworkers) +{ + Assert(nworkers > 0); + + EnterParallelMode(); + + return CreateParallelContext("postgres", worker_function, nworkers); +} + +/* + * Choose the snapshot for the heap scan of an index build. + * + * In a normal index build, we use SnapshotAny because we must retrieve all + * tuples and do our own time qual checks (because we have to index + * RECENTLY_DEAD tuples). In a concurrent build, we take a regular MVCC + * snapshot and index whatever's live according to that. The caller is + * responsible for unregistering an MVCC snapshot by calling ParallelIndexBuildEnd. + */ +Snapshot +ParallelIndexBuildGetSnapshot(bool isconcurrent) +{ + if (!isconcurrent) + return SnapshotAny; + + return RegisterSnapshot(GetTransactionSnapshot()); +} + +/* + * Estimate the DSM space for the shared state struct of a parallel index + * build, including the parallel table scan descriptor that trails it. + * + * am_shared_size is sizeof() the access method's whole shared struct (which + * embeds ParallelIndexBuildShared as its first member). + */ +Size +ParallelIndexBuildEstimateShared(Relation heap, Snapshot snapshot, + Size am_shared_size) +{ + /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ + return add_size(BUFFERALIGN(am_shared_size), + table_parallelscan_estimate(heap, snapshot)); +} + +/* + * Initialize the common shared state for a parallel index build. + * + * The caller has already allocated the embedding shared struct in DSM; this + * fills in the common header and initializes the parallel heap scan + * descriptor that follows the (whole) embedding struct, which the caller + * passes as pscan. AM-specific fields are the caller's responsibility. + */ +void +ParallelIndexBuildInitShared(ParallelIndexBuildShared * shared, + Relation heap, Relation index, + bool isconcurrent, int scantuplesortstates, + ParallelTableScanDesc pscan, Snapshot snapshot) +{ + /* Initialize immutable state */ + shared->heaprelid = RelationGetRelid(heap); + shared->indexrelid = RelationGetRelid(index); + shared->isconcurrent = isconcurrent; + shared->scantuplesortstates = scantuplesortstates; + shared->queryid = pgstat_get_my_query_id(); + ConditionVariableInit(&shared->workersdonecv); + SpinLockInit(&shared->mutex); + + /* Initialize mutable state */ + shared->nparticipantsdone = 0; + shared->reltuples = 0.0; + shared->indtuples = 0.0; + shared->havedead = false; + shared->brokenhotchain = false; + + table_parallelscan_initialize(heap, pscan, snapshot); +} + +/* + * Wait, in the leader, for all participants to finish their portion of the + * scan, then read back the accumulated per-build results. + * + * reltuples and indtuples receive the totals. havedead and brokenhotchain are + * optional (pass NULL when the AM does not care): they report whether any + * worker saw RECENTLY_DEAD tuples or a broken HOT chain. + */ +void +ParallelIndexBuildWaitForWorkers(ParallelIndexBuildShared * shared, + int nparticipants, + double *reltuples, double *indtuples, + bool *havedead, bool *brokenhotchain) +{ + for (;;) + { + SpinLockAcquire(&shared->mutex); + if (shared->nparticipantsdone == nparticipants) + { + *reltuples = shared->reltuples; + *indtuples = shared->indtuples; + if (havedead) + *havedead = shared->havedead; + if (brokenhotchain) + *brokenhotchain = shared->brokenhotchain; + SpinLockRelease(&shared->mutex); + break; + } + SpinLockRelease(&shared->mutex); + + ConditionVariableSleep(&shared->workersdonecv, + WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); + } + + ConditionVariableCancelSleep(); +} + +/* + * Shut down the workers and tear down a parallel index build. + * + * Waits for all workers to finish, accumulates their buffer/WAL usage into the + * leader's stats, releases the MVCC snapshot if one was used, and exits + * parallel mode. instr is the per-worker Instrumentation array previously + * allocated with StoreParallelInstrumentation. + */ +void +ParallelIndexBuildEnd(ParallelContext *pcxt, struct Instrumentation *instr, + Snapshot snapshot) +{ + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(pcxt); + + /* + * Next, accumulate instrumentation. This must wait for the workers to + * finish, or we might get incomplete data. + */ + InstrAccumParallelQuery(instr, pcxt->nworkers_launched); + + /* Free last reference to MVCC snapshot, if one was used */ + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); +} + +/* + * Open the heap and index relations in a parallel index build worker. + * + * Selects the lock modes used by the leader (which differ for concurrent + * builds), reports the query ID, and opens both relations. The chosen lock + * modes are returned so they can be passed to ParallelIndexBuildCloseRelations. + */ +void +ParallelIndexBuildOpenRelations(ParallelIndexBuildShared * shared, + Relation *heapRel, Relation *indexRel, + LOCKMODE *heapLockmode, LOCKMODE *indexLockmode) +{ + /* Open relations using lock modes known to be obtained by index.c */ + if (!shared->isconcurrent) + { + *heapLockmode = ShareLock; + *indexLockmode = AccessExclusiveLock; + } + else + { + *heapLockmode = ShareUpdateExclusiveLock; + *indexLockmode = RowExclusiveLock; + } + + /* Track query ID */ + pgstat_report_query_id(shared->queryid, false); + + /* Open relations within worker */ + *heapRel = table_open(shared->heaprelid, *heapLockmode); + *indexRel = index_open(shared->indexrelid, *indexLockmode); +} + +/* + * Close the relations opened by ParallelIndexBuildOpenRelations. + */ +void +ParallelIndexBuildCloseRelations(Relation heapRel, Relation indexRel, + LOCKMODE heapLockmode, LOCKMODE indexLockmode) +{ + index_close(indexRel, indexLockmode); + table_close(heapRel, heapLockmode); +} + +/* + * Report, from a worker, that it has finished its portion of the scan. + * + * Accumulates the worker's tuple counts into the shared totals, ORs in the + * havedead/brokenhotchain flags (AMs that don't track these pass false), and + * signals the leader. Must be paired with ParallelIndexBuildWaitForWorkers in + * the leader. + */ +void +ParallelIndexBuildReportScanDone(ParallelIndexBuildShared * shared, + double reltuples, double indtuples, + bool havedead, bool brokenhotchain) +{ + SpinLockAcquire(&shared->mutex); + shared->nparticipantsdone++; + shared->reltuples += reltuples; + shared->indtuples += indtuples; + if (havedead) + shared->havedead = true; + if (brokenhotchain) + shared->brokenhotchain = true; + SpinLockRelease(&shared->mutex); + + /* Notify leader */ + ConditionVariableSignal(&shared->workersdonecv); +} diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index cb9ed3b563c6f..f0386f6bdd373 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -17,6 +17,7 @@ #include "access/gin_private.h" #include "access/gin_tuple.h" #include "access/parallel.h" +#include "access/parallel_index_build.h" #include "access/table.h" #include "access/tableam.h" #include "access/xloginsert.h" @@ -45,8 +46,7 @@ #define PARALLEL_KEY_GIN_SHARED UINT64CONST(0xB000000000000001) #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xB000000000000004) /* * Status for index builds performed in parallel. This is allocated in a @@ -54,45 +54,8 @@ */ typedef struct GinBuildShared { - /* - * These fields are not modified during the build. They primarily exist - * for the benefit of worker processes that need to create state - * corresponding to that used by the leader. - */ - Oid heaprelid; - Oid indexrelid; - bool isconcurrent; - int scantuplesortstates; - - /* - * workersdonecv is used to monitor the progress of workers. All parallel - * participants must indicate that they are done before leader can use - * results built by the workers (and before leader can write the data into - * the index). - */ - ConditionVariable workersdonecv; - - /* - * mutex protects all following fields - * - * These fields contain status information of interest to GIN index builds - * that must work just the same when an index is built in parallel. - */ - slock_t mutex; - - /* - * Mutable state that is maintained by workers, and reported back to - * leader at end of the scans. - * - * nparticipantsdone is number of worker processes finished. - * - * reltuples is the total number of input heap tuples. - * - * indtuples is the total number of tuples that made it into the index. - */ - int nparticipantsdone; - double reltuples; - double indtuples; + /* Common parallel index build state (must be first) */ + ParallelIndexBuildShared base; /* * ParallelTableScanDescData data follows. Can't directly embed here, as @@ -138,8 +101,7 @@ typedef struct GinLeader GinBuildShared *ginshared; Sharedsort *sharedsort; Snapshot snapshot; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; } GinLeader; typedef struct @@ -187,7 +149,6 @@ typedef struct static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state); -static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _gin_parallel_heapscan(GinBuildState *state); static double _gin_parallel_merge(GinBuildState *state); static void _gin_leader_participate_as_worker(GinBuildState *buildstate, @@ -945,10 +906,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, GinBuildShared *ginshared; Sharedsort *sharedsort; GinLeader *ginleader = palloc0_object(GinLeader); - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; bool leaderparticipates = true; - int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; @@ -957,59 +916,29 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, /* * Enter parallel mode, and create context for parallel build of gin index */ - EnterParallelMode(); - Assert(request > 0); - pcxt = CreateParallelContext("postgres", "_gin_parallel_build_main", - request); + pcxt = ParallelIndexBuildCreateContext("_gin_parallel_build_main", request); scantuplesortstates = leaderparticipates ? request + 1 : request; - /* - * Prepare for scan of the base relation. In a normal index build, we use - * SnapshotAny because we must retrieve all tuples and do our own time - * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. - */ - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + /* Prepare for scan of the base relation. */ + snapshot = ParallelIndexBuildGetSnapshot(isconcurrent); /* * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace. */ - estginshared = _gin_parallel_estimate_shared(heap, snapshot); + estginshared = ParallelIndexBuildEstimateShared(heap, snapshot, + sizeof(GinBuildShared)); shm_toc_estimate_chunk(&pcxt->estimator, estginshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); shm_toc_estimate_keys(&pcxt->estimator, 2); - /* - * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE - * and PARALLEL_KEY_BUFFER_USAGE. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgWalUsage or - * pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); /* Everyone's had a chance to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); @@ -1026,23 +955,11 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, /* Store shared build state, for which we reserved space */ ginshared = (GinBuildShared *) shm_toc_allocate(pcxt->toc, estginshared); - /* Initialize immutable state */ - ginshared->heaprelid = RelationGetRelid(heap); - ginshared->indexrelid = RelationGetRelid(index); - ginshared->isconcurrent = isconcurrent; - ginshared->scantuplesortstates = scantuplesortstates; - - ConditionVariableInit(&ginshared->workersdonecv); - SpinLockInit(&ginshared->mutex); - - /* Initialize mutable state */ - ginshared->nparticipantsdone = 0; - ginshared->reltuples = 0.0; - ginshared->indtuples = 0.0; - - table_parallelscan_initialize(heap, - ParallelTableScanFromGinBuildShared(ginshared), - snapshot); + /* Initialize common state, and the parallel scan that follows the struct */ + ParallelIndexBuildInitShared(&ginshared->base, heap, index, isconcurrent, + scantuplesortstates, + ParallelTableScanFromGinBuildShared(ginshared), + snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1056,25 +973,10 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; + StoreParallelQueryText(pcxt, PARALLEL_KEY_QUERY_TEXT); - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - } - - /* - * Allocate space for each worker's WalUsage and BufferUsage; no need to - * initialize. - */ - walusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); - bufferusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + /* Allocate space for each worker's Instrumentation. */ + instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -1085,8 +987,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, ginleader->ginshared = ginshared; ginleader->sharedsort = sharedsort; ginleader->snapshot = snapshot; - ginleader->walusage = walusage; - ginleader->bufferusage = bufferusage; + ginleader->instr = instr; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -1115,23 +1016,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) { - int i; - - /* Shutdown worker processes */ - WaitForParallelWorkersToFinish(ginleader->pcxt); - - /* - * Next, accumulate WAL usage. (This must wait for the workers to finish, - * or we might get incomplete data.) - */ - for (i = 0; i < ginleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]); - - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(ginleader->snapshot)) - UnregisterSnapshot(ginleader->snapshot); - DestroyParallelContext(ginleader->pcxt); - ExitParallelMode(); + ParallelIndexBuildEnd(ginleader->pcxt, ginleader->instr, + ginleader->snapshot); } /* @@ -1147,28 +1033,12 @@ static double _gin_parallel_heapscan(GinBuildState *state) { GinBuildShared *ginshared = state->bs_leader->ginshared; - int nparticipanttuplesorts; - - nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts; - for (;;) - { - SpinLockAcquire(&ginshared->mutex); - if (ginshared->nparticipantsdone == nparticipanttuplesorts) - { - /* copy the data into leader state */ - state->bs_reltuples = ginshared->reltuples; - state->bs_numtuples = ginshared->indtuples; - SpinLockRelease(&ginshared->mutex); - break; - } - SpinLockRelease(&ginshared->mutex); - - ConditionVariableSleep(&ginshared->workersdonecv, - WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); - } - - ConditionVariableCancelSleep(); + /* Wait for all participants to finish, and collect their results */ + ParallelIndexBuildWaitForWorkers(&ginshared->base, + state->bs_leader->nparticipanttuplesorts, + &state->bs_reltuples, &state->bs_numtuples, + NULL, NULL); return state->bs_reltuples; } @@ -1802,17 +1672,6 @@ _gin_parallel_merge(GinBuildState *state) return reltuples; } -/* - * Returns size of shared memory required to store state for a parallel - * gin index build based on the snapshot its parallel scan will use. - */ -static Size -_gin_parallel_estimate_shared(Relation heap, Snapshot snapshot) -{ - /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ - return add_size(BUFFERALIGN(sizeof(GinBuildShared)), - table_parallelscan_estimate(heap, snapshot)); -} /* * Within leader, participate as a parallel worker. @@ -2049,7 +1908,7 @@ _gin_parallel_scan_and_build(GinBuildState *state, state->work_mem = (sortmem / 2); /* remember how many workers participate in the build */ - state->bs_num_workers = ginshared->scantuplesortstates; + state->bs_num_workers = ginshared->base.scantuplesortstates; /* Begin "partial" tuplesort */ state->bs_sortstate = tuplesort_begin_index_gin(heap, index, @@ -2065,7 +1924,7 @@ _gin_parallel_scan_and_build(GinBuildState *state, /* Join parallel scan */ indexInfo = BuildIndexInfo(index); - indexInfo->ii_Concurrent = ginshared->isconcurrent; + indexInfo->ii_Concurrent = ginshared->base.isconcurrent; scan = table_beginscan_parallel(heap, ParallelTableScanFromGinBuildShared(ginshared), @@ -2089,17 +1948,9 @@ _gin_parallel_scan_and_build(GinBuildState *state, state->bs_reltuples += reltuples; - /* - * Done. Record ambuild statistics. - */ - SpinLockAcquire(&ginshared->mutex); - ginshared->nparticipantsdone++; - ginshared->reltuples += state->bs_reltuples; - ginshared->indtuples += state->bs_numtuples; - SpinLockRelease(&ginshared->mutex); - - /* Notify leader */ - ConditionVariableSignal(&ginshared->workersdonecv); + /* Done. Record ambuild statistics, and notify leader. */ + ParallelIndexBuildReportScanDone(&ginshared->base, state->bs_reltuples, + state->bs_numtuples, false, false); tuplesort_end(state->bs_sortstate); } @@ -2110,7 +1961,6 @@ _gin_parallel_scan_and_build(GinBuildState *state, void _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) { - char *sharedquery; GinBuildShared *ginshared; Sharedsort *sharedsort; GinBuildState buildstate; @@ -2118,8 +1968,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *worker_instr; int sortmem; /* @@ -2129,31 +1978,15 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Assert((MyProc->statusFlags == 0) || (MyProc->statusFlags == PROC_IN_SAFE_IC)); - /* Set debug_query_string for individual workers first */ - sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - - /* Report the query string from leader */ - pgstat_report_activity(STATE_RUNNING, debug_query_string); + /* Set debug_query_string and report the query string from leader */ + RestoreParallelQueryText(toc, PARALLEL_KEY_QUERY_TEXT); /* Look up gin shared state */ ginshared = shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false); - /* Open relations using lock modes known to be obtained by index.c */ - if (!ginshared->isconcurrent) - { - heapLockmode = ShareLock; - indexLockmode = AccessExclusiveLock; - } - else - { - heapLockmode = ShareUpdateExclusiveLock; - indexLockmode = RowExclusiveLock; - } - - /* Open relations within worker */ - heapRel = table_open(ginshared->heaprelid, heapLockmode); - indexRel = index_open(ginshared->indexrelid, indexLockmode); + /* Open relations within worker, using the leader's lock modes */ + ParallelIndexBuildOpenRelations(&ginshared->base, &heapRel, &indexRel, + &heapLockmode, &indexLockmode); /* initialize the GIN build state */ initGinState(&buildstate.ginstate, indexRel); @@ -2193,19 +2026,17 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) * (when requested number of workers were not launched, this will be * somewhat higher than it is for other workers). */ - sortmem = maintenance_work_mem / ginshared->scantuplesortstates; + sortmem = maintenance_work_mem / ginshared->base.scantuplesortstates; _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort, heapRel, indexRel, sortmem, false); /* Report WAL/buffer usage during parallel execution */ - bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], - &walusage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); - index_close(indexRel, indexLockmode); - table_close(heapRel, heapLockmode); + ParallelIndexBuildCloseRelations(heapRel, indexRel, heapLockmode, + indexLockmode); } /* diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 756dfa3dcf47e..c953805576ce3 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -42,6 +42,7 @@ #include "access/nbtree.h" #include "access/parallel.h" +#include "access/parallel_index_build.h" #include "access/relscan.h" #include "access/table.h" #include "access/tableam.h" @@ -66,8 +67,7 @@ #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002) #define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xA000000000000005) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xA000000000000006) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xA000000000000005) /* * DISABLE_LEADER_PARTICIPATION disables the leader's participation in @@ -96,58 +96,12 @@ typedef struct BTSpool */ typedef struct BTShared { - /* - * These fields are not modified during the sort. They primarily exist - * for the benefit of worker processes that need to create BTSpool state - * corresponding to that used by the leader. - */ - Oid heaprelid; - Oid indexrelid; + /* Common parallel index build state (must be first) */ + ParallelIndexBuildShared base; + + /* nbtree-specific immutable state, not modified during the sort */ bool isunique; bool nulls_not_distinct; - bool isconcurrent; - int scantuplesortstates; - - /* Query ID, for report in worker processes */ - int64 queryid; - - /* - * workersdonecv is used to monitor the progress of workers. All parallel - * participants must indicate that they are done before leader can use - * mutable state that workers maintain during scan (and before leader can - * proceed to tuplesort_performsort()). - */ - ConditionVariable workersdonecv; - - /* - * mutex protects all fields before heapdesc. - * - * These fields contain status information of interest to B-Tree index - * builds that must work just the same when an index is built in parallel. - */ - slock_t mutex; - - /* - * Mutable state that is maintained by workers, and reported back to - * leader at end of parallel scan. - * - * nparticipantsdone is number of worker processes finished. - * - * reltuples is the total number of input heap tuples. - * - * havedead indicates if RECENTLY_DEAD tuples were encountered during - * build. - * - * indtuples is the total number of tuples that made it into the index. - * - * brokenhotchain indicates if any worker detected a broken HOT chain - * during build. - */ - int nparticipantsdone; - double reltuples; - bool havedead; - double indtuples; - bool brokenhotchain; /* * ParallelTableScanDescData data follows. Can't directly embed here, as @@ -195,8 +149,7 @@ typedef struct BTLeader Sharedsort *sharedsort; Sharedsort *sharedsort2; Snapshot snapshot; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; } BTLeader; /* @@ -282,7 +235,6 @@ static void _bt_load(BTWriteState *wstate, static void _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request); static void _bt_end_parallel(BTLeader *btleader); -static Size _bt_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _bt_parallel_heapscan(BTBuildState *buildstate, bool *brokenhotchain); static void _bt_leader_participate_as_worker(BTBuildState *buildstate); @@ -1408,10 +1360,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) Sharedsort *sharedsort2; BTSpool *btspool = buildstate->spool; BTLeader *btleader = palloc0_object(BTLeader); - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; bool leaderparticipates = true; - int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; @@ -1421,30 +1371,19 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) * Enter parallel mode, and create context for parallel build of btree * index */ - EnterParallelMode(); - Assert(request > 0); - pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main", - request); + pcxt = ParallelIndexBuildCreateContext("_bt_parallel_build_main", request); scantuplesortstates = leaderparticipates ? request + 1 : request; - /* - * Prepare for scan of the base relation. In a normal index build, we use - * SnapshotAny because we must retrieve all tuples and do our own time - * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. - */ - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + /* Prepare for scan of the base relation. */ + snapshot = ParallelIndexBuildGetSnapshot(isconcurrent); /* * Estimate size for our own PARALLEL_KEY_BTREE_SHARED workspace, and * PARALLEL_KEY_TUPLESORT tuplesort workspace */ - estbtshared = _bt_parallel_estimate_shared(btspool->heap, snapshot); + estbtshared = ParallelIndexBuildEstimateShared(btspool->heap, snapshot, + sizeof(BTShared)); shm_toc_estimate_chunk(&pcxt->estimator, estbtshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -1461,30 +1400,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) shm_toc_estimate_keys(&pcxt->estimator, 3); } - /* - * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE - * and PARALLEL_KEY_BUFFER_USAGE. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgWalUsage or - * pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); /* Everyone's had a chance to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); @@ -1501,25 +1421,14 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) /* Store shared build state, for which we reserved space */ btshared = (BTShared *) shm_toc_allocate(pcxt->toc, estbtshared); - /* Initialize immutable state */ - btshared->heaprelid = RelationGetRelid(btspool->heap); - btshared->indexrelid = RelationGetRelid(btspool->index); + /* Initialize nbtree-specific immutable state */ btshared->isunique = btspool->isunique; btshared->nulls_not_distinct = btspool->nulls_not_distinct; - btshared->isconcurrent = isconcurrent; - btshared->scantuplesortstates = scantuplesortstates; - btshared->queryid = pgstat_get_my_query_id(); - ConditionVariableInit(&btshared->workersdonecv); - SpinLockInit(&btshared->mutex); - /* Initialize mutable state */ - btshared->nparticipantsdone = 0; - btshared->reltuples = 0.0; - btshared->havedead = false; - btshared->indtuples = 0.0; - btshared->brokenhotchain = false; - table_parallelscan_initialize(btspool->heap, - ParallelTableScanFromBTShared(btshared), - snapshot); + /* Initialize common state, and the parallel scan that follows the struct */ + ParallelIndexBuildInitShared(&btshared->base, btspool->heap, btspool->index, + isconcurrent, scantuplesortstates, + ParallelTableScanFromBTShared(btshared), + snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1550,25 +1459,10 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) } /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_KEY_QUERY_TEXT); - /* - * Allocate space for each worker's WalUsage and BufferUsage; no need to - * initialize. - */ - walusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); - bufferusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + /* Allocate space for each worker's Instrumentation. */ + instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -1580,8 +1474,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) btleader->sharedsort = sharedsort; btleader->sharedsort2 = sharedsort2; btleader->snapshot = snapshot; - btleader->walusage = walusage; - btleader->bufferusage = bufferusage; + btleader->instr = instr; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -1610,35 +1503,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) static void _bt_end_parallel(BTLeader *btleader) { - int i; - - /* Shutdown worker processes */ - WaitForParallelWorkersToFinish(btleader->pcxt); - - /* - * Next, accumulate WAL usage. (This must wait for the workers to finish, - * or we might get incomplete data.) - */ - for (i = 0; i < btleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]); - - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(btleader->snapshot)) - UnregisterSnapshot(btleader->snapshot); - DestroyParallelContext(btleader->pcxt); - ExitParallelMode(); -} - -/* - * Returns size of shared memory required to store state for a parallel - * btree index build based on the snapshot its parallel scan will use. - */ -static Size -_bt_parallel_estimate_shared(Relation heap, Snapshot snapshot) -{ - /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ - return add_size(BUFFERALIGN(sizeof(BTShared)), - table_parallelscan_estimate(heap, snapshot)); + ParallelIndexBuildEnd(btleader->pcxt, btleader->instr, btleader->snapshot); } /* @@ -1657,29 +1522,13 @@ static double _bt_parallel_heapscan(BTBuildState *buildstate, bool *brokenhotchain) { BTShared *btshared = buildstate->btleader->btshared; - int nparticipanttuplesorts; double reltuples; - nparticipanttuplesorts = buildstate->btleader->nparticipanttuplesorts; - for (;;) - { - SpinLockAcquire(&btshared->mutex); - if (btshared->nparticipantsdone == nparticipanttuplesorts) - { - buildstate->havedead = btshared->havedead; - buildstate->indtuples = btshared->indtuples; - *brokenhotchain = btshared->brokenhotchain; - reltuples = btshared->reltuples; - SpinLockRelease(&btshared->mutex); - break; - } - SpinLockRelease(&btshared->mutex); - - ConditionVariableSleep(&btshared->workersdonecv, - WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); - } - - ConditionVariableCancelSleep(); + /* Wait for all participants to finish, and collect their results */ + ParallelIndexBuildWaitForWorkers(&btshared->base, + buildstate->btleader->nparticipanttuplesorts, + &reltuples, &buildstate->indtuples, + &buildstate->havedead, brokenhotchain); return reltuples; } @@ -1743,7 +1592,6 @@ _bt_leader_participate_as_worker(BTBuildState *buildstate) void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) { - char *sharedquery; BTSpool *btspool; BTSpool *btspool2; BTShared *btshared; @@ -1753,8 +1601,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *worker_instr; int sortmem; #ifdef BTREE_BUILD_STATS @@ -1769,34 +1616,15 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) Assert((MyProc->statusFlags == 0) || (MyProc->statusFlags == PROC_IN_SAFE_IC)); - /* Set debug_query_string for individual workers first */ - sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - - /* Report the query string from leader */ - pgstat_report_activity(STATE_RUNNING, debug_query_string); + /* Set debug_query_string and report the query string from leader */ + RestoreParallelQueryText(toc, PARALLEL_KEY_QUERY_TEXT); /* Look up nbtree shared state */ btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false); - /* Open relations using lock modes known to be obtained by index.c */ - if (!btshared->isconcurrent) - { - heapLockmode = ShareLock; - indexLockmode = AccessExclusiveLock; - } - else - { - heapLockmode = ShareUpdateExclusiveLock; - indexLockmode = RowExclusiveLock; - } - - /* Track query ID */ - pgstat_report_query_id(btshared->queryid, false); - - /* Open relations within worker */ - heapRel = table_open(btshared->heaprelid, heapLockmode); - indexRel = index_open(btshared->indexrelid, indexLockmode); + /* Open relations within worker, using the leader's lock modes */ + ParallelIndexBuildOpenRelations(&btshared->base, &heapRel, &indexRel, + &heapLockmode, &indexLockmode); /* Initialize worker's own spool */ btspool = palloc0_object(BTSpool); @@ -1831,15 +1659,13 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) InstrStartParallelQuery(); /* Perform sorting of spool, and possibly a spool2 */ - sortmem = maintenance_work_mem / btshared->scantuplesortstates; + sortmem = maintenance_work_mem / btshared->base.scantuplesortstates; _bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort, sharedsort2, sortmem, false); /* Report WAL/buffer usage during parallel execution */ - bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], - &walusage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); #ifdef BTREE_BUILD_STATS if (log_btree_build_stats) @@ -1849,8 +1675,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) } #endif /* BTREE_BUILD_STATS */ - index_close(indexRel, indexLockmode); - table_close(heapRel, heapLockmode); + ParallelIndexBuildCloseRelations(heapRel, indexRel, heapLockmode, + indexLockmode); } /* @@ -1926,7 +1752,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, /* Join parallel scan */ indexInfo = BuildIndexInfo(btspool->index); - indexInfo->ii_Concurrent = btshared->isconcurrent; + indexInfo->ii_Concurrent = btshared->base.isconcurrent; scan = table_beginscan_parallel(btspool->heap, ParallelTableScanFromBTShared(btshared), SO_NONE); @@ -1948,21 +1774,12 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, } /* - * Done. Record ambuild statistics, and whether we encountered a broken - * HOT chain. + * Done. Record ambuild statistics (including whether we encountered a + * broken HOT chain), and notify the leader. */ - SpinLockAcquire(&btshared->mutex); - btshared->nparticipantsdone++; - btshared->reltuples += reltuples; - if (buildstate.havedead) - btshared->havedead = true; - btshared->indtuples += buildstate.indtuples; - if (indexInfo->ii_BrokenHotChain) - btshared->brokenhotchain = true; - SpinLockRelease(&btshared->mutex); - - /* Notify leader */ - ConditionVariableSignal(&btshared->workersdonecv); + ParallelIndexBuildReportScanDone(&btshared->base, reltuples, + buildstate.indtuples, buildstate.havedead, + indexInfo->ii_BrokenHotChain); /* We can end tuplesorts immediately */ tuplesort_end(btspool->sortstate); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 89e9d224eec7d..f55cfa6853fc6 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -28,6 +28,7 @@ #include "commands/async.h" #include "commands/vacuum.h" #include "executor/execParallel.h" +#include "executor/instrument.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqmq.h" @@ -1026,6 +1027,79 @@ DestroyParallelContext(ParallelContext *pcxt) pfree(pcxt); } +/* + * Helpers for passing the current query text down to parallel workers. + */ + +/* Reserve DSM space for the query text, if any. */ +void +EstimateParallelQueryText(ParallelContext *pcxt) +{ + if (debug_query_string) + { + shm_toc_estimate_chunk(&pcxt->estimator, strlen(debug_query_string) + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* Copy the query text into DSM under the given key, if any. */ +void +StoreParallelQueryText(ParallelContext *pcxt, uint64 key) +{ + if (debug_query_string) + { + Size querylen = strlen(debug_query_string); + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + shm_toc_insert(pcxt->toc, key, sharedquery); + } +} + +/* + * Restore the query text in a worker: set debug_query_string and report it as + * the current activity. The key is looked up with missing_ok, so this is a + * no-op (leaving debug_query_string NULL) when the leader stored no text. + */ +void +RestoreParallelQueryText(shm_toc *toc, uint64 key) +{ + debug_query_string = shm_toc_lookup(toc, key, true); + pgstat_report_activity(STATE_RUNNING, debug_query_string); +} + +/* + * Helpers for managing the per-worker Instrumentation array that parallel + * leaders allocate in DSM. The worker side fills in its own slot directly via + * InstrEndParallelQuery. + */ + +/* Reserve DSM space for the per-worker Instrumentation array. */ +void +EstimateParallelInstrumentation(ParallelContext *pcxt) +{ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * Allocate the per-worker Instrumentation array in DSM and publish it under + * the given key. No need to initialize; each worker fills in its own slot. + * Returns the array for the leader's convenience. + */ +Instrumentation * +StoreParallelInstrumentation(ParallelContext *pcxt, uint64 key) +{ + Instrumentation *instr; + + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, key, instr); + return instr; +} + /* * Are there any parallel contexts currently active? */ diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 41cefcfde54fe..f0726752f9db1 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -56,9 +56,8 @@ */ #define PARALLEL_VACUUM_KEY_SHARED 1 #define PARALLEL_VACUUM_KEY_QUERY_TEXT 2 -#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3 -#define PARALLEL_VACUUM_KEY_WAL_USAGE 4 -#define PARALLEL_VACUUM_KEY_INDEX_STATS 5 +#define PARALLEL_VACUUM_KEY_INSTRUMENTATION 3 +#define PARALLEL_VACUUM_KEY_INDEX_STATS 4 /* * Struct for cost-based vacuum delay related parameters to share among an @@ -236,11 +235,8 @@ struct ParallelVacuumState /* Shared dead items space among parallel vacuum workers */ TidStore *dead_items; - /* Points to buffer usage area in DSM */ - BufferUsage *buffer_usage; - - /* Points to WAL usage area in DSM */ - WalUsage *wal_usage; + /* Points to instrumentation area in DSM */ + Instrumentation *instr; /* * False if the index is totally unsuitable target for all parallel @@ -311,14 +307,11 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PVShared *shared; TidStore *dead_items; PVIndStats *indstats; - BufferUsage *buffer_usage; - WalUsage *wal_usage; bool *will_parallel_vacuum; Size est_indstats_len; Size est_shared_len; int nindexes_mwm = 0; int parallel_workers = 0; - int querylen; /* * A parallel vacuum must be requested and there must be indexes on the @@ -364,30 +357,11 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len); shm_toc_estimate_keys(&pcxt->estimator, 1); - /* - * Estimate space for BufferUsage and WalUsage -- - * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgBufferUsage or - * pgWalUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); InitializeParallelDSM(pcxt); @@ -473,30 +447,12 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); pvs->shared = shared; - /* - * Allocate space for each worker's BufferUsage and WalUsage; no need to - * initialize - */ - buffer_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); - pvs->buffer_usage = buffer_usage; - wal_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); - pvs->wal_usage = wal_usage; + /* Allocate space for each worker's Instrumentation. */ + pvs->instr = StoreParallelInstrumentation(pcxt, + PARALLEL_VACUUM_KEY_INSTRUMENTATION); /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - sharedquery[querylen] = '\0'; - shm_toc_insert(pcxt->toc, - PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_VACUUM_KEY_QUERY_TEXT); /* Success -- return parallel vacuum state */ return pvs; @@ -944,8 +900,8 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan /* Wait for all vacuum workers to finish */ WaitForParallelWorkersToFinish(pvs->pcxt); - for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); + InstrAccumParallelQuery(pvs->instr, + pvs->pcxt->nworkers_launched); } /* @@ -1202,10 +1158,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) PVIndStats *indstats; PVShared *shared; TidStore *dead_items; - BufferUsage *buffer_usage; - WalUsage *wal_usage; + Instrumentation *worker_instr; int nindexes; - char *sharedquery; ErrorContextCallback errcallback; /* @@ -1219,9 +1173,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false); /* Set debug_query_string for individual workers */ - sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - pgstat_report_activity(STATE_RUNNING, debug_query_string); + RestoreParallelQueryText(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT); /* Track query ID */ pgstat_report_query_id(shared->queryid, false); @@ -1311,10 +1263,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) parallel_vacuum_process_safe_indexes(&pvs); /* Report buffer/WAL usage during parallel execution */ - buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); - wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); /* Report any remaining cost-based vacuum delay time */ if (track_cost_delay_timing) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 81b87d82fab47..4f202f544b332 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -60,13 +60,12 @@ #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001) #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002) #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000004) #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005) -#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006) +#define PARALLEL_KEY_NODE_INSTRUMENTATION UINT64CONST(0xE000000000000006) #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -662,8 +661,6 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_data; char *pstmt_space; char *paramlistinfo_space; - BufferUsage *bufusage_space; - WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; @@ -726,23 +723,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); shm_toc_estimate_keys(&pcxt->estimator, 1); - /* - * Estimate space for BufferUsage. - * - * If EXPLAIN is not in use and there are no extensions loaded that care, - * we could skip this. But we have no way of knowing whether anyone's - * looking at pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* - * Same thing for WalUsage. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Estimate space for tuple queues. */ shm_toc_estimate_chunk(&pcxt->estimator, @@ -827,17 +809,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space); - /* Allocate space for each worker's BufferUsage; no need to initialize. */ - bufusage_space = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); - pei->buffer_usage = bufusage_space; - - /* Same for WalUsage. */ - walusage_space = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space); - pei->wal_usage = walusage_space; + /* Allocate space for each worker's Instrumentation. */ + pei->instrumentation = StoreParallelInstrumentation(pcxt, + PARALLEL_KEY_INSTRUMENTATION); /* Set up the tuple queues that the workers will write into. */ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); @@ -863,9 +837,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, instrument = GetInstrumentationArray(instrumentation); for (i = 0; i < nworkers * e.nnodes; ++i) InstrInitNode(&instrument[i], estate->es_instrument, false); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, + shm_toc_insert(pcxt->toc, PARALLEL_KEY_NODE_INSTRUMENTATION, instrumentation); - pei->instrumentation = instrumentation; + pei->node_instrumentation = instrumentation; if (estate->es_jit_flags != PGJIT_NONE) { @@ -1258,8 +1232,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei) * Next, accumulate buffer/WAL usage. (This must wait for the workers to * finish, or we might get incomplete data.) */ - for (i = 0; i < nworkers; i++) - InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); + InstrAccumParallelQuery(pei->instrumentation, nworkers); pei->finished = true; } @@ -1273,10 +1246,10 @@ ExecParallelFinish(ParallelExecutorInfo *pei) void ExecParallelCleanup(ParallelExecutorInfo *pei) { - /* Accumulate instrumentation, if any. */ - if (pei->instrumentation) + /* Accumulate node instrumentation, if any. */ + if (pei->node_instrumentation) ExecParallelRetrieveInstrumentation(pei->planstate, - pei->instrumentation); + pei->node_instrumentation); /* Accumulate JIT instrumentation, if any. */ if (pei->jit_instrumentation) @@ -1514,8 +1487,6 @@ void ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { FixedParallelExecutorState *fpes; - BufferUsage *buffer_usage; - WalUsage *wal_usage; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1530,7 +1501,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ receiver = ExecParallelGetReceiver(seg, toc); - instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); + instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_NODE_INSTRUMENTATION, true); if (instrumentation != NULL) instrument_options = instrumentation->instrument_options; jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION, @@ -1588,10 +1559,12 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecutorFinish(queryDesc); /* Report buffer/WAL usage during parallel execution. */ - buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); + { + Instrumentation *worker_instr; + + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); + } /* Report instrumentation data if any instrumentation options are set. */ if (instrumentation != NULL) diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index ffbcd57213396..20431e64fb4de 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -284,20 +284,29 @@ InstrStartParallelQuery(void) /* report usage after parallel executor shutdown */ void -InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage) +InstrEndParallelQuery(Instrumentation *instr) { - memset(bufusage, 0, sizeof(BufferUsage)); - BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage); - memset(walusage, 0, sizeof(WalUsage)); - WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage); + memset(&instr->bufusage, 0, sizeof(BufferUsage)); + BufferUsageAccumDiff(&instr->bufusage, &pgBufferUsage, &save_pgBufferUsage); + memset(&instr->walusage, 0, sizeof(WalUsage)); + WalUsageAccumDiff(&instr->walusage, &pgWalUsage, &save_pgWalUsage); } -/* accumulate work done by workers in leader's stats */ +/* + * Accumulate work done by parallel workers in the leader's stats. + * + * instr points to the per-worker Instrumentation array the leader allocated in + * DSM; each of the nworkers launched workers reported into its own slot via + * InstrEndParallelQuery. Must be called only after the workers have finished. + */ void -InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage) +InstrAccumParallelQuery(Instrumentation *instr, int nworkers) { - BufferUsageAdd(&pgBufferUsage, bufusage); - WalUsageAdd(&pgWalUsage, walusage); + for (int i = 0; i < nworkers; i++) + { + BufferUsageAdd(&pgBufferUsage, &instr[i].bufusage); + WalUsageAdd(&pgWalUsage, &instr[i].walusage); + } } /* dst += add */ diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 60f857675e05a..20e7e7673b902 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -72,6 +72,13 @@ extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); +extern void EstimateParallelQueryText(ParallelContext *pcxt); +extern void StoreParallelQueryText(ParallelContext *pcxt, uint64 key); +extern void RestoreParallelQueryText(shm_toc *toc, uint64 key); +extern void EstimateParallelInstrumentation(ParallelContext *pcxt); +extern struct Instrumentation *StoreParallelInstrumentation(ParallelContext *pcxt, + uint64 key); + extern void HandleParallelMessageInterrupt(void); extern void ProcessParallelMessages(void); extern void AtEOXact_Parallel(bool isCommit); diff --git a/src/include/access/parallel_index_build.h b/src/include/access/parallel_index_build.h new file mode 100644 index 0000000000000..f6d311218a4c3 --- /dev/null +++ b/src/include/access/parallel_index_build.h @@ -0,0 +1,119 @@ +/*------------------------------------------------------------------------- + * + * parallel_index_build.h + * Shared infrastructure for parallel index builds. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/parallel_index_build.h + * + *------------------------------------------------------------------------- + */ +#ifndef PARALLEL_INDEX_BUILD_H +#define PARALLEL_INDEX_BUILD_H + +#include "access/parallel.h" +#include "access/relscan.h" +#include "storage/condition_variable.h" +#include "storage/lockdefs.h" +#include "storage/spin.h" +#include "utils/relcache.h" +#include "utils/snapshot.h" + +struct Instrumentation; + +/* + * Shared state common to all parallel index builds. + * + * Access methods embed this as the first member of their own shared state + * and append any AM-specific fields, followed by the ParallelTableScanDescData + * for the heap scan. The immutable fields are set once by the leader before + * workers are launched; the mutable fields are maintained by the workers + * under mutex and read back by the leader once all workers have signalled + * completion. + */ +typedef struct ParallelIndexBuildShared +{ + /* Immutable state, set by the leader before launching workers */ + Oid heaprelid; + Oid indexrelid; + bool isconcurrent; + int scantuplesortstates; + + /* Query ID, for report in worker processes */ + int64 queryid; + + /* + * workersdonecv is used to monitor the progress of workers. All parallel + * participants must indicate that they are done before the leader can use + * the results built by the workers. + */ + ConditionVariable workersdonecv; + + /* + * mutex protects the mutable fields below. + */ + slock_t mutex; + + /* + * Mutable state that is maintained by workers for all index types, and + * reported back to leader at end of the scans. + * + * nparticipantsdone is the number of worker processes finished. + * + * reltuples is the total number of input heap tuples. + * + * indtuples is the total number of tuples that made it into the index. + */ + int nparticipantsdone; + double reltuples; + double indtuples; + + /* + * Mutable state that is maintained for exact index AMs (e.g. nbtree), and + * unused for lossy AMs. + * + * havedead indicates if RECENTLY_DEAD tuples were encountered during + * build. + * + * brokenhotchain indicates if any worker detected a broken HOT chain + * during build. + */ + bool havedead; + bool brokenhotchain; +} ParallelIndexBuildShared; + +/* Leader-side helpers */ +extern ParallelContext *ParallelIndexBuildCreateContext(const char *worker_function, + int nworkers); +extern Snapshot ParallelIndexBuildGetSnapshot(bool isconcurrent); +extern Size ParallelIndexBuildEstimateShared(Relation heap, Snapshot snapshot, + Size am_shared_size); +extern void ParallelIndexBuildInitShared(ParallelIndexBuildShared * shared, + Relation heap, Relation index, + bool isconcurrent, + int scantuplesortstates, + ParallelTableScanDesc pscan, + Snapshot snapshot); +extern void ParallelIndexBuildWaitForWorkers(ParallelIndexBuildShared * shared, + int nparticipants, + double *reltuples, double *indtuples, + bool *havedead, bool *brokenhotchain); +extern void ParallelIndexBuildEnd(ParallelContext *pcxt, + struct Instrumentation *instr, + Snapshot snapshot); + +/* Worker-side helpers */ +extern void ParallelIndexBuildReportScanDone(ParallelIndexBuildShared * shared, + double reltuples, double indtuples, + bool havedead, bool brokenhotchain); +extern void ParallelIndexBuildOpenRelations(ParallelIndexBuildShared * shared, + Relation *heapRel, Relation *indexRel, + LOCKMODE *heapLockmode, + LOCKMODE *indexLockmode); +extern void ParallelIndexBuildCloseRelations(Relation heapRel, Relation indexRel, + LOCKMODE heapLockmode, + LOCKMODE indexLockmode); + +#endif /* PARALLEL_INDEX_BUILD_H */ diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 5a2034811d563..6c8b602d07f98 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -25,9 +25,8 @@ typedef struct ParallelExecutorInfo { PlanState *planstate; /* plan subtree we're running in parallel */ ParallelContext *pcxt; /* parallel context we're using */ - BufferUsage *buffer_usage; /* points to bufusage area in DSM */ - WalUsage *wal_usage; /* walusage area in DSM */ - SharedExecutorInstrumentation *instrumentation; /* optional */ + Instrumentation *instrumentation; /* instrumentation area in DSM */ + SharedExecutorInstrumentation *node_instrumentation; /* optional */ struct SharedJitInstrumentation *jit_instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ dsa_pointer param_exec; /* serialized PARAM_EXEC parameters */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index f093a52aae013..8fab0e17fd0fb 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -148,8 +148,8 @@ extern void InstrStartTrigger(TriggerInstrumentation *tginstr); extern void InstrStopTrigger(TriggerInstrumentation *tginstr, int64 firings); extern void InstrStartParallelQuery(void); -extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage); -extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage); +extern void InstrEndParallelQuery(Instrumentation *instr); +extern void InstrAccumParallelQuery(Instrumentation *instr, int nworkers); extern void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add, const BufferUsage *sub); extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,