{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Types
( Job (..)
, JobWorkerArgs (..)
, JobWorker (..)
, JobStatus (..)
, Worker (..)
, JobWorkerProcess (..)
, JobWorkerProcessMessage (..)
, BackoffStrategy (..)
)
where

import IHP.Prelude
import IHP.FrameworkConfig
import qualified IHP.PGListener as PGListener
import qualified Control.Concurrent as Concurrent

-- | Selects how the delay between attempts of a job is derived.
--
-- Both constructors carry the same strict 'delayInSeconds' field, so the
-- selector is total over this type.
--
-- NOTE(review): how the delay is turned into an actual wait time (e.g. constant
-- vs. growing per attempt) is decided by the job runner, not by this module —
-- confirm there.
data BackoffStrategy
    = LinearBackoff { delayInSeconds :: !Int }
    | ExponentialBackoff { delayInSeconds :: !Int }
    deriving (Eq, Show)

-- | Per-job-type configuration and behaviour.
--
-- Only 'perform' mentions the @job@ type in an argument position; the other
-- methods are plain values, which is why the module enables
-- @AllowAmbiguousTypes@. Every method except 'perform' has a default.
class Job job where
    -- | The action to run for a job value. The model context and framework
    -- config are supplied as the implicit parameters @?modelContext@ and
    -- @?context@.
    perform :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => job -> IO ()

    -- | Upper bound on attempts for a job; the job value itself is available
    -- as the implicit parameter @?job@.
    --
    -- Defaults to @10@.
    maxAttempts :: (?job :: job) => Int
    maxAttempts = 10

    -- | Optional timeout, in microseconds.
    --
    -- Defaults to 'Nothing' (presumably "no timeout" — confirm in the job runner).
    timeoutInMicroseconds :: Maybe Int
    timeoutInMicroseconds = Nothing

    -- | While jobs are typically fetched using pg_notify, we have to poll the queue table
    -- periodically to catch jobs with a @run_at@ in the future
    --
    -- By default we only poll every minute (the value is in microseconds)
    queuePollInterval :: Int
    queuePollInterval = 60 * 1000000

    -- | How many jobs of this type can be executed at the same time
    --
    -- This limit only applies to the running haskell process. If you run @N@ multiple
    -- independent processes of the job runner, the limit will be @N * maxConcurrency@
    --
    -- Defaults to @16@.
    maxConcurrency :: Int
    maxConcurrency = 16

    -- | The 'BackoffStrategy' used for this job type.
    --
    -- Defaults to 'LinearBackoff' with a delay of 30 seconds.
    backoffStrategy :: BackoffStrategy
    backoffStrategy = LinearBackoff { delayInSeconds = 30 }

-- | Associates an application type with the job workers it provides.
class Worker application where
    -- | The list of 'JobWorker's for the given application value.
    workers :: application -> [JobWorker]

-- | The arguments handed to a 'JobWorker' function.
data JobWorkerArgs = JobWorkerArgs
    { workerId :: UUID
    -- ^ Identifier of the worker (presumably unique per worker — confirm at the call site)
    , modelContext :: ModelContext
    -- ^ Model context made available to the worker
    , frameworkConfig :: FrameworkConfig
    -- ^ Framework configuration made available to the worker
    , pgListener :: PGListener.PGListener
    -- ^ The 'PGListener.PGListener' made available to the worker
    }

-- | A job worker: a function that, given 'JobWorkerArgs', produces a
-- 'JobWorkerProcess' in 'IO'.
newtype JobWorker
    = JobWorker (JobWorkerArgs -> IO JobWorkerProcess)

-- | Mapping for @JOB_STATUS@. The DDL statement for this can be found in IHPSchema.sql:
--
-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry');
--
-- NOTE(review): the DDL excerpt above lists five labels while this type has six
-- constructors (no label corresponding to 'JobStatusTimedOut' is shown) — confirm
-- against IHPSchema.sql and update whichever side is stale.
--
-- The derived 'Enum' instance numbers the constructors in declaration order,
-- starting at 0, so reordering them changes 'fromEnum' / 'toEnum' results.
data JobStatus
    = JobStatusNotStarted
    | JobStatusRunning
    | JobStatusFailed
    | JobStatusTimedOut
    | JobStatusSucceeded
    | JobStatusRetry
    deriving (Eq, Show, Read, Enum)

-- | Handles belonging to one started job worker, as returned by a 'JobWorker'
-- function.
--
-- NOTE(review): the roles described on the fields are inferred from their names
-- and types only; the code that creates and consumes them is outside this module.
data JobWorkerProcess
    = JobWorkerProcess
    { runners :: [Async ()]
    -- ^ Asynchronous tasks of the worker (presumably the threads executing jobs)
    , subscription :: PGListener.Subscription
    -- ^ The worker's 'PGListener.Subscription'
    , poller :: Async ()
    -- ^ A single asynchronous task (presumably the periodic queue poller)
    , action :: Concurrent.MVar JobWorkerProcessMessage
    -- ^ 'Concurrent.MVar' carrying 'JobWorkerProcessMessage' values
    }

-- | Messages carried by the @action@ 'Concurrent.MVar' of a 'JobWorkerProcess'.
--
-- NOTE(review): the meanings below are inferred from the constructor names —
-- confirm against the job runner.
data JobWorkerProcessMessage
    = JobAvailable -- ^ Presumably: a job may be ready to be picked up
    | Stop         -- ^ Presumably: the worker should shut down