0.2.2 #31

Merged
Thilawyn merged 184 commits from next into master 2026-01-16 17:05:31 +01:00
2 changed files with 41 additions and 4 deletions
Showing only changes of commit c789de3ad8 - Show all commits

View File

@@ -149,11 +149,12 @@ export const submit = <A, I, R, SA, SE, SR>(
): Effect.Effect<Option.Option<Result.Result<SA, SE>>, NoSuchElementException, Scope.Scope | SR> => Effect.whenEffect(
self.valueRef.pipe(
Effect.andThen(identity),
Effect.andThen(value => Result.forkEffectDequeue(
self.onSubmit(value) as Effect.Effect<SA, SE, Result.forkEffectDequeue.InputContext<SR, never>>)
Effect.andThen(value => Result.forkEffectPubSub(
self.onSubmit(value) as Effect.Effect<SA, SE, Result.forkEffectPubSub.InputContext<SR, never>>)
),
Effect.andThen(identity),
Effect.andThen(Stream.fromQueue),
Stream.unwrap,
Stream.unwrapScoped,
Stream.runFoldEffect(
Result.initial() as Result.Result<SA, SE>,
(_, result) => Effect.as(Ref.set(self.submitResultRef, result), result),

View File

@@ -1,4 +1,4 @@
import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, pipe, Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect"
import { Cause, Context, Data, Effect, Equal, Exit, Hash, Layer, Match, Option, Pipeable, Predicate, PubSub, pipe, Queue, Ref, type Scope, type Subscribable, SubscriptionRef } from "effect"
export const ResultTypeId: unique symbol = Symbol.for("@effect-fc/Result/Result")
@@ -219,6 +219,42 @@ export const forkEffect = <A, E, R, P = never>(
)),
) as Effect.Effect<Subscribable.Subscribable<Result<A, E, P>>, never, Scope.Scope>
export namespace forkEffectPubSub {
export type InputContext<R, P> = forkEffect.InputContext<R, P>
export type OutputContext<R> = forkEffect.OutputContext<R>
export interface Options<P> extends forkEffect.Options<P> {}
}
export const forkEffectPubSub = <A, E, R, P = never>(
effect: Effect.Effect<A, E, forkEffectDequeue.InputContext<R, NoInfer<P>>>,
options?: forkEffectDequeue.Options<P>,
): Effect.Effect<
Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>,
never,
forkEffectDequeue.OutputContext<R>
> => Effect.all([
Ref.make<Result<A, E, P>>(initial()),
PubSub.unbounded<Result<A, E, P>>(),
]).pipe(
Effect.tap(([ref, pubsub]) => Effect.forkScoped(State<A, E, P>().pipe(
Effect.andThen(state => state.set(running(options?.initialProgress)).pipe(
Effect.andThen(effect),
Effect.onExit(exit => Effect.andThen(
state.set(fromExit(exit)),
Effect.forkScoped(PubSub.shutdown(pubsub)),
)),
)),
Effect.provide(Layer.empty.pipe(
Layer.provideMerge(makeProgressLayer<A, E, P>()),
Layer.provideMerge(Layer.succeed(State<A, E, P>(), {
get: ref,
set: v => Effect.andThen(Ref.set(ref, v), PubSub.publish(pubsub, v))
})),
)),
))),
Effect.map(([, pubsub]) => pubsub.subscribe),
) as Effect.Effect<Effect.Effect<Queue.Dequeue<Result<A, E, P>>, never, Scope.Scope>, never, Scope.Scope>
export namespace forkEffectDequeue {
export type InputContext<R, P> = forkEffect.InputContext<R, P>
export type OutputContext<R> = forkEffect.OutputContext<R>