{-# LANGUAGE AllowAmbiguousTypes #-}
{-|
Module: IHP.Job.Runner
Description: Functions to run jobs
Copyright: (c) digitally induced GmbH, 2020
-}
module IHP.Job.Runner where

import IHP.Prelude
import IHP.ControllerPrelude
import IHP.ScriptSupport
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception as Exception
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified System.Timeout as Timeout
import qualified IHP.PGListener as PGListener

import qualified IHP.Log as Log

-- | Entry point used by the RunJobs binary.
--
-- Hands the given job workers over to 'dedicatedProcessMainLoop'. The implicit
-- model context and framework config required by that loop are provided through
-- the 'Script' type.
runJobWorkers :: [JobWorker] -> Script
runJobWorkers jobWorkers = dedicatedProcessMainLoop jobWorkers

-- | This job worker main loop is used when the job workers are running as part of their own binary
--
-- In dev mode the IHP dev server is using the 'devServerMainLoop' instead. We have two main loops
-- as the stop handling works a bit differently in those cases.
--
-- Lifecycle:
--
-- 1. Start every job worker on a dedicated PG listener.
-- 2. Block until the first exit signal (see 'installSignalHandlers') arrives.
-- 3. Stop producing new work (unsubscribe, cancel the pollers, send 'Stop') and wait for the
--    runners to finish.
-- 4. If a second exit signal arrives while waiting, cancel all runners instead.
--
-- In both cases the loop ends by throwing 'Exit.ExitSuccess' to the thread that called this function.
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop jobWorkers = do
    threadId <- Concurrent.myThreadId
    workerId <- UUID.nextRandom

    Log.info ("Starting worker " <> tshow workerId)

    -- The job workers use their own dedicated PG listener as e.g. AutoRefresh or DataSync
    -- could overload the main PGListener connection. In that case we still want jobs to be
    -- run independent of the system being very busy.
    pgListener <- PGListener.init ?modelContext
    waitForExitSignal <- installSignalHandlers

    let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

    processes <- forM jobWorkers \(JobWorker listenAndRun) -> listenAndRun jobWorkerArgs

    waitForExitSignal

    Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)

    -- Stop subscriptions and poller already
    -- This will stop all producers for the queue MVar
    forEach processes \JobWorkerProcess { poller, subscription, action } -> do
        PGListener.unsubscribe subscription pgListener
        Async.cancel poller
        -- The runners pass this message on to each other, see 'jobWorkerFetchAndRunLoop'
        Concurrent.putMVar action Stop

    PGListener.stop pgListener

    -- While waiting for all jobs to complete, we also wait for another exit signal
    -- If the user sends two exit signals, we just kill all processes
    _ <- async do
        waitForExitSignal

        Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)

        forEach processes \JobWorkerProcess { runners } ->
            forEach runners Async.cancel

        Concurrent.throwTo threadId Exit.ExitSuccess

    -- Wait for all runners to complete
    forEach processes \JobWorkerProcess { runners } ->
        forEach runners Async.wait

    Concurrent.throwTo threadId Exit.ExitSuccess

-- | Job worker main loop used when the workers run inside the IHP dev server.
--
-- Starts all given job workers on the passed-in PG listener and then sleeps forever.
-- The cleanup runs once the sleeping thread is interrupted by an exception: every
-- worker process receives 'Stop', and its poller and runners are cancelled.
devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop frameworkConfig pgListener jobWorkers = do
    workerId <- UUID.nextRandom
    -- 'Log.info' picks up its logger from the implicit @?context@
    let ?context = frameworkConfig

    Log.info ("Starting worker " <> tshow workerId)

    let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig, pgListener }

    processes <- forM jobWorkers \(JobWorker listenAndRun) -> listenAndRun jobWorkerArgs

    -- NOTE(review): unlike 'dedicatedProcessMainLoop' nothing is unsubscribed from the
    -- PG listener here; presumably the listener's owner takes care of that. Confirm.
    let stopProcess JobWorkerProcess { poller, runners, action } = do
            Concurrent.putMVar action Stop
            Async.cancel poller
            forEach runners Async.cancel

    forever (Concurrent.threadDelay maxBound)
        `Exception.finally` forEach processes stopProcess

