huskies: merge 540_bug_get_agent_output_mcp_tool_returns_no_agent_for_exited_agents_instead_of_reading_session_logs_from_disk
This commit is contained in:
@@ -404,77 +404,6 @@ pub fn next_item_number() -> u32 {
|
|||||||
max_num + 1
|
max_num + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
/// One-time migration: sync CRDT stages from the pipeline_items DB table.
|
|
||||||
///
|
|
||||||
/// During the filesystem→CRDT migration, many stories were imported into the
|
|
||||||
/// CRDT with stage `1_backlog` but then moved forward (to done/archived) via
|
|
||||||
/// filesystem-only moves that never wrote CRDT ops. This leaves stale
|
|
||||||
/// `1_backlog` entries in the CRDT for stories that are actually done.
|
|
||||||
///
|
|
||||||
/// This function reads the authoritative stage from `pipeline_items` and
|
|
||||||
/// calls `write_item` to correct any CRDT entries that disagree.
|
|
||||||
pub async fn sync_crdt_stages_from_db(db_path: &Path) {
|
|
||||||
slog!("[db-sync] START: sync_crdt_stages_from_db called with {}", db_path.display());
|
|
||||||
|
|
||||||
let options = SqliteConnectOptions::new().filename(db_path);
|
|
||||||
let Ok(pool) = SqlitePool::connect_with(options).await else {
|
|
||||||
slog!("[db-sync] FAIL: could not connect to pipeline.db");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
type SyncRow = (String, String, Option<String>, Option<String>, Option<i64>, Option<bool>, Option<String>);
|
|
||||||
let rows: Vec<SyncRow> =
|
|
||||||
sqlx::query_as(
|
|
||||||
"SELECT id, stage, name, agent, retry_count, blocked, depends_on FROM pipeline_items"
|
|
||||||
)
|
|
||||||
.fetch_all(&pool)
|
|
||||||
.await
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
slog!("[db-sync] loaded {} rows from pipeline_items", rows.len());
|
|
||||||
|
|
||||||
let mut corrected = 0u32;
|
|
||||||
let mut skipped = 0u32;
|
|
||||||
let mut first_few = 0u32;
|
|
||||||
for (story_id, db_stage, name, agent, retry_count, blocked, depends_on) in &rows {
|
|
||||||
let crdt_stage = crate::crdt_state::read_item(story_id)
|
|
||||||
.map(|v| v.stage.clone());
|
|
||||||
|
|
||||||
if first_few < 5 {
|
|
||||||
slog!("[db-sync] sample: '{story_id}' crdt={crdt_stage:?} db={db_stage}");
|
|
||||||
first_few += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip stale "deleted" shadow rows left by old code that used the
|
|
||||||
// "deleted" sentinel as a soft-delete instead of issuing a real SQL
|
|
||||||
// DELETE. Syncing these back into the CRDT would resurrect tombstoned
|
|
||||||
// items with stage = "deleted".
|
|
||||||
if db_stage == "deleted" {
|
|
||||||
skipped += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if crdt_stage.as_deref() != Some(db_stage.as_str()) {
|
|
||||||
crate::crdt_state::write_item(
|
|
||||||
story_id,
|
|
||||||
db_stage,
|
|
||||||
name.as_deref(),
|
|
||||||
agent.as_deref(),
|
|
||||||
*retry_count,
|
|
||||||
*blocked,
|
|
||||||
depends_on.as_deref(),
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
None, // merged_at unknown for migrated items; epoch fallback sweeps them
|
|
||||||
);
|
|
||||||
corrected += 1;
|
|
||||||
} else {
|
|
||||||
skipped += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
slog!("[db-sync] DONE: corrected={corrected} skipped={skipped} total={}", rows.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
@@ -780,66 +709,4 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Regression test for bug 537: `sync_crdt_stages_from_db` must skip rows
|
|
||||||
/// with stage = "deleted" left by old code, so they cannot resurrect
|
|
||||||
/// tombstoned CRDT items on restart.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn sync_crdt_stages_skips_deleted_sentinel_rows() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let db_path = tmp.path().join("pipeline.db");
|
|
||||||
|
|
||||||
let options = SqliteConnectOptions::new()
|
|
||||||
.filename(&db_path)
|
|
||||||
.create_if_missing(true);
|
|
||||||
let pool = SqlitePool::connect_with(options).await.unwrap();
|
|
||||||
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
|
||||||
|
|
||||||
let now = chrono::Utc::now().to_rfc3339();
|
|
||||||
|
|
||||||
// Insert a stale "deleted" sentinel row (legacy data from old code).
|
|
||||||
sqlx::query(
|
|
||||||
"INSERT INTO pipeline_items \
|
|
||||||
(id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \
|
|
||||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)",
|
|
||||||
)
|
|
||||||
.bind("77_story_stale_deleted")
|
|
||||||
.bind("Ghost Story")
|
|
||||||
.bind("deleted")
|
|
||||||
.bind(Option::<String>::None)
|
|
||||||
.bind(Option::<i64>::None)
|
|
||||||
.bind(Option::<i64>::None)
|
|
||||||
.bind(Option::<String>::None)
|
|
||||||
.bind("---\nname: Ghost Story\n---\n")
|
|
||||||
.bind(&now)
|
|
||||||
.execute(&pool)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Run the sync. The CRDT is not initialised in this unit test context
|
|
||||||
// so write_item calls are no-ops, but crucially `sync_crdt_stages_from_db`
|
|
||||||
// must reach the `continue` guard before trying to call write_item on the
|
|
||||||
// "deleted" row. We verify this by calling sync and confirming it does
|
|
||||||
// not panic or attempt to resurrect the item in the CRDT.
|
|
||||||
sync_crdt_stages_from_db(&db_path).await;
|
|
||||||
|
|
||||||
// The "deleted" row should still be in the DB (sync only reads, doesn't
|
|
||||||
// clean it up), but write_item was NOT called on it — confirmed
|
|
||||||
// indirectly: if write_item had been called with stage = "deleted" it
|
|
||||||
// would have logged a warning and been a no-op because CRDT_STATE is
|
|
||||||
// not initialised. The important invariant is that the guard short-
|
|
||||||
// circuits before write_item.
|
|
||||||
let row: Option<(String,)> =
|
|
||||||
sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1")
|
|
||||||
.bind("77_story_stale_deleted")
|
|
||||||
.fetch_optional(&pool)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
row.map(|r| r.0).as_deref(),
|
|
||||||
Some("deleted"),
|
|
||||||
"stale deleted row should still be in DB (sync is read-only)"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -219,10 +219,42 @@ fn handle_agent_output_sse(
|
|||||||
|
|
||||||
let mut rx = match ctx.agents.subscribe(&story_id, &agent_name) {
|
let mut rx = match ctx.agents.subscribe(&story_id, &agent_name) {
|
||||||
Ok(rx) => rx,
|
Ok(rx) => rx,
|
||||||
Err(e) => return to_sse_response(JsonRpcResponse::success(
|
Err(_) => {
|
||||||
id,
|
// Agent not in pool (exited or never started) — fall back to disk logs.
|
||||||
json!({ "content": [{"type": "text", "text": e}], "isError": true }),
|
let text = if let Ok(project_root) = ctx.agents.get_project_root(&ctx.state) {
|
||||||
)),
|
use crate::agent_log;
|
||||||
|
let log_files = agent_log::list_story_log_files(
|
||||||
|
&project_root,
|
||||||
|
&story_id,
|
||||||
|
Some(&agent_name),
|
||||||
|
);
|
||||||
|
if log_files.is_empty() {
|
||||||
|
format!("No log files found for story '{story_id}' agent '{agent_name}'.")
|
||||||
|
} else {
|
||||||
|
let mut all_lines: Vec<String> = Vec::new();
|
||||||
|
for path in &log_files {
|
||||||
|
let file_name =
|
||||||
|
path.file_name().and_then(|n| n.to_str()).unwrap_or("?");
|
||||||
|
all_lines.push(format!(
|
||||||
|
"=== {} ===",
|
||||||
|
file_name.trim_end_matches(".log")
|
||||||
|
));
|
||||||
|
match agent_log::read_log_as_readable_lines(path) {
|
||||||
|
Ok(lines) => all_lines.extend(lines),
|
||||||
|
Err(e) => all_lines.push(format!("[ERROR reading log: {e}]")),
|
||||||
|
}
|
||||||
|
all_lines.push(String::new());
|
||||||
|
}
|
||||||
|
all_lines.join("\n")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
format!("No log files found for story '{story_id}' agent '{agent_name}'.")
|
||||||
|
};
|
||||||
|
return to_sse_response(JsonRpcResponse::success(
|
||||||
|
id,
|
||||||
|
json!({ "content": [{"type": "text", "text": text}] }),
|
||||||
|
));
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let final_id = id;
|
let final_id = id;
|
||||||
@@ -1799,7 +1831,8 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn mcp_post_sse_get_agent_output_no_agent_returns_sse_error() {
|
async fn mcp_post_sse_get_agent_output_no_agent_no_logs_returns_not_found() {
|
||||||
|
// Agent not in pool and no log files → SSE success with "No log files found" message.
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
|
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
|
||||||
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
|
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
|
||||||
@@ -1816,5 +1849,40 @@ mod tests {
|
|||||||
);
|
);
|
||||||
let body = resp.0.into_body().into_string().await.unwrap();
|
let body = resp.0.into_body().into_string().await.unwrap();
|
||||||
assert!(body.contains("data:"), "expected SSE data prefix: {body}");
|
assert!(body.contains("data:"), "expected SSE data prefix: {body}");
|
||||||
|
// Must NOT return isError — should be a success result with "No log files found"
|
||||||
|
assert!(!body.contains("isError"), "expected no isError for missing agent: {body}");
|
||||||
|
assert!(body.contains("No log files found"), "expected not-found message: {body}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mcp_post_sse_get_agent_output_exited_agent_reads_disk_logs() {
|
||||||
|
use crate::agent_log::AgentLogWriter;
|
||||||
|
use crate::agents::AgentEvent;
|
||||||
|
// Agent has exited (not in pool) but wrote logs to disk.
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let root = tmp.path();
|
||||||
|
let mut writer =
|
||||||
|
AgentLogWriter::new(root, "42_story_foo", "coder-1", "sess-sse").unwrap();
|
||||||
|
writer
|
||||||
|
.write_event(&AgentEvent::Output {
|
||||||
|
story_id: "42_story_foo".to_string(),
|
||||||
|
agent_name: "coder-1".to_string(),
|
||||||
|
text: "disk output".to_string(),
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
drop(writer);
|
||||||
|
|
||||||
|
let ctx = std::sync::Arc::new(test_ctx(root));
|
||||||
|
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
|
||||||
|
let resp = cli
|
||||||
|
.post("/mcp")
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.header("accept", "text/event-stream")
|
||||||
|
.body(r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_agent_output","arguments":{"story_id":"42_story_foo","agent_name":"coder-1"}}}"#)
|
||||||
|
.send()
|
||||||
|
.await;
|
||||||
|
let body = resp.0.into_body().into_string().await.unwrap();
|
||||||
|
assert!(body.contains("disk output"), "expected disk log content in SSE response: {body}");
|
||||||
|
assert!(!body.contains("isError"), "expected no error for exited agent with logs: {body}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user