diff --git a/packages/server/src/http/WebSocketServer.ts b/packages/server/src/http/WebSocketServer.ts new file mode 100644 index 0000000..711b001 --- /dev/null +++ b/packages/server/src/http/WebSocketServer.ts @@ -0,0 +1,40 @@ +import { applyWSSHandler } from "@trpc/server/adapters/ws" +import { Context, Effect, Layer, Runtime } from "effect" +import ws from "ws" +import { ExpressHTTPServer } from "../http/ExpressHTTPServer" +import { TRPCContextCreator } from "../trpc/TRPCContextCreator" + + +export class WebSocketServer extends Context.Tag("RPCWebSocketServer") +}>() {} + +export module WebSocketServer { + export const Live = Layer.effect(WebSocketServer, Effect.acquireRelease( + Effect.gen(function*() { + const runSync = yield* Effect.runtime().pipe( + Effect.map(Runtime.runSync) + ) + + const wss = new ws.WebSocketServer({ server: yield* ExpressHTTPServer }, () => + runSync(Effect.logInfo(`WebSocket RPC server started`)) + ) + + const handler = applyWSSHandler({ + wss, + router: yield* RPCRouter, + createContext: yield* TRPCContextCreator, + }) + + return { wss, handler } + }), + + ({ wss, handler }) => Effect.gen(function*() { + yield* Effect.logInfo(`WebSocket server is closing. Waiting for existing connections to end...`) + yield* Effect.async(resume => { + wss.close(() => resume(Effect.logInfo(`WebSocket server closed`))) + }) + }), + )) +}