{-# LANGUAGE AllowAmbiguousTypes #-}
{-|
Module: IHP.Job.Runner
Description: Functions to run jobs
Copyright: (c) digitally induced GmbH, 2020
-}
module IHP.Job.Runner where

import IHP.Prelude
import IHP.ControllerPrelude
import IHP.ScriptSupport
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception.Safe as Exception
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified System.Timeout as Timeout
import qualified IHP.PGListener as PGListener
import Control.Monad.Trans.Resource
import qualified IHP.Log as Log

-- | Used by the RunJobs binary
runJobWorkers :: [JobWorker] -> Script
runJobWorkers :: [JobWorker] -> Script
runJobWorkers [JobWorker]
jobWorkers = (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers

-- | This job worker main loop is used when the job workers are running as part of their own binary
--
-- In dev mode the IHP dev server is using the 'devServerMainLoop' instead. We have two main loops
-- as the stop handling works a different in those cases.
--
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop :: (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers = do
    ThreadId
threadId <- IO ThreadId
Concurrent.myThreadId
    IORef Integer
exitSignalsCount <- Integer -> IO (IORef Integer)
forall a. a -> IO (IORef a)
newIORef Integer
0
    UUID
workerId <- IO UUID
UUID.nextRandom
    let logger :: Logger
logger = ?context::FrameworkConfig
FrameworkConfig
?context.logger

    Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting worker " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> UUID -> Text
forall a. Show a => a -> Text
tshow UUID
workerId)

    -- The job workers use their own dedicated PG listener as e.g. AutoRefresh or DataSync
    -- could overload the main PGListener connection. In that case we still want jobs to be
    -- run independent of the system being very busy.
    ModelContext -> (PGListener -> IO ()) -> IO ()
forall a. ModelContext -> (PGListener -> IO a) -> IO a
PGListener.withPGListener ?modelContext::ModelContext
ModelContext
?modelContext \PGListener
pgListener -> do
        MVar Any
stopSignal <- IO (MVar Any)
forall a. IO (MVar a)
Concurrent.newEmptyMVar

        ResourceT IO () -> IO ()
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT do
            IO ()
waitForExitSignal <- IO (IO ()) -> ResourceT IO (IO ())
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (IO ())
installSignalHandlers

            let jobWorkerArgs :: JobWorkerArgs
jobWorkerArgs = JobWorkerArgs { UUID
workerId :: UUID
workerId :: UUID
workerId, modelContext :: ModelContext
modelContext = ?modelContext::ModelContext
ModelContext
?modelContext, frameworkConfig :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
FrameworkConfig
?context, PGListener
pgListener :: PGListener
pgListener :: PGListener
pgListener }
            
            [JobWorkerProcess]
processes <- [JobWorker]
jobWorkers
                [JobWorker]
-> ([JobWorker] -> ResourceT IO [JobWorkerProcess])
-> ResourceT IO [JobWorkerProcess]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (JobWorker -> ResourceT IO JobWorkerProcess)
-> [JobWorker] -> ResourceT IO [JobWorkerProcess]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (\(JobWorker JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun)-> JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)

            IO () -> ResourceT IO ()
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ()
waitForExitSignal

            IO () -> ResourceT IO ()
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ResourceT IO ()) -> IO () -> ResourceT IO ()
forall a b. (a -> b) -> a -> b
$ Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Waiting for jobs to complete. CTRL+C again to force exit" :: Text)

            -- Stop subscriptions and poller already
            -- This will stop all producers for the queue MVar
            IO () -> ResourceT IO ()
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ResourceT IO ()) -> IO () -> ResourceT IO ()
forall a b. (a -> b) -> a -> b
$ [JobWorkerProcess]
-> (Element [JobWorkerProcess] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { ReleaseKey
pollerReleaseKey :: ReleaseKey
pollerReleaseKey :: JobWorkerProcess -> ReleaseKey
pollerReleaseKey, Subscription
subscription :: Subscription
subscription :: JobWorkerProcess -> Subscription
subscription, MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
action :: JobWorkerProcess -> MVar JobWorkerProcessMessage
action } -> do
                Subscription -> PGListener -> IO ()
PGListener.unsubscribe Subscription
subscription PGListener
pgListener
                ReleaseKey -> IO ()
forall (m :: * -> *). MonadIO m => ReleaseKey -> m ()
release ReleaseKey
pollerReleaseKey
                MVar JobWorkerProcessMessage -> JobWorkerProcessMessage -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop

            IO () -> ResourceT IO ()
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ResourceT IO ()) -> IO () -> ResourceT IO ()
forall a b. (a -> b) -> a -> b
$ PGListener -> IO ()
PGListener.stop PGListener
pgListener

            -- While waiting for all jobs to complete, we also wait for another exit signal
            -- If the user sends two exit signals, we just kill all processes
            IO (Async ()) -> ResourceT IO (Async ())
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async ()) -> ResourceT IO (Async ()))
-> IO (Async ()) -> ResourceT IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async do
                IO ()
