module IHP.Job.Queue where
import IHP.Prelude
import IHP.Job.Types
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Database.PostgreSQL.Simple.ToField as PG
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent as Concurrent
import IHP.ModelSupport
import IHP.QueryBuilder
import IHP.Fetch
import IHP.Controller.Param
import qualified System.Random as Random
import qualified IHP.PGListener as PGListener
import qualified IHP.Log as Log
-- | Atomically claims the next runnable job of type @job@ for the given worker.
--
-- A single @UPDATE ... WHERE id IN (SELECT ... LIMIT 1 FOR UPDATE SKIP LOCKED)@
-- statement picks the oldest candidate row (ordered by @created_at@), sets its
-- status to 'JobStatusRunning', stamps @locked_at@ and @locked_by@ (the worker
-- id) and increments @attempts_count@. The claimed record is then loaded with
-- 'fetch'.
--
-- Candidate rows are unlocked rows with @run_at <= NOW()@ whose status is
-- 'JobStatusNotStarted', or 'JobStatusRetry' once the 'retryQuery' condition
-- of the backoff strategy holds. The SQL fragment returned by
-- 'timeoutCondition' is appended to that filter.
--
-- Arguments: the optional job timeout in microseconds, the backoff strategy
-- and the id of the claiming worker.
--
-- Returns 'Nothing' when no row was claimed. Calls 'error' if the statement
-- returns more than one row, which the @LIMIT 1@ should rule out.
fetchNextJob :: forall job.
    ( ?modelContext :: ModelContext
    , job ~ GetModelByTableName (GetTableName job)
    , FilterPrimaryKey (GetTableName job)
    , FromRow job
    , Show (PrimaryKey (GetTableName job))
    , PG.FromField (PrimaryKey (GetTableName job))
    , Table job
    ) => Maybe Int -> BackoffStrategy -> UUID -> IO (Maybe job)
fetchNextJob timeoutInMicroseconds backoffStrategy workerId = do
    -- SKIP LOCKED makes concurrent workers pass over a row that another worker
    -- is claiming right now. With a plain FOR UPDATE they would block on that
    -- row, re-check it after the other transaction commits and end up with no
    -- row at all, even though further jobs are waiting in the queue.
    let query = PG.Query
            ( "UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE (((status = ?) OR (status = ? AND "
            <> retryQuery backoffStrategy
            <> ")) AND locked_by IS NULL AND run_at <= NOW()) "
            <> timeoutCondition timeoutInMicroseconds
            <> " ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING id"
            )

    -- One value per `?` placeholder, in order of appearance. The last two feed
    -- the placeholders inside the retry and timeout fragments.
    let params =
            ( PG.Identifier (tableName @job)
            , JobStatusRunning
            , workerId
            , PG.Identifier (tableName @job)
            , JobStatusNotStarted
            , JobStatusRetry
            , backoffStrategy.delayInSeconds
            , timeoutInMicroseconds
            )

    claimedIds :: [PG.Only (Id job)] <- sqlQuery query params
    case claimedIds of
        [] -> pure Nothing
        [PG.Only jobId] -> Just <$> fetch jobId
        unexpectedRows -> error (show unexpectedRows)
-- | Sets up both notification paths for new jobs in the given table.
--
-- First the SQL produced by 'createNotificationTrigger' is executed for the
-- table. Then a poller is started via 'pollForJob', and a subscription to the
-- table's 'channelName' is registered on the 'PGListener.PGListener'. Every
-- notification on that channel puts 'JobAvailable' into the given 'MVar'.
--
-- Returns the subscription together with the poller's 'Async.Async' handle.
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
watchForJob pgListener tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do
    let rawTableName :: ByteString
        rawTableName = cs tableName

    sqlExec (createNotificationTrigger rawTableName) ()

    pollerHandle <- pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob

    -- The notification payload is ignored; only the signal itself matters.
    listenerSubscription <-
        PGListener.subscribe
            (channelName rawTableName)
            (\_ -> Concurrent.putMVar onNewJob JobAvailable)
            pgListener

    pure (listenerSubscription, pollerHandle)
-- | Starts a background thread (via 'Async.asyncBound') that loops forever,
-- counting the rows of the given table that match the pending-job filter.
--
-- Each iteration puts one 'JobAvailable' message into the 'MVar' per counted
-- row, then sleeps for @pollInterval@ plus a random jitter between 0 and
-- 2000000 (both passed to 'Concurrent.threadDelay', i.e. microseconds).
--
-- Returns the 'Async.Async' handle of the polling thread.
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do
    let countQuery = PG.Query
            ( "SELECT COUNT(*) FROM ? WHERE (((status = ?) OR (status = ? AND "
            <> retryQuery backoffStrategy
            <> ")) AND locked_by IS NULL AND run_at <= NOW()) "
            <> timeoutCondition timeoutInMicroseconds
            <> " LIMIT 1"
            )

    -- One value per `?` placeholder; the last two belong to the retry and
    -- timeout fragments spliced into the query above.
    let countParams =
            ( PG.Identifier tableName
            , JobStatusNotStarted
            , JobStatusRetry
            , backoffStrategy.delayInSeconds
            , timeoutInMicroseconds
            )

    let pollOnce = do
            pendingJobs :: Int <- sqlQueryScalar countQuery countParams

            -- One signal per pending row.
            forEach [1..pendingJobs] \_ ->
                Concurrent.putMVar onNewJob JobAvailable

            -- Random extra delay on top of the configured interval.
            jitter <- Random.randomRIO (0, 2000000)
            Concurrent.threadDelay (pollInterval + jitter)

    Async.asyncBound (forever pollOnce)
