From a6d91a93a53c5079933f0f640b095f98afbb57d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Sun, 11 Jan 2026 11:08:32 +0100 Subject: [PATCH] Cached queries --- packages/effect-fc/src/Query.ts | 80 +++++++++++++++++++++----------- packages/effect-fc/src/Result.ts | 7 +-- 2 files changed, 56 insertions(+), 31 deletions(-) diff --git a/packages/effect-fc/src/Query.ts b/packages/effect-fc/src/Query.ts index edc5062..697f2a6 100644 --- a/packages/effect-fc/src/Query.ts +++ b/packages/effect-fc/src/Query.ts @@ -1,4 +1,4 @@ -import { type Cause, type Context, DateTime, type Duration, Effect, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect" +import { type Cause, type Context, DateTime, Duration, Effect, Exit, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect" import * as QueryClient from "./QueryClient.js" import * as Result from "./Result.js" @@ -21,6 +21,7 @@ extends Pipeable.Pipeable { readonly fiber: Subscribable.Subscribable>> readonly result: Subscribable.Subscribable> + readonly run: Effect.Effect fetch(key: K): Effect.Effect> fetchSubscribable(key: K): Effect.Effect>> readonly refetch: Effect.Effect, Cause.NoSuchElementException> @@ -54,6 +55,16 @@ extends Pipeable.Class() implements Query { super() } + get run(): Effect.Effect { + return Stream.runForEach(this.key, key => this.interrupt.pipe( + Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), + Effect.andThen(this.start(key, Result.initial(), false)), + Effect.andThen(sub => Effect.forkScoped(this.watch(sub))), + Effect.provide(this.context), + this.runSemaphore.withPermits(1), + )) + } + get interrupt(): Effect.Effect { return Effect.andThen(this.fiber, Option.match({ onSome: Fiber.interrupt, @@ -64,21 +75,21 @@ extends Pipeable.Class() implements Query { fetch(key: K): Effect.Effect> { return this.interrupt.pipe( Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), - Effect.andThen(Effect.provide(this.start(key), this.context)), + Effect.andThen(Effect.provide(this.start(key, Result.initial(), false), this.context)), Effect.andThen(sub => this.watch(sub)), ) } fetchSubscribable(key: K): Effect.Effect>> { return this.interrupt.pipe( Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), - Effect.andThen(Effect.provide(this.start(key), this.context)), + Effect.andThen(Effect.provide(this.start(key, Result.initial(), false), this.context)), ) } get refetch(): Effect.Effect, Cause.NoSuchElementException> { return this.interrupt.pipe( Effect.andThen(this.latestKey), Effect.andThen(identity), - Effect.andThen(key => Effect.provide(this.start(key), this.context)), + Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), false), this.context)), Effect.andThen(sub => this.watch(sub)), ) } @@ -86,14 +97,14 @@ extends Pipeable.Class() implements Query { return this.interrupt.pipe( Effect.andThen(this.latestKey), Effect.andThen(identity), - Effect.andThen(key => Effect.provide(this.start(key), this.context)), + Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), false), this.context)), ) } get refresh(): Effect.Effect, Cause.NoSuchElementException> { return this.interrupt.pipe( Effect.andThen(this.latestKey), Effect.andThen(identity), - Effect.andThen(key => Effect.provide(this.start(key, true), this.context)), + Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), true), this.context)), Effect.andThen(sub => this.watch(sub)), ) } @@ -101,26 +112,52 @@ extends Pipeable.Class() implements Query { return this.interrupt.pipe( Effect.andThen(this.latestKey), Effect.andThen(identity), - Effect.andThen(key => Effect.provide(this.start(key, true), this.context)), + Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), true), this.context)), + ) + } + + startCached( + key: K, + refresh: boolean, + ): Effect.Effect< + Subscribable.Subscribable>, + never, + Scope.Scope | QueryClient.QueryClient | R + > { + return this.getCacheEntry(key).pipe( + Effect.andThen(Option.match({ + onSome: entry => Effect.andThen( + DateTime.now, + now => Duration.lessThan(DateTime.distanceDuration(entry.createdAt, now), this.staleTime) + ? Result.optimistic(entry.result) as Result.Result + : Result.initial(), + ), + onNone: () => Effect.succeed(Result.initial()), + })), + Effect.andThen(initial => this.start(key, initial, refresh)), ) } start( key: K, - refresh?: boolean, + initial: Result.Result, + refresh: boolean, ): Effect.Effect< Subscribable.Subscribable>, never, - Scope.Scope | R + Scope.Scope | QueryClient.QueryClient | R > { return this.result.pipe( - Effect.map(previous => Result.isFinal(previous) - ? previous - : undefined - ), + Effect.map(previous => Result.isFinal(previous) ? previous : undefined), Effect.andThen(previous => Result.unsafeForkEffect( - Effect.onExit(this.f(key), () => SubscriptionRef.set(this.fiber, Option.none())), + Effect.onExit(this.f(key), exit => Effect.andThen( + Exit.isSuccess(exit) + ? this.updateCacheEntry(key, Result.succeed(exit.value)) + : Effect.void, + SubscriptionRef.set(this.fiber, Option.none()), + )), { + initial, initialProgress: this.initialProgress, refresh: refresh && previous, previous, @@ -214,18 +251,5 @@ export const service = > => Effect.tap( make(options), - query => Effect.forkScoped(run(query)), + query => Effect.forkScoped(query.run), ) - -export const run = ( - self: Query -): Effect.Effect => { - const _self = self as QueryImpl - return Stream.runForEach(_self.key, key => _self.interrupt.pipe( - Effect.andThen(SubscriptionRef.set(_self.latestKey, Option.some(key))), - Effect.andThen(_self.start(key)), - Effect.andThen(sub => Effect.forkScoped(_self.watch(sub))), - Effect.provide(_self.context), - _self.runSemaphore.withPermits(1), - )) -} diff --git a/packages/effect-fc/src/Result.ts b/packages/effect-fc/src/Result.ts index f676f93..2877412 100644 --- a/packages/effect-fc/src/Result.ts +++ b/packages/effect-fc/src/Result.ts @@ -111,8 +111,8 @@ export const isInitial = (u: unknown): u is Initial => isResult(u) && u._tag === export const isRunning = (u: unknown): u is Running => isResult(u) && u._tag === "Running" export const isSuccess = (u: unknown): u is Success => isResult(u) && u._tag === "Success" export const isFailure = (u: unknown): u is Failure => isResult(u) && u._tag === "Failure" -export const isRefreshing = (u: unknown): u is Refreshing => isResult(u) && Predicate.hasProperty(u, "refreshing") && u.refreshing -export const isOptimistic = (u: unknown): u is Optimistic => isResult(u) && Predicate.hasProperty(u, "optimistic") && u.optimistic +export const isRefreshing = (u: unknown): u is Refreshing => isResult(u) && Predicate.hasProperty(u, "refreshing") +export const isOptimistic = (u: unknown): u is Optimistic => isResult(u) && Predicate.hasProperty(u, "optimistic") export const initial: { (): Initial @@ -214,6 +214,7 @@ export namespace unsafeForkEffect { export type OutputContext = Exclude | Progress

| Progress> export type Options = { + readonly initial?: Result readonly initialProgress?: P readonly previous?: Final } & ( @@ -235,7 +236,7 @@ export const unsafeForkEffect = ( never, Scope.Scope | unsafeForkEffect.OutputContext > => Effect.Do.pipe( - Effect.bind("ref", () => Ref.make(initial())), + Effect.bind("ref", () => Ref.make(options?.initial ?? initial())), Effect.bind("pubsub", () => PubSub.unbounded>()), Effect.bind("fiber", ({ ref, pubsub }) => Effect.forkScoped(State().pipe( Effect.andThen(state => state.set(options?.refresh