0.2.1 #26

Merged
Thilawyn merged 144 commits from next into master 2025-12-01 23:37:40 +01:00
Showing only changes of commit 9feb94ea9e - Show all commits

View File

@@ -257,3 +257,38 @@ export const forkEffectPubSub = <A, E, R, P = never>(
))), ))),
Effect.map(([, pubsub]) => pubsub.subscribe), Effect.map(([, pubsub]) => pubsub.subscribe),
) as Effect.Effect<Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>, never, Scope.Scope> ) as Effect.Effect<Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>, never, Scope.Scope>
export namespace forkEffect {
export type InputContext<R, P> = R extends Progress<infer X> ? [X] extends [P] ? R : never : R
export type OutputContext<R> = Scope.Scope | Exclude<R, Progress<any> | Progress<never>>
export interface Options<P> {
readonly initialProgress?: P
}
}
export const forkEffect = <A, E, R, P = never>(
effect: Effect.Effect<A, E, forkEffect.InputContext<R, NoInfer<P>>>,
options?: forkEffect.Options<P>,
) => Effect.all([
Ref.make<Result<A, E, P>>(initial()),
PubSub.unbounded<Result<A, E, P>>(),
]).pipe(
Effect.tap(([ref, pubsub]) => Effect.forkScoped(State<A, E, P>().pipe(
Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(
Effect.andThen(effect),
Effect.onExit(exit => Effect.andThen(
state.set(fromExit(exit)),
Effect.forkScoped(PubSub.shutdown(pubsub)),
)),
)),
Effect.provide(Layer.empty.pipe(
Layer.provideMerge(makeProgressLayer<A, E, P>()),
Layer.provideMerge(Layer.succeed(State<A, E, P>(), {
get: ref,
set: v => Effect.andThen(Ref.set(ref, v), PubSub.publish(pubsub, v))
})),
)),
))),
Effect.map(([, pubsub]) => pubsub.subscribe),
) as Effect.Effect<Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>, never, Scope.Scope>