module IHP.Job.Runner.MainLoop
( runJobWorkers
, dedicatedProcessMainLoop
, devServerMainLoop
, installSignalHandlers
, stopExitHandler
) where

import IHP.Prelude
import IHP.ControllerPrelude
import IHP.ScriptSupport
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified IHP.PGListener as PGListener
import Control.Monad.Trans.Resource
import qualified Control.Exception.Safe as Exception
import qualified IHP.Log as Log
import Control.Concurrent.STM (atomically, writeTBQueue)

-- | Used by the RunJobs binary
runJobWorkers :: [JobWorker] -> Script
runJobWorkers :: [JobWorker] -> Script
runJobWorkers [JobWorker]
jobWorkers = (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers

-- | This job worker main loop is used when the job workers are running as part of their own binary
--
-- In dev mode the IHP dev server is using the 'devServerMainLoop' instead. We have two main loops
-- as the stop handling works a different in those cases.
--
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop :: (?modelContext::ModelContext, ?context::FrameworkConfig) =>
[JobWorker] -> IO ()
dedicatedProcessMainLoop [JobWorker]
jobWorkers = do
    threadId <- IO ThreadId
Concurrent.myThreadId
    exitSignalsCount <- newIORef 0
    workerId <- UUID.nextRandom
    let logger = ?context::FrameworkConfig
FrameworkConfig
?context.logger

    Log.info ("Starting worker " <> tshow workerId)

    -- The job workers use their own dedicated PG listener as e.g. AutoRefresh or DataSync
    -- could overload the main PGListener connection. In that case we still want jobs to be
    -- run independent of the system being very busy.
    PGListener.withPGListener ?context.databaseUrl ?context.logger \PGListener
pgListener -> do
        stopSignal <- IO (MVar (ZonkAny 0))
forall a. IO (MVar a)
Concurrent.newEmptyMVar

        runResourceT do
            waitForExitSignal <- liftIO installSignalHandlers

            let jobWorkerArgs = JobWorkerArgs { UUID
workerId :: UUID
workerId :: UUID
workerId, modelContext :: ModelContext
modelContext = ?modelContext::ModelContext
ModelContext
?modelContext, frameworkConfig :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
FrameworkConfig
?context, PGListener
pgListener :: PGListener
pgListener :: PGListener
pgListener }

            processes <- jobWorkers
                |> mapM (\(JobWorker JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun)-> JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)

            liftIO waitForExitSignal

            liftIO $ Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)

            -- Stop subscriptions and poller already
            -- This will stop all producers for the queue
            liftIO $ forEach processes \JobWorkerProcess { ReleaseKey
pollerReleaseKey :: ReleaseKey
pollerReleaseKey :: JobWorkerProcess -> ReleaseKey
pollerReleaseKey, Subscription
subscription :: Subscription
subscription :: JobWorkerProcess -> Subscription
subscription, TBQueue JobWorkerProcessMessage
action :: TBQueue JobWorkerProcessMessage
action :: JobWorkerProcess -> TBQueue JobWorkerProcessMessage
action, Maybe ReleaseKey
staleRecoveryReleaseKey :: Maybe ReleaseKey
staleRecoveryReleaseKey :: JobWorkerProcess -> Maybe ReleaseKey
staleRecoveryReleaseKey } -> do
                Subscription -> PGListener -> IO ()
PGListener.unsubscribe Subscription
subscription PGListener
pgListener
                ReleaseKey -> IO ()
forall (m :: * -> *). MonadIO m => ReleaseKey -> m ()
release ReleaseKey
pollerReleaseKey
                case Maybe ReleaseKey
staleRecoveryReleaseKey of
                    Just ReleaseKey
key -> ReleaseKey -> IO ()
forall (m :: * -> *). MonadIO m => ReleaseKey -> m ()
release ReleaseKey
key
                    Maybe ReleaseKey
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                -- Single Stop for the dispatcher (it waits for active workers internally)
                STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue JobWorkerProcessMessage
