From 2b6b36713e2f86e32d3358f10a844dcaf255d679 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Thu, 20 Mar 2025 04:31:38 +0100 Subject: [PATCH] MutationRunner work --- .../extension-query/src/MutationRunner.ts | 79 +++++++++++-------- 1 file changed, 47 insertions(+), 32 deletions(-) diff --git a/packages/extension-query/src/MutationRunner.ts b/packages/extension-query/src/MutationRunner.ts index 83045c3..0370ab4 100644 --- a/packages/extension-query/src/MutationRunner.ts +++ b/packages/extension-query/src/MutationRunner.ts @@ -1,6 +1,8 @@ import * as AsyncData from "@typed/async-data" import { type Context, Effect, type Fiber, Queue, Ref, Stream, SubscriptionRef } from "effect" import type * as QueryClient from "./QueryClient.js" +import * as QueryProgress from "./QueryProgress.js" +import * as QueryState from "./QueryState.js" export interface MutationRunner { @@ -31,50 +33,63 @@ export const make = ( R | QueryClient.TagClassShape | EH > => Effect.gen(function*() { const context = yield* Effect.context | EH>() - const stateRef = yield* SubscriptionRef.make(AsyncData.noData>()) + const globalStateRef = yield* SubscriptionRef.make(AsyncData.noData>()) - const run = ( - key: K, - setState: (value: AsyncData.AsyncData>) => Effect.Effect, - ) => QueryClient.pipe( - Effect.flatMap(client => client.ErrorHandler), - Effect.flatMap(errorHandler => setState(AsyncData.loading()).pipe( - Effect.andThen(mutation(key)), - errorHandler.handle, - Effect.matchCauseEffect({ - onSuccess: v => Effect.succeed(AsyncData.success(v)).pipe( - Effect.tap(setState) - ), - onFailure: c => Effect.succeed(AsyncData.failure(c)).pipe( - Effect.tap(setState) - ), - }), + const run = (key: K) => Effect.all([ + QueryClient, + QueryState.makeTag>(), + ]).pipe( + Effect.flatMap(([client, state]) => client.ErrorHandler.pipe( + Effect.flatMap(errorHandler => state.set(AsyncData.loading()).pipe( + Effect.andThen(mutation(key)), + errorHandler.handle, + Effect.matchCauseEffect({ + onSuccess: v => Effect.succeed(AsyncData.success(v)).pipe( + Effect.tap(state.set) + ), + onFailure: c => Effect.succeed(AsyncData.failure(c)).pipe( + Effect.tap(state.set) + ), + }), + )) )), Effect.provide(context), + Effect.provide(QueryProgress.QueryProgress.Live), ) - const mutate = (...key: K) => run(key, value => Ref.set(stateRef, value)) + const mutate = (...key: K) => run(key).pipe( + Effect.provide(QueryState.layer( + QueryState.makeTag>(), + globalStateRef, + value => Ref.set(globalStateRef, value), + )) + ) - const forkMutate = (...key: K) => Queue.unbounded>>().pipe( - Effect.flatMap(stateQueue => - run( - key, - value => Ref.set(stateRef, value).pipe( - Effect.andThen(Queue.offer(stateQueue, value)) + const forkMutate = (...key: K) => Effect.gen(function*() { + const stateRef = yield* Ref.make(AsyncData.noData>()) + const stateQueue = yield* Queue.unbounded>>() + + const fiber = yield* Effect.forkDaemon(run(key).pipe( + Effect.tap(() => Queue.shutdown(stateQueue)), + + Effect.provide(QueryState.layer( + QueryState.makeTag>(), + stateRef, + value => Queue.offer(stateQueue, value).pipe( + Effect.andThen(Ref.set(stateRef, value)), + Effect.andThen(Ref.set(globalStateRef, value)), ), - ).pipe( - Effect.tap(() => Queue.shutdown(stateQueue)), - Effect.forkDaemon, - Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const) - ) - ) - ) + )), + )) + + return [fiber, Stream.fromQueue(stateQueue)] as const + }) return { context, - stateRef, + stateRef: globalStateRef, mutate, forkMutate,