-- | Builds the SQL that installs the job notification triggers for a table.
--
-- Inside one @BEGIN; ... COMMIT;@ block the statement:
--
-- 1. creates or replaces the function @notify_job_queued_<table>@, which calls
--    @pg_notify@ on the table's 'channelName' with an empty payload, and
-- 2. drops and re-creates the triggers @did_insert_job_<table>@ (AFTER INSERT)
--    and @did_update_job_<table>@ (AFTER UPDATE), which run that function for
--    each row whose new status is @job_status_not_started@ or
--    @job_status_retry@.
--
-- The table name is spliced into the SQL text as-is (only wrapped in double
-- quotes where it names the table), so it must come from a trusted source.
createNotificationTrigger :: ByteString -> PG.Query
createNotificationTrigger tableName = PG.Query $
        "BEGIN;\n"
        <> "CREATE OR REPLACE FUNCTION " <> functionName <> "() RETURNS TRIGGER AS $$"
        <> "BEGIN\n"
        <> " PERFORM pg_notify('" <> channelName tableName <> "', '');\n"
        <> " RETURN new;"
        <> "END;\n"
        <> "$$ language plpgsql;"
        <> recreateTrigger insertTriggerName "INSERT"
        <> recreateTrigger updateTriggerName "UPDATE"
        <> "COMMIT;"
    where
        functionName :: ByteString
        functionName = "notify_job_queued_" <> tableName

        insertTriggerName :: ByteString
        insertTriggerName = "did_insert_job_" <> tableName

        updateTriggerName :: ByteString
        updateTriggerName = "did_update_job_" <> tableName

        -- The table is quoted in both the DROP and the CREATE statement so
        -- that the two always refer to the same relation, including for
        -- names that are case-sensitive.
        quotedTableName :: ByteString
        quotedTableName = "\"" <> tableName <> "\""

        -- Drops the named trigger if present and creates it again for the
        -- given event (@INSERT@ or @UPDATE@).
        recreateTrigger :: ByteString -> ByteString -> ByteString
        recreateTrigger triggerName event =
            "DROP TRIGGER IF EXISTS " <> triggerName <> " ON " <> quotedTableName
            <> "; CREATE TRIGGER " <> triggerName <> " AFTER " <> event <> " ON " <> quotedTableName
            <> " FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n"
-- | Name of the notification channel for a job table: the table name prefixed
-- with @job_available_@.
channelName :: ByteString -> ByteString
channelName = ("job_available_" <>)
-- | Records a failed run of the given job.
--
-- Logs the exception as a warning, then updates the job record: the lock is
-- cleared, @updatedAt@ is set to the current time and @lastError@ to the shown
-- exception. The status becomes 'JobStatusRetry' while @attemptsCount@ is
-- below 'maxAttempts', and 'JobStatusFailed' otherwise.
jobDidFail :: forall job context.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => job -> SomeException -> IO ()
jobDidFail job exception = do
    now <- getCurrentTime

    Log.warn ("Failed job with exception: " <> tshow exception)

    -- 'maxAttempts' reads the job through the implicit parameter.
    let ?job = job
    let nextStatus
            | job.attemptsCount < maxAttempts = JobStatusRetry
            | otherwise = JobStatusFailed

    _ <- job
        |> set #status nextStatus
        |> set #lockedBy Nothing
        |> set #updatedAt now
        |> setJust #lastError (tshow exception)
        |> updateRecord

    pure ()
-- | Records that the given job ran into its timeout.
--
-- Logs a warning, then updates the job record: the lock is cleared,
-- @updatedAt@ is set to the current time and @lastError@ to
-- @"Timeout reached"@. The status becomes 'JobStatusRetry' while
-- @attemptsCount@ is below 'maxAttempts', and 'JobStatusTimedOut' otherwise.
jobDidTimeout :: forall job context.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => job -> IO ()
jobDidTimeout job = do
    now <- getCurrentTime

    Log.warn ("Job timed out" :: Text)

    -- 'maxAttempts' reads the job through the implicit parameter.
    let ?job = job
    let nextStatus
            | job.attemptsCount < maxAttempts = JobStatusRetry
            | otherwise = JobStatusTimedOut

    _ <- job
        |> set #status nextStatus
        |> set #lockedBy Nothing
        |> set #updatedAt now
        |> set #lastError (Just ("Timeout reached" :: Text))
        |> updateRecord

    pure ()
