{-# LANGUAGE ApplicativeDo, BangPatterns, TypeFamilies, DataKinds, PolyKinds, TypeApplications, ScopedTypeVariables, ConstraintKinds, TypeOperators, GADTs, UndecidableInstances, FlexibleContexts, AllowAmbiguousTypes #-}

{-|
Module: IHP.FetchPipelined
Description: Fetch multiple independent queries in a single database round trip
Copyright: (c) digitally induced GmbH, 2026

Uses PostgreSQL's pipeline mode (via hasql) to send multiple independent queries
in a single network round trip. This is especially beneficial for cloud database
deployments where round-trip latency is 1-5ms.

Compose queries using @do@ notation (via @ApplicativeDo@):

> (users, posts) <- pipeline do
>     users <- query @User |> fetchPipelined
>     posts <- query @Post |> orderByDesc #createdAt |> fetchPipelined
>     pure (users, posts)

'Pipeline' is 'Applicative' but NOT 'Monad', which enforces at the type level
that only independent queries can be pipelined. @ApplicativeDo@ desugars the
@do@ block into applicative operations, so each line runs as a separate query
in the same pipeline batch.
-}
module IHP.FetchPipelined
( fetchPipelined
, fetchVectorPipelined
, fetchOneOrNothingPipelined
, fetchCountPipelined
, fetchExistsPipelined
, pipeline
, Pipeline.Pipeline
) where

import IHP.Prelude
import IHP.ModelSupport
import IHP.QueryBuilder
import IHP.Hasql.FromRow (FromRowHasql(..))
import IHP.Fetch.Statement (buildQueryListStatement, buildQueryVectorStatement, buildQueryMaybeStatement, buildCountStatement, buildExistsStatement)
import qualified Hasql.Decoders as Decoders
import qualified Hasql.Encoders as Encoders
import qualified Hasql.Pipeline as Pipeline
import qualified Hasql.Session as HasqlSession
import qualified Hasql.Statement as HasqlStatement
import qualified IHP.Log as Log
import IHP.Hasql.Pool (usePoolWithRetry)
import Data.Functor.Contravariant (contramap)
import Data.Functor.Contravariant.Divisible (conquer)

-- | Convert a query builder into a 'Pipeline' step returning all matching rows.
--
-- The query builder is compiled into a statement without parameters via
-- 'buildQueryListStatement' and lifted into the pipeline with
-- 'Pipeline.statement'.
--
-- __Example:__ Fetching users and posts in a single round trip
--
-- > (users, posts) <- pipeline do
-- >     users <- query @User |> filterWhere (#active, True) |> fetchPipelined
-- >     posts <- query @Post |> orderByDesc #createdAt |> fetchPipelined
-- >     pure (users, posts)
fetchPipelined :: forall model table.
    ( Table model
    , model ~ GetModelByTableName table
    , KnownSymbol table
    , FromRowHasql model
    ) => QueryBuilder table -> Pipeline.Pipeline [model]
fetchPipelined !queryBuilder = Pipeline.statement () (buildQueryListStatement queryBuilder)
{-# INLINE fetchPipelined #-}

-- | Like 'fetchPipelined', but returns a 'Vector' instead of a list.
--
-- The query builder is compiled into a statement without parameters via
-- 'buildQueryVectorStatement' and lifted into the pipeline with
-- 'Pipeline.statement'.
--
-- __Example:__
--
-- > (users, posts) <- pipeline do
-- >     users <- query @User |> fetchVectorPipelined
-- >     posts <- query @Post |> fetchVectorPipelined
-- >     pure (users, posts)
fetchVectorPipelined :: forall model table.
    ( Table model
    , model ~ GetModelByTableName table
    , KnownSymbol table
    , FromRowHasql model
    ) => QueryBuilder table -> Pipeline.Pipeline (Vector model)
fetchVectorPipelined !queryBuilder = Pipeline.statement () (buildQueryVectorStatement queryBuilder)
{-# INLINE fetchVectorPipelined #-}

-- | Convert a query builder into a 'Pipeline' step returning at most one row.
--
-- The query builder is compiled into a statement without parameters via
-- 'buildQueryMaybeStatement' and lifted into the pipeline with
-- 'Pipeline.statement'. The step yields a 'Maybe' of the model.
--
-- __Example:__
--
-- > (maybeUser, posts) <- pipeline do
-- >     maybeUser <- query @User |> filterWhere (#email, email) |> fetchOneOrNothingPipelined
-- >     posts <- query @Post |> fetchPipelined
-- >     pure (maybeUser, posts)
fetchOneOrNothingPipelined :: forall model table.
    ( Table model
    , model ~ GetModelByTableName table
    , KnownSymbol table
    , FromRowHasql model
    ) => QueryBuilder table -> Pipeline.Pipeline (Maybe model)
fetchOneOrNothingPipelined !queryBuilder = Pipeline.statement () (buildQueryMaybeStatement queryBuilder)
{-# INLINE fetchOneOrNothingPipelined #-}

-- | Convert a query builder into a 'Pipeline' step returning a count.
--
-- The statement built by 'buildCountStatement' yields an @Int64@; the result
-- is converted to 'Int' with 'fromIntegral' inside the pipeline.
--
-- __Example:__
--
-- > (users, userCount) <- pipeline do
-- >     users <- query @User |> fetchPipelined
-- >     userCount <- query @User |> filterWhere (#active, True) |> fetchCountPipelined
-- >     pure (users, userCount)
fetchCountPipelined :: forall table.
    ( KnownSymbol table
    ) => QueryBuilder table -> Pipeline.Pipeline Int
fetchCountPipelined !queryBuilder = fromIntegral <$> Pipeline.statement () (buildCountStatement queryBuilder)
{-# INLINE fetchCountPipelined #-}

-- | Convert a query builder into a 'Pipeline' step returning a boolean.
--
-- The query builder is compiled into a statement without parameters via
-- 'buildExistsStatement' and lifted into the pipeline with
-- 'Pipeline.statement'.
--
-- __Example:__
--
-- > (users, hasUnread) <- pipeline do
-- >     users <- query @User |> fetchPipelined
-- >     hasUnread <- query @Message |> filterWhere (#isUnread, True) |> fetchExistsPipelined
-- >     pure (users, hasUnread)
fetchExistsPipelined :: forall table.
    ( KnownSymbol table
    ) => QueryBuilder table -> Pipeline.Pipeline Bool
fetchExistsPipelined !queryBuilder = Pipeline.statement () (buildExistsStatement queryBuilder)
{-# INLINE fetchExistsPipelined #-}

-- | Execute a 'Pipeline' in a single database round trip.
--
-- When row-level security (RLS) is enabled, the pipeline is automatically wrapped
-- with @set_config@ / reset statements to preserve the request's RLS context.
-- These are included in the same pipeline batch, adding no extra round trips.
--
-- If the model context carries a transaction runner, the pipeline session is
-- executed through that runner (and is not wrapped with the RLS statements);
-- otherwise it runs on the hasql pool via 'usePoolWithRetry'. The execution is
-- wrapped in 'logQueryTiming' using the logger level from the model context.
--
-- __Example:__
--
-- > action DashboardAction = do
-- >     (users, posts, commentCount) <- pipeline do
-- >         users <- query @User |> fetchPipelined
-- >         posts <- query @Post |> orderByDesc #createdAt |> limit 10 |> fetchPipelined
-- >         commentCount <- query @Comment |> fetchCountPipelined
-- >         pure (users, posts, commentCount)
-- >     render DashboardView { .. }
pipeline :: (?modelContext :: ModelContext) => Pipeline.Pipeline a -> IO a
pipeline thePipeline = do
    let pool = ?modelContext.hasqlPool
    -- When RLS is enabled and we're not already in a transaction, wrap the
    -- pipeline with session-scoped set_config/reset statements.  These are
    -- part of the same pipeline batch so they add no extra round trips.
    -- In pipeline mode the server processes statements sequentially, so the
    -- set_config takes effect before the user queries execute.
    let effectivePipeline = case (?modelContext.transactionRunner, ?modelContext.rowLevelSecurity) of
            (Nothing, Just RowLevelSecurityContext { rlsAuthenticatedRole, rlsUserId }) ->
                -- Keep only the user pipeline's result; the set/reset steps yield ().
                (\_ result _ -> result)
                    <$> Pipeline.statement (rlsAuthenticatedRole, rlsUserId) setRLSConfigPipelineStatement
                    <*> thePipeline
                    <*> Pipeline.statement () resetRLSConfigPipelineStatement
            _ -> thePipeline
    let session = HasqlSession.pipeline effectivePipeline
    -- 'logQueryTiming' reads the model context from the ?context implicit parameter.
    let ?context = ?modelContext
    let currentLogLevel = ?modelContext.logger.level
    let runQuery = case ?modelContext.transactionRunner of
            -- Inside a transaction: reuse the transaction's runner.
            Just (TransactionRunner runner) -> runner session
            -- Otherwise run the session on the connection pool.
            Nothing -> usePoolWithRetry pool session
    logQueryTiming currentLogLevel "🔍 Pipeline" runQuery
{-# INLINABLE pipeline #-}

-- | Session-scoped RLS config for pipeline mode.
--
-- Uses @is_local = false@ (session-scoped) instead of @true@ (transaction-local)
-- because pipeline mode runs each statement in its own implicit transaction.
-- The companion 'resetRLSConfigPipelineStatement' resets these at the end of
-- the pipeline batch.
--
-- Parameters are a pair: the first component is bound to @$1@ (the @role@
-- setting) and the second to @$2@ (the @rls.ihp_user_id@ setting).
setRLSConfigPipelineStatement :: HasqlStatement.Statement (Text, Text) ()
setRLSConfigPipelineStatement = HasqlStatement.preparable
    "SELECT set_config('role', $1, false), set_config('rls.ihp_user_id', $2, false)"
    (contramap fst (Encoders.param (Encoders.nonNullable Encoders.text))
     <> contramap snd (Encoders.param (Encoders.nonNullable Encoders.text)))
    -- The single result row has two nullable text columns (one per set_config
    -- call); both are decoded and discarded.
    (Decoders.singleRow (Decoders.column (Decoders.nullable Decoders.text) *> Decoders.column (Decoders.nullable Decoders.text) *> pure ()))

-- | Reset role and RLS user to connection defaults after the pipeline completes.
--
-- Uses @session_user@ to restore the original connection role, matching
-- the behavior of @RESET ROLE@. The @rls.ihp_user_id@ setting is set to the
-- empty string. The statement takes no parameters.
resetRLSConfigPipelineStatement :: HasqlStatement.Statement () ()
resetRLSConfigPipelineStatement = HasqlStatement.preparable
    "SELECT set_config('role', session_user::text, false), set_config('rls.ihp_user_id', '', false)"
    conquer
    -- The single result row has two nullable text columns (one per set_config
    -- call); both are decoded and discarded.
    (Decoders.singleRow (Decoders.column (Decoders.nullable Decoders.text) *> Decoders.column (Decoders.nullable Decoders.text) *> pure ()))