{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Runner.WorkerLoop
( worker
, jobWorkerFetchAndRunLoop
) where

import IHP.Prelude
import IHP.ControllerPrelude
import qualified IHP.Environment as Environment
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception.Safe as Exception
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Timeout as Timeout
import Control.Monad.Trans.Resource
import qualified IHP.Log as Log
import IHP.Hasql.FromRow (FromRowHasql)
import Control.Concurrent.STM (atomically, newTBQueue, readTBQueue, writeTBQueue, newTVarIO, readTVar, readTVarIO, writeTVar, modifyTVar', check)
import IHP.Job.Queue (tryWriteTBQueue)

-- | Builds a 'JobWorker' for the job type selected by type application
-- (e.g. @worker \@MyJob@).
--
-- The returned worker simply wraps 'jobWorkerFetchAndRunLoop' instantiated
-- at that job type.
worker :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , HasField "id" job (Id' (GetTableName job))
    , PrimaryKey (GetTableName job) ~ UUID
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , Job job
    , Show job
    , Table job
    ) => JobWorker
worker = JobWorker (jobWorkerFetchAndRunLoop @job)


-- | Starts the job processing machinery for one job table and returns a
-- 'JobWorkerProcess' handle describing what was started.
--
-- Everything runs inside 'ResourceT'. The threads started here are
-- registered with 'allocate', so they are cancelled when the surrounding
-- resource scope is released.
--
-- What gets set up:
--
-- * The @action@ queue, a 'TBQueue' of 'JobWorkerProcessMessage'. Its
--   capacity is @maxConcurrency \@job@. It is seeded with one
--   'JobAvailable' so a fetch is attempted at startup.
-- * A dispatcher thread that reads the queue:
--     * On 'JobAvailable' it spawns a worker thread, but only while fewer
--       than @maxConcurrency \@job@ workers are active.
--     * On 'Stop' it waits until no worker is active and then exits. Any
--       workers still registered at exit are cancelled.
-- * A subscription created by 'Queue.watchForJobWithPollerTriggerRepair'
--   that is handed the @action@ queue. Poller trigger repair is enabled
--   only in the 'Environment.Development' environment.
-- * When @staleJobTimeout \@job@ is set, a recovery thread. Every half
--   threshold it calls 'Queue.recoverStaleJobs' and, on success, nudges the
--   queue with 'JobAvailable'.
--
-- Each worker thread fetches and runs jobs back-to-back until
-- 'Queue.fetchNextJob' returns 'Nothing'.
jobWorkerFetchAndRunLoop :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , HasField "id" job (Id' (GetTableName job))
    , PrimaryKey (GetTableName job) ~ UUID
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , Job job
    , Show job
    , Table job
    ) => JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
    let ?context = frameworkConfig
    let ?modelContext = modelContext
    let pool = modelContext.hasqlPool

    -- Message queue between producers (notifications, recovery) and the dispatcher.
    action <- liftIO $ atomically $ newTBQueue (fromIntegral (maxConcurrency @job))
    -- Seed the queue with one initial JobAvailable so the dispatcher attempts a fetch on startup
    liftIO $ atomically $ writeTBQueue action JobAvailable

    -- Number of worker threads currently running. The dispatcher increments
    -- it; each worker decrements it when it exits for any reason.
    activeCount <- liftIO $ newTVarIO (0 :: Int)
    -- Handles of running workers, so they can be cancelled when the dispatcher exits.
    activeWorkers <- liftIO $ newTVarIO ([] :: [Async ()])

    -- Fetch and run jobs one after another until no job is available.
    let runJobLoop = do
            fetchResult <- Exception.tryAny (Queue.fetchNextJob @job pool workerId)
            case fetchResult of
                Left exception -> do
                    Log.error ("Job worker: Failed to fetch next job: " <> tshow exception)
                    Concurrent.threadDelay 1000000  -- 1s backoff to avoid tight error loops
                    runJobLoop -- retry after transient error
                Right (Just job) -> do
                    Log.info ("Starting job: " <> tshow job)

                    let ?job = job
                    -- A negative timeout makes 'Timeout.timeout' wait indefinitely.
                    let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
                    -- tryAsync also catches asynchronous exceptions (e.g. cancellation),
                    -- so the job is recorded as failed before the exception is re-thrown.
                    resultOrException <- Exception.tryAsync (Timeout.timeout timeout (perform job))
                    case resultOrException of
                        Left exception -> do
                            Queue.jobDidFail pool job exception
                            when (Exception.isAsyncException exception) (Exception.throwIO exception)
                        Right Nothing -> Queue.jobDidTimeout pool job
                        Right (Just _) -> Queue.jobDidSucceed pool job

                    runJobLoop -- try next job immediately
                Right Nothing -> pure ()

    -- Consumes queue messages and spawns workers, respecting maxConcurrency.
    let dispatcherLoop = do
            msg <- atomically $ readTBQueue action
            case msg of
                Stop -> do
                    -- Wait for all active workers to finish; the dispatcher then exits.
                    atomically $ do
                        count <- readTVar activeCount
                        check (count == 0)
                JobAvailable -> do
                    -- Reserve a worker slot atomically. If every slot is taken,
                    -- the message is dropped and no new worker is spawned.
                    acquired <- atomically $ do
                        count <- readTVar activeCount
                        if count < maxConcurrency @job
                            then do
                                writeTVar activeCount (count + 1)
                                pure True
                            else pure False
                    when acquired do
                        -- The worker blocks on selfVar until its own handle has been
                        -- added to activeWorkers. Its cleanup (which removes that handle)
                        -- therefore cannot run before the registration, which would
                        -- otherwise leave a finished Async in the list forever.
                        selfVar <- Concurrent.newEmptyMVar
                        workerAsync <- async $
                            (do _ <- Concurrent.readMVar selfVar
                                runJobLoop)
                            `Exception.finally`
                                (do maybeSelf <- Concurrent.tryReadMVar selfVar
                                    atomically do
                                        modifyTVar' activeCount (subtract 1)
                                        case maybeSelf of
                                            Just self -> modifyTVar' activeWorkers (filter (/= self))
                                            Nothing -> pure ())
                        atomically $ modifyTVar' activeWorkers (workerAsync :)
                        Concurrent.putMVar selfVar workerAsync
                    dispatcherLoop

    -- Cancel every worker that is still registered (run when the dispatcher exits).
    let cancelAllWorkers = do
            workers <- readTVarIO activeWorkers
            mapM_ Async.cancel workers

    dispatcher <- allocate (async (dispatcherLoop `Exception.finally` cancelAllWorkers)) cancel

    let enablePollerTriggerRepair = frameworkConfig.environment == Environment.Development
    (subscription, pollerReleaseKey) <- Queue.watchForJobWithPollerTriggerRepair enablePollerTriggerRepair pool pgListener (tableName @job) (queuePollInterval @job) action

    -- Start stale job recovery if configured
    staleRecoveryReleaseKey <- case staleJobTimeout @job of
        Just threshold -> do
            -- Run recovery every threshold/2. Scale to microseconds BEFORE rounding:
            -- rounding whole seconds first turns thresholds <= 1s into a 0 delay,
            -- i.e. a busy loop hammering the database.
            let intervalMicroseconds :: Int
                intervalMicroseconds = round (threshold / 2 * 1000000)
            let recoveryLoop :: IO ()
                recoveryLoop = forever do
                    -- Catch and log errors so one failed recovery does not silently
                    -- kill this thread and stop stale job recovery for good.
                    recoveryResult <- Exception.tryAny (Queue.recoverStaleJobs @job pool threshold)
                    case recoveryResult of
                        Left exception ->
                            Log.error ("Job worker: Failed to recover stale jobs: " <> tshow exception)
                        Right () -> do
                            -- Signal workers to check for recovered jobs
                            _ <- atomically $ tryWriteTBQueue action JobAvailable
                            pure ()
                    Concurrent.threadDelay intervalMicroseconds
            (key, _) <- allocate (Async.async recoveryLoop) Async.cancel
            pure (Just key)
        Nothing -> pure Nothing

    pure JobWorkerProcess { dispatcher, subscription, pollerReleaseKey, action, staleRecoveryReleaseKey, activeCount }