{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Queue.Result
( jobDidFail
, jobDidTimeout
, jobDidSucceed
, backoffDelay
, recoverStaleJobs
) where

import IHP.Prelude
import IHP.Job.Types
import IHP.Job.Queue.Pool (runPool)
import IHP.Job.Queue.StatusInstances ()
import IHP.ModelSupport (Table (..), InputValue (..))
import IHP.ModelSupport.Types (Id' (..), PrimaryKey)
import qualified IHP.Log as Log
import qualified Hasql.Pool as HasqlPool
import qualified Hasql.Session as HasqlSession
import qualified Hasql.Statement as Hasql
import qualified Hasql.Encoders as Encoders
import qualified Hasql.Decoders as Decoders
import Data.Functor.Contravariant (contramap)

-- | Records a failed job run in the job's table row.
--
-- Logs the exception as a warning, then issues a single @UPDATE@ that:
--
-- * sets @status@ to 'JobStatusRetry' while @attemptsCount@ is still below 'maxAttempts',
--   otherwise to 'JobStatusFailed'
-- * clears @locked_by@
-- * sets @updated_at@ to the current time
-- * stores the rendered exception in @last_error@
-- * sets @run_at@ to now plus the 'backoffDelay' when retrying, or keeps the job's existing @runAt@ otherwise
jobDidFail :: forall job context.
    ( Table job
    , HasField "id" job (Id' (GetTableName job))
    , PrimaryKey (GetTableName job) ~ UUID
    , HasField "attemptsCount" job Int
    , HasField "runAt" job UTCTime
    , Job job
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => HasqlPool.Pool -> job -> SomeException -> IO ()
jobDidFail pool job exception = do
    now <- getCurrentTime

    Log.warn ("Failed job with exception: " <> tshow exception)

    -- 'maxAttempts' reads the job through the implicit parameter
    let ?job = job
    let retryAllowed = job.attemptsCount < maxAttempts
    -- Only forced when a retry is actually scheduled
    let retryDelay = backoffDelay (backoffStrategy @job) job.attemptsCount
    let (newStatus, nextRunAt) =
            if retryAllowed
                then (JobStatusRetry, addUTCTime retryDelay now)
                else (JobStatusFailed, job.runAt)
    let Id jobId = job.id

    let updateSql =
            "UPDATE " <> tableName @job
            <> " SET status = $1::public.job_status, locked_by = NULL, updated_at = $2, last_error = $3, run_at = $4 WHERE id = $5"

    -- One encoder per column type, reused for every placeholder of that type
    let textParam = Encoders.param (Encoders.nonNullable Encoders.text)
    let timestampParam = Encoders.param (Encoders.nonNullable Encoders.timestamptz)
    let uuidParam = Encoders.param (Encoders.nonNullable Encoders.uuid)

    -- The order of the encoders below defines the placeholders $1 .. $5
    let rowEncoder =
            contramap (\(statusValue, _, _, _, _) -> statusValue) textParam
            <> contramap (\(_, touchedAt, _, _, _) -> touchedAt) timestampParam
            <> contramap (\(_, _, lastError, _, _) -> lastError) textParam
            <> contramap (\(_, _, _, scheduledAt, _) -> scheduledAt) timestampParam
            <> contramap (\(_, _, _, _, rowId) -> rowId) uuidParam

    let updateStatement = Hasql.unpreparable updateSql rowEncoder Decoders.noResult
    let rowValues = (inputValue newStatus, now, tshow exception, nextRunAt, jobId)
    runPool pool (HasqlSession.statement rowValues updateStatement)

-- | Records a timed-out job run in the job's table row.
--
-- Logs a warning, then issues a single @UPDATE@ that sets @status@ to 'JobStatusRetry'
-- (while @attemptsCount@ is below 'maxAttempts') or 'JobStatusTimedOut', clears @locked_by@,
-- sets @updated_at@ to the current time and writes @"Timeout reached"@ into @last_error@.
--
-- When retrying, @run_at@ is moved to now plus the 'backoffDelay'; otherwise the job's
-- existing @runAt@ is written back unchanged.
jobDidTimeout :: forall job context.
    ( Table job
    , HasField "id" job (Id' (GetTableName job))
    , PrimaryKey (GetTableName job) ~ UUID
    , HasField "attemptsCount" job Int
    , HasField "runAt" job UTCTime
    , Job job
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => HasqlPool.Pool -> job -> IO ()
jobDidTimeout pool job = do
    now <- getCurrentTime

    Log.warn ("Job timed out" :: Text)

    -- 'maxAttempts' reads the job through the implicit parameter
    let ?job = job
    let hasAttemptsLeft = job.attemptsCount < maxAttempts
    let nextStatus
            | hasAttemptsLeft = JobStatusRetry
            | otherwise = JobStatusTimedOut
    let nextRunAt
            | hasAttemptsLeft = addUTCTime (backoffDelay (backoffStrategy @job) job.attemptsCount) now
            | otherwise = job.runAt
    let Id jobId = job.id

    let updateSql =
            "UPDATE " <> tableName @job
            <> " SET status = $1::public.job_status, locked_by = NULL, updated_at = $2, last_error = $3, run_at = $4 WHERE id = $5"

    let asText = Encoders.param (Encoders.nonNullable Encoders.text)
    let asTimestamp = Encoders.param (Encoders.nonNullable Encoders.timestamptz)
    let asUuid = Encoders.param (Encoders.nonNullable Encoders.uuid)

    -- Encoder order corresponds to the placeholders $1 .. $5 in the statement
    let rowEncoder =
            contramap (\(statusValue, _, _, _, _) -> statusValue) asText
            <> contramap (\(_, touchedAt, _, _, _) -> touchedAt) asTimestamp
            <> contramap (\(_, _, lastError, _, _) -> lastError) asText
            <> contramap (\(_, _, _, scheduledAt, _) -> scheduledAt) asTimestamp
            <> contramap (\(_, _, _, _, rowId) -> rowId) asUuid

    let updateStatement = Hasql.unpreparable updateSql rowEncoder Decoders.noResult
    let rowValues = (inputValue nextStatus, now, "Timeout reached" :: Text, nextRunAt, jobId)
    runPool pool (HasqlSession.statement rowValues updateStatement)


-- | Records a successful job run in the job's table row.
--
-- Logs an info message, then updates the row identified by the job's id: @status@ becomes
-- @job_status_succeeded@, @locked_by@ is cleared and @updated_at@ is set to the current time.
jobDidSucceed :: forall job context.
    ( Table job
    , HasField "id" job (Id' (GetTableName job))
    , PrimaryKey (GetTableName job) ~ UUID
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => HasqlPool.Pool -> job -> IO ()
jobDidSucceed pool job = do
    Log.info ("Succeeded job" :: Text)
    finishedAt <- getCurrentTime
    runPool pool (HasqlSession.statement (finishedAt, jobId) markSucceeded)
    where
        Id jobId = job.id

        updateSql =
            "UPDATE " <> tableName @job
            <> " SET status = 'job_status_succeeded', locked_by = NULL, updated_at = $1 WHERE id = $2"

        -- $1: the new updated_at timestamp
        updatedAtParam = contramap fst (Encoders.param (Encoders.nonNullable Encoders.timestamptz))

        -- $2: the primary key of the job row
        jobIdParam = contramap snd (Encoders.param (Encoders.nonNullable Encoders.uuid))

        markSucceeded = Hasql.unpreparable updateSql (updatedAtParam <> jobIdParam) Decoders.noResult

-- | Compute the delay before the next retry attempt.
--
-- For 'LinearBackoff', the delay is always @delayInSeconds@; the attempt count is ignored.
--
-- For 'ExponentialBackoff', the delay is @delayInSeconds * 2 ^ attempts@, i.e. it doubles
-- with each attempt, and is capped at 24 hours (86400 seconds).
--
-- The exponent is clamped to the range 0..20. The upper bound was already present; the lower
-- bound guards against a negative attempt count, for which '^' would otherwise throw a
-- "Negative exponent" error. Negative counts are treated like zero attempts.
backoffDelay :: BackoffStrategy -> Int -> NominalDiffTime
backoffDelay (LinearBackoff { delayInSeconds }) _ = fromIntegral delayInSeconds
backoffDelay (ExponentialBackoff { delayInSeconds }) attempts =
    min 86400 (fromIntegral delayInSeconds * (2 ^ clampedAttempts))
    where
        -- Keep the exponent within 0..20 so '^' is always defined and the power stays small
        clampedAttempts :: Int
        clampedAttempts = max 0 (min attempts 20)

-- | Recover stale jobs that have been in 'JobStatusRunning' for too long,
-- likely due to a worker crash.
--
-- A job counts as stale when its @locked_at@ is older than the given threshold
-- (rounded to whole seconds).
--
-- Two-tier recovery:
--
-- - Recently stale jobs (locked within the last 24h) are set back to retry: the lock is
--   cleared and @run_at@ is set to @NOW()@
-- - Ancient stale jobs (locked more than 24h ago) are marked as failed with a fixed
--   @last_error@ message
--
-- Both tiers only touch jobs that exceed the threshold. This matters when the threshold is
-- larger than 24 hours: jobs locked for more than a day but still within the threshold are
-- left alone instead of being marked as failed.
--
-- The two updates are run as two separate statements, each through its own 'runPool' call.
recoverStaleJobs :: forall job.
    ( Table job
    ) => HasqlPool.Pool -> NominalDiffTime -> IO ()
recoverStaleJobs pool staleThreshold = do
    let tableNameText = tableName @job
    let thresholdSeconds = round staleThreshold :: Int

    -- The threshold is sent as an int8 and multiplied with a one-second interval in SQL
    let thresholdEncoder = Encoders.param (Encoders.nonNullable (contramap (fromIntegral :: Int -> Int64) Encoders.int8))

    -- Tier 1: Recently stale jobs (threshold..24h) -> retry
    let retrySql =
            "UPDATE " <> tableNameText
            <> " SET status = 'job_status_retry', locked_by = NULL, locked_at = NULL, run_at = NOW()"
            <> " WHERE status = 'job_status_running'"
            <> " AND locked_at < NOW() - interval '1 second' * $1"
            <> " AND locked_at > NOW() - interval '1 day'"
    let retryStatement = Hasql.unpreparable retrySql thresholdEncoder Decoders.noResult

    -- Tier 2: Ancient stale jobs (>24h and past the threshold) -> mark failed
    let failSql =
            "UPDATE " <> tableNameText
            <> " SET status = 'job_status_failed', locked_by = NULL, locked_at = NULL"
            <> ", last_error = 'Stale job: worker likely crashed'"
            <> " WHERE status = 'job_status_running'"
            <> " AND locked_at < NOW() - interval '1 second' * $1"
            <> " AND locked_at < NOW() - interval '1 day'"
    let failStatement = Hasql.unpreparable failSql thresholdEncoder Decoders.noResult

    runPool pool (HasqlSession.statement thresholdSeconds retryStatement)
    runPool pool (HasqlSession.statement thresholdSeconds failStatement)