From dad4cd60d1811bca67d55beb5360dda10ec21f1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Thu, 15 Jan 2026 22:19:56 +0100 Subject: [PATCH] Refactor query --- packages/effect-fc/src/Mutation.ts | 2 +- packages/effect-fc/src/Query.ts | 56 +++++++++++++++++++----------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/packages/effect-fc/src/Mutation.ts b/packages/effect-fc/src/Mutation.ts index cec9b54..45dd11e 100644 --- a/packages/effect-fc/src/Mutation.ts +++ b/packages/effect-fc/src/Mutation.ts @@ -72,7 +72,7 @@ extends Pipeable.Class() implements Mutation { ? SubscriptionRef.set(this.fiber, Option.none()) : Effect.void, onNone: () => Effect.void, - }) + }), )), { diff --git a/packages/effect-fc/src/Query.ts b/packages/effect-fc/src/Query.ts index f4b49a7..32421f3 100644 --- a/packages/effect-fc/src/Query.ts +++ b/packages/effect-fc/src/Query.ts @@ -1,4 +1,4 @@ -import { type Cause, type Context, DateTime, type Duration, Effect, Equivalence, Exit, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, Subscribable, SubscriptionRef } from "effect" +import { type Cause, type Context, DateTime, type Duration, Effect, Equal, Equivalence, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, Subscribable, SubscriptionRef } from "effect" import * as QueryClient from "./QueryClient.js" import * as Result from "./Result.js" @@ -67,7 +67,7 @@ extends Pipeable.Class() implements Query { ? Result.willFetch(previous.value) as Result.Final : Result.initial() )), - Effect.andThen(sub => Effect.forkScoped(this.watch(sub))), + Effect.andThen(sub => Effect.forkScoped(this.watch(key, sub))), this.runSemaphore.withPermits(1), )), this.context, @@ -89,7 +89,7 @@ extends Pipeable.Class() implements Query { ? Result.willFetch(previous.value) as Result.Final : Result.initial() )), - Effect.andThen(sub => this.watch(sub)), + Effect.andThen(sub => this.watch(key, sub)), Effect.provide(this.context), ) } @@ -102,7 +102,7 @@ extends Pipeable.Class() implements Query { ? Result.willFetch(previous.value) as Result.Final : Result.initial() )), - Effect.tap(sub => Effect.forkScoped(this.watch(sub))), + Effect.tap(sub => Effect.forkScoped(this.watch(key, sub))), Effect.provide(this.context), ) } @@ -112,11 +112,13 @@ extends Pipeable.Class() implements Query { Effect.andThen(Effect.Do), Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)), Effect.bind("latestFinalResult", () => this.latestFinalResult), - Effect.andThen(({ latestKey, latestFinalResult }) => this.startCached(latestKey, Option.isSome(latestFinalResult) - ? Result.willRefresh(latestFinalResult.value) as Result.Final - : Result.initial() - )), - Effect.andThen(sub => this.watch(sub)), + Effect.bind("subscribable", ({ latestKey, latestFinalResult }) => + this.startCached(latestKey, Option.isSome(latestFinalResult) + ? Result.willRefresh(latestFinalResult.value) as Result.Final + : Result.initial() + ) + ), + Effect.andThen(({ latestKey, subscribable }) => this.watch(latestKey, subscribable)), Effect.provide(this.context), ) } @@ -129,11 +131,14 @@ extends Pipeable.Class() implements Query { Effect.andThen(Effect.Do), Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)), Effect.bind("latestFinalResult", () => this.latestFinalResult), - Effect.andThen(({ latestKey, latestFinalResult }) => this.startCached(latestKey, Option.isSome(latestFinalResult) - ? Result.willRefresh(latestFinalResult.value) as Result.Final - : Result.initial() - )), - Effect.tap(sub => Effect.forkScoped(this.watch(sub))), + Effect.bind("subscribable", ({ latestKey, latestFinalResult }) => + this.startCached(latestKey, Option.isSome(latestFinalResult) + ? Result.willRefresh(latestFinalResult.value) as Result.Final + : Result.initial() + ) + ), + Effect.tap(({ latestKey, subscribable }) => Effect.forkScoped(this.watch(latestKey, subscribable))), + Effect.map(({ subscribable }) => subscribable), Effect.provide(this.context), ) } @@ -166,15 +171,19 @@ extends Pipeable.Class() implements Query { ): Effect.Effect< Subscribable.Subscribable>, never, - Scope.Scope | QueryClient.QueryClient | R + Scope.Scope | R > { return Result.unsafeForkEffect( - Effect.onExit(this.f(key), exit => Effect.andThen( - SubscriptionRef.set(this.fiber, Option.none()), - Exit.isSuccess(exit) - ? this.updateCacheEntry(key, Result.fromExit(exit)) - : Effect.void, + Effect.onExit(this.f(key), () => Effect.andThen( + Effect.all([Effect.fiberId, this.fiber]), + ([currentFiberId, fiber]) => Option.match(fiber, { + onSome: v => Equal.equals(currentFiberId, v.id()) + ? SubscriptionRef.set(this.fiber, Option.none()) + : Effect.void, + onNone: () => Effect.void, + }), )), + { initial, initialProgress: this.initialProgress, @@ -186,8 +195,9 @@ extends Pipeable.Class() implements Query { } watch( + key: K, sub: Subscribable.Subscribable> - ): Effect.Effect> { + ): Effect.Effect, never, QueryClient.QueryClient> { return sub.get.pipe( Effect.andThen(initial => Stream.runFoldEffect( sub.changes, @@ -195,6 +205,10 @@ extends Pipeable.Class() implements Query { (_, result) => Effect.as(SubscriptionRef.set(this.result, result), result), ) as Effect.Effect>), Effect.tap(result => SubscriptionRef.set(this.latestFinalResult, Option.some(result))), + Effect.tap(result => Result.isSuccess(result) + ? this.updateCacheEntry(key, result) + : Effect.void + ), ) }