-> JobWorkerProcessMessage -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop

            liftIO $ PGListener.stop pgListener

            -- While waiting for all jobs to complete, we also wait for another exit signal
            -- If the user sends two exit signals, we just kill all processes
            liftIO $ async do
                waitForExitSignal

                Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)

                forEach processes \JobWorkerProcess { dispatcher :: JobWorkerProcess -> (ReleaseKey, Async ())
dispatcher = (ReleaseKey
dispatcherKey, Async ()
_) } -> do
                    ReleaseKey -> IO ()
forall (m :: * -> *). MonadIO m => ReleaseKey -> m ()
release ReleaseKey
dispatcherKey  -- cancels dispatcher, whose finally cancels all workers

                Concurrent.throwTo threadId Exit.ExitSuccess

                pure ()

            -- Wait for dispatchers (which wait for their workers before exiting)
            liftIO $ forEach processes \JobWorkerProcess { dispatcher :: JobWorkerProcess -> (ReleaseKey, Async ())
dispatcher = (ReleaseKey
_, Async ()
dispatcherAsync) } -> do
                Async () -> IO ()
forall a. Async a -> IO a
Async.wait Async ()
dispatcherAsync

            liftIO $ Concurrent.throwTo threadId Exit.ExitSuccess

devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop :: (?modelContext::ModelContext) =>
FrameworkConfig -> PGListener -> [JobWorker] -> IO ()
devServerMainLoop FrameworkConfig
frameworkConfig PGListener
pgListener [JobWorker]
jobWorkers = do
    workerId <- IO UUID
UUID.nextRandom
    let ?context = frameworkConfig
    let logger = FrameworkConfig
frameworkConfig.logger

    Log.info ("Starting worker " <> tshow workerId)

    runResourceT do
        let jobWorkerArgs = JobWorkerArgs { UUID
workerId :: UUID
workerId :: UUID
workerId, modelContext :: ModelContext
modelContext = ?modelContext::ModelContext
ModelContext
?modelContext, frameworkConfig :: FrameworkConfig
frameworkConfig = ?context::FrameworkConfig
FrameworkConfig
?context, PGListener
pgListener :: PGListener
pgListener :: PGListener
pgListener }

        processes <- jobWorkers
                |> mapM (\(JobWorker JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun) -> JobWorkerArgs -> ResourceT IO JobWorkerProcess
listenAndRun JobWorkerArgs
jobWorkerArgs)

        liftIO $ (forever (Concurrent.threadDelay maxBound)) `Exception.finally` do
            forEach processes \JobWorkerProcess { TBQueue JobWorkerProcessMessage
action :: JobWorkerProcess -> TBQueue JobWorkerProcessMessage
action :: TBQueue JobWorkerProcessMessage
action } -> do
                STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue JobWorkerProcessMessage
-> JobWorkerProcessMessage -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue JobWorkerProcessMessage
action JobWorkerProcessMessage
Stop

-- | Installs signals handlers and returns an IO action that blocks until the next sigINT or sigTERM is sent
installSignalHandlers :: IO (IO ())
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
    exitSignal <- IO (MVar ())
forall a. IO (MVar a)
Concurrent.newEmptyMVar

    let catchHandler = MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
Concurrent.putMVar MVar ()
exitSignal ()

    Signals.installHandler Signals.sigINT (Signals.Catch catchHandler) Nothing
    Signals.installHandler Signals.sigTERM (Signals.Catch catchHandler) Nothing

    pure (Concurrent.takeMVar exitSignal)

stopExitHandler :: JobWorkerArgs -> IO a -> IO a
stopExitHandler :: forall a. JobWorkerArgs -> IO a -> IO a
stopExitHandler JobWorkerArgs { UUID
PGListener
ModelContext
FrameworkConfig
workerId :: JobWorkerArgs -> UUID
modelContext :: JobWorkerArgs -> ModelContext
frameworkConfig :: JobWorkerArgs -> FrameworkConfig
pgListener :: JobWorkerArgs -> PGListener
workerId :: UUID
modelContext :: ModelContext
frameworkConfig :: FrameworkConfig
pgListener :: PGListener
.. } IO a
main = IO a
main