Refactor query
All checks were successful
Lint / lint (push) Successful in 13s

This commit is contained in:
Julien Valverdé
2025-11-25 16:03:28 +01:00
parent aa243c6493
commit 6f50cf2989
2 changed files with 68 additions and 16 deletions

View File

@@ -1,4 +1,4 @@
import { Effect, Fiber, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect"
import { type Cause, type Context, Effect, Fiber, identity, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect"
import * as Result from "./Result.js"
@@ -16,9 +16,13 @@ extends Pipeable.Pipeable {
readonly latestKey: Subscribable.Subscribable<Option.Option<K>>
readonly fiber: Subscribable.Subscribable<Option.Option<Fiber.Fiber<A, E>>>
readonly result: Subscribable.Subscribable<Result.Result<A, E, P>>
fetch(key: K): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>>
readonly refetch: Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException>
readonly refresh: Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException>
}
class QueryImpl<in out K extends readonly any[], in out A, in out E = never, out R = never, in out P = never>
class QueryImpl<in out K extends readonly any[], in out A, in out E = never, in out R = never, in out P = never>
extends Pipeable.Class() implements Query<K, A, E, R, P> {
readonly [QueryTypeId]: QueryTypeId = QueryTypeId
@@ -30,28 +34,43 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
readonly latestKey: SubscriptionRef.SubscriptionRef<Option.Option<K>>,
readonly fiber: SubscriptionRef.SubscriptionRef<Option.Option<Fiber.Fiber<A, E>>>,
readonly result: SubscriptionRef.SubscriptionRef<Result.Result<A, E, P>>,
readonly context: Context.Context<Scope.Scope | NoInfer<R>>,
) {
super()
}
get interrupt(): Effect.Effect<void, never, never> {
return Effect.andThen(this.fiber, Option.match({
onSome: Fiber.interrupt,
onNone: () => Effect.void,
}))
return this.fiber.pipe(
Effect.andThen(Option.match({
onSome: Fiber.interrupt,
onNone: () => Effect.void,
})),
Effect.andThen(Effect.sleep("0 millis")),
)
}
start(
key: K
key: K,
refresh?: boolean,
): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>,
never,
Scope.Scope | R
> {
return Result.unsafeForkEffect(
Effect.onExit(this.f(key), () => SubscriptionRef.set(this.fiber, Option.none())),
{ initialProgress: this.initialProgress },
).pipe(
return this.result.pipe(
Effect.map(previous => (Result.isSuccess(previous) || Result.isFailure(previous))
? Option.some(previous)
: Option.none()
),
Effect.andThen(previous => Result.unsafeForkEffect(
Effect.onExit(this.f(key), () => SubscriptionRef.set(this.fiber, Option.none())),
{
initialProgress: this.initialProgress,
refresh: refresh && Option.isSome(previous),
previous: Option.getOrUndefined(previous),
} as Result.unsafeForkEffect.Options<A, E, P>,
)),
Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))),
Effect.map(([sub]) => sub),
)
@@ -69,6 +88,32 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
),
)
}
fetch(key: K): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>> {
return this.interrupt.pipe(
Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))),
Effect.andThen(this.start(key)),
Effect.provide(this.context),
)
}
get refetch(): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException> {
return this.interrupt.pipe(
Effect.andThen(this.latestKey),
Effect.andThen(identity),
Effect.andThen(key => this.start(key)),
Effect.provide(this.context),
)
}
get refresh(): Effect.Effect<Subscribable.Subscribable<Result.Result<A, E, P>>, Cause.NoSuchElementException> {
return this.interrupt.pipe(
Effect.andThen(this.latestKey),
Effect.andThen(identity),
Effect.andThen(key => this.start(key, true)),
Effect.provide(this.context),
)
}
}
export const isQuery = (u: unknown): u is Query<unknown[], unknown, unknown, unknown, unknown> => Predicate.hasProperty(u, QueryTypeId)
@@ -83,7 +128,11 @@ export declare namespace make {
export const make = Effect.fnUntraced(function* <K extends readonly any[], A, E = never, R = never, P = never>(
options: make.Options<K, A, E, R, P>
): Effect.fn.Return<Query<K, A, E, Result.forkEffect.OutputContext<A, E, R, P>, P>> {
): Effect.fn.Return<
Query<K, A, E, Result.forkEffect.OutputContext<A, E, R, P>, P>,
never,
Scope.Scope | Result.forkEffect.OutputContext<A, E, R, P>
> {
return new QueryImpl(
options.key,
options.f as any,
@@ -92,6 +141,8 @@ export const make = Effect.fnUntraced(function* <K extends readonly any[], A, E
yield* SubscriptionRef.make(Option.none<K>()),
yield* SubscriptionRef.make(Option.none<Fiber.Fiber<A, E>>()),
yield* SubscriptionRef.make(Result.initial<A, E, P>()),
yield* Effect.context<Scope.Scope | Result.forkEffect.OutputContext<A, E, R, P>>(),
)
})
@@ -108,11 +159,12 @@ export const service = <K extends readonly any[], A, E = never, R = never, P = n
export const run = <K extends readonly any[], A, E, R, P>(
self: Query<K, A, E, R, P>
): Effect.Effect<void, never, Scope.Scope | R> => {
): Effect.Effect<void> => {
const _self = self as QueryImpl<K, A, E, R, P>
return Stream.runForEach(_self.key, key => _self.interrupt.pipe(
Effect.andThen(SubscriptionRef.set(_self.latestKey, Option.some(key))),
Effect.andThen(_self.start(key)),
Effect.andThen(sub => Effect.forkScoped(_self.watch(sub))),
Effect.provide(_self.context),
))
}

View File

@@ -198,11 +198,11 @@ export namespace unsafeForkEffect {
readonly previous?: Success<A> | Failure<A, E>
} & (
| {
readonly refreshing: true
readonly refresh: true
readonly previous: Success<A> | Failure<A, E>
}
| {
readonly refreshing?: false
readonly refresh?: false
}
)
}
@@ -218,7 +218,7 @@ export const unsafeForkEffect = <A, E, R, P = never>(
Effect.bind("ref", () => Ref.make(initial<A, E, P>())),
Effect.bind("pubsub", () => PubSub.unbounded<Result<A, E, P>>()),
Effect.bind("fiber", ({ ref, pubsub }) => Effect.forkScoped(State<A, E, P>().pipe(
Effect.andThen(state => state.set(options?.refreshing
Effect.andThen(state => state.set(options?.refresh
? refreshing(options.previous, options?.initialProgress) as Result<A, E, P>
: running(options?.initialProgress)
).pipe(