Query work
All checks were successful
Lint / lint (push) Successful in 39s

This commit is contained in:
Julien Valverdé
2025-11-18 04:50:11 +01:00
parent c4bfcb07c1
commit 3ff646db0f

View File

@@ -1,4 +1,4 @@
import { Effect, Fiber, Option, Pipeable, Predicate, Stream, type Subscribable, type SubscriptionRef } from "effect" import { Effect, Fiber, Option, Pipeable, Predicate, Stream, type Subscribable, SubscriptionRef } from "effect"
import * as Result from "./Result.js" import * as Result from "./Result.js"
@@ -13,7 +13,7 @@ extends Pipeable.Pipeable {
readonly f: (key: K) => Effect.Effect<A, E, R> readonly f: (key: K) => Effect.Effect<A, E, R>
readonly initialProgress: P readonly initialProgress: P
readonly fiber: Subscribable.Subscribable<Option.Option<Fiber.Fiber<Result.Result<A, E, P>>>> readonly fiber: Subscribable.Subscribable<Option.Option<Fiber.Fiber<A, E>>>
readonly result: Subscribable.Subscribable<Result.Result<A, E, P>> readonly result: Subscribable.Subscribable<Result.Result<A, E, P>>
} }
@@ -26,40 +26,68 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
readonly f: (key: K) => Effect.Effect<A, E, R>, readonly f: (key: K) => Effect.Effect<A, E, R>,
readonly initialProgress: P, readonly initialProgress: P,
readonly fiber: SubscriptionRef.SubscriptionRef<Option.Option<Fiber.Fiber<Result.Result<A, E, P>>>>, readonly fiber: SubscriptionRef.SubscriptionRef<Option.Option<Fiber.Fiber<A, E>>>,
readonly result: SubscriptionRef.SubscriptionRef<Result.Result<A, E, P>>, readonly result: SubscriptionRef.SubscriptionRef<Result.Result<A, E, P>>,
) { ) {
super() super()
} }
query(key: K) { interrupt(): Effect.Effect<void> {
return Effect.andThen(this.fiber, Option.match({
onSome: fiber => Effect.andThen(
Fiber.interrupt(fiber),
SubscriptionRef.set(this.fiber, Option.none()),
),
onNone: () => Effect.void,
}))
}
query(key: K): Effect.Effect<
Result.Result<A, E, P>,
never,
Result.forkEffect.OutputContext<A, E, R, never>
> {
return this.fiber.pipe( return this.fiber.pipe(
Effect.andThen(Option.match({ Effect.andThen(this.interrupt()),
onSome: Fiber.interrupt, Effect.andThen(Result.unsafeForkEffect(this.f(key))),
onNone: () => Effect.void, Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))),
})), Effect.andThen(([sub]) => Effect.all([Effect.succeed(sub), sub.get])),
Effect.andThen(Result.forkEffect(this.f(key))), Effect.andThen(([sub, initial]) => Stream.runFoldEffect(
Effect.tap(([result, fiber]) => ), sub.changes,
initial,
(_, result) => Effect.as(SubscriptionRef.set(this.result, result), result),
)),
Effect.tap(SubscriptionRef.set(this.fiber, Option.none())),
) )
} }
} }
export const isQuery = (u: unknown): u is Query<unknown[], unknown, unknown, unknown, unknown> => Predicate.hasProperty(u, QueryTypeId) export const isQuery = (u: unknown): u is Query<unknown[], unknown, unknown, unknown, unknown> => Predicate.hasProperty(u, QueryTypeId)
export declare namespace make {
export interface Options<K extends readonly any[], A, E = never, R = never, P = never> {
readonly key: Stream.Stream<K>
readonly f: (key: NoInfer<K>) => Effect.Effect<A, E, R>
readonly initialProgress?: P
}
}
export const make = Effect.fnUntraced(function* <K extends readonly any[], A, E = never, R = never, P = never>(
options: make.Options<K, A, E, R, P>
) {
return new QueryImpl(
options.key,
options.f,
options.initialProgress as P,
yield* SubscriptionRef.make(Option.none<Fiber.Fiber<A, E>>()),
yield* SubscriptionRef.make(Result.initial() as Result.Result<A, E, P>),
)
})
export const run = <K extends readonly any[], A, E, R, P>( export const run = <K extends readonly any[], A, E, R, P>(
self: Query<K, A, E, R, P> self: Query<K, A, E, R, P>
) => Stream.runForEach(self.key, ) => Stream.runForEach(self.key, key => Effect.andThen(
) (self as QueryImpl<K, A, E, R, P>).interrupt(),
Effect.forkScoped((self as QueryImpl<K, A, E, R, P>).query(key)),
export const query = <K extends readonly any[], A, E, R, P>( ))
self: Query<K, A, E, R, P>,
key: K,
) => self.fiberRef.pipe(
Effect.andThen(Option.match({
onSome: Fiber.interrupt,
onNone: () => Effect.void,
})),
Effect.andThen(Effect.forkScoped(
))
)