{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Runner where
import IHP.Prelude
import IHP.ControllerPrelude
import IHP.ScriptSupport
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception as Exception
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified System.Timeout as Timeout
import qualified IHP.PGListener as PGListener
import IHP.Log.Types
import qualified IHP.Log as Log
runJobWorkers :: [JobWorker] -> Script
runJobWorkers :: [JobWorker] -> Script
runJobWorkers [JobWorker]
jobWorkers = (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop :: (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers = do
ThreadId
threadId <- IO ThreadId
Concurrent.myThreadId
IORef Integer
exitSignalsCount <- forall a. a -> IO (IORef a)
newIORef Integer
0
UUID
workerId <- IO UUID
UUID.nextRandom
let logger :: Logger
logger = ?context::FrameworkConfig
?context forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get forall a. IsLabel "logger" a => a
#logger
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting worker " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> Text
tshow UUID
workerId)
PGListener
pgListener <- ModelContext -> IO PGListener
PGListener.init ?modelContext::ModelContext
?modelContext
MVar Any
stopSignal <- forall a. IO (MVar a)
Concurrent.newEmptyMVar
IO ()
waitForExitSignal <- IO (IO ())
installSignalHandlers
let jobWorkerArgs :: JobWorkerArgs
jobWorkerArgs = JobWorkerArgs { UUID
$sel:workerId:JobWorkerArgs :: UUID
workerId :: UUID
workerId, $sel:modelContext:JobWorkerArgs :: ModelContext
modelContext = ?modelContext::ModelContext
?modelContext, $sel:frameworkConfig:JobWorkerArgs :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
?context, PGListener
$sel:pgListener:JobWorkerArgs :: PGListener
pgListener :: PGListener
pgListener }
[JobWorkerProcess]
processes <- [JobWorker]
jobWorkers
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (\(JobWorker JobWorkerArgs -> IO JobWorkerProcess
listenAndRun)-> JobWorkerArgs -> IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)
IO ()
waitForExitSignal
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Waiting for jobs to complete. CTRL+C again to force exit" :: Text)
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { Async ()
$sel:poller:JobWorkerProcess :: JobWorkerProcess -> Async ()
poller :: Async ()
poller, Subscription
$sel:subscription:JobWorkerProcess :: JobWorkerProcess -> Subscription
subscription :: Subscription
subscription, MVar JobWorkerProcessMessage
$sel:action:JobWorkerProcess :: JobWorkerProcess -> MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
action } -> do
Subscription -> PGListener -> IO ()
PGListener.unsubscribe Subscription
subscription PGListener
pgListener
forall a. Async a -> IO ()
Async.cancel Async ()
poller
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop
PGListener -> IO ()
PGListener.stop PGListener
pgListener
forall a. IO a -> IO (Async a)
async do
IO ()
waitForExitSignal
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Canceling all running jobs. CTRL+C again to force exit" :: Text)
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { [Async ()]
$sel:runners:JobWorkerProcess :: JobWorkerProcess -> [Async ()]
runners :: [Async ()]
runners } -> do
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [Async ()]
runners forall a. Async a -> IO ()
Async.cancel
forall e. Exception e => ThreadId -> e -> IO ()
Concurrent.throwTo ThreadId
threadId ExitCode
Exit.ExitSuccess
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { [Async ()]
runners :: [Async ()]
$sel:runners:JobWorkerProcess :: JobWorkerProcess -> [Async ()]
runners } -> do
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [Async ()]
runners forall a. Async a -> IO a
Async.wait
forall e. Exception e => ThreadId -> e -> IO ()
Concurrent.throwTo ThreadId
threadId ExitCode
Exit.ExitSuccess
devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop :: (?modelContext::ModelContext) =>
FrameworkConfig -> PGListener -> [JobWorker] -> IO ()
devServerMainLoop FrameworkConfig
frameworkConfig PGListener
pgListener [JobWorker]
jobWorkers = do
UUID
workerId <- IO UUID
UUID.nextRandom
let ?context = FrameworkConfig
frameworkConfig
let logger :: Logger
logger = FrameworkConfig
frameworkConfig forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get forall a. IsLabel "logger" a => a
#logger
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting worker " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> Text
tshow UUID
workerId)
let jobWorkerArgs :: JobWorkerArgs
jobWorkerArgs = JobWorkerArgs { UUID
workerId :: UUID
$sel:workerId:JobWorkerArgs :: UUID
workerId, $sel:modelContext:JobWorkerArgs :: ModelContext
modelContext = ?modelContext::ModelContext
?modelContext, $sel:frameworkConfig:JobWorkerArgs :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
?context, PGListener
pgListener :: PGListener
$sel:pgListener:JobWorkerArgs :: PGListener
pgListener }
[JobWorkerProcess]
processes <- [JobWorker]
jobWorkers
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (\(JobWorker JobWorkerArgs -> IO JobWorkerProcess
listenAndRun)-> JobWorkerArgs -> IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)
(forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Int -> IO ()
Concurrent.threadDelay forall a. Bounded a => a
maxBound)) forall a b. IO a -> IO b -> IO a
`Exception.finally` do
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [JobWorkerProcess]
processes \JobWorkerProcess { Async ()
poller :: Async ()
$sel:poller:JobWorkerProcess :: JobWorkerProcess -> Async ()
poller, Subscription
subscription :: Subscription
$sel:subscription:JobWorkerProcess :: JobWorkerProcess -> Subscription
subscription, [Async ()]
runners :: [Async ()]
$sel:runners:JobWorkerProcess :: JobWorkerProcess -> [Async ()]
runners, MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
$sel:action:JobWorkerProcess :: JobWorkerProcess -> MVar JobWorkerProcessMessage
action } -> do
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop
forall a. Async a -> IO ()
Async.cancel Async ()
poller
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [Async ()]
runners forall a. Async a -> IO ()
Async.cancel
installSignalHandlers :: IO (IO ())
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
MVar ()
exitSignal <- forall a. IO (MVar a)
Concurrent.newEmptyMVar
let catchHandler :: IO ()
catchHandler = forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar ()
exitSignal ()
Signal -> Handler -> Maybe SignalSet -> IO Handler
Signals.installHandler Signal
Signals.sigINT (IO () -> Handler
Signals.Catch IO ()
catchHandler) forall a. Maybe a
Nothing
Signal -> Handler -> Maybe SignalSet -> IO Handler
Signals.installHandler Signal
Signals.sigTERM (IO () -> Handler
Signals.Catch IO ()
catchHandler) forall a. Maybe a
Nothing
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. MVar a -> IO a
Concurrent.takeMVar MVar ()
exitSignal)
stopExitHandler :: JobWorkerArgs -> p -> p
stopExitHandler JobWorkerArgs { UUID
ModelContext
PGListener
FrameworkConfig
pgListener :: PGListener
frameworkConfig :: FrameworkConfig
modelContext :: ModelContext
workerId :: UUID
$sel:pgListener:JobWorkerArgs :: JobWorkerArgs -> PGListener
$sel:frameworkConfig:JobWorkerArgs :: JobWorkerArgs -> FrameworkConfig
$sel:modelContext:JobWorkerArgs :: JobWorkerArgs -> ModelContext
$sel:workerId:JobWorkerArgs :: JobWorkerArgs -> UUID
.. } p
main = p
main
worker :: forall job.
( job ~ GetModelByTableName (GetTableName job)
, FilterPrimaryKey (GetTableName job)
, FromRow job
, Show (PrimaryKey (GetTableName job))
, PG.FromField (PrimaryKey (GetTableName job))
, KnownSymbol (GetTableName job)
, SetField "attemptsCount" job Int
, SetField "lockedBy" job (Maybe UUID)
, SetField "status" job JobStatus
, SetField "updatedAt" job UTCTime
, HasField "runAt" job UTCTime
, HasField "attemptsCount" job Int
, SetField "lastError" job (Maybe Text)
, Job job
, CanUpdate job
, Show job
, Table job
) => JobWorker
worker :: forall job.
(job ~ GetModelByTableName (GetTableName job),
FilterPrimaryKey (GetTableName job), FromRow job,
Show (PrimaryKey (GetTableName job)),
FromField (PrimaryKey (GetTableName job)),
KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "runAt" job UTCTime, HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, Table job) =>
JobWorker
worker = (JobWorkerArgs -> IO JobWorkerProcess) -> JobWorker
JobWorker (forall job.
(job ~ GetModelByTableName (GetTableName job),
FilterPrimaryKey (GetTableName job), FromRow job,
Show (PrimaryKey (GetTableName job)),
FromField (PrimaryKey (GetTableName job)),
KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, Table job) =>
JobWorkerArgs -> IO JobWorkerProcess
jobWorkerFetchAndRunLoop @job)
jobWorkerFetchAndRunLoop :: forall job.
( job ~ GetModelByTableName (GetTableName job)
, FilterPrimaryKey (GetTableName job)
, FromRow job
, Show (PrimaryKey (GetTableName job))
, PG.FromField (PrimaryKey (GetTableName job))
, KnownSymbol (GetTableName job)
, SetField "attemptsCount" job Int
, SetField "lockedBy" job (Maybe UUID)
, SetField "status" job JobStatus
, SetField "updatedAt" job UTCTime
, HasField "attemptsCount" job Int
, SetField "lastError" job (Maybe Text)
, Job job
, CanUpdate job
, Show job
, Table job
) => JobWorkerArgs -> IO JobWorkerProcess
jobWorkerFetchAndRunLoop :: forall job.
(job ~ GetModelByTableName (GetTableName job),
FilterPrimaryKey (GetTableName job), FromRow job,
Show (PrimaryKey (GetTableName job)),
FromField (PrimaryKey (GetTableName job)),
KnownSymbol (GetTableName job), SetField "attemptsCount" job Int,
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, Table job) =>
JobWorkerArgs -> IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { UUID
ModelContext
PGListener
FrameworkConfig
pgListener :: PGListener
frameworkConfig :: FrameworkConfig
modelContext :: ModelContext
workerId :: UUID
$sel:pgListener:JobWorkerArgs :: JobWorkerArgs -> PGListener
$sel:frameworkConfig:JobWorkerArgs :: JobWorkerArgs -> FrameworkConfig
$sel:modelContext:JobWorkerArgs :: JobWorkerArgs -> ModelContext
$sel:workerId:JobWorkerArgs :: JobWorkerArgs -> UUID
.. } = do
let ?context = FrameworkConfig
frameworkConfig
let ?modelContext = ModelContext
modelContext
MVar JobWorkerProcessMessage
action <- forall a. a -> IO (MVar a)
Concurrent.newMVar JobWorkerProcessMessage
JobAvailable
[Async ()]
runners <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1..(forall job. Job job => Int
maxConcurrency @job)] \Int
index -> forall a. IO a -> IO (Async a)
async do
let loop :: IO ()
loop = do
JobWorkerProcessMessage
receivedAction <- forall a. MVar a -> IO a
Concurrent.takeMVar MVar JobWorkerProcessMessage
action
case JobWorkerProcessMessage
receivedAction of
JobWorkerProcessMessage
JobAvailable -> do
Maybe job
maybeJob <- forall job.
(?modelContext::ModelContext,
job ~ GetModelByTableName (GetTableName job),
FilterPrimaryKey (GetTableName job), FromRow job,
Show (PrimaryKey (GetTableName job)),
FromField (PrimaryKey (GetTableName job)), Table job) =>
UUID -> IO (Maybe job)
Queue.fetchNextJob @job UUID
workerId
case Maybe job
maybeJob of
Just job
job -> do
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Starting job: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> Text
tshow job
job)
let ?job = job
job
let Int
timeout :: Int = forall a. a -> Maybe a -> a
fromMaybe (-Int
1) (forall job. (Job job, ?job::job) => Maybe Int
timeoutInMicroseconds @job)
Either SomeException (Maybe ())
resultOrException <- forall e a. Exception e => IO a -> IO (Either e a)
Exception.try (forall a. Int -> IO a -> IO (Maybe a)
Timeout.timeout Int
timeout (forall job.
(Job job, ?modelContext::ModelContext,
?context::FrameworkConfig) =>
job -> IO ()
perform job
job))
case Either SomeException (Maybe ())
resultOrException of
Left SomeException
exception -> forall job.
(job ~ GetModelByTableName (GetTableName job),
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, ?modelContext::ModelContext) =>
job -> SomeException -> IO ()
Queue.jobDidFail job
job SomeException
exception
Right Maybe ()
Nothing -> forall job.
(job ~ GetModelByTableName (GetTableName job),
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, ?modelContext::ModelContext) =>
job -> IO ()
Queue.jobDidTimeout job
job
Right (Just ()
_) -> forall job.
(job ~ GetModelByTableName (GetTableName job),
SetField "lockedBy" job (Maybe UUID),
SetField "status" job JobStatus, SetField "updatedAt" job UTCTime,
HasField "attemptsCount" job Int,
SetField "lastError" job (Maybe Text), Job job, CanUpdate job,
Show job, ?modelContext::ModelContext) =>
job -> IO ()
Queue.jobDidSucceed job
job
IO ()
loop
Maybe job
Nothing -> IO ()
loop
JobWorkerProcessMessage
Stop -> do
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
IO ()
loop
(Subscription
subscription, Async ()
poller) <- (?modelContext::ModelContext) =>
PGListener
-> Text
-> Int
-> MVar JobWorkerProcessMessage
-> IO (Subscription, Async ())
Queue.watchForJob PGListener
pgListener (forall record. Table record => Text
tableName @job) (forall job. Job job => Int
queuePollInterval @job) MVar JobWorkerProcessMessage
action
forall (f :: * -> *) a. Applicative f => a -> f a
pure JobWorkerProcess { [Async ()]
runners :: [Async ()]
$sel:runners:JobWorkerProcess :: [Async ()]
runners, Subscription
subscription :: Subscription
$sel:subscription:JobWorkerProcess :: Subscription
subscription, Async ()
poller :: Async ()
$sel:poller:JobWorkerProcess :: Async ()
poller, MVar JobWorkerProcessMessage
action :: MVar JobWorkerProcessMessage
$sel:action:JobWorkerProcess :: MVar JobWorkerProcessMessage
action }