From e5a7fe8ad6c74679bd996dcd7eec303226c47437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Sun, 11 May 2025 03:17:35 +0200 Subject: [PATCH] useSubscribeStream --- packages/example/src/routes/tests.tsx | 4 ++-- packages/reffuse/src/ReffuseNamespace.ts | 29 ++++++++++++++++++++---- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/packages/example/src/routes/tests.tsx b/packages/example/src/routes/tests.tsx index 5552ffb..985cee9 100644 --- a/packages/example/src/routes/tests.tsx +++ b/packages/example/src/routes/tests.tsx @@ -22,7 +22,7 @@ function RouteComponent() { ), []) const uuidStream = R.useStreamFromReactiveValues([uuid]) - const [uuidStreamLatestValue, pullUuidStream] = R.usePullStream(uuidStream) + const uuidStreamLatestValue = R.useSubscribeStream(uuidStream, true) const scope = R.useScope([uuid]) @@ -42,7 +42,7 @@ function RouteComponent() { onNone: () => <>, })} - + {/* */} ) } diff --git a/packages/reffuse/src/ReffuseNamespace.ts b/packages/reffuse/src/ReffuseNamespace.ts index 0250fdc..02a4a93 100644 --- a/packages/reffuse/src/ReffuseNamespace.ts +++ b/packages/reffuse/src/ReffuseNamespace.ts @@ -1,4 +1,4 @@ -import { Chunk, type Context, Effect, ExecutionStrategy, Exit, type Fiber, flow, type Layer, Match, Option, pipe, Pipeable, PubSub, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect" +import { type Cause, Chunk, type Context, Effect, ExecutionStrategy, Exit, type Fiber, flow, Function, type Layer, Match, Option, pipe, Pipeable, PubSub, Queue, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect" import * as React from "react" import * as ReffuseContext from "./ReffuseContext.js" import * as ReffuseRuntime from "./ReffuseRuntime.js" @@ -501,6 +501,11 @@ export abstract class ReffuseNamespace { this: ReffuseNamespace, stream: Stream.Stream, ): Option.Option + useSubscribeStream( + this: ReffuseNamespace, + stream: Stream.Stream, + pullLatestValue: true, + ): Option.Some useSubscribeStream( this: ReffuseNamespace, stream: Stream.Stream, @@ -509,11 +514,25 @@ export abstract class ReffuseNamespace { useSubscribeStream( this: ReffuseNamespace, stream: Stream.Stream, - initialValue?: () => Effect.Effect, + pullLatestOrInitialValue?: true | (() => Effect.Effect), ): Option.Option { - const [reactStateValue, setReactStateValue] = React.useState(this.useMemo( - () => initialValue - ? Effect.map(initialValue(), Option.some) + const [reactStateValue, setReactStateValue] = React.useState(this.useMemo< + Option.Option, + Cause.NoSuchElementException | Cause.Cause> | IE, + R + >( + () => pullLatestOrInitialValue + ? Function.isFunction(pullLatestOrInitialValue) + ? Effect.map(pullLatestOrInitialValue(), Option.some) + : Stream.toQueueOfElements(stream).pipe( + Effect.flatMap(Queue.takeAll), + Effect.flatMap(Chunk.last), + Effect.flatMap(Exit.matchEffect({ + onSuccess: v => Effect.succeed(Option.some(v)), + onFailure: Effect.fail, + })), + Effect.scoped, + ) : Effect.succeed(Option.none()), [], { doNotReExecuteOnRuntimeOrContextChange: true },