IHP Api Reference
Copyright(c) digitally induced GmbH 2020
Safe HaskellSafe-Inferred

IHP.Job.Queue

Description

 
Synopsis

Documentation

fetchNextJob :: forall job. (?modelContext :: ModelContext, job ~ GetModelByTableName (GetTableName job), FilterPrimaryKey (GetTableName job), FromRow job, Show (PrimaryKey (GetTableName job)), FromField (PrimaryKey (GetTableName job)), Table job) => UUID -> IO (Maybe job) Source #

Lock and fetch the next available job. In case no job is available returns Nothing.

The lock is set on the job row in an atomic way.

The job status is set to JobStatusRunning, lockedBy will be set to the worker id and the attemptsCount is incremented.

Example: Locking a SendMailJob

let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04"
job <- fetchNextJob @SendMailJob workerId

After you're done with the job, call jobDidFail or jobDidSucceed to make it available to the queue again.

watchForJob :: (?modelContext :: ModelContext) => PGListener -> Text -> Int -> MVar JobWorkerProcessMessage -> IO (Subscription, Async ()) Source #

Calls a callback every time something is inserted, updated or deleted in a given database table.

In the background this function creates a database trigger to notify this function about table changes using pg_notify. When there are existing triggers, it will silently recreate them. So this will most likely not fail.

This function returns a Async. Call cancel on the async to stop watching the database.

Example:

watchInsertOrUpdateTable "projects" do
    putStrLn "Something changed in the projects table"

Now insert something into the projects table. E.g. by running make psql and then running INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project'); You will see that "Something changed in the projects table" is printed onto the screen.

pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> MVar JobWorkerProcessMessage -> IO (Async ()) Source #

Periodically checks the queue table for open jobs. Calls the callback if there are any.

watchForJob only catches jobs when something is changed on the table. When a job is scheduled with a runAt in the future, and no other operation is happening on the queue, the database triggers will not run, and so watchForJob cannot pick up the job even when runAt is now in the past.

This function returns a Async. Call cancel on the async to stop polling the database.

channelName :: ByteString -> ByteString Source #

Retuns the event name of the event that the pg notify trigger dispatches

jobDidFail :: forall job. (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext) => job -> SomeException -> IO () Source #

Called when a job failed. Sets the job status to JobStatusFailed or JobStatusRetry (if more attempts are possible) and resets lockedBy

jobDidTimeout :: forall job. (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext) => job -> IO () Source #

jobDidSucceed :: forall job. (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext) => job -> IO () Source #

Called when a job succeeded. Sets the job status to JobStatusSucceded and resets lockedBy

Orphan instances

Default JobStatus Source # 
Instance details

Methods

def :: JobStatus #

ParamReader JobStatus Source # 
Instance details

InputValue JobStatus Source # 
Instance details

FromField JobStatus Source #

Mapping for JOB_STATUS:

CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry');
Instance details

Methods

fromField :: FieldParser JobStatus

ToField JobStatus Source #

Mapping for JOB_STATUS:

CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry');
Instance details

Methods

toField :: JobStatus -> Action