{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Runner where
import IHP.Prelude
import IHP.ControllerPrelude
import IHP.ScriptSupport
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception as Exception
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified System.Timeout as Timeout
import qualified IHP.PGListener as PGListener
import qualified IHP.Log as Log
runJobWorkers :: [JobWorker] -> Script
runJobWorkers :: [JobWorker] -> Script
runJobWorkers [JobWorker]
jobWorkers = (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop :: (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers = do
ThreadId
threadId <- IO ThreadId
Concurrent.myThreadId
IORef Integer
exitSignalsCount <- Integer -> IO (IORef Integer)
forall a. a -> IO (IORef a)
newIORef Integer
0
UUID
workerId <- IO UUID
UUID.nextRandom
let logger :: Logger
logger = ?context::FrameworkConfig
FrameworkConfig
?context.logger
Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting worker " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> UUID -> Text
forall a. Show a => a -> Text
tshow UUID
workerId)
PGListener
pgListener <- ModelContext -> IO PGListener
PGListener.init ?modelContext::ModelContext
ModelContext
?modelContext
MVar Any
stopSignal <- IO (MVar Any)
forall a. IO (MVar a)
Concurrent.newEmptyMVar
IO ()
waitForExitSignal <- IO (IO ())
installSignalHandlers
let jobWorkerArgs :: JobWorkerArgs
jobWorkerArgs = JobWorkerArgs { UUID
workerId :: UUID
workerId :: UUID
workerId, modelContext :: ModelContext
modelContext = ?modelContext::ModelContext
ModelContext
?modelContext, frameworkConfig :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
FrameworkConfig
?context, PGListener
pgListener :: PGListener
pgListener :: PGListener
pgListener }
[JobWorkerProcess]
processes <- [JobWorker]
jobWorkers
[JobWorker]
-> ([JobWorker] -> IO [JobWorkerProcess]) -> IO [JobWorkerProcess]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (JobWorker -> IO JobWorkerProcess)
-> [JobWorker] -> IO [JobWorkerProcess]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (\(JobWorker JobWorkerArgs -> IO JobWorkerProcess
listenAndRun)-> JobWorkerArgs -> IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)
IO ()
waitForExitSignal
Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Waiting for jobs to complete. CTRL+C again to force exit" :: Text)
[JobWorkerProcess]
-> (Element [JobWorkerProcess] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { Async ()
poller :: Async ()
poller :: JobWorkerProcess -> Async ()
poller, Subscription
subscription :: Subscription
subscription :: JobWorkerProcess -> Subscription
subscription, MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
action :: JobWorkerProcess -> MVar JobWorkerProcessMessage
action } -> do
Subscription -> PGListener -> IO ()
PGListener.unsubscribe Subscription
subscription PGListener
pgListener
Async () -> IO ()
forall a. Async a -> IO ()
Async.cancel Async ()
poller
MVar JobWorkerProcessMessage -> JobWorkerProcessMessage -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop
PGListener -> IO ()
PGListener.stop PGListener
pgListener
IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async do
IO ()
waitForExitSignal
Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Canceling all running jobs. CTRL+C again to force exit" :: Text)
[JobWorkerProcess]
-> (Element [JobWorkerProcess] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { [Async ()]
runners :: [Async ()]
runners :: JobWorkerProcess -> [Async ()]
runners } -> do
[Async ()] -> (Element [Async ()] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [Async ()]
runners Async () -> IO ()
Element [Async ()] -> IO ()
forall a. Async a -> IO ()
Async.cancel
ThreadId -> ExitCode -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
Concurrent.throwTo ThreadId
threadId ExitCode
Exit.ExitSuccess
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
[JobWorkerProcess]
-> (Element [JobWorkerProcess] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { [Async ()]
runners :: JobWorkerProcess -> [Async ()]
runners :: [Async ()]
runners } -> do
[Async ()] -> (Element [Async ()] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [Async ()]
runners Async () -> IO ()
Element [Async ()] -> IO ()
forall a. Async a -> IO a
Async.wait
ThreadId -> ExitCode -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
Concurrent.throwTo ThreadId
threadId ExitCode
Exit.ExitSuccess
devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop :: (?modelContext::ModelContext) =>
FrameworkConfig -> PGListener -> [JobWorker] -> IO ()
devServerMainLoop FrameworkConfig
frameworkConfig PGListener
pgListener [JobWorker]
jobWorkers = do
UUID
workerId <- IO UUID
UUID.nextRandom
let ?context = ?context::FrameworkConfig
FrameworkConfig
frameworkConfig
let logger :: Logger
logger = FrameworkConfig
frameworkConfig.logger
Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting worker " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> UUID -> Text
forall a. Show a => a -> Text
tshow UUID
workerId)
let jobWorkerArgs :: JobWorkerArgs
jobWorkerArgs = JobWorkerArgs { UUID
workerId :: UUID
workerId :: UUID
workerId, modelContext :: ModelContext
modelContext = ?modelContext::ModelContext
ModelContext
?modelContext, frameworkConfig :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
FrameworkConfig
?context, PGListener
pgListener :: PGListener
pgListener :: PGListener
pgListener }
[JobWorkerProcess]
processes <- [JobWorker]
jobWorkers
[JobWorker]
-> ([JobWorker] -> IO [JobWorkerProcess]) -> IO [JobWorkerProcess]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (JobWorker -> IO JobWorkerProcess)
-> [JobWorker] -> IO [JobWorkerProcess]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (\(JobWorker JobWorkerArgs -> IO JobWorkerProcess
listenAndRun)-> JobWorkerArgs -> IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)
(IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Int -> IO ()
Concurrent.threadDelay Int
forall a. Bounded a => a
maxBound)) IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`Exception.finally` do
[JobWorkerProcess]
-> (Element [JobWorkerProcess] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { Async ()
poller :: JobWorkerProcess -> Async ()
poller :: Async ()
poller, Subscription
subscription :: JobWorkerProcess -> Subscription
subscription :: Subscription
subscription, [Async ()]
runners :: JobWorkerProcess -> [Async ()]
runners :: [Async ()]
runners, MVar JobWorkerProcessMessage
action :: JobWorkerProcess -> MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
action } -> do
MVar JobWorkerProcessMessage -> JobWorkerProcessMessage -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop
Async () -> IO ()
forall a. Async a -> IO ()
Async.cancel Async ()
poller
[Async ()] -> (Element [Async ()] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [Async ()]
runners Async () -> IO ()
Element [Async ()] -> IO ()
forall a. Async a -> IO ()
Async.cancel
installSignalHandlers :: IO (IO ())
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
MVar ()
exitSignal <- IO (MVar ())
forall a. IO (MVar a)
Concurrent.newEmptyMVar
let catchHandler :: IO ()
catchHandler = MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar ()
exitSignal ()
Signal -> Handler -> Maybe SignalSet -> IO Handler
Signals.installHandler Signal
Signals.sigINT (IO () -> Handler
Signals.Catch IO ()
catchHandler) Maybe SignalSet
forall a. Maybe a
Nothing
Signal -> Handler -> Maybe SignalSet -> IO Handler
Signals.installHandler Signal
Signals.sigTERM (IO () -> Handler
Signals.Catch IO ()
catchHandler) Maybe SignalSet
forall a. Maybe a
Nothing
IO () -> IO (IO ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MVar () -> IO ()
forall a. MVar a -> IO a
Concurrent.takeMVar MVar ()
exitSignal)
stopExitHandler :: JobWorkerArgs -> p -> p
stopExitHandler JobWorkerArgs { UUID
ModelContext
PGListener
FrameworkConfig
workerId :: JobWorkerArgs -> UUID
modelContext :: JobWorkerArgs -> ModelContext
frameworkConfig :: JobWorkerArgs -> FrameworkConfig
pgListener :: JobWorkerArgs -> PGListener
workerId :: UUID
modelContext :: ModelContext
frameworkConfig :: FrameworkConfig
pgListener :: PGListener
.. } p
main = p
main
worker :: forall job.
( job ~ GetModelByTableName (GetTableName job)
, FilterPrimaryKey (GetTableName job)
, FromRow job
, Show (PrimaryKey (GetTableName job))
, PG.FromField (PrimaryKey (GetTableName job))
, KnownSymbol (GetTableName job)
, SetField "attemptsCount" job Int
, SetField "lockedBy" job (Maybe UUID)
, SetField "status" job JobStatus
, SetField "updatedAt" job UTCTime
, HasField "runAt" job UTCTime
, HasField "attemptsCount" job Int
, SetField "lastError" job (Maybe Text)
, Job job
, CanUpdate job
, Show job
, Table job
) => JobWorker
worker :: forall job.
(job ~ GetModelByTableName (GetTableName job),
FilterPrimaryKey (GetTableName job), FromRow job,
Show (PrimaryKey (GetTableName job)),
FromField (PrimaryKey (GetTableName job)),
KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "runAt" job UTCTime, HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, Table job) =>
JobWorker
worker = (JobWorkerArgs -> IO JobWorkerProcess) -> JobWorker
JobWorker (forall job.
(job ~ GetModelByTableName (GetTableName job),
FilterPrimaryKey (GetTableName job), FromRow job,
Show (PrimaryKey (GetTableName job)),
FromField (PrimaryKey (GetTableName job)),
KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, Table job) =>
JobWorkerArgs -> IO JobWorkerProcess
jobWorkerFetchAndRunLoop @job)
jobWorkerFetchAndRunLoop :: forall job.
( job ~ GetModelByTableName (GetTableName job)
, FilterPrimaryKey (GetTableName job)
, FromRow job
, Show (PrimaryKey (GetTableName job))
, PG.FromField (PrimaryKey (GetTableName job))
, KnownSymbol (GetTableName job)
, SetField "attemptsCount" job Int
, SetField "lockedBy" job (Maybe UUID)
, SetField "status" job JobStatus
, SetField "updatedAt" job UTCTime
, HasField "attemptsCount" job Int
, SetField "lastError" job (Maybe Text)
, Job job
, CanUpdate job
, Show job
, Table job
) => JobWorkerArgs -> IO JobWorkerProcess
jobWorkerFetchAndRunLoop :: forall job.
(job ~ GetModelByTableName (GetTableName job),
FilterPrimaryKey (GetTableName job), FromRow job,
Show (PrimaryKey (GetTableName job)),
FromField (PrimaryKey (GetTableName job)),
KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, Table job) =>
JobWorkerArgs -> IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { UUID
ModelContext
PGListener
FrameworkConfig
workerId :: JobWorkerArgs -> UUID
modelContext :: JobWorkerArgs -> ModelContext
frameworkConfig :: JobWorkerArgs -> FrameworkConfig
pgListener :: JobWorkerArgs -> PGListener
workerId :: UUID
modelContext :: ModelContext
frameworkConfig :: FrameworkConfig
pgListener :: PGListener
.. } = do
let ?context = ?context::FrameworkConfig
FrameworkConfig
frameworkConfig
let ?modelContext = ?modelContext::ModelContext
ModelContext
modelContext
MVar JobWorkerProcessMessage
action <- JobWorkerProcessMessage -> IO (MVar JobWorkerProcessMessage)
forall a. a -> IO (MVar a)
Concurrent.newMVar JobWorkerProcessMessage
JobAvailable
[Async ()]
runners <- [Int] -> (Int -> IO (Async ())) -> IO [Async ()]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1..(forall job. Job job => Int
maxConcurrency @job)] \Int
index -> IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async do
let loop :: IO ()
loop = do
JobWorkerProcessMessage
receivedAction <- MVar JobWorkerProcessMessage -> IO JobWorkerProcessMessage
forall a. MVar a -> IO a
Concurrent.takeMVar MVar JobWorkerProcessMessage
action
case JobWorkerProcessMessage
receivedAction of
JobWorkerProcessMessage
JobAvailable -> do
Maybe job
maybeJob <- forall job.
(?modelContext::ModelContext,
job ~ GetModelByTableName (GetTableName job),
FilterPrimaryKey (GetTableName job), FromRow job,
Show (PrimaryKey (GetTableName job)),
FromField (PrimaryKey (GetTableName job)), Table job) =>
Maybe Int -> BackoffStrategy -> UUID -> IO (Maybe job)
Queue.fetchNextJob @job (forall job. Job job => Maybe Int
timeoutInMicroseconds @job) (forall job. Job job => BackoffStrategy
backoffStrategy @job) UUID
workerId
case Maybe job
maybeJob of
Just job
job -> do
Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting job: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> job -> Text
forall a. Show a => a -> Text
tshow job
job)
let ?job = job
?job::job
job
let Int
timeout :: Int = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe (-Int
1) (forall job. Job job => Maybe Int
timeoutInMicroseconds @job)
Either SomeException (Maybe ())
resultOrException <- IO (Maybe ()) -> IO (Either SomeException (Maybe ()))
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try (Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
Timeout.timeout Int
timeout (job -> IO ()
forall job.
(Job job, ?modelContext::ModelContext,
?context::FrameworkConfig) =>
job -> IO ()
perform job
job))
case Either SomeException (Maybe ())
resultOrException of
Left SomeException
exception -> job -> SomeException -> IO ()
forall job context.
(job ~ GetModelByTableName (GetTableName job),
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, ?modelContext::ModelContext, ?context::context,
HasField "logger" context Logger) =>
job -> SomeException -> IO ()
Queue.jobDidFail job
job SomeException
exception
Right Maybe ()
Nothing -> job -> IO ()
forall job context.
(job ~ GetModelByTableName (GetTableName job),
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, ?modelContext::ModelContext, ?context::context,
HasField "logger" context Logger) =>
job -> IO ()
Queue.jobDidTimeout job
job
Right (Just ()
_) -> job -> IO ()
forall job context.
(job ~ GetModelByTableName (GetTableName job),
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, ?modelContext::ModelContext, ?context::context,
HasField "logger" context Logger) =>
job -> IO ()
Queue.jobDidSucceed job
job
IO ()
loop
Maybe job
Nothing -> IO ()
loop
JobWorkerProcessMessage
Stop -> do
MVar JobWorkerProcessMessage -> JobWorkerProcessMessage -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
IO ()
loop
(Subscription
subscription, Async ()
poller) <- (?modelContext::ModelContext) =>
PGListener
-> Text
-> Int
-> Maybe Int
-> BackoffStrategy
-> MVar JobWorkerProcessMessage
-> IO (Subscription, Async ())
PGListener
-> Text
-> Int
-> Maybe Int
-> BackoffStrategy
-> MVar JobWorkerProcessMessage
-> IO (Subscription, Async ())
Queue.watchForJob PGListener
pgListener (forall record. Table record => Text
tableName @job) (forall job. Job job => Int
queuePollInterval @job) (forall job. Job job => Maybe Int
timeoutInMicroseconds @job) (forall job. Job job => BackoffStrategy
backoffStrategy @job) MVar JobWorkerProcessMessage
action
JobWorkerProcess -> IO JobWorkerProcess
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure JobWorkerProcess { [Async ()]
runners :: [Async ()]
runners :: [Async ()]
runners, Subscription
subscription :: Subscription
subscription :: Subscription
subscription, Async ()
poller :: Async ()
poller :: Async ()
poller, MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
action }