/** * WebSocket client for real-time communication with the Huskies server. * Manages a shared socket with reference counting, automatic reconnection, * and heartbeat keepalive. All inbound message types are dispatched to * caller-supplied handler callbacks. */ import { resolveWsHost } from "./http"; import type { Message, PipelineState, ProviderConfig, StatusEvent, WizardStateData, WsRequest, WsResponse, } from "./types"; declare const __HUSKIES_PORT__: string; const DEFAULT_WS_PATH = "/ws"; /** * Singleton-backed WebSocket client with automatic reconnection and heartbeat. * Multiple callers share one underlying socket via reference counting; the * socket is closed only when the last caller disconnects. */ export class ChatWebSocket { private static sharedSocket: WebSocket | null = null; private static refCount = 0; private socket?: WebSocket; private onToken?: (content: string) => void; private onThinkingToken?: (content: string) => void; private onUpdate?: (messages: Message[]) => void; private onSessionId?: (sessionId: string) => void; private onError?: (message: string) => void; private onPipelineState?: (state: PipelineState) => void; private onPermissionRequest?: ( requestId: string, toolName: string, toolInput: Record, ) => void; private onActivity?: (toolName: string) => void; private onReconciliationProgress?: ( storyId: string, status: string, message: string, ) => void; private onAgentConfigChanged?: () => void; private onAgentStateChanged?: () => void; private onOnboardingStatus?: (needsOnboarding: boolean) => void; private onWizardState?: (state: WizardStateData) => void; private onSideQuestionToken?: (content: string) => void; private onSideQuestionDone?: (response: string) => void; private onLogEntry?: ( timestamp: string, level: string, message: string, ) => void; private onStatusUpdate?: (event: StatusEvent) => void; private onConnected?: () => void; private connected = false; private closeTimer?: number; private wsPath = DEFAULT_WS_PATH; private reconnectTimer?: number; private reconnectDelay = 1000; private shouldReconnect = false; private heartbeatInterval?: number; private heartbeatTimeout?: number; private static readonly HEARTBEAT_INTERVAL = 30_000; private static readonly HEARTBEAT_TIMEOUT = 5_000; private _startHeartbeat(): void { this._stopHeartbeat(); this.heartbeatInterval = window.setInterval(() => { if (!this.socket || this.socket.readyState !== WebSocket.OPEN) return; const ping: WsRequest = { type: "ping" }; this.socket.send(JSON.stringify(ping)); this.heartbeatTimeout = window.setTimeout(() => { // No pong received within timeout; close socket to trigger reconnect. this.socket?.close(); }, ChatWebSocket.HEARTBEAT_TIMEOUT); }, ChatWebSocket.HEARTBEAT_INTERVAL); } private _stopHeartbeat(): void { window.clearInterval(this.heartbeatInterval); window.clearTimeout(this.heartbeatTimeout); this.heartbeatInterval = undefined; this.heartbeatTimeout = undefined; } private _buildWsUrl(): string { const protocol = window.location.protocol === "https:" ? "wss" : "ws"; const wsHost = resolveWsHost( import.meta.env.DEV, typeof __HUSKIES_PORT__ !== "undefined" ? __HUSKIES_PORT__ : undefined, window.location.host, ); return `${protocol}://${wsHost}${this.wsPath}`; } private _attachHandlers(): void { if (!this.socket) return; this.socket.onopen = () => { this.reconnectDelay = 1000; this._startHeartbeat(); this.onConnected?.(); }; this.socket.onmessage = (event) => { try { const data = JSON.parse(event.data) as WsResponse; if (data.type === "token") this.onToken?.(data.content); if (data.type === "thinking_token") this.onThinkingToken?.(data.content); if (data.type === "update") this.onUpdate?.(data.messages); if (data.type === "session_id") this.onSessionId?.(data.session_id); if (data.type === "error") this.onError?.(data.message); if (data.type === "pipeline_state") this.onPipelineState?.({ backlog: data.backlog, current: data.current, qa: data.qa, merge: data.merge, done: data.done, }); if (data.type === "permission_request") this.onPermissionRequest?.( data.request_id, data.tool_name, data.tool_input, ); if (data.type === "tool_activity") this.onActivity?.(data.tool_name); if (data.type === "reconciliation_progress") this.onReconciliationProgress?.( data.story_id, data.status, data.message, ); if (data.type === "agent_config_changed") this.onAgentConfigChanged?.(); if (data.type === "agent_state_changed") this.onAgentStateChanged?.(); if (data.type === "onboarding_status") this.onOnboardingStatus?.(data.needs_onboarding); if (data.type === "wizard_state") this.onWizardState?.({ steps: data.steps, current_step_index: data.current_step_index, completed: data.completed, }); if (data.type === "side_question_token") this.onSideQuestionToken?.(data.content); if (data.type === "side_question_done") this.onSideQuestionDone?.(data.response); if (data.type === "log_entry") this.onLogEntry?.(data.timestamp, data.level, data.message); if (data.type === "status_update") this.onStatusUpdate?.(data.event); if (data.type === "pong") { window.clearTimeout(this.heartbeatTimeout); this.heartbeatTimeout = undefined; } } catch (err) { this.onError?.(String(err)); } }; this.socket.onerror = () => { this.onError?.("WebSocket error"); }; this.socket.onclose = () => { if (this.shouldReconnect && this.connected) { this._scheduleReconnect(); } }; } private _scheduleReconnect(): void { window.clearTimeout(this.reconnectTimer); const delay = this.reconnectDelay; this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000); this.reconnectTimer = window.setTimeout(() => { this.reconnectTimer = undefined; const wsUrl = this._buildWsUrl(); ChatWebSocket.sharedSocket = new WebSocket(wsUrl); this.socket = ChatWebSocket.sharedSocket; this._attachHandlers(); }, delay); } connect( handlers: { onToken?: (content: string) => void; onThinkingToken?: (content: string) => void; onUpdate?: (messages: Message[]) => void; onSessionId?: (sessionId: string) => void; onError?: (message: string) => void; onPipelineState?: (state: PipelineState) => void; onPermissionRequest?: ( requestId: string, toolName: string, toolInput: Record, ) => void; onActivity?: (toolName: string) => void; onReconciliationProgress?: ( storyId: string, status: string, message: string, ) => void; onAgentConfigChanged?: () => void; onAgentStateChanged?: () => void; onOnboardingStatus?: (needsOnboarding: boolean) => void; onWizardState?: (state: WizardStateData) => void; onSideQuestionToken?: (content: string) => void; onSideQuestionDone?: (response: string) => void; onLogEntry?: (timestamp: string, level: string, message: string) => void; onStatusUpdate?: (event: StatusEvent) => void; onConnected?: () => void; }, wsPath = DEFAULT_WS_PATH, ) { this.onToken = handlers.onToken; this.onThinkingToken = handlers.onThinkingToken; this.onUpdate = handlers.onUpdate; this.onSessionId = handlers.onSessionId; this.onError = handlers.onError; this.onPipelineState = handlers.onPipelineState; this.onPermissionRequest = handlers.onPermissionRequest; this.onActivity = handlers.onActivity; this.onReconciliationProgress = handlers.onReconciliationProgress; this.onAgentConfigChanged = handlers.onAgentConfigChanged; this.onAgentStateChanged = handlers.onAgentStateChanged; this.onOnboardingStatus = handlers.onOnboardingStatus; this.onWizardState = handlers.onWizardState; this.onSideQuestionToken = handlers.onSideQuestionToken; this.onSideQuestionDone = handlers.onSideQuestionDone; this.onLogEntry = handlers.onLogEntry; this.onStatusUpdate = handlers.onStatusUpdate; this.onConnected = handlers.onConnected; this.wsPath = wsPath; this.shouldReconnect = true; if (this.connected) { return; } this.connected = true; ChatWebSocket.refCount += 1; if ( !ChatWebSocket.sharedSocket || ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSED || ChatWebSocket.sharedSocket.readyState === WebSocket.CLOSING ) { const wsUrl = this._buildWsUrl(); ChatWebSocket.sharedSocket = new WebSocket(wsUrl); } this.socket = ChatWebSocket.sharedSocket; this._attachHandlers(); } sendChat(messages: Message[], config: ProviderConfig) { this.send({ type: "chat", messages, config }); } sendSideQuestion( question: string, contextMessages: Message[], config: ProviderConfig, ) { this.send({ type: "side_question", question, context_messages: contextMessages, config, }); } cancel() { this.send({ type: "cancel" }); } sendPermissionResponse( requestId: string, approved: boolean, alwaysAllow = false, ) { this.send({ type: "permission_response", request_id: requestId, approved, always_allow: alwaysAllow, }); } close() { this.shouldReconnect = false; this._stopHeartbeat(); window.clearTimeout(this.reconnectTimer); this.reconnectTimer = undefined; if (!this.connected) return; this.connected = false; ChatWebSocket.refCount = Math.max(0, ChatWebSocket.refCount - 1); if (import.meta.env.DEV) { if (this.closeTimer) { window.clearTimeout(this.closeTimer); } this.closeTimer = window.setTimeout(() => { if (ChatWebSocket.refCount === 0) { ChatWebSocket.sharedSocket?.close(); ChatWebSocket.sharedSocket = null; } this.socket = ChatWebSocket.sharedSocket ?? undefined; this.closeTimer = undefined; }, 250); return; } if (ChatWebSocket.refCount === 0) { ChatWebSocket.sharedSocket?.close(); ChatWebSocket.sharedSocket = null; } this.socket = ChatWebSocket.sharedSocket ?? undefined; } private send(payload: WsRequest) { if (!this.socket || this.socket.readyState !== WebSocket.OPEN) { this.onError?.("WebSocket is not connected"); return; } this.socket.send(JSON.stringify(payload)); } }