From 6ae311cdfdb3458133c430cf2c4bd886307fe0fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Thu, 23 Oct 2025 23:01:27 +0200 Subject: [PATCH] Refactor --- packages/effect-fc/src/Component.ts | 2 +- packages/effect-fc/src/Stream.ts | 29 +++++++++++++++++++++-- packages/effect-fc/src/Subscribable.ts | 2 +- packages/effect-fc/src/SubscriptionRef.ts | 4 ++-- 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/packages/effect-fc/src/Component.ts b/packages/effect-fc/src/Component.ts index 17e26ac..e6f4a2e 100644 --- a/packages/effect-fc/src/Component.ts +++ b/packages/effect-fc/src/Component.ts @@ -545,7 +545,7 @@ const runReactEffect = ( Effect.bind("exit", ({ scope }) => Effect.exit(Effect.provideService(f(), Scope.Scope, scope))), Effect.map(({ scope }) => () => { - switch (options?.finalizerExecutionMode ?? "sync") { + switch (options?.finalizerExecutionMode ?? "fork") { case "sync": Runtime.runSync(runtime)(Scope.close(scope, Exit.void)) break diff --git a/packages/effect-fc/src/Stream.ts b/packages/effect-fc/src/Stream.ts index 23468d9..726b7be 100644 --- a/packages/effect-fc/src/Stream.ts +++ b/packages/effect-fc/src/Stream.ts @@ -1,4 +1,4 @@ -import { Effect, Equivalence, Option, Stream } from "effect" +import { Effect, Equivalence, Option, PubSub, Ref, type Scope, Stream } from "effect" import * as React from "react" import * as Component from "./Component.js" @@ -25,9 +25,34 @@ export const useStream: { Stream.changesWith(stream, Equivalence.strict()), v => Effect.sync(() => setReactStateValue(Option.some(v))), ) - ), [stream], { finalizerExecutionMode: "fork" }) + ), [stream]) return reactStateValue as Option.Some }) +export const useStreamFromReactiveValues: { + ( + values: A + ): Effect.Effect, never, Scope.Scope> +} = Effect.fnUntraced(function* (values: A) { + const { latest, pubsub, stream } = yield* Component.useOnMount(() => Effect.Do.pipe( + Effect.bind("latest", () => Ref.make(values)), + Effect.bind("pubsub", () => Effect.acquireRelease(PubSub.unbounded(), PubSub.shutdown)), + Effect.let("stream", ({ latest, pubsub }) => latest.pipe( + Effect.flatMap(a => Effect.map( + Stream.fromPubSub(pubsub, { scoped: true }), + s => Stream.concat(Stream.make(a), s), + )), + Stream.unwrapScoped, + )), + )) + + yield* Component.useReactEffect(() => Ref.set(latest, values).pipe( + Effect.andThen(PubSub.publish(pubsub, values)), + Effect.unlessEffect(PubSub.isShutdown(pubsub)), + ), values) + + return stream +}) + export * from "effect/Stream" diff --git a/packages/effect-fc/src/Subscribable.ts b/packages/effect-fc/src/Subscribable.ts index 707b0fd..22f5dbe 100644 --- a/packages/effect-fc/src/Subscribable.ts +++ b/packages/effect-fc/src/Subscribable.ts @@ -39,7 +39,7 @@ export const useSubscribables: { Stream.runForEach(v => Effect.sync(() => setReactStateValue(v)) ), - )), elements, { finalizerExecutionMode: "fork" }) + )), elements) return reactStateValue as any }) diff --git a/packages/effect-fc/src/SubscriptionRef.ts b/packages/effect-fc/src/SubscriptionRef.ts index 6a28b02..9891abd 100644 --- a/packages/effect-fc/src/SubscriptionRef.ts +++ b/packages/effect-fc/src/SubscriptionRef.ts @@ -16,7 +16,7 @@ export const useSubscriptionRefState: { Stream.changesWith(ref.changes, Equivalence.strict()), v => Effect.sync(() => setReactStateValue(v)), ) - ), [ref], { finalizerExecutionMode: "fork" }) + ), [ref]) const setValue = yield* Component.useCallbackSync((setStateAction: React.SetStateAction) => Effect.andThen( @@ -38,7 +38,7 @@ export const useSubscriptionRefFromState: { Stream.changesWith(ref.changes, Equivalence.strict()), v => Effect.sync(() => setValue(v)), ) - ), [setValue], { finalizerExecutionMode: "fork" }) + ), [setValue]) yield* Component.useReactEffect(() => Ref.set(ref, value), [value]) return ref