{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Queue.Result
( jobDidFail
, jobDidTimeout
, jobDidSucceed
, backoffDelay
, recoverStaleJobs
) where
import IHP.Prelude
import IHP.Job.Types
import IHP.Job.Queue.Pool (runPool)
import IHP.Job.Queue.StatusInstances ()
import IHP.ModelSupport (Table (..), InputValue (..))
import IHP.ModelSupport.Types (Id' (..), PrimaryKey)
import qualified IHP.Log as Log
import qualified Hasql.Pool as HasqlPool
import qualified Hasql.Session as HasqlSession
import qualified Hasql.Statement as Hasql
import qualified Hasql.Encoders as Encoders
import qualified Hasql.Decoders as Decoders
import Data.Functor.Contravariant (contramap)
jobDidFail :: forall job context.
( Table job
, HasField "id" job (Id' (GetTableName job))
, PrimaryKey (GetTableName job) ~ UUID
, HasField "attemptsCount" job Int
, HasField "runAt" job UTCTime
, Job job
, ?context :: context
, HasField "logger" context Log.Logger
) => HasqlPool.Pool -> job -> SomeException -> IO ()
jobDidFail :: forall job context.
(Table job, HasField "id" job (Id' (GetTableName job)),
PrimaryKey (GetTableName job) ~ UUID,
HasField "attemptsCount" job Int, HasField "runAt" job UTCTime,
Job job, ?context::context, HasField "logger" context Logger) =>
Pool -> job -> SomeException -> IO ()
jobDidFail Pool
pool job
job SomeException
exception = do
now <- IO UTCTime
getCurrentTime
Log.warn ("Failed job with exception: " <> tshow exception)
let ?job = job
let canRetry = job
job.attemptsCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
forall job. (Job job, ?job::job) => Int
maxAttempts
let status = if Bool
canRetry then JobStatus
JobStatusRetry else JobStatus
JobStatusFailed
let nextRunAt = if Bool
canRetry
then NominalDiffTime -> UTCTime -> UTCTime
addUTCTime (BackoffStrategy -> Int -> NominalDiffTime
backoffDelay (forall job. Job job => BackoffStrategy
backoffStrategy @job) job
job.attemptsCount) UTCTime
now
else job
job.runAt
let Id jobId = job.id
let tableNameText = forall record. Table record => Text
tableName @job
let sql = Text
"UPDATE " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableNameText
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" SET status = $1::public.job_status, locked_by = NULL, updated_at = $2, last_error = $3, run_at = $4 WHERE id = $5"
let encoder =
((Text, UTCTime, Text, UTCTime, UUID) -> Text)
-> Params Text -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
s,UTCTime
_,Text
_,UTCTime
_,UUID
_) -> Text
s) (NullableOrNot Value Text -> Params Text
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value Text -> NullableOrNot Value Text
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value Text
Encoders.text))
Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a. Semigroup a => a -> a -> a
<> ((Text, UTCTime, Text, UTCTime, UUID) -> UTCTime)
-> Params UTCTime -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
_,UTCTime
u,Text
_,UTCTime
_,UUID
_) -> UTCTime
u) (NullableOrNot Value UTCTime -> Params UTCTime
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value UTCTime -> NullableOrNot Value UTCTime
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value UTCTime
Encoders.timestamptz))
Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a. Semigroup a => a -> a -> a
<> ((Text, UTCTime, Text, UTCTime, UUID) -> Text)
-> Params Text -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
_,UTCTime
_,Text
e,UTCTime
_,UUID
_) -> Text
e) (NullableOrNot Value Text -> Params Text
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value Text -> NullableOrNot Value Text
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value Text
Encoders.text))
Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a. Semigroup a => a -> a -> a
<> ((Text, UTCTime, Text, UTCTime, UUID) -> UTCTime)
-> Params UTCTime -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
_,UTCTime
_,Text
_,UTCTime
r,UUID
_) -> UTCTime
r) (NullableOrNot Value UTCTime -> Params UTCTime
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value UTCTime -> NullableOrNot Value UTCTime
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value UTCTime
Encoders.timestamptz))
Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a. Semigroup a => a -> a -> a
<> ((Text, UTCTime, Text, UTCTime, UUID) -> UUID)
-> Params UUID -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
_,UTCTime
_,Text
_,UTCTime
_,UUID
i) -> UUID
i) (NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value UUID
Encoders.uuid))
let statement = Text
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Result ()
-> Statement (Text, UTCTime, Text, UTCTime, UUID) ()
forall params result.
Text -> Params params -> Result result -> Statement params result
Hasql.unpreparable Text
sql Params (Text, UTCTime, Text, UTCTime, UUID)
encoder Result ()
Decoders.noResult
runPool pool (HasqlSession.statement (inputValue status, now, tshow exception, nextRunAt, jobId) statement)
jobDidTimeout :: forall job context.
( Table job
, HasField "id" job (Id' (GetTableName job))
, PrimaryKey (GetTableName job) ~ UUID
, HasField "attemptsCount" job Int
, HasField "runAt" job UTCTime
, Job job
, ?context :: context
, HasField "logger" context Log.Logger
) => HasqlPool.Pool -> job -> IO ()
jobDidTimeout :: forall job context.
(Table job, HasField "id" job (Id' (GetTableName job)),
PrimaryKey (GetTableName job) ~ UUID,
HasField "attemptsCount" job Int, HasField "runAt" job UTCTime,
Job job, ?context::context, HasField "logger" context Logger) =>
Pool -> job -> IO ()
jobDidTimeout Pool
pool job
job = do
now <- IO UTCTime
getCurrentTime
Log.warn ("Job timed out" :: Text)
let ?job = job
let canRetry = job
job.attemptsCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
forall job. (Job job, ?job::job) => Int
maxAttempts
let status = if Bool
canRetry then JobStatus
JobStatusRetry else JobStatus
JobStatusTimedOut
let nextRunAt = if Bool
canRetry
then NominalDiffTime -> UTCTime -> UTCTime
addUTCTime (BackoffStrategy -> Int -> NominalDiffTime
backoffDelay (forall job. Job job => BackoffStrategy
backoffStrategy @job) job
job.attemptsCount) UTCTime
now
else job
job.runAt
let Id jobId = job.id
let tableNameText = forall record. Table record => Text
tableName @job
let sql = Text
"UPDATE " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableNameText
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" SET status = $1::public.job_status, locked_by = NULL, updated_at = $2, last_error = $3, run_at = $4 WHERE id = $5"
let encoder =
((Text, UTCTime, Text, UTCTime, UUID) -> Text)
-> Params Text -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
s,UTCTime
_,Text
_,UTCTime
_,UUID
_) -> Text
s) (NullableOrNot Value Text -> Params Text
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value Text -> NullableOrNot Value Text
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value Text
Encoders.text))
Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a. Semigroup a => a -> a -> a
<> ((Text, UTCTime, Text, UTCTime, UUID) -> UTCTime)
-> Params UTCTime -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
_,UTCTime
u,Text
_,UTCTime
_,UUID
_) -> UTCTime
u) (NullableOrNot Value UTCTime -> Params UTCTime
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value UTCTime -> NullableOrNot Value UTCTime
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value UTCTime
Encoders.timestamptz))
Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a. Semigroup a => a -> a -> a
<> ((Text, UTCTime, Text, UTCTime, UUID) -> Text)
-> Params Text -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
_,UTCTime
_,Text
e,UTCTime
_,UUID
_) -> Text
e) (NullableOrNot Value Text -> Params Text
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value Text -> NullableOrNot Value Text
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value Text
Encoders.text))
Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a. Semigroup a => a -> a -> a
<> ((Text, UTCTime, Text, UTCTime, UUID) -> UTCTime)
-> Params UTCTime -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
_,UTCTime
_,Text
_,UTCTime
r,UUID
_) -> UTCTime
r) (NullableOrNot Value UTCTime -> Params UTCTime
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value UTCTime -> NullableOrNot Value UTCTime
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value UTCTime
Encoders.timestamptz))
Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a. Semigroup a => a -> a -> a
<> ((Text, UTCTime, Text, UTCTime, UUID) -> UUID)
-> Params UUID -> Params (Text, UTCTime, Text, UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(Text
_,UTCTime
_,Text
_,UTCTime
_,UUID
i) -> UUID
i) (NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value UUID
Encoders.uuid))
let statement = Text
-> Params (Text, UTCTime, Text, UTCTime, UUID)
-> Result ()
-> Statement (Text, UTCTime, Text, UTCTime, UUID) ()
forall params result.
Text -> Params params -> Result result -> Statement params result
Hasql.unpreparable Text
sql Params (Text, UTCTime, Text, UTCTime, UUID)
encoder Result ()
Decoders.noResult
runPool pool (HasqlSession.statement (inputValue status, now, "Timeout reached" :: Text, nextRunAt, jobId) statement)
jobDidSucceed :: forall job context.
( Table job
, HasField "id" job (Id' (GetTableName job))
, PrimaryKey (GetTableName job) ~ UUID
, ?context :: context
, HasField "logger" context Log.Logger
) => HasqlPool.Pool -> job -> IO ()
jobDidSucceed :: forall job context.
(Table job, HasField "id" job (Id' (GetTableName job)),
PrimaryKey (GetTableName job) ~ UUID, ?context::context,
HasField "logger" context Logger) =>
Pool -> job -> IO ()
jobDidSucceed Pool
pool job
job = do
Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (Text
"Succeeded job" :: Text)
updatedAt <- IO UTCTime
getCurrentTime
let Id jobId = job.id
let tableNameText = forall record. Table record => Text
tableName @job
let sql = Text
"UPDATE " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableNameText
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" SET status = 'job_status_succeeded', locked_by = NULL, updated_at = $1 WHERE id = $2"
let encoder =
((UTCTime, UUID) -> UTCTime)
-> Params UTCTime -> Params (UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (UTCTime, UUID) -> UTCTime
forall a b. (a, b) -> a
fst (NullableOrNot Value UTCTime -> Params UTCTime
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value UTCTime -> NullableOrNot Value UTCTime
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value UTCTime
Encoders.timestamptz))
Params (UTCTime, UUID)
-> Params (UTCTime, UUID) -> Params (UTCTime, UUID)
forall a. Semigroup a => a -> a -> a
<> ((UTCTime, UUID) -> UUID) -> Params UUID -> Params (UTCTime, UUID)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (UTCTime, UUID) -> UUID
forall a b. (a, b) -> b
snd (NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable Value UUID
Encoders.uuid))
let statement = Text
-> Params (UTCTime, UUID)
-> Result ()
-> Statement (UTCTime, UUID) ()
forall params result.
Text -> Params params -> Result result -> Statement params result
Hasql.unpreparable Text
sql Params (UTCTime, UUID)
encoder Result ()
Decoders.noResult
runPool pool (HasqlSession.statement (updatedAt, jobId) statement)
backoffDelay :: BackoffStrategy -> Int -> NominalDiffTime
backoffDelay :: BackoffStrategy -> Int -> NominalDiffTime
backoffDelay (LinearBackoff { Int
delayInSeconds :: Int
delayInSeconds :: BackoffStrategy -> Int
delayInSeconds }) Int
_ = Int -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
delayInSeconds
backoffDelay (ExponentialBackoff { Int
delayInSeconds :: BackoffStrategy -> Int
delayInSeconds :: Int
delayInSeconds }) Int
attempts =
NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Ord a => a -> a -> a
min NominalDiffTime
86400 (Int -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
delayInSeconds NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
* (NominalDiffTime
2 NominalDiffTime -> Int -> NominalDiffTime
forall a b. (Num a, Integral b) => a -> b -> a
^ Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
attempts Int
20))
recoverStaleJobs :: forall job.
( Table job
) => HasqlPool.Pool -> NominalDiffTime -> IO ()
recoverStaleJobs :: forall job. Table job => Pool -> NominalDiffTime -> IO ()
recoverStaleJobs Pool
pool NominalDiffTime
staleThreshold = do
let tableNameText :: Text
tableNameText = forall record. Table record => Text
tableName @job
let retrySql :: Text
retrySql =
Text
"UPDATE " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableNameText
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" SET status = 'job_status_retry', locked_by = NULL, locked_at = NULL, run_at = NOW()"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" WHERE status = 'job_status_running'"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" AND locked_at < NOW() - interval '1 second' * $1"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" AND locked_at > NOW() - interval '1 day'"
let retryEncoder :: Params Int
retryEncoder = NullableOrNot Value Int -> Params Int
forall a. NullableOrNot Value a -> Params a
Encoders.param (Value Int -> NullableOrNot Value Int
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
Encoders.nonNullable ((Int -> Int64) -> Value Int64 -> Value Int
forall a' a. (a' -> a) -> Value a -> Value a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral :: Int -> Int64) Value Int64
Encoders.int8))
let retryStatement :: Statement Int ()
retryStatement = Text -> Params Int -> Result () -> Statement Int ()
forall params result.
Text -> Params params -> Result result -> Statement params result
Hasql.unpreparable Text
retrySql Params Int
retryEncoder Result ()
Decoders.noResult
let failSql :: Text
failSql =
Text
"UPDATE " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableNameText
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" SET status = 'job_status_failed', locked_by = NULL, locked_at = NULL"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
", last_error = 'Stale job: worker likely crashed'"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" WHERE status = 'job_status_running'"
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" AND locked_at < NOW() - interval '1 day'"
let failStatement :: Statement () ()
failStatement = Text -> Params () -> Result () -> Statement () ()
forall params result.
Text -> Params params -> Result result -> Statement params result
Hasql.unpreparable Text
failSql Params ()
Encoders.noParams Result ()
Decoders.noResult
let thresholdSeconds :: Int
thresholdSeconds = NominalDiffTime -> Int
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
round NominalDiffTime
staleThreshold :: Int
Pool -> Session () -> IO ()
forall a. Pool -> Session a -> IO a
runPool Pool
pool (Int -> Statement Int () -> Session ()
forall params result.
params -> Statement params result -> Session result
HasqlSession.statement Int
thresholdSeconds Statement Int ()
retryStatement)
Pool -> Session () -> IO ()
forall a. Pool -> Session a -> IO a
runPool Pool
pool (() -> Statement () () -> Session ()
forall params result.
params -> Statement params result -> Session result
HasqlSession.statement () Statement () ()
failStatement)