{-|
Module: IHP.PGListener
Description: Event listener handling pg_notify messages
Copyright: (c) digitally induced GmbH, 2021

This module is solving the problem, that previously IHP was using one database connection
per running @LISTEN ..;@ statement. A @PGListener@ provides one central object to listen on
postgres channels, without manually dealing with connection management.
-}
module IHP.PGListener
( Channel
, Callback
, Subscription (..)
, PGListener (..)
, init
, stop
, subscribe
, subscribeJSON
, unsubscribe
) where

import IHP.Prelude hiding (init)
import IHP.ModelSupport
import qualified Data.Set as Set
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.Notification as PG
import qualified Data.UUID.V4 as UUID
import Control.Concurrent.MVar (MVar)
import qualified Control.Concurrent.MVar as MVar
import Data.HashMap.Strict as HashMap
import qualified Control.Concurrent.Async as Async
import qualified Data.List as List
import qualified Data.Aeson as Aeson
import qualified IHP.Log as Log
import qualified Control.Exception as Exception
import qualified Control.Concurrent.Chan.Unagi as Queue
import qualified Control.Concurrent

-- TODO: How to deal with timeout of the connection?

-- | The channel is like the event name
--
-- It's used in the postgres NOTIFY call:
--
-- > NOTIFY channel [ , payload ]
--
type Channel = ByteString

-- | An event callback receives the postgres notification object and can do IO
type Callback = PG.Notification -> IO ()

-- | Returned by a call to 'subscribe'
data Subscription = Subscription
    { Subscription -> UUID
id :: !UUID
    , Subscription -> Async ()
reader :: !(Async ())
    , Subscription -> InChan Notification
inChan :: !(Queue.InChan PG.Notification)
    , Subscription -> Channel
channel :: !Channel
    }

-- | The main datatype of the service. Keeps tracks of all channels we're watching on, as well as all open subscriptions
--
-- Use 'init' to create a new object and 'stop' to deallocate it.
data PGListener = PGListener
    { PGListener -> ModelContext
modelContext :: !ModelContext
    , PGListener -> MVar (Set Channel)
listeningTo :: !(MVar (Set Channel))
    , PGListener -> MVar Channel
listenTo :: !(MVar Channel)
    , PGListener -> IORef (HashMap Channel [Subscription])
subscriptions :: !(IORef (HashMap Channel [Subscription]))
    , PGListener -> Async ()
notifyLoopAsync :: !(Async ())
    }

-- | Creates a new 'PGListener' object
--
-- > let modelContext = ..
-- > pgListener <- PGListener.init modelContext
--
-- This will start a new async listening for postgres notifications. This will take one connection
-- from the database pool and keep it blocked until 'stop' is called.
--
init :: ModelContext -> IO PGListener
init :: ModelContext -> IO PGListener
init ModelContext
modelContext = do
    MVar (Set Channel)
listeningTo <- Set Channel -> IO (MVar (Set Channel))
forall a. a -> IO (MVar a)
MVar.newMVar Set Channel
forall a. Set a
Set.empty
    IORef (HashMap Channel [Subscription])
subscriptions <- HashMap Channel [Subscription]
-> IO (IORef (HashMap Channel [Subscription]))
forall a. a -> IO (IORef a)
newIORef HashMap Channel [Subscription]
forall k v. HashMap k v
HashMap.empty
    MVar Channel
listenTo <- IO (MVar Channel)
forall a. IO (MVar a)
MVar.newEmptyMVar

    let ?modelContext = ?modelContext::ModelContext
ModelContext
modelContext
    Async ()
