0.1.8 #11
@@ -1,7 +1,9 @@
|
|||||||
import { Effect, Effectable, PubSub, Readable, Ref, Stream, Subscribable, SubscriptionRef, SynchronizedRef } from "effect"
|
import { Effect, Effectable, Readable, Ref, Stream, Subscribable, SubscriptionRef, SynchronizedRef } from "effect"
|
||||||
|
|
||||||
|
|
||||||
export interface SubscriptionSubRef<in out A> extends SubscriptionRef.SubscriptionRef<A> {
|
export interface SubscriptionSubRef<in out A, in out B> extends SubscriptionRef.SubscriptionRef<A> {
|
||||||
|
readonly ref: SubscriptionRef.SubscriptionRef<B>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A stream containing the current value of the `Ref` as well as all changes
|
* A stream containing the current value of the `Ref` as well as all changes
|
||||||
* to that value.
|
* to that value.
|
||||||
@@ -9,56 +11,59 @@ export interface SubscriptionSubRef<in out A> extends SubscriptionRef.Subscripti
|
|||||||
readonly changes: Stream.Stream<A>
|
readonly changes: Stream.Stream<A>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
const synchronizedRefVariance = { _A: (_: any) => _ }
|
const synchronizedRefVariance = { _A: (_: any) => _ }
|
||||||
const subscriptionRefVariance = { _A: (_: any) => _ }
|
const subscriptionRefVariance = { _A: (_: any) => _ }
|
||||||
|
|
||||||
class SubscriptionSubRefImpl<in out A> extends Effectable.Class<A> implements SubscriptionSubRef<A> {
|
class SubscriptionSubRefImpl<in out A, in out B> extends Effectable.Class<A> implements SubscriptionSubRef<A, B> {
|
||||||
readonly [Readable.TypeId]: Readable.TypeId = Readable.TypeId
|
readonly [Readable.TypeId]: Readable.TypeId = Readable.TypeId
|
||||||
readonly [Subscribable.TypeId]: Subscribable.TypeId = Subscribable.TypeId
|
readonly [Subscribable.TypeId]: Subscribable.TypeId = Subscribable.TypeId
|
||||||
readonly [Ref.RefTypeId]: Ref.Ref.Variance<A>[Ref.RefTypeId] = { _A: (_: any) => _ }
|
readonly [Ref.RefTypeId]: Ref.Ref.Variance<A>[Ref.RefTypeId] = { _A: (_: any) => _ }
|
||||||
readonly [SynchronizedRef.SynchronizedRefTypeId]: SynchronizedRef.SynchronizedRef.Variance<A>[SynchronizedRef.SynchronizedRefTypeId] = synchronizedRefVariance
|
readonly [SynchronizedRef.SynchronizedRefTypeId]: SynchronizedRef.SynchronizedRef.Variance<A>[SynchronizedRef.SynchronizedRefTypeId] = synchronizedRefVariance
|
||||||
readonly [SubscriptionRef.SubscriptionRefTypeId]: SubscriptionRef.SubscriptionRef.Variance<A>[SubscriptionRef.SubscriptionRefTypeId] = subscriptionRefVariance
|
readonly [SubscriptionRef.SubscriptionRefTypeId]: SubscriptionRef.SubscriptionRef.Variance<A>[SubscriptionRef.SubscriptionRefTypeId] = subscriptionRefVariance
|
||||||
|
|
||||||
|
readonly get: Effect.Effect<A>
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
readonly ref: Ref.Ref<A>,
|
readonly ref: SubscriptionRef.SubscriptionRef<B>,
|
||||||
readonly pubsub: PubSub.PubSub<A>,
|
readonly select: (value: B) => A,
|
||||||
readonly semaphore: Effect.Semaphore,
|
readonly setter: (value: A) => B,
|
||||||
) {
|
) {
|
||||||
super()
|
super()
|
||||||
this.get = Ref.get(this.ref)
|
this.get = Ref.get(this.ref).pipe(Effect.map(this.select))
|
||||||
}
|
}
|
||||||
|
|
||||||
commit() {
|
commit() {
|
||||||
return this.get
|
return this.get
|
||||||
}
|
}
|
||||||
|
|
||||||
readonly get: Effect.Effect<A>
|
|
||||||
|
|
||||||
get changes(): Stream.Stream<A> {
|
get changes(): Stream.Stream<A> {
|
||||||
return Ref.get(this.ref).pipe(
|
return this.get.pipe(
|
||||||
Effect.flatMap(a => Effect.map(
|
Effect.map(a => this.ref.changes.pipe(
|
||||||
Stream.fromPubSub(this.pubsub, { scoped: true }),
|
Stream.map(this.select),
|
||||||
s => Stream.concat(Stream.make(a), s),
|
s => Stream.concat(Stream.make(a), s),
|
||||||
)),
|
)),
|
||||||
|
Stream.unwrap,
|
||||||
this.semaphore.withPermits(1),
|
|
||||||
Stream.unwrapScoped,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
modify<B>(f: (a: A) => readonly [B, A]): Effect.Effect<B> {
|
modify<C>(f: (a: A) => readonly [C, A]): Effect.Effect<C> {
|
||||||
return this.modifyEffect((a) => Effect.succeed(f(a)))
|
return this.modifyEffect(a => Effect.succeed(f(a)))
|
||||||
}
|
}
|
||||||
|
|
||||||
modifyEffect<B, E, R>(f: (a: A) => Effect.Effect<readonly [B, A], E, R>): Effect.Effect<B, E, R> {
|
modifyEffect<C, E, R>(f: (a: A) => Effect.Effect<readonly [C, A], E, R>): Effect.Effect<C, E, R> {
|
||||||
return Ref.get(this.ref).pipe(
|
return this.get.pipe(
|
||||||
Effect.flatMap(f),
|
Effect.flatMap(f),
|
||||||
Effect.flatMap(([b, a]) => Ref.set(this.ref, a).pipe(
|
Effect.flatMap(([b, a]) => Ref.set(this.ref, this.setter(a)).pipe(
|
||||||
Effect.as(b),
|
Effect.as(b)
|
||||||
Effect.zipLeft(PubSub.publish(this.pubsub, a))
|
|
||||||
)),
|
)),
|
||||||
|
|
||||||
this.semaphore.withPermits(1)
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
export const make = <A, B>(
|
||||||
|
ref: SubscriptionRef.SubscriptionRef<B>,
|
||||||
|
select: (value: B) => A,
|
||||||
|
setter: (value: A) => B,
|
||||||
|
): SubscriptionSubRef<A, B> => new SubscriptionSubRefImpl(ref, select, setter)
|
||||||
|
|||||||
@@ -4,3 +4,4 @@ export * as ReffuseExtension from "./ReffuseExtension.js"
|
|||||||
export * as ReffuseNamespace from "./ReffuseNamespace.js"
|
export * as ReffuseNamespace from "./ReffuseNamespace.js"
|
||||||
export * as ReffuseRuntime from "./ReffuseRuntime.js"
|
export * as ReffuseRuntime from "./ReffuseRuntime.js"
|
||||||
export * as SetStateAction from "./SetStateAction.js"
|
export * as SetStateAction from "./SetStateAction.js"
|
||||||
|
export * as SubscriptionSubRef from "./SubscriptionSubRef.js"
|
||||||
|
|||||||
Reference in New Issue
Block a user