diff --git a/core/src/Stack.ts b/core/src/Stack.ts index fe2c0dfc2..9aaadf29b 100644 --- a/core/src/Stack.ts +++ b/core/src/Stack.ts @@ -1,4 +1,3 @@ -import type { BaseDomainEvent } from "event-types/_base"; import type { DomainEvent, PoppedEvent, diff --git a/core/src/aggregators/Aggregator.ts b/core/src/aggregators/Aggregator.ts new file mode 100644 index 000000000..5cf3b7912 --- /dev/null +++ b/core/src/aggregators/Aggregator.ts @@ -0,0 +1,11 @@ +import type { Effect } from "../Effect"; +import type { DomainEvent } from "../event-types"; +import type { Stack } from "../Stack"; + +export interface Aggregator { + getStack(): Stack; + dispatchEvent(event: DomainEvent): void; + subscribeChanges: ( + listener: (effects: Effect[], stack: Stack) => void, + ) => () => void; +} diff --git a/core/src/aggregators/SyncAggregator.ts b/core/src/aggregators/SyncAggregator.ts new file mode 100644 index 000000000..0ef904c02 --- /dev/null +++ b/core/src/aggregators/SyncAggregator.ts @@ -0,0 +1,115 @@ +import { SwitchScheduler } from "utils/schedulers/SwitchScheduler"; +import { aggregate } from "../aggregate"; +import type { Effect } from "../Effect"; +import type { DomainEvent } from "../event-types"; +import { produceEffects } from "../produceEffects"; +import { projectToOngoingTransitions } from "../projectToOngoingTransitions"; +import type { Stack } from "../Stack"; +import { delay } from "../utils/delay"; +import { getAbortReason } from "../utils/getAbortReason"; +import type { Publisher } from "../utils/publishers/Publisher"; +import type { Scheduler } from "../utils/schedulers/Scheduler"; +import type { Aggregator } from "./Aggregator"; + +export class SyncAggregator implements Aggregator { + private changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>; + private updateScheduler: SwitchScheduler; + private events: DomainEvent[]; + private latestStackSnapshot: Stack | null; + + private static UpdateOverridedError = + class UpdateOverridedError extends Error { + constructor() { + super("a new update is scheduled"); + } + }; + + constructor(options: { + changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>; + updateScheduler: Scheduler; + initialEvents: DomainEvent[]; + }) { + this.changePublisher = options.changePublisher; + this.updateScheduler = new SwitchScheduler({ + SwitchException: SyncAggregator.UpdateOverridedError, + scheduler: options.updateScheduler, + }); + this.events = options.initialEvents; + this.latestStackSnapshot = null; + } + + getStack(): Stack { + return this.readSnapshot(); + } + + dispatchEvent(event: DomainEvent): void { + this.applyUpdate(() => { + this.events.push(event); + }); + } + + subscribeChanges( + listener: (effects: Effect[], stack: Stack) => void, + ): () => void { + return this.changePublisher.subscribe(({ effects, stack }) => { + listener(effects, stack); + }); + } + + private readSnapshot(): Stack { + if (this.latestStackSnapshot === null) { + this.latestStackSnapshot = aggregate(this.events, Date.now()); + } + + return this.latestStackSnapshot; + } + + private applyUpdate(update: () => void): void { + const previousSnapshot = this.readSnapshot(); + + update(); + this.latestStackSnapshot = aggregate(this.events, Date.now()); + this.flushChanges( + produceEffects(previousSnapshot, this.latestStackSnapshot), + ); + this.scheduleTransitionStateUpdates(); + } + + private flushChanges(effects: Effect[]): void { + if (effects.length === 0) return; + + this.changePublisher.publish({ effects, stack: this.readSnapshot() }); + } + + private scheduleTransitionStateUpdates(): void { + const ongoingTransitions = projectToOngoingTransitions( + this.events, + Date.now(), + ); + const nextToComplete = ongoingTransitions.sort( + (a, b) => a.estimatedTransitionEnd - b.estimatedTransitionEnd, + )[0]; + + if (!nextToComplete) return; + + this.updateScheduler + .schedule(async (options) => { + await delay(nextToComplete.estimatedTransitionEnd - Date.now(), { + signal: options?.signal, + }); + + if (options?.signal?.aborted) throw getAbortReason(options.signal); + + this.applyUpdate(() => {}); + }) + .catch((error) => { + if ( + error instanceof SyncAggregator.UpdateOverridedError || + (error instanceof DOMException && error.name === "AbortError") + ) + return; + + throw error; + }); + } +} diff --git a/core/src/makeCoreStore.ts b/core/src/makeCoreStore.ts index cbdc99aaa..d06bae1a8 100644 --- a/core/src/makeCoreStore.ts +++ b/core/src/makeCoreStore.ts @@ -1,19 +1,16 @@ -import isEqual from "react-fast-compare"; -import { aggregate } from "./aggregate"; +import type { Aggregator } from "./aggregators/Aggregator"; +import { SyncAggregator } from "./aggregators/SyncAggregator"; import type { DomainEvent, PushedEvent, StepPushedEvent } from "./event-types"; import { makeEvent } from "./event-utils"; import type { StackflowActions, StackflowPlugin } from "./interfaces"; -import { produceEffects } from "./produceEffects"; -import type { Stack } from "./Stack"; import { divideBy, once } from "./utils"; +import { Mutex } from "./utils/Mutex"; import { makeActions } from "./utils/makeActions"; +import { ScheduledPublisher } from "./utils/publishers/ScheduledPublisher"; +import { SequentialScheduler } from "./utils/schedulers/SequentialScheduler"; +import { SwitchScheduler } from "./utils/schedulers/SwitchScheduler"; import { triggerPostEffectHooks } from "./utils/triggerPostEffectHooks"; -const SECOND = 1000; - -// 60FPS -const INTERVAL_MS = SECOND / 60; - export type MakeCoreStoreOptions = { initialEvents: DomainEvent[]; initialContext?: any; @@ -76,39 +73,26 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore { options.handlers?.onInitialActivityNotFound?.(); } - const events: { value: DomainEvent[] } = { - value: [...initialRemainingEvents, ...initialPushedEvents], - }; + const aggregator: Aggregator = new SyncAggregator({ + initialEvents: [...initialRemainingEvents, ...initialPushedEvents], + changePublisher: new ScheduledPublisher( + new SequentialScheduler(new Mutex()), + ), + updateScheduler: new SwitchScheduler(new Mutex()), + }); - const stack = { - value: aggregate(events.value, new Date().getTime()), - }; + aggregator.subscribeChanges((effects) => { + triggerPostEffectHooks(effects, pluginInstances, actions); + }); const actions: StackflowActions = { getStack() { - return stack.value; + return aggregator.getStack(); }, dispatchEvent(name, params) { const newEvent = makeEvent(name, params); - const nextStackValue = aggregate( - [...events.value, newEvent], - new Date().getTime(), - ); - - events.value.push(newEvent); - setStackValue(nextStackValue); - const interval = setInterval(() => { - const nextStackValue = aggregate(events.value, new Date().getTime()); - - if (!isEqual(stack.value, nextStackValue)) { - setStackValue(nextStackValue); - } - - if (nextStackValue.globalTransitionState === "idle") { - clearInterval(interval); - } - }, INTERVAL_MS); + aggregator.dispatchEvent(newEvent); }, push: () => {}, replace: () => {}, @@ -120,12 +104,6 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore { resume: () => {}, }; - const setStackValue = (nextStackValue: Stack) => { - const effects = produceEffects(stack.value, nextStackValue); - stack.value = nextStackValue; - triggerPostEffectHooks(effects, pluginInstances, actions); - }; - // Initialize action methods after actions object is fully created Object.assign( actions, @@ -145,7 +123,7 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore { }); }); }), - pullEvents: () => events.value, + pullEvents: () => aggregator.getStack().events, subscribe(listener) { storeListeners.push(listener); diff --git a/core/src/produceEffects.ts b/core/src/produceEffects.ts index 02331b203..45eb356e8 100644 --- a/core/src/produceEffects.ts +++ b/core/src/produceEffects.ts @@ -5,15 +5,15 @@ import type { Stack } from "./Stack"; import { omit } from "./utils"; export function produceEffects(prevOutput: Stack, nextOutput: Stack): Effect[] { - const output: Effect[] = []; - - const somethingChanged = !isEqual(prevOutput, nextOutput); + if (isEqual(prevOutput, nextOutput)) { + return []; + } - if (somethingChanged) { - output.push({ + const output: Effect[] = [ + { _TAG: "%SOMETHING_CHANGED%", - }); - } + }, + ]; const isPaused = prevOutput.globalTransitionState !== "paused" && diff --git a/core/src/utils/Mutex.ts b/core/src/utils/Mutex.ts new file mode 100644 index 000000000..fe1b324a3 --- /dev/null +++ b/core/src/utils/Mutex.ts @@ -0,0 +1,19 @@ +import { SequentialScheduler } from "./schedulers/SequentialScheduler"; + +export class Mutex { + private sequentialScheduler: SequentialScheduler = new SequentialScheduler(); + + acquire(options?: { signal?: AbortSignal }): Promise { + return new Promise((resolve, reject) => { + this.sequentialScheduler + .schedule(() => new Promise((release) => resolve({ release })), { + signal: options?.signal, + }) + .catch(reject); + }); + } +} + +export interface LockHandle { + release: () => void; +} diff --git a/core/src/utils/delay.ts b/core/src/utils/delay.ts new file mode 100644 index 000000000..106d2001a --- /dev/null +++ b/core/src/utils/delay.ts @@ -0,0 +1,27 @@ +import { getAbortReason } from "./getAbortReason"; + +export function delay( + ms: number, + options?: { signal?: AbortSignal }, +): Promise { + return new Promise((resolve, reject) => { + const signal = options?.signal; + + if (signal?.aborted) throw getAbortReason(signal); + + const abortHandler = () => { + if (!signal) return; + + clearTimeout(timeoutId); + reject(getAbortReason(signal)); + }; + const timeoutId = setTimeout(() => { + if (signal?.aborted) abortHandler(); + else resolve(); + + signal?.removeEventListener("abort", abortHandler); + }, ms); + + signal?.addEventListener("abort", abortHandler, { once: true }); + }); +} diff --git a/core/src/utils/getAbortReason.ts b/core/src/utils/getAbortReason.ts new file mode 100644 index 000000000..0a40d7665 --- /dev/null +++ b/core/src/utils/getAbortReason.ts @@ -0,0 +1,7 @@ +export function getAbortReason(signal: AbortSignal): unknown { + if (!signal.aborted) throw new Error("the signal was not aborted"); + + return ( + signal.reason ?? new DOMException("an operation was aborted", "AbortError") + ); +} diff --git a/core/src/utils/publishers/Publisher.ts b/core/src/utils/publishers/Publisher.ts new file mode 100644 index 000000000..fc2150426 --- /dev/null +++ b/core/src/utils/publishers/Publisher.ts @@ -0,0 +1,4 @@ +export interface Publisher { + publish(value: T): void; + subscribe(subscriber: (value: T) => void): () => void; +} diff --git a/core/src/utils/publishers/ScheduledPublisher.ts b/core/src/utils/publishers/ScheduledPublisher.ts new file mode 100644 index 000000000..6d89bfb33 --- /dev/null +++ b/core/src/utils/publishers/ScheduledPublisher.ts @@ -0,0 +1,39 @@ +import type { Scheduler } from "../schedulers/Scheduler"; +import type { Publisher } from "./Publisher"; + +export class ScheduledPublisher implements Publisher { + private scheduler: Scheduler; + private subscribers: ((value: T) => void)[]; + private handlePublishError: (error: unknown, value: T) => void; + + constructor( + scheduler: Scheduler, + options?: { handlePublishError?: (error: unknown, value: T) => void }, + ) { + this.scheduler = scheduler; + this.subscribers = []; + this.handlePublishError = options?.handlePublishError ?? (() => {}); + } + + publish(value: T): void { + const subscribers = this.subscribers.slice(); + + this.scheduler.schedule(async () => { + for (const subscriber of subscribers) { + try { + subscriber(value); + } catch (error) { + this.handlePublishError(error, value); + } + } + }); + } + + subscribe(subscriber: (value: T) => void): () => void { + this.subscribers.push(subscriber); + + return () => { + this.subscribers = this.subscribers.filter((s) => s !== subscriber); + }; + } +} diff --git a/core/src/utils/schedulers/Scheduler.ts b/core/src/utils/schedulers/Scheduler.ts new file mode 100644 index 000000000..3b7c649e7 --- /dev/null +++ b/core/src/utils/schedulers/Scheduler.ts @@ -0,0 +1,6 @@ +export interface Scheduler { + schedule( + task: (options?: { signal?: AbortSignal }) => Promise, + options?: { signal?: AbortSignal }, + ): Promise; +} diff --git a/core/src/utils/schedulers/SequentialScheduler.ts b/core/src/utils/schedulers/SequentialScheduler.ts new file mode 100644 index 000000000..f8f572133 --- /dev/null +++ b/core/src/utils/schedulers/SequentialScheduler.ts @@ -0,0 +1,65 @@ +import { getAbortReason } from "../getAbortReason"; +import type { Scheduler } from "./Scheduler"; + +export class SequentialScheduler implements Scheduler { + private taskRunnerQueue: (() => Promise)[] = []; + private taskRunnerQueueFlushTask: Promise | null = null; + + schedule( + task: (options?: { signal?: AbortSignal }) => Promise, + options?: { signal?: AbortSignal }, + ): Promise { + return new Promise((resolve, reject) => { + const signal = options?.signal; + + if (signal?.aborted) throw getAbortReason(signal); + + const abortHandler = () => { + if (!signal) return; + + this.taskRunnerQueue = this.taskRunnerQueue.filter( + (r) => r !== taskRunner, + ); + + reject(getAbortReason(signal)); + }; + const taskRunner = async () => { + if (signal?.aborted) { + abortHandler(); + + return; + } + + try { + resolve(await task({ signal })); + } catch (error) { + reject(error); + } finally { + signal?.removeEventListener("abort", abortHandler); + } + }; + + this.taskRunnerQueue.push(taskRunner); + + signal?.addEventListener("abort", abortHandler, { once: true }); + + this.flushTaskRunnerQueue(); + }); + } + + private flushTaskRunnerQueue(): void { + if (this.taskRunnerQueueFlushTask) return; + + this.taskRunnerQueueFlushTask = Promise.resolve().then(async () => { + while (this.taskRunnerQueue.length > 0) { + const nextTask = this.taskRunnerQueue.shift(); + + if (!nextTask) break; + + await nextTask(); + } + + this.taskRunnerQueueFlushTask = null; + }); + } +} diff --git a/core/src/utils/schedulers/SwitchScheduler.ts b/core/src/utils/schedulers/SwitchScheduler.ts new file mode 100644 index 000000000..a7aa6c96d --- /dev/null +++ b/core/src/utils/schedulers/SwitchScheduler.ts @@ -0,0 +1,43 @@ +import { getAbortReason } from "../getAbortReason"; +import { sumSignals } from "../sumSignals"; +import type { Scheduler } from "./Scheduler"; + +export class SwitchScheduler implements Scheduler { + private SwitchException: new ( + message?: string, + ) => Error; + private scheduler: Scheduler; + private previousTaskController: AbortController | null = null; + + constructor(options: { + SwitchException: new (message?: string) => Error; + scheduler: Scheduler; + }) { + this.SwitchException = options.SwitchException; + this.scheduler = options.scheduler; + } + + async schedule( + task: (options?: { signal?: AbortSignal }) => Promise, + options?: { signal?: AbortSignal }, + ): Promise { + const controller = new AbortController(); + const signal = options?.signal + ? sumSignals([options.signal, controller.signal]) + : controller.signal; + + if (signal.aborted) throw getAbortReason(signal); + + if (this.previousTaskController) { + this.previousTaskController.abort( + new this.SwitchException("a new task is scheduled"), + ); + } + + this.previousTaskController = controller; + + return await this.scheduler.schedule(task, { + signal, + }); + } +} diff --git a/core/src/utils/sumSignals.ts b/core/src/utils/sumSignals.ts new file mode 100644 index 000000000..25742e3c1 --- /dev/null +++ b/core/src/utils/sumSignals.ts @@ -0,0 +1,33 @@ +import { getAbortReason } from "./getAbortReason"; + +export function sumSignals(signals: Iterable): AbortSignal { + const controller = new AbortController(); + const abortHandlerCleanups: (() => void)[] = []; + + for (const signal of signals) { + if (signal.aborted) { + controller.abort(getAbortReason(signal)); + + for (const cleanup of abortHandlerCleanups) { + cleanup(); + } + + break; + } + + const abortHandler = () => { + controller.abort(getAbortReason(signal)); + + for (const cleanup of abortHandlerCleanups) { + cleanup(); + } + }; + + signal.addEventListener("abort", abortHandler); + abortHandlerCleanups.push(() => + signal.removeEventListener("abort", abortHandler), + ); + } + + return controller.signal; +}