{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Runner where
import IHP.Prelude
import IHP.ControllerPrelude
import IHP.ScriptSupport
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception.Safe as Exception
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified System.Timeout as Timeout
import qualified IHP.PGListener as PGListener
import Control.Monad.Trans.Resource
import qualified IHP.Log as Log
import IHP.Hasql.FromRow (FromRowHasql)
import Control.Concurrent.STM (atomically, newTBQueue, readTBQueue, writeTBQueue, newTVarIO, readTVar, readTVarIO, writeTVar, modifyTVar', check)
import IHP.Job.Queue (tryWriteTBQueue)
-- | Runs the given job workers as a 'Script', delegating all of the work
-- (listener setup, signal handling, graceful shutdown) to
-- 'dedicatedProcessMainLoop'.
runJobWorkers :: [JobWorker] -> Script
runJobWorkers workers = dedicatedProcessMainLoop workers
-- | Runs the given job workers in the current process until it is told to
-- shut down via SIGINT/SIGTERM (see 'installSignalHandlers').
--
-- Opens its own 'PGListener' on @?context.databaseUrl@ and starts every
-- worker inside a 'ResourceT'. Shutdown happens in two stages:
--
-- 1. The first exit signal unsubscribes each worker from the listener,
--    releases its poller and (if present) its stale-job recovery loop,
--    queues a 'Stop' message for its dispatcher, stops the listener, and
--    then waits for all dispatchers to finish.
-- 2. A second exit signal releases every dispatcher, which cancels the
--    jobs that are still running.
--
-- Either way the main thread ends by throwing 'Exit.ExitSuccess' to itself.
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop jobWorkers = do
    threadId <- Concurrent.myThreadId
    workerId <- UUID.nextRandom

    Log.info ("Starting worker " <> tshow workerId)

    PGListener.withPGListener ?context.databaseUrl ?context.logger \pgListener ->
        runResourceT do
            waitForExitSignal <- liftIO installSignalHandlers

            let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

            processes <- jobWorkers
                    |> mapM (\(JobWorker listenAndRun) -> listenAndRun jobWorkerArgs)

            liftIO waitForExitSignal

            liftIO $ Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)

            -- First signal: stop everything that feeds the dispatchers, then ask each
            -- dispatcher to stop once its running jobs are done.
            liftIO $ forEach processes \JobWorkerProcess { pollerReleaseKey, subscription, action, staleRecoveryReleaseKey } -> do
                PGListener.unsubscribe subscription pgListener
                release pollerReleaseKey
                case staleRecoveryReleaseKey of
                    Just key -> release key
                    Nothing -> pure ()
                atomically $ writeTBQueue action Stop

            liftIO $ PGListener.stop pgListener

            -- Second signal: release (cancel) every dispatcher, which cancels its running jobs.
            _ <- liftIO $ async do
                waitForExitSignal
                Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)
                forEach processes \JobWorkerProcess { dispatcher = (dispatcherKey, _) } ->
                    release dispatcherKey
                Concurrent.throwTo threadId Exit.ExitSuccess

            -- Wait for every dispatcher to finish. 'Async.waitCatch' rather than 'Async.wait':
            -- after a forced shutdown the dispatchers end with 'AsyncCancelled', which 'wait'
            -- would rethrow here and race the 'Exit.ExitSuccess' thrown by the thread above.
            -- Genuine (synchronous) dispatcher failures are still reported.
            liftIO $ forEach processes \JobWorkerProcess { dispatcher = (_, dispatcherAsync) } -> do
                outcome <- Async.waitCatch dispatcherAsync
                case outcome of
                    Left exception | not (Exception.isAsyncException exception) ->
                        Log.error ("Job worker: dispatcher failed: " <> tshow exception)
                    _ -> pure ()

            liftIO $ Concurrent.throwTo threadId Exit.ExitSuccess
