{-# LANGUAGE AllowAmbiguousTypes #-}
{-|
Module: IHP.Job.Runner
Description: Functions to run jobs
Copyright: (c) digitally induced GmbH, 2020
-}
module IHP.Job.Runner where

import IHP.Prelude
import IHP.ControllerPrelude
import IHP.ScriptSupport
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception.Safe as Exception
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified System.Timeout as Timeout
import qualified IHP.PGListener as PGListener
import Control.Monad.Trans.Resource
import qualified IHP.Log as Log
import IHP.Hasql.FromRow (FromRowHasql)
import Control.Concurrent.STM (atomically, newTBQueue, readTBQueue, writeTBQueue, newTVarIO, readTVar, readTVarIO, writeTVar, modifyTVar', check)
import IHP.Job.Queue (tryWriteTBQueue)

-- | Used by the RunJobs binary
runJobWorkers :: [JobWorker] -> Script
runJobWorkers :: [JobWorker] -> Script
runJobWorkers [JobWorker]
jobWorkers = (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers

-- | This job worker main loop is used when the job workers are running as part of their own binary
--
-- In dev mode the IHP dev server is using the 'devServerMainLoop' instead. We have two main loops
-- as the stop handling works a different in those cases.
--
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop :: (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers = do
    threadId <- IO ThreadId
Concurrent.myThreadId
    exitSignalsCount <- newIORef 0
    workerId <- UUID.nextRandom
    let logger = ?context::FrameworkConfig
FrameworkConfig
?context.logger

    Log.info ("Starting worker " <> tshow workerId)

    -- The job workers use their own dedicated PG listener as e.g. AutoRefresh or DataSync
    -- could overload the main PGListener connection. In that case we still want jobs to be
    -- run independent of the system being very busy.
    PGListener.withPGListener ?context.databaseUrl ?context.logger \PGListener
pgListener -> do
        stopSignal <- IO (MVar (ZonkAny 1))
forall a. IO (MVar a)
Concurrent.newEmptyMVar

        runResourceT do
            waitForExitSignal <- liftIO installSignalHandlers

            let jobWorkerArgs = JobWorkerArgs { UUID
workerId :: UUID
workerId :: UUID
workerId, modelContext :: ModelContext
modelContext = ?modelContext::ModelContext
ModelContext
?modelContext, frameworkConfig :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
FrameworkConfig
?context, PGListener
pgListener :: PGListener
pgListener :: PGListener
pgListener }

            processes <- jobWorkers
                |> mapM (\(JobWorker JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun)-> JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)

            liftIO waitForExitSignal

            liftIO $ Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)

            -- Stop subscriptions and poller already
            -- This will stop all producers for the queue
            liftIO $ forEach processes \JobWorkerProcess { ReleaseKey
pollerReleaseKey :: ReleaseKey
pollerReleaseKey :: JobWorkerProcess -> ReleaseKey
pollerReleaseKey, Subscription
subscription :: Subscription
subscription :: JobWorkerProcess -> Subscription
subscription, TBQueue JobWorkerProcessMessage
action :: TBQueue JobWorkerProcessMessage
action :: JobWorkerProcess -> TBQueue JobWorkerProcessMessage
action, Maybe ReleaseKey
staleRecoveryReleaseKey :: Maybe ReleaseKey
staleRecoveryReleaseKey :: JobWorkerProcess -> Maybe ReleaseKey
staleRecoveryReleaseKey } -> do
                Subscription -> PGListener -> IO ()
PGListener.unsubscribe Subscription
subscription PGListener
pgListener
                ReleaseKey -> IO ()
forall (m :: * -> *). MonadIO m => ReleaseKey -> m ()
release ReleaseKey
pollerReleaseKey
                case Maybe ReleaseKey
staleRecoveryReleaseKey of
                    Just ReleaseKey
key -> ReleaseKey -> IO ()
forall (m :: * -> *). MonadIO m => ReleaseKey -> m ()
release ReleaseKey
key
                    Maybe ReleaseKey
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                -- Single Stop for the dispatcher (it waits for active workers internally)
                STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue JobWorkerProcessMessage
-> JobWorkerProcessMessage -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop

            liftIO $ PGListener.stop pgListener

            -- While waiting for all jobs to complete, we also wait for another exit signal
            -- If the user sends two exit signals, we just kill all processes
            liftIO $ async do
                waitForExitSignal

                Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)

                forEach processes \JobWorkerProcess { dispatcher :: JobWorkerProcess -> (ReleaseKey, Async ())
dispatcher = (ReleaseKey
dispatcherKey, Async ()
_) } -> do
                    ReleaseKey -> IO ()
forall (m :: * -> *). MonadIO m => ReleaseKey -> m ()
release ReleaseKey
dispatcherKey  -- cancels dispatcher, whose finally cancels all workers

                Concurrent.throwTo threadId Exit.ExitSuccess

                pure ()

            -- Wait for dispatchers (which wait for their workers before exiting)
            liftIO $ forEach processes \JobWorkerProcess { dispatcher :: JobWorkerProcess -> (ReleaseKey, Async ())
dispatcher = (ReleaseKey
_, Async ()
dispatcherAsync) } -> do
                Async () -> IO ()
forall a. Async a -> IO a
Async.wait Async ()
dispatcherAsync

            liftIO $ Concurrent.throwTo threadId Exit.ExitSuccess

devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop :: (?modelContext::ModelContext) =>
FrameworkConfig -> PGListener -> [JobWorker] -> IO ()
devServerMainLoop FrameworkConfig
frameworkConfig PGListener
pgListener [JobWorker]
jobWorkers = do
    workerId <- IO UUID
UUID.nextRandom
    let ?context = frameworkConfig
    let logger = FrameworkConfig
frameworkConfig.logger

    Log.info ("Starting worker " <> tshow workerId)

    runResourceT do
        let jobWorkerArgs = JobWorkerArgs { UUID
workerId :: UUID
workerId :: UUID
workerId, modelContext :: ModelContext
modelContext = ?modelContext::ModelContext
ModelContext
?modelContext, frameworkConfig :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
FrameworkConfig
?context, PGListener
pgListener :: PGListener
pgListener :: PGListener
pgListener }

        processes <- jobWorkers
                |> mapM (\(JobWorker JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun) -> JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)

        liftIO $ (forever (Concurrent.threadDelay maxBound)) `Exception.finally` do
            forEach processes \JobWorkerProcess { TBQueue JobWorkerProcessMessage
action :: JobWorkerProcess -> TBQueue JobWorkerProcessMessage
action :: TBQueue JobWorkerProcessMessage
action } -> do
                STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue JobWorkerProcessMessage
-> JobWorkerProcessMessage -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop

-- | Installs signals handlers and returns an IO action that blocks until the next sigINT or sigTERM is sent
installSignalHandlers :: IO (IO ())
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
    exitSignal <- IO (MVar ())
forall a. IO (MVar a)
Concurrent.newEmptyMVar

    let catchHandler = MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar ()
exitSignal ()

    Signals.installHandler Signals.sigINT (Signals.Catch catchHandler) Nothing
    Signals.installHandler Signals.sigTERM (Signals.Catch catchHandler) Nothing

    pure (Concurrent.takeMVar exitSignal)

stopExitHandler :: JobWorkerArgs -> p -> p
stopExitHandler JobWorkerArgs { UUID
ModelContext
PGListener
FrameworkConfig
workerId :: JobWorkerArgs -> UUID
modelContext :: JobWorkerArgs -> ModelContext
frameworkConfig :: JobWorkerArgs -> FrameworkConfig
pgListener :: JobWorkerArgs -> PGListener
workerId :: UUID
modelContext :: ModelContext
frameworkConfig :: FrameworkConfig
pgListener :: PGListener
.. } p
main = p
main

worker :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , SetField "runAt" job UTCTime
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorker
worker :: forall job.
(job ~ GetModelByTableName (GetTableName job), FromRowHasql job,
 Show (PrimaryKey (GetTableName job)),
 KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 SetField "runAt" job UTCTime, HasField "runAt" job UTCTime,
 HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, Table job) =>
JobWorker
worker = (JobWorkerArgs -> ResourceT IO JobWorkerProcess) -> JobWorker
JobWorker (forall job.
(job ~ GetModelByTableName (GetTableName job), FromRowHasql job,
 Show (PrimaryKey (GetTableName job)),
 KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 SetField "runAt" job UTCTime, HasField "runAt" job UTCTime,
 HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, Table job) =>
JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop @job)


jobWorkerFetchAndRunLoop :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , SetField "runAt" job UTCTime
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop :: forall job.
(job ~ GetModelByTableName (GetTableName job), FromRowHasql job,
 Show (PrimaryKey (GetTableName job)),
 KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 SetField "runAt" job UTCTime, HasField "runAt" job UTCTime,
 HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, Table job) =>
JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { UUID
ModelContext
PGListener
FrameworkConfig
workerId :: JobWorkerArgs -> UUID
modelContext :: JobWorkerArgs -> ModelContext
frameworkConfig :: JobWorkerArgs -> FrameworkConfig
pgListener :: JobWorkerArgs -> PGListener
workerId :: UUID
modelContext :: ModelContext
frameworkConfig :: FrameworkConfig
pgListener :: PGListener
.. } = do
    let ?context = ?context::FrameworkConfig
FrameworkConfig
frameworkConfig
    let ?modelContext = ?modelContext::ModelContext
ModelContext
modelContext
    action <- IO (TBQueue JobWorkerProcessMessage)
-> ResourceT IO (TBQueue JobWorkerProcessMessage)
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TBQueue JobWorkerProcessMessage)
 -> ResourceT IO (TBQueue JobWorkerProcessMessage))
