{-# LANGUAGE UndecidableInstances #-}
module IHP.DataSync.ControllerImpl where
import IHP.ControllerPrelude hiding (OrderByClause)
import qualified Control.Exception as Exception
import qualified IHP.Log as Log
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Key as Aeson
import Data.Aeson.TH
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.ToField as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Data.HashMap.Strict as HashMap
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent.MVar as MVar
import IHP.DataSync.Types
import IHP.DataSync.RowLevelSecurity
import IHP.DataSync.DynamicQuery
import IHP.DataSync.DynamicQueryCompiler
import qualified IHP.DataSync.ChangeNotifications as ChangeNotifications
import IHP.DataSync.REST.Controller (aesonValueToPostgresValue)
import qualified Data.ByteString.Char8 as ByteString
import qualified IHP.PGListener as PGListener
import IHP.ApplicationContext
import qualified Data.Set as Set
import qualified Data.Pool as Pool
$(deriveFromJSON defaultOptions ''DataSyncMessage)
$(deriveToJSON defaultOptions 'DataSyncResult)
type EnsureRLSEnabledFn = Text -> IO TableWithRLS
type InstallTableChangeTriggerFn = TableWithRLS -> IO ()
type SendJSONFn = DataSyncResponse -> IO ()
type HandleCustomMessageFn = (DataSyncResponse -> IO ()) -> DataSyncMessage -> IO ()
runDataSyncController ::
( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
, ?applicationContext :: ApplicationContext
, ?context :: ControllerContext
, ?modelContext :: ModelContext
, ?state :: IORef DataSyncController
, PG.ToField (PrimaryKey (GetTableName CurrentUserRecord))
, Typeable CurrentUserRecord
, HasNewSessionUrl CurrentUserRecord
, Show (PrimaryKey (GetTableName CurrentUserRecord))
) => EnsureRLSEnabledFn -> InstallTableChangeTriggerFn -> IO ByteString -> SendJSONFn -> HandleCustomMessageFn -> (Text -> Renamer) -> IO ()
runDataSyncController :: (HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
?applicationContext::ApplicationContext,
?context::ControllerContext, ?modelContext::ModelContext,
?state::IORef DataSyncController,
ToField (PrimaryKey (GetTableName CurrentUserRecord)),
Typeable CurrentUserRecord, HasNewSessionUrl CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord))) =>
EnsureRLSEnabledFn
-> InstallTableChangeTriggerFn
-> IO ByteString
-> SendJSONFn
-> HandleCustomMessageFn
-> (Text -> Renamer)
-> IO ()
runDataSyncController EnsureRLSEnabledFn
ensureRLSEnabled InstallTableChangeTriggerFn
installTableChangeTriggers IO ByteString
receiveData SendJSONFn
sendJSON HandleCustomMessageFn
handleCustomMessage Text -> Renamer
renamer = do
DataSyncController -> IO ()
forall state. (?state::IORef state) => state -> IO ()
setState DataSyncReady { subscriptions :: HashMap UUID (MVar ())
subscriptions = HashMap UUID (MVar ())
forall k v. HashMap k v
HashMap.empty, transactions :: HashMap UUID DataSyncTransaction
transactions = HashMap UUID DataSyncTransaction
forall k v. HashMap k v
HashMap.empty, asyncs :: [Async ()]
asyncs = [] }
let DataSyncMessage -> IO ()
handleMessage :: DataSyncMessage -> IO () = (HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
?applicationContext::ApplicationContext,
?context::ControllerContext, ?modelContext::ModelContext,
?state::IORef DataSyncController,
ToField (PrimaryKey (GetTableName CurrentUserRecord)),
Typeable CurrentUserRecord, HasNewSessionUrl CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord))) =>
EnsureRLSEnabledFn
-> InstallTableChangeTriggerFn
-> SendJSONFn
-> HandleCustomMessageFn
-> (Text -> Renamer)
-> DataSyncMessage
-> IO ()
EnsureRLSEnabledFn
-> InstallTableChangeTriggerFn
-> SendJSONFn
-> HandleCustomMessageFn
-> (Text -> Renamer)
-> DataSyncMessage
-> IO ()
buildMessageHandler EnsureRLSEnabledFn
ensureRLSEnabled InstallTableChangeTriggerFn
installTableChangeTriggers SendJSONFn
sendJSON HandleCustomMessageFn
handleCustomMessage Text -> Renamer
renamer
IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
Either String DataSyncMessage
message <- ByteString -> Either String DataSyncMessage
forall a. FromJSON a => ByteString -> Either String a
Aeson.eitherDecodeStrict' (ByteString -> Either String DataSyncMessage)
-> IO ByteString -> IO (Either String DataSyncMessage)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO ByteString
receiveData
case Either String DataSyncMessage
message of
Right DataSyncMessage
decodedMessage -> do
let requestId :: Int
requestId = DataSyncMessage
decodedMessage.requestId
((forall a. IO a -> IO a) -> IO ()) -> IO ()
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
Exception.mask \forall a. IO a -> IO a
restore -> do
Async ()
handlerProcess <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall a. IO a -> IO a
restore do
Either SomeException ()
result <- IO () -> IO (Either SomeException ())
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try (DataSyncMessage -> IO ()
handleMessage DataSyncMessage
decodedMessage)
case Either SomeException ()
result of
Left (SomeException
e :: Exception.SomeException) -> do
let errorMessage :: Text
errorMessage = case SomeException -> Maybe EnhancedSqlError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
Just (EnhancedSqlError
enhancedSqlError :: EnhancedSqlError) -> ByteString -> Text
forall a b. ConvertibleStrings a b => a -> b
cs (EnhancedSqlError
enhancedSqlError.sqlError.sqlErrorMsg)
Maybe EnhancedSqlError
Nothing -> String -> Text
forall a b. ConvertibleStrings a b => a -> b
cs (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
e)
Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.error (SomeException -> Text
forall a. Show a => a -> Text
tshow SomeException
e)
SendJSONFn
sendJSON DataSyncError { Int
requestId :: Int
requestId :: Int
requestId, Text
errorMessage :: Text
errorMessage :: Text
errorMessage }
Right ()
result -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall {t}. IORef t -> (t -> t) -> IO ()
atomicModifyIORef'' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> Proxy "asyncs"
-> ([Async ()] -> [Async ()])
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify Proxy "asyncs"
#asyncs (Async ()
handlerProcess:))
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left String
errorMessage -> SendJSONFn
sendJSON FailedToDecodeMessageError { errorMessage :: Text
errorMessage = String -> Text
forall a b. ConvertibleStrings a b => a -> b
cs String
errorMessage }
{-# INLINE runDataSyncController #-}
buildMessageHandler ::
( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
, ?applicationContext :: ApplicationContext
, ?context :: ControllerContext
, ?modelContext :: ModelContext
, ?state :: IORef DataSyncController
, PG.ToField (PrimaryKey (GetTableName CurrentUserRecord))
, Typeable CurrentUserRecord
, HasNewSessionUrl CurrentUserRecord
, Show (PrimaryKey (GetTableName CurrentUserRecord))
)
=> EnsureRLSEnabledFn -> InstallTableChangeTriggerFn -> SendJSONFn -> HandleCustomMessageFn -> (Text -> Renamer) -> (DataSyncMessage -> IO ())
buildMessageHandler :: (HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
?applicationContext::ApplicationContext,
?context::ControllerContext, ?modelContext::ModelContext,
?state::IORef DataSyncController,
ToField (PrimaryKey (GetTableName CurrentUserRecord)),
Typeable CurrentUserRecord, HasNewSessionUrl CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord))) =>
EnsureRLSEnabledFn
-> InstallTableChangeTriggerFn
-> SendJSONFn
-> HandleCustomMessageFn
-> (Text -> Renamer)
-> DataSyncMessage
-> IO ()
buildMessageHandler EnsureRLSEnabledFn
ensureRLSEnabled InstallTableChangeTriggerFn
installTableChangeTriggers SendJSONFn
sendJSON HandleCustomMessageFn
handleCustomMessage Text -> Renamer
renamer = DataSyncMessage -> IO ()
handleMessage
where
pgListener :: PGListener
pgListener = ?applicationContext::ApplicationContext
ApplicationContext
?applicationContext.pgListener
handleMessage :: DataSyncMessage -> IO ()
handleMessage :: DataSyncMessage -> IO ()
handleMessage DataSyncQuery { DynamicSQLQuery
query :: DynamicSQLQuery
query :: DataSyncMessage -> DynamicSQLQuery
query, Int
requestId :: Int
requestId :: DataSyncMessage -> Int
requestId, Maybe UUID
transactionId :: Maybe UUID
transactionId :: DataSyncMessage -> Maybe UUID
transactionId } = do
EnsureRLSEnabledFn
ensureRLSEnabled (DynamicSQLQuery
query.table)
let (Query
theQuery, [Action]
theParams) = Renamer -> DynamicSQLQuery -> (Query, [Action])
compileQueryWithRenamer (Text -> Renamer
renamer DynamicSQLQuery
query.table) DynamicSQLQuery
query
[[Field]]
rawResult :: [[Field]] <- Maybe UUID -> Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId Query
theQuery [Action]
theParams
let result :: [[Field]]
result = ([Field] -> [Field]) -> [[Field]] -> [[Field]]
forall a b. (a -> b) -> [a] -> [b]
map ((Field -> Field) -> [Field] -> [Field]
forall a b. (a -> b) -> [a] -> [b]
map (Renamer -> Field -> Field
renameField (Text -> Renamer
renamer DynamicSQLQuery
query.table))) [[Field]]
rawResult
SendJSONFn
sendJSON DataSyncResult { [[Field]]
result :: [[Field]]
result :: [[Field]]
result, Int
requestId :: Int
requestId :: Int
requestId }
handleMessage CreateDataSubscription { DynamicSQLQuery
query :: DataSyncMessage -> DynamicSQLQuery
query :: DynamicSQLQuery
query, Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId } = do
IO ()
(?state::IORef DataSyncController, ?context::ControllerContext) =>
IO ()
ensureBelowSubscriptionsLimit
TableWithRLS
tableNameRLS <- EnsureRLSEnabledFn
ensureRLSEnabled (DynamicSQLQuery
query.table)
UUID
subscriptionId <- IO UUID
UUID.nextRandom
MVar ()
close <- IO (MVar ())
forall a. IO (MVar a)
MVar.newEmptyMVar
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall {t}. IORef t -> (t -> t) -> IO ()
atomicModifyIORef'' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> Proxy "subscriptions"
-> (HashMap UUID (MVar ()) -> HashMap UUID (MVar ()))
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify Proxy "subscriptions"
#subscriptions (UUID -> MVar () -> HashMap UUID (MVar ()) -> HashMap UUID (MVar ())
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HashMap.insert UUID
subscriptionId MVar ()
close))
let (Query
theQuery, [Action]
theParams) = Renamer -> DynamicSQLQuery -> (Query, [Action])
compileQueryWithRenamer (Text -> Renamer
renamer DynamicSQLQuery
query.table) DynamicSQLQuery
query
[[Field]]
rawResult :: [[Field]] <- Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result) =>
Query -> parameters -> IO [result]
sqlQueryWithRLS Query
theQuery [Action]
theParams
let result :: [[Field]]
result = ([Field] -> [Field]) -> [[Field]] -> [[Field]]
forall a b. (a -> b) -> [a] -> [b]
map ((Field -> Field) -> [Field] -> [Field]
forall a b. (a -> b) -> [a] -> [b]
map (Renamer -> Field -> Field
renameField (Text -> Renamer
renamer DynamicSQLQuery
query.table))) [[Field]]
rawResult
let tableName :: Text
tableName = DynamicSQLQuery
query.table
let watchedRecordIds :: [UUID]
watchedRecordIds = [[Field]] -> [UUID]
recordIds [[Field]]
rawResult
IORef (Set UUID)
watchedRecordIdsRef <- Set UUID -> IO (IORef (Set UUID))
forall a. a -> IO (IORef a)
newIORef ([UUID] -> Set UUID
forall a. Ord a => [a] -> Set a
Set.fromList [UUID]
watchedRecordIds)
InstallTableChangeTriggerFn
installTableChangeTriggers TableWithRLS
tableNameRLS
let callback :: ChangeNotification -> IO ()
callback ChangeNotification
notification = case ChangeNotification
notification of
ChangeNotifications.DidInsert { UUID
id :: UUID
id :: ChangeNotification -> UUID
id } -> do
[[Field]]
newRecord :: [[Field]] <- Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result) =>
Query -> parameters -> IO [result]
sqlQueryWithRLS (Query
"SELECT * FROM (" Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
theQuery Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
") AS records WHERE records.id = ? LIMIT 1") ([Action]
theParams [Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> [UUID -> Action
forall a. ToField a => a -> Action
PG.toField UUID
id])
case [[Field]] -> Maybe [Field]
forall a. [a] -> Maybe a
headMay [[Field]]
newRecord of
Just [Field]
rawRecord -> do
IORef (Set UUID) -> (Set UUID -> Set UUID) -> IO ()
forall {t}. IORef t -> (t -> t) -> IO ()
modifyIORef' IORef (Set UUID)
watchedRecordIdsRef (UUID -> Set UUID -> Set UUID
forall a. Ord a => a -> Set a -> Set a
Set.insert UUID
id)
let record :: [Field]
record = (Field -> Field) -> [Field] -> [Field]
forall a b. (a -> b) -> [a] -> [b]
map (Renamer -> Field -> Field
renameField (Text -> Renamer
renamer Text
tableName)) [Field]
rawRecord
SendJSONFn
sendJSON DidInsert { UUID
subscriptionId :: UUID
subscriptionId :: UUID
subscriptionId, [Field]
record :: [Field]
record :: [Field]
record }
Maybe [Field]
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
ChangeNotifications.DidUpdate { UUID
id :: ChangeNotification -> UUID
id :: UUID
id, ChangeSet
changeSet :: ChangeSet
changeSet :: ChangeNotification -> ChangeSet
changeSet } -> do
Bool
isWatchingRecord <- UUID -> Set UUID -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member UUID
id (Set UUID -> Bool) -> IO (Set UUID) -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef (Set UUID) -> IO (Set UUID)
forall a. IORef a -> IO a
readIORef IORef (Set UUID)
watchedRecordIdsRef
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
isWatchingRecord do
[(PG.Only Bool
isRecordInResultSet)] <- Query -> [Action] -> IO [Only Bool]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result) =>
Query -> parameters -> IO [result]
sqlQueryWithRLS (Query
"SELECT EXISTS(SELECT * FROM (" Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
theQuery Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
") AS records WHERE records.id = ? LIMIT 1)") ([Action]
theParams [Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> [UUID -> Action
forall a. ToField a => a -> Action
PG.toField UUID
id])
[Change]
changes <- (?modelContext::ModelContext) => ChangeSet -> IO [Change]
ChangeSet -> IO [Change]
ChangeNotifications.retrieveChanges ChangeSet
changeSet
if Bool
isRecordInResultSet
then SendJSONFn
sendJSON DidUpdate { UUID
subscriptionId :: UUID
subscriptionId :: UUID
subscriptionId, UUID
id :: UUID
id :: UUID
id, changeSet :: Value
changeSet = Renamer -> [Change] -> Value
changesToValue (Text -> Renamer
renamer Text
tableName) [Change]
changes }
else SendJSONFn
sendJSON DidDelete { UUID
subscriptionId :: UUID
subscriptionId :: UUID
subscriptionId, UUID
id :: UUID
id :: UUID
id }
ChangeNotifications.DidDelete { UUID
id :: ChangeNotification -> UUID
id :: UUID
id } -> do
Bool
isWatchingRecord <- UUID -> Set UUID -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member UUID
id (Set UUID -> Bool) -> IO (Set UUID) -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef (Set UUID) -> IO (Set UUID)
forall a. IORef a -> IO a
readIORef IORef (Set UUID)
watchedRecordIdsRef
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
isWatchingRecord do
SendJSONFn
sendJSON DidDelete { UUID
subscriptionId :: UUID
subscriptionId :: UUID
subscriptionId, UUID
id :: UUID
id :: UUID
id }
let subscribe :: IO Subscription
subscribe = ByteString
-> (ChangeNotification -> IO ()) -> PGListener -> IO Subscription
forall jsonValue.
FromJSON jsonValue =>
ByteString -> (jsonValue -> IO ()) -> PGListener -> IO Subscription
PGListener.subscribeJSON (TableWithRLS -> ByteString
ChangeNotifications.channelName TableWithRLS
tableNameRLS) ChangeNotification -> IO ()
callback PGListener
pgListener
let unsubscribe :: Subscription -> IO ()
unsubscribe Subscription
subscription = Subscription -> PGListener -> IO ()
PGListener.unsubscribe Subscription
subscription PGListener
pgListener
IO Subscription
-> (Subscription -> IO ()) -> (Subscription -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
Exception.bracket IO Subscription
subscribe Subscription -> IO ()
unsubscribe \Subscription
channelSubscription -> do
SendJSONFn
sendJSON DidCreateDataSubscription { UUID
subscriptionId :: UUID
subscriptionId :: UUID
subscriptionId, Int
requestId :: Int
requestId :: Int
requestId, [[Field]]
result :: [[Field]]
result :: [[Field]]
result }
MVar () -> IO ()
forall a. MVar a -> IO a
MVar.takeMVar MVar ()
close
handleMessage DeleteDataSubscription { Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId, UUID
subscriptionId :: UUID
subscriptionId :: DataSyncMessage -> UUID
subscriptionId } = do
DataSyncReady { HashMap UUID (MVar ())
subscriptions :: DataSyncController -> HashMap UUID (MVar ())
subscriptions :: HashMap UUID (MVar ())
subscriptions } <- IO DataSyncController
forall state. (?state::IORef state) => IO state
getState
case UUID -> HashMap UUID (MVar ()) -> Maybe (MVar ())
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup UUID
subscriptionId HashMap UUID (MVar ())
subscriptions of
Just MVar ()
closeSignalMVar -> do
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
MVar.putMVar MVar ()
closeSignalMVar ()
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall {t}. IORef t -> (t -> t) -> IO ()
atomicModifyIORef'' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> Proxy "subscriptions"
-> (HashMap UUID (MVar ()) -> HashMap UUID (MVar ()))
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify Proxy "subscriptions"
#subscriptions (UUID -> HashMap UUID (MVar ()) -> HashMap UUID (MVar ())
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HashMap.delete UUID
subscriptionId))
SendJSONFn
sendJSON DidDeleteDataSubscription { UUID
subscriptionId :: UUID
subscriptionId :: UUID
subscriptionId, Int
requestId :: Int
requestId :: Int
requestId }
Maybe (MVar ())
Nothing -> Text -> IO ()
forall a. Text -> a
error (Text
"Failed to delete DataSubscription, could not find DataSubscription with id " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> UUID -> Text
forall a. Show a => a -> Text
tshow UUID
subscriptionId)
handleMessage CreateRecordMessage { Text
table :: Text
table :: DataSyncMessage -> Text
table, HashMap Text Value
record :: HashMap Text Value
record :: DataSyncMessage -> HashMap Text Value
record, Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId, Maybe UUID
transactionId :: DataSyncMessage -> Maybe UUID
transactionId :: Maybe UUID
transactionId } = do
EnsureRLSEnabledFn
ensureRLSEnabled Text
table
let query :: Query
query = Query
"INSERT INTO ? ? VALUES ? RETURNING *"
let columns :: [Text]
columns = HashMap Text Value
record
HashMap Text Value -> (HashMap Text Value -> [Text]) -> [Text]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Text]
forall k v. HashMap k v -> [k]
HashMap.keys
[Text] -> ([Text] -> [Text]) -> [Text]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (Text -> Renamer
renamer Text
table).fieldToColumn
let values :: [Action]
values = HashMap Text Value
record
HashMap Text Value -> (HashMap Text Value -> [Value]) -> [Value]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Value]
forall k v. HashMap k v -> [v]
HashMap.elems
[Value] -> ([Value] -> [Action]) -> [Action]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Value -> Action) -> [Value] -> [Action]
forall a b. (a -> b) -> [a] -> [b]
map Value -> Action
aesonValueToPostgresValue
let params :: (Identifier, In [Identifier], In [Action])
params = (Text -> Identifier
PG.Identifier Text
table, [Identifier] -> In [Identifier]
forall a. a -> In a
PG.In ((Text -> Identifier) -> [Text] -> [Identifier]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Identifier
PG.Identifier [Text]
columns), [Action] -> In [Action]
forall a. a -> In a
PG.In [Action]
values)
[[Field]]
result :: [[Field]] <- Maybe UUID
-> Query
-> (Identifier, In [Identifier], In [Action])
-> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId Query
query (Identifier, In [Identifier], In [Action])
params
case [[Field]]
result of
[[Field]
rawRecord] ->
let
record :: [Field]
record = (Field -> Field) -> [Field] -> [Field]
forall a b. (a -> b) -> [a] -> [b]
map (Renamer -> Field -> Field
renameField (Text -> Renamer
renamer Text
table)) [Field]
rawRecord
in
SendJSONFn
sendJSON DidCreateRecord { Int
requestId :: Int
requestId :: Int
requestId, [Field]
record :: [Field]
record :: [Field]
record }
[[Field]]
otherwise -> Text -> IO ()
forall a. Text -> a
error Text
"Unexpected result in CreateRecordMessage handler"
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
handleMessage CreateRecordsMessage { Text
table :: DataSyncMessage -> Text
table :: Text
table, [HashMap Text Value]
records :: [HashMap Text Value]
records :: DataSyncMessage -> [HashMap Text Value]
records, Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId, Maybe UUID
transactionId :: DataSyncMessage -> Maybe UUID
transactionId :: Maybe UUID
transactionId } = do
EnsureRLSEnabledFn
ensureRLSEnabled Text
table
let query :: Query
query = Query
"INSERT INTO ? ? ? RETURNING *"
let columns :: [Text]
columns = [HashMap Text Value]
records
[HashMap Text Value]
-> ([HashMap Text Value] -> Maybe (HashMap Text Value))
-> Maybe (HashMap Text Value)
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> [HashMap Text Value] -> Maybe (HashMap Text Value)
forall a. [a] -> Maybe a
head
Maybe (HashMap Text Value)
-> (Maybe (HashMap Text Value) -> HashMap Text Value)
-> HashMap Text Value
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> \case
Just HashMap Text Value
value -> HashMap Text Value
value
Maybe (HashMap Text Value)
Nothing -> Text -> HashMap Text Value
forall a. Text -> a
error Text
"Atleast one record is required"
HashMap Text Value -> (HashMap Text Value -> [Text]) -> [Text]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Text]
forall k v. HashMap k v -> [k]
HashMap.keys
[Text] -> ([Text] -> [Text]) -> [Text]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (Text -> Renamer
renamer Text
table).fieldToColumn
let values :: [[Action]]
values = [HashMap Text Value]
records
[HashMap Text Value]
-> ([HashMap Text Value] -> [[Action]]) -> [[Action]]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (HashMap Text Value -> [Action])
-> [HashMap Text Value] -> [[Action]]
forall a b. (a -> b) -> [a] -> [b]
map (\HashMap Text Value
object ->
HashMap Text Value
object
HashMap Text Value -> (HashMap Text Value -> [Value]) -> [Value]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Value]
forall k v. HashMap k v -> [v]
HashMap.elems
[Value] -> ([Value] -> [Action]) -> [Action]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Value -> Action) -> [Value] -> [Action]
forall a b. (a -> b) -> [a] -> [b]
map Value -> Action
aesonValueToPostgresValue
)
let params :: (Identifier, In [Identifier], Values [Action])
params = (Text -> Identifier
PG.Identifier Text
table, [Identifier] -> In [Identifier]
forall a. a -> In a
PG.In ((Text -> Identifier) -> [Text] -> [Identifier]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Identifier
PG.Identifier [Text]
columns), [QualifiedIdentifier] -> [[Action]] -> Values [Action]
forall a. [QualifiedIdentifier] -> [a] -> Values a
PG.Values [] [[Action]]
values)
[[Field]]
rawRecords :: [[Field]] <- Maybe UUID
-> Query
-> (Identifier, In [Identifier], Values [Action])
-> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId Query
query (Identifier, In [Identifier], Values [Action])
params
let records :: [[Field]]
records = ([Field] -> [Field]) -> [[Field]] -> [[Field]]
forall a b. (a -> b) -> [a] -> [b]
map ((Field -> Field) -> [Field] -> [Field]
forall a b. (a -> b) -> [a] -> [b]
map (Renamer -> Field -> Field
renameField (Text -> Renamer
renamer Text
table))) [[Field]]
rawRecords
SendJSONFn
sendJSON DidCreateRecords { Int
requestId :: Int
requestId :: Int
requestId, [[Field]]
records :: [[Field]]
records :: [[Field]]
records }
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
handleMessage UpdateRecordMessage { Text
table :: DataSyncMessage -> Text
table :: Text
table, UUID
id :: UUID
id :: DataSyncMessage -> UUID
id, HashMap Text Value
patch :: HashMap Text Value
patch :: DataSyncMessage -> HashMap Text Value
patch, Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId, Maybe UUID
transactionId :: DataSyncMessage -> Maybe UUID
transactionId :: Maybe UUID
transactionId } = do
EnsureRLSEnabledFn
ensureRLSEnabled Text
table
let columns :: [Identifier]
columns = HashMap Text Value
patch
HashMap Text Value -> (HashMap Text Value -> [Text]) -> [Text]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Text]
forall k v. HashMap k v -> [k]
HashMap.keys
[Text] -> ([Text] -> [Text]) -> [Text]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Text
fieldNameToColumnName
[Text] -> ([Text] -> [Identifier]) -> [Identifier]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Text -> Identifier) -> [Text] -> [Identifier]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Identifier
PG.Identifier
let values :: [Action]
values = HashMap Text Value
patch
HashMap Text Value -> (HashMap Text Value -> [Value]) -> [Value]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Value]
forall k v. HashMap k v -> [v]
HashMap.elems
[Value] -> ([Value] -> [Action]) -> [Action]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Value -> Action) -> [Value] -> [Action]
forall a b. (a -> b) -> [a] -> [b]
map Value -> Action
aesonValueToPostgresValue
let keyValues :: [(Identifier, Action)]
keyValues = [Identifier] -> [Action] -> [(Identifier, Action)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Identifier]
columns [Action]
values
let setCalls :: ByteString
setCalls = [(Identifier, Action)]
keyValues
[(Identifier, Action)]
-> ([(Identifier, Action)] -> [ByteString]) -> [ByteString]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> ((Identifier, Action) -> ByteString)
-> [(Identifier, Action)] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (\(Identifier, Action)
_ -> ByteString
"? = ?")
[ByteString] -> ([ByteString] -> ByteString) -> ByteString
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> ByteString -> [ByteString] -> ByteString
ByteString.intercalate ByteString
", "
let query :: ByteString
query = ByteString
"UPDATE ? SET " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
setCalls ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
" WHERE id = ? RETURNING *"
let params :: [Action]
params = [Identifier -> Action
forall a. ToField a => a -> Action
PG.toField (Text -> Identifier
PG.Identifier Text
table)]
[Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> ([[Action]] -> [Action]
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (((Identifier, Action) -> [Action])
-> [(Identifier, Action)] -> [[Action]]
forall a b. (a -> b) -> [a] -> [b]
map (\(Identifier
key, Action
value) -> [Identifier -> Action
forall a. ToField a => a -> Action
PG.toField Identifier
key, Action
value]) [(Identifier, Action)]
keyValues))
[Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> [UUID -> Action
forall a. ToField a => a -> Action
PG.toField UUID
id]
[[Field]]
result :: [[Field]] <- Maybe UUID -> Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId (ByteString -> Query
PG.Query ByteString
query) [Action]
params
case [[Field]]
result of
[[Field]
rawRecord] ->
let
record :: [Field]
record = (Field -> Field) -> [Field] -> [Field]
forall a b. (a -> b) -> [a] -> [b]
map (Renamer -> Field -> Field
renameField (Text -> Renamer
renamer Text
table)) [Field]
rawRecord
in
SendJSONFn
sendJSON DidUpdateRecord { Int
requestId :: Int
requestId :: Int
requestId, [Field]
record :: [Field]
record :: [Field]
record }
[[Field]]
otherwise -> Text -> IO ()
forall a. Text -> a
error Text
"Could not apply the update to the given record. Are you sure the record ID you passed is correct? If the record ID is correct, likely the row level security policy is not making the record visible to the UPDATE operation."
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
handleMessage UpdateRecordsMessage { Text
table :: DataSyncMessage -> Text
table :: Text
table, [UUID]
ids :: [UUID]
ids :: DataSyncMessage -> [UUID]
ids, HashMap Text Value
patch :: DataSyncMessage -> HashMap Text Value
patch :: HashMap Text Value
patch, Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId, Maybe UUID
transactionId :: DataSyncMessage -> Maybe UUID
transactionId :: Maybe UUID
transactionId } = do
EnsureRLSEnabledFn
ensureRLSEnabled Text
table
let columns :: [Identifier]
columns = HashMap Text Value
patch
HashMap Text Value -> (HashMap Text Value -> [Text]) -> [Text]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Text]
forall k v. HashMap k v -> [k]
HashMap.keys
[Text] -> ([Text] -> [Text]) -> [Text]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Text
fieldNameToColumnName
[Text] -> ([Text] -> [Identifier]) -> [Identifier]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Text -> Identifier) -> [Text] -> [Identifier]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Identifier
PG.Identifier
let values :: [Action]
values = HashMap Text Value
patch
HashMap Text Value -> (HashMap Text Value -> [Value]) -> [Value]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Value]
forall k v. HashMap k v -> [v]
HashMap.elems
[Value] -> ([Value] -> [Action]) -> [Action]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> (Value -> Action) -> [Value] -> [Action]
forall a b. (a -> b) -> [a] -> [b]
map Value -> Action
aesonValueToPostgresValue
let keyValues :: [(Identifier, Action)]
keyValues = [Identifier] -> [Action] -> [(Identifier, Action)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Identifier]
columns [Action]
values
let setCalls :: ByteString
setCalls = [(Identifier, Action)]
keyValues
[(Identifier, Action)]
-> ([(Identifier, Action)] -> [ByteString]) -> [ByteString]
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> ((Identifier, Action) -> ByteString)
-> [(Identifier, Action)] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (\(Identifier, Action)
_ -> ByteString
"? = ?")
[ByteString] -> ([ByteString] -> ByteString) -> ByteString
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> ByteString -> [ByteString] -> ByteString
ByteString.intercalate ByteString
", "
let query :: ByteString
query = ByteString
"UPDATE ? SET " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
setCalls ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
" WHERE id IN ? RETURNING *"
let params :: [Action]
params = [Identifier -> Action
forall a. ToField a => a -> Action
PG.toField (Text -> Identifier
PG.Identifier Text
table)]
[Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> ([[Action]] -> [Action]
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (((Identifier, Action) -> [Action])
-> [(Identifier, Action)] -> [[Action]]
forall a b. (a -> b) -> [a] -> [b]
map (\(Identifier
key, Action
value) -> [Identifier -> Action
forall a. ToField a => a -> Action
PG.toField Identifier
key, Action
value]) [(Identifier, Action)]
keyValues))
[Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> [In [UUID] -> Action
forall a. ToField a => a -> Action
PG.toField ([UUID] -> In [UUID]
forall a. a -> In a
PG.In [UUID]
ids)]
[[Field]]
rawRecords <- Maybe UUID -> Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId (ByteString -> Query
PG.Query ByteString
query) [Action]
params
let records :: [[Field]]
records = ([Field] -> [Field]) -> [[Field]] -> [[Field]]
forall a b. (a -> b) -> [a] -> [b]
map ((Field -> Field) -> [Field] -> [Field]
forall a b. (a -> b) -> [a] -> [b]
map (Renamer -> Field -> Field
renameField (Text -> Renamer
renamer Text
table))) [[Field]]
rawRecords
SendJSONFn
sendJSON DidUpdateRecords { Int
requestId :: Int
requestId :: Int
requestId, [[Field]]
records :: [[Field]]
records :: [[Field]]
records }
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
handleMessage DeleteRecordMessage { Text
table :: DataSyncMessage -> Text
table :: Text
table, UUID
id :: DataSyncMessage -> UUID
id :: UUID
id, Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId, Maybe UUID
transactionId :: DataSyncMessage -> Maybe UUID
transactionId :: Maybe UUID
transactionId } = do
EnsureRLSEnabledFn
ensureRLSEnabled Text
table
Maybe UUID -> Query -> (Identifier, UUID) -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, ?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId Maybe UUID
transactionId Query
"DELETE FROM ? WHERE id = ?" (Text -> Identifier
PG.Identifier Text
table, UUID
id)
SendJSONFn
sendJSON DidDeleteRecord { Int
requestId :: Int
requestId :: Int
requestId }
handleMessage DeleteRecordsMessage { Text
table :: DataSyncMessage -> Text
table :: Text
table, [UUID]
ids :: DataSyncMessage -> [UUID]
ids :: [UUID]
ids, Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId, Maybe UUID
transactionId :: DataSyncMessage -> Maybe UUID
transactionId :: Maybe UUID
transactionId } = do
EnsureRLSEnabledFn
ensureRLSEnabled Text
table
Maybe UUID -> Query -> (Identifier, In [UUID]) -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, ?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId Maybe UUID
transactionId Query
"DELETE FROM ? WHERE id IN ?" (Text -> Identifier
PG.Identifier Text
table, [UUID] -> In [UUID]
forall a. a -> In a
PG.In [UUID]
ids)
SendJSONFn
sendJSON DidDeleteRecords { Int
requestId :: Int
requestId :: Int
requestId }
handleMessage StartTransaction { Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId } = do
IO ()
(?state::IORef DataSyncController, ?context::ControllerContext) =>
IO ()
ensureBelowTransactionLimit
UUID
transactionId <- IO UUID
UUID.nextRandom
let takeConnection :: IO (Connection, LocalPool Connection)
takeConnection = ?modelContext::ModelContext
ModelContext
?modelContext.connectionPool
Pool Connection
-> (Pool Connection -> IO (Connection, LocalPool Connection))
-> IO (Connection, LocalPool Connection)
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> Pool Connection -> IO (Connection, LocalPool Connection)
forall a. Pool a -> IO (a, LocalPool a)
Pool.takeResource
let releaseConnection :: (Connection, LocalPool Connection) -> IO ()
releaseConnection (Connection
connection, LocalPool Connection
localPool) = do
Connection -> Query -> () -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PG.execute Connection
connection Query
"ROLLBACK" ()
LocalPool Connection -> Connection -> IO ()
forall a. LocalPool a -> a -> IO ()
Pool.putResource LocalPool Connection
localPool Connection
connection
IO (Connection, LocalPool Connection)
-> ((Connection, LocalPool Connection) -> IO ())
-> ((Connection, LocalPool Connection) -> IO ())
-> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
Exception.bracket IO (Connection, LocalPool Connection)
takeConnection (Connection, LocalPool Connection) -> IO ()
releaseConnection \(Connection
connection, LocalPool Connection
localPool) -> do
MVar ()
transactionSignal <- IO (MVar ())
forall a. IO (MVar a)
MVar.newEmptyMVar
let globalModelContext :: ModelContext
globalModelContext = ?modelContext::ModelContext
ModelContext
?modelContext
let ?modelContext = ModelContext
globalModelContext { transactionConnection = Just connection } in Query -> () -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId) =>
Query -> parameters -> IO Int64
sqlExecWithRLS Query
"BEGIN" ()
let transaction :: DataSyncTransaction
transaction = DataSyncTransaction
{ id :: UUID
id = UUID
transactionId
, Connection
connection :: Connection
connection :: Connection
connection
, close :: MVar ()
close = MVar ()
transactionSignal
}
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall {t}. IORef t -> (t -> t) -> IO ()
atomicModifyIORef'' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> Proxy "transactions"
-> (HashMap UUID DataSyncTransaction
-> HashMap UUID DataSyncTransaction)
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify Proxy "transactions"
#transactions (UUID
-> DataSyncTransaction
-> HashMap UUID DataSyncTransaction
-> HashMap UUID DataSyncTransaction
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HashMap.insert UUID
transactionId DataSyncTransaction
transaction))
SendJSONFn
sendJSON DidStartTransaction { Int
requestId :: Int
requestId :: Int
requestId, UUID
transactionId :: UUID
transactionId :: UUID
transactionId }
MVar () -> IO ()
forall a. MVar a -> IO a
MVar.takeMVar MVar ()
transactionSignal
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall {t}. IORef t -> (t -> t) -> IO ()
atomicModifyIORef'' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall {t1} {t2}. t1 -> (t1 -> t2) -> t2
|> Proxy "transactions"
-> (HashMap UUID DataSyncTransaction
-> HashMap UUID DataSyncTransaction)
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify Proxy "transactions"
#transactions (UUID
-> HashMap UUID DataSyncTransaction
-> HashMap UUID DataSyncTransaction
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HashMap.delete UUID
transactionId))
handleMessage RollbackTransaction { Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId, UUID
id :: DataSyncMessage -> UUID
id :: UUID
id } = do
DataSyncTransaction { UUID
id :: DataSyncTransaction -> UUID
id :: UUID
id, MVar ()
close :: DataSyncTransaction -> MVar ()
close :: MVar ()
close } <- (?state::IORef DataSyncController) =>
UUID -> IO DataSyncTransaction
UUID -> IO DataSyncTransaction
findTransactionById UUID
id
Maybe UUID -> Query -> () -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, ?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId (UUID -> Maybe UUID
forall a. a -> Maybe a
Just UUID
id) Query
"ROLLBACK" ()
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
MVar.putMVar MVar ()
close ()
SendJSONFn
sendJSON DidRollbackTransaction { Int
requestId :: Int
requestId :: Int
requestId, transactionId :: UUID
transactionId = UUID
id }
handleMessage CommitTransaction { Int
requestId :: DataSyncMessage -> Int
requestId :: Int
requestId, UUID
id :: DataSyncMessage -> UUID
id :: UUID
id } = do
DataSyncTransaction { UUID
id :: DataSyncTransaction -> UUID
id :: UUID
id, MVar ()
close :: DataSyncTransaction -> MVar ()
close :: MVar ()
close } <- (?state::IORef DataSyncController) =>
UUID -> IO DataSyncTransaction
UUID -> IO DataSyncTransaction
findTransactionById UUID
id
Maybe UUID -> Query -> () -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, ?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId (UUID -> Maybe UUID
forall a. a -> Maybe a
Just UUID
id) Query
"COMMIT" ()
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
MVar.putMVar MVar ()
close ()
SendJSONFn
sendJSON DidCommitTransaction { Int
requestId :: Int
requestId :: Int
requestId, transactionId :: UUID
transactionId = UUID
id }
handleMessage DataSyncMessage
otherwise = HandleCustomMessageFn
handleCustomMessage SendJSONFn
sendJSON DataSyncMessage
otherwise
cleanupAllSubscriptions :: (?state :: IORef DataSyncController, ?applicationContext :: ApplicationContext) => IO ()
cleanupAllSubscriptions :: (?state::IORef DataSyncController,
?applicationContext::ApplicationContext) =>
IO ()
cleanupAllSubscriptions = do
DataSyncController
state <- IO DataSyncController
forall state. (?state::IORef state) => IO state
getState
let pgListener :: PGListener
pgListener = ?applicationContext::ApplicationContext
ApplicationContext
?applicationContext.pgListener
case DataSyncController
state of
DataSyncReady { [Async ()]
asyncs :: DataSyncController -> [Async ()]
asyncs :: [Async ()]
asyncs } -> [Async ()] -> (Element [Async ()] -> IO ()) -> IO ()
forall mono (m :: * -> *).
(MonoFoldable mono, Applicative m) =>
mono -> (Element mono -> m ()) -> m ()
forEach [Async ()]
asyncs Async () -> IO ()
Element [Async ()] -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel
DataSyncController
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
changesToValue :: Renamer -> [ChangeNotifications.Change] -> Value
changesToValue :: Renamer -> [Change] -> Value
changesToValue Renamer
renamer [Change]
changes = [Pair] -> Value
object ((Change -> Pair) -> [Change] -> [Pair]
forall a b. (a -> b) -> [a] -> [b]
map Change -> Pair
changeToPair [Change]
changes)
where
changeToPair :: Change -> Pair
changeToPair ChangeNotifications.Change { Text
col :: Text
col :: Change -> Text
col, Value
new :: Value
new :: Change -> Value
new } = (Text -> Key
Aeson.fromText (Text -> Key) -> Text -> Key
forall a b. (a -> b) -> a -> b
$ Renamer
renamer.columnToField Text
col) Key -> Value -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= Value
new
runInModelContextWithTransaction :: (?state :: IORef DataSyncController, ?modelContext :: ModelContext) => ((?modelContext :: ModelContext) => IO result) -> Maybe UUID -> IO result
runInModelContextWithTransaction :: forall result.
(?state::IORef DataSyncController, ?modelContext::ModelContext) =>
((?modelContext::ModelContext) => IO result)
-> Maybe UUID -> IO result
runInModelContextWithTransaction (?modelContext::ModelContext) => IO result
function (Just UUID
transactionId) = do
let globalModelContext :: ModelContext
globalModelContext = ?modelContext::ModelContext
ModelContext
?modelContext
DataSyncTransaction { Connection
connection :: DataSyncTransaction -> Connection
connection :: Connection
connection } <- (?state::IORef DataSyncController) =>
UUID -> IO DataSyncTransaction
UUID -> IO DataSyncTransaction
findTransactionById UUID
transactionId
let
?modelContext = ModelContext
globalModelContext { transactionConnection = Just connection }
in
IO result
(?modelContext::ModelContext) => IO result
function
runInModelContextWithTransaction (?modelContext::ModelContext) => IO result
function Maybe UUID
Nothing = IO result
(?modelContext::ModelContext) => IO result
function
findTransactionById :: (?state :: IORef DataSyncController) => UUID -> IO DataSyncTransaction
findTransactionById :: (?state::IORef DataSyncController) =>
UUID -> IO DataSyncTransaction
findTransactionById UUID
transactionId = do
HashMap UUID DataSyncTransaction
transactions <- (.transactions) (DataSyncController -> HashMap UUID DataSyncTransaction)
-> IO DataSyncController -> IO (HashMap UUID DataSyncTransaction)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef DataSyncController -> IO DataSyncController
forall a. IORef a -> IO a
readIORef ?state::IORef DataSyncController
IORef DataSyncController
?state
case UUID
-> HashMap UUID DataSyncTransaction -> Maybe DataSyncTransaction
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup UUID
transactionId HashMap UUID DataSyncTransaction
transactions of
Just DataSyncTransaction
transaction -> DataSyncTransaction -> IO DataSyncTransaction
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DataSyncTransaction
transaction
Maybe DataSyncTransaction
Nothing -> Text -> IO DataSyncTransaction
forall a. Text -> a
error Text
"No transaction with that id"
ensureBelowTransactionLimit :: (?state :: IORef DataSyncController, ?context :: ControllerContext) => IO ()
ensureBelowTransactionLimit :: (?state::IORef DataSyncController, ?context::ControllerContext) =>
IO ()
ensureBelowTransactionLimit = do
HashMap UUID DataSyncTransaction
transactions <- (.transactions) (DataSyncController -> HashMap UUID DataSyncTransaction)
-> IO DataSyncController -> IO (HashMap UUID DataSyncTransaction)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef DataSyncController -> IO DataSyncController
forall a. IORef a -> IO a
readIORef ?state::IORef DataSyncController
IORef DataSyncController
?state
let transactionCount :: Int
transactionCount = HashMap UUID DataSyncTransaction -> Int
forall k v. HashMap k v -> Int
HashMap.size HashMap UUID DataSyncTransaction
transactions
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
transactionCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
(?context::ControllerContext) => Int
maxTransactionsPerConnection) do
Text -> IO ()
forall a. Text -> a
error (Text
"You've reached the transaction limit of " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tshow Int
(?context::ControllerContext) => Int
maxTransactionsPerConnection Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" transactions")
ensureBelowSubscriptionsLimit :: (?state :: IORef DataSyncController, ?context :: ControllerContext) => IO ()
ensureBelowSubscriptionsLimit :: (?state::IORef DataSyncController, ?context::ControllerContext) =>
IO ()
ensureBelowSubscriptionsLimit = do
HashMap UUID (MVar ())
subscriptions <- (.subscriptions) (DataSyncController -> HashMap UUID (MVar ()))
-> IO DataSyncController -> IO (HashMap UUID (MVar ()))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef DataSyncController -> IO DataSyncController
forall a. IORef a -> IO a
readIORef ?state::IORef DataSyncController
IORef DataSyncController
?state
let subscriptionsCount :: Int
subscriptionsCount = HashMap UUID (MVar ()) -> Int
forall k v. HashMap k v -> Int
HashMap.size HashMap UUID (MVar ())
subscriptions
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
subscriptionsCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
(?context::ControllerContext) => Int
maxSubscriptionsPerConnection) do
Text -> IO ()
forall a. Text -> a
error (Text
"You've reached the subscriptions limit of " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tshow Int
(?context::ControllerContext) => Int
maxSubscriptionsPerConnection Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" subscriptions")
maxTransactionsPerConnection :: (?context :: ControllerContext) => Int
maxTransactionsPerConnection :: (?context::ControllerContext) => Int
maxTransactionsPerConnection =
case forall configParameter context.
(?context::context, ConfigProvider context,
Typeable configParameter) =>
configParameter
getAppConfig @DataSyncMaxTransactionsPerConnection of
DataSyncMaxTransactionsPerConnection Int
value -> Int
value
maxSubscriptionsPerConnection :: (?context :: ControllerContext) => Int
maxSubscriptionsPerConnection :: (?context::ControllerContext) => Int
maxSubscriptionsPerConnection =
case forall configParameter context.
(?context::context, ConfigProvider context,
Typeable configParameter) =>
configParameter
getAppConfig @DataSyncMaxSubscriptionsPerConnection of
DataSyncMaxSubscriptionsPerConnection Int
value -> Int
value
sqlQueryWithRLSAndTransactionId ::
( ?modelContext :: ModelContext
, PG.ToRow parameters
, ?context :: ControllerContext
, userId ~ Id CurrentUserRecord
, Show (PrimaryKey (GetTableName CurrentUserRecord))
, HasNewSessionUrl CurrentUserRecord
, Typeable CurrentUserRecord
, ?context :: ControllerContext
, HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
, PG.ToField userId
, FromRow result
, ?state :: IORef DataSyncController
) => Maybe UUID -> PG.Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId :: forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId Query
theQuery parameters
theParams = ((?modelContext::ModelContext) => IO [result])
-> Maybe UUID -> IO [result]
forall result.
(?state::IORef DataSyncController, ?modelContext::ModelContext) =>
((?modelContext::ModelContext) => IO result)
-> Maybe UUID -> IO result
runInModelContextWithTransaction (Query -> parameters -> IO [result]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, FromRow result) =>
Query -> parameters -> IO [result]
sqlQueryWithRLS Query
theQuery parameters
theParams) Maybe UUID
transactionId
sqlExecWithRLSAndTransactionId ::
( ?modelContext :: ModelContext
, PG.ToRow parameters
, ?context :: ControllerContext
, userId ~ Id CurrentUserRecord
, Show (PrimaryKey (GetTableName CurrentUserRecord))
, HasNewSessionUrl CurrentUserRecord
, Typeable CurrentUserRecord
, ?context :: ControllerContext
, HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
, PG.ToField userId
, ?state :: IORef DataSyncController
) => Maybe UUID -> PG.Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId :: forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId, ?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId Maybe UUID
transactionId Query
theQuery parameters
theParams = ((?modelContext::ModelContext) => IO Int64)
-> Maybe UUID -> IO Int64
forall result.
(?state::IORef DataSyncController, ?modelContext::ModelContext) =>
((?modelContext::ModelContext) => IO result)
-> Maybe UUID -> IO result
runInModelContextWithTransaction (Query -> parameters -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext,
userId ~ Id' (GetTableName CurrentUserRecord),
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField
"id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)),
ToField userId) =>
Query -> parameters -> IO Int64
sqlExecWithRLS Query
theQuery parameters
theParams) Maybe UUID
transactionId
instance SetField "subscriptions" DataSyncController (HashMap UUID (MVar.MVar ())) where
setField :: HashMap UUID (MVar ()) -> DataSyncController -> DataSyncController
setField HashMap UUID (MVar ())
subscriptions DataSyncController
record = DataSyncController
record { subscriptions }
instance SetField "transactions" DataSyncController (HashMap UUID DataSyncTransaction) where
setField :: HashMap UUID DataSyncTransaction
-> DataSyncController -> DataSyncController
setField HashMap UUID DataSyncTransaction
transactions DataSyncController
record = DataSyncController
record { transactions }
instance SetField "asyncs" DataSyncController [Async ()] where
setField :: [Async ()] -> DataSyncController -> DataSyncController
setField [Async ()]
asyncs DataSyncController
record = DataSyncController
record { asyncs }
atomicModifyIORef'' :: IORef t -> (t -> t) -> IO ()
atomicModifyIORef'' IORef t
ref t -> t
updateFn = IORef t -> (t -> (t, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef t
ref (\t
value -> (t -> t
updateFn t
value, ()))