0.2.1 #26
@@ -1,4 +1,4 @@
|
|||||||
import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, PubSub, pipe, Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect"
|
import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, PubSub, pipe, type Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect"
|
||||||
|
|
||||||
|
|
||||||
export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result")
|
export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result")
|
||||||
@@ -226,12 +226,12 @@ export namespace forkEffectPubSub {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export const forkEffectPubSub = <A, E, R, P = never>(
|
export const forkEffectPubSub = <A, E, R, P = never>(
|
||||||
effect: Effect.Effect<A, E, forkEffectDequeue.InputContext<R, NoInfer<P>>>,
|
effect: Effect.Effect<A, E, forkEffectPubSub.InputContext<R, NoInfer<P>>>,
|
||||||
options?: forkEffectDequeue.Options<P>,
|
options?: forkEffectPubSub.Options<P>,
|
||||||
): Effect.Effect<
|
): Effect.Effect<
|
||||||
Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>,
|
Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>,
|
||||||
never,
|
never,
|
||||||
forkEffectDequeue.OutputContext<R>
|
forkEffectPubSub.OutputContext<R>
|
||||||
> => Effect.all([
|
> => Effect.all([
|
||||||
Ref.make<Result<A, E, P>>(initial()),
|
Ref.make<Result<A, E, P>>(initial()),
|
||||||
PubSub.unbounded<Result<A, E, P>>(),
|
PubSub.unbounded<Result<A, E, P>>(),
|
||||||
@@ -254,39 +254,3 @@ export const forkEffectPubSub = <A, E, R, P = never>(
|
|||||||
))),
|
))),
|
||||||
Effect.map(([, pubsub]) => pubsub.subscribe),
|
Effect.map(([, pubsub]) => pubsub.subscribe),
|
||||||
) as Effect.Effect<Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>, never, Scope.Scope>
|
) as Effect.Effect<Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>, never, Scope.Scope>
|
||||||
|
|
||||||
export namespace forkEffectDequeue {
|
|
||||||
export type InputContext<R, P> = forkEffect.InputContext<R, P>
|
|
||||||
export type OutputContext<R> = forkEffect.OutputContext<R>
|
|
||||||
export interface Options<P> extends forkEffect.Options<P> {}
|
|
||||||
}
|
|
||||||
|
|
||||||
export const forkEffectDequeue = <A, E, R, P = never>(
|
|
||||||
effect: Effect.Effect<A, E, forkEffectDequeue.InputContext<R, NoInfer<P>>>,
|
|
||||||
options?: forkEffectDequeue.Options<P>,
|
|
||||||
): Effect.Effect<
|
|
||||||
Queue.Dequeue<Result<A, E, P>>,
|
|
||||||
never,
|
|
||||||
forkEffectDequeue.OutputContext<R>
|
|
||||||
> => Effect.all([
|
|
||||||
Ref.make<Result<A, E, P>>(initial()),
|
|
||||||
Queue.unbounded<Result<A, E, P>>(),
|
|
||||||
]).pipe(
|
|
||||||
Effect.tap(([ref, queue]) => Effect.forkScoped(State<A, E, P>().pipe(
|
|
||||||
Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(
|
|
||||||
Effect.andThen(effect),
|
|
||||||
Effect.onExit(exit => Effect.andThen(
|
|
||||||
state.set(fromExit(exit)),
|
|
||||||
Effect.forkScoped(Queue.shutdown(queue)),
|
|
||||||
)),
|
|
||||||
)),
|
|
||||||
Effect.provide(Layer.empty.pipe(
|
|
||||||
Layer.provideMerge(makeProgressLayer<A, E, P>()),
|
|
||||||
Layer.provideMerge(Layer.succeed(State<A, E, P>(), {
|
|
||||||
get: ref,
|
|
||||||
set: v => Effect.andThen(Ref.set(ref, v), Queue.offer(queue, v))
|
|
||||||
})),
|
|
||||||
)),
|
|
||||||
))),
|
|
||||||
Effect.map(([, queue]) => queue),
|
|
||||||
) as Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>
|
|
||||||
|
|||||||
Reference in New Issue
Block a user