From d6011f78970850824091b52a528e58746fe06ea1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Tue, 11 Mar 2025 02:17:50 +0100 Subject: [PATCH] MutationRunner --- .../extension-query/src/MutationRunner.ts | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 packages/extension-query/src/MutationRunner.ts diff --git a/packages/extension-query/src/MutationRunner.ts b/packages/extension-query/src/MutationRunner.ts new file mode 100644 index 0000000..a845a8d --- /dev/null +++ b/packages/extension-query/src/MutationRunner.ts @@ -0,0 +1,144 @@ +import { BrowserStream } from "@effect/platform-browser" +import * as AsyncData from "@typed/async-data" +import { type Cause, Effect, Fiber, identity, Option, Ref, type Scope, Stream, SubscriptionRef } from "effect" + + +export interface MutationRunner { + readonly key: Stream.Stream + readonly mutation: (key: K) => Effect.Effect + + readonly latestKeyRef: SubscriptionRef.SubscriptionRef> + readonly stateRef: SubscriptionRef.SubscriptionRef> + readonly fiberRef: SubscriptionRef.SubscriptionRef>> + + readonly forkInterrupt: Effect.Effect> + readonly forkFetch: Effect.Effect> + readonly forkRefresh: Effect.Effect> + + readonly fetchOnKeyChange: Effect.Effect +} + + +export interface MakeProps { + readonly key: Stream.Stream + readonly mutation: (key: K) => Effect.Effect +} + +export const make = ( + { key, query }: MakeProps +): Effect.Effect, never, R> => Effect.gen(function*() { + const context = yield* Effect.context() + + const latestKeyRef = yield* SubscriptionRef.make(Option.none()) + const stateRef = yield* SubscriptionRef.make(AsyncData.noData()) + const fiberRef = yield* SubscriptionRef.make(Option.none>()) + + const interrupt = fiberRef.pipe( + Effect.flatMap(Option.match({ + onSome: fiber => Ref.set(fiberRef, Option.none()).pipe( + Effect.andThen(Fiber.interrupt(fiber)) + ), + onNone: () => Effect.void, + })) + ) + + const forkInterrupt = fiberRef.pipe( + Effect.flatMap(Option.match({ + onSome: fiber => Ref.set(fiberRef, Option.none()).pipe( + Effect.andThen(Fiber.interrupt(fiber).pipe( + Effect.asVoid, + Effect.forkDaemon, + )) + ), + onNone: () => Effect.forkDaemon(Effect.void), + })) + ) + + const forkFetch = interrupt.pipe( + Effect.andThen( + Ref.set(stateRef, AsyncData.loading()).pipe( + Effect.andThen(latestKeyRef), + Effect.flatMap(identity), + Effect.flatMap(key => query(key).pipe( + Effect.matchCauseEffect({ + onSuccess: v => Ref.set(stateRef, AsyncData.success(v)), + onFailure: c => Ref.set(stateRef, AsyncData.failure(c)), + }) + )), + + Effect.provide(context), + Effect.fork, + ) + ), + + Effect.flatMap(fiber => + Ref.set(fiberRef, Option.some(fiber)).pipe( + Effect.andThen(Fiber.join(fiber)), + Effect.andThen(Ref.set(fiberRef, Option.none())), + ) + ), + + Effect.forkDaemon, + ) + + const forkRefresh = interrupt.pipe( + Effect.andThen( + Ref.update(stateRef, previous => { + if (AsyncData.isSuccess(previous) || AsyncData.isFailure(previous)) + return AsyncData.refreshing(previous) + if (AsyncData.isRefreshing(previous)) + return AsyncData.refreshing(previous.previous) + return AsyncData.loading() + }).pipe( + Effect.andThen(latestKeyRef), + Effect.flatMap(identity), + Effect.flatMap(key => query(key).pipe( + Effect.matchCauseEffect({ + onSuccess: v => Ref.set(stateRef, AsyncData.success(v)), + onFailure: c => Ref.set(stateRef, AsyncData.failure(c)), + }) + )), + + Effect.provide(context), + Effect.fork, + ) + ), + + Effect.flatMap(fiber => + Ref.set(fiberRef, Option.some(fiber)).pipe( + Effect.andThen(Fiber.join(fiber)), + Effect.andThen(Ref.set(fiberRef, Option.none())), + ) + ), + + Effect.forkDaemon, + ) + + const fetchOnKeyChange = Effect.addFinalizer(() => interrupt).pipe( + Effect.andThen(Stream.runForEach(key, latestKey => + Ref.set(latestKeyRef, Option.some(latestKey)).pipe( + Effect.andThen(forkFetch) + ) + )) + ) + + const refreshOnWindowFocus = Stream.runForEach( + BrowserStream.fromEventListenerWindow("focus"), + () => forkRefresh, + ) + + return { + query, + + latestKeyRef, + stateRef, + fiberRef, + + forkInterrupt, + forkFetch, + forkRefresh, + + fetchOnKeyChange, + refreshOnWindowFocus, + } +})