Files
storkit/server/src/main.rs

376 lines
15 KiB
Rust
Raw Normal View History

// matrix-sdk-crypto's deeply nested types require a higher recursion limit
// when the `e2e-encryption` feature is enabled.
#![recursion_limit = "256"]
mod agent_log;
mod agents;
mod config;
2026-02-16 16:24:21 +00:00
mod http;
mod io;
mod llm;
pub mod log_buffer;
mod matrix;
pub mod rebuild;
2026-03-20 12:26:02 +00:00
pub mod slack;
mod state;
mod store;
pub mod transport;
pub mod whatsapp;
2026-03-20 12:26:02 +00:00
mod workflow;
mod worktree;
use crate::agents::AgentPool;
2026-02-16 16:24:21 +00:00
use crate::http::build_routes;
use crate::http::context::AppContext;
use crate::http::{remove_port_file, resolve_port, write_port_file};
use crate::io::fs::find_story_kit_root;
use crate::state::SessionState;
use crate::store::JsonFileStore;
use crate::workflow::WorkflowState;
2026-02-16 16:24:21 +00:00
use poem::Server;
use poem::listener::TcpListener;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Turn the first positional CLI argument into a project path.
///
/// `args` is the argument list with the binary name already stripped; only
/// its first element is consulted. That element is handed to
/// `io::fs::resolve_cli_path` together with `cwd`, and whatever path that
/// helper produces is returned.
///
/// Returns `None` when `args` is empty, which lets the caller fall back to
/// auto-detecting the project root.
fn parse_project_path_arg(args: &[String], cwd: &std::path::Path) -> Option<PathBuf> {
    // Bail out early when no positional argument was supplied.
    let raw_arg = args.first()?;
    Some(io::fs::resolve_cli_path(cwd, raw_arg))
}
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let app_state = Arc::new(SessionState::default());
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
let store = Arc::new(
JsonFileStore::from_path(PathBuf::from("store.json")).map_err(std::io::Error::other)?,
);
let port = resolve_port();
// Collect CLI args, skipping the binary name (argv[0]).
let cli_args: Vec<String> = std::env::args().skip(1).collect();
let explicit_path = parse_project_path_arg(&cli_args, &cwd);
if let Some(explicit_root) = explicit_path {
// An explicit path was given on the command line.
// Open it directly — scaffold .storkit/ if it is missing — and
// exit with a clear error message if the path is invalid.
match io::fs::open_project(
explicit_root.to_string_lossy().to_string(),
&app_state,
store.as_ref(),
)
.await
{
Ok(_) => {
if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
config::ProjectConfig::load(root)
.unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
}
}
Err(e) => {
eprintln!("error: {e}");
std::process::exit(1);
}
}
} else {
// No path argument — auto-detect a .storkit/ project in cwd or
// parent directories (preserves existing behaviour).
if let Some(project_root) = find_story_kit_root(&cwd) {
io::fs::open_project(
project_root.to_string_lossy().to_string(),
&app_state,
store.as_ref(),
)
.await
.unwrap_or_else(|e| {
slog!("Warning: failed to auto-open project at {project_root:?}: {e}");
project_root.to_string_lossy().to_string()
});
// Validate agent config for the detected project root.
config::ProjectConfig::load(&project_root)
.unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
} else {
// No .storkit/ found — fall back to cwd so existing behaviour is preserved.
// TRACE:MERGE-DEBUG — remove once root cause is found
2026-03-20 12:26:02 +00:00
slog!(
"[MERGE-DEBUG] main: no .storkit/ found, falling back to cwd {:?}",
cwd
);
*app_state.project_root.lock().unwrap() = Some(cwd.clone());
}
}
// Enable persistent server log file now that the project root is known.
if let Some(ref root) = *app_state.project_root.lock().unwrap() {
let log_dir = root.join(".storkit").join("logs");
let _ = std::fs::create_dir_all(&log_dir);
log_buffer::global().set_log_file(log_dir.join("server.log"));
}
let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));
// Filesystem watcher: broadcast channel for work/ pipeline changes.
// Created before AgentPool so the pool can emit AgentStateChanged events.
let (watcher_tx, _) = broadcast::channel::<io::watcher::WatcherEvent>(1024);
let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
// Start the background watchdog that detects and cleans up orphaned Running agents.
// When orphans are found, auto-assign is triggered to reassign free agents.
let watchdog_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
AgentPool::spawn_watchdog(Arc::clone(&agents), watchdog_root);
if let Some(ref root) = *app_state.project_root.lock().unwrap() {
let work_dir = root.join(".storkit").join("work");
if work_dir.is_dir() {
let watcher_config = config::ProjectConfig::load(root)
.map(|c| c.watcher)
.unwrap_or_default();
2026-03-20 12:26:02 +00:00
io::watcher::start_watcher(work_dir, root.clone(), watcher_tx.clone(), watcher_config);
}
}
// Subscribe to watcher events so that auto-assign triggers when a work item
// file is moved into an active pipeline stage (2_current/, 3_qa/, 4_merge/).
{
let watcher_auto_rx = watcher_tx.subscribe();
let watcher_auto_agents = Arc::clone(&agents);
2026-03-20 12:26:02 +00:00
let watcher_auto_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
if let Some(root) = watcher_auto_root {
tokio::spawn(async move {
let mut rx = watcher_auto_rx;
while let Ok(event) = rx.recv().await {
if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event
&& matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge")
{
slog!(
"[auto-assign] Watcher detected work item in {stage}/; \
triggering auto-assign."
);
2026-03-20 12:26:02 +00:00
watcher_auto_agents.auto_assign_available_work(&root).await;
}
}
});
}
}
// Reconciliation progress channel: startup reconciliation → WebSocket clients.
2026-03-20 12:26:02 +00:00
let (reconciliation_tx, _) = broadcast::channel::<agents::ReconciliationEvent>(64);
// Permission channel: MCP prompt_permission → WebSocket handler.
let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
// Clone watcher_tx for the Matrix bot before it is moved into AppContext.
let watcher_tx_for_bot = watcher_tx.clone();
// Wrap perm_rx in Arc<Mutex> so it can be shared with both the WebSocket
// handler (via AppContext) and the Matrix bot.
let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
let perm_rx_for_bot = Arc::clone(&perm_rx);
// Capture project root, agents Arc, and reconciliation sender before ctx
// is consumed by build_routes.
let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
let startup_agents = Arc::clone(&agents);
let startup_reconciliation_tx = reconciliation_tx.clone();
// Clone for shutdown cleanup — kill orphaned PTY children before exiting.
let agents_for_shutdown = Arc::clone(&agents);
let ctx = AppContext {
state: app_state,
store,
workflow,
agents,
watcher_tx,
reconciliation_tx,
perm_tx,
perm_rx,
qa_app_process: Arc::new(std::sync::Mutex::new(None)),
};
// Build WhatsApp webhook context if bot.toml configures transport = "whatsapp".
let whatsapp_ctx: Option<Arc<whatsapp::WhatsAppWebhookContext>> = startup_root
.as_ref()
.and_then(|root| matrix::BotConfig::load(root))
.filter(|cfg| cfg.transport == "whatsapp")
.map(|cfg| {
let template_name = cfg
.whatsapp_notification_template
.clone()
.unwrap_or_else(|| "pipeline_notification".to_string());
let transport = Arc::new(whatsapp::WhatsAppTransport::new(
cfg.whatsapp_phone_number_id.clone().unwrap_or_default(),
cfg.whatsapp_access_token.clone().unwrap_or_default(),
template_name,
));
let bot_name = cfg
.display_name
.clone()
.unwrap_or_else(|| "Assistant".to_string());
let root = startup_root.clone().unwrap();
let history = whatsapp::load_whatsapp_history(&root);
Arc::new(whatsapp::WhatsAppWebhookContext {
verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(),
transport,
project_root: root,
agents: Arc::clone(&startup_agents),
bot_name,
bot_user_id: "whatsapp-bot".to_string(),
ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
history_size: cfg.history_size,
window_tracker: Arc::new(whatsapp::MessagingWindowTracker::new()),
})
});
// Build Slack webhook context if bot.toml configures transport = "slack".
let slack_ctx: Option<Arc<slack::SlackWebhookContext>> = startup_root
.as_ref()
.and_then(|root| matrix::BotConfig::load(root))
.filter(|cfg| cfg.transport == "slack")
.map(|cfg| {
let transport = Arc::new(slack::SlackTransport::new(
cfg.slack_bot_token.clone().unwrap_or_default(),
));
let bot_name = cfg
.display_name
.clone()
.unwrap_or_else(|| "Assistant".to_string());
let root = startup_root.clone().unwrap();
let history = slack::load_slack_history(&root);
let channel_ids: std::collections::HashSet<String> =
cfg.slack_channel_ids.iter().cloned().collect();
Arc::new(slack::SlackWebhookContext {
signing_secret: cfg.slack_signing_secret.clone().unwrap_or_default(),
transport,
project_root: root,
agents: Arc::clone(&startup_agents),
bot_name,
bot_user_id: "slack-bot".to_string(),
ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
history_size: cfg.history_size,
channel_ids,
})
});
let app = build_routes(ctx, whatsapp_ctx, slack_ctx);
// Optional Matrix bot: connect to the homeserver and start listening for
// messages if `.storkit/bot.toml` is present and enabled.
if let Some(ref root) = startup_root {
2026-03-20 12:26:02 +00:00
matrix::spawn_bot(
root,
watcher_tx_for_bot,
perm_rx_for_bot,
Arc::clone(&startup_agents),
);
}
// On startup:
// 1. Reconcile any stories whose agent work was committed while the server was
// offline (worktree has commits ahead of master but pipeline didn't advance).
// 2. Auto-assign free agents to remaining unassigned work in the pipeline.
if let Some(root) = startup_root {
tokio::spawn(async move {
2026-03-20 12:26:02 +00:00
slog!("[startup] Reconciling completed worktrees from previous session.");
startup_agents
.reconcile_on_startup(&root, &startup_reconciliation_tx)
.await;
2026-03-20 12:26:02 +00:00
slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
startup_agents.auto_assign_available_work(&root).await;
});
}
let addr = format!("127.0.0.1:{port}");
2026-02-16 17:10:23 +00:00
println!(
"\x1b[95;1m ____ _ _ ___ _ \n / ___|| |_ ___ _ __| | _|_ _| |_ \n \\___ \\| __/ _ \\| '__| |/ /| || __|\n ___) | || (_) | | | < | || |_ \n |____/ \\__\\___/|_| |_|\\_\\___|\\__|\n\x1b[0m"
);
2026-03-20 12:26:02 +00:00
println!("STORKIT_PORT={port}");
println!("\x1b[96;1mFrontend:\x1b[0m \x1b[94mhttp://{addr}\x1b[0m");
println!("\x1b[92;1mOpenAPI Docs:\x1b[0m \x1b[94mhttp://{addr}/docs\x1b[0m");
let port_file = write_port_file(&cwd, port);
let result = Server::new(TcpListener::bind(&addr)).run(app).await;
// Kill all active PTY child processes before exiting to prevent orphaned
// Claude Code processes from running after the server restarts.
agents_for_shutdown.kill_all_children();
if let Some(ref path) = port_file {
remove_port_file(path);
}
result
}
/// Unit tests for startup configuration validation and CLI path parsing.
#[cfg(test)]
mod tests {
    use super::*;

