Compare commits

...

2 Commits

Author SHA1 Message Date
Julien Valverdé
8203063253 Refactor Query
Some checks failed
Lint / lint (push) Failing after 11s
2026-01-14 12:42:16 +01:00
Julien Valverdé
931511b890 Refactor Result 2026-01-14 10:29:52 +01:00
2 changed files with 143 additions and 93 deletions

View File

@@ -20,14 +20,15 @@ extends Pipeable.Pipeable {
readonly latestKey: Subscribable.Subscribable<Option.Option<K>> readonly latestKey: Subscribable.Subscribable<Option.Option<K>>
readonly fiber: Subscribable.Subscribable<Option.Option<Fiber.Fiber<A, E>>> readonly fiber: Subscribable.Subscribable<Option.Option<Fiber.Fiber<A, E>>>
readonly result: Subscribable.Subscribable<Result.Result<A, E, P>> readonly result: Subscribable.Subscribable<Result.Result<A, E, P>>
readonly latestFinalResult: Subscribable.Subscribable<Option.Option<Result.Final<A, E, P>>>
readonly run: Effect.Effect<void> readonly run: Effect.Effect<void>
fetch(key: K): Effect.Effect<Result.Final<A, E, P>> fetch(key: K): Effect.Effect<Result.Final<A, E, P>>
fetchSubscribable(key: K): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>> fetchSubscribable(key: K): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, never, Scope.Scope>
readonly refetch: Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> readonly refetch: Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException>
readonly refetchSubscribable: Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException> readonly refetchSubscribable: Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException, Scope.Scope>
readonly refresh: Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> readonly refresh: Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException>
readonly refreshSubscribable: Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException> readonly refreshSubscribable: Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException, Scope.Scope>
} }
export declare namespace Query { export declare namespace Query {
@@ -49,6 +50,7 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
readonly latestKey: SubscriptionRef.SubscriptionRef<Option.Option<K>>, readonly latestKey: SubscriptionRef.SubscriptionRef<Option.Option<K>>,
readonly fiber: SubscriptionRef.SubscriptionRef<Option.Option<Fiber.Fiber<A, E>>>, readonly fiber: SubscriptionRef.SubscriptionRef<Option.Option<Fiber.Fiber<A, E>>>,
readonly result: SubscriptionRef.SubscriptionRef<Result.Result<A, E, P>>, readonly result: SubscriptionRef.SubscriptionRef<Result.Result<A, E, P>>,
readonly latestFinalResult: SubscriptionRef.SubscriptionRef<Option.Option<Result.Final<A, E, P>>>,
readonly runSemaphore: Effect.Semaphore, readonly runSemaphore: Effect.Semaphore,
) { ) {
@@ -56,13 +58,19 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
} }
get run(): Effect.Effect<void> { get run(): Effect.Effect<void> {
return Stream.runForEach(this.key, key => this.interrupt.pipe( return Effect.provide(
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), Stream.runForEach(this.key, key => this.interrupt.pipe(
Effect.andThen(this.startCached(key, Result.initial(), false)), Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
Effect.andThen(sub => Effect.forkScoped(this.watch(sub))), Effect.andThen(this.latestFinalResult),
Effect.provide(this.context), Effect.andThen(previous => this.startCached(key, Option.isSome(previous)
this.runSemaphore.withPermits(1), ? Result.willFetch(previous.value) as Result.Final<A, E, P>
)) : Result.initial()
)),
Effect.andThen(sub => Effect.forkScoped(this.watch(sub))),
this.runSemaphore.withPermits(1),
)),
this.context,
)
} }
get interrupt(): Effect.Effect<void, never, never> { get interrupt(): Effect.Effect<void, never, never> {
@@ -75,51 +83,112 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
fetch(key: K): Effect.Effect<Result.Final<A, E, P>> { fetch(key: K): Effect.Effect<Result.Final<A, E, P>> {
return this.interrupt.pipe( return this.interrupt.pipe(
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
Effect.andThen(Effect.provide(this.startCached(key, Result.initial(), false), this.context)), Effect.andThen(this.latestFinalResult),
Effect.andThen(previous => Effect.provide(
this.startCached(key, Option.isSome(previous)
? Result.willFetch(previous.value) as Result.Final<A, E, P>
: Result.initial()
),
this.context,
)),
Effect.andThen(sub => this.watch(sub)), Effect.andThen(sub => this.watch(sub)),
) )
} }
fetchSubscribable(key: K): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>> {
fetchSubscribable(key: K): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>,
never,
Scope.Scope
> {
return this.interrupt.pipe( return this.interrupt.pipe(
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
Effect.andThen(Effect.provide(this.startCached(key, Result.initial(), false), this.context)), Effect.andThen(this.latestFinalResult),
Effect.andThen(previous => Effect.provide(
this.startCached(key, Option.isSome(previous)
? Result.willFetch(previous.value) as Result.Final<A, E, P>
: Result.initial()
),
this.context,
)),
Effect.tap(sub => Effect.forkScoped(this.watch(sub))),
) )
} }
get refetch(): Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> { get refetch(): Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> {
return this.interrupt.pipe( return this.interrupt.pipe(
Effect.andThen(this.latestKey), Effect.andThen(Effect.Do),
Effect.andThen(identity), Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)),
Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), false), this.context)), Effect.bind("latestFinalResult", () => this.latestFinalResult),
Effect.andThen(({ latestKey, latestFinalResult }) => Effect.provide(
this.startCached(latestKey, Option.isSome(latestFinalResult)
? Result.willFetch(latestFinalResult.value) as Result.Final<A, E, P>
: Result.initial()
),
this.context,
)),
Effect.andThen(sub => this.watch(sub)), Effect.andThen(sub => this.watch(sub)),
) )
} }
get refetchSubscribable(): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException> {
get refetchSubscribable(): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>,
Cause.NoSuchElementException,
Scope.Scope
> {
return this.interrupt.pipe( return this.interrupt.pipe(
Effect.andThen(this.latestKey), Effect.andThen(Effect.Do),
Effect.andThen(identity), Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)),
Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), false), this.context)), Effect.bind("latestFinalResult", () => this.latestFinalResult),
Effect.andThen(({ latestKey, latestFinalResult }) => Effect.provide(
this.startCached(latestKey, Option.isSome(latestFinalResult)
? Result.willFetch(latestFinalResult.value) as Result.Final<A, E, P>
: Result.initial()
),
this.context,
)),
Effect.tap(sub => Effect.forkScoped(this.watch(sub))),
) )
} }
get refresh(): Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> { get refresh(): Effect.Effect<Result.Final<A, E, P>, Cause.NoSuchElementException> {
return this.interrupt.pipe( return this.interrupt.pipe(
Effect.andThen(this.latestKey), Effect.andThen(Effect.Do),
Effect.andThen(identity), Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)),
Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), true), this.context)), Effect.bind("latestFinalResult", () => this.latestFinalResult),
Effect.andThen(({ latestKey, latestFinalResult }) => Effect.provide(
this.startCached(latestKey, Option.isSome(latestFinalResult)
? Result.willRefresh(latestFinalResult.value) as Result.Final<A, E, P>
: Result.initial()
),
this.context,
)),
Effect.andThen(sub => this.watch(sub)), Effect.andThen(sub => this.watch(sub)),
) )
} }
get refreshSubscribable(): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException> {
get refreshSubscribable(): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>,
Cause.NoSuchElementException,
Scope.Scope
> {
return this.interrupt.pipe( return this.interrupt.pipe(
Effect.andThen(this.latestKey), Effect.andThen(Effect.Do),
Effect.andThen(identity), Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)),
Effect.andThen(key => Effect.provide(this.start(key, Result.initial(), true), this.context)), Effect.bind("latestFinalResult", () => this.latestFinalResult),
Effect.andThen(({ latestKey, latestFinalResult }) => Effect.provide(
this.startCached(latestKey, Option.isSome(latestFinalResult)
? Result.willRefresh(latestFinalResult.value) as Result.Final<A, E, P>
: Result.initial()
),
this.context,
)),
Effect.tap(sub => Effect.forkScoped(this.watch(sub))),
) )
} }
startCached( startCached(
key: K, key: K,
initial: Result.Result<A, E, P>, initial: Result.Initial | Result.Final<A, E, P>,
refresh: boolean,
): Effect.Effect< ): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>, Subscribable.Subscribable<Result.Result<A, E, P>>,
never, never,
@@ -127,43 +196,35 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
> { > {
return Effect.andThen(this.getCacheEntry(key), Option.match({ return Effect.andThen(this.getCacheEntry(key), Option.match({
onSome: entry => QueryClient.isQueryClientCacheEntryStale(entry, this.staleTime) onSome: entry => QueryClient.isQueryClientCacheEntryStale(entry, this.staleTime)
? Effect.map( ? Effect.succeed(Subscribable.make({
SubscriptionRef.set(this.result, entry.result as Result.Result<A, E, P>), get: Effect.succeed(entry.result as Result.Result<A, E, P>),
() => Subscribable.make({ get changes() { return Stream.make(entry.result as Result.Result<A, E, P>) },
get: Effect.succeed(entry.result as Result.Result<A, E, P>), }))
get changes() { return Stream.empty }, : this.start(key, Result.willRefresh(entry.result) as Result.Final<A, E, P>),
}), onNone: () => this.start(key, initial),
)
: this.start(key, Result.optimistic(entry.result) as Result.Result<A, E, P>, false),
onNone: () => this.start(key, initial, refresh),
})) }))
} }
start( start(
key: K, key: K,
initial: Result.Result<A, E, P>, initial: Result.Initial | Result.Final<A, E, P>,
refresh: boolean,
): Effect.Effect< ): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>, Subscribable.Subscribable<Result.Result<A, E, P>>,
never, never,
Scope.Scope | QueryClient.QueryClient | R Scope.Scope | QueryClient.QueryClient | R
> { > {
return this.result.pipe( return Result.unsafeForkEffect(
Effect.map(previous => Result.isFinal(previous) ? previous : undefined), Effect.onExit(this.f(key), exit => Effect.andThen(
Effect.andThen(previous => Result.unsafeForkEffect( Exit.isSuccess(exit)
Effect.onExit(this.f(key), exit => Effect.andThen( ? this.updateCacheEntry(key, Result.fromExit(exit))
Exit.isSuccess(exit) : Effect.void,
? this.updateCacheEntry(key, Result.succeed(exit.value)) SubscriptionRef.set(this.fiber, Option.none()),
: Effect.void,
SubscriptionRef.set(this.fiber, Option.none()),
)),
{
initial,
initialProgress: this.initialProgress,
refresh: refresh && previous,
previous,
} as Result.unsafeForkEffect.Options<A, E, P>,
)), )),
{
initial,
initialProgress: this.initialProgress,
} as Result.unsafeForkEffect.Options<A, E, P>,
).pipe(
Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))), Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))),
Effect.map(([sub]) => sub), Effect.map(([sub]) => sub),
) )
@@ -172,14 +233,14 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
watch( watch(
sub: Subscribable.Subscribable<Result.Result<A, E, P>> sub: Subscribable.Subscribable<Result.Result<A, E, P>>
): Effect.Effect<Result.Final<A, E, P>> { ): Effect.Effect<Result.Final<A, E, P>> {
return Effect.andThen( return sub.get.pipe(
sub.get, Effect.andThen(initial => Stream.runFoldEffect(
initial => Stream.runFoldEffect(
Stream.filter(sub.changes, Predicate.not(Result.isInitial)), Stream.filter(sub.changes, Predicate.not(Result.isInitial)),
initial, initial,
(_, result) => Effect.as(SubscriptionRef.set(this.result, result), result), (_, result) => Effect.as(SubscriptionRef.set(this.result, result), result),
), ) as Effect.Effect<Result.Final<A, E, P>>),
) as Effect.Effect<Result.Final<A, E, P>> Effect.tap(result => SubscriptionRef.set(this.latestFinalResult, Option.some(result))),
)
} }
getCacheEntry( getCacheEntry(
@@ -239,6 +300,7 @@ export const make = Effect.fnUntraced(function* <K extends Query.AnyKey, A, E =
yield* SubscriptionRef.make(Option.none<K>()), yield* SubscriptionRef.make(Option.none<K>()),
yield* SubscriptionRef.make(Option.none<Fiber.Fiber<A, E>>()), yield* SubscriptionRef.make(Option.none<Fiber.Fiber<A, E>>()),
yield* SubscriptionRef.make(Result.initial<A, E, P>()), yield* SubscriptionRef.make(Result.initial<A, E, P>()),
yield* SubscriptionRef.make(Option.none<Result.Final<A, E, P>>()),
yield* Effect.makeSemaphore(1), yield* Effect.makeSemaphore(1),
) )

View File

@@ -1,18 +1,17 @@
import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, Option, Pipeable, Predicate, PubSub, pipe, Ref, type Scope, Stream, Subscribable } from "effect" import { Cause, Context, Data, Effect, Equal, Exit, type Fiber, Hash, Layer, Match, Pipeable, Predicate, PubSub, pipe, Ref, type Scope, Stream, Subscribable } from "effect"
export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result") export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result")
export type ResultTypeId = typeof ResultTypeId export type ResultTypeId = typeof ResultTypeId
export type Result<A, E = never, P = never> = ( export type Result<A, E = never, P = never> = (
// biome-ignore lint/complexity/noBannedTypes: "{}" is relevant here | Initial
| (Initial & ({} | WillFetch))
| Running<P> | Running<P>
| Final<A, E, P> | Final<A, E, P>
) )
// biome-ignore lint/complexity/noBannedTypes: "{}" is relevant here // biome-ignore lint/complexity/noBannedTypes: "{}" is relevant here
export type Final<A, E = never, P = never> = (Success<A> | Failure<A, E>) & ({} | Flags<P>) export type Final<A, E = never, P = never> = (Success<A> | Failure<E>) & ({} | Flags<P>)
export type Flags<P = never> = WillFetch | WillRefresh | Refreshing<P> export type Flags<P = never> = WillFetch | WillRefresh | Refreshing<P>
export declare namespace Result { export declare namespace Result {
@@ -43,10 +42,9 @@ export interface Success<A> extends Result.Prototype {
readonly value: A readonly value: A
} }
export interface Failure<A, E = never> extends Result.Prototype { export interface Failure<E = never> extends Result.Prototype {
readonly _tag: "Failure" readonly _tag: "Failure"
readonly cause: Cause.Cause<E> readonly cause: Cause.Cause<E>
readonly previousSuccess: Option.Option<Success<A>>
} }
export interface WillFetch { export interface WillFetch {
@@ -76,7 +74,7 @@ const ResultPrototype = Object.freeze({
Match.tag("Initial", () => true), Match.tag("Initial", () => true),
Match.tag("Running", self => Equal.equals(self.progress, (that as Running<any>).progress)), Match.tag("Running", self => Equal.equals(self.progress, (that as Running<any>).progress)),
Match.tag("Success", self => Equal.equals(self.value, (that as Success<any>).value)), Match.tag("Success", self => Equal.equals(self.value, (that as Success<any>).value)),
Match.tag("Failure", self => Equal.equals(self.cause, (that as Failure<any, any>).cause)), Match.tag("Failure", self => Equal.equals(self.cause, (that as Failure<any>).cause)),
Match.exhaustive, Match.exhaustive,
) )
}, },
@@ -105,7 +103,7 @@ export const isFinal = (u: unknown): u is Final<unknown, unknown, unknown> => is
export const isInitial = (u: unknown): u is Initial => isResult(u) && u._tag === "Initial" export const isInitial = (u: unknown): u is Initial => isResult(u) && u._tag === "Initial"
export const isRunning = (u: unknown): u is Running<unknown> => isResult(u) && u._tag === "Running" export const isRunning = (u: unknown): u is Running<unknown> => isResult(u) && u._tag === "Running"
export const isSuccess = (u: unknown): u is Success<unknown> => isResult(u) && u._tag === "Success" export const isSuccess = (u: unknown): u is Success<unknown> => isResult(u) && u._tag === "Success"
export const isFailure = (u: unknown): u is Failure<unknown, unknown> => isResult(u) && u._tag === "Failure" export const isFailure = (u: unknown): u is Failure<unknown> => isResult(u) && u._tag === "Failure"
export const hasFlag = (u: unknown): u is Flags => isResult(u) && Predicate.hasProperty(u, "_flag") export const hasFlag = (u: unknown): u is Flags => isResult(u) && Predicate.hasProperty(u, "_flag")
export const hasWillFetchFlag = (u: unknown): u is WillFetch => isResult(u) && Predicate.hasProperty(u, "_flag") && u._flag === "WillFetch" export const hasWillFetchFlag = (u: unknown): u is WillFetch => isResult(u) && Predicate.hasProperty(u, "_flag") && u._flag === "WillFetch"
export const hasWillRefreshFlag = (u: unknown): u is WillRefresh => isResult(u) && Predicate.hasProperty(u, "_flag") && u._flag === "WillRefresh" export const hasWillRefreshFlag = (u: unknown): u is WillRefresh => isResult(u) && Predicate.hasProperty(u, "_flag") && u._flag === "WillRefresh"
@@ -117,17 +115,9 @@ export const initial: {
} = (): Initial => Object.setPrototypeOf({ _tag: "Initial" }, ResultPrototype) } = (): Initial => Object.setPrototypeOf({ _tag: "Initial" }, ResultPrototype)
export const running = <P = never>(progress?: P): Running<P> => Object.setPrototypeOf({ _tag: "Running", progress }, ResultPrototype) export const running = <P = never>(progress?: P): Running<P> => Object.setPrototypeOf({ _tag: "Running", progress }, ResultPrototype)
export const succeed = <A>(value: A): Success<A> => Object.setPrototypeOf({ _tag: "Success", value }, ResultPrototype) export const succeed = <A>(value: A): Success<A> => Object.setPrototypeOf({ _tag: "Success", value }, ResultPrototype)
export const fail = <E>(cause: Cause.Cause<E> ): Failure<E> => Object.setPrototypeOf({ _tag: "Failure", cause }, ResultPrototype)
export const fail = <E, A = never>( export const willFetch = <R extends Final<any, any, any>>(
cause: Cause.Cause<E>,
previousSuccess?: Success<NoInfer<A>>,
): Failure<A, E> => Object.setPrototypeOf({
_tag: "Failure",
cause,
previousSuccess: Option.fromNullable(previousSuccess),
}, ResultPrototype)
export const willFetch = <R extends Initial | Final<any, any, any>>(
result: R result: R
): Omit<R, keyof Flags.Keys> & WillFetch => Object.setPrototypeOf( ): Omit<R, keyof Flags.Keys> & WillFetch => Object.setPrototypeOf(
Object.assign({}, result, { _flag: "WillFetch" }), Object.assign({}, result, { _flag: "WillFetch" }),
@@ -149,16 +139,18 @@ export const refreshing = <R extends Final<any, any, any>, P = never>(
Object.getPrototypeOf(result), Object.getPrototypeOf(result),
) )
export const fromExit = <A, E>( export const fromExit: {
exit: Exit.Exit<A, E>, <A, E>(exit: Exit.Success<A, E>): Success<A>
previousSuccess?: Success<NoInfer<A>>, <A, E>(exit: Exit.Failure<A, E>): Failure<E>
): Success<A> | Failure<A, E> => exit._tag === "Success" <A, E>(exit: Exit.Exit<A, E>): Success<A> | Failure<E>
? succeed(exit.value) } = exit => (exit._tag === "Success" ? succeed(exit.value) : fail(exit.cause)) as any
: fail(exit.cause, previousSuccess)
export const toExit = <A, E, P>( export const toExit: {
self: Result<A, E, P> <A>(self: Success<A>): Exit.Success<A, never>
): Exit.Exit<A, E | Cause.NoSuchElementException> => { <E>(self: Failure<E>): Exit.Failure<never, E>
<A, E, P>(self: Final<A, E, P>): Exit.Exit<A, E>
<A, E, P>(self: Result<A, E, P>): Exit.Exit<A, E | Cause.NoSuchElementException>
} = <A, E, P>(self: Result<A, E, P>): any => {
switch (self._tag) { switch (self._tag) {
case "Success": case "Success":
return Exit.succeed(self.value) return Exit.succeed(self.value)
@@ -218,8 +210,7 @@ export namespace unsafeForkEffect {
export type OutputContext<A, E, R, P> = Exclude<R, State<A, E, P> | Progress<P> | Progress<never>> export type OutputContext<A, E, R, P> = Exclude<R, State<A, E, P> | Progress<P> | Progress<never>>
export interface Options<A, E, P> { export interface Options<A, E, P> {
// biome-ignore lint/complexity/noBannedTypes: "{}" is relevant here readonly initial?: Initial | Final<A, E, P>
readonly initial?: (Initial | Success<A> | Failure<A, E>) & ({} | WillFetch | WillRefresh)
readonly initialProgress?: P readonly initialProgress?: P
} }
} }
@@ -242,10 +233,7 @@ export const unsafeForkEffect = <A, E, R, P = never>(
).pipe( ).pipe(
Effect.andThen(effect), Effect.andThen(effect),
Effect.onExit(exit => Effect.andThen( Effect.onExit(exit => Effect.andThen(
state.set(fromExit( state.set(fromExit(exit)),
exit,
isSuccess(options?.initial) ? options.initial : undefined),
),
Effect.forkScoped(PubSub.shutdown(pubsub)), Effect.forkScoped(PubSub.shutdown(pubsub)),
)), )),
)), )),