Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve manager #156

Merged
merged 6 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 57 additions & 32 deletions Network/HTTP2/H2/Manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Network.HTTP2.H2.Manager (
stopAfter,
forkManaged,
forkManagedUnmask,
withTimeout,
forkManagedTimeout,
KilledByHttp2ThreadManager (..),
waitCounter0,
) where
Expand All @@ -19,8 +19,10 @@ import Control.Concurrent.STM
import Control.Exception
import qualified Control.Exception as E
import Data.Foldable
import Data.Map (Map)
import qualified Data.Map.Strict as Map
import Data.IORef
import Data.IntMap (IntMap)
import qualified Data.IntMap.Strict as Map
import System.Mem.Weak (Weak, deRefWeak)
import qualified System.TimeManager as T

import Imports
Expand All @@ -30,17 +32,15 @@ import Imports
-- | Manager to manage the thread and the timer.
data Manager = Manager T.Manager (TVar ManagedThreads)

type ManagedThreads = Map ThreadId TimeoutHandle
type ManagedThreads = IntMap ManagedThread

----------------------------------------------------------------

data TimeoutHandle
= ThreadWithTimeout T.Handle
| ThreadWithoutTimeout

cancelTimeout :: TimeoutHandle -> IO ()
cancelTimeout (ThreadWithTimeout th) = T.cancel th
cancelTimeout ThreadWithoutTimeout = return ()
-- 'IORef' prevents race between WAI TimeManager (TimeoutThread)
-- and stopAfter (KilledByHttp2ThreadManager).
-- It is initialized with 'False' and turned into 'True' when locked.
-- The winner can throw an asynchronous exception.
data ManagedThread = ManagedThread (Weak ThreadId) (IORef Bool)

----------------------------------------------------------------

Expand Down Expand Up @@ -74,10 +74,10 @@ stopAfter (Manager _timmgr var) action cleanup = do
m0 <- readTVar var
writeTVar var Map.empty
return m0
forM_ (Map.elems m) cancelTimeout
let er = either Just (const Nothing) ma
forM_ (Map.keys m) $ \tid ->
E.throwTo tid $ KilledByHttp2ThreadManager er
let ths = Map.elems m
er = either Just (const Nothing) ma
ex = KilledByHttp2ThreadManager er
forM_ ths $ \(ManagedThread wtid ref) -> lockAndKill wtid ref ex
case ma of
Left err -> cleanup (Just err) >> throwIO err
Right a -> cleanup Nothing >> return a
Expand All @@ -97,18 +97,44 @@ forkManaged mgr label io =
forkManagedUnmask
:: Manager -> String -> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forkManagedUnmask (Manager _timmgr var) label io =
-- This is the top level of thread.
-- So, SomeException should be reasonable.
void $ mask_ $ forkIOWithUnmask $ \unmask -> E.handle ignore $ do
labelMe label
tid <- myThreadId
atomically $ modifyTVar var $ Map.insert tid ThreadWithoutTimeout
-- We catch the exception and do not rethrow it: we don't want the
-- exception printed to stderr.
io unmask `catch` ignore
atomically $ modifyTVar var $ Map.delete tid
where
ignore (E.SomeException _) = return ()
E.bracket (setup var) (clear var) $ \_ -> io unmask

forkManagedTimeout :: Manager -> String -> (T.Handle -> IO ()) -> IO ()
forkManagedTimeout (Manager timmgr var) label io =
void $ forkIO $ E.handle ignore $ do
labelMe label
E.bracket (setup var) (clear var) $ \(_n, wtid, ref) ->
-- 'TimeoutThread' is ignored by 'withHandle'.
T.withHandle timmgr (lockAndKill wtid ref T.TimeoutThread) io

setup :: TVar (IntMap ManagedThread) -> IO (Int, Weak ThreadId, IORef Bool)
setup var = do
(wtid, n) <- myWeakThradId
ref <- newIORef False
let ent = ManagedThread wtid ref
-- asking to throw KilledByHttp2ThreadManager to me
atomically $ modifyTVar' var $ Map.insert n ent
return (n, wtid, ref)