-> IO (TBQueue JobWorkerProcessMessage)
-> ResourceT IO (TBQueue JobWorkerProcessMessage)
forall a b. (a -> b) -> a -> b
$ STM (TBQueue JobWorkerProcessMessage)
-> IO (TBQueue JobWorkerProcessMessage)
forall a. STM a -> IO a
atomically (STM (TBQueue JobWorkerProcessMessage)
 -> IO (TBQueue JobWorkerProcessMessage))
-> STM (TBQueue JobWorkerProcessMessage)
-> IO (TBQueue JobWorkerProcessMessage)
forall a b. (a -> b) -> a -> b
$ Natural -> STM (TBQueue JobWorkerProcessMessage)
forall a. Natural -> STM (TBQueue a)
newTBQueue (Int -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (forall job. Job job => Int
maxConcurrency @job))
    -- Seed the queue with one initial JobAvailable so the dispatcher attempts a fetch on startup
    liftIO $ atomically $ writeTBQueue action JobAvailable

    activeCount <- liftIO $ newTVarIO (0 :: Int)
    activeWorkers <- liftIO $ newTVarIO ([] :: [Async ()])

    let runJobLoop = do
            fetchResult <- IO (Maybe job) -> IO (Either SomeException (Maybe job))
forall (m :: * -> *) a.
(HasCallStack, MonadCatch m) =>
m a -> m (Either SomeException a)
Exception.tryAny (forall job.
(?modelContext::ModelContext,
 job ~ GetModelByTableName (GetTableName job), FromRowHasql job,
 Show (PrimaryKey (GetTableName job)), Table job) =>
UUID -> IO (Maybe job)
Queue.fetchNextJob @job UUID
workerId)
            case fetchResult of
                Left SomeException
