huskies: merge 489_story_sqlite_shadow_write_for_pipeline_state_via_sqlx
This commit is contained in:
@@ -65,6 +65,10 @@ fn move_item<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
// Shadow-write the new stage to SQLite. This is fire-and-forget; a missing
|
||||
// database (e.g. in tests) is silently ignored.
|
||||
crate::db::shadow_write(story_id, target_dir, &target_path);
|
||||
|
||||
slog!("[lifecycle] Moved '{story_id}' from work/{src_dir}/ to work/{target_dir}/");
|
||||
Ok(Some(src_dir))
|
||||
}
|
||||
|
||||
@@ -0,0 +1,333 @@
|
||||
/// SQLite shadow-write layer for pipeline state.
|
||||
///
|
||||
/// All filesystem pipeline operations (move_story_to_X etc.) remain authoritative.
|
||||
/// This module provides a fire-and-forget channel that dual-writes each move to
|
||||
/// `.huskies/pipeline.db` so a database layer is ready for future CRDT integration.
|
||||
///
|
||||
/// Reads are NOT served from SQLite — the filesystem remains the single source of truth.
|
||||
use crate::io::story_metadata::parse_front_matter;
|
||||
use crate::slog;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use sqlx::SqlitePool;
|
||||
use std::path::Path;
|
||||
use std::sync::OnceLock;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// A pending shadow write for one pipeline item.
///
/// One message is produced per pipeline move and consumed by the background
/// task spawned in `init`, which upserts a single `pipeline_items` row.
struct PipelineWriteMsg {
    // Primary key of the row (`pipeline_items.id`); derived from the story filename stem.
    story_id: String,
    // Pipeline stage directory name (e.g. "2_current") the item now lives in.
    stage: String,
    // Optional metadata parsed from the story file's front matter; all of these
    // are None when the file could not be read or its front matter failed to parse.
    name: Option<String>,
    agent: Option<String>,
    retry_count: Option<i64>,
    // Stored in SQLite as a 0/1 integer (SQLite has no native boolean).
    blocked: Option<bool>,
    // JSON-serialised dependency list (serialised in `shadow_write`).
    depends_on: Option<String>,
}
|
||||
|
||||
/// Handle to the background shadow-write task.
///
/// Held in the global `PIPELINE_DB` singleton; dropping it closes the channel,
/// which lets the background task drain and exit.
pub struct PipelineDb {
    // Fire-and-forget sender; unbounded so producers never block on the DB.
    tx: mpsc::UnboundedSender<PipelineWriteMsg>,
}
|
||||
|
||||
// Global singleton for the shadow-write channel; set exactly once by `init`.
// `shadow_write` is a no-op until this is populated.
static PIPELINE_DB: OnceLock<PipelineDb> = OnceLock::new();
|
||||
|
||||
/// Initialise the pipeline database.
///
/// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations,
/// and spawns the background write task. Safe to call only once; subsequent calls
/// are no-ops (the `OnceLock` rejects them silently).
///
/// # Errors
///
/// Returns any `sqlx::Error` raised while opening the pool or running migrations.
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
    // Fast path: already initialised. NOTE(review): two concurrent *first*
    // calls can both pass this check; the loser's `set` below fails silently
    // and its task exits once its `tx` is dropped, so the race looks benign —
    // confirm no caller relies on exactly-one pool being opened.
    if PIPELINE_DB.get().is_some() {
        return Ok(());
    }

    let options = SqliteConnectOptions::new()
        .filename(db_path)
        .create_if_missing(true);

    let pool = SqlitePool::connect_with(options).await?;
    // Migrations are embedded at compile time from ./migrations.
    sqlx::migrate!("./migrations").run(&pool).await?;

    // Unbounded so producers (filesystem move paths) never block on the DB.
    let (tx, mut rx) = mpsc::unbounded_channel::<PipelineWriteMsg>();

    // Background writer: drains the channel and upserts one row per message.
    // Exits when all senders are dropped (rx.recv() returns None).
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            let now = chrono::Utc::now().to_rfc3339();
            // Upsert keyed on id. `?8` is bound once and reused for both
            // created_at and updated_at on insert; on conflict only the
            // payload columns and updated_at are overwritten, so created_at
            // keeps its original value.
            let result = sqlx::query(
                "INSERT INTO pipeline_items \
                 (id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \
                 ON CONFLICT(id) DO UPDATE SET \
                 name = excluded.name, \
                 stage = excluded.stage, \
                 agent = excluded.agent, \
                 retry_count = excluded.retry_count, \
                 blocked = excluded.blocked, \
                 depends_on = excluded.depends_on, \
                 updated_at = excluded.updated_at",
            )
            .bind(&msg.story_id)
            .bind(&msg.name)
            .bind(&msg.stage)
            .bind(&msg.agent)
            .bind(msg.retry_count)
            // SQLite has no boolean type; store as a 0/1 integer.
            .bind(msg.blocked.map(|b| b as i64))
            .bind(&msg.depends_on)
            .bind(&now)
            .execute(&pool)
            .await;

            // Fire-and-forget contract: failures are logged, never surfaced.
            if let Err(e) = result {
                slog!("[db] Shadow write failed for '{}': {e}", msg.story_id);
            }
        }
    });

    // If a concurrent init won the race, this drops `tx` and the task above
    // exits cleanly on the next recv().
    let _ = PIPELINE_DB.set(PipelineDb { tx });
    Ok(())
}
|
||||
|
||||
/// Shadow-write a pipeline item move to SQLite.
|
||||
///
|
||||
/// Reads front matter from `file_path` (the post-move location) to extract
|
||||
/// metadata. The write is fire-and-forget — errors are logged but never
|
||||
/// propagate to the caller. If the database has not been initialised this is a
|
||||
/// complete no-op.
|
||||
pub fn shadow_write(story_id: &str, stage: &str, file_path: &Path) {
|
||||
let Some(db) = PIPELINE_DB.get() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let (name, agent, retry_count, blocked, depends_on) =
|
||||
match std::fs::read_to_string(file_path) {
|
||||
Ok(contents) => match parse_front_matter(&contents) {
|
||||
Ok(meta) => (
|
||||
meta.name,
|
||||
meta.agent,
|
||||
meta.retry_count.map(|r| r as i64),
|
||||
meta.blocked,
|
||||
meta.depends_on.as_ref().and_then(|d| serde_json::to_string(d).ok()),
|
||||
),
|
||||
Err(_) => (None, None, None, None, None),
|
||||
},
|
||||
Err(_) => (None, None, None, None, None),
|
||||
};
|
||||
|
||||
let msg = PipelineWriteMsg {
|
||||
story_id: story_id.to_string(),
|
||||
stage: stage.to_string(),
|
||||
name,
|
||||
agent,
|
||||
retry_count,
|
||||
blocked,
|
||||
depends_on,
|
||||
};
|
||||
|
||||
// Ignore send errors: the background task may have exited (e.g. in tests).
|
||||
let _ = db.tx.send(msg);
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// The upsert statement used by the shadow-write task, duplicated here so
    /// tests can exercise it against an isolated pool rather than the global
    /// singleton. On a fresh database it behaves as a plain INSERT.
    const UPSERT_SQL: &str = "INSERT INTO pipeline_items \
         (id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \
         ON CONFLICT(id) DO UPDATE SET \
         name = excluded.name, \
         stage = excluded.stage, \
         agent = excluded.agent, \
         retry_count = excluded.retry_count, \
         blocked = excluded.blocked, \
         depends_on = excluded.depends_on, \
         updated_at = excluded.updated_at";

    /// Helper: open an isolated SQLite pool at `db_path` and run the embedded
    /// migrations. Keeps each test hermetic (no global singleton involved).
    async fn fresh_pool(db_path: &std::path::Path) -> SqlitePool {
        let options = SqliteConnectOptions::new()
            .filename(db_path)
            .create_if_missing(true);
        let pool = SqlitePool::connect_with(options).await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// Helper: write a minimal story .md file with front matter.
    fn write_story(dir: &std::path::Path, filename: &str, content: &str) {
        fs::write(dir.join(filename), content).unwrap();
    }

    #[tokio::test]
    async fn shadow_write_inserts_row_into_sqlite() {
        let tmp = tempfile::tempdir().unwrap();
        let pool = fresh_pool(&tmp.path().join("pipeline.db")).await;

        // Write a story file in a temp stage dir.
        let stage_dir = tmp.path().join("2_current");
        fs::create_dir_all(&stage_dir).unwrap();
        let story_path = stage_dir.join("10_story_shadow_test.md");
        write_story(
            &stage_dir,
            "10_story_shadow_test.md",
            "---\nname: Shadow Test\nagent: coder-opus\nretry_count: 2\nblocked: false\n---\n# Story\n",
        );

        // Perform the upsert directly (bypass the global singleton).
        let now = chrono::Utc::now().to_rfc3339();
        sqlx::query(UPSERT_SQL)
            .bind("10_story_shadow_test")
            .bind("Shadow Test")
            .bind("2_current")
            .bind("coder-opus")
            .bind(2_i64)
            .bind(0_i64)
            .bind(Option::<String>::None)
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();

        // Query back and verify.
        let row: (String, Option<String>, String) =
            sqlx::query_as("SELECT id, name, stage FROM pipeline_items WHERE id = ?1")
                .bind("10_story_shadow_test")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(row.0, "10_story_shadow_test");
        assert_eq!(row.1.as_deref(), Some("Shadow Test"));
        assert_eq!(row.2, "2_current");

        // Verify metadata parses correctly from the story file. Unwrap the
        // read/parse results so a failure is reported directly, instead of
        // collapsing to None and surfacing as an opaque None != Some assert.
        let contents = fs::read_to_string(&story_path).unwrap();
        let meta = parse_front_matter(&contents).unwrap();
        assert_eq!(meta.name.as_deref(), Some("Shadow Test"));
        assert_eq!(meta.retry_count.map(|r| r as i64), Some(2));
    }

    #[tokio::test]
    async fn pipeline_items_table_has_correct_columns() {
        let tmp = tempfile::tempdir().unwrap();
        let pool = fresh_pool(&tmp.path().join("pipeline.db")).await;

        // Verify all required columns exist by inserting a full row.
        let now = chrono::Utc::now().to_rfc3339();
        sqlx::query(UPSERT_SQL)
            .bind("99_story_col_test")
            .bind(Option::<String>::None)
            .bind("1_backlog")
            .bind(Option::<String>::None)
            .bind(Option::<i64>::None)
            .bind(Option::<i64>::None)
            .bind(Option::<String>::None)
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();

        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pipeline_items")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count.0, 1);
    }

    #[tokio::test]
    async fn upsert_updates_stage_on_move() {
        let tmp = tempfile::tempdir().unwrap();
        let pool = fresh_pool(&tmp.path().join("pipeline.db")).await;

        let now = chrono::Utc::now().to_rfc3339();

        // Insert the initial row in backlog, then upsert with a new stage
        // (simulating a move to current). Same statement both times — the
        // second run exercises the ON CONFLICT path.
        for stage in ["1_backlog", "2_current"] {
            sqlx::query(UPSERT_SQL)
                .bind("5_story_move")
                .bind("Move Me")
                .bind(stage)
                .bind(Option::<String>::None)
                .bind(Option::<i64>::None)
                .bind(Option::<i64>::None)
                .bind(Option::<String>::None)
                .bind(&now)
                .execute(&pool)
                .await
                .unwrap();
        }

        let row: (String,) = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1")
            .bind("5_story_move")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(row.0, "2_current");
    }
}
|
||||
@@ -6,6 +6,7 @@ mod agent_log;
|
||||
mod agents;
|
||||
mod chat;
|
||||
mod config;
|
||||
mod db;
|
||||
mod http;
|
||||
mod io;
|
||||
mod llm;
|
||||
@@ -282,6 +283,21 @@ async fn main() -> Result<(), std::io::Error> {
|
||||
log_buffer::global().set_log_file(log_dir.join("server.log"));
|
||||
}
|
||||
|
||||
// Initialise the SQLite pipeline shadow-write database.
|
||||
// Clone the path out before the await so we don't hold the MutexGuard across
|
||||
// an await point.
|
||||
let pipeline_db_path = app_state
|
||||
.project_root
|
||||
.lock()
|
||||
.unwrap()
|
||||
.as_ref()
|
||||
.map(|root| root.join(".huskies").join("pipeline.db"));
|
||||
if let Some(db_path) = pipeline_db_path
|
||||
&& let Err(e) = db::init(&db_path).await
|
||||
{
|
||||
slog!("[db] Failed to initialise pipeline.db: {e}");
|
||||
}
|
||||
|
||||
let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));
|
||||
|
||||
// Filesystem watcher: broadcast channel for work/ pipeline changes.
|
||||
|
||||
Reference in New Issue
Block a user