module IHP.DataSync.ChangeNotifications
( channelName
, ChangeNotification (..)
, Change (..)
, ChangeSet (..)
, createNotificationFunction
, installTableChangeTriggers
, makeCachedInstallTableChangeTriggers
, retrieveChanges
) where
import IHP.Prelude
import qualified Database.PostgreSQL.Simple as PG
import IHP.ModelSupport
import Data.String.Interpolate.IsString (i)
import Data.Aeson
import Data.Aeson.TH
import qualified IHP.DataSync.RowLevelSecurity as RLS
import qualified Data.Set as Set
import qualified Data.UUID as UUID
-- | A single row-level change announced on a table's notification channel.
--
-- Values of this type are decoded from the JSON payloads that the trigger
-- function built by 'createNotificationFunction' passes to @pg_notify@.
data ChangeNotification
    = -- | A row with the given id was inserted (payload key @INSERT@).
      DidInsert
        { id :: !UUID
        }
    | -- | A row was updated (payload keys @UPDATE@ and @CHANGESET@).
      DidUpdate
        { id :: !UUID
        , changeSet :: !ChangeSet
        }
    | -- | An update that refers to a payload by id. The 'FromJSON' instance
      -- in this module never produces this constructor.
      DidUpdateLarge
        { id :: !UUID
        , payloadId :: !UUID
        }
    | -- | The row with the given id was deleted (payload key @DELETE@).
      DidDelete
        { id :: !UUID
        }
    deriving (Eq, Show)
-- | The column changes that belong to an update notification.
--
-- Use 'retrieveChanges' to turn either variant into a plain list of 'Change'.
data ChangeSet
    = -- | The changes were small enough to be embedded in the notification
      -- payload itself (a JSON array).
      InlineChangeSet
        { changeSet :: ![Change]
        }
    | -- | The changes were stored as a row of the @large_pg_notifications@
      -- table; only the id of that row travels in the notification.
      ExternalChangeSet
        { largePgNotificationId :: !UUID
        }
    deriving (Eq, Show)
-- | One changed column of an updated row.
--
-- The trigger function emits one such entry for every column whose JSON
-- representation differs between the old and the new row.
data Change = Change
    { col :: !Text
    -- ^ Name of the changed column (JSON key @col@).
    , new :: !Value
    -- ^ Value of the column after the update (JSON key @new@).
    }
    deriving (Eq, Show)
