This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
import { R } from "@/reffuse"
|
||||
import { Button, Flex, Text } from "@radix-ui/themes"
|
||||
import { createFileRoute } from "@tanstack/react-router"
|
||||
import { Option, Random, Stream } from "effect"
|
||||
import { useMemo } from "react"
|
||||
import { Chunk, Effect, Exit, Option, Queue, Random, Scope, Stream } from "effect"
|
||||
import { useMemo, useState } from "react"
|
||||
|
||||
|
||||
export const Route = createFileRoute("/streams/pull")({
|
||||
@@ -11,13 +11,24 @@ export const Route = createFileRoute("/streams/pull")({
|
||||
|
||||
function RouteComponent() {
|
||||
const stream = useMemo(() => Stream.repeatEffect(Random.nextInt), [])
|
||||
const [value, pull] = R.usePullStream(stream)
|
||||
const pullNext = R.useCallbackSync(() => pull, [pull])
|
||||
const streamScope = R.useScope([stream])
|
||||
|
||||
const queue = R.useMemo(() => Effect.provideService(Stream.toQueueOfElements(stream), Scope.Scope, streamScope), [streamScope])
|
||||
|
||||
const [value, setValue] = useState(Option.none<number>())
|
||||
const pullLatest = R.useCallbackSync(() => Queue.takeAll(queue).pipe(
|
||||
Effect.flatMap(Chunk.last),
|
||||
Effect.flatMap(Exit.matchEffect({
|
||||
onSuccess: Effect.succeed,
|
||||
onFailure: Effect.fail,
|
||||
})),
|
||||
Effect.tap(v => Effect.sync(() => setValue(Option.some(v)))),
|
||||
), [queue])
|
||||
|
||||
return (
|
||||
<Flex direction="column" align="center" gap="2">
|
||||
{Option.isSome(value) && <Text>{value.value}</Text>}
|
||||
<Button onClick={pullNext}>Pull next</Button>
|
||||
<Button onClick={pullLatest}>Pull latest</Button>
|
||||
</Flex>
|
||||
)
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ export const Route = createFileRoute("/tests")({
|
||||
|
||||
function RouteComponent() {
|
||||
const runSync = R.useRunSync()
|
||||
const runPromise = R.useRunPromise()
|
||||
|
||||
const [uuid, setUuid] = useState(R.useMemo(() => makeUuid, []))
|
||||
const generateUuid = R.useCallbackSync(() => makeUuid.pipe(
|
||||
@@ -23,7 +22,7 @@ function RouteComponent() {
|
||||
), [])
|
||||
|
||||
const uuidStream = R.useStreamFromReactiveValues([uuid])
|
||||
const [uuidStreamLatestValue, pullUuidStream] = R.usePullStream(uuidStream)
|
||||
const uuidStreamLatestValue = R.useSubscribeStream(uuidStream)
|
||||
|
||||
const scope = R.useScope([uuid])
|
||||
|
||||
@@ -43,7 +42,6 @@ function RouteComponent() {
|
||||
onNone: () => <></>,
|
||||
})}
|
||||
</Text>
|
||||
<Button onClick={() => runPromise(pullUuidStream)}>Pull UUID</Button>
|
||||
</Flex>
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Chunk, type Context, Effect, ExecutionStrategy, Exit, type Fiber, flow, type Layer, Match, Option, pipe, Pipeable, PubSub, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect"
|
||||
import { type Context, Effect, ExecutionStrategy, Exit, type Fiber, type Layer, Match, Option, pipe, Pipeable, PubSub, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect"
|
||||
import * as React from "react"
|
||||
import * as ReffuseContext from "./ReffuseContext.js"
|
||||
import * as ReffuseRuntime from "./ReffuseRuntime.js"
|
||||
@@ -527,50 +527,6 @@ export abstract class ReffuseNamespace<R> {
|
||||
return reactStateValue
|
||||
}
|
||||
|
||||
usePullStream<A, E, IE, R>(
|
||||
this: ReffuseNamespace<R>,
|
||||
stream: Stream.Stream<A, E, R>,
|
||||
): [latestValue: Option.Option<A>, pull: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>>]
|
||||
usePullStream<A, E, IE, R>(
|
||||
this: ReffuseNamespace<R>,
|
||||
stream: Stream.Stream<A, E, R>,
|
||||
initialValue: () => Effect.Effect<A, IE, R>,
|
||||
): [latestValue: Option.Some<A>, pull: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>>]
|
||||
usePullStream<A, E, IE, R>(
|
||||
this: ReffuseNamespace<R>,
|
||||
stream: Stream.Stream<A, E, R>,
|
||||
initialValue?: () => Effect.Effect<A, IE, R>,
|
||||
): [latestValue: Option.Option<A>, pull: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>>] {
|
||||
const scope = this.useScope([stream])
|
||||
|
||||
const [reactStateValue, setReactStateValue] = React.useState(this.useMemo(
|
||||
() => initialValue
|
||||
? Effect.map(initialValue(), Option.some)
|
||||
: Effect.succeed(Option.none()),
|
||||
[],
|
||||
{ doNotReExecuteOnRuntimeOrContextChange: true },
|
||||
))
|
||||
|
||||
const pull = this.useMemo(() => Effect.context<R>().pipe(
|
||||
Effect.flatMap(context => Stream.toPull(Stream.changesWith(stream, (x, y) => x === y)).pipe(
|
||||
Effect.map(effect => effect.pipe(
|
||||
Effect.tap(flow(
|
||||
Chunk.last,
|
||||
v => Option.match(v, {
|
||||
onSome: () => Effect.sync(() => setReactStateValue(v)),
|
||||
onNone: () => Effect.void,
|
||||
}),
|
||||
)),
|
||||
Effect.provide(context),
|
||||
)),
|
||||
|
||||
Effect.provideService(Scope.Scope, scope),
|
||||
))
|
||||
), [stream, scope])
|
||||
|
||||
return [reactStateValue, pull]
|
||||
}
|
||||
|
||||
|
||||
SubRef<B, const P extends PropertyPath.Paths<B>, R>(
|
||||
this: ReffuseNamespace<R>,
|
||||
|
||||
Reference in New Issue
Block a user