From ebe72de08881bb3f02e223cca69b4121bbd938d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Tue, 9 Jul 2024 04:13:56 +0200 Subject: [PATCH] Changes procedure --- packages/server/src/rpc/routers/todos.ts | 32 +++++++++++++++++++----- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/packages/server/src/rpc/routers/todos.ts b/packages/server/src/rpc/routers/todos.ts index 07b4a8f..4048ac4 100644 --- a/packages/server/src/rpc/routers/todos.ts +++ b/packages/server/src/rpc/routers/todos.ts @@ -1,12 +1,13 @@ import { Schema as S } from "@effect/schema" import { JsonifiableTodo } from "@todo-tests/common/data" -import { Chunk, Effect } from "effect" +import { observable } from "@trpc/server/observable" +import { Effect, Fiber, Runtime, Stream, flow } from "effect" import { TodoRepository } from "../../TodoRepository" import { TRPCBuilder } from "../../trpc/TRPCBuilder" import { RPCProcedureBuilder } from "../procedures/RPCProcedureBuilder" -const encodeTodos = S.encode(S.Array(JsonifiableTodo)) +const encodeTodos = S.encode(S.Chunk(JsonifiableTodo)) const encodeOptionalTodo = S.encode(S.OptionFromNullOr(JsonifiableTodo)) @@ -18,12 +19,31 @@ export const todosRouter = Effect.gen(function*() { all: procedure .query(({ ctx }) => ctx.run(Effect.gen(function*() { const todos = yield* TodoRepository - - return yield* encodeTodos( - Chunk.toReadonlyArray(yield* todos.todos.get) - ) + return yield* encodeTodos(yield* todos.todos.get) }))), + changes: procedure + .subscription(({ ctx }) => + observable(emit => { + const watcher = Runtime.runFork(ctx.runtime)(Effect.gen(function*() { + const todos = yield* TodoRepository + + yield* todos.todos.changes.pipe( + Stream.runForEach(flow( + encodeTodos, + Effect.flatMap(values => + Effect.sync(() => emit.next(values)) + ), + )) + ) + })) + + return () => Runtime.runSync(ctx.runtime)( + Fiber.interruptFork(watcher) + ) + }) + ), + getByID: procedure .input(S.decodeUnknownPromise(S.String)) .query(({ ctx, input }) => ctx.run(Effect.gen(function*() {