From 49d4bd4d433115117fb3ea0dac2d83f6bbc4ac61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Sun, 20 Apr 2025 00:22:24 +0200 Subject: [PATCH] SubscriptionSubRef work --- packages/reffuse/src/SubscriptionSubRef.ts | 76 +++++++++++++++++----- 1 file changed, 61 insertions(+), 15 deletions(-) diff --git a/packages/reffuse/src/SubscriptionSubRef.ts b/packages/reffuse/src/SubscriptionSubRef.ts index 7c671fa..dbc7f01 100644 --- a/packages/reffuse/src/SubscriptionSubRef.ts +++ b/packages/reffuse/src/SubscriptionSubRef.ts @@ -1,18 +1,64 @@ -import type { Stream, SubscriptionRef, SynchronizedRef, Unify } from "effect" +import { Effect, Effectable, PubSub, Readable, Ref, Stream, Subscribable, SubscriptionRef, SynchronizedRef } from "effect" -export interface SubscriptionSubRef extends SynchronizedRef.SynchronizedRef { - readonly parent: Ref.Ref - /** @internal */ - readonly pubsub: PubSub.PubSub - /** @internal */ - readonly semaphore: Effect.Semaphore - /** - * A stream containing the current value of the `Ref` as well as all changes - * to that value. - */ - readonly changes: Stream.Stream - readonly [Unify.typeSymbol]?: unknown - readonly [Unify.unifySymbol]?: SubscriptionRefUnify - readonly [Unify.ignoreSymbol]?: SubscriptionRefUnifyIgnore +export interface SubscriptionSubRef extends SubscriptionRef.SubscriptionRef { + /** + * A stream containing the current value of the `Ref` as well as all changes + * to that value. + */ + readonly changes: Stream.Stream +} + +const synchronizedRefVariance = { _A: (_: any) => _ } +const subscriptionRefVariance = { _A: (_: any) => _ } + +class SubscriptionSubRefImpl extends Effectable.Class implements SubscriptionSubRef { + readonly [Readable.TypeId]: Readable.TypeId = Readable.TypeId + readonly [Subscribable.TypeId]: Subscribable.TypeId = Subscribable.TypeId + readonly [Ref.RefTypeId]: Ref.Ref.Variance[Ref.RefTypeId] = { _A: (_: any) => _ } + readonly [SynchronizedRef.SynchronizedRefTypeId]: SynchronizedRef.SynchronizedRef.Variance[SynchronizedRef.SynchronizedRefTypeId] = synchronizedRefVariance + readonly [SubscriptionRef.SubscriptionRefTypeId]: SubscriptionRef.SubscriptionRef.Variance[SubscriptionRef.SubscriptionRefTypeId] = subscriptionRefVariance + + constructor( + readonly ref: Ref.Ref, + readonly pubsub: PubSub.PubSub, + readonly semaphore: Effect.Semaphore, + ) { + super() + this.get = Ref.get(this.ref) + } + + commit() { + return this.get + } + + readonly get: Effect.Effect + + get changes(): Stream.Stream { + return Ref.get(this.ref).pipe( + Effect.flatMap(a => Effect.map( + Stream.fromPubSub(this.pubsub, { scoped: true }), + s => Stream.concat(Stream.make(a), s), + )), + + this.semaphore.withPermits(1), + Stream.unwrapScoped, + ) + } + + modify(f: (a: A) => readonly [B, A]): Effect.Effect { + return this.modifyEffect((a) => Effect.succeed(f(a))) + } + + modifyEffect(f: (a: A) => Effect.Effect): Effect.Effect { + return Ref.get(this.ref).pipe( + Effect.flatMap(f), + Effect.flatMap(([b, a]) => Ref.set(this.ref, a).pipe( + Effect.as(b), + Effect.zipLeft(PubSub.publish(this.pubsub, a)) + )), + + this.semaphore.withPermits(1) + ) + } }