This commit is contained in:
@@ -32,23 +32,19 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
|
||||
super()
|
||||
}
|
||||
|
||||
interrupt(): Effect.Effect<void> {
|
||||
return Effect.andThen(this.fiber, Option.match({
|
||||
readonly interrupt: Effect.Effect<void, never, never> = Effect.gen(this, function*() {
|
||||
return yield* Effect.andThen(this.fiber, Option.match({
|
||||
onSome: fiber => Effect.andThen(
|
||||
Fiber.interrupt(fiber),
|
||||
SubscriptionRef.set(this.fiber, Option.none()),
|
||||
),
|
||||
onNone: () => Effect.void,
|
||||
}))
|
||||
}
|
||||
})
|
||||
|
||||
query(key: K): Effect.Effect<
|
||||
Result.Result<A, E, P>,
|
||||
never,
|
||||
Result.unsafeForkEffect.OutputContext<A, E, R, P>
|
||||
> {
|
||||
query(key: K): Effect.Effect<Result.Result<A, E, P>, never, Scope.Scope | R> {
|
||||
return this.fiber.pipe(
|
||||
Effect.andThen(this.interrupt()),
|
||||
Effect.andThen(this.interrupt),
|
||||
Effect.andThen(Result.unsafeForkEffect(this.f(key), { initialProgress: this.initialProgress })),
|
||||
Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))),
|
||||
Effect.andThen(([sub]) => Effect.all([Effect.succeed(sub), sub.get])),
|
||||
@@ -77,7 +73,7 @@ export const make = Effect.fnUntraced(function* <K extends readonly any[], A, E
|
||||
): Effect.fn.Return<Query<K, A, E, Result.forkEffect.OutputContext<A, E, R, P>, P>> {
|
||||
return new QueryImpl(
|
||||
options.key,
|
||||
options.f,
|
||||
options.f as any,
|
||||
options.initialProgress as P,
|
||||
|
||||
yield* SubscriptionRef.make(Option.none<Fiber.Fiber<A, E>>()),
|
||||
@@ -87,14 +83,18 @@ export const make = Effect.fnUntraced(function* <K extends readonly any[], A, E
|
||||
|
||||
export const service = <K extends readonly any[], A, E = never, R = never, P = never>(
|
||||
options: make.Options<K, A, E, R, P>
|
||||
): Effect.Effect<Query<K, A, E, R, P>, never, Scope.Scope> => Effect.tap(
|
||||
): Effect.Effect<
|
||||
Query<K, A, E, Result.forkEffect.OutputContext<A, E, R, P>, P>,
|
||||
never,
|
||||
Scope.Scope | Result.forkEffect.OutputContext<A, E, R, P>
|
||||
> => Effect.tap(
|
||||
make(options),
|
||||
query => Effect.forkScoped(run(query)),
|
||||
)
|
||||
|
||||
export const run = <K extends readonly any[], A, E, R, P>(
|
||||
self: Query<K, A, E, R, P>
|
||||
) => Stream.runForEach(self.key, key => Effect.andThen(
|
||||
(self as QueryImpl<K, A, E, R, P>).interrupt(),
|
||||
): Effect.Effect<void, never, Scope.Scope | R> => Stream.runForEach(self.key, key => Effect.andThen(
|
||||
(self as QueryImpl<K, A, E, R, P>).interrupt,
|
||||
Effect.forkScoped((self as QueryImpl<K, A, E, R, P>).query(key)),
|
||||
))
|
||||
|
||||
Reference in New Issue
Block a user