Add PubSub module
All checks were successful
Lint / lint (push) Successful in 40s

This commit is contained in:
Julien Valverdé
2025-11-15 01:19:14 +01:00
parent 3b90384f8f
commit f78b9f318a
3 changed files with 16 additions and 24 deletions

View File

@@ -0,0 +1,14 @@
import { Effect, PubSub, type Scope } from "effect"
import type * as React from "react"
import * as Component from "./Component.js"
export const usePubSubFromReactiveValues = Effect.fnUntraced(function* <const A extends React.DependencyList>(
values: A
): Effect.fn.Return<PubSub.PubSub<A>, never, Scope.Scope> {
const pubsub = yield* Component.useOnMount(() => Effect.acquireRelease(PubSub.unbounded<A>(), PubSub.shutdown))
yield* Component.useReactEffect(() => Effect.unlessEffect(PubSub.publish(pubsub, values), PubSub.isShutdown(pubsub)), values)
return pubsub
})
export * from "effect/PubSub"

View File

@@ -1,4 +1,4 @@
import { Effect, Equivalence, Option, PubSub, Ref, type Scope, Stream } from "effect" import { Effect, Equivalence, Option, Stream } from "effect"
import * as React from "react" import * as React from "react"
import * as Component from "./Component.js" import * as Component from "./Component.js"
@@ -30,27 +30,4 @@ export const useStream: {
return reactStateValue as Option.Some<A> return reactStateValue as Option.Some<A>
}) })
export const useStreamFromReactiveValues = Effect.fnUntraced(function* <const A extends React.DependencyList>(
values: A
): Effect.fn.Return<Stream.Stream<A>, never, Scope.Scope> {
const { latest, pubsub, stream } = yield* Component.useOnMount(() => Effect.Do.pipe(
Effect.bind("latest", () => Ref.make(values)),
Effect.bind("pubsub", () => Effect.acquireRelease(PubSub.unbounded<A>(), PubSub.shutdown)),
Effect.let("stream", ({ latest, pubsub }) => latest.pipe(
Effect.flatMap(a => Effect.map(
Stream.fromPubSub(pubsub, { scoped: true }),
s => Stream.concat(Stream.make(a), s),
)),
Stream.unwrapScoped,
)),
))
yield* Component.useReactEffect(() => Ref.set(latest, values).pipe(
Effect.andThen(PubSub.publish(pubsub, values)),
Effect.unlessEffect(PubSub.isShutdown(pubsub)),
), values)
return stream
})
export * from "effect/Stream" export * from "effect/Stream"

View File

@@ -4,6 +4,7 @@ export * as ErrorObserver from "./ErrorObserver.js"
export * as Form from "./Form.js" export * as Form from "./Form.js"
export * as Memoized from "./Memoized.js" export * as Memoized from "./Memoized.js"
export * as PropertyPath from "./PropertyPath.js" export * as PropertyPath from "./PropertyPath.js"
export * as PubSub from "./PubSub.js"
export * as ReactRuntime from "./ReactRuntime.js" export * as ReactRuntime from "./ReactRuntime.js"
export * as Result from "./Result.js" export * as Result from "./Result.js"
export * as SetStateAction from "./SetStateAction.js" export * as SetStateAction from "./SetStateAction.js"