huskies: merge 843
This commit is contained in:
@@ -0,0 +1,105 @@
|
|||||||
|
//! In-memory content store — fast synchronous reads for story markdown.
|
||||||
|
//!
|
||||||
|
//! Backed by a `HashMap<story_id, markdown>` wrapped in a `Mutex`. In
|
||||||
|
//! non-test builds the store lives in a process-global `OnceLock`; in tests
|
||||||
|
//! each thread gets its own isolated copy via a `thread_local!` to avoid
|
||||||
|
//! cross-test pollution.
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Mutex, OnceLock};
|
||||||
|
|
||||||
|
static CONTENT_STORE: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
thread_local! {
|
||||||
|
/// Per-thread isolated content store used in tests to prevent cross-test pollution.
|
||||||
|
pub(super) static CONTENT_STORE_TL: OnceLock<Mutex<HashMap<String, String>>> = const { OnceLock::new() };
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(test))]
|
||||||
|
/// Return a reference to the process-global content store, or `None` if not yet initialised.
|
||||||
|
pub(super) fn get_content_store() -> Option<&'static Mutex<HashMap<String, String>>> {
|
||||||
|
CONTENT_STORE.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
/// Return the thread-local content store for tests, falling back to the global store.
|
||||||
|
pub(super) fn get_content_store() -> Option<&'static Mutex<HashMap<String, String>>> {
|
||||||
|
let tl = CONTENT_STORE_TL.with(|lock| {
|
||||||
|
if lock.get().is_some() {
|
||||||
|
Some(lock as *const OnceLock<Mutex<HashMap<String, String>>>)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if let Some(ptr) = tl {
|
||||||
|
// SAFETY: The thread-local lives as long as the thread, which outlives
|
||||||
|
// any test using it. We only need 'static for the return type.
|
||||||
|
let lock = unsafe { &*ptr };
|
||||||
|
lock.get()
|
||||||
|
} else {
|
||||||
|
CONTENT_STORE.get()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the full markdown content of a story from the in-memory store.
|
||||||
|
pub fn read_content(story_id: &str) -> Option<String> {
|
||||||
|
let store = get_content_store()?;
|
||||||
|
let map = store.lock().ok()?;
|
||||||
|
map.get(story_id).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write (or overwrite) the full markdown content of a story.
|
||||||
|
///
|
||||||
|
/// Updates the in-memory store immediately.
|
||||||
|
pub fn write_content(story_id: &str, content: &str) {
|
||||||
|
if let Some(store) = get_content_store()
|
||||||
|
&& let Ok(mut map) = store.lock()
|
||||||
|
{
|
||||||
|
map.insert(story_id.to_string(), content.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a story's content from the in-memory store.
|
||||||
|
pub fn delete_content(story_id: &str) {
|
||||||
|
if let Some(store) = get_content_store()
|
||||||
|
&& let Ok(mut map) = store.lock()
|
||||||
|
{
|
||||||
|
map.remove(story_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ensure the in-memory content store is initialised.
|
||||||
|
///
|
||||||
|
/// Safe to call multiple times — the `OnceLock` is set at most once.
|
||||||
|
pub fn ensure_content_store() {
|
||||||
|
#[cfg(not(test))]
|
||||||
|
{
|
||||||
|
let _ = CONTENT_STORE.set(Mutex::new(HashMap::new()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
{
|
||||||
|
CONTENT_STORE_TL.with(|lock| {
|
||||||
|
if lock.get().is_none() {
|
||||||
|
let _ = lock.set(Mutex::new(HashMap::new()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
crate::crdt_state::init_for_test();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return all story IDs present in the content store.
|
||||||
|
pub fn all_content_ids() -> Vec<String> {
|
||||||
|
match get_content_store() {
|
||||||
|
Some(store) => match store.lock() {
|
||||||
|
Ok(map) => map.keys().cloned().collect(),
|
||||||
|
Err(_) => Vec::new(),
|
||||||
|
},
|
||||||
|
None => Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialise the content store from a pre-loaded map (used during DB startup).
|
||||||
|
pub(super) fn init_content_store(map: HashMap<String, String>) {
|
||||||
|
let _ = CONTENT_STORE.set(Mutex::new(map));
|
||||||
|
}
|
||||||
+18
-412
@@ -14,417 +14,23 @@
|
|||||||
///
|
///
|
||||||
/// On startup, existing content is loaded from the database into memory so
|
/// On startup, existing content is loaded from the database into memory so
|
||||||
/// no filesystem scan is needed after migration.
|
/// no filesystem scan is needed after migration.
|
||||||
use crate::io::story_metadata::parse_front_matter;
|
pub mod content_store;
|
||||||
use crate::slog;
|
/// Write operations for the pipeline — content, stage transitions, and deletions.
|
||||||
use sqlx::SqlitePool;
|
pub mod ops;
|
||||||
use sqlx::sqlite::SqliteConnectOptions;
|
/// Background shadow-write task — persists pipeline items to SQLite asynchronously.
|
||||||
use std::collections::HashMap;
|
pub mod shadow_write;
|
||||||
use std::path::Path;
|
|
||||||
use std::sync::{Mutex, OnceLock};
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
/// A pending shadow write for one pipeline item.
|
pub use content_store::{all_content_ids, delete_content, read_content, write_content};
|
||||||
struct PipelineWriteMsg {
|
pub use ops::{delete_item, move_item_stage, next_item_number, write_item_with_content};
|
||||||
story_id: String,
|
pub use shadow_write::init;
|
||||||
stage: String,
|
|
||||||
name: Option<String>,
|
|
||||||
agent: Option<String>,
|
|
||||||
retry_count: Option<i64>,
|
|
||||||
blocked: Option<bool>,
|
|
||||||
depends_on: Option<String>,
|
|
||||||
content: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handle to the background shadow-write task.
|
|
||||||
pub struct PipelineDb {
|
|
||||||
tx: mpsc::UnboundedSender<PipelineWriteMsg>,
|
|
||||||
}
|
|
||||||
|
|
||||||
static PIPELINE_DB: OnceLock<PipelineDb> = OnceLock::new();
|
|
||||||
|
|
||||||
// ── In-memory content store ─────────────────────────────────────────
|
|
||||||
|
|
||||||
static CONTENT_STORE: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
thread_local! {
|
pub use content_store::ensure_content_store;
|
||||||
static CONTENT_STORE_TL: OnceLock<Mutex<HashMap<String, String>>> = const { OnceLock::new() };
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(not(test))]
|
|
||||||
fn get_content_store() -> Option<&'static Mutex<HashMap<String, String>>> {
|
|
||||||
CONTENT_STORE.get()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
fn get_content_store() -> Option<&'static Mutex<HashMap<String, String>>> {
|
|
||||||
let tl = CONTENT_STORE_TL.with(|lock| {
|
|
||||||
if lock.get().is_some() {
|
|
||||||
Some(lock as *const OnceLock<Mutex<HashMap<String, String>>>)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if let Some(ptr) = tl {
|
|
||||||
// SAFETY: The thread-local lives as long as the thread, which outlives
|
|
||||||
// any test using it. We only need 'static for the return type.
|
|
||||||
let lock = unsafe { &*ptr };
|
|
||||||
lock.get()
|
|
||||||
} else {
|
|
||||||
CONTENT_STORE.get()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read the full markdown content of a story from the in-memory store.
|
|
||||||
pub fn read_content(story_id: &str) -> Option<String> {
|
|
||||||
let store = get_content_store()?;
|
|
||||||
let map = store.lock().ok()?;
|
|
||||||
map.get(story_id).cloned()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write (or overwrite) the full markdown content of a story.
|
|
||||||
///
|
|
||||||
/// Updates the in-memory store immediately.
|
|
||||||
pub fn write_content(story_id: &str, content: &str) {
|
|
||||||
if let Some(store) = get_content_store()
|
|
||||||
&& let Ok(mut map) = store.lock()
|
|
||||||
{
|
|
||||||
map.insert(story_id.to_string(), content.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove a story's content from the in-memory store.
|
|
||||||
pub fn delete_content(story_id: &str) {
|
|
||||||
if let Some(store) = get_content_store()
|
|
||||||
&& let Ok(mut map) = store.lock()
|
|
||||||
{
|
|
||||||
map.remove(story_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Ensure the in-memory content store is initialised.
|
|
||||||
///
|
|
||||||
/// Safe to call multiple times — the `OnceLock` is set at most once.
|
|
||||||
pub fn ensure_content_store() {
|
|
||||||
#[cfg(not(test))]
|
|
||||||
{
|
|
||||||
let _ = CONTENT_STORE.set(Mutex::new(HashMap::new()));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
{
|
|
||||||
CONTENT_STORE_TL.with(|lock| {
|
|
||||||
if lock.get().is_none() {
|
|
||||||
let _ = lock.set(Mutex::new(HashMap::new()));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return all story IDs present in the content store.
|
|
||||||
pub fn all_content_ids() -> Vec<String> {
|
|
||||||
match get_content_store() {
|
|
||||||
Some(store) => match store.lock() {
|
|
||||||
Ok(map) => map.keys().cloned().collect(),
|
|
||||||
Err(_) => Vec::new(),
|
|
||||||
},
|
|
||||||
None => Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Initialisation ──────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Initialise the pipeline database.
|
|
||||||
///
|
|
||||||
/// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations,
|
|
||||||
/// loads existing story content into the in-memory store, and spawns the
|
|
||||||
/// background write task. Safe to call only once; subsequent calls are no-ops.
|
|
||||||
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
|
||||||
if PIPELINE_DB.get().is_some() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let options = SqliteConnectOptions::new()
|
|
||||||
.filename(db_path)
|
|
||||||
.create_if_missing(true);
|
|
||||||
|
|
||||||
let pool = SqlitePool::connect_with(options).await?;
|
|
||||||
sqlx::migrate!("./migrations").run(&pool).await?;
|
|
||||||
|
|
||||||
// Load existing content into the in-memory store.
|
|
||||||
let rows: Vec<(String, Option<String>)> =
|
|
||||||
sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL")
|
|
||||||
.fetch_all(&pool)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let mut content_map = HashMap::new();
|
|
||||||
for (id, content) in rows {
|
|
||||||
if let Some(c) = content {
|
|
||||||
content_map.insert(id, c);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let _ = CONTENT_STORE.set(Mutex::new(content_map));
|
|
||||||
|
|
||||||
let (tx, mut rx) = mpsc::unbounded_channel::<PipelineWriteMsg>();
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
while let Some(msg) = rx.recv().await {
|
|
||||||
// The "deleted" sentinel means the caller wants the row gone.
|
|
||||||
// Issue a real DELETE so the shadow table stays clean and
|
|
||||||
// sync_crdt_stages_from_db cannot resurrect a tombstoned item on
|
|
||||||
// the next restart.
|
|
||||||
if msg.stage == "deleted" {
|
|
||||||
let result = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1")
|
|
||||||
.bind(&msg.story_id)
|
|
||||||
.execute(&pool)
|
|
||||||
.await;
|
|
||||||
if let Err(e) = result {
|
|
||||||
slog!("[db] Shadow delete failed for '{}': {e}", msg.story_id);
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let now = chrono::Utc::now().to_rfc3339();
|
|
||||||
let result = sqlx::query(
|
|
||||||
"INSERT INTO pipeline_items \
|
|
||||||
(id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \
|
|
||||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \
|
|
||||||
ON CONFLICT(id) DO UPDATE SET \
|
|
||||||
name = excluded.name, \
|
|
||||||
stage = excluded.stage, \
|
|
||||||
agent = excluded.agent, \
|
|
||||||
retry_count = excluded.retry_count, \
|
|
||||||
blocked = excluded.blocked, \
|
|
||||||
depends_on = excluded.depends_on, \
|
|
||||||
content = COALESCE(excluded.content, pipeline_items.content), \
|
|
||||||
updated_at = excluded.updated_at",
|
|
||||||
)
|
|
||||||
.bind(&msg.story_id)
|
|
||||||
.bind(&msg.name)
|
|
||||||
.bind(&msg.stage)
|
|
||||||
.bind(&msg.agent)
|
|
||||||
.bind(msg.retry_count)
|
|
||||||
.bind(msg.blocked.map(|b| b as i64))
|
|
||||||
.bind(&msg.depends_on)
|
|
||||||
.bind(&msg.content)
|
|
||||||
.bind(&now)
|
|
||||||
.execute(&pool)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if let Err(e) = result {
|
|
||||||
slog!("[db] Shadow write failed for '{}': {e}", msg.story_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let _ = PIPELINE_DB.set(PipelineDb { tx });
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Write path ──────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Write a pipeline item from in-memory content (no filesystem access).
|
|
||||||
///
|
|
||||||
/// This is the primary write path for the DB-backed pipeline. It updates
|
|
||||||
/// the CRDT, the in-memory content store, and the SQLite shadow table.
|
|
||||||
pub fn write_item_with_content(story_id: &str, stage: &str, content: &str) {
|
|
||||||
let (name, agent, retry_count, blocked, depends_on) = match parse_front_matter(content) {
|
|
||||||
Ok(meta) => (
|
|
||||||
meta.name,
|
|
||||||
meta.agent,
|
|
||||||
meta.retry_count.map(|r| r as i64),
|
|
||||||
meta.blocked,
|
|
||||||
meta.depends_on
|
|
||||||
.as_ref()
|
|
||||||
.and_then(|d| serde_json::to_string(d).ok()),
|
|
||||||
),
|
|
||||||
Err(_) => (None, None, None, None, None),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Update in-memory content store.
|
|
||||||
ensure_content_store();
|
|
||||||
write_content(story_id, content);
|
|
||||||
|
|
||||||
// Primary: CRDT ops.
|
|
||||||
let merged_at_ts = if crate::pipeline_state::Stage::from_dir(stage)
|
|
||||||
.is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. }))
|
|
||||||
{
|
|
||||||
Some(chrono::Utc::now().timestamp() as f64)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
crate::crdt_state::write_item(
|
|
||||||
story_id,
|
|
||||||
stage,
|
|
||||||
name.as_deref(),
|
|
||||||
agent.as_deref(),
|
|
||||||
retry_count,
|
|
||||||
blocked,
|
|
||||||
depends_on.as_deref(),
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
merged_at_ts,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Shadow: pipeline_items table (only when DB is initialised).
|
|
||||||
if let Some(db) = PIPELINE_DB.get() {
|
|
||||||
let msg = PipelineWriteMsg {
|
|
||||||
story_id: story_id.to_string(),
|
|
||||||
stage: stage.to_string(),
|
|
||||||
name,
|
|
||||||
agent,
|
|
||||||
retry_count,
|
|
||||||
blocked,
|
|
||||||
depends_on,
|
|
||||||
content: Some(content.to_string()),
|
|
||||||
};
|
|
||||||
let _ = db.tx.send(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Update only the stage of an existing item (used by move operations).
|
|
||||||
///
|
|
||||||
/// Reads current content from the in-memory store, updates the CRDT stage,
|
|
||||||
/// and persists the change. Optionally modifies the content (e.g. to clear
|
|
||||||
/// front-matter fields).
|
|
||||||
pub fn move_item_stage(
|
|
||||||
story_id: &str,
|
|
||||||
new_stage: &str,
|
|
||||||
content_transform: Option<&dyn Fn(&str) -> String>,
|
|
||||||
) {
|
|
||||||
let current_content = read_content(story_id);
|
|
||||||
|
|
||||||
let content = match (¤t_content, content_transform) {
|
|
||||||
(Some(c), Some(transform)) => {
|
|
||||||
let new_content = transform(c);
|
|
||||||
write_content(story_id, &new_content);
|
|
||||||
Some(new_content)
|
|
||||||
}
|
|
||||||
(Some(c), None) => Some(c.clone()),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let (name, agent, _ignored_retry_count, blocked, depends_on) = content
|
|
||||||
.as_deref()
|
|
||||||
.or(current_content.as_deref())
|
|
||||||
.and_then(|c| parse_front_matter(c).ok())
|
|
||||||
.map(|meta| {
|
|
||||||
(
|
|
||||||
meta.name,
|
|
||||||
meta.agent,
|
|
||||||
meta.retry_count.map(|r| r as i64),
|
|
||||||
meta.blocked,
|
|
||||||
meta.depends_on
|
|
||||||
.as_ref()
|
|
||||||
.and_then(|d| serde_json::to_string(d).ok()),
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.unwrap_or((None, None, None, None, None));
|
|
||||||
|
|
||||||
// CRDT stage transition.
|
|
||||||
let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage)
|
|
||||||
.is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. }))
|
|
||||||
{
|
|
||||||
Some(chrono::Utc::now().timestamp() as f64)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
crate::crdt_state::write_item(
|
|
||||||
story_id,
|
|
||||||
new_stage,
|
|
||||||
name.as_deref(),
|
|
||||||
agent.as_deref(),
|
|
||||||
None,
|
|
||||||
blocked,
|
|
||||||
depends_on.as_deref(),
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
merged_at_ts,
|
|
||||||
);
|
|
||||||
// Bug 780: stage transitions reset retry_count to 0. retry_count tracks
|
|
||||||
// attempts at THIS stage's work (coding, merging, qa); a fresh attempt at
|
|
||||||
// a new stage is conceptually distinct from prior attempts at a different
|
|
||||||
// stage. `blocked` is preserved — that's a human-set signal that survives
|
|
||||||
// transitions.
|
|
||||||
crate::crdt_state::set_retry_count(story_id, 0);
|
|
||||||
|
|
||||||
// Shadow table — always reset retry_count to 0 on stage transition.
|
|
||||||
let retry_count: Option<i64> = Some(0);
|
|
||||||
if let Some(db) = PIPELINE_DB.get() {
|
|
||||||
let msg = PipelineWriteMsg {
|
|
||||||
story_id: story_id.to_string(),
|
|
||||||
stage: new_stage.to_string(),
|
|
||||||
name,
|
|
||||||
agent,
|
|
||||||
retry_count,
|
|
||||||
blocked,
|
|
||||||
depends_on,
|
|
||||||
content,
|
|
||||||
};
|
|
||||||
let _ = db.tx.send(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Delete a story from the shadow table (fire-and-forget).
|
|
||||||
pub fn delete_item(story_id: &str) {
|
|
||||||
delete_content(story_id);
|
|
||||||
|
|
||||||
if let Some(db) = PIPELINE_DB.get() {
|
|
||||||
// Reuse the channel with a special "deleted" stage marker.
|
|
||||||
// The background task will handle it.
|
|
||||||
// Actually, we send a delete message by abusing the write — we'll
|
|
||||||
// just remove it by setting stage to "deleted".
|
|
||||||
let msg = PipelineWriteMsg {
|
|
||||||
story_id: story_id.to_string(),
|
|
||||||
stage: "deleted".to_string(),
|
|
||||||
name: None,
|
|
||||||
agent: None,
|
|
||||||
retry_count: None,
|
|
||||||
blocked: None,
|
|
||||||
depends_on: None,
|
|
||||||
content: None,
|
|
||||||
};
|
|
||||||
let _ = db.tx.send(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the next available item number by scanning both the CRDT state
|
|
||||||
/// and the in-memory content store for the highest existing number.
|
|
||||||
pub fn next_item_number() -> u32 {
|
|
||||||
let mut max_num: u32 = 0;
|
|
||||||
|
|
||||||
// Scan CRDT items via typed projection.
|
|
||||||
for item in crate::pipeline_state::read_all_typed() {
|
|
||||||
let num_str: String = item
|
|
||||||
.story_id
|
|
||||||
.0
|
|
||||||
.chars()
|
|
||||||
.take_while(|c| c.is_ascii_digit())
|
|
||||||
.collect();
|
|
||||||
if let Ok(n) = num_str.parse::<u32>()
|
|
||||||
&& n > max_num
|
|
||||||
{
|
|
||||||
max_num = n;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Also scan the content store (might have items not yet in CRDT).
|
|
||||||
for id in all_content_ids() {
|
|
||||||
let num_str: String = id.chars().take_while(|c| c.is_ascii_digit()).collect();
|
|
||||||
if let Ok(n) = num_str.parse::<u32>()
|
|
||||||
&& n > max_num
|
|
||||||
{
|
|
||||||
max_num = n;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
max_num + 1
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::io::story_metadata::parse_front_matter;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
|
||||||
/// Helper: write a minimal story .md file with front matter.
|
/// Helper: write a minimal story .md file with front matter.
|
||||||
@@ -439,10 +45,10 @@ mod tests {
|
|||||||
|
|
||||||
// Initialise the DB in an isolated pool (not the global singleton, to
|
// Initialise the DB in an isolated pool (not the global singleton, to
|
||||||
// keep tests hermetic).
|
// keep tests hermetic).
|
||||||
let options = SqliteConnectOptions::new()
|
let options = sqlx::sqlite::SqliteConnectOptions::new()
|
||||||
.filename(&db_path)
|
.filename(&db_path)
|
||||||
.create_if_missing(true);
|
.create_if_missing(true);
|
||||||
let pool = SqlitePool::connect_with(options).await.unwrap();
|
let pool = sqlx::SqlitePool::connect_with(options).await.unwrap();
|
||||||
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
||||||
|
|
||||||
// Write a story file in a temp stage dir.
|
// Write a story file in a temp stage dir.
|
||||||
@@ -520,10 +126,10 @@ mod tests {
|
|||||||
async fn pipeline_items_table_has_content_column() {
|
async fn pipeline_items_table_has_content_column() {
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
let db_path = tmp.path().join("pipeline.db");
|
let db_path = tmp.path().join("pipeline.db");
|
||||||
let options = SqliteConnectOptions::new()
|
let options = sqlx::sqlite::SqliteConnectOptions::new()
|
||||||
.filename(&db_path)
|
.filename(&db_path)
|
||||||
.create_if_missing(true);
|
.create_if_missing(true);
|
||||||
let pool = SqlitePool::connect_with(options).await.unwrap();
|
let pool = sqlx::SqlitePool::connect_with(options).await.unwrap();
|
||||||
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
||||||
|
|
||||||
// Verify content column exists by inserting a full row.
|
// Verify content column exists by inserting a full row.
|
||||||
@@ -560,10 +166,10 @@ mod tests {
|
|||||||
async fn upsert_updates_stage_on_move() {
|
async fn upsert_updates_stage_on_move() {
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
let db_path = tmp.path().join("pipeline.db");
|
let db_path = tmp.path().join("pipeline.db");
|
||||||
let options = SqliteConnectOptions::new()
|
let options = sqlx::sqlite::SqliteConnectOptions::new()
|
||||||
.filename(&db_path)
|
.filename(&db_path)
|
||||||
.create_if_missing(true);
|
.create_if_missing(true);
|
||||||
let pool = SqlitePool::connect_with(options).await.unwrap();
|
let pool = sqlx::SqlitePool::connect_with(options).await.unwrap();
|
||||||
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
||||||
|
|
||||||
let now = chrono::Utc::now().to_rfc3339();
|
let now = chrono::Utc::now().to_rfc3339();
|
||||||
@@ -664,10 +270,10 @@ mod tests {
|
|||||||
let tmp = tempfile::tempdir().unwrap();
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
let db_path = tmp.path().join("pipeline.db");
|
let db_path = tmp.path().join("pipeline.db");
|
||||||
|
|
||||||
let options = SqliteConnectOptions::new()
|
let options = sqlx::sqlite::SqliteConnectOptions::new()
|
||||||
.filename(&db_path)
|
.filename(&db_path)
|
||||||
.create_if_missing(true);
|
.create_if_missing(true);
|
||||||
let pool = SqlitePool::connect_with(options).await.unwrap();
|
let pool = sqlx::SqlitePool::connect_with(options).await.unwrap();
|
||||||
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
||||||
|
|
||||||
let now = chrono::Utc::now().to_rfc3339();
|
let now = chrono::Utc::now().to_rfc3339();
|
||||||
|
|||||||
@@ -0,0 +1,208 @@
|
|||||||
|
//! Write operations for the pipeline — content, stage transitions, and deletions.
|
||||||
|
//!
|
||||||
|
//! Each function updates three layers atomically in order: the in-memory
|
||||||
|
//! content store, the CRDT (source of truth for metadata), and the SQLite
|
||||||
|
//! shadow table (via the background channel).
|
||||||
|
use super::content_store::{
|
||||||
|
all_content_ids, delete_content, ensure_content_store, read_content, write_content,
|
||||||
|
};
|
||||||
|
use super::shadow_write::{PIPELINE_DB, PipelineWriteMsg};
|
||||||
|
use crate::io::story_metadata::parse_front_matter;
|
||||||
|
|
||||||
|
/// Write a pipeline item from in-memory content (no filesystem access).
|
||||||
|
///
|
||||||
|
/// This is the primary write path for the DB-backed pipeline. It updates
|
||||||
|
/// the CRDT, the in-memory content store, and the SQLite shadow table.
|
||||||
|
pub fn write_item_with_content(story_id: &str, stage: &str, content: &str) {
|
||||||
|
let (name, agent, retry_count, blocked, depends_on) = match parse_front_matter(content) {
|
||||||
|
Ok(meta) => (
|
||||||
|
meta.name,
|
||||||
|
meta.agent,
|
||||||
|
meta.retry_count.map(|r| r as i64),
|
||||||
|
meta.blocked,
|
||||||
|
meta.depends_on
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|d| serde_json::to_string(d).ok()),
|
||||||
|
),
|
||||||
|
Err(_) => (None, None, None, None, None),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Update in-memory content store.
|
||||||
|
ensure_content_store();
|
||||||
|
write_content(story_id, content);
|
||||||
|
|
||||||
|
// Primary: CRDT ops.
|
||||||
|
let merged_at_ts = if crate::pipeline_state::Stage::from_dir(stage)
|
||||||
|
.is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. }))
|
||||||
|
{
|
||||||
|
Some(chrono::Utc::now().timestamp() as f64)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
crate::crdt_state::write_item(
|
||||||
|
story_id,
|
||||||
|
stage,
|
||||||
|
name.as_deref(),
|
||||||
|
agent.as_deref(),
|
||||||
|
retry_count,
|
||||||
|
blocked,
|
||||||
|
depends_on.as_deref(),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
merged_at_ts,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Shadow: pipeline_items table (only when DB is initialised).
|
||||||
|
if let Some(db) = PIPELINE_DB.get() {
|
||||||
|
let msg = PipelineWriteMsg {
|
||||||
|
story_id: story_id.to_string(),
|
||||||
|
stage: stage.to_string(),
|
||||||
|
name,
|
||||||
|
agent,
|
||||||
|
retry_count,
|
||||||
|
blocked,
|
||||||
|
depends_on,
|
||||||
|
content: Some(content.to_string()),
|
||||||
|
};
|
||||||
|
let _ = db.tx.send(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update only the stage of an existing item (used by move operations).
|
||||||
|
///
|
||||||
|
/// Reads current content from the in-memory store, updates the CRDT stage,
|
||||||
|
/// and persists the change. Optionally modifies the content (e.g. to clear
|
||||||
|
/// front-matter fields).
|
||||||
|
pub fn move_item_stage(
|
||||||
|
story_id: &str,
|
||||||
|
new_stage: &str,
|
||||||
|
content_transform: Option<&dyn Fn(&str) -> String>,
|
||||||
|
) {
|
||||||
|
let current_content = read_content(story_id);
|
||||||
|
|
||||||
|
let content = match (¤t_content, content_transform) {
|
||||||
|
(Some(c), Some(transform)) => {
|
||||||
|
let new_content = transform(c);
|
||||||
|
write_content(story_id, &new_content);
|
||||||
|
Some(new_content)
|
||||||
|
}
|
||||||
|
(Some(c), None) => Some(c.clone()),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (name, agent, _ignored_retry_count, blocked, depends_on) = content
|
||||||
|
.as_deref()
|
||||||
|
.or(current_content.as_deref())
|
||||||
|
.and_then(|c| parse_front_matter(c).ok())
|
||||||
|
.map(|meta| {
|
||||||
|
(
|
||||||
|
meta.name,
|
||||||
|
meta.agent,
|
||||||
|
meta.retry_count.map(|r| r as i64),
|
||||||
|
meta.blocked,
|
||||||
|
meta.depends_on
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|d| serde_json::to_string(d).ok()),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.unwrap_or((None, None, None, None, None));
|
||||||
|
|
||||||
|
// CRDT stage transition.
|
||||||
|
let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage)
|
||||||
|
.is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. }))
|
||||||
|
{
|
||||||
|
Some(chrono::Utc::now().timestamp() as f64)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
crate::crdt_state::write_item(
|
||||||
|
story_id,
|
||||||
|
new_stage,
|
||||||
|
name.as_deref(),
|
||||||
|
agent.as_deref(),
|
||||||
|
None,
|
||||||
|
blocked,
|
||||||
|
depends_on.as_deref(),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
merged_at_ts,
|
||||||
|
);
|
||||||
|
// Bug 780: stage transitions reset retry_count to 0. retry_count tracks
|
||||||
|
// attempts at THIS stage's work (coding, merging, qa); a fresh attempt at
|
||||||
|
// a new stage is conceptually distinct from prior attempts at a different
|
||||||
|
// stage. `blocked` is preserved — that's a human-set signal that survives
|
||||||
|
// transitions.
|
||||||
|
crate::crdt_state::set_retry_count(story_id, 0);
|
||||||
|
|
||||||
|
// Shadow table — always reset retry_count to 0 on stage transition.
|
||||||
|
let retry_count: Option<i64> = Some(0);
|
||||||
|
if let Some(db) = PIPELINE_DB.get() {
|
||||||
|
let msg = PipelineWriteMsg {
|
||||||
|
story_id: story_id.to_string(),
|
||||||
|
stage: new_stage.to_string(),
|
||||||
|
name,
|
||||||
|
agent,
|
||||||
|
retry_count,
|
||||||
|
blocked,
|
||||||
|
depends_on,
|
||||||
|
content,
|
||||||
|
};
|
||||||
|
let _ = db.tx.send(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete a story from the shadow table (fire-and-forget).
|
||||||
|
pub fn delete_item(story_id: &str) {
|
||||||
|
delete_content(story_id);
|
||||||
|
|
||||||
|
if let Some(db) = PIPELINE_DB.get() {
|
||||||
|
// Reuse the channel with a special "deleted" stage marker.
|
||||||
|
// The background task will handle it.
|
||||||
|
// Actually, we send a delete message by abusing the write — we'll
|
||||||
|
// just remove it by setting stage to "deleted".
|
||||||
|
let msg = PipelineWriteMsg {
|
||||||
|
story_id: story_id.to_string(),
|
||||||
|
stage: "deleted".to_string(),
|
||||||
|
name: None,
|
||||||
|
agent: None,
|
||||||
|
retry_count: None,
|
||||||
|
blocked: None,
|
||||||
|
depends_on: None,
|
||||||
|
content: None,
|
||||||
|
};
|
||||||
|
let _ = db.tx.send(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the next available item number by scanning both the CRDT state
|
||||||
|
/// and the in-memory content store for the highest existing number.
|
||||||
|
pub fn next_item_number() -> u32 {
|
||||||
|
let mut max_num: u32 = 0;
|
||||||
|
|
||||||
|
// Scan CRDT items via typed projection.
|
||||||
|
for item in crate::pipeline_state::read_all_typed() {
|
||||||
|
let num_str: String = item
|
||||||
|
.story_id
|
||||||
|
.0
|
||||||
|
.chars()
|
||||||
|
.take_while(|c| c.is_ascii_digit())
|
||||||
|
.collect();
|
||||||
|
if let Ok(n) = num_str.parse::<u32>()
|
||||||
|
&& n > max_num
|
||||||
|
{
|
||||||
|
max_num = n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also scan the content store (might have items not yet in CRDT).
|
||||||
|
for id in all_content_ids() {
|
||||||
|
let num_str: String = id.chars().take_while(|c| c.is_ascii_digit()).collect();
|
||||||
|
if let Ok(n) = num_str.parse::<u32>()
|
||||||
|
&& n > max_num
|
||||||
|
{
|
||||||
|
max_num = n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
max_num + 1
|
||||||
|
}
|
||||||
@@ -0,0 +1,119 @@
|
|||||||
|
//! Background shadow-write task — persists pipeline items to SQLite asynchronously.
|
||||||
|
//!
|
||||||
|
//! `init` opens the database, runs migrations, loads existing content into the
|
||||||
|
//! in-memory store, and spawns the write loop. All subsequent writes are sent
|
||||||
|
//! over an unbounded channel so callers never block on I/O.
|
||||||
|
use crate::slog;
|
||||||
|
use sqlx::SqlitePool;
|
||||||
|
use sqlx::sqlite::SqliteConnectOptions;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::OnceLock;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
/// A pending shadow write for one pipeline item.
|
||||||
|
pub(super) struct PipelineWriteMsg {
|
||||||
|
pub(super) story_id: String,
|
||||||
|
pub(super) stage: String,
|
||||||
|
pub(super) name: Option<String>,
|
||||||
|
pub(super) agent: Option<String>,
|
||||||
|
pub(super) retry_count: Option<i64>,
|
||||||
|
pub(super) blocked: Option<bool>,
|
||||||
|
pub(super) depends_on: Option<String>,
|
||||||
|
pub(super) content: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle to the background shadow-write task.
|
||||||
|
pub struct PipelineDb {
|
||||||
|
pub(super) tx: mpsc::UnboundedSender<PipelineWriteMsg>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process-global handle to the background shadow-write task, set once during `init`.
|
||||||
|
pub(super) static PIPELINE_DB: OnceLock<PipelineDb> = OnceLock::new();
|
||||||
|
|
||||||
|
/// Initialise the pipeline database.
|
||||||
|
///
|
||||||
|
/// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations,
|
||||||
|
/// loads existing story content into the in-memory store, and spawns the
|
||||||
|
/// background write task. Safe to call only once; subsequent calls are no-ops.
|
||||||
|
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
||||||
|
if PIPELINE_DB.get().is_some() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let options = SqliteConnectOptions::new()
|
||||||
|
.filename(db_path)
|
||||||
|
.create_if_missing(true);
|
||||||
|
|
||||||
|
let pool = SqlitePool::connect_with(options).await?;
|
||||||
|
sqlx::migrate!("./migrations").run(&pool).await?;
|
||||||
|
|
||||||
|
// Load existing content into the in-memory store.
|
||||||
|
let rows: Vec<(String, Option<String>)> =
|
||||||
|
sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL")
|
||||||
|
.fetch_all(&pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut content_map = HashMap::new();
|
||||||
|
for (id, content) in rows {
|
||||||
|
if let Some(c) = content {
|
||||||
|
content_map.insert(id, c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
super::content_store::init_content_store(content_map);
|
||||||
|
|
||||||
|
let (tx, mut rx) = mpsc::unbounded_channel::<PipelineWriteMsg>();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(msg) = rx.recv().await {
|
||||||
|
// The "deleted" sentinel means the caller wants the row gone.
|
||||||
|
// Issue a real DELETE so the shadow table stays clean and
|
||||||
|
// sync_crdt_stages_from_db cannot resurrect a tombstoned item on
|
||||||
|
// the next restart.
|
||||||
|
if msg.stage == "deleted" {
|
||||||
|
let result = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1")
|
||||||
|
.bind(&msg.story_id)
|
||||||
|
.execute(&pool)
|
||||||
|
.await;
|
||||||
|
if let Err(e) = result {
|
||||||
|
slog!("[db] Shadow delete failed for '{}': {e}", msg.story_id);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = chrono::Utc::now().to_rfc3339();
|
||||||
|
let result = sqlx::query(
|
||||||
|
"INSERT INTO pipeline_items \
|
||||||
|
(id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \
|
||||||
|
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \
|
||||||
|
ON CONFLICT(id) DO UPDATE SET \
|
||||||
|
name = excluded.name, \
|
||||||
|
stage = excluded.stage, \
|
||||||
|
agent = excluded.agent, \
|
||||||
|
retry_count = excluded.retry_count, \
|
||||||
|
blocked = excluded.blocked, \
|
||||||
|
depends_on = excluded.depends_on, \
|
||||||
|
content = COALESCE(excluded.content, pipeline_items.content), \
|
||||||
|
updated_at = excluded.updated_at",
|
||||||
|
)
|
||||||
|
.bind(&msg.story_id)
|
||||||
|
.bind(&msg.name)
|
||||||
|
.bind(&msg.stage)
|
||||||
|
.bind(&msg.agent)
|
||||||
|
.bind(msg.retry_count)
|
||||||
|
.bind(msg.blocked.map(|b| b as i64))
|
||||||
|
.bind(&msg.depends_on)
|
||||||
|
.bind(&msg.content)
|
||||||
|
.bind(&now)
|
||||||
|
.execute(&pool)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if let Err(e) = result {
|
||||||
|
slog!("[db] Shadow write failed for '{}': {e}", msg.story_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let _ = PIPELINE_DB.set(PipelineDb { tx });
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user