diff --git a/packages/extension-query/src/QueryRunner.ts b/packages/extension-query/src/QueryRunner.ts index f87e7ba..c8754fd 100644 --- a/packages/extension-query/src/QueryRunner.ts +++ b/packages/extension-query/src/QueryRunner.ts @@ -8,7 +8,7 @@ import { QueryState } from "./internal/index.js" export interface QueryRunner { readonly queryKey: Stream.Stream - readonly latestKeyRef: SubscriptionRef.SubscriptionRef> + readonly latestKeyValueRef: SubscriptionRef.SubscriptionRef> readonly stateRef: SubscriptionRef.SubscriptionRef> readonly fiberRef: SubscriptionRef.SubscriptionRef | AsyncData.Failure, @@ -17,8 +17,8 @@ export interface QueryRunner { readonly interrupt: Effect.Effect readonly forkInterrupt: Effect.Effect> - readonly forkFetch: Effect.Effect | AsyncData.Failure, Cause.NoSuchElementException>, + readonly forkFetch: (keyValue: K) => Effect.Effect | AsyncData.Failure>, state: Stream.Stream>, ]> readonly forkRefresh: Effect.Effect > => Effect.gen(function*() { const context = yield* Effect.context>() - const latestKeyRef = yield* SubscriptionRef.make(Option.none()) + const latestKeyValueRef = yield* SubscriptionRef.make(Option.none()) const stateRef = yield* SubscriptionRef.make(AsyncData.noData>()) const fiberRef = yield* SubscriptionRef.make(Option.none | AsyncData.Failure>, @@ -78,20 +78,13 @@ export const make = onNone: () => Effect.forkDaemon(Effect.void), })) - const run = Effect.Do.pipe( - Effect.bind("state", () => queryStateTag), - Effect.bind("client", () => QueryClient), - Effect.bind("latestKey", () => latestKeyRef.pipe(Effect.flatMap(identity))), - - Effect.flatMap(({ state, client, latestKey }) => query(latestKey).pipe( + const run = (keyValue: K) => Effect.all([QueryClient, queryStateTag]).pipe( + Effect.flatMap(([client, state]) => Ref.set(latestKeyValueRef, Option.some(keyValue)).pipe( + Effect.andThen(query(keyValue)), client.errorHandler.handle, Effect.matchCauseEffect({ - onSuccess: v => Effect.succeed(AsyncData.success(v)).pipe( - Effect.tap(state.set) - ), - onFailure: c => Effect.succeed(AsyncData.failure(c)).pipe( - Effect.tap(state.set) - ), + onSuccess: v => Effect.tap(Effect.succeed(AsyncData.success(v)), state.set), + onFailure: c => Effect.tap(Effect.succeed(AsyncData.failure(c)), state.set), }), )), @@ -99,17 +92,20 @@ export const make = Effect.provide(QueryProgress.QueryProgress.Default), ) - const forkFetch = Queue.unbounded>>().pipe( + const forkFetch = (keyValue: K) => Queue.unbounded>>().pipe( Effect.flatMap(stateQueue => queryStateTag.pipe( Effect.flatMap(state => interrupt.pipe( - Effect.andThen(Effect.addFinalizer(() => Ref.set(fiberRef, Option.none()).pipe( - Effect.andThen(Queue.shutdown(stateQueue)) - )).pipe( - Effect.andThen(state.set(AsyncData.loading())), - Effect.andThen(run), - Effect.scoped, - Effect.forkDaemon, - )), + Effect.andThen( + Effect.addFinalizer(() => Effect.andThen( + Ref.set(fiberRef, Option.none()), + Queue.shutdown(stateQueue), + )).pipe( + Effect.andThen(state.set(AsyncData.loading())), + Effect.andThen(run(keyValue)), + Effect.scoped, + Effect.forkDaemon, + ) + ), Effect.tap(fiber => Ref.set(fiberRef, Option.some(fiber))), Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const), @@ -118,8 +114,9 @@ export const make = Effect.provide(QueryState.layer( queryStateTag, stateRef, - value => Queue.offer(stateQueue, value).pipe( - Effect.andThen(Ref.set(stateRef, value)) + value => Effect.andThen( + Queue.offer(stateQueue, value), + Ref.set(stateRef, value), ), )), )) @@ -135,14 +132,20 @@ export const make = const forkRefresh = Queue.unbounded>>().pipe( Effect.flatMap(stateQueue => interrupt.pipe( - Effect.andThen(Effect.addFinalizer(() => Ref.set(fiberRef, Option.none()).pipe( - Effect.andThen(Queue.shutdown(stateQueue)) - )).pipe( - Effect.andThen(setInitialRefreshState), - Effect.andThen(run), - Effect.scoped, - Effect.forkDaemon, - )), + Effect.andThen( + Effect.addFinalizer(() => Effect.andThen( + Ref.set(fiberRef, Option.none()), + Queue.shutdown(stateQueue), + )).pipe( + Effect.andThen(setInitialRefreshState), + Effect.andThen(latestKeyValueRef.pipe( + Effect.flatMap(identity), + Effect.flatMap(run), + )), + Effect.scoped, + Effect.forkDaemon, + ) + ), Effect.tap(fiber => Ref.set(fiberRef, Option.some(fiber))), Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const), @@ -150,8 +153,9 @@ export const make = Effect.provide(QueryState.layer( queryStateTag, stateRef, - value => Queue.offer(stateQueue, value).pipe( - Effect.andThen(Ref.set(stateRef, value)) + value => Effect.andThen( + Queue.offer(stateQueue, value), + Ref.set(stateRef, value), ), )), )) @@ -159,7 +163,7 @@ export const make = return { queryKey: key, - latestKeyRef, + latestKeyValueRef, stateRef, fiberRef, @@ -178,16 +182,12 @@ export interface RunOptions { export const run = ( self: QueryRunner, options?: RunOptions, -): Effect.Effect => Effect.gen(function*() { +): Effect.Effect => Effect.gen(function*() { if (typeof window !== "undefined" && (options?.refreshOnWindowFocus ?? true)) yield* Effect.forkScoped( Stream.runForEach(BrowserStream.fromEventListenerWindow("focus"), () => self.forkRefresh) ) yield* Effect.addFinalizer(() => self.interrupt) - yield* Stream.runForEach(Stream.changes(self.queryKey), latestKey => - Ref.set(self.latestKeyRef, Option.some(latestKey)).pipe( - Effect.andThen(self.forkFetch) - ) - ) + yield* Stream.runForEach(Stream.changes(self.queryKey), latestKey => self.forkFetch(latestKey)) })