0.2.1 #26

Merged
Thilawyn merged 144 commits from next into master 2025-12-01 23:37:40 +01:00
2 changed files with 19 additions and 9 deletions
Showing only changes of commit 83128bb467 - Show all commits

View File

@@ -1,4 +1,4 @@
import { Effect, Fiber, Option, Pipeable, Predicate, Stream, type Subscribable, SubscriptionRef } from "effect"
import { Effect, Fiber, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect"
import * as Result from "./Result.js"
@@ -45,11 +45,11 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
query(key: K): Effect.Effect<
Result.Result<A, E, P>,
never,
Result.forkEffect.OutputContext<A, E, R, never>
Result.unsafeForkEffect.OutputContext<A, E, R, P>
> {
return this.fiber.pipe(
Effect.andThen(this.interrupt()),
Effect.andThen(Result.unsafeForkEffect(this.f(key))),
Effect.andThen(Result.unsafeForkEffect(this.f(key), { initialProgress: this.initialProgress })),
Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))),
Effect.andThen(([sub]) => Effect.all([Effect.succeed(sub), sub.get])),
Effect.andThen(([sub, initial]) => Stream.runFoldEffect(
@@ -67,24 +67,31 @@ export const isQuery = (u: unknown): u is Query<unknown[], unknown, unknown, unk
export declare namespace make {
export interface Options<K extends readonly any[], A, E = never, R = never, P = never> {
readonly key: Stream.Stream<K>
readonly f: (key: NoInfer<K>) => Effect.Effect<A, E, R>
readonly f: (key: NoInfer<K>) => Effect.Effect<A, E, Result.forkEffect.InputContext<R, NoInfer<P>>>
readonly initialProgress?: P
}
}
export const make = Effect.fnUntraced(function* <K extends readonly any[], A, E = never, R = never, P = never>(
options: make.Options<K, A, E, R, P>
) {
): Effect.fn.Return<Query<K, A, E, Result.forkEffect.OutputContext<A, E, R, P>, P>> {
return new QueryImpl(
options.key,
options.f,
options.initialProgress as P,
yield* SubscriptionRef.make(Option.none<Fiber.Fiber<A, E>>()),
yield* SubscriptionRef.make(Result.initial() as Result.Result<A, E, P>),
yield* SubscriptionRef.make(Result.initial<A, E, P>()),
)
})
export const service = <K extends readonly any[], A, E = never, R = never, P = never>(
options: make.Options<K, A, E, R, P>
): Effect.Effect<Query<K, A, E, R, P>, never, Scope.Scope> => Effect.tap(
make(options),
query => Effect.forkScoped(run(query)),
)
export const run = <K extends readonly any[], A, E, R, P>(
self: Query<K, A, E, R, P>
) => Stream.runForEach(self.key, key => Effect.andThen(

View File

@@ -102,7 +102,10 @@ export const isSuccess = (u: unknown): u is Success<unknown> => isResult(u) && u
export const isFailure = (u: unknown): u is Failure<unknown, unknown> => isResult(u) && u._tag === "Failure"
export const isRefreshing = (u: unknown): u is Refreshing<unknown> => isResult(u) && Predicate.hasProperty(u, "refreshing") && u.refreshing
export const initial = (): Initial => Object.setPrototypeOf({ _tag: "Initial" }, ResultPrototype)
export const initial: {
(): Initial
<A, E = never, P = never>(): Result<A, E, P>
} = (): Initial => Object.setPrototypeOf({ _tag: "Initial" }, ResultPrototype)
export const running = <P = never>(progress?: P): Running<P> => Object.setPrototypeOf({ _tag: "Running", progress }, ResultPrototype)
export const succeed = <A>(value: A): Success<A> => Object.setPrototypeOf({ _tag: "Success", value }, ResultPrototype)
@@ -187,7 +190,7 @@ export const makeProgressLayer = <A, E, P = never>(): Layer.Layer<
export namespace unsafeForkEffect {
export type OutputContext<A, E, R, P> = Scope.Scope | Exclude<R, State<A, E, P> | Progress<P> | Progress<never>>
export type OutputContext<A, E, R, P> = Exclude<R, State<A, E, P> | Progress<P> | Progress<never>>
export interface Options<P> {
readonly initialProgress?: P
@@ -202,7 +205,7 @@ export const unsafeForkEffect = <A, E, R, P = never>(
never,
Scope.Scope | unsafeForkEffect.OutputContext<A, E, R, P>
> => Effect.Do.pipe(
Effect.bind("ref", () => Ref.make<Result<A, E, P>>(initial())),
Effect.bind("ref", () => Ref.make(initial<A, E, P>())),
Effect.bind("pubsub", () => PubSub.unbounded<Result<A, E, P>>()),
Effect.bind("fiber", ({ ref, pubsub }) => Effect.forkScoped(State<A, E, P>().pipe(
Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(