Compare commits
10 Commits
53fc1ef505
...
73dd7bc160
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
73dd7bc160 | ||
|
|
6bc07d5b2a | ||
|
|
70e9b9218d | ||
|
|
31b07f842b | ||
|
|
10f23d4cb4 | ||
|
|
39765102db | ||
|
|
04e78e1ea3 | ||
|
|
606dd2c00f | ||
|
|
c13a8d549f | ||
|
|
4b9bfd0637 |
@@ -1,7 +1,7 @@
|
||||
import { AlertDialog, Button, Flex, Text } from "@radix-ui/themes"
|
||||
import { Cause, Console, Effect, Either, flow, Match, Option, Stream } from "effect"
|
||||
import { useState } from "react"
|
||||
import { AppQueryErrorHandler } from "./query"
|
||||
import { AppQueryClient } from "./query"
|
||||
import { R } from "./reffuse"
|
||||
|
||||
|
||||
@@ -9,8 +9,8 @@ export function VQueryErrorHandler() {
|
||||
const [open, setOpen] = useState(false)
|
||||
|
||||
const error = R.useSubscribeStream(
|
||||
R.useMemo(() => AppQueryErrorHandler.pipe(
|
||||
Effect.map(handler => handler.errors.pipe(
|
||||
R.useMemo(() => AppQueryClient.pipe(
|
||||
Effect.map(client => client.errorHandler.errors.pipe(
|
||||
Stream.changes,
|
||||
Stream.tap(Console.error),
|
||||
Stream.tap(() => Effect.sync(() => setOpen(true))),
|
||||
|
||||
@@ -3,12 +3,11 @@ import { Clipboard, Geolocation, Permissions } from "@effect/platform-browser"
|
||||
import { LazyRefExtension } from "@reffuse/extension-lazyref"
|
||||
import { QueryExtension } from "@reffuse/extension-query"
|
||||
import { Reffuse, ReffuseContext } from "reffuse"
|
||||
import { AppQueryClient, AppQueryErrorHandler } from "./query"
|
||||
import { AppQueryClient } from "./query"
|
||||
|
||||
|
||||
export const RootContext = ReffuseContext.make<
|
||||
| AppQueryClient
|
||||
| AppQueryErrorHandler
|
||||
| Clipboard.Clipboard
|
||||
| Geolocation.Geolocation
|
||||
| Permissions.Permissions
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Cause, Context, Effect, identity, Layer, Queue, Stream } from "effect"
|
||||
import { Cause, Context, Effect, identity, Layer, PubSub, Stream } from "effect"
|
||||
import type { Mutable } from "effect/Types"
|
||||
|
||||
|
||||
@@ -36,17 +36,17 @@ export const Service = <Self, HandledE = never>() => (
|
||||
const TagClass = Context.Tag(id)() as ServiceResult<Self, Id, FallbackA, HandledE>
|
||||
|
||||
(TagClass as Mutable<typeof TagClass>).Live = Layer.effect(TagClass, Effect.gen(function*() {
|
||||
const queue = yield* Queue.unbounded<Cause.Cause<HandledE>>()
|
||||
const errors = Stream.fromQueue(queue)
|
||||
const pubsub = yield* PubSub.unbounded<Cause.Cause<HandledE>>()
|
||||
const errors = Stream.fromPubSub(pubsub)
|
||||
|
||||
const handle = <A, E, R>(
|
||||
self: Effect.Effect<A, E, R>
|
||||
): Effect.Effect<A | FallbackA, Exclude<E, HandledE>, R> => f(
|
||||
self as unknown as Effect.Effect<never, HandledE, never>,
|
||||
(failure: HandledE) => Queue.offer(queue, Cause.fail(failure)).pipe(
|
||||
(failure: HandledE) => PubSub.publish(pubsub, Cause.fail(failure)).pipe(
|
||||
Effect.andThen(Effect.failCause(Cause.empty))
|
||||
),
|
||||
(defect: unknown) => Queue.offer(queue, Cause.die(defect)).pipe(
|
||||
(defect: unknown) => PubSub.publish(pubsub, Cause.die(defect)).pipe(
|
||||
Effect.andThen(Effect.failCause(Cause.empty))
|
||||
),
|
||||
)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Chunk, type Context, Effect, ExecutionStrategy, Exit, type Fiber, flow, type Layer, Match, Option, pipe, Pipeable, Queue, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect"
|
||||
import { Chunk, type Context, Effect, ExecutionStrategy, Exit, type Fiber, flow, type Layer, Match, Option, pipe, Pipeable, PubSub, Ref, Runtime, Scope, Stream, SubscriptionRef } from "effect"
|
||||
import * as React from "react"
|
||||
import * as ReffuseContext from "./ReffuseContext.js"
|
||||
import * as ReffuseRuntime from "./ReffuseRuntime.js"
|
||||
@@ -474,11 +474,26 @@ export abstract class ReffuseNamespace<R> {
|
||||
this: ReffuseNamespace<R>,
|
||||
values: A,
|
||||
): Stream.Stream<A> {
|
||||
const [queue, stream] = this.useMemo(() => Queue.unbounded<A>().pipe(
|
||||
Effect.map(queue => [queue, Stream.fromQueue(queue)] as const)
|
||||
), [])
|
||||
const scope = this.useScope()
|
||||
|
||||
const { latest, pubsub, stream } = this.useMemo(() => Effect.Do.pipe(
|
||||
Effect.bind("latest", () => Ref.make(values)),
|
||||
Effect.bind("pubsub", () => Effect.acquireRelease(PubSub.unbounded<A>(), PubSub.shutdown)),
|
||||
Effect.let("stream", ({ latest, pubsub }) => Ref.get(latest).pipe(
|
||||
Effect.flatMap(a => Effect.map(
|
||||
Stream.fromPubSub(pubsub, { scoped: true }),
|
||||
s => Stream.concat(Stream.make(a), s),
|
||||
)),
|
||||
Stream.unwrapScoped,
|
||||
)),
|
||||
Effect.provideService(Scope.Scope, scope),
|
||||
), [scope], { doNotReExecuteOnRuntimeOrContextChange: true })
|
||||
|
||||
this.useEffect(() => Ref.set(latest, values).pipe(
|
||||
Effect.andThen(PubSub.publish(pubsub, values)),
|
||||
Effect.unlessEffect(PubSub.isShutdown(pubsub)),
|
||||
), values, { doNotReExecuteOnRuntimeOrContextChange: true })
|
||||
|
||||
this.useEffect(() => Queue.offer(queue, values), values)
|
||||
return stream
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user