notifyLoopAsync <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async ((?modelContext::ModelContext) =>
MVar (Set Channel)
-> MVar Channel -> IORef (HashMap Channel [Subscription]) -> IO ()
MVar (Set Channel)
-> MVar Channel -> IORef (HashMap Channel [Subscription]) -> IO ()
notifyLoop MVar (Set Channel)
listeningTo MVar Channel
listenTo IORef (HashMap Channel [Subscription])
subscriptions)
    PGListener -> IO PGListener
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure PGListener { ModelContext
$sel:modelContext:PGListener :: ModelContext
modelContext :: ModelContext
modelContext, MVar (Set Channel)
$sel:listeningTo:PGListener :: MVar (Set Channel)
listeningTo :: MVar (Set Channel)
listeningTo, IORef (HashMap Channel [Subscription])
$sel:subscriptions:PGListener :: IORef (HashMap Channel [Subscription])
subscriptions :: IORef (HashMap Channel [Subscription])
subscriptions, MVar Channel
$sel:listenTo:PGListener :: MVar Channel
listenTo :: MVar Channel
listenTo, Async ()
$sel:notifyLoopAsync:PGListener :: Async ()
notifyLoopAsync :: Async ()
notifyLoopAsync }

-- | Stops the database listener async and puts the database connection used back into the database pool
--
-- > PGListener.stop pgListener
--
stop :: PGListener -> IO ()
stop :: PGListener -> IO ()
stop PGListener { Async ()
$sel:notifyLoopAsync:PGListener :: PGListener -> Async ()
notifyLoopAsync :: Async ()
notifyLoopAsync } = do
    Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
notifyLoopAsync

-- | After you subscribed to a channel, the provided callback will be called whenever there's a new
-- notification on the channel.
--
-- > pgListener <- PGListener.init
-- >
-- > let callback notification = do
-- >         let payload :: Text = cs (notification.notificationData)
-- >         putStrLn ("Received notification: " <> payload)
-- > 
-- > subscription <- pgListener |> PGListener.subscribe "my_channel" callback
--
-- The @callback@ function will now be called whenever @NOTIFY "my_channel", "my payload"@ is executed on the postgres server.
--
-- When the subscription is not used anymore, call 'unsubscribe' to stop the callback from being called anymore.
--
subscribe :: Channel -> Callback -> PGListener -> IO Subscription
subscribe :: Channel -> Callback -> PGListener -> IO Subscription
subscribe Channel
channel Callback
callback PGListener
pgListener = do
    UUID
id <- IO UUID
UUID.nextRandom
    Channel -> PGListener -> IO ()
listenToChannelIfNeeded Channel
channel PGListener
pgListener

    -- We use a queue here to guarantee that the messages are processed in the right order
    -- while keeping high performance.
    --
    -- A naive implementation might be just kicking of an async for each message. But in that case
    -- the messages might be delivered to the final consumer out of order.
    (InChan Notification
inChan, OutChan Notification
outChan) <- IO (InChan Notification, OutChan Notification)
forall a. IO (InChan a, OutChan a)
Queue.newChan
    
    let
        -- We need to log any exception, otherwise there might be silent errors
        logException :: SomeException -> IO ()
        logException :: SomeException -> IO ()
logException SomeException
exception = PGListener -> Text -> IO ()
logError PGListener
pgListener (Text
"Error in pg_notify handler: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
forall a b. ConvertibleStrings a b => a -> b
cs (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
exception))

    Async ()
reader <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
            Notification
message <- OutChan Notification -> IO Notification
forall a. OutChan a -> IO a
Queue.readChan OutChan Notification
outChan
            Callback