lockAndKill :: Exception e => Weak ThreadId -> IORef Bool -> e -> IO ()
lockAndKill wtid ref e = do
alreadyLocked <- atomicModifyIORef' ref (\b -> (True, b)) -- try to lock
unless alreadyLocked $ do
mtid <- deRefWeak wtid
case mtid of
Nothing -> return ()
Just tid -> E.throwTo tid e

clear
:: TVar (IntMap ManagedThread)
-> (Map.Key, Weak ThreadId, IORef Bool)
-> IO ()
clear var (n, _, _) = atomically $ modifyTVar' var $ Map.delete n

ignore :: KilledByHttp2ThreadManager -> IO ()
ignore (KilledByHttp2ThreadManager _) = return ()

waitCounter0 :: Manager -> IO ()
waitCounter0 (Manager _timmgr var) = atomically $ do
Expand All @@ -117,10 +143,9 @@ waitCounter0 (Manager _timmgr var) = atomically $ do

----------------------------------------------------------------

withTimeout :: Manager -> (T.Handle -> IO ()) -> IO ()
withTimeout (Manager timmgr var) action =
T.withHandleKillThread timmgr (return ()) $ \th -> do
tid <- myThreadId
-- overriding ThreadWithoutTimeout
atomically $ modifyTVar var $ Map.insert tid $ ThreadWithTimeout th
action th
myWeakThradId :: IO (Weak ThreadId, Int)
myWeakThradId = do
tid <- myThreadId
wtid <- mkWeakThreadId tid
let n = read (drop 9 $ show tid) -- drop "ThreadId "

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kazu-yamamoto I am late to the party but there’s fromThreadId in base. It allows you to get a stable integer identifier of a thread without resorting to the show instance.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot find fromThreadId anywhere?
Which module exports it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fromThreadId is very new, sigh.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I don't think the weak references are strictly necessary. Provided that we keep the Map in WHNF, and we remove entries whenever the corresponding threads die, we should not be unnecessarily retaining any threads.

return (wtid, n)
45 changes: 21 additions & 24 deletions Network/HTTP2/Server/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@ import Network.HTTP2.H2

runServer :: Config -> Server -> Launch
runServer conf server ctx@Context{..} strm req =
forkManaged threadManager label $
withTimeout threadManager $ \th -> do
-- FIXME: exception
let req' = pauseRequestBody th
aux = Aux th mySockAddr peerSockAddr
request = Request req'
lc <- newLoopCheck strm Nothing
server request aux $ sendResponse conf ctx lc strm request
adjustRxWindow ctx strm
forkManagedTimeout threadManager label $ \th -> do
let req' = pauseRequestBody th
aux = Aux th mySockAddr peerSockAddr
request = Request req'
lc <- newLoopCheck strm Nothing
server request aux $ sendResponse conf ctx lc strm request
adjustRxWindow ctx strm
where
label = "H2 response sender for stream " ++ show (streamNumber strm)
pauseRequestBody th = req{inpObjBody = readBody'}
Expand Down Expand Up @@ -169,21 +167,20 @@ sendStreaming
-> IO (TBQueue StreamingChunk)
sendStreaming Context{..} strm strmbdy = do
tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
forkManaged threadManager label $
withTimeout threadManager $ \th ->
withOutBodyIface tbq id $ \iface -> do
let iface' =
iface
{ outBodyPush = \b -> do
T.pause th
outBodyPush iface b
T.resume th
, outBodyPushFinal = \b -> do
T.pause th
outBodyPushFinal iface b
T.resume th
}
strmbdy iface'
forkManagedTimeout threadManager label $ \th ->
withOutBodyIface tbq id $ \iface -> do
let iface' =
iface
{ outBodyPush = \b -> do
T.pause th
outBodyPush iface b
T.resume th
, outBodyPushFinal = \b -> do
T.pause th
outBodyPushFinal iface b
T.resume th
}
strmbdy iface'
return tbq
where
label = "H2 response streaming sender for " ++ show (streamNumber strm)
Loading