Files
huskies/server/src/startup/project.rs
T

417 lines
18 KiB
Rust
Raw Normal View History

2026-04-28 19:12:55 +00:00
//! Project-root discovery, subsystem initialisation (log, identity, DB, CRDT),
//! and CRDT-sync configuration.
use crate::config;
use crate::crdt_state;
use crate::crdt_sync;
use crate::db;
use crate::io::fs::find_story_kit_root;
use crate::log_buffer;
use crate::node_identity;
use crate::state::SessionState;
use crate::store::JsonFileStore;
use crate::worktree;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Resolve which directory is the project root for this run and open it.
///
/// The launch modes are tried in this order:
///
/// 1. `is_init` — use `explicit_path` (falling back to `cwd`), creating the
///    directory first when it does not exist yet.
/// 2. `explicit_path` given without `is_init` — open exactly that directory.
/// 3. Neither — open the root reported by `find_story_kit_root(cwd)`, or
///    `cwd` itself when no root is found.
///
/// Modes 1 and 2 are strict: a failure to create the directory or to open the
/// project prints `error: …` to stderr and exits the process with status 1.
/// Mode 3 is lenient: an open failure is only logged as a warning.
///
/// The project root itself is recorded on `app_state` by
/// `crate::io::fs::open_project`; this function reads it back from
/// `app_state.project_root` afterwards.
///
/// # Panics
///
/// Panics with `Invalid project.toml: …` when `ProjectConfig::load` fails in
/// modes 1 and 2, or for an auto-detected root in mode 3.
pub(crate) async fn open_project_root(
    is_init: bool,
    explicit_path: Option<PathBuf>,
    cwd: &Path,
    app_state: &Arc<SessionState>,
    store: &Arc<JsonFileStore>,
    port: u16,
) {
    // `--init` and an explicit path share one strict flow; they differ only in
    // the directory-creation step and the wizard initialisation.
    if is_init || explicit_path.is_some() {
        let target = explicit_path.unwrap_or_else(|| cwd.to_path_buf());

        if is_init && !target.exists() {
            if let Err(e) = std::fs::create_dir_all(&target) {
                eprintln!(
                    "error: cannot create directory {}: {e}",
                    target.display()
                );
                std::process::exit(1);
            }
        }

        let opened = crate::io::fs::open_project(
            target.to_string_lossy().to_string(),
            app_state,
            store.as_ref(),
            port,
        )
        .await;
        if let Err(e) = opened {
            eprintln!("error: {e}");
            std::process::exit(1);
        }

        // Validate the configuration of whatever root ended up on the state.
        if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
            config::ProjectConfig::load(root)
                .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
            if is_init {
                crate::io::wizard::WizardState::init_if_missing(root);
            }
        }
        return;
    }

    // No flags: auto-detect from the working directory.
    match find_story_kit_root(cwd) {
        Some(project_root) => {
            let opened = crate::io::fs::open_project(
                project_root.to_string_lossy().to_string(),
                app_state,
                store.as_ref(),
                port,
            )
            .await;
            if let Err(e) = opened {
                crate::slog!("Warning: failed to auto-open project at {project_root:?}: {e}");
            }
            // The config is validated even when opening only produced a warning.
            config::ProjectConfig::load(&project_root)
                .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
        }
        None => {
            let opened = crate::io::fs::open_project(
                cwd.to_string_lossy().to_string(),
                app_state,
                store.as_ref(),
                port,
            )
            .await;
            if let Err(e) = opened {
                crate::slog!("Warning: failed to scaffold project at {cwd:?}: {e}");
            }
        }
    }
}
2026-05-14 20:08:09 +00:00
/// One-shot migration: read existing JSON store files and insert their rows
/// into the SQLite tables created by the migration scripts, then rename the
/// originals to `*.migrated` so this function is a no-op on the next startup.
///
/// The files handled, all looked up directly inside `huskies_dir`, are
/// `event_triggers.json`, `timers.json` and `scheduled_timers.json`. Rows are
/// written with `INSERT OR IGNORE`.
///
/// Runs after `db::init()` so the tables already exist and the shared pool is
/// available. Errors are logged but never fatal:
///
/// * no shared pool — the whole migration is skipped;
/// * unreadable file — logged, file left in place;
/// * unparseable file — logged, nothing imported, file still renamed to
///   `*.migrated`;
/// * failed row insert — logged and counted in the summary line;
/// * failed rename — logged.
async fn migrate_json_stores_to_sqlite(huskies_dir: &Path) {
    let Some(pool) = crate::db::get_shared_pool() else {
        crate::slog!("[db-migrate] Shared pool not available; skipping JSON migration");
        return;
    };

    // ── event_triggers.json ───────────────────────────────────────────────────
    if let Some(raw) = read_json_store(huskies_dir, "event_triggers.json") {
        let parsed: Result<Vec<crate::service::event_triggers::EventTrigger>, _> =
            serde_json::from_str(&raw);
        match parsed {
            Ok(triggers) => {
                let mut failed_rows = 0usize;
                for t in triggers {
                    let predicate_json = serde_json::to_string(&t.predicate).unwrap_or_default();
                    let action_json = serde_json::to_string(&t.action).unwrap_or_default();
                    let mode = match t.mode {
                        crate::service::event_triggers::FireMode::Once => "once",
                        crate::service::event_triggers::FireMode::Persistent => "persistent",
                    };
                    let created_at = t.created_at.to_rfc3339();
                    let inserted = sqlx::query(
                        "INSERT OR IGNORE INTO event_triggers \
                         (id, predicate_json, action_json, mode, created_at) \
                         VALUES (?1, ?2, ?3, ?4, ?5)",
                    )
                    .bind(&t.id)
                    .bind(&predicate_json)
                    .bind(&action_json)
                    .bind(mode)
                    .bind(&created_at)
                    .execute(pool)
                    .await;
                    if let Err(e) = inserted {
                        failed_rows += 1;
                        crate::slog!(
                            "[db-migrate] Failed to insert a row from event_triggers.json: {e}"
                        );
                    }
                }
                retire_json_store(huskies_dir, "event_triggers.json");
                log_json_migration_summary("event_triggers.json", failed_rows);
            }
            Err(e) => {
                crate::slog!(
                    "[db-migrate] Could not parse event_triggers.json: {e}; no rows imported"
                );
                retire_json_store(huskies_dir, "event_triggers.json");
            }
        }
    }

    // ── timers.json ───────────────────────────────────────────────────────────
    if let Some(raw) = read_json_store(huskies_dir, "timers.json") {
        let parsed: Result<Vec<crate::service::timer::TimerEntry>, _> =
            serde_json::from_str(&raw);
        match parsed {
            Ok(entries) => {
                let mut failed_rows = 0usize;
                for entry in entries {
                    let inserted = sqlx::query(
                        "INSERT OR IGNORE INTO timers (story_id, scheduled_at) \
                         VALUES (?1, ?2)",
                    )
                    .bind(&entry.story_id)
                    .bind(entry.scheduled_at.to_rfc3339())
                    .execute(pool)
                    .await;
                    if let Err(e) = inserted {
                        failed_rows += 1;
                        crate::slog!("[db-migrate] Failed to insert a row from timers.json: {e}");
                    }
                }
                retire_json_store(huskies_dir, "timers.json");
                log_json_migration_summary("timers.json", failed_rows);
            }
            Err(e) => {
                crate::slog!("[db-migrate] Could not parse timers.json: {e}; no rows imported");
                retire_json_store(huskies_dir, "timers.json");
            }
        }
    }

    // ── scheduled_timers.json ─────────────────────────────────────────────────
    if let Some(raw) = read_json_store(huskies_dir, "scheduled_timers.json") {
        use crate::service::timer::scheduled::ScheduledTimer;
        let parsed: Result<Vec<ScheduledTimer>, _> = serde_json::from_str(&raw);
        match parsed {
            Ok(entries) => {
                let mut failed_rows = 0usize;
                for t in entries {
                    let action_json = serde_json::to_string(&t.action).unwrap_or_default();
                    let mode_json = serde_json::to_string(&t.mode).unwrap_or_default();
                    let inserted = sqlx::query(
                        "INSERT OR IGNORE INTO scheduled_timers \
                         (id, label, fire_at, action_json, mode_json, created_at) \
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                    )
                    .bind(&t.id)
                    .bind(&t.label)
                    .bind(t.fire_at.to_rfc3339())
                    .bind(&action_json)
                    .bind(&mode_json)
                    .bind(t.created_at.to_rfc3339())
                    .execute(pool)
                    .await;
                    if let Err(e) = inserted {
                        failed_rows += 1;
                        crate::slog!(
                            "[db-migrate] Failed to insert a row from scheduled_timers.json: {e}"
                        );
                    }
                }
                retire_json_store(huskies_dir, "scheduled_timers.json");
                log_json_migration_summary("scheduled_timers.json", failed_rows);
            }
            Err(e) => {
                crate::slog!(
                    "[db-migrate] Could not parse scheduled_timers.json: {e}; no rows imported"
                );
                retire_json_store(huskies_dir, "scheduled_timers.json");
            }
        }
    }
}

/// Returns the contents of `huskies_dir/file_name`, or `None` when the file
/// does not exist or cannot be read (the read error is logged).
fn read_json_store(huskies_dir: &Path, file_name: &str) -> Option<String> {
    let path = huskies_dir.join(file_name);
    if !path.exists() {
        return None;
    }
    match std::fs::read_to_string(&path) {
        Ok(contents) => Some(contents),
        Err(e) => {
            crate::slog!("[db-migrate] Could not read {file_name}: {e}");
            None
        }
    }
}

/// Renames `huskies_dir/file_name` to `huskies_dir/file_name.migrated`,
/// logging (rather than ignoring) a rename failure.
fn retire_json_store(huskies_dir: &Path, file_name: &str) {
    let source = huskies_dir.join(file_name);
    let target = huskies_dir.join(format!("{file_name}.migrated"));
    if let Err(e) = std::fs::rename(&source, &target) {
        crate::slog!("[db-migrate] Could not rename {file_name} to {file_name}.migrated: {e}");
    }
}

/// Logs the end-of-file summary line, mentioning failed inserts if any.
fn log_json_migration_summary(file_name: &str, failed_rows: usize) {
    if failed_rows == 0 {
        crate::slog!("[db-migrate] Migrated {file_name} → SQLite");
    } else {
        crate::slog!(
            "[db-migrate] Migrated {file_name} → SQLite ({failed_rows} row(s) could not be inserted)"
        );
    }
}
2026-04-28 19:12:55 +00:00
/// Set up the server log file, node identity keypair, pipeline DB, and CRDT state.
2026-05-15 01:21:38 +00:00
///
/// When `is_agent` is `true` the pipeline database is opened at an isolated
/// temporary path (or at `HUSKIES_DB_PATH` if that env-var is set) so that the
/// headless build agent never touches the production `.huskies/pipeline.db`.
/// This prevents feature-branch migrations from being applied to the shared
/// database and bricking the next server restart.
pub(crate) async fn init_subsystems(app_state: &Arc<SessionState>, cwd: &Path, is_agent: bool) {
2026-04-28 19:12:55 +00:00
// Enable persistent server log file now that the project root is known.
if let Some(ref root) = *app_state.project_root.lock().unwrap() {
let log_dir = root.join(".huskies").join("logs");
let _ = std::fs::create_dir_all(&log_dir);
2026-05-14 11:19:15 +00:00
log_buffer::global().set_log_dir(log_dir);
2026-04-28 19:12:55 +00:00
}
// Initialise the node's Ed25519 identity keypair (file-based, mode 0600).
// The key is stored at .huskies/node_identity.key and persisted across restarts.
{
let key_path = app_state
.project_root
.lock()
.unwrap()
.as_ref()
.map(|root| root.join(".huskies").join("node_identity.key"))
.unwrap_or_else(|| cwd.join(".huskies").join("node_identity.key"));
if let Err(e) = node_identity::init_identity(&key_path) {
crate::slog!("[identity] Failed to initialise node identity keypair: {e}");
} else if let Some(id) = node_identity::get_identity() {
crate::slog!("[identity] Node ID: {}", id.node_id);
}
}
2026-05-15 01:21:38 +00:00
// Resolve the pipeline DB path.
//
// Priority order:
// 1. HUSKIES_DB_PATH env var (operator override, any mode)
// 2. Agent mode: process-local temp file so the production DB is never touched
// 3. Default: {project_root}/.huskies/pipeline.db
let pipeline_db_path: Option<PathBuf> = if let Ok(env_path) = std::env::var("HUSKIES_DB_PATH") {
let p = PathBuf::from(&env_path);
crate::slog!("[db] HUSKIES_DB_PATH override: {}", p.display());
Some(p)
} else if is_agent {
// Headless agent: use an isolated temp DB so that any migrations compiled
// into this binary (e.g. from a feature branch) are never applied to the
// production database. The temp file is process-unique and harmless to
// leave behind after the agent exits.
let pid = std::process::id();
let temp_path = std::env::temp_dir().join(format!("huskies-agent-{pid}.db"));
crate::slog!(
"[db] Agent mode: using isolated DB at {} (not touching production pipeline.db)",
temp_path.display()
);
Some(temp_path)
} else {
// Server mode: use the project-local production database.
app_state
.project_root
.lock()
.unwrap()
.as_ref()
.map(|root| root.join(".huskies").join("pipeline.db"))
};
2026-04-28 19:12:55 +00:00
if let Some(ref db_path) = pipeline_db_path {
if let Err(e) = db::init(db_path).await {
crate::slog!("[db] Failed to initialise pipeline.db: {e}");
2026-05-14 20:08:09 +00:00
} else {
2026-05-15 01:21:38 +00:00
// ── Migration drift self-check (server mode only) ─────────────────────
//
// In server mode, detect whether the live database contains migrations
// that were applied by a newer binary (e.g. a feature-branch agent that
// ran before the feature was merged). If so, log each unknown migration
// and exit with a clear actionable message. This is the root cause of
// the 2026-05-14 21:07 production outage where the server came up but
// the CRDT never initialised.
if !is_agent && let Some(pool) = db::get_shared_pool() {
let drift = db::check_schema_drift(pool).await;
if !drift.is_empty() {
for m in &drift {
crate::slog!(
"[db] UNKNOWN migration {} ('{}') applied at {} \
is not in the compiled-in set",
m.version,
m.description,
m.installed_on,
);
}
eprintln!();
eprintln!(
"error: pipeline.db contains {} migration(s) that are not \
recognised by this binary:",
drift.len()
);
for m in &drift {
eprintln!(
" \u{2022} migration {} ('{}') applied at {}",
m.version, m.description, m.installed_on
);
}
eprintln!();
eprintln!(
"This means the database was previously opened by a newer \
version of huskies."
);
eprintln!(
"To fix: rebuild huskies from the latest source (the branch \
that added these migrations) and restart."
);
eprintln!(
"Do NOT start the old binary against this database — it will \
behave incorrectly."
);
std::process::exit(1);
}
}
2026-05-14 20:08:09 +00:00
// One-shot migration: move any existing JSON store files into SQLite.
let huskies_dir = db_path.parent().unwrap_or(db_path);
migrate_json_stores_to_sqlite(huskies_dir).await;
2026-04-28 19:12:55 +00:00
}
if let Err(e) = crdt_state::init(db_path).await {
crate::slog!("[crdt] Failed to initialise CRDT state layer: {e}");
} else {
crdt_state::migrate_names_from_slugs();
// Story 934 stage 6: rewrite any pre-934 directory-style stage
// strings to the clean post-934 wire vocabulary, and set the new
// `frozen` flag on items that were previously at `Stage::Frozen`.
// Must run before legacy stage-string acceptance is dropped from
// `Stage::from_dir` (also part of stage 6).
crdt_state::migrate_legacy_stage_strings();
2026-04-28 19:12:55 +00:00
let id_migrations = crdt_state::migrate_story_ids_to_numeric();
if !id_migrations.is_empty()
&& let Some(project_root) = db_path.parent().and_then(|p| p.parent())
{
worktree::migrate_slug_paths(project_root, &id_migrations);
}
2026-05-13 16:26:09 +00:00
// Story 987: upgrade four-bool MergeJob entries to typed MergeResult enum.
crdt_state::migrate_merge_job(db_path);
2026-05-13 22:50:13 +00:00
// Story 1009: drop legacy node-hex claims that can't be converted to AgentName.
crdt_state::migrate_node_claims_to_agent_claims();
2026-05-14 18:04:35 +00:00
// Story 1052: remove stale MergeJob entries for terminal-stage
// stories so they can never cause "FAILED" labels in the UI.
crdt_state::purge_done_stage_merge_jobs();
2026-04-28 19:12:55 +00:00
}
}
}
/// Wire up CRDT sync: trusted keys, token auth, and the rendezvous client.
///
/// In agent mode the rendezvous URL comes from the CLI (`agent_rendezvous`);
/// otherwise it is read from `project.toml`.
///
/// Settings are resolved as follows:
///
/// * agent mode with a rendezvous URL — no trusted keys, token auth not
///   required, no tokens; `project.toml` is not consulted;
/// * every other case — trusted keys and token settings come from
///   `project.toml`, falling back to empty/`false` when there is no project
///   root or the config fails to load.
///
/// Trusted keys and token auth are always initialised. The rendezvous client
/// is spawned (with `crdt_join_token`) only when a URL was resolved; in agent
/// mode a `rendezvous` entry in `project.toml` is never used.
///
/// `project.toml` is loaded at most once per call, so the URL, keys and token
/// settings all come from the same parsed config.
pub(crate) fn configure_crdt_sync(
    app_state: &Arc<SessionState>,
    is_agent: bool,
    agent_rendezvous: Option<String>,
    crdt_join_token: Option<String>,
) {
    let (rendezvous_url, trusted_keys, require_token, crdt_tokens) = match agent_rendezvous {
        // `agent_rendezvous` is owned, so the URL is moved out, not cloned.
        Some(url) if is_agent => (Some(url), Vec::new(), false, Vec::new()),
        _ => {
            let project_config = app_state
                .project_root
                .lock()
                .unwrap()
                .as_ref()
                .and_then(|root| config::ProjectConfig::load(root).ok());
            match project_config {
                Some(cfg) => {
                    // An agent without a CLI URL does not fall back to the
                    // project's rendezvous entry.
                    let url = if is_agent { None } else { cfg.rendezvous };
                    (
                        url,
                        cfg.trusted_keys,
                        cfg.crdt_require_token,
                        cfg.crdt_tokens,
                    )
                }
                None => (None, Vec::new(), false, Vec::new()),
            }
        }
    };

    crdt_sync::init_trusted_keys(trusted_keys);
    crdt_sync::init_token_auth(require_token, crdt_tokens);
    if let Some(url) = rendezvous_url {
        crdt_sync::spawn_rendezvous_client(url, crdt_join_token);
    }
}