{-# LANGUAGE AllowAmbiguousTypes #-}
{-|
Module: IHP.Job.Queue
Description: Functions to operate on the Job Queue Database
Copyright: (c) digitally induced GmbH, 2020
-}
module IHP.Job.Queue where

import IHP.Prelude
import IHP.Job.Types
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Database.PostgreSQL.Simple.ToField as PG
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent as Concurrent
import qualified Control.Exception.Safe as Exception
import IHP.ModelSupport
import IHP.Controller.Param
import qualified System.Random as Random
import qualified IHP.PGListener as PGListener
import qualified IHP.Log as Log
import Control.Monad.Trans.Resource
import IHP.Hasql.FromRow (FromRowHasql(..))
import qualified Hasql.Encoders as Encoders
import Hasql.Implicits.Encoders (DefaultParamEncoder(..))

import qualified Data.HashMap.Strict as HashMap
import qualified Hasql.Pool as HasqlPool
import qualified Hasql.Session as HasqlSession
import qualified Hasql.Decoders as Decoders
import qualified Hasql.DynamicStatements.Snippet as Snippet
import IHP.Hasql.Encoders ()
import qualified Data.Text as Text
import Control.Concurrent.STM (TBQueue, atomically, writeTBQueue, STM)
import Control.Concurrent.STM.TBQueue (isFullTBQueue)

-- | Lock and fetch the next available job. In case no job is available returns 'Nothing'.
--
-- The lock is taken in a single @UPDATE ... WHERE id IN (SELECT ... FOR UPDATE SKIP LOCKED)@ statement,
-- so two workers calling this at the same time will not receive the same row.
--
-- The job status is set to 'JobStatusRunning', @locked_by@ is set to the worker id, @locked_at@ is set
-- to @NOW()@ and @attempts_count@ is incremented. Candidates are selected using 'pendingJobCondition',
-- oldest @created_at@ first.
--
-- __Example:__ Locking a SendMailJob
--
-- > let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04"
-- > job <- fetchNextJob @SendMailJob workerId
--
-- After you're done with the job, call 'jobDidFail', 'jobDidTimeout' or 'jobDidSucceed' to release the lock again.
fetchNextJob :: forall job.
    ( ?modelContext :: ModelContext
    , job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , Table job
    ) => UUID -> IO (Maybe job)
fetchNextJob workerId = do
    let tableNameText = tableName @job
    -- The RETURNING list is spelled out so that the row decoder sees the columns in the order it expects
    let returningColumns = Text.intercalate ", " (columnNames @job)
    let snippet =
            Snippet.sql "UPDATE " <> Snippet.sql tableNameText
            <> Snippet.sql " SET status = " <> Snippet.param JobStatusRunning
            <> Snippet.sql ", locked_at = NOW(), locked_by = " <> Snippet.param workerId
            <> Snippet.sql ", attempts_count = attempts_count + 1"
            <> Snippet.sql " WHERE id IN (SELECT id FROM " <> Snippet.sql tableNameText
            <> Snippet.sql " WHERE " <> pendingJobCondition
            <> Snippet.sql " ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED)"
            <> Snippet.sql " RETURNING " <> Snippet.sql returningColumns
    let decoder = Decoders.rowMaybe (hasqlRowDecoder @job)

    pool <- getHasqlPool
    withoutQueryLogging (sqlQueryHasql pool snippet decoder)

-- | Shared WHERE condition for fetching pending jobs.
--
-- Matches rows whose status is 'JobStatusNotStarted' or 'JobStatusRetry', that are not locked
-- (@locked_by IS NULL@) and whose @run_at@ is not in the future.
--
-- Used by both 'fetchNextJob' and 'pollForJob' so that the poller counts exactly the rows a worker could lock.
pendingJobCondition :: Snippet.Snippet
pendingJobCondition =
    Snippet.sql "(status = " <> Snippet.param JobStatusNotStarted
    <> Snippet.sql " OR status = " <> Snippet.param JobStatusRetry
    <> Snippet.sql ") AND locked_by IS NULL AND run_at <= NOW()"

-- | Sets up both notification mechanisms for a job queue table.
--
-- The function performs three steps:
--
-- 1. Runs the script returned by 'createNotificationTriggerSQL' against the database. The script
--    (re)creates the @pg_notify@ triggers on the table, so already existing triggers are replaced.
-- 2. Starts the fallback poller via 'pollForJob', using the given poll interval.
-- 3. Subscribes on the given 'PGListener.PGListener' to the channel named by 'channelName'.
--
-- Every notification received on that channel leads to a non-blocking attempt to write 'JobAvailable'
-- to the given queue. When the queue is already full the signal is dropped.
--
-- Returns the listener subscription together with the 'ReleaseKey' of the poller.
--
-- __Example:__
--
-- > (subscription, pollerReleaseKey) <- watchForJob pgListener "send_mail_jobs" pollInterval queue
--
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (PGListener.Subscription, ReleaseKey)
watchForJob pgListener tableName pollInterval onNewJob = do
    let tableNameBS = cs tableName :: ByteString
    liftIO do
        pool <- getHasqlPool
        withoutQueryLogging (runSessionHasql pool (HasqlSession.script (createNotificationTriggerSQL tableNameBS)))

    poller <- pollForJob tableName pollInterval onNewJob
    subscription <- liftIO $ pgListener |> PGListener.subscribe (channelName tableNameBS) (const (do _ <- atomically $ tryWriteTBQueue onNewJob JobAvailable; pure ()))

    pure (subscription, poller)

-- | Periodically checks the queue table for open jobs and signals the workers if there are any.
--
-- 'watchForJob' only catches jobs when something is changed on the table. When a job is scheduled
-- with a 'runAt' in the future, and no other operation is happening on the queue, the database triggers
-- will not run, and so 'watchForJob' cannot pick up the job even when 'runAt' is now in the past.
--
-- On every iteration the number of rows matching 'pendingJobCondition' is counted and one 'JobAvailable'
-- message per row is offered to the queue (non-blocking, dropped when the queue is full). Exceptions
-- raised during an iteration are logged and do not stop the loop. Between iterations the thread
-- sleeps for the poll interval (handed to 'Concurrent.threadDelay') plus a random jitter.
--
-- The polling thread is registered with the surrounding 'ResourceT'. This function returns its
-- 'ReleaseKey'; releasing the key cancels the polling thread.
--
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO ReleaseKey
pollForJob tableName pollInterval onNewJob = do
    let snippet =
            Snippet.sql "SELECT COUNT(*) FROM " <> Snippet.sql tableName
            <> Snippet.sql " WHERE " <> pendingJobCondition
    let decoder = Decoders.singleRow (Decoders.column (Decoders.nonNullable Decoders.int8))
    let handler = do
            -- The logging functions below need a ?context; the model context is used for that
            let ?context = ?modelContext
            pool <- getHasqlPool
            forever do
                result <- Exception.tryAny do
                    -- We don't log the queries to the console as it's filling up the log entries with noise
                    count :: Int <- fromIntegral <$> withoutQueryLogging (sqlQueryHasql pool snippet decoder)

                    -- For every job we send one signal to the job workers
                    -- This way we use full concurrency when we find multiple jobs
                    -- that haven't been picked up by the PGListener
                    forEach [1..count] \_ -> do
                        _ <- atomically $ tryWriteTBQueue onNewJob JobAvailable
                        pure ()
                case result of
                    Left exception -> Log.error ("Job poller: " <> tshow exception)
                    Right _ -> pure ()

                -- Add up to 2 seconds of jitter to avoid all job queues polling at the same time
                jitter <- Random.randomRIO (0, 2000000)
                let pollIntervalWithJitter = pollInterval + jitter

                Concurrent.threadDelay pollIntervalWithJitter

    fst <$> allocate (Async.async handler) Async.cancel

-- | Returns a SQL script to create the notification trigger.
--
-- The script defines a trigger function that calls @pg_notify@ on the channel given by 'channelName'
-- and installs it as an @AFTER INSERT@ and an @AFTER UPDATE@ row trigger. Both triggers only fire when
-- the new row's status is @job_status_not_started@ or @job_status_retry@.
--
-- Wrapped in a DO $$ block with EXCEPTION handler because concurrent requests
-- can race to CREATE OR REPLACE the same function, causing PostgreSQL to throw
-- 'tuple concurrently updated' (SQLSTATE XX000). This is safe to ignore: the
-- other connection's CREATE OR REPLACE will have succeeded.
--
-- The table name is spliced into the script as-is, so it must be a trusted identifier.
createNotificationTriggerSQL :: ByteString -> Text
createNotificationTriggerSQL tableName =
        cs $
        "DO $$\n"
        <> "BEGIN\n"
        <> "    CREATE OR REPLACE FUNCTION " <> functionName <> "() RETURNS TRIGGER AS $BODY$"
            <> "BEGIN\n"
            <> "    PERFORM pg_notify('" <> channelName tableName <> "', '');\n"
            <> "    RETURN new;"
            <> "END;\n"
            <> "$BODY$ language plpgsql;\n"
        <> "    DROP TRIGGER IF EXISTS " <> insertTriggerName <> " ON " <> tableName <> ";\n"
        <> "    CREATE TRIGGER " <> insertTriggerName <> " AFTER INSERT ON \"" <> tableName <> "\" FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n"
        <> "    DROP TRIGGER IF EXISTS " <> updateTriggerName <> " ON " <> tableName <> ";\n"
        <> "    CREATE TRIGGER " <> updateTriggerName <> " AFTER UPDATE ON \"" <> tableName <> "\" FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n"
        <> "EXCEPTION\n"
        <> "    WHEN SQLSTATE 'XX000' THEN null; -- 'tuple concurrently updated': another connection installed it first\n"
        <> "END; $$"
    where
        -- All generated object names are derived from the table name, one set per queue table
        functionName :: ByteString
        functionName = "notify_job_queued_" <> tableName

        insertTriggerName :: ByteString
        insertTriggerName = "did_insert_job_" <> tableName

        updateTriggerName :: ByteString
        updateTriggerName = "did_update_job_" <> tableName

-- | Returns the name of the channel that the pg_notify trigger of the given table dispatches to.
--
-- The name is @job_available_@ followed by the table name.
channelName :: ByteString -> ByteString
channelName tableName = "job_available_" <> tableName

-- | Called when a job failed. Sets the job status to 'JobStatusFailed' or 'JobStatusRetry' (if more attempts are possible) and resets 'lockedBy'
--
-- A retry is possible while the job's @attemptsCount@ is below 'maxAttempts'. In that case @runAt@ is
-- moved into the future by 'backoffDelay' of the job's 'backoffStrategy'. The shown exception is stored
-- in @lastError@ and a warning is logged.
jobDidFail :: forall job context.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , SetField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => job -> SomeException -> IO ()
jobDidFail job exception = do
    now <- getCurrentTime

    Log.warn ("Failed job with exception: " <> tshow exception)

    -- 'maxAttempts' reads the job from the implicit ?job parameter
    let ?job = job
    let canRetry = job.attemptsCount < maxAttempts
    let status = if canRetry then JobStatusRetry else JobStatusFailed
    let nextRunAt = addUTCTime (backoffDelay (backoffStrategy @job) job.attemptsCount) now
    job
        |> set #status status
        |> set #lockedBy Nothing
        |> set #updatedAt now
        |> set #lastError (Just (tshow exception))
        |> (if canRetry then set #runAt nextRunAt else id)
        |> updateRecord

    pure ()

-- | Called when a job ran into its timeout. Sets the job status to 'JobStatusTimedOut' or 'JobStatusRetry' (if more attempts are possible) and resets 'lockedBy'
--
-- A retry is possible while the job's @attemptsCount@ is below 'maxAttempts'. In that case @runAt@ is
-- moved into the future by 'backoffDelay' of the job's 'backoffStrategy'. @lastError@ is set to
-- @"Timeout reached"@ and a warning is logged.
jobDidTimeout :: forall job context.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , SetField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => job -> IO ()
jobDidTimeout job = do
    now <- getCurrentTime

    Log.warn ("Job timed out" :: Text)

    -- 'maxAttempts' reads the job from the implicit ?job parameter
    let ?job = job
    let canRetry = job.attemptsCount < maxAttempts
    let status = if canRetry then JobStatusRetry else JobStatusTimedOut
    let nextRunAt = addUTCTime (backoffDelay (backoffStrategy @job) job.attemptsCount) now
    job
        |> set #status status
        |> set #lockedBy Nothing
        |> set #updatedAt now
        |> setJust #lastError "Timeout reached"
        |> (if canRetry then set #runAt nextRunAt else id)
        |> updateRecord

    pure ()


-- | Called when a job succeeded. Sets the job status to 'JobStatusSucceeded' and resets 'lockedBy'
--
-- Also refreshes @updatedAt@ with the current time and logs an info message.
jobDidSucceed :: forall job context.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => job -> IO ()
jobDidSucceed job = do
    Log.info ("Succeeded job" :: Text)
    updatedAt <- getCurrentTime
    job
        |> set #status JobStatusSucceeded
        |> set #lockedBy Nothing
        |> set #updatedAt updatedAt
        |> updateRecord

    pure ()

-- | Compute the delay before the next retry attempt.
--
-- For 'LinearBackoff', the delay is constant: always @delayInSeconds@.
--
-- For 'ExponentialBackoff', the delay is @delayInSeconds * 2 ^ attempts@, so it doubles each attempt.
-- The result is capped at 24 hours (86400 seconds). The exponent is clamped to the range 0..20:
-- the upper bound keeps the intermediate value small, the lower bound keeps the function total,
-- as '^' throws on a negative exponent.
backoffDelay :: BackoffStrategy -> Int -> NominalDiffTime
backoffDelay (LinearBackoff {delayInSeconds}) _ = fromIntegral delayInSeconds
backoffDelay (ExponentialBackoff {delayInSeconds}) attempts =
    min 86400 (fromIntegral delayInSeconds * (2 ^ exponent))
    where
        exponent :: Int
        exponent = max 0 (min attempts 20)

-- | Recover stale jobs that have been in 'JobStatusRunning' for too long,
-- likely due to a worker crash.
--
-- A job is stale once its @locked_at@ is older than the given threshold (rounded to whole seconds).
--
-- Two-tier recovery:
-- - Recently stale jobs (locked within the last 24h) are set back to retry, unlocked and made runnable immediately
-- - Ancient stale jobs (locked more than 24h ago) are marked as failed with a @last_error@ explaining why
--
-- Both tiers require the job to be stale. This matters for thresholds above 24h: a job that has been
-- running for more than a day, but for less than the threshold, is left untouched.
recoverStaleJobs :: forall job.
    ( ?modelContext :: ModelContext
    , Table job
    ) => NominalDiffTime -> IO ()
recoverStaleJobs staleThreshold = do
    let tableNameText = tableName @job
    let thresholdSeconds = round staleThreshold :: Int
    -- Point in time before which a lock is considered stale
    let staleCutoff = Snippet.sql "NOW() - interval '1 second' * " <> Snippet.param thresholdSeconds

    -- Tier 1: Recently stale jobs (threshold..24h) → retry
    -- The 24h boundary is inclusive here and exclusive in tier 2, so every stale job falls into exactly one tier
    let retrySnippet =
            Snippet.sql "UPDATE " <> Snippet.sql tableNameText
            <> Snippet.sql " SET status = " <> Snippet.param JobStatusRetry
            <> Snippet.sql ", locked_by = NULL, locked_at = NULL, run_at = NOW()"
            <> Snippet.sql " WHERE status = " <> Snippet.param JobStatusRunning
            <> Snippet.sql " AND locked_at < " <> staleCutoff
            <> Snippet.sql " AND locked_at >= NOW() - interval '1 day'"

    -- Tier 2: Ancient stale jobs (>24h) → mark failed
    let failSnippet =
            Snippet.sql "UPDATE " <> Snippet.sql tableNameText
            <> Snippet.sql " SET status = " <> Snippet.param JobStatusFailed
            <> Snippet.sql ", locked_by = NULL, locked_at = NULL"
            <> Snippet.sql ", last_error = 'Stale job: worker likely crashed'"
            <> Snippet.sql " WHERE status = " <> Snippet.param JobStatusRunning
            <> Snippet.sql " AND locked_at < NOW() - interval '1 day'"
            <> Snippet.sql " AND locked_at < " <> staleCutoff

    pool <- getHasqlPool
    withoutQueryLogging (sqlExecHasql pool retrySnippet)
    withoutQueryLogging (sqlExecHasql pool failSnippet)

-- | Mapping for @JOB_STATUS@:
--
-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_timed_out', 'job_status_succeeded', 'job_status_retry');
--
-- Any other label is reported as 'PG.ConversionFailed', a SQL @NULL@ as 'PG.UnexpectedNull'.
--
-- These instances are needed by the generated @FromRow@ instances in user apps
-- (see 'compileFromRowInstance' in "IHP.SchemaCompiler").
instance PG.FromField JobStatus where
    fromField _ (Just "job_status_not_started") = pure JobStatusNotStarted
    fromField _ (Just "job_status_running") = pure JobStatusRunning
    fromField _ (Just "job_status_failed") = pure JobStatusFailed
    fromField _ (Just "job_status_timed_out") = pure JobStatusTimedOut
    fromField _ (Just "job_status_succeeded") = pure JobStatusSucceeded
    fromField _ (Just "job_status_retry") = pure JobStatusRetry
    fromField field (Just value) = PG.returnError PG.ConversionFailed field ("Unexpected value for enum value. Got: " <> cs value)
    fromField field Nothing = PG.returnError PG.UnexpectedNull field "Unexpected null for enum value"

-- | The default state is @not started@
instance Default JobStatus where
    def = JobStatusNotStarted

-- | See 'FromField' instance above.
--
-- Every constructor is sent to the database as the text of its enum label.
instance PG.ToField JobStatus where
    toField JobStatusNotStarted = PG.toField ("job_status_not_started" :: Text)
    toField JobStatusRunning = PG.toField ("job_status_running" :: Text)
    toField JobStatusFailed = PG.toField ("job_status_failed" :: Text)
    toField JobStatusTimedOut = PG.toField ("job_status_timed_out" :: Text)
    toField JobStatusSucceeded = PG.toField ("job_status_succeeded" :: Text)
    toField JobStatusRetry = PG.toField ("job_status_retry" :: Text)

-- | Renders a 'JobStatus' as its enum label, e.g. @job_status_running@.
--
-- The labels are the same ones used by the 'PG.ToField' instance and by 'textToEnumJobStatusMap'.
instance InputValue JobStatus where
    inputValue JobStatusNotStarted = "job_status_not_started" :: Text
    inputValue JobStatusRunning = "job_status_running" :: Text
    inputValue JobStatusFailed = "job_status_failed" :: Text
    inputValue JobStatusTimedOut = "job_status_timed_out" :: Text
    inputValue JobStatusSucceeded = "job_status_succeeded" :: Text
    inputValue JobStatusRetry = "job_status_retry" :: Text

-- | Reads a 'JobStatus' from a request parameter by delegating to 'IHP.Controller.Param.enumParamReader'.
instance IHP.Controller.Param.ParamReader JobStatus where
    readParameter = IHP.Controller.Param.enumParamReader

-- | Lookup table from the enum label to the matching 'JobStatus' constructor.
--
-- Backs 'textToEnumJobStatus'. Kept as a top-level 'HashMap.HashMap' so that parsing a label is a
-- single hash lookup.
textToEnumJobStatusMap :: HashMap.HashMap Text JobStatus
textToEnumJobStatusMap = HashMap.fromList
    [ ("job_status_not_started", JobStatusNotStarted)
    , ("job_status_running", JobStatusRunning)
    , ("job_status_failed", JobStatusFailed)
    , ("job_status_timed_out", JobStatusTimedOut)
    , ("job_status_succeeded", JobStatusSucceeded)
    , ("job_status_retry", JobStatusRetry)
    ]

-- | Parses an enum label such as @job_status_retry@ to a 'JobStatus'.
--
-- Returns 'Nothing' when the text is not a key of 'textToEnumJobStatusMap'.
textToEnumJobStatus :: Text -> Maybe JobStatus
textToEnumJobStatus label = HashMap.lookup label textToEnumJobStatusMap

-- | DefaultParamEncoder for hasql queries using JobStatus in filterWhere
--
-- Encodes the value as a non-nullable member of the @public.job_status@ enum, using 'inputValue' to
-- produce the label.
instance DefaultParamEncoder JobStatus where
    defaultParam = Encoders.nonNullable (Encoders.enum (Just "public") "job_status" inputValue)

-- | Returns the hasql connection pool stored in the implicit 'ModelContext'.
getHasqlPool :: (?modelContext :: ModelContext) => IO HasqlPool.Pool
getHasqlPool = pure ?modelContext.hasqlPool

-- | Non-blocking write to a TBQueue. Returns True if the value was written,
-- False if the queue was full.
--
-- The fullness check and the write happen in the same 'STM' transaction, so the write itself
-- never has to retry because of a full queue.
tryWriteTBQueue :: TBQueue a -> a -> STM Bool
tryWriteTBQueue queue value = do
    full <- isFullTBQueue queue
    if full
        then pure False
        else do
            writeTBQueue queue value
            pure True