module IHP.DataSync.ChangeNotifications
( channelName
, ChangeNotification (..)
, Change (..)
, createNotificationFunction
, installTableChangeTriggers
, makeCachedInstallTableChangeTriggers
) where

import IHP.Prelude
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.Notification as PG
import IHP.ModelSupport
import Data.String.Interpolate.IsString (i)
import Data.Aeson
import Data.Aeson.TH
import qualified Control.Concurrent.MVar as MVar
import IHP.DataSync.DynamicQuery (transformColumnNamesToFieldNames)
import qualified IHP.DataSync.RowLevelSecurity as RLS
import qualified IHP.PGListener as PGListener
import Data.Set (Set)
import qualified Data.Set as Set

-- | A row-level change event for a watched table.
--
-- Values of this type are decoded from the JSON payload that the trigger
-- function built by 'createNotificationFunction' sends via @pg_notify@.
data ChangeNotification
    -- A row with the given id was inserted.
    = DidInsert { id :: !UUID }
    -- A row with the given id was updated. 'changeSet' lists the columns
    -- whose value changed, together with their new value.
    | DidUpdate { id :: !UUID, changeSet :: ![Change] }
    -- The row with the given id was deleted.
    | DidDelete { id :: !UUID }
    deriving (Eq, Show)

-- | A single changed column inside a 'DidUpdate' notification.
data Change = Change
    { col :: !Text  -- ^ Key of the changed column, as produced by @jsonb_each@ on the row.
    , new :: !Value -- ^ The column's value after the update, as JSON.
    } deriving (Eq, Show)

-- | Builds the SQL statement that installs change notifications for a table.
--
-- The statement is a single @DO@ block that
--
-- 1. creates the trigger function @notify_did_change_\<table\>@. It calls
--    @pg_notify@ on the channel given by 'channelName' with a JSON payload of
--    the form @{"INSERT": id}@, @{"DELETE": id}@ or
--    @{"UPDATE": id, "CHANGESET": [{"col": .., "new": ..}]}@. The changeset
--    contains every column whose value differs between @OLD@ and @NEW@.
-- 2. drops and re-creates the @did_insert_@, @did_update_@ and @did_delete_@
--    @AFTER ... FOR EACH ROW@ triggers on the table.
--
-- When the function already exists, @CREATE FUNCTION@ raises
-- @duplicate_function@. That error is caught and ignored, so the remaining
-- statements of the block are skipped and the statement is safe to run again.
--
-- The changeset aggregate is wrapped in @COALESCE@ because @json_agg@ yields
-- @NULL@ when no column changed; the payload then carries an empty array
-- instead of @null@.
--
-- The table name is interpolated into the SQL text as-is. It is double-quoted
-- in both the @DROP TRIGGER@ and @CREATE TRIGGER@ statements so that both
-- always refer to the same table.
createNotificationFunction :: RLS.TableWithRLS -> PG.Query
createNotificationFunction table = [i|
    DO $$
    BEGIN
            CREATE FUNCTION #{functionName}() RETURNS TRIGGER AS $BODY$
                BEGIN
                    CASE TG_OP
                    WHEN 'UPDATE' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            json_build_object(
                              'UPDATE', NEW.id::text,
                              'CHANGESET', (
                                    SELECT COALESCE(json_agg(row_to_json(t)), '[]'::json)
                                    FROM (
                                          SELECT pre.key AS "col", post.value AS "new"
                                          FROM jsonb_each(to_jsonb(OLD)) AS pre
                                          CROSS JOIN jsonb_each(to_jsonb(NEW)) AS post
                                          WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value
                                    ) t
                              )
                            )::text
                        );
                    WHEN 'DELETE' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            (json_build_object('DELETE', OLD.id)::text)
                        );
                    WHEN 'INSERT' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            json_build_object('INSERT', NEW.id)::text
                        );
                    END CASE;
                    RETURN new;
                END;
            $BODY$ language plpgsql;
            DROP TRIGGER IF EXISTS #{insertTriggerName} ON "#{tableName}";
            DROP TRIGGER IF EXISTS #{updateTriggerName} ON "#{tableName}";
            DROP TRIGGER IF EXISTS #{deleteTriggerName} ON "#{tableName}";


            CREATE TRIGGER #{insertTriggerName} AFTER INSERT ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
            CREATE TRIGGER #{updateTriggerName} AFTER UPDATE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
            CREATE TRIGGER #{deleteTriggerName} AFTER DELETE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
        EXCEPTION
            WHEN duplicate_function THEN
            null;

    END; $$
