{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Queue where
import IHP.Prelude
import IHP.Job.Types
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Database.PostgreSQL.Simple.ToField as PG
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent as Concurrent
import qualified Control.Exception.Safe as Exception
import IHP.ModelSupport
import IHP.Controller.Param
import qualified System.Random as Random
import qualified IHP.PGListener as PGListener
import qualified IHP.Log as Log
import Control.Monad.Trans.Resource
import IHP.Hasql.FromRow (FromRowHasql(..))
import qualified Hasql.Encoders as Encoders
import Hasql.Implicits.Encoders (DefaultParamEncoder(..))
import qualified Data.HashMap.Strict as HashMap
import qualified Hasql.Pool as HasqlPool
import qualified Hasql.Session as HasqlSession
import qualified Hasql.Decoders as Decoders
import qualified Hasql.DynamicStatements.Snippet as Snippet
import IHP.Hasql.Encoders ()
import qualified Data.Text as Text
import Control.Concurrent.STM (TBQueue, atomically, writeTBQueue, STM)
import Control.Concurrent.STM.TBQueue (isFullTBQueue)
-- | Claims the next pending job of the @job@ table for the given worker.
--
-- A single @UPDATE ... RETURNING@ statement picks the oldest row (by
-- @created_at@) that satisfies 'pendingJobCondition', marks it as running,
-- stamps @locked_at@ / @locked_by@ and increments @attempts_count@.
-- The inner @SELECT@ uses @FOR UPDATE SKIP LOCKED@ and @LIMIT 1@.
--
-- Returns 'Nothing' when the statement returns no row.
fetchNextJob :: forall job.
    ( ?modelContext :: ModelContext
    , job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , Table job
    ) => UUID -> IO (Maybe job)
fetchNextJob workerId = do
    let jobTable :: Text
        jobTable = tableName @job
    let selectedColumns :: Text
        selectedColumns = Text.intercalate ", " (columnNames @job)
    let claimSnippet :: Snippet.Snippet
        claimSnippet = mconcat
            [ Snippet.sql "UPDATE "
            , Snippet.sql jobTable
            , Snippet.sql " SET status = "
            , Snippet.param JobStatusRunning
            , Snippet.sql ", locked_at = NOW(), locked_by = "
            , Snippet.param workerId
            , Snippet.sql ", attempts_count = attempts_count + 1"
            , Snippet.sql " WHERE id IN (SELECT id FROM "
            , Snippet.sql jobTable
            , Snippet.sql " WHERE "
            , pendingJobCondition
            , Snippet.sql " ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED)"
            , Snippet.sql " RETURNING "
            , Snippet.sql selectedColumns
            ]
    -- Zero rows decodes to Nothing, one row decodes to the claimed job.
    let claimedJobDecoder = Decoders.rowMaybe (hasqlRowDecoder @job)

    pool <- getHasqlPool
    withoutQueryLogging (sqlQueryHasql pool claimSnippet claimedJobDecoder)
-- | SQL predicate (without the @WHERE@ keyword) matching jobs that can be
-- picked up right now: status is not-started or retry, the row is not locked
-- by any worker, and @run_at@ is not in the future.
pendingJobCondition :: Snippet.Snippet
pendingJobCondition = mconcat
    [ Snippet.sql "(status = "
    , Snippet.param JobStatusNotStarted
    , Snippet.sql " OR status = "
    , Snippet.param JobStatusRetry
    , Snippet.sql ") AND locked_by IS NULL AND run_at <= NOW()"
    ]
-- | Sets up both notification paths for new jobs on the given table.
--
-- 1. Runs the SQL produced by 'createNotificationTriggerSQL' for the table.
-- 2. Starts the poller via 'pollForJob'.
-- 3. Subscribes to the table's 'channelName' on the given listener; every
--    notification tries to push 'JobAvailable' onto the queue (the message is
--    dropped when the queue is full).
--
-- Returns the listener subscription together with the poller's release key.
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (PGListener.Subscription, ReleaseKey)
watchForJob pgListener tableName pollInterval onNewJob = do
    let tableNameBS :: ByteString
        tableNameBS = cs tableName

    liftIO do
        pool <- getHasqlPool
        let triggerScript = HasqlSession.script (createNotificationTriggerSQL tableNameBS)
        withoutQueryLogging (runSessionHasql pool triggerScript)

    poller <- pollForJob tableName pollInterval onNewJob

    -- The notification payload is ignored; only the wake-up matters.
    let announceJob _ = do
            _ <- atomically (tryWriteTBQueue onNewJob JobAvailable)
            pure ()
    subscription <- liftIO (PGListener.subscribe (channelName tableNameBS) announceJob pgListener)

    pure (subscription, poller)
-- | Starts a background thread that periodically counts pending jobs in the
-- given table and tries to enqueue one 'JobAvailable' message per pending job.
--
-- Each iteration:
--
-- * runs @SELECT COUNT(*)@ restricted by 'pendingJobCondition',
-- * attempts one non-blocking queue write per counted job (writes to a full
--   queue are dropped),
-- * logs any synchronous exception raised by the above and keeps looping,
-- * sleeps for @pollInterval@ plus a random jitter between 0 and 2000000
--   (both passed to 'Concurrent.threadDelay').
--
-- The thread is registered with 'allocate' and cancelled on release; the
-- returned 'ReleaseKey' refers to that registration.
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO ReleaseKey
pollForJob tableName pollInterval onNewJob = do
    let countSnippet = mconcat
            [ Snippet.sql "SELECT COUNT(*) FROM "
            , Snippet.sql tableName
            , Snippet.sql " WHERE "
            , pendingJobCondition
            ]
    let countDecoder = Decoders.singleRow (Decoders.column (Decoders.nonNullable Decoders.int8))
    let pollLoop = do
            -- Log.error resolves its logger through ?context.
            let ?context = ?modelContext
            pool <- getHasqlPool
            forever do
                outcome <- Exception.tryAny do
                    pendingCount :: Int <- fromIntegral <$> withoutQueryLogging (sqlQueryHasql pool countSnippet countDecoder)
                    forEach [1..pendingCount] \_ -> do
                        _ <- atomically (tryWriteTBQueue onNewJob JobAvailable)
                        pure ()
                case outcome of
                    Right _ -> pure ()
                    Left exception -> Log.error ("Job poller: " <> tshow exception)

                jitter <- Random.randomRIO (0, 2000000)
                Concurrent.threadDelay (pollInterval + jitter)

    (releaseKey, _) <- allocate (Async.async pollLoop) Async.cancel
    pure releaseKey
-- | Builds the SQL script that installs the job notification triggers for a
-- table.
--
-- The script is a single @DO@ block that:
--
-- * (re)creates a trigger function calling @pg_notify@ on the table's
--   'channelName' with an empty payload,
-- * drops and recreates an @AFTER INSERT@ and an @AFTER UPDATE@ row trigger
--   that fire when the new status is @job_status_not_started@ or
--   @job_status_retry@,
-- * ignores SQLSTATE @XX000@ raised while doing so.
--
-- The table name is double-quoted in the @DROP TRIGGER@ statements as well as
-- in the @CREATE TRIGGER@ statements, so both always refer to the same
-- relation. Previously only @CREATE TRIGGER@ quoted it, which made the two
-- disagree for table names that are not plain lowercase identifiers.
--
-- The table name is interpolated verbatim; it must come from trusted code.
createNotificationTriggerSQL :: ByteString -> Text
createNotificationTriggerSQL tableName = cs $ mconcat
    [ "DO $$\n"
    , "BEGIN\n"
    , " CREATE OR REPLACE FUNCTION ", functionName, "() RETURNS TRIGGER AS $BODY$"
    , "BEGIN\n"
    , " PERFORM pg_notify('", channelName tableName, "', '');\n"
    , " RETURN new;"
    , "END;\n"
    , "$BODY$ language plpgsql;\n"
    , " DROP TRIGGER IF EXISTS ", insertTriggerName, " ON ", quotedTableName, ";\n"
    , " CREATE TRIGGER ", insertTriggerName, " AFTER INSERT ON ", quotedTableName
    , " FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE ", functionName, "();\n"
    , " DROP TRIGGER IF EXISTS ", updateTriggerName, " ON ", quotedTableName, ";\n"
    , " CREATE TRIGGER ", updateTriggerName, " AFTER UPDATE ON ", quotedTableName
    , " FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE ", functionName, "();\n"
    , "EXCEPTION\n"
    , " WHEN SQLSTATE 'XX000' THEN null; -- 'tuple concurrently updated': another connection installed it first\n"
    , "END; $$"
    ]
  where
    -- Table name wrapped in double quotes, used by every DROP/CREATE TRIGGER.
    quotedTableName :: ByteString
    quotedTableName = "\"" <> tableName <> "\""

    functionName :: ByteString
    functionName = "notify_job_queued_" <> tableName

    insertTriggerName :: ByteString
    insertTriggerName = "did_insert_job_" <> tableName

    updateTriggerName :: ByteString
    updateTriggerName = "did_update_job_" <> tableName
-- | Name of the notification channel for a job table: the table name prefixed
-- with @job_available_@.
channelName :: ByteString -> ByteString
channelName = ("job_available_" <>)
-- | Records that a job run ended with an exception.
--
-- Logs a warning, then updates the job row: the lock is cleared, @updatedAt@
-- is set to the current time and @lastError@ to the shown exception.
-- While @attemptsCount@ is below 'maxAttempts' the status becomes
-- 'JobStatusRetry' and @runAt@ is moved forward by 'backoffDelay';
-- otherwise the status becomes 'JobStatusFailed' and @runAt@ is left alone.
jobDidFail :: forall job context.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , SetField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => job -> SomeException -> IO ()
jobDidFail job exception = do
    failedAt <- getCurrentTime

    Log.warn ("Failed job with exception: " <> tshow exception)

    -- maxAttempts reads the job through the implicit ?job parameter.
    let ?job = job
    let canRetry = job.attemptsCount < maxAttempts
    let newStatus
            | canRetry = JobStatusRetry
            | otherwise = JobStatusFailed
    let retryDelay = backoffDelay (backoffStrategy @job) job.attemptsCount
    let nextRunAt = addUTCTime retryDelay failedAt

    _ <- job
        |> set #status newStatus
        |> set #lockedBy Nothing
        |> set #updatedAt failedAt
        |> set #lastError (Just (tshow exception))
        |> (if canRetry then set #runAt nextRunAt else id)
        |> updateRecord

    pure ()
-- | Records that a job run hit its timeout.
--
-- Logs a warning, then updates the job row: the lock is cleared, @updatedAt@
-- is set to the current time and @lastError@ to @"Timeout reached"@.
-- While @attemptsCount@ is below 'maxAttempts' the status becomes
-- 'JobStatusRetry' and @runAt@ is moved forward by 'backoffDelay';
-- otherwise the status becomes 'JobStatusTimedOut' and @runAt@ is left alone.
jobDidTimeout :: forall job context.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , SetField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => job -> IO ()
jobDidTimeout job = do
    timedOutAt <- getCurrentTime

    Log.warn ("Job timed out" :: Text)

    -- maxAttempts reads the job through the implicit ?job parameter.
    let ?job = job
    let canRetry = job.attemptsCount < maxAttempts
    let newStatus
            | canRetry = JobStatusRetry
            | otherwise = JobStatusTimedOut
    let retryDelay = backoffDelay (backoffStrategy @job) job.attemptsCount
    let nextRunAt = addUTCTime retryDelay timedOutAt

    _ <- job
        |> set #status newStatus
        |> set #lockedBy Nothing
        |> set #updatedAt timedOutAt
        |> setJust #lastError "Timeout reached"
        |> (if canRetry then set #runAt nextRunAt else id)
        |> updateRecord

    pure ()
-- | Records that a job finished successfully: logs an info message, then
-- stores status 'JobStatusSucceeded', clears the lock and sets @updatedAt@ to
-- the current time.
jobDidSucceed :: forall job context.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    , ?context :: context
    , HasField "logger" context Log.Logger
    ) => job -> IO ()
jobDidSucceed job = do
    Log.info ("Succeeded job" :: Text)

    finishedAt <- getCurrentTime
    _ <- job
        |> set #status JobStatusSucceeded
        |> set #lockedBy Nothing
        |> set #updatedAt finishedAt
        |> updateRecord

    pure ()
-- | Delay before the next retry, given the backoff strategy and the number of
-- attempts made so far.
--
-- * 'LinearBackoff': always @delayInSeconds@, regardless of the attempt count.
-- * 'ExponentialBackoff': @delayInSeconds * 2 ^ attempts@, capped at 86400
--   seconds. The exponent is clamped to the range 0..20: the upper bound was
--   already present, the lower bound keeps a negative attempt count from
--   making '(^)' throw a \"Negative exponent\" error.
backoffDelay :: BackoffStrategy -> Int -> NominalDiffTime
backoffDelay (LinearBackoff {delayInSeconds}) _ = fromIntegral delayInSeconds
backoffDelay (ExponentialBackoff {delayInSeconds}) attempts =
    min maxDelay (fromIntegral delayInSeconds * (2 ^ doublings))
  where
    -- Upper bound for any computed delay, in seconds.
    maxDelay :: NominalDiffTime
    maxDelay = 86400

    -- Number of times the base delay is doubled, kept within 0..20.
    doublings :: Int
    doublings = max 0 (min attempts 20)
-- | Releases jobs of the @job@ table that are still marked as running although
-- their lock is old.
--
-- Two statements are executed, in this order:
--
-- 1. Running jobs locked longer ago than @staleThreshold@ (rounded to whole
--    seconds) but less than one day ago are set to 'JobStatusRetry', unlocked,
--    and get @run_at = NOW()@.
-- 2. Running jobs locked more than one day ago are set to 'JobStatusFailed',
--    unlocked, and get a fixed @last_error@ message.
recoverStaleJobs :: forall job.
    ( ?modelContext :: ModelContext
    , Table job
    ) => NominalDiffTime -> IO ()
recoverStaleJobs staleThreshold = do
    let jobTable :: Text
        jobTable = tableName @job
    let thresholdSeconds :: Int
        thresholdSeconds = round staleThreshold

    let retrySnippet = mconcat
            [ Snippet.sql "UPDATE "
            , Snippet.sql jobTable
            , Snippet.sql " SET status = "
            , Snippet.param JobStatusRetry
            , Snippet.sql ", locked_by = NULL, locked_at = NULL, run_at = NOW()"
            , Snippet.sql " WHERE status = "
            , Snippet.param JobStatusRunning
            , Snippet.sql " AND locked_at < NOW() - interval '1 second' * "
            , Snippet.param thresholdSeconds
            , Snippet.sql " AND locked_at > NOW() - interval '1 day'"
            ]

    let failSnippet = mconcat
            [ Snippet.sql "UPDATE "
            , Snippet.sql jobTable
            , Snippet.sql " SET status = "
            , Snippet.param JobStatusFailed
            , Snippet.sql ", locked_by = NULL, locked_at = NULL"
            , Snippet.sql ", last_error = 'Stale job: worker likely crashed'"
            , Snippet.sql " WHERE status = "
            , Snippet.param JobStatusRunning
            , Snippet.sql " AND locked_at < NOW() - interval '1 day'"
            ]

    pool <- getHasqlPool
    withoutQueryLogging (sqlExecHasql pool retrySnippet)
    withoutQueryLogging (sqlExecHasql pool failSnippet)
-- | Parses the textual enum labels (@job_status_*@) into 'JobStatus'.
-- A NULL field and any unknown label are reported as conversion errors.
instance PG.FromField JobStatus where
    fromField field rawValue = case rawValue of
        Nothing -> PG.returnError PG.UnexpectedNull field "Unexpected null for enum value"
        Just "job_status_not_started" -> pure JobStatusNotStarted
        Just "job_status_running" -> pure JobStatusRunning
        Just "job_status_failed" -> pure JobStatusFailed
        Just "job_status_timed_out" -> pure JobStatusTimedOut
        Just "job_status_succeeded" -> pure JobStatusSucceeded
        Just "job_status_retry" -> pure JobStatusRetry
        Just unknownLabel -> PG.returnError PG.ConversionFailed field ("Unexpected value for enum value. Got: " <> cs unknownLabel)
-- | The default status is 'JobStatusNotStarted'.
instance Default JobStatus where
    def = JobStatusNotStarted
-- | Serialises a 'JobStatus' as its textual enum label (@job_status_*@).
instance PG.ToField JobStatus where
    toField status = PG.toField label
      where
        label :: Text
        label = case status of
            JobStatusNotStarted -> "job_status_not_started"
            JobStatusRunning -> "job_status_running"
            JobStatusFailed -> "job_status_failed"
            JobStatusTimedOut -> "job_status_timed_out"
            JobStatusSucceeded -> "job_status_succeeded"
            JobStatusRetry -> "job_status_retry"
-- | Renders a 'JobStatus' as its textual enum label (@job_status_*@).
instance InputValue JobStatus where
    inputValue status = case status of
        JobStatusNotStarted -> "job_status_not_started" :: Text
        JobStatusRunning -> "job_status_running" :: Text
        JobStatusFailed -> "job_status_failed" :: Text
        JobStatusTimedOut -> "job_status_timed_out" :: Text
        JobStatusSucceeded -> "job_status_succeeded" :: Text
        JobStatusRetry -> "job_status_retry" :: Text
-- | Reads a 'JobStatus' request parameter by delegating to
-- 'IHP.Controller.Param.enumParamReader'.
instance IHP.Controller.Param.ParamReader JobStatus where
    readParameter rawParameter = IHP.Controller.Param.enumParamReader rawParameter
-- | Lookup table from textual enum label (@job_status_*@) to 'JobStatus'.
textToEnumJobStatusMap :: HashMap.HashMap Text JobStatus
textToEnumJobStatusMap = HashMap.fromList labelledStatuses
  where
    labelledStatuses :: [(Text, JobStatus)]
    labelledStatuses =
        [ ("job_status_not_started", JobStatusNotStarted)
        , ("job_status_running", JobStatusRunning)
        , ("job_status_failed", JobStatusFailed)
        , ("job_status_timed_out", JobStatusTimedOut)
        , ("job_status_succeeded", JobStatusSucceeded)
        , ("job_status_retry", JobStatusRetry)
        ]
-- | Looks up the 'JobStatus' for a textual enum label in
-- 'textToEnumJobStatusMap'; 'Nothing' when the label is not in the map.
textToEnumJobStatus :: Text -> Maybe JobStatus
textToEnumJobStatus = (`HashMap.lookup` textToEnumJobStatusMap)
-- | Hasql parameter encoder for 'JobStatus': a non-nullable enum encoder built
-- with @Just "public"@ and @"job_status"@ (presumably schema and type name —
-- confirm against the hasql docs), using 'inputValue' for the label.
instance DefaultParamEncoder JobStatus where
    defaultParam = Encoders.nonNullable jobStatusEncoder
      where
        jobStatusEncoder = Encoders.enum (Just "public") "job_status" inputValue
-- | Returns the @hasqlPool@ field of the implicit model context.
getHasqlPool :: (?modelContext :: ModelContext) => IO HasqlPool.Pool
getHasqlPool = do
    let modelContext = ?modelContext
    pure modelContext.hasqlPool
-- | Non-blocking write to a bounded queue.
--
-- Returns 'False' without writing when the queue is full; otherwise writes
-- the value and returns 'True'.
tryWriteTBQueue :: TBQueue a -> a -> STM Bool
tryWriteTBQueue queue value =
    isFullTBQueue queue >>= \queueIsFull ->
        if queueIsFull
            then pure False
            else writeTBQueue queue value >> pure True