module IHP.Job.Queue.Watch
( watchForJob
, watchForJobWithPollerTriggerRepair
, pollForJob
, notificationTriggersHealthy
, ensureNotificationTriggers
, createNotificationTriggerSQL
, channelName
) where

import IHP.Prelude
import IHP.Job.Queue.Pool (runPool)
import IHP.Job.Queue.Fetch (pendingJobConditionSQL)
import IHP.Job.Queue.STM (tryWriteTBQueue)
import IHP.Job.Types (JobWorkerProcessMessage (..))
import qualified IHP.PGListener as PGListener
import qualified IHP.Log as Log
import Control.Monad.Trans.Resource
import qualified Control.Exception.Safe as Exception
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Random as Random
import qualified Hasql.Pool as HasqlPool
import qualified Hasql.Session as HasqlSession
import qualified Hasql.Connection as HasqlConnection
import qualified Hasql.Statement as Hasql
import qualified Hasql.Encoders as Encoders
import qualified Hasql.Decoders as Decoders
import Control.Concurrent.STM (TBQueue, atomically)
import Data.Functor.Contravariant (contramap)

-- | Starts watching a job queue table and signals the job workers whenever a job becomes available.
--
-- This is 'watchForJobWithPollerTriggerRepair' with the poller-side trigger repair switched off:
-- the notification triggers are installed once up front and again whenever the 'PGListener.PGListener'
-- reconnects, but the background poller does not verify them on every tick.
--
-- Arguments, in order:
--
-- * the connection pool used to install the triggers and to poll the table
-- * the 'PGListener.PGListener' used to subscribe to the table's notification channel (see 'channelName')
-- * the name of the job queue table
-- * the poll interval in microseconds (the poller adds a random jitter on top, see 'pollForJob')
-- * the bounded queue that receives a 'JobAvailable' message for every detected job
--
-- Returns the notification subscription together with the 'ReleaseKey' of the background poller.
-- The poller is registered in the surrounding 'ResourceT', so it is cancelled when that scope is released.
watchForJob :: (?context :: context, HasField "logger" context Log.Logger) => HasqlPool.Pool -> PGListener.PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (PGListener.Subscription, ReleaseKey)
watchForJob pool pgListener tableName pollInterval onNewJob =
    watchForJobWithPollerTriggerRepair pollerTriggerRepair pool pgListener tableName pollInterval onNewJob
    where
        -- The plain watcher never asks the poller to re-check the triggers
        pollerTriggerRepair = False

-- | Like 'watchForJob' but allows enabling a poller-side trigger integrity check.
-- Useful in development to recover from missing triggers after @make db@.
--
-- The function performs these steps in order:
--
-- 1. Runs the script from 'createNotificationTriggerSQL' through the pool.
-- 2. Registers a reconnect hook on the 'PGListener.PGListener' that runs the same script again.
--    A failure inside the hook is only logged as a warning.
-- 3. Starts the background poller via 'pollForJob', forwarding the repair flag.
-- 4. Subscribes to the table's notification channel. Every notification tries to enqueue one
--    'JobAvailable' message; when the bounded queue is full the message is dropped and a warning is logged.
--
-- Returns the subscription together with the 'ReleaseKey' of the poller.
watchForJobWithPollerTriggerRepair :: (?context :: context, HasField "logger" context Log.Logger) => Bool -> HasqlPool.Pool -> PGListener.PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (PGListener.Subscription, ReleaseKey)
watchForJobWithPollerTriggerRepair enablePollerTriggerRepair pool pgListener tableName pollInterval onNewJob = do
    liftIO do
        runPool pool installTriggers

        -- Recreate notification triggers when PGListener reconnects (e.g. after `make db` drops the database)
        PGListener.onReconnect reinstallTriggers pgListener

    poller <- pollForJob enablePollerTriggerRepair pool tableName pollInterval onNewJob
    subscription <- liftIO (PGListener.subscribe (channelName tableNameBS) (const signalJobAvailable) pgListener)

    pure (subscription, poller)
    where
        tableNameBS :: ByteString
        tableNameBS = cs tableName

        -- Session that (re)creates the notify function and both triggers
        installTriggers = HasqlSession.script (createNotificationTriggerSQL tableNameBS)

        -- Reconnect hook: runs on the connection handed over by the listener, never throws on SQL errors
        reinstallTriggers connection = do
            outcome <- HasqlConnection.use connection installTriggers
            case outcome of
                Left err -> Log.warn ("Failed to recreate notification triggers for " <> tableName <> ": " <> tshow err <> ". Falling back to poller.")
                Right _ -> Log.info ("Recreated notification triggers for " <> tableName)

        -- Notification callback: the payload is ignored, only the fact that something arrived matters
        signalJobAvailable = do
            Log.debug ("Received pg_notify for " <> tableName)
            didWrite <- atomically (tryWriteTBQueue onNewJob JobAvailable)
            unless didWrite (Log.warn ("Job queue full for " <> tableName))