waitForExitSignal

                Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Canceling all running jobs. CTRL+C again to force exit" :: Text)
                
                [JobWorkerProcess]
-> (Element [JobWorkerProcess] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { [(ReleaseKey, Async ())]
runners :: [(ReleaseKey, Async ())]
runners :: JobWorkerProcess -> [(ReleaseKey, Async ())]
runners } -> do
                    [(ReleaseKey, Async ())]
-> (Element [(ReleaseKey, Async ())] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [(ReleaseKey, Async ())]
runners \(ReleaseKey
releaseKey, Async ()
_) -> ReleaseKey -> IO ()
forall (m :: * -> *). MonadIO m => ReleaseKey -> m ()
release ReleaseKey
releaseKey

                ThreadId -> ExitCode -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
Concurrent.throwTo ThreadId
threadId ExitCode
Exit.ExitSuccess

                () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

            -- Wait for all runners to complete
            IO () -> ResourceT IO ()
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ResourceT IO ()) -> IO () -> ResourceT IO ()
forall a b. (a -> b) -> a -> b
$ [JobWorkerProcess]
-> (Element [JobWorkerProcess] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { [(ReleaseKey, Async ())]
runners :: JobWorkerProcess -> [(ReleaseKey, Async ())]
runners :: [(ReleaseKey, Async ())]
runners } -> do
                [(ReleaseKey, Async ())]
-> (Element [(ReleaseKey, Async ())] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [(ReleaseKey, Async ())]
runners \(ReleaseKey
_, Async ()
async) -> Async () -> IO ()
forall a. Async a -> IO a
Async.wait Async ()
async

            IO () -> ResourceT IO ()
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ResourceT IO ()) -> IO () -> ResourceT IO ()
forall a b. (a -> b) -> a -> b
$ ThreadId -> ExitCode -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
Concurrent.throwTo ThreadId
threadId ExitCode
Exit.ExitSuccess

devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop :: (?modelContext::ModelContext) =>
FrameworkConfig -> PGListener -> [JobWorker] -> IO ()
devServerMainLoop FrameworkConfig
frameworkConfig PGListener
pgListener [JobWorker]
jobWorkers = do
    UUID
workerId <- IO UUID
UUID.nextRandom
    let ?context = ?context::FrameworkConfig
FrameworkConfig
frameworkConfig
    let logger :: Logger
logger = FrameworkConfig
frameworkConfig.logger

    Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting worker " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> UUID -> Text
forall a. Show a => a -> Text
tshow UUID
workerId)
    
    ResourceT IO () -> IO ()
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT do
        let jobWorkerArgs :: JobWorkerArgs
jobWorkerArgs = JobWorkerArgs { UUID
workerId :: UUID
workerId :: UUID
workerId, modelContext :: ModelContext
modelContext = ?modelContext::ModelContext
ModelContext
?modelContext, frameworkConfig :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
FrameworkConfig
?context, PGListener
pgListener :: PGListener
pgListener :: PGListener
pgListener }
        
        [JobWorkerProcess]
processes <- [JobWorker]
jobWorkers
                [JobWorker]
-> ([JobWorker] -> ResourceT IO [JobWorkerProcess])
-> ResourceT IO [JobWorkerProcess]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (JobWorker -> ResourceT IO JobWorkerProcess)
-> [JobWorker] -> ResourceT IO [JobWorkerProcess]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (\(JobWorker JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun) -> JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)

        IO () -> ResourceT IO ()
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ResourceT IO ()) -> IO () -> ResourceT IO ()
forall a b. (a -> b) -> a -> b
$ (IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Int -> IO ()
Concurrent.threadDelay Int
forall a. Bounded a => a
maxBound)) IO () -> IO () -> IO ()
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
`Exception.finally` do
            [JobWorkerProcess]
