{-|
Module: IHP.PGListener
Description: Event listener handling pg_notify messages
Copyright: (c) digitally induced GmbH, 2021

This module solves the problem that IHP previously used one database connection
per running @LISTEN ..;@ statement. A @PGListener@ provides one central object for listening on
postgres channels, without having to deal with connection management manually.
-}
module IHP.PGListener
( Channel
, Callback
, Subscription (..)
, PGListener (..)
, init
, stop
, subscribe
, subscribeJSON
, unsubscribe
) where

import IHP.Prelude hiding (init)
import IHP.ModelSupport
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.Notification as PG
import qualified Data.UUID.V4 as UUID
import Control.Concurrent.MVar (MVar)
import qualified Control.Concurrent.MVar as MVar
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict as HashMap
import qualified Data.IORef as IORef
import qualified Control.Concurrent.Async as Async
import qualified Data.List as List
import qualified Data.Aeson as Aeson
import qualified IHP.Log as Log
import qualified Control.Exception as Exception
import qualified Control.Concurrent.Chan.Unagi as Queue

-- TODO: How to deal with timeout of the connection?

-- | Name of a postgres notification channel. The channel is like the event name.
--
-- It's the first argument of the postgres NOTIFY call:
--
-- > NOTIFY channel [ , payload ]
--
type Channel = ByteString

-- | An event callback receives the postgres notification object and can do IO.
--
-- 'subscribe' runs the callback on a dedicated reader thread, one notification at a time.
type Callback = PG.Notification -> IO ()

-- | Handle for one registered callback. Returned by a call to 'subscribe'
--
-- Pass it to 'unsubscribe' to stop the callback from being called.
data Subscription = Subscription
    { id :: !UUID -- ^ Random identifier, used by 'unsubscribe' to find this subscription again
    , reader :: !(Async ()) -- ^ Thread that reads notifications from the queue and runs the callback
    , inChan :: !(Queue.InChan PG.Notification) -- ^ Write end of the queue that feeds the reader thread
    , channel :: !Channel -- ^ The postgres channel this subscription was registered for
    }

-- | The main datatype of the service. Keeps track of all channels we're watching, as well as all open subscriptions
--
-- Use 'init' to create a new object and 'stop' to deallocate it.
data PGListener = PGListener
    { modelContext :: !ModelContext -- ^ Context the listener was created with; the notify loop takes its database connection from it
    , listeningTo :: !(MVar (Set Channel)) -- ^ Channels a @LISTEN@ statement has already been issued for
    , listenTo :: !(MVar Channel) -- ^ Hand-off slot: a channel put here is picked up by the notify loop, which then issues the @LISTEN@
    , subscriptions :: !(IORef (HashMap Channel [Subscription])) -- ^ All open subscriptions, grouped by channel
    , notifyLoopAsync :: !(Async ()) -- ^ The background thread receiving notifications; cancelled by 'stop'
    }

-- | Creates a new 'PGListener' object
--
-- > let modelContext = ..
-- > pgListener <- PGListener.init modelContext
--
-- This will start a new async that listens for postgres notifications. The async waits until the
-- first channel is requested via 'subscribe'. It then takes one connection from the database
-- pool and keeps it blocked until 'stop' is called.
--
init :: ModelContext -> IO PGListener
init modelContext = do
    -- No LISTEN has been issued yet
    listeningTo <- MVar.newMVar Set.empty
    subscriptions <- newIORef HashMap.empty
    -- Starts out empty, the notify loop blocks on it until the first channel is requested
    listenTo <- MVar.newEmptyMVar

    let ?modelContext = modelContext
    notifyLoopAsync <- async (notifyLoop listeningTo listenTo subscriptions)
    pure PGListener { modelContext, listeningTo, subscriptions, listenTo, notifyLoopAsync }

-- | Stops the database listener async by cancelling it
--
-- > PGListener.stop pgListener
--
-- Cancelling the loop presumably also puts the database connection it used back into the
-- database pool (this depends on 'withDatabaseConnection' releasing the connection on exceptions).
--
-- The reader threads of existing subscriptions are not touched by this, call 'unsubscribe' to stop them.
--
stop :: PGListener -> IO ()
stop PGListener { notifyLoopAsync } = cancel notifyLoopAsync

