Copyright | (c) digitally induced GmbH 2021 |
---|---|
Safe Haskell | None |
This module is solving the problem, that previously IHP was using one database connection
per running LISTEN ..;
statement. A PGListener
provides one central object to listen on
postgres channels, without manually dealing with connection management.
Synopsis
- type Channel = ByteString
- type Callback = Notification -> IO ()
- data Subscription = Subscription {}
- data PGListener = PGListener {
- modelContext :: !ModelContext
- listeningTo :: !(MVar (Set Channel))
- listenTo :: !(MVar Channel)
- subscriptions :: !(IORef (HashMap Channel [Subscription]))
- notifyLoopAsync :: !(Async ())
- init :: ModelContext -> IO PGListener
- stop :: PGListener -> IO ()
- subscribe :: Channel -> Callback -> PGListener -> IO Subscription
- subscribeJSON :: FromJSON jsonValue => Channel -> (jsonValue -> IO ()) -> PGListener -> IO Subscription
- unsubscribe :: Subscription -> PGListener -> IO ()
Documentation
type Channel = ByteString Source #
The channel is like the event name
It's used in the postgres NOTIFY call:
NOTIFY channel [ , payload ]
type Callback = Notification -> IO () Source #
An event callback receives the postgres notification object and can do IO
data Subscription Source #
Returned by a call to subscribe
data PGListener Source #
The main datatype of the service. Keeps tracks of all channels we're watching on, as well as all open subscriptions
PGListener | |
|
init :: ModelContext -> IO PGListener Source #
Creates a new PGListener
object
let modelContext = .. pgListener <- PGListener.init modelContext
This will start a new async listening for postgres notifications. This will take one connection
from the database pool and keep it blocked until stop
is called.
stop :: PGListener -> IO () Source #
Stops the database listener async and puts the database connection used back into the database pool
PGListener.stop pgListener
subscribe :: Channel -> Callback -> PGListener -> IO Subscription Source #
After you subscribed to a channel, the provided callback will be called whenever there's a new notification on the channel.
pgListener <- PGListener.init let callback notification = do let payload :: Text = cs (notification.notificationData) putStrLn ("Received notification: " <> payload) subscription <- pgListener |> PGListener.subscribe "my_channel" callback
The callback
function will now be called whenever NOTIFY "my_channel", "my payload"
is executed on the postgres server.
When the subscription is not used anymore, call unsubscribe
to stop the callback from being called anymore.
subscribeJSON :: FromJSON jsonValue => Channel -> (jsonValue -> IO ()) -> PGListener -> IO Subscription Source #
Like subscribe
but decodes the notification payload from JSON and passes the decoded data structure to the callback
When JSON parsing fails, this will ignore the notification.
pgListener <- PGListener.init let callback (jsonObject :: Aeson.Value) = do putStrLn ("Received notification: " <> tshow jsonObject) subscription <- pgListener |> PGListener.subscribeJSON "my_json_channel" callback
The callback
function will now be called whenever NOTIFY "my_json_channel", "{"hello":"world"}"
is executed on the postgres server.
unsubscribe :: Subscription -> PGListener -> IO () Source #
Stops the callback of a subscription from receiving further notifications
pgListener <- PGListener.init subscription <- pgListener |> PGListener.subscribe "my_channel" callback doSomethingExpensive pgListener |> PGListener.unsubscribe subscription