// huskies/server/src/db/mod.rs
//! SQLite shadow-write layer for pipeline state.
//!
//! All filesystem pipeline operations (`move_story_to_X` etc.) remain authoritative.
//! This module provides a fire-and-forget channel that dual-writes each move into the
//! `pipeline_items` table in `.huskies/pipeline.db`; the primary database write path
//! is the CRDT op layer in `crate::crdt_state` (see [`shadow_write`]).
//!
//! Reads are NOT served from SQLite — the filesystem remains the single source of truth.
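//!
//! A minimal usage sketch (the call sites, story id, and paths are illustrative,
//! not taken from this crate):
//!
//! ```ignore
//! // Once at startup: open/create the shadow DB and spawn the background writer.
//! db::init(Path::new(".huskies/pipeline.db")).await?;
//!
//! // After each successful filesystem move, mirror the new state (fire-and-forget).
//! db::shadow_write("10_story_example", "2_current", &moved_story_path);
//! ```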
use crate::io::story_metadata::parse_front_matter;
use crate::slog;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::path::Path;
use std::sync::OnceLock;
use tokio::sync::mpsc;
/// A pending shadow write for one pipeline item.
struct PipelineWriteMsg {
story_id: String,
stage: String,
name: Option<String>,
agent: Option<String>,
retry_count: Option<i64>,
blocked: Option<bool>,
depends_on: Option<String>,
}
/// Handle to the background shadow-write task.
pub struct PipelineDb {
tx: mpsc::UnboundedSender<PipelineWriteMsg>,
}
static PIPELINE_DB: OnceLock<PipelineDb> = OnceLock::new();
/// Initialise the pipeline database.
///
/// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations,
/// and spawns the background write task. Only the first call has any effect;
/// subsequent calls return `Ok(())` without touching the already-populated `OnceLock`.
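///
/// # Example
///
/// A startup sketch (the database location is illustrative):
///
/// ```ignore
/// if let Err(e) = db::init(Path::new(".huskies/pipeline.db")).await {
///     // Shadow writes are best-effort; the filesystem pipeline keeps working without them.
///     slog!("[db] pipeline.db init failed: {e}");
/// }
/// ```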
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
if PIPELINE_DB.get().is_some() {
return Ok(());
}
let options = SqliteConnectOptions::new()
.filename(db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
let (tx, mut rx) = mpsc::unbounded_channel::<PipelineWriteMsg>();
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
let now = chrono::Utc::now().to_rfc3339();
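// Upsert keyed by story id: `?8` is bound once and reused for both created_at
// and updated_at; on conflict only updated_at (not created_at) is refreshed.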
let result = sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \
ON CONFLICT(id) DO UPDATE SET \
name = excluded.name, \
stage = excluded.stage, \
agent = excluded.agent, \
retry_count = excluded.retry_count, \
blocked = excluded.blocked, \
depends_on = excluded.depends_on, \
updated_at = excluded.updated_at",
)
.bind(&msg.story_id)
.bind(&msg.name)
.bind(&msg.stage)
.bind(&msg.agent)
.bind(msg.retry_count)
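// SQLite has no native boolean; `blocked` is persisted as a 0/1 integer.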
.bind(msg.blocked.map(|b| b as i64))
.bind(&msg.depends_on)
.bind(&now)
.execute(&pool)
.await;
if let Err(e) = result {
slog!("[db] Shadow write failed for '{}': {e}", msg.story_id);
}
}
});
let _ = PIPELINE_DB.set(PipelineDb { tx });
Ok(())
}
/// Write a pipeline item's state to both the CRDT layer and the legacy SQLite
/// shadow table.
///
/// Reads front matter from `file_path` (the post-move location) to extract
/// metadata. The CRDT layer is the primary write path; the legacy shadow
/// table is kept for backwards compatibility. Both writes are fire-and-forget.
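///
/// # Example
///
/// A sketch of a typical call site (stage name and path are illustrative):
///
/// ```ignore
/// // Invoked right after the filesystem move has succeeded.
/// db::shadow_write(
///     "10_story_shadow_test",
///     "2_current",
///     Path::new(".huskies/2_current/10_story_shadow_test.md"),
/// );
/// ```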
pub fn shadow_write(story_id: &str, stage: &str, file_path: &Path) {
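// Best-effort metadata extraction: a missing file or unparsable front matter
// simply degrades to all-None fields.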
let (name, agent, retry_count, blocked, depends_on) =
match std::fs::read_to_string(file_path) {
Ok(contents) => match parse_front_matter(&contents) {
Ok(meta) => (
meta.name,
meta.agent,
meta.retry_count.map(|r| r as i64),
meta.blocked,
meta.depends_on.as_ref().and_then(|d| serde_json::to_string(d).ok()),
),
Err(_) => (None, None, None, None, None),
},
Err(_) => (None, None, None, None, None),
};
// Primary: write through CRDT ops (persisted to SQLite crdt_ops table).
crate::crdt_state::write_item(
story_id,
stage,
name.as_deref(),
agent.as_deref(),
retry_count,
blocked,
depends_on.as_deref(),
);
// Legacy: fire-and-forget to the pipeline_items shadow table.
if let Some(db) = PIPELINE_DB.get() {
let msg = PipelineWriteMsg {
story_id: story_id.to_string(),
stage: stage.to_string(),
name,
agent,
retry_count,
blocked,
depends_on,
};
let _ = db.tx.send(msg);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
/// Helper: write a minimal story .md file with front matter.
fn write_story(dir: &std::path::Path, filename: &str, content: &str) {
fs::write(dir.join(filename), content).unwrap();
}
#[tokio::test]
async fn shadow_write_inserts_row_into_sqlite() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
// Initialise the DB in an isolated pool (not the global singleton, to
// keep tests hermetic).
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations")
.run(&pool)
.await
.unwrap();
// Write a story file in a temp stage dir.
let stage_dir = tmp.path().join("2_current");
fs::create_dir_all(&stage_dir).unwrap();
let story_path = stage_dir.join("10_story_shadow_test.md");
write_story(
&stage_dir,
"10_story_shadow_test.md",
"---\nname: Shadow Test\nagent: coder-opus\nretry_count: 2\nblocked: false\n---\n# Story\n",
);
// Perform the upsert directly (bypass the global singleton).
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \
ON CONFLICT(id) DO UPDATE SET \
name = excluded.name, \
stage = excluded.stage, \
agent = excluded.agent, \
retry_count = excluded.retry_count, \
blocked = excluded.blocked, \
depends_on = excluded.depends_on, \
updated_at = excluded.updated_at",
)
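// Binds follow the column list: id, name, stage, agent, retry_count,
// blocked (false stored as 0), depends_on, then the shared timestamp.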
.bind("10_story_shadow_test")
.bind("Shadow Test")
.bind("2_current")
.bind("coder-opus")
.bind(2_i64)
.bind(0_i64)
.bind(Option::<String>::None)
.bind(&now)
.execute(&pool)
.await
.unwrap();
// Query back and verify.
let row: (String, Option<String>, String) = sqlx::query_as(
"SELECT id, name, stage FROM pipeline_items WHERE id = ?1",
)
.bind("10_story_shadow_test")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, "10_story_shadow_test");
assert_eq!(row.1.as_deref(), Some("Shadow Test"));
assert_eq!(row.2, "2_current");
// Verify metadata was parsed correctly from the story file.
let (name, _agent, retry_count, _blocked, _depends_on) =
match std::fs::read_to_string(&story_path) {
Ok(contents) => match parse_front_matter(&contents) {
Ok(meta) => (
meta.name,
meta.agent,
meta.retry_count.map(|r| r as i64),
meta.blocked,
meta.depends_on,
),
Err(_) => (None, None, None, None, None),
},
Err(_) => (None, None, None, None, None),
};
assert_eq!(name.as_deref(), Some("Shadow Test"));
assert_eq!(retry_count, Some(2));
}
#[tokio::test]
async fn pipeline_items_table_has_correct_columns() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations")
.run(&pool)
.await
.unwrap();
// Verify all required columns exist by inserting a full row.
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)",
)
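// name, agent, retry_count, blocked and depends_on are all bound as NULL to
// prove the schema tolerates missing metadata.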
.bind("99_story_col_test")
.bind(Option::<String>::None)
.bind("1_backlog")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind(&now)
.execute(&pool)
.await
.unwrap();
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pipeline_items")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 1);
}
#[tokio::test]
async fn upsert_updates_stage_on_move() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations")
.run(&pool)
.await
.unwrap();
let now = chrono::Utc::now().to_rfc3339();
// Insert initial row in backlog.
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)",
)
.bind("5_story_move")
.bind("Move Me")
.bind("1_backlog")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind(&now)
.execute(&pool)
.await
.unwrap();
// Upsert with new stage (simulating move to current).
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \
ON CONFLICT(id) DO UPDATE SET \
name = excluded.name, \
stage = excluded.stage, \
agent = excluded.agent, \
retry_count = excluded.retry_count, \
blocked = excluded.blocked, \
depends_on = excluded.depends_on, \
updated_at = excluded.updated_at",
)
.bind("5_story_move")
.bind("Move Me")
.bind("2_current")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind(&now)
.execute(&pool)
.await
.unwrap();
let row: (String,) =
sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1")
.bind("5_story_move")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, "2_current");
}
}