diff --git a/packages/effect-fc/src/Query.ts b/packages/effect-fc/src/Query.ts index 1c8a962..b72b954 100644 --- a/packages/effect-fc/src/Query.ts +++ b/packages/effect-fc/src/Query.ts @@ -1,4 +1,4 @@ -import { Effect, Fiber, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect" +import { Console, Effect, Fiber, Option, Pipeable, Predicate, type Scope, Stream, type Subscribable, SubscriptionRef } from "effect" import * as Result from "./Result.js" @@ -13,6 +13,7 @@ extends Pipeable.Pipeable { readonly f: (key: K) => Effect.Effect readonly initialProgress: P + readonly latestKey: Subscribable.Subscribable> readonly fiber: Subscribable.Subscribable>> readonly result: Subscribable.Subscribable> } @@ -26,6 +27,7 @@ extends Pipeable.Class() implements Query { readonly f: (key: K) => Effect.Effect, readonly initialProgress: P, + readonly latestKey: SubscriptionRef.SubscriptionRef>, readonly fiber: SubscriptionRef.SubscriptionRef>>, readonly result: SubscriptionRef.SubscriptionRef>, ) { @@ -33,24 +35,43 @@ extends Pipeable.Class() implements Query { } readonly interrupt: Effect.Effect = Effect.gen(this, function*() { + yield* Console.log("interrupt called") return Option.match(yield* this.fiber, { - onSome: Fiber.interrupt, + onSome: fiber => Effect.gen(function*() { + yield* Console.log("interrupting...") + yield* Fiber.interrupt(fiber) + yield* Console.log("done interrupting.") + }), onNone: () => Effect.void, }) }) - query(key: K): Effect.Effect, never, Scope.Scope | R> { + start(key: K): Effect.Effect< + Subscribable.Subscribable>, + never, + Scope.Scope | R + > { return Result.unsafeForkEffect( - Effect.onExit(this.f(key), () => SubscriptionRef.set(this.fiber, Option.none())), + Effect.onExit(this.f(key), exit => SubscriptionRef.set(this.fiber, Option.none()).pipe( + Effect.andThen(Console.log("exited", exit)) + )), { initialProgress: this.initialProgress }, ).pipe( Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))), - Effect.andThen(([sub]) => Effect.all([Effect.succeed(sub), sub.get])), - Effect.andThen(([sub, initial]) => Stream.runFoldEffect( + Effect.map(([sub]) => sub), + ) + } + + watch( + sub: Subscribable.Subscribable> + ): Effect.Effect> { + return Effect.andThen( + sub.get, + initial => Stream.runFoldEffect( sub.changes, initial, (_, result) => Effect.as(SubscriptionRef.set(this.result, result), result), - )), + ), ) } } @@ -73,6 +94,7 @@ export const make = Effect.fnUntraced(function* ()), yield* SubscriptionRef.make(Option.none>()), yield* SubscriptionRef.make(Result.initial()), ) @@ -91,7 +113,12 @@ export const service = ( self: Query -): Effect.Effect => Stream.runForEach(self.key, key => Effect.andThen( - (self as QueryImpl).interrupt, - Effect.forkScoped((self as QueryImpl).query(key)), -)) +): Effect.Effect => Stream.runForEach(self.key, key => + (self as QueryImpl).interrupt.pipe( + Effect.andThen(SubscriptionRef.set((self as QueryImpl).latestKey, Option.some(key))), + Effect.andThen((self as QueryImpl).start(key)), + Effect.andThen(sub => Effect.forkScoped( + (self as QueryImpl).watch(sub) + )), + ) +) diff --git a/packages/example/src/routes/query.tsx b/packages/example/src/routes/query.tsx index 2253fc8..1ae4659 100644 --- a/packages/example/src/routes/query.tsx +++ b/packages/example/src/routes/query.tsx @@ -21,7 +21,7 @@ const ResultView = Component.makeUntraced("Result")(function*() { const query = yield* Query.service({ key, f: ([, id]) => HttpClient.HttpClient.pipe( - Effect.tap(Effect.sleep("250 millis")), + Effect.tap(Effect.sleep("500 millis")), Effect.andThen(client => client.get(`https://jsonplaceholder.typicode.com/posts/${ id }`)), Effect.andThen(response => response.json), Effect.andThen(Schema.decodeUnknown(Post)),