story-kit: merge 68_story_frontend_pipeline_state_stale_after_server_restart
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
import { api, resolveWsHost } from "./client";
|
import { api, ChatWebSocket, resolveWsHost } from "./client";
|
||||||
|
|
||||||
const mockFetch = vi.fn();
|
const mockFetch = vi.fn();
|
||||||
|
|
||||||
@@ -154,3 +154,179 @@ describe("api client", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ── ChatWebSocket reconnect tests ───────────────────────────────────────────
|
||||||
|
|
||||||
|
interface MockWsInstance {
|
||||||
|
onopen: (() => void) | null;
|
||||||
|
onclose: (() => void) | null;
|
||||||
|
onmessage: ((e: { data: string }) => void) | null;
|
||||||
|
onerror: (() => void) | null;
|
||||||
|
readyState: number;
|
||||||
|
send: () => void;
|
||||||
|
close: () => void;
|
||||||
|
simulateClose: () => void;
|
||||||
|
simulateMessage: (data: Record<string, unknown>) => void;
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeMockWebSocket() {
|
||||||
|
const instances: MockWsInstance[] = [];
|
||||||
|
|
||||||
|
class MockWebSocket {
|
||||||
|
static readonly CONNECTING = 0;
|
||||||
|
static readonly OPEN = 1;
|
||||||
|
static readonly CLOSING = 2;
|
||||||
|
static readonly CLOSED = 3;
|
||||||
|
|
||||||
|
onopen: (() => void) | null = null;
|
||||||
|
onclose: (() => void) | null = null;
|
||||||
|
onmessage: ((e: { data: string }) => void) | null = null;
|
||||||
|
onerror: (() => void) | null = null;
|
||||||
|
readyState = 0;
|
||||||
|
|
||||||
|
constructor(_url: string) {
|
||||||
|
instances.push(this as unknown as MockWsInstance);
|
||||||
|
}
|
||||||
|
|
||||||
|
send() {}
|
||||||
|
|
||||||
|
close() {
|
||||||
|
this.readyState = 3;
|
||||||
|
this.onclose?.();
|
||||||
|
}
|
||||||
|
|
||||||
|
simulateClose() {
|
||||||
|
this.readyState = 3;
|
||||||
|
this.onclose?.();
|
||||||
|
}
|
||||||
|
|
||||||
|
simulateMessage(data: Record<string, unknown>) {
|
||||||
|
this.onmessage?.({ data: JSON.stringify(data) });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return { MockWebSocket, instances };
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("ChatWebSocket", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const { MockWebSocket } = makeMockWebSocket();
|
||||||
|
vi.stubGlobal("WebSocket", MockWebSocket);
|
||||||
|
// Reset shared static state between tests
|
||||||
|
(ChatWebSocket as unknown as { sharedSocket: null }).sharedSocket = null;
|
||||||
|
(ChatWebSocket as unknown as { refCount: number }).refCount = 0;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
vi.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("schedules reconnect after socket closes unexpectedly", () => {
|
||||||
|
const { MockWebSocket, instances } = makeMockWebSocket();
|
||||||
|
vi.stubGlobal("WebSocket", MockWebSocket);
|
||||||
|
|
||||||
|
const ws = new ChatWebSocket();
|
||||||
|
ws.connect({});
|
||||||
|
|
||||||
|
expect(instances).toHaveLength(1);
|
||||||
|
|
||||||
|
instances[0].simulateClose();
|
||||||
|
|
||||||
|
// No new socket created yet
|
||||||
|
expect(instances).toHaveLength(1);
|
||||||
|
|
||||||
|
// Advance past the initial 1s reconnect delay
|
||||||
|
vi.advanceTimersByTime(1001);
|
||||||
|
|
||||||
|
// A new socket should now have been created
|
||||||
|
expect(instances).toHaveLength(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("delivers pipeline_state after reconnect", () => {
|
||||||
|
const { MockWebSocket, instances } = makeMockWebSocket();
|
||||||
|
vi.stubGlobal("WebSocket", MockWebSocket);
|
||||||
|
|
||||||
|
const onPipelineState = vi.fn();
|
||||||
|
const ws = new ChatWebSocket();
|
||||||
|
ws.connect({ onPipelineState });
|
||||||
|
|
||||||
|
// Simulate server restart
|
||||||
|
instances[0].simulateClose();
|
||||||
|
vi.advanceTimersByTime(1001);
|
||||||
|
|
||||||
|
// Server pushes pipeline_state on fresh connection
|
||||||
|
const freshState = {
|
||||||
|
upcoming: [{ story_id: "1_story_test", name: "Test", error: null }],
|
||||||
|
current: [],
|
||||||
|
qa: [],
|
||||||
|
merge: [],
|
||||||
|
};
|
||||||
|
instances[1].simulateMessage({ type: "pipeline_state", ...freshState });
|
||||||
|
|
||||||
|
expect(onPipelineState).toHaveBeenCalledWith(freshState);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not reconnect after explicit close()", () => {
|
||||||
|
const { MockWebSocket, instances } = makeMockWebSocket();
|
||||||
|
vi.stubGlobal("WebSocket", MockWebSocket);
|
||||||
|
|
||||||
|
const ws = new ChatWebSocket();
|
||||||
|
ws.connect({});
|
||||||
|
|
||||||
|
// Explicit close disables reconnect
|
||||||
|
ws.close();
|
||||||
|
|
||||||
|
// Advance through both the DEV close-defer (250ms) and reconnect window
|
||||||
|
vi.advanceTimersByTime(2000);
|
||||||
|
|
||||||
|
// No new socket should be created
|
||||||
|
expect(instances).toHaveLength(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("uses exponential backoff on repeated failures", () => {
|
||||||
|
const { MockWebSocket, instances } = makeMockWebSocket();
|
||||||
|
vi.stubGlobal("WebSocket", MockWebSocket);
|
||||||
|
|
||||||
|
const ws = new ChatWebSocket();
|
||||||
|
ws.connect({});
|
||||||
|
|
||||||
|
// First close → reconnects after 1s
|
||||||
|
instances[0].simulateClose();
|
||||||
|
vi.advanceTimersByTime(1001);
|
||||||
|
expect(instances).toHaveLength(2);
|
||||||
|
|
||||||
|
// Second close → reconnects after 2s (doubled)
|
||||||
|
instances[1].simulateClose();
|
||||||
|
vi.advanceTimersByTime(1500);
|
||||||
|
// Not yet (delay is now 2s)
|
||||||
|
expect(instances).toHaveLength(2);
|
||||||
|
vi.advanceTimersByTime(600);
|
||||||
|
expect(instances).toHaveLength(3);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("resets reconnect delay after successful open", () => {
|
||||||
|
const { MockWebSocket, instances } = makeMockWebSocket();
|
||||||
|
vi.stubGlobal("WebSocket", MockWebSocket);
|
||||||
|
|
||||||
|
const ws = new ChatWebSocket();
|
||||||
|
ws.connect({});
|
||||||
|
|
||||||
|
// Disconnect and reconnect twice to raise the delay
|
||||||
|
instances[0].simulateClose();
|
||||||
|
vi.advanceTimersByTime(1001);
|
||||||
|
|
||||||
|
instances[1].simulateClose();
|
||||||
|
vi.advanceTimersByTime(2001);
|
||||||
|
|
||||||
|
// Simulate a successful open on third socket — resets delay to 1s
|
||||||
|
instances[2].onopen?.();
|
||||||
|
|
||||||
|
// Close again — should use the reset 1s delay
|
||||||
|
instances[2].simulateClose();
|
||||||
|
vi.advanceTimersByTime(1001);
|
||||||
|
|
||||||
|
expect(instances).toHaveLength(4);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -246,46 +246,26 @@ export class ChatWebSocket {
|
|||||||
private onPipelineState?: (state: PipelineState) => void;
|
private onPipelineState?: (state: PipelineState) => void;
|
||||||
private connected = false;
|
private connected = false;
|
||||||
private closeTimer?: number;
|
private closeTimer?: number;
|
||||||
|
private wsPath = DEFAULT_WS_PATH;
|
||||||
|
private reconnectTimer?: number;
|
||||||
|
private reconnectDelay = 1000;
|
||||||
|
private shouldReconnect = false;
|
||||||
|
|
||||||
connect(
|
private _buildWsUrl(): string {
|
||||||
handlers: {
|
|
||||||
onToken?: (content: string) => void;
|
|
||||||
onUpdate?: (messages: Message[]) => void;
|
|
||||||
onSessionId?: (sessionId: string) => void;
|
|
||||||
onError?: (message: string) => void;
|
|
||||||
onPipelineState?: (state: PipelineState) => void;
|
|
||||||
},
|
|
||||||
wsPath = DEFAULT_WS_PATH,
|
|
||||||
) {
|
|
||||||
this.onToken = handlers.onToken;
|
|
||||||
this.onUpdate = handlers.onUpdate;
|
|
||||||
this.onSessionId = handlers.onSessionId;
|
|
||||||
this.onError = handlers.onError;
|
|
||||||
this.onPipelineState = handlers.onPipelineState;
|
|
||||||
|
|
||||||
if (this.connected) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.connected = true;
|
|
||||||
ChatWebSocket.refCount += 1;
|
|
||||||
|
|
||||||
const protocol = window.location.protocol === "https:" ? "wss" : "ws";
|
const protocol = window.location.protocol === "https:" ? "wss" : "ws";
|
||||||
const wsHost = resolveWsHost(
|
const wsHost = resolveWsHost(
|
||||||
import.meta.env.DEV,
|
import.meta.env.DEV,
|
||||||
typeof __STORYKIT_PORT__ !== "undefined" ? __STORYKIT_PORT__ : undefined,
|
typeof __STORYKIT_PORT__ !== "undefined" ? __STORYKIT_PORT__ : undefined,
|
||||||
window.location.host,
|
window.location.host,
|
||||||
);
|
);
|
||||||
const wsUrl = `${protocol}://${wsHost}${wsPath}`;
|
return `${protocol}://${wsHost}${this.wsPath}`;
|
||||||
|
}
|
||||||
if (
|
|
||||||
!ChatWebSocket.sharedSocket ||
|
|
||||||
ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSED ||
|
|
||||||
ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSING
|
|
||||||
) {
|
|
||||||
ChatWebSocket.sharedSocket = new WebSocket(wsUrl);
|
|
||||||
}
|
|
||||||
this.socket = ChatWebSocket.sharedSocket;
|
|
||||||
|
|
||||||
|
private _attachHandlers(): void {
|
||||||
|
if (!this.socket) return;
|
||||||
|
this.socket.onopen = () => {
|
||||||
|
this.reconnectDelay = 1000;
|
||||||
|
};
|
||||||
this.socket.onmessage = (event) => {
|
this.socket.onmessage = (event) => {
|
||||||
try {
|
try {
|
||||||
const data = JSON.parse(event.data) as WsResponse;
|
const data = JSON.parse(event.data) as WsResponse;
|
||||||
@@ -304,10 +284,63 @@ export class ChatWebSocket {
|
|||||||
this.onError?.(String(err));
|
this.onError?.(String(err));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
this.socket.onerror = () => {
|
this.socket.onerror = () => {
|
||||||
this.onError?.("WebSocket error");
|
this.onError?.("WebSocket error");
|
||||||
};
|
};
|
||||||
|
this.socket.onclose = () => {
|
||||||
|
if (this.shouldReconnect && this.connected) {
|
||||||
|
this._scheduleReconnect();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private _scheduleReconnect(): void {
|
||||||
|
window.clearTimeout(this.reconnectTimer);
|
||||||
|
const delay = this.reconnectDelay;
|
||||||
|
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
|
||||||
|
this.reconnectTimer = window.setTimeout(() => {
|
||||||
|
this.reconnectTimer = undefined;
|
||||||
|
const wsUrl = this._buildWsUrl();
|
||||||
|
ChatWebSocket.sharedSocket = new WebSocket(wsUrl);
|
||||||
|
this.socket = ChatWebSocket.sharedSocket;
|
||||||
|
this._attachHandlers();
|
||||||
|
}, delay);
|
||||||
|
}
|
||||||
|
|
||||||
|
connect(
|
||||||
|
handlers: {
|
||||||
|
onToken?: (content: string) => void;
|
||||||
|
onUpdate?: (messages: Message[]) => void;
|
||||||
|
onSessionId?: (sessionId: string) => void;
|
||||||
|
onError?: (message: string) => void;
|
||||||
|
onPipelineState?: (state: PipelineState) => void;
|
||||||
|
},
|
||||||
|
wsPath = DEFAULT_WS_PATH,
|
||||||
|
) {
|
||||||
|
this.onToken = handlers.onToken;
|
||||||
|
this.onUpdate = handlers.onUpdate;
|
||||||
|
this.onSessionId = handlers.onSessionId;
|
||||||
|
this.onError = handlers.onError;
|
||||||
|
this.onPipelineState = handlers.onPipelineState;
|
||||||
|
this.wsPath = wsPath;
|
||||||
|
this.shouldReconnect = true;
|
||||||
|
|
||||||
|
if (this.connected) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.connected = true;
|
||||||
|
ChatWebSocket.refCount += 1;
|
||||||
|
|
||||||
|
if (
|
||||||
|
!ChatWebSocket.sharedSocket ||
|
||||||
|
ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSED ||
|
||||||
|
ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSING
|
||||||
|
) {
|
||||||
|
const wsUrl = this._buildWsUrl();
|
||||||
|
ChatWebSocket.sharedSocket = new WebSocket(wsUrl);
|
||||||
|
}
|
||||||
|
this.socket = ChatWebSocket.sharedSocket;
|
||||||
|
this._attachHandlers();
|
||||||
}
|
}
|
||||||
|
|
||||||
sendChat(messages: Message[], config: ProviderConfig) {
|
sendChat(messages: Message[], config: ProviderConfig) {
|
||||||
@@ -319,6 +352,10 @@ export class ChatWebSocket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
close() {
|
close() {
|
||||||
|
this.shouldReconnect = false;
|
||||||
|
window.clearTimeout(this.reconnectTimer);
|
||||||
|
this.reconnectTimer = undefined;
|
||||||
|
|
||||||
if (!this.connected) return;
|
if (!this.connected) return;
|
||||||
this.connected = false;
|
this.connected = false;
|
||||||
ChatWebSocket.refCount = Math.max(0, ChatWebSocket.refCount - 1);
|
ChatWebSocket.refCount = Math.max(0, ChatWebSocket.refCount - 1);
|
||||||
|
|||||||
@@ -588,6 +588,42 @@ pub fn validate_story_dirs(
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn load_pipeline_state_loads_all_stages() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let root = tmp.path().to_path_buf();
|
||||||
|
|
||||||
|
for (stage, id) in &[
|
||||||
|
("1_upcoming", "10_story_upcoming"),
|
||||||
|
("2_current", "20_story_current"),
|
||||||
|
("3_qa", "30_story_qa"),
|
||||||
|
("4_merge", "40_story_merge"),
|
||||||
|
] {
|
||||||
|
let dir = root.join(".story_kit").join("work").join(stage);
|
||||||
|
fs::create_dir_all(&dir).unwrap();
|
||||||
|
fs::write(
|
||||||
|
dir.join(format!("{id}.md")),
|
||||||
|
format!("---\nname: {id}\ntest_plan: pending\n---\n"),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let ctx = crate::http::context::AppContext::new_test(root);
|
||||||
|
let state = load_pipeline_state(&ctx).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(state.upcoming.len(), 1);
|
||||||
|
assert_eq!(state.upcoming[0].story_id, "10_story_upcoming");
|
||||||
|
|
||||||
|
assert_eq!(state.current.len(), 1);
|
||||||
|
assert_eq!(state.current[0].story_id, "20_story_current");
|
||||||
|
|
||||||
|
assert_eq!(state.qa.len(), 1);
|
||||||
|
assert_eq!(state.qa[0].story_id, "30_story_qa");
|
||||||
|
|
||||||
|
assert_eq!(state.merge.len(), 1);
|
||||||
|
assert_eq!(state.merge[0].story_id, "40_story_merge");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn load_upcoming_returns_empty_when_no_dir() {
|
fn load_upcoming_returns_empty_when_no_dir() {
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
|||||||
Reference in New Issue
Block a user