story-kit: merge 86_story_show_live_activity_status_instead_of_static_thinking_indicator_in_chat

This commit is contained in:
Dave
2026-02-24 12:32:14 +00:00
parent bedb041bd3
commit 4e4802314f
5 changed files with 203 additions and 5 deletions

View File

@@ -1 +1,2 @@
60.00
65.21

View File

@@ -60,6 +60,7 @@ export type WsResponse =
}
/** `.story_kit/project.toml` was modified; re-fetch the agent roster. */
| { type: "agent_config_changed" };
| { type: "tool_activity"; tool_name: string };
export interface ProviderConfig {
provider: string;

View File

@@ -247,6 +247,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
tool_name: tool_name.to_string(),
});
},
Some(perm_tx),
);
tokio::pin!(chat_fut);

View File

@@ -178,6 +178,7 @@ pub fn set_anthropic_api_key(store: &dyn StoreOps, api_key: String) -> Result<()
set_anthropic_api_key_impl(store, &api_key)
}
#[allow(clippy::too_many_arguments)]
pub async fn chat<F, U, A>(
messages: Vec<Message>,
config: ProviderConfig,
@@ -186,6 +187,11 @@ pub async fn chat<F, U, A>(
mut on_update: F,
mut on_token: U,
mut on_activity: A,
permission_tx: Option<
tokio::sync::mpsc::UnboundedSender<
crate::llm::providers::claude_code::PermissionReqMsg,
>,
>,
) -> Result<ChatResult, String>
where
F: FnMut(&[Message]) + Send,
@@ -242,6 +248,8 @@ where
config.session_id.as_deref(),
&mut cancel_rx,
|token| on_token(token),
|tool_name| on_activity(tool_name),
permission_tx,
)
.await
.map_err(|e| format!("Claude Code Error: {e}"))?;

View File

