338 lines
10 KiB
TypeScript
338 lines
10 KiB
TypeScript
/**
 * WebSocket client for real-time communication with the Huskies server.
 * Manages a shared socket with reference counting, automatic reconnection,
 * and heartbeat keepalive. All inbound message types are dispatched to
 * caller-supplied handler callbacks.
 */
|
|
|
|
import { resolveWsHost } from "./http";
|
|
import type {
|
|
Message,
|
|
PipelineState,
|
|
ProviderConfig,
|
|
StatusEvent,
|
|
WizardStateData,
|
|
WsRequest,
|
|
WsResponse,
|
|
} from "./types";
|
|
|
|
/**
 * Port of the Huskies backend, declared as an ambient global.
 * NOTE(review): presumably injected by the bundler at build time — confirm.
 * It may be absent at runtime, so read it only behind a `typeof` guard.
 */
declare const __HUSKIES_PORT__: string;

/** URL path of the WebSocket endpoint used when the caller supplies none. */
const DEFAULT_WS_PATH = "/ws";
|
|
|
|
/**
 * Callbacks accepted by {@link ChatWebSocket.connect}.
 *
 * Every callback is optional; an inbound message whose type has no registered
 * callback is silently ignored.
 */
export interface ChatWebSocketHandlers {
  /** Called for `token` messages with the message content. */
  onToken?: (content: string) => void;
  /** Called for `thinking_token` messages with the message content. */
  onThinkingToken?: (content: string) => void;
  /** Called for `update` messages with the message list they carry. */
  onUpdate?: (messages: Message[]) => void;
  /** Called for `session_id` messages. */
  onSessionId?: (sessionId: string) => void;
  /**
   * Called for `error` messages, for socket-level errors, for failed sends,
   * and when parsing or dispatching an inbound message throws.
   */
  onError?: (message: string) => void;
  /** Called for `pipeline_state` messages. */
  onPipelineState?: (state: PipelineState) => void;
  /** Called for `permission_request` messages. */
  onPermissionRequest?: (
    requestId: string,
    toolName: string,
    toolInput: Record<string, unknown>,
  ) => void;
  /** Called for `tool_activity` messages with the tool name. */
  onActivity?: (toolName: string) => void;
  /** Called for `reconciliation_progress` messages. */
  onReconciliationProgress?: (
    storyId: string,
    status: string,
    message: string,
  ) => void;
  /** Called for `agent_config_changed` messages. */
  onAgentConfigChanged?: () => void;
  /** Called for `agent_state_changed` messages. */
  onAgentStateChanged?: () => void;
  /** Called for `onboarding_status` messages. */
  onOnboardingStatus?: (needsOnboarding: boolean) => void;
  /** Called for `wizard_state` messages. */
  onWizardState?: (state: WizardStateData) => void;
  /** Called for `side_question_token` messages with the message content. */
  onSideQuestionToken?: (content: string) => void;
  /** Called for `side_question_done` messages with the full response. */
  onSideQuestionDone?: (response: string) => void;
  /** Called for `log_entry` messages. */
  onLogEntry?: (timestamp: string, level: string, message: string) => void;
  /** Called for `status_update` messages. */
  onStatusUpdate?: (event: StatusEvent) => void;
  /** Called when the underlying socket fires `open`. */
  onConnected?: () => void;
  /** Called when the socket closes while this client still wants a connection. */
  onDisconnected?: () => void;
}

/**
 * Singleton-backed WebSocket client with automatic reconnection and heartbeat.
 *
 * Multiple callers share one underlying socket via reference counting; the
 * socket is closed only when the last caller disconnects.
 *
 * NOTE(review): event handlers are installed as `on*` properties of the shared
 * socket, so when several instances are connected at once only the instance
 * that attached last receives events. Confirm whether concurrent instances
 * are actually used before relying on fan-out.
 */
export class ChatWebSocket {
  /** Socket shared by every connected instance; `null` when none exists. */
  private static sharedSocket: WebSocket | null = null;
  /** Number of instances currently between `connect()` and `close()`. */
  private static refCount = 0;
  /** Milliseconds between keepalive pings. */
  private static readonly HEARTBEAT_INTERVAL = 30_000;
  /** Milliseconds to wait for a pong before the socket is considered dead. */
  private static readonly HEARTBEAT_TIMEOUT = 5_000;

