This commit is contained in:
@@ -1,4 +1,4 @@
|
|||||||
import { type Cause, type Context, DateTime, Duration, Effect, Exit, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect"
|
import { type Cause, type Context, DateTime, type Duration, Effect, Exit, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, Subscribable, SubscriptionRef } from "effect"
|
||||||
import * as QueryClient from "./QueryClient.js"
|
import * as QueryClient from "./QueryClient.js"
|
||||||
import * as Result from "./Result.js"
|
import * as Result from "./Result.js"
|
||||||
|
|
||||||
@@ -58,7 +58,7 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
|
|||||||
get run(): Effect.Effect<void> {
|
get run(): Effect.Effect<void> {
|
||||||
return Stream.runForEach(this.key, key => this.interrupt.pipe(
|
return Stream.runForEach(this.key, key => this.interrupt.pipe(
|
||||||
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
|
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
|
||||||
Effect.andThen(this.start(key, Result.initial(), false)),
|
Effect.andThen(this.startCached(key, Result.initial(), false)),
|
||||||
Effect.andThen(sub => Effect.forkScoped(this.watch(sub))),
|
Effect.andThen(sub => Effect.forkScoped(this.watch(sub))),
|
||||||
Effect.provide(this.context),
|
Effect.provide(this.context),
|
||||||
this.runSemaphore.withPermits(1),
|
this.runSemaphore.withPermits(1),
|
||||||
@@ -75,14 +75,14 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
|
|||||||
fetch(key: K): Effect.Effect<Result.Final<A, E, P>> {
|
fetch(key: K): Effect.Effect<Result.Final<A, E, P>> {
|
||||||
return this.interrupt.pipe(
|
return this.interrupt.pipe(
|
||||||
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
|
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
|
||||||
Effect.andThen(Effect.provide(this.start(key, Result.initial(), false), this.context)),
|
Effect.andThen(Effect.provide(this.startCached(key, Result.initial(), false), this.context)),
|
||||||
Effect.andThen(sub => this.watch(sub)),
|
Effect.andThen(sub => this.watch(sub)),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
fetchSubscribable(key: K): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>> {
|
fetchSubscribable(key: K): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>> {
|
||||||
return this.interrupt.pipe(
|
return this.interrupt.pipe(
|
||||||
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
|
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
|
||||||
Effect.andThen(Effect.provide(this.start(key, Result.initial(), false), this.context)),
|
Effect.andThen(Effect.provide(this.startCached(key, Result.initial(), false), this.context)),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
get refetch(): Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> {
|
get refetch(): Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> {
|
||||||
@@ -118,24 +118,25 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
|
|||||||
|
|
||||||
startCached(
|
startCached(
|
||||||
key: K,
|
key: K,
|
||||||
|
initial: Result.Result<A, E, P>,
|
||||||
refresh: boolean,
|
refresh: boolean,
|
||||||
): Effect.Effect<
|
): Effect.Effect<
|
||||||
Subscribable.Subscribable<Result.Result<A, E, P>>,
|
Subscribable.Subscribable<Result.Result<A, E, P>>,
|
||||||
never,
|
never,
|
||||||
Scope.Scope | QueryClient.QueryClient | R
|
Scope.Scope | QueryClient.QueryClient | R
|
||||||
> {
|
> {
|
||||||
return this.getCacheEntry(key).pipe(
|
return Effect.andThen(this.getCacheEntry(key), Option.match({
|
||||||
Effect.andThen(Option.match({
|
onSome: entry => QueryClient.isQueryClientCacheEntryStale(entry, this.staleTime)
|
||||||
onSome: entry => Effect.andThen(
|
? Effect.map(
|
||||||
DateTime.now,
|
SubscriptionRef.set(this.result, entry.result as Result.Result<A, E, P>),
|
||||||
now => Duration.lessThan(DateTime.distanceDuration(entry.createdAt, now), this.staleTime)
|
() => Subscribable.make({
|
||||||
? Result.optimistic(entry.result) as Result.Result<A, E, P>
|
get: Effect.succeed(entry.result as Result.Result<A, E, P>),
|
||||||
: Result.initial<A, E, P>(),
|
get changes() { return Stream.empty },
|
||||||
),
|
}),
|
||||||
onNone: () => Effect.succeed(Result.initial<A, E, P>()),
|
)
|
||||||
})),
|
: this.start(key, Result.optimistic(entry.result) as Result.Result<A, E, P>, false),
|
||||||
Effect.andThen(initial => this.start(key, initial, refresh)),
|
onNone: () => this.start(key, initial, refresh),
|
||||||
)
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
start(
|
start(
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { type DateTime, type Duration, Effect, Equal, Equivalence, Hash, HashMap, Pipeable, Predicate, type Scope, SubscriptionRef } from "effect"
|
import { DateTime, Duration, Effect, Equal, Equivalence, Hash, HashMap, Pipeable, Predicate, type Scope, SubscriptionRef } from "effect"
|
||||||
import type * as Query from "./Query.js"
|
import type * as Query from "./Query.js"
|
||||||
import type * as Result from "./Result.js"
|
import type * as Result from "./Result.js"
|
||||||
|
|
||||||
@@ -105,3 +105,11 @@ implements Pipeable.Pipeable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export const isQueryClientCacheEntry = (u: unknown): u is QueryClientCacheEntry => Predicate.hasProperty(u, QueryClientCacheEntryTypeId)
|
export const isQueryClientCacheEntry = (u: unknown): u is QueryClientCacheEntry => Predicate.hasProperty(u, QueryClientCacheEntryTypeId)
|
||||||
|
|
||||||
|
export const isQueryClientCacheEntryStale = (
|
||||||
|
self: QueryClientCacheEntry,
|
||||||
|
staleTime: Duration.DurationInput,
|
||||||
|
): Effect.Effect<boolean> => Effect.andThen(
|
||||||
|
DateTime.now,
|
||||||
|
now => Duration.lessThan(DateTime.distanceDuration(self.createdAt, now), staleTime),
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user