story-kit: merge 174_story_constrain_thinking_traces_in_chat_panel
This commit is contained in:
@@ -69,7 +69,9 @@ export type WsResponse =
|
||||
/** Heartbeat response confirming the connection is alive. */
|
||||
| { type: "pong" }
|
||||
/** Sent on connect when the project still needs onboarding (specs are placeholders). */
|
||||
| { type: "onboarding_status"; needs_onboarding: boolean };
|
||||
| { type: "onboarding_status"; needs_onboarding: boolean }
|
||||
/** Streaming thinking token from an extended-thinking block, separate from regular text. */
|
||||
| { type: "thinking_token"; content: string };
|
||||
|
||||
export interface ProviderConfig {
|
||||
provider: string;
|
||||
@@ -270,6 +272,7 @@ export class ChatWebSocket {
|
||||
private static refCount = 0;
|
||||
private socket?: WebSocket;
|
||||
private onToken?: (content: string) => void;
|
||||
private onThinkingToken?: (content: string) => void;
|
||||
private onUpdate?: (messages: Message[]) => void;
|
||||
private onSessionId?: (sessionId: string) => void;
|
||||
private onError?: (message: string) => void;
|
||||
@@ -339,6 +342,8 @@ export class ChatWebSocket {
|
||||
try {
|
||||
const data = JSON.parse(event.data) as WsResponse;
|
||||
if (data.type === "token") this.onToken?.(data.content);
|
||||
if (data.type === "thinking_token")
|
||||
this.onThinkingToken?.(data.content);
|
||||
if (data.type === "update") this.onUpdate?.(data.messages);
|
||||
if (data.type === "session_id") this.onSessionId?.(data.session_id);
|
||||
if (data.type === "error") this.onError?.(data.message);
|
||||
@@ -401,6 +406,7 @@ export class ChatWebSocket {
|
||||
connect(
|
||||
handlers: {
|
||||
onToken?: (content: string) => void;
|
||||
onThinkingToken?: (content: string) => void;
|
||||
onUpdate?: (messages: Message[]) => void;
|
||||
onSessionId?: (sessionId: string) => void;
|
||||
onError?: (message: string) => void;
|
||||
@@ -423,6 +429,7 @@ export class ChatWebSocket {
|
||||
wsPath = DEFAULT_WS_PATH,
|
||||
) {
|
||||
this.onToken = handlers.onToken;
|
||||
this.onThinkingToken = handlers.onThinkingToken;
|
||||
this.onUpdate = handlers.onUpdate;
|
||||
this.onSessionId = handlers.onSessionId;
|
||||
this.onError = handlers.onError;
|
||||
|
||||
@@ -13,6 +13,56 @@ import { StagePanel } from "./StagePanel";
|
||||
|
||||
const { useCallback, useEffect, useRef, useState } = React;
|
||||
|
||||
/** Fixed-height thinking trace block that auto-scrolls to bottom as text arrives. */
|
||||
function ThinkingBlock({ text }: { text: string }) {
|
||||
const scrollRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
useEffect(() => {
|
||||
const el = scrollRef.current;
|
||||
if (el) {
|
||||
el.scrollTop = el.scrollHeight;
|
||||
}
|
||||
}, [text]);
|
||||
|
||||
return (
|
||||
<div
|
||||
data-testid="chat-thinking-block"
|
||||
ref={scrollRef}
|
||||
style={{
|
||||
maxHeight: "96px",
|
||||
overflowY: "auto",
|
||||
background: "#161b22",
|
||||
border: "1px solid #2d333b",
|
||||
borderRadius: "6px",
|
||||
padding: "6px 10px",
|
||||
fontSize: "0.78em",
|
||||
fontFamily: "monospace",
|
||||
color: "#6e7681",
|
||||
fontStyle: "italic",
|
||||
whiteSpace: "pre-wrap",
|
||||
wordBreak: "break-word",
|
||||
lineHeight: "1.4",
|
||||
marginBottom: "8px",
|
||||
}}
|
||||
>
|
||||
<span
|
||||
style={{
|
||||
display: "block",
|
||||
fontSize: "0.8em",
|
||||
color: "#444c56",
|
||||
marginBottom: "4px",
|
||||
fontStyle: "normal",
|
||||
letterSpacing: "0.04em",
|
||||
textTransform: "uppercase",
|
||||
}}
|
||||
>
|
||||
thinking
|
||||
</span>
|
||||
{text}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
const NARROW_BREAKPOINT = 900;
|
||||
|
||||
function formatToolActivity(toolName: string): string {
|
||||
@@ -64,6 +114,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) {
|
||||
const [availableModels, setAvailableModels] = useState<string[]>([]);
|
||||
const [claudeModels, setClaudeModels] = useState<string[]>([]);
|
||||
const [streamingContent, setStreamingContent] = useState("");
|
||||
const [streamingThinking, setStreamingThinking] = useState("");
|
||||
const [showApiKeyDialog, setShowApiKeyDialog] = useState(false);
|
||||
const [apiKeyInput, setApiKeyInput] = useState("");
|
||||
const [hasAnthropicKey, setHasAnthropicKey] = useState(false);
|
||||
@@ -208,9 +259,13 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) {
|
||||
onToken: (content) => {
|
||||
setStreamingContent((prev: string) => prev + content);
|
||||
},
|
||||
onThinkingToken: (content) => {
|
||||
setStreamingThinking((prev: string) => prev + content);
|
||||
},
|
||||
onUpdate: (history) => {
|
||||
setMessages(history);
|
||||
setStreamingContent("");
|
||||
setStreamingThinking("");
|
||||
const last = history[history.length - 1];
|
||||
if (last?.role === "assistant" && !last.tool_calls) {
|
||||
setLoading(false);
|
||||
@@ -303,7 +358,8 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) {
|
||||
lastScrollTopRef.current = currentScrollTop;
|
||||
};
|
||||
|
||||
const autoScrollKey = messages.length + streamingContent.length;
|
||||
const autoScrollKey =
|
||||
messages.length + streamingContent.length + streamingThinking.length;
|
||||
|
||||
useEffect(() => {
|
||||
if (
|
||||
@@ -351,6 +407,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) {
|
||||
setStreamingContent("");
|
||||
}
|
||||
|
||||
setStreamingThinking("");
|
||||
setLoading(false);
|
||||
setActivityStatus(null);
|
||||
} catch (e) {
|
||||
@@ -395,6 +452,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) {
|
||||
}
|
||||
setLoading(true);
|
||||
setStreamingContent("");
|
||||
setStreamingThinking("");
|
||||
setActivityStatus(null);
|
||||
|
||||
try {
|
||||
@@ -471,6 +529,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) {
|
||||
|
||||
clearMessages();
|
||||
setStreamingContent("");
|
||||
setStreamingThinking("");
|
||||
setLoading(false);
|
||||
setActivityStatus(null);
|
||||
setClaudeSessionId(null);
|
||||
@@ -761,6 +820,9 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) {
|
||||
</div>
|
||||
</div>
|
||||
))}
|
||||
{loading && streamingThinking && (
|
||||
<ThinkingBlock text={streamingThinking} />
|
||||
)}
|
||||
{loading && streamingContent && (
|
||||
<div
|
||||
style={{
|
||||
@@ -817,7 +879,9 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) {
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
{loading && (activityStatus != null || !streamingContent) && (
|
||||
{loading &&
|
||||
(activityStatus != null ||
|
||||
(!streamingContent && !streamingThinking)) && (
|
||||
<div
|
||||
data-testid="activity-indicator"
|
||||
style={{
|
||||
@@ -1075,7 +1139,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) {
|
||||
stateVersion={agentStateVersion}
|
||||
/>
|
||||
|
||||
<StagePanel title="Done" items={pipeline.done} />
|
||||
<StagePanel title="Done" items={pipeline.done ?? []} />
|
||||
<StagePanel title="To Merge" items={pipeline.merge} />
|
||||
<StagePanel title="QA" items={pipeline.qa} />
|
||||
<StagePanel title="Current" items={pipeline.current} />
|
||||
|
||||
@@ -103,6 +103,12 @@ enum WsResponse {
|
||||
/// Heartbeat response to a client `Ping`. Lets the client confirm the
|
||||
/// connection is alive and cancel any stale-connection timeout.
|
||||
Pong,
|
||||
/// Streaming thinking token from an extended-thinking block.
|
||||
/// Sent separately from `Token` so the frontend can render them in
|
||||
/// a constrained, scrollable ThinkingBlock rather than inline.
|
||||
ThinkingToken {
|
||||
content: String,
|
||||
},
|
||||
/// Sent on connect when the project's spec files still contain scaffold
|
||||
/// placeholder content and the user needs to go through onboarding.
|
||||
OnboardingStatus {
|
||||
@@ -257,6 +263,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
|
||||
Ok(WsRequest::Chat { messages, config }) => {
|
||||
let tx_updates = tx.clone();
|
||||
let tx_tokens = tx.clone();
|
||||
let tx_thinking = tx.clone();
|
||||
let tx_activity = tx.clone();
|
||||
let ctx_clone = ctx.clone();
|
||||
|
||||
@@ -277,6 +284,11 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
|
||||
content: token.to_string(),
|
||||
});
|
||||
},
|
||||
move |thinking: &str| {
|
||||
let _ = tx_thinking.send(WsResponse::ThinkingToken {
|
||||
content: thinking.to_string(),
|
||||
});
|
||||
},
|
||||
move |tool_name: &str| {
|
||||
let _ = tx_activity.send(WsResponse::ToolActivity {
|
||||
tool_name: tool_name.to_string(),
|
||||
@@ -595,6 +607,16 @@ mod tests {
|
||||
assert_eq!(json["type"], "pong");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_thinking_token_response() {
|
||||
let resp = WsResponse::ThinkingToken {
|
||||
content: "I need to think about this...".to_string(),
|
||||
};
|
||||
let json = serde_json::to_value(&resp).unwrap();
|
||||
assert_eq!(json["type"], "thinking_token");
|
||||
assert_eq!(json["content"], "I need to think about this...");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_onboarding_status_true() {
|
||||
let resp = WsResponse::OnboardingStatus {
|
||||
|
||||
@@ -180,18 +180,20 @@ pub fn set_anthropic_api_key(store: &dyn StoreOps, api_key: String) -> Result<()
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn chat<F, U, A>(
|
||||
pub async fn chat<F, U, T, A>(
|
||||
messages: Vec<Message>,
|
||||
config: ProviderConfig,
|
||||
state: &SessionState,
|
||||
store: &dyn StoreOps,
|
||||
mut on_update: F,
|
||||
mut on_token: U,
|
||||
mut on_thinking: T,
|
||||
mut on_activity: A,
|
||||
) -> Result<ChatResult, String>
|
||||
where
|
||||
F: FnMut(&[Message]) + Send,
|
||||
U: FnMut(&str) + Send,
|
||||
T: FnMut(&str) + Send,
|
||||
A: FnMut(&str) + Send,
|
||||
{
|
||||
use crate::llm::providers::anthropic::AnthropicProvider;
|
||||
@@ -244,6 +246,7 @@ where
|
||||
config.session_id.as_deref(),
|
||||
&mut cancel_rx,
|
||||
|token| on_token(token),
|
||||
|thinking| on_thinking(thinking),
|
||||
|tool_name| on_activity(tool_name),
|
||||
)
|
||||
.await
|
||||
@@ -799,6 +802,7 @@ mod tests {
|
||||
|_| {},
|
||||
|_| {},
|
||||
|_| {},
|
||||
|_| {},
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -840,6 +844,7 @@ mod tests {
|
||||
|_| {},
|
||||
|_| {},
|
||||
|_| {},
|
||||
|_| {},
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -879,6 +884,7 @@ mod tests {
|
||||
|_| {},
|
||||
|_| {},
|
||||
|_| {},
|
||||
|_| {},
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -37,17 +37,19 @@ impl ClaudeCodeProvider {
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn chat_stream<F, A>(
|
||||
pub async fn chat_stream<F, T, A>(
|
||||
&self,
|
||||
user_message: &str,
|
||||
project_root: &str,
|
||||
session_id: Option<&str>,
|
||||
cancel_rx: &mut watch::Receiver<bool>,
|
||||
mut on_token: F,
|
||||
mut on_thinking: T,
|
||||
mut on_activity: A,
|
||||
) -> Result<ClaudeCodeResult, String>
|
||||
where
|
||||
F: FnMut(&str) + Send,
|
||||
T: FnMut(&str) + Send,
|
||||
A: FnMut(&str) + Send,
|
||||
{
|
||||
let message = user_message.to_string();
|
||||
@@ -67,6 +69,7 @@ impl ClaudeCodeProvider {
|
||||
});
|
||||
|
||||
let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (thinking_tx, mut thinking_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (msg_tx, msg_rx) = std::sync::mpsc::channel::<Message>();
|
||||
let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||
@@ -78,6 +81,7 @@ impl ClaudeCodeProvider {
|
||||
resume_id.as_deref(),
|
||||
cancelled,
|
||||
token_tx,
|
||||
thinking_tx,
|
||||
activity_tx,
|
||||
msg_tx,
|
||||
sid_tx,
|
||||
@@ -90,12 +94,20 @@ impl ClaudeCodeProvider {
|
||||
Some(t) => on_token(&t),
|
||||
None => break,
|
||||
},
|
||||
msg = thinking_rx.recv() => if let Some(t) = msg {
|
||||
on_thinking(&t);
|
||||
},
|
||||
msg = activity_rx.recv() => if let Some(name) = msg {
|
||||
on_activity(&name);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Drain any remaining activity/thinking messages that were buffered when
|
||||
// the token channel closed.
|
||||
while let Ok(t) = thinking_rx.try_recv() {
|
||||
on_thinking(&t);
|
||||
}
|
||||
// Drain any remaining activity messages that were buffered when the
|
||||
// token channel closed. The select! loop breaks on token_rx → None,
|
||||
// but activity_rx may still hold signals sent in the same instant.
|
||||
@@ -136,6 +148,7 @@ fn run_pty_session(
|
||||
resume_session_id: Option<&str>,
|
||||
cancelled: Arc<AtomicBool>,
|
||||
token_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
||||
thinking_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
||||
activity_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
||||
msg_tx: std::sync::mpsc::Sender<Message>,
|
||||
sid_tx: tokio::sync::oneshot::Sender<String>,
|
||||
@@ -254,7 +267,7 @@ fn run_pty_session(
|
||||
|
||||
// Try to parse as JSON
|
||||
if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed)
|
||||
&& process_json_event(&json, &token_tx, &activity_tx, &msg_tx, &mut sid_tx)
|
||||
&& process_json_event(&json, &token_tx, &thinking_tx, &activity_tx, &msg_tx, &mut sid_tx)
|
||||
{
|
||||
got_result = true;
|
||||
}
|
||||
@@ -276,6 +289,7 @@ fn run_pty_session(
|
||||
process_json_event(
|
||||
&json,
|
||||
&token_tx,
|
||||
&thinking_tx,
|
||||
&activity_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
@@ -319,6 +333,7 @@ fn run_pty_session(
|
||||
fn process_json_event(
|
||||
json: &serde_json::Value,
|
||||
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
thinking_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
msg_tx: &std::sync::mpsc::Sender<Message>,
|
||||
sid_tx: &mut Option<tokio::sync::oneshot::Sender<String>>,
|
||||
@@ -340,7 +355,7 @@ fn process_json_event(
|
||||
match event_type {
|
||||
"stream_event" => {
|
||||
if let Some(event) = json.get("event") {
|
||||
handle_stream_event(event, token_tx, activity_tx);
|
||||
handle_stream_event(event, token_tx, thinking_tx, activity_tx);
|
||||
}
|
||||
false
|
||||
}
|
||||
@@ -489,6 +504,7 @@ fn parse_tool_results(
|
||||
fn handle_stream_event(
|
||||
event: &serde_json::Value,
|
||||
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
thinking_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
) {
|
||||
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
||||
@@ -516,7 +532,7 @@ fn handle_stream_event(
|
||||
if let Some(thinking) =
|
||||
delta.get("thinking").and_then(|t| t.as_str())
|
||||
{
|
||||
let _ = token_tx.send(format!("[thinking] {thinking}"));
|
||||
let _ = thinking_tx.send(thinking.to_string());
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
@@ -671,12 +687,13 @@ mod tests {
|
||||
#[test]
|
||||
fn handle_stream_event_text_delta_sends_token() {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_delta",
|
||||
"delta": {"type": "text_delta", "text": "hello "}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &atx);
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
let tokens: Vec<_> = {
|
||||
let mut v = vec![];
|
||||
@@ -692,12 +709,13 @@ mod tests {
|
||||
fn handle_stream_event_input_json_delta_not_sent() {
|
||||
// Tool argument JSON deltas should NOT be sent as text tokens
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_delta",
|
||||
"delta": {"type": "input_json_delta", "partial_json": "{\"path\":"}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &atx);
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
@@ -710,15 +728,18 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_stream_event_thinking_delta_sends_prefixed_token() {
|
||||
fn handle_stream_event_thinking_delta_routes_to_thinking_channel() {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, mut trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_delta",
|
||||
"delta": {"type": "thinking_delta", "thinking": "I should check the file"}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &atx);
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
drop(ttx);
|
||||
// thinking token must NOT appear in the regular token channel
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(t) = rx.try_recv() {
|
||||
@@ -726,18 +747,28 @@ mod tests {
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(tokens, vec!["[thinking] I should check the file"]);
|
||||
assert!(tokens.is_empty(), "thinking token leaked into token channel");
|
||||
// thinking token must appear in the dedicated thinking channel, without prefix
|
||||
let thinking: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(t) = trx.try_recv() {
|
||||
v.push(t);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(thinking, vec!["I should check the file"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_stream_event_error_sends_error_token() {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "error",
|
||||
"error": {"type": "overloaded_error", "message": "Overloaded"}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &atx);
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
@@ -752,9 +783,10 @@ mod tests {
|
||||
#[test]
|
||||
fn handle_stream_event_unknown_type_is_noop() {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({"type": "ping"});
|
||||
handle_stream_event(&event, &tx, &atx);
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
@@ -846,65 +878,68 @@ mod tests {
|
||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||
tokio::sync::mpsc::UnboundedSender<String>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||
tokio::sync::mpsc::UnboundedSender<String>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||
std::sync::mpsc::Sender<Message>,
|
||||
std::sync::mpsc::Receiver<Message>,
|
||||
);
|
||||
|
||||
fn make_channels() -> Channels {
|
||||
let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (thi_tx, thi_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (msg_tx, msg_rx) = std::sync::mpsc::channel();
|
||||
(tok_tx, tok_rx, act_tx, act_rx, msg_tx, msg_rx)
|
||||
(tok_tx, tok_rx, thi_tx, thi_rx, act_tx, act_rx, msg_tx, msg_rx)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_result_returns_true() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||
let mut sid_tx_opt = Some(sid_tx);
|
||||
let json = json!({"type": "result", "subtype": "success"});
|
||||
assert!(process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt));
|
||||
assert!(process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx_opt));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_system_returns_false() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_rate_limit_returns_false() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"type": "rate_limit_event"});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_unknown_type_returns_false() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"type": "some_future_event"});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_no_type_returns_false() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"content": "no type field"});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_captures_session_id() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||
let mut sid_tx_opt = Some(sid_tx);
|
||||
let json = json!({"type": "system", "session_id": "sess-abc-123"});
|
||||
process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt);
|
||||
process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx_opt);
|
||||
// sid_tx should have been consumed
|
||||
assert!(sid_tx_opt.is_none());
|
||||
let received = sid_rx.try_recv().unwrap();
|
||||
@@ -913,18 +948,18 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn process_json_event_preserves_sid_tx_if_no_session_id() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||
let mut sid_tx_opt = Some(sid_tx);
|
||||
let json = json!({"type": "system"});
|
||||
process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt);
|
||||
process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx_opt);
|
||||
// sid_tx should still be present since no session_id in event
|
||||
assert!(sid_tx_opt.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_stream_event_forwards_token() {
|
||||
let (tok_tx, mut tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, mut tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "stream_event",
|
||||
@@ -934,7 +969,7 @@ mod tests {
|
||||
"delta": {"type": "text_delta", "text": "word"}
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
drop(tok_tx);
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
@@ -950,7 +985,7 @@ mod tests {
|
||||
fn process_json_event_stream_event_tool_use_fires_activity() {
|
||||
// This is the primary activity path: stream_event wrapping content_block_start
|
||||
// with a tool_use block. Requires --include-partial-messages to be enabled.
|
||||
let (tok_tx, _tok_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "stream_event",
|
||||
@@ -961,7 +996,7 @@ mod tests {
|
||||
"content_block": {"type": "tool_use", "id": "toolu_abc", "name": "Bash", "input": {}}
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
drop(act_tx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
@@ -975,7 +1010,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_with_tool_use_fires_activity() {
|
||||
let (tok_tx, _tok_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
@@ -986,7 +1021,7 @@ mod tests {
|
||||
]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
drop(act_tx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
@@ -1000,7 +1035,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_with_multiple_tool_uses_fires_all_activities() {
|
||||
let (tok_tx, _tok_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
@@ -1011,7 +1046,7 @@ mod tests {
|
||||
]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
drop(act_tx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
@@ -1025,7 +1060,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_text_only_no_activity() {
|
||||
let (tok_tx, _tok_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
@@ -1033,7 +1068,7 @@ mod tests {
|
||||
"content": [{"type": "text", "text": "Just text, no tools."}]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
drop(act_tx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
@@ -1047,7 +1082,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_event_parses_message() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
@@ -1055,7 +1090,7 @@ mod tests {
|
||||
"content": [{"type": "text", "text": "Hi!"}]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
drop(msg_tx);
|
||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||
assert_eq!(msgs.len(), 1);
|
||||
@@ -1064,7 +1099,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn process_json_event_user_event_parses_tool_results() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "user",
|
||||
@@ -1072,7 +1107,7 @@ mod tests {
|
||||
"content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
drop(msg_tx);
|
||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||
assert_eq!(msgs.len(), 1);
|
||||
@@ -1082,13 +1117,13 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_without_content_array_is_noop() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
"message": {"content": "not an array"}
|
||||
});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
drop(msg_tx);
|
||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||
assert!(msgs.is_empty());
|
||||
@@ -1096,10 +1131,10 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn process_json_event_user_without_content_array_is_noop() {
|
||||
let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"type": "user", "message": {"content": null}});
|
||||
assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx));
|
||||
drop(msg_tx);
|
||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||
assert!(msgs.is_empty());
|
||||
@@ -1113,13 +1148,14 @@ mod tests {
|
||||
#[test]
|
||||
fn handle_stream_event_tool_use_start_sends_activity() {
|
||||
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_start",
|
||||
"index": 1,
|
||||
"content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &atx);
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(atx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
@@ -1134,13 +1170,14 @@ mod tests {
|
||||
#[test]
|
||||
fn handle_stream_event_text_block_start_no_activity() {
|
||||
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_start",
|
||||
"index": 0,
|
||||
"content_block": {"type": "text", "text": ""}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &atx);
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(atx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
|
||||
Reference in New Issue
Block a user