10 Commits

Author SHA1 Message Date
Julien Valverdé
73dd7bc160 Cleanup
All checks were successful
Lint / lint (push) Successful in 13s
2025-05-08 19:22:50 +02:00
Julien Valverdé
6bc07d5b2a Tests
All checks were successful
Lint / lint (push) Successful in 12s
2025-05-08 05:59:36 +02:00
Julien Valverdé
70e9b9218d Queue -> PubSub
All checks were successful
Lint / lint (push) Successful in 13s
2025-05-08 04:51:41 +02:00
Julien Valverdé
31b07f842b Fix
All checks were successful
Lint / lint (push) Successful in 13s
2025-05-08 04:42:24 +02:00
Julien Valverdé
10f23d4cb4 Fix
All checks were successful
Lint / lint (push) Successful in 13s
2025-05-08 01:28:41 +02:00
Julien Valverdé
39765102db useStreamFromReactiveValues
All checks were successful
Lint / lint (push) Successful in 14s
2025-05-08 01:15:35 +02:00
Julien Valverdé
04e78e1ea3 Tests
All checks were successful
Lint / lint (push) Successful in 13s
2025-05-07 04:53:40 +02:00
Julien Valverdé
606dd2c00f useStreamFromReactiveValues
All checks were successful
Lint / lint (push) Successful in 12s
2025-05-07 02:15:46 +02:00
Julien Valverdé
c13a8d549f useStreamFromReactiveValues
All checks were successful
Lint / lint (push) Successful in 15s
2025-05-07 02:07:50 +02:00
Julien Valverdé
4b9bfd0637 Stream PubSub
All checks were successful
Lint / lint (push) Successful in 14s
2025-05-05 21:52:55 +02:00
4 changed files with 29 additions and 15 deletions

View File

@@ -1,7 +1,7 @@
import { AlertDialog, Button, Flex, Text } from "@radix-ui/themes"
import { Cause, Console, Effect, Either, flow, Match, Option, Stream } from "effect"
import { useState } from "react"
import { AppQueryErrorHandler } from "./query"
import { AppQueryClient } from "./query"
import { R } from "./reffuse"
@@ -9,8 +9,8 @@ export function VQueryErrorHandler() {
const [open, setOpen] = useState(false)
const error = R.useSubscribeStream(
R.useMemo(() => AppQueryErrorHandler.pipe(
Effect.map(handler => handler.errors.pipe(
R.useMemo(() => AppQueryClient.pipe(
Effect.map(client => client.errorHandler.errors.pipe(
Stream.changes,
Stream.tap(Console.error),
Stream.tap(() => Effect.sync(() => setOpen(true))),

View File

@@ -3,12 +3,11 @@ import { Clipboard, Geolocation, Permissions } from "@effect/platform-browser"
import { LazyRefExtension } from "@reffuse/extension-lazyref"
import { QueryExtension } from "@reffuse/extension-query"
import { Reffuse, ReffuseContext } from "reffuse"
import { AppQueryClient, AppQueryErrorHandler } from "./query"
import { AppQueryClient } from "./query"
export const RootContext = ReffuseContext.make<
| AppQueryClient
| AppQueryErrorHandler
| Clipboard.Clipboard
| Geolocation.Geolocation
| Permissions.Permissions

View File

@@ -1,4 +1,4 @@
import { Cause, Context, Effect, identity, Layer, Queue, Stream } from "effect"
import { Cause, Context, Effect, identity, Layer, PubSub, Stream } from "effect"
import type { Mutable } from "effect/Types"
@@ -36,17 +36,17 @@ export const Service = <Self, HandledE = never>() => (
const TagClass = Context.Tag(id)() as ServiceResult<Self, Id, FallbackA, HandledE>
(TagClass as Mutable<typeof TagClass>).Live = Layer.effect(TagClass, Effect.gen(function*() {
const queue = yield* Queue.unbounded<Cause.Cause<HandledE>>()
const errors = Stream.fromQueue(queue)
const pubsub = yield* PubSub.unbounded<Cause.Cause<HandledE>>()
const errors = Stream.fromPubSub(pubsub)
const handle = <A, E, R>(
self: Effect.Effect<A, E, R>
): Effect.Effect<A | FallbackA, Exclude<E, HandledE>, R> => f(
self as unknown as Effect.Effect<never, HandledE, never>,
(failure: HandledE) => Queue.offer(queue, Cause.fail(failure)).pipe(
(failure: HandledE) => PubSub.publish(pubsub, Cause.fail(failure)).pipe(
Effect.andThen(Effect.failCause(Cause.empty))
),
(defect: unknown) => Queue.offer(queue, Cause.die(defect)).pipe(
(defect: unknown) => PubSub.publish(pubsub, Cause.die(defect)).pipe(
Effect.andThen(Effect.failCause(Cause.empty))
),
)

View File

@@ -1,4 +1,4 @@
import { Chunk, type Context, Effect, ExecutionStrategy, Exit, type Fiber, flow, type Layer, Match, Option, pipe, Pipeable, Queue, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect"
import { Chunk, type Context, Effect, ExecutionStrategy, Exit, type Fiber, flow, type Layer, Match, Option, pipe, Pipeable, PubSub, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect"
import * as React from "react"
import * as ReffuseContext from "./ReffuseContext.js"
import * as ReffuseRuntime from "./ReffuseRuntime.js"
@@ -474,11 +474,26 @@ export abstract class ReffuseNamespace<R> {
this: ReffuseNamespace<R>,
values: A,
): Stream.Stream<A> {
const [queue, stream] = this.useMemo(() => Queue.unbounded<A>().pipe(
Effect.map(queue => [queue, Stream.fromQueue(queue)] as const)
), [])
const scope = this.useScope()
const { latest, pubsub, stream } = this.useMemo(() => Effect.Do.pipe(
Effect.bind("latest", () => Ref.make(values)),
Effect.bind("pubsub", () => Effect.acquireRelease(PubSub.unbounded<A>(), PubSub.shutdown)),
Effect.let("stream", ({ latest, pubsub }) => Ref.get(latest).pipe(
Effect.flatMap(a => Effect.map(
Stream.fromPubSub(pubsub, { scoped: true }),
s => Stream.concat(Stream.make(a), s),
)),
Stream.unwrapScoped,
)),
Effect.provideService(Scope.Scope, scope),
), [scope], { doNotReExecuteOnRuntimeOrContextChange: true })
this.useEffect(() => Ref.set(latest, values).pipe(
Effect.andThen(PubSub.publish(pubsub, values)),
Effect.unlessEffect(PubSub.isShutdown(pubsub)),
), values, { doNotReExecuteOnRuntimeOrContextChange: true })
this.useEffect(() => Queue.offer(queue, values), values)
return stream
}