module IHP.DataSync.ChangeNotifications
( channelName
, ChangeNotification (..)
, Change (..)
, createNotificationFunction
, installTableChangeTriggers
, makeCachedInstallTableChangeTriggers
) where

import IHP.Prelude
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.Notification as PG
import IHP.ModelSupport
import Data.String.Interpolate.IsString (i)
import Data.Aeson
import Data.Aeson.TH
import qualified Control.Concurrent.MVar as MVar
import IHP.DataSync.DynamicQuery (transformColumnNamesToFieldNames)
import qualified IHP.DataSync.RowLevelSecurity as RLS
import qualified IHP.PGListener as PGListener
import Data.Set (Set)
import qualified Data.Set as Set

-- | A change to a single row of a watched table.
--
-- Values of this type are decoded from the JSON payload that the trigger
-- function built by 'createNotificationFunction' passes to @pg_notify@:
--
-- * 'DidInsert': a row was inserted; carries the id of the new row.
-- * 'DidUpdate': a row was updated; carries the row id and the columns whose
--   values differ between the old and the new row.
-- * 'DidDelete': a row was deleted; carries the id of the removed row.
data ChangeNotification
    = DidInsert { id :: !UUID }
    | DidUpdate { id :: !UUID, changeSet :: ![Change] }
    | DidDelete { id :: !UUID }
    deriving (Eq, Show)

-- | A single changed column inside a 'DidUpdate' notification.
data Change = Change
    { col :: !Text -- ^ Key of the changed column in the row's JSON representation
    , new :: !Value -- ^ Value of that column in the updated (@NEW@) row
    } deriving (Eq, Show)

-- | Returns the sql code that sets up change notifications for a table. Used by 'installTableChangeTriggers'.
--
-- The generated @DO@ block creates a trigger function @notify_did_change_\<table\>@
-- and (re-)creates three @AFTER ... FOR EACH ROW@ triggers (insert, update, delete)
-- that call it. The function publishes a JSON payload on the channel returned by
-- 'channelName':
--
-- > {"INSERT": <id>}
-- > {"UPDATE": <id>, "CHANGESET": [{"col": <column>, "new": <new value>}, ...]}
-- > {"DELETE": <id>}
--
-- The change set only contains the columns whose values differ between @OLD@ and @NEW@.
--
-- @CREATE FUNCTION@ is the first statement of the block, so when the function already
-- exists the @duplicate_function@ handler is hit before any trigger is touched and the
-- block does nothing.
--
-- The table name is double-quoted in every @ON@ clause, so the @DROP TRIGGER@ and
-- @CREATE TRIGGER@ statements always refer to the same relation (this matters for
-- mixed-case table names and for names that are reserved words).
--
-- NOTE(review): PostgreSQL limits the size of a @NOTIFY@ payload (8000 bytes in the
-- default configuration). An update with a very large change set is not handled
-- specially here.
createNotificationFunction :: RLS.TableWithRLS -> PG.Query
createNotificationFunction table = [i|
    DO $$
    BEGIN
            CREATE FUNCTION #{functionName}() RETURNS TRIGGER AS $BODY$
                BEGIN
                    CASE TG_OP
                    WHEN 'UPDATE' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            json_build_object(
                              'UPDATE', NEW.id::text,
                              'CHANGESET', (
                                    SELECT json_agg(row_to_json(t))
                                    FROM (
                                          SELECT pre.key AS "col", post.value AS "new"
                                          FROM jsonb_each(to_jsonb(OLD)) AS pre
                                          CROSS JOIN jsonb_each(to_jsonb(NEW)) AS post
                                          WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value
                                    ) t
                              )
                            )::text
                        );
                    WHEN 'DELETE' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            (json_build_object('DELETE', OLD.id)::text)
                        );
                    WHEN 'INSERT' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            json_build_object('INSERT', NEW.id)::text
                        );
                    END CASE;
                    RETURN new;
                END;
            $BODY$ language plpgsql;
            DROP TRIGGER IF EXISTS #{insertTriggerName} ON "#{tableName}";
            DROP TRIGGER IF EXISTS #{updateTriggerName} ON "#{tableName}";
            DROP TRIGGER IF EXISTS #{deleteTriggerName} ON "#{tableName}";


            CREATE TRIGGER #{insertTriggerName} AFTER INSERT ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
            CREATE TRIGGER #{updateTriggerName} AFTER UPDATE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
            CREATE TRIGGER #{deleteTriggerName} AFTER DELETE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
        EXCEPTION
            WHEN duplicate_function THEN
            null;

    END; $$
