{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Runner.WorkerLoop
( worker
, jobWorkerFetchAndRunLoop
) where

import IHP.Prelude
import IHP.ControllerPrelude
import qualified IHP.Environment as Environment
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception.Safe as Exception
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Timeout as Timeout
import Control.Monad.Trans.Resource
import System.Log.FastLogger (toLogStr)
import IHP.Hasql.FromRow (FromRowHasql)
import Control.Concurrent.STM (atomically, newTBQueue, readTBQueue, writeTBQueue, newTVarIO, readTVar, readTVarIO, writeTVar, modifyTVar', check)
import IHP.Job.Queue (tryWriteTBQueue)

-- | Builds a 'JobWorker' for the job type @job@.
--
-- The job type is selected with a type application (e.g. @worker \@MyJob@).
-- The returned worker runs 'jobWorkerFetchAndRunLoop' for that type.
worker :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , HasField "id" job (Id' (GetTableName job))
    , PrimaryKey (GetTableName job) ~ UUID
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , Job job
    , Show job
    , Table job
    ) => JobWorker
worker = JobWorker (jobWorkerFetchAndRunLoop @job)


-- | Starts the job processing machinery for the job type @job@ and returns its
-- handles as a 'JobWorkerProcess'.
--
-- From 'JobWorkerArgs' it uses:
--
-- * @frameworkConfig@ for the logger and the environment,
-- * @modelContext@ for the database pool,
-- * @workerId@, which is passed to 'Queue.fetchNextJob',
-- * @pgListener@ for the job notification subscription.
--
-- Concurrency model:
--
-- * @action@ is a bounded queue (capacity 'maxConcurrency') of
--   'JobWorkerProcessMessage's. 'JobAvailable' asks the dispatcher to start
--   another worker thread; 'Stop' asks it to shut down.
-- * The dispatcher runs at most 'maxConcurrency' worker threads at a time
--   (tracked in @activeCount@). Each worker fetches and runs jobs until a
--   fetch returns no job or the process is stopping.
-- * A 'JobAvailable' received while every slot is busy is dropped. The busy
--   workers keep fetching until no job is returned.
-- * On 'Stop' the dispatcher sets @isStopping@ and blocks until
--   @activeCount@ drops to zero.
-- * When the dispatcher thread exits for any reason, every registered worker
--   is cancelled.
--
-- The dispatcher and the optional stale-job recovery thread are registered
-- with 'allocate', so releasing the 'ResourceT' scope cancels them.
jobWorkerFetchAndRunLoop :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , HasField "id" job (Id' (GetTableName job))
    , PrimaryKey (GetTableName job) ~ UUID
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , Job job
    , Show job
    , Table job
    ) => JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
    let ?context = frameworkConfig
    let ?modelContext = modelContext
    let pool = modelContext.hasqlPool

    -- Wake-up channel from the listener/poller/recovery threads to the dispatcher
    action <- liftIO $ atomically $ newTBQueue (fromIntegral (maxConcurrency @job))
    -- Seed the queue with one initial JobAvailable so the dispatcher attempts a fetch on startup
    liftIO $ atomically $ writeTBQueue action JobAvailable

    activeCount <- liftIO $ newTVarIO (0 :: Int)
    activeWorkers <- liftIO $ newTVarIO ([] :: [Async ()])
    isStopping <- liftIO $ newTVarIO False

    -- Body of a single worker thread: fetch and run jobs until none is left
    -- or the process is stopping.
    let runJobLoop = do
            stopping <- readTVarIO isStopping
            unless stopping do
                fetchResult <- Exception.tryAny (Queue.fetchNextJob @job pool workerId)
                case fetchResult of
                    Left exception -> do
                        ?context.logger (toLogStr ("Job worker: Failed to fetch next job: " <> tshow exception))
                        Concurrent.threadDelay 1000000  -- 1s backoff to avoid tight error loops
                        runJobLoop -- retry after transient error
                    Right (Just job) -> do
                        ?context.logger (toLogStr ("Starting job: " <> tshow job))

                        let ?job = job
                        -- A negative interval makes System.Timeout.timeout wait indefinitely
                        let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
                        -- tryAsync also catches asynchronous exceptions (e.g. cancellation),
                        -- so the job is marked as failed before the exception is re-thrown.
                        resultOrException <- Exception.tryAsync (Timeout.timeout timeout (perform job))
                        case resultOrException of
                            Left exception -> do
                                Queue.jobDidFail pool job exception
                                when (Exception.isAsyncException exception) (Exception.throwIO exception)
                            Right Nothing -> Queue.jobDidTimeout pool job
                            Right (Just _) -> Queue.jobDidSucceed pool job

                        runJobLoop -- try next job immediately
                    Right Nothing -> pure ()

    -- Blocks until every worker thread has released its slot.
    let waitForActiveWorkers = atomically do
            count <- readTVar activeCount
            check (count == 0)

    let dispatcherLoop = do
            stopping <- readTVarIO isStopping
            if stopping
                then waitForActiveWorkers
                else do
                    msg <- atomically $ readTBQueue action
                    case msg of
                        Stop -> do
                            atomically $ writeTVar isStopping True
                            waitForActiveWorkers
                        JobAvailable -> do
                            -- Reserve a worker slot. The notification is dropped when all
                            -- slots are busy; running workers keep fetching until empty.
                            acquired <- atomically do
                                stoppingNow <- readTVar isStopping
                                count <- readTVar activeCount
                                if not stoppingNow && count < maxConcurrency @job
                                    then do
                                        writeTVar activeCount (count + 1)
                                        pure True
                                    else pure False
                            when acquired do
                                -- The new worker blocks on selfVar until it has been registered
                                -- in activeWorkers, so releaseSlot can always find and remove it.
                                selfVar <- Concurrent.newEmptyMVar
                                let releaseSlot = do
                                        maybeSelf <- Concurrent.tryReadMVar selfVar
                                        atomically do
                                            modifyTVar' activeCount (subtract 1)
                                            case maybeSelf of
                                                Just self -> modifyTVar' activeWorkers (filter (/= self))
                                                Nothing -> pure ()
                                workerAsync <- async ((Concurrent.readMVar selfVar >> runJobLoop) `Exception.finally` releaseSlot)
                                -- Register first, then release the worker. In the opposite order a
                                -- short-lived worker could finish (and run releaseSlot) before being
                                -- added, leaving a finished Async in activeWorkers forever.
                                atomically $ modifyTVar' activeWorkers (workerAsync :)
                                Concurrent.putMVar selfVar workerAsync
                            dispatcherLoop

    let cancelAllWorkers = do
            workers <- readTVarIO activeWorkers
            mapM_ Async.cancel workers

    -- Cancelling the dispatcher also cancels every worker it started.
    dispatcher <- allocate (async (dispatcherLoop `Exception.finally` cancelAllWorkers)) cancel

    let enablePollerTriggerRepair = frameworkConfig.environment == Environment.Development
    (subscription, pollerReleaseKey) <- Queue.watchForJobWithPollerTriggerRepair enablePollerTriggerRepair pool pgListener (tableName @job) (queuePollInterval @job) action

    -- Start stale job recovery if configured
    staleRecoveryReleaseKey <- case staleJobTimeout @job of
        Just threshold -> do
            -- Run recovery every half threshold. Scale to microseconds *before*
            -- rounding: rounding whole seconds first turned thresholds <= 1s
            -- into a 0µs delay (a busy loop against the database).
            let intervalMicroseconds :: Int
                intervalMicroseconds = round (threshold / 2 * 1000000)
            let recoveryLoop :: IO ()
                recoveryLoop = forever do
                    -- Catch and log failures so a transient error does not silently
                    -- kill the recovery thread for the rest of the process lifetime.
                    recoveryResult <- Exception.tryAny (Queue.recoverStaleJobs @job pool threshold)
                    case recoveryResult of
                        Left exception ->
                            ?context.logger (toLogStr ("Job worker: Failed to recover stale jobs: " <> tshow exception))
                        Right () -> do
                            -- Signal workers to check for recovered jobs
                            _ <- atomically $ tryWriteTBQueue action JobAvailable
                            pure ()
                    Concurrent.threadDelay intervalMicroseconds
            (key, _) <- allocate (Async.async recoveryLoop) Async.cancel
            pure (Just key)
        Nothing -> pure Nothing

    pure JobWorkerProcess { dispatcher, subscription, pollerReleaseKey, action, staleRecoveryReleaseKey, activeCount, isStopping }