From 46644a6bc9f0b28bccedcdf9504b24b4d67794f0 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 23 Feb 2026 13:33:33 +0000 Subject: [PATCH] story-kit: merge 68_story_frontend_pipeline_state_stale_after_server_restart --- frontend/src/api/client.test.ts | 178 +++++++++++++++++++++++++++++++- frontend/src/api/client.ts | 103 ++++++++++++------ server/src/http/workflow.rs | 36 +++++++ 3 files changed, 283 insertions(+), 34 deletions(-) diff --git a/frontend/src/api/client.test.ts b/frontend/src/api/client.test.ts index 07caf6c..6ab4eee 100644 --- a/frontend/src/api/client.test.ts +++ b/frontend/src/api/client.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { api, resolveWsHost } from "./client"; +import { api, ChatWebSocket, resolveWsHost } from "./client"; const mockFetch = vi.fn(); @@ -154,3 +154,179 @@ describe("api client", () => { }); }); }); + +// ── ChatWebSocket reconnect tests ─────────────────────────────────────────── + +interface MockWsInstance { + onopen: (() => void) | null; + onclose: (() => void) | null; + onmessage: ((e: { data: string }) => void) | null; + onerror: (() => void) | null; + readyState: number; + send: () => void; + close: () => void; + simulateClose: () => void; + simulateMessage: (data: Record) => void; +} + +function makeMockWebSocket() { + const instances: MockWsInstance[] = []; + + class MockWebSocket { + static readonly CONNECTING = 0; + static readonly OPEN = 1; + static readonly CLOSING = 2; + static readonly CLOSED = 3; + + onopen: (() => void) | null = null; + onclose: (() => void) | null = null; + onmessage: ((e: { data: string }) => void) | null = null; + onerror: (() => void) | null = null; + readyState = 0; + + constructor(_url: string) { + instances.push(this as unknown as MockWsInstance); + } + + send() {} + + close() { + this.readyState = 3; + this.onclose?.(); + } + + simulateClose() { + this.readyState = 3; + this.onclose?.(); + } + + simulateMessage(data: Record) { + this.onmessage?.({ data: JSON.stringify(data) }); + } + } + + return { MockWebSocket, instances }; +} + +describe("ChatWebSocket", () => { + beforeEach(() => { + vi.useFakeTimers(); + const { MockWebSocket } = makeMockWebSocket(); + vi.stubGlobal("WebSocket", MockWebSocket); + // Reset shared static state between tests + (ChatWebSocket as unknown as { sharedSocket: null }).sharedSocket = null; + (ChatWebSocket as unknown as { refCount: number }).refCount = 0; + }); + + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it("schedules reconnect after socket closes unexpectedly", () => { + const { MockWebSocket, instances } = makeMockWebSocket(); + vi.stubGlobal("WebSocket", MockWebSocket); + + const ws = new ChatWebSocket(); + ws.connect({}); + + expect(instances).toHaveLength(1); + + instances[0].simulateClose(); + + // No new socket created yet + expect(instances).toHaveLength(1); + + // Advance past the initial 1s reconnect delay + vi.advanceTimersByTime(1001); + + // A new socket should now have been created + expect(instances).toHaveLength(2); + }); + + it("delivers pipeline_state after reconnect", () => { + const { MockWebSocket, instances } = makeMockWebSocket(); + vi.stubGlobal("WebSocket", MockWebSocket); + + const onPipelineState = vi.fn(); + const ws = new ChatWebSocket(); + ws.connect({ onPipelineState }); + + // Simulate server restart + instances[0].simulateClose(); + vi.advanceTimersByTime(1001); + + // Server pushes pipeline_state on fresh connection + const freshState = { + upcoming: [{ story_id: "1_story_test", name: "Test", error: null }], + current: [], + qa: [], + merge: [], + }; + instances[1].simulateMessage({ type: "pipeline_state", ...freshState }); + + expect(onPipelineState).toHaveBeenCalledWith(freshState); + }); + + it("does not reconnect after explicit close()", () => { + const { MockWebSocket, instances } = makeMockWebSocket(); + vi.stubGlobal("WebSocket", MockWebSocket); + + const ws = new ChatWebSocket(); + ws.connect({}); + + // Explicit close disables reconnect + ws.close(); + + // Advance through both the DEV close-defer (250ms) and reconnect window + vi.advanceTimersByTime(2000); + + // No new socket should be created + expect(instances).toHaveLength(1); + }); + + it("uses exponential backoff on repeated failures", () => { + const { MockWebSocket, instances } = makeMockWebSocket(); + vi.stubGlobal("WebSocket", MockWebSocket); + + const ws = new ChatWebSocket(); + ws.connect({}); + + // First close → reconnects after 1s + instances[0].simulateClose(); + vi.advanceTimersByTime(1001); + expect(instances).toHaveLength(2); + + // Second close → reconnects after 2s (doubled) + instances[1].simulateClose(); + vi.advanceTimersByTime(1500); + // Not yet (delay is now 2s) + expect(instances).toHaveLength(2); + vi.advanceTimersByTime(600); + expect(instances).toHaveLength(3); + }); + + it("resets reconnect delay after successful open", () => { + const { MockWebSocket, instances } = makeMockWebSocket(); + vi.stubGlobal("WebSocket", MockWebSocket); + + const ws = new ChatWebSocket(); + ws.connect({}); + + // Disconnect and reconnect twice to raise the delay + instances[0].simulateClose(); + vi.advanceTimersByTime(1001); + + instances[1].simulateClose(); + vi.advanceTimersByTime(2001); + + // Simulate a successful open on third socket — resets delay to 1s + instances[2].onopen?.(); + + // Close again — should use the reset 1s delay + instances[2].simulateClose(); + vi.advanceTimersByTime(1001); + + expect(instances).toHaveLength(4); + }); +}); diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index 31e5056..6dffef5 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -246,46 +246,26 @@ export class ChatWebSocket { private onPipelineState?: (state: PipelineState) => void; private connected = false; private closeTimer?: number; + private wsPath = DEFAULT_WS_PATH; + private reconnectTimer?: number; + private reconnectDelay = 1000; + private shouldReconnect = false; - connect( - handlers: { - onToken?: (content: string) => void; - onUpdate?: (messages: Message[]) => void; - onSessionId?: (sessionId: string) => void; - onError?: (message: string) => void; - onPipelineState?: (state: PipelineState) => void; - }, - wsPath = DEFAULT_WS_PATH, - ) { - this.onToken = handlers.onToken; - this.onUpdate = handlers.onUpdate; - this.onSessionId = handlers.onSessionId; - this.onError = handlers.onError; - this.onPipelineState = handlers.onPipelineState; - - if (this.connected) { - return; - } - this.connected = true; - ChatWebSocket.refCount += 1; - + private _buildWsUrl(): string { const protocol = window.location.protocol === "https:" ? "wss" : "ws"; const wsHost = resolveWsHost( import.meta.env.DEV, typeof __STORYKIT_PORT__ !== "undefined" ? __STORYKIT_PORT__ : undefined, window.location.host, ); - const wsUrl = `${protocol}://${wsHost}${wsPath}`; - - if ( - !ChatWebSocket.sharedSocket || - ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSED || - ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSING - ) { - ChatWebSocket.sharedSocket = new WebSocket(wsUrl); - } - this.socket = ChatWebSocket.sharedSocket; + return `${protocol}://${wsHost}${this.wsPath}`; + } + private _attachHandlers(): void { + if (!this.socket) return; + this.socket.onopen = () => { + this.reconnectDelay = 1000; + }; this.socket.onmessage = (event) => { try { const data = JSON.parse(event.data) as WsResponse; @@ -304,10 +284,63 @@ export class ChatWebSocket { this.onError?.(String(err)); } }; - this.socket.onerror = () => { this.onError?.("WebSocket error"); }; + this.socket.onclose = () => { + if (this.shouldReconnect && this.connected) { + this._scheduleReconnect(); + } + }; + } + + private _scheduleReconnect(): void { + window.clearTimeout(this.reconnectTimer); + const delay = this.reconnectDelay; + this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000); + this.reconnectTimer = window.setTimeout(() => { + this.reconnectTimer = undefined; + const wsUrl = this._buildWsUrl(); + ChatWebSocket.sharedSocket = new WebSocket(wsUrl); + this.socket = ChatWebSocket.sharedSocket; + this._attachHandlers(); + }, delay); + } + + connect( + handlers: { + onToken?: (content: string) => void; + onUpdate?: (messages: Message[]) => void; + onSessionId?: (sessionId: string) => void; + onError?: (message: string) => void; + onPipelineState?: (state: PipelineState) => void; + }, + wsPath = DEFAULT_WS_PATH, + ) { + this.onToken = handlers.onToken; + this.onUpdate = handlers.onUpdate; + this.onSessionId = handlers.onSessionId; + this.onError = handlers.onError; + this.onPipelineState = handlers.onPipelineState; + this.wsPath = wsPath; + this.shouldReconnect = true; + + if (this.connected) { + return; + } + this.connected = true; + ChatWebSocket.refCount += 1; + + if ( + !ChatWebSocket.sharedSocket || + ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSED || + ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSING + ) { + const wsUrl = this._buildWsUrl(); + ChatWebSocket.sharedSocket = new WebSocket(wsUrl); + } + this.socket = ChatWebSocket.sharedSocket; + this._attachHandlers(); } sendChat(messages: Message[], config: ProviderConfig) { @@ -319,6 +352,10 @@ export class ChatWebSocket { } close() { + this.shouldReconnect = false; + window.clearTimeout(this.reconnectTimer); + this.reconnectTimer = undefined; + if (!this.connected) return; this.connected = false; ChatWebSocket.refCount = Math.max(0, ChatWebSocket.refCount - 1); diff --git a/server/src/http/workflow.rs b/server/src/http/workflow.rs index 6cc34cc..b2f0323 100644 --- a/server/src/http/workflow.rs +++ b/server/src/http/workflow.rs @@ -588,6 +588,42 @@ pub fn validate_story_dirs( mod tests { use super::*; + #[test] + fn load_pipeline_state_loads_all_stages() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().to_path_buf(); + + for (stage, id) in &[ + ("1_upcoming", "10_story_upcoming"), + ("2_current", "20_story_current"), + ("3_qa", "30_story_qa"), + ("4_merge", "40_story_merge"), + ] { + let dir = root.join(".story_kit").join("work").join(stage); + fs::create_dir_all(&dir).unwrap(); + fs::write( + dir.join(format!("{id}.md")), + format!("---\nname: {id}\ntest_plan: pending\n---\n"), + ) + .unwrap(); + } + + let ctx = crate::http::context::AppContext::new_test(root); + let state = load_pipeline_state(&ctx).unwrap(); + + assert_eq!(state.upcoming.len(), 1); + assert_eq!(state.upcoming[0].story_id, "10_story_upcoming"); + + assert_eq!(state.current.len(), 1); + assert_eq!(state.current[0].story_id, "20_story_current"); + + assert_eq!(state.qa.len(), 1); + assert_eq!(state.qa[0].story_id, "30_story_qa"); + + assert_eq!(state.merge.len(), 1); + assert_eq!(state.merge[0].story_id, "40_story_merge"); + } + #[test] fn load_upcoming_returns_empty_when_no_dir() { let tmp = tempfile::tempdir().unwrap();