Copyright | (c) digitally induced GmbH 2020 |
---|---|
Safe Haskell | None |
Synopsis
- fetchNextJob :: (?modelContext :: ModelContext, job ~ GetModelByTableName (GetTableName job), FilterPrimaryKey (GetTableName job), FromRow job, Show (PrimaryKey (GetTableName job)), FromField (PrimaryKey (GetTableName job)), Table job) => Maybe Int -> BackoffStrategy -> UUID -> IO (Maybe job)
- watchForJob :: (?modelContext :: ModelContext) => PGListener -> Text -> Int -> Maybe Int -> BackoffStrategy -> MVar JobWorkerProcessMessage -> IO (Subscription, Async ())
- pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Maybe Int -> BackoffStrategy -> MVar JobWorkerProcessMessage -> IO (Async ())
- createNotificationTrigger :: ByteString -> Query
- channelName :: ByteString -> ByteString
- jobDidFail :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> SomeException -> IO ()
- jobDidTimeout :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> IO ()
- jobDidSucceed :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> IO ()
- retryQuery :: BackoffStrategy -> ByteString
- timeoutCondition :: Maybe Int -> ByteString
Documentation
fetchNextJob :: (?modelContext :: ModelContext, job ~ GetModelByTableName (GetTableName job), FilterPrimaryKey (GetTableName job), FromRow job, Show (PrimaryKey (GetTableName job)), FromField (PrimaryKey (GetTableName job)), Table job) => Maybe Int -> BackoffStrategy -> UUID -> IO (Maybe job) Source #
Lock and fetch the next available job. In case no job is available returns Nothing.
The lock is set on the job row in an atomic way.
The job status is set to JobStatusRunning, lockedBy will be set to the worker id and the attemptsCount is incremented.
Example: Locking a SendMailJob
let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04" job <- fetchNextJob @SendMailJob workerId
After you're done with the job, call jobDidFail
or jobDidSucceed
to make it available to the queue again.
watchForJob :: (?modelContext :: ModelContext) => PGListener -> Text -> Int -> Maybe Int -> BackoffStrategy -> MVar JobWorkerProcessMessage -> IO (Subscription, Async ()) Source #
Calls a callback every time something is inserted, updated or deleted in a given database table.
In the background this function creates a database trigger to notify this function about table changes using pg_notify. When there are existing triggers, it will silently recreate them. So this will most likely not fail.
This function returns a Async. Call cancel
on the async to stop watching the database.
Example:
watchInsertOrUpdateTable "projects" do putStrLn "Something changed in the projects table"
Now insert something into the projects
table. E.g. by running make psql
and then running INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');
You will see that "Something changed in the projects table"
is printed onto the screen.
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Maybe Int -> BackoffStrategy -> MVar JobWorkerProcessMessage -> IO (Async ()) Source #
Periodically checks the queue table for open jobs. Calls the callback if there are any.
watchForJob
only catches jobs when something is changed on the table. When a job is scheduled
with a runAt
in the future, and no other operation is happening on the queue, the database triggers
will not run, and so watchForJob
cannot pick up the job even when runAt
is now in the past.
This function returns a Async. Call cancel
on the async to stop polling the database.
createNotificationTrigger :: ByteString -> Query Source #
channelName :: ByteString -> ByteString Source #
Retuns the event name of the event that the pg notify trigger dispatches
jobDidFail :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> SomeException -> IO () Source #
Called when a job failed. Sets the job status to JobStatusFailed
or JobStatusRetry
(if more attempts are possible) and resets lockedBy
jobDidTimeout :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> IO () Source #
jobDidSucceed :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> IO () Source #
Called when a job succeeded. Sets the job status to JobStatusSucceded
and resets lockedBy
timeoutCondition :: Maybe Int -> ByteString Source #
Orphan instances
Default JobStatus Source # | |
ParamReader JobStatus Source # | |
InputValue JobStatus Source # | |
inputValue :: JobStatus -> Text Source # | |
FromField JobStatus Source # | Mapping for CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry'); |
ToField JobStatus Source # | Mapping for CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry'); |