Files
huskies/server/src/startup/project.rs
T

340 lines
14 KiB
Rust
Raw Normal View History

2026-04-28 19:12:55 +00:00
//! Project-root discovery, subsystem initialisation (log, identity, DB, CRDT),
//! and CRDT-sync configuration.
use crate::config;
use crate::crdt_state;
use crate::crdt_sync;
use crate::db;
use crate::io::fs::find_story_kit_root;
use crate::log_buffer;
use crate::node_identity;
use crate::state::SessionState;
use crate::store::JsonFileStore;
use crate::worktree;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Open (or scaffold) the project root according to the CLI flags and CWD.
///
/// Handles `--init`, an explicit path argument, and the default auto-detect
/// behaviour. Modifies `app_state.project_root` as a side effect.
pub(crate) async fn open_project_root(
is_init: bool,
explicit_path: Option<PathBuf>,
cwd: &Path,
app_state: &Arc<SessionState>,
store: &Arc<JsonFileStore>,
port: u16,
) {
if is_init {
let init_root = explicit_path.unwrap_or_else(|| cwd.to_path_buf());
if !init_root.exists() {
std::fs::create_dir_all(&init_root).unwrap_or_else(|e| {
eprintln!(
"error: cannot create directory {}: {e}",
init_root.display()
);
std::process::exit(1);
});
}
match crate::io::fs::open_project(
init_root.to_string_lossy().to_string(),
app_state,
store.as_ref(),
port,
)
.await
{
Ok(_) => {
if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
config::ProjectConfig::load(root)
.unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
crate::io::wizard::WizardState::init_if_missing(root);
}
}
Err(e) => {
eprintln!("error: {e}");
std::process::exit(1);
}
}
} else if let Some(explicit_root) = explicit_path {
match crate::io::fs::open_project(
explicit_root.to_string_lossy().to_string(),
app_state,
store.as_ref(),
port,
)
.await
{
Ok(_) => {
if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
config::ProjectConfig::load(root)
.unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
}
}
Err(e) => {
eprintln!("error: {e}");
std::process::exit(1);
}
}
} else if let Some(project_root) = find_story_kit_root(cwd) {
crate::io::fs::open_project(
project_root.to_string_lossy().to_string(),
app_state,
store.as_ref(),
port,
)
.await
.unwrap_or_else(|e| {
crate::slog!("Warning: failed to auto-open project at {project_root:?}: {e}");
project_root.to_string_lossy().to_string()
});
config::ProjectConfig::load(&project_root)
.unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
} else {
crate::io::fs::open_project(
cwd.to_string_lossy().to_string(),
app_state,
store.as_ref(),
port,
)
.await
.unwrap_or_else(|e| {
crate::slog!("Warning: failed to scaffold project at {cwd:?}: {e}");
cwd.to_string_lossy().to_string()
});
}
}
2026-05-14 20:08:09 +00:00
/// One-shot migration: read existing JSON store files and insert their rows
/// into the SQLite tables created by the migration scripts, then rename the
/// originals to `*.migrated` so this function is a no-op on the next startup.
///
/// Handles `event_triggers.json`, `timers.json` and `scheduled_timers.json`
/// inside `huskies_dir`. Runs after `db::init()` so the tables already exist
/// and the shared pool is available.
///
/// Errors are logged but never fatal. A store file is only renamed once every
/// one of its rows was inserted without error; a file that cannot be parsed,
/// or whose rows could not all be inserted, is left in place so no data is
/// silently discarded. Because the inserts use `INSERT OR IGNORE`, retrying a
/// partially migrated file on the next startup does not duplicate rows.
async fn migrate_json_stores_to_sqlite(huskies_dir: &Path) {
    use crate::service::event_triggers::{EventTrigger, FireMode};
    use crate::service::timer::TimerEntry;
    use crate::service::timer::scheduled::ScheduledTimer;

    let Some(pool) = crate::db::get_shared_pool() else {
        crate::slog!("[db-migrate] Shared pool not available; skipping JSON migration");
        return;
    };

    // ── event_triggers.json ───────────────────────────────────────────────────
    if let Some(raw) = read_legacy_json_store(huskies_dir, "event_triggers.json") {
        match serde_json::from_str::<Vec<EventTrigger>>(&raw) {
            Ok(triggers) => {
                let mut failed_rows = 0usize;
                for (row, t) in triggers.into_iter().enumerate() {
                    let predicate_json = serde_json::to_string(&t.predicate).unwrap_or_default();
                    let action_json = serde_json::to_string(&t.action).unwrap_or_default();
                    let mode = match t.mode {
                        FireMode::Once => "once",
                        FireMode::Persistent => "persistent",
                    };
                    let created_at = t.created_at.to_rfc3339();
                    let inserted = sqlx::query(
                        "INSERT OR IGNORE INTO event_triggers \
                         (id, predicate_json, action_json, mode, created_at) \
                         VALUES (?1, ?2, ?3, ?4, ?5)",
                    )
                    .bind(&t.id)
                    .bind(&predicate_json)
                    .bind(&action_json)
                    .bind(mode)
                    .bind(&created_at)
                    .execute(pool)
                    .await;
                    if let Err(e) = inserted {
                        failed_rows += 1;
                        crate::slog!(
                            "[db-migrate] Could not insert event_triggers.json row {row}: {e}"
                        );
                    }
                }
                finish_json_store_migration(huskies_dir, "event_triggers.json", failed_rows);
            }
            Err(e) => crate::slog!(
                "[db-migrate] Could not parse event_triggers.json; leaving it in place: {e}"
            ),
        }
    }

    // ── timers.json ───────────────────────────────────────────────────────────
    if let Some(raw) = read_legacy_json_store(huskies_dir, "timers.json") {
        match serde_json::from_str::<Vec<TimerEntry>>(&raw) {
            Ok(entries) => {
                let mut failed_rows = 0usize;
                for (row, entry) in entries.into_iter().enumerate() {
                    let inserted = sqlx::query(
                        "INSERT OR IGNORE INTO timers (story_id, scheduled_at) \
                         VALUES (?1, ?2)",
                    )
                    .bind(&entry.story_id)
                    .bind(entry.scheduled_at.to_rfc3339())
                    .execute(pool)
                    .await;
                    if let Err(e) = inserted {
                        failed_rows += 1;
                        crate::slog!("[db-migrate] Could not insert timers.json row {row}: {e}");
                    }
                }
                finish_json_store_migration(huskies_dir, "timers.json", failed_rows);
            }
            Err(e) => {
                crate::slog!("[db-migrate] Could not parse timers.json; leaving it in place: {e}")
            }
        }
    }

    // ── scheduled_timers.json ─────────────────────────────────────────────────
    if let Some(raw) = read_legacy_json_store(huskies_dir, "scheduled_timers.json") {
        match serde_json::from_str::<Vec<ScheduledTimer>>(&raw) {
            Ok(entries) => {
                let mut failed_rows = 0usize;
                for (row, t) in entries.into_iter().enumerate() {
                    let action_json = serde_json::to_string(&t.action).unwrap_or_default();
                    let mode_json = serde_json::to_string(&t.mode).unwrap_or_default();
                    let inserted = sqlx::query(
                        "INSERT OR IGNORE INTO scheduled_timers \
                         (id, label, fire_at, action_json, mode_json, created_at) \
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                    )
                    .bind(&t.id)
                    .bind(&t.label)
                    .bind(t.fire_at.to_rfc3339())
                    .bind(&action_json)
                    .bind(&mode_json)
                    .bind(t.created_at.to_rfc3339())
                    .execute(pool)
                    .await;
                    if let Err(e) = inserted {
                        failed_rows += 1;
                        crate::slog!(
                            "[db-migrate] Could not insert scheduled_timers.json row {row}: {e}"
                        );
                    }
                }
                finish_json_store_migration(huskies_dir, "scheduled_timers.json", failed_rows);
            }
            Err(e) => crate::slog!(
                "[db-migrate] Could not parse scheduled_timers.json; leaving it in place: {e}"
            ),
        }
    }
}

/// Read the legacy JSON store `file_name` from `huskies_dir`.
///
/// Returns `None` when the file does not exist or cannot be read (the read
/// error is logged). A file containing only whitespace holds no rows, so it is
/// normalised to an empty JSON array and can still be retired by the caller.
fn read_legacy_json_store(huskies_dir: &Path, file_name: &str) -> Option<String> {
    let path = huskies_dir.join(file_name);
    if !path.exists() {
        return None;
    }
    match std::fs::read_to_string(&path) {
        Ok(raw) if raw.trim().is_empty() => Some(String::from("[]")),
        Ok(raw) => Some(raw),
        Err(e) => {
            crate::slog!("[db-migrate] Could not read {file_name}: {e}");
            None
        }
    }
}

/// Retire the legacy store `file_name` by renaming it to `<file_name>.migrated`.
///
/// When `failed_rows` is non-zero the file is kept so the migration is retried
/// on the next startup. A failed rename is logged rather than ignored, since it
/// means the migration will run again.
fn finish_json_store_migration(huskies_dir: &Path, file_name: &str, failed_rows: usize) {
    if failed_rows > 0 {
        crate::slog!(
            "[db-migrate] {failed_rows} row(s) from {file_name} were not inserted; \
             leaving the file in place to retry on next startup"
        );
        return;
    }
    let source = huskies_dir.join(file_name);
    let migrated = huskies_dir.join(format!("{file_name}.migrated"));
    match std::fs::rename(&source, &migrated) {
        Ok(()) => crate::slog!("[db-migrate] Migrated {file_name} → SQLite"),
        Err(e) => crate::slog!(
            "[db-migrate] Migrated {file_name} → SQLite but could not rename it to {migrated:?}: {e}"
        ),
    }
}
2026-04-28 19:12:55 +00:00
/// Set up the server log file, node identity keypair, pipeline DB, and CRDT state.
pub(crate) async fn init_subsystems(app_state: &Arc<SessionState>, cwd: &Path) {
// Enable persistent server log file now that the project root is known.
if let Some(ref root) = *app_state.project_root.lock().unwrap() {
let log_dir = root.join(".huskies").join("logs");
let _ = std::fs::create_dir_all(&log_dir);
2026-05-14 11:19:15 +00:00
log_buffer::global().set_log_dir(log_dir);
2026-04-28 19:12:55 +00:00
}
// Initialise the node's Ed25519 identity keypair (file-based, mode 0600).
// The key is stored at .huskies/node_identity.key and persisted across restarts.
{
let key_path = app_state
.project_root
.lock()
.unwrap()
.as_ref()
.map(|root| root.join(".huskies").join("node_identity.key"))
.unwrap_or_else(|| cwd.join(".huskies").join("node_identity.key"));
if let Err(e) = node_identity::init_identity(&key_path) {
crate::slog!("[identity] Failed to initialise node identity keypair: {e}");
} else if let Some(id) = node_identity::get_identity() {
crate::slog!("[identity] Node ID: {}", id.node_id);
}
}
// Initialise the SQLite pipeline shadow-write database and CRDT state layer.
// Clone the path out before the await so we don't hold the MutexGuard across
// an await point.
let pipeline_db_path = app_state
.project_root
.lock()
.unwrap()
.as_ref()
.map(|root| root.join(".huskies").join("pipeline.db"));
if let Some(ref db_path) = pipeline_db_path {
if let Err(e) = db::init(db_path).await {
crate::slog!("[db] Failed to initialise pipeline.db: {e}");
2026-05-14 20:08:09 +00:00
} else {
// One-shot migration: move any existing JSON store files into SQLite.
let huskies_dir = db_path.parent().unwrap_or(db_path);
migrate_json_stores_to_sqlite(huskies_dir).await;
2026-04-28 19:12:55 +00:00
}
if let Err(e) = crdt_state::init(db_path).await {
crate::slog!("[crdt] Failed to initialise CRDT state layer: {e}");
} else {
crdt_state::migrate_names_from_slugs();
// Story 934 stage 6: rewrite any pre-934 directory-style stage
// strings to the clean post-934 wire vocabulary, and set the new
// `frozen` flag on items that were previously at `Stage::Frozen`.
// Must run before legacy stage-string acceptance is dropped from
// `Stage::from_dir` (also part of stage 6).
crdt_state::migrate_legacy_stage_strings();
2026-04-28 19:12:55 +00:00
let id_migrations = crdt_state::migrate_story_ids_to_numeric();
if !id_migrations.is_empty()
&& let Some(project_root) = db_path.parent().and_then(|p| p.parent())
{
worktree::migrate_slug_paths(project_root, &id_migrations);
}
2026-05-13 16:26:09 +00:00
// Story 987: upgrade four-bool MergeJob entries to typed MergeResult enum.
crdt_state::migrate_merge_job(db_path);
2026-05-13 22:50:13 +00:00
// Story 1009: drop legacy node-hex claims that can't be converted to AgentName.
crdt_state::migrate_node_claims_to_agent_claims();
2026-05-14 18:04:35 +00:00
// Story 1052: remove stale MergeJob entries for terminal-stage
// stories so they can never cause "FAILED" labels in the UI.
crdt_state::purge_done_stage_merge_jobs();
2026-04-28 19:12:55 +00:00
}
}
}
/// Wire up CRDT sync: trusted keys, token auth, and the rendezvous client.
///
/// In agent mode the rendezvous URL comes from the CLI; otherwise it is read
/// from `project.toml`.
///
/// Settings are resolved as follows:
/// - Agent mode with a CLI rendezvous URL: no trusted keys, tokens not
///   required, no tokens; `project.toml` is not consulted.
/// - Otherwise `project.toml` is loaded once from the current project root.
///   Its trusted keys and token settings are always applied; its `rendezvous`
///   URL is used only outside agent mode. If there is no project root or the
///   config fails to load, empty defaults are applied.
///
/// The rendezvous client is spawned (with `crdt_join_token`) only when a URL
/// was resolved; trusted keys and token auth are initialised in every case.
///
/// # Panics
/// Panics if the `project_root` mutex is poisoned.
pub(crate) fn configure_crdt_sync(
    app_state: &Arc<SessionState>,
    is_agent: bool,
    agent_rendezvous: Option<String>,
    crdt_join_token: Option<String>,
) {
    let (rendezvous_url, trusted_keys, require_token, crdt_tokens) =
        match (is_agent, agent_rendezvous) {
            // The CLI-supplied URL is moved, not cloned: nothing else uses it.
            (true, Some(url)) => (Some(url), Vec::new(), false, Vec::new()),
            _ => {
                // Load the config a single time; the guard is released at the
                // end of this statement.
                let loaded = app_state
                    .project_root
                    .lock()
                    .unwrap()
                    .as_ref()
                    .and_then(|root| config::ProjectConfig::load(root).ok());
                match loaded {
                    Some(cfg) => {
                        // An agent without a CLI URL never falls back to the
                        // project's rendezvous URL.
                        let url = if is_agent { None } else { cfg.rendezvous };
                        (
                            url,
                            cfg.trusted_keys,
                            cfg.crdt_require_token,
                            cfg.crdt_tokens,
                        )
                    }
                    None => (None, Vec::new(), false, Vec::new()),
                }
            }
        };

    crdt_sync::init_trusted_keys(trusted_keys);
    crdt_sync::init_token_auth(require_token, crdt_tokens);
    if let Some(url) = rendezvous_url {
        crdt_sync::spawn_rendezvous_client(url, crdt_join_token);
    }
}