import type { applyWSSHandler } from "@trpc/server/adapters/ws" import { Config, Context, Effect, Layer } from "effect" import type ws from "ws" import { ExpressNodeHTTPServer } from "../Express" import { ImportError } from "../ImportError" import { TRPCUnknownContextCreator } from "./TRPCContextCreator" import { TRPCAnyRouter } from "./TRPCRouter" export class TRPCWebSocketServer extends Context.Tag("@thilalib/TRPC/TRPCWebSocketServer")() {} export interface TRPCWebSocketServerService { wss: ws.Server handler: ReturnType } const importWS = Effect.tryPromise({ try: () => import("ws"), catch: cause => new ImportError({ path: "ws", cause }), }) const importTRPCServerWSAdapter = Effect.tryPromise({ try: () => import("@trpc/server/adapters/ws"), catch: cause => new ImportError({ path: "@trpc/server/adapters/ws", cause }), }) export const TRPCWebSocketServerLive = ( config: { readonly host: Config.Config } ) => Layer.effect(TRPCWebSocketServer, Effect.gen(function*() { const { WebSocketServer } = yield* importWS const { applyWSSHandler } = yield* importTRPCServerWSAdapter const host = yield* config.host return yield* Effect.acquireRelease( Effect.gen(function*() { yield* Effect.logInfo(`WebSocket server started on ${ host }`) const wss = new WebSocketServer({ server: yield* ExpressNodeHTTPServer.ExpressNodeHTTPServer, host, }) return { wss, handler: applyWSSHandler({ wss, router: yield* TRPCAnyRouter, createContext: (yield* TRPCUnknownContextCreator).createWebSocketContext, }), } }), ({ wss, handler }) => Effect.gen(function*() { yield* Effect.logInfo(`WebSocket server on ${ host } is stopping. Waiting for existing connections to end...`) handler.broadcastReconnectNotification() yield* Effect.async(resume => { wss.close(() => resume(Effect.logInfo("WebSocket server closed"))) }) }), ) }))