From 9feb94ea9e35061600f853073e72d7dc3808bf73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Sun, 16 Nov 2025 06:54:34 +0100 Subject: [PATCH] Add forkEffect --- packages/effect-fc/src/Result.ts | 35 ++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/packages/effect-fc/src/Result.ts b/packages/effect-fc/src/Result.ts index 23318f8..676a044 100644 --- a/packages/effect-fc/src/Result.ts +++ b/packages/effect-fc/src/Result.ts @@ -257,3 +257,38 @@ export const forkEffectPubSub = ( ))), Effect.map(([, pubsub]) => pubsub.subscribe), ) as Effect.Effect>, never, Scope.Scope>, never, Scope.Scope> + +export namespace forkEffect { + export type InputContext = R extends Progress ? [X] extends [P] ? R : never : R + export type OutputContext = Scope.Scope | Exclude | Progress> + + export interface Options

{ + readonly initialProgress?: P + } +} + +export const forkEffect = ( + effect: Effect.Effect>>, + options?: forkEffect.Options

, +) => Effect.all([ + Ref.make>(initial()), + PubSub.unbounded>(), +]).pipe( + Effect.tap(([ref, pubsub]) => Effect.forkScoped(State().pipe( + Effect.andThen(state => state.set(running(options?.initialProgress)).pipe( + Effect.andThen(effect), + Effect.onExit(exit => Effect.andThen( + state.set(fromExit(exit)), + Effect.forkScoped(PubSub.shutdown(pubsub)), + )), + )), + Effect.provide(Layer.empty.pipe( + Layer.provideMerge(makeProgressLayer()), + Layer.provideMerge(Layer.succeed(State(), { + get: ref, + set: v => Effect.andThen(Ref.set(ref, v), PubSub.publish(pubsub, v)) + })), + )), + ))), + Effect.map(([, pubsub]) => pubsub.subscribe), +) as Effect.Effect>, never, Scope.Scope>, never, Scope.Scope>