{-# LANGUAGE UndecidableInstances #-}
module IHP.DataSync.ControllerImpl where

import IHP.ControllerPrelude hiding (OrderByClause)
import qualified Control.Exception as Exception
import qualified IHP.Log as Log
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Key as Aeson

import Data.Aeson.TH
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.ToField as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Data.HashMap.Strict as HashMap
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent.MVar as MVar
import IHP.DataSync.Types
import IHP.DataSync.RowLevelSecurity
import IHP.DataSync.DynamicQuery
import IHP.DataSync.DynamicQueryCompiler
import qualified IHP.DataSync.ChangeNotifications as ChangeNotifications
import IHP.DataSync.REST.Controller (aesonValueToPostgresValue)
import qualified Data.ByteString.Char8 as ByteString
import qualified IHP.PGListener as PGListener
import IHP.ApplicationContext
import qualified Data.Set as Set
import qualified Data.Pool as Pool


-- Template Haskell splices generating the aeson instances used by this module.
--
-- The first splice derives the 'FromJSON' instance that 'runDataSyncController'
-- relies on when it decodes incoming bytes into a 'DataSyncMessage'.
--
-- The second splice derives a 'ToJSON' instance and is given the name of the
-- 'DataSyncResult' data constructor (single quote) rather than a type name.
-- NOTE(review): presumably aeson resolves the constructor to its parent type
-- ('DataSyncResponse', the type accepted by 'SendJSONFn') -- confirm against the aeson TH docs.
$(deriveFromJSON defaultOptions ''DataSyncMessage)
$(deriveToJSON defaultOptions 'DataSyncResult)

-- | Callback that is given a table name and returns the 'TableWithRLS' value for that table.
--
-- NOTE(review): presumably this fails when row level security is not enabled for the
-- table; the implementation is supplied by the caller and not visible here.
type EnsureRLSEnabledFn = Text -> IO TableWithRLS

-- | Callback that makes sure the database triggers for change notifications exist on the given table.
-- It is invoked before a data subscription starts listening for changes.
type InstallTableChangeTriggerFn = TableWithRLS -> IO ()

-- | Callback used by the message handlers to send a 'DataSyncResponse' back to the client.
type SendJSONFn = DataSyncResponse -> IO ()

-- | Callback for application specific messages. It receives a function for sending
-- responses and the message itself.
--
-- NOTE(review): the place where this callback is invoked is not visible in this part of the file.
type HandleCustomMessageFn = (DataSyncResponse -> IO ()) -> DataSyncMessage -> IO ()

-- | Main loop of a DataSync connection.
--
-- The controller state is first reset to 'DataSyncReady' with no subscriptions,
-- no transactions and no running handlers. After that the function loops forever:
--
-- 1. A raw message is read with @receiveData@ and decoded as a JSON 'DataSyncMessage'.
-- 2. If decoding fails, a 'FailedToDecodeMessageError' is sent to the client.
-- 3. Otherwise the message is processed in its own 'async' thread by the handler
--    returned from 'buildMessageHandler'. Any exception thrown by the handler is
--    logged and reported to the client as a 'DataSyncError' carrying the request id.
--    For an 'EnhancedSqlError' only the SQL error message is reported, for all other
--    exceptions the output of 'displayException'.
--
-- Every spawned handler thread is recorded in the @asyncs@ field of the controller state.
--
-- Arguments, in order: the RLS check callback, the trigger installation callback,
-- the action that receives the next raw message, the function used to send responses,
-- the custom message callback and the function that provides the 'Renamer' for a table.
runDataSyncController ::
    ( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
    , ?applicationContext :: ApplicationContext
    , ?context :: ControllerContext
    , ?modelContext :: ModelContext
    , ?state :: IORef DataSyncController
    , PG.ToField (PrimaryKey (GetTableName CurrentUserRecord))
    , Typeable CurrentUserRecord
    , HasNewSessionUrl CurrentUserRecord
    , Show (PrimaryKey (GetTableName CurrentUserRecord))
    ) => EnsureRLSEnabledFn -> InstallTableChangeTriggerFn -> IO ByteString -> SendJSONFn -> HandleCustomMessageFn -> (Text -> Renamer) -> IO ()
runDataSyncController ensureRLSEnabled installTableChangeTriggers receiveData sendJSON handleCustomMessage renamer = do
        setState DataSyncReady { subscriptions = HashMap.empty, transactions = HashMap.empty, asyncs = [] }

        let handleMessage :: DataSyncMessage -> IO () = buildMessageHandler ensureRLSEnabled installTableChangeTriggers sendJSON handleCustomMessage renamer

        forever do
            message <- Aeson.eitherDecodeStrict' <$> receiveData

            case message of
                Right decodedMessage -> do
                    let requestId = decodedMessage.requestId

                    -- Asynchronous exceptions are masked while the handler thread is spawned
                    -- and its handle is stored in the state, so the handle cannot get lost in between.
                    -- The handler itself runs with the original masking state ('restore').
                    Exception.mask \restore -> do
                        -- Handle the messages in an async way
                        -- This increases throughput as multiple queries can be fetched
                        -- in parallel
                        handlerProcess <- async $ restore do
                            result <- Exception.try (handleMessage decodedMessage)

                            case result of
                                Left (e :: Exception.SomeException) -> do
                                    -- Database errors are reported with the plain SQL error message
                                    let errorMessage = case fromException e of
                                            Just (enhancedSqlError :: EnhancedSqlError) -> cs (enhancedSqlError.sqlError.sqlErrorMsg)
                                            Nothing -> cs (displayException e)
                                    Log.error (tshow e)
                                    sendJSON DataSyncError { requestId, errorMessage }
                                Right () -> pure ()

                        atomicModifyIORef'' ?state (\state -> state |> modify #asyncs (handlerProcess:))
                        pure ()
                Left errorMessage -> sendJSON FailedToDecodeMessageError { errorMessage = cs errorMessage }
{-# INLINE runDataSyncController #-}


-- | Builds the function that processes a single decoded 'DataSyncMessage'.
--
-- The returned handler dispatches on the constructor of the message (see the
-- @handleMessage@ equations in the @where@ block). It is used by 'runDataSyncController',
-- which runs it once per incoming message.
--
-- Arguments, in order: the RLS check callback, the trigger installation callback,
-- the function used to send responses, the custom message callback and the function
-- that provides the 'Renamer' for a table.
buildMessageHandler ::
    ( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
    , ?applicationContext :: ApplicationContext
    , ?context :: ControllerContext
    , ?modelContext :: ModelContext
    , ?state :: IORef DataSyncController
    , PG.ToField (PrimaryKey (GetTableName CurrentUserRecord))
    , Typeable CurrentUserRecord
    , HasNewSessionUrl CurrentUserRecord
    , Show (PrimaryKey (GetTableName CurrentUserRecord))
    )
    => EnsureRLSEnabledFn -> InstallTableChangeTriggerFn -> SendJSONFn -> HandleCustomMessageFn -> (Text -> Renamer) -> (DataSyncMessage -> IO ())
buildMessageHandler ensureRLSEnabled installTableChangeTriggers sendJSON handleCustomMessage renamer = handleMessage
    where
            -- The listener of the application context, used to subscribe to and
            -- unsubscribe from the change notification channels of watched tables
            pgListener = ?applicationContext.pgListener
            -- Processes a single message. Each equation handles one message constructor.
            handleMessage :: DataSyncMessage -> IO ()
            -- One-off query: runs the query and responds with a 'DataSyncResult'
            handleMessage DataSyncQuery { query, requestId, transactionId } = do
                ensureRLSEnabled (query.table)

                let (theQuery, theParams) = compileQueryWithRenamer (renamer query.table) query

                -- The query runs with RLS applied, inside the given transaction if an id is provided
                rawResult :: [[Field]] <- sqlQueryWithRLSAndTransactionId transactionId theQuery theParams

                -- Apply the table's renamer to every field of every row before responding
                let result = map (map (renameField (renamer query.table))) rawResult

                sendJSON DataSyncResult { result, requestId }
            
            -- Live query: responds with the initial result set and keeps pushing
            -- 'DidInsert', 'DidUpdate' and 'DidDelete' messages until the subscription is closed.
            -- This handler blocks until the close signal of the subscription is filled.
            handleMessage CreateDataSubscription { query, requestId } = do
                ensureBelowSubscriptionsLimit

                tableNameRLS <- ensureRLSEnabled (query.table)

                subscriptionId <- UUID.nextRandom

                -- Allocate the close handle as early as possible
                -- to make DeleteDataSubscription calls succeed even when the DataSubscription is
                -- not fully set up yet
                close <- MVar.newEmptyMVar
                atomicModifyIORef'' ?state (\state -> state |> modify #subscriptions (HashMap.insert subscriptionId close))

                let (theQuery, theParams) = compileQueryWithRenamer (renamer query.table) query

                rawResult :: [[Field]] <- sqlQueryWithRLS theQuery theParams
                let result = map (map (renameField (renamer query.table))) rawResult

                let tableName = query.table

                -- We need to keep track of all the ids of entities we're watching to make
                -- sure that we only send update notifications to clients that can actually
                -- access the record (e.g. if a RLS policy denies access)
                let watchedRecordIds = recordIds rawResult

                -- Store it in IORef as an INSERT requires us to add an id
                watchedRecordIdsRef <- newIORef (Set.fromList watchedRecordIds)

                -- Make sure the database triggers are there
                installTableChangeTriggers tableNameRLS

                let callback notification = case notification of
                            ChangeNotifications.DidInsert { id } -> do
                                -- The new record could not be accessible to the current user with a RLS policy
                                -- E.g. it could be a new record in a 'projects' table, but the project belongs
                                -- to a different user, and thus the current user should not be able to see it.
                                --
                                -- The new record could also be not part of the WHERE condition of the initial query.
                                -- Therefore we need to use the subscriptions WHERE condition to fetch the new record here.
                                --
                                -- To honor the RLS policies we therefore need to fetch the record as the current user
                                -- If the result set is empty, we know the record is not accessible to us
                                newRecord :: [[Field]] <- sqlQueryWithRLS ("SELECT * FROM (" <> theQuery <> ") AS records WHERE records.id = ? LIMIT 1") (theParams <> [PG.toField id])

                                case headMay newRecord of
                                    Just rawRecord -> do
                                        -- Add the new record to 'watchedRecordIdsRef'
                                        -- Otherwise the updates and deletes will not be dispatched to the client
                                        modifyIORef' watchedRecordIdsRef (Set.insert id)

                                        let record = map (renameField (renamer tableName)) rawRecord
                                        sendJSON DidInsert { subscriptionId, record }
                                    Nothing -> pure ()
                            ChangeNotifications.DidUpdate { id, changeSet } -> do
                                -- Only send the notification if the updated record is one of the watched records
                                -- (part of the initial result set or inserted later)
                                isWatchingRecord <- Set.member id <$> readIORef watchedRecordIdsRef
                                when isWatchingRecord do
                                    -- The updated record could not be part of the query result set anymore
                                    -- E.g. if it's not matched anymore by the WHERE condition after the update
                                    [(PG.Only isRecordInResultSet)] <- sqlQueryWithRLS ("SELECT EXISTS(SELECT * FROM (" <> theQuery <> ") AS records WHERE records.id = ? LIMIT 1)") (theParams <> [PG.toField id])

                                    changes <- ChangeNotifications.retrieveChanges changeSet
                                    if isRecordInResultSet
                                        then sendJSON DidUpdate { subscriptionId, id, changeSet = changesToValue (renamer tableName) changes }
                                        -- From the client's point of view the record disappeared from the result set
                                        else sendJSON DidDelete { subscriptionId, id }
                            ChangeNotifications.DidDelete { id } -> do
                                -- Only send the notification if the deleted record is one of the watched records
                                -- (part of the initial result set or inserted later)
                                isWatchingRecord <- Set.member id <$> readIORef watchedRecordIdsRef
                                when isWatchingRecord do
                                    sendJSON DidDelete { subscriptionId, id }

                let subscribe = PGListener.subscribeJSON (ChangeNotifications.channelName tableNameRLS) callback pgListener
                let unsubscribe subscription = PGListener.unsubscribe subscription pgListener

                -- 'bracket' makes sure we unsubscribe from the channel again, also when this handler is interrupted
                Exception.bracket subscribe unsubscribe \channelSubscription -> do
                    sendJSON DidCreateDataSubscription { subscriptionId, requestId, result }

                    -- Block until the close signal is filled (see the DeleteDataSubscription handler)
                    MVar.takeMVar close

            -- Closes a data subscription: signals the subscription's handler to stop,
            -- removes the subscription from the controller state and confirms with a
            -- 'DidDeleteDataSubscription'. Fails with an error if the subscription id is unknown.
            handleMessage DeleteDataSubscription { requestId, subscriptionId } = do
                DataSyncReady { subscriptions } <- getState
                case HashMap.lookup subscriptionId subscriptions of
                    Just closeSignalMVar -> do
                        -- Cancel table watcher
                        -- (the CreateDataSubscription handler is blocked on taking this MVar)
                        MVar.putMVar closeSignalMVar ()

                        atomicModifyIORef'' ?state (\state -> state |> modify #subscriptions (HashMap.delete subscriptionId))

                        sendJSON DidDeleteDataSubscription { subscriptionId, requestId }
                    Nothing -> error ("Failed to delete DataSubscription, could not find DataSubscription with id " <> tshow subscriptionId)

            -- Inserts a single record into the given table and responds with a
            -- 'DidCreateRecord' containing the row returned by the database.
            handleMessage CreateRecordMessage { table, record, requestId, transactionId }  = do
                ensureRLSEnabled table

                -- The placeholders are filled with the table name, the column list and the value list
                let query = "INSERT INTO ? ? VALUES ? RETURNING *"

                -- Keys and values are both taken from the same map, so columns and values line up
                let columns = record
                        |> HashMap.keys
                        |> map (renamer table).fieldToColumn

                let values = record
                        |> HashMap.elems
                        |> map aesonValueToPostgresValue

                let params = (PG.Identifier table, PG.In (map PG.Identifier columns), PG.In values)

                result :: [[Field]] <- sqlQueryWithRLSAndTransactionId transactionId query params

                case result of
                    [rawRecord] ->
                        let
                            record = map (renameField (renamer table)) rawRecord
                        in
                            sendJSON DidCreateRecord { requestId, record }
                    -- Anything other than exactly one returned row is treated as a failure
                    otherwise -> error "Unexpected result in CreateRecordMessage handler"

                pure ()
            
            -- Inserts multiple records into the given table with a single INSERT statement
            -- and responds with a 'DidCreateRecords' containing the rows returned by the database.
            --
            -- The fields of the first record decide the column list. All records need to have
            -- exactly these fields, otherwise the handler fails with an error. An empty list
            -- of records is an error as well.
            handleMessage CreateRecordsMessage { table, records, requestId, transactionId }  = do
                ensureRLSEnabled table

                -- The placeholders are filled with the table name, the column list and the VALUES rows
                let query = "INSERT INTO ? ? ? RETURNING *"

                let fieldNames = case records of
                        (firstRecord : _) -> HashMap.keys firstRecord
                        [] -> error "Atleast one record is required"

                let columns = fieldNames |> map (renamer table).fieldToColumn

                -- Look up the values by field name, in the order of the column list.
                -- Relying on the iteration order of each record's own map could put values
                -- into the wrong column when the records don't have identical fields.
                let valuesFor object = case mapM (\fieldName -> HashMap.lookup fieldName object) fieldNames of
                        Just fieldValues | HashMap.size object == length fieldNames -> map aesonValueToPostgresValue fieldValues
                        _ -> error "All records of a CreateRecordsMessage need to have the same fields"

                let values = map valuesFor records

                let params = (PG.Identifier table, PG.In (map PG.Identifier columns), PG.Values [] values)

                rawRecords :: [[Field]] <- sqlQueryWithRLSAndTransactionId transactionId query params
                let createdRecords = map (map (renameField (renamer table))) rawRecords

                sendJSON DidCreateRecords { requestId, records = createdRecords }

                pure ()

            -- Applies a partial update ('patch') to the row of 'table' whose @id@
            -- column equals 'id' and answers with 'DidUpdateRecord' carrying the
            -- updated row.
            --
            -- The patch's field names are translated to column names with the
            -- table's 'Renamer', the same renamer that is used to rename the
            -- columns of the returned row, so both directions stay consistent
            -- when a custom renamer is configured.
            --
            -- The statement is executed via 'sqlQueryWithRLSAndTransactionId' with
            -- the (optional) 'transactionId' of the message. Calls 'error' unless
            -- exactly one row is returned by the @UPDATE ... RETURNING *@.
            --
            -- NOTE(review): an empty patch produces an empty @SET@ list; presumably
            -- the database rejects that statement -- confirm whether it should be
            -- handled explicitly.
            handleMessage UpdateRecordMessage { table, id, patch, requestId, transactionId } = do
                ensureRLSEnabled table

                let tableRenamer = renamer table

                -- One (column, value) pair per patched field. Built from a single
                -- traversal of the map so a column can never be paired with the
                -- value of another field.
                let keyValues =
                        [ (PG.Identifier (tableRenamer.fieldToColumn fieldName), aesonValueToPostgresValue value)
                        | (fieldName, value) <- HashMap.toList patch
                        ]

                -- One "? = ?" placeholder pair per patched column
                let setCalls = ByteString.intercalate ", " ["? = ?" | _ <- keyValues]
                let query = "UPDATE ? SET " <> setCalls <> " WHERE id = ? RETURNING *"

                -- Parameters in placeholder order: table, then column/value pairs, then the id
                let assignmentParams = join [[PG.toField column, value] | (column, value) <- keyValues]
                let params = [PG.toField (PG.Identifier table)] <> assignmentParams <> [PG.toField id]

                result :: [[Field]] <- sqlQueryWithRLSAndTransactionId transactionId (PG.Query query) params

                case result of
                    [rawRecord] ->
                        let
                            record = map (renameField tableRenamer) rawRecord
                        in
                            sendJSON DidUpdateRecord { requestId, record }
                    _ -> error "Could not apply the update to the given record. Are you sure the record ID you passed is correct? If the record ID is correct, likely the row level security policy is not making the record visible to the UPDATE operation."

            -- Applies the same partial update ('patch') to every row of 'table'
            -- whose @id@ column is contained in 'ids' and answers with
            -- 'DidUpdateRecords' carrying all rows returned by the statement.
            --
            -- The patch's field names are translated to column names with the
            -- table's 'Renamer', the same renamer that is used to rename the
            -- columns of the returned rows, so both directions stay consistent
            -- when a custom renamer is configured.
            --
            -- The statement is executed via 'sqlQueryWithRLSAndTransactionId' with
            -- the (optional) 'transactionId' of the message.
            --
            -- NOTE(review): an empty patch produces an empty @SET@ list; presumably
            -- the database rejects that statement -- confirm whether it should be
            -- handled explicitly.
            handleMessage UpdateRecordsMessage { table, ids, patch, requestId, transactionId } = do
                ensureRLSEnabled table

                let tableRenamer = renamer table

                -- One (column, value) pair per patched field. Built from a single
                -- traversal of the map so a column can never be paired with the
                -- value of another field.
                let keyValues =
                        [ (PG.Identifier (tableRenamer.fieldToColumn fieldName), aesonValueToPostgresValue value)
                        | (fieldName, value) <- HashMap.toList patch
                        ]

                -- One "? = ?" placeholder pair per patched column
                let setCalls = ByteString.intercalate ", " ["? = ?" | _ <- keyValues]
                let query = "UPDATE ? SET " <> setCalls <> " WHERE id IN ? RETURNING *"

                -- Parameters in placeholder order: table, then column/value pairs, then the id list
                let assignmentParams = join [[PG.toField column, value] | (column, value) <- keyValues]
                let params = [PG.toField (PG.Identifier table)] <> assignmentParams <> [PG.toField (PG.In ids)]

                rawRecords :: [[Field]] <- sqlQueryWithRLSAndTransactionId transactionId (PG.Query query) params
                let records = map (map (renameField tableRenamer)) rawRecords

                sendJSON DidUpdateRecords { requestId, records }
            
            -- Deletes the row of 'table' whose @id@ column equals 'id' and then
            -- answers with 'DidDeleteRecord'.
            --
            -- The statement is executed via 'sqlExecWithRLSAndTransactionId' with
            -- the (optional) 'transactionId' of the message; the affected-row
            -- count it returns is not inspected.
            handleMessage DeleteRecordMessage { table, id, requestId, transactionId } = do
                ensureRLSEnabled table

                let deleteParams = (PG.Identifier table, id)
                sqlExecWithRLSAndTransactionId transactionId "DELETE FROM ? WHERE id = ?" deleteParams

                sendJSON DidDeleteRecord { requestId }
            
            -- Deletes every row of 'table' whose @id@ column is contained in 'ids'
            -- and then answers with 'DidDeleteRecords'.
            --
            -- The statement is executed via 'sqlExecWithRLSAndTransactionId' with
            -- the (optional) 'transactionId' of the message; the affected-row
            -- count it returns is not inspected.
            handleMessage DeleteRecordsMessage { table, ids, requestId, transactionId } = do
                ensureRLSEnabled table

                let deleteParams = (PG.Identifier table, PG.In ids)
                sqlExecWithRLSAndTransactionId transactionId "DELETE FROM ? WHERE id IN ?" deleteParams

                sendJSON DidDeleteRecords { requestId }

            -- Starts a new database transaction for this DataSync connection.
            --
            -- A connection is taken out of the connection pool for the lifetime of
            -- the transaction, @BEGIN@ is issued on it, and the transaction is
            -- registered in '?state' under a freshly generated id which is reported
            -- back via 'DidStartTransaction'. The handler then blocks on the
            -- transaction's @close@ 'MVar' until it is filled.
            --
            -- Cleanup is exception safe on both levels: the transaction is removed
            -- from '?state' and the connection is released even when sending the
            -- response or waiting for the close signal throws. Previously the
            -- transaction entry was only removed on the normal path, so a failure
            -- left an entry in '?state' pointing at a connection that had already
            -- been given back to the pool.
            handleMessage StartTransaction { requestId } = do
                ensureBelowTransactionLimit

                transactionId <- UUID.nextRandom

                let takeConnection = ?modelContext.connectionPool
                                    |> Pool.takeResource

                -- NOTE(review): if the ROLLBACK itself throws, 'Pool.putResource' is
                -- skipped -- confirm whether the connection should be destroyed in
                -- that case.
                let releaseConnection (connection, localPool) = do
                        PG.execute connection "ROLLBACK" () -- Make sure there's no pending transaction in case something went wrong
                        Pool.putResource localPool connection

                Exception.bracket takeConnection releaseConnection \(connection, localPool) -> do
                    transactionSignal <- MVar.newEmptyMVar

                    -- Run the BEGIN on the connection that was just taken from the pool
                    let globalModelContext = ?modelContext
                    let ?modelContext = globalModelContext { transactionConnection = Just connection } in sqlExecWithRLS "BEGIN" ()

                    let transaction = DataSyncTransaction
                            { id = transactionId
                            , connection
                            , close = transactionSignal
                            }

                    let registerTransaction = atomicModifyIORef'' ?state (\state -> state |> modify #transactions (HashMap.insert transactionId transaction))
                    let unregisterTransaction = atomicModifyIORef'' ?state (\state -> state |> modify #transactions (HashMap.delete transactionId))

                    -- 'bracket_' guarantees the transaction is unregistered before the
                    -- connection is released by the outer 'bracket'
                    Exception.bracket_ registerTransaction unregisterTransaction do
                        sendJSON DidStartTransaction { requestId, transactionId }

                        -- Wait until the close 'MVar' of the transaction is filled
                        MVar.takeMVar transactionSignal

            -- Rolls back the transaction with the given 'id'.
            --
            -- Looks the transaction up via 'findTransactionById', issues @ROLLBACK@
            -- through 'sqlExecWithRLSAndTransactionId', fills the transaction's
            -- @close@ 'MVar' and answers with 'DidRollbackTransaction'.
            handleMessage RollbackTransaction { requestId, id } = do
                DataSyncTransaction { id = transactionId, close = closeSignal } <- findTransactionById id

                sqlExecWithRLSAndTransactionId (Just transactionId) "ROLLBACK" ()
                MVar.putMVar closeSignal ()

                sendJSON DidRollbackTransaction { requestId, transactionId }

            -- Commits the transaction with the given 'id'.
            --
            -- Looks the transaction up via 'findTransactionById', issues @COMMIT@
            -- through 'sqlExecWithRLSAndTransactionId', fills the transaction's
            -- @close@ 'MVar' and answers with 'DidCommitTransaction'.
            handleMessage CommitTransaction { requestId, id } = do
                DataSyncTransaction { id = transactionId, close = closeSignal } <- findTransactionById id

                sqlExecWithRLSAndTransactionId (Just transactionId) "COMMIT" ()
                MVar.putMVar closeSignal ()

                sendJSON DidCommitTransaction { requestId, transactionId }

            -- Fallback clause: any message not matched by an earlier clause is
            -- handed to the caller-supplied custom message handler, together with
            -- the function for sending JSON responses.
            handleMessage unhandledMessage = handleCustomMessage sendJSON unhandledMessage

-- | Cancels every async stored in the current DataSync state.
--
-- When the state is 'DataSyncReady', each of its @asyncs@ is cancelled with
-- 'uninterruptibleCancel'. Any other state is left untouched.
cleanupAllSubscriptions :: (?state :: IORef DataSyncController, ?applicationContext :: ApplicationContext) => IO ()
cleanupAllSubscriptions = do
    state <- getState

    case state of
        DataSyncReady { asyncs } -> forEach asyncs uninterruptibleCancel
        _ -> pure ()

-- | Builds a JSON object from a list of column changes.
--
-- Every change contributes one key/value pair: the key is the change's column
-- name translated with the renamer's @columnToField@, the value is the change's
-- @new@ value.
changesToValue :: Renamer -> [ChangeNotifications.Change] -> Value
changesToValue renamer = object . map toPair
    where
        toPair ChangeNotifications.Change { col, new } =
            let fieldKey = Aeson.fromText (renamer.columnToField col)
            in fieldKey .= new

-- | Runs the given action, optionally on the connection of a DataSync transaction.
--
-- With @Just transactionId@ the transaction is looked up via 'findTransactionById'
-- and the action runs with a '?modelContext' whose @transactionConnection@ is set
-- to that transaction's connection. With 'Nothing' the action runs with the
-- '?modelContext' that is already in scope.
runInModelContextWithTransaction :: (?state :: IORef DataSyncController, ?modelContext :: ModelContext) => ((?modelContext :: ModelContext) => IO result) -> Maybe UUID -> IO result
runInModelContextWithTransaction function maybeTransactionId =
    case maybeTransactionId of
        Nothing -> function
        Just transactionId -> do
            -- Capture the outer context before it is shadowed below
            let outerModelContext = ?modelContext

            DataSyncTransaction { connection } <- findTransactionById transactionId
            let ?modelContext = outerModelContext { transactionConnection = Just connection } in function

-- | Looks up a transaction by its id in the @transactions@ map of the current 'DataSyncController' state.
--
-- Calls 'error' with @"No transaction with that id"@ when the id is not in the map.
findTransactionById :: (?state :: IORef DataSyncController) => UUID -> IO DataSyncTransaction
findTransactionById transactionId = do
    transactions <- (.transactions) <$> readIORef ?state
    case HashMap.lookup transactionId transactions of
        Just transaction -> pure transaction
        Nothing -> error "No transaction with that id"

-- | Limits the number of concurrent transactions per connection to avoid running out of database connections
--
-- The limit is 'maxTransactionsPerConnection', which is read from the app config.
--
-- Each transaction removes a database connection from the connection pool. If we don't limit the transactions,
-- a single user could take down the application by starting more than 'IHP.FrameworkConfig.DBPoolMaxConnections'
-- concurrent transactions. Then all database connections are removed from the connection pool and further database
-- queries for other users will fail.
--
-- Calls 'error' when the number of transactions in the current state has already reached the limit.
--
ensureBelowTransactionLimit :: (?state :: IORef DataSyncController, ?context :: ControllerContext) => IO ()
ensureBelowTransactionLimit = do
    transactions <- (.transactions) <$> readIORef ?state
    let transactionCount = HashMap.size transactions
    when (transactionCount >= maxTransactionsPerConnection) do
        error ("You've reached the transaction limit of " <> tshow maxTransactionsPerConnection <> " transactions")

-- | Limits the number of concurrent subscriptions per connection
--
-- The limit is 'maxSubscriptionsPerConnection', which is read from the app config.
--
-- Calls 'error' when the number of subscriptions in the current state has already reached the limit.
ensureBelowSubscriptionsLimit :: (?state :: IORef DataSyncController, ?context :: ControllerContext) => IO ()
ensureBelowSubscriptionsLimit = do
    subscriptions <- (.subscriptions) <$> readIORef ?state
    let subscriptionsCount = HashMap.size subscriptions
    when (subscriptionsCount >= maxSubscriptionsPerConnection) do
        error ("You've reached the subscriptions limit of " <> tshow maxSubscriptionsPerConnection <> " subscriptions")

-- | The transaction limit used by 'ensureBelowTransactionLimit'
--
-- Unwraps the 'DataSyncMaxTransactionsPerConnection' value returned by 'getAppConfig'.
maxTransactionsPerConnection :: (?context :: ControllerContext) => Int
maxTransactionsPerConnection =
    case getAppConfig @DataSyncMaxTransactionsPerConnection of
        DataSyncMaxTransactionsPerConnection value -> value

-- | The subscriptions limit used by 'ensureBelowSubscriptionsLimit'
--
-- Unwraps the 'DataSyncMaxSubscriptionsPerConnection' value returned by 'getAppConfig'.
maxSubscriptionsPerConnection :: (?context :: ControllerContext) => Int
maxSubscriptionsPerConnection =
    case getAppConfig @DataSyncMaxSubscriptionsPerConnection of
        DataSyncMaxSubscriptionsPerConnection value -> value

-- | Like 'sqlQueryWithRLS', but optionally runs the query inside a registered transaction
--
-- The query is wrapped in 'runInModelContextWithTransaction': with @Just transactionId@ it runs
-- on that transaction's connection, with @Nothing@ it runs with the current @?modelContext@.
sqlQueryWithRLSAndTransactionId ::
    ( ?modelContext :: ModelContext
    , PG.ToRow parameters
    , ?context :: ControllerContext
    , userId ~ Id CurrentUserRecord
    , Show (PrimaryKey (GetTableName CurrentUserRecord))
    , HasNewSessionUrl CurrentUserRecord
    , Typeable CurrentUserRecord
    , HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
    , PG.ToField userId
    , FromRow result
    , ?state :: IORef DataSyncController
    ) => Maybe UUID -> PG.Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId transactionId theQuery theParams = runInModelContextWithTransaction (sqlQueryWithRLS theQuery theParams) transactionId

-- | Like 'sqlExecWithRLS', but optionally runs the statement inside a registered transaction
--
-- The statement is wrapped in 'runInModelContextWithTransaction': with @Just transactionId@ it runs
-- on that transaction's connection, with @Nothing@ it runs with the current @?modelContext@.
sqlExecWithRLSAndTransactionId ::
    ( ?modelContext :: ModelContext
    , PG.ToRow parameters
    , ?context :: ControllerContext
    , userId ~ Id CurrentUserRecord
    , Show (PrimaryKey (GetTableName CurrentUserRecord))
    , HasNewSessionUrl CurrentUserRecord
    , Typeable CurrentUserRecord
    , HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
    , PG.ToField userId
    , ?state :: IORef DataSyncController
    ) => Maybe UUID -> PG.Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId transactionId theQuery theParams = runInModelContextWithTransaction (sqlExecWithRLS theQuery theParams) transactionId

-- | Replaces the @subscriptions@ field of a 'DataSyncController' record
instance SetField "subscriptions" DataSyncController (HashMap UUID (MVar.MVar ())) where
    setField subscriptions record = record { subscriptions }

-- | Replaces the @transactions@ field of a 'DataSyncController' record
instance SetField "transactions" DataSyncController (HashMap UUID DataSyncTransaction) where
    setField transactions record = record { transactions }

-- | Replaces the @asyncs@ field of a 'DataSyncController' record
instance SetField "asyncs" DataSyncController [Async ()] where
    setField asyncs record = record { asyncs }

-- | Applies an update function to the value inside an 'IORef' using 'atomicModifyIORef''
--
-- A variant of 'atomicModifyIORef'' for update functions that only produce the new
-- value and have no extra result to return.
atomicModifyIORef'' :: IORef t -> (t -> t) -> IO ()
atomicModifyIORef'' ref updateFn = atomicModifyIORef' ref (\value -> (updateFn value, ()))