{-|
Module: IHP.Job.Queue
Description: Functions to operate on the Job Queue Database
Copyright: (c) digitally induced GmbH, 2020
-}
module IHP.Job.Queue where

import IHP.Prelude
import IHP.Job.Types
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Database.PostgreSQL.Simple.ToField as PG
import qualified Database.PostgreSQL.Simple.Notification as PG
import qualified Control.Concurrent.Async as Async
import IHP.ModelSupport
import IHP.QueryBuilder
import IHP.Fetch
import IHP.Controller.Param

-- | Lock and fetch the next available job. In case no job is available returns Nothing.
--
-- The lock is set on the job row in an atomic way.
-- 
-- The job status is set to JobStatusRunning, lockedBy will be set to the worker id and the attemptsCount is incremented.
--
-- __Example:__ Locking a SendMailJob
--
-- > let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04"
-- > job <- fetchNextJob @SendMailJob workerId
--
-- After you're done with the job, call 'jobDidFail' or 'jobDidSucceed' to make it available to the queue again.
fetchNextJob :: forall job.
    ( ?modelContext :: ModelContext
    , job ~ GetModelByTableName (GetTableName job)
    , FilterPrimaryKey (GetTableName job)
    , FromRow job
    , Show (PrimaryKey (GetTableName job))
    , PG.FromField (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    ) => UUID -> IO (Maybe job)
fetchNextJob :: UUID -> IO (Maybe job)
fetchNextJob UUID
workerId = do
    let query :: Query
query = Query
"UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE (status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds') AND locked_by IS NULL ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id"
    let params :: (Identifier, JobStatus, UUID, Identifier, JobStatus, JobStatus)
params = (Text -> Identifier
PG.Identifier (KnownSymbol (GetTableName job) => Text
forall model. KnownSymbol (GetTableName model) => Text
tableName @job), JobStatus
JobStatusRunning, UUID
workerId, Text -> Identifier
PG.Identifier (KnownSymbol (GetTableName job) => Text
forall model. KnownSymbol (GetTableName model) => Text
tableName @job), JobStatus
JobStatusNotStarted, JobStatus
JobStatusRetry)

    result :: [PG.Only (Id job)] <- Query
-> (Identifier, JobStatus, UUID, Identifier, JobStatus, JobStatus)
-> IO [Only (Id' (GetTableName job))]
forall q r.
(?modelContext::ModelContext, ToRow q, FromRow r, Show q) =>
Query -> q -> IO [r]
sqlQuery Query
query (Identifier, JobStatus, UUID, Identifier, JobStatus, JobStatus)
params
    case [Only (Id' (GetTableName job))]
result of
        [] -> Maybe job -> IO (Maybe job)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe job
forall a. Maybe a
Nothing
        [PG.Only Id' (GetTableName job)
id] -> job -> Maybe job
forall a. a -> Maybe a
Just (job -> Maybe job) -> IO job -> IO (Maybe job)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Id' (GetTableName job)
-> IO (FetchResult (Id' (GetTableName job)) job)
forall fetchable model.
(Fetchable fetchable model, KnownSymbol (GetTableName model),
 FromRow model, ?modelContext::ModelContext) =>
fetchable -> IO (FetchResult fetchable model)
fetch Id' (GetTableName job)
id
        [Only (Id' (GetTableName job))]
otherwise -> Text -> IO (Maybe job)
forall a. Text -> a
error ([Only (Id' (GetTableName job))] -> Text
forall a. Show a => a -> Text
show [Only (Id' (GetTableName job))]
otherwise)

-- | Calls a callback every time something is inserted, updated or deleted in a given database table.
--
-- In the background this function creates a database trigger to notify this function about table changes
-- using pg_notify. When there are existing triggers, it will silently recreate them. So this will most likely
-- not fail.
--
-- This function returns a Async. Call 'cancel' on the async to stop watching the database.
--
-- __Example:__
--
-- > watchInsertOrUpdateTable "projects" do
-- >     putStrLn "Something changed in the projects table"
--
-- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@
-- You will see that @"Something changed in the projects table"@ is printed onto the screen.
--
watchForJob :: (?modelContext :: ModelContext) => Text -> IO () -> IO (Async.Async ())
watchForJob :: Text -> IO () -> IO (Async ())
watchForJob Text
tableName IO ()
handleJob = do
    Query -> () -> IO Int64
forall q.
(?modelContext::ModelContext, ToRow q, Show q) =>
Query -> q -> IO Int64
sqlExec (ByteString -> Query
PG.Query (ByteString -> Query) -> ByteString -> Query
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
forall a b. ConvertibleStrings a b => a -> b
cs (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ Text -> Text
createNotificationTrigger Text
tableName) ()

    let listenStatement :: Query
listenStatement = Query
"LISTEN " Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> ByteString -> Query
PG.Query (Text -> ByteString
forall a b. ConvertibleStrings a b => a -> b
cs (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ Text -> Text
eventName Text
tableName)
    IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
Async.asyncBound do
        IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
            Notification
notification <- (Connection -> IO Notification) -> IO Notification
forall a.
(?modelContext::ModelContext) =>
(Connection -> IO a) -> IO a
withDatabaseConnection \Connection
databaseConnection -> do
                Connection -> Query -> () -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PG.execute Connection
databaseConnection Query
listenStatement ()
                Connection -> IO Notification
PG.getNotification Connection
databaseConnection

            IO ()
handleJob

createNotificationTrigger :: Text -> Text
createNotificationTrigger :: Text -> Text
createNotificationTrigger Text
tableName = Text
"CREATE OR REPLACE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
functionName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"() RETURNS TRIGGER AS $$"
        Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"BEGIN\n"
        Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"    PERFORM pg_notify('" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
eventName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"', '');\n"
        Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"    RETURN new;"
        Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"END;\n"
        Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"$$ language plpgsql;"
        Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
insertTriggerName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"; CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
insertTriggerName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" AFTER INSERT ON \"" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"\" FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
functionName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();\n"
        Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
updateTriggerName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"; CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
updateTriggerName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" AFTER UPDATE ON \"" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"\" FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
functionName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();\n"
    where
        functionName :: Text
functionName = Text
"notify_job_queued_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName
        insertTriggerName :: Text
insertTriggerName = Text
"did_insert_job_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName
        updateTriggerName :: Text
updateTriggerName = Text
"did_update_job_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName

-- | Retuns the event name of the event that the pg notify trigger dispatches
eventName :: Text -> Text
eventName :: Text -> Text
eventName Text
tableName = Text
"job_available_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName

-- | Called when a job failed. Sets the job status to 'JobStatusFailed' or 'JobStatusRetry' (if more attempts are possible) and resets 'lockedBy'
jobDidFail :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    ) => job -> SomeException -> IO ()
jobDidFail :: job -> SomeException -> IO ()
jobDidFail job
job SomeException
exception = do
    UTCTime
updatedAt <- IO UTCTime
getCurrentTime

    Text -> IO ()
putStrLn (Text
"Failed job with exception: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SomeException -> Text
forall a. Show a => a -> Text
tshow SomeException
exception)

    let ?job = job
    let canRetry :: Bool
canRetry = Proxy "attemptsCount" -> job -> Int
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get IsLabel "attemptsCount" (Proxy "attemptsCount")
Proxy "attemptsCount"
#attemptsCount job
job Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
forall job. (Job job, ?job::job) => Int
maxAttempts
    let status :: JobStatus
status = if Bool
canRetry then JobStatus
JobStatusRetry else JobStatus
JobStatusFailed
    job
job
        job -> (job -> job) -> job
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "status" -> JobStatus -> job -> job
forall model (name :: Symbol) value.
(KnownSymbol name, SetField name model value) =>
Proxy name -> value -> model -> model
set IsLabel "status" (Proxy "status")
Proxy "status"
#status JobStatus
status
        job -> (job -> job) -> job
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "lockedBy" -> Maybe UUID -> job -> job
forall model (name :: Symbol) value.
(KnownSymbol name, SetField name model value) =>
Proxy name -> value -> model -> model
set IsLabel "lockedBy" (Proxy "lockedBy")
Proxy "lockedBy"
#lockedBy Maybe UUID
forall a. Maybe a
Nothing
        job -> (job -> job) -> job
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "updatedAt" -> UTCTime -> job -> job
forall model (name :: Symbol) value.
(KnownSymbol name, SetField name model value) =>
Proxy name -> value -> model -> model
set IsLabel "updatedAt" (Proxy "updatedAt")
Proxy "updatedAt"
#updatedAt UTCTime
updatedAt
        job -> (job -> job) -> job
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "lastError" -> Maybe Text -> job -> job
forall model (name :: Symbol) value.
(KnownSymbol name, SetField name model value) =>
Proxy name -> value -> model -> model
set IsLabel "lastError" (Proxy "lastError")
Proxy "lastError"
#lastError (Text -> Maybe Text
forall a. a -> Maybe a
Just (SomeException -> Text
forall a. Show a => a -> Text
tshow SomeException
exception))
        job -> (job -> IO job) -> IO job
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> job -> IO job
forall a. (CanUpdate a, ?modelContext::ModelContext) => a -> IO a
updateRecord

    () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-- | Called when a job succeeded. Sets the job status to 'JobStatusSucceded' and resets 'lockedBy'
jobDidSucceed :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , ?modelContext :: ModelContext
    ) => job -> IO ()
jobDidSucceed :: job -> IO ()
jobDidSucceed job
job = do
    Text -> IO ()
putStrLn Text
"Succeeded job"
    UTCTime
updatedAt <- IO UTCTime
getCurrentTime
    job
job
        job -> (job -> job) -> job
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "status" -> JobStatus -> job -> job
forall model (name :: Symbol) value.
(KnownSymbol name, SetField name model value) =>
Proxy name -> value -> model -> model
set IsLabel "status" (Proxy "status")
Proxy "status"
#status JobStatus
JobStatusSucceeded
        job -> (job -> job) -> job
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "lockedBy" -> Maybe UUID -> job -> job
forall model (name :: Symbol) value.
(KnownSymbol name, SetField name model value) =>
Proxy name -> value -> model -> model
set IsLabel "lockedBy" (Proxy "lockedBy")
Proxy "lockedBy"
#lockedBy Maybe UUID
forall a. Maybe a
Nothing
        job -> (job -> job) -> job
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "updatedAt" -> UTCTime -> job -> job
forall model (name :: Symbol) value.
(KnownSymbol name, SetField name model value) =>
Proxy name -> value -> model -> model
set IsLabel "updatedAt" (Proxy "updatedAt")
Proxy "updatedAt"
#updatedAt UTCTime
updatedAt
        job -> (job -> IO job) -> IO job
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> job -> IO job
forall a. (CanUpdate a, ?modelContext::ModelContext) => a -> IO a
updateRecord

    () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-- | Mapping for @JOB_STATUS@:
--
-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry');
instance PG.FromField JobStatus where
    fromField :: FieldParser JobStatus
fromField Field
field (Just ByteString
"job_status_not_started") = JobStatus -> Conversion JobStatus
forall (f :: * -> *) a. Applicative f => a -> f a
pure JobStatus
JobStatusNotStarted
    fromField Field
field (Just ByteString
"job_status_running") = JobStatus -> Conversion JobStatus
forall (f :: * -> *) a. Applicative f => a -> f a
pure JobStatus
JobStatusRunning
    fromField Field
field (Just ByteString
"job_status_failed") = JobStatus -> Conversion JobStatus
forall (f :: * -> *) a. Applicative f => a -> f a
pure JobStatus
JobStatusFailed
    fromField Field
field (Just ByteString
"job_status_succeeded") = JobStatus -> Conversion JobStatus
forall (f :: * -> *) a. Applicative f => a -> f a
pure JobStatus
JobStatusSucceeded
    fromField Field
field (Just ByteString
"job_status_retry") = JobStatus -> Conversion JobStatus
forall (f :: * -> *) a. Applicative f => a -> f a
pure JobStatus
JobStatusRetry
    fromField Field
field (Just ByteString
value) = (String -> Maybe Oid -> String -> String -> String -> ResultError)
-> Field -> String -> Conversion JobStatus
forall a err.
(Typeable a, Exception err) =>
(String -> Maybe Oid -> String -> String -> String -> err)
-> Field -> String -> Conversion a
PG.returnError String -> Maybe Oid -> String -> String -> String -> ResultError
PG.ConversionFailed Field
field (String
"Unexpected value for enum value. Got: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> ByteString -> String
forall a b. ConvertibleStrings a b => a -> b
cs ByteString
value)
    fromField Field
field Maybe ByteString
Nothing = (String -> Maybe Oid -> String -> String -> String -> ResultError)
-> Field -> String -> Conversion JobStatus
forall a err.
(Typeable a, Exception err) =>
(String -> Maybe Oid -> String -> String -> String -> err)
-> Field -> String -> Conversion a
PG.returnError String -> Maybe Oid -> String -> String -> String -> ResultError
PG.UnexpectedNull Field
field String
"Unexpected null for enum value"

-- The default state is @not started@
instance Default JobStatus where
    def :: JobStatus
def = JobStatus
JobStatusNotStarted

-- | Mapping for @JOB_STATUS@:
--
-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry');
instance PG.ToField JobStatus where
    toField :: JobStatus -> Action
toField JobStatus
JobStatusNotStarted = Text -> Action
forall a. ToField a => a -> Action
PG.toField (Text
"job_status_not_started" :: Text)
    toField JobStatus
JobStatusRunning = Text -> Action
forall a. ToField a => a -> Action
PG.toField (Text
"job_status_running" :: Text)
    toField JobStatus
JobStatusFailed = Text -> Action
forall a. ToField a => a -> Action
PG.toField (Text
"job_status_failed" :: Text)
    toField JobStatus
JobStatusSucceeded = Text -> Action
forall a. ToField a => a -> Action
PG.toField (Text
"job_status_succeeded" :: Text)
    toField JobStatus
JobStatusRetry = Text -> Action
forall a. ToField a => a -> Action
PG.toField (Text
"job_status_retry" :: Text)

instance InputValue JobStatus where
    inputValue :: JobStatus -> Text
inputValue JobStatus
JobStatusNotStarted = Text
"job_status_not_started" :: Text
    inputValue JobStatus
JobStatusRunning = Text
"job_status_running" :: Text
    inputValue JobStatus
JobStatusFailed = Text
"job_status_failed" :: Text
    inputValue JobStatus
JobStatusSucceeded = Text
"job_status_succeeded" :: Text
    inputValue JobStatus
JobStatusRetry = Text
"job_status_retry" :: Text

instance IHP.Controller.Param.ParamReader JobStatus where
    readParameter :: ByteString -> Either ByteString JobStatus
readParameter = ByteString -> Either ByteString JobStatus
forall parameter.
(Enum parameter, InputValue parameter) =>
ByteString -> Either ByteString parameter
IHP.Controller.Param.enumParamReader