RPCWebSocketServer

This commit is contained in:
Julien Valverdé
2024-07-17 04:51:41 +02:00
parent 552483d915
commit ec5cc51f82
3 changed files with 50 additions and 5 deletions

View File

@@ -9,7 +9,7 @@ import { WebSocketServer } from "./http/WebSocketServer"
import { RPCPlaygroundRoute } from "./rpc/RPCPlaygroundRoute"
import { RPCRoute } from "./rpc/RPCRoute"
import { RPCRouter } from "./rpc/RPCRouter"
import { RPCWebSocketHandler } from "./rpc/RPCWebSocketHandler"
import { RPCWebSocketServer } from "./rpc/RPCWebSocketServer"
import { RPCProcedureBuilder } from "./rpc/procedures/RPCProcedureBuilder"
import { TodoRepository } from "./todo/TodoRepository"
import { TRPCBuilder } from "./trpc/TRPCBuilder"
@@ -19,14 +19,13 @@ import { TRPCContextCreator } from "./trpc/TRPCContextCreator"
const ServerDev = Layer.empty.pipe(
Layer.provideMerge(RPCRoute.Live),
Layer.provideMerge(RPCPlaygroundRoute.Dev),
Layer.provideMerge(RPCWebSocketHandler.Live),
Layer.provideMerge(RPCWebSocketServer.Live),
Layer.provideMerge(RPCRouter.Live),
Layer.provideMerge(RPCProcedureBuilder.Live),
Layer.provideMerge(TRPCBuilder.Live),
Layer.provideMerge(TRPCContextCreator.Live),
Layer.provideMerge(WebSocketServer.Live),
Layer.provideMerge(ExpressHTTPServer.Live),
Layer.provideMerge(ExpressApp.Live),
)
@@ -34,7 +33,7 @@ const ServerDev = Layer.empty.pipe(
const ServerLive = Layer.empty.pipe(
Layer.provideMerge(RPCRoute.Live),
Layer.provideMerge(RPCPlaygroundRoute.Live),
Layer.provideMerge(RPCWebSocketHandler.Live),
Layer.provideMerge(RPCWebSocketServer.Live),
Layer.provideMerge(RPCRouter.Live),
Layer.provideMerge(RPCProcedureBuilder.Live),

View File

@@ -0,0 +1,46 @@
import { applyWSSHandler } from "@trpc/server/adapters/ws"
import { Context, Effect, Layer } from "effect"
import ws from "ws"
import { ExpressHTTPServer } from "../http/ExpressHTTPServer"
import { ServerConfig } from "../ServerConfig"
import { TRPCContextCreator } from "../trpc/TRPCContextCreator"
import { RPCRouter } from "./RPCRouter"
export class RPCWebSocketServer extends Context.Tag("RPCWebSocketServer")<RPCWebSocketServer, {
wss: ws.Server
handler: ReturnType<typeof applyWSSHandler>
}>() {}
export module RPCWebSocketServer {
export const Live = Layer.effect(RPCWebSocketServer, ServerConfig.rpcHTTPRoot.pipe(
Effect.flatMap(rpcHTTPRoot => Effect.acquireRelease(
Effect.gen(function*() {
yield* Effect.logInfo(`WebSocket server started on ${ rpcHTTPRoot }`)
const wss = new ws.WebSocketServer({
server: yield* ExpressHTTPServer,
host: rpcHTTPRoot,
})
return {
wss,
handler: applyWSSHandler({
wss,
router: yield* RPCRouter,
createContext: (yield* TRPCContextCreator).createWebSocketContext,
}),
}
}),
({ wss, handler }) => Effect.gen(function*() {
yield* Effect.logInfo(`WebSocket server on ${ rpcHTTPRoot } is stopping. Waiting for existing connections to end...`)
handler.broadcastReconnectNotification()
yield* Effect.async(resume => {
wss.close(() => resume(Effect.logInfo("WebSocket server closed")))
})
}),
))
))
}