From 852ef91a15616276f89a51b709339d701bc30109 Mon Sep 17 00:00:00 2001 From: AaronPlave Date: Fri, 8 May 2026 13:11:02 -0700 Subject: [PATCH 1/4] Steam simulation results via subscription streams. May refactor to fetch instead to reduce complexity and brittleness. --- e2e-tests/tests/simulation.test.ts | 20 ++ src/components/timeline/Row.svelte | 160 ++++----- src/components/timeline/TimelinePanel.svelte | 10 +- .../timeline/TimelineStatusIndicator.svelte | 71 ++++ src/enums/gql.ts | 1 + src/stores/profile.test.ts | 239 +++++++++++++ src/stores/profile.ts | 318 ++++++++++++++++++ src/types/simulation.ts | 4 +- src/utilities/effects.ts | 21 +- src/utilities/gql.ts | 32 ++ src/utilities/permissions.ts | 2 + src/utilities/profile.test.ts | 56 +++ src/utilities/profile.ts | 47 +++ src/utilities/resources.test.ts | 165 ++++++++- 14 files changed, 1043 insertions(+), 103 deletions(-) create mode 100644 src/components/timeline/TimelineStatusIndicator.svelte create mode 100644 src/stores/profile.test.ts create mode 100644 src/stores/profile.ts create mode 100644 src/utilities/profile.test.ts create mode 100644 src/utilities/profile.ts diff --git a/e2e-tests/tests/simulation.test.ts b/e2e-tests/tests/simulation.test.ts index ca648aa331..b1d60004b9 100644 --- a/e2e-tests/tests/simulation.test.ts +++ b/e2e-tests/tests/simulation.test.ts @@ -70,4 +70,24 @@ test.describe.serial('Simulation', async () => { await setup.plan.addActivity(); await setup.plan.waitForSimulationStatus(Status.Modified); }); + + // Catches gross regressions in the streaming-profile pipeline that the unit + // tests can't see: actually exercises real Hasura cursors / interval ordering + // / WS reconnects via graphql-ws across a real sim → resimulate cycle, then + // asserts the global timeline status indicator never enters its error state + // and the timeline panel stays mounted. The "blank plot after resimulate" + // bug specifically isn't directly assertable here without canvas-pixel + // inspection or test hooks (deliberately avoided) — that's covered by the + // resimulate-fast unit test in src/stores/profile.test.ts. This e2e is a + // smoke test that the pipeline doesn't broadly fall over on real backend. + test(`Streaming pipeline survives a sim → resimulate cycle without timeline errors`, async () => { + const timelineErrorIndicator = setup.plan.page.getByRole('status', { name: 'Timeline data error' }); + + await setup.plan.runSimulation(); + await expect(timelineErrorIndicator).not.toBeVisible(); + + await setup.plan.reRunSimulation(); + await expect(timelineErrorIndicator).not.toBeVisible(); + await expect(setup.plan.panelTimeline).toBeVisible(); + }); }); diff --git a/src/components/timeline/Row.svelte b/src/components/timeline/Row.svelte index 4c8ab3eac7..2e12fd3813 100644 --- a/src/components/timeline/Row.svelte +++ b/src/components/timeline/Row.svelte @@ -4,18 +4,17 @@ import type { ScaleTime } from 'd3-scale'; import { select, type Selection } from 'd3-selection'; import { zoom as d3Zoom, zoomIdentity, type D3ZoomEvent, type ZoomBehavior, type ZoomTransform } from 'd3-zoom'; - import { createEventDispatcher } from 'svelte'; + import { createEventDispatcher, onDestroy } from 'svelte'; import FilterWithXIcon from '../../assets/filter-with-x.svg?component'; import { ViewDefaultDiscreteOptions } from '../../constants/view'; - import { Status } from '../../enums/status'; import { activityArgumentDefaultsMap } from '../../stores/activities'; - import { catchError, logMessage } from '../../stores/errors'; import { derivationGroupVisibilityMap, externalSources, planDerivationGroupLinks, } from '../../stores/external-source'; import { planModelActivityTypes } from '../../stores/plan'; + import { createProfileSubscription } from '../../stores/profile'; import { externalResources, fetchingResourcesExternal, @@ -66,8 +65,6 @@ import { getExternalEventRowId } from '../../utilities/externalEvents'; import { classNames } from '../../utilities/generic'; import { showConfirmActivityCreationModal } from '../../utilities/modal'; - import { sampleProfiles } from '../../utilities/resources'; - import { getSimulationStatus } from '../../utilities/simulation'; import { pluralize } from '../../utilities/text'; import { getDoyTime } from '../../utilities/time'; import { @@ -240,112 +237,83 @@ value.simulationDatasetId !== simulationDatasetId || (value.type === 'external' && !$resourceTypes.find(type => type.name === name)) ) { - value.controller?.abort(); + value.unsubscribe?.(); delete resourceRequestMap[key]; resourceRequestMap = { ...resourceRequestMap }; } }); - // Only update if simulation is complete - if ( - getSimulationStatus(simulationDataset) === Status.Complete || - getSimulationStatus(simulationDataset) === Status.Canceled - ) { - const startTimeYmd = simulationDataset?.simulation_start_time ?? plan.start_time; - resourceNames.forEach(async name => { - // Check if resource is external - const isExternal = !$resourceTypes.find(type => type.name === name); - if (isExternal) { - // Handle external datasets separately as they are globally loaded and subscribed to - let resource = null; - if (!$fetchingResourcesExternal) { - resource = $externalResources.find(resource => resource.name === name) || null; - } - let error = !resource && !$fetchingResourcesExternal ? 'External Profile not Found' : ''; + const startTimeYmd = simulationDataset?.simulation_start_time ?? plan.start_time; + resourceNames.forEach(name => { + // Check if resource is external + const isExternal = !$resourceTypes.find(type => type.name === name); + if (isExternal) { + // Handle external datasets separately as they are globally loaded and subscribed to + let resource = null; + if (!$fetchingResourcesExternal) { + resource = $externalResources.find(resource => resource.name === name) || null; + } + let error = !resource && !$fetchingResourcesExternal ? 'External Profile not Found' : ''; + resourceRequestMap = { + ...resourceRequestMap, + [name]: { + ...resourceRequestMap[name], + error, + loading: $fetchingResourcesExternal, + resource, + simulationDatasetId, + type: 'external', + }, + }; + } else { + // Skip if a profile subscription already exists for this (name, simulationDatasetId) + if ( + resourceRequestMap[name] && + simulationDatasetId === resourceRequestMap[name].simulationDatasetId && + resourceRequestMap[name].unsubscribe + ) { + return; + } + // The sub watches simulationDataset.status itself to drive its + // closing-value logic and stream-sub lifecycle: while the sim is + // running it terminates at the last received segment; once status + // goes terminal it honours header.duration so lines close to plan + // end. No coordination needed from this caller. + const subscription = createProfileSubscription(simulationDatasetId, name, startTimeYmd, user); + const storeUnsubscribe = subscription.store.subscribe(({ error, loading, resource }) => { resourceRequestMap = { ...resourceRequestMap, [name]: { ...resourceRequestMap[name], error, - loading: $fetchingResourcesExternal, + loading, resource, simulationDatasetId, - type: 'external', - }, - }; - } else { - // Skip matching resources requests that have already been added for this simulation - if ( - resourceRequestMap[name] && - simulationDatasetId === resourceRequestMap[name].simulationDatasetId && - (resourceRequestMap[name].loading || resourceRequestMap[name].error || resourceRequestMap[name].resource) - ) { - return; - } - - const controller = new AbortController(); - resourceRequestMap = { - ...resourceRequestMap, - [name]: { - ...resourceRequestMap[name], - controller, - error: '', - loading: true, - resource: null, - simulationDatasetId, type: 'internal', + unsubscribe: () => { + storeUnsubscribe(); + subscription.unsubscribe(); + }, }, }; - - let resource = null; - let error = ''; - let aborted = false; - try { - const startTime = performance.now(); - const response = await effects.getResource(simulationDatasetId, name, user, controller.signal); - const { profile } = response; - if (profile && profile.length === 1) { - resource = sampleProfiles([profile[0]], startTimeYmd)[0]; - logMessage( - `Retrieved profile ${name} (${profile[0].profile_segments.length} segment${pluralize(profile[0].profile_segments.length)}) for simulation ${simulationDatasetId}.`, - '', - performance.now() - startTime, - ); - } else { - throw new Error('Profile not Found'); - } - } catch (e) { - const err = e as Error; - if (err.name === 'AbortError') { - aborted = true; - } else { - catchError(`Profile Download Failed for ${name}`, e as Error); - error = err.message; - } - } finally { - if (!aborted) { - resourceRequestMap = { - ...resourceRequestMap, - [name]: { - ...resourceRequestMap[name], - error, - loading: false, - resource, - }, - }; - } - } - } - }); - } + }); + } + }); } else if (simulationDataset === null) { Object.entries(resourceRequestMap).forEach(([_key, value]) => { - value.controller?.abort(); + value.unsubscribe?.(); }); resourceRequestMap = {}; } + onDestroy(() => { + Object.values(resourceRequestMap).forEach(value => { + value.unsubscribe?.(); + }); + resourceRequestMap = {}; + }); + $: onDragenter(dragenter); $: onDragleave(dragleave); $: onDragover(dragover); @@ -384,6 +352,7 @@ $: if (resourceRequestMap) { const newLoadedResources: Resource[] = []; const newLoadingErrors: string[] = []; + let anyLoading = false; Object.values(resourceRequestMap).forEach(resourceRequest => { if (resourceRequest.resource) { newLoadedResources.push(resourceRequest.resource); @@ -391,14 +360,17 @@ if (resourceRequest.error) { newLoadingErrors.push(resourceRequest.error); } + if (resourceRequest.loading) { + anyLoading = true; + } }); loadedResources = newLoadedResources; resourceLoadingErrors = newLoadingErrors; - - // Consider row to be loading if the number of completed resource requests (loaded or error state) - // is not equal to the total number of resource requests - anyResourcesLoading = - loadedResources.length + resourceLoadingErrors.length !== Object.keys(resourceRequestMap).length; + // Use the per-request loading flag rather than counting loaded+errored vs + // total. A request with both data (e.g. from prefetch) and a streaming + // error would otherwise be double-counted, leaving anyResourcesLoading + // perpetually true and overlapping the loading + error indicators. + anyResourcesLoading = anyLoading; } // Compute scale domains for axes since it is optionally defined in the view diff --git a/src/components/timeline/TimelinePanel.svelte b/src/components/timeline/TimelinePanel.svelte index 482750ca08..42e7d7711d 100644 --- a/src/components/timeline/TimelinePanel.svelte +++ b/src/components/timeline/TimelinePanel.svelte @@ -50,6 +50,7 @@ import Panel from '../ui/Panel.svelte'; import PanelHeaderActions from '../ui/PanelHeaderActions.svelte'; import Timeline from './Timeline.svelte'; + import TimelineStatusIndicator from './TimelineStatusIndicator.svelte'; import TimelineViewControls from './TimelineViewControls.svelte'; export let user: User | null; @@ -181,7 +182,10 @@ -
Timeline
+
+
Timeline
+ +