module IHP.DataSync.ChangeNotifications
( channelName
, ChangeNotification (..)
, Change (..)
, ChangeSet (..)
, createNotificationFunction
, installTableChangeTriggers
, makeCachedInstallTableChangeTriggers
, retrieveChanges
) where

import IHP.Prelude
import qualified Database.PostgreSQL.Simple as PG
import IHP.ModelSupport
import Data.String.Interpolate.IsString (i)
import Data.Aeson
import Data.Aeson.TH
import qualified IHP.DataSync.RowLevelSecurity as RLS
import qualified Data.Set as Set
import qualified Data.UUID as UUID

data ChangeNotification
    = DidInsert { ChangeNotification -> UUID
id :: !UUID }
    | DidUpdate { id :: !UUID, ChangeNotification -> ChangeSet
changeSet :: !ChangeSet }
    | DidUpdateLarge { id :: !UUID, ChangeNotification -> UUID
payloadId :: !UUID }
    | DidDelete { id :: !UUID }
    deriving (ChangeNotification -> ChangeNotification -> Bool
(ChangeNotification -> ChangeNotification -> Bool)
-> (ChangeNotification -> ChangeNotification -> Bool)
-> Eq ChangeNotification
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ChangeNotification -> ChangeNotification -> Bool
== :: ChangeNotification -> ChangeNotification -> Bool
$c/= :: ChangeNotification -> ChangeNotification -> Bool
/= :: ChangeNotification -> ChangeNotification -> Bool
Eq, Int -> ChangeNotification -> ShowS
[ChangeNotification] -> ShowS
ChangeNotification -> String
(Int -> ChangeNotification -> ShowS)
-> (ChangeNotification -> String)
-> ([ChangeNotification] -> ShowS)
-> Show ChangeNotification
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ChangeNotification -> ShowS
showsPrec :: Int -> ChangeNotification -> ShowS
$cshow :: ChangeNotification -> String
show :: ChangeNotification -> String
$cshowList :: [ChangeNotification] -> ShowS
showList :: [ChangeNotification] -> ShowS
Show)

data ChangeSet
    = InlineChangeSet { ChangeSet -> [Change]
changeSet :: ![Change] } -- | When the patch fits into the 8000 bytes limit of @pg_notify@
    | ExternalChangeSet { ChangeSet -> UUID
largePgNotificationId :: !UUID } -- | The patch is over 8000 bytes, so we have stored it in the @large_pg_notifications@ table
    deriving (ChangeSet -> ChangeSet -> Bool
(ChangeSet -> ChangeSet -> Bool)
-> (ChangeSet -> ChangeSet -> Bool) -> Eq ChangeSet
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ChangeSet -> ChangeSet -> Bool
== :: ChangeSet -> ChangeSet -> Bool
$c/= :: ChangeSet -> ChangeSet -> Bool
/= :: ChangeSet -> ChangeSet -> Bool
Eq, Int -> ChangeSet -> ShowS
[ChangeSet] -> ShowS
ChangeSet -> String
(Int -> ChangeSet -> ShowS)
-> (ChangeSet -> String)
-> ([ChangeSet] -> ShowS)
-> Show ChangeSet
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ChangeSet -> ShowS
showsPrec :: Int -> ChangeSet -> ShowS
$cshow :: ChangeSet -> String
show :: ChangeSet -> String
$cshowList :: [ChangeSet] -> ShowS
showList :: [ChangeSet] -> ShowS
Show)

data Change = Change
    { Change -> Text
col :: !Text
    , Change -> Value
new :: !Value
    } deriving (Change -> Change -> Bool
(Change -> Change -> Bool)
-> (Change -> Change -> Bool) -> Eq Change
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Change -> Change -> Bool
== :: Change -> Change -> Bool
$c/= :: Change -> Change -> Bool
/= :: Change -> Change -> Bool
Eq, Int -> Change -> ShowS
[Change] -> ShowS
Change -> String
(Int -> Change -> ShowS)
-> (Change -> String) -> ([Change] -> ShowS) -> Show Change
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Change -> ShowS
showsPrec :: Int -> Change -> ShowS
$cshow :: Change -> String
show :: Change -> String
$cshowList :: [Change] -> ShowS
showList :: [Change] -> ShowS
Show)

