diff --git a/packages/effect-fc/src/Result.ts b/packages/effect-fc/src/Result.ts index 70ecbe7..5b3d21e 100644 --- a/packages/effect-fc/src/Result.ts +++ b/packages/effect-fc/src/Result.ts @@ -1,4 +1,5 @@ -import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, pipe, Pipeable, Predicate, PubSub, Ref, type Scope, Stream, Subscribable } from "effect" +import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, pipe, Pipeable, Predicate, PubSub, type Scope, Stream, type Subscribable, SynchronizedRef } from "effect" +import { Lens } from "effect-lens" export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result") @@ -162,52 +163,36 @@ export const toExit: { } -export interface State { - readonly get: Effect.Effect> - readonly set: (v: Result) => Effect.Effect -} - -export const State = (): Context.Tag, State> => Context.GenericTag("@effect-fc/Result/State") - export interface Progress

{ - readonly update: ( - f: (previous: P) => Effect.Effect - ) => Effect.Effect + readonly progress: Lens.Lens } +export const Progress =

(): Context.Tag, Progress

> => Context.GenericTag("@effect-fc/Result/Progress") export class PreviousResultNotRunningNorRefreshing extends Data.TaggedError("@effect-fc/Result/PreviousResultNotRunningNorRefreshing")<{ readonly previous: Result }> {} -export const Progress =

(): Context.Tag, Progress

> => Context.GenericTag("@effect-fc/Result/Progress") - -export const makeProgressLayer = (): Layer.Layer< - Progress

, - never, - State -> => Layer.effect(Progress

(), Effect.gen(function*() { - const state = yield* State() - - return { - update: (f: (previous: P) => Effect.Effect) => Effect.Do.pipe( - Effect.bind("previous", () => Effect.andThen(state.get, previous => - (isRunning(previous) || hasRefreshingFlag(previous)) - ? Effect.succeed(previous) - : Effect.fail(new PreviousResultNotRunningNorRefreshing({ previous })), - )), - Effect.bind("progress", ({ previous }) => f(previous.progress)), - Effect.let("next", ({ previous, progress }) => isRunning(previous) - ? running(progress) - : refreshing(previous, progress) as Final & Refreshing

- ), - Effect.andThen(({ next }) => state.set(next)), - ), - } -})) +export const makeProgressLayer = ( + state: Lens.Lens, never, never, never, never> +): Layer.Layer | Progress, never, never> => Layer.effect( + Progress

() as Context.Tag | Progress, Progress

| Progress>, + Effect.gen(function*() { + const lens = Lens.mapEffect( + state, + a => (isRunning(a) || hasRefreshingFlag(a)) + ? Effect.succeed(a.progress) + : Effect.fail(new PreviousResultNotRunningNorRefreshing({ previous: a })), + (a, b) => isRunning(a) + ? Effect.succeed(running(b)) + : Effect.succeed(refreshing(a, b) as Final & Refreshing

), + ) + return { progress: lens } as any + }), +) export namespace unsafeForkEffect { - export type OutputContext = Exclude | Progress

| Progress> + export type OutputContext = Exclude | Progress> export interface Options { readonly initial?: Initial | Final @@ -221,46 +206,43 @@ export const unsafeForkEffect = Effect.fnUntraced(function* ): Effect.fn.Return< readonly [result: Subscribable.Subscribable, never, never>, fiber: Fiber.Fiber], never, - Scope.Scope | unsafeForkEffect.OutputContext + Scope.Scope | unsafeForkEffect.OutputContext > { - const ref = yield* Ref.make(options?.initial ?? initial()) + const ref = yield* SynchronizedRef.make>(options?.initial ?? initial()) const pubsub = yield* PubSub.unbounded>() - const fiber = yield* (Effect.forkScoped(State().pipe( - Effect.andThen(state => state.set( + const state = Lens.make, never, never, never, never>({ + get get() { return ref.get }, + get changes() { + return Stream.unwrapScoped(Effect.map( + Effect.all([ref.get, Stream.fromPubSub(pubsub, { scoped: true })]), + ([latest, stream]) => Stream.concat(Stream.make(latest), stream), + )) + }, + modify: f => ref.modifyEffect(f), + }) + + const fiber = yield* Effect.gen(function*() { + yield* Lens.set( + state, (isFinal(options?.initial) && hasWillRefreshFlag(options?.initial)) ? refreshing(options.initial, options?.initialProgress) as Result - : running(options?.initialProgress) - ).pipe( - Effect.andThen(effect), - Effect.onExit(exit => Effect.andThen( - state.set(fromExit(exit)), - Effect.forkScoped(PubSub.shutdown(pubsub)), - )), - )), - Effect.provide(Layer.empty.pipe( - Layer.provideMerge(makeProgressLayer()), - Layer.provideMerge(Layer.succeed(State(), { - get: Ref.get(ref), - set: v => Effect.andThen(Ref.set(ref, v), PubSub.publish(pubsub, v)) - })), - )), - )) as Effect.Effect, never, Scope.Scope | unsafeForkEffect.OutputContext>) + : running(options?.initialProgress), + ) + return yield* Effect.onExit(effect, exit => Effect.andThen( + Lens.set(state, fromExit(exit)), + Effect.forkScoped(PubSub.shutdown(pubsub)), + )) + }).pipe( + Effect.forkScoped, + Effect.provide(makeProgressLayer(state)), + ) - return [ - Subscribable.make({ - get: Ref.get(ref), - changes: Stream.unwrapScoped(Effect.map( - Effect.all([Ref.get(ref), Stream.fromPubSub(pubsub, { scoped: true })]), - ([latest, stream]) => Stream.concat(Stream.make(latest), stream), - )), - }), - fiber, - ] as const + return [state, fiber] as const }) export namespace forkEffect { export type InputContext = R extends Progress ? [X] extends [P] ? R : never : R - export type OutputContext = unsafeForkEffect.OutputContext + export type OutputContext = unsafeForkEffect.OutputContext export interface Options extends unsafeForkEffect.Options {} } @@ -271,6 +253,6 @@ export const forkEffect: { ): Effect.Effect< readonly [result: Subscribable.Subscribable, never, never>, fiber: Fiber.Fiber], never, - Scope.Scope | forkEffect.OutputContext + Scope.Scope | forkEffect.OutputContext > } = unsafeForkEffect