Copyright | (c) digitally induced GmbH 2020 |
---|---|
Safe Haskell | Safe-Inferred |
IHP.Job.Queue
Synopsis
- fetchNextJob :: forall job. (?modelContext :: ModelContext, job ~ GetModelByTableName (GetTableName job), FilterPrimaryKey (GetTableName job), FromRow job, Show (PrimaryKey (GetTableName job)), FromField (PrimaryKey (GetTableName job)), Table job) => UUID -> IO (Maybe job)
- watchForJob :: (?modelContext :: ModelContext) => PGListener -> Text -> Int -> MVar JobWorkerProcessMessage -> IO (Subscription, Async ())
- pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> MVar JobWorkerProcessMessage -> IO (Async ())
- createNotificationTrigger :: ByteString -> Query
- channelName :: ByteString -> ByteString
- jobDidFail :: forall job. (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext) => job -> SomeException -> IO ()
- jobDidTimeout :: forall job. (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext) => job -> IO ()
- jobDidSucceed :: forall job. (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext) => job -> IO ()
Documentation
fetchNextJob :: forall job. (?modelContext :: ModelContext, job ~ GetModelByTableName (GetTableName job), FilterPrimaryKey (GetTableName job), FromRow job, Show (PrimaryKey (GetTableName job)), FromField (PrimaryKey (GetTableName job)), Table job) => UUID -> IO (Maybe job)
Locks and fetches the next available job. Returns Nothing if no job is available.
The lock is set on the job row atomically.
The job status is set to JobStatusRunning, lockedBy is set to the worker id, and attemptsCount is incremented.
Example: Locking a SendMailJob
let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04"
job <- fetchNextJob @SendMailJob workerId
After you're done with the job, call jobDidFail or jobDidSucceed to make it available to the queue again.
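The locking behaviour described above can be modelled in memory. The following is a minimal sketch, not IHP's implementation: the Job record, the String worker ids, the isAvailable rule and lockNextJob are hypothetical stand-ins, and the real function performs the update atomically in the database.

```haskell
-- Stand-in types for illustration; real job records are generated by IHP.
data JobStatus
    = JobStatusNotStarted
    | JobStatusRunning
    | JobStatusFailed
    | JobStatusSucceeded
    | JobStatusRetry
    deriving (Eq, Show)

data Job = Job
    { jobId :: Int
    , status :: JobStatus
    , lockedBy :: Maybe String
    , attemptsCount :: Int
    } deriving (Eq, Show)

-- Assumption: a job is available when it is unlocked and waiting to run.
isAvailable :: Job -> Bool
isAvailable job =
    lockedBy job == Nothing
        && status job `elem` [JobStatusNotStarted, JobStatusRetry]

-- Lock the first available job for the given worker.
-- Returns the locked job (if any) together with the updated queue.
lockNextJob :: String -> [Job] -> (Maybe Job, [Job])
lockNextJob workerId jobs =
    case break isAvailable jobs of
        (_, []) -> (Nothing, jobs)
        (before, job : after) ->
            let locked = job
                    { status = JobStatusRunning
                    , lockedBy = Just workerId
                    , attemptsCount = attemptsCount job + 1
                    }
            in (Just locked, before ++ locked : after)

main :: IO ()
main = do
    let queue =
            [ Job 1 JobStatusRunning (Just "worker-a") 1
            , Job 2 JobStatusNotStarted Nothing 0
            ]
    let (picked, queue') = lockNextJob "worker-b" queue
    print picked
    -- Every job is locked now, so a second worker gets nothing.
    print (fst (lockNextJob "worker-c" queue'))
```

In this model the first call locks job 2 for worker-b, and the second call finds no available job.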
watchForJob :: (?modelContext :: ModelContext) => PGListener -> Text -> Int -> MVar JobWorkerProcessMessage -> IO (Subscription, Async ())
Notifies the job worker, through the given MVar, every time something is inserted, updated or deleted in a given database table.
Internally, this function creates a database trigger that reports table changes via pg_notify. Existing triggers are silently recreated, so this will most likely not fail.
This function returns a Subscription together with an Async. Call cancel on the Async to stop watching the database.
Example:
watchInsertOrUpdateTable "projects" do
    putStrLn "Something changed in the projects table"
Now insert something into the projects table, e.g. by running make psql and then running INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');
You will see that "Something changed in the projects table" is printed onto the screen.
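The signature of watchForJob takes an MVar JobWorkerProcessMessage rather than a callback. The following is a minimal sketch of how such an MVar can hand notifications from a watcher thread to a worker, using only base. WorkerMessage, its JobAvailable constructor, notifyTimes and receiveTimes are hypothetical stand-ins, and no database or PGListener is involved.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (replicateM, replicateM_)

-- Stand-in for JobWorkerProcessMessage; the constructor name is an assumption.
data WorkerMessage = JobAvailable
    deriving (Eq, Show)

-- Stand-in for the database notifications: signal the worker n times.
-- putMVar blocks while the mailbox is full, so no signal is lost.
notifyTimes :: Int -> MVar WorkerMessage -> IO ()
notifyTimes n mailbox = replicateM_ n (putMVar mailbox JobAvailable)

-- Worker side: block until n messages have arrived and return them.
receiveTimes :: Int -> MVar WorkerMessage -> IO [WorkerMessage]
receiveTimes n mailbox = replicateM n (takeMVar mailbox)

main :: IO ()
main = do
    mailbox <- newEmptyMVar
    watcher <- forkIO (notifyTimes 2 mailbox)
    messages <- receiveTimes 2 mailbox
    print messages
    -- Stopping the watcher plays the role of calling cancel on the Async.
    killThread watcher
```

The worker blocks in takeMVar until a notification arrives, so it uses no CPU while the table is idle.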
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> MVar JobWorkerProcessMessage -> IO (Async ())
Periodically checks the queue table for open jobs. Calls the callback if there are any.
watchForJob only catches jobs when something is changed on the table. When a job is scheduled with a runAt in the future, and no other operation is happening on the queue, the database triggers will not run, and so watchForJob cannot pick up the job even when runAt is now in the past.
This function returns an Async. Call cancel on the Async to stop polling the database.
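The runAt scenario above can be illustrated with a small in-memory model. This is a sketch under stated assumptions: the Job record, the use of plain seconds for time, and dueJobs are hypothetical and do not reflect the query IHP actually runs.

```haskell
-- Stand-in job record; times are plain seconds to keep the sketch dependency-free.
data Job = Job
    { jobId :: Int
    , runAt :: Int
    , lockedBy :: Maybe String
    } deriving (Eq, Show)

-- Jobs a poll at time `now` would report: unlocked and already due.
dueJobs :: Int -> [Job] -> [Job]
dueJobs now = filter (\job -> lockedBy job == Nothing && runAt job <= now)

main :: IO ()
main = do
    -- Inserted at t=0 with runAt=100. The insert fires the trigger once,
    -- but at that moment the job is not due yet.
    let queue = [Job 1 100 Nothing]
    print (map jobId (dueJobs 50 queue))   -- prints []
    -- Nothing else touches the table, so no further trigger fires.
    -- Only a periodic poll notices that the job has become due.
    print (map jobId (dueJobs 150 queue))  -- prints [1]
```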
createNotificationTrigger :: ByteString -> Query
channelName :: ByteString -> ByteString
Returns the name of the event that the pg_notify trigger dispatches.
jobDidFail :: forall job. (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext) => job -> SomeException -> IO ()
Called when a job failed. Sets the job status to JobStatusFailed, or to JobStatusRetry if more attempts are possible, and resets lockedBy.
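The choice between JobStatusRetry and JobStatusFailed can be sketched as a pure function. This is a simplified model, not the library's code: the Job record, markFailed, and the explicit maxAttempts argument are assumptions made for illustration, as is the rule that a retry is possible while attemptsCount is below maxAttempts.

```haskell
data JobStatus = JobStatusRunning | JobStatusFailed | JobStatusRetry
    deriving (Eq, Show)

-- Stand-in job record with only the fields the failure path touches.
data Job = Job
    { status :: JobStatus
    , lockedBy :: Maybe String
    , attemptsCount :: Int
    , lastError :: Maybe String
    } deriving (Eq, Show)

-- Record a failure: choose the next status, release the lock, keep the error.
-- maxAttempts is a hypothetical parameter for this sketch.
markFailed :: Int -> String -> Job -> Job
markFailed maxAttempts err job = job
    { status =
        if attemptsCount job < maxAttempts
            then JobStatusRetry
            else JobStatusFailed
    , lockedBy = Nothing
    , lastError = Just err
    }

main :: IO ()
main = do
    let running = Job JobStatusRunning (Just "worker-a") 1 Nothing
    print (status (markFailed 3 "connection refused" running))  -- prints JobStatusRetry
    print (status (markFailed 1 "connection refused" running))  -- prints JobStatusFailed
```

In both cases the lock is released, so the row no longer appears to be owned by a worker.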
jobDidTimeout :: forall job. (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext) => job -> IO ()
jobDidSucceed :: forall job. (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext) => job -> IO ()
Called when a job succeeded. Sets the job status to JobStatusSucceeded and resets lockedBy.
Orphan instances
Instance | Details |
---|---|
Default JobStatus | |
ParamReader JobStatus | Methods: readParameter :: ByteString -> Either ByteString JobStatus; readParameterJSON :: Value -> Either ByteString JobStatus |
InputValue JobStatus | Methods: inputValue :: JobStatus -> Text |
FromField JobStatus | Mapping for CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry'); |
ToField JobStatus | Mapping for CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry'); |
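The FromField and ToField instances map between JobStatus and the labels of the JOB_STATUS enum. The following is a sketch of that mapping as plain functions, without postgresql-simple. toLabel and fromLabel are hypothetical helpers, and the constructor names are inferred from the enum labels above rather than copied from the library.

```haskell
-- Constructor names are assumed to mirror the enum labels.
data JobStatus
    = JobStatusNotStarted
    | JobStatusRunning
    | JobStatusFailed
    | JobStatusSucceeded
    | JobStatusRetry
    deriving (Eq, Show, Enum, Bounded)

-- Haskell value to database label (the ToField direction).
toLabel :: JobStatus -> String
toLabel JobStatusNotStarted = "job_status_not_started"
toLabel JobStatusRunning = "job_status_running"
toLabel JobStatusFailed = "job_status_failed"
toLabel JobStatusSucceeded = "job_status_succeeded"
toLabel JobStatusRetry = "job_status_retry"

-- Database label to Haskell value (the FromField direction).
-- Unknown labels are reported as an error instead of crashing.
fromLabel :: String -> Either String JobStatus
fromLabel label =
    case lookup label [(toLabel s, s) | s <- [minBound .. maxBound]] of
        Just s -> Right s
        Nothing -> Left ("Unknown JOB_STATUS label: " ++ label)

main :: IO ()
main = do
    print (fromLabel "job_status_retry")
    print (fromLabel "job_status_paused")
    -- Every status survives a round trip through its label.
    print (all (\s -> fromLabel (toLabel s) == Right s) [minBound .. maxBound])
```

Deriving the reverse lookup from toLabel keeps the two directions consistent when a status is added.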