0.1.17 (#18)
Co-authored-by: Julien Valverdé <julien.valverde@mailo.com> Reviewed-on: #18
This commit was merged in pull request #18.
This commit is contained in:
20
src/TRPC/TRPCBuilder.ts
Normal file
20
src/TRPC/TRPCBuilder.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
import { Context, Effect, Layer } from "effect"
|
||||
import { type TRPCContext } from "./TRPCContext"
|
||||
import { importTRPCServer } from "./importTRPCServer"
|
||||
|
||||
|
||||
const createTRPC = <R>() => importTRPCServer.pipe(Effect.map(({ initTRPC }) =>
|
||||
initTRPC.context<TRPCContext<R>>().create()
|
||||
))
|
||||
|
||||
export const Identifier = "@thilalib/TRPC/TRPCBuilder"
|
||||
export interface TRPCBuilder<R> extends Context.Tag<typeof Identifier, TRPCBuilderService<R>> {}
|
||||
export interface TRPCBuilderService<R> extends Effect.Effect.Success<ReturnType<typeof createTRPC<R>>> {}
|
||||
|
||||
|
||||
export const make = <R>() => {
|
||||
const TRPCBuilder = Context.GenericTag<typeof Identifier, TRPCBuilderService<R>>(Identifier)
|
||||
const TRPCBuilderLive = Layer.effect(TRPCBuilder, createTRPC())
|
||||
|
||||
return { TRPCBuilder, TRPCBuilderLive } as const
|
||||
}
|
||||
38
src/TRPC/TRPCContext.ts
Normal file
38
src/TRPC/TRPCContext.ts
Normal file
@@ -0,0 +1,38 @@
|
||||
import type { TRPCError } from "@trpc/server"
|
||||
import { Data, type Effect, type Runtime } from "effect"
|
||||
import type { RuntimeFiber } from "effect/Fiber"
|
||||
import type express from "express"
|
||||
import type { IncomingMessage } from "node:http"
|
||||
import type { WebSocket } from "ws"
|
||||
|
||||
|
||||
export interface TRPCContext<R> {
|
||||
readonly runtime: Runtime.Runtime<R>
|
||||
|
||||
readonly run: <A, E>(
|
||||
effect: Effect.Effect<A, E, R>,
|
||||
options?: { readonly signal?: AbortSignal },
|
||||
) => Promise<A>
|
||||
|
||||
readonly fork: <A, E>(
|
||||
effect: Effect.Effect<A, E, R>,
|
||||
options?: Runtime.RunForkOptions,
|
||||
) => RuntimeFiber<A, TRPCError>
|
||||
|
||||
readonly transaction: TRPCContextTransaction
|
||||
}
|
||||
|
||||
|
||||
export type TRPCContextTransaction = Data.TaggedEnum<{
|
||||
readonly Express: {
|
||||
readonly req: express.Request
|
||||
readonly res: express.Response
|
||||
}
|
||||
|
||||
readonly WebSocket: {
|
||||
readonly req: IncomingMessage
|
||||
readonly res: WebSocket
|
||||
}
|
||||
}>
|
||||
|
||||
export const TRPCContextTransactionEnum = Data.taggedEnum<TRPCContextTransaction>()
|
||||
57
src/TRPC/TRPCContextCreator.ts
Normal file
57
src/TRPC/TRPCContextCreator.ts
Normal file
@@ -0,0 +1,57 @@
|
||||
import type { CreateExpressContextOptions } from "@trpc/server/adapters/express"
|
||||
import type { CreateWSSContextFnOptions } from "@trpc/server/adapters/ws"
|
||||
import { Context, Effect, Layer, Runtime } from "effect"
|
||||
import { createTRCPErrorMapper } from "./createTRCPErrorMapper"
|
||||
import { TRPCContextTransactionEnum, type TRPCContext, type TRPCContextTransaction } from "./TRPCContext"
|
||||
|
||||
|
||||
export const Identifier = "@thilalib/TRPC/TRPCContextCreator"
|
||||
|
||||
export interface TRPCContextCreator<R> extends Context.Tag<typeof Identifier, TRPCContextCreatorService<R>> {}
|
||||
|
||||
export interface TRPCContextCreatorService<R> {
|
||||
readonly createContext: (transaction: TRPCContextTransaction) => TRPCContext<R>
|
||||
readonly createExpressContext: (context: CreateExpressContextOptions) => TRPCContext<R>
|
||||
readonly createWebSocketContext: (context: CreateWSSContextFnOptions) => TRPCContext<R>
|
||||
}
|
||||
|
||||
export const TRPCUnknownContextCreator = Context.GenericTag<typeof Identifier, TRPCContextCreatorService<unknown>>(Identifier)
|
||||
|
||||
|
||||
export const make = <R>() => {
|
||||
const TRPCContextCreator = Context.GenericTag<typeof Identifier, TRPCContextCreatorService<R>>(Identifier)
|
||||
|
||||
const TRPCContextCreatorLive = Layer.effect(TRPCContextCreator, Effect.gen(function*() {
|
||||
const runtime = yield* Effect.runtime<R>()
|
||||
const mapErrors = yield* createTRCPErrorMapper
|
||||
|
||||
const run = <A, E>(
|
||||
effect: Effect.Effect<A, E, R>,
|
||||
options?: { readonly signal?: AbortSignal },
|
||||
) => Runtime.runPromise(runtime)(
|
||||
effect.pipe(mapErrors),
|
||||
options,
|
||||
)
|
||||
|
||||
const fork = <A, E>(
|
||||
effect: Effect.Effect<A, E, R>,
|
||||
options?: Runtime.RunForkOptions,
|
||||
) => Runtime.runFork(runtime)(
|
||||
effect.pipe(mapErrors),
|
||||
options,
|
||||
)
|
||||
|
||||
const createContext = (transaction: TRPCContextTransaction) => ({
|
||||
runtime,
|
||||
run,
|
||||
fork,
|
||||
transaction,
|
||||
})
|
||||
const createExpressContext = (context: CreateExpressContextOptions) => createContext(TRPCContextTransactionEnum.Express(context))
|
||||
const createWebSocketContext = (context: CreateWSSContextFnOptions) => createContext(TRPCContextTransactionEnum.WebSocket(context))
|
||||
|
||||
return { createContext, createExpressContext, createWebSocketContext }
|
||||
}))
|
||||
|
||||
return { TRPCContextCreator, TRPCContextCreatorLive } as const
|
||||
}
|
||||
27
src/TRPC/TRPCExpressRoute.ts
Normal file
27
src/TRPC/TRPCExpressRoute.ts
Normal file
@@ -0,0 +1,27 @@
|
||||
import { Config, Effect, Layer } from "effect"
|
||||
import { ExpressApp } from "../Express"
|
||||
import { ImportError } from "../ImportError"
|
||||
import { TRPCUnknownContextCreator } from "./TRPCContextCreator"
|
||||
import { TRPCAnyRouter } from "./TRPCRouter"
|
||||
|
||||
|
||||
const importTRPCServerExpressAdapter = Effect.tryPromise({
|
||||
try: () => import("@trpc/server/adapters/express"),
|
||||
catch: cause => new ImportError({ path: "@trpc/server/adapters/express", cause }),
|
||||
})
|
||||
|
||||
export const TRPCExpressRouteLive = (
|
||||
config: {
|
||||
readonly root: Config.Config<string>
|
||||
}
|
||||
) => Layer.effectDiscard(Effect.gen(function*() {
|
||||
const { createExpressMiddleware } = yield* importTRPCServerExpressAdapter
|
||||
const app = yield* ExpressApp.ExpressApp
|
||||
|
||||
app.use(yield* config.root,
|
||||
createExpressMiddleware({
|
||||
router: yield* TRPCAnyRouter,
|
||||
createContext: (yield* TRPCUnknownContextCreator).createExpressContext,
|
||||
})
|
||||
)
|
||||
}))
|
||||
18
src/TRPC/TRPCRouter.ts
Normal file
18
src/TRPC/TRPCRouter.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
import type { AnyRouter } from "@trpc/server"
|
||||
import { Context, Effect, Layer } from "effect"
|
||||
|
||||
|
||||
export const Identifier = "@thilalib/TRPC/TRPCRouter"
|
||||
export interface TRPCRouter<T extends AnyRouter> extends Context.Tag<typeof Identifier, T> {}
|
||||
|
||||
export const TRPCAnyRouter = Context.GenericTag<typeof Identifier, AnyRouter>(Identifier)
|
||||
|
||||
|
||||
export const make = <A extends AnyRouter, E, R>(
|
||||
router: Effect.Effect<A, E, R>
|
||||
) => {
|
||||
const TRPCRouter = Context.GenericTag<typeof Identifier, A>(Identifier)
|
||||
const TRPCRouterLive = Layer.effect(TRPCRouter, router)
|
||||
|
||||
return { TRPCRouter, TRPCRouterLive } as const
|
||||
}
|
||||
66
src/TRPC/TRPCWebSocketServer.ts
Normal file
66
src/TRPC/TRPCWebSocketServer.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import type { applyWSSHandler } from "@trpc/server/adapters/ws"
|
||||
import { Config, Context, Effect, Layer } from "effect"
|
||||
import type ws from "ws"
|
||||
import { ExpressNodeHTTPServer } from "../Express"
|
||||
import { ImportError } from "../ImportError"
|
||||
import { TRPCUnknownContextCreator } from "./TRPCContextCreator"
|
||||
import { TRPCAnyRouter } from "./TRPCRouter"
|
||||
|
||||
|
||||
export class TRPCWebSocketServer extends Context.Tag("@thilalib/TRPC/TRPCWebSocketServer")<TRPCWebSocketServer, TRPCWebSocketServerService>() {}
|
||||
|
||||
export interface TRPCWebSocketServerService {
|
||||
wss: ws.Server
|
||||
handler: ReturnType<typeof applyWSSHandler>
|
||||
}
|
||||
|
||||
|
||||
const importWS = Effect.tryPromise({
|
||||
try: () => import("ws"),
|
||||
catch: cause => new ImportError({ path: "ws", cause }),
|
||||
})
|
||||
|
||||
const importTRPCServerWSAdapter = Effect.tryPromise({
|
||||
try: () => import("@trpc/server/adapters/ws"),
|
||||
catch: cause => new ImportError({ path: "@trpc/server/adapters/ws", cause }),
|
||||
})
|
||||
|
||||
export const TRPCWebSocketServerLive = (
|
||||
config: {
|
||||
readonly host: Config.Config<string>
|
||||
}
|
||||
) => Layer.effect(TRPCWebSocketServer, Effect.gen(function*() {
|
||||
const { WebSocketServer } = yield* importWS
|
||||
const { applyWSSHandler } = yield* importTRPCServerWSAdapter
|
||||
|
||||
const host = yield* config.host
|
||||
|
||||
return yield* Effect.acquireRelease(
|
||||
Effect.gen(function*() {
|
||||
yield* Effect.logInfo(`WebSocket server started on ${ host }`)
|
||||
|
||||
const wss = new WebSocketServer({
|
||||
server: yield* ExpressNodeHTTPServer.ExpressNodeHTTPServer,
|
||||
host,
|
||||
})
|
||||
|
||||
return {
|
||||
wss,
|
||||
handler: applyWSSHandler({
|
||||
wss,
|
||||
router: yield* TRPCAnyRouter,
|
||||
createContext: (yield* TRPCUnknownContextCreator).createWebSocketContext,
|
||||
}),
|
||||
}
|
||||
}),
|
||||
|
||||
({ wss, handler }) => Effect.gen(function*() {
|
||||
yield* Effect.logInfo(`WebSocket server on ${ host } is stopping. Waiting for existing connections to end...`)
|
||||
|
||||
handler.broadcastReconnectNotification()
|
||||
yield* Effect.async(resume => {
|
||||
wss.close(() => resume(Effect.logInfo("WebSocket server closed")))
|
||||
})
|
||||
}),
|
||||
)
|
||||
}))
|
||||
63
src/TRPC/createTRCPErrorMapper.ts
Normal file
63
src/TRPC/createTRCPErrorMapper.ts
Normal file
@@ -0,0 +1,63 @@
|
||||
import { Effect, type Cause } from "effect"
|
||||
import { importTRPCServer } from "./importTRPCServer"
|
||||
|
||||
|
||||
export const createTRCPErrorMapper = importTRPCServer.pipe(Effect.map(({ TRPCError }) =>
|
||||
<A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.sandbox(effect).pipe(
|
||||
Effect.catchTags({
|
||||
Empty: cause => Effect.fail(
|
||||
new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
cause: new TRPCErrorCause(cause),
|
||||
})
|
||||
),
|
||||
|
||||
Fail: cause => Effect.fail(
|
||||
cause.error instanceof TRPCError
|
||||
? cause.error
|
||||
: new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
cause: new TRPCErrorCause(cause),
|
||||
})
|
||||
),
|
||||
|
||||
Die: cause => Effect.fail(
|
||||
cause.defect instanceof TRPCError
|
||||
? cause.defect
|
||||
: new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
cause: new TRPCErrorCause(cause),
|
||||
})
|
||||
),
|
||||
|
||||
Interrupt: cause => Effect.fail(
|
||||
new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
cause: new TRPCErrorCause(cause),
|
||||
})
|
||||
),
|
||||
|
||||
Sequential: cause => Effect.fail(
|
||||
new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
cause: new TRPCErrorCause(cause),
|
||||
})
|
||||
),
|
||||
|
||||
Parallel: cause => Effect.fail(
|
||||
new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
cause: new TRPCErrorCause(cause),
|
||||
})
|
||||
),
|
||||
}),
|
||||
|
||||
Effect.tapError(Effect.logError),
|
||||
)
|
||||
))
|
||||
|
||||
class TRPCErrorCause<E> extends Error {
|
||||
constructor(readonly cause: Cause.Cause<E>) {
|
||||
super()
|
||||
}
|
||||
}
|
||||
51
src/TRPC/example.ts
Normal file
51
src/TRPC/example.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import { Config, Effect, Layer } from "effect"
|
||||
import * as TRPC from "."
|
||||
import { Express, JSONWebToken } from ".."
|
||||
|
||||
|
||||
// Context available to the router procedures
|
||||
type Services =
|
||||
| JSONWebToken.JSONWebToken
|
||||
|
||||
const ServicesLive = Layer.empty.pipe(
|
||||
Layer.provideMerge(JSONWebToken.JSONWebTokenLive)
|
||||
)
|
||||
|
||||
|
||||
const { TRPCContextCreator, TRPCContextCreatorLive } = TRPC.TRPCContextCreator.make<Services>()
|
||||
const { TRPCBuilder, TRPCBuilderLive } = TRPC.TRPCBuilder.make<Services>()
|
||||
|
||||
|
||||
const router = TRPCBuilder.pipe(Effect.map(t => t.router({
|
||||
ping: t.procedure.query(({ ctx }) => ctx.run(
|
||||
Effect.succeed("pong")
|
||||
)),
|
||||
})))
|
||||
const { TRPCRouter, TRPCRouterLive } = TRPC.TRPCRouter.make(router)
|
||||
|
||||
|
||||
const ServerLive = Layer.empty.pipe(
|
||||
Layer.provideMerge(TRPC.TRPCExpressRoute.TRPCExpressRouteLive({
|
||||
root: Config.succeed("/rpc")
|
||||
})),
|
||||
Layer.provideMerge(TRPC.TRPCWebSocketServer.TRPCWebSocketServerLive({
|
||||
host: Config.succeed("/rpc")
|
||||
})),
|
||||
|
||||
Layer.provideMerge(TRPCRouterLive),
|
||||
Layer.provideMerge(TRPCBuilderLive),
|
||||
Layer.provideMerge(TRPCContextCreatorLive),
|
||||
|
||||
Layer.provideMerge(Express.ExpressNodeHTTPServer.ExpressNodeHTTPServerLive({
|
||||
port: Config.succeed(3000)
|
||||
})),
|
||||
Layer.provideMerge(Express.ExpressApp.ExpressAppLive())
|
||||
)
|
||||
|
||||
await Effect.gen(function*() {
|
||||
return yield* Layer.launch(ServerLive)
|
||||
}).pipe(
|
||||
Effect.provide(ServicesLive),
|
||||
Effect.scoped,
|
||||
Effect.runPromise,
|
||||
)
|
||||
11
src/TRPC/importTRPCServer.ts
Normal file
11
src/TRPC/importTRPCServer.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { Effect } from "effect"
|
||||
import { ImportError } from "../ImportError"
|
||||
|
||||
|
||||
export const importTRPCServer: Effect.Effect<
|
||||
typeof import("@trpc/server"),
|
||||
ImportError
|
||||
> = Effect.tryPromise({
|
||||
try: () => import("@trpc/server"),
|
||||
catch: cause => new ImportError({ path: "@trpc/server", cause }),
|
||||
})
|
||||
7
src/TRPC/index.ts
Normal file
7
src/TRPC/index.ts
Normal file
@@ -0,0 +1,7 @@
|
||||
export * from "./middlewares"
|
||||
export * as TRPCBuilder from "./TRPCBuilder"
|
||||
export * as TRPCContext from "./TRPCContext"
|
||||
export * as TRPCContextCreator from "./TRPCContextCreator"
|
||||
export * as TRPCExpressRoute from "./TRPCExpressRoute"
|
||||
export * as TRPCRouter from "./TRPCRouter"
|
||||
export * as TRPCWebSocketServer from "./TRPCWebSocketServer"
|
||||
44
src/TRPC/middlewares.ts
Normal file
44
src/TRPC/middlewares.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import { Effect, Match } from "effect"
|
||||
import type { TRPCContextTransaction } from "./TRPCContext"
|
||||
import { importTRPCServer } from "./importTRPCServer"
|
||||
|
||||
|
||||
export const ExpressOnly = importTRPCServer.pipe(Effect.map(({
|
||||
experimental_standaloneMiddleware,
|
||||
TRPCError,
|
||||
}) => experimental_standaloneMiddleware<{
|
||||
ctx: { readonly transaction: TRPCContextTransaction }
|
||||
}>().create(opts =>
|
||||
Match.value(opts.ctx.transaction).pipe(
|
||||
Match.tag("Express", transaction =>
|
||||
opts.next({ ctx: { transaction } })
|
||||
),
|
||||
|
||||
Match.orElse(() => {
|
||||
throw new TRPCError({
|
||||
code: "BAD_REQUEST",
|
||||
message: "Only Express transport is supported by this procedure",
|
||||
})
|
||||
}),
|
||||
)
|
||||
)))
|
||||
|
||||
export const WebSocketOnly = importTRPCServer.pipe(Effect.map(({
|
||||
experimental_standaloneMiddleware,
|
||||
TRPCError,
|
||||
}) => experimental_standaloneMiddleware<{
|
||||
ctx: { readonly transaction: TRPCContextTransaction }
|
||||
}>().create(opts =>
|
||||
Match.value(opts.ctx.transaction).pipe(
|
||||
Match.tag("WebSocket", transaction =>
|
||||
opts.next({ ctx: { transaction } })
|
||||
),
|
||||
|
||||
Match.orElse(() => {
|
||||
throw new TRPCError({
|
||||
code: "BAD_REQUEST",
|
||||
message: "Only WebSocket transport is supported by this procedure",
|
||||
})
|
||||
}),
|
||||
)
|
||||
)))
|
||||
Reference in New Issue
Block a user