-- | Periodically checks the queue table for open jobs and signals the job workers if there are any.
--
-- 'watchForJob' only catches jobs when something is changed on the table. When a job is scheduled
-- with a 'runAt' in the future, and no other operation is happening on the queue, the database triggers
-- will not run, and so 'watchForJob' cannot pick up the job even when 'runAt' is now in the past.
--
-- On every tick the poller:
--
-- 1. optionally (first argument) calls 'ensureNotificationTriggers' to repair missing triggers,
-- 2. counts the rows matching 'pendingJobConditionSQL',
-- 3. tries to enqueue one 'JobAvailable' message per counted row (messages that do not fit into the
--    bounded queue are silently dropped),
-- 4. sleeps for the poll interval (microseconds) plus a random jitter.
--
-- Synchronous exceptions raised during steps 1-3 are logged and do not stop the loop.
--
-- The loop runs in a background thread that is registered in the surrounding 'ResourceT'. The returned
-- 'ReleaseKey' identifies that registration; releasing it cancels the poller.
pollForJob :: (?context :: context, HasField "logger" context Log.Logger) => Bool -> HasqlPool.Pool -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO ReleaseKey
pollForJob enablePollerTriggerRepair pool tableName pollInterval onNewJob = do
    (releaseKey, _) <- allocate (Async.async pollLoop) Async.cancel
    pure releaseKey
    where
        countSQL :: Text
        countSQL = "SELECT COUNT(*) FROM " <> tableName <> " WHERE " <> pendingJobConditionSQL

        countDecoder = Decoders.singleRow (Decoders.column (Decoders.nonNullable Decoders.int8))

        countStatement = Hasql.unpreparable countSQL Encoders.noParams countDecoder

        pollLoop = forever do
            outcome <- Exception.tryAny pollOnce
            case outcome of
                Left exception -> Log.error ("Job poller: " <> tshow exception)
                Right _ -> pure ()

            -- Add up to 2 seconds of jitter to avoid all job queues polling at the same time
            jitter <- Random.randomRIO (0, 2000000)
            Concurrent.threadDelay (pollInterval + jitter)

        pollOnce = do
            when enablePollerTriggerRepair (ensureNotificationTriggers pool tableName)

            pendingJobs :: Int <- fromIntegral <$> runPool pool (HasqlSession.statement () countStatement)

            -- For every job we send one signal to the job workers
            -- This way we use full concurrency when we find multiple jobs
            -- that haven't been picked up by the PGListener
            forEach [1..pendingJobs] \_ -> do
                _ <- atomically (tryWriteTBQueue onNewJob JobAvailable)
                pure ()

-- | Checks whether both job notification triggers are present on the given table.
--
-- Looks up the non-internal triggers named @did_insert_job_\<table\>@ and @did_update_job_\<table\>@
-- on the relation with the given name in the current schema. Returns 'True' only when exactly
-- two matching triggers are found.
notificationTriggersHealthy :: HasqlPool.Pool -> Text -> IO Bool
notificationTriggersHealthy pool tableName = do
    triggerCount :: Int <- fromIntegral <$> runPool pool (HasqlSession.statement queryParams statement)
    pure (triggerCount == 2)
    where
        -- (relation name, insert trigger name, update trigger name) bound to $1, $2, $3
        queryParams = (tableName, "did_insert_job_" <> tableName, "did_update_job_" <> tableName)

        statement = Hasql.unpreparable sql encoder decoder

        sql :: Text
        sql = "SELECT COUNT(*) FROM pg_trigger t"
            <> " JOIN pg_class c ON t.tgrelid = c.oid"
            <> " JOIN pg_namespace n ON c.relnamespace = n.oid"
            <> " WHERE n.nspname = current_schema()"
            <> " AND c.relname = $1::name"
            <> " AND NOT t.tgisinternal"
            <> " AND (t.tgname = $2::name OR t.tgname = $3::name)"

        -- A single non-nullable text parameter; the order of the parts below defines $1..$3
        textParam = Encoders.param (Encoders.nonNullable Encoders.text)

        encoder =
            contramap (\(relationName, _, _) -> relationName) textParam
            <> contramap (\(_, insertTrigger, _) -> insertTrigger) textParam
            <> contramap (\(_, _, updateTrigger) -> updateTrigger) textParam

        decoder = Decoders.singleRow (Decoders.column (Decoders.nonNullable Decoders.int8))