-- | Records a successful run of the given job.
--
-- Logs an info message, then updates the job record: the status becomes
-- 'JobStatusSucceeded', the lock is cleared and @updatedAt@ is set to the
-- current time.
jobDidSucceed :: forall job context.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => job -> IO ()
jobDidSucceed job = do
    Log.info ("Succeeded job" :: Text)

    finishedAt <- getCurrentTime

    _ <- updateRecord
        ( job
            |> set #status JobStatusSucceeded
            |> set #lockedBy Nothing
            |> set #updatedAt finishedAt
        )

    pure ()
-- | Decodes a 'JobStatus' from its database text representation
-- (@job_status_not_started@, @job_status_running@, ...).
--
-- A SQL @NULL@ is reported as 'PG.UnexpectedNull'; any other unknown value is
-- reported as 'PG.ConversionFailed'.
instance PG.FromField JobStatus where
    fromField field Nothing = PG.returnError PG.UnexpectedNull field "Unexpected null for enum value"
    fromField field (Just rawValue) =
        case rawValue of
            "job_status_not_started" -> pure JobStatusNotStarted
            "job_status_running" -> pure JobStatusRunning
            "job_status_failed" -> pure JobStatusFailed
            "job_status_timed_out" -> pure JobStatusTimedOut
            "job_status_succeeded" -> pure JobStatusSucceeded
            "job_status_retry" -> pure JobStatusRetry
            unknown -> PG.returnError PG.ConversionFailed field ("Unexpected value for enum value. Got: " <> cs unknown)
-- | The default status is 'JobStatusNotStarted'.
instance Default JobStatus where
    def = JobStatusNotStarted
-- | Encodes a 'JobStatus' as its database text representation
-- (@job_status_not_started@, @job_status_running@, ...).
instance PG.ToField JobStatus where
    toField status = PG.toField label
        where
            label :: Text
            label = case status of
                JobStatusNotStarted -> "job_status_not_started"
                JobStatusRunning -> "job_status_running"
                JobStatusFailed -> "job_status_failed"
                JobStatusTimedOut -> "job_status_timed_out"
                JobStatusSucceeded -> "job_status_succeeded"
                JobStatusRetry -> "job_status_retry"
-- | Renders a 'JobStatus' as the same text used for its database
-- representation (@job_status_not_started@, @job_status_running@, ...).
instance InputValue JobStatus where
    inputValue status = case status of
        JobStatusNotStarted -> "job_status_not_started" :: Text
        JobStatusRunning -> "job_status_running" :: Text
        JobStatusFailed -> "job_status_failed" :: Text
        JobStatusTimedOut -> "job_status_timed_out" :: Text
        JobStatusSucceeded -> "job_status_succeeded" :: Text
        JobStatusRetry -> "job_status_retry" :: Text
-- | Reads a 'JobStatus' from a request parameter by delegating to
-- 'IHP.Controller.Param.enumParamReader'.
instance IHP.Controller.Param.ParamReader JobStatus where
    readParameter rawParameter = IHP.Controller.Param.enumParamReader rawParameter
-- | SQL condition deciding whether a job in retry status has waited long
-- enough since its last update to be attempted again.
--
-- Both fragments contain exactly one @?@ placeholder; the callers in this
-- module bind it to the strategy's @delayInSeconds@.
--
-- * 'LinearBackoff': at least @delay@ seconds have passed since @updated_at@.
-- * 'ExponentialBackoff': at least @delay * 2 ^ attempts_count@ seconds have
--   passed since @updated_at@.
retryQuery :: BackoffStrategy -> ByteString
-- The delay has to be subtracted from NOW(): with an addition the comparison
-- holds for every row updated in the past, so the delay would never apply.
retryQuery LinearBackoff {} = "updated_at < NOW() - (interval '1 second' * ?)"
retryQuery ExponentialBackoff {} = "updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count)"
-- | SQL fragment appended to the job selection filter, depending on whether a
-- timeout is configured.
--
-- Both fragments contain exactly one @?@ placeholder, so the same parameter
-- list fits either case:
--
-- * With a timeout, the fragment additionally matches rows in status
--   @job_status_running@ that are locked and whose @locked_at@ plus the
--   placeholder value plus 1000000 microseconds lies in the past.
-- * Without a timeout, the fragment only requires the placeholder to be NULL.
timeoutCondition :: Maybe Int -> ByteString
timeoutCondition timeout = case timeout of
    Just _ -> "OR (status = 'job_status_running' AND locked_by IS NOT NULL AND locked_at + ((? + 1000000) || 'microseconds')::interval < NOW())"
    Nothing -> "AND (? IS NULL)"