{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Runner.WorkerLoop
( worker
, jobWorkerFetchAndRunLoop
) where
import IHP.Prelude
import IHP.ControllerPrelude
import qualified IHP.Environment as Environment
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception.Safe as Exception
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Timeout as Timeout
import Control.Monad.Trans.Resource
import qualified IHP.Log as Log
import IHP.Hasql.FromRow (FromRowHasql)
import Control.Concurrent.STM (atomically, newTBQueue, readTBQueue, writeTBQueue, newTVarIO, readTVar, readTVarIO, writeTVar, modifyTVar', check)
import IHP.Job.Queue (tryWriteTBQueue)
-- | Builds the 'JobWorker' for the @job@ type.
--
-- The job type is selected with a type application (e.g. @worker \@MyJob@);
-- it does not appear in the result type, which is why the module enables
-- @AllowAmbiguousTypes@. The returned 'JobWorker' wraps
-- 'jobWorkerFetchAndRunLoop' specialised to @job@.
worker :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , HasField "id" job (Id' (GetTableName job))
    , PrimaryKey (GetTableName job) ~ UUID
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , Job job
    , Show job
    , Table job
    ) => JobWorker
worker = JobWorker (jobWorkerFetchAndRunLoop @job)
-- | Starts the background machinery that fetches and runs jobs of type @job@
-- and returns the handles to it as a 'JobWorkerProcess'.
--
-- The following pieces are set up, in this order:
--
-- 1. A bounded message queue (@action@) whose capacity is 'maxConcurrency' of
--    @job@, seeded with a single 'JobAvailable' message.
-- 2. A dispatcher thread that reads messages from that queue. For every
--    'JobAvailable' it starts a worker thread unless 'maxConcurrency' workers
--    are already active. On 'Stop' it blocks until no worker is active and
--    then terminates. Whenever the dispatcher ends, all workers that are still
--    registered are cancelled.
-- 3. A watcher created by 'Queue.watchForJobWithPollerTriggerRepair', which is
--    handed the queue (presumably to post 'JobAvailable' messages; see
--    "IHP.Job.Queue"). The trigger-repair flag is only enabled in the
--    development environment.
-- 4. If 'staleJobTimeout' of @job@ is set, a recovery thread that periodically
--    calls 'Queue.recoverStaleJobs' and then nudges the dispatcher.
--
-- The dispatcher and the recovery thread are registered through 'allocate'
-- with a cancelling release action, so they are cancelled when their release
-- key is released or the enclosing 'ResourceT' scope ends.
jobWorkerFetchAndRunLoop :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , HasField "id" job (Id' (GetTableName job))
    , PrimaryKey (GetTableName job) ~ UUID
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , Job job
    , Show job
    , Table job
    ) => JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
    let ?context = frameworkConfig
    let ?modelContext = modelContext
    let pool = modelContext.hasqlPool

    -- Inbox of the dispatcher. Seeded with one 'JobAvailable' so the first
    -- fetch happens right away, without waiting for an external message.
    action <- liftIO $ atomically $ newTBQueue (fromIntegral (maxConcurrency @job))
    liftIO $ atomically $ writeTBQueue action JobAvailable

    -- Number of worker threads currently alive, and their handles (the handles
    -- are needed to cancel them when the dispatcher terminates).
    activeCount <- liftIO $ newTVarIO (0 :: Int)
    activeWorkers <- liftIO $ newTVarIO ([] :: [Async ()])

    -- Body of a worker thread: keeps fetching and running jobs until the
    -- queue reports that there is nothing left to do.
    let runJobLoop = do
            fetchResult <- Exception.tryAny (Queue.fetchNextJob @job pool workerId)
            case fetchResult of
                Left exception -> do
                    -- Fetching failed: log it, wait one second, try again.
                    Log.error ("Job worker: Failed to fetch next job: " <> tshow exception)
                    Concurrent.threadDelay 1000000
                    runJobLoop
                Right (Just job) -> do
                    Log.info ("Starting job: " <> tshow job)
                    let ?job = job
                    -- 'Timeout.timeout' treats a negative value as "no timeout".
                    let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
                    -- 'tryAsync' also catches asynchronous exceptions, so a job
                    -- interrupted by cancellation is still reported as failed.
                    resultOrException <- Exception.tryAsync (Timeout.timeout timeout (perform job))
                    case resultOrException of
                        Left exception -> do
                            Queue.jobDidFail pool job exception
                            -- Asynchronous exceptions must keep propagating,
                            -- otherwise this thread could not be cancelled.
                            when (Exception.isAsyncException exception) (Exception.throwIO exception)
                        Right Nothing -> Queue.jobDidTimeout pool job
                        Right (Just _) -> Queue.jobDidSucceed pool job
                    runJobLoop
                Right Nothing -> pure ()

    let dispatcherLoop = do
            msg <- atomically $ readTBQueue action
            case msg of
                Stop -> do
                    -- Block until every worker has finished, then return
                    -- (which ends the dispatcher).
                    atomically $ do
                        count <- readTVar activeCount
                        check (count == 0)
                JobAvailable -> do
                    -- Reserve a worker slot atomically; when all slots are
                    -- taken the message is dropped.
                    acquired <- atomically $ do
                        count <- readTVar activeCount
                        if count < maxConcurrency @job
                            then do
                                writeTVar activeCount (count + 1)
                                pure True
                            else pure False
                    when acquired do
                        -- The worker needs its own 'Async' handle to remove
                        -- itself from 'activeWorkers' on exit. The handle only
                        -- exists after 'async' returns, so it is passed in
                        -- through an MVar.
                        selfVar <- Concurrent.newEmptyMVar
                        workerAsync <- async $
                            (do -- Wait until the dispatcher has registered this
                                -- worker and published its handle.
                                _ <- Concurrent.readMVar selfVar
                                runJobLoop)
                            `Exception.finally`
                            (do maybeSelf <- Concurrent.tryReadMVar selfVar
                                atomically do
                                    modifyTVar' activeCount (subtract 1)
                                    case maybeSelf of
                                        Just self -> modifyTVar' activeWorkers (filter (/= self))
                                        Nothing -> pure ())
                        -- Register the worker BEFORE releasing it. If the
                        -- handle were published first, a worker that finishes
                        -- quickly could run its cleanup before the registration
                        -- below, leaving a stale handle in 'activeWorkers'.
                        atomically $ modifyTVar' activeWorkers (workerAsync :)
                        Concurrent.putMVar selfVar workerAsync
                    dispatcherLoop

    let cancelAllWorkers = do
            workers <- readTVarIO activeWorkers
            mapM_ Async.cancel workers

    dispatcher <- allocate (async (dispatcherLoop `Exception.finally` cancelAllWorkers)) cancel

    let enablePollerTriggerRepair = frameworkConfig.environment == Environment.Development
    (subscription, pollerReleaseKey) <- Queue.watchForJobWithPollerTriggerRepair enablePollerTriggerRepair pool pgListener (tableName @job) (queuePollInterval @job) action

    staleRecoveryReleaseKey <- case staleJobTimeout @job of
        Just threshold -> do
            -- Run the recovery twice per timeout period. The conversion to
            -- microseconds happens before rounding, so sub-second precision is
            -- kept and thresholds of one second or less do not collapse to an
            -- interval of zero.
            let intervalMicroseconds :: Int
                intervalMicroseconds = round (threshold / 2 * 1000000)
            let recoveryLoop = forever do
                    -- A failing recovery attempt is logged instead of ending
                    -- the loop, so recovery keeps running on later iterations.
                    recoveryResult <- Exception.tryAny (Queue.recoverStaleJobs @job pool threshold)
                    case recoveryResult of
                        Left exception -> Log.error ("Job worker: Failed to recover stale jobs: " <> tshow exception)
                        Right () -> pure ()
                    -- Nudge the dispatcher; the result is ignored, so nothing
                    -- happens here if the message could not be enqueued.
                    _ <- atomically $ tryWriteTBQueue action JobAvailable
                    Concurrent.threadDelay intervalMicroseconds
            (key, _) <- allocate (Async.async recoveryLoop) Async.cancel
            pure (Just key)
        Nothing -> pure Nothing

    pure JobWorkerProcess { dispatcher, subscription, pollerReleaseKey, action, staleRecoveryReleaseKey, activeCount }