diff --git a/packages/extension-query/src/MutationRunner.ts b/packages/extension-query/src/MutationRunner.ts index 323c344..25334d9 100644 --- a/packages/extension-query/src/MutationRunner.ts +++ b/packages/extension-query/src/MutationRunner.ts @@ -1,5 +1,5 @@ import * as AsyncData from "@typed/async-data" -import { type Context, Effect, type Fiber, Ref, SubscriptionRef } from "effect" +import { type Context, Effect, type Fiber, Queue, Ref, Stream, SubscriptionRef } from "effect" import type * as QueryClient from "./QueryClient.js" @@ -7,8 +7,11 @@ export interface MutationRunner { readonly context: Context.Context readonly stateRef: SubscriptionRef.SubscriptionRef> - readonly mutate: (...key: K) => Effect.Effect - readonly forkMutate: (...key: K) => Effect.Effect> + readonly mutate: (...key: K) => Effect.Effect | AsyncData.Failure> + readonly forkMutate: (...key: K) => Effect.Effect | AsyncData.Failure>, + state: Stream.Stream>, + ]> } @@ -30,19 +33,41 @@ export const make = ( const context = yield* Effect.context | EH>() const stateRef = yield* SubscriptionRef.make(AsyncData.noData>()) - const mutate = (...key: K) => QueryClient.pipe( + const run = ( + key: K, + setState: (value: AsyncData.AsyncData>) => Effect.Effect, + ) => QueryClient.pipe( Effect.flatMap(client => client.ErrorHandler), - Effect.flatMap(errorHandler => Ref.set(stateRef, AsyncData.loading()).pipe( + Effect.flatMap(errorHandler => setState(AsyncData.loading()).pipe( Effect.andThen(mutation(key)), errorHandler.handle, - Effect.tapErrorCause(c => Ref.set(stateRef, AsyncData.failure(c))), - Effect.tap(v => Ref.set(stateRef, AsyncData.success(v))), + Effect.matchCauseEffect({ + onSuccess: v => Effect.succeed(AsyncData.success(v)).pipe( + Effect.tap(setState) + ), + onFailure: c => Effect.succeed(AsyncData.failure(c)).pipe( + Effect.tap(setState) + ), + }), )), Effect.provide(context), ) - const forkMutate = (...key: K) => Effect.forkDaemon(mutate(...key)) + const mutate = (...key: K) => run(key, value => Ref.set(stateRef, value)) + + const forkMutate = (...key: K) => Queue.unbounded>>().pipe( + Effect.flatMap(stateQueue => + run(key, value => Queue.offer(stateQueue, value).pipe( + Effect.andThen(Ref.set(stateRef, value)) + )).pipe( + Effect.tap(() => Queue.shutdown(stateQueue)), + Effect.forkDaemon, + Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const) + ) + ) + ) + return { context, diff --git a/packages/extension-query/src/MutationService.ts b/packages/extension-query/src/MutationService.ts index e366571..bdfb30a 100644 --- a/packages/extension-query/src/MutationService.ts +++ b/packages/extension-query/src/MutationService.ts @@ -1,11 +1,14 @@ import type * as AsyncData from "@typed/async-data" -import { Effect, type Fiber, type SubscriptionRef } from "effect" +import { Effect, type Fiber, type Stream, type SubscriptionRef } from "effect" export interface MutationService { readonly state: SubscriptionRef.SubscriptionRef> - readonly mutate: (...key: K) => Effect.Effect - readonly forkMutate: (...key: K) => Effect.Effect> + readonly mutate: (...key: K) => Effect.Effect | AsyncData.Failure> + readonly forkMutate: (...key: K) => Effect.Effect | AsyncData.Failure>, + state: Stream.Stream>, + ]> } export const Tag = (id: Id) => < diff --git a/packages/extension-query/src/QueryExtension.ts b/packages/extension-query/src/QueryExtension.ts index 0fa0b87..1be7476 100644 --- a/packages/extension-query/src/QueryExtension.ts +++ b/packages/extension-query/src/QueryExtension.ts @@ -32,8 +32,11 @@ export interface UseMutationProps { export interface UseMutationResult { readonly state: SubscriptionRef.SubscriptionRef> - readonly mutate: (...key: K) => Effect.Effect - readonly forkMutate: (...key: K) => Effect.Effect> + readonly mutate: (...key: K) => Effect.Effect | AsyncData.Failure> + readonly forkMutate: (...key: K) => Effect.Effect | AsyncData.Failure>, + state: Stream.Stream>, + ]> readonly layer: ( tag: Context.TagClass>