exception -> do
                    Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.error (Text
"Job worker: Failed to fetch next job: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SomeException -> Text
forall a. Show a => a -> Text
tshow SomeException
exception)
                    Int -> IO ()
Concurrent.threadDelay Int
1000000  -- 1s backoff to avoid tight error loops
                Right (Just job
job) -> do
                    Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting job: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> job -> Text
forall a. Show a => a -> Text
tshow job
job)

                    let ?job = job
?job::job
job
                    let Int
timeout :: Int = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe (-Int
1) (forall job. Job job => Maybe Int
timeoutInMicroseconds @job)
                    resultOrException <- IO (Maybe ()) -> IO (Either SomeException (Maybe ()))
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
Exception.tryAsync (Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
Timeout.timeout Int
timeout (job -> IO ()
forall job.
(Job job, ?modelContext::ModelContext,
 ?context::FrameworkConfig) =>
job -> IO ()
perform job
job))
                    case resultOrException of
                        Left SomeException
exception -> do
                            job -> SomeException -> IO ()
forall job context.
(job ~ GetModelByTableName (GetTableName job),
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 SetField "runAt" job UTCTime, HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, ?modelContext::ModelContext, ?context::context,
 HasField "logger" context Logger) =>
job -> SomeException -> IO ()
Queue.jobDidFail job
job SomeException
exception
                            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SomeException -> Bool
forall e. Exception e => e -> Bool
Exception.isAsyncException SomeException
exception) (SomeException -> IO ()
forall (m :: * -> *) e a.
(HasCallStack, MonadThrow m, Exception e) =>
e -> m a
Exception.throwIO SomeException
exception)
                        Right Maybe ()
Nothing -> job -> IO ()
forall job context.
(job ~ GetModelByTableName (GetTableName job),
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 SetField "runAt" job UTCTime, HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, ?modelContext::ModelContext, ?context::context,
 HasField "logger" context Logger) =>
job -> IO ()
Queue.jobDidTimeout job
job
                        Right (Just ()
_) -> job -> IO ()
forall job context.
(job ~ GetModelByTableName (GetTableName job),
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, ?modelContext::ModelContext, ?context::context,
 HasField "logger" context Logger) =>
