module IHP.PGListener
( Channel
, Callback
, Subscription (..)
, PGListener (..)
, init
, stop
, subscribe
, subscribeJSON
, unsubscribe
) where
import IHP.Prelude hiding (init)
import IHP.ModelSupport
import qualified Data.Set as Set
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.Notification as PG
import qualified Data.UUID.V4 as UUID
import Control.Concurrent.MVar (MVar)
import qualified Control.Concurrent.MVar as MVar
import Data.HashMap.Strict as HashMap
import qualified Control.Concurrent.Async as Async
import qualified Data.List as List
import qualified Data.Aeson as Aeson
import qualified IHP.Log as Log
import qualified Control.Exception as Exception
import qualified Control.Concurrent.Chan.Unagi as Queue
import qualified Control.Concurrent
type Channel = ByteString
type Callback = PG.Notification -> IO ()
data Subscription = Subscription
{ Subscription -> UUID
id :: !UUID
, Subscription -> Async ()
reader :: !(Async ())
, Subscription -> InChan Notification
inChan :: !(Queue.InChan PG.Notification)
, Subscription -> Channel
channel :: !Channel
}
data PGListener = PGListener
{ PGListener -> ModelContext
modelContext :: !ModelContext
, PGListener -> MVar (Set Channel)
listeningTo :: !(MVar (Set Channel))
, PGListener -> MVar Channel
listenTo :: !(MVar Channel)
, PGListener -> IORef (HashMap Channel [Subscription])
subscriptions :: !(IORef (HashMap Channel [Subscription]))
, PGListener -> Async ()
notifyLoopAsync :: !(Async ())
}
init :: ModelContext -> IO PGListener
init :: ModelContext -> IO PGListener
init ModelContext
modelContext = do
MVar (Set Channel)
listeningTo <- Set Channel -> IO (MVar (Set Channel))
forall a. a -> IO (MVar a)
MVar.newMVar Set Channel
forall a. Set a
Set.empty
IORef (HashMap Channel [Subscription])
subscriptions <- HashMap Channel [Subscription]
-> IO (IORef (HashMap Channel [Subscription]))
forall a. a -> IO (IORef a)
newIORef HashMap Channel [Subscription]
forall k v. HashMap k v
HashMap.empty
MVar Channel
listenTo <- IO (MVar Channel)
forall a. IO (MVar a)
MVar.newEmptyMVar
let ?modelContext = ?modelContext::ModelContext
ModelContext
modelContext
Async ()
notifyLoopAsync <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async ((?modelContext::ModelContext) =>
MVar (Set Channel)
-> MVar Channel -> IORef (HashMap Channel [Subscription]) -> IO ()
MVar (Set Channel)
-> MVar Channel -> IORef (HashMap Channel [Subscription]) -> IO ()
notifyLoop MVar (Set Channel)
listeningTo MVar Channel
listenTo IORef (HashMap Channel [Subscription])
subscriptions)
PGListener -> IO PGListener
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure PGListener { ModelContext
modelContext :: ModelContext
modelContext :: ModelContext
modelContext, MVar (Set Channel)
listeningTo :: MVar (Set Channel)
listeningTo :: MVar (Set Channel)
listeningTo, IORef (HashMap Channel [Subscription])
subscriptions :: IORef (HashMap Channel [Subscription])
subscriptions :: IORef (HashMap Channel [Subscription])
subscriptions, MVar Channel
listenTo :: MVar Channel
listenTo :: MVar Channel
listenTo, Async ()
notifyLoopAsync :: Async ()
notifyLoopAsync :: Async ()
notifyLoopAsync }
stop :: PGListener -> IO ()
stop :: PGListener -> IO ()
stop PGListener { Async ()
notifyLoopAsync :: PGListener -> Async ()
notifyLoopAsync :: Async ()
notifyLoopAsync } = do
Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
notifyLoopAsync
subscribe :: Channel -> Callback -> PGListener -> IO Subscription
subscribe :: Channel -> Callback -> PGListener -> IO Subscription
subscribe Channel
channel Callback
callback PGListener
pgListener = do
UUID
id <- IO UUID
UUID.nextRandom
Channel -> PGListener -> IO ()
listenToChannelIfNeeded Channel
channel PGListener
pgListener
(InChan Notification
inChan, OutChan Notification
outChan) <- IO (InChan Notification, OutChan Notification)
forall a. IO (InChan a, OutChan a)
Queue.newChan
let
logException :: SomeException -> IO ()
logException :: SomeException -> IO ()
logException SomeException
exception = PGListener -> Text -> IO ()
logError PGListener
pgListener (Text
"Error in pg_notify handler: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
forall a b. ConvertibleStrings a b => a -> b
cs (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
exception))
Async ()
reader <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
Notification
message <- OutChan Notification -> IO Notification
forall a. OutChan a -> IO a
Queue.readChan OutChan Notification
outChan
Callback
callback Notification
message IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` SomeException -> IO ()
logException
let subscription :: Subscription
subscription = Subscription { Async ()
Channel
UUID
InChan Notification
id :: UUID
reader :: Async ()
inChan :: InChan Notification
channel :: Channel
channel :: Channel
id :: UUID
inChan :: InChan Notification
reader :: Async ()
.. }
IORef (HashMap Channel [Subscription])
-> (HashMap Channel [Subscription]
-> HashMap Channel [Subscription])
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' (PGListener
pgListener.subscriptions) (([Subscription] -> [Subscription] -> [Subscription])
-> Channel
-> [Subscription]
-> HashMap Channel [Subscription]
-> HashMap Channel [Subscription]
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> k -> v -> HashMap k v -> HashMap k v
HashMap.insertWith [Subscription] -> [Subscription] -> [Subscription]
forall a. Monoid a => a -> a -> a
mappend Channel
channel [Subscription
subscription] )
Subscription -> IO Subscription
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Subscription
subscription
subscribeJSON :: Aeson.FromJSON jsonValue => Channel -> (jsonValue -> IO ()) -> PGListener -> IO Subscription
subscribeJSON :: forall jsonValue.
FromJSON jsonValue =>
Channel -> (jsonValue -> IO ()) -> PGListener -> IO Subscription
subscribeJSON Channel
channel jsonValue -> IO ()
callback PGListener
pgListener = Channel -> Callback -> PGListener -> IO Subscription
subscribe Channel
channel Callback
callback' PGListener
pgListener
where
callback' :: Callback
callback' Notification
notification = do
let payload :: Channel
payload = (Notification
notification.notificationData)
case Channel -> Maybe jsonValue
forall a. FromJSON a => Channel -> Maybe a
Aeson.decodeStrict' Channel
payload of
Just jsonValue
payload -> jsonValue -> IO ()
callback jsonValue
payload
Maybe jsonValue
Nothing -> PGListener -> Text -> IO ()
logError PGListener
pgListener (Text
"PGListener.subscribeJSON: Failed to parse " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Channel -> Text
forall a. Show a => a -> Text
tshow Channel
payload)
unsubscribe :: Subscription -> PGListener -> IO ()
unsubscribe :: Subscription -> PGListener -> IO ()
unsubscribe subscription :: Subscription
subscription@(Subscription { Async ()
Channel
UUID
InChan Notification
id :: Subscription -> UUID
reader :: Subscription -> Async ()
inChan :: Subscription -> InChan Notification
channel :: Subscription -> Channel
id :: UUID
reader :: Async ()
inChan :: InChan Notification
channel :: Channel
.. }) PGListener
pgListener = do
let
deleteById :: [Subscription] -> [Subscription]
deleteById :: [Subscription] -> [Subscription]
deleteById = (Subscription -> Subscription -> Bool)
-> Subscription -> [Subscription] -> [Subscription]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
List.deleteBy (\Subscription
a Subscription
b -> Subscription
a.id UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Subscription
b.id) Subscription
subscription
IORef (HashMap Channel [Subscription])
-> (HashMap Channel [Subscription]
-> HashMap Channel [Subscription])
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' (PGListener
pgListener.subscriptions) (([Subscription] -> [Subscription])
-> Channel
-> HashMap Channel [Subscription]
-> HashMap Channel [Subscription]
forall k v.
(Eq k, Hashable k) =>
(v -> v) -> k -> HashMap k v -> HashMap k v
HashMap.adjust [Subscription] -> [Subscription]
deleteById Channel
channel)
Async () -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel Async ()
reader
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
listenToChannelIfNeeded :: Channel -> PGListener -> IO ()
listenToChannelIfNeeded :: Channel -> PGListener -> IO ()
listenToChannelIfNeeded Channel
channel PGListener
pgListener = do
Set Channel
listeningTo <- MVar (Set Channel) -> IO (Set Channel)
forall a. MVar a -> IO a
MVar.readMVar (PGListener
pgListener.listeningTo)
let alreadyListening :: Bool
alreadyListening = Channel
channel Channel -> Set Channel -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set Channel
listeningTo
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
alreadyListening do
MVar Channel -> Channel -> IO ()
forall a. MVar a -> a -> IO ()
MVar.putMVar (PGListener
pgListener.listenTo) Channel
channel
notifyLoop :: (?modelContext :: ModelContext) => MVar (Set Channel) -> MVar Channel -> IORef (HashMap Channel [Subscription]) -> IO ()
notifyLoop :: (?modelContext::ModelContext) =>
MVar (Set Channel)
-> MVar Channel -> IORef (HashMap Channel [Subscription]) -> IO ()
notifyLoop MVar (Set Channel)
listeningToVar MVar Channel
listenToVar IORef (HashMap Channel [Subscription])
subscriptions = do
MVar Channel -> IO Channel
forall a. MVar a -> IO a
MVar.readMVar MVar Channel
listenToVar
let innerLoop :: IO Any
innerLoop = do
(Connection -> IO Any) -> IO Any
forall a.
(?modelContext::ModelContext) =>
(Connection -> IO a) -> IO a
withDatabaseConnection \Connection
databaseConnection -> do
Set Channel
listeningTo <- MVar (Set Channel) -> IO (Set Channel)
forall a. MVar a -> IO a
MVar.readMVar MVar (Set Channel)
listeningToVar
Set Channel -> (Element (Set Channel) -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach Set Channel
listeningTo (Connection -> Channel -> IO ()
listenToChannel Connection
databaseConnection)
let listenLoop :: IO Any
listenLoop = IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
Channel
channel <- MVar Channel -> IO Channel
forall a. MVar a -> IO a
MVar.takeMVar MVar Channel
listenToVar
MVar (Set Channel) -> (Set Channel -> IO (Set Channel)) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
MVar.modifyMVar_ MVar (Set Channel)
listeningToVar \Set Channel
listeningTo -> do
let alreadyListening :: Bool
alreadyListening = Channel
channel Channel -> Set Channel -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set Channel
listeningTo
if Bool
alreadyListening
then Set Channel -> IO (Set Channel)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Set Channel
listeningTo
else do
Connection -> Channel -> IO ()
listenToChannel Connection
databaseConnection Channel
channel
Set Channel -> IO (Set Channel)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Channel -> Set Channel -> Set Channel
forall a. Ord a => a -> Set a -> Set a
Set.insert Channel
channel Set Channel
listeningTo)
IO Any -> (Async Any -> IO Any) -> IO Any
forall a b. IO a -> (Async a -> IO b) -> IO b
Async.withAsync IO Any
listenLoop \Async Any
listenLoopAsync -> do
IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
Notification
notification <- Connection -> IO Notification
PG.getNotification Connection
databaseConnection
let channel :: Channel
channel = Notification
notification.notificationChannel
HashMap Channel [Subscription]
allSubscriptions <- IORef (HashMap Channel [Subscription])
-> IO (HashMap Channel [Subscription])
forall a. IORef a -> IO a
readIORef IORef (HashMap Channel [Subscription])
subscriptions
let channelSubscriptions :: [Subscription]
channelSubscriptions = HashMap Channel [Subscription]
allSubscriptions
HashMap Channel [Subscription]
-> (HashMap Channel [Subscription] -> Maybe [Subscription])
-> Maybe [Subscription]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> Channel -> HashMap Channel [Subscription] -> Maybe [Subscription]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup Channel
channel
Maybe [Subscription]
-> (Maybe [Subscription] -> [Subscription]) -> [Subscription]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> [Subscription] -> Maybe [Subscription] -> [Subscription]
forall a. a -> Maybe a -> a
fromMaybe []
[Subscription] -> (Element [Subscription] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [Subscription]
channelSubscriptions \Element [Subscription]
subscription -> do
let inChan :: InChan Notification
inChan = Element [Subscription]
Subscription
subscription.inChan
InChan Notification -> Callback
forall a. InChan a -> a -> IO ()
Queue.writeChan InChan Notification
inChan Notification
notification
let initialDelay :: Int
initialDelay = Int
500 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000
let maxDelay :: Int
maxDelay = Int
60 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000
let retryLoop :: Int -> Bool -> IO ()
retryLoop Int
delay Bool
isFirstError = do
Either SomeException Any
result <- IO Any -> IO (Either SomeException Any)
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try IO Any
innerLoop
case Either SomeException Any
result of
Left (SomeException
error :: SomeException) -> do
case SomeException -> Maybe AsyncCancelled
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
error of
Just (AsyncCancelled
error :: AsyncCancelled) -> AsyncCancelled -> IO ()
forall a e. Exception e => e -> a
throw AsyncCancelled
error
Maybe AsyncCancelled
notification -> do
let ?context = ?context::ModelContext
?modelContext::ModelContext
ModelContext
?modelContext
if Bool
isFirstError then do
String -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (String
"PGListener is going to restart, loop failed with exception: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
error) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
". Retrying immediately.")
Int -> Bool -> IO ()
retryLoop Int
delay Bool
False
else do
let increasedDelay :: Int
increasedDelay = Int
delay Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
2
let nextDelay :: Int
nextDelay = Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
increasedDelay Int
maxDelay
String -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.info (String
"PGListener is going to restart, loop failed with exception: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
error) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
". Retrying in " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a b. ConvertibleStrings a b => a -> b
cs (Int -> Text
printTimeToNextRetry Int
delay) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
".")
Int -> IO ()
Control.Concurrent.threadDelay Int
delay
Int -> Bool -> IO ()
retryLoop Int
nextDelay Bool
False
Right Any
_ ->
Int -> Bool -> IO ()
retryLoop Int
initialDelay Bool
True
Int -> Bool -> IO ()
retryLoop Int
initialDelay Bool
True
printTimeToNextRetry :: Int -> Text
printTimeToNextRetry :: Int -> Text
printTimeToNextRetry Int
microseconds
| Int
microseconds Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1000000000 = Int -> Text
forall a. Show a => a -> Text
show (Int
microseconds Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
1000000000) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" min"
| Int
microseconds Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1000000 = Int -> Text
forall a. Show a => a -> Text
show (Int
microseconds Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
1000000) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" s"
| Int
microseconds Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1000 = Int -> Text
forall a. Show a => a -> Text
show (Int
microseconds Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
1000) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ms"
| Bool
otherwise = Int -> Text
forall a. Show a => a -> Text
show Int
microseconds Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" µs"
listenToChannel :: PG.Connection -> Channel -> IO ()
listenToChannel :: Connection -> Channel -> IO ()
listenToChannel Connection
databaseConnection Channel
channel = do
Connection -> Query -> [Identifier] -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PG.execute Connection
databaseConnection Query
"LISTEN ?" [Text -> Identifier
PG.Identifier (Channel -> Text
forall a b. ConvertibleStrings a b => a -> b
cs Channel
channel)]
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
logError :: PGListener -> Text -> IO ()
logError :: PGListener -> Text -> IO ()
logError PGListener
pgListener Text
message = let ?context = PGListener
pgListener.modelContext in Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.error Text
message