-- | Runs the given job workers on a 'PGListener' supplied by the caller.
--
-- Unlike 'dedicatedProcessMainLoop' this installs no signal handlers. It
-- blocks until an exception interrupts the thread (presumably cancellation by
-- the dev server; confirm with the caller). It then queues a 'Stop' message for
-- every worker's dispatcher before 'runResourceT' releases the worker resources.
devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop frameworkConfig pgListener jobWorkers = do
    workerId <- UUID.nextRandom
    let ?context = frameworkConfig

    Log.info ("Starting worker " <> tshow workerId)

    runResourceT do
        let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

        processes <- jobWorkers
                |> mapM (\(JobWorker listenAndRun) -> listenAndRun jobWorkerArgs)

        -- Sleep forever; the cleanup only runs once this thread is interrupted.
        liftIO $ (forever (Concurrent.threadDelay maxBound)) `Exception.finally` do
            forEach processes \JobWorkerProcess { action } ->
                atomically $ writeTBQueue action Stop
-- | Installs the same handler for SIGINT and SIGTERM and returns an action
-- that blocks until one of those signals has been received.
--
-- Each received signal deposits one @()@ into a shared 'MVar', and each run of
-- the returned action takes one. Calling it a second time therefore waits for
-- a further signal.
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
    signalled <- Concurrent.newEmptyMVar
    let onSignal = Signals.Catch (Concurrent.putMVar signalled ())
        install sig = Signals.installHandler sig onSignal Nothing
    install Signals.sigINT
    install Signals.sigTERM
    pure (Concurrent.takeMVar signalled)
-- | Returns its second argument unchanged. The 'JobWorkerArgs' argument is
-- only matched against its constructor; none of its fields are used.
stopExitHandler :: JobWorkerArgs -> a -> a
stopExitHandler JobWorkerArgs {} continuation = continuation
-- | A 'JobWorker' for the job type @job@, selected with a type application
-- (e.g. @worker \@SomeJob@). Starting it runs 'jobWorkerFetchAndRunLoop'
-- for that type.
worker :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , SetField "runAt" job UTCTime
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorker
worker = JobWorker startWorker
  where
    startWorker = jobWorkerFetchAndRunLoop @job
