>>,
+ options?: forkEffectDequeue.Options,
): Effect.Effect<
Queue.Dequeue>,
never,
- forkEffectScoped.OutputContext
-> => Effect.Do.pipe(
- Effect.bind("scope", () => Scope.Scope),
- Effect.bind("queue", () => Queue.unbounded>()),
- Effect.bind("ref", () => Ref.make>(initial())),
- Effect.tap(({ queue, ref }) => Effect.andThen(ref, v => Queue.offer(queue, v))),
- Effect.tap(({ scope, queue, ref }) => Effect.forkScoped(
- Effect.addFinalizer(() => Queue.shutdown(queue)).pipe(
- Effect.andThen(Effect.succeed(running(options?.initialProgress)).pipe(
- Effect.tap(v => Ref.set(ref, v)),
- Effect.tap(v => Queue.offer(queue, v)),
- )),
- Effect.andThen(Effect.provideService(effect, Scope.Scope, scope)),
- Effect.exit,
- Effect.andThen(exit => Effect.succeed(fromExit(exit)).pipe(
- Effect.tap(v => Ref.set(ref, v)),
- Effect.tap(v => Queue.offer(queue, v)),
- )),
- Effect.scoped,
- Effect.provide(makeProgressLayer(queue, ref)),
- )
- )),
- Effect.map(({ queue }) => queue),
-) as Effect.Effect>, never, Scope.Scope>
+ forkEffectDequeue.OutputContext
+> => Effect.all([
+ Ref.make>(initial()),
+ Queue.unbounded>(),
+]).pipe(
+ Effect.tap(([ref, queue]) => Effect.forkScoped(State().pipe(
+ Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(
+ Effect.andThen(effect),
+ Effect.onExit(exit => Effect.andThen(state.set(fromExit(exit)), Queue.shutdown(queue))),
+ )),
+ Effect.provide(Layer.empty.pipe(
+ Layer.provideMerge(makeProgressLayer()),
+ Layer.provideMerge(Layer.succeed(State(), {
+ get: ref,
+ set: v => Effect.andThen(Ref.set(ref, v), Queue.offer(queue, v))
+ })),
+ )),
+ ))),
+ Effect.map(([, queue]) => queue),
+) as Effect.Effect>, never, Scope.Scope>