0.2.2 #31

Merged
Thilawyn merged 184 commits from next into master 2026-01-16 17:05:31 +01:00
2 changed files with 36 additions and 22 deletions
Showing only changes of commit dad4cd60d1 - Show all commits

View File

@@ -72,7 +72,7 @@ extends Pipeable.Class() implements Mutation<K, A, E, R, P> {
? SubscriptionRef.set(this.fiber, Option.none()) ? SubscriptionRef.set(this.fiber, Option.none())
: Effect.void, : Effect.void,
onNone: () => Effect.void, onNone: () => Effect.void,
}) }),
)), )),
{ {

View File

@@ -1,4 +1,4 @@
import { type Cause, type Context, DateTime, type Duration, Effect, Equivalence, Exit, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, Subscribable, SubscriptionRef } from "effect" import { type Cause, type Context, DateTime, type Duration, Effect, Equal, Equivalence, Fiber, HashMap, identity, Option, Pipeable, Predicate, type Scope, Stream, Subscribable, SubscriptionRef } from "effect"
import * as QueryClient from "./QueryClient.js" import * as QueryClient from "./QueryClient.js"
import * as Result from "./Result.js" import * as Result from "./Result.js"
@@ -67,7 +67,7 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
? Result.willFetch(previous.value) as Result.Final<A, E, P> ? Result.willFetch(previous.value) as Result.Final<A, E, P>
: Result.initial() : Result.initial()
)), )),
Effect.andThen(sub => Effect.forkScoped(this.watch(sub))), Effect.andThen(sub => Effect.forkScoped(this.watch(key, sub))),
this.runSemaphore.withPermits(1), this.runSemaphore.withPermits(1),
)), )),
this.context, this.context,
@@ -89,7 +89,7 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
? Result.willFetch(previous.value) as Result.Final<A, E, P> ? Result.willFetch(previous.value) as Result.Final<A, E, P>
: Result.initial() : Result.initial()
)), )),
Effect.andThen(sub => this.watch(sub)), Effect.andThen(sub => this.watch(key, sub)),
Effect.provide(this.context), Effect.provide(this.context),
) )
} }
@@ -102,7 +102,7 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
? Result.willFetch(previous.value) as Result.Final<A, E, P> ? Result.willFetch(previous.value) as Result.Final<A, E, P>
: Result.initial() : Result.initial()
)), )),
Effect.tap(sub => Effect.forkScoped(this.watch(sub))), Effect.tap(sub => Effect.forkScoped(this.watch(key, sub))),
Effect.provide(this.context), Effect.provide(this.context),
) )
} }
@@ -112,11 +112,13 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
Effect.andThen(Effect.Do), Effect.andThen(Effect.Do),
Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)), Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)),
Effect.bind("latestFinalResult", () => this.latestFinalResult), Effect.bind("latestFinalResult", () => this.latestFinalResult),
Effect.andThen(({ latestKey, latestFinalResult }) => this.startCached(latestKey, Option.isSome(latestFinalResult) Effect.bind("subscribable", ({ latestKey, latestFinalResult }) =>
this.startCached(latestKey, Option.isSome(latestFinalResult)
? Result.willRefresh(latestFinalResult.value) as Result.Final<A, E, P> ? Result.willRefresh(latestFinalResult.value) as Result.Final<A, E, P>
: Result.initial() : Result.initial()
)), )
Effect.andThen(sub => this.watch(sub)), ),
Effect.andThen(({ latestKey, subscribable }) => this.watch(latestKey, subscribable)),
Effect.provide(this.context), Effect.provide(this.context),
) )
} }
@@ -129,11 +131,14 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
Effect.andThen(Effect.Do), Effect.andThen(Effect.Do),
Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)), Effect.bind("latestKey", () => Effect.andThen(this.latestKey, identity)),
Effect.bind("latestFinalResult", () => this.latestFinalResult), Effect.bind("latestFinalResult", () => this.latestFinalResult),
Effect.andThen(({ latestKey, latestFinalResult }) => this.startCached(latestKey, Option.isSome(latestFinalResult) Effect.bind("subscribable", ({ latestKey, latestFinalResult }) =>
this.startCached(latestKey, Option.isSome(latestFinalResult)
? Result.willRefresh(latestFinalResult.value) as Result.Final<A, E, P> ? Result.willRefresh(latestFinalResult.value) as Result.Final<A, E, P>
: Result.initial() : Result.initial()
)), )
Effect.tap(sub => Effect.forkScoped(this.watch(sub))), ),
Effect.tap(({ latestKey, subscribable }) => Effect.forkScoped(this.watch(latestKey, subscribable))),
Effect.map(({ subscribable }) => subscribable),
Effect.provide(this.context), Effect.provide(this.context),
) )
} }
@@ -166,15 +171,19 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
): Effect.Effect< ): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>, Subscribable.Subscribable<Result.Result<A, E, P>>,
never, never,
Scope.Scope | QueryClient.QueryClient | R Scope.Scope | R
> { > {
return Result.unsafeForkEffect( return Result.unsafeForkEffect(
Effect.onExit(this.f(key), exit => Effect.andThen( Effect.onExit(this.f(key), () => Effect.andThen(
SubscriptionRef.set(this.fiber, Option.none()), Effect.all([Effect.fiberId, this.fiber]),
Exit.isSuccess(exit) ([currentFiberId, fiber]) => Option.match(fiber, {
? this.updateCacheEntry(key, Result.fromExit(exit)) onSome: v => Equal.equals(currentFiberId, v.id())
? SubscriptionRef.set(this.fiber, Option.none())
: Effect.void, : Effect.void,
onNone: () => Effect.void,
}),
)), )),
{ {
initial, initial,
initialProgress: this.initialProgress, initialProgress: this.initialProgress,
@@ -186,8 +195,9 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
} }
watch( watch(
key: K,
sub: Subscribable.Subscribable<Result.Result<A, E, P>> sub: Subscribable.Subscribable<Result.Result<A, E, P>>
): Effect.Effect<Result.Final<A, E, P>> { ): Effect.Effect<Result.Final<A, E, P>, never, QueryClient.QueryClient> {
return sub.get.pipe( return sub.get.pipe(
Effect.andThen(initial => Stream.runFoldEffect( Effect.andThen(initial => Stream.runFoldEffect(
sub.changes, sub.changes,
@@ -195,6 +205,10 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
(_, result) => Effect.as(SubscriptionRef.set(this.result, result), result), (_, result) => Effect.as(SubscriptionRef.set(this.result, result), result),
) as Effect.Effect<Result.Final<A, E, P>>), ) as Effect.Effect<Result.Final<A, E, P>>),
Effect.tap(result => SubscriptionRef.set(this.latestFinalResult, Option.some(result))), Effect.tap(result => SubscriptionRef.set(this.latestFinalResult, Option.some(result))),
Effect.tap(result => Result.isSuccess(result)
? this.updateCacheEntry(key, result)
: Effect.void
),
) )
} }