From 3ff646db0f4ef8a0f90d36fece59ad239d7cb57f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julien=20Valverd=C3=A9?= Date: Tue, 18 Nov 2025 04:50:11 +0100 Subject: [PATCH] Query work --- packages/effect-fc/src/Query.ts | 78 ++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 25 deletions(-) diff --git a/packages/effect-fc/src/Query.ts b/packages/effect-fc/src/Query.ts index c9a8ca9..20e4dfb 100644 --- a/packages/effect-fc/src/Query.ts +++ b/packages/effect-fc/src/Query.ts @@ -1,4 +1,4 @@ -import { Effect, Fiber, Option, Pipeable, Predicate, Stream, type Subscribable, type SubscriptionRef } from "effect" +import { Effect, Fiber, Option, Pipeable, Predicate, Stream, type Subscribable, SubscriptionRef } from "effect" import * as Result from "./Result.js" @@ -13,7 +13,7 @@ extends Pipeable.Pipeable { readonly f: (key: K) => Effect.Effect readonly initialProgress: P - readonly fiber: Subscribable.Subscribable>>> + readonly fiber: Subscribable.Subscribable>> readonly result: Subscribable.Subscribable> } @@ -26,40 +26,68 @@ extends Pipeable.Class() implements Query { readonly f: (key: K) => Effect.Effect, readonly initialProgress: P, - readonly fiber: SubscriptionRef.SubscriptionRef>>>, + readonly fiber: SubscriptionRef.SubscriptionRef>>, readonly result: SubscriptionRef.SubscriptionRef>, ) { super() } - query(key: K) { + interrupt(): Effect.Effect { + return Effect.andThen(this.fiber, Option.match({ + onSome: fiber => Effect.andThen( + Fiber.interrupt(fiber), + SubscriptionRef.set(this.fiber, Option.none()), + ), + onNone: () => Effect.void, + })) + } + + query(key: K): Effect.Effect< + Result.Result, + never, + Result.forkEffect.OutputContext + > { return this.fiber.pipe( - Effect.andThen(Option.match({ - onSome: Fiber.interrupt, - onNone: () => Effect.void, - })), - Effect.andThen(Result.forkEffect(this.f(key))), - Effect.tap(([result, fiber]) => ), + Effect.andThen(this.interrupt()), + Effect.andThen(Result.unsafeForkEffect(this.f(key))), + Effect.tap(([, fiber]) => SubscriptionRef.set(this.fiber, Option.some(fiber))), + Effect.andThen(([sub]) => Effect.all([Effect.succeed(sub), sub.get])), + Effect.andThen(([sub, initial]) => Stream.runFoldEffect( + sub.changes, + initial, + (_, result) => Effect.as(SubscriptionRef.set(this.result, result), result), + )), + Effect.tap(SubscriptionRef.set(this.fiber, Option.none())), ) } } export const isQuery = (u: unknown): u is Query => Predicate.hasProperty(u, QueryTypeId) +export declare namespace make { + export interface Options { + readonly key: Stream.Stream + readonly f: (key: NoInfer) => Effect.Effect + readonly initialProgress?: P + } +} + +export const make = Effect.fnUntraced(function* ( + options: make.Options +) { + return new QueryImpl( + options.key, + options.f, + options.initialProgress as P, + + yield* SubscriptionRef.make(Option.none>()), + yield* SubscriptionRef.make(Result.initial() as Result.Result), + ) +}) + export const run = ( self: Query -) => Stream.runForEach(self.key, -) - -export const query = ( - self: Query, - key: K, -) => self.fiberRef.pipe( - Effect.andThen(Option.match({ - onSome: Fiber.interrupt, - onNone: () => Effect.void, - })), - Effect.andThen(Effect.forkScoped( - - )) -) +) => Stream.runForEach(self.key, key => Effect.andThen( + (self as QueryImpl).interrupt(), + Effect.forkScoped((self as QueryImpl).query(key)), +))