module IHP.Job.Runner.MainLoop
( runJobWorkers
, dedicatedProcessMainLoop
, devServerMainLoop
, installSignalHandlers
, stopExitHandler
) where
import IHP.Prelude
import IHP.ControllerPrelude
import IHP.ScriptSupport
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified IHP.PGListener as PGListener
import Control.Monad.Trans.Resource
import qualified Control.Exception.Safe as Exception
import qualified IHP.Log as Log
import Control.Concurrent.STM (atomically, writeTBQueue)
-- | Script entry point for running job workers.
--
-- Simply delegates to 'dedicatedProcessMainLoop' with the given workers.
-- Kept in pointed form: the implicit-parameter constraints live inside the
-- 'Script' result type, so the argument has to be bound before delegating.
runJobWorkers :: [JobWorker] -> Script
runJobWorkers jobWorkers = dedicatedProcessMainLoop jobWorkers
-- | Runs the given job workers in the current process until an exit signal arrives.
--
-- Opens a 'PGListener.PGListener' for the workers via 'PGListener.withPGListener',
-- starts every worker, and then blocks until SIGINT or SIGTERM is received
-- (see 'installSignalHandlers').
--
-- Shutdown happens in two stages:
--
-- 1. After the first signal, each worker's subscription is removed, its poller
--    (and stale-recovery resource, if present) is released, and a 'Stop' message
--    is written to its action queue. The loop then waits for all dispatchers.
-- 2. If a second signal arrives while waiting, every dispatcher's release key is
--    released immediately.
--
-- In both cases the function finishes by raising 'Exit.ExitSuccess' in the thread
-- that called it.
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop jobWorkers = do
    -- Remember the calling thread so the exit exception can be delivered to it later,
    -- including from the background async below.
    threadId <- Concurrent.myThreadId
    workerId <- UUID.nextRandom

    Log.info ("Starting worker " <> tshow workerId)

    PGListener.withPGListener ?context.databaseUrl ?context.logger \pgListener ->
        runResourceT do
            -- Each call of 'waitForExitSignal' blocks until one more signal was received
            waitForExitSignal <- liftIO installSignalHandlers

            let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

            processes <- jobWorkers
                |> mapM (\(JobWorker listenAndRun) -> listenAndRun jobWorkerArgs)

            -- First exit signal: begin the graceful shutdown
            liftIO waitForExitSignal

            liftIO $ Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)

            -- Detach every worker from its inputs and ask it to stop
            liftIO $ forEach processes \JobWorkerProcess { pollerReleaseKey, subscription, action, staleRecoveryReleaseKey } -> do
                PGListener.unsubscribe subscription pgListener
                release pollerReleaseKey
                case staleRecoveryReleaseKey of
                    Just key -> release key
                    Nothing -> pure ()
                atomically $ writeTBQueue action Stop

            liftIO $ PGListener.stop pgListener

            -- Second exit signal: handled in the background while the main flow below
            -- waits for the dispatchers. The async handle is intentionally not kept.
            _ <- liftIO $ async do
                waitForExitSignal

                Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)

                -- NOTE(review): releasing the dispatcher key presumably cancels the
                -- dispatcher async; confirm where the key is allocated.
                forEach processes \JobWorkerProcess { dispatcher = (dispatcherKey, _) } -> do
                    release dispatcherKey

                Concurrent.throwTo threadId Exit.ExitSuccess

            -- Wait for all dispatchers to complete
            liftIO $ forEach processes \JobWorkerProcess { dispatcher = (_, dispatcherAsync) } -> do
                Async.wait dispatcherAsync

            liftIO $ Concurrent.throwTo threadId Exit.ExitSuccess
-- | Runs the given job workers using an already existing 'PGListener.PGListener'.
--
-- Unlike 'dedicatedProcessMainLoop' this installs no signal handlers and does not
-- open its own listener: the supplied @frameworkConfig@ becomes the implicit
-- @?context@ and the supplied @pgListener@ is handed to every worker.
--
-- After starting the workers the calling thread sleeps forever. When that sleep is
-- interrupted by an exception, a 'Stop' message is written to every worker's action
-- queue before the exception propagates.
devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop frameworkConfig pgListener jobWorkers = do
    workerId <- UUID.nextRandom
    -- 'Log.info' and the worker arguments below read the config from '?context'
    let ?context = frameworkConfig

    Log.info ("Starting worker " <> tshow workerId)

    runResourceT do
        let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

        processes <- jobWorkers
            |> mapM (\(JobWorker listenAndRun) -> listenAndRun jobWorkerArgs)

        -- Block indefinitely; the cleanup only runs once this thread receives an exception
        liftIO $ (forever (Concurrent.threadDelay maxBound)) `Exception.finally` do
            forEach processes \JobWorkerProcess { action } -> do
                atomically $ writeTBQueue action Stop
-- | Installs handlers for SIGINT and SIGTERM and returns an action that waits for them.
--
-- Both handlers put @()@ into a shared, initially empty 'Concurrent.MVar'. The
-- returned action takes from that 'Concurrent.MVar', so each time it is run it
-- blocks until one (further) signal has been received.
--
-- Any previously installed handlers for these two signals are replaced; the old
-- handlers returned by 'Signals.installHandler' are discarded.
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
    exitSignal <- Concurrent.newEmptyMVar

    let catchHandler = Concurrent.putMVar exitSignal ()

    _ <- Signals.installHandler Signals.sigINT (Signals.Catch catchHandler) Nothing
    _ <- Signals.installHandler Signals.sigTERM (Signals.Catch catchHandler) Nothing

    pure (Concurrent.takeMVar exitSignal)
-- | Runs the given action unchanged.
--
-- The 'JobWorkerArgs' argument is matched against its constructor but none of its
-- fields are used; the result is exactly the result of @main@.
stopExitHandler :: JobWorkerArgs -> IO a -> IO a
stopExitHandler JobWorkerArgs {} main = main