    /// A `project.toml` declaring two agents with the same name must be
    /// rejected by `ProjectConfig::load`; the panic message mirrors the one
    /// `main` produces for an invalid configuration.
    #[test]
    #[should_panic(expected = "Invalid project.toml: Duplicate agent name")]
    fn panics_on_duplicate_agent_names() {
        let tmp = tempfile::tempdir().unwrap();
        let sk = tmp.path().join(".storkit");
        std::fs::create_dir_all(&sk).unwrap();
        std::fs::write(
            sk.join("project.toml"),
            r#"
[[agent]]
name = "coder"
[[agent]]
name = "coder"
"#,
        )
        .unwrap();
        config::ProjectConfig::load(tmp.path())
            .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
    }

    // ── parse_project_path_arg ────────────────────────────────────────────

    /// An empty argument list yields `None` so the caller can auto-detect.
    #[test]
    fn parse_project_path_arg_none_when_no_args() {
        let cwd = PathBuf::from("/home/user/project");
        let result = parse_project_path_arg(&[], &cwd);
        assert!(result.is_none());
    }

    /// An absolute argument is not re-rooted under `cwd`.
    #[test]
    fn parse_project_path_arg_returns_path_for_absolute_arg() {
        let cwd = PathBuf::from("/home/user/project");
        let args = vec!["/some/absolute/path".to_string()];
        let result = parse_project_path_arg(&args, &cwd).unwrap();
        // Absolute path returned as-is (canonicalize may fail, fallback used)
        assert!(
            result.ends_with("absolute/path") || result == PathBuf::from("/some/absolute/path")
        );
    }

    /// `.` resolves to the (canonicalized) working directory.
    #[test]
    fn parse_project_path_arg_resolves_dot_to_cwd() {
        let tmp = tempfile::tempdir().unwrap();
        let cwd = tmp.path().to_path_buf();
        let args = vec![".".to_string()];
        let result = parse_project_path_arg(&args, &cwd).unwrap();
        // "." relative to an existing cwd should canonicalize to the cwd itself
        assert_eq!(result, cwd.canonicalize().unwrap_or(cwd));
    }

    /// A relative argument is resolved against `cwd`.
    #[test]
    fn parse_project_path_arg_resolves_relative_path() {
        let tmp = tempfile::tempdir().unwrap();
        let cwd = tmp.path().to_path_buf();
        let subdir = cwd.join("myproject");
        std::fs::create_dir_all(&subdir).unwrap();
        let args = vec!["myproject".to_string()];
        let result = parse_project_path_arg(&args, &cwd).unwrap();
        assert_eq!(result, subdir.canonicalize().unwrap_or(subdir));
    }
}