{-# LANGUAGE AllowAmbiguousTypes #-}
module IHP.Job.Queue.Fetch
( fetchNextJob
, pendingJobConditionSQL
) where

import IHP.Prelude
import IHP.Job.Queue.Pool (runPool)
import IHP.ModelSupport (Table (..), GetModelByTableName)
import IHP.ModelSupport.Types (PrimaryKey)
import IHP.Hasql.FromRow (FromRowHasql (..))
import qualified Hasql.Pool as HasqlPool
import qualified Hasql.Session as HasqlSession
import qualified Hasql.Statement as Hasql
import qualified Hasql.Encoders as Encoders
import qualified Hasql.Decoders as Decoders
import qualified Data.Text as Text

-- | Lock and fetch the next available job. Returns 'Nothing' when no job is available.
--
-- Selecting and locking happen in one @UPDATE ... RETURNING@ statement, so the lock
-- is set on the job row in an atomic way. The inner @SELECT@ picks the candidate with
-- the smallest @created_at@ that matches 'pendingJobConditionSQL' and uses
-- @FOR UPDATE SKIP LOCKED@, so rows currently locked by another transaction are
-- skipped instead of waited on.
--
-- On the selected row the statement sets @status@ to @job_status_running@
-- (JobStatusRunning), @locked_at@ to @NOW()@, @locked_by@ to the given worker id,
-- and increments @attempts_count@.
--
-- The table name and the returned column list are taken from the 'Table' instance of
-- @job@ and spliced into the SQL text; the worker id is passed as the bound
-- parameter @$1@.
--
-- __Example:__ Locking a SendMailJob
--
-- > let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04"
-- > job <- fetchNextJob @SendMailJob pool workerId
--
-- After you're done with the job, call 'jobDidFail' or 'jobDidSucceed' to make it available to the queue again.
fetchNextJob :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FromRowHasql job
    , Show (PrimaryKey (GetTableName job))
    , Table job
    ) => HasqlPool.Pool -> UUID -> IO (Maybe job)
fetchNextJob pool workerId = do
    let tableNameText :: Text
        tableNameText = tableName @job
    -- Return every column of the job table so the row decoder can rebuild the record.
    let returningColumns :: Text
        returningColumns = Text.intercalate ", " (columnNames @job)
    let sql :: Text
        sql = "UPDATE " <> tableNameText
            <> " SET status = 'job_status_running'"
            <> ", locked_at = NOW(), locked_by = $1"
            <> ", attempts_count = attempts_count + 1"
            <> " WHERE id IN (SELECT id FROM " <> tableNameText
            <> " WHERE " <> pendingJobConditionSQL
            <> " ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED)"
            <> " RETURNING " <> returningColumns
    -- The only bound parameter ($1) is the worker id.
    let encoder = Encoders.param (Encoders.nonNullable Encoders.uuid)
    -- Zero rows updated (no pending job) decodes to Nothing.
    let decoder = Decoders.rowMaybe (hasqlRowDecoder @job)
    -- NOTE(review): presumably unpreparable because the SQL text is assembled
    -- per job table at runtime -- confirm.
    let statement = Hasql.unpreparable sql encoder decoder
    runPool pool (HasqlSession.statement workerId statement)

-- | Shared WHERE condition for fetching pending jobs, as a SQL text fragment.
--
-- Matches rows whose @status@ is either @job_status_not_started@ or
-- @job_status_retry@, that are not locked (@locked_by IS NULL@), and whose
-- @run_at@ time has passed (@run_at <= NOW()@).
--
-- The two status checks are wrapped in parentheses so that the @AND@ conditions
-- apply to both of them. Enum values are inlined as SQL string literals
-- (PostgreSQL casts them to job_status).
pendingJobConditionSQL :: Text
pendingJobConditionSQL =
    "(status = 'job_status_not_started'"
    <> " OR status = 'job_status_retry'"
    <> ") AND locked_by IS NULL AND run_at <= NOW()"