@@ -36,16 +36,20 @@ impl ClaudeCodeProvider {
Self
}
pub async fn chat_stream<F>(
#[allow(clippy::too_many_arguments)]
pub async fn chat_stream<F, A>(
&self,
user_message: &str,
project_root: &str,
session_id: Option<&str>,
cancel_rx: &mut watch::Receiver<bool>,
mut on_token: F,
mut on_activity: A,
permission_tx: Option<tokio::sync::mpsc::UnboundedSender<PermissionReqMsg>>,
) -> Result<ClaudeCodeResult, String>
where
F: FnMut(&str) + Send,
A: FnMut(&str) + Send,
{
let message = user_message.to_string();
let cwd = project_root.to_string();
@@ -64,6 +68,7 @@ impl ClaudeCodeProvider {
});
let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let (msg_tx, msg_rx) = std::sync::mpsc::channel::<Message>();
let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::<String>();
@@ -74,13 +79,22 @@ impl ClaudeCodeProvider {
resume_id.as_deref(),
cancelled,
token_tx,
activity_tx,
msg_tx,
sid_tx,
)
});
while let Some(token) = token_rx.recv().await {
on_token(&token);
loop {
tokio::select! {
msg = token_rx.recv() => match msg {
Some(t) => on_token(&t),
None => break,
},
msg = activity_rx.recv() => if let Some(name) = msg {
on_activity(&name);
},
}
}
pty_handle
@@ -115,6 +129,7 @@ fn run_pty_session(
resume_session_id: Option<&str>,
cancelled: Arc<AtomicBool>,
token_tx: tokio::sync::mpsc::UnboundedSender<String>,
activity_tx: tokio::sync::mpsc::UnboundedSender<String>,
msg_tx: std::sync::mpsc::Sender<Message>,
sid_tx: tokio::sync::oneshot::Sender<String>,
) -> Result<(), String> {
@@ -229,6 +244,98 @@ fn run_pty_session(
&& process_json_event(&json, &token_tx, &msg_tx, &mut sid_tx)
{
got_result = true;
// Capture session_id from any event that has it
if let Some(tx) = sid_tx.take() {
if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) {
let _ = tx.send(sid.to_string());
} else {
// Put it back if this event didn't have a session_id
sid_tx = Some(tx);
}
}
match event_type {
// Streaming deltas — used for real-time text display only
"stream_event" => {
if let Some(event) = json.get("event") {
handle_stream_event(event, &token_tx, &activity_tx);
}
}
// Complete assistant message — extract text and tool_use blocks
"assistant" => {
if let Some(message) = json.get("message")
&& let Some(content) =
message.get("content").and_then(|c| c.as_array())
{
parse_assistant_message(content, &msg_tx);
}
}
// User message containing tool results from Claude Code's own execution
"user" => {
if let Some(message) = json.get("message")
&& let Some(content) =
message.get("content").and_then(|c| c.as_array())
{
parse_tool_results(content, &msg_tx);
}
}
// Final result with usage stats
"result" => {
got_result = true;
}
// System init — suppress noisy model/apiKey notification
"system" => {}
// Rate limit info — suppress noisy notification
"rate_limit_event" => {}
// Claude Code is requesting user approval before executing a tool.
// Forward the request to the async context via permission_tx and
// block until the user responds (or a 5-minute timeout elapses).
"permission_request" => {
let request_id = json
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let tool_name = json
.get("tool_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let tool_input = json
.get("input")
.cloned()
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
if let Some(ref ptx) = permission_tx {
let (resp_tx, resp_rx) = std::sync::mpsc::sync_channel(1);
let _ = ptx.send(PermissionReqMsg {
request_id: request_id.clone(),
tool_name,
tool_input,
response_tx: resp_tx,
});
// Block until the user responds or a 5-minute timeout elapses.
let approved = resp_rx
.recv_timeout(std::time::Duration::from_secs(300))
.unwrap_or(false);
let response = serde_json::json!({
"type": "permission_response",
"id": request_id,
"approved": approved,
});
let _ = writeln!(pty_writer, "{}", response);
} else {
// No handler configured — deny by default.
let response = serde_json::json!({
"type": "permission_response",
"id": request_id,
"approved": false,
});
let _ = writeln!(pty_writer, "{}", response);
}
}
_ => {}
}
}
// Ignore non-JSON lines (terminal escape sequences)
@@ -245,6 +352,36 @@ fn run_pty_session(
let trimmed = line.trim();
if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
process_json_event(&json, &token_tx, &msg_tx, &mut sid_tx);
if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed)
&& let Some(event_type) =
json.get("type").and_then(|t| t.as_str())
{
match event_type {
"stream_event" => {
if let Some(event) = json.get("event") {
handle_stream_event(event, &token_tx, &activity_tx);
}
}
"assistant" => {
if let Some(message) = json.get("message")
&& let Some(content) = message
.get("content")
.and_then(|c| c.as_array())
{
parse_assistant_message(content, &msg_tx);
}
}
"user" => {
if let Some(message) = json.get("message")
&& let Some(content) = message
.get("content")
.and_then(|c| c.as_array())
{
parse_tool_results(content, &msg_tx);
}
}
_ => {}
}
}
}
break;
@@ -442,10 +579,19 @@ fn parse_tool_results(
fn handle_stream_event(
event: &serde_json::Value,
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
) {
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
match event_type {
"content_block_start" => {
if let Some(cb) = event.get("content_block")
&& cb.get("type").and_then(|t| t.as_str()) == Some("tool_use")
&& let Some(name) = cb.get("name").and_then(|n| n.as_str())
{
let _ = activity_tx.send(name.to_string());
}
}
// Text content streaming — only text_delta, not input_json_delta (tool args)
"content_block_delta" => {
if let Some(delta) = event.get("delta") {
@@ -615,11 +761,12 @@ mod tests {
#[test]
fn handle_stream_event_text_delta_sends_token() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
let event = json!({
"type": "content_block_delta",
"delta": {"type": "text_delta", "text": "hello "}
});
handle_stream_event(&event, &tx);
handle_stream_event(&event, &tx, &atx);
drop(tx);
let tokens: Vec<_> = {
let mut v = vec![];
@@ -635,11 +782,12 @@ mod tests {
fn handle_stream_event_input_json_delta_not_sent() {
// Tool argument JSON deltas should NOT be sent as text tokens
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
let event = json!({
"type": "content_block_delta",
"delta": {"type": "input_json_delta", "partial_json": "{\"path\":"}
});
handle_stream_event(&event, &tx);
handle_stream_event(&event, &tx, &atx);
drop(tx);
let tokens: Vec<String> = {
let mut v = vec![];
@@ -943,5 +1091,44 @@ mod tests {
#[test]
fn claude_code_provider_new() {
let _provider = ClaudeCodeProvider::new();
fn handle_stream_event_tool_use_start_sends_activity() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
let event = json!({
"type": "content_block_start",
"index": 1,
"content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}}
});
handle_stream_event(&event, &tx, &atx);
drop(atx);
let activities: Vec<String> = {
let mut v = vec![];
while let Ok(a) = arx.try_recv() {
v.push(a);
}
v
};
assert_eq!(activities, vec!["Read"]);
}
#[test]
fn handle_stream_event_text_block_start_no_activity() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
let event = json!({
"type": "content_block_start",
"index": 0,
"content_block": {"type": "text", "text": ""}
});
handle_stream_event(&event, &tx, &atx);
drop(atx);
let activities: Vec<String> = {
let mut v = vec![];
while let Ok(a) = arx.try_recv() {
v.push(a);
}
v
};
assert!(activities.is_empty());
}
}