@reffuse/extension-query 0.1.4 #15

Merged
Thilawyn merged 340 commits from next into master 2025-05-26 04:15:01 +02:00
18 changed files with 257 additions and 344 deletions

View File

@@ -11,8 +11,8 @@ import { routeTree } from "./routeTree.gen"
const layer = Layer.empty.pipe( const layer = Layer.empty.pipe(
Layer.provideMerge(AppQueryClient.Live), Layer.provideMerge(AppQueryClient.Default),
Layer.provideMerge(AppQueryErrorHandler.Live), Layer.provideMerge(AppQueryErrorHandler.Default),
Layer.provideMerge(Clipboard.layer), Layer.provideMerge(Clipboard.layer),
Layer.provideMerge(Geolocation.layer), Layer.provideMerge(Geolocation.layer),
Layer.provideMerge(Permissions.layer), Layer.provideMerge(Permissions.layer),

View File

@@ -1,10 +1,10 @@
import { QueryService } from "@reffuse/extension-query" import { QueryRunner } from "@reffuse/extension-query"
import { ParseResult, Schema } from "effect" import { ParseResult, Schema } from "effect"
export const Result = Schema.Array(Schema.String) export const Result = Schema.Array(Schema.String)
export class Uuid4Query extends QueryService.Tag("Uuid4Query")<Uuid4Query, export class Uuid4Query extends QueryRunner.Tag("Uuid4Query")<Uuid4Query,
readonly ["uuid4", number], readonly ["uuid4", number],
typeof Result.Type, typeof Result.Type,
ParseResult.ParseError ParseResult.ParseError

View File

@@ -8,7 +8,7 @@ export function Uuid4QueryService() {
const runFork = R.useRunFork() const runFork = R.useRunFork()
const query = R.useMemo(() => Uuid4Query.Uuid4Query, []) const query = R.useMemo(() => Uuid4Query.Uuid4Query, [])
const [state] = R.useRefState(query.state) const [state] = R.useSubscribeRefs(query.stateRef)
return ( return (

View File

@@ -4,7 +4,7 @@ import { Uuid4QueryService } from "@/query/views/Uuid4QueryService"
import { R } from "@/reffuse" import { R } from "@/reffuse"
import { HttpClient } from "@effect/platform" import { HttpClient } from "@effect/platform"
import { createFileRoute } from "@tanstack/react-router" import { createFileRoute } from "@tanstack/react-router"
import { Console, Effect, Schema } from "effect" import { Console, Effect, Layer, Schema } from "effect"
import { useMemo } from "react" import { useMemo } from "react"
@@ -17,15 +17,18 @@ function RouteComponent() {
key: R.useStreamFromReactiveValues(["uuid4", 10 as number]), key: R.useStreamFromReactiveValues(["uuid4", 10 as number]),
query: ([, count]) => Console.log(`Querying ${ count } IDs...`).pipe( query: ([, count]) => Console.log(`Querying ${ count } IDs...`).pipe(
Effect.andThen(Effect.sleep("500 millis")), Effect.andThen(Effect.sleep("500 millis")),
Effect.andThen(HttpClient.get(`https://www.uuidtools.com/api/generate/v4/count/${ count }`)), Effect.andThen(Effect.map(
HttpClient.withTracerPropagation(false), HttpClient.HttpClient,
HttpClient.withTracerPropagation(false),
)),
Effect.flatMap(client => client.get(`https://www.uuidtools.com/api/generate/v4/count/${ count }`)),
Effect.flatMap(res => res.json), Effect.flatMap(res => res.json),
Effect.flatMap(Schema.decodeUnknown(Uuid4Query.Result)), Effect.flatMap(Schema.decodeUnknown(Uuid4Query.Result)),
Effect.scoped, Effect.scoped,
), ),
}) })
const layer = useMemo(() => query.layer(Uuid4Query.Uuid4Query), [query]) const layer = useMemo(() => Layer.succeed(Uuid4Query.Uuid4Query, query), [query])
return ( return (
<QueryContext.Provider layer={layer}> <QueryContext.Provider layer={layer}>

View File

@@ -29,15 +29,18 @@ function RouteComponent() {
Effect.tap(() => QueryProgress.QueryProgress.update(() => Effect.tap(() => QueryProgress.QueryProgress.update(() =>
AsyncData.Progress.make({ loaded: 50, total: Option.some(100) }) AsyncData.Progress.make({ loaded: 50, total: Option.some(100) })
)), )),
Effect.andThen(HttpClient.get(`https://www.uuidtools.com/api/generate/v4/count/${ count }`)), Effect.andThen(Effect.map(
HttpClient.withTracerPropagation(false), HttpClient.HttpClient,
HttpClient.withTracerPropagation(false),
)),
Effect.flatMap(client => client.get(`https://www.uuidtools.com/api/generate/v4/count/${ count }`)),
Effect.flatMap(res => res.json), Effect.flatMap(res => res.json),
Effect.flatMap(Schema.decodeUnknown(Result)), Effect.flatMap(Schema.decodeUnknown(Result)),
Effect.scoped, Effect.scoped,
) )
}) })
const [state] = R.useSubscribeRefs(mutation.state) const [state] = R.useSubscribeRefs(mutation.stateRef)
return ( return (

View File

@@ -23,15 +23,18 @@ function RouteComponent() {
key: R.useStreamFromReactiveValues(["uuid4", count]), key: R.useStreamFromReactiveValues(["uuid4", count]),
query: ([, count]) => Console.log(`Querying ${ count } IDs...`).pipe( query: ([, count]) => Console.log(`Querying ${ count } IDs...`).pipe(
Effect.andThen(Effect.sleep("500 millis")), Effect.andThen(Effect.sleep("500 millis")),
Effect.andThen(HttpClient.get(`https://www.uuidtools.com/api/generate/v4/count/${ count }`)), Effect.andThen(Effect.map(
HttpClient.withTracerPropagation(false), HttpClient.HttpClient,
HttpClient.withTracerPropagation(false),
)),
Effect.flatMap(client => client.get(`https://www.uuidtools.com/api/generate/v4/count/${ count }`)),
Effect.flatMap(res => res.json), Effect.flatMap(res => res.json),
Effect.flatMap(Schema.decodeUnknown(Result)), Effect.flatMap(Schema.decodeUnknown(Result)),
Effect.scoped, Effect.scoped,
), ),
}) })
const [state] = R.useSubscribeRefs(query.state) const [state] = R.useSubscribeRefs(query.stateRef)
return ( return (

View File

@@ -1,6 +1,6 @@
{ {
"name": "@reffuse/extension-query", "name": "@reffuse/extension-query",
"version": "0.1.3", "version": "0.1.4",
"type": "module", "type": "module",
"files": [ "files": [
"./README.md", "./README.md",
@@ -39,6 +39,6 @@
"@types/react": "^19.0.0", "@types/react": "^19.0.0",
"effect": "^3.15.0", "effect": "^3.15.0",
"react": "^19.0.0", "react": "^19.0.0",
"reffuse": "^0.1.6" "reffuse": "^0.1.11"
} }
} }

View File

@@ -1,12 +1,11 @@
import * as AsyncData from "@typed/async-data" import * as AsyncData from "@typed/async-data"
import { type Context, Effect, type Fiber, Queue, Ref, Stream, SubscriptionRef } from "effect" import { Effect, type Fiber, Queue, Ref, Stream, SubscriptionRef } from "effect"
import type * as QueryClient from "../QueryClient.js" import type * as QueryClient from "./QueryClient.js"
import * as QueryProgress from "../QueryProgress.js" import * as QueryProgress from "./QueryProgress.js"
import * as QueryState from "./QueryState.js" import { QueryState } from "./internal/index.js"
export interface MutationRunner<K extends readonly unknown[], A, E, R> { export interface MutationRunner<K extends readonly unknown[], A, E> {
readonly context: Context.Context<R>
readonly stateRef: SubscriptionRef.SubscriptionRef<AsyncData.AsyncData<A, E>> readonly stateRef: SubscriptionRef.SubscriptionRef<AsyncData.AsyncData<A, E>>
readonly mutate: (...key: K) => Effect.Effect<AsyncData.Success<A> | AsyncData.Failure<E>> readonly mutate: (...key: K) => Effect.Effect<AsyncData.Success<A> | AsyncData.Failure<E>>
@@ -17,6 +16,11 @@ export interface MutationRunner<K extends readonly unknown[], A, E, R> {
} }
export const Tag = <const Id extends string>(id: Id) => <
Self, K extends readonly unknown[], A, E = never,
>() => Effect.Tag(id)<Self, MutationRunner<K, A, E>>()
export interface MakeProps<K extends readonly unknown[], A, FallbackA, E, HandledE, R> { export interface MakeProps<K extends readonly unknown[], A, FallbackA, E, HandledE, R> {
readonly QueryClient: QueryClient.GenericTagClass<FallbackA, HandledE> readonly QueryClient: QueryClient.GenericTagClass<FallbackA, HandledE>
readonly mutation: (key: K) => Effect.Effect<A, E, R | QueryProgress.QueryProgress> readonly mutation: (key: K) => Effect.Effect<A, E, R | QueryProgress.QueryProgress>
@@ -28,7 +32,7 @@ export const make = <K extends readonly unknown[], A, FallbackA, E, HandledE, R>
mutation, mutation,
}: MakeProps<K, A, FallbackA, E, HandledE, R> }: MakeProps<K, A, FallbackA, E, HandledE, R>
): Effect.Effect< ): Effect.Effect<
MutationRunner<K, A | FallbackA, Exclude<E, HandledE>, R>, MutationRunner<K, A | FallbackA, Exclude<E, HandledE>>,
never, never,
R | QueryClient.TagClassShape<FallbackA, HandledE> R | QueryClient.TagClassShape<FallbackA, HandledE>
> => Effect.gen(function*() { > => Effect.gen(function*() {
@@ -37,25 +41,18 @@ export const make = <K extends readonly unknown[], A, FallbackA, E, HandledE, R>
const queryStateTag = QueryState.makeTag<A | FallbackA, Exclude<E, HandledE>>() const queryStateTag = QueryState.makeTag<A | FallbackA, Exclude<E, HandledE>>()
const run = (key: K) => Effect.Do.pipe( const run = (key: K) => Effect.all([QueryClient, queryStateTag]).pipe(
Effect.bind("state", () => queryStateTag), Effect.flatMap(([client, state]) => state.set(AsyncData.loading()).pipe(
Effect.bind("client", () => QueryClient),
Effect.flatMap(({ state, client }) => state.set(AsyncData.loading()).pipe(
Effect.andThen(mutation(key)), Effect.andThen(mutation(key)),
client.errorHandler.handle, client.errorHandler.handle,
Effect.matchCauseEffect({ Effect.matchCauseEffect({
onSuccess: v => Effect.succeed(AsyncData.success(v)).pipe( onSuccess: v => Effect.tap(Effect.succeed(AsyncData.success(v)), state.set),
Effect.tap(state.set) onFailure: c => Effect.tap(Effect.succeed(AsyncData.failure(c)), state.set),
),
onFailure: c => Effect.succeed(AsyncData.failure(c)).pipe(
Effect.tap(state.set)
),
}), }),
)), )),
Effect.provide(context), Effect.provide(context),
Effect.provide(QueryProgress.QueryProgress.Live), Effect.provide(QueryProgress.QueryProgress.Default),
) )
const mutate = (...key: K) => Effect.provide(run(key), QueryState.layer( const mutate = (...key: K) => Effect.provide(run(key), QueryState.layer(
@@ -64,11 +61,11 @@ export const make = <K extends readonly unknown[], A, FallbackA, E, HandledE, R>
value => Ref.set(globalStateRef, value), value => Ref.set(globalStateRef, value),
)) ))
const forkMutate = (...key: K) => Effect.Do.pipe( const forkMutate = (...key: K) => Effect.all([
Effect.bind("stateRef", () => Ref.make(AsyncData.noData<A | FallbackA, Exclude<E, HandledE>>())), Ref.make(AsyncData.noData<A | FallbackA, Exclude<E, HandledE>>()),
Effect.bind("stateQueue", () => Queue.unbounded<AsyncData.AsyncData<A | FallbackA, Exclude<E, HandledE>>>()), Queue.unbounded<AsyncData.AsyncData<A | FallbackA, Exclude<E, HandledE>>>(),
]).pipe(
Effect.flatMap(({ stateRef, stateQueue }) => Effect.flatMap(([stateRef, stateQueue]) =>
Effect.addFinalizer(() => Queue.shutdown(stateQueue)).pipe( Effect.addFinalizer(() => Queue.shutdown(stateQueue)).pipe(
Effect.andThen(run(key)), Effect.andThen(run(key)),
Effect.scoped, Effect.scoped,

View File

@@ -1,16 +0,0 @@
import type * as AsyncData from "@typed/async-data"
import { Effect, type Fiber, type Stream, type SubscriptionRef } from "effect"
export interface MutationService<K extends readonly unknown[], A, E> {
readonly state: SubscriptionRef.SubscriptionRef<AsyncData.AsyncData<A, E>>
readonly mutate: (...key: K) => Effect.Effect<AsyncData.Success<A> | AsyncData.Failure<E>>
readonly forkMutate: (...key: K) => Effect.Effect<readonly [
fiber: Fiber.RuntimeFiber<AsyncData.Success<A> | AsyncData.Failure<E>>,
state: Stream.Stream<AsyncData.AsyncData<A, E>>,
]>
}
export const Tag = <const Id extends string>(id: Id) => <
Self, K extends readonly unknown[], A, E = never,
>() => Effect.Tag(id)<Self, MutationService<K, A, E>>()

View File

@@ -28,7 +28,7 @@ export interface ServiceResult<Self, EH, FallbackA, HandledE> extends Context.Ta
typeof id, typeof id,
QueryClient<FallbackA, HandledE> QueryClient<FallbackA, HandledE>
> { > {
readonly Live: Layer.Layer< readonly Default: Layer.Layer<
Self | (EH extends QueryErrorHandler.DefaultQueryErrorHandler ? EH : never), Self | (EH extends QueryErrorHandler.DefaultQueryErrorHandler ? EH : never),
never, never,
EH extends QueryErrorHandler.DefaultQueryErrorHandler ? never : EH EH extends QueryErrorHandler.DefaultQueryErrorHandler ? never : EH
@@ -45,7 +45,7 @@ export const Service = <Self>() => (
): ServiceResult<Self, EH, FallbackA, HandledE> => { ): ServiceResult<Self, EH, FallbackA, HandledE> => {
const TagClass = Context.Tag(id)() as ServiceResult<Self, EH, FallbackA, HandledE> const TagClass = Context.Tag(id)() as ServiceResult<Self, EH, FallbackA, HandledE>
(TagClass as Mutable<typeof TagClass>).Live = Layer.effect(TagClass, Effect.Do.pipe( (TagClass as Mutable<typeof TagClass>).Default = Layer.effect(TagClass, Effect.Do.pipe(
Effect.bind("errorHandler", () => Effect.bind("errorHandler", () =>
(props?.ErrorHandler ?? QueryErrorHandler.DefaultQueryErrorHandler) as Effect.Effect< (props?.ErrorHandler ?? QueryErrorHandler.DefaultQueryErrorHandler) as Effect.Effect<
QueryErrorHandler.QueryErrorHandler<FallbackA, HandledE>, QueryErrorHandler.QueryErrorHandler<FallbackA, HandledE>,
@@ -56,7 +56,7 @@ export const Service = <Self>() => (
)).pipe( )).pipe(
Layer.provideMerge((props?.ErrorHandler Layer.provideMerge((props?.ErrorHandler
? Layer.empty ? Layer.empty
: QueryErrorHandler.DefaultQueryErrorHandler.Live : QueryErrorHandler.DefaultQueryErrorHandler.Default
) as Layer.Layer<EH>) ) as Layer.Layer<EH>)
) )

View File

@@ -21,7 +21,7 @@ export interface ServiceResult<
Id, Id,
QueryErrorHandler<FallbackA, HandledE> QueryErrorHandler<FallbackA, HandledE>
> { > {
readonly Live: Layer.Layer<Self> readonly Default: Layer.Layer<Self>
} }
export const Service = <Self, HandledE = never>() => ( export const Service = <Self, HandledE = never>() => (
@@ -35,7 +35,7 @@ export const Service = <Self, HandledE = never>() => (
): ServiceResult<Self, Id, FallbackA, HandledE> => { ): ServiceResult<Self, Id, FallbackA, HandledE> => {
const TagClass = Context.Tag(id)() as ServiceResult<Self, Id, FallbackA, HandledE> const TagClass = Context.Tag(id)() as ServiceResult<Self, Id, FallbackA, HandledE>
(TagClass as Mutable<typeof TagClass>).Live = Layer.effect(TagClass, Effect.gen(function*() { (TagClass as Mutable<typeof TagClass>).Default = Layer.effect(TagClass, Effect.gen(function*() {
const pubsub = yield* PubSub.unbounded<Cause.Cause<HandledE>>() const pubsub = yield* PubSub.unbounded<Cause.Cause<HandledE>>()
const errors = Stream.fromPubSub(pubsub) const errors = Stream.fromPubSub(pubsub)

View File

@@ -1,53 +1,21 @@
import type * as AsyncData from "@typed/async-data" import type { Effect, Stream } from "effect"
import { type Cause, type Context, Effect, type Fiber, Layer, type Option, type Stream, type SubscriptionRef } from "effect"
import * as React from "react"
import { ReffuseExtension, type ReffuseNamespace } from "reffuse" import { ReffuseExtension, type ReffuseNamespace } from "reffuse"
import type * as MutationService from "./MutationService.js" import * as MutationRunner from "./MutationRunner.js"
import * as QueryClient from "./QueryClient.js" import * as QueryClient from "./QueryClient.js"
import type * as QueryProgress from "./QueryProgress.js" import type * as QueryProgress from "./QueryProgress.js"
import type * as QueryService from "./QueryService.js" import * as QueryRunner from "./QueryRunner.js"
import { MutationRunner, QueryRunner } from "./internal/index.js"
export interface UseQueryProps<K extends readonly unknown[], A, E, R> { export interface UseQueryProps<K extends readonly unknown[], A, E, R> {
readonly key: Stream.Stream<K> readonly key: Stream.Stream<K>
readonly query: (key: K) => Effect.Effect<A, E, R | QueryProgress.QueryProgress> readonly query: (key: K) => Effect.Effect<A, E, R | QueryProgress.QueryProgress>
readonly refreshOnWindowFocus?: boolean readonly options?: QueryRunner.RunOptions
} }
export interface UseQueryResult<K extends readonly unknown[], A, E> {
readonly latestKey: SubscriptionRef.SubscriptionRef<Option.Option<K>>
readonly state: SubscriptionRef.SubscriptionRef<AsyncData.AsyncData<A, E>>
readonly forkRefresh: Effect.Effect<readonly [
fiber: Fiber.RuntimeFiber<AsyncData.Success<A> | AsyncData.Failure<E>, Cause.NoSuchElementException>,
state: Stream.Stream<AsyncData.AsyncData<A, E>>,
]>
readonly layer: <Self, Id extends string>(
tag: Context.TagClass<Self, Id, QueryService.QueryService<K, A, E>>
) => Layer.Layer<Self>
}
export interface UseMutationProps<K extends readonly unknown[], A, E, R> { export interface UseMutationProps<K extends readonly unknown[], A, E, R> {
readonly mutation: (key: K) => Effect.Effect<A, E, R | QueryProgress.QueryProgress> readonly mutation: (key: K) => Effect.Effect<A, E, R | QueryProgress.QueryProgress>
} }
export interface UseMutationResult<K extends readonly unknown[], A, E> {
readonly state: SubscriptionRef.SubscriptionRef<AsyncData.AsyncData<A, E>>
readonly mutate: (...key: K) => Effect.Effect<AsyncData.Success<A> | AsyncData.Failure<E>>
readonly forkMutate: (...key: K) => Effect.Effect<readonly [
fiber: Fiber.RuntimeFiber<AsyncData.Success<A> | AsyncData.Failure<E>>,
state: Stream.Stream<AsyncData.AsyncData<A, E>>,
]>
readonly layer: <Self, Id extends string>(
tag: Context.TagClass<Self, Id, MutationService.MutationService<K, A, E>>
) => Layer.Layer<Self>
}
export const QueryExtension = ReffuseExtension.make(() => ({ export const QueryExtension = ReffuseExtension.make(() => ({
useQuery< useQuery<
@@ -61,32 +29,16 @@ export const QueryExtension = ReffuseExtension.make(() => ({
>( >(
this: ReffuseNamespace.ReffuseNamespace<R | QueryClient.TagClassShape<FallbackA, HandledE>>, this: ReffuseNamespace.ReffuseNamespace<R | QueryClient.TagClassShape<FallbackA, HandledE>>,
props: UseQueryProps<QK, QA, QE, QR>, props: UseQueryProps<QK, QA, QE, QR>,
): UseQueryResult<QK, QA | FallbackA, Exclude<QE, HandledE>> { ): QueryRunner.QueryRunner<QK, QA | FallbackA, Exclude<QE, HandledE>> {
const runner = this.useMemo(() => QueryRunner.make({ const runner = this.useMemo(() => QueryRunner.make({
QueryClient: QueryClient.makeGenericTagClass<FallbackA, HandledE>(), QueryClient: QueryClient.makeGenericTagClass<FallbackA, HandledE>(),
key: props.key, key: props.key,
query: props.query, query: props.query,
}), [props.key]) }), [props.key])
this.useFork(() => runner.fetchOnKeyChange, [runner]) this.useFork(() => QueryRunner.run(runner, props.options), [runner])
this.useFork(() => (props.refreshOnWindowFocus ?? true) return runner
? runner.refreshOnWindowFocus
: Effect.void,
[props.refreshOnWindowFocus, runner])
return React.useMemo(() => ({
latestKey: runner.latestKeyRef,
state: runner.stateRef,
forkRefresh: runner.forkRefresh,
layer: tag => Layer.succeed(tag, {
latestKey: runner.latestKeyRef,
state: runner.stateRef,
forkRefresh: runner.forkRefresh,
}),
}), [runner])
}, },
useMutation< useMutation<
@@ -100,23 +52,10 @@ export const QueryExtension = ReffuseExtension.make(() => ({
>( >(
this: ReffuseNamespace.ReffuseNamespace<R | QueryClient.TagClassShape<FallbackA, HandledE>>, this: ReffuseNamespace.ReffuseNamespace<R | QueryClient.TagClassShape<FallbackA, HandledE>>,
props: UseMutationProps<QK, QA, QE, QR>, props: UseMutationProps<QK, QA, QE, QR>,
): UseMutationResult<QK, QA | FallbackA, Exclude<QE, HandledE>> { ): MutationRunner.MutationRunner<QK, QA | FallbackA, Exclude<QE, HandledE>> {
const runner = this.useMemo(() => MutationRunner.make({ return this.useMemo(() => MutationRunner.make({
QueryClient: QueryClient.makeGenericTagClass<FallbackA, HandledE>(), QueryClient: QueryClient.makeGenericTagClass<FallbackA, HandledE>(),
mutation: props.mutation, mutation: props.mutation,
}), []) }), [])
return React.useMemo(() => ({
state: runner.stateRef,
mutate: runner.mutate,
forkMutate: runner.forkMutate,
layer: tag => Layer.succeed(tag, {
state: runner.stateRef,
mutate: runner.mutate,
forkMutate: runner.forkMutate,
}),
}), [runner])
}, },
})) }))

View File

@@ -10,7 +10,7 @@ export class QueryProgress extends Effect.Tag("@reffuse/extension-query/QueryPro
f: (previous: Option.Option<AsyncData.Progress>) => AsyncData.Progress f: (previous: Option.Option<AsyncData.Progress>) => AsyncData.Progress
) => Effect.Effect<void> ) => Effect.Effect<void>
}>() { }>() {
static readonly Live: Layer.Layer< static readonly Default: Layer.Layer<
QueryProgress, QueryProgress,
never, never,
QueryState.QueryState<any, any> QueryState.QueryState<any, any>

View File

@@ -0,0 +1,193 @@
import { BrowserStream } from "@effect/platform-browser"
import * as AsyncData from "@typed/async-data"
import { type Cause, Effect, Fiber, identity, Option, Queue, Ref, type Scope, Stream, SubscriptionRef } from "effect"
import type * as QueryClient from "./QueryClient.js"
import * as QueryProgress from "./QueryProgress.js"
import { QueryState } from "./internal/index.js"
export interface QueryRunner<K extends readonly unknown[], A, E> {
readonly queryKey: Stream.Stream<K>
readonly latestKeyValueRef: SubscriptionRef.SubscriptionRef<Option.Option<K>>
readonly stateRef: SubscriptionRef.SubscriptionRef<AsyncData.AsyncData<A, E>>
readonly fiberRef: SubscriptionRef.SubscriptionRef<Option.Option<Fiber.RuntimeFiber<
AsyncData.Success<A> | AsyncData.Failure<E>,
Cause.NoSuchElementException
>>>
readonly interrupt: Effect.Effect<void>
readonly forkInterrupt: Effect.Effect<Fiber.RuntimeFiber<void>>
readonly forkFetch: (keyValue: K) => Effect.Effect<readonly [
fiber: Fiber.RuntimeFiber<AsyncData.Success<A> | AsyncData.Failure<E>>,
state: Stream.Stream<AsyncData.AsyncData<A, E>>,
]>
readonly forkRefresh: Effect.Effect<readonly [
fiber: Fiber.RuntimeFiber<AsyncData.Success<A> | AsyncData.Failure<E>, Cause.NoSuchElementException>,
state: Stream.Stream<AsyncData.AsyncData<A, E>>,
]>
}
export const Tag = <const Id extends string>(id: Id) => <
Self, K extends readonly unknown[], A, E = never
>() => Effect.Tag(id)<Self, QueryRunner<K, A, E>>()
export interface MakeProps<K extends readonly unknown[], A, FallbackA, E, HandledE, R> {
readonly QueryClient: QueryClient.GenericTagClass<FallbackA, HandledE>
readonly key: Stream.Stream<K>
readonly query: (key: K) => Effect.Effect<A, E, R | QueryProgress.QueryProgress>
}
export const make = <K extends readonly unknown[], A, FallbackA, E, HandledE, R>(
{
QueryClient,
key,
query,
}: MakeProps<K, A, FallbackA, E, HandledE, R>
): Effect.Effect<
QueryRunner<K, A | FallbackA, Exclude<E, HandledE>>,
never,
R | QueryClient.TagClassShape<FallbackA, HandledE>
> => Effect.gen(function*() {
const context = yield* Effect.context<R | QueryClient.TagClassShape<FallbackA, HandledE>>()
const latestKeyValueRef = yield* SubscriptionRef.make(Option.none<K>())
const stateRef = yield* SubscriptionRef.make(AsyncData.noData<A | FallbackA, Exclude<E, HandledE>>())
const fiberRef = yield* SubscriptionRef.make(Option.none<Fiber.RuntimeFiber<
AsyncData.Success<A | FallbackA> | AsyncData.Failure<Exclude<E, HandledE>>,
Cause.NoSuchElementException
>>())
const queryStateTag = QueryState.makeTag<A | FallbackA, Exclude<E, HandledE>>()
const interrupt = Effect.flatMap(fiberRef, Option.match({
onSome: fiber => Ref.set(fiberRef, Option.none()).pipe(
Effect.andThen(Fiber.interrupt(fiber))
),
onNone: () => Effect.void,
}))
const forkInterrupt = Effect.flatMap(fiberRef, Option.match({
onSome: fiber => Ref.set(fiberRef, Option.none()).pipe(
Effect.andThen(Fiber.interrupt(fiber).pipe(
Effect.asVoid,
Effect.forkDaemon,
))
),
onNone: () => Effect.forkDaemon(Effect.void),
}))
const run = (keyValue: K) => Effect.all([QueryClient, queryStateTag]).pipe(
Effect.flatMap(([client, state]) => Ref.set(latestKeyValueRef, Option.some(keyValue)).pipe(
Effect.andThen(query(keyValue)),
client.errorHandler.handle,
Effect.matchCauseEffect({
onSuccess: v => Effect.tap(Effect.succeed(AsyncData.success(v)), state.set),
onFailure: c => Effect.tap(Effect.succeed(AsyncData.failure(c)), state.set),
}),
)),
Effect.provide(context),
Effect.provide(QueryProgress.QueryProgress.Default),
)
const forkFetch = (keyValue: K) => Queue.unbounded<AsyncData.AsyncData<A | FallbackA, Exclude<E, HandledE>>>().pipe(
Effect.flatMap(stateQueue => queryStateTag.pipe(
Effect.flatMap(state => interrupt.pipe(
Effect.andThen(
Effect.addFinalizer(() => Effect.andThen(
Ref.set(fiberRef, Option.none()),
Queue.shutdown(stateQueue),
)).pipe(
Effect.andThen(state.set(AsyncData.loading())),
Effect.andThen(run(keyValue)),
Effect.scoped,
Effect.forkDaemon,
)
),
Effect.tap(fiber => Ref.set(fiberRef, Option.some(fiber))),
Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const),
)),
Effect.provide(QueryState.layer(
queryStateTag,
stateRef,
value => Effect.andThen(
Queue.offer(stateQueue, value),
Ref.set(stateRef, value),
),
)),
))
)
const setInitialRefreshState = Effect.flatMap(queryStateTag, state => state.update(previous => {
if (AsyncData.isSuccess(previous) || AsyncData.isFailure(previous))
return AsyncData.refreshing(previous)
if (AsyncData.isRefreshing(previous))
return AsyncData.refreshing(previous.previous)
return AsyncData.loading()
}))
const forkRefresh = Queue.unbounded<AsyncData.AsyncData<A | FallbackA, Exclude<E, HandledE>>>().pipe(
Effect.flatMap(stateQueue => interrupt.pipe(
Effect.andThen(
Effect.addFinalizer(() => Effect.andThen(
Ref.set(fiberRef, Option.none()),
Queue.shutdown(stateQueue),
)).pipe(
Effect.andThen(setInitialRefreshState),
Effect.andThen(latestKeyValueRef.pipe(
Effect.flatMap(identity),
Effect.flatMap(run),
)),
Effect.scoped,
Effect.forkDaemon,
)
),
Effect.tap(fiber => Ref.set(fiberRef, Option.some(fiber))),
Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const),
Effect.provide(QueryState.layer(
queryStateTag,
stateRef,
value => Effect.andThen(
Queue.offer(stateQueue, value),
Ref.set(stateRef, value),
),
)),
))
)
return {
queryKey: key,
latestKeyValueRef,
stateRef,
fiberRef,
interrupt,
forkInterrupt,
forkFetch,
forkRefresh,
}
})
export interface RunOptions {
readonly refreshOnWindowFocus?: boolean
}
export const run = <K extends readonly unknown[], A, E>(
self: QueryRunner<K, A, E>,
options?: RunOptions,
): Effect.Effect<void, never, Scope.Scope> => Effect.gen(function*() {
if (typeof window !== "undefined" && (options?.refreshOnWindowFocus ?? true))
yield* Effect.forkScoped(
Stream.runForEach(BrowserStream.fromEventListenerWindow("focus"), () => self.forkRefresh)
)
yield* Effect.addFinalizer(() => self.interrupt)
yield* Stream.runForEach(Stream.changes(self.queryKey), latestKey => self.forkFetch(latestKey))
})

View File

@@ -1,16 +0,0 @@
import type * as AsyncData from "@typed/async-data"
import { type Cause, Effect, type Fiber, type Option, type Stream, type SubscriptionRef } from "effect"
export interface QueryService<K extends readonly unknown[], A, E> {
readonly latestKey: SubscriptionRef.SubscriptionRef<Option.Option<K>>
readonly state: SubscriptionRef.SubscriptionRef<AsyncData.AsyncData<A, E>>
readonly forkRefresh: Effect.Effect<readonly [
fiber: Fiber.RuntimeFiber<AsyncData.Success<A> | AsyncData.Failure<E>, Cause.NoSuchElementException>,
state: Stream.Stream<AsyncData.AsyncData<A, E>>,
]>
}
export const Tag = <const Id extends string>(id: Id) => <
Self, K extends readonly unknown[], A, E = never,
>() => Effect.Tag(id)<Self, QueryService<K, A, E>>()

View File

@@ -1,6 +1,6 @@
export * as MutationService from "./MutationService.js" export * as MutationRunner from "./MutationRunner.js"
export * as QueryClient from "./QueryClient.js" export * as QueryClient from "./QueryClient.js"
export * as QueryErrorHandler from "./QueryErrorHandler.js" export * as QueryErrorHandler from "./QueryErrorHandler.js"
export * from "./QueryExtension.js" export * from "./QueryExtension.js"
export * as QueryProgress from "./QueryProgress.js" export * as QueryProgress from "./QueryProgress.js"
export * as QueryService from "./QueryService.js" export * as QueryRunner from "./QueryRunner.js"

View File

@@ -1,191 +0,0 @@
import { BrowserStream } from "@effect/platform-browser"
import * as AsyncData from "@typed/async-data"
import { type Cause, type Context, Effect, Fiber, identity, Option, Queue, Ref, type Scope, Stream, SubscriptionRef } from "effect"
import type * as QueryClient from "../QueryClient.js"
import * as QueryProgress from "../QueryProgress.js"
import * as QueryState from "./QueryState.js"
export interface QueryRunner<K extends readonly unknown[], A, E, R> {
readonly context: Context.Context<R>
readonly latestKeyRef: SubscriptionRef.SubscriptionRef<Option.Option<K>>
readonly stateRef: SubscriptionRef.SubscriptionRef<AsyncData.AsyncData<A, E>>
readonly fiberRef: SubscriptionRef.SubscriptionRef<Option.Option<Fiber.RuntimeFiber<
AsyncData.Success<A> | AsyncData.Failure<E>,
Cause.NoSuchElementException
>>>
readonly forkInterrupt: Effect.Effect<Fiber.RuntimeFiber<void, Cause.NoSuchElementException>>
readonly forkFetch: Effect.Effect<readonly [
fiber: Fiber.RuntimeFiber<AsyncData.Success<A> | AsyncData.Failure<E>, Cause.NoSuchElementException>,
state: Stream.Stream<AsyncData.AsyncData<A, E>>,
]>
readonly forkRefresh: Effect.Effect<readonly [
fiber: Fiber.RuntimeFiber<AsyncData.Success<A> | AsyncData.Failure<E>, Cause.NoSuchElementException>,
state: Stream.Stream<AsyncData.AsyncData<A, E>>,
]>
readonly fetchOnKeyChange: Effect.Effect<void, Cause.NoSuchElementException, Scope.Scope>
readonly refreshOnWindowFocus: Effect.Effect<void>
}
export interface MakeProps<K extends readonly unknown[], A, FallbackA, E, HandledE, R> {
readonly QueryClient: QueryClient.GenericTagClass<FallbackA, HandledE>
readonly key: Stream.Stream<K>
readonly query: (key: K) => Effect.Effect<A, E, R | QueryProgress.QueryProgress>
}
export const make = <K extends readonly unknown[], A, FallbackA, E, HandledE, R>(
{
QueryClient,
key,
query,
}: MakeProps<K, A, FallbackA, E, HandledE, R>
): Effect.Effect<
QueryRunner<K, A | FallbackA, Exclude<E, HandledE>, R>,
never,
R | QueryClient.TagClassShape<FallbackA, HandledE>
> => Effect.gen(function*() {
const context = yield* Effect.context<R | QueryClient.TagClassShape<FallbackA, HandledE>>()
const latestKeyRef = yield* SubscriptionRef.make(Option.none<K>())
const stateRef = yield* SubscriptionRef.make(AsyncData.noData<A | FallbackA, Exclude<E, HandledE>>())
const fiberRef = yield* SubscriptionRef.make(Option.none<Fiber.RuntimeFiber<
AsyncData.Success<A | FallbackA> | AsyncData.Failure<Exclude<E, HandledE>>,
Cause.NoSuchElementException
>>())
const queryStateTag = QueryState.makeTag<A | FallbackA, Exclude<E, HandledE>>()
const interrupt = fiberRef.pipe(
Effect.flatMap(Option.match({
onSome: fiber => Ref.set(fiberRef, Option.none()).pipe(
Effect.andThen(Fiber.interrupt(fiber))
),
onNone: () => Effect.void,
}))
)
const forkInterrupt = fiberRef.pipe(
Effect.flatMap(Option.match({
onSome: fiber => Ref.set(fiberRef, Option.none()).pipe(
Effect.andThen(Fiber.interrupt(fiber).pipe(
Effect.asVoid,
Effect.forkDaemon,
))
),
onNone: () => Effect.forkDaemon(Effect.void),
}))
)
const run = Effect.Do.pipe(
Effect.bind("state", () => queryStateTag),
Effect.bind("client", () => QueryClient),
Effect.bind("latestKey", () => latestKeyRef.pipe(Effect.flatMap(identity))),
Effect.flatMap(({ state, client, latestKey }) => query(latestKey).pipe(
client.errorHandler.handle,
Effect.matchCauseEffect({
onSuccess: v => Effect.succeed(AsyncData.success(v)).pipe(
Effect.tap(state.set)
),
onFailure: c => Effect.succeed(AsyncData.failure(c)).pipe(
Effect.tap(state.set)
),
}),
)),
Effect.provide(context),
Effect.provide(QueryProgress.QueryProgress.Live),
)
const forkFetch = Queue.unbounded<AsyncData.AsyncData<A | FallbackA, Exclude<E, HandledE>>>().pipe(
Effect.flatMap(stateQueue => queryStateTag.pipe(
Effect.flatMap(state => interrupt.pipe(
Effect.andThen(Effect.addFinalizer(() => Ref.set(fiberRef, Option.none()).pipe(
Effect.andThen(Queue.shutdown(stateQueue))
)).pipe(
Effect.andThen(state.set(AsyncData.loading())),
Effect.andThen(run),
Effect.scoped,
Effect.forkDaemon,
)),
Effect.tap(fiber => Ref.set(fiberRef, Option.some(fiber))),
Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const),
)),
Effect.provide(QueryState.layer(
queryStateTag,
stateRef,
value => Queue.offer(stateQueue, value).pipe(
Effect.andThen(Ref.set(stateRef, value))
),
)),
))
)
const setInitialRefreshState = queryStateTag.pipe(
Effect.flatMap(state => state.update(previous => {
if (AsyncData.isSuccess(previous) || AsyncData.isFailure(previous))
return AsyncData.refreshing(previous)
if (AsyncData.isRefreshing(previous))
return AsyncData.refreshing(previous.previous)
return AsyncData.loading()
}))
)
const forkRefresh = Queue.unbounded<AsyncData.AsyncData<A | FallbackA, Exclude<E, HandledE>>>().pipe(
Effect.flatMap(stateQueue => interrupt.pipe(
Effect.andThen(Effect.addFinalizer(() => Ref.set(fiberRef, Option.none()).pipe(
Effect.andThen(Queue.shutdown(stateQueue))
)).pipe(
Effect.andThen(setInitialRefreshState),
Effect.andThen(run),
Effect.scoped,
Effect.forkDaemon,
)),
Effect.tap(fiber => Ref.set(fiberRef, Option.some(fiber))),
Effect.map(fiber => [fiber, Stream.fromQueue(stateQueue)] as const),
Effect.provide(QueryState.layer(
queryStateTag,
stateRef,
value => Queue.offer(stateQueue, value).pipe(
Effect.andThen(Ref.set(stateRef, value))
),
)),
))
)
const fetchOnKeyChange = Effect.addFinalizer(() => interrupt).pipe(
Effect.andThen(Stream.runForEach(Stream.changes(key), latestKey =>
Ref.set(latestKeyRef, Option.some(latestKey)).pipe(
Effect.andThen(forkFetch)
)
))
)
const refreshOnWindowFocus = Stream.runForEach(
BrowserStream.fromEventListenerWindow("focus"),
() => forkRefresh,
)
return {
context,
latestKeyRef,
stateRef,
fiberRef,
forkInterrupt,
forkFetch,
forkRefresh,
fetchOnKeyChange,
refreshOnWindowFocus,
}
})

View File

@@ -1,3 +1 @@
export * as MutationRunner from "./MutationRunner.js"
export * as QueryRunner from "./QueryRunner.js"
export * as QueryState from "./QueryState.js" export * as QueryState from "./QueryState.js"