-- | Starts a job worker for the job type @job@ and returns its 'JobWorkerProcess' handle.
--
-- The worker consists of several threads, all registered with the enclosing
-- 'ResourceT' so that releasing it tears them down:
--
-- * A /dispatcher/ reads 'JobWorkerProcessMessage's from the returned @action@
--   queue.
--
--     * On 'JobAvailable' it starts a new runner thread, unless
--       @maxConcurrency \@job@ runners are already active.
--     * On 'Stop' it waits until no runner is active and then returns.
--     * When the dispatcher ends or is cancelled, it cancels every runner it
--       still tracks.
--
-- * /Runners/ fetch and perform jobs one after another until
--   'Queue.fetchNextJob' returns 'Nothing' or fails.
-- * 'Queue.watchForJob' sets up a subscription and poller and is handed the
--   @action@ queue (presumably to post 'JobAvailable'; see "IHP.Job.Queue").
-- * When 'staleJobTimeout' is set, a recovery loop calls 'Queue.recoverStaleJobs'
--   every half threshold and then nudges the dispatcher.
jobWorkerFetchAndRunLoop :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , SetField "runAt" job UTCTime
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
    let ?context = frameworkConfig
    let ?modelContext = modelContext

    -- Dispatcher inbox, with capacity @maxConcurrency \@job@.
    action <- liftIO $ atomically $ newTBQueue (fromIntegral (maxConcurrency @job))
    -- Look for already-queued jobs right away instead of waiting for the first notification.
    liftIO $ atomically $ writeTBQueue action JobAvailable

    -- Number of running runners, and their handles (so they can be cancelled).
    activeCount <- liftIO $ newTVarIO (0 :: Int)
    activeWorkers <- liftIO $ newTVarIO ([] :: [Async ()])

    -- Fetches and performs jobs until none is left. A job's synchronous failure is
    -- recorded and the loop continues. An asynchronous exception (e.g. 'cancel') is
    -- recorded and then rethrown.
    let runJobLoop = do
            fetchResult <- Exception.tryAny (Queue.fetchNextJob @job workerId)
            case fetchResult of
                Left exception -> do
                    Log.error ("Job worker: Failed to fetch next job: " <> tshow exception)
                    Concurrent.threadDelay 1000000
                Right (Just job) -> do
                    Log.info ("Starting job: " <> tshow job)
                    let ?job = job
                    -- A negative timeout makes 'Timeout.timeout' wait indefinitely.
                    let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
                    resultOrException <- Exception.tryAsync (Timeout.timeout timeout (perform job))
                    case resultOrException of
                        Left exception -> do
                            Queue.jobDidFail job exception
                            when (Exception.isAsyncException exception) (Exception.throwIO exception)
                        Right Nothing -> Queue.jobDidTimeout job
                        Right (Just _) -> Queue.jobDidSucceed job
                    runJobLoop
                Right Nothing -> pure ()

    let dispatcherLoop = do
            msg <- atomically $ readTBQueue action
            case msg of
                Stop ->
                    -- Let the running runners finish, then end the dispatcher.
                    atomically do
                        count <- readTVar activeCount
                        check (count == 0)
                JobAvailable -> do
                    acquired <- atomically do
                        count <- readTVar activeCount
                        if count < maxConcurrency @job
                            then do
                                writeTVar activeCount (count + 1)
                                pure True
                            else pure False
                    -- At capacity the notification is dropped. Active runners keep
                    -- fetching until 'Queue.fetchNextJob' returns 'Nothing'.
                    when acquired do
                        selfVar <- Concurrent.newEmptyMVar
                        workerAsync <- async $
                            (do
                                -- Block until the dispatcher has registered this runner.
                                _ <- Concurrent.readMVar selfVar
                                runJobLoop)
                            `Exception.finally`
                            (do
                                maybeSelf <- Concurrent.tryReadMVar selfVar
                                atomically do
                                    modifyTVar' activeCount (subtract 1)
                                    case maybeSelf of
                                        Just self -> modifyTVar' activeWorkers (filter (/= self))
                                        Nothing -> pure ())
                        -- Register the handle *before* releasing the runner via 'putMVar'.
                        -- In the opposite order, a runner that finds no job can finish and
                        -- deregister itself before it was added. That leaves a stale handle
                        -- in 'activeWorkers' that is never removed.
                        atomically $ modifyTVar' activeWorkers (workerAsync :)
                        Concurrent.putMVar selfVar workerAsync
                    dispatcherLoop

    let cancelAllWorkers = do
            workers <- readTVarIO activeWorkers
            mapM_ Async.cancel workers

    dispatcher <- allocate (async (dispatcherLoop `Exception.finally` cancelAllWorkers)) cancel
    (subscription, pollerReleaseKey) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) action

    staleRecoveryReleaseKey <- case staleJobTimeout @job of
        Just threshold -> do
            -- Recover at half the stale threshold. Scale to microseconds *before*
            -- rounding. Rounding the seconds first turned e.g. a 1s threshold into a
            -- 0µs delay (round 0.5 == 0), i.e. a busy loop against the database.
            let intervalMicroseconds :: Int
                intervalMicroseconds = round (threshold / 2 * 1000000)
            let recoveryLoop :: IO ()
                recoveryLoop = forever do
                    -- Log and keep going. An uncaught error here would silently end
                    -- stale-job recovery for the lifetime of this worker.
                    recoveryResult <- Exception.tryAny (Queue.recoverStaleJobs @job threshold)
                    case recoveryResult of
                        Left exception -> Log.error ("Job worker: Failed to recover stale jobs: " <> tshow exception)
                        Right () -> pure ()
                    -- Nudge the dispatcher. 'tryWriteTBQueue' drops the message when the
                    -- queue is already full of pending messages.
                    _ <- atomically $ tryWriteTBQueue action JobAvailable
                    Concurrent.threadDelay intervalMicroseconds
            (key, _) <- allocate (Async.async recoveryLoop) Async.cancel
            pure (Just key)
        Nothing -> pure Nothing

    pure JobWorkerProcess { dispatcher, subscription, pollerReleaseKey, action, staleRecoveryReleaseKey, activeCount }