  private socket?: WebSocket;
  private onToken?: ChatWebSocketHandlers["onToken"];
  private onThinkingToken?: ChatWebSocketHandlers["onThinkingToken"];
  private onUpdate?: ChatWebSocketHandlers["onUpdate"];
  private onSessionId?: ChatWebSocketHandlers["onSessionId"];
  private onError?: ChatWebSocketHandlers["onError"];
  private onPipelineState?: ChatWebSocketHandlers["onPipelineState"];
  private onPermissionRequest?: ChatWebSocketHandlers["onPermissionRequest"];
  private onActivity?: ChatWebSocketHandlers["onActivity"];
  private onReconciliationProgress?: ChatWebSocketHandlers["onReconciliationProgress"];
  private onAgentConfigChanged?: ChatWebSocketHandlers["onAgentConfigChanged"];
  private onAgentStateChanged?: ChatWebSocketHandlers["onAgentStateChanged"];
  private onOnboardingStatus?: ChatWebSocketHandlers["onOnboardingStatus"];
  private onWizardState?: ChatWebSocketHandlers["onWizardState"];
  private onSideQuestionToken?: ChatWebSocketHandlers["onSideQuestionToken"];
  private onSideQuestionDone?: ChatWebSocketHandlers["onSideQuestionDone"];
  private onLogEntry?: ChatWebSocketHandlers["onLogEntry"];
  private onStatusUpdate?: ChatWebSocketHandlers["onStatusUpdate"];
  private onConnected?: ChatWebSocketHandlers["onConnected"];
  private onDisconnected?: ChatWebSocketHandlers["onDisconnected"];
  /** True between `connect()` and `close()`; guards the reference count. */
  private connected = false;
  /** Pending deferred-close timer (development builds only). */
  private closeTimer?: number;
  private wsPath = DEFAULT_WS_PATH;
  /** Pending reconnect timer, if a reconnect is scheduled. */
  private reconnectTimer?: number;
  /** Delay before the next reconnect attempt; doubles up to 30 s. */
  private reconnectDelay = 1000;
  /** False once `close()` has been called, which suppresses reconnection. */
  private shouldReconnect = false;
  private heartbeatInterval?: number;
  private heartbeatTimeout?: number;

  /**
   * Starts (or restarts) the keepalive loop: every `HEARTBEAT_INTERVAL` ms a
   * `ping` is sent, and if no `pong` clears the timeout within
   * `HEARTBEAT_TIMEOUT` ms the pinged socket is closed so that the close
   * handler can trigger a reconnect.
   */
  private _startHeartbeat(): void {
    this._stopHeartbeat();
    this.heartbeatInterval = window.setInterval(() => {
      const socket = this.socket;
      if (!socket || socket.readyState !== WebSocket.OPEN) return;
      const ping: WsRequest = { type: "ping" };
      socket.send(JSON.stringify(ping));
      // Never leave an earlier pong timer dangling once a new one is armed.
      window.clearTimeout(this.heartbeatTimeout);
      this.heartbeatTimeout = window.setTimeout(() => {
        this.heartbeatTimeout = undefined;
        // No pong received within timeout; close the socket that was pinged
        // (not whatever `this.socket` points at by now) to trigger reconnect.
        socket.close();
      }, ChatWebSocket.HEARTBEAT_TIMEOUT);
    }, ChatWebSocket.HEARTBEAT_INTERVAL);
  }

  /** Cancels the keepalive loop and any pending pong timeout. */
  private _stopHeartbeat(): void {
    window.clearInterval(this.heartbeatInterval);
    window.clearTimeout(this.heartbeatTimeout);
    this.heartbeatInterval = undefined;
    this.heartbeatTimeout = undefined;
  }

  /**
   * Builds the socket URL: `wss` when the page is served over HTTPS, `ws`
   * otherwise, with the host chosen by `resolveWsHost` and the configured path.
   */
  private _buildWsUrl(): string {
    const protocol = window.location.protocol === "https:" ? "wss" : "ws";
    const wsHost = resolveWsHost(
      import.meta.env.DEV,
      typeof __HUSKIES_PORT__ !== "undefined" ? __HUSKIES_PORT__ : undefined,
      window.location.host,
    );
    return `${protocol}://${wsHost}${this.wsPath}`;
  }

