diff --git a/benchmark.mjs b/benchmark.mjs index 14a145f..b00d9b1 100644 --- a/benchmark.mjs +++ b/benchmark.mjs @@ -1,5 +1,5 @@ // Simple benchmark for rescript-signals -import { Signal, Computed, Effect } from './src/Signals.res.mjs'; +import { Signal, Computed, Effect } from './packages/rescript-signals/src/Signals.res.mjs'; import { writeFileSync, mkdirSync, existsSync } from 'fs'; const results = []; @@ -135,7 +135,7 @@ console.log('\n--- Effects ---'); { let effectCount = 0; const source = Signal.make(0); - const effect = Effect.run(() => { + const effect = Effect.runWithDisposer(() => { Signal.get(source); effectCount++; return undefined; diff --git a/packages/rescript-signals/rescript.json b/packages/rescript-signals/rescript.json index 5e7c222..859cd6f 100644 --- a/packages/rescript-signals/rescript.json +++ b/packages/rescript-signals/rescript.json @@ -1,6 +1,5 @@ { "name": "rescript-signals", - "namespace": "Signals", "sources": [ { "dir": "src", diff --git a/packages/rescript-signals/src/Signals.res b/packages/rescript-signals/src/Signals.res index 4077dd8..453963e 100644 --- a/packages/rescript-signals/src/Signals.res +++ b/packages/rescript-signals/src/Signals.res @@ -1,3 +1,11 @@ -module Signal = Signal -module Computed = Computed -module Effect = Effect +module Signal = { + include Signals__Signal +} + +module Computed = { + include Signals__Computed +} + +module Effect = { + include Signals__Effect +} diff --git a/packages/rescript-signals/src/signals/Core.res b/packages/rescript-signals/src/signals/Core.res index 73c9109..fbccdb0 100644 --- a/packages/rescript-signals/src/signals/Core.res +++ b/packages/rescript-signals/src/signals/Core.res @@ -1,275 +1 @@ -// Core types for the reactive system using linked lists -// All types defined here to handle circular references - -// Bitwise flags for observer state (avoids object property overhead) -let flag_dirty = 1 -let flag_pending = 2 -let flag_running = 4 - -// Global tracking version -let trackingVersion: ref = ref(0) -// Global mutation version (increments on real signal writes) -let globalVersion: ref = ref(0) - -type kind = [#Effect | #Computed] - -module rec Link: { - type t = { - // Direct reference to signal's subscriber list (type-erased) - mutable subs: Subs.t, - // Direct reference to observer - mutable observer: Observer.t, - // Links in the observer's dependency chain - mutable nextDep: option, - mutable prevDep: option, - // Links in the signal's subscriber chain - mutable nextSub: option, - mutable prevSub: option, - // Version stamp for duplicate detection within a compute cycle - mutable lastTrackedVersion: int, - } -} = Link - -// Signal subscriber list (head/tail of linked list) -// For computeds, this same object also serves as the observer (combined structure) -and Subs: { - type t = { - mutable first: option, - mutable last: option, - mutable computedSubscriberCount: int, - mutable version: int, - // === Observer fields (only used for computeds) === - // If compute is Some, this subs is a computed signal - mutable compute: option unit>, - mutable firstDep: option, - mutable lastDep: option, - mutable flags: int, - mutable level: int, - mutable deferEffectsUntilRecompute: bool, - mutable lastGlobalVersion: int, - } -} = Subs - -// Observer for effects only (computeds use subs directly) -and Observer: { - type t = { - id: int, - kind: kind, - run: unit => unit, - mutable firstDep: option, - mutable lastDep: option, - mutable flags: int, - mutable level: int, - name: option, - // For computed observers: direct reference to backing subs (the combined object) - mutable backingSubs: option, - } -} = Observer - -// Type aliases for convenience -type link = Link.t -type subs = Subs.t -type observer = Observer.t - -// Create empty subscriber list (for plain signals) -let makeSubs = (): subs => { - first: None, - last: None, - computedSubscriberCount: 0, - version: 0, - compute: None, - firstDep: None, - lastDep: None, - flags: 0, - level: 0, - deferEffectsUntilRecompute: false, - lastGlobalVersion: 0, -} - -// Create subs for a computed (with compute function) -let makeComputedSubs = (compute: unit => unit, ~deferEffectsUntilRecompute: bool=false): subs => { - first: None, - last: None, - computedSubscriberCount: 0, - version: 0, - compute: Some(compute), - firstDep: None, - lastDep: None, - flags: flag_dirty, // start dirty - level: 0, - deferEffectsUntilRecompute, - lastGlobalVersion: 0, -} - -// Create observer -let makeObserver = ( - id: int, - kind: kind, - run: unit => unit, - ~name: option=?, - ~backingSubs: option=?, -): observer => { - id, - kind, - run, - firstDep: None, - lastDep: None, - flags: flag_dirty, // start dirty - level: 0, - name, - backingSubs, -} - -// Flag operations for observer -let isDirty = (o: observer): bool => Int.bitwiseAnd(o.flags, flag_dirty) !== 0 -let setDirty = (o: observer): unit => o.flags = Int.bitwiseOr(o.flags, flag_dirty) -let clearDirty = (o: observer): unit => - o.flags = Int.bitwiseAnd(o.flags, Int.bitwiseNot(flag_dirty)) -let isPending = (o: observer): bool => Int.bitwiseAnd(o.flags, flag_pending) !== 0 -let setPending = (o: observer): unit => o.flags = Int.bitwiseOr(o.flags, flag_pending) -let clearPending = (o: observer): unit => - o.flags = Int.bitwiseAnd(o.flags, Int.bitwiseNot(flag_pending)) - -// Flag operations for subs -let isSubsDirty = (s: subs): bool => Int.bitwiseAnd(s.flags, flag_dirty) !== 0 -let setSubsDirty = (s: subs): unit => s.flags = Int.bitwiseOr(s.flags, flag_dirty) -let clearSubsDirty = (s: subs): unit => - s.flags = Int.bitwiseAnd(s.flags, Int.bitwiseNot(flag_dirty)) -let isSubsPending = (s: subs): bool => Int.bitwiseAnd(s.flags, flag_pending) !== 0 -let setSubsPending = (s: subs): unit => s.flags = Int.bitwiseOr(s.flags, flag_pending) -let clearSubsPending = (s: subs): unit => - s.flags = Int.bitwiseAnd(s.flags, Int.bitwiseNot(flag_pending)) - -// Check if subs is a computed -let isComputed = (s: subs): bool => s.compute !== None - -// Create a link node -let makeLink = (sourceSubs: subs, linkedObserver: observer): link => { - { - subs: sourceSubs, - observer: linkedObserver, - nextDep: None, - prevDep: None, - nextSub: None, - prevSub: None, - lastTrackedVersion: 0, - } -} - -// Add link to signal's subscriber list -let linkToSubs = (subs: subs, link: link): unit => { - link.prevSub = subs.last - link.nextSub = None - switch subs.last { - | Some(last) => last.nextSub = Some(link) - | None => subs.first = Some(link) - } - subs.last = Some(link) - - let linkedSubs = (Obj.magic(link.observer): subs) - if isComputed(linkedSubs) { - subs.computedSubscriberCount = subs.computedSubscriberCount + 1 - } -} - -// Add link to observer's dependency list -let linkToDeps = (observer: observer, link: link): unit => { - link.prevDep = observer.lastDep - link.nextDep = None - switch observer.lastDep { - | Some(last) => last.nextDep = Some(link) - | None => observer.firstDep = Some(link) - } - observer.lastDep = Some(link) -} - -// Remove link from subscriber list -let unlinkFromSubs = (link: link): unit => { - let subs = link.subs - switch link.prevSub { - | Some(prev) => prev.nextSub = link.nextSub - | None => subs.first = link.nextSub - } - switch link.nextSub { - | Some(next) => next.prevSub = link.prevSub - | None => subs.last = link.prevSub - } - link.prevSub = None - link.nextSub = None - - let linkedSubs = (Obj.magic(link.observer): subs) - if isComputed(linkedSubs) && subs.computedSubscriberCount > 0 { - subs.computedSubscriberCount = subs.computedSubscriberCount - 1 - } -} - -// Remove link from dependency list -let unlinkFromDeps = (observer: observer, link: link): unit => { - switch link.prevDep { - | Some(prev) => prev.nextDep = link.nextDep - | None => observer.firstDep = link.nextDep - } - switch link.nextDep { - | Some(next) => next.prevDep = link.prevDep - | None => observer.lastDep = link.prevDep - } - link.prevDep = None - link.nextDep = None -} - -// Remove link from subs's dependency list (for computeds - subs IS the observer) -let unlinkFromSubsDeps = (s: subs, link: link): unit => { - switch link.prevDep { - | Some(prev) => prev.nextDep = link.nextDep - | None => s.firstDep = link.nextDep - } - switch link.nextDep { - | Some(next) => next.prevDep = link.prevDep - | None => s.lastDep = link.prevDep - } - link.prevDep = None - link.nextDep = None -} - -// Clear all dependencies from observer (unlinks from all signals) -let clearDeps = (observer: observer): unit => { - let link = ref(observer.firstDep) - while link.contents !== None { - switch link.contents { - | Some(l) => - let next = l.nextDep - unlinkFromSubs(l) - link := next - | None => () - } - } - observer.firstDep = None - observer.lastDep = None -} - -// Clear all dependencies from subs (for computeds - subs IS the observer) -let clearSubsDeps = (s: subs): unit => { - let link = ref(s.firstDep) - while link.contents !== None { - switch link.contents { - | Some(l) => - let next = l.nextDep - unlinkFromSubs(l) - link := next - | None => () - } - } - s.firstDep = None - s.lastDep = None -} - -// Add link to subs's dependency list (for computeds - subs IS the observer) -let linkToSubsDeps = (s: subs, link: link): unit => { - link.prevDep = s.lastDep - link.nextDep = None - switch s.lastDep { - | Some(last) => last.nextDep = Some(link) - | None => s.firstDep = Some(link) - } - s.lastDep = Some(link) -} +include Signals__Core diff --git a/packages/rescript-signals/src/signals/Id.res b/packages/rescript-signals/src/signals/Id.res index ad1e4c9..3b0e3a2 100644 --- a/packages/rescript-signals/src/signals/Id.res +++ b/packages/rescript-signals/src/signals/Id.res @@ -1,5 +1 @@ -let next = ref(0) -let make = () => { - next := next.contents + 1 - next.contents -} \ No newline at end of file +include Signals__Id diff --git a/packages/rescript-signals/src/signals/Scheduler.res b/packages/rescript-signals/src/signals/Scheduler.res index 640fa04..ed31002 100644 --- a/packages/rescript-signals/src/signals/Scheduler.res +++ b/packages/rescript-signals/src/signals/Scheduler.res @@ -1,632 +1 @@ -// Current execution context for computeds (subs IS the observer) -let currentComputedSubs: ref> = ref(None) - -// Current execution context for effects -let currentObserver: ref> = ref(None) - -// Current dependency tracking version (shared across nested compute/effect runs) -let currentTrackingVersion: ref = ref(0) - -// Per-run dependency cursors (separate from true tail pointers). -let currentComputedDepCursor: ref> = ref(None) -let currentObserverDepCursor: ref> = ref(None) - -// Pending effects to execute -let pendingEffects: array = [] -// Pending computeds to recompute (subs that are dirty) -let pendingComputedSubs: array = [] -let flushing: ref = ref(false) -let pendingEffectsNeedsSort: ref = ref(false) -let pendingComputedNeedsSort: ref = ref(false) -let lastEnqueuedEffectLevel: ref = ref(0) -let lastEnqueuedComputedLevel: ref = ref(0) - -// Queue for iterative dirty marking -let dirtyQueue: array = [] - -// Efficient array clear -let clearArray: array<'a> => unit = %raw(`function(arr) { arr.length = 0 }`) -let drainProcessedPrefix: (array<'a>, int) => unit = %raw(` -function(arr, processedCount) { - if (processedCount <= 0) return; - if (processedCount >= arr.length) { - arr.length = 0; - return; - } - arr.copyWithin(0, processedCount); - arr.length = arr.length - processedCount; -} -`) - -// Add effect to pending if not already there -let addEffectToPending = (observer: Core.observer): unit => { - if !Core.isPending(observer) { - Core.setPending(observer) - let lengthBefore = pendingEffects->Array.length - if lengthBefore == 0 { - pendingEffectsNeedsSort := false - } else if observer.level < lastEnqueuedEffectLevel.contents { - pendingEffectsNeedsSort := true - } - pendingEffects->Array.push(observer)->ignore - lastEnqueuedEffectLevel := observer.level - } -} - -// Add computed to pending if not already there -let addComputedToPending = (subs: Core.subs): unit => { - if !Core.isSubsPending(subs) { - Core.setSubsPending(subs) - let lengthBefore = pendingComputedSubs->Array.length - if lengthBefore == 0 { - pendingComputedNeedsSort := false - } else if subs.level < lastEnqueuedComputedLevel.contents { - pendingComputedNeedsSort := true - } - pendingComputedSubs->Array.push(subs)->ignore - lastEnqueuedComputedLevel := subs.level - } -} - -// Track a dependency from a computed (subs tracks subs) -let trackDepFromComputed = (computedSubs: Core.subs, sourceSubs: Core.subs): unit => { - let computedObserver: Core.observer = Obj.magic(computedSubs) - - if computedSubs.firstDep === None { - let newLink: Core.link = Core.makeLink(sourceSubs, computedObserver) - newLink.lastTrackedVersion = currentTrackingVersion.contents - Core.linkToSubsDeps(computedSubs, newLink) - Core.linkToSubs(sourceSubs, newLink) - currentComputedDepCursor := Some(newLink) - } else { - let currentVersion = currentTrackingVersion.contents - // Fast path: reuse run cursor or cursor.next to avoid scanning in common cases. - let fastPathFound = ref(false) - switch currentComputedDepCursor.contents { - | Some(cursor) => - if cursor.subs === sourceSubs && cursor.observer === computedObserver { - cursor.lastTrackedVersion = currentVersion - fastPathFound.contents = true - } else { - switch cursor.nextDep { - | Some(nextDep) => - if nextDep.subs === sourceSubs && nextDep.observer === computedObserver { - nextDep.lastTrackedVersion = currentVersion - currentComputedDepCursor := Some(nextDep) - fastPathFound.contents = true - } - | None => () - } - } - | None => () - } - - if !fastPathFound.contents { - switch sourceSubs.last { - | Some(lastSubLink) => - if lastSubLink.lastTrackedVersion === currentVersion && lastSubLink.observer === computedObserver { - lastSubLink.lastTrackedVersion = currentVersion - currentComputedDepCursor := Some(lastSubLink) - fastPathFound.contents = true - } - | None => () - } - } - - if !fastPathFound.contents { - // Fall back to full scan - let found = ref(false) - let foundLink: ref> = ref(None) - let link = ref(computedSubs.firstDep) - while link.contents !== None && !found.contents { - switch link.contents { - | Some(l) => - if l.subs === sourceSubs { - l.lastTrackedVersion = currentVersion - foundLink := Some(l) - found := true - } else { - link := l.nextDep - } - | None => () - } - } - - // Create new link only if not found - if !found.contents { - let newLink: Core.link = Core.makeLink(sourceSubs, computedObserver) - newLink.lastTrackedVersion = currentVersion - Core.linkToSubsDeps(computedSubs, newLink) - Core.linkToSubs(sourceSubs, newLink) - currentComputedDepCursor := Some(newLink) - } else { - currentComputedDepCursor := foundLink.contents - } - } - } -} - -// Track a dependency from an effect (observer tracks subs) -// Uses version-based duplicate detection within a run cycle -let trackDepFromEffect = (observer: Core.observer, sourceSubs: Core.subs): unit => { - if observer.firstDep === None { - let newLink: Core.link = Core.makeLink(sourceSubs, observer) - newLink.lastTrackedVersion = currentTrackingVersion.contents - Core.linkToDeps(observer, newLink) - Core.linkToSubs(sourceSubs, newLink) - currentObserverDepCursor := Some(newLink) - } else { - let currentVersion = currentTrackingVersion.contents - // Fast path: reuse run cursor or cursor.next to avoid scanning in common cases. - let fastPathFound = ref(false) - switch currentObserverDepCursor.contents { - | Some(cursor) => - if cursor.subs === sourceSubs && cursor.observer === observer { - cursor.lastTrackedVersion = currentVersion - fastPathFound.contents = true - } else { - switch cursor.nextDep { - | Some(nextDep) => - if nextDep.subs === sourceSubs && nextDep.observer === observer { - nextDep.lastTrackedVersion = currentVersion - currentObserverDepCursor := Some(nextDep) - fastPathFound.contents = true - } - | None => () - } - } - | None => () - } - - if !fastPathFound.contents { - switch sourceSubs.last { - | Some(lastSubLink) => - if lastSubLink.lastTrackedVersion === currentVersion && lastSubLink.observer === observer { - lastSubLink.lastTrackedVersion = currentVersion - currentObserverDepCursor := Some(lastSubLink) - fastPathFound.contents = true - } - | None => () - } - } - - if !fastPathFound.contents { - let found = ref(false) - let foundLink: ref> = ref(None) - let link = ref(observer.firstDep) - while link.contents !== None && !found.contents { - switch link.contents { - | Some(l) => - if l.subs === sourceSubs { - l.lastTrackedVersion = currentVersion - foundLink := Some(l) - found := true - } else { - link := l.nextDep - } - | None => () - } - } - - // Create new link only if not found - if !found.contents { - let newLink: Core.link = Core.makeLink(sourceSubs, observer) - newLink.lastTrackedVersion = currentVersion - Core.linkToDeps(observer, newLink) - Core.linkToSubs(sourceSubs, newLink) - currentObserverDepCursor := Some(newLink) - } else { - currentObserverDepCursor := foundLink.contents - } - } - } -} - -// Track dependency - routes to appropriate function based on current context -let trackDep = (subs: Core.subs): unit => { - switch currentComputedSubs.contents { - | Some(computedSubs) => trackDepFromComputed(computedSubs, subs) - | None => - switch currentObserver.contents { - | Some(observer) => trackDepFromEffect(observer, subs) - | None => () - } - } -} - -// Compare by level for sorting -let compareEffectsByLevel = (a: Core.observer, b: Core.observer): float => { - Int.toFloat(a.level - b.level) -} - -let compareSubsByLevel = (a: Core.subs, b: Core.subs): float => { - Int.toFloat(a.level - b.level) -} - -// Compute level for a computed (based on its dependencies) -let computeSubsLevel = (s: Core.subs): int => { - let maxLevel = ref(0) - let link = ref(s.firstDep) - while link.contents !== None { - switch link.contents { - | Some(l) => - // Check if the source is a computed - if Core.isComputed(l.subs) { - if l.subs.level > maxLevel.contents { - maxLevel := l.subs.level - } - } - link := l.nextDep - | None => () - } - } - maxLevel.contents + 1 -} - -// Compute level for an effect -let computeLevel = (observer: Core.observer): int => { - let maxLevel = ref(0) - let link = ref(observer.firstDep) - while link.contents !== None { - switch link.contents { - | Some(l) => - if Core.isComputed(l.subs) { - if l.subs.level > maxLevel.contents { - maxLevel := l.subs.level - } - } - link := l.nextDep - | None => () - } - } - maxLevel.contents + 1 -} - -// Run one computed recompute cycle with link reuse. -let runComputedCycle = (subs: Core.subs, ~clearPending: bool): unit => { - let previousTrackingVersion = currentTrackingVersion.contents - let previousVersion = subs.version - - // Increment tracking version for this cycle - Core.trackingVersion := Core.trackingVersion.contents + 1 - currentTrackingVersion.contents = Core.trackingVersion.contents - - // DON'T clear deps - we'll reuse existing links - if clearPending { - Core.clearSubsPending(subs) - } - - let prev = currentComputedSubs.contents - let prevCursor = currentComputedDepCursor.contents - currentComputedSubs := Some(subs) - currentComputedDepCursor := subs.firstDep - - try { - switch subs.compute { - | Some(compute) => compute() - | None => () - } - - // After compute: unlink stale deps (version != current) - let link = ref(subs.firstDep) - while link.contents !== None { - switch link.contents { - | Some(l) => - let next = l.nextDep - if l.lastTrackedVersion !== currentTrackingVersion.contents { - // Stale - unlink from source's subscriber list and our dep list - Core.unlinkFromSubs(l) - Core.unlinkFromSubsDeps(subs, l) - } - link := next - | None => () - } - } - - Core.clearSubsDirty(subs) - subs.lastGlobalVersion = Core.globalVersion.contents - - // Propagate only when computed output changed. - if subs.first !== None && subs.version !== previousVersion { - let subLink = ref(subs.first) - while subLink.contents !== None { - switch subLink.contents { - | Some(l) => - let linkedSubs = (Obj.magic(l.observer): Core.subs) - if Core.isComputed(linkedSubs) { - // Mark downstream computed dirty (lazy propagation). - Core.setSubsDirty(linkedSubs) - } else { - // Effects get queued for execution unless this effect is already running. - switch currentObserver.contents { - | Some(currentObserver) => - if currentObserver !== l.observer { - addEffectToPending(l.observer) - } - | None => addEffectToPending(l.observer) - } - } - subLink := l.nextSub - | None => () - } - } - } - - currentComputedSubs := prev - currentComputedDepCursor := prevCursor - currentTrackingVersion.contents = previousTrackingVersion - } catch { - | exn => - currentComputedSubs := prev - currentComputedDepCursor := prevCursor - currentTrackingVersion.contents = previousTrackingVersion - throw(exn) - } -} - -// Retrack a computed (recompute with link reuse) -let retrackComputed = (s: Core.subs): unit => { - let oldLevel = s.level - runComputedCycle(s, ~clearPending=true) - - if oldLevel == 0 { - s.level = computeSubsLevel(s) - } -} - -// Retrack an effect (with link reuse) -let retrackEffect = (observer: Core.observer): unit => { - let oldLevel = observer.level - let previousTrackingVersion = currentTrackingVersion.contents - - // Increment tracking version for this cycle - Core.trackingVersion := Core.trackingVersion.contents + 1 - currentTrackingVersion.contents = Core.trackingVersion.contents - - // DON'T clear deps - we'll reuse existing links - Core.clearPending(observer) - - let prev = currentObserver.contents - let prevCursor = currentObserverDepCursor.contents - currentObserver := Some(observer) - currentObserverDepCursor := observer.firstDep - - try { - observer.run() - - // After run: unlink stale deps (version != current) - let link = ref(observer.firstDep) - while link.contents !== None { - switch link.contents { - | Some(l) => - let next = l.nextDep - if l.lastTrackedVersion !== currentTrackingVersion.contents { - // Stale - unlink from source's subscriber list and our dep list - Core.unlinkFromSubs(l) - Core.unlinkFromDeps(observer, l) - } - link := next - | None => () - } - } - - Core.clearDirty(observer) - currentObserver := prev - currentObserverDepCursor := prevCursor - currentTrackingVersion.contents = previousTrackingVersion - } catch { - | exn => - currentObserver := prev - currentObserverDepCursor := prevCursor - currentTrackingVersion.contents = previousTrackingVersion - throw(exn) - } - - if oldLevel == 0 { - observer.level = computeLevel(observer) - } -} - -// Flush pending observers -let flush = (): unit => { - flushing := true - - try { - while pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0 { - // Process computeds first (they might trigger more effects) - if pendingComputedSubs->Array.length > 0 { - let computedsLength = pendingComputedSubs->Array.length - if computedsLength > 1 && pendingComputedNeedsSort.contents { - // Sort by level - pendingComputedSubs->Array.sort(compareSubsByLevel)->ignore - pendingComputedNeedsSort := false - } - let i = ref(0) - while i.contents < computedsLength { - switch pendingComputedSubs->Array.get(i.contents) { - | Some(subs) => retrackComputed(subs) - | None => () - } - i := i.contents + 1 - } - drainProcessedPrefix(pendingComputedSubs, computedsLength) - if pendingComputedSubs->Array.length == 0 { - pendingComputedNeedsSort := false - } - } - - // Then process effects - if pendingEffects->Array.length > 0 { - let effectsLength = pendingEffects->Array.length - if effectsLength > 1 && pendingEffectsNeedsSort.contents { - pendingEffects->Array.sort(compareEffectsByLevel)->ignore - pendingEffectsNeedsSort := false - } - let i = ref(0) - while i.contents < effectsLength { - switch pendingEffects->Array.get(i.contents) { - | Some(effect) => retrackEffect(effect) - | None => () - } - i := i.contents + 1 - } - drainProcessedPrefix(pendingEffects, effectsLength) - if pendingEffects->Array.length == 0 { - pendingEffectsNeedsSort := false - } - } - } - - flushing := false - } catch { - | exn => - flushing := false - throw(exn) - } -} - -// Notify all subscribers of a signal (traverse linked list) -// Marks computeds dirty transitively. -// Direct effects are queued immediately. -// Effects reached through dirty computeds are deferred until parent computed recompute. -let notifySubs = (subs: Core.subs): unit => { - // Fast path: no subscribers, nothing to notify. - if subs.first === None { - () - } else if !Core.isComputed(subs) && subs.computedSubscriberCount == 0 { - // Fast path for plain signals with direct effect subscribers only. - let link = ref(subs.first) - while link.contents !== None { - switch link.contents { - | Some(l) => - addEffectToPending(l.observer) - link := l.nextSub - | None => () - } - } - } else { - dirtyQueue->Array.push(subs)->ignore - - let i = ref(0) - while i.contents < dirtyQueue->Array.length { - let currentSubs = dirtyQueue->Array.get(i.contents) - i := i.contents + 1 - switch currentSubs { - | None => () - | Some(s) => - let link = ref(s.first) - while link.contents !== None { - switch link.contents { - | Some(l) => - // The observer field might be a real observer (effect) or a subs (computed) - let linkedSubs = (Obj.magic(l.observer): Core.subs) - if Core.isComputed(linkedSubs) { - // It's a computed - mark dirty and propagate transitively - if !Core.isSubsDirty(linkedSubs) { - Core.setSubsDirty(linkedSubs) - dirtyQueue->Array.push(linkedSubs)->ignore - } - } else { - // It's an effect. - // If reached via a dirty computed, defer effect until computed recompute. - // This lets computed equality short-circuit downstream effect runs. - if Core.isComputed(s) { - if s.deferEffectsUntilRecompute { - addComputedToPending(s) - } else { - addEffectToPending(l.observer) - } - } else { - addEffectToPending(l.observer) - } - } - link := l.nextSub - | None => () - } - } - } - } - clearArray(dirtyQueue) - } - - if (pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0) && !flushing.contents { - flush() - } -} - -// Ensure a computed signal is fresh before reading (with link reuse) -let ensureComputedFresh = (subs: Core.subs): unit => { - if Core.isComputed(subs) { - if Core.isSubsDirty(subs) { - // Dirty without a newer global write means stale dirty flag only. - if subs.lastGlobalVersion === Core.globalVersion.contents { - Core.clearSubsDirty(subs) - } else { - let oldLevel = subs.level - runComputedCycle(subs, ~clearPending=false) - - if oldLevel == 0 { - subs.level = computeSubsLevel(subs) - } - } - } - } -} - -// Schedule an effect for execution -let schedule = (observer: Core.observer): unit => { - addEffectToPending(observer) - if !flushing.contents { - flush() - } -} - -// Batch multiple updates -let batch = fn => { - let wasFlushing = flushing.contents - flushing := true - - try { - let result = fn() - if !wasFlushing { - flushing := false - if pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0 { - flush() - } - } - result - } catch { - | exn => - if !wasFlushing { - flushing := false - } - throw(exn) - } -} - -// Execute without tracking dependencies -let untrack = (fn: unit => 'a): 'a => { - let prevComputed = currentComputedSubs.contents - let prevObserver = currentObserver.contents - let prevComputedCursor = currentComputedDepCursor.contents - let prevObserverCursor = currentObserverDepCursor.contents - currentComputedSubs := None - currentObserver := None - currentComputedDepCursor := None - currentObserverDepCursor := None - try { - let result = fn() - currentComputedSubs := prevComputed - currentObserver := prevObserver - currentComputedDepCursor := prevComputedCursor - currentObserverDepCursor := prevObserverCursor - result - } catch { - | exn => - currentComputedSubs := prevComputed - currentObserver := prevObserver - currentComputedDepCursor := prevComputedCursor - currentObserverDepCursor := prevObserverCursor - throw(exn) - } -} +include Signals__Scheduler diff --git a/packages/rescript-signals/src/signals/Computed.res b/packages/rescript-signals/src/signals/Signals__Computed.res similarity index 65% rename from packages/rescript-signals/src/signals/Computed.res rename to packages/rescript-signals/src/signals/Signals__Computed.res index 683f2e4..e8cea52 100644 --- a/packages/rescript-signals/src/signals/Computed.res +++ b/packages/rescript-signals/src/signals/Signals__Computed.res @@ -1,13 +1,13 @@ let makeWithoutEquals = ( compute: unit => 'a, ~name: option=?, -): Signal.t<'a> => { - let id = Id.make() +): Signals__Signal.t<'a> => { + let id = Signals__Id.make() let equalsFn: ('a, 'a) => bool = (_a, _b) => false // Create a mutable ref to hold the signal so the compute function can update it // Using Obj.magic to avoid Option wrapper overhead - let signalRef: ref> = ref(Obj.magic()) + let signalRef: ref> = ref(Obj.magic()) // Fast recompute path for default behavior (no custom equality checks) let recompute = () => { @@ -18,16 +18,16 @@ let makeWithoutEquals = ( } // Create combined subs (this IS the observer for the computed) - let subs = Core.makeComputedSubs(recompute, ~deferEffectsUntilRecompute=false) + let subs = Signals__Core.makeComputedSubs(recompute, ~deferEffectsUntilRecompute=false) // Initial computation under tracking to establish dependencies - let prev = Scheduler.currentComputedSubs.contents - Scheduler.currentComputedSubs := Some(subs) + let prev = Signals__Scheduler.currentComputedSubs.contents + Signals__Scheduler.currentComputedSubs := Some(subs) let initialValue = compute() - Scheduler.currentComputedSubs := prev + Signals__Scheduler.currentComputedSubs := prev // Create the signal with the initial value - let signal: Signal.t<'a> = { + let signal: Signals__Signal.t<'a> = { id, value: initialValue, equals: equalsFn, @@ -37,8 +37,8 @@ let makeWithoutEquals = ( // Set the ref so recompute can access the signal signalRef := signal - subs.lastGlobalVersion = Core.globalVersion.contents - Core.clearSubsDirty(subs) + subs.lastGlobalVersion = Signals__Core.globalVersion.contents + Signals__Core.clearSubsDirty(subs) signal } @@ -47,12 +47,12 @@ let makeWithEquals = ( compute: unit => 'a, equalsFn: ('a, 'a) => bool, ~name: option=?, -): Signal.t<'a> => { - let id = Id.make() +): Signals__Signal.t<'a> => { + let id = Signals__Id.make() // Create a mutable ref to hold the signal so the compute function can update it // Using Obj.magic to avoid Option wrapper overhead - let signalRef: ref> = ref(Obj.magic()) + let signalRef: ref> = ref(Obj.magic()) // Recompute function - updates the signal's value and tracks if it changed let recompute = () => { @@ -71,16 +71,16 @@ let makeWithEquals = ( } // Create combined subs (this IS the observer for the computed) - let subs = Core.makeComputedSubs(recompute, ~deferEffectsUntilRecompute=true) + let subs = Signals__Core.makeComputedSubs(recompute, ~deferEffectsUntilRecompute=true) // Initial computation under tracking to establish dependencies - let prev = Scheduler.currentComputedSubs.contents - Scheduler.currentComputedSubs := Some(subs) + let prev = Signals__Scheduler.currentComputedSubs.contents + Signals__Scheduler.currentComputedSubs := Some(subs) let initialValue = compute() - Scheduler.currentComputedSubs := prev + Signals__Scheduler.currentComputedSubs := prev // Create the signal with the initial value - let signal: Signal.t<'a> = { + let signal: Signals__Signal.t<'a> = { id, value: initialValue, equals: equalsFn, @@ -90,8 +90,8 @@ let makeWithEquals = ( // Set the ref so recompute can access the signal signalRef := signal - subs.lastGlobalVersion = Core.globalVersion.contents - Core.clearSubsDirty(subs) + subs.lastGlobalVersion = Signals__Core.globalVersion.contents + Signals__Core.clearSubsDirty(subs) signal } @@ -100,12 +100,12 @@ let make = ( compute: unit => 'a, ~name: option=?, ~equals: option<('a, 'a) => bool>=?, -): Signal.t<'a> => +): Signals__Signal.t<'a> => switch equals { | Some(eq) => makeWithEquals(compute, eq, ~name?) | None => makeWithoutEquals(compute, ~name?) } -let dispose = (signal: Signal.t<'a>): unit => { - Core.clearSubsDeps(signal.subs) +let dispose = (signal: Signals__Signal.t<'a>): unit => { + Signals__Core.clearSubsDeps(signal.subs) } diff --git a/packages/rescript-signals/src/signals/Signals__Core.res b/packages/rescript-signals/src/signals/Signals__Core.res new file mode 100644 index 0000000..73c9109 --- /dev/null +++ b/packages/rescript-signals/src/signals/Signals__Core.res @@ -0,0 +1,275 @@ +// Core types for the reactive system using linked lists +// All types defined here to handle circular references + +// Bitwise flags for observer state (avoids object property overhead) +let flag_dirty = 1 +let flag_pending = 2 +let flag_running = 4 + +// Global tracking version +let trackingVersion: ref = ref(0) +// Global mutation version (increments on real signal writes) +let globalVersion: ref = ref(0) + +type kind = [#Effect | #Computed] + +module rec Link: { + type t = { + // Direct reference to signal's subscriber list (type-erased) + mutable subs: Subs.t, + // Direct reference to observer + mutable observer: Observer.t, + // Links in the observer's dependency chain + mutable nextDep: option, + mutable prevDep: option, + // Links in the signal's subscriber chain + mutable nextSub: option, + mutable prevSub: option, + // Version stamp for duplicate detection within a compute cycle + mutable lastTrackedVersion: int, + } +} = Link + +// Signal subscriber list (head/tail of linked list) +// For computeds, this same object also serves as the observer (combined structure) +and Subs: { + type t = { + mutable first: option, + mutable last: option, + mutable computedSubscriberCount: int, + mutable version: int, + // === Observer fields (only used for computeds) === + // If compute is Some, this subs is a computed signal + mutable compute: option unit>, + mutable firstDep: option, + mutable lastDep: option, + mutable flags: int, + mutable level: int, + mutable deferEffectsUntilRecompute: bool, + mutable lastGlobalVersion: int, + } +} = Subs + +// Observer for effects only (computeds use subs directly) +and Observer: { + type t = { + id: int, + kind: kind, + run: unit => unit, + mutable firstDep: option, + mutable lastDep: option, + mutable flags: int, + mutable level: int, + name: option, + // For computed observers: direct reference to backing subs (the combined object) + mutable backingSubs: option, + } +} = Observer + +// Type aliases for convenience +type link = Link.t +type subs = Subs.t +type observer = Observer.t + +// Create empty subscriber list (for plain signals) +let makeSubs = (): subs => { + first: None, + last: None, + computedSubscriberCount: 0, + version: 0, + compute: None, + firstDep: None, + lastDep: None, + flags: 0, + level: 0, + deferEffectsUntilRecompute: false, + lastGlobalVersion: 0, +} + +// Create subs for a computed (with compute function) +let makeComputedSubs = (compute: unit => unit, ~deferEffectsUntilRecompute: bool=false): subs => { + first: None, + last: None, + computedSubscriberCount: 0, + version: 0, + compute: Some(compute), + firstDep: None, + lastDep: None, + flags: flag_dirty, // start dirty + level: 0, + deferEffectsUntilRecompute, + lastGlobalVersion: 0, +} + +// Create observer +let makeObserver = ( + id: int, + kind: kind, + run: unit => unit, + ~name: option=?, + ~backingSubs: option=?, +): observer => { + id, + kind, + run, + firstDep: None, + lastDep: None, + flags: flag_dirty, // start dirty + level: 0, + name, + backingSubs, +} + +// Flag operations for observer +let isDirty = (o: observer): bool => Int.bitwiseAnd(o.flags, flag_dirty) !== 0 +let setDirty = (o: observer): unit => o.flags = Int.bitwiseOr(o.flags, flag_dirty) +let clearDirty = (o: observer): unit => + o.flags = Int.bitwiseAnd(o.flags, Int.bitwiseNot(flag_dirty)) +let isPending = (o: observer): bool => Int.bitwiseAnd(o.flags, flag_pending) !== 0 +let setPending = (o: observer): unit => o.flags = Int.bitwiseOr(o.flags, flag_pending) +let clearPending = (o: observer): unit => + o.flags = Int.bitwiseAnd(o.flags, Int.bitwiseNot(flag_pending)) + +// Flag operations for subs +let isSubsDirty = (s: subs): bool => Int.bitwiseAnd(s.flags, flag_dirty) !== 0 +let setSubsDirty = (s: subs): unit => s.flags = Int.bitwiseOr(s.flags, flag_dirty) +let clearSubsDirty = (s: subs): unit => + s.flags = Int.bitwiseAnd(s.flags, Int.bitwiseNot(flag_dirty)) +let isSubsPending = (s: subs): bool => Int.bitwiseAnd(s.flags, flag_pending) !== 0 +let setSubsPending = (s: subs): unit => s.flags = Int.bitwiseOr(s.flags, flag_pending) +let clearSubsPending = (s: subs): unit => + s.flags = Int.bitwiseAnd(s.flags, Int.bitwiseNot(flag_pending)) + +// Check if subs is a computed +let isComputed = (s: subs): bool => s.compute !== None + +// Create a link node +let makeLink = (sourceSubs: subs, linkedObserver: observer): link => { + { + subs: sourceSubs, + observer: linkedObserver, + nextDep: None, + prevDep: None, + nextSub: None, + prevSub: None, + lastTrackedVersion: 0, + } +} + +// Add link to signal's subscriber list +let linkToSubs = (subs: subs, link: link): unit => { + link.prevSub = subs.last + link.nextSub = None + switch subs.last { + | Some(last) => last.nextSub = Some(link) + | None => subs.first = Some(link) + } + subs.last = Some(link) + + let linkedSubs = (Obj.magic(link.observer): subs) + if isComputed(linkedSubs) { + subs.computedSubscriberCount = subs.computedSubscriberCount + 1 + } +} + +// Add link to observer's dependency list +let linkToDeps = (observer: observer, link: link): unit => { + link.prevDep = observer.lastDep + link.nextDep = None + switch observer.lastDep { + | Some(last) => last.nextDep = Some(link) + | None => observer.firstDep = Some(link) + } + observer.lastDep = Some(link) +} + +// Remove link from subscriber list +let unlinkFromSubs = (link: link): unit => { + let subs = link.subs + switch link.prevSub { + | Some(prev) => prev.nextSub = link.nextSub + | None => subs.first = link.nextSub + } + switch link.nextSub { + | Some(next) => next.prevSub = link.prevSub + | None => subs.last = link.prevSub + } + link.prevSub = None + link.nextSub = None + + let linkedSubs = (Obj.magic(link.observer): subs) + if isComputed(linkedSubs) && subs.computedSubscriberCount > 0 { + subs.computedSubscriberCount = subs.computedSubscriberCount - 1 + } +} + +// Remove link from dependency list +let unlinkFromDeps = (observer: observer, link: link): unit => { + switch link.prevDep { + | Some(prev) => prev.nextDep = link.nextDep + | None => observer.firstDep = link.nextDep + } + switch link.nextDep { + | Some(next) => next.prevDep = link.prevDep + | None => observer.lastDep = link.prevDep + } + link.prevDep = None + link.nextDep = None +} + +// Remove link from subs's dependency list (for computeds - subs IS the observer) +let unlinkFromSubsDeps = (s: subs, link: link): unit => { + switch link.prevDep { + | Some(prev) => prev.nextDep = link.nextDep + | None => s.firstDep = link.nextDep + } + switch link.nextDep { + | Some(next) => next.prevDep = link.prevDep + | None => s.lastDep = link.prevDep + } + link.prevDep = None + link.nextDep = None +} + +// Clear all dependencies from observer (unlinks from all signals) +let clearDeps = (observer: observer): unit => { + let link = ref(observer.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + let next = l.nextDep + unlinkFromSubs(l) + link := next + | None => () + } + } + observer.firstDep = None + observer.lastDep = None +} + +// Clear all dependencies from subs (for computeds - subs IS the observer) +let clearSubsDeps = (s: subs): unit => { + let link = ref(s.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + let next = l.nextDep + unlinkFromSubs(l) + link := next + | None => () + } + } + s.firstDep = None + s.lastDep = None +} + +// Add link to subs's dependency list (for computeds - subs IS the observer) +let linkToSubsDeps = (s: subs, link: link): unit => { + link.prevDep = s.lastDep + link.nextDep = None + switch s.lastDep { + | Some(last) => last.nextDep = Some(link) + | None => s.firstDep = Some(link) + } + s.lastDep = Some(link) +} diff --git a/packages/rescript-signals/src/signals/Effect.res b/packages/rescript-signals/src/signals/Signals__Effect.res similarity index 70% rename from packages/rescript-signals/src/signals/Effect.res rename to packages/rescript-signals/src/signals/Signals__Effect.res index 060894f..109ea98 100644 --- a/packages/rescript-signals/src/signals/Effect.res +++ b/packages/rescript-signals/src/signals/Signals__Effect.res @@ -1,7 +1,7 @@ type disposer = {dispose: unit => unit} let runWithDisposer = (fn: unit => option unit>, ~name: option=?): disposer => { - let observerId = Id.make() + let observerId = Signals__Id.make() let cleanup: ref unit>> = ref(None) // Wrapper that handles cleanup @@ -17,24 +17,24 @@ let runWithDisposer = (fn: unit => option unit>, ~name: option=? } // Create observer using Core types - let observer = Core.makeObserver(observerId, #Effect, runWithCleanup, ~name?) + let observer = Signals__Core.makeObserver(observerId, #Effect, runWithCleanup, ~name?) // Initial run under tracking (no need to clearDeps - observer is fresh) - let prev = Scheduler.currentObserver.contents - Scheduler.currentObserver := Some(observer) + let prev = Signals__Scheduler.currentObserver.contents + Signals__Scheduler.currentObserver := Some(observer) try { observer.run() - Core.clearDirty(observer) - Scheduler.currentObserver := prev + Signals__Core.clearDirty(observer) + Signals__Scheduler.currentObserver := prev } catch { | exn => - Scheduler.currentObserver := prev + Signals__Scheduler.currentObserver := prev throw(exn) } // Compute level - observer.level = Scheduler.computeLevel(observer) + observer.level = Signals__Scheduler.computeLevel(observer) // Return disposer - stores observer reference directly (no Map lookup needed) let disposed = ref(false) @@ -49,7 +49,7 @@ let runWithDisposer = (fn: unit => option unit>, ~name: option=? | None => () } - Core.clearDeps(observer) + Signals__Core.clearDeps(observer) } } diff --git a/packages/rescript-signals/src/signals/Signals__Id.res b/packages/rescript-signals/src/signals/Signals__Id.res new file mode 100644 index 0000000..ad1e4c9 --- /dev/null +++ b/packages/rescript-signals/src/signals/Signals__Id.res @@ -0,0 +1,5 @@ +let next = ref(0) +let make = () => { + next := next.contents + 1 + next.contents +} \ No newline at end of file diff --git a/packages/rescript-signals/src/signals/Signals__Scheduler.res b/packages/rescript-signals/src/signals/Signals__Scheduler.res new file mode 100644 index 0000000..0ff66f0 --- /dev/null +++ b/packages/rescript-signals/src/signals/Signals__Scheduler.res @@ -0,0 +1,632 @@ +// Current execution context for computeds (subs IS the observer) +let currentComputedSubs: ref> = ref(None) + +// Current execution context for effects +let currentObserver: ref> = ref(None) + +// Current dependency tracking version (shared across nested compute/effect runs) +let currentTrackingVersion: ref = ref(0) + +// Per-run dependency cursors (separate from true tail pointers). +let currentComputedDepCursor: ref> = ref(None) +let currentObserverDepCursor: ref> = ref(None) + +// Pending effects to execute +let pendingEffects: array = [] +// Pending computeds to recompute (subs that are dirty) +let pendingComputedSubs: array = [] +let flushing: ref = ref(false) +let pendingEffectsNeedsSort: ref = ref(false) +let pendingComputedNeedsSort: ref = ref(false) +let lastEnqueuedEffectLevel: ref = ref(0) +let lastEnqueuedComputedLevel: ref = ref(0) + +// Queue for iterative dirty marking +let dirtyQueue: array = [] + +// Efficient array clear +let clearArray: array<'a> => unit = %raw(`function(arr) { arr.length = 0 }`) +let drainProcessedPrefix: (array<'a>, int) => unit = %raw(` +function(arr, processedCount) { + if (processedCount <= 0) return; + if (processedCount >= arr.length) { + arr.length = 0; + return; + } + arr.copyWithin(0, processedCount); + arr.length = arr.length - processedCount; +} +`) + +// Add effect to pending if not already there +let addEffectToPending = (observer: Signals__Core.observer): unit => { + if !Signals__Core.isPending(observer) { + Signals__Core.setPending(observer) + let lengthBefore = pendingEffects->Array.length + if lengthBefore == 0 { + pendingEffectsNeedsSort := false + } else if observer.level < lastEnqueuedEffectLevel.contents { + pendingEffectsNeedsSort := true + } + pendingEffects->Array.push(observer)->ignore + lastEnqueuedEffectLevel := observer.level + } +} + +// Add computed to pending if not already there +let addComputedToPending = (subs: Signals__Core.subs): unit => { + if !Signals__Core.isSubsPending(subs) { + Signals__Core.setSubsPending(subs) + let lengthBefore = pendingComputedSubs->Array.length + if lengthBefore == 0 { + pendingComputedNeedsSort := false + } else if subs.level < lastEnqueuedComputedLevel.contents { + pendingComputedNeedsSort := true + } + pendingComputedSubs->Array.push(subs)->ignore + lastEnqueuedComputedLevel := subs.level + } +} + +// Track a dependency from a computed (subs tracks subs) +let trackDepFromComputed = (computedSubs: Signals__Core.subs, sourceSubs: Signals__Core.subs): unit => { + let computedObserver: Signals__Core.observer = Obj.magic(computedSubs) + + if computedSubs.firstDep === None { + let newLink: Signals__Core.link = Signals__Core.makeLink(sourceSubs, computedObserver) + newLink.lastTrackedVersion = currentTrackingVersion.contents + Signals__Core.linkToSubsDeps(computedSubs, newLink) + Signals__Core.linkToSubs(sourceSubs, newLink) + currentComputedDepCursor := Some(newLink) + } else { + let currentVersion = currentTrackingVersion.contents + // Fast path: reuse run cursor or cursor.next to avoid scanning in common cases. + let fastPathFound = ref(false) + switch currentComputedDepCursor.contents { + | Some(cursor) => + if cursor.subs === sourceSubs && cursor.observer === computedObserver { + cursor.lastTrackedVersion = currentVersion + fastPathFound.contents = true + } else { + switch cursor.nextDep { + | Some(nextDep) => + if nextDep.subs === sourceSubs && nextDep.observer === computedObserver { + nextDep.lastTrackedVersion = currentVersion + currentComputedDepCursor := Some(nextDep) + fastPathFound.contents = true + } + | None => () + } + } + | None => () + } + + if !fastPathFound.contents { + switch sourceSubs.last { + | Some(lastSubLink) => + if lastSubLink.lastTrackedVersion === currentVersion && lastSubLink.observer === computedObserver { + lastSubLink.lastTrackedVersion = currentVersion + currentComputedDepCursor := Some(lastSubLink) + fastPathFound.contents = true + } + | None => () + } + } + + if !fastPathFound.contents { + // Fall back to full scan + let found = ref(false) + let foundLink: ref> = ref(None) + let link = ref(computedSubs.firstDep) + while link.contents !== None && !found.contents { + switch link.contents { + | Some(l) => + if l.subs === sourceSubs { + l.lastTrackedVersion = currentVersion + foundLink := Some(l) + found := true + } else { + link := l.nextDep + } + | None => () + } + } + + // Create new link only if not found + if !found.contents { + let newLink: Signals__Core.link = Signals__Core.makeLink(sourceSubs, computedObserver) + newLink.lastTrackedVersion = currentVersion + Signals__Core.linkToSubsDeps(computedSubs, newLink) + Signals__Core.linkToSubs(sourceSubs, newLink) + currentComputedDepCursor := Some(newLink) + } else { + currentComputedDepCursor := foundLink.contents + } + } + } +} + +// Track a dependency from an effect (observer tracks subs) +// Uses version-based duplicate detection within a run cycle +let trackDepFromEffect = (observer: Signals__Core.observer, sourceSubs: Signals__Core.subs): unit => { + if observer.firstDep === None { + let newLink: Signals__Core.link = Signals__Core.makeLink(sourceSubs, observer) + newLink.lastTrackedVersion = currentTrackingVersion.contents + Signals__Core.linkToDeps(observer, newLink) + Signals__Core.linkToSubs(sourceSubs, newLink) + currentObserverDepCursor := Some(newLink) + } else { + let currentVersion = currentTrackingVersion.contents + // Fast path: reuse run cursor or cursor.next to avoid scanning in common cases. + let fastPathFound = ref(false) + switch currentObserverDepCursor.contents { + | Some(cursor) => + if cursor.subs === sourceSubs && cursor.observer === observer { + cursor.lastTrackedVersion = currentVersion + fastPathFound.contents = true + } else { + switch cursor.nextDep { + | Some(nextDep) => + if nextDep.subs === sourceSubs && nextDep.observer === observer { + nextDep.lastTrackedVersion = currentVersion + currentObserverDepCursor := Some(nextDep) + fastPathFound.contents = true + } + | None => () + } + } + | None => () + } + + if !fastPathFound.contents { + switch sourceSubs.last { + | Some(lastSubLink) => + if lastSubLink.lastTrackedVersion === currentVersion && lastSubLink.observer === observer { + lastSubLink.lastTrackedVersion = currentVersion + currentObserverDepCursor := Some(lastSubLink) + fastPathFound.contents = true + } + | None => () + } + } + + if !fastPathFound.contents { + let found = ref(false) + let foundLink: ref> = ref(None) + let link = ref(observer.firstDep) + while link.contents !== None && !found.contents { + switch link.contents { + | Some(l) => + if l.subs === sourceSubs { + l.lastTrackedVersion = currentVersion + foundLink := Some(l) + found := true + } else { + link := l.nextDep + } + | None => () + } + } + + // Create new link only if not found + if !found.contents { + let newLink: Signals__Core.link = Signals__Core.makeLink(sourceSubs, observer) + newLink.lastTrackedVersion = currentVersion + Signals__Core.linkToDeps(observer, newLink) + Signals__Core.linkToSubs(sourceSubs, newLink) + currentObserverDepCursor := Some(newLink) + } else { + currentObserverDepCursor := foundLink.contents + } + } + } +} + +// Track dependency - routes to appropriate function based on current context +let trackDep = (subs: Signals__Core.subs): unit => { + switch currentComputedSubs.contents { + | Some(computedSubs) => trackDepFromComputed(computedSubs, subs) + | None => + switch currentObserver.contents { + | Some(observer) => trackDepFromEffect(observer, subs) + | None => () + } + } +} + +// Compare by level for sorting +let compareEffectsByLevel = (a: Signals__Core.observer, b: Signals__Core.observer): float => { + Int.toFloat(a.level - b.level) +} + +let compareSubsByLevel = (a: Signals__Core.subs, b: Signals__Core.subs): float => { + Int.toFloat(a.level - b.level) +} + +// Compute level for a computed (based on its dependencies) +let computeSubsLevel = (s: Signals__Core.subs): int => { + let maxLevel = ref(0) + let link = ref(s.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + // Check if the source is a computed + if Signals__Core.isComputed(l.subs) { + if l.subs.level > maxLevel.contents { + maxLevel := l.subs.level + } + } + link := l.nextDep + | None => () + } + } + maxLevel.contents + 1 +} + +// Compute level for an effect +let computeLevel = (observer: Signals__Core.observer): int => { + let maxLevel = ref(0) + let link = ref(observer.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + if Signals__Core.isComputed(l.subs) { + if l.subs.level > maxLevel.contents { + maxLevel := l.subs.level + } + } + link := l.nextDep + | None => () + } + } + maxLevel.contents + 1 +} + +// Run one computed recompute cycle with link reuse. +let runComputedCycle = (subs: Signals__Core.subs, ~clearPending: bool): unit => { + let previousTrackingVersion = currentTrackingVersion.contents + let previousVersion = subs.version + + // Increment tracking version for this cycle + Signals__Core.trackingVersion := Signals__Core.trackingVersion.contents + 1 + currentTrackingVersion.contents = Signals__Core.trackingVersion.contents + + // DON'T clear deps - we'll reuse existing links + if clearPending { + Signals__Core.clearSubsPending(subs) + } + + let prev = currentComputedSubs.contents + let prevCursor = currentComputedDepCursor.contents + currentComputedSubs := Some(subs) + currentComputedDepCursor := subs.firstDep + + try { + switch subs.compute { + | Some(compute) => compute() + | None => () + } + + // After compute: unlink stale deps (version != current) + let link = ref(subs.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + let next = l.nextDep + if l.lastTrackedVersion !== currentTrackingVersion.contents { + // Stale - unlink from source's subscriber list and our dep list + Signals__Core.unlinkFromSubs(l) + Signals__Core.unlinkFromSubsDeps(subs, l) + } + link := next + | None => () + } + } + + Signals__Core.clearSubsDirty(subs) + subs.lastGlobalVersion = Signals__Core.globalVersion.contents + + // Propagate only when computed output changed. + if subs.first !== None && subs.version !== previousVersion { + let subLink = ref(subs.first) + while subLink.contents !== None { + switch subLink.contents { + | Some(l) => + let linkedSubs = (Obj.magic(l.observer): Signals__Core.subs) + if Signals__Core.isComputed(linkedSubs) { + // Mark downstream computed dirty (lazy propagation). + Signals__Core.setSubsDirty(linkedSubs) + } else { + // Effects get queued for execution unless this effect is already running. + switch currentObserver.contents { + | Some(currentObserver) => + if currentObserver !== l.observer { + addEffectToPending(l.observer) + } + | None => addEffectToPending(l.observer) + } + } + subLink := l.nextSub + | None => () + } + } + } + + currentComputedSubs := prev + currentComputedDepCursor := prevCursor + currentTrackingVersion.contents = previousTrackingVersion + } catch { + | exn => + currentComputedSubs := prev + currentComputedDepCursor := prevCursor + currentTrackingVersion.contents = previousTrackingVersion + throw(exn) + } +} + +// Retrack a computed (recompute with link reuse) +let retrackComputed = (s: Signals__Core.subs): unit => { + let oldLevel = s.level + runComputedCycle(s, ~clearPending=true) + + if oldLevel == 0 { + s.level = computeSubsLevel(s) + } +} + +// Retrack an effect (with link reuse) +let retrackEffect = (observer: Signals__Core.observer): unit => { + let oldLevel = observer.level + let previousTrackingVersion = currentTrackingVersion.contents + + // Increment tracking version for this cycle + Signals__Core.trackingVersion := Signals__Core.trackingVersion.contents + 1 + currentTrackingVersion.contents = Signals__Core.trackingVersion.contents + + // DON'T clear deps - we'll reuse existing links + Signals__Core.clearPending(observer) + + let prev = currentObserver.contents + let prevCursor = currentObserverDepCursor.contents + currentObserver := Some(observer) + currentObserverDepCursor := observer.firstDep + + try { + observer.run() + + // After run: unlink stale deps (version != current) + let link = ref(observer.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + let next = l.nextDep + if l.lastTrackedVersion !== currentTrackingVersion.contents { + // Stale - unlink from source's subscriber list and our dep list + Signals__Core.unlinkFromSubs(l) + Signals__Core.unlinkFromDeps(observer, l) + } + link := next + | None => () + } + } + + Signals__Core.clearDirty(observer) + currentObserver := prev + currentObserverDepCursor := prevCursor + currentTrackingVersion.contents = previousTrackingVersion + } catch { + | exn => + currentObserver := prev + currentObserverDepCursor := prevCursor + currentTrackingVersion.contents = previousTrackingVersion + throw(exn) + } + + if oldLevel == 0 { + observer.level = computeLevel(observer) + } +} + +// Flush pending observers +let flush = (): unit => { + flushing := true + + try { + while pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0 { + // Process computeds first (they might trigger more effects) + if pendingComputedSubs->Array.length > 0 { + let computedsLength = pendingComputedSubs->Array.length + if computedsLength > 1 && pendingComputedNeedsSort.contents { + // Sort by level + pendingComputedSubs->Array.sort(compareSubsByLevel)->ignore + pendingComputedNeedsSort := false + } + let i = ref(0) + while i.contents < computedsLength { + switch pendingComputedSubs->Array.get(i.contents) { + | Some(subs) => retrackComputed(subs) + | None => () + } + i := i.contents + 1 + } + drainProcessedPrefix(pendingComputedSubs, computedsLength) + if pendingComputedSubs->Array.length == 0 { + pendingComputedNeedsSort := false + } + } + + // Then process effects + if pendingEffects->Array.length > 0 { + let effectsLength = pendingEffects->Array.length + if effectsLength > 1 && pendingEffectsNeedsSort.contents { + pendingEffects->Array.sort(compareEffectsByLevel)->ignore + pendingEffectsNeedsSort := false + } + let i = ref(0) + while i.contents < effectsLength { + switch pendingEffects->Array.get(i.contents) { + | Some(effect) => retrackEffect(effect) + | None => () + } + i := i.contents + 1 + } + drainProcessedPrefix(pendingEffects, effectsLength) + if pendingEffects->Array.length == 0 { + pendingEffectsNeedsSort := false + } + } + } + + flushing := false + } catch { + | exn => + flushing := false + throw(exn) + } +} + +// Notify all subscribers of a signal (traverse linked list) +// Marks computeds dirty transitively. +// Direct effects are queued immediately. +// Effects reached through dirty computeds are deferred until parent computed recompute. +let notifySubs = (subs: Signals__Core.subs): unit => { + // Fast path: no subscribers, nothing to notify. + if subs.first === None { + () + } else if !Signals__Core.isComputed(subs) && subs.computedSubscriberCount == 0 { + // Fast path for plain signals with direct effect subscribers only. + let link = ref(subs.first) + while link.contents !== None { + switch link.contents { + | Some(l) => + addEffectToPending(l.observer) + link := l.nextSub + | None => () + } + } + } else { + dirtyQueue->Array.push(subs)->ignore + + let i = ref(0) + while i.contents < dirtyQueue->Array.length { + let currentSubs = dirtyQueue->Array.get(i.contents) + i := i.contents + 1 + switch currentSubs { + | None => () + | Some(s) => + let link = ref(s.first) + while link.contents !== None { + switch link.contents { + | Some(l) => + // The observer field might be a real observer (effect) or a subs (computed) + let linkedSubs = (Obj.magic(l.observer): Signals__Core.subs) + if Signals__Core.isComputed(linkedSubs) { + // It's a computed - mark dirty and propagate transitively + if !Signals__Core.isSubsDirty(linkedSubs) { + Signals__Core.setSubsDirty(linkedSubs) + dirtyQueue->Array.push(linkedSubs)->ignore + } + } else { + // It's an effect. + // If reached via a dirty computed, defer effect until computed recompute. + // This lets computed equality short-circuit downstream effect runs. + if Signals__Core.isComputed(s) { + if s.deferEffectsUntilRecompute { + addComputedToPending(s) + } else { + addEffectToPending(l.observer) + } + } else { + addEffectToPending(l.observer) + } + } + link := l.nextSub + | None => () + } + } + } + } + clearArray(dirtyQueue) + } + + if (pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0) && !flushing.contents { + flush() + } +} + +// Ensure a computed signal is fresh before reading (with link reuse) +let ensureComputedFresh = (subs: Signals__Core.subs): unit => { + if Signals__Core.isComputed(subs) { + if Signals__Core.isSubsDirty(subs) { + // Dirty without a newer global write means stale dirty flag only. + if subs.lastGlobalVersion === Signals__Core.globalVersion.contents { + Signals__Core.clearSubsDirty(subs) + } else { + let oldLevel = subs.level + runComputedCycle(subs, ~clearPending=false) + + if oldLevel == 0 { + subs.level = computeSubsLevel(subs) + } + } + } + } +} + +// Schedule an effect for execution +let schedule = (observer: Signals__Core.observer): unit => { + addEffectToPending(observer) + if !flushing.contents { + flush() + } +} + +// Batch multiple updates +let batch = fn => { + let wasFlushing = flushing.contents + flushing := true + + try { + let result = fn() + if !wasFlushing { + flushing := false + if pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0 { + flush() + } + } + result + } catch { + | exn => + if !wasFlushing { + flushing := false + } + throw(exn) + } +} + +// Execute without tracking dependencies +let untrack = (fn: unit => 'a): 'a => { + let prevComputed = currentComputedSubs.contents + let prevObserver = currentObserver.contents + let prevComputedCursor = currentComputedDepCursor.contents + let prevObserverCursor = currentObserverDepCursor.contents + currentComputedSubs := None + currentObserver := None + currentComputedDepCursor := None + currentObserverDepCursor := None + try { + let result = fn() + currentComputedSubs := prevComputed + currentObserver := prevObserver + currentComputedDepCursor := prevComputedCursor + currentObserverDepCursor := prevObserverCursor + result + } catch { + | exn => + currentComputedSubs := prevComputed + currentObserver := prevObserver + currentComputedDepCursor := prevComputedCursor + currentObserverDepCursor := prevObserverCursor + throw(exn) + } +} diff --git a/packages/rescript-signals/src/signals/Signal.res b/packages/rescript-signals/src/signals/Signals__Signal.res similarity index 73% rename from packages/rescript-signals/src/signals/Signal.res rename to packages/rescript-signals/src/signals/Signals__Signal.res index b7f8a84..e952b56 100644 --- a/packages/rescript-signals/src/signals/Signal.res +++ b/packages/rescript-signals/src/signals/Signals__Signal.res @@ -4,7 +4,7 @@ type t<'a> = { equals: ('a, 'a) => bool, name: option, // Subscriber linked list (replaces signalObservers Map lookup) - subs: Core.subs, + subs: Signals__Core.subs, } let defaultEquals = (a: 'a, b: 'a): bool => a === b @@ -13,7 +13,7 @@ let neverEquals: ('a, 'a) => bool = (_a, _b) => false let make = (initialValue: 'a, ~name: option=?, ~equals: option<('a, 'a) => bool>=?): t< 'a, > => { - let id = Id.make() + let id = Signals__Id.make() let equalsFn = switch equals { | Some(eq) => eq | None => defaultEquals @@ -24,35 +24,35 @@ let make = (initialValue: 'a, ~name: option=?, ~equals: option<('a, 'a) value: initialValue, equals: equalsFn, name, - subs: Core.makeSubs(), + subs: Signals__Core.makeSubs(), } } // Optimized signal creation for computed backing signals (no equals check needed) let makeForComputed = (initialValue: 'a, ~name: option=?): t<'a> => { - let id = Id.make() + let id = Signals__Id.make() { id, value: initialValue, equals: neverEquals, // Computeds always check freshness via dirty flag name, - subs: Core.makeSubs(), + subs: Signals__Core.makeSubs(), } } // Optimized get - inlined hot path checks let get = (signal: t<'a>): 'a => { // Ensure computed is fresh - Scheduler.ensureComputedFresh(signal.subs) + Signals__Scheduler.ensureComputedFresh(signal.subs) // Track dependency if we're inside a computed or effect - Scheduler.trackDep(signal.subs) + Signals__Scheduler.trackDep(signal.subs) signal.value } let peek = (signal: t<'a>): 'a => { - Scheduler.ensureComputedFresh(signal.subs) + Signals__Scheduler.ensureComputedFresh(signal.subs) signal.value } @@ -66,13 +66,13 @@ let set = (signal: t<'a>, newValue: 'a): unit => { if shouldUpdate { signal.value = newValue signal.subs.version = signal.subs.version + 1 - Core.globalVersion := Core.globalVersion.contents + 1 - Scheduler.notifySubs(signal.subs) + Signals__Core.globalVersion := Signals__Core.globalVersion.contents + 1 + Signals__Scheduler.notifySubs(signal.subs) } } let update = (signal: t<'a>, fn: 'a => 'a): unit => signal->set(fn(signal.value)) -let batch = Scheduler.batch +let batch = Signals__Scheduler.batch -let untrack = Scheduler.untrack +let untrack = Signals__Scheduler.untrack diff --git a/scripts/ci/benchmark-pr-vs-frameworks.mjs b/scripts/ci/benchmark-pr-vs-frameworks.mjs index 0fa14d3..e6c8dd3 100644 --- a/scripts/ci/benchmark-pr-vs-frameworks.mjs +++ b/scripts/ci/benchmark-pr-vs-frameworks.mjs @@ -125,16 +125,43 @@ function createReScriptFramework(name, modules) { function resolveSignalsDir(baseDir) { const candidates = [ baseDir, + resolve(baseDir, "src"), resolve(baseDir, "src/signals"), resolve(baseDir, "lib/bs/src/signals"), ]; for (const candidate of candidates) { + const bundledEntryFile = resolve(candidate, "Signals.res.mjs"); + if (existsSync(bundledEntryFile)) { + return { + type: "entry", + file: bundledEntryFile, + }; + } + const signalFile = resolve(candidate, "Signal.res.mjs"); const computedFile = resolve(candidate, "Computed.res.mjs"); const effectFile = resolve(candidate, "Effect.res.mjs"); if (existsSync(signalFile) && existsSync(computedFile) && existsSync(effectFile)) { - return candidate; + return { + type: "split", + dir: candidate, + }; + } + + const prefixedSignalFile = resolve(candidate, "Signals__Signal.res.mjs"); + const prefixedComputedFile = resolve(candidate, "Signals__Computed.res.mjs"); + const prefixedEffectFile = resolve(candidate, "Signals__Effect.res.mjs"); + if ( + existsSync(prefixedSignalFile) && + existsSync(prefixedComputedFile) && + existsSync(prefixedEffectFile) + ) { + return { + type: "split", + dir: candidate, + prefixed: true, + }; } } @@ -143,10 +170,22 @@ function resolveSignalsDir(baseDir) { async function importSignalModules(baseDir) { const signalsDir = resolveSignalsDir(baseDir); + + if (signalsDir.type === "entry") { + const modules = await import(pathToFileURL(signalsDir.file).href); + return { + Signal: modules.Signal, + Computed: modules.Computed, + Effect: modules.Effect, + }; + } + + const prefix = signalsDir.prefixed ? "Signals__" : ""; + const dir = signalsDir.dir; return { - Signal: await import(pathToFileURL(resolve(signalsDir, "Signal.res.mjs")).href), - Computed: await import(pathToFileURL(resolve(signalsDir, "Computed.res.mjs")).href), - Effect: await import(pathToFileURL(resolve(signalsDir, "Effect.res.mjs")).href), + Signal: await import(pathToFileURL(resolve(dir, `${prefix}Signal.res.mjs`)).href), + Computed: await import(pathToFileURL(resolve(dir, `${prefix}Computed.res.mjs`)).href), + Effect: await import(pathToFileURL(resolve(dir, `${prefix}Effect.res.mjs`)).href), }; } diff --git a/scripts/ci/benchmark-pr-vs-main.mjs b/scripts/ci/benchmark-pr-vs-main.mjs index 2d2899f..55f345b 100644 --- a/scripts/ci/benchmark-pr-vs-main.mjs +++ b/scripts/ci/benchmark-pr-vs-main.mjs @@ -126,16 +126,43 @@ function createReScriptFramework(name, modules) { function resolveSignalsDir(baseDir, label) { const candidates = [ baseDir, + resolve(baseDir, "src"), resolve(baseDir, "src/signals"), resolve(baseDir, "lib/bs/src/signals"), ]; for (const candidate of candidates) { + const bundledEntryFile = resolve(candidate, "Signals.res.mjs"); + if (existsSync(bundledEntryFile)) { + return { + type: "entry", + file: bundledEntryFile, + }; + } + const signalFile = resolve(candidate, "Signal.res.mjs"); const computedFile = resolve(candidate, "Computed.res.mjs"); const effectFile = resolve(candidate, "Effect.res.mjs"); if (existsSync(signalFile) && existsSync(computedFile) && existsSync(effectFile)) { - return candidate; + return { + type: "split", + dir: candidate, + }; + } + + const prefixedSignalFile = resolve(candidate, "Signals__Signal.res.mjs"); + const prefixedComputedFile = resolve(candidate, "Signals__Computed.res.mjs"); + const prefixedEffectFile = resolve(candidate, "Signals__Effect.res.mjs"); + if ( + existsSync(prefixedSignalFile) && + existsSync(prefixedComputedFile) && + existsSync(prefixedEffectFile) + ) { + return { + type: "split", + dir: candidate, + prefixed: true, + }; } } @@ -146,10 +173,24 @@ function resolveSignalsDir(baseDir, label) { async function importSignalModules(signalsDir, label) { const resolvedSignalsDir = resolveSignalsDir(signalsDir, label); + + if (resolvedSignalsDir.type === "entry") { + const modules = await import(pathToFileURL(resolvedSignalsDir.file).href); + return { + Signal: modules.Signal, + Computed: modules.Computed, + Effect: modules.Effect, + }; + } + + const dir = resolvedSignalsDir.dir; + const isPrefixed = resolvedSignalsDir.prefixed; + const prefix = isPrefixed ? "Signals__" : ""; + return { - Signal: await import(pathToFileURL(resolve(resolvedSignalsDir, "Signal.res.mjs")).href), - Computed: await import(pathToFileURL(resolve(resolvedSignalsDir, "Computed.res.mjs")).href), - Effect: await import(pathToFileURL(resolve(resolvedSignalsDir, "Effect.res.mjs")).href), + Signal: await import(pathToFileURL(resolve(dir, `${prefix}Signal.res.mjs`)).href), + Computed: await import(pathToFileURL(resolve(dir, `${prefix}Computed.res.mjs`)).href), + Effect: await import(pathToFileURL(resolve(dir, `${prefix}Effect.res.mjs`)).href), }; }