// huskies/server/src/db/mod.rs
//! SQLite storage layer for pipeline state and story content.
//!
//! The CRDT layer (`crdt_state`) is the primary source of truth for pipeline
//! metadata (stage, name, agent, etc.). This module provides:
//!
//! 1. **Content store** — an in-memory `HashMap<story_id, markdown>` backed
//!    by the `pipeline_items.content` column. Provides fast synchronous
//!    reads for MCP tools and other callers.
//!
//! 2. **Shadow-write channel** — a fire-and-forget background task that
//!    upserts `pipeline_items` rows so the database always has a full copy
//!    of story content plus metadata.
//!
//! On startup, existing content is loaded from the database into memory so
//! no filesystem scan is needed after migration.
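//!
//! A rough sketch of the intended call flow (the database path and story id
//! below are illustrative, and the snippet assumes an async caller that can
//! propagate `sqlx::Error`):
//!
//! ```ignore
//! // At startup: open the database, run migrations, warm the content store.
//! db::init(Path::new(".huskies/pipeline.db")).await?;
//!
//! // Writers hand over full markdown; metadata is parsed from its front matter.
//! db::write_item_with_content("12_story_example", "1_backlog", "---\nname: Example\n---\n# Story\n");
//!
//! // Readers get content back synchronously from the in-memory store.
//! let markdown = db::read_content("12_story_example");
//! ```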
use crate::io::story_metadata::parse_front_matter;
use crate::slog;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Mutex, OnceLock};
use tokio::sync::mpsc;
/// A pending shadow write for one pipeline item.
struct PipelineWriteMsg {
story_id: String,
stage: String,
name: Option<String>,
agent: Option<String>,
retry_count: Option<i64>,
blocked: Option<bool>,
depends_on: Option<String>,
content: Option<String>,
}
/// Handle to the background shadow-write task.
pub struct PipelineDb {
tx: mpsc::UnboundedSender<PipelineWriteMsg>,
}
static PIPELINE_DB: OnceLock<PipelineDb> = OnceLock::new();
// ── In-memory content store ─────────────────────────────────────────
static CONTENT_STORE: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();
/// Read the full markdown content of a story from the in-memory store.
pub fn read_content(story_id: &str) -> Option<String> {
let store = CONTENT_STORE.get()?;
let map = store.lock().ok()?;
map.get(story_id).cloned()
}
/// Write (or overwrite) the full markdown content of a story.
///
/// Updates the in-memory store immediately.
pub fn write_content(story_id: &str, content: &str) {
if let Some(store) = CONTENT_STORE.get() && let Ok(mut map) = store.lock() {
map.insert(story_id.to_string(), content.to_string());
}
}
/// Remove a story's content from the in-memory store.
pub fn delete_content(story_id: &str) {
if let Some(store) = CONTENT_STORE.get() && let Ok(mut map) = store.lock() {
map.remove(story_id);
}
}
/// Ensure the in-memory content store is initialised.
///
/// Safe to call multiple times — the `OnceLock` is set at most once.
pub fn ensure_content_store() {
let _ = CONTENT_STORE.set(Mutex::new(HashMap::new()));
// In tests, also initialise the in-memory CRDT state so that
// write_item_with_content() and read_all_typed() work without async SQLite.
#[cfg(test)]
crate::crdt_state::init_for_test();
}
/// Return all story IDs present in the content store.
pub fn all_content_ids() -> Vec<String> {
match CONTENT_STORE.get() {
Some(store) => match store.lock() {
Ok(map) => map.keys().cloned().collect(),
Err(_) => Vec::new(),
},
None => Vec::new(),
}
}
// ── Initialisation ──────────────────────────────────────────────────
/// Initialise the pipeline database.
///
/// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations,
/// loads existing story content into the in-memory store, and spawns the
/// background write task. Safe to call more than once; calls after the first
/// are no-ops.
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
if PIPELINE_DB.get().is_some() {
return Ok(());
}
let options = SqliteConnectOptions::new()
.filename(db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
// Load existing content into the in-memory store.
let rows: Vec<(String, Option<String>)> =
sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL")
.fetch_all(&pool)
.await?;
let mut content_map = HashMap::new();
for (id, content) in rows {
if let Some(c) = content {
content_map.insert(id, c);
}
}
let _ = CONTENT_STORE.set(Mutex::new(content_map));
let (tx, mut rx) = mpsc::unbounded_channel::<PipelineWriteMsg>();
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
let now = chrono::Utc::now().to_rfc3339();
let result = sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \
ON CONFLICT(id) DO UPDATE SET \
name = excluded.name, \
stage = excluded.stage, \
agent = excluded.agent, \
retry_count = excluded.retry_count, \
blocked = excluded.blocked, \
depends_on = excluded.depends_on, \
content = COALESCE(excluded.content, pipeline_items.content), \
updated_at = excluded.updated_at",
)
.bind(&msg.story_id)
.bind(&msg.name)
.bind(&msg.stage)
.bind(&msg.agent)
.bind(msg.retry_count)
.bind(msg.blocked.map(|b| b as i64))
.bind(&msg.depends_on)
.bind(&msg.content)
.bind(&now)
.execute(&pool)
.await;
if let Err(e) = result {
slog!("[db] Shadow write failed for '{}': {e}", msg.story_id);
}
}
});
let _ = PIPELINE_DB.set(PipelineDb { tx });
Ok(())
}
// ── Write path ──────────────────────────────────────────────────────
/// Write a pipeline item from in-memory content (no filesystem access).
///
/// This is the primary write path for the DB-backed pipeline. It updates
/// the CRDT, the in-memory content store, and the SQLite shadow table.
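///
/// # Example
///
/// A minimal sketch (the story id and front-matter values are illustrative;
/// only the fields parsed from the front matter below are meaningful):
///
/// ```ignore
/// let markdown = "---\nname: Example\nagent: coder\nretry_count: 0\nblocked: false\n---\n# Story\n";
/// write_item_with_content("12_story_example", "1_backlog", markdown);
/// assert_eq!(read_content("12_story_example").as_deref(), Some(markdown));
/// ```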
pub fn write_item_with_content(
story_id: &str,
stage: &str,
content: &str,
) {
let (name, agent, retry_count, blocked, depends_on) = match parse_front_matter(content) {
Ok(meta) => (
meta.name,
meta.agent,
meta.retry_count.map(|r| r as i64),
meta.blocked,
meta.depends_on
.as_ref()
.and_then(|d| serde_json::to_string(d).ok()),
),
Err(_) => (None, None, None, None, None),
};
// Update in-memory content store.
ensure_content_store();
write_content(story_id, content);
// Primary: CRDT ops.
crate::crdt_state::write_item(
story_id,
stage,
name.as_deref(),
agent.as_deref(),
retry_count,
blocked,
depends_on.as_deref(),
);
// Shadow: pipeline_items table (only when DB is initialised).
if let Some(db) = PIPELINE_DB.get() {
let msg = PipelineWriteMsg {
story_id: story_id.to_string(),
stage: stage.to_string(),
name,
agent,
retry_count,
blocked,
depends_on,
content: Some(content.to_string()),
};
let _ = db.tx.send(msg);
}
}
/// Update only the stage of an existing item (used by move operations).
///
/// Reads current content from the in-memory store, updates the CRDT stage,
/// and persists the change. Optionally modifies the content (e.g. to clear
/// front-matter fields).
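///
/// # Example
///
/// A sketch of both call shapes (the story id, stage names, and closure are
/// illustrative; any `Fn(&str) -> String` works as a transform):
///
/// ```ignore
/// // Plain stage change, content left untouched.
/// move_item_stage("12_story_example", "2_current", None);
///
/// // Stage change plus a content rewrite, e.g. clearing a front-matter flag.
/// let clear_blocked = |c: &str| c.replace("blocked: true", "blocked: false");
/// move_item_stage("12_story_example", "2_current", Some(&clear_blocked));
/// ```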
pub fn move_item_stage(
story_id: &str,
new_stage: &str,
content_transform: Option<&dyn Fn(&str) -> String>,
) {
let current_content = read_content(story_id);
let content = match (&current_content, content_transform) {
(Some(c), Some(transform)) => {
let new_content = transform(c);
write_content(story_id, &new_content);
Some(new_content)
}
(Some(c), None) => Some(c.clone()),
_ => None,
};
let (name, agent, retry_count, blocked, depends_on) = content
.as_deref()
.or(current_content.as_deref())
.and_then(|c| parse_front_matter(c).ok())
.map(|meta| {
(
meta.name,
meta.agent,
meta.retry_count.map(|r| r as i64),
meta.blocked,
meta.depends_on
.as_ref()
.and_then(|d| serde_json::to_string(d).ok()),
)
})
.unwrap_or((None, None, None, None, None));
// CRDT stage transition.
crate::crdt_state::write_item(
story_id,
new_stage,
name.as_deref(),
agent.as_deref(),
retry_count,
blocked,
depends_on.as_deref(),
);
// Shadow table.
if let Some(db) = PIPELINE_DB.get() {
let msg = PipelineWriteMsg {
story_id: story_id.to_string(),
stage: new_stage.to_string(),
name,
agent,
retry_count,
blocked,
depends_on,
content,
};
let _ = db.tx.send(msg);
}
}
/// Remove a story's content from the in-memory store and mark it as deleted
/// in the shadow table (fire-and-forget).
pub fn delete_item(story_id: &str) {
delete_content(story_id);
if let Some(db) = PIPELINE_DB.get() {
// The write channel has no dedicated delete message, so the row is
// upserted with the sentinel stage "deleted" rather than being removed
// from the table.
let msg = PipelineWriteMsg {
story_id: story_id.to_string(),
stage: "deleted".to_string(),
name: None,
agent: None,
retry_count: None,
blocked: None,
depends_on: None,
content: None,
};
let _ = db.tx.send(msg);
}
}
/// Get the next available item number by scanning both the CRDT state
/// and the in-memory content store for the highest existing number.
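///
/// # Example
///
/// Assuming the `<number>_story_<slug>` id convention used throughout this
/// module, if "41_story_foo" (an illustrative id) holds the highest existing
/// number, the next number is 42:
///
/// ```ignore
/// write_content("41_story_foo", "---\nname: Foo\n---\n");
/// assert_eq!(next_item_number(), 42);
/// ```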
pub fn next_item_number() -> u32 {
let mut max_num: u32 = 0;
// Scan CRDT items via typed projection.
for item in crate::pipeline_state::read_all_typed() {
let num_str: String = item
.story_id
.0
.chars()
.take_while(|c| c.is_ascii_digit())
.collect();
if let Ok(n) = num_str.parse::<u32>() && n > max_num {
max_num = n;
}
}
// Also scan the content store (might have items not yet in CRDT).
for id in all_content_ids() {
let num_str: String = id.chars().take_while(|c| c.is_ascii_digit()).collect();
if let Ok(n) = num_str.parse::<u32>() && n > max_num {
max_num = n;
}
}
max_num + 1
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
/// Helper: write a minimal story .md file with front matter.
fn write_story(dir: &std::path::Path, filename: &str, content: &str) {
fs::write(dir.join(filename), content).unwrap();
}
#[tokio::test]
async fn shadow_write_inserts_row_into_sqlite() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
// Initialise the DB in an isolated pool (not the global singleton, to
// keep tests hermetic).
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations")
.run(&pool)
.await
.unwrap();
// Write a story file in a temp stage dir.
let stage_dir = tmp.path().join("2_current");
fs::create_dir_all(&stage_dir).unwrap();
let story_path = stage_dir.join("10_story_shadow_test.md");
write_story(
&stage_dir,
"10_story_shadow_test.md",
"---\nname: Shadow Test\nagent: coder-opus\nretry_count: 2\nblocked: false\n---\n# Story\n",
);
// Perform the upsert directly (bypass the global singleton).
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \
ON CONFLICT(id) DO UPDATE SET \
name = excluded.name, \
stage = excluded.stage, \
agent = excluded.agent, \
retry_count = excluded.retry_count, \
blocked = excluded.blocked, \
depends_on = excluded.depends_on, \
content = COALESCE(excluded.content, pipeline_items.content), \
updated_at = excluded.updated_at",
)
.bind("10_story_shadow_test")
.bind("Shadow Test")
.bind("2_current")
.bind("coder-opus")
.bind(2_i64)
.bind(0_i64)
.bind(Option::<String>::None)
.bind("---\nname: Shadow Test\n---\n# Story\n")
.bind(&now)
.execute(&pool)
.await
.unwrap();
// Query back and verify.
let row: (String, Option<String>, String) = sqlx::query_as(
"SELECT id, name, stage FROM pipeline_items WHERE id = ?1",
)
.bind("10_story_shadow_test")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, "10_story_shadow_test");
assert_eq!(row.1.as_deref(), Some("Shadow Test"));
assert_eq!(row.2, "2_current");
// Verify metadata was parsed correctly from the story file.
let (name, _agent, retry_count, _blocked, _depends_on) =
match std::fs::read_to_string(&story_path) {
Ok(contents) => match parse_front_matter(&contents) {
Ok(meta) => (
meta.name,
meta.agent,
meta.retry_count.map(|r| r as i64),
meta.blocked,
meta.depends_on,
),
Err(_) => (None, None, None, None, None),
},
Err(_) => (None, None, None, None, None),
};
assert_eq!(name.as_deref(), Some("Shadow Test"));
assert_eq!(retry_count, Some(2));
}
#[tokio::test]
async fn pipeline_items_table_has_content_column() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations")
.run(&pool)
.await
.unwrap();
// Verify content column exists by inserting a full row.
let now = chrono::Utc::now().to_rfc3339();
let content = "---\nname: Test\n---\n# Story\n";
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)",
)
.bind("99_story_col_test")
.bind(Option::<String>::None)
.bind("1_backlog")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind(content)
.bind(&now)
.execute(&pool)
.await
.unwrap();
let row: (Option<String>,) = sqlx::query_as(
"SELECT content FROM pipeline_items WHERE id = ?1",
)
.bind("99_story_col_test")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0.as_deref(), Some(content));
}
#[tokio::test]
async fn upsert_updates_stage_on_move() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations")
.run(&pool)
.await
.unwrap();
let now = chrono::Utc::now().to_rfc3339();
// Insert initial row in backlog.
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)",
)
.bind("5_story_move")
.bind("Move Me")
.bind("1_backlog")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind("---\nname: Move Me\n---\n")
.bind(&now)
.execute(&pool)
.await
.unwrap();
// Upsert with new stage (simulating move to current).
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \
ON CONFLICT(id) DO UPDATE SET \
name = excluded.name, \
stage = excluded.stage, \
agent = excluded.agent, \
retry_count = excluded.retry_count, \
blocked = excluded.blocked, \
depends_on = excluded.depends_on, \
content = COALESCE(excluded.content, pipeline_items.content), \
updated_at = excluded.updated_at",
)
.bind("5_story_move")
.bind("Move Me")
.bind("2_current")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind(Option::<String>::None) // content NULL → COALESCE preserves existing
.bind(&now)
.execute(&pool)
.await
.unwrap();
let row: (String,) =
sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1")
.bind("5_story_move")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, "2_current");
}
#[test]
fn content_store_read_write_delete() {
ensure_content_store();
let story_id = "100_story_content_test";
let markdown = "---\nname: Content Test\n---\n# Story\n";
// Write.
write_content(story_id, markdown);
assert_eq!(read_content(story_id).as_deref(), Some(markdown));
// Overwrite.
let updated = "---\nname: Updated\n---\n# Updated Story\n";
write_content(story_id, updated);
assert_eq!(read_content(story_id).as_deref(), Some(updated));
// Delete.
delete_content(story_id);
assert!(read_content(story_id).is_none());
}
#[test]
fn next_item_number_returns_1_when_empty() {
// With a clean store this returns 1, but other tests in the same process
// may already have populated the shared CRDT/content store, so only the
// lower bound is asserted.
let n = next_item_number();
assert!(n >= 1);
}
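/// Additional check for the numbering scan (the story id used here is
/// hypothetical; only its numeric prefix matters to `next_item_number`).
#[test]
fn next_item_number_exceeds_highest_content_store_id() {
ensure_content_store();
write_content("41_story_number_probe", "---\nname: Number Probe\n---\n");
assert!(next_item_number() >= 42);
// Clean up so other tests scanning the shared store are unaffected.
delete_content("41_story_number_probe");
}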
}