-- | Builds the SQL that installs change notifications for the given table.
--
-- The generated @DO@ block:
--
-- 1. Creates the @large_pg_notifications@ table and its @created_at@ index
--    if they do not exist yet. This happens unconditionally and before
--    anything else, so the table is guaranteed to exist as soon as the first
--    trigger can fire.
-- 2. Creates the trigger function @notify_did_change_\<table\>@ and (re)creates
--    the @did_insert_@, @did_update_@ and @did_delete_@ row triggers. If the
--    function already exists (@duplicate_function@) this whole step is skipped
--    and the existing function and triggers are left untouched.
--
-- The trigger function publishes a JSON message on 'channelName':
--
-- * @{"INSERT": id}@ and @{"DELETE": id}@ for inserts and deletes.
-- * @{"UPDATE": id, "CHANGESET": [...]}@ for updates, where the changeset
--   lists every column whose value changed as @{"col": ..., "new": ...}@.
-- * When the update message is larger than 7800 bytes, the changeset is
--   written to @large_pg_notifications@ and @CHANGESET@ carries the id of
--   that row instead. Stored payloads older than 30 seconds are removed on
--   that occasion.
--
-- The size check is done in bytes ('octet_length') because the payload limit
-- of @pg_notify@ is expressed in bytes, not characters.
--
-- The SQL must not contain a question mark: it is executed without
-- parameters and a question mark would be read as a placeholder.
createNotificationFunction :: RLS.TableWithRLS -> PG.Query
createNotificationFunction table = [i|
    DO $$
    BEGIN
        -- Storage for changesets that do not fit into a notification payload.
        -- Created up front so that it also exists after the very first installation.
        CREATE UNLOGGED TABLE IF NOT EXISTS large_pg_notifications (
            id UUID DEFAULT uuid_generate_v4() PRIMARY KEY NOT NULL,
            payload TEXT DEFAULT null,
            created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
        );
        CREATE INDEX IF NOT EXISTS large_pg_notifications_created_at_index ON large_pg_notifications (created_at);

        -- Nested block: an already existing function only skips this part.
        BEGIN
            CREATE FUNCTION "#{functionName}"() RETURNS TRIGGER AS $BODY$
                DECLARE
                    payload TEXT;
                    large_pg_notification_id UUID;
                    changeset JSON;
                BEGIN
                    CASE TG_OP
                    WHEN 'UPDATE' THEN
                        SELECT coalesce(json_agg(row_to_json(t)), '[]'::json)
                        FROM (
                              SELECT pre.key AS "col", post.value AS "new"
                              FROM jsonb_each(to_jsonb(OLD)) AS pre
                              CROSS JOIN jsonb_each(to_jsonb(NEW)) AS post
                              WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value
                        ) t INTO changeset;
                        payload := json_build_object(
                            'UPDATE', NEW.id::text,
                            'CHANGESET', changeset
                        )::text;
                        -- The notification payload limit is measured in bytes.
                        IF octet_length(payload) > 7800 THEN
                            INSERT INTO large_pg_notifications (payload) VALUES (changeset) RETURNING id INTO large_pg_notification_id;
                            payload := json_build_object(
                                'UPDATE', NEW.id::text,
                                'CHANGESET', large_pg_notification_id::text
                            )::text;
                            DELETE FROM large_pg_notifications WHERE created_at < CURRENT_TIMESTAMP - interval '30s';
                        END IF;
                        PERFORM pg_notify(
                            '#{channelName table}',
                            payload
                        );
                    WHEN 'DELETE' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            (json_build_object('DELETE', OLD.id)::text)
                        );
                    WHEN 'INSERT' THEN
                        PERFORM pg_notify(
                            '#{channelName table}',
                            json_build_object('INSERT', NEW.id)::text
                        );
                    END CASE;
                    RETURN new;
                END;
            $BODY$ language plpgsql;
            DROP TRIGGER IF EXISTS "#{insertTriggerName}" ON "#{tableName}";
            DROP TRIGGER IF EXISTS "#{updateTriggerName}" ON "#{tableName}";
            DROP TRIGGER IF EXISTS "#{deleteTriggerName}" ON "#{tableName}";
            CREATE TRIGGER "#{insertTriggerName}" AFTER INSERT ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE "#{functionName}"();
            CREATE TRIGGER "#{updateTriggerName}" AFTER UPDATE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE "#{functionName}"();
            CREATE TRIGGER "#{deleteTriggerName}" AFTER DELETE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE "#{functionName}"();
        EXCEPTION
            WHEN duplicate_function THEN
                null;
        END;
    END; $$
|]
    where
        tableName = table.tableName

        functionName = "notify_did_change_" <> tableName
        insertTriggerName = "did_insert_" <> tableName
        updateTriggerName = "did_update_" <> tableName
        deleteTriggerName = "did_delete_" <> tableName
-- | Installs the change-notification function and triggers for a table.
--
-- Executes the SQL produced by 'createNotificationFunction' on the database
-- connection of the implicit @?modelContext@. The row count reported by
-- 'sqlExec' is discarded.
installTableChangeTriggers :: (?modelContext :: ModelContext) => RLS.TableWithRLS -> IO ()
installTableChangeTriggers tableNameRLS =
    sqlExec installSql () >> pure ()
    where
        installSql = createNotificationFunction tableNameRLS
-- | Creates a memoizing variant of 'installTableChangeTriggers'.
--
-- The returned action remembers, in an 'IORef' holding a 'Set.Set', every
-- table it has already handled and only calls 'installTableChangeTriggers'
-- for tables that are not in that set yet. The table is recorded after the
-- installation has returned.
makeCachedInstallTableChangeTriggers :: (?modelContext :: ModelContext) => IO (RLS.TableWithRLS -> IO ())
makeCachedInstallTableChangeTriggers = do
    installedTables <- newIORef Set.empty

    let installOnce :: RLS.TableWithRLS -> IO ()
        installOnce table = do
            alreadyInstalled <- Set.member table <$> readIORef installedTables
            unless alreadyInstalled do
                installTableChangeTriggers table
                modifyIORef' installedTables (Set.insert table)

    pure installOnce
