Changes procedure
This commit is contained in:
@@ -1,12 +1,13 @@
|
|||||||
import { Schema as S } from "@effect/schema"
|
import { Schema as S } from "@effect/schema"
|
||||||
import { JsonifiableTodo } from "@todo-tests/common/data"
|
import { JsonifiableTodo } from "@todo-tests/common/data"
|
||||||
import { Chunk, Effect } from "effect"
|
import { observable } from "@trpc/server/observable"
|
||||||
|
import { Effect, Fiber, Runtime, Stream, flow } from "effect"
|
||||||
import { TodoRepository } from "../../TodoRepository"
|
import { TodoRepository } from "../../TodoRepository"
|
||||||
import { TRPCBuilder } from "../../trpc/TRPCBuilder"
|
import { TRPCBuilder } from "../../trpc/TRPCBuilder"
|
||||||
import { RPCProcedureBuilder } from "../procedures/RPCProcedureBuilder"
|
import { RPCProcedureBuilder } from "../procedures/RPCProcedureBuilder"
|
||||||
|
|
||||||
|
|
||||||
const encodeTodos = S.encode(S.Array(JsonifiableTodo))
|
const encodeTodos = S.encode(S.Chunk(JsonifiableTodo))
|
||||||
const encodeOptionalTodo = S.encode(S.OptionFromNullOr(JsonifiableTodo))
|
const encodeOptionalTodo = S.encode(S.OptionFromNullOr(JsonifiableTodo))
|
||||||
|
|
||||||
|
|
||||||
@@ -18,12 +19,31 @@ export const todosRouter = Effect.gen(function*() {
|
|||||||
all: procedure
|
all: procedure
|
||||||
.query(({ ctx }) => ctx.run(Effect.gen(function*() {
|
.query(({ ctx }) => ctx.run(Effect.gen(function*() {
|
||||||
const todos = yield* TodoRepository
|
const todos = yield* TodoRepository
|
||||||
|
return yield* encodeTodos(yield* todos.todos.get)
|
||||||
return yield* encodeTodos(
|
|
||||||
Chunk.toReadonlyArray(yield* todos.todos.get)
|
|
||||||
)
|
|
||||||
}))),
|
}))),
|
||||||
|
|
||||||
|
changes: procedure
|
||||||
|
.subscription(({ ctx }) =>
|
||||||
|
observable<readonly (typeof JsonifiableTodo.Encoded)[]>(emit => {
|
||||||
|
const watcher = Runtime.runFork(ctx.runtime)(Effect.gen(function*() {
|
||||||
|
const todos = yield* TodoRepository
|
||||||
|
|
||||||
|
yield* todos.todos.changes.pipe(
|
||||||
|
Stream.runForEach(flow(
|
||||||
|
encodeTodos,
|
||||||
|
Effect.flatMap(values =>
|
||||||
|
Effect.sync(() => emit.next(values))
|
||||||
|
),
|
||||||
|
))
|
||||||
|
)
|
||||||
|
}))
|
||||||
|
|
||||||
|
return () => Runtime.runSync(ctx.runtime)(
|
||||||
|
Fiber.interruptFork(watcher)
|
||||||
|
)
|
||||||
|
})
|
||||||
|
),
|
||||||
|
|
||||||
getByID: procedure
|
getByID: procedure
|
||||||
.input(S.decodeUnknownPromise(S.String))
|
.input(S.decodeUnknownPromise(S.String))
|
||||||
.query(({ ctx, input }) => ctx.run(Effect.gen(function*() {
|
.query(({ ctx, input }) => ctx.run(Effect.gen(function*() {
|
||||||
|
|||||||
Reference in New Issue
Block a user