story-kit: merge 86_story_show_live_activity_status_instead_of_static_thinking_indicator_in_chat
This commit is contained in:
@@ -60,6 +60,7 @@ export type WsResponse =
|
|||||||
}
|
}
|
||||||
/** `.story_kit/project.toml` was modified; re-fetch the agent roster. */
|
/** `.story_kit/project.toml` was modified; re-fetch the agent roster. */
|
||||||
| { type: "agent_config_changed" };
|
| { type: "agent_config_changed" };
|
||||||
|
| { type: "tool_activity"; tool_name: string };
|
||||||
|
|
||||||
export interface ProviderConfig {
|
export interface ProviderConfig {
|
||||||
provider: string;
|
provider: string;
|
||||||
|
|||||||
@@ -247,6 +247,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
|
|||||||
tool_name: tool_name.to_string(),
|
tool_name: tool_name.to_string(),
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
Some(perm_tx),
|
||||||
);
|
);
|
||||||
tokio::pin!(chat_fut);
|
tokio::pin!(chat_fut);
|
||||||
|
|
||||||
|
|||||||
@@ -178,6 +178,7 @@ pub fn set_anthropic_api_key(store: &dyn StoreOps, api_key: String) -> Result<()
|
|||||||
set_anthropic_api_key_impl(store, &api_key)
|
set_anthropic_api_key_impl(store, &api_key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub async fn chat<F, U, A>(
|
pub async fn chat<F, U, A>(
|
||||||
messages: Vec<Message>,
|
messages: Vec<Message>,
|
||||||
config: ProviderConfig,
|
config: ProviderConfig,
|
||||||
@@ -186,6 +187,11 @@ pub async fn chat<F, U, A>(
|
|||||||
mut on_update: F,
|
mut on_update: F,
|
||||||
mut on_token: U,
|
mut on_token: U,
|
||||||
mut on_activity: A,
|
mut on_activity: A,
|
||||||
|
permission_tx: Option<
|
||||||
|
tokio::sync::mpsc::UnboundedSender<
|
||||||
|
crate::llm::providers::claude_code::PermissionReqMsg,
|
||||||
|
>,
|
||||||
|
>,
|
||||||
) -> Result<ChatResult, String>
|
) -> Result<ChatResult, String>
|
||||||
where
|
where
|
||||||
F: FnMut(&[Message]) + Send,
|
F: FnMut(&[Message]) + Send,
|
||||||
@@ -242,6 +248,8 @@ where
|
|||||||
config.session_id.as_deref(),
|
config.session_id.as_deref(),
|
||||||
&mut cancel_rx,
|
&mut cancel_rx,
|
||||||
|token| on_token(token),
|
|token| on_token(token),
|
||||||
|
|tool_name| on_activity(tool_name),
|
||||||
|
permission_tx,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("Claude Code Error: {e}"))?;
|
.map_err(|e| format!("Claude Code Error: {e}"))?;
|
||||||
|
|||||||
@@ -36,16 +36,20 @@ impl ClaudeCodeProvider {
|
|||||||
Self
|
Self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn chat_stream<F>(
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
pub async fn chat_stream<F, A>(
|
||||||
&self,
|
&self,
|
||||||
user_message: &str,
|
user_message: &str,
|
||||||
project_root: &str,
|
project_root: &str,
|
||||||
session_id: Option<&str>,
|
session_id: Option<&str>,
|
||||||
cancel_rx: &mut watch::Receiver<bool>,
|
cancel_rx: &mut watch::Receiver<bool>,
|
||||||
mut on_token: F,
|
mut on_token: F,
|
||||||
|
mut on_activity: A,
|
||||||
|
permission_tx: Option<tokio::sync::mpsc::UnboundedSender<PermissionReqMsg>>,
|
||||||
) -> Result<ClaudeCodeResult, String>
|
) -> Result<ClaudeCodeResult, String>
|
||||||
where
|
where
|
||||||
F: FnMut(&str) + Send,
|
F: FnMut(&str) + Send,
|
||||||
|
A: FnMut(&str) + Send,
|
||||||
{
|
{
|
||||||
let message = user_message.to_string();
|
let message = user_message.to_string();
|
||||||
let cwd = project_root.to_string();
|
let cwd = project_root.to_string();
|
||||||
@@ -64,6 +68,7 @@ impl ClaudeCodeProvider {
|
|||||||
});
|
});
|
||||||
|
|
||||||
let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
let (msg_tx, msg_rx) = std::sync::mpsc::channel::<Message>();
|
let (msg_tx, msg_rx) = std::sync::mpsc::channel::<Message>();
|
||||||
let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::<String>();
|
let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||||
|
|
||||||
@@ -74,13 +79,22 @@ impl ClaudeCodeProvider {
|
|||||||
resume_id.as_deref(),
|
resume_id.as_deref(),
|
||||||
cancelled,
|
cancelled,
|
||||||
token_tx,
|
token_tx,
|
||||||
|
activity_tx,
|
||||||
msg_tx,
|
msg_tx,
|
||||||
sid_tx,
|
sid_tx,
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
while let Some(token) = token_rx.recv().await {
|
loop {
|
||||||
on_token(&token);
|
tokio::select! {
|
||||||
|
msg = token_rx.recv() => match msg {
|
||||||
|
Some(t) => on_token(&t),
|
||||||
|
None => break,
|
||||||
|
},
|
||||||
|
msg = activity_rx.recv() => if let Some(name) = msg {
|
||||||
|
on_activity(&name);
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pty_handle
|
pty_handle
|
||||||
@@ -115,6 +129,7 @@ fn run_pty_session(
|
|||||||
resume_session_id: Option<&str>,
|
resume_session_id: Option<&str>,
|
||||||
cancelled: Arc<AtomicBool>,
|
cancelled: Arc<AtomicBool>,
|
||||||
token_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
token_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
activity_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
msg_tx: std::sync::mpsc::Sender<Message>,
|
msg_tx: std::sync::mpsc::Sender<Message>,
|
||||||
sid_tx: tokio::sync::oneshot::Sender<String>,
|
sid_tx: tokio::sync::oneshot::Sender<String>,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
@@ -229,6 +244,98 @@ fn run_pty_session(
|
|||||||
&& process_json_event(&json, &token_tx, &msg_tx, &mut sid_tx)
|
&& process_json_event(&json, &token_tx, &msg_tx, &mut sid_tx)
|
||||||
{
|
{
|
||||||
got_result = true;
|
got_result = true;
|
||||||
|
// Capture session_id from any event that has it
|
||||||
|
if let Some(tx) = sid_tx.take() {
|
||||||
|
if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) {
|
||||||
|
let _ = tx.send(sid.to_string());
|
||||||
|
} else {
|
||||||
|
// Put it back if this event didn't have a session_id
|
||||||
|
sid_tx = Some(tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match event_type {
|
||||||
|
// Streaming deltas — used for real-time text display only
|
||||||
|
"stream_event" => {
|
||||||
|
if let Some(event) = json.get("event") {
|
||||||
|
handle_stream_event(event, &token_tx, &activity_tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Complete assistant message — extract text and tool_use blocks
|
||||||
|
"assistant" => {
|
||||||
|
if let Some(message) = json.get("message")
|
||||||
|
&& let Some(content) =
|
||||||
|
message.get("content").and_then(|c| c.as_array())
|
||||||
|
{
|
||||||
|
parse_assistant_message(content, &msg_tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// User message containing tool results from Claude Code's own execution
|
||||||
|
"user" => {
|
||||||
|
if let Some(message) = json.get("message")
|
||||||
|
&& let Some(content) =
|
||||||
|
message.get("content").and_then(|c| c.as_array())
|
||||||
|
{
|
||||||
|
parse_tool_results(content, &msg_tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Final result with usage stats
|
||||||
|
"result" => {
|
||||||
|
got_result = true;
|
||||||
|
}
|
||||||
|
// System init — suppress noisy model/apiKey notification
|
||||||
|
"system" => {}
|
||||||
|
// Rate limit info — suppress noisy notification
|
||||||
|
"rate_limit_event" => {}
|
||||||
|
// Claude Code is requesting user approval before executing a tool.
|
||||||
|
// Forward the request to the async context via permission_tx and
|
||||||
|
// block until the user responds (or a 5-minute timeout elapses).
|
||||||
|
"permission_request" => {
|
||||||
|
let request_id = json
|
||||||
|
.get("id")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.unwrap_or("")
|
||||||
|
.to_string();
|
||||||
|
let tool_name = json
|
||||||
|
.get("tool_name")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.unwrap_or("unknown")
|
||||||
|
.to_string();
|
||||||
|
let tool_input = json
|
||||||
|
.get("input")
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
|
||||||
|
|
||||||
|
if let Some(ref ptx) = permission_tx {
|
||||||
|
let (resp_tx, resp_rx) = std::sync::mpsc::sync_channel(1);
|
||||||
|
let _ = ptx.send(PermissionReqMsg {
|
||||||
|
request_id: request_id.clone(),
|
||||||
|
tool_name,
|
||||||
|
tool_input,
|
||||||
|
response_tx: resp_tx,
|
||||||
|
});
|
||||||
|
// Block until the user responds or a 5-minute timeout elapses.
|
||||||
|
let approved = resp_rx
|
||||||
|
.recv_timeout(std::time::Duration::from_secs(300))
|
||||||
|
.unwrap_or(false);
|
||||||
|
let response = serde_json::json!({
|
||||||
|
"type": "permission_response",
|
||||||
|
"id": request_id,
|
||||||
|
"approved": approved,
|
||||||
|
});
|
||||||
|
let _ = writeln!(pty_writer, "{}", response);
|
||||||
|
} else {
|
||||||
|
// No handler configured — deny by default.
|
||||||
|
let response = serde_json::json!({
|
||||||
|
"type": "permission_response",
|
||||||
|
"id": request_id,
|
||||||
|
"approved": false,
|
||||||
|
});
|
||||||
|
let _ = writeln!(pty_writer, "{}", response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Ignore non-JSON lines (terminal escape sequences)
|
// Ignore non-JSON lines (terminal escape sequences)
|
||||||
|
|
||||||
@@ -245,6 +352,36 @@ fn run_pty_session(
|
|||||||
let trimmed = line.trim();
|
let trimmed = line.trim();
|
||||||
if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
|
if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
|
||||||
process_json_event(&json, &token_tx, &msg_tx, &mut sid_tx);
|
process_json_event(&json, &token_tx, &msg_tx, &mut sid_tx);
|
||||||
|
if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed)
|
||||||
|
&& let Some(event_type) =
|
||||||
|
json.get("type").and_then(|t| t.as_str())
|
||||||
|
{
|
||||||
|
match event_type {
|
||||||
|
"stream_event" => {
|
||||||
|
if let Some(event) = json.get("event") {
|
||||||
|
handle_stream_event(event, &token_tx, &activity_tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"assistant" => {
|
||||||
|
if let Some(message) = json.get("message")
|
||||||
|
&& let Some(content) = message
|
||||||
|
.get("content")
|
||||||
|
.and_then(|c| c.as_array())
|
||||||
|
{
|
||||||
|
parse_assistant_message(content, &msg_tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"user" => {
|
||||||
|
if let Some(message) = json.get("message")
|
||||||
|
&& let Some(content) = message
|
||||||
|
.get("content")
|
||||||
|
.and_then(|c| c.as_array())
|
||||||
|
{
|
||||||
|
parse_tool_results(content, &msg_tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@@ -442,10 +579,19 @@ fn parse_tool_results(
|
|||||||
fn handle_stream_event(
|
fn handle_stream_event(
|
||||||
event: &serde_json::Value,
|
event: &serde_json::Value,
|
||||||
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
) {
|
) {
|
||||||
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
||||||
|
|
||||||
match event_type {
|
match event_type {
|
||||||
|
"content_block_start" => {
|
||||||
|
if let Some(cb) = event.get("content_block")
|
||||||
|
&& cb.get("type").and_then(|t| t.as_str()) == Some("tool_use")
|
||||||
|
&& let Some(name) = cb.get("name").and_then(|n| n.as_str())
|
||||||
|
{
|
||||||
|
let _ = activity_tx.send(name.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
// Text content streaming — only text_delta, not input_json_delta (tool args)
|
// Text content streaming — only text_delta, not input_json_delta (tool args)
|
||||||
"content_block_delta" => {
|
"content_block_delta" => {
|
||||||
if let Some(delta) = event.get("delta") {
|
if let Some(delta) = event.get("delta") {
|
||||||
@@ -615,11 +761,12 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn handle_stream_event_text_delta_sends_token() {
|
fn handle_stream_event_text_delta_sends_token() {
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
let event = json!({
|
let event = json!({
|
||||||
"type": "content_block_delta",
|
"type": "content_block_delta",
|
||||||
"delta": {"type": "text_delta", "text": "hello "}
|
"delta": {"type": "text_delta", "text": "hello "}
|
||||||
});
|
});
|
||||||
handle_stream_event(&event, &tx);
|
handle_stream_event(&event, &tx, &atx);
|
||||||
drop(tx);
|
drop(tx);
|
||||||
let tokens: Vec<_> = {
|
let tokens: Vec<_> = {
|
||||||
let mut v = vec![];
|
let mut v = vec![];
|
||||||
@@ -635,11 +782,12 @@ mod tests {
|
|||||||
fn handle_stream_event_input_json_delta_not_sent() {
|
fn handle_stream_event_input_json_delta_not_sent() {
|
||||||
// Tool argument JSON deltas should NOT be sent as text tokens
|
// Tool argument JSON deltas should NOT be sent as text tokens
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
let event = json!({
|
let event = json!({
|
||||||
"type": "content_block_delta",
|
"type": "content_block_delta",
|
||||||
"delta": {"type": "input_json_delta", "partial_json": "{\"path\":"}
|
"delta": {"type": "input_json_delta", "partial_json": "{\"path\":"}
|
||||||
});
|
});
|
||||||
handle_stream_event(&event, &tx);
|
handle_stream_event(&event, &tx, &atx);
|
||||||
drop(tx);
|
drop(tx);
|
||||||
let tokens: Vec<String> = {
|
let tokens: Vec<String> = {
|
||||||
let mut v = vec![];
|
let mut v = vec![];
|
||||||
@@ -943,5 +1091,44 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn claude_code_provider_new() {
|
fn claude_code_provider_new() {
|
||||||
let _provider = ClaudeCodeProvider::new();
|
let _provider = ClaudeCodeProvider::new();
|
||||||
|
fn handle_stream_event_tool_use_start_sends_activity() {
|
||||||
|
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let event = json!({
|
||||||
|
"type": "content_block_start",
|
||||||
|
"index": 1,
|
||||||
|
"content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}}
|
||||||
|
});
|
||||||
|
handle_stream_event(&event, &tx, &atx);
|
||||||
|
drop(atx);
|
||||||
|
let activities: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(a) = arx.try_recv() {
|
||||||
|
v.push(a);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert_eq!(activities, vec!["Read"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handle_stream_event_text_block_start_no_activity() {
|
||||||
|
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let event = json!({
|
||||||
|
"type": "content_block_start",
|
||||||
|
"index": 0,
|
||||||
|
"content_block": {"type": "text", "text": ""}
|
||||||
|
});
|
||||||
|
handle_stream_event(&event, &tx, &atx);
|
||||||
|
drop(atx);
|
||||||
|
let activities: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(a) = arx.try_recv() {
|
||||||
|
v.push(a);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert!(activities.is_empty());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user