job -> IO ()
Queue.jobDidSucceed job
job

                    runJobLoop -- try next job immediately
                Right Maybe job
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    let dispatcherLoop = do
            msg <- STM JobWorkerProcessMessage -> IO JobWorkerProcessMessage
forall a. STM a -> IO a
atomically (STM JobWorkerProcessMessage -> IO JobWorkerProcessMessage)
-> STM JobWorkerProcessMessage -> IO JobWorkerProcessMessage
forall a b. (a -> b) -> a -> b
$ TBQueue JobWorkerProcessMessage -> STM JobWorkerProcessMessage
forall a. TBQueue a -> STM a
readTBQueue TBQueue JobWorkerProcessMessage
action
            case msg of
                JobWorkerProcessMessage
Stop -> do
                    -- Wait for all active workers to finish
                    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                        count <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
activeCount
                        check (count == 0)
                JobWorkerProcessMessage
JobAvailable -> do
                    acquired <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ do
                        count <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
activeCount
                        if count < maxConcurrency @job
                            then do
                                writeTVar activeCount (count + 1)
                                pure True
                            else pure False
                    when acquired do
                        selfVar <- Concurrent.newEmptyMVar
                        workerAsync <- async $
                            (do self <- Concurrent.readMVar selfVar
                                runJobLoop)
                            `Exception.finally`
                                (do maybeSelf <- Concurrent.tryReadMVar selfVar
                                    atomically do
                                        modifyTVar' activeCount (subtract 1)
                                        case maybeSelf of
                                            Just Async ()
self -> TVar [Async ()] -> ([Async ()] -> [Async ()]) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar [Async ()]
activeWorkers ((Async () -> Bool) -> [Async ()] -> [Async ()]
forall a. (a -> Bool) -> [a] -> [a]
filter (Async () -> Async () -> Bool
forall a. Eq a => a -> a -> Bool
/= Async ()
self))
                                            Maybe (Async ())
Nothing -> () -> STM ()
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
                        Concurrent.putMVar selfVar workerAsync
                        atomically $ modifyTVar' activeWorkers (workerAsync :)
                    dispatcherLoop

    let cancelAllWorkers = do
            workers <- TVar [Async ()] -> IO [Async ()]
forall a. TVar a -> IO a
readTVarIO TVar [Async ()]
activeWorkers
            mapM_ Async.cancel workers

    dispatcher <- allocate (async (dispatcherLoop `Exception.finally` cancelAllWorkers)) cancel

    (subscription, pollerReleaseKey) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) action

    -- Start stale job recovery if configured
    staleRecoveryReleaseKey <- case staleJobTimeout @job of
        Just NominalDiffTime
threshold -> do
            let intervalMicroseconds :: Int
intervalMicroseconds = NominalDiffTime -> Int
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (NominalDiffTime
threshold NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Fractional a => a -> a -> a
/ NominalDiffTime
2) Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000
            let recoveryLoop :: IO (ZonkAny 0)
recoveryLoop = IO () -> IO (ZonkAny 0)
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
                    forall job.
(?modelContext::ModelContext, Table job) =>
NominalDiffTime -> IO ()
Queue.recoverStaleJobs @job NominalDiffTime
threshold
                    -- Signal workers to check for recovered jobs
                    _ <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TBQueue JobWorkerProcessMessage
-> JobWorkerProcessMessage -> STM Bool
forall a. TBQueue a -> a -> STM Bool
tryWriteTBQueue TBQueue JobWorkerProcessMessage
action JobWorkerProcessMessage
JobAvailable
                    pure ()
                    Concurrent.threadDelay intervalMicroseconds
            (key, _) <- IO (Async (ZonkAny 0))
-> (Async (ZonkAny 0) -> IO ())
-> ResourceT IO (ReleaseKey, Async (ZonkAny 0))
forall (m :: * -> *) a.
MonadResource m =>
IO a -> (a -> IO ()) -> m (ReleaseKey, a)
allocate (IO (ZonkAny 0) -> IO (Async (ZonkAny 0))
forall a. IO a -> IO (Async a)
Async.async IO (ZonkAny 0)
recoveryLoop) Async (ZonkAny 0) -> IO ()
forall a. Async a -> IO ()
Async.cancel
            pure (Just key)
        Maybe NominalDiffTime
Nothing -> Maybe ReleaseKey -> ResourceT IO (Maybe ReleaseKey)
forall a. a -> ResourceT IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ReleaseKey
forall a. Maybe a
Nothing

    pure JobWorkerProcess { dispatcher, subscription, pollerReleaseKey, action, staleRecoveryReleaseKey, activeCount }