From c789de3ad83c22bf1fb3d3cc5d2c052c48226598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Tue, 11 Nov 2025 02:22:37 +0100 Subject: [PATCH] Refactoring --- packages/effect-fc/src/Form.ts | 7 +++--- packages/effect-fc/src/Result.ts | 38 +++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/packages/effect-fc/src/Form.ts b/packages/effect-fc/src/Form.ts index 8a0c16b..cfdc570 100644 --- a/packages/effect-fc/src/Form.ts +++ b/packages/effect-fc/src/Form.ts @@ -149,11 +149,12 @@ export const submit = ( ): Effect.Effect>, NoSuchElementException, Scope.Scope | SR> => Effect.whenEffect( self.valueRef.pipe( Effect.andThen(identity), - Effect.andThen(value => Result.forkEffectDequeue( - self.onSubmit(value) as Effect.Effect>) + Effect.andThen(value => Result.forkEffectPubSub( + self.onSubmit(value) as Effect.Effect>) ), + Effect.andThen(identity), Effect.andThen(Stream.fromQueue), - Stream.unwrap, + Stream.unwrapScoped, Stream.runFoldEffect( Result.initial() as Result.Result, (_, result) => Effect.as(Ref.set(self.submitResultRef, result), result), diff --git a/packages/effect-fc/src/Result.ts b/packages/effect-fc/src/Result.ts index edb38a9..856b85d 100644 --- a/packages/effect-fc/src/Result.ts +++ b/packages/effect-fc/src/Result.ts @@ -1,4 +1,4 @@ -import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, pipe, Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect" +import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, PubSub, pipe, Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect" export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result") @@ -219,6 +219,42 @@ export const forkEffect = ( )), ) as Effect.Effect>, never, Scope.Scope> +export namespace forkEffectPubSub { + export type InputContext = forkEffect.InputContext + export type OutputContext = forkEffect.OutputContext + export interface Options

extends forkEffect.Options

{} +} + +export const forkEffectPubSub = ( + effect: Effect.Effect>>, + options?: forkEffectDequeue.Options

, +): Effect.Effect< + Effect.Effect>, never, Scope.Scope>, + never, + forkEffectDequeue.OutputContext +> => Effect.all([ + Ref.make>(initial()), + PubSub.unbounded>(), +]).pipe( + Effect.tap(([ref, pubsub]) => Effect.forkScoped(State().pipe( + Effect.andThen(state => state.set(running(options?.initialProgress)).pipe( + Effect.andThen(effect), + Effect.onExit(exit => Effect.andThen( + state.set(fromExit(exit)), + Effect.forkScoped(PubSub.shutdown(pubsub)), + )), + )), + Effect.provide(Layer.empty.pipe( + Layer.provideMerge(makeProgressLayer()), + Layer.provideMerge(Layer.succeed(State(), { + get: ref, + set: v => Effect.andThen(Ref.set(ref, v), PubSub.publish(pubsub, v)) + })), + )), + ))), + Effect.map(([, pubsub]) => pubsub.subscribe), +) as Effect.Effect>, never, Scope.Scope>, never, Scope.Scope> + export namespace forkEffectDequeue { export type InputContext = forkEffect.InputContext export type OutputContext = forkEffect.OutputContext