|]

    where
        tableName :: Text
        tableName = get #tableName table

        -- All generated object names are derived from the table name.
        functionName :: Text
        functionName = "notify_did_change_" <> tableName

        insertTriggerName :: Text
        insertTriggerName = "did_insert_" <> tableName

        updateTriggerName :: Text
        updateTriggerName = "did_update_" <> tableName

        deleteTriggerName :: Text
        deleteTriggerName = "did_delete_" <> tableName

-- | Runs the SQL produced by 'createNotificationFunction' for the given table
-- against the database of the implicit @?modelContext@.
--
-- The affected-row count returned by 'sqlExec' is discarded.
installTableChangeTriggers :: (?modelContext :: ModelContext) => RLS.TableWithRLS -> IO ()
installTableChangeTriggers tableNameRLS = do
    _ <- sqlExec (createNotificationFunction tableNameRLS) ()
    pure ()

-- | Creates a variant of 'installTableChangeTriggers' that remembers which
-- tables it has already been called for.
--
-- The returned action keeps a set of tables in an 'IORef'. For a table that is
-- not yet in the set it installs the triggers and then records the table; for
-- a table that is already in the set it does nothing.
--
-- NOTE(review): the membership check and the later insert are separate
-- 'IORef' operations, so two concurrent calls for the same new table may both
-- run the installation.
makeCachedInstallTableChangeTriggers :: (?modelContext :: ModelContext) => IO (RLS.TableWithRLS -> IO ())
makeCachedInstallTableChangeTriggers = do
    tables <- newIORef Set.empty
    pure \tableName -> do
        triggersInstalled <- Set.member tableName <$> readIORef tables

        unless triggersInstalled do
            installTableChangeTriggers tableName
            -- Only record the table after the installation has returned.
            modifyIORef' tables (Set.insert tableName)

-- | Returns the name of the @pg_notify@ channel that the trigger function
-- built by 'createNotificationFunction' publishes to: the table name prefixed
-- with @did_change_@.
channelName :: RLS.TableWithRLS -> ByteString
channelName table = "did_change_" <> cs (get #tableName table)


-- | Decodes the payload sent by the trigger function built in
-- 'createNotificationFunction'.
--
-- The object is recognised by its key: @INSERT@, @UPDATE@ (together with
-- @CHANGESET@) or @DELETE@, each holding the row id. The three shapes are
-- tried in that order and the first one that parses wins.
instance FromJSON ChangeNotification where
    parseJSON = withObject "ChangeNotification" $ \values ->
        parseInsert values <|> parseUpdate values <|> parseDelete values
        where
            parseInsert values = DidInsert <$> values .: "INSERT"

            parseUpdate values = do
                rowId <- values .: "UPDATE"
                -- @json_agg@ yields NULL when an UPDATE changed no column, so
                -- the trigger can send @"CHANGESET": null@. Treat a null (or
                -- missing) changeset as "no changes" instead of failing.
                changes <- values .:? "CHANGESET" .!= []
                pure (DidUpdate rowId changes)

            parseDelete values = DidDelete <$> values .: "DELETE"


-- | Decodes one changeset entry: an object with the keys @col@ (the column
-- key) and @new@ (the new value, kept as a raw JSON 'Value').
instance FromJSON Change where
    parseJSON = withObject "Change" $ \values ->
        Change
            <$> values .: "col"
            <*> values .: "new"

-- ToJSON instances are generated with Template Haskell using aeson's 'defaultOptions'.
-- NOTE(review): both splices pass a data constructor name ('Change, 'DidInsert) rather than
-- a type name; presumably aeson resolves a constructor to its parent type, so the second
-- splice covers every constructor of ChangeNotification — confirm.
-- NOTE(review): the derived encoding is not necessarily the INSERT/UPDATE/DELETE object
-- shape accepted by the hand-written FromJSON instances — verify before round-tripping.
$(deriveToJSON defaultOptions 'Change)
$(deriveToJSON defaultOptions 'DidInsert)