From f78b9f318a5f3fc9d400cb0f134fae81eaa9046c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Sat, 15 Nov 2025 01:19:14 +0100 Subject: [PATCH] Add PubSub module --- packages/effect-fc/src/PubSub.ts | 14 ++++++++++++++ packages/effect-fc/src/Stream.ts | 25 +------------------------ packages/effect-fc/src/index.ts | 1 + 3 files changed, 16 insertions(+), 24 deletions(-) create mode 100644 packages/effect-fc/src/PubSub.ts diff --git a/packages/effect-fc/src/PubSub.ts b/packages/effect-fc/src/PubSub.ts new file mode 100644 index 0000000..36eccbe --- /dev/null +++ b/packages/effect-fc/src/PubSub.ts @@ -0,0 +1,14 @@ +import { Effect, PubSub, type Scope } from "effect" +import type * as React from "react" +import * as Component from "./Component.js" + + +export const usePubSubFromReactiveValues = Effect.fnUntraced(function* ( + values: A +): Effect.fn.Return, never, Scope.Scope> { + const pubsub = yield* Component.useOnMount(() => Effect.acquireRelease(PubSub.unbounded(), PubSub.shutdown)) + yield* Component.useReactEffect(() => Effect.unlessEffect(PubSub.publish(pubsub, values), PubSub.isShutdown(pubsub)), values) + return pubsub +}) + +export * from "effect/PubSub" diff --git a/packages/effect-fc/src/Stream.ts b/packages/effect-fc/src/Stream.ts index 71cba72..4eab704 100644 --- a/packages/effect-fc/src/Stream.ts +++ b/packages/effect-fc/src/Stream.ts @@ -1,4 +1,4 @@ -import { Effect, Equivalence, Option, PubSub, Ref, type Scope, Stream } from "effect" +import { Effect, Equivalence, Option, Stream } from "effect" import * as React from "react" import * as Component from "./Component.js" @@ -30,27 +30,4 @@ export const useStream: { return reactStateValue as Option.Some }) -export const useStreamFromReactiveValues = Effect.fnUntraced(function* ( - values: A -): Effect.fn.Return, never, Scope.Scope> { - const { latest, pubsub, stream } = yield* Component.useOnMount(() => Effect.Do.pipe( - Effect.bind("latest", () => Ref.make(values)), - Effect.bind("pubsub", () => Effect.acquireRelease(PubSub.unbounded(), PubSub.shutdown)), - Effect.let("stream", ({ latest, pubsub }) => latest.pipe( - Effect.flatMap(a => Effect.map( - Stream.fromPubSub(pubsub, { scoped: true }), - s => Stream.concat(Stream.make(a), s), - )), - Stream.unwrapScoped, - )), - )) - - yield* Component.useReactEffect(() => Ref.set(latest, values).pipe( - Effect.andThen(PubSub.publish(pubsub, values)), - Effect.unlessEffect(PubSub.isShutdown(pubsub)), - ), values) - - return stream -}) - export * from "effect/Stream" diff --git a/packages/effect-fc/src/index.ts b/packages/effect-fc/src/index.ts index 0b07c23..08e0b73 100644 --- a/packages/effect-fc/src/index.ts +++ b/packages/effect-fc/src/index.ts @@ -4,6 +4,7 @@ export * as ErrorObserver from "./ErrorObserver.js" export * as Form from "./Form.js" export * as Memoized from "./Memoized.js" export * as PropertyPath from "./PropertyPath.js" +export * as PubSub from "./PubSub.js" export * as ReactRuntime from "./ReactRuntime.js" export * as Result from "./Result.js" export * as SetStateAction from "./SetStateAction.js"