Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 28 additions & 21 deletions src/Streamly/Extra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedLabels #-}

module Streamly.Extra where
Expand All @@ -16,6 +17,7 @@ import Control.Concurrent hiding (yield)
import qualified Control.Concurrent.STM.TChan as TChan
import Control.Monad.Catch (MonadMask)
import Control.Monad.Except (catchError, MonadError)
import Control.Monad (join)
import Control.Monad.IO.Class
import Control.Monad.Reader.Class
import qualified Control.Monad.STM as STM
Expand Down Expand Up @@ -154,7 +156,7 @@ collectTillEndOrTimeout keyFn isEnd timeout src =
*> atomicModifyIORef' hmRef (\hm -> (Map.insert b (tId, as') hm, ()))
)
((,) <$> liftIO (newIORef mempty) <*> liftIO (newIORef mempty))
(\(_, outRef) -> liftIO $ atomicModifyIORef' outRef (\mAs -> (Nothing, mAs))))
(\(_, outRef) -> liftIO $ atomicModifyIORef' outRef (Nothing,)))
src
incompletedSessionsStream c = SP.repeatM (liftIO $ STM.atomically $ TChan.readTChan c)

Expand Down Expand Up @@ -538,7 +540,7 @@ withRateGaugeWithElements tagGenerator src =
ask >>= measureAndRecord IN
where
measureAndRecord :: Direction -> LoggerConfig tag -> t m a
measureAndRecord direction (LoggerConfig { logger, samplingRate }) =
measureAndRecord direction LoggerConfig { logger, samplingRate } =
SP.mapMaybe id $
SP.scan (FL.Fold step begin end) withTimer
where
Expand Down Expand Up @@ -613,31 +615,36 @@ compress
. S.IsStream t
=> S.MonadAsync m
=> Monad (t m)
=> Int -- Compression Level ranging between 0,9 | 0 : lowest compression high Speed, 9 : highest compression but slow
=> Int -- Compression Level ranging between 0,9 | 0 : lowest compression high Speed, 9 : highest compression but slow
-> Zlib.WindowBits
-> t m BS.ByteString
-> t m BS.ByteString
compress compressionLevel stream = do
deflate <- SP.yieldM $ liftIO $ Zlib.initDeflate compressionLevel (Zlib.WindowBits 31) --for GZip compression WindowBits will be 31
SP.mapM
(\bs -> liftIO $ do
Zlib.feedDeflate deflate bs
popperRes <- Zlib.flushDeflate deflate
pure $ case popperRes of
Zlib.PRNext message -> message
_ -> mempty) stream
compress compressionLevel wb =
SP.mapMaybe id
. SP.scan (FL.Fold step begin done)
where
begin = (,Nothing) <$> liftIO (Zlib.initDeflate compressionLevel wb)
step (deflate, _) bs = liftIO $ do
Zlib.feedDeflate deflate bs
(\case
Zlib.PRNext message -> (deflate, Just message)
_ -> (deflate, Nothing)) <$> Zlib.flushDeflate deflate
done = pure . snd

decompress
:: forall t m
. S.IsStream t
=> S.MonadAsync m
=> Monad (t m)
=> t m BS.ByteString
=> Zlib.WindowBits
-> t m BS.ByteString
-> t m BS.ByteString
decompress stream = do
inflate <- SP.yieldM $ liftIO $ Zlib.initInflate (Zlib.WindowBits 31)
SP.mapM
(\bs ->
liftIO $ do
popper <- Zlib.feedInflate inflate bs
void popper
Zlib.flushInflate inflate) stream
decompress wb =
SP.mapMaybe id
. SP.scan (FL.Fold step begin done)
where
begin = (,Nothing) <$> liftIO (Zlib.initInflate wb)
step (inflate,_) bs = (inflate ,) . Just <$> liftIO (do
join $ Zlib.feedInflate inflate bs
Zlib.flushInflate inflate)
done = pure . snd