{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Queue.Fetch
( fetchNextJob
, pendingJobConditionSQL
) where
import IHP.Prelude
import IHP.Job.Queue.Pool (runPool)
import IHP.ModelSupport (Table (..), GetModelByTableName)
import IHP.ModelSupport.Types (PrimaryKey)
import IHP.Hasql.FromRow (FromRowHasql (..))
import qualified Hasql.Pool as HasqlPool
import qualified Hasql.Session as HasqlSession
import qualified Hasql.Statement as Hasql
import qualified Hasql.Encoders as Encoders
import qualified Hasql.Decoders as Decoders
import qualified Data.Text as Text
-- | Claim the next pending job of type @job@ for the given worker.
--
-- Issues a single @UPDATE ... RETURNING@ statement against the job's table
-- that:
--
-- * picks at most one row matching 'pendingJobConditionSQL', ordered by
--   @created_at@, using @FOR UPDATE SKIP LOCKED@ in the sub-select,
-- * sets @status@ to @'job_status_running'@, @locked_at@ to @NOW()@,
--   @locked_by@ to the supplied worker id, and increments @attempts_count@,
-- * returns the columns listed by 'columnNames' for the updated row.
--
-- The result is 'Nothing' when the statement returns no row, otherwise
-- 'Just' the row decoded via 'hasqlRowDecoder'.
--
-- The @job@ type has to be supplied with a type application because it only
-- occurs in the result type and the constraints.
--
-- NOTE(review): the body uses type applications on the signature's @job@
-- variable and 'Text' string literals; this presumably relies on
-- @TypeApplications@, @ScopedTypeVariables@ and @OverloadedStrings@ being
-- enabled as project-wide default extensions -- confirm.
fetchNextJob :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , Table job
    ) => HasqlPool.Pool -> UUID -> IO (Maybe job)
fetchNextJob pool workerId = do
    -- The table name and column list come from the 'Table' instance of @job@
    -- and are spliced into the SQL text as-is (no quoting is applied here).
    let tableNameText = tableName @job
    let returningColumns = Text.intercalate ", " (columnNames @job)
    let sql =
            "UPDATE " <> tableNameText
            <> " SET status = 'job_status_running'"
            <> ", locked_at = NOW(), locked_by = $1"
            <> ", attempts_count = attempts_count + 1"
            <> " WHERE id IN (SELECT id FROM " <> tableNameText
            <> " WHERE " <> pendingJobConditionSQL
            <> " ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED)"
            <> " RETURNING " <> returningColumns
    -- The worker id is the only statement parameter ($1).
    let encoder = Encoders.param (Encoders.nonNullable Encoders.uuid)
    -- Zero rows decode to Nothing, one row decodes to Just the job.
    let decoder = Decoders.rowMaybe (hasqlRowDecoder @job)
    -- Built with 'Hasql.unpreparable', i.e. not as a prepared statement.
    let statement = Hasql.unpreparable sql encoder decoder
    runPool pool (HasqlSession.statement workerId statement)
-- | SQL boolean expression (without a leading @WHERE@) that matches rows
-- whose @status@ is @'job_status_not_started'@ or @'job_status_retry'@,
-- whose @locked_by@ is @NULL@, and whose @run_at@ is not later than
-- @NOW()@.
--
-- 'fetchNextJob' splices this fragment into the sub-select that picks the
-- row to claim.
pendingJobConditionSQL :: Text
pendingJobConditionSQL =
    "(status = 'job_status_not_started'"
    <> " OR status = 'job_status_retry'"
    <> ") AND locked_by IS NULL AND run_at <= NOW()"