This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, pipe, Pipeable, Predicate, PubSub, Ref, type Scope, Stream, Subscribable } from "effect"
|
||||
import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, pipe, Pipeable, Predicate, PubSub, type Scope, Stream, type Subscribable, SynchronizedRef } from "effect"
|
||||
import { Lens } from "effect-lens"
|
||||
|
||||
|
||||
export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result")
|
||||
@@ -162,52 +163,36 @@ export const toExit: {
|
||||
}
|
||||
|
||||
|
||||
export interface State<A, E = never, P = never> {
|
||||
readonly get: Effect.Effect<Result<A, E, P>>
|
||||
readonly set: (v: Result<A, E, P>) => Effect.Effect<void>
|
||||
}
|
||||
|
||||
export const State = <A, E = never, P = never>(): Context.Tag<State<A, E, P>, State<A, E, P>> => Context.GenericTag("@effect-fc/Result/State")
|
||||
|
||||
export interface Progress<P = never> {
|
||||
readonly update: <E, R>(
|
||||
f: (previous: P) => Effect.Effect<P, E, R>
|
||||
) => Effect.Effect<void, PreviousResultNotRunningNorRefreshing | E, R>
|
||||
readonly progress: Lens.Lens<P, PreviousResultNotRunningNorRefreshing, never, never, never>
|
||||
}
|
||||
export const Progress = <P = never>(): Context.Tag<Progress<P>, Progress<P>> => Context.GenericTag("@effect-fc/Result/Progress")
|
||||
|
||||
export class PreviousResultNotRunningNorRefreshing extends Data.TaggedError("@effect-fc/Result/PreviousResultNotRunningNorRefreshing")<{
|
||||
readonly previous: Result<unknown, unknown, unknown>
|
||||
}> {}
|
||||
|
||||
export const Progress = <P = never>(): Context.Tag<Progress<P>, Progress<P>> => Context.GenericTag("@effect-fc/Result/Progress")
|
||||
|
||||
export const makeProgressLayer = <A, E, P = never>(): Layer.Layer<
|
||||
Progress<P>,
|
||||
never,
|
||||
State<A, E, P>
|
||||
> => Layer.effect(Progress<P>(), Effect.gen(function*() {
|
||||
const state = yield* State<A, E, P>()
|
||||
|
||||
return {
|
||||
update: <FE, FR>(f: (previous: P) => Effect.Effect<P, FE, FR>) => Effect.Do.pipe(
|
||||
Effect.bind("previous", () => Effect.andThen(state.get, previous =>
|
||||
(isRunning(previous) || hasRefreshingFlag(previous))
|
||||
? Effect.succeed(previous)
|
||||
: Effect.fail(new PreviousResultNotRunningNorRefreshing({ previous })),
|
||||
)),
|
||||
Effect.bind("progress", ({ previous }) => f(previous.progress)),
|
||||
Effect.let("next", ({ previous, progress }) => isRunning(previous)
|
||||
? running(progress)
|
||||
: refreshing(previous, progress) as Final<A, E, P> & Refreshing<P>
|
||||
),
|
||||
Effect.andThen(({ next }) => state.set(next)),
|
||||
),
|
||||
}
|
||||
}))
|
||||
export const makeProgressLayer = <A, E, P = never>(
|
||||
state: Lens.Lens<Result<A, E, P>, never, never, never, never>
|
||||
): Layer.Layer<Progress<P> | Progress<never>, never, never> => Layer.effect(
|
||||
Progress<P>() as Context.Tag<Progress<P> | Progress<never>, Progress<P> | Progress<never>>,
|
||||
Effect.gen(function*() {
|
||||
const lens = Lens.mapEffect(
|
||||
state,
|
||||
a => (isRunning(a) || hasRefreshingFlag(a))
|
||||
? Effect.succeed(a.progress)
|
||||
: Effect.fail(new PreviousResultNotRunningNorRefreshing({ previous: a })),
|
||||
(a, b) => isRunning(a)
|
||||
? Effect.succeed(running(b))
|
||||
: Effect.succeed(refreshing(a, b) as Final<A, E, P> & Refreshing<P>),
|
||||
)
|
||||
return { progress: lens } as any
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
export namespace unsafeForkEffect {
|
||||
export type OutputContext<A, E, R, P> = Exclude<R, State<A, E, P> | Progress<P> | Progress<never>>
|
||||
export type OutputContext<R, P> = Exclude<R, Progress<P> | Progress<never>>
|
||||
|
||||
export interface Options<A, E, P> {
|
||||
readonly initial?: Initial | Final<A, E, P>
|
||||
@@ -221,46 +206,43 @@ export const unsafeForkEffect = Effect.fnUntraced(function* <A, E, R, P = never>
|
||||
): Effect.fn.Return<
|
||||
readonly [result: Subscribable.Subscribable<Result<A, E, P>, never, never>, fiber: Fiber.Fiber<A, E>],
|
||||
never,
|
||||
Scope.Scope | unsafeForkEffect.OutputContext<A, E, R, P>
|
||||
Scope.Scope | unsafeForkEffect.OutputContext<R, P>
|
||||
> {
|
||||
const ref = yield* Ref.make(options?.initial ?? initial<A, E, P>())
|
||||
const ref = yield* SynchronizedRef.make<Result<A, E, P>>(options?.initial ?? initial<A, E, P>())
|
||||
const pubsub = yield* PubSub.unbounded<Result<A, E, P>>()
|
||||
const fiber = yield* (Effect.forkScoped(State<A, E, P>().pipe(
|
||||
Effect.andThen(state => state.set(
|
||||
const state = Lens.make<Result<A, E, P>, never, never, never, never>({
|
||||
get get() { return ref.get },
|
||||
get changes() {
|
||||
return Stream.unwrapScoped(Effect.map(
|
||||
Effect.all([ref.get, Stream.fromPubSub(pubsub, { scoped: true })]),
|
||||
([latest, stream]) => Stream.concat(Stream.make(latest), stream),
|
||||
))
|
||||
},
|
||||
modify: f => ref.modifyEffect(f),
|
||||
})
|
||||
|
||||
const fiber = yield* Effect.gen(function*() {
|
||||
yield* Lens.set(
|
||||
state,
|
||||
(isFinal(options?.initial) && hasWillRefreshFlag(options?.initial))
|
||||
? refreshing(options.initial, options?.initialProgress) as Result<A, E, P>
|
||||
: running(options?.initialProgress)
|
||||
).pipe(
|
||||
Effect.andThen(effect),
|
||||
Effect.onExit(exit => Effect.andThen(
|
||||
state.set(fromExit(exit)),
|
||||
Effect.forkScoped(PubSub.shutdown(pubsub)),
|
||||
)),
|
||||
)),
|
||||
Effect.provide(Layer.empty.pipe(
|
||||
Layer.provideMerge(makeProgressLayer<A, E, P>()),
|
||||
Layer.provideMerge(Layer.succeed(State<A, E, P>(), {
|
||||
get: Ref.get(ref),
|
||||
set: v => Effect.andThen(Ref.set(ref, v), PubSub.publish(pubsub, v))
|
||||
})),
|
||||
)),
|
||||
)) as Effect.Effect<Fiber.Fiber<A, E>, never, Scope.Scope | unsafeForkEffect.OutputContext<A, E, R, P>>)
|
||||
: running(options?.initialProgress),
|
||||
)
|
||||
return yield* Effect.onExit(effect, exit => Effect.andThen(
|
||||
Lens.set(state, fromExit(exit)),
|
||||
Effect.forkScoped(PubSub.shutdown(pubsub)),
|
||||
))
|
||||
}).pipe(
|
||||
Effect.forkScoped,
|
||||
Effect.provide(makeProgressLayer(state)),
|
||||
)
|
||||
|
||||
return [
|
||||
Subscribable.make({
|
||||
get: Ref.get(ref),
|
||||
changes: Stream.unwrapScoped(Effect.map(
|
||||
Effect.all([Ref.get(ref), Stream.fromPubSub(pubsub, { scoped: true })]),
|
||||
([latest, stream]) => Stream.concat(Stream.make(latest), stream),
|
||||
)),
|
||||
}),
|
||||
fiber,
|
||||
] as const
|
||||
return [state, fiber] as const
|
||||
})
|
||||
|
||||
export namespace forkEffect {
|
||||
export type InputContext<R, P> = R extends Progress<infer X> ? [X] extends [P] ? R : never : R
|
||||
export type OutputContext<A, E, R, P> = unsafeForkEffect.OutputContext<A, E, R, P>
|
||||
export type OutputContext<R, P> = unsafeForkEffect.OutputContext<R, P>
|
||||
export interface Options<A, E, P> extends unsafeForkEffect.Options<A, E, P> {}
|
||||
}
|
||||
|
||||
@@ -271,6 +253,6 @@ export const forkEffect: {
|
||||
): Effect.Effect<
|
||||
readonly [result: Subscribable.Subscribable<Result<A, E, P>, never, never>, fiber: Fiber.Fiber<A, E>],
|
||||
never,
|
||||
Scope.Scope | forkEffect.OutputContext<A, E, R, P>
|
||||
Scope.Scope | forkEffect.OutputContext<R, P>
|
||||
>
|
||||
} = unsafeForkEffect
|
||||
|
||||
Reference in New Issue
Block a user