Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/src/Stack.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { BaseDomainEvent } from "event-types/_base";
import type {
DomainEvent,
PoppedEvent,
Expand Down
11 changes: 11 additions & 0 deletions core/src/aggregators/Aggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import type { Effect } from "../Effect";
import type { DomainEvent } from "../event-types";
import type { Stack } from "../Stack";

export interface Aggregator {
getStack(): Stack;
dispatchEvent(event: DomainEvent): void;
subscribeChanges: (
listener: (effects: Effect[], stack: Stack) => void,
) => () => void;
}
115 changes: 115 additions & 0 deletions core/src/aggregators/SyncAggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { head } from "lodash";
import { SwitchScheduler } from "utils/schedulers/SwitchScheduler";
import { aggregate } from "../aggregate";
import type { Effect } from "../Effect";
import type { DomainEvent } from "../event-types";
import { produceEffects } from "../produceEffects";
import { projectToOngoingTransitions } from "../projectToOngoingTransitions";
import type { Stack } from "../Stack";
import { delay } from "../utils/delay";
import { getAbortReason } from "../utils/getAbortReason";
import type { Publisher } from "../utils/publishers/Publisher";
import type { Aggregator } from "./Aggregator";

export class SyncAggregator implements Aggregator {
private changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
private updateScheduler: SwitchScheduler;
private updateErrorReporter: (error: unknown) => void;
private events: DomainEvent[];
private latestStackSnapshot: Stack | null;

constructor(options: {
changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
updateScheduler: SwitchScheduler;
updateErrorReporter: (error: unknown) => void;
initialEvents: DomainEvent[];
}) {
this.changePublisher = options.changePublisher;
this.updateScheduler = options.updateScheduler;
this.updateErrorReporter = options.updateErrorReporter;
this.events = options.initialEvents;
this.latestStackSnapshot = null;
}

getStack(): Stack {
return this.readSnapshot();
}

dispatchEvent(event: DomainEvent): void {
this.applyUpdate(() => {
this.events.push(event);
});
}

subscribeChanges(
listener: (effects: Effect[], stack: Stack) => void,
): () => void {
return this.changePublisher.subscribe(({ effects, stack }) => {
listener(effects, stack);
});
}

private readSnapshot(): Stack {
if (this.latestStackSnapshot === null) {
this.latestStackSnapshot = aggregate(this.events, Date.now());
}

return this.latestStackSnapshot;
}

private applyUpdate(update: () => void): void {
try {
const previousSnapshot = this.readSnapshot();
const projectionTime = Date.now();

update();
this.latestStackSnapshot = aggregate(this.events, projectionTime);
this.flushChanges(
produceEffects(previousSnapshot, this.latestStackSnapshot),
);
this.scheduleTransitionStateUpdates(projectionTime);
} catch (error) {
this.updateErrorReporter(error);
}
}

private flushChanges(effects: Effect[]): void {
if (effects.length === 0) return;

this.changePublisher.publish({ effects, stack: this.readSnapshot() });
}

private scheduleTransitionStateUpdates(projectionTime: number): void {
const ongoingTransitions = projectToOngoingTransitions(
this.events,
projectionTime,
);
const nextToComplete = head(
ongoingTransitions.sort(
(a, b) => a.estimatedTransitionEnd - b.estimatedTransitionEnd,
),
);

if (!nextToComplete) return;

this.updateScheduler
.schedule(async (options) => {
await delay(nextToComplete.estimatedTransitionEnd - Date.now(), {
signal: options?.signal,
});

if (options?.signal?.aborted) throw getAbortReason(options.signal);

this.applyUpdate(() => {});
})
.catch((error) => {
if (
error instanceof SwitchScheduler.SwitchException ||
(error instanceof DOMException && error.name === "AbortError")
)
return;

this.updateErrorReporter(error);
});
}
}
62 changes: 21 additions & 41 deletions core/src/makeCoreStore.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import isEqual from "react-fast-compare";
import { aggregate } from "./aggregate";
import type { Aggregator } from "./aggregators/Aggregator";
import { SyncAggregator } from "./aggregators/SyncAggregator";
import type { DomainEvent, PushedEvent, StepPushedEvent } from "./event-types";
import { makeEvent } from "./event-utils";
import type { StackflowActions, StackflowPlugin } from "./interfaces";
import { produceEffects } from "./produceEffects";
import type { Stack } from "./Stack";
import { divideBy, once } from "./utils";
import { makeActions } from "./utils/makeActions";
import { ScheduledPublisher } from "./utils/publishers/ScheduledPublisher";
import { SequentialScheduler } from "./utils/schedulers/SequentialScheduler";
import { SwitchScheduler } from "./utils/schedulers/SwitchScheduler";
import { triggerPostEffectHooks } from "./utils/triggerPostEffectHooks";

const SECOND = 1000;

// 60FPS
const INTERVAL_MS = SECOND / 60;

export type MakeCoreStoreOptions = {
initialEvents: DomainEvent[];
initialContext?: any;
Expand Down Expand Up @@ -76,39 +72,29 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
options.handlers?.onInitialActivityNotFound?.();
}

const events: { value: DomainEvent[] } = {
value: [...initialRemainingEvents, ...initialPushedEvents],
};
const aggregator: Aggregator = new SyncAggregator({
changePublisher: new ScheduledPublisher(new SequentialScheduler()),
updateScheduler: new SwitchScheduler({
scheduler: new SequentialScheduler(),
}),
updateErrorReporter: (error) => {
console.error(error);
},
initialEvents: [...initialRemainingEvents, ...initialPushedEvents],
});

const stack = {
value: aggregate(events.value, new Date().getTime()),
};
aggregator.subscribeChanges((effects) => {
triggerPostEffectHooks(effects, pluginInstances, actions);
});

const actions: StackflowActions = {
getStack() {
return stack.value;
return aggregator.getStack();
},
dispatchEvent(name, params) {
const newEvent = makeEvent(name, params);
const nextStackValue = aggregate(
[...events.value, newEvent],
new Date().getTime(),
);

events.value.push(newEvent);
setStackValue(nextStackValue);

const interval = setInterval(() => {
const nextStackValue = aggregate(events.value, new Date().getTime());

if (!isEqual(stack.value, nextStackValue)) {
setStackValue(nextStackValue);
}

if (nextStackValue.globalTransitionState === "idle") {
clearInterval(interval);
}
}, INTERVAL_MS);
aggregator.dispatchEvent(newEvent);
},
push: () => {},
replace: () => {},
Expand All @@ -120,12 +106,6 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
resume: () => {},
};

const setStackValue = (nextStackValue: Stack) => {
const effects = produceEffects(stack.value, nextStackValue);
stack.value = nextStackValue;
triggerPostEffectHooks(effects, pluginInstances, actions);
};

// Initialize action methods after actions object is fully created
Object.assign(
actions,
Expand All @@ -145,7 +125,7 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
});
});
}),
pullEvents: () => events.value,
pullEvents: () => aggregator.getStack().events,
subscribe(listener) {
storeListeners.push(listener);

Expand Down
14 changes: 7 additions & 7 deletions core/src/produceEffects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import type { Stack } from "./Stack";
import { omit } from "./utils";

export function produceEffects(prevOutput: Stack, nextOutput: Stack): Effect[] {
const output: Effect[] = [];

const somethingChanged = !isEqual(prevOutput, nextOutput);
if (isEqual(prevOutput, nextOutput)) {
return [];
}

if (somethingChanged) {
output.push({
const output: Effect[] = [
{
_TAG: "%SOMETHING_CHANGED%",
});
}
},
];

const isPaused =
prevOutput.globalTransitionState !== "paused" &&
Expand Down
19 changes: 19 additions & 0 deletions core/src/utils/Mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { SequentialScheduler } from "./schedulers/SequentialScheduler";

export class Mutex {
private sequentialScheduler: SequentialScheduler = new SequentialScheduler();

acquire(options?: { signal?: AbortSignal }): Promise<LockHandle> {
return new Promise((resolve, reject) => {
this.sequentialScheduler
.schedule(() => new Promise<void>((release) => resolve({ release })), {
signal: options?.signal,
})
.catch(reject);
});
}
}

export interface LockHandle {
release: () => void;
}
27 changes: 27 additions & 0 deletions core/src/utils/delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { getAbortReason } from "./getAbortReason";

export function delay(
ms: number,
options?: { signal?: AbortSignal },
): Promise<void> {
return new Promise((resolve, reject) => {
const signal = options?.signal;

if (signal?.aborted) throw getAbortReason(signal);

const abortHandler = () => {
if (!signal) return;

clearTimeout(timeoutId);
reject(getAbortReason(signal));
};
const timeoutId = setTimeout(() => {
if (signal?.aborted) abortHandler();
else resolve();

signal?.removeEventListener("abort", abortHandler);
}, ms);

signal?.addEventListener("abort", abortHandler, { once: true });
});
}
7 changes: 7 additions & 0 deletions core/src/utils/getAbortReason.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export function getAbortReason(signal: AbortSignal): unknown {
if (!signal.aborted) throw new Error("the signal was not aborted");

return (
signal.reason ?? new DOMException("an operation was aborted", "AbortError")
);
}
4 changes: 4 additions & 0 deletions core/src/utils/publishers/Publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface Publisher<T> {
publish(value: T): void;
subscribe(subscriber: (value: T) => void): () => void;
}
39 changes: 39 additions & 0 deletions core/src/utils/publishers/ScheduledPublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { Scheduler } from "../schedulers/Scheduler";
import type { Publisher } from "./Publisher";

export class ScheduledPublisher<T> implements Publisher<T> {
private scheduler: Scheduler;
private subscribers: ((value: T) => void)[];
private handlePublishError: (error: unknown, value: T) => void;

constructor(
scheduler: Scheduler,
options?: { handlePublishError?: (error: unknown, value: T) => void },
) {
this.scheduler = scheduler;
this.subscribers = [];
this.handlePublishError = options?.handlePublishError ?? (() => {});
}

publish(value: T): void {
const subscribers = this.subscribers.slice();

this.scheduler.schedule(async () => {
for (const subscriber of subscribers) {
try {
subscriber(value);
} catch (error) {
this.handlePublishError(error, value);
}
}
});
}

subscribe(subscriber: (value: T) => void): () => void {
this.subscribers.push(subscriber);

return () => {
this.subscribers = this.subscribers.filter((s) => s !== subscriber);
};
}
}
6 changes: 6 additions & 0 deletions core/src/utils/schedulers/Scheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface Scheduler {
schedule<T>(
task: (options?: { signal?: AbortSignal }) => Promise<T>,
options?: { signal?: AbortSignal },
): Promise<T>;
}
Loading
Loading