-> (Element [JobWorkerProcess] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { MVar JobWorkerProcessMessage
action :: JobWorkerProcess -> MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
action } -> do
                MVar JobWorkerProcessMessage -> JobWorkerProcessMessage -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop

-- | Installs signals handlers and returns an IO action that blocks until the next sigINT or sigTERM is sent
installSignalHandlers :: IO (IO ())
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
    MVar ()
exitSignal <- IO (MVar ())
forall a. IO (MVar a)
Concurrent.newEmptyMVar

    let catchHandler :: IO ()
catchHandler = MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar ()
exitSignal ()
            
    Signal -> Handler -> Maybe SignalSet -> IO Handler
Signals.installHandler Signal
Signals.sigINT (IO () -> Handler
Signals.Catch IO ()
catchHandler) Maybe SignalSet
forall a. Maybe a
Nothing
    Signal -> Handler -> Maybe SignalSet -> IO Handler
Signals.installHandler Signal
Signals.sigTERM (IO () -> Handler
Signals.Catch IO ()
catchHandler) Maybe SignalSet
forall a. Maybe a
Nothing

    IO () -> IO (IO ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MVar () -> IO ()
forall a. MVar a -> IO a
Concurrent.takeMVar MVar ()
exitSignal)

stopExitHandler :: JobWorkerArgs -> p -> p
stopExitHandler JobWorkerArgs { UUID
ModelContext
PGListener
FrameworkConfig
workerId :: JobWorkerArgs -> UUID
modelContext :: JobWorkerArgs -> ModelContext
frameworkConfig :: JobWorkerArgs -> FrameworkConfig
pgListener :: JobWorkerArgs -> PGListener
workerId :: UUID
modelContext :: ModelContext
frameworkConfig :: FrameworkConfig
pgListener :: PGListener
.. } p
main = p
main

worker :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FilterPrimaryKey (GetTableName job)
    , FromRow job
    , Show (PrimaryKey (GetTableName job))
    , PG.FromField (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorker
worker :: forall job.
(job ~ GetModelByTableName (GetTableName job),
 FilterPrimaryKey (GetTableName job), FromRow job,
 Show (PrimaryKey (GetTableName job)),
 FromField (PrimaryKey (GetTableName job)),
 KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 HasField "runAt" job UTCTime, HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, Table job) =>
JobWorker
worker = (JobWorkerArgs -> ResourceT IO JobWorkerProcess) -> JobWorker
JobWorker (forall job.
(job ~ GetModelByTableName (GetTableName job),
 FilterPrimaryKey (GetTableName job), FromRow job,
 Show (PrimaryKey (GetTableName job)),
 FromField (PrimaryKey (GetTableName job)),
 KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, Table job) =>
JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop @job)


jobWorkerFetchAndRunLoop :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FilterPrimaryKey (GetTableName job)
    , FromRow job
    , Show (PrimaryKey (GetTableName job))
    , PG.FromField (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop :: forall job.
(job ~ GetModelByTableName (GetTableName job),
 FilterPrimaryKey (GetTableName job), FromRow job,
 Show (PrimaryKey (GetTableName job)),
 FromField (PrimaryKey (GetTableName job)),
 KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, Table job) =>
JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { UUID
ModelContext
PGListener
FrameworkConfig
workerId :: JobWorkerArgs -> UUID
modelContext :: JobWorkerArgs -> ModelContext
frameworkConfig :: JobWorkerArgs -> FrameworkConfig
pgListener :: JobWorkerArgs -> PGListener
workerId :: UUID
modelContext :: ModelContext
frameworkConfig :: FrameworkConfig
pgListener :: PGListener
.. } = do
    let ?context = ?context::FrameworkConfig
FrameworkConfig
frameworkConfig
    let ?modelContext = ?modelContext::ModelContext
ModelContext
modelContext
    MVar JobWorkerProcessMessage
action <- IO (MVar JobWorkerProcessMessage)
-> ResourceT IO (MVar JobWorkerProcessMessage)
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (MVar JobWorkerProcessMessage)
 -> ResourceT IO (MVar JobWorkerProcessMessage))
-> IO (MVar JobWorkerProcessMessage)
-> ResourceT IO (MVar JobWorkerProcessMessage)
forall a b. (a -> b) -> a -> b
$ JobWorkerProcessMessage -> IO (MVar JobWorkerProcessMessage)
forall a. a -> IO (MVar a)
Concurrent.newMVar JobWorkerProcessMessage
JobAvailable
    let loop :: IO ()
loop = do
            JobWorkerProcessMessage
receivedAction <- MVar JobWorkerProcessMessage -> IO JobWorkerProcessMessage
forall a. MVar a -> IO a
Concurrent.takeMVar MVar JobWorkerProcessMessage
action

            case JobWorkerProcessMessage
receivedAction of
                JobWorkerProcessMessage
JobAvailable -> do
                    Maybe job
maybeJob <- forall job.
(?modelContext::ModelContext,
 job ~ GetModelByTableName (GetTableName job),
 FilterPrimaryKey (GetTableName job), FromRow job,
 Show (PrimaryKey (GetTableName job)),
 FromField (PrimaryKey (GetTableName job)), Table job) =>
Maybe Int -> BackoffStrategy -> UUID -> IO (Maybe job)
Queue.fetchNextJob @job (forall job. Job job => Maybe Int
timeoutInMicroseconds @job) (forall job. Job job => BackoffStrategy
backoffStrategy @job) UUID
workerId
                    case Maybe job
maybeJob of
                        Just job
job -> do
                            Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting job: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> job -> Text
forall a. Show a => a -> Text
tshow job
job)

                            let ?job = job
?job::job
job
                            let Int
timeout :: Int = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe (-Int
1) (forall job. Job job => Maybe Int
timeoutInMicroseconds @job)
                            Either SomeException (Maybe ())
resultOrException <- IO (Maybe ()) -> IO (Either SomeException (Maybe ()))
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
Exception.tryAsync (Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
Timeout.timeout Int
timeout (job -> IO ()
forall job.
(Job job, ?modelContext::ModelContext,
 ?context::FrameworkConfig) =>
job -> IO ()
perform job
job))
                            case Either SomeException (Maybe ())
resultOrException of
                                Left SomeException
exception -> do
                                    job -> SomeException -> IO ()
forall job context.
(job ~ GetModelByTableName (GetTableName job),
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, ?modelContext::ModelContext, ?context::context,
 HasField "logger" context Logger) =>
job -> SomeException -> IO ()
Queue.jobDidFail job
job SomeException
exception
                                    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SomeException -> Bool
forall e. Exception e => e -> Bool
Exception.isAsyncException SomeException
exception) (SomeException -> IO ()
forall (m :: * -> *) e a.
(HasCallStack, MonadThrow m, Exception e) =>
e -> m a
Exception.throwIO SomeException
exception)
                                Right Maybe ()