-- | After you subscribed to a channel, the provided callback will be called whenever there's a new
-- notification on the channel.
--
-- > pgListener <- PGListener.init modelContext
-- >
-- > let callback notification = do
-- >         let payload :: Text = cs (get #notificationData notification)
-- >         putStrLn ("Received notification: " <> payload)
-- >
-- > subscription <- pgListener |> PGListener.subscribe "my_channel" callback
--
-- The @callback@ function will now be called whenever @NOTIFY my_channel, 'my payload'@ is executed on the postgres server.
--
-- When the subscription is not used anymore, call 'unsubscribe' to stop the callback from being called anymore.
--
-- This call can block for a moment when the channel is new, see 'listenToChannelIfNeeded'.
--
subscribe :: Channel -> Callback -> PGListener -> IO Subscription
subscribe channel callback pgListener = do
    id <- UUID.nextRandom
    listenToChannelIfNeeded channel pgListener

    -- We use a queue here to guarantee that the messages are processed in the right order
    -- while keeping high performance.
    --
    -- A naive implementation might be just kicking off an async for each message. But in that case
    -- the messages might be delivered to the final consumer out of order.
    (inChan, outChan) <- Queue.newChan
    reader <- async (forever (Queue.readChan outChan >>= callback))
    let subscription = Subscription { id, reader, inChan, channel }

    -- 'subscribe' and 'unsubscribe' can be called from several threads at the same time, so the
    -- update needs to be atomic. Otherwise one of two concurrent updates can get lost.
    IORef.atomicModifyIORef' (get #subscriptions pgListener) \allSubscriptions ->
        (HashMap.insertWith mappend channel [subscription] allSubscriptions, ())

    pure subscription

-- | Like 'subscribe' but decodes the notification payload from JSON and passes the decoded data structure to the callback
--
-- When JSON parsing fails, this will ignore the notification.
--
-- > pgListener <- PGListener.init modelContext
-- >
-- > let callback (jsonObject :: Aeson.Value) = do
-- >         putStrLn ("Received notification: " <> tshow jsonObject)
-- >
-- > subscription <- pgListener |> PGListener.subscribeJSON "my_json_channel" callback
--
-- The @callback@ function will now be called whenever @NOTIFY my_json_channel, '{"hello":"world"}'@ is executed on the postgres server.
subscribeJSON :: Aeson.FromJSON jsonValue => Channel -> (jsonValue -> IO ()) -> PGListener -> IO Subscription
subscribeJSON channel callback pgListener = subscribe channel callback' pgListener
    where
        -- Wraps the typed callback into a raw 'Callback' that decodes the payload first
        callback' notification = do
            let rawPayload = get #notificationData notification
            case Aeson.decodeStrict' rawPayload of
                Just decodedPayload -> callback decodedPayload
                Nothing -> pure () -- Payloads that are not valid JSON for the expected type are dropped on purpose

-- | Stops the callback of a subscription from receiving further notifications
--
-- > pgListener <- PGListener.init modelContext
-- >
-- > subscription <- pgListener |> PGListener.subscribe "my_channel" callback
-- > doSomethingExpensive
-- > pgListener |> PGListener.unsubscribe subscription
--
-- The subscription is first removed from the listener, then its reader thread is cancelled.
-- The postgres connection keeps listening on the channel, even when this was the last subscription.
--
unsubscribe :: Subscription -> PGListener -> IO ()
unsubscribe subscription@(Subscription { channel, reader }) pgListener = do
    let
        -- Removes the subscription with the same id from the list
        deleteById :: [Subscription] -> [Subscription]
        deleteById = List.deleteBy (\a b -> get #id a == get #id b) subscription
    -- 'subscribe' and 'unsubscribe' can be called from several threads at the same time, so the
    -- update needs to be atomic. Otherwise one of two concurrent updates can get lost.
    IORef.atomicModifyIORef' (get #subscriptions pgListener) \allSubscriptions ->
        (HashMap.adjust deleteById channel allSubscriptions, ())
    uninterruptibleCancel reader

-- | Requests a @LISTEN ..;@ statement for the channel, if we're not already listening on that channel
--
-- The statement is not executed here. The channel is handed over to the notify loop via the
-- @listenTo@ 'MVar', and the notify loop then runs the @LISTEN@ on its database connection.
--
-- When another request is still waiting to be picked up by the notify loop, this blocks until
-- the 'MVar' is free again.
--
-- The check below is only a fast path. Two threads can both see the channel as missing and both
-- request it; the notify loop checks again before running the statement.
listenToChannelIfNeeded :: Channel -> PGListener -> IO ()
listenToChannelIfNeeded channel pgListener = do
    listeningTo <- MVar.readMVar (get #listeningTo pgListener)
    let alreadyListening = channel `Set.member` listeningTo

    unless alreadyListening do
        MVar.putMVar (get #listenTo pgListener) channel
            

            

-- | The main loop that is receiving events from the database and triggering callbacks
--
-- The loop waits for the first requested channel, then takes a database connection and runs two things on it:
--
-- 1. A listen loop that takes channels from the @listenTo@ 'MVar' and executes the @LISTEN@ statements
-- 2. A loop that waits for notifications and writes them into the queues of all subscriptions of that channel
--
-- When the connection dies (or anything else throws), the exception is logged and everything is
-- restarted with a new connection. The @LISTEN@ statements of all known channels are replayed in that case.
-- The restart happens right away, there's no delay between attempts.
--
-- The loop only ends when the async running it is cancelled, e.g. by 'stop'.
notifyLoop :: (?modelContext :: ModelContext) => MVar (Set Channel) -> MVar Channel -> IORef (HashMap Channel [Subscription]) -> IO ()
notifyLoop listeningToVar listenToVar subscriptions = do
    -- Wait until the first LISTEN is requested before taking a database connection from the pool.
    -- 'MVar.readMVar' leaves the channel inside the MVar, so the listen loop below still picks it up.
    _ <- MVar.readMVar listenToVar

    let innerLoop = withDatabaseConnection \databaseConnection -> do

                -- If listeningTo already contains channels, this means that previously the database connection
                -- died, so we're restarting here. Therefore we need to replay all LISTEN calls to restore the previous state
                listeningTo <- MVar.readMVar listeningToVar
                forEach listeningTo (listenToChannel databaseConnection)

                -- This loop reads channels from the 'listenToVar' and then triggers a LISTEN statement on
                -- the current database connection
                let listenLoop = forever do
                        channel <- MVar.takeMVar listenToVar

                        MVar.modifyMVar_ listeningToVar \listeningTo -> do
                            let alreadyListening = channel `Set.member` listeningTo

                            -- The same channel can be requested more than once, see 'listenToChannelIfNeeded'
                            if alreadyListening
                                then pure listeningTo
                                else do
                                    listenToChannel databaseConnection channel
                                    pure (Set.insert channel listeningTo)

                -- 'Async.withAsync' makes sure the listen loop is stopped when the notification loop below fails
                Async.withAsync listenLoop \_listenLoopAsync -> do
                    forever do
                        notification <- PG.getNotification databaseConnection
                        let channel = get #notificationChannel notification

                        allSubscriptions <- readIORef subscriptions
                        let channelSubscriptions = allSubscriptions
                                |> HashMap.lookup channel
                                |> fromMaybe []

                        -- The callbacks are not called here. Each subscription has its own reader thread for that
                        forEach channelSubscriptions \subscription -> do
                            let inChan = get #inChan subscription
                            Queue.writeChan inChan notification

    -- This outer loop restarts the listeners if the database connection dies (e.g. due to a timeout)
    forever do
        result <- Exception.try innerLoop
        case result of
            Left (exception :: SomeException) ->
                case fromException exception of
                    -- We've been cancelled, so we need to stop instead of restarting
                    Just (cancelled :: AsyncCancelled) -> Exception.throwIO cancelled
                    Nothing -> do
                        let ?context = ?modelContext -- Log onto the modelContext logger
                        Log.info ("PGListener is going to restart, loop failed with exception: " <> displayException exception)
            Right _ -> pure ()


-- | Executes a @LISTEN ..;@ statement for the channel on the given database connection
--
-- The channel name is passed as a 'PG.Identifier' query parameter instead of being
-- concatenated into the SQL string.
listenToChannel :: PG.Connection -> Channel -> IO ()
listenToChannel databaseConnection channel = do
    _ <- PG.execute databaseConnection "LISTEN ?" [PG.Identifier (cs channel)]
    pure ()