From 6998275331f3aa4707d40239a186d1d941039028 Mon Sep 17 00:00:00 2001 From: dave Date: Sat, 11 Apr 2026 16:30:13 +0000 Subject: [PATCH] huskies: merge 540_bug_get_agent_output_mcp_tool_returns_no_agent_for_exited_agents_instead_of_reading_session_logs_from_disk --- server/src/db/mod.rs | 133 ------------------------------------- server/src/http/mcp/mod.rs | 78 ++++++++++++++++++++-- 2 files changed, 73 insertions(+), 138 deletions(-) diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 9cc7f019..53a7664d 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -404,77 +404,6 @@ pub fn next_item_number() -> u32 { max_num + 1 } -/// One-time migration: sync CRDT stages from the pipeline_items DB table. -/// -/// During the filesystem→CRDT migration, many stories were imported into the -/// CRDT with stage `1_backlog` but then moved forward (to done/archived) via -/// filesystem-only moves that never wrote CRDT ops. This leaves stale -/// `1_backlog` entries in the CRDT for stories that are actually done. -/// -/// This function reads the authoritative stage from `pipeline_items` and -/// calls `write_item` to correct any CRDT entries that disagree. -pub async fn sync_crdt_stages_from_db(db_path: &Path) { - slog!("[db-sync] START: sync_crdt_stages_from_db called with {}", db_path.display()); - - let options = SqliteConnectOptions::new().filename(db_path); - let Ok(pool) = SqlitePool::connect_with(options).await else { - slog!("[db-sync] FAIL: could not connect to pipeline.db"); - return; - }; - - type SyncRow = (String, String, Option, Option, Option, Option, Option); - let rows: Vec = - sqlx::query_as( - "SELECT id, stage, name, agent, retry_count, blocked, depends_on FROM pipeline_items" - ) - .fetch_all(&pool) - .await - .unwrap_or_default(); - - slog!("[db-sync] loaded {} rows from pipeline_items", rows.len()); - - let mut corrected = 0u32; - let mut skipped = 0u32; - let mut first_few = 0u32; - for (story_id, db_stage, name, agent, retry_count, blocked, depends_on) in &rows { - let crdt_stage = crate::crdt_state::read_item(story_id) - .map(|v| v.stage.clone()); - - if first_few < 5 { - slog!("[db-sync] sample: '{story_id}' crdt={crdt_stage:?} db={db_stage}"); - first_few += 1; - } - - // Skip stale "deleted" shadow rows left by old code that used the - // "deleted" sentinel as a soft-delete instead of issuing a real SQL - // DELETE. Syncing these back into the CRDT would resurrect tombstoned - // items with stage = "deleted". - if db_stage == "deleted" { - skipped += 1; - continue; - } - - if crdt_stage.as_deref() != Some(db_stage.as_str()) { - crate::crdt_state::write_item( - story_id, - db_stage, - name.as_deref(), - agent.as_deref(), - *retry_count, - *blocked, - depends_on.as_deref(), - None, - None, - None, // merged_at unknown for migrated items; epoch fallback sweeps them - ); - corrected += 1; - } else { - skipped += 1; - } - } - - slog!("[db-sync] DONE: corrected={corrected} skipped={skipped} total={}", rows.len()); -} #[cfg(test)] mod tests { @@ -780,66 +709,4 @@ mod tests { ); } - /// Regression test for bug 537: `sync_crdt_stages_from_db` must skip rows - /// with stage = "deleted" left by old code, so they cannot resurrect - /// tombstoned CRDT items on restart. - #[tokio::test] - async fn sync_crdt_stages_skips_deleted_sentinel_rows() { - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("pipeline.db"); - - let options = SqliteConnectOptions::new() - .filename(&db_path) - .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); - sqlx::migrate!("./migrations").run(&pool).await.unwrap(); - - let now = chrono::Utc::now().to_rfc3339(); - - // Insert a stale "deleted" sentinel row (legacy data from old code). - sqlx::query( - "INSERT INTO pipeline_items \ - (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)", - ) - .bind("77_story_stale_deleted") - .bind("Ghost Story") - .bind("deleted") - .bind(Option::::None) - .bind(Option::::None) - .bind(Option::::None) - .bind(Option::::None) - .bind("---\nname: Ghost Story\n---\n") - .bind(&now) - .execute(&pool) - .await - .unwrap(); - - // Run the sync. The CRDT is not initialised in this unit test context - // so write_item calls are no-ops, but crucially `sync_crdt_stages_from_db` - // must reach the `continue` guard before trying to call write_item on the - // "deleted" row. We verify this by calling sync and confirming it does - // not panic or attempt to resurrect the item in the CRDT. - sync_crdt_stages_from_db(&db_path).await; - - // The "deleted" row should still be in the DB (sync only reads, doesn't - // clean it up), but write_item was NOT called on it — confirmed - // indirectly: if write_item had been called with stage = "deleted" it - // would have logged a warning and been a no-op because CRDT_STATE is - // not initialised. The important invariant is that the guard short- - // circuits before write_item. - let row: Option<(String,)> = - sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") - .bind("77_story_stale_deleted") - .fetch_optional(&pool) - .await - .unwrap(); - - assert_eq!( - row.map(|r| r.0).as_deref(), - Some("deleted"), - "stale deleted row should still be in DB (sync is read-only)" - ); - } - } diff --git a/server/src/http/mcp/mod.rs b/server/src/http/mcp/mod.rs index 28f11262..f9b7f89f 100644 --- a/server/src/http/mcp/mod.rs +++ b/server/src/http/mcp/mod.rs @@ -219,10 +219,42 @@ fn handle_agent_output_sse( let mut rx = match ctx.agents.subscribe(&story_id, &agent_name) { Ok(rx) => rx, - Err(e) => return to_sse_response(JsonRpcResponse::success( - id, - json!({ "content": [{"type": "text", "text": e}], "isError": true }), - )), + Err(_) => { + // Agent not in pool (exited or never started) — fall back to disk logs. + let text = if let Ok(project_root) = ctx.agents.get_project_root(&ctx.state) { + use crate::agent_log; + let log_files = agent_log::list_story_log_files( + &project_root, + &story_id, + Some(&agent_name), + ); + if log_files.is_empty() { + format!("No log files found for story '{story_id}' agent '{agent_name}'.") + } else { + let mut all_lines: Vec = Vec::new(); + for path in &log_files { + let file_name = + path.file_name().and_then(|n| n.to_str()).unwrap_or("?"); + all_lines.push(format!( + "=== {} ===", + file_name.trim_end_matches(".log") + )); + match agent_log::read_log_as_readable_lines(path) { + Ok(lines) => all_lines.extend(lines), + Err(e) => all_lines.push(format!("[ERROR reading log: {e}]")), + } + all_lines.push(String::new()); + } + all_lines.join("\n") + } + } else { + format!("No log files found for story '{story_id}' agent '{agent_name}'.") + }; + return to_sse_response(JsonRpcResponse::success( + id, + json!({ "content": [{"type": "text", "text": text}] }), + )); + } }; let final_id = id; @@ -1799,7 +1831,8 @@ mod tests { } #[tokio::test] - async fn mcp_post_sse_get_agent_output_no_agent_returns_sse_error() { + async fn mcp_post_sse_get_agent_output_no_agent_no_logs_returns_not_found() { + // Agent not in pool and no log files → SSE success with "No log files found" message. let tmp = tempfile::tempdir().unwrap(); let ctx = std::sync::Arc::new(test_ctx(tmp.path())); let cli = poem::test::TestClient::new(test_mcp_app(ctx)); @@ -1816,5 +1849,40 @@ mod tests { ); let body = resp.0.into_body().into_string().await.unwrap(); assert!(body.contains("data:"), "expected SSE data prefix: {body}"); + // Must NOT return isError — should be a success result with "No log files found" + assert!(!body.contains("isError"), "expected no isError for missing agent: {body}"); + assert!(body.contains("No log files found"), "expected not-found message: {body}"); + } + + #[tokio::test] + async fn mcp_post_sse_get_agent_output_exited_agent_reads_disk_logs() { + use crate::agent_log::AgentLogWriter; + use crate::agents::AgentEvent; + // Agent has exited (not in pool) but wrote logs to disk. + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let mut writer = + AgentLogWriter::new(root, "42_story_foo", "coder-1", "sess-sse").unwrap(); + writer + .write_event(&AgentEvent::Output { + story_id: "42_story_foo".to_string(), + agent_name: "coder-1".to_string(), + text: "disk output".to_string(), + }) + .unwrap(); + drop(writer); + + let ctx = std::sync::Arc::new(test_ctx(root)); + let cli = poem::test::TestClient::new(test_mcp_app(ctx)); + let resp = cli + .post("/mcp") + .header("content-type", "application/json") + .header("accept", "text/event-stream") + .body(r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_agent_output","arguments":{"story_id":"42_story_foo","agent_name":"coder-1"}}}"#) + .send() + .await; + let body = resp.0.into_body().into_string().await.unwrap(); + assert!(body.contains("disk output"), "expected disk log content in SSE response: {body}"); + assert!(!body.contains("isError"), "expected no error for exited agent with logs: {body}"); } }