diff --git a/packages/effect-fc/src/Query.ts b/packages/effect-fc/src/Query.ts index c3a1933..b7c8ae6 100644 --- a/packages/effect-fc/src/Query.ts +++ b/packages/effect-fc/src/Query.ts @@ -20,14 +20,15 @@ extends Pipeable.Pipeable { readonly latestKey: Subscribable.Subscribable> readonly fiber: Subscribable.Subscribable>> readonly result: Subscribable.Subscribable> + readonly latestFinalResult: Subscribable.Subscribable>> readonly run: Effect.Effect fetch(key: K): Effect.Effect> - fetchSubscribable(key: K): Effect.Effect>> + fetchSubscribable(key: K): Effect.Effect>, never, Scope.Scope> readonly refetch: Effect.Effect, Cause.NoSuchElementException> - readonly refetchSubscribable: Effect.Effect>, Cause.NoSuchElementException> + readonly refetchSubscribable: Effect.Effect>, Cause.NoSuchElementException, Scope.Scope> readonly refresh: Effect.Effect, Cause.NoSuchElementException> - readonly refreshSubscribable: Effect.Effect>, Cause.NoSuchElementException> + readonly refreshSubscribable: Effect.Effect>, Cause.NoSuchElementException, Scope.Scope> } export declare namespace Query { @@ -49,6 +50,7 @@ extends Pipeable.Class() implements Query { readonly latestKey: SubscriptionRef.SubscriptionRef>, readonly fiber: SubscriptionRef.SubscriptionRef>>, readonly result: SubscriptionRef.SubscriptionRef>, + readonly latestFinalResult: SubscriptionRef.SubscriptionRef>>, readonly runSemaphore: Effect.Semaphore, ) { @@ -56,13 +58,19 @@ extends Pipeable.Class() implements Query { } get run(): Effect.Effect { - return Stream.runForEach(this.key, key => this.interrupt.pipe( - Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), - Effect.andThen(this.startCached(key, Result.initial(), false)), - Effect.andThen(sub => Effect.forkScoped(this.watch(sub))), - Effect.provide(this.context), - this.runSemaphore.withPermits(1), - )) + return Effect.provide( + Stream.runForEach(this.key, key => this.interrupt.pipe( + Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), + Effect.andThen(this.latestFinalResult), + Effect.andThen(previous => this.startCached(key, Option.isSome(previous) + ? Result.willFetch(previous.value) as Result.Final + : Result.initial() + )), + Effect.andThen(sub => Effect.forkScoped(this.watch(sub))), + this.runSemaphore.withPermits(1), + )), + this.context, + ) } get interrupt(): Effect.Effect { @@ -75,51 +83,112 @@ extends Pipeable.Class() implements Query { fetch(key: K): Effect.Effect> { return this.interrupt.pipe( Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), - Effect.andThen(Effect.provide(this.startCached(key, Result.initial(), false), this.context)), + Effect.andThen(this.latestFinalResult), + Effect.andThen(previous => Effect.provide( + this.startCached(key, Option.isSome(previous) + ? Result.willFetch(previous.value) as Result.Final + : Result.initial() + ), + this.context, + )), Effect.andThen(sub => this.watch(sub)), ) } - fetchSubscribable(key: K): Effect.Effect>> { + + fetchSubscribable(key: K): Effect.Effect< + Subscribable.Subscribable>, + never, + Scope.Scope + > { return this.interrupt.pipe( Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), - Effect.andThen(Effect.provide(this.startCached(key, Result.initial(), false), this.context)), + Effect.andThen(this.latestFinalResult), + Effect.andThen(previous => Effect.provide( + this.startCached(key, Option.isSome(previous) + ? Result.willFetch(previous.value) as Result.Final + : Result.initial() + ), + this.context, + )), + Effect.tap(sub => Effect.forkScoped(this.watch(sub))), ) } + get refetch(): Effect.Effect, Cause.NoSuchElementException> { return this.interrupt.pipe( - Effect.andThen(this.latestKey), - Effect.andThen(identity), - Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), false), this.context)), + Effect.andThen(Effect.Do), + Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)), + Effect.bind("latestFinalResult", () => this.latestFinalResult), + Effect.andThen(({ latestKey, latestFinalResult }) => Effect.provide( + this.startCached(latestKey, Option.isSome(latestFinalResult) + ? Result.willFetch(latestFinalResult.value) as Result.Final + : Result.initial() + ), + this.context, + )), Effect.andThen(sub => this.watch(sub)), ) } - get refetchSubscribable(): Effect.Effect>, Cause.NoSuchElementException> { + + get refetchSubscribable(): Effect.Effect< + Subscribable.Subscribable>, + Cause.NoSuchElementException, + Scope.Scope + > { return this.interrupt.pipe( - Effect.andThen(this.latestKey), - Effect.andThen(identity), - Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), false), this.context)), + Effect.andThen(Effect.Do), + Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)), + Effect.bind("latestFinalResult", () => this.latestFinalResult), + Effect.andThen(({ latestKey, latestFinalResult }) => Effect.provide( + this.startCached(latestKey, Option.isSome(latestFinalResult) + ? Result.willFetch(latestFinalResult.value) as Result.Final + : Result.initial() + ), + this.context, + )), + Effect.tap(sub => Effect.forkScoped(this.watch(sub))), ) } + get refresh(): Effect.Effect, Cause.NoSuchElementException> { return this.interrupt.pipe( - Effect.andThen(this.latestKey), - Effect.andThen(identity), - Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), true), this.context)), + Effect.andThen(Effect.Do), + Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)), + Effect.bind("latestFinalResult", () => this.latestFinalResult), + Effect.andThen(({ latestKey, latestFinalResult }) => Effect.provide( + this.startCached(latestKey, Option.isSome(latestFinalResult) + ? Result.willRefresh(latestFinalResult.value) as Result.Final + : Result.initial() + ), + this.context, + )), Effect.andThen(sub => this.watch(sub)), ) } - get refreshSubscribable(): Effect.Effect>, Cause.NoSuchElementException> { + + get refreshSubscribable(): Effect.Effect< + Subscribable.Subscribable>, + Cause.NoSuchElementException, + Scope.Scope + > { return this.interrupt.pipe( - Effect.andThen(this.latestKey), - Effect.andThen(identity), - Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), true), this.context)), + Effect.andThen(Effect.Do), + Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)), + Effect.bind("latestFinalResult", () => this.latestFinalResult), + Effect.andThen(({ latestKey, latestFinalResult }) => Effect.provide( + this.startCached(latestKey, Option.isSome(latestFinalResult) + ? Result.willRefresh(latestFinalResult.value) as Result.Final + : Result.initial() + ), + this.context, + )), + Effect.tap(sub => Effect.forkScoped(this.watch(sub))), ) } startCached( key: K, - initial: Result.Result, - refresh: boolean, + initial: Result.Initial | Result.Final, ): Effect.Effect< Subscribable.Subscribable>, never, @@ -127,43 +196,35 @@ extends Pipeable.Class() implements Query { > { return Effect.andThen(this.getCacheEntry(key), Option.match({ onSome: entry => QueryClient.isQueryClientCacheEntryStale(entry, this.staleTime) - ? Effect.map( - SubscriptionRef.set(this.result, entry.result as Result.Result), - () => Subscribable.make({ - get: Effect.succeed(entry.result as Result.Result), - get changes() { return Stream.empty }, - }), - ) - : this.start(key, Result.optimistic(entry.result) as Result.Result, false), - onNone: () => this.start(key, initial, refresh), + ? Effect.succeed(Subscribable.make({ + get: Effect.succeed(entry.result as Result.Result), + get changes() { return Stream.make(entry.result as Result.Result) }, + })) + : this.start(key, Result.willRefresh(entry.result) as Result.Final), + onNone: () => this.start(key, initial), })) } start( key: K, - initial: Result.Result, - refresh: boolean, + initial: Result.Initial | Result.Final, ): Effect.Effect< Subscribable.Subscribable>, never, Scope.Scope | QueryClient.QueryClient | R > { - return this.result.pipe( - Effect.map(previous => Result.isFinal(previous) ? previous : undefined), - Effect.andThen(previous => Result.unsafeForkEffect( - Effect.onExit(this.f(key), exit => Effect.andThen( - Exit.isSuccess(exit) - ? this.updateCacheEntry(key, Result.succeed(exit.value)) - : Effect.void, - SubscriptionRef.set(this.fiber, Option.none()), - )), - { - initial, - initialProgress: this.initialProgress, - refresh: refresh && previous, - previous, - } as Result.unsafeForkEffect.Options, + return Result.unsafeForkEffect( + Effect.onExit(this.f(key), exit => Effect.andThen( + Exit.isSuccess(exit) + ? this.updateCacheEntry(key, Result.fromExit(exit)) + : Effect.void, + SubscriptionRef.set(this.fiber, Option.none()), )), + { + initial, + initialProgress: this.initialProgress, + } as Result.unsafeForkEffect.Options, + ).pipe( Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))), Effect.map(([sub]) => sub), ) @@ -172,14 +233,14 @@ extends Pipeable.Class() implements Query { watch( sub: Subscribable.Subscribable> ): Effect.Effect> { - return Effect.andThen( - sub.get, - initial => Stream.runFoldEffect( + return sub.get.pipe( + Effect.andThen(initial => Stream.runFoldEffect( Stream.filter(sub.changes, Predicate.not(Result.isInitial)), initial, (_, result) => Effect.as(SubscriptionRef.set(this.result, result), result), - ), - ) as Effect.Effect> + ) as Effect.Effect>), + Effect.tap(result => SubscriptionRef.set(this.latestFinalResult, Option.some(result))), + ) } getCacheEntry( @@ -239,6 +300,7 @@ export const make = Effect.fnUntraced(function* ()), yield* SubscriptionRef.make(Option.none>()), yield* SubscriptionRef.make(Result.initial()), + yield* SubscriptionRef.make(Option.none>()), yield* Effect.makeSemaphore(1), ) diff --git a/packages/effect-fc/src/Result.ts b/packages/effect-fc/src/Result.ts index c0275c9..3cef66c 100644 --- a/packages/effect-fc/src/Result.ts +++ b/packages/effect-fc/src/Result.ts @@ -5,8 +5,7 @@ export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result" export type ResultTypeId = typeof ResultTypeId export type Result = ( - // biome-ignore lint/complexity/noBannedTypes: "{}" is relevant here - | (Initial & ({} | WillFetch)) + | Initial | Running

| Final ) @@ -118,7 +117,7 @@ export const running =

(progress?: P): Running

=> Object.setProtot export const succeed = (value: A): Success => Object.setPrototypeOf({ _tag: "Success", value }, ResultPrototype) export const fail = (cause: Cause.Cause ): Failure => Object.setPrototypeOf({ _tag: "Failure", cause }, ResultPrototype) -export const willFetch = >( +export const willFetch = >( result: R ): Omit & WillFetch => Object.setPrototypeOf( Object.assign({}, result, { _flag: "WillFetch" }), @@ -140,13 +139,18 @@ export const refreshing = , P = never>( Object.getPrototypeOf(result), ) -export const fromExit = ( - exit: Exit.Exit -): Success | Failure => exit._tag === "Success" ? succeed(exit.value) : fail(exit.cause) +export const fromExit: { + (exit: Exit.Success): Success + (exit: Exit.Failure): Failure + (exit: Exit.Exit): Success | Failure +} = exit => (exit._tag === "Success" ? succeed(exit.value) : fail(exit.cause)) as any -export const toExit = ( - self: Result -): Exit.Exit => { +export const toExit: { + (self: Success): Exit.Success + (self: Failure): Exit.Failure + (self: Final): Exit.Exit + (self: Result): Exit.Exit +} = (self: Result): any => { switch (self._tag) { case "Success": return Exit.succeed(self.value) @@ -206,8 +210,7 @@ export namespace unsafeForkEffect { export type OutputContext = Exclude | Progress

| Progress> export interface Options { - // biome-ignore lint/complexity/noBannedTypes: "{}" is relevant here - readonly initial?: (Initial | Success | Failure) & ({} | WillFetch | WillRefresh) + readonly initial?: Initial | Final readonly initialProgress?: P } }