{-# LANGUAGE UndecidableInstances #-}
module IHP.DataSync.ControllerImpl where
import IHP.ControllerPrelude hiding (OrderByClause)
import qualified Control.Exception as Exception
import qualified IHP.Log as Log
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Encoding.Internal as Aeson
import Data.Aeson.TH
import Data.Aeson
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.ToField as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Data.HashMap.Strict as HashMap
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent.MVar as MVar
import IHP.DataSync.Types
import IHP.DataSync.RowLevelSecurity
import IHP.DataSync.DynamicQuery
import IHP.DataSync.DynamicQueryCompiler
import qualified IHP.DataSync.ChangeNotifications as ChangeNotifications
import IHP.DataSync.REST.Controller (aesonValueToPostgresValue)
import qualified Data.ByteString.Char8 as ByteString
import qualified Data.ByteString.Builder as ByteString
import qualified IHP.PGListener as PGListener
import IHP.ApplicationContext
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.Pool as Pool
import qualified IHP.GraphQL.Types as GraphQL
import qualified IHP.GraphQL.Parser as GraphQL
import qualified IHP.GraphQL.Compiler as GraphQL
import IHP.GraphQL.JSON ()
import qualified Data.Attoparsec.Text as Attoparsec
import qualified Network.WebSockets as WS
runDataSyncController ::
( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
, ?applicationContext :: ApplicationContext
, ?connection :: WS.Connection
, ?context :: ControllerContext
, ?modelContext :: ModelContext
, ?state :: IORef DataSyncController
, PG.ToField (PrimaryKey (GetTableName CurrentUserRecord))
, Typeable CurrentUserRecord
, HasNewSessionUrl CurrentUserRecord
, Show (PrimaryKey (GetTableName CurrentUserRecord))
) => _ -> _ -> _ -> _ -> _ -> IO ()
runDataSyncController :: (Text -> IO TableWithRLS)
-> (TableWithRLS -> IO a)
-> IO ByteString
-> (DataSyncResponse -> IO ())
-> ((DataSyncResponse -> IO ()) -> DataSyncMessage -> IO ())
-> IO ()
runDataSyncController Text -> IO TableWithRLS
ensureRLSEnabled TableWithRLS -> IO a
installTableChangeTriggers IO ByteString
receiveData DataSyncResponse -> IO ()
sendJSON (DataSyncResponse -> IO ()) -> DataSyncMessage -> IO ()
handleCustomMessage = do
DataSyncController -> IO ()
forall state. (?state::IORef state) => state -> IO ()
setState DataSyncReady :: HashMap UUID (MVar ())
-> HashMap UUID DataSyncTransaction
-> [Async ()]
-> DataSyncController
DataSyncReady { $sel:subscriptions:DataSyncController :: HashMap UUID (MVar ())
subscriptions = HashMap UUID (MVar ())
forall k v. HashMap k v
HashMap.empty, $sel:transactions:DataSyncController :: HashMap UUID DataSyncTransaction
transactions = HashMap UUID DataSyncTransaction
forall k v. HashMap k v
HashMap.empty, $sel:asyncs:DataSyncController :: [Async ()]
asyncs = [] }
let pgListener :: PGListener
pgListener = ?applicationContext::ApplicationContext
ApplicationContext
?applicationContext ApplicationContext
-> (ApplicationContext -> PGListener) -> PGListener
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "pgListener" -> ApplicationContext -> PGListener
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get IsLabel "pgListener" (Proxy "pgListener")
Proxy "pgListener"
#pgListener
let
handleMessage :: DataSyncMessage -> IO ()
handleMessage :: DataSyncMessage -> IO ()
handleMessage DataSyncQuery { DynamicSQLQuery
$sel:query:DataSyncQuery :: DataSyncMessage -> DynamicSQLQuery
query :: DynamicSQLQuery
query, Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId :: Int
requestId, Maybe UUID
$sel:transactionId:DataSyncQuery :: DataSyncMessage -> Maybe UUID
transactionId :: Maybe UUID
transactionId } = do
Text -> IO TableWithRLS
ensureRLSEnabled (Proxy "table" -> DynamicSQLQuery -> Text
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get IsLabel "table" (Proxy "table")
Proxy "table"
#table DynamicSQLQuery
query)
let (Query
theQuery, [Action]
theParams) = DynamicSQLQuery -> (Query, [Action])
compileQuery DynamicSQLQuery
query
[[Field]]
result :: [[Field]] <- Maybe UUID -> Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId Query
theQuery [Action]
theParams
DataSyncResponse -> IO ()
sendJSON DataSyncResult :: [[Field]] -> Int -> DataSyncResponse
DataSyncResult { [[Field]]
$sel:result:DataSyncResult :: [[Field]]
result :: [[Field]]
result, Int
$sel:requestId:DataSyncResult :: Int
requestId :: Int
requestId }
handleMessage CreateDataSubscription { DynamicSQLQuery
query :: DynamicSQLQuery
$sel:query:DataSyncQuery :: DataSyncMessage -> DynamicSQLQuery
query, Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId } = do
IO ()
(?state::IORef DataSyncController, ?context::ControllerContext) =>
IO ()
ensureBelowSubscriptionsLimit
TableWithRLS
tableNameRLS <- Text -> IO TableWithRLS
ensureRLSEnabled (Proxy "table" -> DynamicSQLQuery -> Text
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get IsLabel "table" (Proxy "table")
Proxy "table"
#table DynamicSQLQuery
query)
UUID
subscriptionId <- IO UUID
UUID.nextRandom
let (Query
theQuery, [Action]
theParams) = DynamicSQLQuery -> (Query, [Action])
compileQuery DynamicSQLQuery
query
[[Field]]
result :: [[Field]] <- Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, FromRow result) =>
Query -> parameters -> IO [result]
sqlQueryWithRLS Query
theQuery [Action]
theParams
let tableName :: Text
tableName = Proxy "table" -> DynamicSQLQuery -> Text
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get IsLabel "table" (Proxy "table")
Proxy "table"
#table DynamicSQLQuery
query
let watchedRecordIds :: [UUID]
watchedRecordIds = [[Field]] -> [UUID]
recordIds [[Field]]
result
IORef (Set UUID)
watchedRecordIdsRef <- Set UUID -> IO (IORef (Set UUID))
forall a. a -> IO (IORef a)
newIORef ([UUID] -> Set UUID
forall a. Ord a => [a] -> Set a
Set.fromList [UUID]
watchedRecordIds)
TableWithRLS -> IO a
installTableChangeTriggers TableWithRLS
tableNameRLS
let callback :: ChangeNotification -> IO ()
callback ChangeNotification
notification = case ChangeNotification
notification of
ChangeNotifications.DidInsert { UUID
$sel:id:DidInsert :: ChangeNotification -> UUID
id :: UUID
id } -> do
[[Field]]
newRecord :: [[Field]] <- Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, FromRow result) =>
Query -> parameters -> IO [result]
sqlQueryWithRLS (Query
"SELECT * FROM (" Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
theQuery Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
") AS records WHERE records.id = ? LIMIT 1") ([Action]
theParams [Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> [UUID -> Action
forall a. ToField a => a -> Action
PG.toField UUID
id])
case [[Field]] -> Maybe [Field]
forall a. [a] -> Maybe a
headMay [[Field]]
newRecord of
Just [Field]
record -> do
IORef (Set UUID) -> (Set UUID -> Set UUID) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef (Set UUID)
watchedRecordIdsRef (UUID -> Set UUID -> Set UUID
forall a. Ord a => a -> Set a -> Set a
Set.insert UUID
id)
DataSyncResponse -> IO ()
sendJSON DidInsert :: UUID -> [Field] -> DataSyncResponse
DidInsert { UUID
$sel:subscriptionId:DataSyncResult :: UUID
subscriptionId :: UUID
subscriptionId, [Field]
$sel:record:DataSyncResult :: [Field]
record :: [Field]
record }
Maybe [Field]
Nothing -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
ChangeNotifications.DidUpdate { UUID
id :: UUID
$sel:id:DidInsert :: ChangeNotification -> UUID
id, [Change]
$sel:changeSet:DidInsert :: ChangeNotification -> [Change]
changeSet :: [Change]
changeSet } -> do
Bool
isWatchingRecord <- UUID -> Set UUID -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member UUID
id (Set UUID -> Bool) -> IO (Set UUID) -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef (Set UUID) -> IO (Set UUID)
forall a. IORef a -> IO a
readIORef IORef (Set UUID)
watchedRecordIdsRef
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
isWatchingRecord do
[(PG.Only Bool
isRecordInResultSet)] <- Query -> [Action] -> IO [Only Bool]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, FromRow result) =>
Query -> parameters -> IO [result]
sqlQueryWithRLS (Query
"SELECT EXISTS(SELECT * FROM (" Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
theQuery Query -> Query -> Query
forall a. Semigroup a => a -> a -> a
<> Query
") AS records WHERE records.id = ? LIMIT 1)") ([Action]
theParams [Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> [UUID -> Action
forall a. ToField a => a -> Action
PG.toField UUID
id])
if Bool
isRecordInResultSet
then DataSyncResponse -> IO ()
sendJSON DidUpdate :: UUID -> UUID -> Value -> DataSyncResponse
DidUpdate { UUID
$sel:subscriptionId:DataSyncResult :: UUID
subscriptionId :: UUID
subscriptionId, UUID
$sel:id:DataSyncResult :: UUID
id :: UUID
id, $sel:changeSet:DataSyncResult :: Value
changeSet = [Change] -> Value
changesToValue [Change]
changeSet }
else DataSyncResponse -> IO ()
sendJSON DidDelete :: UUID -> UUID -> DataSyncResponse
DidDelete { UUID
$sel:subscriptionId:DataSyncResult :: UUID
subscriptionId :: UUID
subscriptionId, UUID
$sel:id:DataSyncResult :: UUID
id :: UUID
id }
ChangeNotifications.DidDelete { UUID
id :: UUID
$sel:id:DidInsert :: ChangeNotification -> UUID
id } -> do
Bool
isWatchingRecord <- UUID -> Set UUID -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member UUID
id (Set UUID -> Bool) -> IO (Set UUID) -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef (Set UUID) -> IO (Set UUID)
forall a. IORef a -> IO a
readIORef IORef (Set UUID)
watchedRecordIdsRef
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
isWatchingRecord do
DataSyncResponse -> IO ()
sendJSON DidDelete :: UUID -> UUID -> DataSyncResponse
DidDelete { UUID
$sel:subscriptionId:DataSyncResult :: UUID
subscriptionId :: UUID
subscriptionId, UUID
id :: UUID
$sel:id:DataSyncResult :: UUID
id }
let subscribe :: IO Subscription
subscribe = ByteString
-> (ChangeNotification -> IO ()) -> PGListener -> IO Subscription
forall jsonValue.
FromJSON jsonValue =>
ByteString -> (jsonValue -> IO ()) -> PGListener -> IO Subscription
PGListener.subscribeJSON (TableWithRLS -> ByteString
ChangeNotifications.channelName TableWithRLS
tableNameRLS) ChangeNotification -> IO ()
callback PGListener
pgListener
let unsubscribe :: Subscription -> IO ()
unsubscribe Subscription
subscription = Subscription -> PGListener -> IO ()
PGListener.unsubscribe Subscription
subscription PGListener
pgListener
IO Subscription
-> (Subscription -> IO ()) -> (Subscription -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
Exception.bracket IO Subscription
subscribe Subscription -> IO ()
unsubscribe \Subscription
channelSubscription -> do
MVar ()
close <- IO (MVar ())
forall a. IO (MVar a)
MVar.newEmptyMVar
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "subscriptions"
-> (HashMap UUID (MVar ()) -> HashMap UUID (MVar ()))
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify IsLabel "subscriptions" (Proxy "subscriptions")
Proxy "subscriptions"
#subscriptions (UUID -> MVar () -> HashMap UUID (MVar ()) -> HashMap UUID (MVar ())
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HashMap.insert UUID
subscriptionId MVar ()
close))
DataSyncResponse -> IO ()
sendJSON DidCreateDataSubscription :: Int -> UUID -> [[Field]] -> DataSyncResponse
DidCreateDataSubscription { UUID
$sel:subscriptionId:DataSyncResult :: UUID
subscriptionId :: UUID
subscriptionId, Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId, [[Field]]
result :: [[Field]]
$sel:result:DataSyncResult :: [[Field]]
result }
MVar () -> IO ()
forall a. MVar a -> IO a
MVar.takeMVar MVar ()
close
handleMessage DeleteDataSubscription { Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId, UUID
$sel:subscriptionId:DataSyncQuery :: DataSyncMessage -> UUID
subscriptionId :: UUID
subscriptionId } = do
DataSyncReady { HashMap UUID (MVar ())
subscriptions :: HashMap UUID (MVar ())
$sel:subscriptions:DataSyncController :: DataSyncController -> HashMap UUID (MVar ())
subscriptions } <- IO DataSyncController
forall state. (?state::IORef state) => IO state
getState
let (Just MVar ()
closeSignalMVar) = UUID -> HashMap UUID (MVar ()) -> Maybe (MVar ())
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup UUID
subscriptionId HashMap UUID (MVar ())
subscriptions
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
MVar.putMVar MVar ()
closeSignalMVar ()
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "subscriptions"
-> (HashMap UUID (MVar ()) -> HashMap UUID (MVar ()))
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify IsLabel "subscriptions" (Proxy "subscriptions")
Proxy "subscriptions"
#subscriptions (UUID -> HashMap UUID (MVar ()) -> HashMap UUID (MVar ())
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HashMap.delete UUID
subscriptionId))
DataSyncResponse -> IO ()
sendJSON DidDeleteDataSubscription :: Int -> UUID -> DataSyncResponse
DidDeleteDataSubscription { UUID
subscriptionId :: UUID
$sel:subscriptionId:DataSyncResult :: UUID
subscriptionId, Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId }
handleMessage CreateRecordMessage { Text
$sel:table:DataSyncQuery :: DataSyncMessage -> Text
table :: Text
table, HashMap Text Value
$sel:record:DataSyncQuery :: DataSyncMessage -> HashMap Text Value
record :: HashMap Text Value
record, Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId, Maybe UUID
transactionId :: Maybe UUID
$sel:transactionId:DataSyncQuery :: DataSyncMessage -> Maybe UUID
transactionId } = do
Text -> IO TableWithRLS
ensureRLSEnabled Text
table
let query :: Query
query = Query
"INSERT INTO ? ? VALUES ? RETURNING *"
let columns :: [Text]
columns = HashMap Text Value
record
HashMap Text Value -> (HashMap Text Value -> [Text]) -> [Text]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Text]
forall k v. HashMap k v -> [k]
HashMap.keys
[Text] -> ([Text] -> [Text]) -> [Text]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Text
fieldNameToColumnName
let values :: [Action]
values = HashMap Text Value
record
HashMap Text Value -> (HashMap Text Value -> [Value]) -> [Value]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Value]
forall k v. HashMap k v -> [v]
HashMap.elems
[Value] -> ([Value] -> [Action]) -> [Action]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Value -> Action) -> [Value] -> [Action]
forall a b. (a -> b) -> [a] -> [b]
map Value -> Action
aesonValueToPostgresValue
let params :: (Identifier, In [Identifier], In [Action])
params = (Text -> Identifier
PG.Identifier Text
table, [Identifier] -> In [Identifier]
forall a. a -> In a
PG.In ((Text -> Identifier) -> [Text] -> [Identifier]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Identifier
PG.Identifier [Text]
columns), [Action] -> In [Action]
forall a. a -> In a
PG.In [Action]
values)
[[Field]]
result :: [[Field]] <- Maybe UUID
-> Query
-> (Identifier, In [Identifier], In [Action])
-> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId Query
query (Identifier, In [Identifier], In [Action])
params
case [[Field]]
result of
[[Field]
record] -> DataSyncResponse -> IO ()
sendJSON DidCreateRecord :: Int -> [Field] -> DataSyncResponse
DidCreateRecord { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId, [Field]
record :: [Field]
$sel:record:DataSyncResult :: [Field]
record }
[[Field]]
otherwise -> Text -> IO ()
forall a. Text -> a
error Text
"Unexpected result in CreateRecordMessage handler"
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
handleMessage CreateRecordsMessage { Text
table :: Text
$sel:table:DataSyncQuery :: DataSyncMessage -> Text
table, [HashMap Text Value]
$sel:records:DataSyncQuery :: DataSyncMessage -> [HashMap Text Value]
records :: [HashMap Text Value]
records, Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId, Maybe UUID
transactionId :: Maybe UUID
$sel:transactionId:DataSyncQuery :: DataSyncMessage -> Maybe UUID
transactionId } = do
Text -> IO TableWithRLS
ensureRLSEnabled Text
table
let query :: Query
query = Query
"INSERT INTO ? ? ? RETURNING *"
let columns :: [Text]
columns = [HashMap Text Value]
records
[HashMap Text Value]
-> ([HashMap Text Value] -> Maybe (HashMap Text Value))
-> Maybe (HashMap Text Value)
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> [HashMap Text Value] -> Maybe (HashMap Text Value)
forall a. [a] -> Maybe a
head
Maybe (HashMap Text Value)
-> (Maybe (HashMap Text Value) -> HashMap Text Value)
-> HashMap Text Value
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> \case
Just HashMap Text Value
value -> HashMap Text Value
value
Maybe (HashMap Text Value)
Nothing -> Text -> HashMap Text Value
forall a. Text -> a
error Text
"Atleast one record is required"
HashMap Text Value -> (HashMap Text Value -> [Text]) -> [Text]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Text]
forall k v. HashMap k v -> [k]
HashMap.keys
[Text] -> ([Text] -> [Text]) -> [Text]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Text
fieldNameToColumnName
let values :: [[Action]]
values = [HashMap Text Value]
records
[HashMap Text Value]
-> ([HashMap Text Value] -> [[Action]]) -> [[Action]]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (HashMap Text Value -> [Action])
-> [HashMap Text Value] -> [[Action]]
forall a b. (a -> b) -> [a] -> [b]
map (\HashMap Text Value
object ->
HashMap Text Value
object
HashMap Text Value -> (HashMap Text Value -> [Value]) -> [Value]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Value]
forall k v. HashMap k v -> [v]
HashMap.elems
[Value] -> ([Value] -> [Action]) -> [Action]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Value -> Action) -> [Value] -> [Action]
forall a b. (a -> b) -> [a] -> [b]
map Value -> Action
aesonValueToPostgresValue
)
let params :: (Identifier, In [Identifier], Values [Action])
params = (Text -> Identifier
PG.Identifier Text
table, [Identifier] -> In [Identifier]
forall a. a -> In a
PG.In ((Text -> Identifier) -> [Text] -> [Identifier]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Identifier
PG.Identifier [Text]
columns), [QualifiedIdentifier] -> [[Action]] -> Values [Action]
forall a. [QualifiedIdentifier] -> [a] -> Values a
PG.Values [] [[Action]]
values)
[[Field]]
records :: [[Field]] <- Maybe UUID
-> Query
-> (Identifier, In [Identifier], Values [Action])
-> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId Query
query (Identifier, In [Identifier], Values [Action])
params
DataSyncResponse -> IO ()
sendJSON DidCreateRecords :: Int -> [[Field]] -> DataSyncResponse
DidCreateRecords { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId, [[Field]]
$sel:records:DataSyncResult :: [[Field]]
records :: [[Field]]
records }
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
handleMessage UpdateRecordMessage { Text
table :: Text
$sel:table:DataSyncQuery :: DataSyncMessage -> Text
table, UUID
$sel:id:DataSyncQuery :: DataSyncMessage -> UUID
id :: UUID
id, HashMap Text Value
$sel:patch:DataSyncQuery :: DataSyncMessage -> HashMap Text Value
patch :: HashMap Text Value
patch, Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId, Maybe UUID
transactionId :: Maybe UUID
$sel:transactionId:DataSyncQuery :: DataSyncMessage -> Maybe UUID
transactionId } = do
Text -> IO TableWithRLS
ensureRLSEnabled Text
table
let columns :: [Identifier]
columns = HashMap Text Value
patch
HashMap Text Value -> (HashMap Text Value -> [Text]) -> [Text]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Text]
forall k v. HashMap k v -> [k]
HashMap.keys
[Text] -> ([Text] -> [Text]) -> [Text]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Text
fieldNameToColumnName
[Text] -> ([Text] -> [Identifier]) -> [Identifier]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Text -> Identifier) -> [Text] -> [Identifier]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Identifier
PG.Identifier
let values :: [Action]
values = HashMap Text Value
patch
HashMap Text Value -> (HashMap Text Value -> [Value]) -> [Value]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Value]
forall k v. HashMap k v -> [v]
HashMap.elems
[Value] -> ([Value] -> [Action]) -> [Action]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Value -> Action) -> [Value] -> [Action]
forall a b. (a -> b) -> [a] -> [b]
map Value -> Action
aesonValueToPostgresValue
let keyValues :: [(Identifier, Action)]
keyValues = [Identifier] -> [Action] -> [(Identifier, Action)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Identifier]
columns [Action]
values
let setCalls :: ByteString
setCalls = [(Identifier, Action)]
keyValues
[(Identifier, Action)]
-> ([(Identifier, Action)] -> [ByteString]) -> [ByteString]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> ((Identifier, Action) -> ByteString)
-> [(Identifier, Action)] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (\(Identifier, Action)
_ -> ByteString
"? = ?")
[ByteString] -> ([ByteString] -> ByteString) -> ByteString
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> ByteString -> [ByteString] -> ByteString
ByteString.intercalate ByteString
", "
let query :: ByteString
query = ByteString
"UPDATE ? SET " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
setCalls ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
" WHERE id = ? RETURNING *"
let params :: [Action]
params = [Identifier -> Action
forall a. ToField a => a -> Action
PG.toField (Text -> Identifier
PG.Identifier Text
table)]
[Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> ([[Action]] -> [Action]
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (((Identifier, Action) -> [Action])
-> [(Identifier, Action)] -> [[Action]]
forall a b. (a -> b) -> [a] -> [b]
map (\(Identifier
key, Action
value) -> [Identifier -> Action
forall a. ToField a => a -> Action
PG.toField Identifier
key, Action
value]) [(Identifier, Action)]
keyValues))
[Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> [UUID -> Action
forall a. ToField a => a -> Action
PG.toField UUID
id]
[[Field]]
result :: [[Field]] <- Maybe UUID -> Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId (ByteString -> Query
PG.Query ByteString
query) [Action]
params
case [[Field]]
result of
[[Field]
record] -> DataSyncResponse -> IO ()
sendJSON DidUpdateRecord :: Int -> [Field] -> DataSyncResponse
DidUpdateRecord { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId, [Field]
record :: [Field]
$sel:record:DataSyncResult :: [Field]
record }
[[Field]]
otherwise -> Text -> IO ()
forall a. Text -> a
error Text
"Could not apply the update to the given record. Are you sure the record ID you passed is correct? If the record ID is correct, likely the row level security policy is not making the record visible to the UPDATE operation."
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
handleMessage UpdateRecordsMessage { Text
table :: Text
$sel:table:DataSyncQuery :: DataSyncMessage -> Text
table, [UUID]
$sel:ids:DataSyncQuery :: DataSyncMessage -> [UUID]
ids :: [UUID]
ids, HashMap Text Value
patch :: HashMap Text Value
$sel:patch:DataSyncQuery :: DataSyncMessage -> HashMap Text Value
patch, Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId, Maybe UUID
transactionId :: Maybe UUID
$sel:transactionId:DataSyncQuery :: DataSyncMessage -> Maybe UUID
transactionId } = do
Text -> IO TableWithRLS
ensureRLSEnabled Text
table
let columns :: [Identifier]
columns = HashMap Text Value
patch
HashMap Text Value -> (HashMap Text Value -> [Text]) -> [Text]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Text]
forall k v. HashMap k v -> [k]
HashMap.keys
[Text] -> ([Text] -> [Text]) -> [Text]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Text
fieldNameToColumnName
[Text] -> ([Text] -> [Identifier]) -> [Identifier]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Text -> Identifier) -> [Text] -> [Identifier]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Identifier
PG.Identifier
let values :: [Action]
values = HashMap Text Value
patch
HashMap Text Value -> (HashMap Text Value -> [Value]) -> [Value]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> HashMap Text Value -> [Value]
forall k v. HashMap k v -> [v]
HashMap.elems
[Value] -> ([Value] -> [Action]) -> [Action]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> (Value -> Action) -> [Value] -> [Action]
forall a b. (a -> b) -> [a] -> [b]
map Value -> Action
aesonValueToPostgresValue
let keyValues :: [(Identifier, Action)]
keyValues = [Identifier] -> [Action] -> [(Identifier, Action)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Identifier]
columns [Action]
values
let setCalls :: ByteString
setCalls = [(Identifier, Action)]
keyValues
[(Identifier, Action)]
-> ([(Identifier, Action)] -> [ByteString]) -> [ByteString]
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> ((Identifier, Action) -> ByteString)
-> [(Identifier, Action)] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (\(Identifier, Action)
_ -> ByteString
"? = ?")
[ByteString] -> ([ByteString] -> ByteString) -> ByteString
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> ByteString -> [ByteString] -> ByteString
ByteString.intercalate ByteString
", "
let query :: ByteString
query = ByteString
"UPDATE ? SET " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
setCalls ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
" WHERE id IN ? RETURNING *"
let params :: [Action]
params = [Identifier -> Action
forall a. ToField a => a -> Action
PG.toField (Text -> Identifier
PG.Identifier Text
table)]
[Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> ([[Action]] -> [Action]
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (((Identifier, Action) -> [Action])
-> [(Identifier, Action)] -> [[Action]]
forall a b. (a -> b) -> [a] -> [b]
map (\(Identifier
key, Action
value) -> [Identifier -> Action
forall a. ToField a => a -> Action
PG.toField Identifier
key, Action
value]) [(Identifier, Action)]
keyValues))
[Action] -> [Action] -> [Action]
forall a. Semigroup a => a -> a -> a
<> [In [UUID] -> Action
forall a. ToField a => a -> Action
PG.toField ([UUID] -> In [UUID]
forall a. a -> In a
PG.In [UUID]
ids)]
[[Field]]
records <- Maybe UUID -> Query -> [Action] -> IO [[Field]]
forall parameters userId result.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, FromRow result,
?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId Maybe UUID
transactionId (ByteString -> Query
PG.Query ByteString
query) [Action]
params
DataSyncResponse -> IO ()
sendJSON DidUpdateRecords :: Int -> [[Field]] -> DataSyncResponse
DidUpdateRecords { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId, [[Field]]
records :: [[Field]]
$sel:records:DataSyncResult :: [[Field]]
records }
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
handleMessage DeleteRecordMessage { Text
table :: Text
$sel:table:DataSyncQuery :: DataSyncMessage -> Text
table, UUID
id :: UUID
$sel:id:DataSyncQuery :: DataSyncMessage -> UUID
id, Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId, Maybe UUID
transactionId :: Maybe UUID
$sel:transactionId:DataSyncQuery :: DataSyncMessage -> Maybe UUID
transactionId } = do
Text -> IO TableWithRLS
ensureRLSEnabled Text
table
Maybe UUID -> Query -> (Identifier, UUID) -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, ?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId Maybe UUID
transactionId Query
"DELETE FROM ? WHERE id = ?" (Text -> Identifier
PG.Identifier Text
table, UUID
id)
DataSyncResponse -> IO ()
sendJSON DidDeleteRecord :: Int -> DataSyncResponse
DidDeleteRecord { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId }
handleMessage DeleteRecordsMessage { Text
table :: Text
$sel:table:DataSyncQuery :: DataSyncMessage -> Text
table, [UUID]
ids :: [UUID]
$sel:ids:DataSyncQuery :: DataSyncMessage -> [UUID]
ids, Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId, Maybe UUID
transactionId :: Maybe UUID
$sel:transactionId:DataSyncQuery :: DataSyncMessage -> Maybe UUID
transactionId } = do
Text -> IO TableWithRLS
ensureRLSEnabled Text
table
Maybe UUID -> Query -> (Identifier, In [UUID]) -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, ?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId Maybe UUID
transactionId Query
"DELETE FROM ? WHERE id IN ?" (Text -> Identifier
PG.Identifier Text
table, [UUID] -> In [UUID]
forall a. a -> In a
PG.In [UUID]
ids)
DataSyncResponse -> IO ()
sendJSON DidDeleteRecords :: Int -> DataSyncResponse
DidDeleteRecords { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId }
handleMessage StartTransaction { Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId } = do
IO ()
(?state::IORef DataSyncController, ?context::ControllerContext) =>
IO ()
ensureBelowTransactionLimit
UUID
transactionId <- IO UUID
UUID.nextRandom
let takeConnection :: IO (Connection, LocalPool Connection)
takeConnection = ?modelContext::ModelContext
ModelContext
?modelContext
ModelContext
-> (ModelContext -> Pool Connection) -> Pool Connection
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "connectionPool" -> ModelContext -> Pool Connection
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get IsLabel "connectionPool" (Proxy "connectionPool")
Proxy "connectionPool"
#connectionPool
Pool Connection
-> (Pool Connection -> IO (Connection, LocalPool Connection))
-> IO (Connection, LocalPool Connection)
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Pool Connection -> IO (Connection, LocalPool Connection)
forall a. Pool a -> IO (a, LocalPool a)
Pool.takeResource
let releaseConnection :: (Connection, LocalPool Connection) -> IO ()
releaseConnection (Connection
connection, LocalPool Connection
localPool) = do
Connection -> Query -> () -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PG.execute Connection
connection Query
"ROLLBACK" ()
LocalPool Connection -> Connection -> IO ()
forall a. LocalPool a -> a -> IO ()
Pool.putResource LocalPool Connection
localPool Connection
connection
IO (Connection, LocalPool Connection)
-> ((Connection, LocalPool Connection) -> IO ())
-> ((Connection, LocalPool Connection) -> IO ())
-> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
Exception.bracket IO (Connection, LocalPool Connection)
takeConnection (Connection, LocalPool Connection) -> IO ()
releaseConnection \(Connection
connection, LocalPool Connection
localPool) -> do
MVar ()
transactionSignal <- IO (MVar ())
forall a. IO (MVar a)
MVar.newEmptyMVar
let globalModelContext :: ModelContext
globalModelContext = ?modelContext::ModelContext
ModelContext
?modelContext
let ?modelContext = globalModelContext { transactionConnection = Just connection } in Query -> () -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId) =>
Query -> parameters -> IO Int64
sqlExecWithRLS Query
"BEGIN" ()
let transaction :: DataSyncTransaction
transaction = DataSyncTransaction :: UUID -> Connection -> MVar () -> DataSyncTransaction
DataSyncTransaction
{ $sel:id:DataSyncTransaction :: UUID
id = UUID
transactionId
, Connection
$sel:connection:DataSyncTransaction :: Connection
connection :: Connection
connection
, $sel:close:DataSyncTransaction :: MVar ()
close = MVar ()
transactionSignal
}
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "transactions"
-> (HashMap UUID DataSyncTransaction
-> HashMap UUID DataSyncTransaction)
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify IsLabel "transactions" (Proxy "transactions")
Proxy "transactions"
#transactions (UUID
-> DataSyncTransaction
-> HashMap UUID DataSyncTransaction
-> HashMap UUID DataSyncTransaction
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HashMap.insert UUID
transactionId DataSyncTransaction
transaction))
DataSyncResponse -> IO ()
sendJSON DidStartTransaction :: Int -> UUID -> DataSyncResponse
DidStartTransaction { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId, UUID
$sel:transactionId:DataSyncResult :: UUID
transactionId :: UUID
transactionId }
MVar () -> IO ()
forall a. MVar a -> IO a
MVar.takeMVar MVar ()
transactionSignal
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "transactions"
-> (HashMap UUID DataSyncTransaction
-> HashMap UUID DataSyncTransaction)
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify IsLabel "transactions" (Proxy "transactions")
Proxy "transactions"
#transactions (UUID
-> HashMap UUID DataSyncTransaction
-> HashMap UUID DataSyncTransaction
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HashMap.delete UUID
transactionId))
handleMessage RollbackTransaction { Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId, UUID
id :: UUID
$sel:id:DataSyncQuery :: DataSyncMessage -> UUID
id } = do
DataSyncTransaction { UUID
id :: UUID
$sel:id:DataSyncTransaction :: DataSyncTransaction -> UUID
id, MVar ()
close :: MVar ()
$sel:close:DataSyncTransaction :: DataSyncTransaction -> MVar ()
close } <- (?state::IORef DataSyncController) =>
UUID -> IO DataSyncTransaction
UUID -> IO DataSyncTransaction
findTransactionById UUID
id
Maybe UUID -> Query -> () -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, ?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId (UUID -> Maybe UUID
forall a. a -> Maybe a
Just UUID
id) Query
"ROLLBACK" ()
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
MVar.putMVar MVar ()
close ()
DataSyncResponse -> IO ()
sendJSON DidRollbackTransaction :: Int -> UUID -> DataSyncResponse
DidRollbackTransaction { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId, $sel:transactionId:DataSyncResult :: UUID
transactionId = UUID
id }
handleMessage CommitTransaction { Int
requestId :: Int
$sel:requestId:DataSyncQuery :: DataSyncMessage -> Int
requestId, UUID
id :: UUID
$sel:id:DataSyncQuery :: DataSyncMessage -> UUID
id } = do
DataSyncTransaction { UUID
id :: UUID
$sel:id:DataSyncTransaction :: DataSyncTransaction -> UUID
id, MVar ()
close :: MVar ()
$sel:close:DataSyncTransaction :: DataSyncTransaction -> MVar ()
close } <- (?state::IORef DataSyncController) =>
UUID -> IO DataSyncTransaction
UUID -> IO DataSyncTransaction
findTransactionById UUID
id
Maybe UUID -> Query -> () -> IO Int64
forall parameters userId.
(?modelContext::ModelContext, ToRow parameters,
?context::ControllerContext, userId ~ Id CurrentUserRecord,
Show (PrimaryKey (GetTableName CurrentUserRecord)),
HasNewSessionUrl CurrentUserRecord, Typeable CurrentUserRecord,
?context::ControllerContext,
HasField "id" CurrentUserRecord (Id CurrentUserRecord),
ToField userId, ?state::IORef DataSyncController) =>
Maybe UUID -> Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId (UUID -> Maybe UUID
forall a. a -> Maybe a
Just UUID
id) Query
"COMMIT" ()
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
MVar.putMVar MVar ()
close ()
DataSyncResponse -> IO ()
sendJSON DidCommitTransaction :: Int -> UUID -> DataSyncResponse
DidCommitTransaction { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId, $sel:transactionId:DataSyncResult :: UUID
transactionId = UUID
id }
handleMessage DataSyncMessage
otherwise = (DataSyncResponse -> IO ()) -> DataSyncMessage -> IO ()
handleCustomMessage DataSyncResponse -> IO ()
sendJSON DataSyncMessage
otherwise
IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
Either String DataSyncMessage
message <- ByteString -> Either String DataSyncMessage
forall a. FromJSON a => ByteString -> Either String a
Aeson.eitherDecodeStrict' (ByteString -> Either String DataSyncMessage)
-> IO ByteString -> IO (Either String DataSyncMessage)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO ByteString
receiveData
case Either String DataSyncMessage
message of
Right DataSyncMessage
decodedMessage -> do
let requestId :: Int
requestId = Proxy "requestId" -> DataSyncMessage -> Int
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get IsLabel "requestId" (Proxy "requestId")
Proxy "requestId"
#requestId DataSyncMessage
decodedMessage
((forall a. IO a -> IO a) -> IO ()) -> IO ()
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
Exception.mask \forall a. IO a -> IO a
restore -> do
Async ()
handlerProcess <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall a. IO a -> IO a
restore do
Either SomeException ()
result <- IO () -> IO (Either SomeException ())
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try (DataSyncMessage -> IO ()
handleMessage DataSyncMessage
decodedMessage)
case Either SomeException ()
result of
Left (SomeException
e :: Exception.SomeException) -> do
let errorMessage :: Text
errorMessage = case SomeException -> Maybe EnhancedSqlError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
Just (EnhancedSqlError
enhancedSqlError :: EnhancedSqlError) -> ByteString -> Text
forall a b. ConvertibleStrings a b => a -> b
cs (Proxy "sqlErrorMsg" -> SqlError -> ByteString
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get IsLabel "sqlErrorMsg" (Proxy "sqlErrorMsg")
Proxy "sqlErrorMsg"
#sqlErrorMsg (Proxy "sqlError" -> EnhancedSqlError -> SqlError
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value) =>
Proxy name -> model -> value
get IsLabel "sqlError" (Proxy "sqlError")
Proxy "sqlError"
#sqlError EnhancedSqlError
enhancedSqlError))
Maybe EnhancedSqlError
Nothing -> String -> Text
forall a b. ConvertibleStrings a b => a -> b
cs (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
e)
Text -> IO ()
forall context string.
(?context::context, LoggingProvider context, ToLogStr string) =>
string -> IO ()
Log.error (SomeException -> Text
forall a. Show a => a -> Text
tshow SomeException
e)
DataSyncResponse -> IO ()
sendJSON DataSyncError :: Int -> Text -> DataSyncResponse
DataSyncError { Int
requestId :: Int
$sel:requestId:DataSyncResult :: Int
requestId, Text
$sel:errorMessage:DataSyncResult :: Text
errorMessage :: Text
errorMessage }
Right ()
result -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
IORef DataSyncController
-> (DataSyncController -> DataSyncController) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' ?state::IORef DataSyncController
IORef DataSyncController
?state (\DataSyncController
state -> DataSyncController
state DataSyncController
-> (DataSyncController -> DataSyncController) -> DataSyncController
forall t1 t2. t1 -> (t1 -> t2) -> t2
|> Proxy "asyncs"
-> ([Async ()] -> [Async ()])
-> DataSyncController
-> DataSyncController
forall model (name :: Symbol) value.
(KnownSymbol name, HasField name model value,
SetField name model value) =>
Proxy name -> (value -> value) -> model -> model
modify IsLabel "asyncs" (Proxy "asyncs")
Proxy "asyncs"
#asyncs (Async ()
handlerProcessAsync () -> [Async ()] -> [Async ()]
forall a. a -> [a] -> [a]
:))
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left String
errorMessage -> DataSyncResponse -> IO ()
sendJSON FailedToDecodeMessageError :: Text -> DataSyncResponse
FailedToDecodeMessageError { $sel:errorMessage:DataSyncResult :: Text
errorMessage = String -> Text
forall a b. ConvertibleStrings a b => a -> b
cs String
errorMessage }
{-# INLINE runDataSyncController #-}
-- | Cancels every handler thread recorded in the connection state.
--
-- The current 'DataSyncController' is read via 'getState'. When it is in the
-- 'DataSyncReady' state, each entry of its @asyncs@ list is stopped with
-- 'uninterruptibleCancel'. In any other state this is a no-op.
--
-- The @?applicationContext@ constraint is kept in the signature so existing
-- call sites keep type checking, even though the body no longer reads it.
cleanupAllSubscriptions :: _ => (?state :: IORef DataSyncController, ?applicationContext :: ApplicationContext) => IO ()
cleanupAllSubscriptions = do
    state <- getState

    case state of
        DataSyncReady { asyncs } -> forEach asyncs uninterruptibleCancel
        _ -> pure ()
-- | Turns a list of column changes into a single JSON object.
--
-- Every change contributes one key/value pair: the key is the change's @col@
-- passed through 'columnNameToFieldName', the value is the change's @new@ value.
changesToValue :: [ChangeNotifications.Change] -> Value
changesToValue changes = object [ toPair change | change <- changes ]
    where
        toPair ChangeNotifications.Change { col = column, new = newValue } =
            columnNameToFieldName column .= newValue
-- | Runs a database action, optionally inside one of the connection's open transactions.
--
-- * 'Nothing': the action runs with the @?modelContext@ that is already in scope.
-- * @'Just' transactionId@: the transaction is looked up with 'findTransactionById' and
--   the action runs with a @?modelContext@ whose @transactionConnection@ is set to that
--   transaction's connection.
--
-- The action must be passed through directly (not let-bound first), so that it
-- picks up the rebound @?modelContext@ rather than the outer one.
runInModelContextWithTransaction :: (?state :: IORef DataSyncController, _) => ((?modelContext :: ModelContext) => IO result) -> Maybe UUID -> IO result
runInModelContextWithTransaction action maybeTransactionId =
    case maybeTransactionId of
        Nothing -> action
        Just transactionId -> do
            DataSyncTransaction { connection } <- findTransactionById transactionId

            let outerModelContext = ?modelContext
            let
                    ?modelContext = outerModelContext { transactionConnection = Just connection }
                in
                    action
-- | Looks up an open transaction in the @transactions@ map of the connection state.
--
-- Fails with the error @No transaction with that id@ when the map has no entry
-- for the given id.
findTransactionById :: (?state :: IORef DataSyncController) => UUID -> IO DataSyncTransaction
findTransactionById transactionId = do
    controllerState <- readIORef ?state
    let openTransactions = get #transactions controllerState
    maybe (error "No transaction with that id") pure (HashMap.lookup transactionId openTransactions)
-- | Guards against opening too many transactions on one connection.
--
-- Counts the entries of the state's @transactions@ map and fails with an error
-- once that count has reached 'maxTransactionsPerConnection'.
ensureBelowTransactionLimit :: (?state :: IORef DataSyncController, ?context :: ControllerContext) => IO ()
ensureBelowTransactionLimit = do
    controllerState <- readIORef ?state
    let openTransactionCount = HashMap.size (get #transactions controllerState)

    when (openTransactionCount >= maxTransactionsPerConnection) do
        error
            ( "You've reached the transaction limit of "
            <> tshow maxTransactionsPerConnection
            <> " transactions"
            )
-- | Guards against registering too many subscriptions on one connection.
--
-- Counts the entries of the state's @subscriptions@ map and fails with an error
-- once that count has reached 'maxSubscriptionsPerConnection'.
ensureBelowSubscriptionsLimit :: (?state :: IORef DataSyncController, ?context :: ControllerContext) => IO ()
ensureBelowSubscriptionsLimit = do
    controllerState <- readIORef ?state
    let activeSubscriptionCount = HashMap.size (get #subscriptions controllerState)

    when (activeSubscriptionCount >= maxSubscriptionsPerConnection) do
        error
            ( "You've reached the subscriptions limit of "
            <> tshow maxSubscriptionsPerConnection
            <> " subscriptions"
            )
-- | The configured transaction limit per connection.
--
-- Reads the 'DataSyncMaxTransactionsPerConnection' value from the app config
-- and unwraps the 'Int' it carries.
maxTransactionsPerConnection :: _ => Int
maxTransactionsPerConnection = unwrapLimit (getAppConfig @DataSyncMaxTransactionsPerConnection)
    where
        unwrapLimit (DataSyncMaxTransactionsPerConnection limit) = limit
-- | The configured subscription limit per connection.
--
-- Reads the 'DataSyncMaxSubscriptionsPerConnection' value from the app config
-- and unwraps the 'Int' it carries.
maxSubscriptionsPerConnection :: _ => Int
maxSubscriptionsPerConnection = unwrapLimit (getAppConfig @DataSyncMaxSubscriptionsPerConnection)
    where
        unwrapLimit (DataSyncMaxSubscriptionsPerConnection limit) = limit
-- | Runs 'sqlQueryWithRLS', optionally on the connection of an open transaction.
--
-- The query is wrapped in 'runInModelContextWithTransaction', which receives the
-- given transaction id ('Nothing' means no transaction id was supplied).
--
-- The query expression is passed inline on purpose: binding it to a local name
-- first would fix its @?modelContext@ to the one in scope here.
sqlQueryWithRLSAndTransactionId ::
    ( ?modelContext :: ModelContext
    , PG.ToRow parameters
    , ?context :: ControllerContext
    , userId ~ Id CurrentUserRecord
    , Show (PrimaryKey (GetTableName CurrentUserRecord))
    , HasNewSessionUrl CurrentUserRecord
    , Typeable CurrentUserRecord
    , ?context :: ControllerContext
    , HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
    , PG.ToField userId
    , FromRow result
    , ?state :: IORef DataSyncController
    ) => Maybe UUID -> PG.Query -> parameters -> IO [result]
sqlQueryWithRLSAndTransactionId maybeTransactionId statement arguments =
    runInModelContextWithTransaction
        (sqlQueryWithRLS statement arguments)
        maybeTransactionId
-- | Runs 'sqlExecWithRLS', optionally on the connection of an open transaction.
--
-- The statement is wrapped in 'runInModelContextWithTransaction', which receives
-- the given transaction id ('Nothing' means no transaction id was supplied).
--
-- The statement expression is passed inline on purpose: binding it to a local name
-- first would fix its @?modelContext@ to the one in scope here.
sqlExecWithRLSAndTransactionId ::
    ( ?modelContext :: ModelContext
    , PG.ToRow parameters
    , ?context :: ControllerContext
    , userId ~ Id CurrentUserRecord
    , Show (PrimaryKey (GetTableName CurrentUserRecord))
    , HasNewSessionUrl CurrentUserRecord
    , Typeable CurrentUserRecord
    , ?context :: ControllerContext
    , HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
    , PG.ToField userId
    , ?state :: IORef DataSyncController
    ) => Maybe UUID -> PG.Query -> parameters -> IO Int64
sqlExecWithRLSAndTransactionId maybeTransactionId statement arguments =
    runInModelContextWithTransaction
        (sqlExecWithRLS statement arguments)
        maybeTransactionId
-- Template Haskell splices generating the aeson instances this module relies on:
-- a 'FromJSON' instance derived from the 'DataSyncQuery' constructor name (used when
-- decoding incoming messages) and a 'ToJSON' instance derived from the 'DataSyncResult'
-- constructor name, both with aeson's 'defaultOptions'.
-- NOTE(review): top-level splices split the module into declaration groups, so their
-- position relative to the surrounding definitions may matter; confirm before moving them.
$(deriveFromJSON defaultOptions 'DataSyncQuery)
$(deriveToJSON defaultOptions 'DataSyncResult)
-- | Allows replacing the @subscriptions@ map of a 'DataSyncController' via 'setField'
-- (and therefore via helpers such as 'modify').
instance SetField "subscriptions" DataSyncController (HashMap UUID (MVar.MVar ())) where
    setField newSubscriptions controller = controller { subscriptions = newSubscriptions }
-- | Allows replacing the @transactions@ map of a 'DataSyncController' via 'setField'
-- (and therefore via helpers such as 'modify').
instance SetField "transactions" DataSyncController (HashMap UUID DataSyncTransaction) where
    setField newTransactions controller = controller { transactions = newTransactions }
-- | Allows replacing the @asyncs@ list of a 'DataSyncController' via 'setField'
-- (and therefore via helpers such as 'modify').
instance SetField "asyncs" DataSyncController [Async ()] where
    setField newAsyncs controller = controller { asyncs = newAsyncs }