@@ -1,4 +1,4 @@
|
|||||||
import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, pipe, Queue, Ref, Scope } from "effect"
|
import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, pipe, Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect"
|
||||||
|
|
||||||
|
|
||||||
export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result")
|
export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result")
|
||||||
@@ -142,38 +142,47 @@ export const toExit = <A, E, P>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
export interface State<A, E = never, P = never> {
|
||||||
|
readonly get: Effect.Effect<Result<A, E, P>>
|
||||||
|
readonly set: (v: Result<A, E, P>) => Effect.Effect<void>
|
||||||
|
}
|
||||||
|
|
||||||
|
export const State = <A, E = never, P = never>(): Context.Tag<State<A, E, P>, State<A, E, P>> => Context.GenericTag("@effect-fc/Result/State")
|
||||||
|
|
||||||
export interface Progress<P = never> {
|
export interface Progress<P = never> {
|
||||||
readonly update: <E, R>(
|
readonly update: <E, R>(
|
||||||
f: (previous: P) => Effect.Effect<P, E, R>
|
f: (previous: P) => Effect.Effect<P, E, R>
|
||||||
) => Effect.Effect<void, PreviousResultNotRunningOrRefreshing | E, R>
|
) => Effect.Effect<void, PreviousResultNotRunningNorRefreshing | E, R>
|
||||||
}
|
}
|
||||||
|
|
||||||
export class PreviousResultNotRunningOrRefreshing extends Data.TaggedError("@effect-fc/Result/PreviousResultNotRunningOrRefreshing")<{
|
export class PreviousResultNotRunningNorRefreshing extends Data.TaggedError("@effect-fc/Result/PreviousResultNotRunningNorRefreshing")<{
|
||||||
readonly previous: Result<unknown, unknown, unknown>
|
readonly previous: Result<unknown, unknown, unknown>
|
||||||
}> {}
|
}> {}
|
||||||
|
|
||||||
export const Progress = <P = never>(): Context.Tag<Progress<P>, Progress<P>> => Context.GenericTag("@effect-fc/Result/Progress")
|
export const Progress = <P = never>(): Context.Tag<Progress<P>, Progress<P>> => Context.GenericTag("@effect-fc/Result/Progress")
|
||||||
|
|
||||||
export const makeProgressLayer = <A, E, P = never>(
|
export const makeProgressLayer = <A, E, P = never>(): Layer.Layer<
|
||||||
queue: Queue.Enqueue<Result<A, E, P>>,
|
Progress<P>,
|
||||||
ref: Ref.Ref<Result<A, E, P>>,
|
never,
|
||||||
): Layer.Layer<Progress<P>> => Layer.sync(Progress<P>(), () => ({
|
State<A, E, P>
|
||||||
update: <E, R>(f: (previous: P) => Effect.Effect<P, E, R>) => Effect.Do.pipe(
|
> => Layer.effect(Progress<P>(), Effect.gen(function*() {
|
||||||
Effect.bind("previous", () => Effect.andThen(
|
const state = yield* State<A, E, P>()
|
||||||
ref,
|
|
||||||
previous => isRunning(previous) || isRefreshing(previous)
|
return {
|
||||||
? Effect.succeed(previous)
|
update: <E, R>(f: (previous: P) => Effect.Effect<P, E, R>) => Effect.Do.pipe(
|
||||||
: Effect.fail(new PreviousResultNotRunningOrRefreshing({ previous })),
|
Effect.bind("previous", () => Effect.andThen(state.get, previous =>
|
||||||
)),
|
isRunning(previous) || isRefreshing(previous)
|
||||||
Effect.bind("progress", ({ previous }) => f(previous.progress)),
|
? Effect.succeed(previous)
|
||||||
Effect.let("next", ({ previous, progress }) => Object.setPrototypeOf(
|
: Effect.fail(new PreviousResultNotRunningNorRefreshing({ previous })),
|
||||||
Object.assign({}, previous, { progress }),
|
)),
|
||||||
Object.getPrototypeOf(previous),
|
Effect.bind("progress", ({ previous }) => f(previous.progress)),
|
||||||
)),
|
Effect.let("next", ({ previous, progress }) => Object.setPrototypeOf(
|
||||||
Effect.tap(({ next }) => Ref.set(ref, next)),
|
Object.assign({}, previous, { progress }),
|
||||||
Effect.tap(({ next }) => Queue.offer(queue, next)),
|
Object.getPrototypeOf(previous),
|
||||||
Effect.asVoid,
|
)),
|
||||||
),
|
Effect.andThen(({ next }) => state.set(next)),
|
||||||
|
),
|
||||||
|
}
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
|
||||||
@@ -190,75 +199,55 @@ export const forkEffect = <A, E, R, P = never>(
|
|||||||
effect: Effect.Effect<A, E, forkEffect.InputContext<R, NoInfer<P>>>,
|
effect: Effect.Effect<A, E, forkEffect.InputContext<R, NoInfer<P>>>,
|
||||||
options?: forkEffect.Options<P>,
|
options?: forkEffect.Options<P>,
|
||||||
): Effect.Effect<
|
): Effect.Effect<
|
||||||
Queue.Dequeue<Result<A, E, P>>,
|
Subscribable.Subscribable<Result<A, E, P>>,
|
||||||
never,
|
never,
|
||||||
forkEffect.OutputContext<R>
|
forkEffect.OutputContext<R>
|
||||||
> => Effect.Do.pipe(
|
> => Effect.tap(
|
||||||
Effect.bind("scope", () => Scope.Scope),
|
SubscriptionRef.make<Result<A, E, P>>(initial()),
|
||||||
Effect.bind("queue", () => Queue.unbounded<Result<A, E, P>>()),
|
ref => Effect.forkScoped(State<A, E, P>().pipe(
|
||||||
Effect.bind("ref", () => Ref.make<Result<A, E, P>>(initial())),
|
Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(
|
||||||
Effect.tap(({ queue, ref }) => Effect.andThen(ref, v => Queue.offer(queue, v))),
|
Effect.andThen(effect),
|
||||||
Effect.tap(({ scope, queue, ref }) => Effect.forkScoped(
|
Effect.onExit(exit => state.set(fromExit(exit))),
|
||||||
Effect.addFinalizer(() => Queue.shutdown(queue)).pipe(
|
)),
|
||||||
Effect.andThen(Effect.succeed(running(options?.initialProgress)).pipe(
|
Effect.provide(Layer.empty.pipe(
|
||||||
Effect.tap(v => Ref.set(ref, v)),
|
Layer.provideMerge(makeProgressLayer<A, E, P>()),
|
||||||
Effect.tap(v => Queue.offer(queue, v)),
|
Layer.provideMerge(Layer.succeed(State<A, E, P>(), {
|
||||||
)),
|
get: ref,
|
||||||
Effect.andThen(Effect.provideService(effect, Scope.Scope, scope)),
|
set: v => Ref.set(ref, v),
|
||||||
Effect.exit,
|
})),
|
||||||
Effect.andThen(exit => Effect.succeed(fromExit(exit)).pipe(
|
)),
|
||||||
Effect.tap(v => Ref.set(ref, v)),
|
|
||||||
Effect.tap(v => Queue.offer(queue, v)),
|
|
||||||
)),
|
|
||||||
Effect.scoped,
|
|
||||||
Effect.provide(makeProgressLayer(queue, ref)),
|
|
||||||
)
|
|
||||||
)),
|
)),
|
||||||
Effect.map(({ queue }) => queue),
|
) as Effect.Effect<Subscribable.Subscribable<Result<A, E, P>>, never, Scope.Scope>
|
||||||
) as Effect.Effect<Queue.Queue<Result<A, E, P>>, never, Scope.Scope>
|
|
||||||
|
|
||||||
export namespace forkEffectScoped {
|
export namespace forkEffectDequeue {
|
||||||
export type InputContext<R, P> = (R extends Progress<infer X>
|
export type InputContext<R, P> = forkEffect.InputContext<R, P>
|
||||||
? [X] extends [P]
|
export type OutputContext<R> = forkEffect.OutputContext<R>
|
||||||
? R
|
export interface Options<P> extends forkEffect.Options<P> {}
|
||||||
: never
|
|
||||||
: R
|
|
||||||
)
|
|
||||||
|
|
||||||
export interface Options<P> {
|
|
||||||
readonly initialProgress?: P
|
|
||||||
}
|
|
||||||
|
|
||||||
export type OutputContext<R> = Scope.Scope | Exclude<R, Progress<any> | Progress<never>>
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export const forkEffectScoped = <A, E, R, P = never>(
|
export const forkEffectDequeue = <A, E, R, P = never>(
|
||||||
effect: Effect.Effect<A, E, forkEffectScoped.InputContext<R, NoInfer<P>>>,
|
effect: Effect.Effect<A, E, forkEffectDequeue.InputContext<R, NoInfer<P>>>,
|
||||||
options?: forkEffectScoped.Options<P>,
|
options?: forkEffectDequeue.Options<P>,
|
||||||
): Effect.Effect<
|
): Effect.Effect<
|
||||||
Queue.Dequeue<Result<A, E, P>>,
|
Queue.Dequeue<Result<A, E, P>>,
|
||||||
never,
|
never,
|
||||||
forkEffectScoped.OutputContext<R>
|
forkEffectDequeue.OutputContext<R>
|
||||||
> => Effect.Do.pipe(
|
> => Effect.all([
|
||||||
Effect.bind("scope", () => Scope.Scope),
|
Ref.make<Result<A, E, P>>(initial()),
|
||||||
Effect.bind("queue", () => Queue.unbounded<Result<A, E, P>>()),
|
Queue.unbounded<Result<A, E, P>>(),
|
||||||
Effect.bind("ref", () => Ref.make<Result<A, E, P>>(initial())),
|
]).pipe(
|
||||||
Effect.tap(({ queue, ref }) => Effect.andThen(ref, v => Queue.offer(queue, v))),
|
Effect.tap(([ref, queue]) => Effect.forkScoped(State<A, E, P>().pipe(
|
||||||
Effect.tap(({ scope, queue, ref }) => Effect.forkScoped(
|
Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(
|
||||||
Effect.addFinalizer(() => Queue.shutdown(queue)).pipe(
|
Effect.andThen(effect),
|
||||||
Effect.andThen(Effect.succeed(running(options?.initialProgress)).pipe(
|
Effect.onExit(exit => Effect.andThen(state.set(fromExit(exit)), Queue.shutdown(queue))),
|
||||||
Effect.tap(v => Ref.set(ref, v)),
|
)),
|
||||||
Effect.tap(v => Queue.offer(queue, v)),
|
Effect.provide(Layer.empty.pipe(
|
||||||
)),
|
Layer.provideMerge(makeProgressLayer<A, E, P>()),
|
||||||
Effect.andThen(Effect.provideService(effect, Scope.Scope, scope)),
|
Layer.provideMerge(Layer.succeed(State<A, E, P>(), {
|
||||||
Effect.exit,
|
get: ref,
|
||||||
Effect.andThen(exit => Effect.succeed(fromExit(exit)).pipe(
|
set: v => Effect.andThen(Ref.set(ref, v), Queue.offer(queue, v))
|
||||||
Effect.tap(v => Ref.set(ref, v)),
|
})),
|
||||||
Effect.tap(v => Queue.offer(queue, v)),
|
)),
|
||||||
)),
|
))),
|
||||||
Effect.scoped,
|
Effect.map(([, queue]) => queue),
|
||||||
Effect.provide(makeProgressLayer(queue, ref)),
|
) as Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>
|
||||||
)
|
|
||||||
)),
|
|
||||||
Effect.map(({ queue }) => queue),
|
|
||||||
) as Effect.Effect<Queue.Queue<Result<A, E, P>>, never, Scope.Scope>
|
|
||||||
|
|||||||
Reference in New Issue
Block a user