|]

    where
        tableName :: Text
        tableName = get #tableName table

        -- All generated object names are derived from the table name
        functionName :: Text
        functionName = "notify_did_change_" <> tableName

        insertTriggerName :: Text
        insertTriggerName = "did_insert_" <> tableName

        updateTriggerName :: Text
        updateTriggerName = "did_update_" <> tableName

        deleteTriggerName :: Text
        deleteTriggerName = "did_delete_" <> tableName

-- | Runs the sql produced by 'createNotificationFunction' for the given table
-- on the database of the current model context.
--
-- The number of affected rows reported by 'sqlExec' is discarded.
installTableChangeTriggers :: (?modelContext :: ModelContext) => RLS.TableWithRLS -> IO ()
installTableChangeTriggers tableNameRLS = do
    sqlExec (createNotificationFunction tableNameRLS) ()
    pure ()

-- | Builds a variant of 'installTableChangeTriggers' that remembers which
-- tables it has already been called for.
--
-- The returned function keeps a 'Set' of tables in an 'IORef'. Calling it with
-- a table that is already in the set does nothing; otherwise the triggers are
-- installed and the table is added to the set afterwards.
--
-- NOTE(review): the membership check and the insert are two separate 'IORef'
-- operations, so two concurrent calls for the same new table may both run the
-- install. Confirm that this is acceptable for the callers.
makeCachedInstallTableChangeTriggers :: (?modelContext :: ModelContext) => IO (RLS.TableWithRLS -> IO ())
makeCachedInstallTableChangeTriggers = do
    tables <- newIORef Set.empty
    pure \tableName -> do
        triggersInstalled <- Set.member tableName <$> readIORef tables

        unless triggersInstalled do
            installTableChangeTriggers tableName
            -- Only recorded after the install returned, so a failed install is retried on the next call
            modifyIORef' tables (Set.insert tableName)

-- | Returns the event name of the event that the pg notify trigger dispatches
--
-- The name is the table name prefixed with @did_change_@.
channelName :: RLS.TableWithRLS -> ByteString
channelName table = "did_change_" <> cs (get #tableName table)


-- | Decodes the payload sent by the notification trigger.
--
-- The payload is a JSON object that is recognised by its key:
--
-- * @INSERT@ yields 'DidInsert'
-- * @UPDATE@ together with @CHANGESET@ yields 'DidUpdate'
-- * @DELETE@ yields 'DidDelete'
--
-- The three shapes are tried in that order; the first one that parses wins.
instance FromJSON ChangeNotification where
    parseJSON = withObject "ChangeNotification" $ \values -> insert values <|> update values <|> delete values
        where
            insert values = do
                id <- values .: "INSERT"
                pure DidInsert { id }
            update values = do
                id <- values .: "UPDATE"
                changeSet <- values .: "CHANGESET"
                pure $ DidUpdate id changeSet
            delete values = DidDelete <$> values .: "DELETE"


-- | Decodes one entry of the @CHANGESET@ array: a JSON object with the keys
-- @col@ and @new@.
instance FromJSON Change where
    parseJSON = withObject "Change" $ \values -> do
        col <- values .: "col"
        new <- values .: "new"
        -- Picks up the 'col' and 'new' bindings above as the record fields
        pure Change { .. }

-- Template Haskell splices that derive the 'ToJSON' instances using aeson's 'defaultOptions'.
-- NOTE(review): both splices are given constructor names ('DidInsert, 'Change);
-- presumably aeson resolves them to the enclosing data types — confirm.
$(deriveToJSON defaultOptions 'DidInsert)
$(deriveToJSON defaultOptions 'Change)