import { BrowserStream } from "@effect/platform-browser" import * as AsyncData from "@typed/async-data" import { type Cause, type Context, Effect, Fiber, identity, Option, Queue, Ref, type Scope, Stream, SubscriptionRef } from "effect" import type * as QueryClient from "../QueryClient.js" import * as QueryProgress from "../QueryProgress.js" import * as QueryState from "./QueryState.js" export interface QueryRunner { readonly context: Context.Context readonly latestKeyRef: SubscriptionRef.SubscriptionRef> readonly stateRef: SubscriptionRef.SubscriptionRef> readonly fiberRef: SubscriptionRef.SubscriptionRef | AsyncData.Failure, Cause.NoSuchElementException >>> readonly forkInterrupt: Effect.Effect> readonly forkFetch: Effect.Effect | AsyncData.Failure, Cause.NoSuchElementException>, state: Stream.Stream>, ]> readonly forkRefresh: Effect.Effect | AsyncData.Failure, Cause.NoSuchElementException>, state: Stream.Stream>, ]> readonly fetchOnKeyChange: Effect.Effect } export interface MakeProps { readonly QueryClient: QueryClient.GenericTagClass readonly key: Stream.Stream readonly query: (key: K) => Effect.Effect } export const make = ( { QueryClient, key, query, }: MakeProps ): Effect.Effect< QueryRunner, R>, never, R | QueryClient.TagClassShape > => Effect.gen(function*() { const context = yield* Effect.context>() const latestKeyRef = yield* SubscriptionRef.make(Option.none()) const stateRef = yield* SubscriptionRef.make(AsyncData.noData>()) const fiberRef = yield* SubscriptionRef.make(Option.none | AsyncData.Failure>, Cause.NoSuchElementException >>()) const queryStateTag = QueryState.makeTag>() const interrupt = fiberRef.pipe( Effect.flatMap(Option.match({ onSome: fiber => Ref.set(fiberRef, Option.none()).pipe( Effect.andThen(Fiber.interrupt(fiber)) ), onNone: () => Effect.void, })) ) const forkInterrupt = fiberRef.pipe( Effect.flatMap(Option.match({ onSome: fiber => Ref.set(fiberRef, Option.none()).pipe( Effect.andThen(Fiber.interrupt(fiber).pipe( Effect.asVoid, Effect.forkDaemon, )) ), onNone: () => Effect.forkDaemon(Effect.void), })) ) const run = Effect.Do.pipe( Effect.bind("state", () => queryStateTag), Effect.bind("client", () => QueryClient), Effect.bind("latestKey", () => latestKeyRef.pipe(Effect.flatMap(identity))), Effect.flatMap(({ state, client, latestKey }) => query(latestKey).pipe( client.errorHandler.handle, Effect.matchCauseEffect({ onSuccess: v => Effect.succeed(AsyncData.success(v)).pipe( Effect.tap(state.set) ), onFailure: c => Effect.succeed(AsyncData.failure(c)).pipe( Effect.tap(state.set) ), }), )), Effect.provide(context), Effect.provide(QueryProgress.QueryProgress.Live), ) const forkFetch = Queue.unbounded>>().pipe( Effect.flatMap(stateQueue => queryStateTag.pipe( Effect.flatMap(state => interrupt.pipe( Effect.andThen(Effect.addFinalizer(() => Ref.set(fiberRef, Option.none()).pipe( Effect.andThen(Queue.shutdown(stateQueue)) )).pipe( Effect.andThen(state.set(AsyncData.loading())), Effect.andThen(run), Effect.scoped, Effect.forkDaemon, )), Effect.tap(fiber => Ref.set(fiberRef, Option.some(fiber))), Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const), )), Effect.provide(QueryState.layer( queryStateTag, stateRef, value => Queue.offer(stateQueue, value).pipe( Effect.andThen(Ref.set(stateRef, value)) ), )), )) ) const setInitialRefreshState = queryStateTag.pipe( Effect.flatMap(state => state.update(previous => { if (AsyncData.isSuccess(previous) || AsyncData.isFailure(previous)) return AsyncData.refreshing(previous) if (AsyncData.isRefreshing(previous)) return AsyncData.refreshing(previous.previous) return AsyncData.loading() })) ) const forkRefresh = Queue.unbounded>>().pipe( Effect.flatMap(stateQueue => interrupt.pipe( Effect.andThen(Effect.addFinalizer(() => Ref.set(fiberRef, Option.none()).pipe( Effect.andThen(Queue.shutdown(stateQueue)) )).pipe( Effect.andThen(setInitialRefreshState), Effect.andThen(run), Effect.scoped, Effect.forkDaemon, )), Effect.tap(fiber => Ref.set(fiberRef, Option.some(fiber))), Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const), Effect.provide(QueryState.layer( queryStateTag, stateRef, value => Queue.offer(stateQueue, value).pipe( Effect.andThen(Ref.set(stateRef, value)) ), )), )) ) const fetchOnKeyChange = Effect.addFinalizer(() => interrupt).pipe( Effect.andThen(Stream.runForEach(Stream.changes(key), latestKey => Ref.set(latestKeyRef, Option.some(latestKey)).pipe( Effect.andThen(forkFetch) ) )) ) return { context, latestKeyRef, stateRef, fiberRef, forkInterrupt, forkFetch, forkRefresh, fetchOnKeyChange, } }) export interface RunOptions { readonly refreshOnWindowFocus?: boolean } export const run = ( self: QueryRunner, options?: RunOptions, ): Effect.Effect => Effect.gen(function*() { if (options?.refreshOnWindowFocus ?? true) yield* Stream.runForEach( typeof window !== "undefined" ? BrowserStream.fromEventListenerWindow("focus") : Stream.empty, () => self.forkRefresh, ) yield* self.fetchOnKeyChange })