From 606dd2c00fc7e94c6dc500148dad56b8df61a7d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Wed, 7 May 2025 02:15:46 +0200 Subject: [PATCH] useStreamFromReactiveValues --- packages/reffuse/src/ReffuseNamespace.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/reffuse/src/ReffuseNamespace.ts b/packages/reffuse/src/ReffuseNamespace.ts index 37aee55..6c637eb 100644 --- a/packages/reffuse/src/ReffuseNamespace.ts +++ b/packages/reffuse/src/ReffuseNamespace.ts @@ -474,9 +474,11 @@ export abstract class ReffuseNamespace { this: ReffuseNamespace, values: A, ): Stream.Stream { + const scope = this.useScope() + const { latest, pubsub, stream } = this.useMemo(() => Effect.Do.pipe( Effect.bind("latest", () => Ref.make(values)), - Effect.bind("pubsub", () => PubSub.unbounded()), + Effect.bind("pubsub", () => Effect.acquireRelease(PubSub.unbounded(), PubSub.shutdown)), Effect.let("stream", ({ latest, pubsub }) => Ref.get(latest).pipe( Effect.flatMap(a => Effect.map( Stream.fromPubSub(pubsub, { scoped: true }), @@ -484,7 +486,8 @@ export abstract class ReffuseNamespace { )), Stream.unwrapScoped, )), - ), []) + Effect.provideService(Scope.Scope, scope), + ), [scope]) this.useEffect(() => Effect.andThen( Ref.set(latest, values),