From 6f50cf298991be8d48fcf2ffbeb7ea1187ad6b5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Tue, 25 Nov 2025 16:03:28 +0100 Subject: [PATCH] Refactor query --- packages/effect-fc/src/Query.ts | 78 ++++++++++++++++++++++++++------ packages/effect-fc/src/Result.ts | 6 +-- 2 files changed, 68 insertions(+), 16 deletions(-) diff --git a/packages/effect-fc/src/Query.ts b/packages/effect-fc/src/Query.ts index 3c7a2f5..33578d4 100644 --- a/packages/effect-fc/src/Query.ts +++ b/packages/effect-fc/src/Query.ts @@ -1,4 +1,4 @@ -import { Effect, Fiber, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect" +import { type Cause, type Context, Effect, Fiber, identity, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect" import * as Result from "./Result.js" @@ -16,9 +16,13 @@ extends Pipeable.Pipeable { readonly latestKey: Subscribable.Subscribable> readonly fiber: Subscribable.Subscribable>> readonly result: Subscribable.Subscribable> + + fetch(key: K): Effect.Effect>> + readonly refetch: Effect.Effect>, Cause.NoSuchElementException> + readonly refresh: Effect.Effect>, Cause.NoSuchElementException> } -class QueryImpl +class QueryImpl extends Pipeable.Class() implements Query { readonly [QueryTypeId]: QueryTypeId = QueryTypeId @@ -30,28 +34,43 @@ extends Pipeable.Class() implements Query { readonly latestKey: SubscriptionRef.SubscriptionRef>, readonly fiber: SubscriptionRef.SubscriptionRef>>, readonly result: SubscriptionRef.SubscriptionRef>, + + readonly context: Context.Context>, ) { super() } get interrupt(): Effect.Effect { - return Effect.andThen(this.fiber, Option.match({ - onSome: Fiber.interrupt, - onNone: () => Effect.void, - })) + return this.fiber.pipe( + Effect.andThen(Option.match({ + onSome: Fiber.interrupt, + onNone: () => Effect.void, + })), + Effect.andThen(Effect.sleep("0 millis")), + ) } start( - key: K + key: K, + refresh?: boolean, ): Effect.Effect< Subscribable.Subscribable>, never, Scope.Scope | R > { - return Result.unsafeForkEffect( - Effect.onExit(this.f(key), () => SubscriptionRef.set(this.fiber, Option.none())), - { initialProgress: this.initialProgress }, - ).pipe( + return this.result.pipe( + Effect.map(previous => (Result.isSuccess(previous) || Result.isFailure(previous)) + ? Option.some(previous) + : Option.none() + ), + Effect.andThen(previous => Result.unsafeForkEffect( + Effect.onExit(this.f(key), () => SubscriptionRef.set(this.fiber, Option.none())), + { + initialProgress: this.initialProgress, + refresh: refresh && Option.isSome(previous), + previous: Option.getOrUndefined(previous), + } as Result.unsafeForkEffect.Options, + )), Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))), Effect.map(([sub]) => sub), ) @@ -69,6 +88,32 @@ extends Pipeable.Class() implements Query { ), ) } + + fetch(key: K): Effect.Effect>> { + return this.interrupt.pipe( + Effect.andThen(SubscriptionRef.set(this.latestKey, Option.some(key))), + Effect.andThen(this.start(key)), + Effect.provide(this.context), + ) + } + + get refetch(): Effect.Effect>, Cause.NoSuchElementException> { + return this.interrupt.pipe( + Effect.andThen(this.latestKey), + Effect.andThen(identity), + Effect.andThen(key => this.start(key)), + Effect.provide(this.context), + ) + } + + get refresh(): Effect.Effect>, Cause.NoSuchElementException> { + return this.interrupt.pipe( + Effect.andThen(this.latestKey), + Effect.andThen(identity), + Effect.andThen(key => this.start(key, true)), + Effect.provide(this.context), + ) + } } export const isQuery = (u: unknown): u is Query => Predicate.hasProperty(u, QueryTypeId) @@ -83,7 +128,11 @@ export declare namespace make { export const make = Effect.fnUntraced(function* ( options: make.Options -): Effect.fn.Return, P>> { +): Effect.fn.Return< + Query, P>, + never, + Scope.Scope | Result.forkEffect.OutputContext +> { return new QueryImpl( options.key, options.f as any, @@ -92,6 +141,8 @@ export const make = Effect.fnUntraced(function* ()), yield* SubscriptionRef.make(Option.none>()), yield* SubscriptionRef.make(Result.initial()), + + yield* Effect.context>(), ) }) @@ -108,11 +159,12 @@ export const service = ( self: Query -): Effect.Effect => { +): Effect.Effect => { const _self = self as QueryImpl return Stream.runForEach(_self.key, key => _self.interrupt.pipe( Effect.andThen(SubscriptionRef.set(_self.latestKey, Option.some(key))), Effect.andThen(_self.start(key)), Effect.andThen(sub => Effect.forkScoped(_self.watch(sub))), + Effect.provide(_self.context), )) } diff --git a/packages/effect-fc/src/Result.ts b/packages/effect-fc/src/Result.ts index ec2679f..f91c5e5 100644 --- a/packages/effect-fc/src/Result.ts +++ b/packages/effect-fc/src/Result.ts @@ -198,11 +198,11 @@ export namespace unsafeForkEffect { readonly previous?: Success | Failure } & ( | { - readonly refreshing: true + readonly refresh: true readonly previous: Success | Failure } | { - readonly refreshing?: false + readonly refresh?: false } ) } @@ -218,7 +218,7 @@ export const unsafeForkEffect = ( Effect.bind("ref", () => Ref.make(initial())), Effect.bind("pubsub", () => PubSub.unbounded>()), Effect.bind("fiber", ({ ref, pubsub }) => Effect.forkScoped(State().pipe( - Effect.andThen(state => state.set(options?.refreshing + Effect.andThen(state => state.set(options?.refresh ? refreshing(options.previous, options?.initialProgress) as Result : running(options?.initialProgress) ).pipe(