-- | Returns the sql code to set up a database trigger. Mainly used by 'watchInsertOrUpdateTable'.
createNotificationFunction :: RLS.TableWithRLS -> PG.Query
createNotificationFunction :: TableWithRLS -> Query
createNotificationFunction TableWithRLS
table = [i|
    DO $$
    BEGIN
            CREATE FUNCTION #{functionName}() RETURNS TRIGGER AS $BODY$
                DECLARE
                    payload TEXT;
                    large_pg_notification_id UUID;
                    changeset JSON;
                BEGIN
                    CASE TG_OP
                    WHEN 'UPDATE' THEN
                        SELECT coalesce(json_agg(row_to_json(t)), '[]'::json)
                                FROM (
                                      SELECT pre.key AS "col", post.value AS "new"
                                      FROM jsonb_each(to_jsonb(OLD)) AS pre
                                      CROSS JOIN jsonb_each(to_jsonb(NEW)) AS post
                                      WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value
                                ) t INTO changeset;
                        payload := json_build_object(
                          'UPDATE', NEW.id::text,
                          'CHANGESET', changeset
                        )::text;
                        IF LENGTH(payload) > 7800 THEN
                            INSERT INTO large_pg_notifications (payload) VALUES (changeset) RETURNING id INTO large_pg_notification_id;
                            payload := json_build_object(
                                'UPDATE', NEW.id::text,
                                'CHANGESET', large_pg_notification_id::text
                            )::text;
                            DELETE FROM large_pg_notifications WHERE created_at < CURRENT_TIMESTAMP - interval '30s';
                        END IF;
                        PERFORM pg_notify(
                            '#{channelName table}',
                            payload
                        );
                    WHEN 'DELETE' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            (json_build_object('DELETE', OLD.id)::text)
                        );
                    WHEN 'INSERT' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            json_build_object('INSERT', NEW.id)::text
                        );
                    END CASE;
                    RETURN new;
                END;
            $BODY$ language plpgsql;
            DROP TRIGGER IF EXISTS #{insertTriggerName} ON #{tableName};
            DROP TRIGGER IF EXISTS #{updateTriggerName} ON #{tableName};
            DROP TRIGGER IF EXISTS #{deleteTriggerName} ON #{tableName};


            CREATE TRIGGER #{insertTriggerName} AFTER INSERT ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
            CREATE TRIGGER #{updateTriggerName} AFTER UPDATE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
            CREATE TRIGGER #{deleteTriggerName} AFTER DELETE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
        EXCEPTION
            WHEN duplicate_function THEN
            null;
        
        CREATE UNLOGGED TABLE IF NOT EXISTS large_pg_notifications (
            id UUID DEFAULT uuid_generate_v4() PRIMARY KEY NOT NULL,
            payload TEXT DEFAULT null,
            created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
        );
        CREATE INDEX IF NOT EXISTS large_pg_notifications_created_at_index ON large_pg_notifications (created_at);
    END; $$
|]

    where
        tableName :: Text
tableName = TableWithRLS
table.tableName

        functionName :: Text
functionName = Text
"notify_did_change_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName
        insertTriggerName :: Text
insertTriggerName = Text
"did_insert_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName
        updateTriggerName :: Text
updateTriggerName = Text
"did_update_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName
        deleteTriggerName :: Text
deleteTriggerName = Text
"did_delete_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName

installTableChangeTriggers :: (?modelContext :: ModelContext) => RLS.TableWithRLS -> IO ()
installTableChangeTriggers :: (?modelContext::ModelContext) => TableWithRLS -> IO ()
installTableChangeTriggers TableWithRLS
tableNameRLS = do
    Query -> () -> IO Int64
