From c13a8d549fd30ffae157f28f2a73f61bd2f5a230 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Wed, 7 May 2025 02:07:50 +0200 Subject: [PATCH] useStreamFromReactiveValues --- packages/reffuse/src/ReffuseNamespace.ts | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/packages/reffuse/src/ReffuseNamespace.ts b/packages/reffuse/src/ReffuseNamespace.ts index f1717e6..37aee55 100644 --- a/packages/reffuse/src/ReffuseNamespace.ts +++ b/packages/reffuse/src/ReffuseNamespace.ts @@ -474,11 +474,23 @@ export abstract class ReffuseNamespace { this: ReffuseNamespace, values: A, ): Stream.Stream { - const [pubsub, stream] = this.useMemo(() => PubSub.unbounded().pipe( - Effect.map(pubsub => [pubsub, Stream.fromPubSub(pubsub)] as const) + const { latest, pubsub, stream } = this.useMemo(() => Effect.Do.pipe( + Effect.bind("latest", () => Ref.make(values)), + Effect.bind("pubsub", () => PubSub.unbounded()), + Effect.let("stream", ({ latest, pubsub }) => Ref.get(latest).pipe( + Effect.flatMap(a => Effect.map( + Stream.fromPubSub(pubsub, { scoped: true }), + s => Stream.concat(Stream.make(a), s), + )), + Stream.unwrapScoped, + )), ), []) - this.useEffect(() => PubSub.publish(pubsub, values), values) + this.useEffect(() => Effect.andThen( + Ref.set(latest, values), + PubSub.publish(pubsub, values), + ), values) + return stream }