This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, pipe, Pipeable, Predicate, PubSub, type Scope, Stream, type Subscribable, SynchronizedRef } from "effect"
|
||||
import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, pipe, Pipeable, Predicate, PubSub, Ref, type Scope, Stream, type Subscribable, SynchronizedRef } from "effect"
|
||||
import { Lens } from "effect-lens"
|
||||
|
||||
|
||||
@@ -214,15 +214,24 @@ export const unsafeForkEffect = Effect.fnUntraced(function* <A, E, R, P = never>
|
||||
> {
|
||||
const ref = yield* SynchronizedRef.make<Result<A, E, P>>(options?.initial ?? initial<A, E, P>())
|
||||
const pubsub = yield* PubSub.unbounded<Result<A, E, P>>()
|
||||
|
||||
const state = Lens.make<Result<A, E, P>, never, never, never, never>({
|
||||
get get() { return ref.get },
|
||||
get get() { return Ref.get(ref) },
|
||||
get changes() {
|
||||
return Stream.unwrapScoped(Effect.map(
|
||||
Effect.all([ref.get, Stream.fromPubSub(pubsub, { scoped: true })]),
|
||||
Effect.all([Ref.get(ref), Stream.fromPubSub(pubsub, { scoped: true })]),
|
||||
([latest, stream]) => Stream.concat(Stream.make(latest), stream),
|
||||
))
|
||||
},
|
||||
modify: f => ref.modifyEffect(f),
|
||||
modify: <B, E1, R1>(
|
||||
f: (a: Result<A, E, P>) => Effect.Effect<readonly [B, Result<A, E, P>], E1, R1>
|
||||
): Effect.Effect<B, E1, R1> => Ref.get(ref).pipe(
|
||||
Effect.flatMap(f),
|
||||
Effect.flatMap(([b, a]) => Ref.set(ref, a).pipe(
|
||||
Effect.as(b),
|
||||
Effect.zipLeft(PubSub.publish(pubsub, a))
|
||||
)),
|
||||
),
|
||||
})
|
||||
|
||||
const fiber = yield* Effect.gen(function*() {
|
||||
|
||||
Reference in New Issue
Block a user