Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.stack-work/
dist-newstyle/
3 changes: 1 addition & 2 deletions stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
#
# resolver: ./custom-snapshot.yaml
# resolver: https://example.com/snapshots/2018-01-01.yaml
resolver:
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/8.yaml
resolver: lts-20.19

# User packages to be built.
# Various formats can be used as shown in the example below.
Expand Down
9 changes: 4 additions & 5 deletions stack.yaml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
packages: []
snapshots:
- completed:
size: 587126
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/8.yaml
sha256: 93a107557e8691ed5ca17beaee41e68222b142c48868fc8c04a4181fb233477d
original:
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/8.yaml
sha256: 42f77c84b34f68c30c2cd0bf8c349f617a0f428264362426290847a6a2019b64
size: 649618
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/20/19.yaml
original: lts-20.19
8 changes: 8 additions & 0 deletions thread-utils-context/ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# Changelog for thread-utils-context

## 0.3.0.4

- Fix compilation on GHC 8.12

## 0.3.0.3

- Fix compilation of purgeDeadThreads on GHC 9.6

## Unreleased changes
15 changes: 13 additions & 2 deletions thread-utils-context/package.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
name: thread-utils-context
version: 0.1.0.0
version: 0.3.0.4
github: "iand675/thread-utils"
license: BSD3
author: "Ian Duncan"
maintainer: "ian@iankduncan.com"
copyright: "2021 Ian Duncan"
copyright: "2023 Ian Duncan"

extra-source-files:
- README.md
Expand All @@ -25,8 +25,17 @@ dependencies:
- containers
- ghc-prim

flags:
debug:
description: Whether to enable some additional hooks to debug issues
default: false
manual: true

library:
source-dirs: src
when:
- condition: flag(debug)
cpp-options: -DDEBUG_HOOKS

tests:
thread-utils-context-test:
Expand All @@ -38,3 +47,5 @@ tests:
- -with-rtsopts=-N
dependencies:
- thread-utils-context
- hspec
- hspec-expectations
132 changes: 95 additions & 37 deletions thread-utils-context/src/Control/Concurrent/Thread/Storage.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnliftedFFITypes #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE BangPatterns #-}
-- | A perilous implementation of thread-local storage for Haskell.
-- This module uses a fair amount of GHC internals to enable performing
-- lookups of context for any threads that are alive. Caution should be
Expand Down Expand Up @@ -30,6 +32,9 @@ module Control.Concurrent.Thread.Storage
-- * Retrieve values from a 'ThreadStorageMap'
, lookup
, lookupOnThread
-- * Update values in a 'ThreadStorageMap'
, update
, updateOnThread
-- * Associate values with a thread in a 'ThreadStorageMap'
, attach
, attachOnThread
Expand All @@ -41,52 +46,66 @@ module Control.Concurrent.Thread.Storage
, adjustOnThread
-- * Monitoring utilities
, storedItems
-- * Thread ID manipulation
, getThreadId
#if MIN_VERSION_base(4,18,0)
, purgeDeadThreads
#endif
) where