-- | Recreates the job notification triggers of a table when they are missing.
--
-- Asks 'notificationTriggersHealthy' first. When the triggers are in place nothing happens.
-- Otherwise a warning is logged, the script from 'createNotificationTriggerSQL' is run through
-- the pool, and the successful recreation is logged.
ensureNotificationTriggers :: (?context :: context, HasField "logger" context Log.Logger) => HasqlPool.Pool -> Text -> IO ()
ensureNotificationTriggers pool tableName = do
    healthy <- notificationTriggersHealthy pool tableName
    case healthy of
        True -> pure ()
        False -> do
            Log.warn ("Job poller: Missing notification triggers for " <> tableName <> " (" <> insertTriggerName <> ", " <> updateTriggerName <> "). Recreating.")
            runPool pool (HasqlSession.script (createNotificationTriggerSQL (cs tableName)))
            Log.info ("Job poller: Recreated notification triggers for " <> tableName)
    where
        -- Only used for the log message; the SQL script derives the same names on its own
        insertTriggerName = "did_insert_job_" <> tableName
        updateTriggerName = "did_update_job_" <> tableName

-- | Returns a SQL script to create the notification trigger.
--
-- The script (re)creates the function @notify_job_queued_\<table\>@, which sends an empty
-- notification on the channel given by 'channelName', and the two row-level triggers
-- @did_insert_job_\<table\>@ and @did_update_job_\<table\>@ that call it whenever a row is
-- inserted or updated with status @job_status_not_started@ or @job_status_retry@.
--
-- Wrapped in a DO $$ block with EXCEPTION handler because concurrent requests
-- can race to CREATE OR REPLACE the same function, causing PostgreSQL to throw
-- 'tuple concurrently updated' (SQLSTATE XX000). This is safe to ignore: the
-- other connection's CREATE OR REPLACE will have succeeded.
--
-- The table name is written as a quoted identifier in both the @DROP TRIGGER@ and the
-- @CREATE TRIGGER@ statement, so the two always refer to the same relation. The argument
-- is spliced into the script as-is and must therefore be a trusted identifier.
createNotificationTriggerSQL :: ByteString -> Text
createNotificationTriggerSQL tableName =
        cs $
        "DO $$\n"
        <> "BEGIN\n"
        <> "    CREATE OR REPLACE FUNCTION " <> functionName <> "() RETURNS TRIGGER AS $BODY$"
            <> "BEGIN\n"
            <> "    PERFORM pg_notify('" <> channelName tableName <> "', '');\n"
            <> "    RETURN new;"
            <> "\nEND;\n"
            <> "$BODY$ language plpgsql;\n"
        <> recreateTriggerSQL insertTriggerName "INSERT"
        <> recreateTriggerSQL updateTriggerName "UPDATE"
        <> "EXCEPTION\n"
        <> "    WHEN SQLSTATE 'XX000' THEN null; -- 'tuple concurrently updated': another connection installed it first\n"
        <> "END; $$"
    where
        functionName :: ByteString
        functionName = "notify_job_queued_" <> tableName

        insertTriggerName :: ByteString
        insertTriggerName = "did_insert_job_" <> tableName

        updateTriggerName :: ByteString
        updateTriggerName = "did_update_job_" <> tableName

        -- Quoted once and reused, so DROP and CREATE cannot disagree about the target relation
        quotedTableName :: ByteString
        quotedTableName = "\"" <> tableName <> "\""

        -- Drops a possibly existing trigger and creates it again for the given event (INSERT or UPDATE)
        recreateTriggerSQL :: ByteString -> ByteString -> ByteString
        recreateTriggerSQL triggerName event =
            "    DROP TRIGGER IF EXISTS " <> triggerName <> " ON " <> quotedTableName <> ";\n"
            <> "    CREATE TRIGGER " <> triggerName <> " AFTER " <> event <> " ON " <> quotedTableName <> " FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n"

-- | Returns the name of the notification channel that the pg_notify trigger of a job table dispatches to.
--
-- The channel name is the table name prefixed with @job_available_@.
channelName :: ByteString -> ByteString
channelName tableName = channelPrefix <> tableName
    where
        channelPrefix = "job_available_"