forall q.
(?modelContext::ModelContext, ToRow q) =>
Query -> q -> IO Int64
sqlExec (TableWithRLS -> Query
createNotificationFunction TableWithRLS
tableNameRLS) ()
    () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

makeCachedInstallTableChangeTriggers :: (?modelContext :: ModelContext) => IO (RLS.TableWithRLS -> IO ())
makeCachedInstallTableChangeTriggers :: (?modelContext::ModelContext) => IO (TableWithRLS -> IO ())
makeCachedInstallTableChangeTriggers = do
    IORef (Set TableWithRLS)
tables <- Set TableWithRLS -> IO (IORef (Set TableWithRLS))
forall a. a -> IO (IORef a)
newIORef Set TableWithRLS
forall a. Set a
Set.empty
    (TableWithRLS -> IO ()) -> IO (TableWithRLS -> IO ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure \TableWithRLS
tableName -> do
        Bool
triggersInstalled <- TableWithRLS -> Set TableWithRLS -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member TableWithRLS
tableName (Set TableWithRLS -> Bool) -> IO (Set TableWithRLS) -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef (Set TableWithRLS) -> IO (Set TableWithRLS)
forall a. IORef a -> IO a
readIORef IORef (Set TableWithRLS)
tables

        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
triggersInstalled do
            (?modelContext::ModelContext) => TableWithRLS -> IO ()
TableWithRLS -> IO ()
installTableChangeTriggers TableWithRLS
tableName
            IORef (Set TableWithRLS)
-> (Set TableWithRLS -> Set TableWithRLS) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef (Set TableWithRLS)
tables (TableWithRLS -> Set TableWithRLS -> Set TableWithRLS
forall a. Ord a => a -> Set a -> Set a
Set.insert TableWithRLS
tableName)

-- | Returns the event name of the event that the pg notify trigger dispatches
channelName :: RLS.TableWithRLS -> ByteString
channelName :: TableWithRLS -> ByteString
channelName TableWithRLS
table = ByteString
"did_change_" ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> (Text -> ByteString
forall a b. ConvertibleStrings a b => a -> b
cs (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ TableWithRLS
table.tableName)


instance FromJSON ChangeNotification where
    parseJSON :: Value -> Parser ChangeNotification
parseJSON = String
-> (Object -> Parser ChangeNotification)
-> Value
-> Parser ChangeNotification
forall a. String -> (Object -> Parser a) -> Value -> Parser a
withObject String
"ChangeNotification" ((Object -> Parser ChangeNotification)
 -> Value -> Parser ChangeNotification)
-> (Object -> Parser ChangeNotification)
-> Value
-> Parser ChangeNotification
forall a b. (a -> b) -> a -> b
$ \Object
values -> Object -> Parser ChangeNotification
insert Object
values Parser ChangeNotification
-> Parser ChangeNotification -> Parser ChangeNotification
forall a. Parser a -> Parser a -> Parser a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Object -> Parser ChangeNotification
update Object
values Parser ChangeNotification
-> Parser ChangeNotification -> Parser ChangeNotification
forall a. Parser a -> Parser a -> Parser a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Object -> Parser ChangeNotification
delete Object
values
        where
            insert :: Object -> Parser ChangeNotification
insert Object
values = do
                UUID
id <- Object
values Object -> Key -> Parser UUID
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"INSERT"
                ChangeNotification -> Parser ChangeNotification
forall a. a -> Parser a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DidInsert { UUID
$sel:id:DidInsert :: UUID
id :: UUID
id }
            update :: Object -> Parser ChangeNotification
update Object
values = do
                UUID
id <- Object
values Object -> Key -> Parser UUID
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"UPDATE"
                ChangeSet
changeSet <- Object
values Object -> Key -> Parser ChangeSet
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"CHANGESET"
                ChangeNotification -> Parser ChangeNotification
forall a. a -> Parser a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ChangeNotification -> Parser ChangeNotification)
-> ChangeNotification -> Parser ChangeNotification
forall a b. (a -> b) -> a -> b
$ UUID -> ChangeSet -> ChangeNotification
DidUpdate UUID
id ChangeSet
changeSet
            delete :: Object -> Parser ChangeNotification
delete Object
values = UUID -> ChangeNotification
DidDelete (UUID -> ChangeNotification)
-> Parser UUID -> Parser ChangeNotification
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
values Object -> Key -> Parser UUID
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"DELETE"

instance FromJSON ChangeSet where
    parseJSON :: Value -> Parser ChangeSet
parseJSON array :: Value
array@(Array Array
v) = do
            [Change]
changeSet <- Value -> Parser [Change]
forall a. FromJSON a => Value -> Parser a
parseJSON Value
array
            ChangeSet -> Parser ChangeSet
forall a. a -> Parser a
forall (f :: * -> *) a. Applicative f => a -> f a
pure InlineChangeSet { [Change]
$sel:changeSet:InlineChangeSet :: [Change]
changeSet :: [Change]
changeSet }
    parseJSON (String Text
id) = do
        case Text -> Maybe UUID
UUID.fromText Text
id of
            Just UUID
largePgNotificationId -> ChangeSet -> Parser ChangeSet
forall a. a -> Parser a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ExternalChangeSet { UUID
$sel:largePgNotificationId:InlineChangeSet :: UUID
largePgNotificationId :: UUID
largePgNotificationId }
            Maybe UUID
Nothing -> String -> Parser ChangeSet
forall a. String -> Parser a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"Invalid UUID"

instance FromJSON Change where
    parseJSON :: Value -> Parser Change
parseJSON = String -> (Object -> Parser Change) -> Value -> Parser Change
forall a. String -> (Object -> Parser a) -> Value -> Parser a
withObject String
"Change" ((Object -> Parser Change) -> Value -> Parser Change)
-> (Object -> Parser Change) -> Value -> Parser Change
forall a b. (a -> b) -> a -> b
$ \Object
values -> do
        Text
col <- Object
values Object -> Key -> Parser Text
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"col"
        Value
new <- Object
values Object -> Key -> Parser Value
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"new"
        Change -> Parser Change
forall a. a -> Parser a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Change { Text
Value
$sel:col:Change :: Text
$sel:new:Change :: Value
col :: Text
new :: Value
.. }
-- | The @pg_notify@ function has a payload limit of 8000 bytes. When a record update is larger than the payload size
-- we store the patch in the @large_pg_notifications@ table and pass over the id to the patch.
--
-- This function retrieves the patch from the @large_pg_notifications@ table, or directly returns the patch
-- when it's less than 8000 bytes.
retrieveChanges :: (?modelContext :: ModelContext) => ChangeSet -> IO [Change]
retrieveChanges :: (?modelContext::ModelContext) => ChangeSet -> IO [Change]
retrieveChanges InlineChangeSet { [Change]
$sel:changeSet:InlineChangeSet :: ChangeSet -> [Change]
changeSet :: [Change]
changeSet } = [Change] -> IO [Change]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [Change]
changeSet
retrieveChanges ExternalChangeSet { UUID
$sel:largePgNotificationId:InlineChangeSet :: ChangeSet -> UUID
largePgNotificationId :: UUID
largePgNotificationId } = do
    (ByteString
payload :: ByteString) <- Query -> Only UUID -> IO ByteString
forall q value.
(?modelContext::ModelContext, ToRow q, FromField value) =>
Query -> q -> IO value
sqlQueryScalar Query
"SELECT payload FROM large_pg_notifications WHERE id = ? LIMIT 1" (UUID -> Only UUID
forall a. a -> Only a
PG.Only UUID
largePgNotificationId)
    case ByteString -> Either String [Change]
forall a. FromJSON a => ByteString -> Either String a
eitherDecodeStrict' ByteString
payload of
        Left String
e -> String -> IO [Change]
forall a. String -> IO a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
e
        Right [Change]
result -> [Change] -> IO [Change]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [Change]
result

$(deriveToJSON defaultOptions 'Change)
$(deriveToJSON defaultOptions 'InlineChangeSet)
$(deriveToJSON defaultOptions 'DidInsert)