  /**
   * Installs this instance's `open` / `message` / `error` / `close` handlers
   * on the current socket. Does nothing when there is no socket.
   */
  private _attachHandlers(): void {
    const socket = this.socket;
    if (!socket) return;

    socket.onopen = () => {
      this.reconnectDelay = 1000;
      this._startHeartbeat();
      this.onConnected?.();
    };

    socket.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data) as WsResponse;
        switch (data.type) {
          case "token":
            this.onToken?.(data.content);
            break;
          case "thinking_token":
            this.onThinkingToken?.(data.content);
            break;
          case "update":
            this.onUpdate?.(data.messages);
            break;
          case "session_id":
            this.onSessionId?.(data.session_id);
            break;
          case "error":
            this.onError?.(data.message);
            break;
          case "pipeline_state":
            this.onPipelineState?.({
              backlog: data.backlog,
              current: data.current,
              qa: data.qa,
              merge: data.merge,
              done: data.done,
              deterministic_merges_in_flight:
                data.deterministic_merges_in_flight ?? [],
            });
            break;
          case "permission_request":
            this.onPermissionRequest?.(
              data.request_id,
              data.tool_name,
              data.tool_input,
            );
            break;
          case "tool_activity":
            this.onActivity?.(data.tool_name);
            break;
          case "reconciliation_progress":
            this.onReconciliationProgress?.(
              data.story_id,
              data.status,
              data.message,
            );
            break;
          case "agent_config_changed":
            this.onAgentConfigChanged?.();
            break;
          case "agent_state_changed":
            this.onAgentStateChanged?.();
            break;
          case "onboarding_status":
            this.onOnboardingStatus?.(data.needs_onboarding);
            break;
          case "wizard_state":
            this.onWizardState?.({
              steps: data.steps,
              current_step_index: data.current_step_index,
              completed: data.completed,
            });
            break;
          case "side_question_token":
            this.onSideQuestionToken?.(data.content);
            break;
          case "side_question_done":
            this.onSideQuestionDone?.(data.response);
            break;
          case "log_entry":
            this.onLogEntry?.(data.timestamp, data.level, data.message);
            break;
          case "status_update":
            this.onStatusUpdate?.(data.event);
            break;
          case "pong":
            // The server answered the ping: disarm the dead-socket timer.
            window.clearTimeout(this.heartbeatTimeout);
            this.heartbeatTimeout = undefined;
            break;
          default:
            // Unrecognised message types are ignored.
            break;
        }
      } catch (err) {
        // Covers malformed JSON as well as exceptions thrown by a callback.
        this.onError?.(String(err));
      }
    };

    socket.onerror = () => {
      this.onError?.("WebSocket error");
    };

    socket.onclose = () => {
      // A socket that has already been replaced or released must not drive
      // reconnection: doing so would discard the live replacement socket.
      if (this.socket !== socket) return;
      // A pending pong timeout belongs to the socket that just closed.
      this._stopHeartbeat();
      if (this.shouldReconnect && this.connected) {
        this.onDisconnected?.();
        this._scheduleReconnect();
      }
    };
  }

  /**
   * Schedules a reconnect attempt after the current back-off delay and doubles
   * the delay for the following attempt, capped at 30 seconds. The delay is
   * reset to one second by the `open` handler.
   */
  private _scheduleReconnect(): void {
    window.clearTimeout(this.reconnectTimer);
    const delay = this.reconnectDelay;
    this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
    this.reconnectTimer = window.setTimeout(() => {
      this.reconnectTimer = undefined;
      const socket = new WebSocket(this._buildWsUrl());
      ChatWebSocket.sharedSocket = socket;
      this.socket = socket;
      this._attachHandlers();
    }, delay);
  }

  /**
   * Registers the handler callbacks and joins the shared socket, creating it
   * when none exists or the existing one is closing or closed.
   *
   * Calling `connect()` again while already connected only replaces the
   * callbacks and path; it does not take a second reference.
   *
   * @param handlers - Callbacks for inbound messages and connection events.
   * @param wsPath - URL path of the WebSocket endpoint.
   */
  connect(handlers: ChatWebSocketHandlers, wsPath = DEFAULT_WS_PATH) {
    this.onToken = handlers.onToken;
    this.onThinkingToken = handlers.onThinkingToken;
    this.onUpdate = handlers.onUpdate;
    this.onSessionId = handlers.onSessionId;
    this.onError = handlers.onError;
    this.onPipelineState = handlers.onPipelineState;
    this.onPermissionRequest = handlers.onPermissionRequest;
    this.onActivity = handlers.onActivity;
    this.onReconciliationProgress = handlers.onReconciliationProgress;
    this.onAgentConfigChanged = handlers.onAgentConfigChanged;
    this.onAgentStateChanged = handlers.onAgentStateChanged;
    this.onOnboardingStatus = handlers.onOnboardingStatus;
    this.onWizardState = handlers.onWizardState;
    this.onSideQuestionToken = handlers.onSideQuestionToken;
    this.onSideQuestionDone = handlers.onSideQuestionDone;
    this.onLogEntry = handlers.onLogEntry;
    this.onStatusUpdate = handlers.onStatusUpdate;
    this.onConnected = handlers.onConnected;
    this.onDisconnected = handlers.onDisconnected;
    this.wsPath = wsPath;
    this.shouldReconnect = true;

    if (this.connected) {
      return;
    }
    this.connected = true;
    ChatWebSocket.refCount += 1;

    let socket = ChatWebSocket.sharedSocket;
    if (
      !socket ||
      socket.readyState === WebSocket.CLOSED ||
      socket.readyState === WebSocket.CLOSING
    ) {
      socket = new WebSocket(this._buildWsUrl());
      ChatWebSocket.sharedSocket = socket;
    }
    this.socket = socket;
    this._attachHandlers();

    // A reused socket that is already open will never fire `open` again, so
    // the keepalive loop has to be started here instead of in the handler.
    if (socket.readyState === WebSocket.OPEN) {
      this._startHeartbeat();
    }
  }

  /** Sends a `chat` request with the conversation and provider configuration. */
  sendChat(messages: Message[], config: ProviderConfig) {
    this.send({ type: "chat", messages, config });
  }

  /** Sends a `side_question` request together with its context messages. */
  sendSideQuestion(
    question: string,
    contextMessages: Message[],
    config: ProviderConfig,
  ) {
    this.send({
      type: "side_question",
      question,
      context_messages: contextMessages,
      config,
    });
  }

  /** Sends a `cancel` request. */
  cancel() {
    this.send({ type: "cancel" });
  }

  /**
   * Answers a permission request previously delivered to `onPermissionRequest`.
   *
   * @param requestId - Identifier taken from the permission request.
   * @param approved - Whether the request is granted.
   * @param alwaysAllow - Sent to the server as `always_allow`.
   */
  sendPermissionResponse(
    requestId: string,
    approved: boolean,
    alwaysAllow = false,
  ) {
    this.send({
      type: "permission_response",
      request_id: requestId,
      approved,
      always_allow: alwaysAllow,
    });
  }

  /**
   * Drops this instance's reference to the shared socket and disables
   * reconnection and heartbeat. The socket itself is closed only when the
   * reference count reaches zero.
   *
   * In development builds the release is deferred by 250 ms, so a `connect()`
   * that follows immediately can reuse the socket.
   * NOTE(review): presumably this accommodates dev-only mount/unmount/remount
   * cycles — confirm.
   */
  close() {
    this.shouldReconnect = false;
    this._stopHeartbeat();
    window.clearTimeout(this.reconnectTimer);
    this.reconnectTimer = undefined;

    if (!this.connected) return;
    this.connected = false;
    ChatWebSocket.refCount = Math.max(0, ChatWebSocket.refCount - 1);

    if (import.meta.env.DEV) {
      if (this.closeTimer) {
        window.clearTimeout(this.closeTimer);
      }
      this.closeTimer = window.setTimeout(() => {
        this._releaseSharedSocket();
        this.closeTimer = undefined;
      }, 250);
      return;
    }

    this._releaseSharedSocket();
  }

  /**
   * Closes and forgets the shared socket when nobody references it any more,
   * then re-synchronises this instance's socket with the shared one.
   */
  private _releaseSharedSocket(): void {
    if (ChatWebSocket.refCount === 0) {
      ChatWebSocket.sharedSocket?.close();
      ChatWebSocket.sharedSocket = null;
    }
    this.socket = ChatWebSocket.sharedSocket ?? undefined;
  }

  /**
   * Serialises and sends a request; reports through `onError` instead of
   * throwing when the socket is missing or not open.
   */
  private send(payload: WsRequest) {
    if (!this.socket || this.socket.readyState !== WebSocket.OPEN) {
      this.onError?.("WebSocket is not connected");
      return;
    }
    this.socket.send(JSON.stringify(payload));
  }
}
|