import { BrowserStream } from "@effect/platform-browser" import * as AsyncData from "@typed/async-data" import { type Cause, type Context, Effect, Fiber, identity, Option, Ref, type Scope, Stream, SubscriptionRef } from "effect" import type * as QueryClient from "../QueryClient.js" import * as QueryProgress from "../QueryProgress.js" import * as QueryState from "./QueryState.js" export interface QueryRunner { readonly context: Context.Context readonly latestKeyRef: SubscriptionRef.SubscriptionRef> readonly stateRef: SubscriptionRef.SubscriptionRef> readonly fiberRef: SubscriptionRef.SubscriptionRef>> readonly forkInterrupt: Effect.Effect> readonly forkFetch: Effect.Effect> readonly forkRefresh: Effect.Effect> readonly fetchOnKeyChange: Effect.Effect readonly refreshOnWindowFocus: Effect.Effect } export interface MakeProps { readonly QueryClient: QueryClient.GenericTagClass readonly key: Stream.Stream readonly query: (key: K) => Effect.Effect } export const make = ( { QueryClient, key, query, }: MakeProps ): Effect.Effect< QueryRunner, R>, never, R | QueryClient.TagClassShape | EH > => Effect.gen(function*() { const context = yield* Effect.context | EH>() const latestKeyRef = yield* SubscriptionRef.make(Option.none()) const stateRef = yield* SubscriptionRef.make(AsyncData.noData>()) const fiberRef = yield* SubscriptionRef.make(Option.none>()) const queryStateTag = QueryState.makeTag>() const queryStateLayer = QueryState.layer(queryStateTag, stateRef, value => Ref.set(stateRef, value)) const interrupt = fiberRef.pipe( Effect.flatMap(Option.match({ onSome: fiber => Ref.set(fiberRef, Option.none()).pipe( Effect.andThen(Fiber.interrupt(fiber)) ), onNone: () => Effect.void, })) ) const forkInterrupt = fiberRef.pipe( Effect.flatMap(Option.match({ onSome: fiber => Ref.set(fiberRef, Option.none()).pipe( Effect.andThen(Fiber.interrupt(fiber).pipe( Effect.asVoid, Effect.forkDaemon, )) ), onNone: () => Effect.forkDaemon(Effect.void), })) ) const run = Effect.all([ queryStateTag, QueryClient.pipe(Effect.flatMap(client => client.ErrorHandler)), ]).pipe( Effect.flatMap(([state, errorHandler]) => latestKeyRef.pipe( Effect.flatMap(identity), Effect.flatMap(key => query(key).pipe( errorHandler.handle, Effect.matchCauseEffect({ onSuccess: v => state.set(AsyncData.success(v)), onFailure: c => state.set(AsyncData.failure(c)), }), )), )), Effect.provide(context), Effect.provide(QueryProgress.QueryProgress.Live), ) const forkFetch = queryStateTag.pipe( Effect.flatMap(state => interrupt.pipe( Effect.andThen(state.set(AsyncData.loading()).pipe( Effect.andThen(run), Effect.fork, )), )), Effect.flatMap(fiber => Ref.set(fiberRef, Option.some(fiber)).pipe( Effect.andThen(Fiber.join(fiber)), Effect.andThen(Ref.set(fiberRef, Option.none())), ) ), Effect.forkDaemon, Effect.provide(queryStateLayer), ) const forkRefresh = queryStateTag.pipe( Effect.flatMap(state => interrupt.pipe( Effect.andThen(state.update(previous => { if (AsyncData.isSuccess(previous) || AsyncData.isFailure(previous)) return AsyncData.refreshing(previous) if (AsyncData.isRefreshing(previous)) return AsyncData.refreshing(previous.previous) return AsyncData.loading() }).pipe( Effect.andThen(run), Effect.fork, )) )), Effect.flatMap(fiber => Ref.set(fiberRef, Option.some(fiber)).pipe( Effect.andThen(Fiber.join(fiber)), Effect.andThen(Ref.set(fiberRef, Option.none())), ) ), Effect.forkDaemon, Effect.provide(queryStateLayer), ) const fetchOnKeyChange = Effect.addFinalizer(() => interrupt).pipe( Effect.andThen(Stream.runForEach(key, latestKey => Ref.set(latestKeyRef, Option.some(latestKey)).pipe( Effect.andThen(forkFetch) ) )) ) const refreshOnWindowFocus = Stream.runForEach( BrowserStream.fromEventListenerWindow("focus"), () => forkRefresh, ) return { context, latestKeyRef, stateRef, fiberRef, forkInterrupt, forkFetch, forkRefresh, fetchOnKeyChange, refreshOnWindowFocus, } })