From 70dcdf8160791d6db6a54c348c5e700ed00ae32d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Mon, 17 Nov 2025 04:09:26 +0100 Subject: [PATCH] Refactor Result --- packages/effect-fc/src/Form.ts | 14 ++--- packages/effect-fc/src/Query.ts | 5 +- packages/effect-fc/src/Result.ts | 104 +++++++------------------------ 3 files changed, 33 insertions(+), 90 deletions(-) diff --git a/packages/effect-fc/src/Form.ts b/packages/effect-fc/src/Form.ts index 4969ad2..c31f12a 100644 --- a/packages/effect-fc/src/Form.ts +++ b/packages/effect-fc/src/Form.ts @@ -65,7 +65,7 @@ export namespace make { readonly onSubmit: ( this: Form, NoInfer, NoInfer, unknown, unknown, unknown>, value: NoInfer, - ) => Effect.Effect>> + ) => Effect.Effect>> readonly initialSubmitProgress?: SP readonly autosubmit?: boolean readonly debounce?: Duration.DurationInput @@ -160,17 +160,15 @@ export const submit = ( > => Effect.whenEffect( self.valueRef.pipe( Effect.andThen(identity), - Effect.andThen(value => Result.forkEffectPubSub( - self.onSubmit(value) as Effect.Effect>, + Effect.andThen(value => Result.forkEffect( + self.onSubmit(value) as Effect.Effect>, { initialProgress: self.initialSubmitProgress }, )), - Effect.andThen(identity), - Effect.andThen(Stream.fromQueue), - Stream.unwrapScoped, - Stream.runFoldEffect( + Effect.andThen(([result]) => Stream.runFoldEffect( + result.changes, Result.initial() as Result.Result, (_, result) => Effect.as(Ref.set(self.submitResultRef, result), result), - ), + )), Effect.tap(result => Result.isFailure(result) ? Option.match( Chunk.findFirst( diff --git a/packages/effect-fc/src/Query.ts b/packages/effect-fc/src/Query.ts index b2ab6c9..c9a8ca9 100644 --- a/packages/effect-fc/src/Query.ts +++ b/packages/effect-fc/src/Query.ts @@ -1,5 +1,5 @@ import { Effect, Fiber, Option, Pipeable, Predicate, Stream, type Subscribable, type SubscriptionRef } from "effect" -import type * as Result from "./Result.js" +import * as Result from "./Result.js" export const QueryTypeId: unique symbol = Symbol.for("@effect-fc/Query/Query") @@ -38,7 +38,8 @@ extends Pipeable.Class() implements Query { onSome: Fiber.interrupt, onNone: () => Effect.void, })), - Effect.andThen(), + Effect.andThen(Result.forkEffect(this.f(key))), + Effect.tap(([result, fiber]) => ), ) } } diff --git a/packages/effect-fc/src/Result.ts b/packages/effect-fc/src/Result.ts index 676a044..b54153e 100644 --- a/packages/effect-fc/src/Result.ts +++ b/packages/effect-fc/src/Result.ts @@ -1,4 +1,4 @@ -import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, PubSub, pipe, type Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect" +import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, Option, Pipeable, Predicate, PubSub, pipe, Ref, type Scope, Stream, Subscribable } from "effect" export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result") @@ -186,78 +186,6 @@ export const makeProgressLayer = (): Layer.Layer< })) -export namespace forkEffectSubscriptionRef { - export type InputContext = R extends Progress ? [X] extends [P] ? R : never : R - export type OutputContext = Scope.Scope | Exclude | Progress> - - export interface Options

{ - readonly initialProgress?: P - } -} - -export const forkEffectSubscriptionRef = ( - effect: Effect.Effect>>, - options?: forkEffectSubscriptionRef.Options

, -): Effect.Effect< - Subscribable.Subscribable>, - never, - forkEffectSubscriptionRef.OutputContext -> => Effect.tap( - SubscriptionRef.make>(initial()), - ref => Effect.forkScoped(State().pipe( - Effect.andThen(state => state.set(running(options?.initialProgress)).pipe( - Effect.andThen(effect), - Effect.onExit(exit => state.set(fromExit(exit))), - )), - Effect.provide(Layer.empty.pipe( - Layer.provideMerge(makeProgressLayer()), - Layer.provideMerge(Layer.succeed(State(), { - get: ref, - set: v => Ref.set(ref, v), - })), - )), - )), -) as Effect.Effect>, never, Scope.Scope> - -export namespace forkEffectPubSub { - export type InputContext = R extends Progress ? [X] extends [P] ? R : never : R - export type OutputContext = Scope.Scope | Exclude | Progress> - - export interface Options

{ - readonly initialProgress?: P - } -} - -export const forkEffectPubSub = ( - effect: Effect.Effect>>, - options?: forkEffectPubSub.Options

, -): Effect.Effect< - Effect.Effect>, never, Scope.Scope>, - never, - forkEffectPubSub.OutputContext -> => Effect.all([ - Ref.make>(initial()), - PubSub.unbounded>(), -]).pipe( - Effect.tap(([ref, pubsub]) => Effect.forkScoped(State().pipe( - Effect.andThen(state => state.set(running(options?.initialProgress)).pipe( - Effect.andThen(effect), - Effect.onExit(exit => Effect.andThen( - state.set(fromExit(exit)), - Effect.forkScoped(PubSub.shutdown(pubsub)), - )), - )), - Effect.provide(Layer.empty.pipe( - Layer.provideMerge(makeProgressLayer()), - Layer.provideMerge(Layer.succeed(State(), { - get: ref, - set: v => Effect.andThen(Ref.set(ref, v), PubSub.publish(pubsub, v)) - })), - )), - ))), - Effect.map(([, pubsub]) => pubsub.subscribe), -) as Effect.Effect>, never, Scope.Scope>, never, Scope.Scope> - export namespace forkEffect { export type InputContext = R extends Progress ? [X] extends [P] ? R : never : R export type OutputContext = Scope.Scope | Exclude | Progress> @@ -270,11 +198,14 @@ export namespace forkEffect { export const forkEffect = ( effect: Effect.Effect>>, options?: forkEffect.Options

, -) => Effect.all([ - Ref.make>(initial()), - PubSub.unbounded>(), -]).pipe( - Effect.tap(([ref, pubsub]) => Effect.forkScoped(State().pipe( +): Effect.Effect< + readonly [result: Subscribable.Subscribable, never, never>, fiber: Fiber.Fiber], + never, + Scope.Scope | forkEffect.OutputContext +> => Effect.Do.pipe( + Effect.bind("ref", () => Ref.make>(initial())), + Effect.bind("pubsub", () => PubSub.unbounded>()), + Effect.bind("fiber", ({ ref, pubsub }) => Effect.forkScoped(State().pipe( Effect.andThen(state => state.set(running(options?.initialProgress)).pipe( Effect.andThen(effect), Effect.onExit(exit => Effect.andThen( @@ -290,5 +221,18 @@ export const forkEffect = ( })), )), ))), - Effect.map(([, pubsub]) => pubsub.subscribe), -) as Effect.Effect>, never, Scope.Scope>, never, Scope.Scope> + Effect.map(({ ref, pubsub, fiber }) => [ + Subscribable.make({ + get: ref, + changes: Stream.unwrapScoped(Effect.map( + Effect.all([ref, Stream.fromPubSub(pubsub, { scoped: true })]), + ([latest, stream]) => Stream.concat(Stream.make(latest), stream), + )), + }), + fiber, + ]), +) as Effect.Effect< + readonly [result: Subscribable.Subscribable, never, never>, fiber: Fiber.Fiber], + never, + Scope.Scope | forkEffect.OutputContext +>