diff --git a/compiler/rustc_data_structures/src/cache_entry.rs b/compiler/rustc_data_structures/src/cache_entry.rs new file mode 100644 index 0000000000000..34e69bc2669f6 --- /dev/null +++ b/compiler/rustc_data_structures/src/cache_entry.rs @@ -0,0 +1,325 @@ +use std::cell::UnsafeCell; +use std::mem::{ManuallyDrop, MaybeUninit, needs_drop}; +use std::sync::atomic::{self, AtomicU32, Ordering}; + +use rustc_thread_pool::current_num_threads; + +use crate::sync::{DynSend, DynSync, Parker, Unparker}; + +pub enum GetOrStartError<'a, V, I> { + InProgress(EntryInProgress<'a, V>), + Interrupted(I), +} + +pub enum GetError { + InProgress, + Interrupted(I), +} + +pub struct Status(AtomicU32); + +impl Status { + const EMPTY: u32 = 0; + const POISONED: u32 = 1; + + // FIXME: consider using lower bits instead of high ones and swap bits' meaning with its + // opposite to optimize immediate values on RISC architectures. + + // If set then lower status bits should represent which worker threads are waiting on this query + const IN_PROGRESS_BIT: u32 = 1 << (u32::BITS - 1); + // If set then lower status bits should represent associated DepNodeIndex + const NOT_IN_PROGRESS_COMPLETE_BIT: u32 = 1 << (u32::BITS - 2); + + const NOT_IN_PROGRESS_COMPLETE_INDEX_MASK: u32 = + !(Self::IN_PROGRESS_BIT | Self::NOT_IN_PROGRESS_COMPLETE_BIT); + const IN_PROGRESS_THREAD_INDEX_MASK: u32 = !Self::IN_PROGRESS_BIT; + + const fn complete(index: u32) -> u32 { + debug_assert!(index & !Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK == 0); + index | Status::NOT_IN_PROGRESS_COMPLETE_BIT + } + + pub fn waiter_threads(&self) -> u32 { + let status = self.0.load(atomic::Ordering::Relaxed); + assert!(status & Self::IN_PROGRESS_BIT != 0); + status & Self::IN_PROGRESS_THREAD_INDEX_MASK + } + + pub fn remove_waiter_threads(&self, thread_mask: u32) { + assert!( + thread_mask & Self::IN_PROGRESS_BIT == 0, + "{} {}", + thread_mask, + current_num_threads() + ); + let mut status = self.0.load(atomic::Ordering::Relaxed); + loop { + assert!(status & Self::IN_PROGRESS_BIT != 0); + assert!(status & thread_mask == thread_mask); + let res = self.0.compare_exchange_weak( + status, + status & !thread_mask, + atomic::Ordering::Relaxed, + atomic::Ordering::Relaxed, + ); + if let Err(new_status) = res { + status = new_status; + } else { + break; + } + } + } +} + +const _: () = { + if Status::IN_PROGRESS_THREAD_INDEX_MASK.count_ones() as usize + != rustc_thread_pool::max_num_threads() + { + panic!(); + } +}; + +pub struct CacheEntry { + status: Status, + value: UnsafeCell>, +} + +unsafe impl Sync for CacheEntry {} +unsafe impl DynSync for CacheEntry {} + +impl Default for CacheEntry { + fn default() -> Self { + CacheEntry::empty() + } +} + +impl CacheEntry { + #[inline] + pub const fn empty() -> Self { + CacheEntry { + status: Status(AtomicU32::new(0)), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + #[inline] + pub const fn complete(index: u32, x: V) -> Self { + CacheEntry { + status: Status(AtomicU32::new(Status::complete(index))), + value: UnsafeCell::new(MaybeUninit::new(x)), + } + } + + pub fn status(&self) -> &Status { + &self.status + } + + #[inline] + pub fn get_or_start( + &self, + parker: P, + ) -> Result<(&V, u32), GetOrStartError<'_, V, P::Interrupt>> { + // Should this load be relaxed and use an Acquire fence if complete (or poisoned)? + let mut status = self.status.0.load(Ordering::Acquire); + loop { + if status & !Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK + == Status::NOT_IN_PROGRESS_COMPLETE_BIT + { + return Ok(unsafe { self.assume_complete(status) }); + } else if status == Status::EMPTY { + let res = self.status.0.compare_exchange_weak( + Status::EMPTY, + Status::IN_PROGRESS_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ); + match res { + Ok(_) => { + return Err(GetOrStartError::InProgress(EntryInProgress { entry: self })); + } + Err(new) => { + status = new; + continue; + } + } + } else { + return self.wait(parker).map_err(GetOrStartError::Interrupted); + } + } + } + + #[inline] + pub fn try_start(&self) -> Option> { + let mut status = self.status.0.load(Ordering::Acquire); + loop { + if status & !Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK + == Status::NOT_IN_PROGRESS_COMPLETE_BIT + { + return None; + } else if status == Status::EMPTY { + let res = self.status.0.compare_exchange_weak( + Status::EMPTY, + Status::IN_PROGRESS_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ); + match res { + Ok(_) => return Some(EntryInProgress { entry: self }), + Err(new) => { + status = new; + continue; + } + } + } else if status == Status::POISONED { + panic!("panic propagation") + } else { + return None; + } + } + } + + #[inline] + pub fn get(&self, parker: P) -> Result<(&V, u32), GetError> { + let status = self.status.0.load(Ordering::Acquire); + if status & !Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK + == Status::NOT_IN_PROGRESS_COMPLETE_BIT + { + Ok(unsafe { self.assume_complete(status) }) + } else if status == Status::EMPTY { + Err(GetError::InProgress) + } else { + self.wait(parker).map_err(GetError::Interrupted) + } + } + + #[inline] + pub fn get_finished(&self) -> Option<(&V, u32)> { + let status = self.status.0.load(Ordering::Acquire); + if status & !Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK + == Status::NOT_IN_PROGRESS_COMPLETE_BIT + { + Some(unsafe { self.assume_complete(status) }) + } else if status == Status::EMPTY { + None + } else { + Self::in_progress_or_poisoned_panic(status) + } + } + + #[cold] + fn in_progress_or_poisoned_panic(status: u32) -> ! { + if status == Status::POISONED { + panic!("panic propagation") + } else { + panic!("Entry is unexpectedly in progress") + } + } + + #[cold] + fn wait(&self, parker: P) -> Result<(&V, u32), P::Interrupt> { + // FIXME: try spinning, that's a good trick + let mut status = self.status.0.load(Ordering::Relaxed); + let mut did_park = false; + parker.park(|thread_index| { + let thread_bit = 1 << thread_index; + debug_assert_ne!(status, Status::EMPTY); + loop { + if status & !Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK + == Status::NOT_IN_PROGRESS_COMPLETE_BIT + { + break false; + } + if status == Status::POISONED { + break false; + } + // Empty status is ruled as this is the slow path + debug_assert!(status & Status::IN_PROGRESS_BIT != 0); + debug_assert!(status & thread_bit == 0); + let res = self.status.0.compare_exchange_weak( + status, + status | thread_bit, + Ordering::Relaxed, + Ordering::Relaxed, + ); + if let Err(new) = res { + status = new; + } else { + did_park = true; + break true; + } + } + })?; + let old_status = status; + if did_park { + status = self.status.0.load(Ordering::Acquire); + } else { + atomic::fence(Ordering::Acquire); + } + if status & !Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK + == Status::NOT_IN_PROGRESS_COMPLETE_BIT + { + unsafe { Ok(self.assume_complete(status)) } + } else { + debug_assert_eq!(status, Status::POISONED, "old status = {old_status:x}"); + panic!("Propagating panic") + } + } + + #[inline] + unsafe fn assume_complete(&self, status: u32) -> (&V, u32) { + debug_assert_eq!( + status & !Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK, + Status::NOT_IN_PROGRESS_COMPLETE_BIT + ); + ( + unsafe { (*self.value.get()).assume_init_ref() }, + status & Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK, + ) + } + + unsafe fn complete_unchecked(&self, value: V, index: u32, unparker: impl Unparker) -> &V { + debug_assert!(self.status.0.load(atomic::Ordering::Relaxed) & Status::IN_PROGRESS_BIT != 0); + let value = unsafe { (*self.value.get()).write(value) }; + let status = self.status.0.swap(Status::complete(index), Ordering::Release); + debug_assert!(status & Status::IN_PROGRESS_BIT != 0); + let waiters = status & !Status::IN_PROGRESS_BIT; + if waiters != 0 { + unparker.unpark(waiters); + } + value + } +} + +impl Drop for CacheEntry { + fn drop(&mut self) { + if needs_drop::() { + debug_assert!(*self.status.0.get_mut() & Status::IN_PROGRESS_BIT == 0); + if *self.status.0.get_mut() & !Status::NOT_IN_PROGRESS_COMPLETE_INDEX_MASK + == Status::NOT_IN_PROGRESS_COMPLETE_BIT + { + unsafe { self.value.get_mut().assume_init_drop() }; + } + } + } +} + +pub struct EntryInProgress<'a, V> { + entry: &'a CacheEntry, +} + +impl<'a, V> EntryInProgress<'a, V> { + pub fn entry(&self) -> &'a CacheEntry { + self.entry + } + + pub fn complete(self, value: V, index: u32, unparker: impl Unparker) -> &'a V { + let this = ManuallyDrop::new(self); + unsafe { this.entry.complete_unchecked(value, index, unparker) } + } +} + +impl<'a, V> Drop for EntryInProgress<'a, V> { + fn drop(&mut self) { + self.entry.status.0.store(Status::POISONED, Ordering::Release); + } +} diff --git a/compiler/rustc_data_structures/src/lib.rs b/compiler/rustc_data_structures/src/lib.rs index be8538acd30eb..d5424ea8d1be5 100644 --- a/compiler/rustc_data_structures/src/lib.rs +++ b/compiler/rustc_data_structures/src/lib.rs @@ -54,6 +54,7 @@ pub use {either, indexmap, smallvec, thin_vec}; pub mod aligned; pub mod base_n; pub mod binary_search_util; +pub mod cache_entry; pub mod fingerprint; pub mod flat_map_in_place; pub mod flock; diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs index 997077ac4402e..0aa2fd1d20a57 100644 --- a/compiler/rustc_data_structures/src/marker.rs +++ b/compiler/rustc_data_structures/src/marker.rs @@ -62,8 +62,8 @@ already_send!( [std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8] [std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr] [std::io::Error][std::fs::File][std::panic::Location<'_>][rustc_arena::DroplessArena] - [jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap] - [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] + [jobserver_crate::Client][jobserver_crate::HelperThread][rustc_thread_pool::Registry] + [crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] ); #[cfg(target_has_atomic = "64")] @@ -141,8 +141,8 @@ macro_rules! already_sync { already_sync!( [std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8] [std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File][std::panic::Location<'_>] - [jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap] - [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] + [jobserver_crate::Client][jobserver_crate::HelperThread][rustc_thread_pool::Registry] + [crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] ); // Use portable AtomicU64 for targets without native 64-bit atomics diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 3d5bc85278286..f3a5b296b18f7 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -41,6 +41,7 @@ pub use self::parallel::{ broadcast, par_fns, par_for_each_in, par_join, par_map, parallel_guard, spawn, try_par_for_each_in, }; +pub use self::parker::{Parker, Unparker}; pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; pub use self::worker_local::{Registry, WorkerLocal}; pub use crate::marker::*; @@ -48,6 +49,7 @@ pub use crate::marker::*; mod freeze; mod lock; mod parallel; +mod parker; mod vec; mod worker_local; diff --git a/compiler/rustc_data_structures/src/sync/parker.rs b/compiler/rustc_data_structures/src/sync/parker.rs new file mode 100644 index 0000000000000..631b38d27d65f --- /dev/null +++ b/compiler/rustc_data_structures/src/sync/parker.rs @@ -0,0 +1,9 @@ +pub trait Parker { + type Interrupt; + + fn park(self, validate: impl FnOnce(usize) -> bool) -> Result<(), Self::Interrupt>; +} + +pub trait Unparker: Copy { + fn unpark(self, thread_bitmask: u32); +} diff --git a/compiler/rustc_data_structures/src/vec_cache.rs b/compiler/rustc_data_structures/src/vec_cache.rs index 6d026bb2c7f74..e27b69fd7a94b 100644 --- a/compiler/rustc_data_structures/src/vec_cache.rs +++ b/compiler/rustc_data_structures/src/vec_cache.rs @@ -9,24 +9,17 @@ use std::fmt::{self, Debug}; use std::marker::PhantomData; use std::ops::{Index, IndexMut}; -use std::sync::atomic::{AtomicPtr, AtomicU32, AtomicUsize, Ordering}; +use std::ptr::{drop_in_place, slice_from_raw_parts_mut}; +use std::slice; +use std::sync::atomic::{AtomicPtr, Ordering}; use rustc_index::Idx; +use crate::cache_entry::CacheEntry; + #[cfg(test)] mod tests; -struct Slot { - // We never construct &Slot so it's fine for this to not be in an UnsafeCell. - value: V, - // This is both an index and a once-lock. - // - // 0: not yet initialized. - // 1: lock held, initializing. - // 2..u32::MAX - 2: initialized. - index_and_lock: AtomicU32, -} - /// This uniquely identifies a single `Slot` entry in the buckets map, and provides accessors for /// either getting the value or putting a value. #[derive(Copy, Clone, Debug)] @@ -68,50 +61,17 @@ impl SlotIndex { SlotIndex { bucket_idx, index_in_bucket } } - // SAFETY: Buckets must be managed solely by functions here (i.e., get/put on SlotIndex) and - // `self` comes from SlotIndex::from_index - #[inline] - unsafe fn get(&self, buckets: &[AtomicPtr>; 21]) -> Option<(V, u32)> { - let bucket = &buckets[self.bucket_idx]; - let ptr = bucket.load(Ordering::Acquire); - // Bucket is not yet initialized: then we obviously won't find this entry in that bucket. - if ptr.is_null() { - return None; - } - debug_assert!(self.index_in_bucket < self.bucket_idx.capacity()); - // SAFETY: `bucket` was allocated (so <= isize in total bytes) to hold `entries`, so this - // must be inbounds. - let slot = unsafe { ptr.add(self.index_in_bucket) }; - - // SAFETY: initialized bucket has zeroed all memory within the bucket, so we are valid for - // AtomicU32 access. - let index_and_lock = unsafe { &(*slot).index_and_lock }; - let current = index_and_lock.load(Ordering::Acquire); - let index = match current { - 0 => return None, - // Treat "initializing" as actually just not initialized at all. - // The only reason this is a separate state is that `complete` calls could race and - // we can't allow that, but from load perspective there's no difference. - 1 => return None, - _ => current - 2, - }; - - // SAFETY: - // * slot is a valid pointer (buckets are always valid for the index we get). - // * value is initialized since we saw a >= 2 index above. - // * `V: Copy`, so safe to read. - let value = unsafe { (*slot).value }; - Some((value, index)) - } - - fn bucket_ptr(&self, bucket: &AtomicPtr>) -> *mut Slot { + fn bucket_ptr(&self, bucket: &AtomicPtr>) -> *mut CacheEntry { let ptr = bucket.load(Ordering::Acquire); if ptr.is_null() { Self::initialize_bucket(bucket, self.bucket_idx) } else { ptr } } #[cold] #[inline(never)] - fn initialize_bucket(bucket: &AtomicPtr>, bucket_idx: BucketIndex) -> *mut Slot { + fn initialize_bucket( + bucket: &AtomicPtr>, + bucket_idx: BucketIndex, + ) -> *mut CacheEntry { static LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); // If we are initializing the bucket, then acquire a global lock. @@ -126,12 +86,13 @@ impl SlotIndex { // initialize this bucket. if ptr.is_null() { let bucket_layout = - std::alloc::Layout::array::>(bucket_idx.capacity()).unwrap(); + std::alloc::Layout::array::>(bucket_idx.capacity()).unwrap(); // This is more of a sanity check -- this code is very cold, so it's safe to pay a // little extra cost here. assert!(bucket_layout.size() > 0); // SAFETY: Just checked that size is non-zero. - let allocated = unsafe { std::alloc::alloc_zeroed(bucket_layout).cast::>() }; + let allocated = + unsafe { std::alloc::alloc_zeroed(bucket_layout).cast::>() }; if allocated.is_null() { std::alloc::handle_alloc_error(bucket_layout); } @@ -146,65 +107,18 @@ impl SlotIndex { /// Returns true if this successfully put into the map. #[inline] - fn put(&self, buckets: &[AtomicPtr>; 21], value: V, extra: u32) -> bool { + fn get_or_init<'a, V>(&self, buckets: &'a [AtomicPtr>; 21]) -> &'a CacheEntry { let bucket = &buckets[self.bucket_idx]; let ptr = self.bucket_ptr(bucket); debug_assert!(self.index_in_bucket < self.bucket_idx.capacity()); // SAFETY: `bucket` was allocated (so <= isize in total bytes) to hold `entries`, so this // must be inbounds. - let slot = unsafe { ptr.add(self.index_in_bucket) }; + let entry_ptr = unsafe { ptr.add(self.index_in_bucket) }; // SAFETY: initialized bucket has zeroed all memory within the bucket, so we are valid for - // AtomicU32 access. - let index_and_lock = unsafe { &(*slot).index_and_lock }; - match index_and_lock.compare_exchange(0, 1, Ordering::AcqRel, Ordering::Acquire) { - Ok(_) => { - // We have acquired the initialization lock. It is our job to write `value` and - // then set the lock to the real index. - - unsafe { - (&raw mut (*slot).value).write(value); - } - - index_and_lock.store(extra.checked_add(2).unwrap(), Ordering::Release); - - true - } - - // Treat "initializing" as the caller's fault. Callers are responsible for ensuring that - // there are no races on initialization. In the compiler's current usage for query - // caches, that's the "active query map" which ensures each query actually runs once - // (even if concurrently started). - Err(1) => panic!("caller raced calls to put()"), - - // This slot was already populated. Also ignore, currently this is the same as - // "initializing". - Err(_) => false, - } - } - - /// Inserts into the map, given that the slot is unique, so it won't race with other threads. - #[inline] - unsafe fn put_unique(&self, buckets: &[AtomicPtr>; 21], value: V, extra: u32) { - let bucket = &buckets[self.bucket_idx]; - let ptr = self.bucket_ptr(bucket); - - debug_assert!(self.index_in_bucket < self.bucket_idx.capacity()); - // SAFETY: `bucket` was allocated (so <= isize in total bytes) to hold `entries`, so this - // must be inbounds. - let slot = unsafe { ptr.add(self.index_in_bucket) }; - - // SAFETY: We known our slot is unique as a precondition of this function, so this can't race. - unsafe { - (&raw mut (*slot).value).write(value); - } - - // SAFETY: initialized bucket has zeroed all memory within the bucket, so we are valid for - // AtomicU32 access. - let index_and_lock = unsafe { &(*slot).index_and_lock }; - - index_and_lock.store(extra.checked_add(2).unwrap(), Ordering::Release); + // CacheEntry access. + unsafe { &*entry_ptr } } } @@ -231,54 +145,35 @@ pub struct VecCache { // Bucket 19: 1073741824 // Bucket 20: 2147483648 // The total number of entries if all buckets are initialized is 2^32. - buckets: [AtomicPtr>; BUCKETS], - - // In the compiler's current usage these are only *read* during incremental and self-profiling. - // They are an optimization over iterating the full buckets array. - present: [AtomicPtr>; BUCKETS], - len: AtomicUsize, + buckets: [AtomicPtr>; BUCKETS], key: PhantomData<(K, I)>, } impl Default for VecCache { fn default() -> Self { - VecCache { - buckets: Default::default(), - key: PhantomData, - len: Default::default(), - present: Default::default(), - } + VecCache { buckets: Default::default(), key: PhantomData } } } // SAFETY: No access to `V` is made. unsafe impl Drop for VecCache { fn drop(&mut self) { - // We have unique ownership, so no locks etc. are needed. Since `K` and `V` are both `Copy`, + // We have unique ownership, so no locks etc. are needed. Since `K` is `Copy`, // we are also guaranteed to just need to deallocate any large arrays (not iterate over // contents). - // - // Confirm no need to deallocate individual entries. Note that `V: Copy` is asserted on - // insert/lookup but not necessarily construction, primarily to avoid annoyingly propagating - // the bounds into struct definitions everywhere. assert!(!std::mem::needs_drop::()); - assert!(!std::mem::needs_drop::()); for (idx, bucket) in BucketIndex::enumerate_buckets(&self.buckets) { let bucket = bucket.load(Ordering::Acquire); if !bucket.is_null() { - let layout = std::alloc::Layout::array::>(ENTRIES_BY_BUCKET[idx]).unwrap(); - unsafe { - std::alloc::dealloc(bucket.cast(), layout); + if std::mem::needs_drop::() { + unsafe { + drop_in_place(slice_from_raw_parts_mut(bucket, ENTRIES_BY_BUCKET[idx])) + } } - } - } - - for (idx, bucket) in BucketIndex::enumerate_buckets(&self.present) { - let bucket = bucket.load(Ordering::Acquire); - if !bucket.is_null() { - let layout = std::alloc::Layout::array::>(ENTRIES_BY_BUCKET[idx]).unwrap(); + let layout = + std::alloc::Layout::array::>(ENTRIES_BY_BUCKET[idx]).unwrap(); unsafe { std::alloc::dealloc(bucket.cast(), layout); } @@ -290,55 +185,33 @@ unsafe impl Drop for VecCache { impl VecCache where K: Eq + Idx + Copy + Debug, - V: Copy, I: Idx + Copy, { - #[inline(always)] - pub fn lookup(&self, key: &K) -> Option<(V, I)> { - let key = u32::try_from(key.index()).unwrap(); - let slot_idx = SlotIndex::from_index(key); - match unsafe { slot_idx.get(&self.buckets) } { - Some((value, idx)) => Some((value, I::new(idx as usize))), - None => None, - } - } - #[inline] - pub fn complete(&self, key: K, value: V, index: I) { + pub fn lookup(&self, key: K) -> &CacheEntry { let key = u32::try_from(key.index()).unwrap(); let slot_idx = SlotIndex::from_index(key); - if slot_idx.put(&self.buckets, value, index.index() as u32) { - let present_idx = self.len.fetch_add(1, Ordering::Relaxed); - let slot = SlotIndex::from_index(u32::try_from(present_idx).unwrap()); - // SAFETY: We should always be uniquely putting due to `len` fetch_add returning unique values. - // We can't get here if `len` overflows because `put` will not succeed u32::MAX + 1 times - // as it will run out of slots. - unsafe { slot.put_unique(&self.present, (), key) }; - } + slot_idx.get_or_init(&self.buckets) } - pub fn for_each(&self, f: &mut dyn FnMut(&K, &V, I)) { - for idx in 0..self.len.load(Ordering::Acquire) { - let key = SlotIndex::from_index(idx as u32); - match unsafe { key.get(&self.present) } { - // This shouldn't happen in our current usage (iter is really only - // used long after queries are done running), but if we hit this in practice it's - // probably fine to just break early. - None => unreachable!(), - Some(((), key)) => { - let key = K::new(key as usize); - // unwrap() is OK: present entries are always written only after we put the real - // entry. - let value = self.lookup(&key).unwrap(); - f(&key, &value.0, value.1); + pub fn for_each(&self, mut f: impl FnMut(K, &V, I)) { + for (bucket_idx, bucket) in BucketIndex::enumerate_buckets(&self.buckets) { + let mut idx = + if let BucketIndex::Bucket00 = bucket_idx { 0 } else { bucket_idx.capacity() }; + let bucket = bucket.load(Ordering::Acquire); + if !bucket.is_null() { + let entries = + unsafe { slice::from_raw_parts(bucket, ENTRIES_BY_BUCKET[bucket_idx]) }; + for entry in entries { + if let Some((value, additional_index)) = entry.get_finished() { + let key = K::new(idx); + f(key, value, I::new(additional_index as usize)); + } + idx += 1; } } } } - - pub fn len(&self) -> usize { - self.len.load(Ordering::Acquire) - } } /// Index into an array of buckets. diff --git a/compiler/rustc_interface/src/passes.rs b/compiler/rustc_interface/src/passes.rs index bcd1a52ce9dcd..c653f3905fb1e 100644 --- a/compiler/rustc_interface/src/passes.rs +++ b/compiler/rustc_interface/src/passes.rs @@ -1,6 +1,7 @@ use std::any::Any; use std::ffi::{OsStr, OsString}; use std::io::{self, BufWriter, Write}; +use std::mem::transmute; use std::path::{Path, PathBuf}; use std::sync::{Arc, LazyLock, OnceLock}; use std::{env, fs, iter}; @@ -30,7 +31,7 @@ use rustc_lint::{BufferedEarlyLint, EarlyCheckNode, LintStore, unerased_lint_sto use rustc_metadata::EncodedMetadata; use rustc_metadata::creader::CStore; use rustc_middle::arena::Arena; -use rustc_middle::ty::{self, RegisteredTools, TyCtxt}; +use rustc_middle::ty::{self, GlobalCtxt, RegisteredTools, TyCtxt}; use rustc_middle::util::Providers; use rustc_parse::lexer::StripTokens; use rustc_parse::{new_parser_from_file, new_parser_from_source_str, unwrap_or_emit_fatal}; @@ -979,17 +980,17 @@ pub fn create_and_enter_global_ctxt FnOnce(TyCtxt<'tcx>) -> T>( // Similarly, by creating `arena` here and passing in `&arena`, that reference has the type // `&'tcx WorkerLocal>`, also with one lifetime. And likewise for `hir_arena`. - let gcx_cell = OnceLock::new(); + let gcx_cell: OnceLock> = OnceLock::new(); let arena = WorkerLocal::new(|_| Arena::default()); let hir_arena = WorkerLocal::new(|_| rustc_hir::Arena::default()); TyCtxt::create_global_ctxt( - &gcx_cell, + unsafe { transmute(&gcx_cell) }, &compiler.sess, crate_types, stable_crate_id, - &arena, - &hir_arena, + unsafe { transmute(&arena) }, + unsafe { transmute(&hir_arena) }, untracked, dep_graph, rustc_query_impl::make_dep_kind_vtables(&arena), diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs index d7d306918fd0d..4445867dc75be 100644 --- a/compiler/rustc_interface/src/util.rs +++ b/compiler/rustc_interface/src/util.rs @@ -20,14 +20,13 @@ use rustc_data_structures::sync; use rustc_metadata::{DylibError, EncodedMetadata, load_symbol_from_dylib}; use rustc_middle::dep_graph::{WorkProduct, WorkProductId}; use rustc_middle::ty::{CurrentGcx, TyCtxt}; -use rustc_query_impl::{CollectActiveJobsKind, collect_active_query_jobs}; use rustc_session::config::{ Cfg, CrateType, OutFileName, OutputFilenames, OutputTypes, Sysroot, host_tuple, }; use rustc_session::{EarlyDiagCtxt, Session, filesearch}; use rustc_span::edition::Edition; use rustc_span::source_map::SourceMapInputs; -use rustc_span::{SessionGlobals, Symbol, sym}; +use rustc_span::{Symbol, sym}; use rustc_target::spec::Target; use tracing::info; @@ -225,9 +224,6 @@ pub(crate) fn run_in_thread_pool_with_globals< let current_gcx2 = current_gcx2.clone(); let registry = rustc_thread_pool::Registry::current(); - let session_globals = rustc_span::with_session_globals(|session_globals| { - session_globals as *const SessionGlobals as usize - }); thread::Builder::new() .name("rustc query cycle handler".to_string()) .spawn(move || { @@ -249,20 +245,7 @@ internal compiler error: query cycle handler thread panicked, aborting process"; tls::enter_context(&tls::ImplicitCtxt::new(gcx), || { tls::with(|tcx| { // Accessing session globals is sound as they outlive `GlobalCtxt`. - // They are needed to hash query keys containing spans or symbols. - let job_map = rustc_span::set_session_globals_then( - unsafe { &*(session_globals as *const SessionGlobals) }, - || { - // Ensure there were no errors collecting all active jobs. - // We need the complete map to ensure we find a cycle to - // break. - collect_active_query_jobs( - tcx, - CollectActiveJobsKind::FullNoContention, - ) - }, - ); - break_query_cycle(job_map, ®istry); + break_query_cycle(tcx, ®istry); }) }) }); diff --git a/compiler/rustc_middle/src/query/caches.rs b/compiler/rustc_middle/src/query/caches.rs index 665d4f4d7dfae..41ef3bdaa228d 100644 --- a/compiler/rustc_middle/src/query/caches.rs +++ b/compiler/rustc_middle/src/query/caches.rs @@ -1,6 +1,7 @@ -use std::sync::OnceLock; - +use rustc_arena::TypedArena; +use rustc_data_structures::cache_entry::CacheEntry; use rustc_data_structures::sharded::ShardedHashMap; +use rustc_data_structures::sync::{DynSend, DynSync, WorkerLocal}; pub use rustc_data_structures::vec_cache::VecCache; use rustc_hir::def_id::LOCAL_CRATE; use rustc_index::Idx; @@ -14,111 +15,113 @@ use crate::query::keys::QueryKey; /// /// Types implementing this trait are associated with actual key/value types /// by the `Cache` associated type of the `rustc_middle::query::Key` trait. -pub trait QueryCache: Sized { +pub trait QueryCache: Sized + DynSync { type Key: QueryKey; - type Value: Copy; + type Value: Copy + DynSend + DynSync; /// Returns the cached value (and other information) associated with the /// given key, if it is present in the cache. - fn lookup(&self, key: &Self::Key) -> Option<(Self::Value, DepNodeIndex)>; - - /// Adds a key/value entry to this cache. - /// - /// Called by some part of the query system, after having obtained the - /// value by executing the query or loading a cached value from disk. - fn complete(&self, key: Self::Key, value: Self::Value, index: DepNodeIndex); - - /// Calls a closure on each entry in this cache. - fn for_each(&self, f: &mut dyn FnMut(&Self::Key, &Self::Value, DepNodeIndex)); - - /// Returns the number of entries currently in this cache. - /// - /// Useful for reserving capacity in data structures that will hold the - /// output of a call to [`Self::for_each`]. - fn len(&self) -> usize; + fn lookup(&self, key: Self::Key) -> &CacheEntry; + + /// Calls a closure on each entry in this cache. Panics if any cache entry is still in progress. + fn for_each(&self, f: impl FnMut(Self::Key, &Self::Value, DepNodeIndex)); } +struct SyncConstPtr(*const T); + +impl Copy for SyncConstPtr {} + +impl Clone for SyncConstPtr { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} +unsafe impl Send for SyncConstPtr {} +unsafe impl Sync for SyncConstPtr {} +unsafe impl DynSend for SyncConstPtr {} +unsafe impl DynSync for SyncConstPtr {} + /// In-memory cache for queries whose keys aren't suitable for any of the /// more specialized kinds of cache. Backed by a sharded hashmap. pub struct DefaultCache { - cache: ShardedHashMap, + cache: ShardedHashMap>>, + arena: WorkerLocal>>, +} + +impl DefaultCache { + pub fn store_without_tracking(&self, x: V) -> &V { + self.arena + .alloc(CacheEntry::complete(DepNodeIndex::FOREVER_RED_NODE.as_u32(), x)) + .get_finished() + .unwrap() + .0 + } } impl Default for DefaultCache { fn default() -> Self { - DefaultCache { cache: Default::default() } + DefaultCache { cache: Default::default(), arena: Default::default() } } } impl QueryCache for DefaultCache where K: QueryKey, - V: Copy, + V: Copy + DynSend + DynSync, { type Key = K; type Value = V; #[inline(always)] - fn lookup(&self, key: &K) -> Option<(V, DepNodeIndex)> { - self.cache.get(key) - } - - #[inline] - fn complete(&self, key: K, value: V, index: DepNodeIndex) { - self.cache.insert_unique(key, (value, index)); + fn lookup(&self, key: K) -> &CacheEntry { + // FIXME: zero out arena to avoid writes + unsafe { + &*self + .cache + .get_or_insert_with(key, || SyncConstPtr(self.arena.alloc(CacheEntry::empty()))) + .0 + } } - fn for_each(&self, f: &mut dyn FnMut(&Self::Key, &Self::Value, DepNodeIndex)) { + fn for_each(&self, mut f: impl FnMut(Self::Key, &Self::Value, DepNodeIndex)) { for shard in self.cache.lock_shards() { - for (k, v) in shard.iter() { - f(k, &v.0, v.1); + for &(k, ent) in shard.iter() { + let Some((v, i)) = (unsafe { (*ent.0).get_finished() }) else { continue }; + f(k, v, DepNodeIndex::from_u32(i)); } } } - - fn len(&self) -> usize { - self.cache.len() - } } /// In-memory cache for queries whose key type only has one value (e.g. `()`). /// The cache therefore only needs to store one query return value. pub struct SingleCache { - cache: OnceLock<(V, DepNodeIndex)>, + entry: CacheEntry, } impl Default for SingleCache { fn default() -> Self { - SingleCache { cache: OnceLock::new() } + SingleCache { entry: CacheEntry::empty() } } } impl QueryCache for SingleCache where - V: Copy, + V: Copy + DynSend + DynSync, { type Key = (); type Value = V; #[inline(always)] - fn lookup(&self, _key: &()) -> Option<(V, DepNodeIndex)> { - self.cache.get().copied() - } - - #[inline] - fn complete(&self, _key: (), value: V, index: DepNodeIndex) { - self.cache.set((value, index)).ok(); + fn lookup(&self, _key: ()) -> &CacheEntry { + &self.entry } - fn for_each(&self, f: &mut dyn FnMut(&Self::Key, &Self::Value, DepNodeIndex)) { - if let Some(value) = self.cache.get() { - f(&(), &value.0, value.1) + fn for_each(&self, mut f: impl FnMut(Self::Key, &Self::Value, DepNodeIndex)) { + if let Some((value, index)) = self.entry.get_finished() { + f((), value, DepNodeIndex::from_u32(index)) } } - - fn len(&self) -> usize { - self.cache.get().is_some().into() - } } /// In-memory cache for queries whose key is a [`DefId`]. @@ -138,66 +141,56 @@ impl Default for DefIdCache { } } +impl DefIdCache { + pub fn store_without_tracking(&self, x: V) -> &V { + self.foreign + .arena + .alloc(CacheEntry::complete(DepNodeIndex::FOREVER_RED_NODE.as_u32(), x)) + .get_finished() + .unwrap() + .0 + } +} + impl QueryCache for DefIdCache where - V: Copy, + V: Copy + DynSend + DynSync, { type Key = DefId; type Value = V; #[inline(always)] - fn lookup(&self, key: &DefId) -> Option<(V, DepNodeIndex)> { + fn lookup(&self, key: DefId) -> &CacheEntry { if key.krate == LOCAL_CRATE { - self.local.lookup(&key.index) + self.local.lookup(key.index) } else { self.foreign.lookup(key) } } - #[inline] - fn complete(&self, key: DefId, value: V, index: DepNodeIndex) { - if key.krate == LOCAL_CRATE { - self.local.complete(key.index, value, index) - } else { - self.foreign.complete(key, value, index) - } - } - - fn for_each(&self, f: &mut dyn FnMut(&Self::Key, &Self::Value, DepNodeIndex)) { - self.local.for_each(&mut |key, value, index| { - f(&DefId { krate: LOCAL_CRATE, index: *key }, value, index); + fn for_each(&self, mut f: impl FnMut(Self::Key, &Self::Value, DepNodeIndex)) { + self.local.for_each(|index, value, dep_index| { + f(DefId { krate: LOCAL_CRATE, index }, value, dep_index); }); self.foreign.for_each(f); } - - fn len(&self) -> usize { - self.local.len() + self.foreign.len() - } } impl QueryCache for VecCache where K: Idx + QueryKey, - V: Copy, + V: Copy + DynSend + DynSync, { type Key = K; type Value = V; #[inline(always)] - fn lookup(&self, key: &K) -> Option<(V, DepNodeIndex)> { + fn lookup(&self, key: K) -> &CacheEntry { self.lookup(key) } - #[inline] - fn complete(&self, key: K, value: V, index: DepNodeIndex) { - self.complete(key, value, index) - } - - fn for_each(&self, f: &mut dyn FnMut(&Self::Key, &Self::Value, DepNodeIndex)) { + #[inline(always)] + fn for_each(&self, f: impl FnMut(K, &V, DepNodeIndex)) { self.for_each(f) } - - fn len(&self) -> usize { - self.len() - } } diff --git a/compiler/rustc_middle/src/query/inner.rs b/compiler/rustc_middle/src/query/inner.rs index 402c448a1fa3a..3562e3b907d64 100644 --- a/compiler/rustc_middle/src/query/inner.rs +++ b/compiler/rustc_middle/src/query/inner.rs @@ -1,12 +1,12 @@ //! Helper functions that serve as the immediate implementation of //! `tcx.$query(..)` and its variations. +use rustc_data_structures::cache_entry::{self, CacheEntry, EntryInProgress}; use rustc_span::{DUMMY_SP, ErrorGuaranteed, Span}; -use crate::dep_graph; -use crate::dep_graph::DepNodeKey; +use crate::dep_graph::{self, DepNodeKey}; use crate::query::erase::{self, Erasable, Erased}; -use crate::query::{EnsureMode, QueryCache, QueryMode, QueryVTable}; +use crate::query::{Cycle, EnsureMode, QueryCache, QueryMode, QueryVTable}; use crate::ty::TyCtxt; /// Checks whether there is already a value for this key in the in-memory @@ -14,17 +14,37 @@ use crate::ty::TyCtxt; /// /// (Also performs some associated bookkeeping, if a value was found.) #[inline(always)] -fn try_get_cached<'tcx, C>(tcx: TyCtxt<'tcx>, cache: &C, key: C::Key) -> Option -where - C: QueryCache, -{ - match cache.lookup(&key) { - Some((value, index)) => { +fn get_cached_or_start<'tcx, V: Copy>( + tcx: TyCtxt<'tcx>, + span: Span, + entry: &'tcx CacheEntry, + handle_cycle: impl FnOnce(TyCtxt<'tcx>, Cycle<'tcx>) -> V, +) -> Result> { + match tcx.cache_entry_get_or_start(entry, span) { + Ok((value, index)) => { + tcx.prof.query_cache_hit(index.into()); + tcx.dep_graph.read_index(index); + Ok(*value) + } + Err(cache_entry::GetOrStartError::InProgress(in_progress)) => Err(in_progress), + Err(cache_entry::GetOrStartError::Interrupted(cycle)) => Ok(handle_cycle(tcx, cycle)), + } +} + +#[inline(always)] +fn try_get_cached<'tcx, V: Copy>( + tcx: TyCtxt<'tcx>, + entry: &'tcx CacheEntry, + handle_cycle: impl FnOnce(TyCtxt<'tcx>, Cycle<'tcx>) -> V, +) -> Option { + match tcx.cache_entry_get(entry, DUMMY_SP) { + Ok((value, index)) => { tcx.prof.query_cache_hit(index.into()); tcx.dep_graph.read_index(index); - Some(value) + Some(*value) } - None => None, + Err(cache_entry::GetError::Interrupted(cycle)) => Some(handle_cycle(tcx, cycle)), + Err(cache_entry::GetError::InProgress) => None, } } @@ -40,9 +60,10 @@ pub(crate) fn query_get_at<'tcx, C>( where C: QueryCache, { - match try_get_cached(tcx, &query.cache, key) { - Some(value) => value, - None => (query.execute_query_fn)(tcx, span, key, QueryMode::Get).unwrap(), + let entry = query.cache.lookup(key); + match get_cached_or_start(tcx, span, entry, query.cycle_handler(key)) { + Ok(value) => value, + Err(entry) => (query.execute_query_fn)(tcx, span, key, QueryMode::Get { entry }).unwrap(), } } @@ -57,10 +78,11 @@ pub(crate) fn query_ensure_ok_or_done<'tcx, C>( ) where C: QueryCache, { - match try_get_cached(tcx, &query.cache, key) { + let entry = query.cache.lookup(key); + match try_get_cached(tcx, entry, query.cycle_handler(key)) { Some(_value) => {} None => { - (query.execute_query_fn)(tcx, DUMMY_SP, key, QueryMode::Ensure { ensure_mode }); + (query.execute_query_fn)(tcx, DUMMY_SP, key, QueryMode::Ensure { ensure_mode, entry }); } } } @@ -68,7 +90,7 @@ pub(crate) fn query_ensure_ok_or_done<'tcx, C>( /// Implementation of `tcx.ensure_result().$query(..)` for queries that /// return `Result<_, ErrorGuaranteed>`. #[inline] -pub(crate) fn query_ensure_result<'tcx, C, T>( +pub(crate) fn query_ensure_result<'tcx, C, T: 'tcx>( tcx: TyCtxt<'tcx>, query: &'tcx QueryVTable<'tcx, C>, key: C::Key, @@ -84,14 +106,15 @@ where } }; - match try_get_cached(tcx, &query.cache, key) { + let entry = query.cache.lookup(key); + match try_get_cached(tcx, entry, query.cycle_handler(key)) { Some(value) => convert(value), None => { match (query.execute_query_fn)( tcx, DUMMY_SP, key, - QueryMode::Ensure { ensure_mode: EnsureMode::Ok }, + QueryMode::Ensure { ensure_mode: EnsureMode::Ok, entry }, ) { // We executed the query. Convert the successful result. Some(res) => convert(res), @@ -122,8 +145,9 @@ pub(crate) fn query_feed<'tcx, C>( let format_value = query.format_value; // Check whether the in-memory cache already has a value for this key. - match try_get_cached(tcx, &query.cache, key) { - Some(old) => { + let entry = query.cache.lookup(key); + match get_cached_or_start(tcx, DUMMY_SP, entry, query.cycle_handler(key)) { + Ok(old) => { // The query already has a cached value for this key. // That's OK if both values are the same, i.e. they have the same hash, // so now we check their hashes. @@ -154,7 +178,7 @@ pub(crate) fn query_feed<'tcx, C>( ) } } - None => { + Err(entry) => { // There is no cached value for this key, so feed the query by // adding the provided value to the cache. let dep_node = dep_graph::DepNode::construct(tcx, query.dep_kind, &key); @@ -165,7 +189,7 @@ pub(crate) fn query_feed<'tcx, C>( query.hash_value_fn, query.format_value, ); - query.cache.complete(key, value, dep_node_index); + entry.complete(value, dep_node_index.as_u32(), &tcx.parking_area); } } } diff --git a/compiler/rustc_middle/src/query/job.rs b/compiler/rustc_middle/src/query/job.rs index 8c78bf24287e0..b981a30abb416 100644 --- a/compiler/rustc_middle/src/query/job.rs +++ b/compiler/rustc_middle/src/query/job.rs @@ -1,138 +1,207 @@ -use std::fmt::Debug; -use std::hash::Hash; -use std::num::NonZero; use std::sync::Arc; +use std::{mem, ptr}; -use parking_lot::{Condvar, Mutex}; +use parking_lot::{Condvar, Mutex, MutexGuard}; +use rustc_data_structures::sync::{CacheAligned, DynSync, Parker, Unparker}; +use rustc_data_structures::{cache_entry, jobserver}; use rustc_span::Span; +use rustc_thread_pool::{Registry, current_thread_index}; +use crate::queries::TaggedQueryKey; use crate::query::Cycle; -use crate::ty::TyCtxt; -/// A value uniquely identifying an active query job. -#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] -pub struct QueryJobId(pub NonZero); +pub type QueryJobRef<'a, 'tcx> = &'a QueryJob<'a, 'tcx>; /// Represents an active query job. -#[derive(Clone, Debug)] -pub struct QueryJob<'tcx> { - pub id: QueryJobId, - +#[derive(Clone, Copy)] +pub struct QueryJob<'a, 'tcx> { /// The span corresponding to the reason for which this query was required. pub span: Span, /// The parent query job which created this job and is implicitly waiting on it. - pub parent: Option, + pub parent: Option>, + + pub form_tagged_key: &'a (dyn Fn() -> TaggedQueryKey<'tcx> + DynSync), - /// The latch that is used to wait on this job. - pub latch: Option>, + pub entry_status: &'a cache_entry::Status, } -impl<'tcx> QueryJob<'tcx> { - /// Creates a new query job. - #[inline] - pub fn new(id: QueryJobId, span: Span, parent: Option) -> Self { - QueryJob { id, span, parent, latch: None } +#[derive(Clone, Copy)] +pub struct QueryWaiter<'a, 'tcx> { + pub span: Span, + pub parent: Option>, +} + +pub struct WorkerParkingArea<'tcx> { + registry: Arc, + proxy: Arc, + lots: Box<[CacheAligned>]>, +} + +impl<'tcx> WorkerParkingArea<'tcx> { + pub fn new(proxy: Arc) -> Self { + let registry = Registry::current(); + let lots = + (0..registry.num_threads()).map(|_| CacheAligned(WorkerParkingLot::new())).collect(); + Self { registry, proxy, lots } + } + + #[inline(always)] + pub fn parker<'a>(&'a self, waiter: QueryWaiter<'a, 'tcx>) -> WorkerParker<'a, 'tcx> { + WorkerParker { area: self, waiter } } +} - pub fn latch(&mut self) -> QueryLatch<'tcx> { - self.latch.get_or_insert_with(QueryLatch::new).clone() +#[derive(Clone, Copy)] +pub struct WorkerParker<'a, 'tcx> { + area: &'a WorkerParkingArea<'tcx>, + waiter: QueryWaiter<'a, 'tcx>, +} + +impl<'a, 'tcx> Parker for WorkerParker<'a, 'tcx> { + type Interrupt = Cycle<'tcx>; + + fn park(self, validate: impl FnOnce(usize) -> bool) -> Result<(), Self::Interrupt> { + debug_assert!(Registry::with_current(|registry| ptr::eq( + &**registry, + &*self.area.registry + ))); + let thread_index = current_thread_index().unwrap(); + debug_assert!(thread_index < self.area.registry.num_threads()); + let validate = || validate(thread_index).then(|| self.waiter); + self.area.lots[thread_index].0.park(validate, &self.area.proxy) } +} - /// Signals to waiters that the query is complete. - /// - /// This does nothing for single threaded rustc, - /// as there are no concurrent jobs which could be waiting on us - #[inline] - pub fn signal_complete(self) { - if let Some(latch) = self.latch { - latch.set(); +impl<'a, 'tcx> Unparker for &'a WorkerParkingArea<'tcx> { + fn unpark(self, thread_bitmask: u32) { + debug_assert_eq!( + thread_bitmask + & !(u32::MAX >> (u32::BITS - u32::try_from(self.registry.num_threads()).unwrap())), + 0 + ); + let mut waiters = [(); rustc_thread_pool::max_num_threads()].map(|()| None); + for i in 0..self.registry.num_threads() { + if thread_bitmask & (1 << i) != 0 { + waiters[i] = Some( + self.lots[i].0.lock_waiter().expect("trying to unpark a non-parked thread"), + ); + } + } + rustc_thread_pool::mark_unblocked(&self.registry, thread_bitmask.count_ones() as usize); + for waiter in waiters.iter_mut() { + if let Some(waiter) = waiter.take() { + waiter.unpark(); + } } } } -#[derive(Debug)] -pub struct QueryWaiter<'tcx> { - pub parent: Option, - pub condvar: Condvar, - pub span: Span, - pub cycle: Mutex>>, +impl<'tcx> WorkerParkingArea<'tcx> { + pub fn lock_waiter(&self, thread_index: usize) -> Option> { + self.lots[thread_index].0.lock_waiter() + } } -#[derive(Clone, Debug)] -pub struct QueryLatch<'tcx> { - /// The `Option` is `Some(..)` when the job is active, and `None` once completed. - pub waiters: Arc>>>>>, +enum WorkerStatus<'tcx> { + Free, + Cycle(Cycle<'tcx>), + Waiting(QueryWaiter<'tcx, 'tcx>), } -impl<'tcx> QueryLatch<'tcx> { - fn new() -> Self { - QueryLatch { waiters: Arc::new(Mutex::new(Some(Vec::new()))) } +struct WorkerParkingLot<'tcx> { + condvar: Condvar, + status: Mutex>, +} + +pub struct QueryWaiterGuard<'a, 'tcx> { + guard: MutexGuard<'a, WorkerStatus<'tcx>>, + condvar: &'a Condvar, +} + +impl<'tcx> WorkerParkingLot<'tcx> { + const fn new() -> Self { + Self { condvar: Condvar::new(), status: Mutex::new(WorkerStatus::Free) } } - /// Awaits for the query job to complete. - pub fn wait_on( + fn park<'a>( &self, - tcx: TyCtxt<'tcx>, - query: Option, - span: Span, - ) -> Result<(), Cycle<'tcx>> { - let mut waiters_guard = self.waiters.lock(); - let Some(waiters) = &mut *waiters_guard else { - return Ok(()); // already complete + validate: impl FnOnce() -> Option>, + jobserver_proxy: &jobserver::Proxy, + ) -> Result<(), Cycle<'tcx>> + where + 'tcx: 'a, + { + let mut status_lock = self.status.lock(); + assert!( + matches!(*status_lock, WorkerStatus::Free), + "tried to park on a used worker parking lot" + ); + let Some(waiter) = validate() else { + return Ok(()); }; - - let waiter = Arc::new(QueryWaiter { - parent: query, - span, - cycle: Mutex::new(None), - condvar: Condvar::new(), - }); - - // We push the waiter on to the `waiters` list. It can be accessed inside - // the `wait` call below, by 1) the `set` method or 2) by deadlock detection. - // Both of these will remove it from the `waiters` list before resuming - // this thread. - waiters.push(Arc::clone(&waiter)); - - // Awaits the caller on this latch by blocking the current thread. - // If this detects a deadlock and the deadlock handler wants to resume this thread - // we have to be in the `wait` call. This is ensured by the deadlock handler - // getting the self.info lock. rustc_thread_pool::mark_blocked(); - tcx.jobserver_proxy.release_thread(); - waiter.condvar.wait(&mut waiters_guard); - // Release the lock before we potentially block in `acquire_thread` - drop(waiters_guard); - tcx.jobserver_proxy.acquire_thread(); - - // FIXME: Get rid of this lock. We have ownership of the QueryWaiter - // although another thread may still have a Arc reference so we cannot - // use Arc::get_mut - let mut cycle = waiter.cycle.lock(); - match cycle.take() { - None => Ok(()), - Some(cycle) => Err(cycle), + jobserver_proxy.release_thread(); + // SAFETY: WorkerParkingLot::lock_waiting makes sure transmuted lifetime remains valid + unsafe { + *status_lock = WorkerStatus::Waiting(mem::transmute(waiter)); } + self.condvar.wait(&mut status_lock); + // Spurious wakes aren't possible in parking_lot + let res = match mem::replace(&mut *status_lock, WorkerStatus::Free) { + WorkerStatus::Free => Ok(()), + WorkerStatus::Cycle(cycle) => Err(cycle), + WorkerStatus::Waiting(waiter) => { + span_bug!(waiter.span, "unexpectedly found a waiter after unparking") + } + }; + drop(status_lock); + jobserver_proxy.acquire_thread(); + res } - /// Sets the latch and resumes all waiters on it - fn set(&self) { - let mut waiters_guard = self.waiters.lock(); - let waiters = waiters_guard.take().unwrap(); // mark the latch as complete - let registry = rustc_thread_pool::Registry::current(); - for waiter in waiters { - rustc_thread_pool::mark_unblocked(®istry); - waiter.condvar.notify_one(); + #[inline] + fn lock_waiter(&self) -> Option> { + let status_lock = self.status.lock(); + match *status_lock { + WorkerStatus::Free => None, + WorkerStatus::Cycle(_) => { + panic!("Waiter thread unexpectedly has a pending cycle error") + } + WorkerStatus::Waiting(_) => { + Some(QueryWaiterGuard { guard: status_lock, condvar: &self.condvar }) + } } } +} + +impl<'a, 'tcx> QueryWaiterGuard<'a, 'tcx> { + #[inline] + pub fn span(&self) -> Span { + let WorkerStatus::Waiting(waiter) = &*self.guard else { panic!() }; + waiter.span + } + + #[inline] + pub fn parent(&self) -> Option> { + let WorkerStatus::Waiting(waiter) = &*self.guard else { panic!() }; + waiter.parent + } - /// Removes a single waiter from the list of waiters. - /// This is used to break query cycles. - pub fn extract_waiter(&self, waiter: usize) -> Arc> { - let mut waiters_guard = self.waiters.lock(); - let waiters = waiters_guard.as_mut().expect("non-empty waiters vec"); - // Remove the waiter from the list of waiters - waiters.remove(waiter) + #[inline] + fn unpark(mut self) { + debug_assert!(matches!(*self.guard, WorkerStatus::Waiting(_))); + *self.guard = WorkerStatus::Free; + drop(self.guard); + assert!(self.condvar.notify_one()); + } + + #[inline] + pub fn unpark_with_cycle(mut self, cycle: Cycle<'tcx>, registry: &Registry) { + debug_assert!(matches!(*self.guard, WorkerStatus::Waiting(_))); + *self.guard = WorkerStatus::Cycle(cycle); + rustc_thread_pool::mark_unblocked(registry, 1); + drop(self.guard); + assert!(self.condvar.notify_one()); } } diff --git a/compiler/rustc_middle/src/query/keys.rs b/compiler/rustc_middle/src/query/keys.rs index 569c30a6067a9..07168a3a11479 100644 --- a/compiler/rustc_middle/src/query/keys.rs +++ b/compiler/rustc_middle/src/query/keys.rs @@ -7,6 +7,7 @@ use std::hash::Hash; use rustc_ast::tokenstream::TokenStream; use rustc_data_structures::sso::SsoHashSet; use rustc_data_structures::stable_hash::StableHash; +use rustc_data_structures::sync::{DynSend, DynSync}; use rustc_hir::def_id::{CrateNum, DefId, LOCAL_CRATE, LocalDefId, LocalModDefId}; use rustc_hir::hir_id::OwnerId; use rustc_span::{DUMMY_SP, Ident, LocalExpnId, Span, Symbol}; @@ -24,7 +25,7 @@ use crate::{mir, traits}; #[derive(Copy, Clone, Debug)] pub struct LocalCrate; -pub trait QueryKeyBounds = Copy + Debug + Eq + Hash + StableHash; +pub trait QueryKeyBounds = Copy + Debug + Eq + Hash + StableHash + DynSync + DynSend; /// Controls what types can legally be used as the key for a query. pub trait QueryKey: Sized + QueryKeyBounds { diff --git a/compiler/rustc_middle/src/query/mod.rs b/compiler/rustc_middle/src/query/mod.rs index b7e5e9bcb5e32..bba7aac1246dc 100644 --- a/compiler/rustc_middle/src/query/mod.rs +++ b/compiler/rustc_middle/src/query/mod.rs @@ -2,11 +2,11 @@ use rustc_hir::def_id::LocalDefId; pub use self::caches::{DefIdCache, DefaultCache, QueryCache, SingleCache, VecCache}; pub use self::into_query_key::IntoQueryKey; -pub use self::job::{QueryJob, QueryJobId, QueryLatch, QueryWaiter}; +pub use self::job::{QueryJob, QueryJobRef, QueryWaiter, QueryWaiterGuard, WorkerParkingArea}; pub use self::keys::{AsLocalQueryKey, LocalCrate, QueryKey}; pub use self::plumbing::{ - ActiveKeyStatus, Cycle, EnsureMode, QueryMode, QueryState, QuerySystem, QueryVTable, TyCtxtAt, - TyCtxtEnsureDone, TyCtxtEnsureOk, TyCtxtEnsureResult, + Cycle, EnsureMode, QueryMode, QuerySystem, QueryVTable, TyCtxtAt, TyCtxtEnsureDone, + TyCtxtEnsureOk, TyCtxtEnsureResult, }; pub use self::stack::QueryStackFrame; pub use crate::queries::Providers; diff --git a/compiler/rustc_middle/src/query/plumbing.rs b/compiler/rustc_middle/src/query/plumbing.rs index 8cc5e8105949a..94e4ebe09fcef 100644 --- a/compiler/rustc_middle/src/query/plumbing.rs +++ b/compiler/rustc_middle/src/query/plumbing.rs @@ -1,12 +1,10 @@ use std::fmt; use std::ops::Deref; +use rustc_data_structures::cache_entry::{CacheEntry, EntryInProgress}; use rustc_data_structures::fingerprint::Fingerprint; use rustc_data_structures::fx::FxIndexMap; -use rustc_data_structures::hash_table::HashTable; -use rustc_data_structures::sharded::Sharded; -use rustc_data_structures::sync::{AtomicU64, Lock, WorkerLocal}; -use rustc_errors::Diag; +use rustc_data_structures::sync::{Lock, WorkerLocal}; use rustc_hir::def_id::LocalDefId; use rustc_span::Span; @@ -14,40 +12,9 @@ use crate::dep_graph::{DepKind, DepNodeIndex, QuerySideEffect, SerializedDepNode use crate::ich::StableHashState; use crate::queries::{ExternProviders, Providers, QueryArenas, QueryVTables, TaggedQueryKey}; use crate::query::on_disk_cache::OnDiskCache; -use crate::query::{IntoQueryKey, QueryCache, QueryJob, QueryStackFrame}; +use crate::query::{IntoQueryKey, QueryCache, QueryStackFrame}; use crate::ty::{self, TyCtxt}; -/// For a particular query, keeps track of "active" keys, i.e. keys whose -/// evaluation has started but has not yet finished successfully. -/// -/// (Successful query evaluation for a key is represented by an entry in the -/// query's in-memory cache.) -pub struct QueryState<'tcx, K> { - pub active: Sharded)>>, -} - -impl<'tcx, K> Default for QueryState<'tcx, K> { - fn default() -> QueryState<'tcx, K> { - QueryState { active: Default::default() } - } -} - -/// For a particular query and key, tracks the status of a query evaluation -/// that has started, but has not yet finished successfully. -/// -/// (Successful query evaluation for a key is represented by an entry in the -/// query's in-memory cache.) -pub enum ActiveKeyStatus<'tcx> { - /// Some thread is already evaluating the query for this key. - /// - /// The enclosed [`QueryJob`] can be used to wait for it to finish. - Started(QueryJob<'tcx>), - - /// The query panicked. Queries trying to wait on this will raise a fatal error which will - /// silently panic. - Poisoned, -} - #[derive(Debug)] pub struct Cycle<'tcx> { /// The query and related span that uses the cycle. @@ -57,12 +24,11 @@ pub struct Cycle<'tcx> { pub frames: Vec>, } -#[derive(Debug)] -pub enum QueryMode { +pub enum QueryMode<'tcx, V> { /// This is a normal query call to `tcx.$query(..)` or `tcx.at(span).$query(..)`. - Get, + Get { entry: EntryInProgress<'tcx, V> }, /// This is a call to `tcx.ensure_ok().$query(..)` or `tcx.ensure_done().$query(..)`. - Ensure { ensure_mode: EnsureMode }, + Ensure { entry: &'tcx CacheEntry, ensure_mode: EnsureMode }, } /// Distinguishes between `tcx.ensure_ok()` and `tcx.ensure_done()` in shared @@ -87,7 +53,6 @@ pub struct QueryVTable<'tcx, C: QueryCache> { pub feedable: bool, pub dep_kind: DepKind, - pub state: QueryState<'tcx, C::Key>, pub cache: C, /// Function pointer that actually calls this query's provider. @@ -114,8 +79,7 @@ pub struct QueryVTable<'tcx, C: QueryCache> { /// it should be emitted) or `delay_as_bug` (if it need not be emitted because an alternative /// error is created and emitted). A value may be returned, or (more commonly) the function may /// just abort after emitting the error. - pub handle_cycle_error_fn: - fn(tcx: TyCtxt<'tcx>, key: C::Key, cycle: Cycle<'tcx>, error: Diag<'_>) -> C::Value, + pub handle_cycle_error_fn: fn(tcx: TyCtxt<'tcx>, key: C::Key, cycle: Cycle<'tcx>) -> C::Value, pub format_value: fn(&C::Value) -> String, @@ -130,7 +94,17 @@ pub struct QueryVTable<'tcx, C: QueryCache> { /// and putting the obtained value into the in-memory cache. /// /// [^1]: [`TyCtxt`], [`TyCtxtAt`], [`TyCtxtEnsureOk`], [`TyCtxtEnsureDone`] - pub execute_query_fn: fn(TyCtxt<'tcx>, Span, C::Key, QueryMode) -> Option, + // FIXME: Substituting bound lifetime 'a with 'tcx causes x120 compile time slowdown for some reason. + pub execute_query_fn: + for<'a> fn(TyCtxt<'tcx>, Span, C::Key, QueryMode<'a, C::Value>) -> Option, +} + +impl<'tcx, C: QueryCache> QueryVTable<'tcx, C> { + #[inline] + pub fn cycle_handler(&self, key: C::Key) -> impl FnOnce(TyCtxt<'tcx>, Cycle<'tcx>) -> C::Value { + let handle_cycle_error = self.handle_cycle_error_fn; + move |tcx, cycle| handle_cycle_error(tcx, key, cycle) + } } impl<'tcx, C: QueryCache> fmt::Debug for QueryVTable<'tcx, C> { @@ -165,8 +139,6 @@ pub struct QuerySystem<'tcx> { pub local_providers: Providers, pub extern_providers: ExternProviders, - pub jobs: AtomicU64, - pub cycle_handler_nesting: Lock, } @@ -408,8 +380,7 @@ macro_rules! define_callbacks { } )* - /// Identifies a query by kind and key. This is in contrast to `QueryJobId` which is just a - /// number. + /// Identifies a query by kind and key. #[allow(non_camel_case_types)] #[derive(Clone, Copy, Debug)] pub enum TaggedQueryKey<'tcx> { diff --git a/compiler/rustc_middle/src/ty/context.rs b/compiler/rustc_middle/src/ty/context.rs index e756277b92d76..eab00d1221fa0 100644 --- a/compiler/rustc_middle/src/ty/context.rs +++ b/compiler/rustc_middle/src/ty/context.rs @@ -17,6 +17,7 @@ use std::{fmt, iter, mem}; use rustc_abi::{ExternAbi, FieldIdx, Layout, LayoutData, TargetDataLayout, VariantIdx}; use rustc_ast as ast; +use rustc_data_structures::cache_entry::{CacheEntry, GetError, GetOrStartError}; use rustc_data_structures::defer; use rustc_data_structures::fx::FxHashMap; use rustc_data_structures::intern::Interned; @@ -60,7 +61,10 @@ use crate::middle::codegen_fn_attrs::{CodegenFnAttrs, TargetFeature}; use crate::middle::resolve_bound_vars; use crate::mir::interpret::{self, Allocation, ConstAllocation}; use crate::mir::{Body, Local, Place, PlaceElem, ProjectionKind, Promoted}; -use crate::query::{IntoQueryKey, LocalCrate, Providers, QuerySystem, TyCtxtAt}; +use crate::query::{ + Cycle, IntoQueryKey, LocalCrate, Providers, QuerySystem, QueryWaiter, TyCtxtAt, + WorkerParkingArea, +}; use crate::thir::Thir; use crate::traits; use crate::traits::solve::{ExternalConstraints, ExternalConstraintsData, PredefinedOpaques}; @@ -676,6 +680,7 @@ impl<'tcx> Deref for TyCtxt<'tcx> { /// See [TyCtxt] for details about this type. pub struct GlobalCtxt<'tcx> { + pub parking_area: WorkerParkingArea<'tcx>, pub arena: &'tcx WorkerLocal>, pub hir_arena: &'tcx WorkerLocal>, @@ -744,9 +749,6 @@ pub struct GlobalCtxt<'tcx> { pub(crate) alloc_map: interpret::AllocMap<'tcx>, current_gcx: CurrentGcx, - - /// A jobserver reference used to release then acquire a token while waiting on a query. - pub jobserver_proxy: Arc, } impl<'tcx> GlobalCtxt<'tcx> { @@ -909,7 +911,7 @@ impl<'tcx> TyCtxt<'tcx> { /// By only providing the `TyCtxt` inside of the closure we enforce that the type /// context and any interned value (types, args, etc.) can only be used while `ty::tls` /// has a valid reference to the context, to allow formatting values that need it. - pub fn create_global_ctxt( + pub fn create_global_ctxt( gcx_cell: &'tcx OnceLock>, sess: &'tcx Session, crate_types: Vec, @@ -923,8 +925,11 @@ impl<'tcx> TyCtxt<'tcx> { hooks: crate::hooks::Providers, current_gcx: CurrentGcx, jobserver_proxy: Arc, - f: impl FnOnce(TyCtxt<'tcx>) -> T, - ) -> T { + f: F, + ) -> T + where + F: for<'a> FnOnce(TyCtxt<'a>) -> T, + { let data_layout = sess.target.parse_data_layout().unwrap_or_else(|err| { sess.dcx().emit_fatal(err); }); @@ -960,7 +965,7 @@ impl<'tcx> TyCtxt<'tcx> { data_layout, alloc_map: interpret::AllocMap::new(), current_gcx, - jobserver_proxy, + parking_area: WorkerParkingArea::new(jobserver_proxy), }); // This is a separate function to work around a crash with parallel rustc (#135870) @@ -1775,6 +1780,32 @@ macro_rules! sty_debug_print { } impl<'tcx> TyCtxt<'tcx> { + #[inline] + pub fn cache_entry_get_or_start<'a, V>( + self, + entry: &'a CacheEntry, + span: Span, + ) -> Result<(&'a V, DepNodeIndex), GetOrStartError<'a, V, Cycle<'tcx>>> { + tls::with_related_context(self, |icx| { + entry + .get_or_start(self.parking_area.parker(QueryWaiter { span, parent: icx.query })) + .map(|(v, i)| (v, DepNodeIndex::from_u32(i))) + }) + } + + #[inline] + pub fn cache_entry_get( + self, + entry: &'tcx CacheEntry, + span: Span, + ) -> Result<(&'tcx V, DepNodeIndex), GetError>> { + tls::with_related_context(self, |icx| { + entry + .get(self.parking_area.parker(QueryWaiter { span, parent: icx.query })) + .map(|(v, i)| (v, DepNodeIndex::from_u32(i))) + }) + } + pub fn debug_stats(self) -> impl fmt::Debug { fmt::from_fn(move |fmt| { sty_debug_print!( diff --git a/compiler/rustc_middle/src/ty/context/tls.rs b/compiler/rustc_middle/src/ty/context/tls.rs index d1561c37172c3..ff272cd9866d9 100644 --- a/compiler/rustc_middle/src/ty/context/tls.rs +++ b/compiler/rustc_middle/src/ty/context/tls.rs @@ -1,8 +1,10 @@ +use std::mem::transmute; + use rustc_data_structures::sync; use super::{GlobalCtxt, TyCtxt}; use crate::dep_graph::TaskDepsRef; -use crate::query::QueryJobId; +use crate::query::QueryJobRef; /// This is the implicit state of rustc. It contains the current /// `TyCtxt` and query. It is updated when creating a local interner or @@ -14,7 +16,7 @@ pub struct ImplicitCtxt<'a, 'tcx> { pub tcx: TyCtxt<'tcx>, /// The current query job, if any. - pub query: Option, + pub query: Option>, /// Used to prevent queries from calling too deeply. pub query_depth: usize, @@ -86,6 +88,22 @@ where with_context_opt(|opt_context| f(opt_context.expect("no ImplicitCtxt stored in tls"))) } +#[inline] +pub fn with_related_context<'tcx, F, R>(tcx: TyCtxt<'tcx>, f: F) -> R +where + F: for<'a> FnOnce(&ImplicitCtxt<'a, 'tcx>) -> R, +{ + with_context_opt(|opt_context| { + let icx = opt_context.expect("no ImplicitCtxt stored in tls"); + assert_eq!( + &**icx.tcx as *const _ as *const (), &**tcx as *const _ as *const (), + "argument `tcx` isn't related to implicit context's `tcx`" + ); + // SAFETY: checked above + f(unsafe { transmute(icx) }) + }) +} + /// Allows access to the `TyCtxt` in the current `ImplicitCtxt`. /// Panics if there is no `ImplicitCtxt` available. #[inline] diff --git a/compiler/rustc_query_impl/src/execution.rs b/compiler/rustc_query_impl/src/execution.rs index b614bc14b4539..77bf34444dd49 100644 --- a/compiler/rustc_query_impl/src/execution.rs +++ b/compiler/rustc_query_impl/src/execution.rs @@ -1,420 +1,79 @@ -use std::hash::Hash; -use std::mem::ManuallyDrop; - -use rustc_data_structures::hash_table::{Entry, HashTable}; +use rustc_data_structures::cache_entry::{self, EntryInProgress}; use rustc_data_structures::stack::ensure_sufficient_stack; -use rustc_data_structures::sync::{DynSend, DynSync}; -use rustc_data_structures::{defer, outline, sharded, sync}; -use rustc_errors::FatalError; use rustc_middle::dep_graph::{DepGraphData, DepNodeKey, SerializedDepNodeIndex}; -use rustc_middle::query::{ - ActiveKeyStatus, Cycle, EnsureMode, QueryCache, QueryJob, QueryJobId, QueryKey, QueryLatch, - QueryMode, QueryState, QueryVTable, -}; -use rustc_middle::ty::TyCtxt; +use rustc_middle::query::{EnsureMode, QueryCache, QueryJob, QueryJobRef, QueryMode, QueryVTable}; +use rustc_middle::ty::{TyCtxt, tls}; use rustc_middle::verify_ich::incremental_verify_ich; use rustc_span::{DUMMY_SP, Span}; -use tracing::warn; use crate::dep_graph::{DepNode, DepNodeIndex}; -use crate::handle_cycle_error; -use crate::job::{QueryJobInfo, QueryJobMap, create_cycle_error, find_cycle_in_stack}; -use crate::plumbing::{current_query_job, loadable_from_disk, next_job_id, start_query}; -use crate::query_impl::for_each_query_vtable; +use crate::job::_find_cycle_in_stack; +use crate::plumbing::{loadable_from_disk, start_query}; #[inline] -fn equivalent_key(k: K) -> impl Fn(&(K, V)) -> bool { +fn _equivalent_key(k: K) -> impl Fn(&(K, V)) -> bool { move |x| x.0 == k } -pub(crate) fn all_inactive<'tcx, K>(state: &QueryState<'tcx, K>) -> bool { - state.active.lock_shards().all(|shard| shard.is_empty()) -} - -#[derive(Clone, Copy)] -pub enum CollectActiveJobsKind { - /// We need the full query job map, and we are willing to wait to obtain the query state - /// shard lock(s). - Full, - - /// We need the full query job map, and we shouldn't need to wait to obtain the shard lock(s), - /// because we are in a place where nothing else could hold the shard lock(s). - FullNoContention, - - /// We can get by without the full query job map, so we won't bother waiting to obtain the - /// shard lock(s) if they're not already unlocked. - PartialAllowed, -} - -/// Returns a map of currently active query jobs, collected from all queries. -pub fn collect_active_query_jobs<'tcx>( - tcx: TyCtxt<'tcx>, - collect_kind: CollectActiveJobsKind, -) -> QueryJobMap<'tcx> { - let mut job_map = QueryJobMap::default(); - - for_each_query_vtable!(ALL, tcx, |query| { - collect_active_query_jobs_inner(query, collect_kind, &mut job_map); - }); - - job_map -} - -/// Internal plumbing for collecting the set of active jobs for this query. -/// -/// Aborts if jobs can't be gathered as specified by `collect_kind`. -fn collect_active_query_jobs_inner<'tcx, C>( - query: &'tcx QueryVTable<'tcx, C>, - collect_kind: CollectActiveJobsKind, - job_map: &mut QueryJobMap<'tcx>, -) where - C: QueryCache, - QueryVTable<'tcx, C>: DynSync, -{ - let mut collect_shard_jobs = |shard: &HashTable<(C::Key, ActiveKeyStatus<'tcx>)>| { - for (key, status) in shard.iter() { - if let ActiveKeyStatus::Started(job) = status { - // It's fine to call `create_tagged_key` with the shard locked, - // because it's just a `TaggedQueryKey` variant constructor. - let tagged_key = (query.create_tagged_key)(*key); - job_map.insert(job.id, QueryJobInfo { tagged_key, job: job.clone() }); - } - } - }; - - match collect_kind { - CollectActiveJobsKind::Full => { - for shard in query.state.active.lock_shards() { - collect_shard_jobs(&shard); - } - } - CollectActiveJobsKind::FullNoContention => { - for shard in query.state.active.try_lock_shards() { - match shard { - Some(shard) => collect_shard_jobs(&shard), - None => panic!("Failed to collect active jobs for query `{}`!", query.name), - } - } - } - CollectActiveJobsKind::PartialAllowed => { - for shard in query.state.active.try_lock_shards() { - match shard { - Some(shard) => collect_shard_jobs(&shard), - None => warn!("Failed to collect active jobs for query `{}`!", query.name), - } - } - } - } -} - #[cold] #[inline(never)] -fn handle_cycle<'tcx, C: QueryCache>( +fn _find_and_handle_cycle<'a, 'tcx, C: QueryCache>( query: &'tcx QueryVTable<'tcx, C>, tcx: TyCtxt<'tcx>, key: C::Key, - cycle: Cycle<'tcx>, -) -> C::Value { - let nested; - { - let mut nesting = tcx.query_system.cycle_handler_nesting.lock(); - nested = match *nesting { - 0 => false, - 1 => true, - _ => { - // Don't print further nested errors to avoid cases of infinite recursion - tcx.dcx().delayed_bug("doubly nested cycle error").raise_fatal() - } - }; - *nesting += 1; - } - let _guard = defer(|| *tcx.query_system.cycle_handler_nesting.lock() -= 1); - - let error = create_cycle_error(tcx, &cycle, nested); - - if nested { - // Avoid custom handlers and only use the robust `create_cycle_error` for nested cycle errors - handle_cycle_error::default(error) - } else { - (query.handle_cycle_error_fn)(tcx, key, cycle, error) - } -} - -/// Guard object representing the responsibility to execute a query job and -/// mark it as completed. -/// -/// This will poison the relevant query key if it is dropped without calling -/// [`Self::complete`]. -struct ActiveJobGuard<'tcx, K> -where - K: Eq + Hash + Copy, -{ - state: &'tcx QueryState<'tcx, K>, - key: K, - key_hash: u64, -} - -impl<'tcx, K> ActiveJobGuard<'tcx, K> -where - K: Eq + Hash + Copy, -{ - /// Completes the query by updating the query cache with the `result`, - /// signals the waiter, and forgets the guard so it won't poison the query. - fn complete(self, cache: &C, value: C::Value, dep_node_index: DepNodeIndex) - where - C: QueryCache, - { - // Mark as complete before we remove the job from the active state - // so no other thread can re-execute this query. - cache.complete(self.key, value, dep_node_index); - - let mut this = ManuallyDrop::new(self); - - // Drop everything without poisoning the query. - this.drop_and_maybe_poison(/* poison */ false); - } - - fn drop_and_maybe_poison(&mut self, poison: bool) { - let status = { - let mut shard = self.state.active.lock_shard_by_hash(self.key_hash); - match shard.find_entry(self.key_hash, equivalent_key(self.key)) { - Err(_) => { - // Note: we must not panic while holding the lock, because unwinding also looks - // at this map, which can result in a double panic. So drop it first. - drop(shard); - panic!(); - } - Ok(occupied) => { - let ((key, status), vacant) = occupied.remove(); - if poison { - vacant.insert((key, ActiveKeyStatus::Poisoned)); - } - status - } - } - }; - - // Also signal the completion of the job, so waiters will continue execution. - match status { - ActiveKeyStatus::Started(job) => job.signal_complete(), - ActiveKeyStatus::Poisoned => panic!(), - } - } -} - -impl<'tcx, K> Drop for ActiveJobGuard<'tcx, K> -where - K: Eq + Hash + Copy, -{ - #[inline(never)] - #[cold] - fn drop(&mut self) { - // Poison the query so jobs waiting on it panic. - self.drop_and_maybe_poison(/* poison */ true); - } -} - -#[cold] -#[inline(never)] -fn find_and_handle_cycle<'tcx, C: QueryCache>( - query: &'tcx QueryVTable<'tcx, C>, - tcx: TyCtxt<'tcx>, - key: C::Key, - try_execute: QueryJobId, + try_execute: QueryJobRef<'a, 'tcx>, span: Span, ) -> (C::Value, Option) { - // Ensure there were no errors collecting all active jobs. - // We need the complete map to ensure we find a cycle to break. - let job_map = collect_active_query_jobs(tcx, CollectActiveJobsKind::FullNoContention); - - let cycle = find_cycle_in_stack(try_execute, job_map, ¤t_query_job(), span); - (handle_cycle(query, tcx, key, cycle), None) -} - -#[inline(always)] -fn wait_for_query<'tcx, C: QueryCache>( - query: &'tcx QueryVTable<'tcx, C>, - tcx: TyCtxt<'tcx>, - span: Span, - key: C::Key, - key_hash: u64, - latch: QueryLatch<'tcx>, - current: Option, -) -> (C::Value, Option) { - // For parallel queries, we'll block and wait until the query running - // in another thread has completed. Record how long we wait in the - // self-profiler. - let query_blocked_prof_timer = tcx.prof.query_blocked(); - - // With parallel queries we might just have to wait on some other thread. - let result = latch.wait_on(tcx, current, span); - - match result { - Ok(()) => { - let Some((v, index)) = query.cache.lookup(&key) else { - outline(|| { - // We didn't find the query result in the query cache. Check if it was - // poisoned due to a panic instead. - let shard = query.state.active.lock_shard_by_hash(key_hash); - match shard.find(key_hash, equivalent_key(key)) { - // The query we waited on panicked. Continue unwinding here. - Some((_, ActiveKeyStatus::Poisoned)) => FatalError.raise(), - _ => panic!( - "query '{}' result must be in the cache or the query must be poisoned after a wait", - query.name - ), - } - }) - }; - - tcx.prof.query_cache_hit(index.into()); - query_blocked_prof_timer.finish_with_query_invocation_id(index.into()); - - (v, Some(index)) - } - Err(cycle) => (handle_cycle(query, tcx, key, cycle), None), - } + tls::with_related_context(tcx, |icx| { + let cycle = _find_cycle_in_stack(try_execute, icx.query, span); + ((query.handle_cycle_error_fn)(tcx, key, cycle), None) + }) } /// Shared main part of both [`execute_query_incr_inner`] and [`execute_query_non_incr_inner`]. #[inline(never)] -fn try_execute_query<'tcx, C: QueryCache, const INCR: bool>( +fn try_execute_query<'a, 'tcx, C: QueryCache, const INCR: bool>( query: &'tcx QueryVTable<'tcx, C>, tcx: TyCtxt<'tcx>, span: Span, key: C::Key, dep_node: Option, // `None` for non-incremental, `Some` for incremental -) -> (C::Value, Option) { - let key_hash = sharded::make_hash(&key); - let mut state_lock = query.state.active.lock_shard_by_hash(key_hash); - - // For the parallel compiler we need to check both the query cache and query state structures - // while holding the state lock to ensure that 1) the query has not yet completed and 2) the - // query is not still executing. Without checking the query cache here, we can end up - // re-executing the query since `try_start` only checks that the query is not currently - // executing, but another thread may have already completed the query and stores it result - // in the query cache. - if tcx.sess.threads().is_some() { - if let Some((value, index)) = query.cache.lookup(&key) { - tcx.prof.query_cache_hit(index.into()); - return (value, Some(index)); - } - } - - let current_job_id = current_query_job(); - - match state_lock.entry(key_hash, equivalent_key(key), |(k, _)| sharded::make_hash(k)) { - Entry::Vacant(entry) => { - // Nothing has computed or is computing the query, so we start a new job and insert it - // in the state map. - let id = next_job_id(tcx); - let job = QueryJob::new(id, span, current_job_id); - entry.insert((key, ActiveKeyStatus::Started(job))); - - // Drop the lock before we start executing the query. - drop(state_lock); - - // Set up a guard object that will automatically poison the query if a - // panic occurs while executing the query (or any intermediate plumbing). - let job_guard = ActiveJobGuard { state: &query.state, key, key_hash }; - - // Delegate to another function to actually execute the query job. - let (value, dep_node_index) = if INCR { - execute_job_incr(query, tcx, key, dep_node.unwrap(), id) - } else { - execute_job_non_incr(query, tcx, key, id) - }; - - if query.feedable { - check_feedable_consistency(tcx, query, key, &value); - } - - // Tell the guard to insert `value` in the cache and remove the status entry from - // `query.state`. - job_guard.complete(&query.cache, value, dep_node_index); - - (value, Some(dep_node_index)) - } - Entry::Occupied(mut entry) => { - match &mut entry.get_mut().1 { - ActiveKeyStatus::Started(job) => { - if sync::is_dyn_thread_safe() { - // Get the latch out - let latch = job.latch(); - drop(state_lock); - - // Only call `wait_for_query` if we're using a Rayon thread pool - // as it will attempt to mark the worker thread as blocked. - wait_for_query(query, tcx, span, key, key_hash, latch, current_job_id) - } else { - let id = job.id; - drop(state_lock); - - // If we are single-threaded we know that we have cycle error, - // so we just return the error. - find_and_handle_cycle(query, tcx, key, id, span) - } - } - ActiveKeyStatus::Poisoned => FatalError.raise(), - } - } - } -} + entry: EntryInProgress<'a, C::Value>, +) -> (&'a C::Value, DepNodeIndex) { + tls::with_related_context(tcx, |icx| { + let form_tagged_key = || (query.create_tagged_key)(key); + let job = QueryJob { + span, + parent: icx.query, + form_tagged_key: &form_tagged_key, + entry_status: entry.entry().status(), + }; + // Delegate to another function to actually execute the query job. + let (value, dep_node_index) = if INCR { + execute_job_incr(query, tcx, key, dep_node.unwrap(), &job) + } else { + execute_job_non_incr(query, tcx, key, &job) + }; -#[inline(always)] -fn check_feedable_consistency<'tcx, C: QueryCache>( - tcx: TyCtxt<'tcx>, - query: &'tcx QueryVTable<'tcx, C>, - key: C::Key, - value: &C::Value, -) { - // We should not compute queries that also got a value via feeding. - // This can't happen, as query feeding adds the very dependencies to the fed query - // as its feeding query had. So if the fed query is red, so is its feeder, which will - // get evaluated first, and re-feed the query. - let Some((cached_value, _)) = query.cache.lookup(&key) else { return }; - - let Some(hash_value_fn) = query.hash_value_fn else { - panic!( - "no_hash fed query later has its value computed.\n\ - Remove `no_hash` modifier to allow recomputation.\n\ - The already cached value: {}", - (query.format_value)(&cached_value) - ); - }; + let value = entry.complete(value, dep_node_index.as_u32(), &tcx.parking_area); - let (old_hash, new_hash) = tcx.with_stable_hashing_context(|mut hcx| { - (hash_value_fn(&mut hcx, &cached_value), hash_value_fn(&mut hcx, value)) - }); - let formatter = query.format_value; - if old_hash != new_hash { - // We have an inconsistency. This can happen if one of the two - // results is tainted by errors. - assert!( - tcx.dcx().has_errors().is_some(), - "Computed query value for {:?}({:?}) is inconsistent with fed value,\n\ - computed={:#?}\nfed={:#?}", - query.dep_kind, - key, - formatter(value), - formatter(&cached_value), - ); - } + (value, dep_node_index) + }) } // Fast path for when incr. comp. is off. #[inline(always)] -fn execute_job_non_incr<'tcx, C: QueryCache>( +fn execute_job_non_incr<'a, 'tcx, C: QueryCache>( query: &'tcx QueryVTable<'tcx, C>, tcx: TyCtxt<'tcx>, key: C::Key, - job_id: QueryJobId, + job: QueryJobRef<'a, 'tcx>, ) -> (C::Value, DepNodeIndex) { debug_assert!(!tcx.dep_graph.is_fully_enabled()); let prof_timer = tcx.prof.query_provider(); // Call the query provider. - let value = start_query(job_id, query.depth_limit, || (query.invoke_provider_fn)(tcx, key)); + let value = start_query(tcx, job, query.depth_limit, || (query.invoke_provider_fn)(tcx, key)); let dep_node_index = tcx.dep_graph.next_virtual_depnode_index(); prof_timer.finish_with_query_invocation_id(dep_node_index.into()); @@ -432,12 +91,12 @@ fn execute_job_non_incr<'tcx, C: QueryCache>( } #[inline(always)] -fn execute_job_incr<'tcx, C: QueryCache>( +fn execute_job_incr<'a, 'tcx, C: QueryCache>( query: &'tcx QueryVTable<'tcx, C>, tcx: TyCtxt<'tcx>, key: C::Key, dep_node: DepNode, - job_id: QueryJobId, + job: QueryJobRef<'a, 'tcx>, ) -> (C::Value, DepNodeIndex) { let dep_graph_data = tcx.dep_graph.data().expect("should always be present in incremental mode"); @@ -445,7 +104,7 @@ fn execute_job_incr<'tcx, C: QueryCache>( if !query.eval_always { // The diagnostics for this query will be promoted to the current session during // `try_mark_green()`, so we can ignore them here. - if let Some(ret) = start_query(job_id, false, || try { + if let Some(ret) = start_query(tcx, job, false, || try { let (prev_index, dep_node_index) = dep_graph_data.try_mark_green(tcx, &dep_node)?; let value = load_from_disk_or_invoke_provider_green( tcx, @@ -464,7 +123,7 @@ fn execute_job_incr<'tcx, C: QueryCache>( let prof_timer = tcx.prof.query_provider(); - let (result, dep_node_index) = start_query(job_id, query.depth_limit, || { + let (result, dep_node_index) = start_query(tcx, job, query.depth_limit, || { // Call the query provider. dep_graph_data.with_task( dep_node, @@ -614,41 +273,63 @@ fn ensure_can_skip_execution<'tcx, C: QueryCache>( /// Called by a macro-generated impl of [`QueryVTable::execute_query_fn`], /// in non-incremental mode. #[inline(always)] -pub(super) fn execute_query_non_incr_inner<'tcx, C: QueryCache>( +pub(super) fn execute_query_non_incr_inner<'a, 'tcx, C: QueryCache>( query: &'tcx QueryVTable<'tcx, C>, tcx: TyCtxt<'tcx>, span: Span, key: C::Key, + mode: QueryMode<'a, C::Value>, ) -> C::Value { - ensure_sufficient_stack(|| try_execute_query::(query, tcx, span, key, None).0) + let entry = match mode { + QueryMode::Get { entry } => entry, + QueryMode::Ensure { entry, .. } => match tcx.cache_entry_get_or_start(entry, span) { + Ok((result, _)) => return *result, + Err(cache_entry::GetOrStartError::InProgress(entry)) => entry, + Err(cache_entry::GetOrStartError::Interrupted(cycle)) => { + return (query.handle_cycle_error_fn)(tcx, key, cycle); + } + }, + }; + ensure_sufficient_stack(|| *try_execute_query::(query, tcx, span, key, None, entry).0) } /// Called by a macro-generated impl of [`QueryVTable::execute_query_fn`], /// in incremental mode. #[inline(always)] -pub(super) fn execute_query_incr_inner<'tcx, C: QueryCache>( +pub(super) fn execute_query_incr_inner<'a, 'tcx, C: QueryCache>( query: &'tcx QueryVTable<'tcx, C>, tcx: TyCtxt<'tcx>, span: Span, key: C::Key, - mode: QueryMode, + mode: QueryMode<'a, C::Value>, ) -> Option { let dep_node = DepNode::construct(tcx, query.dep_kind, &key); // Check if query execution can be skipped, for `ensure_ok` or `ensure_done`. - if let QueryMode::Ensure { ensure_mode } = mode - && ensure_can_skip_execution(query, tcx, key, dep_node, ensure_mode) - { - return None; - } + let entry = match mode { + QueryMode::Ensure { ensure_mode, entry } => { + if ensure_can_skip_execution(query, tcx, key, dep_node, ensure_mode) { + return None; + } + match tcx.cache_entry_get_or_start(entry, span) { + Ok((result, dep_node_index)) => { + tcx.dep_graph.read_index(dep_node_index); + return Some(*result); + } + Err(cache_entry::GetOrStartError::InProgress(entry)) => entry, + Err(cache_entry::GetOrStartError::Interrupted(cycle)) => { + return Some((query.handle_cycle_error_fn)(tcx, key, cycle)); + } + } + } + QueryMode::Get { entry } => entry, + }; let (result, dep_node_index) = ensure_sufficient_stack(|| { - try_execute_query::(query, tcx, span, key, Some(dep_node)) + try_execute_query::(query, tcx, span, key, Some(dep_node), entry) }); - if let Some(dep_node_index) = dep_node_index { - tcx.dep_graph.read_index(dep_node_index) - } - Some(result) + tcx.dep_graph.read_index(dep_node_index); + Some(*result) } /// Inner implementation of [`DepKindVTable::force_from_dep_node_fn`][force_fn] @@ -666,9 +347,18 @@ pub(crate) fn force_query_dep_node<'tcx, C: QueryCache>( return false; }; - ensure_sufficient_stack(|| { - try_execute_query::(query, tcx, DUMMY_SP, key, Some(dep_node)) - }); + let entry = query.cache.lookup(key); + match tcx.cache_entry_get_or_start(entry, DUMMY_SP) { + Err(cache_entry::GetOrStartError::InProgress(entry)) => { + ensure_sufficient_stack(|| { + try_execute_query::(query, tcx, DUMMY_SP, key, Some(dep_node), entry) + }); + } + Err(cache_entry::GetOrStartError::Interrupted(cycle)) => { + (query.handle_cycle_error_fn)(tcx, key, cycle); + } + Ok(_) => (), + } // We did manage to recover a key and force the node, though it's up to // the caller to check whether the node ended up marked red or green. diff --git a/compiler/rustc_query_impl/src/handle_cycle_error.rs b/compiler/rustc_query_impl/src/handle_cycle_error.rs index 79e7788cafe81..e21f72abb614c 100644 --- a/compiler/rustc_query_impl/src/handle_cycle_error.rs +++ b/compiler/rustc_query_impl/src/handle_cycle_error.rs @@ -4,6 +4,7 @@ use std::iter; use std::ops::ControlFlow; use rustc_data_structures::fx::FxHashSet; +use rustc_data_structures::sync::Lock; use rustc_errors::codes::*; use rustc_errors::{Applicability, Diag, MultiSpan, pluralize, struct_span_code_err}; use rustc_hir as hir; @@ -19,17 +20,62 @@ use crate::job::create_cycle_error; // Default cycle handler used for all queries that don't use the `handle_cycle_error` query // modifier. -pub(crate) fn default(err: Diag<'_>) -> ! { +pub(crate) fn default<'tcx>(tcx: TyCtxt<'tcx>, cycle: Cycle<'tcx>) -> ! { + let (_guard, err) = intro(tcx, &cycle); let guar = err.emit(); guar.raise_fatal() } +struct CycleHandlerNestingGuard<'tcx> { + nesting: &'tcx Lock, + nested: bool, +} + +impl<'tcx> CycleHandlerNestingGuard<'tcx> { + fn new(tcx: TyCtxt<'tcx>) -> Self { + let mutex = &tcx.query_system.cycle_handler_nesting; + let mut nesting = mutex.lock(); + let nested = match *nesting { + 0 => false, + 1 => true, + _ => { + // Don't print further nested errors to avoid cases of infinite recursion + tcx.dcx().delayed_bug("doubly nested cycle error").raise_fatal() + } + }; + *nesting += 1; + CycleHandlerNestingGuard { nesting: mutex, nested } + } +} + +impl<'tcx> Drop for CycleHandlerNestingGuard<'tcx> { + fn drop(&mut self) { + *self.nesting.lock() -= 1; + } +} + +fn intro<'tcx>( + tcx: TyCtxt<'tcx>, + cycle: &Cycle<'tcx>, +) -> (CycleHandlerNestingGuard<'tcx>, Diag<'tcx>) { + let guard = CycleHandlerNestingGuard::new(tcx); + + let error = create_cycle_error(tcx, &cycle, guard.nested); + + if guard.nested { + // Avoid custom handlers and only use the robust `create_cycle_error` for nested cycle errors + error.emit().raise_fatal() + } + + (guard, error) +} + pub(crate) fn fn_sig<'tcx>( tcx: TyCtxt<'tcx>, def_id: DefId, - _: Cycle<'tcx>, - err: Diag<'_>, + cycle: Cycle<'tcx>, ) -> ty::EarlyBinder<'tcx, ty::PolyFnSig<'tcx>> { + let (_guard, err) = intro(tcx, &cycle); let guar = err.delay_as_bug(); let err = Ty::new_error(tcx, guar); @@ -52,7 +98,6 @@ pub(crate) fn check_representability<'tcx>( tcx: TyCtxt<'tcx>, _key: LocalDefId, cycle: Cycle<'tcx>, - _err: Diag<'_>, ) { check_representability_inner(tcx, cycle); } @@ -61,12 +106,12 @@ pub(crate) fn check_representability_adt_ty<'tcx>( tcx: TyCtxt<'tcx>, _key: Ty<'tcx>, cycle: Cycle<'tcx>, - _err: Diag<'_>, ) { check_representability_inner(tcx, cycle); } fn check_representability_inner<'tcx>(tcx: TyCtxt<'tcx>, cycle: Cycle<'tcx>) -> ! { + let (_guard, _diag) = intro(tcx, &cycle); let mut item_and_field_ids = Vec::new(); let mut representable_ids = FxHashSet::default(); for frame in &cycle.frames { @@ -100,9 +145,9 @@ fn check_representability_inner<'tcx>(tcx: TyCtxt<'tcx>, cycle: Cycle<'tcx>) -> pub(crate) fn variances_of<'tcx>( tcx: TyCtxt<'tcx>, def_id: DefId, - _cycle: Cycle<'tcx>, - err: Diag<'_>, + cycle: Cycle<'tcx>, ) -> &'tcx [ty::Variance] { + let (_guard, err) = intro(tcx, &cycle); let _guar = err.delay_as_bug(); let n = tcx.generics_of(def_id).own_params.len(); tcx.arena.alloc_from_iter(iter::repeat_n(ty::Bivariant, n)) @@ -131,8 +176,8 @@ pub(crate) fn layout_of<'tcx>( tcx: TyCtxt<'tcx>, _key: ty::PseudoCanonicalInput<'tcx, Ty<'tcx>>, cycle: Cycle<'tcx>, - err: Diag<'_>, ) -> Result, &'tcx ty::layout::LayoutError<'tcx>> { + let (_guard, err) = intro(tcx, &cycle); let _guar = err.delay_as_bug(); let diag = search_for_cycle_permutation( &cycle.frames, diff --git a/compiler/rustc_query_impl/src/job.rs b/compiler/rustc_query_impl/src/job.rs index bf0493b29fd1e..f8d224110a527 100644 --- a/compiler/rustc_query_impl/src/job.rs +++ b/compiler/rustc_query_impl/src/job.rs @@ -1,71 +1,27 @@ use std::io::Write; use std::ops::ControlFlow; -use std::sync::Arc; -use std::{iter, mem}; +use std::{iter, mem, ptr}; -use rustc_data_structures::fx::{FxHashMap, FxHashSet}; +use rustc_data_structures::fx::FxHashSet; use rustc_errors::{Diag, DiagCtxtHandle}; use rustc_hir::def::DefKind; use rustc_middle::queries::TaggedQueryKey; -use rustc_middle::query::{Cycle, QueryJob, QueryJobId, QueryLatch, QueryStackFrame, QueryWaiter}; +use rustc_middle::query::{Cycle, QueryJob, QueryJobRef, QueryStackFrame, QueryWaiterGuard}; use rustc_middle::ty::TyCtxt; use rustc_span::{DUMMY_SP, Span}; -use crate::{CollectActiveJobsKind, collect_active_query_jobs}; - -/// Map from query job IDs to job information collected by -/// `collect_active_query_jobs`. -#[derive(Debug, Default)] -pub struct QueryJobMap<'tcx> { - map: FxHashMap>, -} - -impl<'tcx> QueryJobMap<'tcx> { - /// Adds information about a job ID to the job map. - /// - /// Should only be called by `collect_active_query_jobs_inner`. - pub(crate) fn insert(&mut self, id: QueryJobId, info: QueryJobInfo<'tcx>) { - self.map.insert(id, info); - } - - fn tagged_key_of(&self, id: QueryJobId) -> TaggedQueryKey<'tcx> { - self.map[&id].tagged_key - } - - fn span_of(&self, id: QueryJobId) -> Span { - self.map[&id].job.span - } - - fn parent_of(&self, id: QueryJobId) -> Option { - self.map[&id].job.parent - } - - fn latch_of(&self, id: QueryJobId) -> Option<&QueryLatch<'tcx>> { - self.map[&id].job.latch.as_ref() - } -} - -#[derive(Debug)] -pub(crate) struct QueryJobInfo<'tcx> { - pub(crate) tagged_key: TaggedQueryKey<'tcx>, - pub(crate) job: QueryJob<'tcx>, -} - -pub(crate) fn find_cycle_in_stack<'tcx>( - id: QueryJobId, - job_map: QueryJobMap<'tcx>, - current_job: &Option, +pub(crate) fn _find_cycle_in_stack<'a, 'tcx>( + top_job: QueryJobRef<'a, 'tcx>, + mut current_job: Option>, span: Span, ) -> Cycle<'tcx> { // Find the waitee amongst `current_job` parents. let mut frames = Vec::new(); - let mut current_job = Option::clone(current_job); while let Some(job) = current_job { - let info = &job_map.map[&job]; - frames.push(QueryStackFrame { span: info.job.span, tagged_key: info.tagged_key }); + frames.push(QueryStackFrame { span: job.span, tagged_key: (job.form_tagged_key)() }); - if job == id { + if ptr::eq(job, top_job) { frames.reverse(); // This is the end of the cycle. The span entry we included was for @@ -74,13 +30,13 @@ pub(crate) fn find_cycle_in_stack<'tcx>( frames[0].span = span; // Find out why the cycle itself was used. let usage = try { - let parent = info.job.parent?; - QueryStackFrame { span: info.job.span, tagged_key: job_map.tagged_key_of(parent) } + let parent = job.parent?; + QueryStackFrame { span: job.span, tagged_key: (parent.form_tagged_key)() } }; return Cycle { usage, frames }; } - current_job = info.job.parent; + current_job = job.parent; } panic!("did not find a cycle") @@ -90,62 +46,65 @@ pub(crate) fn find_cycle_in_stack<'tcx>( /// (but not necessarily the same query key), and returns information about it. #[cold] #[inline(never)] -pub(crate) fn find_dep_kind_root<'tcx>( +pub(crate) fn find_dep_kind_root<'a, 'tcx>( tcx: TyCtxt<'tcx>, - id: QueryJobId, - job_map: QueryJobMap<'tcx>, + mut job: QueryJobRef<'a, 'tcx>, ) -> (Span, String, usize) { let mut depth = 1; - let mut info = &job_map.map[&id]; // Two query jobs are for the same query method if they have the same // `TaggedQueryKey` discriminant. - let expected_query = mem::discriminant::>(&info.tagged_key); - let mut last_info = info; + let mut tagged_key = (job.form_tagged_key)(); + let expected_query = mem::discriminant::>(&tagged_key); - while let Some(id) = info.job.parent { - info = &job_map.map[&id]; - if mem::discriminant(&info.tagged_key) == expected_query { + while let Some(next_job) = job.parent { + let next_tagged_key = (next_job.form_tagged_key)(); + if mem::discriminant(&next_tagged_key) == expected_query { depth += 1; - last_info = info; + job = next_job; + tagged_key = next_tagged_key; } } - (last_info.job.span, last_info.tagged_key.description(tcx), depth) + (job.span, tagged_key.description(tcx), depth) } -/// The locaton of a resumable waiter. The usize is the index into waiters in the query's latch. +/// The locaton of a resumable waiter. The usize is the thread index into worker thread pool. +// FIXME: correct this comment: /// We'll use this to remove the waiter using `QueryLatch::extract_waiter` if we're waking it up. -type ResumableWaiterLocation = (QueryJobId, usize); +type ResumableWaiterLocation<'a, 'tcx> = (QueryJobRef<'a, 'tcx>, usize); /// This abstracts over non-resumable waiters which are found in `QueryJob`'s `parent` field /// and resumable waiters are in `latch` field. -struct AbstractedWaiter { +struct AbstractedWaiter<'a, 'tcx> { /// The span corresponding to the reason for why we're waiting on this query. span: Span, /// The query which we are waiting from, if none the waiter is from a compiler root. - parent: Option, - resumable: Option, + parent: Option>, + resumable: Option>, } /// Returns all the non-resumable and resumable waiters of a query. /// This is used so we can uniformly loop over both non-resumable and resumable waiters. -fn abstracted_waiters_of(job_map: &QueryJobMap<'_>, query: QueryJobId) -> Vec { +fn abstracted_waiters_of<'a, 'tcx>( + query: QueryJobRef<'a, 'tcx>, + waiters: &'a [Option>], +) -> Vec> { let mut result = Vec::new(); // Add the parent which is a non-resumable waiter since it's on the same stack - result.push(AbstractedWaiter { - span: job_map.span_of(query), - parent: job_map.parent_of(query), - resumable: None, - }); + result.push(AbstractedWaiter { span: query.span, parent: query.parent, resumable: None }); // Add the explicit waiters which use condvars and are resumable - if let Some(latch) = job_map.latch_of(query) { - for (i, waiter) in latch.waiters.lock().as_ref().unwrap().iter().enumerate() { - result.push(AbstractedWaiter { - span: waiter.span, - parent: waiter.parent, - resumable: Some((query, i)), - }); + let worker_threads = query.entry_status.waiter_threads(); + if worker_threads != 0 { + for i in 0..rustc_thread_pool::max_num_threads() { + if worker_threads & (1 << i) != 0 { + let waiter = waiters[i as usize].as_ref().unwrap(); + result.push(AbstractedWaiter { + span: waiter.span(), + parent: waiter.parent(), + resumable: Some((query, i as usize)), + }); + } } } @@ -156,15 +115,15 @@ fn abstracted_waiters_of(job_map: &QueryJobMap<'_>, query: QueryJobId) -> Vec( - job_map: &QueryJobMap<'tcx>, - query: QueryJobId, +fn find_cycle<'a, 'tcx>( + query: QueryJobRef<'a, 'tcx>, span: Span, - stack: &mut Vec<(Span, QueryJobId)>, - visited: &mut FxHashSet, -) -> ControlFlow> { + waiters: &'a [Option>], + stack: &mut Vec<(Span, QueryJobRef<'a, 'tcx>)>, + visited: &mut FxHashSet<*const QueryJob<'a, 'tcx>>, +) -> ControlFlow>> { if !visited.insert(query) { - return if let Some(pos) = stack.iter().position(|q| q.1 == query) { + return if let Some(pos) = stack.iter().position(|q| ptr::eq(q.1, query)) { // We detected a query cycle, fix up the initial span and return Some // Remove previous stack entries @@ -181,13 +140,13 @@ fn find_cycle<'tcx>( stack.push((span, query)); // Visit all the waiters - for abstracted_waiter in abstracted_waiters_of(job_map, query) { + for abstracted_waiter in abstracted_waiters_of(query, waiters) { let Some(parent) = abstracted_waiter.parent else { // Skip waiters which are not queries continue; }; if let ControlFlow::Break(maybe_resumable) = - find_cycle(job_map, parent, abstracted_waiter.span, stack, visited) + find_cycle(parent, abstracted_waiter.span, waiters, stack, visited) { // Return the resumable waiter in `waiter.resumable` if present return ControlFlow::Break(abstracted_waiter.resumable.or(maybe_resumable)); @@ -203,10 +162,10 @@ fn find_cycle<'tcx>( /// Finds out if there's a path to the compiler root (aka. code which isn't in a query) /// from `query` without going through any of the queries in `visited`. /// This is achieved with a depth first search. -fn connected_to_root<'tcx>( - job_map: &QueryJobMap<'tcx>, - query: QueryJobId, - visited: &mut FxHashSet, +fn connected_to_root<'a, 'tcx>( + query: QueryJobRef<'a, 'tcx>, + waiters: &'a [Option>], + visited: &mut FxHashSet<*const QueryJob<'a, 'tcx>>, ) -> bool { // We already visited this or we're deliberately ignoring it if !visited.insert(query) { @@ -214,12 +173,12 @@ fn connected_to_root<'tcx>( } // Visit all the waiters - for abstracted_waiter in abstracted_waiters_of(job_map, query) { + for abstracted_waiter in abstracted_waiters_of(query, waiters) { match abstracted_waiter.parent { // This query is connected to the root None => return true, Some(parent) => { - if connected_to_root(job_map, parent, visited) { + if connected_to_root(parent, waiters, visited) { return true; } } @@ -230,7 +189,10 @@ fn connected_to_root<'tcx>( } /// Processes a found query cycle into a `Cycle` -fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId)>) -> Cycle<'tcx> { +fn process_cycle<'a, 'tcx>( + stack: Vec<(Span, QueryJobRef<'a, 'tcx>)>, + waiters: &'a [Option>], +) -> Cycle<'tcx> { // The stack is a vector of pairs of spans and queries; reverse it so that // the earlier entries require later entries let (mut spans, queries): (Vec<_>, Vec<_>) = stack.into_iter().rev().unzip(); @@ -241,9 +203,9 @@ fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId // Zip them back together let mut stack: Vec<_> = iter::zip(spans, queries).collect(); - struct EntryPoint { - query_in_cycle: QueryJobId, - query_waiting_on_cycle: Option<(Span, QueryJobId)>, + struct EntryPoint<'a, 'tcx> { + query_in_cycle: QueryJobRef<'a, 'tcx>, + query_waiting_on_cycle: Option<(Span, QueryJobRef<'a, 'tcx>)>, } // Find the queries in the cycle which are @@ -255,7 +217,7 @@ fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId let mut query_waiting_on_cycle = None; // Find a direct waiter who leads to the root - for abstracted_waiter in abstracted_waiters_of(job_map, query_in_cycle) { + for abstracted_waiter in abstracted_waiters_of(query_in_cycle, waiters) { let Some(parent) = abstracted_waiter.parent else { // The query in the cycle is directly connected to root. entrypoint = true; @@ -264,9 +226,9 @@ fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId // Mark all the other queries in the cycle as already visited, // so paths to the root through the cycle itself won't count. - let mut visited = FxHashSet::from_iter(stack.iter().map(|q| q.1)); + let mut visited = FxHashSet::from_iter(stack.iter().map(|q| q.1 as *const _)); - if connected_to_root(job_map, parent, &mut visited) { + if connected_to_root(parent, waiters, &mut visited) { query_waiting_on_cycle = Some((abstracted_waiter.span, parent)); entrypoint = true; break; @@ -275,7 +237,7 @@ fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId entrypoint.then_some(EntryPoint { query_in_cycle, query_waiting_on_cycle }) }) - .collect::>(); + .collect::>>(); // Pick an entry point, preferring ones with waiters let entry_point = entry_points @@ -284,51 +246,49 @@ fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId .unwrap_or(&entry_points[0]); // Shift the stack so that our entry point is first - let entry_point_pos = stack.iter().position(|(_, query)| *query == entry_point.query_in_cycle); + let entry_point_pos = + stack.iter().position(|(_, query)| ptr::eq(*query, entry_point.query_in_cycle)); if let Some(pos) = entry_point_pos { stack.rotate_left(pos); } let usage = entry_point .query_waiting_on_cycle - .map(|(span, job)| QueryStackFrame { span, tagged_key: job_map.tagged_key_of(job) }); + .map(|(span, job)| QueryStackFrame { span, tagged_key: (job.form_tagged_key)() }); // Create the cycle error Cycle { usage, frames: stack .iter() - .map(|&(span, job)| QueryStackFrame { span, tagged_key: job_map.tagged_key_of(job) }) + .map(|&(span, job)| QueryStackFrame { span, tagged_key: (job.form_tagged_key)() }) .collect(), } } /// Looks for a query cycle starting at `query`. /// Returns a waiter to resume if a cycle is found. -fn find_and_process_cycle<'tcx>( - job_map: &QueryJobMap<'tcx>, - query: QueryJobId, -) -> Option>> { +fn find_and_process_cycle<'a, 'tcx>( + query: QueryJobRef<'a, 'tcx>, + waiters: &'a [Option>], +) -> Option<(usize, Cycle<'tcx>)> { let mut visited = FxHashSet::default(); let mut stack = Vec::new(); if let ControlFlow::Break(resumable) = - find_cycle(job_map, query, DUMMY_SP, &mut stack, &mut visited) + find_cycle(query, DUMMY_SP, waiters, &mut stack, &mut visited) { // Create the cycle error - let error = process_cycle(job_map, stack); + let error = process_cycle(stack, waiters); // We unwrap `resumable` here since there must always be one // edge which is resumable / waited using a query latch - let (waitee_query, waiter_idx) = resumable.unwrap(); - - // Extract the waiter we want to resume - let waiter = job_map.latch_of(waitee_query).unwrap().extract_waiter(waiter_idx); + let (waitee_query, thread_idx) = resumable.unwrap(); - // Set the cycle error so it will be picked up when resumed - *waiter.cycle.lock() = Some(error); + // Remove a query waiter we want to resume + waitee_query.entry_status.remove_waiter_threads(1 << thread_idx); // Put the waiter on the list of things to resume - Some(waiter) + Some((thread_idx, error)) } else { None } @@ -341,23 +301,31 @@ fn find_and_process_cycle<'tcx>( /// There may be multiple cycles involved in a deadlock, but this only breaks one at a time so /// there will be multiple rounds through the deadlock handler if multiple cycles are present. #[allow(rustc::potential_query_instability)] -pub fn break_query_cycle<'tcx>(job_map: QueryJobMap<'tcx>, registry: &rustc_thread_pool::Registry) { +pub fn break_query_cycle<'tcx>(tcx: TyCtxt<'tcx>, registry: &rustc_thread_pool::Registry) { + let mut waiters: Box<[Option>]> = (0..registry.num_threads()) + .map(|thrd_idx| tcx.parking_area.lock_waiter(thrd_idx)) + .collect(); + + let mut iter = waiters.iter().flatten(); // Look for a cycle starting at each query job - let waiter = job_map - .map - .keys() - .find_map(|query| find_and_process_cycle(&job_map, *query)) - .expect("unable to find a query cycle"); + let (waiter_idx, cycle) = 'cycle: loop { + let waiter = iter.next().expect("unable to find a query cycle"); + let mut parent = waiter.parent(); + while let Some(query) = parent { + if let Some(res) = find_and_process_cycle(query, &waiters) { + break 'cycle res; + } + parent = query.parent; + } + }; // Mark the thread we're about to wake up as unblocked. - rustc_thread_pool::mark_unblocked(registry); - - assert!(waiter.condvar.notify_one(), "unable to wake the waiter"); + waiters[waiter_idx].take().unwrap().unpark_with_cycle(cycle, registry); } -pub fn print_query_stack<'tcx>( +pub fn print_query_stack<'a, 'tcx>( tcx: TyCtxt<'tcx>, - mut current_query: Option, + mut current_query: Option>, dcx: DiagCtxtHandle<'_>, limit_frames: Option, mut file: Option, @@ -368,24 +336,19 @@ pub fn print_query_stack<'tcx>( let mut count_printed = 0; let mut count_total = 0; - // Make use of a partial query job map if we fail to take locks collecting active queries. - let job_map = collect_active_query_jobs(tcx, CollectActiveJobsKind::PartialAllowed); - if let Some(ref mut file) = file { let _ = writeln!(file, "\n\nquery stack during panic:"); } while let Some(query) = current_query { - let Some(query_info) = job_map.map.get(&query) else { - break; - }; - let description = query_info.tagged_key.description(tcx); + let tagged_key = (query.form_tagged_key)(); + let description = tagged_key.description(tcx); if Some(count_printed) < limit_frames || limit_frames.is_none() { // Only print to stderr as many stack frames as `num_frames` when present. dcx.struct_failure_note(format!( "#{count_printed} [{query_name}] {description}", - query_name = query_info.tagged_key.query_name(), + query_name = tagged_key.query_name(), )) - .with_span(query_info.job.span) + .with_span(query.span) .emit(); count_printed += 1; } @@ -394,11 +357,11 @@ pub fn print_query_stack<'tcx>( let _ = writeln!( file, "#{count_total} [{query_name}] {description}", - query_name = query_info.tagged_key.query_name(), + query_name = tagged_key.query_name(), ); } - current_query = query_info.job.parent; + current_query = query.parent; count_total += 1; } diff --git a/compiler/rustc_query_impl/src/lib.rs b/compiler/rustc_query_impl/src/lib.rs index d5133aa04dccc..51cc480812ed2 100644 --- a/compiler/rustc_query_impl/src/lib.rs +++ b/compiler/rustc_query_impl/src/lib.rs @@ -8,7 +8,7 @@ #![feature(try_blocks)] // tidy-alphabetical-end -use rustc_data_structures::sync::{AtomicU64, Lock}; +use rustc_data_structures::sync::Lock; use rustc_middle::dep_graph; use rustc_middle::queries::{ExternProviders, Providers}; use rustc_middle::query::on_disk_cache::OnDiskCache; @@ -16,8 +16,7 @@ use rustc_middle::query::{QueryCache, QuerySystem, QueryVTable}; use rustc_middle::ty::TyCtxt; pub use crate::dep_kind_vtables::make_dep_kind_vtables; -pub use crate::execution::{CollectActiveJobsKind, collect_active_query_jobs}; -pub use crate::job::{QueryJobMap, break_query_cycle, print_query_stack}; +pub use crate::job::{break_query_cycle, print_query_stack}; mod dep_kind_vtables; mod error; @@ -55,7 +54,6 @@ pub fn query_system<'tcx>( on_disk_cache, local_providers, extern_providers, - jobs: AtomicU64::new(1), cycle_handler_nesting: Lock::new(0), } } diff --git a/compiler/rustc_query_impl/src/plumbing.rs b/compiler/rustc_query_impl/src/plumbing.rs index 11f960598c387..339d27b73dc63 100644 --- a/compiler/rustc_query_impl/src/plumbing.rs +++ b/compiler/rustc_query_impl/src/plumbing.rs @@ -1,5 +1,3 @@ -use std::num::NonZero; - use rustc_data_structures::unord::UnordMap; use rustc_hir::limit::Limit; use rustc_middle::bug; @@ -8,7 +6,7 @@ use rustc_middle::dep_graph::DepKindVTable; use rustc_middle::dep_graph::{DepNode, DepNodeKey, SerializedDepNodeIndex}; use rustc_middle::query::erase::{Erasable, Erased}; use rustc_middle::query::on_disk_cache::{CacheDecoder, CacheEncoder}; -use rustc_middle::query::{QueryCache, QueryJobId, QueryMode, QueryVTable, erase}; +use rustc_middle::query::{QueryCache, QueryJobRef, QueryMode, QueryVTable, erase}; use rustc_middle::ty::TyCtxt; use rustc_middle::ty::tls::{self, ImplicitCtxt}; use rustc_serialize::{Decodable, Encodable}; @@ -16,14 +14,11 @@ use rustc_span::DUMMY_SP; use rustc_span::def_id::LOCAL_CRATE; use crate::error::{QueryOverflow, QueryOverflowNote}; -use crate::execution::all_inactive; use crate::job::find_dep_kind_root; use crate::query_impl::for_each_query_vtable; -use crate::{CollectActiveJobsKind, collect_active_query_jobs}; -fn depth_limit_error<'tcx>(tcx: TyCtxt<'tcx>, job: QueryJobId) { - let job_map = collect_active_query_jobs(tcx, CollectActiveJobsKind::Full); - let (span, desc, depth) = find_dep_kind_root(tcx, job, job_map); +fn depth_limit_error<'a, 'tcx>(tcx: TyCtxt<'tcx>, job: QueryJobRef<'a, 'tcx>) { + let (span, desc, depth) = find_dep_kind_root(tcx, job); let suggested_limit = match tcx.recursion_limit() { Limit(0) => Limit(2), @@ -38,34 +33,22 @@ fn depth_limit_error<'tcx>(tcx: TyCtxt<'tcx>, job: QueryJobId) { }); } -#[inline] -pub(crate) fn next_job_id<'tcx>(tcx: TyCtxt<'tcx>) -> QueryJobId { - QueryJobId( - NonZero::new(tcx.query_system.jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed)) - .unwrap(), - ) -} - -#[inline] -pub(crate) fn current_query_job() -> Option { - tls::with_context(|icx| icx.query) -} - /// Executes a job by changing the `ImplicitCtxt` to point to the new query job while it executes. #[inline(always)] -pub(crate) fn start_query( - job_id: QueryJobId, +pub(crate) fn start_query<'a, 'tcx, R>( + tcx: TyCtxt<'tcx>, + job: QueryJobRef<'a, 'tcx>, depth_limit: bool, compute: impl FnOnce() -> R, ) -> R { - tls::with_context(move |icx| { - if depth_limit && !icx.tcx.recursion_limit().value_within_limit(icx.query_depth) { - depth_limit_error(icx.tcx, job_id); + tls::with_related_context(tcx, move |icx| { + if depth_limit && !tcx.recursion_limit().value_within_limit(icx.query_depth) { + depth_limit_error(tcx, job); } // Update the `ImplicitCtxt` to point to our new query job. let icx = ImplicitCtxt { - query: Some(job_id), + query: Some(job), query_depth: icx.query_depth + if depth_limit { 1 } else { 0 }, ..*icx }; @@ -91,9 +74,8 @@ fn encode_query_values_inner<'a, 'tcx, C, V>( { let _timer = tcx.prof.generic_activity_with_arg("encode_query_results_for", query.name); - assert!(all_inactive(&query.state)); - query.cache.for_each(&mut |key, value, dep_node| { - if (query.will_cache_on_disk_for_key_fn)(*key) { + query.cache.for_each(|key, value, dep_node| { + if (query.will_cache_on_disk_for_key_fn)(key) { encoder.encode_query_value::(dep_node, &erase::restore_val::(*value)); } }); @@ -116,10 +98,10 @@ fn verify_query_key_hashes_inner<'tcx, C: QueryCache>( let _timer = tcx.prof.generic_activity_with_arg("query_key_hash_verify_for", query.name); let cache = &query.cache; - let mut map = UnordMap::with_capacity(cache.len()); - cache.for_each(&mut |key, _, _| { - let node = DepNode::construct(tcx, query.dep_kind, key); - if let Some(other_key) = map.insert(node, *key) { + let mut map = UnordMap::default(); + cache.for_each(|key, _, _| { + let node = DepNode::construct(tcx, query.dep_kind, &key); + if let Some(other_key) = map.insert(node, key) { bug!( "query key:\n\ `{:?}`\n\ @@ -156,9 +138,10 @@ pub(crate) fn promote_from_disk_inner<'tcx, C: QueryCache>( return; } - match query.cache.lookup(&key) { + let entry = query.cache.lookup(key); + match entry.try_start() { // If the value is already in memory, then promotion isn't needed. - Some(_) => {} + None => {} // "Execute" the query to load its disk-cached value into memory. // @@ -167,8 +150,8 @@ pub(crate) fn promote_from_disk_inner<'tcx, C: QueryCache>( // // FIXME(Zalathar): Is there a reasonable way to skip more of the // query bookkeeping when doing this? - None => { - (query.execute_query_fn)(tcx, DUMMY_SP, key, QueryMode::Get); + Some(entry) => { + (query.execute_query_fn)(tcx, DUMMY_SP, key, QueryMode::Get { entry }); } } } diff --git a/compiler/rustc_query_impl/src/profiling_support.rs b/compiler/rustc_query_impl/src/profiling_support.rs index 980e2b1305245..a576e85b42ae4 100644 --- a/compiler/rustc_query_impl/src/profiling_support.rs +++ b/compiler/rustc_query_impl/src/profiling_support.rs @@ -224,7 +224,7 @@ fn alloc_self_profile_query_strings_inner<'tcx, C>( // locked while doing so. Instead we copy out the // `(query_key, dep_node_index)` pairs and release the lock again. let mut query_keys_and_indices = Vec::new(); - query.cache.for_each(&mut |k, _, i| query_keys_and_indices.push((*k, i))); + query.cache.for_each(|k, _, i| query_keys_and_indices.push((k, i))); // Now actually allocate the strings. If allocating the strings // generates new entries in the query cache, we'll miss them but @@ -252,7 +252,7 @@ fn alloc_self_profile_query_strings_inner<'tcx, C>( // instead of passing the `DepNodeIndex` to `finish_with_query_invocation_id`, // when recording the event in the first place. let mut query_invocation_ids = Vec::new(); - query.cache.for_each(&mut |_, _, i| { + query.cache.for_each(|_, _, i| { query_invocation_ids.push(i.into()); }); diff --git a/compiler/rustc_query_impl/src/query_impl.rs b/compiler/rustc_query_impl/src/query_impl.rs index 4425acc6b86b8..a436263ab550b 100644 --- a/compiler/rustc_query_impl/src/query_impl.rs +++ b/compiler/rustc_query_impl/src/query_impl.rs @@ -50,11 +50,11 @@ macro_rules! define_queries { // Adding `__rust_end_short_backtrace` marker to backtraces so that we emit the frames // when `RUST_BACKTRACE=1`, add a new mod with `$name` here is to allow duplicate naming #[inline(never)] - pub(crate) fn __rust_end_short_backtrace<'tcx>( + pub(crate) fn __rust_end_short_backtrace<'a, 'tcx>( tcx: TyCtxt<'tcx>, span: Span, key: Key<'tcx>, - mode: QueryMode, + mode: QueryMode<'a, Erased>>, ) -> Option>> { #[cfg(debug_assertions)] let _guard = tracing::span!(tracing::Level::TRACE, stringify!($name), ?key).entered(); @@ -77,13 +77,14 @@ macro_rules! define_queries { tcx: TyCtxt<'tcx>, span: Span, key: Key<'tcx>, - __mode: QueryMode, + mode: QueryMode<'_, Erased>>, ) -> Option>> { Some(crate::execution::execute_query_non_incr_inner( &tcx.query_system.query_vtables.$name, tcx, span, key, + mode )) } } @@ -151,7 +152,6 @@ macro_rules! define_queries { depth_limit: $depth_limit, feedable: $feedable, dep_kind: rustc_middle::dep_graph::DepKind::$name, - state: Default::default(), cache: Default::default(), invoke_provider_fn: self::invoke_provider_fn::__rust_begin_short_backtrace, @@ -173,14 +173,14 @@ macro_rules! define_queries { try_load_from_disk_fn: |_tcx, _prev_index| None, #[cfg($handle_cycle_error)] - handle_cycle_error_fn: |tcx, key, cycle, err| { + handle_cycle_error_fn: |tcx, key, cycle| { use rustc_middle::query::erase::erase_val; - erase_val($crate::handle_cycle_error::$name(tcx, key, cycle, err)) + erase_val($crate::handle_cycle_error::$name(tcx, key, cycle)) }, #[cfg(not($handle_cycle_error))] - handle_cycle_error_fn: |_tcx, _key, _cycle, err| { - $crate::handle_cycle_error::default(err) + handle_cycle_error_fn: |tcx, _key, cycle| { + $crate::handle_cycle_error::default(tcx, cycle) }, #[cfg($no_hash)] diff --git a/compiler/rustc_span/src/fatal_error.rs b/compiler/rustc_span/src/fatal_error.rs index 5e2d82681a11a..84f6b18f2a4c0 100644 --- a/compiler/rustc_span/src/fatal_error.rs +++ b/compiler/rustc_span/src/fatal_error.rs @@ -12,6 +12,7 @@ pub use rustc_data_structures::FatalErrorMarker; impl !Send for FatalError {} impl FatalError { + #[cold] pub fn raise(self) -> ! { std::panic::resume_unwind(Box::new(FatalErrorMarker)) } diff --git a/compiler/rustc_thread_pool/src/lib.rs b/compiler/rustc_thread_pool/src/lib.rs index 7ce7fbc27eabe..05d95341df3bc 100644 --- a/compiler/rustc_thread_pool/src/lib.rs +++ b/compiler/rustc_thread_pool/src/lib.rs @@ -109,7 +109,8 @@ pub use self::thread_pool::{ /// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum. /// /// The value may vary between different targets, and is subject to change in new Rayon versions. -pub fn max_num_threads() -> usize { +#[inline(always)] +pub const fn max_num_threads() -> usize { // We are limited by the bits available in the sleep counter's `AtomicUsize`. crate::sleep::THREADS_MAX } diff --git a/compiler/rustc_thread_pool/src/registry.rs b/compiler/rustc_thread_pool/src/registry.rs index 9510c1842f86a..85b2488b7c33a 100644 --- a/compiler/rustc_thread_pool/src/registry.rs +++ b/compiler/rustc_thread_pool/src/registry.rs @@ -320,6 +320,11 @@ impl Registry { } pub fn current() -> Arc { + Self::with_current(Arc::clone) + } + + #[inline] + pub fn with_current(f: impl FnOnce(&Arc) -> R) -> R { unsafe { let worker_thread = WorkerThread::current(); let registry = if worker_thread.is_null() { @@ -327,7 +332,7 @@ impl Registry { } else { &(*worker_thread).registry }; - Arc::clone(registry) + f(registry) } } @@ -360,7 +365,7 @@ impl Registry { RegistryId { addr: self as *const Self as usize } } - pub(super) fn num_threads(&self) -> usize { + pub fn num_threads(&self) -> usize { self.thread_infos.len() } @@ -628,8 +633,8 @@ pub fn mark_blocked() { /// Mark a previously blocked Rayon worker thread as unblocked #[inline] -pub fn mark_unblocked(registry: &Registry) { - registry.sleep.mark_unblocked() +pub fn mark_unblocked(registry: &Registry, thread_count: usize) { + registry.sleep.mark_unblocked(thread_count) } #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] diff --git a/compiler/rustc_thread_pool/src/sleep/counters.rs b/compiler/rustc_thread_pool/src/sleep/counters.rs index f2682028b96a6..6909e38c18425 100644 --- a/compiler/rustc_thread_pool/src/sleep/counters.rs +++ b/compiler/rustc_thread_pool/src/sleep/counters.rs @@ -53,11 +53,7 @@ impl JobsEventCounter { } /// Number of bits used for the thread counters. -#[cfg(target_pointer_width = "64")] -const THREADS_BITS: usize = 16; - -#[cfg(target_pointer_width = "32")] -const THREADS_BITS: usize = 8; +const THREADS_BITS: usize = 5; /// Bits to shift to select the sleeping threads /// (used with `select_bits`). diff --git a/compiler/rustc_thread_pool/src/sleep/mod.rs b/compiler/rustc_thread_pool/src/sleep/mod.rs index cbaead52902dc..a8916bb74b2eb 100644 --- a/compiler/rustc_thread_pool/src/sleep/mod.rs +++ b/compiler/rustc_thread_pool/src/sleep/mod.rs @@ -114,12 +114,12 @@ impl Sleep { /// Mark a previously blocked Rayon worker thread as unblocked #[inline] - pub(super) fn mark_unblocked(&self) { + pub(super) fn mark_unblocked(&self, thread_count: usize) { let mut data = self.data.lock().unwrap(); - debug_assert!(data.active_threads < data.worker_count); - debug_assert!(data.blocked_threads > 0); - data.active_threads += 1; - data.blocked_threads -= 1; + debug_assert!(data.active_threads + thread_count <= data.worker_count); + debug_assert!(data.blocked_threads >= thread_count); + data.active_threads += thread_count; + data.blocked_threads -= thread_count; } #[inline]