From 284a080f19508006c876fda8a023f15f0b766153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Thu, 1 May 2025 20:27:11 +0200 Subject: [PATCH] usePullStream --- packages/reffuse/src/ReffuseNamespace.ts | 41 ++++++++++++++++-------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/packages/reffuse/src/ReffuseNamespace.ts b/packages/reffuse/src/ReffuseNamespace.ts index 54e22f4..081d88e 100644 --- a/packages/reffuse/src/ReffuseNamespace.ts +++ b/packages/reffuse/src/ReffuseNamespace.ts @@ -1,4 +1,4 @@ -import { type Context, Effect, ExecutionStrategy, Exit, type Fiber, type Layer, Match, Option, pipe, Pipeable, Queue, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect" +import { Chunk, type Context, Effect, ExecutionStrategy, Exit, type Fiber, flow, type Layer, Match, Option, pipe, Pipeable, Queue, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect" import * as React from "react" import * as ReffuseContext from "./ReffuseContext.js" import * as ReffuseRuntime from "./ReffuseRuntime.js" @@ -495,19 +495,34 @@ export abstract class ReffuseNamespace { return reactStateValue as InitialA extends A ? Option.Some : Option.Option } - // usePullStream( - // this: ReffuseNamespace, - // stream: Stream.Stream, - // initialValue?: InitialA, - // ): [ - // latestValue: InitialA extends A ? Option.Some : Option.Option, - // pull: () => void, - // ] { - // const [reactStateValue, setReactStateValue] = React.useState>(Option.fromNullable(initialValue)) - // const pull = this.useMemo(() => Stream.toPull(stream), [stream]) + usePullStream( + this: ReffuseNamespace, + stream: Stream.Stream, + initialValue?: InitialA, + ): [ + latestValue: InitialA extends A ? Option.Some : Option.Option, + pull: Effect.Effect, Option.Option, R>, + ] { + const scope = this.useScope([stream]) - // return reactStateValue as InitialA extends A ? Option.Some : Option.Option - // } + const [reactStateValue, setReactStateValue] = React.useState>(Option.fromNullable(initialValue)) + const pull = this.useMemo(() => Stream.toPull(stream).pipe( + Effect.map(Effect.tap(flow( + Chunk.last, + v => Option.match(v, { + onSome: () => Effect.sync(() => setReactStateValue(v)), + onNone: () => Effect.void, + }), + ))), + + Effect.provideService(Scope.Scope, scope), + ), [stream, scope]) + + return [ + reactStateValue as InitialA extends A ? Option.Some : Option.Option, + pull, + ] + } SubRef, R>(