| Copyright | (c) digitally induced GmbH 2020 |
|---|---|
| Safe Haskell | None |
| Language | GHC2021 |
IHP.Job.Queue
Synopsis
- fetchNextJob :: (?modelContext :: ModelContext, job ~ GetModelByTableName (GetTableName job), FromRowHasql job, Show (PrimaryKey (GetTableName job)), Table job) => UUID -> IO (Maybe job)
- pendingJobCondition :: Snippet
- watchForJob :: (?modelContext :: ModelContext) => PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (Subscription, ReleaseKey)
- pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO ReleaseKey
- createNotificationTriggerSQL :: ByteString -> Text
- channelName :: ByteString -> ByteString
- jobDidFail :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, SetField "runAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> SomeException -> IO ()
- jobDidTimeout :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, SetField "runAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> IO ()
- jobDidSucceed :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> IO ()
- backoffDelay :: BackoffStrategy -> Int -> NominalDiffTime
- recoverStaleJobs :: (?modelContext :: ModelContext, Table job) => NominalDiffTime -> IO ()
- textToEnumJobStatusMap :: HashMap Text JobStatus
- textToEnumJobStatus :: Text -> Maybe JobStatus
- getHasqlPool :: (?modelContext :: ModelContext) => IO Pool
- tryWriteTBQueue :: TBQueue a -> a -> STM Bool
Documentation
fetchNextJob :: (?modelContext :: ModelContext, job ~ GetModelByTableName (GetTableName job), FromRowHasql job, Show (PrimaryKey (GetTableName job)), Table job) => UUID -> IO (Maybe job) Source #
Lock and fetch the next available job. In case no job is available returns Nothing.
The lock is set on the job row in an atomic way.
The job status is set to JobStatusRunning, lockedBy will be set to the worker id and the attemptsCount is incremented.
Example: Locking a SendMailJob
let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04"
job <- fetchNextJob @SendMailJob workerId
After you're done with the job, call jobDidFail or jobDidSucceed. Both reset lockedBy, which releases the lock on the job row.
pendingJobCondition :: Snippet Source #
Shared WHERE condition for fetching pending jobs. Matches jobs that are either not started or in retry state, not locked, and whose run_at time has passed.
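The three parts of this condition can be illustrated as a pure predicate over a simplified job row. This is only a sketch: JobRowSketch, RowStatusSketch and isPendingSketch are hypothetical names, not part of IHP, and treating "not locked" as an empty lockedBy field is an assumption. The real condition is a SQL Snippet.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..), addUTCTime)

-- Hypothetical, simplified stand-ins for a job row; not IHP's real types.
data RowStatusSketch = RowNotStarted | RowRunning | RowFailed | RowSucceeded | RowRetry
    deriving (Eq, Show)

data JobRowSketch = JobRowSketch
    { status   :: RowStatusSketch
    , lockedBy :: Maybe String   -- Nothing means "not locked" (assumption)
    , runAt    :: UTCTime
    }

-- True only when all three parts of the described condition hold.
isPendingSketch :: UTCTime -> JobRowSketch -> Bool
isPendingSketch now job =
    status job `elem` [RowNotStarted, RowRetry]
        && lockedBy job == Nothing
        && runAt job <= now

-- Fixed reference time so the example is deterministic.
exampleNow :: UTCTime
exampleNow = UTCTime (fromGregorian 2024 1 1) 0

exampleRows :: [JobRowSketch]
exampleRows =
    [ JobRowSketch RowNotStarted Nothing (addUTCTime (-60) exampleNow)      -- due and unlocked
    , JobRowSketch RowRetry Nothing (addUTCTime 3600 exampleNow)            -- runAt still in the future
    , JobRowSketch RowRetry (Just "worker-1") (addUTCTime (-60) exampleNow) -- locked by a worker
    , JobRowSketch RowRunning Nothing (addUTCTime (-60) exampleNow)         -- status does not match
    ]
```

Evaluating map (isPendingSketch exampleNow) exampleRows in GHCi shows that only the first row qualifies.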
watchForJob :: (?modelContext :: ModelContext) => PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (Subscription, ReleaseKey) Source #
Calls a callback every time something is inserted, updated or deleted in a given database table.
In the background this function creates a database trigger to notify this function about table changes using pg_notify. When there are existing triggers, it will silently recreate them. So this will most likely not fail.
This function returns a Subscription together with a ReleaseKey in ResourceT IO. Releasing the key stops watching the database.
Example:
watchInsertOrUpdateTable "projects" do
    putStrLn "Something changed in the projects table"
Now insert something into the projects table, e.g. by running make psql and then running INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');
You will see that "Something changed in the projects table" is printed onto the screen.
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO ReleaseKey Source #
Periodically checks the queue table for open jobs. Calls the callback if there are any.
watchForJob only catches jobs when something is changed on the table. When a job is scheduled
with a runAt in the future, and no other operation is happening on the queue, the database triggers
will not run, and so watchForJob cannot pick up the job even when runAt is now in the past.
This function returns a ReleaseKey in ResourceT IO. Releasing the key stops polling the database.
createNotificationTriggerSQL :: ByteString -> Text Source #
Returns a SQL script to create the notification trigger.
Wrapped in a DO $$ block with EXCEPTION handler because concurrent requests can race to CREATE OR REPLACE the same function, causing PostgreSQL to throw 'tuple concurrently updated' (SQLSTATE XX000). This is safe to ignore: the other connection's CREATE OR REPLACE will have succeeded.
channelName :: ByteString -> ByteString Source #
Returns the name of the event that the pg_notify trigger dispatches.
jobDidFail :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, SetField "runAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> SomeException -> IO () Source #
Called when a job failed. Sets the job status to JobStatusFailed, or to JobStatusRetry if more attempts are possible, and resets lockedBy.
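The choice between retrying and failing can be sketched as a small pure function. This is an illustration under an assumption: "more attempts are possible" is read here as the attempts count still being below a per-job maximum. FailureOutcome, outcomeAfterFailure and the maxAttempts parameter are hypothetical names, not IHP's API.

```haskell
-- Hypothetical stand-in for the two outcomes named in the description.
data FailureOutcome = RetryLater | GiveUp
    deriving (Eq, Show)

-- Assumption: a job may be retried while the attempts made so far
-- are still below the maximum allowed for that job.
outcomeAfterFailure :: Int -> Int -> FailureOutcome
outcomeAfterFailure maxAttempts attemptsCount
    | attemptsCount < maxAttempts = RetryLater
    | otherwise                   = GiveUp
```

With a maximum of 3 attempts, a job that has been attempted once would be retried, and one that has been attempted three times would be given up on.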
jobDidTimeout :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, SetField "runAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> IO () Source #
jobDidSucceed :: (job ~ GetModelByTableName (GetTableName job), SetField "lockedBy" job (Maybe UUID), SetField "status" job JobStatus, SetField "updatedAt" job UTCTime, HasField "attemptsCount" job Int, SetField "lastError" job (Maybe Text), Job job, CanUpdate job, Show job, ?modelContext :: ModelContext, ?context :: context, HasField "logger" context Logger) => job -> IO () Source #
Called when a job succeeded. Sets the job status to JobStatusSucceeded and resets lockedBy.
backoffDelay :: BackoffStrategy -> Int -> NominalDiffTime Source #
Compute the delay before the next retry attempt.
For LinearBackoff, the delay is constant.
For ExponentialBackoff, the delay doubles each attempt, capped at 24 hours.
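A minimal sketch of the two strategies follows. BackoffSketch and its base-delay field are hypothetical stand-ins for BackoffStrategy, whose actual fields are not shown here. Whether the real function starts doubling at attempt 0 or attempt 1 is not stated, so the sketch assumes the base delay applies at attempt 0.

```haskell
import Data.Time.Clock (NominalDiffTime)

-- Hypothetical stand-in for BackoffStrategy; the Int field is an assumed
-- base delay in seconds.
data BackoffSketch
    = LinearSketch Int
    | ExponentialSketch Int
    deriving (Eq, Show)

-- Upper bound for the exponential delay: 24 hours, in seconds.
maxDelaySeconds :: Integer
maxDelaySeconds = 24 * 60 * 60

-- Linear: always the base delay. Exponential: base * 2^attempts, capped.
-- Integer arithmetic avoids overflow for large attempt counts.
backoffDelaySketch :: BackoffSketch -> Int -> NominalDiffTime
backoffDelaySketch (LinearSketch base) _ = fromIntegral base
backoffDelaySketch (ExponentialSketch base) attempts =
    fromInteger (min maxDelaySeconds (toInteger base * 2 ^ max 0 attempts))
```

Under these assumptions, a base delay of 30 seconds gives 30, 60, 120, 240 seconds for attempts 0 to 3, and the cap of 86400 seconds is reached from attempt 12 onward.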
recoverStaleJobs :: (?modelContext :: ModelContext, Table job) => NominalDiffTime -> IO () Source #
Recover stale jobs that have been in JobStatusRunning for too long,
likely due to a worker crash.
Two-tier recovery:
- Recently stale jobs (within 24h) are set back to retry
- Ancient stale jobs (older than 24h) are marked as failed
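The two tiers can be sketched as a pure classification over how long a job has been in the running state. RecoverySketch and classifyRunningJob are hypothetical names. Which timestamp the real function uses to measure that age, and how it treats the exact 24h boundary, are assumptions here.

```haskell
import Data.Time.Clock (NominalDiffTime)

-- Hypothetical outcome type for the sketch.
data RecoverySketch = LeaveRunning | BackToRetry | MarkFailed
    deriving (Eq, Show)

-- Boundary between "recently stale" and "ancient": 24 hours, in seconds.
ancientAfter :: NominalDiffTime
ancientAfter = 24 * 60 * 60

-- First argument: the staleness threshold passed by the caller.
-- Second argument: how long the job has been running (assumed measure).
classifyRunningJob :: NominalDiffTime -> NominalDiffTime -> RecoverySketch
classifyRunningJob staleAfter age
    | age < staleAfter    = LeaveRunning
    | age <= ancientAfter = BackToRetry
    | otherwise           = MarkFailed
```

With a 10 minute threshold, a job running for one minute is left alone, one running for an hour goes back to retry, and one running for two days is marked as failed.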
textToEnumJobStatusMap :: HashMap Text JobStatus Source #
Lookup table for parsing a Text value to a JobStatus. Used by hasql decoders. Uses a HashMap for O(1) lookup.
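The table-plus-lookup approach can be sketched as follows, using the JOB_STATUS enum labels listed under the FromField orphan instance. To stay within the libraries bundled with GHC, this sketch swaps the HashMap for Data.Map from containers, which has O(log n) lookup instead. JobStatusSketch, statusByLabel and parseStatusSketch are hypothetical names.

```haskell
import qualified Data.Map.Strict as Map
import Data.Text (Text)
import qualified Data.Text as Text

-- Hypothetical stand-in for JobStatus.
data JobStatusSketch
    = SketchNotStarted
    | SketchRunning
    | SketchFailed
    | SketchSucceeded
    | SketchRetry
    deriving (Eq, Show)

-- Lookup table from the database enum label to the Haskell value.
statusByLabel :: Map.Map Text JobStatusSketch
statusByLabel = Map.fromList
    [ (Text.pack "job_status_not_started", SketchNotStarted)
    , (Text.pack "job_status_running", SketchRunning)
    , (Text.pack "job_status_failed", SketchFailed)
    , (Text.pack "job_status_succeeded", SketchSucceeded)
    , (Text.pack "job_status_retry", SketchRetry)
    ]

-- Unknown labels yield Nothing instead of raising an error.
parseStatusSketch :: Text -> Maybe JobStatusSketch
parseStatusSketch label = Map.lookup label statusByLabel
```

Building the table once as a top-level value means each parse is a single lookup rather than a chain of string comparisons.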
getHasqlPool :: (?modelContext :: ModelContext) => IO Pool Source #
tryWriteTBQueue :: TBQueue a -> a -> STM Bool Source #
Non-blocking write to a TBQueue. Returns True if the value was written, False if the queue was full.
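One way to get this behaviour with the stm package is to check isFullTBQueue before writing, inside the same STM transaction. The sketch below is not necessarily how this module implements it. tryWriteSketch and demoTryWrite are hypothetical names.

```haskell
import Control.Concurrent.STM
    ( STM, TBQueue, atomically, isFullTBQueue, newTBQueueIO, writeTBQueue )

-- Writes the value only if there is room; never blocks or retries.
tryWriteSketch :: TBQueue a -> a -> STM Bool
tryWriteSketch queue value = do
    full <- isFullTBQueue queue
    if full
        then pure False
        else do
            writeTBQueue queue value
            pure True

-- Attempts three writes to a queue with capacity 2 and collects the results.
demoTryWrite :: IO [Bool]
demoTryWrite = do
    queue <- newTBQueueIO 2
    mapM (atomically . tryWriteSketch queue) [1, 2, 3 :: Int]
```

Because the fullness check and the write run in one transaction, no other thread can fill the queue between the two steps. In the demo, the third write finds the queue full and returns False.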
Orphan instances
| Instance | Description |
|---|---|
| Default JobStatus Source # | |
| DefaultParamEncoder JobStatus Source # | DefaultParamEncoder for hasql queries using JobStatus in filterWhere |
| InputValue JobStatus Source # | Methods: inputValue :: JobStatus -> Text Source # |
| FromField JobStatus Source # | Mapping for CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry'); These instances are needed by the generated |
| ToField JobStatus Source # | See |
| ParamReader JobStatus Source # | Methods: readParameter :: ByteString -> Either ByteString JobStatus # |