From 1090a685d21eb32aebb9a849fd466d226ff3a7b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Fri, 31 Oct 2025 15:33:59 +0100 Subject: [PATCH] Fix --- packages/effect-fc/src/Result.ts | 161 ++++++++++++++----------------- 1 file changed, 75 insertions(+), 86 deletions(-) diff --git a/packages/effect-fc/src/Result.ts b/packages/effect-fc/src/Result.ts index 40c6bef..7818d28 100644 --- a/packages/effect-fc/src/Result.ts +++ b/packages/effect-fc/src/Result.ts @@ -1,4 +1,4 @@ -import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, pipe, Queue, Ref, Scope } from "effect" +import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, pipe, Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect" export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result") @@ -142,38 +142,47 @@ export const toExit = ( } +export interface State { + readonly get: Effect.Effect> + readonly set: (v: Result) => Effect.Effect +} + +export const State = (): Context.Tag, State> => Context.GenericTag("@effect-fc/Result/State") + export interface Progress

{ readonly update: ( f: (previous: P) => Effect.Effect - ) => Effect.Effect + ) => Effect.Effect } -export class PreviousResultNotRunningOrRefreshing extends Data.TaggedError("@effect-fc/Result/PreviousResultNotRunningOrRefreshing")<{ +export class PreviousResultNotRunningNorRefreshing extends Data.TaggedError("@effect-fc/Result/PreviousResultNotRunningNorRefreshing")<{ readonly previous: Result }> {} export const Progress =

(): Context.Tag, Progress

> => Context.GenericTag("@effect-fc/Result/Progress") -export const makeProgressLayer = ( - queue: Queue.Enqueue>, - ref: Ref.Ref>, -): Layer.Layer> => Layer.sync(Progress

(), () => ({ - update: (f: (previous: P) => Effect.Effect) => Effect.Do.pipe( - Effect.bind("previous", () => Effect.andThen( - ref, - previous => isRunning(previous) || isRefreshing(previous) - ? Effect.succeed(previous) - : Effect.fail(new PreviousResultNotRunningOrRefreshing({ previous })), - )), - Effect.bind("progress", ({ previous }) => f(previous.progress)), - Effect.let("next", ({ previous, progress }) => Object.setPrototypeOf( - Object.assign({}, previous, { progress }), - Object.getPrototypeOf(previous), - )), - Effect.tap(({ next }) => Ref.set(ref, next)), - Effect.tap(({ next }) => Queue.offer(queue, next)), - Effect.asVoid, - ), +export const makeProgressLayer = (): Layer.Layer< + Progress

, + never, + State +> => Layer.effect(Progress

(), Effect.gen(function*() { + const state = yield* State() + + return { + update: (f: (previous: P) => Effect.Effect) => Effect.Do.pipe( + Effect.bind("previous", () => Effect.andThen(state.get, previous => + isRunning(previous) || isRefreshing(previous) + ? Effect.succeed(previous) + : Effect.fail(new PreviousResultNotRunningNorRefreshing({ previous })), + )), + Effect.bind("progress", ({ previous }) => f(previous.progress)), + Effect.let("next", ({ previous, progress }) => Object.setPrototypeOf( + Object.assign({}, previous, { progress }), + Object.getPrototypeOf(previous), + )), + Effect.andThen(({ next }) => state.set(next)), + ), + } })) @@ -190,75 +199,55 @@ export const forkEffect = ( effect: Effect.Effect>>, options?: forkEffect.Options

, ): Effect.Effect< - Queue.Dequeue>, + Subscribable.Subscribable>, never, forkEffect.OutputContext -> => Effect.Do.pipe( - Effect.bind("scope", () => Scope.Scope), - Effect.bind("queue", () => Queue.unbounded>()), - Effect.bind("ref", () => Ref.make>(initial())), - Effect.tap(({ queue, ref }) => Effect.andThen(ref, v => Queue.offer(queue, v))), - Effect.tap(({ scope, queue, ref }) => Effect.forkScoped( - Effect.addFinalizer(() => Queue.shutdown(queue)).pipe( - Effect.andThen(Effect.succeed(running(options?.initialProgress)).pipe( - Effect.tap(v => Ref.set(ref, v)), - Effect.tap(v => Queue.offer(queue, v)), - )), - Effect.andThen(Effect.provideService(effect, Scope.Scope, scope)), - Effect.exit, - Effect.andThen(exit => Effect.succeed(fromExit(exit)).pipe( - Effect.tap(v => Ref.set(ref, v)), - Effect.tap(v => Queue.offer(queue, v)), - )), - Effect.scoped, - Effect.provide(makeProgressLayer(queue, ref)), - ) +> => Effect.tap( + SubscriptionRef.make>(initial()), + ref => Effect.forkScoped(State().pipe( + Effect.andThen(state => state.set(running(options?.initialProgress)).pipe( + Effect.andThen(effect), + Effect.onExit(exit => state.set(fromExit(exit))), + )), + Effect.provide(Layer.empty.pipe( + Layer.provideMerge(makeProgressLayer()), + Layer.provideMerge(Layer.succeed(State(), { + get: ref, + set: v => Ref.set(ref, v), + })), + )), )), - Effect.map(({ queue }) => queue), -) as Effect.Effect>, never, Scope.Scope> +) as Effect.Effect>, never, Scope.Scope> -export namespace forkEffectScoped { - export type InputContext = (R extends Progress - ? [X] extends [P] - ? R - : never - : R - ) - - export interface Options

{ - readonly initialProgress?: P - } - - export type OutputContext = Scope.Scope | Exclude | Progress> +export namespace forkEffectDequeue { + export type InputContext = forkEffect.InputContext + export type OutputContext = forkEffect.OutputContext + export interface Options

extends forkEffect.Options

{} } -export const forkEffectScoped = ( - effect: Effect.Effect>>, - options?: forkEffectScoped.Options

, +export const forkEffectDequeue = ( + effect: Effect.Effect>>, + options?: forkEffectDequeue.Options

, ): Effect.Effect< Queue.Dequeue>, never, - forkEffectScoped.OutputContext -> => Effect.Do.pipe( - Effect.bind("scope", () => Scope.Scope), - Effect.bind("queue", () => Queue.unbounded>()), - Effect.bind("ref", () => Ref.make>(initial())), - Effect.tap(({ queue, ref }) => Effect.andThen(ref, v => Queue.offer(queue, v))), - Effect.tap(({ scope, queue, ref }) => Effect.forkScoped( - Effect.addFinalizer(() => Queue.shutdown(queue)).pipe( - Effect.andThen(Effect.succeed(running(options?.initialProgress)).pipe( - Effect.tap(v => Ref.set(ref, v)), - Effect.tap(v => Queue.offer(queue, v)), - )), - Effect.andThen(Effect.provideService(effect, Scope.Scope, scope)), - Effect.exit, - Effect.andThen(exit => Effect.succeed(fromExit(exit)).pipe( - Effect.tap(v => Ref.set(ref, v)), - Effect.tap(v => Queue.offer(queue, v)), - )), - Effect.scoped, - Effect.provide(makeProgressLayer(queue, ref)), - ) - )), - Effect.map(({ queue }) => queue), -) as Effect.Effect>, never, Scope.Scope> + forkEffectDequeue.OutputContext +> => Effect.all([ + Ref.make>(initial()), + Queue.unbounded>(), +]).pipe( + Effect.tap(([ref, queue]) => Effect.forkScoped(State().pipe( + Effect.andThen(state => state.set(running(options?.initialProgress)).pipe( + Effect.andThen(effect), + Effect.onExit(exit => Effect.andThen(state.set(fromExit(exit)), Queue.shutdown(queue))), + )), + Effect.provide(Layer.empty.pipe( + Layer.provideMerge(makeProgressLayer()), + Layer.provideMerge(Layer.succeed(State(), { + get: ref, + set: v => Effect.andThen(Ref.set(ref, v), Queue.offer(queue, v)) + })), + )), + ))), + Effect.map(([, queue]) => queue), +) as Effect.Effect>, never, Scope.Scope>