callback Notification
message IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` SomeException -> IO ()
logException
    let subscription :: Subscription
subscription = Subscription { Async ()
Channel
UUID
InChan Notification
$sel:id:Subscription :: UUID
$sel:reader:Subscription :: Async ()
$sel:inChan:Subscription :: InChan Notification
$sel:channel:Subscription :: Channel
channel :: Channel
id :: UUID
inChan :: InChan Notification
reader :: Async ()
.. }

    IORef (HashMap Channel [Subscription])
-> (HashMap Channel [Subscription]
    -> HashMap Channel [Subscription])
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' (PGListener
pgListener.subscriptions) (([Subscription] -> [Subscription] -> [Subscription])
-> Channel
-> [Subscription]
-> HashMap Channel [Subscription]
-> HashMap Channel [Subscription]
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> k -> v -> HashMap k v -> HashMap k v
HashMap.insertWith [Subscription] -> [Subscription] -> [Subscription]
forall a. Monoid a => a -> a -> a
mappend Channel
channel [Subscription
subscription] )

    Subscription -> IO Subscription
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Subscription
subscription

-- | Like 'subscribe' but decodes the notification payload from JSON and passes the decoded data structure to the callback
--
-- When JSON parsing fails, this will ignore the notification.
--
-- > pgListener <- PGListener.init
-- >
-- > let callback (jsonObject :: Aeson.Value) = do
-- >         putStrLn ("Received notification: " <> tshow jsonObject)
-- > 
-- > subscription <- pgListener |> PGListener.subscribeJSON "my_json_channel" callback
--
-- The @callback@ function will now be called whenever @NOTIFY "my_json_channel", "{\"hello\":\"world\"}"@ is executed on the postgres server.
subscribeJSON :: Aeson.FromJSON jsonValue => Channel -> (jsonValue -> IO ()) -> PGListener -> IO Subscription
subscribeJSON :: forall jsonValue.
FromJSON jsonValue =>
Channel -> (jsonValue -> IO ()) -> PGListener -> IO Subscription
subscribeJSON Channel
channel jsonValue -> IO ()
callback PGListener
pgListener = Channel -> Callback -> PGListener -> IO Subscription
subscribe Channel
channel Callback
callback' PGListener
pgListener
    where
        callback' :: Callback
callback' Notification
notification = do
            let payload :: Channel
payload = (Notification
notification.notificationData)
            case Channel -> Maybe jsonValue
forall a. FromJSON a => Channel -> Maybe a
Aeson.decodeStrict' Channel
payload of
                Just jsonValue
payload -> jsonValue -> IO ()
callback jsonValue
payload
                Maybe jsonValue
Nothing -> PGListener -> Text -> IO ()
logError PGListener
pgListener (Text
"PGListener.subscribeJSON: Failed to parse " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Channel -> Text
forall a. Show a => a -> Text
tshow Channel
payload)

-- | Stops the callback of a subscription from receiving further notifications
--
-- > pgListener <- PGListener.init
-- >
-- > subscription <- pgListener |> PGListener.subscribe "my_channel" callback
-- > doSomethingExpensive
-- > pgListener |> PGListener.unsubscribe subscription
--
unsubscribe :: Subscription -> PGListener -> IO ()
unsubscribe :: Subscription -> PGListener -> IO ()
unsubscribe subscription :: Subscription
subscription@(Subscription { Async ()
Channel
UUID
InChan Notification
$sel:id:Subscription :: Subscription -> UUID
$sel:reader:Subscription :: Subscription -> Async ()
$sel:inChan:Subscription :: Subscription -> InChan Notification
$sel:channel:Subscription :: Subscription -> Channel
id :: UUID
reader :: Async ()
inChan :: InChan Notification
channel :: Channel
.. }) PGListener
pgListener = do
    let
        deleteById :: [Subscription] -> [Subscription]
        deleteById :: [Subscription] -> [Subscription]
deleteById = (Subscription -> Subscription -> Bool)
-> Subscription -> [Subscription] -> [Subscription]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
List.deleteBy (\Subscription
a Subscription
b -> Subscription
a.id UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Subscription
b.id) Subscription
subscription
    IORef (HashMap Channel [Subscription])
-> (HashMap Channel [Subscription]
    -> HashMap Channel [Subscription])
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' (PGListener
pgListener.subscriptions) (([Subscription] -> [Subscription])
-> Channel
-> HashMap Channel [Subscription]
-> HashMap Channel [Subscription]
forall k v.
(Eq k, Hashable k) =>
(v -> v) -> k -> HashMap k v -> HashMap k v
HashMap.adjust [Subscription] -> [Subscription]
deleteById Channel
channel)
    Async () -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel Async ()
reader
    () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()     

-- | Runs a @LISTEN ..;@ statements on the postgres connection, if not already listening on that channel
listenToChannelIfNeeded :: Channel -> PGListener -> IO ()
listenToChannelIfNeeded :: Channel -> PGListener -> IO ()
listenToChannelIfNeeded Channel
channel PGListener
pgListener = do
    Set Channel
listeningTo <- MVar (Set Channel) -> IO (Set Channel)
forall a. MVar a -> IO a
MVar.readMVar (PGListener
pgListener.listeningTo)
    let alreadyListening :: Bool
alreadyListening = Channel
channel Channel -> Set Channel -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set Channel
listeningTo

    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
alreadyListening do
        MVar Channel -> Channel -> IO ()
forall a. MVar a -> a -> IO ()
MVar.putMVar (PGListener
pgListener.listenTo) Channel
channel
            

            

-- | The main loop that is receiving events from the database and triggering callbacks
--
-- Todo: What happens when the connection dies?
notifyLoop :: (?modelContext :: ModelContext) => MVar (Set Channel) -> MVar Channel -> IORef (HashMap Channel [Subscription]) -> IO ()
notifyLoop :: (?modelContext::ModelContext) =>
MVar (Set Channel)
-> MVar Channel -> IORef (HashMap Channel [Subscription]) -> IO ()
notifyLoop MVar (Set Channel)
listeningToVar MVar Channel
listenToVar IORef (HashMap Channel [Subscription])
subscriptions = do
    -- Wait until the first LISTEN is requested before taking a database connection from the pool
    MVar Channel -> IO Channel
forall a. MVar a -> IO a
MVar.readMVar MVar Channel
listenToVar

    let innerLoop :: IO Any
innerLoop = do
            (Connection -> IO Any) -> IO Any
forall a.
(?modelContext::ModelContext) =>
(Connection -> IO a) -> IO a
withDatabaseConnection \Connection
databaseConnection -> do

                -- If listeningTo already contains channels, this means that previously the database connection
                -- died, so we're restarting here. Therefore we need to replay all LISTEN calls to restore the previous state
                Set Channel
listeningTo <- MVar (Set Channel) -> IO (Set Channel)
forall a. MVar a -> IO a
MVar.readMVar MVar (Set Channel)
listeningToVar
                Set Channel -> (Element (Set Channel) -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach Set Channel
listeningTo (Connection -> Channel -> IO ()
listenToChannel Connection
databaseConnection)

                -- This loop reads channels from the 'listenToVar' and then triggers a LISTEN statement on
                -- the current database connections
                let listenLoop :: IO Any
listenLoop = IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
                        Channel
channel <- MVar Channel -> IO Channel
forall a. MVar a -> IO a
MVar.takeMVar MVar Channel
listenToVar
                        
                        MVar (Set Channel) -> (Set Channel -> IO (Set Channel)) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
MVar.modifyMVar_ MVar (Set Channel)
listeningToVar \Set Channel
listeningTo -> do
                            let alreadyListening :: Bool
alreadyListening = Channel
channel Channel -> Set Channel -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set Channel
listeningTo

                            if Bool
alreadyListening
                                then Set Channel -> IO (Set Channel)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Set Channel
listeningTo
                                else do
                                    Connection -> Channel -> IO ()
listenToChannel Connection
databaseConnection Channel
channel
                                    Set Channel -> IO (Set Channel)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Channel -> Set Channel -> Set Channel
forall a. Ord a => a -> Set a -> Set a
Set.insert Channel
channel Set Channel
listeningTo)

                IO Any -> (Async Any -> IO Any) -> IO Any
forall a b. IO a -> (Async a -> IO b) -> IO b
Async.withAsync IO Any
listenLoop \Async Any
listenLoopAsync -> do
                    IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
                        Notification
notification <- Connection -> IO Notification
PG.getNotification Connection
databaseConnection
                        let channel :: Channel
channel = Notification
notification.notificationChannel

                        HashMap Channel [Subscription]
allSubscriptions <- IORef (HashMap Channel [Subscription])
-> IO (HashMap Channel [Subscription])
forall a. IORef a -> IO a
readIORef IORef (HashMap Channel [Subscription])
subscriptions
                        let channelSubscriptions :: [Subscription]
channelSubscriptions = HashMap Channel [Subscription]
allSubscriptions
                                HashMap Channel [Subscription]
-> (HashMap Channel [Subscription] -> Maybe [Subscription])
-> Maybe [Subscription]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> Channel -> HashMap Channel [Subscription] -> Maybe [Subscription]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup Channel
channel
                                Maybe [Subscription]
-> (Maybe [Subscription] -> [Subscription]) -> [Subscription]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> [Subscription] -> Maybe [Subscription] -> [Subscription]
forall a. a -> Maybe a -> a
fromMaybe []

                        [Subscription] -> (Element [Subscription] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [Subscription]
channelSubscriptions \Element [Subscription]
subscription -> do
                            let inChan :: InChan Notification
inChan = Element [Subscription]
Subscription
subscription.inChan
                            InChan Notification -> Callback
forall a. InChan a -> a -> IO ()
Queue.writeChan InChan Notification
inChan Notification
notification

    -- Initial delay (in microseconds)
    let initialDelay :: Int
initialDelay = Int
500 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000
    -- Max delay (in microseconds)
    let maxDelay :: Int
maxDelay = Int
60 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000
    -- This outer loop restarts the listeners if the database connection dies (e.g. due to a timeout)
    let retryLoop :: Int -> Bool -> IO ()
retryLoop Int
delay Bool
isFirstError = do
            Either SomeException Any
result <- IO Any -> IO (Either SomeException Any)
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try IO Any
innerLoop
            case Either SomeException Any
result of
                Left (SomeException
error :: SomeException) -> do
                    case SomeException -> Maybe AsyncCancelled
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
error of
                        Just (AsyncCancelled
error :: AsyncCancelled) -> AsyncCancelled -> IO ()
forall a e. Exception e => e -> a
throw AsyncCancelled
error
                        Maybe AsyncCancelled
notification -> do
                            let ?context = ?context::ModelContext
?modelContext::ModelContext
ModelContext
?modelContext -- Log onto the modelContext logger
                            if Bool
isFirstError then do
                                String -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (String
"PGListener is going to restart, loop failed with exception: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
error) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
". Retrying immediately.")
                                Int -> Bool -> IO ()
retryLoop Int
delay Bool
False -- Retry with no delay interval on first error, but will increase delay interval in subsequent retries 
                            else do
                                let increasedDelay :: Int
increasedDelay = Int
delay Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
2 -- Double current delay
                                let nextDelay :: Int
nextDelay = Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
increasedDelay Int
maxDelay -- Picks whichever delay is lowest of increasedDelay * 2 or maxDelay
                                String -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (String
"PGListener is going to restart, loop failed with exception: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
error) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
". Retrying in " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a b. ConvertibleStrings a b => a -> b
cs (Int -> Text
printTimeToNextRetry Int
delay) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
".")
                                Int -> IO ()
Control.Concurrent.threadDelay Int
delay -- Sleep for the current delay
                                Int -> Bool -> IO ()
retryLoop Int
nextDelay Bool
False -- Retry with longer interval
                Right Any
_ -> 
                    Int -> Bool -> IO ()
retryLoop Int
initialDelay Bool
True -- If all went well, re-run with no sleeping and reset current delay to the initial value
    Int -> Bool -> IO ()
retryLoop Int
initialDelay Bool
True

printTimeToNextRetry :: Int -> Text
printTimeToNextRetry :: Int -> Text
printTimeToNextRetry Int
microseconds
    | Int
microseconds Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1000000000 =  Int -> Text
forall a. Show a => a -> Text
show (Int
microseconds Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
1000000000) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" min"
    | Int
microseconds Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1000000 =  Int -> Text
forall a. Show a => a -> Text
show (Int
microseconds Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
1000000) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" s"
    | Int
microseconds Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1000 = Int -> Text
forall a. Show a => a -> Text
show (Int
microseconds Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
1000) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ms"
    | Bool
otherwise = Int -> Text
forall a. Show a => a -> Text
show Int
microseconds Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" µs"

listenToChannel :: PG.Connection -> Channel -> IO ()
listenToChannel :: Connection -> Channel -> IO ()
listenToChannel Connection
databaseConnection Channel
channel = do
    Connection -> Query -> [Identifier] -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PG.execute Connection
databaseConnection Query
"LISTEN ?" [Text -> Identifier
PG.Identifier (Channel -> Text
forall a b. ConvertibleStrings a b => a -> b
cs Channel
channel)]
    () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

logError :: PGListener -> Text -> IO ()
logError :: PGListener -> Text -> IO ()
logError PGListener
pgListener Text
message = let ?context = PGListener
pgListener.modelContext in Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.error Text
message