-- | Name of the notification channel used for changes of the given table.
--
-- The channel is the table name prefixed with @did_change_@.
channelName :: RLS.TableWithRLS -> ByteString
channelName table = cs ("did_change_" <> table.tableName)
-- | Decodes the JSON object sent by the trigger function.
--
-- The alternatives are tried in this order:
--
-- * @{"INSERT": id}@ becomes 'DidInsert'
-- * @{"UPDATE": id, "CHANGESET": ...}@ becomes 'DidUpdate'
-- * @{"DELETE": id}@ becomes 'DidDelete'
instance FromJSON ChangeNotification where
    parseJSON = withObject "ChangeNotification" decodeNotification
        where
            decodeNotification payload =
                decodeInsert payload
                    <|> decodeUpdate payload
                    <|> decodeDelete payload

            decodeInsert payload = DidInsert <$> payload .: "INSERT"

            decodeUpdate payload = DidUpdate <$> payload .: "UPDATE" <*> payload .: "CHANGESET"

            decodeDelete payload = DidDelete <$> payload .: "DELETE"
-- | Decodes the @CHANGESET@ field of an update notification.
--
-- * A JSON array is decoded as a list of 'Change' ('InlineChangeSet').
-- * A JSON string must be a UUID and is treated as the id of an externally
--   stored payload ('ExternalChangeSet'); any other string fails with
--   @Invalid UUID@.
-- * Every other JSON value is rejected with a parse failure, so malformed
--   input is reported by the parser instead of raising a pattern match
--   exception.
instance FromJSON ChangeSet where
    parseJSON changes@(Array _) = InlineChangeSet <$> parseJSON changes
    parseJSON (String rawId) =
        case UUID.fromText rawId of
            Just largePgNotificationId -> pure ExternalChangeSet { largePgNotificationId }
            Nothing -> fail "Invalid UUID"
    parseJSON _ = fail "Invalid ChangeSet: expected an array of changes or a UUID string"
-- | Decodes a single changeset entry of the form @{"col": ..., "new": ...}@.
instance FromJSON Change where
    parseJSON = withObject "Change" decodeChange
        where
            -- Fields are read in declaration order: column name, then new value.
            decodeChange entry = Change <$> entry .: "col" <*> entry .: "new"
-- | Resolves a 'ChangeSet' to the list of changes it stands for.
--
-- An 'InlineChangeSet' already carries its changes and is returned as is.
-- For an 'ExternalChangeSet' the payload is loaded from the
-- @large_pg_notifications@ table by id and decoded as JSON; a decoding error
-- is raised via 'fail' in 'IO'.
retrieveChanges :: (?modelContext :: ModelContext) => ChangeSet -> IO [Change]
retrieveChanges source =
    case source of
        InlineChangeSet inlineChanges ->
            pure inlineChanges
        ExternalChangeSet notificationId -> do
            (storedPayload :: ByteString) <- sqlQueryScalar "SELECT payload FROM large_pg_notifications WHERE id = ? LIMIT 1" (PG.Only notificationId)
            either fail pure (eitherDecodeStrict' storedPayload)
-- 'ToJSON' instances generated with Template Haskell using aeson's
-- 'defaultOptions'. Each splice is handed the quoted name of a data
-- constructor ('Change', 'InlineChangeSet', 'DidInsert').
-- NOTE(review): presumably aeson resolves each constructor name to its parent
-- type ('Change', 'ChangeSet', 'ChangeNotification') — confirm.
-- NOTE(review): the order looks deliberate (each later type contains the
-- earlier one) — keep it unless verified otherwise.
$(deriveToJSON defaultOptions 'Change)
$(deriveToJSON defaultOptions 'InlineChangeSet)
$(deriveToJSON defaultOptions 'DidInsert)