-- | Installs signals handlers and returns an IO action that blocks until the next sigINT or sigTERM is sent
--
-- Both handlers put into the same 'Concurrent.MVar' and the returned action takes from it,
-- so every call of the returned action consumes exactly one received signal.
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
    exitSignal <- Concurrent.newEmptyMVar

    let catchHandler = Signals.Catch (Concurrent.putMVar exitSignal ())
    -- The previously installed handler returned by 'Signals.installHandler' is not needed
    let install signal = Signals.installHandler signal catchHandler Nothing

    _ <- install Signals.sigINT
    _ <- install Signals.sigTERM

    pure (Concurrent.takeMVar exitSignal)

-- | Returns the second argument unchanged.
--
-- The 'JobWorkerArgs' are only matched on their constructor, none of the fields is used.
stopExitHandler :: JobWorkerArgs -> a -> a
stopExitHandler JobWorkerArgs {} main = main

-- | Builds the 'JobWorker' for a job type.
--
-- The job type does not appear in the result type, so it has to be selected with a
-- type application. The resulting worker runs 'jobWorkerFetchAndRunLoop' for that type.
worker :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FilterPrimaryKey (GetTableName job)
    , FromRow job
    , Show (PrimaryKey (GetTableName job))
    , PG.FromField (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorker
worker = JobWorker (jobWorkerFetchAndRunLoop @job)


-- | Starts the runners and the queue watcher for a single job type.
--
-- The job type has to be selected with a type application.
--
-- Spawns 'maxConcurrency' runner threads that all wait on one shared 'Concurrent.MVar':
--
-- * On 'JobAvailable' a runner tries to fetch the next job with 'Queue.fetchNextJob', performs it and
--   reports the outcome through 'Queue.jobDidSucceed', 'Queue.jobDidTimeout' or 'Queue.jobDidFail'.
-- * On 'Stop' a runner puts the message back, so that the other runners receive it as well, and exits.
--
-- The same 'Concurrent.MVar' is handed to 'Queue.watchForJob'. The returned 'JobWorkerProcess' carries the
-- runners, the subscription and poller returned by 'Queue.watchForJob', and the 'Concurrent.MVar'.
jobWorkerFetchAndRunLoop :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FilterPrimaryKey (GetTableName job)
    , FromRow job
    , Show (PrimaryKey (GetTableName job))
    , PG.FromField (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorkerArgs -> IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { workerId, modelContext, frameworkConfig, pgListener } = do
    let ?context = frameworkConfig
    let ?modelContext = modelContext

    -- Starts out filled with 'JobAvailable', so one runner checks the queue right away
    action <- Concurrent.newMVar JobAvailable
    runners <- forM [1..(maxConcurrency @job)] \_ -> async do
        let loop = do
                receivedAction <- Concurrent.takeMVar action

                case receivedAction of
                    JobAvailable -> do
                        maybeJob <- Queue.fetchNextJob @job (timeoutInMicroseconds @job) (backoffStrategy @job) workerId
                        case maybeJob of
                            Just job -> do
                                Log.info ("Starting job: " <> tshow job)

                                -- 'Timeout.timeout' applies no time limit when given a negative value
                                let timeoutMicros :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)

                                -- NOTE(review): catching 'SomeException' also covers asynchronous exceptions,
                                -- so a runner that is cancelled while performing a job reports that job via
                                -- 'Queue.jobDidFail' before it reads the MVar again. Confirm this is intended.
                                resultOrException <- Exception.try (Timeout.timeout timeoutMicros (perform job))
                                case resultOrException of
                                    Left exception -> Queue.jobDidFail job exception
                                    Right Nothing -> Queue.jobDidTimeout job
                                    Right (Just _) -> Queue.jobDidSucceed job

                                loop
                            Nothing -> loop
                    Stop ->
                        -- Put the stop signal back in to stop the other runners as well
                        Concurrent.putMVar action Stop

        loop

    (subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (timeoutInMicroseconds @job) (backoffStrategy @job) action

    pure JobWorkerProcess { runners, subscription, poller, action }