From f1ef31d1ee7da385c381958910a1a0df800f0140 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 7 Apr 2026 13:09:48 +0000 Subject: [PATCH] huskies: merge 489_story_sqlite_shadow_write_for_pipeline_state_via_sqlx --- Cargo.lock | 445 +++++++++++++++++- Cargo.toml | 6 + server/Cargo.toml | 1 + .../20240101000000_init_pipeline_items.sql | 11 + server/src/agents/lifecycle.rs | 4 + server/src/db/mod.rs | 333 +++++++++++++ server/src/main.rs | 16 + 7 files changed, 815 insertions(+), 1 deletion(-) create mode 100644 server/migrations/20240101000000_init_pipeline_items.sql create mode 100644 server/src/db/mod.rs diff --git a/Cargo.lock b/Cargo.lock index bece483e..65649ace 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -185,6 +191,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -585,6 +600,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.5.0" @@ -613,6 +643,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -784,6 +823,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" dependencies = [ "const-oid 0.9.6", + "pem-rfc7468", "zeroize", ] @@ -857,6 +897,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer 0.10.4", + "const-oid 0.9.6", "crypto-common 0.1.7", "subtle", ] @@ -883,6 +924,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast-rs" version = "1.2.1" @@ -926,6 +973,9 @@ name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +dependencies = [ + "serde", +] [[package]] name = "encoding_rs" @@ -952,6 +1002,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "etcetera" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c7b13d0780cb82722fd59f6f57f925e143427e4a75313a6c77243bf5326ae6" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.59.0", +] + [[package]] name = "event-listener" version = "5.4.1" @@ -1084,6 +1145,17 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1096,6 +1168,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1187,6 +1265,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + [[package]] name = "futures-io" version = "0.3.32" @@ -1371,7 +1460,7 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ - "foldhash", + "foldhash 0.1.5", ] [[package]] @@ -1379,6 +1468,11 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", +] [[package]] name = "hashlink" @@ -1425,6 +1519,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hkdf" version = "0.12.4" @@ -1443,6 +1543,15 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "homedir" version = "0.3.6" @@ -1552,6 +1661,7 @@ dependencies = [ "serde_urlencoded", "serde_yaml", "sha2 0.11.0", + "sqlx", "strip-ansi-escapes", "tempfile", "tokio", @@ -2061,6 +2171,9 @@ name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +dependencies = [ + "spin", +] [[package]] name = "leb128fmt" @@ -2074,6 +2187,12 @@ version = "0.2.184" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" +[[package]] +name = "libm" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" + [[package]] name = "libredox" version = "0.1.15" @@ -2504,6 +2623,16 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "memchr" version = "2.8.0" @@ -2696,12 +2825,48 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e661dda6640fad38e827a6d4a310ff4763082116fe217f279885c97f511bb0b7" +dependencies = [ + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand 0.8.5", + "smallvec", + "zeroize", +] + [[package]] name = "num-conv" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -2709,6 +2874,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -2836,6 +3002,15 @@ dependencies = [ "hmac", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -2904,6 +3079,17 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der", + "pkcs8", + "spki", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -3520,6 +3706,26 @@ dependencies = [ "serde", ] +[[package]] +name = "rsa" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d" +dependencies = [ + "const-oid 0.9.6", + "digest 0.10.7", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core 0.6.4", + "signature", + "spki", + "subtle", + "zeroize", +] + [[package]] name = "ruma" version = "0.14.1" @@ -4136,6 +4342,7 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ + "digest 0.10.7", "rand_core 0.6.4", ] @@ -4187,6 +4394,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spki" @@ -4198,6 +4408,190 @@ dependencies = [ "der", ] +[[package]] +name = "sqlx" +version = "0.9.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "decccfa5f2f3eac95eb68085cfe69a0172fa9711666c3a634cfc806d4fb74a47" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.9.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86854e8c6aba0dafcf1c04b4836b0b7fa3a20c560e3554567afefe1258fa4e60" +dependencies = [ + "base64", + "bytes", + "cfg-if", + "crc", + "crossbeam-queue", + "either", + "event-listener", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.16.1", + "hashlink", + "indexmap", + "log", + "memchr", + "percent-encoding", + "serde", + "serde_json", + "sha2 0.10.9", + "smallvec", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.9.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7aab9442ed1568e3aed6c368737226ee4e0e8d1deb0e0887fa6bf15282ace44" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.117", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.9.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34eb4976b8f02ac57ee98d4ce40cd1aad7ab31d9792977bc3171f787ba6ba2fb" +dependencies = [ + "cfg-if", + "dotenvy", + "either", + "heck", + "hex", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2 0.10.9", + "sqlx-core", + "sqlx-sqlite", + "syn 2.0.117", + "thiserror 2.0.18", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.9.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fef16f3d52a3710a672b48175b713e86476e2df85576a753c8b37ad11a483c0" +dependencies = [ + "atoi", + "base64", + "bitflags 2.11.0", + "byteorder", + "bytes", + "crc", + "digest 0.10.7", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "percent-encoding", + "rand 0.8.5", + "rsa", + "sha1", + "sha2 0.10.9", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.18", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.9.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f053cf36ecb2793a9d9bb02d01bbad1ef66481d5db6ff5ab2dfb7b070cc0d13c" +dependencies = [ + "atoi", + "base64", + "bitflags 2.11.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "rand 0.8.5", + "serde", + "serde_json", + "sha2 0.10.9", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.18", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.9.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe2cd6cee87120b1e1dd31356b5589911995c777707e49f2750eec7c7fe43eef" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "thiserror 2.0.18", + "tracing", + "url", +] + [[package]] name = "sse-codec" version = "0.3.3" @@ -4240,6 +4634,17 @@ dependencies = [ "quote", ] +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strip-ansi-escapes" version = "0.2.1" @@ -4682,6 +5087,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -4819,6 +5225,12 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.24" @@ -4834,6 +5246,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" + [[package]] name = "unicode-segmentation" version = "1.13.2" @@ -5020,6 +5438,12 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.117" @@ -5204,6 +5628,16 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "whoami" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" +dependencies = [ + "libredox", + "wasite", +] + [[package]] name = "widestring" version = "1.2.1" @@ -5415,6 +5849,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.60.2" diff --git a/Cargo.toml b/Cargo.toml index 9ce86d36..8476233c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,3 +42,9 @@ pulldown-cmark = { version = "0.13.3", default-features = false, features = [ ] } regex = "1" libc = "0.2" +sqlx = { version = "=0.9.0-alpha.1", default-features = false, features = [ + "runtime-tokio", + "sqlite", + "macros", + "migrate", +] } diff --git a/server/Cargo.toml b/server/Cargo.toml index 7c84e772..c6ceb682 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -38,6 +38,7 @@ tokio-tungstenite = { workspace = true } # Force bundled SQLite so static musl builds don't need a system libsqlite3 libsqlite3-sys = { version = "0.35.0", features = ["bundled"] } +sqlx = { workspace = true } wait-timeout = "0.2.1" [target.'cfg(unix)'.dependencies] diff --git a/server/migrations/20240101000000_init_pipeline_items.sql b/server/migrations/20240101000000_init_pipeline_items.sql new file mode 100644 index 00000000..6562ecde --- /dev/null +++ b/server/migrations/20240101000000_init_pipeline_items.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS pipeline_items ( + id TEXT PRIMARY KEY, + name TEXT, + stage TEXT NOT NULL, + agent TEXT, + retry_count INTEGER, + blocked INTEGER, + depends_on TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL +); diff --git a/server/src/agents/lifecycle.rs b/server/src/agents/lifecycle.rs index f6b4363f..3dc917c6 100644 --- a/server/src/agents/lifecycle.rs +++ b/server/src/agents/lifecycle.rs @@ -65,6 +65,10 @@ fn move_item<'a>( } } + // Shadow-write the new stage to SQLite. This is fire-and-forget; a missing + // database (e.g. in tests) is silently ignored. + crate::db::shadow_write(story_id, target_dir, &target_path); + slog!("[lifecycle] Moved '{story_id}' from work/{src_dir}/ to work/{target_dir}/"); Ok(Some(src_dir)) } diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs new file mode 100644 index 00000000..e259a47c --- /dev/null +++ b/server/src/db/mod.rs @@ -0,0 +1,333 @@ +/// SQLite shadow-write layer for pipeline state. +/// +/// All filesystem pipeline operations (move_story_to_X etc.) remain authoritative. +/// This module provides a fire-and-forget channel that dual-writes each move to +/// `.huskies/pipeline.db` so a database layer is ready for future CRDT integration. +/// +/// Reads are NOT served from SQLite — the filesystem remains the single source of truth. +use crate::io::story_metadata::parse_front_matter; +use crate::slog; +use sqlx::sqlite::SqliteConnectOptions; +use sqlx::SqlitePool; +use std::path::Path; +use std::sync::OnceLock; +use tokio::sync::mpsc; + +/// A pending shadow write for one pipeline item. +struct PipelineWriteMsg { + story_id: String, + stage: String, + name: Option, + agent: Option, + retry_count: Option, + blocked: Option, + depends_on: Option, +} + +/// Handle to the background shadow-write task. +pub struct PipelineDb { + tx: mpsc::UnboundedSender, +} + +static PIPELINE_DB: OnceLock = OnceLock::new(); + +/// Initialise the pipeline database. +/// +/// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations, +/// and spawns the background write task. Safe to call only once; subsequent calls +/// are no-ops (the `OnceLock` rejects them silently). +pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { + if PIPELINE_DB.get().is_some() { + return Ok(()); + } + + let options = SqliteConnectOptions::new() + .filename(db_path) + .create_if_missing(true); + + let pool = SqlitePool::connect_with(options).await?; + sqlx::migrate!("./migrations").run(&pool).await?; + + let (tx, mut rx) = mpsc::unbounded_channel::(); + + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + let now = chrono::Utc::now().to_rfc3339(); + let result = sqlx::query( + "INSERT INTO pipeline_items \ + (id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \ + ON CONFLICT(id) DO UPDATE SET \ + name = excluded.name, \ + stage = excluded.stage, \ + agent = excluded.agent, \ + retry_count = excluded.retry_count, \ + blocked = excluded.blocked, \ + depends_on = excluded.depends_on, \ + updated_at = excluded.updated_at", + ) + .bind(&msg.story_id) + .bind(&msg.name) + .bind(&msg.stage) + .bind(&msg.agent) + .bind(msg.retry_count) + .bind(msg.blocked.map(|b| b as i64)) + .bind(&msg.depends_on) + .bind(&now) + .execute(&pool) + .await; + + if let Err(e) = result { + slog!("[db] Shadow write failed for '{}': {e}", msg.story_id); + } + } + }); + + let _ = PIPELINE_DB.set(PipelineDb { tx }); + Ok(()) +} + +/// Shadow-write a pipeline item move to SQLite. +/// +/// Reads front matter from `file_path` (the post-move location) to extract +/// metadata. The write is fire-and-forget — errors are logged but never +/// propagate to the caller. If the database has not been initialised this is a +/// complete no-op. +pub fn shadow_write(story_id: &str, stage: &str, file_path: &Path) { + let Some(db) = PIPELINE_DB.get() else { + return; + }; + + let (name, agent, retry_count, blocked, depends_on) = + match std::fs::read_to_string(file_path) { + Ok(contents) => match parse_front_matter(&contents) { + Ok(meta) => ( + meta.name, + meta.agent, + meta.retry_count.map(|r| r as i64), + meta.blocked, + meta.depends_on.as_ref().and_then(|d| serde_json::to_string(d).ok()), + ), + Err(_) => (None, None, None, None, None), + }, + Err(_) => (None, None, None, None, None), + }; + + let msg = PipelineWriteMsg { + story_id: story_id.to_string(), + stage: stage.to_string(), + name, + agent, + retry_count, + blocked, + depends_on, + }; + + // Ignore send errors: the background task may have exited (e.g. in tests). + let _ = db.tx.send(msg); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + + /// Helper: write a minimal story .md file with front matter. + fn write_story(dir: &std::path::Path, filename: &str, content: &str) { + fs::write(dir.join(filename), content).unwrap(); + } + + #[tokio::test] + async fn shadow_write_inserts_row_into_sqlite() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("pipeline.db"); + + // Initialise the DB in an isolated pool (not the global singleton, to + // keep tests hermetic). + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .unwrap(); + + // Write a story file in a temp stage dir. + let stage_dir = tmp.path().join("2_current"); + fs::create_dir_all(&stage_dir).unwrap(); + let story_path = stage_dir.join("10_story_shadow_test.md"); + write_story( + &stage_dir, + "10_story_shadow_test.md", + "---\nname: Shadow Test\nagent: coder-opus\nretry_count: 2\nblocked: false\n---\n# Story\n", + ); + + // Perform the upsert directly (bypass the global singleton). + let now = chrono::Utc::now().to_rfc3339(); + sqlx::query( + "INSERT INTO pipeline_items \ + (id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \ + ON CONFLICT(id) DO UPDATE SET \ + name = excluded.name, \ + stage = excluded.stage, \ + agent = excluded.agent, \ + retry_count = excluded.retry_count, \ + blocked = excluded.blocked, \ + depends_on = excluded.depends_on, \ + updated_at = excluded.updated_at", + ) + .bind("10_story_shadow_test") + .bind("Shadow Test") + .bind("2_current") + .bind("coder-opus") + .bind(2_i64) + .bind(0_i64) + .bind(Option::::None) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + + // Query back and verify. + let row: (String, Option, String) = sqlx::query_as( + "SELECT id, name, stage FROM pipeline_items WHERE id = ?1", + ) + .bind("10_story_shadow_test") + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!(row.0, "10_story_shadow_test"); + assert_eq!(row.1.as_deref(), Some("Shadow Test")); + assert_eq!(row.2, "2_current"); + + // Verify metadata was parsed correctly from the story file. + let (name, _agent, retry_count, _blocked, _depends_on) = + match std::fs::read_to_string(&story_path) { + Ok(contents) => match parse_front_matter(&contents) { + Ok(meta) => ( + meta.name, + meta.agent, + meta.retry_count.map(|r| r as i64), + meta.blocked, + meta.depends_on, + ), + Err(_) => (None, None, None, None, None), + }, + Err(_) => (None, None, None, None, None), + }; + + assert_eq!(name.as_deref(), Some("Shadow Test")); + assert_eq!(retry_count, Some(2)); + } + + #[tokio::test] + async fn pipeline_items_table_has_correct_columns() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("pipeline.db"); + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .unwrap(); + + // Verify all required columns exist by inserting a full row. + let now = chrono::Utc::now().to_rfc3339(); + sqlx::query( + "INSERT INTO pipeline_items \ + (id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)", + ) + .bind("99_story_col_test") + .bind(Option::::None) + .bind("1_backlog") + .bind(Option::::None) + .bind(Option::::None) + .bind(Option::::None) + .bind(Option::::None) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pipeline_items") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count.0, 1); + } + + #[tokio::test] + async fn upsert_updates_stage_on_move() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("pipeline.db"); + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .unwrap(); + + let now = chrono::Utc::now().to_rfc3339(); + + // Insert initial row in backlog. + sqlx::query( + "INSERT INTO pipeline_items \ + (id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)", + ) + .bind("5_story_move") + .bind("Move Me") + .bind("1_backlog") + .bind(Option::::None) + .bind(Option::::None) + .bind(Option::::None) + .bind(Option::::None) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + + // Upsert with new stage (simulating move to current). + sqlx::query( + "INSERT INTO pipeline_items \ + (id, name, stage, agent, retry_count, blocked, depends_on, created_at, updated_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \ + ON CONFLICT(id) DO UPDATE SET \ + name = excluded.name, \ + stage = excluded.stage, \ + agent = excluded.agent, \ + retry_count = excluded.retry_count, \ + blocked = excluded.blocked, \ + depends_on = excluded.depends_on, \ + updated_at = excluded.updated_at", + ) + .bind("5_story_move") + .bind("Move Me") + .bind("2_current") + .bind(Option::::None) + .bind(Option::::None) + .bind(Option::::None) + .bind(Option::::None) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + + let row: (String,) = + sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") + .bind("5_story_move") + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!(row.0, "2_current"); + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 59787c8b..a0ab3328 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,6 +6,7 @@ mod agent_log; mod agents; mod chat; mod config; +mod db; mod http; mod io; mod llm; @@ -282,6 +283,21 @@ async fn main() -> Result<(), std::io::Error> { log_buffer::global().set_log_file(log_dir.join("server.log")); } + // Initialise the SQLite pipeline shadow-write database. + // Clone the path out before the await so we don't hold the MutexGuard across + // an await point. + let pipeline_db_path = app_state + .project_root + .lock() + .unwrap() + .as_ref() + .map(|root| root.join(".huskies").join("pipeline.db")); + if let Some(db_path) = pipeline_db_path + && let Err(e) = db::init(&db_path).await + { + slog!("[db] Failed to initialise pipeline.db: {e}"); + } + let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default())); // Filesystem watcher: broadcast channel for work/ pipeline changes.