Cached queries
All checks were successful
Lint / lint (push) Successful in 53s

This commit is contained in:
Julien Valverdé
2026-01-11 11:08:32 +01:00
parent 66694c7d7e
commit a6d91a93a5
2 changed files with 56 additions and 31 deletions

View File

@@ -1,4 +1,4 @@
import { type Cause, type Context, DateTime, type Duration, Effect, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect"
import { type Cause, type Context, DateTime, Duration, Effect, Exit, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect"
import * as QueryClient from "./QueryClient.js"
import * as Result from "./Result.js"
@@ -21,6 +21,7 @@ extends Pipeable.Pipeable {
readonly fiber: Subscribable.Subscribable<Option.Option<Fiber.Fiber<A, E>>>
readonly result: Subscribable.Subscribable<Result.Result<A, E, P>>
readonly run: Effect.Effect<void>
fetch(key: K): Effect.Effect<Result.Final<A, E, P>>
fetchSubscribable(key: K): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>>
readonly refetch: Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException>
@@ -54,6 +55,16 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
super()
}
get run(): Effect.Effect<void> {
return Stream.runForEach(this.key, key => this.interrupt.pipe(
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
Effect.andThen(this.start(key, Result.initial(), false)),
Effect.andThen(sub => Effect.forkScoped(this.watch(sub))),
Effect.provide(this.context),
this.runSemaphore.withPermits(1),
))
}
get interrupt(): Effect.Effect<void, never, never> {
return Effect.andThen(this.fiber, Option.match({
onSome: Fiber.interrupt,
@@ -64,21 +75,21 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
fetch(key: K): Effect.Effect<Result.Final<A, E, P>> {
return this.interrupt.pipe(
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
Effect.andThen(Effect.provide(this.start(key), this.context)),
Effect.andThen(Effect.provide(this.start(key, Result.initial(), false), this.context)),
Effect.andThen(sub => this.watch(sub)),
)
}
fetchSubscribable(key: K): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>> {
return this.interrupt.pipe(
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
Effect.andThen(Effect.provide(this.start(key), this.context)),
Effect.andThen(Effect.provide(this.start(key, Result.initial(), false), this.context)),
)
}
get refetch(): Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> {
return this.interrupt.pipe(
Effect.andThen(this.latestKey),
Effect.andThen(identity),
Effect.andThen(key => Effect.provide(this.start(key), this.context)),
Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), false), this.context)),
Effect.andThen(sub => this.watch(sub)),
)
}
@@ -86,14 +97,14 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
return this.interrupt.pipe(
Effect.andThen(this.latestKey),
Effect.andThen(identity),
Effect.andThen(key => Effect.provide(this.start(key), this.context)),
Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), false), this.context)),
)
}
get refresh(): Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> {
return this.interrupt.pipe(
Effect.andThen(this.latestKey),
Effect.andThen(identity),
Effect.andThen(key => Effect.provide(this.start(key, true), this.context)),
Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), true), this.context)),
Effect.andThen(sub => this.watch(sub)),
)
}
@@ -101,26 +112,52 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
return this.interrupt.pipe(
Effect.andThen(this.latestKey),
Effect.andThen(identity),
Effect.andThen(key => Effect.provide(this.start(key, true), this.context)),
Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), true), this.context)),
)
}
startCached(
key: K,
refresh: boolean,
): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>,
never,
Scope.Scope | QueryClient.QueryClient | R
> {
return this.getCacheEntry(key).pipe(
Effect.andThen(Option.match({
onSome: entry => Effect.andThen(
DateTime.now,
now => Duration.lessThan(DateTime.distanceDuration(entry.createdAt, now), this.staleTime)
? Result.optimistic(entry.result) as Result.Result<A, E, P>
: Result.initial<A, E, P>(),
),
onNone: () => Effect.succeed(Result.initial<A, E, P>()),
})),
Effect.andThen(initial => this.start(key, initial, refresh)),
)
}
start(
key: K,
refresh?: boolean,
initial: Result.Result<A, E, P>,
refresh: boolean,
): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>,
never,
Scope.Scope | R
Scope.Scope | QueryClient.QueryClient | R
> {
return this.result.pipe(
Effect.map(previous => Result.isFinal(previous)
? previous
: undefined
),
Effect.map(previous => Result.isFinal(previous) ? previous : undefined),
Effect.andThen(previous => Result.unsafeForkEffect(
Effect.onExit(this.f(key), () => SubscriptionRef.set(this.fiber, Option.none())),
Effect.onExit(this.f(key), exit => Effect.andThen(
Exit.isSuccess(exit)
? this.updateCacheEntry(key, Result.succeed(exit.value))
: Effect.void,
SubscriptionRef.set(this.fiber, Option.none()),
)),
{
initial,
initialProgress: this.initialProgress,
refresh: refresh && previous,
previous,
@@ -214,18 +251,5 @@ export const service = <K extends Query.AnyKey, A, E = never, R = never, P = nev
Scope.Scope | QueryClient.QueryClient | Result.forkEffect.OutputContext<A, E, R, P>
> => Effect.tap(
make(options),
query => Effect.forkScoped(run(query)),
query => Effect.forkScoped(query.run),
)
export const run = <K extends Query.AnyKey, A, E, R, P>(
self: Query<K, A, E, R, P>
): Effect.Effect<void> => {
const _self = self as QueryImpl<K, A, E, R, P>
return Stream.runForEach(_self.key, key => _self.interrupt.pipe(
Effect.andThen(SubscriptionRef.set(_self.latestKey, Option.some(key))),
Effect.andThen(_self.start(key)),
Effect.andThen(sub => Effect.forkScoped(_self.watch(sub))),
Effect.provide(_self.context),
_self.runSemaphore.withPermits(1),
))
}

View File

@@ -111,8 +111,8 @@ export const isInitial = (u: unknown): u is Initial => isResult(u) && u._tag ===
export const isRunning = (u: unknown): u is Running<unknown> => isResult(u) && u._tag === "Running"
export const isSuccess = (u: unknown): u is Success<unknown> => isResult(u) && u._tag === "Success"
export const isFailure = (u: unknown): u is Failure<unknown, unknown> => isResult(u) && u._tag === "Failure"
export const isRefreshing = (u: unknown): u is Refreshing<unknown> => isResult(u) && Predicate.hasProperty(u, "refreshing") && u.refreshing
export const isOptimistic = (u: unknown): u is Optimistic => isResult(u) && Predicate.hasProperty(u, "optimistic") && u.optimistic
export const isRefreshing = (u: unknown): u is Refreshing<unknown> => isResult(u) && Predicate.hasProperty(u, "refreshing")
export const isOptimistic = (u: unknown): u is Optimistic => isResult(u) && Predicate.hasProperty(u, "optimistic")
export const initial: {
(): Initial
@@ -214,6 +214,7 @@ export namespace unsafeForkEffect {
export type OutputContext<A, E, R, P> = Exclude<R, State<A, E, P> | Progress<P> | Progress<never>>
export type Options<A, E, P> = {
readonly initial?: Result<A, E, P>
readonly initialProgress?: P
readonly previous?: Final<A, E, P>
} & (
@@ -235,7 +236,7 @@ export const unsafeForkEffect = <A, E, R, P = never>(
never,
Scope.Scope | unsafeForkEffect.OutputContext<A, E, R, P>
> => Effect.Do.pipe(
Effect.bind("ref", () => Ref.make(initial<A, E, P>())),
Effect.bind("ref", () => Ref.make(options?.initial ?? initial<A, E, P>())),
Effect.bind("pubsub", () => PubSub.unbounded<Result<A, E, P>>()),
Effect.bind("fiber", ({ ref, pubsub }) => Effect.forkScoped(State<A, E, P>().pipe(
Effect.andThen(state => state.set(options?.refresh