|
|
|
|
@@ -1,4 +1,4 @@
|
|
|
|
|
import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, PubSub, pipe, type Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect"
|
|
|
|
|
import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, Option, Pipeable, Predicate, PubSub, pipe, Ref, type Scope, Stream, Subscribable } from "effect"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result")
|
|
|
|
|
@@ -186,78 +186,6 @@ export const makeProgressLayer = <A, E, P = never>(): Layer.Layer<
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
export namespace forkEffectSubscriptionRef {
|
|
|
|
|
export type InputContext<R, P> = R extends Progress<infer X> ? [X] extends [P] ? R : never : R
|
|
|
|
|
export type OutputContext<R> = Scope.Scope | Exclude<R, Progress<any> | Progress<never>>
|
|
|
|
|
|
|
|
|
|
export interface Options<P> {
|
|
|
|
|
readonly initialProgress?: P
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
export const forkEffectSubscriptionRef = <A, E, R, P = never>(
|
|
|
|
|
effect: Effect.Effect<A, E, forkEffectSubscriptionRef.InputContext<R, NoInfer<P>>>,
|
|
|
|
|
options?: forkEffectSubscriptionRef.Options<P>,
|
|
|
|
|
): Effect.Effect<
|
|
|
|
|
Subscribable.Subscribable<Result<A, E, P>>,
|
|
|
|
|
never,
|
|
|
|
|
forkEffectSubscriptionRef.OutputContext<R>
|
|
|
|
|
> => Effect.tap(
|
|
|
|
|
SubscriptionRef.make<Result<A, E, P>>(initial()),
|
|
|
|
|
ref => Effect.forkScoped(State<A, E, P>().pipe(
|
|
|
|
|
Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(
|
|
|
|
|
Effect.andThen(effect),
|
|
|
|
|
Effect.onExit(exit => state.set(fromExit(exit))),
|
|
|
|
|
)),
|
|
|
|
|
Effect.provide(Layer.empty.pipe(
|
|
|
|
|
Layer.provideMerge(makeProgressLayer<A, E, P>()),
|
|
|
|
|
Layer.provideMerge(Layer.succeed(State<A, E, P>(), {
|
|
|
|
|
get: ref,
|
|
|
|
|
set: v => Ref.set(ref, v),
|
|
|
|
|
})),
|
|
|
|
|
)),
|
|
|
|
|
)),
|
|
|
|
|
) as Effect.Effect<Subscribable.Subscribable<Result<A, E, P>>, never, Scope.Scope>
|
|
|
|
|
|
|
|
|
|
export namespace forkEffectPubSub {
|
|
|
|
|
export type InputContext<R, P> = R extends Progress<infer X> ? [X] extends [P] ? R : never : R
|
|
|
|
|
export type OutputContext<R> = Scope.Scope | Exclude<R, Progress<any> | Progress<never>>
|
|
|
|
|
|
|
|
|
|
export interface Options<P> {
|
|
|
|
|
readonly initialProgress?: P
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
export const forkEffectPubSub = <A, E, R, P = never>(
|
|
|
|
|
effect: Effect.Effect<A, E, forkEffectPubSub.InputContext<R, NoInfer<P>>>,
|
|
|
|
|
options?: forkEffectPubSub.Options<P>,
|
|
|
|
|
): Effect.Effect<
|
|
|
|
|
Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>,
|
|
|
|
|
never,
|
|
|
|
|
forkEffectPubSub.OutputContext<R>
|
|
|
|
|
> => Effect.all([
|
|
|
|
|
Ref.make<Result<A, E, P>>(initial()),
|
|
|
|
|
PubSub.unbounded<Result<A, E, P>>(),
|
|
|
|
|
]).pipe(
|
|
|
|
|
Effect.tap(([ref, pubsub]) => Effect.forkScoped(State<A, E, P>().pipe(
|
|
|
|
|
Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(
|
|
|
|
|
Effect.andThen(effect),
|
|
|
|
|
Effect.onExit(exit => Effect.andThen(
|
|
|
|
|
state.set(fromExit(exit)),
|
|
|
|
|
Effect.forkScoped(PubSub.shutdown(pubsub)),
|
|
|
|
|
)),
|
|
|
|
|
)),
|
|
|
|
|
Effect.provide(Layer.empty.pipe(
|
|
|
|
|
Layer.provideMerge(makeProgressLayer<A, E, P>()),
|
|
|
|
|
Layer.provideMerge(Layer.succeed(State<A, E, P>(), {
|
|
|
|
|
get: ref,
|
|
|
|
|
set: v => Effect.andThen(Ref.set(ref, v), PubSub.publish(pubsub, v))
|
|
|
|
|
})),
|
|
|
|
|
)),
|
|
|
|
|
))),
|
|
|
|
|
Effect.map(([, pubsub]) => pubsub.subscribe),
|
|
|
|
|
) as Effect.Effect<Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>, never, Scope.Scope>
|
|
|
|
|
|
|
|
|
|
export namespace forkEffect {
|
|
|
|
|
export type InputContext<R, P> = R extends Progress<infer X> ? [X] extends [P] ? R : never : R
|
|
|
|
|
export type OutputContext<R> = Scope.Scope | Exclude<R, Progress<any> | Progress<never>>
|
|
|
|
|
@@ -270,11 +198,14 @@ export namespace forkEffect {
|
|
|
|
|
export const forkEffect = <A, E, R, P = never>(
|
|
|
|
|
effect: Effect.Effect<A, E, forkEffect.InputContext<R, NoInfer<P>>>,
|
|
|
|
|
options?: forkEffect.Options<P>,
|
|
|
|
|
) => Effect.all([
|
|
|
|
|
Ref.make<Result<A, E, P>>(initial()),
|
|
|
|
|
PubSub.unbounded<Result<A, E, P>>(),
|
|
|
|
|
]).pipe(
|
|
|
|
|
Effect.tap(([ref, pubsub]) => Effect.forkScoped(State<A, E, P>().pipe(
|
|
|
|
|
): Effect.Effect<
|
|
|
|
|
readonly [result: Subscribable.Subscribable<Result<A, E, P>, never, never>, fiber: Fiber.Fiber<A, E>],
|
|
|
|
|
never,
|
|
|
|
|
Scope.Scope | forkEffect.OutputContext<R>
|
|
|
|
|
> => Effect.Do.pipe(
|
|
|
|
|
Effect.bind("ref", () => Ref.make<Result<A, E, P>>(initial())),
|
|
|
|
|
Effect.bind("pubsub", () => PubSub.unbounded<Result<A, E, P>>()),
|
|
|
|
|
Effect.bind("fiber", ({ ref, pubsub }) => Effect.forkScoped(State<A, E, P>().pipe(
|
|
|
|
|
Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(
|
|
|
|
|
Effect.andThen(effect),
|
|
|
|
|
Effect.onExit(exit => Effect.andThen(
|
|
|
|
|
@@ -290,5 +221,18 @@ export const forkEffect = <A, E, R, P = never>(
|
|
|
|
|
})),
|
|
|
|
|
)),
|
|
|
|
|
))),
|
|
|
|
|
Effect.map(([, pubsub]) => pubsub.subscribe),
|
|
|
|
|
) as Effect.Effect<Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>, never, Scope.Scope>
|
|
|
|
|
Effect.map(({ ref, pubsub, fiber }) => [
|
|
|
|
|
Subscribable.make({
|
|
|
|
|
get: ref,
|
|
|
|
|
changes: Stream.unwrapScoped(Effect.map(
|
|
|
|
|
Effect.all([ref, Stream.fromPubSub(pubsub, { scoped: true })]),
|
|
|
|
|
([latest, stream]) => Stream.concat(Stream.make(latest), stream),
|
|
|
|
|
)),
|
|
|
|
|
}),
|
|
|
|
|
fiber,
|
|
|
|
|
]),
|
|
|
|
|
) as Effect.Effect<
|
|
|
|
|
readonly [result: Subscribable.Subscribable<Result<A, E, P>, never, never>, fiber: Fiber.Fiber<A, E>],
|
|
|
|
|
never,
|
|
|
|
|
Scope.Scope | forkEffect.OutputContext<R>
|
|
|
|
|
>
|
|
|
|
|
|