module IHP.Job.Queue.Watch
( watchForJob
, watchForJobWithPollerTriggerRepair
, pollForJob
, notificationTriggersHealthy
, ensureNotificationTriggers
, createNotificationTriggerSQL
, channelName
) where
import IHP.Prelude
import IHP.Job.Queue.Pool (runPool)
import IHP.Job.Queue.Fetch (pendingJobConditionSQL)
import IHP.Job.Queue.STM (tryWriteTBQueue)
import IHP.Job.Types (JobWorkerProcessMessage (..))
import qualified IHP.PGListener as PGListener
import qualified IHP.Log as Log
import Control.Monad.Trans.Resource
import qualified Control.Exception.Safe as Exception
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Random as Random
import qualified Hasql.Pool as HasqlPool
import qualified Hasql.Session as HasqlSession
import qualified Hasql.Connection as HasqlConnection
import qualified Hasql.Statement as Hasql
import qualified Hasql.Encoders as Encoders
import qualified Hasql.Decoders as Decoders
import Control.Concurrent.STM (TBQueue, atomically)
import Data.Functor.Contravariant (contramap)
-- | Starts watching @tableName@ for new jobs with the poller's trigger repair switched off.
--
-- This is 'watchForJobWithPollerTriggerRepair' called with @False@ as its first
-- argument; every other argument is passed through unchanged, and the result
-- (listener subscription plus the poller's 'ReleaseKey') is returned as-is.
watchForJob :: (?context :: context, HasField "logger" context Log.Logger) => HasqlPool.Pool -> PGListener.PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (PGListener.Subscription, ReleaseKey)
watchForJob pool pgListener tableName pollInterval onNewJob =
    watchForJobWithPollerTriggerRepair False pool pgListener tableName pollInterval onNewJob
-- | Sets up both delivery paths for "a job is available" signals on @tableName@.
--
-- In order, this:
--
-- 1. Runs the script produced by 'createNotificationTriggerSQL' through the pool.
-- 2. Registers a 'PGListener.onReconnect' callback that re-runs the same script on
--    the connection handed to the callback. A failure there is only logged as a
--    warning; a success is logged at info level.
-- 3. Starts the poller via 'pollForJob', forwarding @enablePollerTriggerRepair@,
--    @pool@, @tableName@, @pollInterval@ and @onNewJob@.
-- 4. Subscribes to the channel named by 'channelName'. Each notification attempts to
--    enqueue one 'JobAvailable' on @onNewJob@.
--
-- Returns the listener subscription together with the poller's 'ReleaseKey'.
watchForJobWithPollerTriggerRepair :: (?context :: context, HasField "logger" context Log.Logger) => Bool -> HasqlPool.Pool -> PGListener.PGListener -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO (PGListener.Subscription, ReleaseKey)
watchForJobWithPollerTriggerRepair enablePollerTriggerRepair pool pgListener tableName pollInterval onNewJob = do
    let tableNameBS = cs tableName :: ByteString
    -- The same session is used for the initial install and for every reconnect.
    let installTriggers = HasqlSession.script (createNotificationTriggerSQL tableNameBS)

    let reinstallOnReconnect connection = do
            result <- HasqlConnection.use connection installTriggers
            case result of
                Left err -> Log.warn ("Failed to recreate notification triggers for " <> tableName <> ": " <> tshow err <> ". Falling back to poller.")
                Right _ -> Log.info ("Recreated notification triggers for " <> tableName)

    let onNotification = do
            Log.debug ("Received pg_notify for " <> tableName)
            didWrite <- atomically $ tryWriteTBQueue onNewJob JobAvailable
            -- tryWriteTBQueue reported that the message was not enqueued.
            unless didWrite (Log.warn ("Job queue full for " <> tableName))

    liftIO do
        runPool pool installTriggers
        PGListener.onReconnect reinstallOnReconnect pgListener

    poller <- pollForJob enablePollerTriggerRepair pool tableName pollInterval onNewJob
    -- The notification payload is ignored; only the fact that one arrived matters.
    subscription <- liftIO $ pgListener |> PGListener.subscribe (channelName tableNameBS) (const onNotification)
    pure (subscription, poller)
-- | Starts a background loop that periodically counts pending jobs in @tableName@
-- and signals @onNewJob@ once per counted row.
--
-- Each iteration of the loop:
--
-- * when @enablePollerTriggerRepair@ is @True@, first calls 'ensureNotificationTriggers';
-- * runs @SELECT COUNT(*)@ over the rows matching 'pendingJobConditionSQL';
-- * attempts to enqueue one 'JobAvailable' per counted row (the result of each
--   attempt is discarded);
-- * logs any synchronous exception from the steps above as an error and carries on;
-- * sleeps for @pollInterval@ plus a random jitter in @[0, 2000000]@. The sum is
--   passed to 'Concurrent.threadDelay', which takes microseconds.
--
-- The loop runs in an 'Async.async' registered with 'allocate', so it is
-- cancelled when the returned 'ReleaseKey' is released.
--
-- NOTE(review): @tableName@ is concatenated into the SQL text without quoting or
-- escaping, so it presumably must always be a trusted identifier — confirm at call sites.
pollForJob :: (?context :: context, HasField "logger" context Log.Logger) => Bool -> HasqlPool.Pool -> Text -> Int -> TBQueue JobWorkerProcessMessage -> ResourceT IO ReleaseKey
pollForJob enablePollerTriggerRepair pool tableName pollInterval onNewJob = do
    let sql = "SELECT COUNT(*) FROM " <> tableName <> " WHERE " <> pendingJobConditionSQL
    let decoder = Decoders.singleRow (Decoders.column (Decoders.nonNullable Decoders.int8))
    let statement = Hasql.unpreparable sql Encoders.noParams decoder

    let handler = forever do
            result <- Exception.tryAny do
                when enablePollerTriggerRepair do
                    ensureNotificationTriggers pool tableName

                count :: Int <- fromIntegral <$> runPool pool (HasqlSession.statement () statement)

                forEach [1..count] \_ -> do
                    _ <- atomically $ tryWriteTBQueue onNewJob JobAvailable
                    pure ()

            case result of
                Left exception -> Log.error ("Job poller: " <> tshow exception)
                Right _ -> pure ()

            -- Random extra delay added on top of the configured interval.
            jitter <- Random.randomRIO (0, 2000000)
            let pollIntervalWithJitter = pollInterval + jitter
            Concurrent.threadDelay pollIntervalWithJitter

    fst <$> allocate (Async.async handler) Async.cancel
-- | Checks whether both notification triggers exist on @tableName@.
--
-- Counts the non-internal rows in @pg_trigger@ that belong to the relation named
-- @tableName@ in @current_schema()@ and whose name is either
-- @did_insert_job_\<tableName\>@ or @did_update_job_\<tableName\>@.
--
-- Returns @True@ only when that count is exactly 2.
notificationTriggersHealthy :: HasqlPool.Pool -> Text -> IO Bool
notificationTriggersHealthy pool tableName = do
    let insertTriggerName = "did_insert_job_" <> tableName
    let updateTriggerName = "did_update_job_" <> tableName

    let sql :: Text
        sql =
            "SELECT COUNT(*) FROM pg_trigger t"
            <> " JOIN pg_class c ON t.tgrelid = c.oid"
            <> " JOIN pg_namespace n ON c.relnamespace = n.oid"
            <> " WHERE n.nspname = current_schema()"
            <> " AND c.relname = $1::name"
            <> " AND NOT t.tgisinternal"
            <> " AND (t.tgname = $2::name OR t.tgname = $3::name)"

    -- One non-null text parameter; used three times, once per tuple component.
    let textParam = Encoders.param (Encoders.nonNullable Encoders.text)

    -- Parameter order is significant: $1 = table, $2 = insert trigger, $3 = update trigger.
    let encoder :: Encoders.Params (Text, Text, Text)
        encoder =
            contramap (\(tableNameParam, _, _) -> tableNameParam) textParam
            <> contramap (\(_, insertTriggerNameParam, _) -> insertTriggerNameParam) textParam
            <> contramap (\(_, _, updateTriggerNameParam) -> updateTriggerNameParam) textParam

    let decoder = Decoders.singleRow (Decoders.column (Decoders.nonNullable Decoders.int8))
    let statement = Hasql.unpreparable sql encoder decoder

    count :: Int <- fromIntegral <$> runPool pool (HasqlSession.statement (tableName, insertTriggerName, updateTriggerName) statement)
    pure (count == 2)
-- | Recreates the notification triggers for @tableName@ when they are not healthy.
--
-- Asks 'notificationTriggersHealthy' first. If it reports @False@, this logs a
-- warning naming both expected triggers, runs the script from
-- 'createNotificationTriggerSQL' through the pool, and logs at info level afterwards.
-- If it reports @True@, nothing else happens.
ensureNotificationTriggers :: (?context :: context, HasField "logger" context Log.Logger) => HasqlPool.Pool -> Text -> IO ()
ensureNotificationTriggers pool tableName = do
    healthy <- notificationTriggersHealthy pool tableName
    unless healthy do
        -- Only used for the log message below.
        let insertTriggerName = "did_insert_job_" <> tableName
        let updateTriggerName = "did_update_job_" <> tableName
        Log.warn ("Job poller: Missing notification triggers for " <> tableName <> " (" <> insertTriggerName <> ", " <> updateTriggerName <> "). Recreating.")
        runPool pool (HasqlSession.script (createNotificationTriggerSQL (cs tableName)))
        Log.info ("Job poller: Recreated notification triggers for " <> tableName)
-- | Builds the SQL script that installs the job notification function and triggers
-- for @tableName@.
--
-- The script is a single @DO@ block that:
--
-- * creates or replaces the function @notify_job_queued_\<tableName\>@, which calls
--   @pg_notify@ on the channel given by 'channelName' with an empty payload;
-- * drops and recreates the @did_insert_job_\<tableName\>@ (@AFTER INSERT@) and
--   @did_update_job_\<tableName\>@ (@AFTER UPDATE@) row triggers, both firing only
--   when the new status is @job_status_not_started@ or @job_status_retry@;
-- * ignores @SQLSTATE 'XX000'@ raised inside the block.
--
-- The table name is double-quoted in both the @DROP TRIGGER@ and @CREATE TRIGGER@
-- statements so that the two always refer to the same relation.
--
-- NOTE(review): @tableName@ is spliced into the script without escaping, so it
-- presumably must always be a trusted identifier — confirm at call sites.
createNotificationTriggerSQL :: ByteString -> Text
createNotificationTriggerSQL tableName =
    cs $
        "DO $$\n"
        <> "BEGIN\n"
        <> " CREATE OR REPLACE FUNCTION " <> functionName <> "() RETURNS TRIGGER AS $BODY$"
        <> "BEGIN\n"
        <> " PERFORM pg_notify('" <> channelName tableName <> "', '');\n"
        <> " RETURN new;"
        <> "\nEND;\n"
        <> "$BODY$ language plpgsql;\n"
        <> " DROP TRIGGER IF EXISTS " <> insertTriggerName <> " ON " <> quotedTableName <> ";\n"
        <> " CREATE TRIGGER " <> insertTriggerName <> " AFTER INSERT ON " <> quotedTableName <> " FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n"
        <> " DROP TRIGGER IF EXISTS " <> updateTriggerName <> " ON " <> quotedTableName <> ";\n"
        <> " CREATE TRIGGER " <> updateTriggerName <> " AFTER UPDATE ON " <> quotedTableName <> " FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n"
        <> "EXCEPTION\n"
        <> " WHEN SQLSTATE 'XX000' THEN null; -- 'tuple concurrently updated': another connection installed it first\n"
        <> "END; $$"
    where
        functionName :: ByteString
        functionName = "notify_job_queued_" <> tableName

        insertTriggerName :: ByteString
        insertTriggerName = "did_insert_job_" <> tableName

        updateTriggerName :: ByteString
        updateTriggerName = "did_update_job_" <> tableName

        -- Quoted form used wherever the table is referenced as a relation.
        quotedTableName :: ByteString
        quotedTableName = "\"" <> tableName <> "\""
-- | Name of the notification channel for a job table: @job_available_@ followed by
-- the table name.
--
-- The same name is used by the SQL built in 'createNotificationTriggerSQL' and by the
-- listener subscription, so both sides must derive it through this function.
channelName :: ByteString -> ByteString
channelName tableName = "job_available_" <> tableName