0.2.2 #31

Merged
Thilawyn merged 184 commits from next into master 2026-01-16 17:05:31 +01:00
2 changed files with 39 additions and 12 deletions
Showing only changes of commit 3b5a9abefa - Show all commits

View File

@@ -1,4 +1,4 @@
import { Effect, Fiber, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect" import { Console, Effect, Fiber, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect"
import * as Result from "./Result.js" import * as Result from "./Result.js"
@@ -13,6 +13,7 @@ extends Pipeable.Pipeable {
readonly f: (key: K) => Effect.Effect<A, E, R> readonly f: (key: K) => Effect.Effect<A, E, R>
readonly initialProgress: P readonly initialProgress: P
readonly latestKey: Subscribable.Subscribable<Option.Option<K>>
readonly fiber: Subscribable.Subscribable<Option.Option<Fiber.Fiber<A, E>>> readonly fiber: Subscribable.Subscribable<Option.Option<Fiber.Fiber<A, E>>>
readonly result: Subscribable.Subscribable<Result.Result<A, E, P>> readonly result: Subscribable.Subscribable<Result.Result<A, E, P>>
} }
@@ -26,6 +27,7 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
readonly f: (key: K) => Effect.Effect<A, E, R>, readonly f: (key: K) => Effect.Effect<A, E, R>,
readonly initialProgress: P, readonly initialProgress: P,
readonly latestKey: SubscriptionRef.SubscriptionRef<Option.Option<K>>,
readonly fiber: SubscriptionRef.SubscriptionRef<Option.Option<Fiber.Fiber<A, E>>>, readonly fiber: SubscriptionRef.SubscriptionRef<Option.Option<Fiber.Fiber<A, E>>>,
readonly result: SubscriptionRef.SubscriptionRef<Result.Result<A, E, P>>, readonly result: SubscriptionRef.SubscriptionRef<Result.Result<A, E, P>>,
) { ) {
@@ -33,24 +35,43 @@ extends Pipeable.Class() implements Query<K, A, E, R, P> {
} }
readonly interrupt: Effect.Effect<void, never, never> = Effect.gen(this, function*() { readonly interrupt: Effect.Effect<void, never, never> = Effect.gen(this, function*() {
yield* Console.log("interrupt called")
return Option.match(yield* this.fiber, { return Option.match(yield* this.fiber, {
onSome: Fiber.interrupt, onSome: fiber => Effect.gen(function*() {
yield* Console.log("interrupting...")
yield* Fiber.interrupt(fiber)
yield* Console.log("done interrupting.")
}),
onNone: () => Effect.void, onNone: () => Effect.void,
}) })
}) })
query(key: K): Effect.Effect<Result.Result<A, E, P>, never, Scope.Scope | R> { start(key: K): Effect.Effect<
Subscribable.Subscribable<Result.Result<A, E, P>>,
never,
Scope.Scope | R
> {
return Result.unsafeForkEffect( return Result.unsafeForkEffect(
Effect.onExit(this.f(key), () => SubscriptionRef.set(this.fiber, Option.none())), Effect.onExit(this.f(key), exit => SubscriptionRef.set(this.fiber, Option.none()).pipe(
Effect.andThen(Console.log("exited", exit))
)),
{ initialProgress: this.initialProgress }, { initialProgress: this.initialProgress },
).pipe( ).pipe(
Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))), Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))),
Effect.andThen(([sub]) => Effect.all([Effect.succeed(sub), sub.get])), Effect.map(([sub]) => sub),
Effect.andThen(([sub, initial]) => Stream.runFoldEffect( )
}
watch(
sub: Subscribable.Subscribable<Result.Result<A, E, P>>
): Effect.Effect<Result.Result<A, E, P>> {
return Effect.andThen(
sub.get,
initial => Stream.runFoldEffect(
sub.changes, sub.changes,
initial, initial,
(_, result) => Effect.as(SubscriptionRef.set(this.result, result), result), (_, result) => Effect.as(SubscriptionRef.set(this.result, result), result),
)), ),
) )
} }
} }
@@ -73,6 +94,7 @@ export const make = Effect.fnUntraced(function* <K extends readonly any[], A, E
options.f as any, options.f as any,
options.initialProgress as P, options.initialProgress as P,
yield* SubscriptionRef.make(Option.none<K>()),
yield* SubscriptionRef.make(Option.none<Fiber.Fiber<A, E>>()), yield* SubscriptionRef.make(Option.none<Fiber.Fiber<A, E>>()),
yield* SubscriptionRef.make(Result.initial<A, E, P>()), yield* SubscriptionRef.make(Result.initial<A, E, P>()),
) )
@@ -91,7 +113,12 @@ export const service = <K extends readonly any[], A, E = never, R = never, P = n
export const run = <K extends readonly any[], A, E, R, P>( export const run = <K extends readonly any[], A, E, R, P>(
self: Query<K, A, E, R, P> self: Query<K, A, E, R, P>
): Effect.Effect<void, never, Scope.Scope | R> => Stream.runForEach(self.key, key => Effect.andThen( ): Effect.Effect<void, never, Scope.Scope | R> => Stream.runForEach(self.key, key =>
(self as QueryImpl<K, A, E, R, P>).interrupt, (self as QueryImpl<K, A, E, R, P>).interrupt.pipe(
Effect.forkScoped((self as QueryImpl<K, A, E, R, P>).query(key)), Effect.andThen(SubscriptionRef.set((self as QueryImpl<K, A, E, R, P>).latestKey, Option.some(key))),
)) Effect.andThen((self as QueryImpl<K, A, E, R, P>).start(key)),
Effect.andThen(sub => Effect.forkScoped(
(self as QueryImpl<K, A, E, R, P>).watch(sub)
)),
)
)

View File

@@ -21,7 +21,7 @@ const ResultView = Component.makeUntraced("Result")(function*() {
const query = yield* Query.service({ const query = yield* Query.service({
key, key,
f: ([, id]) => HttpClient.HttpClient.pipe( f: ([, id]) => HttpClient.HttpClient.pipe(
Effect.tap(Effect.sleep("250 millis")), Effect.tap(Effect.sleep("500 millis")),
Effect.andThen(client => client.get(`https://jsonplaceholder.typicode.com/posts/${ id }`)), Effect.andThen(client => client.get(`https://jsonplaceholder.typicode.com/posts/${ id }`)),
Effect.andThen(response => response.json), Effect.andThen(response => response.json),
Effect.andThen(Schema.decodeUnknown(Post)), Effect.andThen(Schema.decodeUnknown(Post)),