diff --git a/packages/effect-fc/src/Query.ts b/packages/effect-fc/src/Query.ts index 20e4dfb..091093d 100644 --- a/packages/effect-fc/src/Query.ts +++ b/packages/effect-fc/src/Query.ts @@ -1,4 +1,4 @@ -import { Effect, Fiber, Option, Pipeable, Predicate, Stream, type Subscribable, SubscriptionRef } from "effect" +import { Effect, Fiber, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect" import * as Result from "./Result.js" @@ -45,11 +45,11 @@ extends Pipeable.Class() implements Query { query(key: K): Effect.Effect< Result.Result, never, - Result.forkEffect.OutputContext + Result.unsafeForkEffect.OutputContext > { return this.fiber.pipe( Effect.andThen(this.interrupt()), - Effect.andThen(Result.unsafeForkEffect(this.f(key))), + Effect.andThen(Result.unsafeForkEffect(this.f(key), { initialProgress: this.initialProgress })), Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))), Effect.andThen(([sub]) => Effect.all([Effect.succeed(sub), sub.get])), Effect.andThen(([sub, initial]) => Stream.runFoldEffect( @@ -67,24 +67,31 @@ export const isQuery = (u: unknown): u is Query { readonly key: Stream.Stream - readonly f: (key: NoInfer) => Effect.Effect + readonly f: (key: NoInfer) => Effect.Effect>> readonly initialProgress?: P } } export const make = Effect.fnUntraced(function* ( options: make.Options -) { +): Effect.fn.Return, P>> { return new QueryImpl( options.key, options.f, options.initialProgress as P, yield* SubscriptionRef.make(Option.none>()), - yield* SubscriptionRef.make(Result.initial() as Result.Result), + yield* SubscriptionRef.make(Result.initial()), ) }) +export const service = ( + options: make.Options +): Effect.Effect, never, Scope.Scope> => Effect.tap( + make(options), + query => Effect.forkScoped(run(query)), +) + export const run = ( self: Query ) => Stream.runForEach(self.key, key => Effect.andThen( diff --git a/packages/effect-fc/src/Result.ts b/packages/effect-fc/src/Result.ts index b3f9862..16c7815 100644 --- a/packages/effect-fc/src/Result.ts +++ b/packages/effect-fc/src/Result.ts @@ -102,7 +102,10 @@ export const isSuccess = (u: unknown): u is Success => isResult(u) && u export const isFailure = (u: unknown): u is Failure => isResult(u) && u._tag === "Failure" export const isRefreshing = (u: unknown): u is Refreshing => isResult(u) && Predicate.hasProperty(u, "refreshing") && u.refreshing -export const initial = (): Initial => Object.setPrototypeOf({ _tag: "Initial" }, ResultPrototype) +export const initial: { + (): Initial + (): Result +} = (): Initial => Object.setPrototypeOf({ _tag: "Initial" }, ResultPrototype) export const running =

(progress?: P): Running

=> Object.setPrototypeOf({ _tag: "Running", progress }, ResultPrototype) export const succeed = (value: A): Success => Object.setPrototypeOf({ _tag: "Success", value }, ResultPrototype) @@ -187,7 +190,7 @@ export const makeProgressLayer = (): Layer.Layer< export namespace unsafeForkEffect { - export type OutputContext = Scope.Scope | Exclude | Progress

| Progress> + export type OutputContext = Exclude | Progress

| Progress> export interface Options

{ readonly initialProgress?: P @@ -202,7 +205,7 @@ export const unsafeForkEffect = ( never, Scope.Scope | unsafeForkEffect.OutputContext > => Effect.Do.pipe( - Effect.bind("ref", () => Ref.make>(initial())), + Effect.bind("ref", () => Ref.make(initial())), Effect.bind("pubsub", () => PubSub.unbounded>()), Effect.bind("fiber", ({ ref, pubsub }) => Effect.forkScoped(State().pipe( Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(