Nothing -> job -> IO ()
forall job context.
(job ~ GetModelByTableName (GetTableName job),
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, ?modelContext::ModelContext, ?context::context,
 HasField "logger" context Logger) =>
job -> IO ()
Queue.jobDidTimeout job
job
                                Right (Just ()
_) -> job -> IO ()
forall job context.
(job ~ GetModelByTableName (GetTableName job),
 SetField "lockedBy" job (Maybe UUID),
 SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
 HasField "attemptsCount" job Int,
 SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
 Show job, ?modelContext::ModelContext, ?context::context,
 HasField "logger" context Logger) =>
job -> IO ()
Queue.jobDidSucceed job
job

                            IO ()
loop
                        Maybe job
Nothing -> IO ()
loop
                JobWorkerProcessMessage
Stop -> do
                    -- Put the stop signal back in to stop the other runners as well
                    MVar JobWorkerProcessMessage -> JobWorkerProcessMessage -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop
                    () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    [(ReleaseKey, Async ())]
runners <- [Int]
-> (Int -> ResourceT IO (ReleaseKey, Async ()))
-> ResourceT IO [(ReleaseKey, Async ())]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1..(forall job. Job job => Int
maxConcurrency @job)] \Int
index -> IO (Async ())
-> (Async () -> IO ()) -> ResourceT IO (ReleaseKey, Async ())
forall (m :: * -> *) a.
MonadResource m =>
IO a -> (a -> IO ()) -> m (ReleaseKey, a)
allocate (IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async IO ()
loop) Async () -> IO ()
forall a. Async a -> IO ()
cancel

    (Subscription
subscription, ReleaseKey
pollerReleaseKey) <- (?modelContext::ModelContext) =>
PGListener
-> Text
-> Int
-> Maybe Int
-> BackoffStrategy
-> MVar JobWorkerProcessMessage
-> ResourceT IO (Subscription, ReleaseKey)
PGListener
-> Text
-> Int
-> Maybe Int
-> BackoffStrategy
-> MVar JobWorkerProcessMessage
-> ResourceT IO (Subscription, ReleaseKey)
Queue.watchForJob PGListener
pgListener (forall record. Table record => Text
tableName @job) (forall job. Job job => Int
queuePollInterval @job) (forall job. Job job => Maybe Int
timeoutInMicroseconds @job) (forall job. Job job => BackoffStrategy
backoffStrategy @job) MVar JobWorkerProcessMessage
action


    JobWorkerProcess -> ResourceT IO JobWorkerProcess
forall a. a -> ResourceT IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure JobWorkerProcess { [(ReleaseKey, Async ())]
runners :: [(ReleaseKey, Async ())]
runners :: [(ReleaseKey, Async ())]
runners, Subscription
subscription :: Subscription
subscription :: Subscription
subscription, ReleaseKey
pollerReleaseKey :: ReleaseKey
pollerReleaseKey :: ReleaseKey
pollerReleaseKey, MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
action }