import Control.Concurrent
import Control.Concurrent.Thread.Finalizers
import Control.Monad ( void )
import Control.Monad ( when, void, forM_ )
import Control.Monad.IO.Class
import GHC.IO (IO(..))
import Data.Maybe (isNothing, isJust)
import Data.Word (Word64)
import GHC.Base (Addr#)
import GHC.IO (IO(..), mask_)
import GHC.Int
#if MIN_VERSION_base(4,18,0)
import GHC.Conc (listThreads)
#endif
import GHC.Conc.Sync ( ThreadId(..) )
import GHC.Prim
import qualified Data.IntMap.Lazy as I
import qualified Data.IntMap.Strict as I
import qualified Data.IntSet as IS
import Foreign.C.Types
import Prelude hiding (lookup)
import GHC.Exts (unsafeCoerce#)

foreign import ccall unsafe "rts_getThreadId" c_getThreadId :: ThreadId# -> CInt
foreign import ccall unsafe "rts_getThreadId" c_getThreadId :: Addr# -> CULLong

numStripes :: Int
numStripes :: Word
numStripes = 32

getThreadId :: ThreadId -> Int
getThreadId (ThreadId tid#) = fromIntegral (c_getThreadId tid#)
getThreadId :: ThreadId -> Word
getThreadId (ThreadId tid#) = fromIntegral (c_getThreadId (unsafeCoerce# tid#))

threadHash :: Int -> Int
threadHash = (`mod` numStripes)
stripeHash :: Word -> Int
stripeHash = fromIntegral . (`mod` numStripes)

readStripe :: ThreadStorageMap a -> ThreadId -> IO (I.IntMap a)
readStripe (ThreadStorageMap arr#) t = IO $ \s -> readArray# arr# tid# s
where
(I# tid#) = threadHash $ getThreadId t
(I# tid#) = stripeHash $ getThreadId t

atomicModifyStripe :: ThreadStorageMap a -> Int -> (I.IntMap a -> (I.IntMap a, b)) -> IO b
atomicModifyStripe :: ThreadStorageMap a -> Word -> (I.IntMap a -> (I.IntMap a, b)) -> IO b
atomicModifyStripe (ThreadStorageMap arr#) tid f = IO $ \s -> go s
where
(I# stripe#) = threadHash tid
(I# stripe#) = fromIntegral $ stripeHash tid
go s = case readArray# arr# stripe# s of
(# s1, intMap #) ->
(# s1, intMap #) ->
let (updatedIntMap, result) = f intMap
in case casArray# arr# stripe# intMap updatedIntMap s1 of
(# s2, outcome, old #) -> case outcome of
0# -> (# s2, result #)
0# -> updatedIntMap `seq` (# s2, result #)
1# -> go s2
_ -> error "Got impossible result in atomicModifyStripe"

-- | A storage mechanism for values of a type. This structure retains items
-- on per-(green)thread basis, which can be useful in rare cases.
data ThreadStorageMap a = ThreadStorageMap (MutableArray# RealWorld (I.IntMap a))
data ThreadStorageMap a = ThreadStorageMap
(MutableArray# RealWorld (I.IntMap a))

-- | Create a new thread storage map. The map is striped by thread
-- into 32 sections in order to reduce contention.
Expand All @@ -95,7 +114,7 @@ newThreadStorageMap
newThreadStorageMap = liftIO $ IO $ \s -> case newArray# numStripes# mempty s of
(# s1, ma #) -> (# s1, ThreadStorageMap ma #)
where
(I# numStripes#) = numStripes
(I# numStripes#) = fromIntegral numStripes

-- | Retrieve a value if it exists for the current thread
lookup :: MonadIO m => ThreadStorageMap a -> m (Maybe a)
Expand All @@ -106,22 +125,24 @@ lookup tsm = liftIO $ do
-- | Retrieve a value if it exists for the specified thread
lookupOnThread :: MonadIO m => ThreadStorageMap a -> ThreadId -> m (Maybe a)
lookupOnThread tsm tid = liftIO $ do
let threadAsInt = getThreadId tid
m <- readStripe tsm tid
pure $ I.lookup threadAsInt m
where
threadAsInt = fromIntegral $ getThreadId tid

-- | Associate the provided value with the current thread
attach :: MonadIO m => ThreadStorageMap a -> a -> m ()
-- | Associate the provided value with the current thread.
--
-- Returns the previous value if it was set.
attach :: MonadIO m => ThreadStorageMap a -> a -> m (Maybe a)
attach tsm x = liftIO $ do
tid <- myThreadId
attachOnThread tsm tid x

-- | Associate the provided value with the specified thread
attachOnThread :: MonadIO m => ThreadStorageMap a -> ThreadId -> a -> m ()
attachOnThread tsm tid ctxt = liftIO $ do
let threadAsInt = getThreadId tid
addThreadFinalizer tid $ cleanUp tsm threadAsInt
atomicModifyStripe tsm threadAsInt $ \m -> (I.insert threadAsInt ctxt m, ())
-- | Associate the provided value with the specified thread. This replaces
-- any values already associated with the 'ThreadId'.
attachOnThread :: MonadIO m => ThreadStorageMap a -> ThreadId -> a -> m (Maybe a)
attachOnThread tsm tid ctxt =
updateOnThread tsm tid (\prev -> (Just ctxt, prev))

-- | Disassociate the associated value from the current thread, returning it if it exists.
detach :: MonadIO m => ThreadStorageMap a -> m (Maybe a)
Expand All @@ -133,7 +154,32 @@ detach tsm = liftIO $ do
detachFromThread :: MonadIO m => ThreadStorageMap a -> ThreadId -> m (Maybe a)
detachFromThread tsm tid = liftIO $ do
let threadAsInt = getThreadId tid
atomicModifyStripe tsm threadAsInt $ \m -> (I.delete threadAsInt m, I.lookup threadAsInt m)
updateOnThread tsm tid (\prev -> (Nothing, prev))

-- | The most general function in this library. Update a 'ThreadStorageMap' on a given thread,
-- with the ability to add or remove values and return some sort of result.
updateOnThread :: MonadIO m => ThreadStorageMap a -> ThreadId -> (Maybe a -> (Maybe a, b)) -> m b
updateOnThread tsm tid f = liftIO $ mask_ $ do
-- ^ We mask here in order to ensure that the finalizer will always be created
(isNewThreadEntry, result) <- atomicModifyStripe tsm threadAsWord $ \m ->
let (resultWithNewThreadDetection, m') =
I.alterF
(\x -> case f x of
(!x', !y) -> ((isNothing x && isJust x', y), x')
)
(fromIntegral threadAsWord)
m
in (m', resultWithNewThreadDetection)
when isNewThreadEntry $ do
addThreadFinalizer tid $ cleanUp tsm threadAsWord
pure result
where
threadAsWord = getThreadId tid

update :: MonadIO m => ThreadStorageMap a -> (Maybe a -> (Maybe a, b)) -> m b
update tsm f = liftIO $ do
tid <- myThreadId
updateOnThread tsm tid f

-- | Update the associated value for the current thread if it is attached.
adjust :: MonadIO m => ThreadStorageMap a -> (a -> a) -> m ()
Expand All @@ -144,23 +190,35 @@ adjust tsm f = liftIO $ do
-- | Update the associated value for the specified thread if it is attached.
adjustOnThread :: MonadIO m => ThreadStorageMap a -> ThreadId -> (a -> a) -> m ()
adjustOnThread tsm tid f = liftIO $ do
let threadAsInt = getThreadId tid
atomicModifyStripe tsm threadAsInt $ \m -> (I.adjust f threadAsInt m, ())
atomicModifyStripe tsm threadAsWord $ \m -> (I.adjust f (fromIntegral threadAsWord) m, ())
where
threadAsWord = getThreadId tid

-- Remove this context for thread from the map on finalization
cleanUp :: ThreadStorageMap a -> Int -> IO ()
cleanUp tsm tid = atomicModifyStripe tsm tid $ \m ->
(I.delete tid m, ())
cleanUp :: ThreadStorageMap a -> Word -> IO ()
cleanUp tsm tid = do
atomicModifyStripe tsm tid $ \m ->
(I.delete (fromIntegral tid) m, ())

-- | List thread ids with live entries in the 'ThreadStorageMap'.
--
-- This is useful for monitoring purposes to verify that there
-- are no memory leaks retaining threads and thus preventing
-- items from being freed from a 'ThreadStorageMap'
storedItems :: ThreadStorageMap a -> IO [Int]
storedItems :: ThreadStorageMap a -> IO [(Int, a)]
storedItems tsm = do
stripes <- mapM (stripeByIndex tsm) [0..(numStripes - 1)]
pure $ concatMap I.keys stripes

stripeByIndex :: ThreadStorageMap a -> Int -> IO (I.IntMap a)
stripeByIndex (ThreadStorageMap arr#) (I# i#) = IO $ \s -> readArray# arr# i# s
stripes <- mapM (stripeByIndex tsm) [0..(fromIntegral numStripes - 1)]
pure $ concatMap I.toList stripes
where
stripeByIndex :: ThreadStorageMap a -> Int -> IO (I.IntMap a)
stripeByIndex (ThreadStorageMap arr#) (I# i#) = IO $ \s -> readArray# arr# i# s

#if MIN_VERSION_base(4,18,0)
-- | This should generally not be needed, but may be used to remove values prior to GC-triggered finalizers being run from the 'ThreadStorageMap' for threads that have exited.
purgeDeadThreads :: MonadIO m => ThreadStorageMap a -> m ()
purgeDeadThreads tsm = liftIO $ do
tids <- listThreads
let threadSet = IS.fromList $ map (fromIntegral . getThreadId) tids
forM_ [0..(numStripes - 1)] $ \stripe ->
atomicModifyStripe tsm stripe $ \im -> (I.restrictKeys im threadSet, ())
#endif
52 changes: 36 additions & 16 deletions thread-utils-context/test/Spec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,42 @@ import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.Thread.Storage
import Control.Monad
import Data.List hiding (lookup)
import Test.Hspec
import Prelude hiding (lookup)

main :: IO ()
main = do
mv <- newEmptyMVar
tsm <- newThreadStorageMap
replicateM_ 20 $ do
forkIO $ do
myThreadId >>= print
attach tsm ()
readMVar mv
threadDelay 2_000_000
print =<< storedItems tsm
putMVar mv ()
threadDelay 2_000_000
performGC
print =<< storedItems tsm


main = hspec $ do

describe "cleanup" $ do
it "works" $ do
mv <- newEmptyMVar
tsm <- newThreadStorageMap
replicateM_ 100000 $ do
forkIO $ do
attach tsm ()
readMVar mv
threadDelay 10_000_000
putMVar mv ()
threadDelay 10_000_000
performGC
threadDelay 10_000_000
thingsStillInStorage <- storedItems tsm
sort thingsStillInStorage `shouldBe` []
it "doesn't happen while a thread is still alive" $ do
tsm <- newThreadStorageMap
mv <- newEmptyMVar
resultVar <- newEmptyMVar
forkIO $ do
attach tsm ()
readMVar mv
putMVar resultVar =<< lookup tsm
threadDelay 5_000_000
performGC
putMVar mv ()
result <- readMVar resultVar
result `shouldBe` Just ()
performGC
threadDelay 5_000_000
items <- storedItems tsm
items `shouldBe` []
Loading