Compare commits

...

7 Commits

25 changed files with 3705 additions and 248 deletions
+24
@@ -0,0 +1,24 @@
# Huskies project-local agent guidance
## Documentation
Docs live in `website/docs/*.html` (static HTML), **not** Markdown files. When a story asks you to document something, edit the relevant `.html` file in `website/docs/`.
## Configuration files
- Agent config: `.huskies/agents.toml` (preferred) or `[[agent]]` blocks in `.huskies/project.toml`
- Project settings: `.huskies/project.toml`
- Bot credentials: `.huskies/bot.toml` (gitignored — never commit)
## Frontend build
The frontend is embedded into the Rust binary via `rust-embed`. Run `npm run build` in `frontend/` before testing frontend changes, or the embedded assets will be stale.
## Quality gates (all enforced by `script/test`)
1. `npm run build` (frontend)
2. `cargo fmt --all --check`
3. `cargo clippy -- -D warnings`
4. `cargo test`
5. `npm test` (frontend Vitest)
Clippy is zero-tolerance: no warnings allowed. Fix every warning before committing.
## Runtime validation
The `validate_agents` function in `server/src/config.rs` rejects unknown runtimes. Supported values: `"claude-code"` and `"gemini"`. Adding a new runtime requires updating that function.
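The rejection described above can be sketched as follows (a hypothetical standalone form of the check; the real `validate_agents` in `server/src/config.rs` validates whole agent config entries):

```rust
// Hypothetical standalone form of the runtime check performed by
// `validate_agents`; the supported values come from the guidance above.
fn is_supported_runtime(runtime: &str) -> bool {
    matches!(runtime, "claude-code" | "gemini")
}

fn main() {
    assert!(is_supported_runtime("claude-code"));
    assert!(is_supported_runtime("gemini"));
    // Any other value is rejected at config-validation time.
    assert!(!is_supported_runtime("codex"));
}
```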
+3
@@ -136,6 +136,9 @@ The gateway presents a unified MCP surface to the chat agent. All tool calls are
| `switch_project` | Change the active project |
| `gateway_status` | Show active project and list all registered projects |
| `gateway_health` | Health check all containers |
| `init_project` | Scaffold a new `.huskies/` project at a given path — prefer this over asking the user to run `huskies init` on the CLI |
**Initialising a new project via MCP (preferred):** Instead of asking the user to run `huskies init <path>` in a terminal, call `init_project` with the `path` argument. Optionally pass `name` and `url` to register the project in `projects.toml` immediately. After that, start a huskies server at the path and use `switch_project` to make it active before calling `wizard_status`.
### Example: multi-project Docker Compose
+7
@@ -79,6 +79,13 @@ cd frontend && npm install && npm run dev
Configuration lives in `.huskies/project.toml`. See `.huskies/bot.toml.*.example` for transport setup.
## Architecture
Internal architecture documentation lives in [`docs/architecture/`](docs/architecture/):
- [Service module conventions](docs/architecture/service-modules.md) — layout, layering rules, and patterns for `server/src/service/`
- [Future extraction targets](docs/architecture/future-extractions.md) — recommended order for remaining handler extractions
## Releasing
Requires a Gitea API token in `.env` (`GITEA_TOKEN=your_token`).
+29
@@ -0,0 +1,29 @@
# Future Service Module Extractions
Recommended order for extracting remaining HTTP handlers into `service/<domain>/`
modules, following the conventions in [service-modules.md](service-modules.md).
## Recommended Order
1. **`settings`** — small surface, few dependencies, good warm-up
2. **`oauth`** — reads/writes token files; pure validation logic separates cleanly
3. **`wizard`** — stateless generation logic is already mostly pure; thin I/O layer
4. **`project`** — project scaffolding; wraps `io::fs::scaffold`, clean separation
5. **`io`** (search/shell) — wraps `io::search` and `io::shell`; pure query-building separable
6. **`anthropic`** — token-proxy handler; pure request-shaping + thin HTTP I/O
7. **`stories`** (workflow) — CRDT-backed story ops; typed errors for 400/404/409/500
8. **`events`** — SSE handler; mostly framework wiring, but event filtering is pure
## Special Case: `ws`
The WebSocket handler (`http/ws.rs`) is a **dedicated harder extraction** because
it mixes multiple concerns (chat dispatch, permission forwarding, SSE bridging)
and depends on long-lived async streams. Extract it last, after the above list
is complete and the service module pattern is well-established.
## Notes
- Each extraction should link back to `docs/architecture/service-modules.md`
in the story description to maintain consistency.
- The `agents` extraction (story 604) is the reference implementation every
future extraction should follow.
+191
@@ -0,0 +1,191 @@
# Service Module Conventions
This document defines the layout, layering rules, and patterns for all service
modules under `server/src/service/`. Every extraction from the HTTP handlers to
a service module **must** follow these conventions.
---
## 1. Directory Layout
```
server/src/service/<domain>/
  mod.rs      — public API, typed Error, orchestration, integration tests
  io.rs       — every side-effectful call; the ONLY file that may touch the
                filesystem, spawn processes, or call external crates that do I/O
  <topic>.rs  — pure logic for a named concern within the domain; no I/O
```
### Rules
- `<domain>` matches the HTTP handler filename (e.g. `agents`, `settings`,
`oauth`).
- **No file named `logic.rs`** — use a descriptive domain name instead
(e.g. `selection.rs`, `token.rs`, `validation.rs`).
- New topic files are added when a pure concern grows beyond ~50 lines or when
it has independent test coverage needs.
---
## 2. The Functional-Core / Imperative-Shell Rule
```
io.rs (imperative shell) ←→ mod.rs (orchestrator) ←→ <topic>.rs (functional core)
```
| Layer | Allowed | Forbidden |
|-------|---------|-----------|
| `<topic>.rs` | Pure Rust, data-transformation, branching logic, pattern matching | Any I/O |
| `io.rs` | `std::fs`, `std::process`, `tokio::fs`, network calls, `SystemTime::now` | Business logic beyond a thin wrapper |
| `mod.rs` | Calls into `io.rs` and `<topic>.rs`; owns the `Error` type | Direct I/O without going through `io.rs` |
**Grep-enforceable check:** The following must NOT appear in any `service/<domain>/` file other than `io.rs`:
- `std::fs`
- `std::process`
- `std::thread::sleep`
- `tokio::fs`
- `reqwest`
- `SystemTime::now`
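A minimal sketch of how that grep-enforceable check could be automated as a Rust test. The `forbidden_markers` helper is hypothetical (the repo may simply run `grep` in CI); it only illustrates the rule that these markers must not appear outside `io.rs`:

```rust
// Hypothetical helper for the layering check: which forbidden
// side-effect markers appear in a non-`io.rs` source file?
const FORBIDDEN: &[&str] = &[
    "std::fs",
    "std::process",
    "std::thread::sleep",
    "tokio::fs",
    "reqwest",
    "SystemTime::now",
];

fn forbidden_markers(source: &str) -> Vec<&'static str> {
    FORBIDDEN
        .iter()
        .copied()
        .filter(|marker| source.contains(marker))
        .collect()
}

fn main() {
    // A pure <topic>.rs snippet passes the check...
    assert!(forbidden_markers("pub fn pick(xs: &[u32]) -> Option<&u32> { xs.first() }").is_empty());
    // ...while filesystem access outside io.rs is flagged.
    assert_eq!(
        forbidden_markers("let s = std::fs::read_to_string(path)?;"),
        vec!["std::fs"]
    );
}
```

In a real test this would walk `server/src/service/`, skip files named `io.rs`, and fail if any file yields a non-empty marker list.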
---
## 3. Error Type Pattern
Each service domain declares its own typed error enum in `mod.rs`:
```rust
/// Errors returned by `service::agents` operations.
#[derive(Debug)]
pub enum Error {
    ProjectRootNotConfigured,
    AgentNotFound(String),
    WorkItemNotFound(String),
    WorktreeError(String),
    ConfigError(String),
    IoError(String),
}

impl std::fmt::Display for Error { ... }
```
HTTP handlers map service errors to **specific** HTTP status codes:
| Error variant | HTTP status |
|--------------|-------------|
| `ProjectRootNotConfigured` | 400 Bad Request |
| `AgentNotFound` | 404 Not Found |
| `WorkItemNotFound` | 404 Not Found |
| `WorktreeError` | 400 Bad Request |
| `ConfigError` | 400 Bad Request |
| `IoError` | 500 Internal Server Error |
**No generic `bad_request` for everything** — distinguish 400 vs 404 vs 500.
---
## 4. Test Pattern
### Pure topic files (`<topic>.rs`)
```rust
#[cfg(test)]
mod tests {
    use super::*;

    // Unit tests MUST:
    // - Use no tempdir, tokio runtime, or filesystem
    // - Cover every branch of every public function
    #[test]
    fn filter_removes_archived_agents() { ... }
}
```
### `io.rs`
```rust
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // IO tests MAY use tempdirs and real filesystem.
    // Keep them few and focused on the thin I/O wrapper contract.
    #[test]
    fn is_archived_returns_true_when_in_done() { ... }
}
```
### `mod.rs`
```rust
#[cfg(test)]
mod tests {
    use super::*;

    // Integration tests compose io + pure layers end-to-end.
    // May use tempdirs. Keep the count small — they are integration-level.
    #[tokio::test]
    async fn list_agents_excludes_archived() { ... }
}
```
---
## 5. Dependency Injection Pattern
Service functions take **only the dependencies they actually use**:
```rust
// Good — takes only what it needs
pub async fn start_agent(
    pool: &AgentPool,
    project_root: &Path,
    story_id: &str,
    agent_name: Option<&str>,
) -> Result<AgentInfo, Error> { ... }

// Bad — takes the whole AppContext
pub async fn start_agent(ctx: &AppContext, ...) -> Result<AgentInfo, Error> { ... }
```
Standard injected dependencies for `service::agents`:
| Type | Purpose |
|------|---------|
| `&AgentPool` | Agent lifecycle operations |
| `&Path` (`project_root`) | Filesystem operations scoped to the project |
| `&WorkflowState` | In-memory test result cache |
**The dependency set chosen for `agents` is the reference pattern for all future
service module extractions.**
---
## 6. HTTP Handler Contract
After extraction, HTTP handlers are thin adapters:
```rust
async fn start_agent(&self, payload: Json<StartAgentPayload>) -> OpenApiResult<...> {
    let project_root = self.ctx.agents.get_project_root(&self.ctx.state)
        .map_err(|e| bad_request(e))?;                       // extract from AppContext
    let info = service::agents::start_agent(                 // call service
        &self.ctx.agents, &project_root, &payload.story_id, payload.agent_name.as_deref(),
    ).await.map_err(map_service_error)?;                     // map typed error → HTTP
    Ok(Json(AgentInfoResponse { ... }))                      // shape DTO
}
```
Handlers must contain **no**:
- `std::fs` / file reads
- `std::process` invocations
- Inline load-mutate-save sequences
- Inline validation that belongs in the service layer
---
## 7. Follow-up Extractions
See [future-extractions.md](future-extractions.md) for the recommended order
and rationale for remaining extraction targets.
+118
@@ -0,0 +1,118 @@
//! Project-local agent prompt layer.
//!
//! Reads `.huskies/AGENT.md` from the project root and appends its content to
//! the baked-in agent prompt at spawn time. This lets projects record
//! non-obvious facts (directory conventions, known traps, etc.) that every
//! agent should know without modifying the shared agent configuration.
//!
//! Behaviour contract:
//! - If the file is missing or empty the caller receives `None`; agents spawn
//! normally with no warnings or errors.
//! - If the file exists and is non-empty, the content is returned and an
//! INFO-level log line is emitted with the file path and byte count.
//! - The file is read fresh on every agent spawn — no caching.
use std::path::Path;
/// Attempt to load the project-local agent prompt from `.huskies/AGENT.md`.
///
/// Returns `Some(content)` when the file exists and is non-empty, or `None`
/// when the file is absent or empty. Never returns an error; any I/O problem
/// is silently treated as "no local prompt".
pub fn read_project_local_prompt(project_root: &Path) -> Option<String> {
    let path = project_root.join(".huskies/AGENT.md");
    let content = std::fs::read_to_string(&path).ok()?;
    let trimmed = content.trim();
    if trimmed.is_empty() {
        return None;
    }
    crate::slog!(
        "[agents] project-local prompt loaded: {} ({} bytes)",
        path.display(),
        trimmed.len()
    );
    Some(trimmed.to_string())
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn returns_none_when_file_absent() {
        let tmp = tempfile::tempdir().unwrap();
        let result = read_project_local_prompt(tmp.path());
        assert!(result.is_none(), "missing file must return None");
    }

    #[test]
    fn returns_none_when_file_empty() {
        let tmp = tempfile::tempdir().unwrap();
        let huskies_dir = tmp.path().join(".huskies");
        std::fs::create_dir_all(&huskies_dir).unwrap();
        std::fs::write(huskies_dir.join("AGENT.md"), "").unwrap();
        let result = read_project_local_prompt(tmp.path());
        assert!(result.is_none(), "empty file must return None");
    }

    #[test]
    fn returns_none_when_file_whitespace_only() {
        let tmp = tempfile::tempdir().unwrap();
        let huskies_dir = tmp.path().join(".huskies");
        std::fs::create_dir_all(&huskies_dir).unwrap();
        std::fs::write(huskies_dir.join("AGENT.md"), " \n\n ").unwrap();
        let result = read_project_local_prompt(tmp.path());
        assert!(result.is_none(), "whitespace-only file must return None");
    }

    #[test]
    fn returns_content_when_file_non_empty() {
        let tmp = tempfile::tempdir().unwrap();
        let huskies_dir = tmp.path().join(".huskies");
        std::fs::create_dir_all(&huskies_dir).unwrap();
        let marker = "DISTINCTIVE_MARKER_XYZ42";
        std::fs::write(huskies_dir.join("AGENT.md"), format!("# Hints\n{marker}\n")).unwrap();
        let result = read_project_local_prompt(tmp.path());
        assert!(result.is_some(), "non-empty file must return Some");
        let content = result.unwrap();
        assert!(
            content.contains(marker),
            "returned content must include the marker: {content}"
        );
    }

    #[test]
    fn appended_to_prompt_integration() {
        // Simulates the start.rs usage: marker appears in the constructed
        // system prompt when the file is present, absent when it is not.
        let tmp_with = tempfile::tempdir().unwrap();
        let huskies_dir = tmp_with.path().join(".huskies");
        std::fs::create_dir_all(&huskies_dir).unwrap();
        let marker = "INTEGRATION_MARKER_601";
        std::fs::write(huskies_dir.join("AGENT.md"), marker).unwrap();

        let base_prompt = "You are a coder agent.".to_string();
        let local = read_project_local_prompt(tmp_with.path());
        let effective = match local {
            Some(ref extra) => format!("{base_prompt}\n\n{extra}"),
            None => base_prompt.clone(),
        };
        assert!(
            effective.contains(marker),
            "marker must appear in effective prompt when file present: {effective}"
        );

        // Without the file
        let tmp_without = tempfile::tempdir().unwrap();
        let local2 = read_project_local_prompt(tmp_without.path());
        assert!(local2.is_none(), "no marker when file absent");
        let effective2 = match local2 {
            Some(ref extra) => format!("{base_prompt}\n\n{extra}"),
            None => base_prompt.clone(),
        };
        assert!(
            !effective2.contains(marker),
            "marker must NOT appear in effective prompt when file absent: {effective2}"
        );
    }
}
+1
@@ -1,6 +1,7 @@
//! Agent subsystem — types, configuration, and orchestration for coding agents.
pub mod gates;
pub mod lifecycle;
pub mod local_prompt;
pub mod merge;
mod pool;
pub(crate) mod pty;
+11
@@ -410,6 +410,17 @@ impl AgentPool {
}
};
// Append project-local prompt content (.huskies/AGENT.md) to the
// baked-in prompt so every agent role sees project-specific guidance
// without any config changes. The file is read fresh each spawn;
// if absent or empty, the prompt is unchanged and no warning is logged.
if let Some(local) =
crate::agents::local_prompt::read_project_local_prompt(&project_root_clone)
{
prompt.push_str("\n\n");
prompt.push_str(&local);
}
// Build the effective prompt and determine resume session.
//
// When resuming a previous session, discard the full rendered prompt
@@ -185,7 +185,8 @@ pub(super) async fn on_room_message(
// endpoint. Only a small set of gateway-local commands are handled here.
if ctx.is_gateway() {
// Commands that are meaningful on the gateway itself (no project state needed).
-const GATEWAY_LOCAL_COMMANDS: &[&str] = &["help", "ambient", "reset", "switch"];
+const GATEWAY_LOCAL_COMMANDS: &[&str] =
+    &["help", "ambient", "reset", "switch", "all_status"];
let stripped = crate::chat::util::strip_bot_mention(
&user_message,
@@ -229,6 +230,26 @@ pub(super) async fn on_room_message(
}
return;
}
// `all_status` — aggregate pipeline status across all projects (gateway-only).
if cmd == "all_status" {
let project_urls = ctx.gateway_project_urls.clone();
let client = reqwest::Client::new();
let statuses =
crate::gateway::fetch_all_project_pipeline_statuses(&project_urls, &client).await;
let response = crate::gateway::format_aggregate_status_compact(&statuses);
let html = markdown_to_html(&response);
if let Ok(msg_id) = ctx
.transport
.send_message(&room_id_str, &response, &html)
.await
&& let Ok(event_id) = msg_id.parse()
{
ctx.bot_sent_event_ids.lock().await.insert(event_id);
}
return;
}
// Gateway-local commands and freeform text fall through to normal handling below.
}
@@ -168,6 +168,11 @@ pub async fn run_bot(
let notif_room_ids = target_room_ids.clone();
let notif_project_root = project_root.clone();
let announce_room_ids = target_room_ids.clone();
// Clone values needed by the gateway notification poller (only used in gateway mode).
let poller_room_ids: Vec<String> = target_room_ids.iter().map(|r| r.to_string()).collect();
let poller_project_urls = gateway_project_urls.clone();
let poller_poll_interval = config.aggregated_notifications_poll_interval_secs;
let poller_enabled = config.aggregated_notifications_enabled;
let persisted = load_history(&project_root);
slog!(
@@ -271,6 +276,20 @@ pub async fn run_bot(
notif_project_root,
);
// In gateway mode, spawn the cross-project notification poller.
// It polls every registered project's `/api/events` endpoint and forwards
// new events to the configured gateway rooms with a `[project-name]` prefix.
// The poller is controlled by the gateway-level `aggregated_notifications_enabled`
// flag in bot.toml — set it to `false` to disable without touching per-project configs.
if !poller_project_urls.is_empty() && poller_enabled {
crate::gateway::spawn_gateway_notification_poller(
Arc::clone(&transport),
poller_room_ids,
poller_project_urls,
poller_poll_interval,
);
}
// Spawn a shutdown watcher that sends a best-effort goodbye message to all
// configured rooms when the server is about to stop (SIGINT/SIGTERM or rebuild).
{
@@ -10,6 +10,14 @@ fn default_permission_timeout_secs() -> u64 {
120
}
fn default_aggregated_notifications_poll_interval_secs() -> u64 {
5
}
fn default_aggregated_notifications_enabled() -> bool {
true
}
/// Configuration for the Matrix bot, read from `.huskies/bot.toml`.
#[derive(Deserialize, Clone, Debug)]
pub struct BotConfig {
@@ -146,6 +154,26 @@ pub struct BotConfig {
/// When empty or absent, all users in configured channels are allowed.
#[serde(default)]
pub discord_allowed_users: Vec<String>,
/// How often (in seconds) the gateway polls each project server's
/// `/api/events` endpoint to aggregate cross-project notifications.
///
/// Only used when the gateway's bot is enabled. Defaults to 5 seconds.
#[serde(default = "default_aggregated_notifications_poll_interval_secs")]
pub aggregated_notifications_poll_interval_secs: u64,
/// Whether the gateway-level aggregated cross-project notification stream
/// is enabled. When `false`, the gateway will not poll per-project
/// servers for events even if the bot is otherwise enabled.
///
/// Set this in the **gateway's** `bot.toml` (not in per-project configs).
/// Adding a new project to `projects.toml` never requires touching
/// per-project bot configs — the aggregated stream picks it up
/// automatically once this flag is `true` (the default).
///
/// Defaults to `true`.
#[serde(default = "default_aggregated_notifications_enabled")]
pub aggregated_notifications_enabled: bool,
}
fn default_transport() -> String {
@@ -658,6 +686,47 @@ require_verified_devices = true
);
}
#[test]
fn aggregated_notifications_enabled_defaults_to_true() {
let tmp = tempfile::tempdir().unwrap();
let sk = tmp.path().join(".huskies");
fs::create_dir_all(&sk).unwrap();
fs::write(
sk.join("bot.toml"),
r#"
homeserver = "https://matrix.example.com"
username = "@bot:example.com"
password = "secret"
room_ids = ["!abc:example.com"]
enabled = true
"#,
)
.unwrap();
let config = BotConfig::load(tmp.path()).unwrap();
assert!(config.aggregated_notifications_enabled);
}
#[test]
fn aggregated_notifications_enabled_can_be_set_to_false() {
let tmp = tempfile::tempdir().unwrap();
let sk = tmp.path().join(".huskies");
fs::create_dir_all(&sk).unwrap();
fs::write(
sk.join("bot.toml"),
r#"
homeserver = "https://matrix.example.com"
username = "@bot:example.com"
password = "secret"
room_ids = ["!abc:example.com"]
enabled = true
aggregated_notifications_enabled = false
"#,
)
.unwrap();
let config = BotConfig::load(tmp.path()).unwrap();
assert!(!config.aggregated_notifications_enabled);
}
#[test]
fn load_reads_ambient_rooms() {
let tmp = tempfile::tempdir().unwrap();
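The two gateway settings introduced above live in the gateway's `.huskies/bot.toml`. A sketch with illustrative values (required transport fields such as `homeserver`, `username`, `password`, and `room_ids` omitted):

```toml
# Gateway-level bot.toml fragment (illustrative values)
aggregated_notifications_enabled = true          # default: true; set to false to disable polling
aggregated_notifications_poll_interval_secs = 5  # default: 5
```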
+1434 -43
File diff suppressed because it is too large.
+131 -200
@@ -1,11 +1,14 @@
-//! HTTP agent endpoints — REST API for listing, starting, stopping, and inspecting agents.
-use crate::config::ProjectConfig;
+//! HTTP agent endpoints — thin adapters over `service::agents`.
+//!
+//! Each handler: extracts payload → calls `service::agents::X` → shapes
+//! response DTO → returns HTTP result. No filesystem access, no inline
+//! validation, no process invocations.
use crate::http::context::{AppContext, OpenApiResult, bad_request, not_found};
+use crate::service::agents::{self as svc, AgentConfigEntry, WorkItemContent};
use crate::workflow::{StoryTestResults, TestCaseResult, TestStatus};
-use crate::worktree;
+use poem::http::StatusCode;
use poem_openapi::{Object, OpenApi, Tags, param::Path, payload::Json};
use serde::Serialize;
-use std::path;
use std::sync::Arc;
#[derive(Tags)]
@@ -45,6 +48,20 @@ struct AgentConfigInfoResponse {
max_budget_usd: Option<f64>,
}
impl From<AgentConfigEntry> for AgentConfigInfoResponse {
fn from(e: AgentConfigEntry) -> Self {
Self {
name: e.name,
role: e.role,
stage: e.stage,
model: e.model,
allowed_tools: e.allowed_tools,
max_turns: e.max_turns,
max_budget_usd: e.max_budget_usd,
}
}
}
#[derive(Object)]
struct CreateWorktreePayload {
story_id: String,
@@ -73,6 +90,17 @@ struct WorkItemContentResponse {
agent: Option<String>,
}
impl From<WorkItemContent> for WorkItemContentResponse {
fn from(w: WorkItemContent) -> Self {
Self {
content: w.content,
stage: w.stage,
name: w.name,
agent: w.agent,
}
}
}
/// A single test case result for the OpenAPI response.
#[derive(Object, Serialize)]
struct TestCaseResultResponse {
@@ -153,15 +181,23 @@ struct AllTokenUsageResponse {
records: Vec<TokenUsageRecordResponse>,
}
-/// Returns true if the story file exists in `work/5_done/` or `work/6_archived/`.
-///
-/// Used to exclude agents for already-archived stories from the `list_agents`
-/// response so the agents panel is not cluttered with old completed items on
-/// frontend startup.
-pub fn story_is_archived(project_root: &path::Path, story_id: &str) -> bool {
-let work = project_root.join(".huskies").join("work");
-let filename = format!("{story_id}.md");
-work.join("5_done").join(&filename).exists() || work.join("6_archived").join(&filename).exists()
-}
+/// Map a `service::agents::Error` to a Poem HTTP error with the correct status.
+fn map_svc_error(err: svc::Error) -> poem::Error {
+match err {
+svc::Error::AgentNotFound(_) => {
+poem::Error::from_string(err.to_string(), StatusCode::NOT_FOUND)
+}
+svc::Error::WorkItemNotFound(_) => {
+poem::Error::from_string(err.to_string(), StatusCode::NOT_FOUND)
+}
+svc::Error::Worktree(_) => {
+poem::Error::from_string(err.to_string(), StatusCode::BAD_REQUEST)
+}
+svc::Error::Config(_) => poem::Error::from_string(err.to_string(), StatusCode::BAD_REQUEST),
+svc::Error::Io(_) => {
+poem::Error::from_string(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR)
+}
+}
+}
pub struct AgentsApi {
@@ -183,18 +219,16 @@ impl AgentsApi {
.get_project_root(&self.ctx.state)
.map_err(bad_request)?;
-let info = self
-.ctx
-.agents
-.start_agent(
-&project_root,
-&payload.0.story_id,
-payload.0.agent_name.as_deref(),
-None,
-None,
-)
-.await
-.map_err(bad_request)?;
+let info = svc::start_agent(
+&self.ctx.agents,
+&project_root,
+&payload.0.story_id,
+payload.0.agent_name.as_deref(),
+None,
+None,
+)
+.await
+.map_err(map_svc_error)?;
Ok(Json(AgentInfoResponse {
story_id: info.story_id,
@@ -214,11 +248,14 @@ impl AgentsApi {
.get_project_root(&self.ctx.state)
.map_err(bad_request)?;
-self.ctx
-.agents
-.stop_agent(&project_root, &payload.0.story_id, &payload.0.agent_name)
-.await
-.map_err(bad_request)?;
+svc::stop_agent(
+&self.ctx.agents,
+&project_root,
+&payload.0.story_id,
+&payload.0.agent_name,
+)
+.await
+.map_err(map_svc_error)?;
Ok(Json(true))
}
@@ -231,17 +268,12 @@ impl AgentsApi {
#[oai(path = "/agents", method = "get")]
async fn list_agents(&self) -> OpenApiResult<Json<Vec<AgentInfoResponse>>> {
let project_root = self.ctx.agents.get_project_root(&self.ctx.state).ok();
-let agents = self.ctx.agents.list_agents().map_err(bad_request)?;
+let agents =
+svc::list_agents(&self.ctx.agents, project_root.as_deref()).map_err(map_svc_error)?;
Ok(Json(
agents
.into_iter()
-.filter(|info| {
-project_root
-.as_deref()
-.map(|root| !story_is_archived(root, &info.story_id))
-.unwrap_or(true)
-})
.map(|info| AgentInfoResponse {
story_id: info.story_id,
agent_name: info.agent_name,
@@ -262,21 +294,11 @@ impl AgentsApi {
.get_project_root(&self.ctx.state)
.map_err(bad_request)?;
-let config = ProjectConfig::load(&project_root).map_err(bad_request)?;
+let entries = svc::get_agent_config(&project_root).map_err(map_svc_error)?;
Ok(Json(
-config
-.agent
-.iter()
-.map(|a| AgentConfigInfoResponse {
-name: a.name.clone(),
-role: a.role.clone(),
-stage: a.stage.clone(),
-model: a.model.clone(),
-allowed_tools: a.allowed_tools.clone(),
-max_turns: a.max_turns,
-max_budget_usd: a.max_budget_usd,
-})
+entries
+.into_iter()
+.map(AgentConfigInfoResponse::from)
.collect(),
))
}
@@ -290,21 +312,11 @@ impl AgentsApi {
.get_project_root(&self.ctx.state)
.map_err(bad_request)?;
-let config = ProjectConfig::load(&project_root).map_err(bad_request)?;
+let entries = svc::reload_config(&project_root).map_err(map_svc_error)?;
Ok(Json(
-config
-.agent
-.iter()
-.map(|a| AgentConfigInfoResponse {
-name: a.name.clone(),
-role: a.role.clone(),
-stage: a.stage.clone(),
-model: a.model.clone(),
-allowed_tools: a.allowed_tools.clone(),
-max_turns: a.max_turns,
-max_budget_usd: a.max_budget_usd,
-})
+entries
+.into_iter()
+.map(AgentConfigInfoResponse::from)
.collect(),
))
}
@@ -321,12 +333,9 @@ impl AgentsApi {
.get_project_root(&self.ctx.state)
.map_err(bad_request)?;
-let info = self
-.ctx
-.agents
-.create_worktree(&project_root, &payload.0.story_id)
-.await
-.map_err(bad_request)?;
+let info = svc::create_worktree(&self.ctx.agents, &project_root, &payload.0.story_id)
+.await
+.map_err(map_svc_error)?;
Ok(Json(WorktreeInfoResponse {
story_id: payload.0.story_id,
@@ -345,7 +354,7 @@ impl AgentsApi {
.get_project_root(&self.ctx.state)
.map_err(bad_request)?;
-let entries = worktree::list_worktrees(&project_root).map_err(bad_request)?;
+let entries = svc::list_worktrees(&project_root).map_err(map_svc_error)?;
Ok(Json(
entries
@@ -373,64 +382,12 @@ impl AgentsApi {
.get_project_root(&self.ctx.state)
.map_err(bad_request)?;
-let stages = [
-("1_backlog", "backlog"),
-("2_current", "current"),
-("3_qa", "qa"),
-("4_merge", "merge"),
-("5_done", "done"),
-("6_archived", "archived"),
-];
-let work_dir = project_root.join(".huskies").join("work");
-let filename = format!("{}.md", story_id.0);
-for (stage_dir, stage_name) in &stages {
-let file_path = work_dir.join(stage_dir).join(&filename);
-if file_path.exists() {
-let content = std::fs::read_to_string(&file_path)
-.map_err(|e| bad_request(format!("Failed to read work item: {e}")))?;
-let metadata = crate::io::story_metadata::parse_front_matter(&content).ok();
-let name = metadata.as_ref().and_then(|m| m.name.clone());
-let agent = metadata.and_then(|m| m.agent);
-return Ok(Json(WorkItemContentResponse {
-content,
-stage: stage_name.to_string(),
-name,
-agent,
-}));
-}
-}
-// Filesystem miss — fall back to CRDT-only path (story exists in the CRDT
-// but has no corresponding .md file on disk).
-if let Some(content) = crate::db::read_content(&story_id.0) {
-let item = crate::pipeline_state::read_typed(&story_id.0)
-.map_err(|e| bad_request(format!("Pipeline read error: {e}")))?;
-let stage = item
-.as_ref()
-.map(|i| match &i.stage {
-crate::pipeline_state::Stage::Backlog => "backlog",
-crate::pipeline_state::Stage::Coding => "current",
-crate::pipeline_state::Stage::Qa => "qa",
-crate::pipeline_state::Stage::Merge { .. } => "merge",
-crate::pipeline_state::Stage::Done { .. } => "done",
-crate::pipeline_state::Stage::Archived { .. } => "archived",
-})
-.unwrap_or("unknown")
-.to_string();
-let metadata = crate::io::story_metadata::parse_front_matter(&content).ok();
-let name = metadata.as_ref().and_then(|m| m.name.clone());
-let agent = metadata.and_then(|m| m.agent);
-return Ok(Json(WorkItemContentResponse {
-content,
-stage,
-name,
-agent,
-}));
-}
-Err(not_found(format!("Work item not found: {}", story_id.0)))
+let item = svc::get_work_item_content(&project_root, &story_id.0).map_err(|e| match e {
+svc::Error::WorkItemNotFound(_) => not_found(e.to_string()),
+other => map_svc_error(other),
+})?;
+Ok(Json(WorkItemContentResponse::from(item)))
}
/// Get test results for a work item by its story_id.
@@ -442,30 +399,37 @@ impl AgentsApi {
&self,
story_id: Path<String>,
) -> OpenApiResult<Json<Option<TestResultsResponse>>> {
-// Try in-memory workflow state first.
-let workflow = self
-.ctx
-.workflow
-.lock()
-.map_err(|e| bad_request(format!("Lock error: {e}")))?;
-if let Some(results) = workflow.results.get(&story_id.0) {
-return Ok(Json(Some(TestResultsResponse::from_story_results(results))));
+// Fast path: return from in-memory state without requiring project_root.
+let in_memory = {
+let workflow = self
+.ctx
+.workflow
+.lock()
+.map_err(|e| bad_request(format!("Lock error: {e}")))?;
+workflow.results.get(&story_id.0).cloned()
+};
+if let Some(results) = in_memory {
+return Ok(Json(Some(TestResultsResponse::from_story_results(
+&results,
+))));
}
-drop(workflow);
-// Fall back to file-persisted results.
+// Slow path: fall back to results persisted in the story file.
let project_root = self
.ctx
.agents
.get_project_root(&self.ctx.state)
.map_err(bad_request)?;
-let file_results =
-crate::http::workflow::read_test_results_from_story_file(&project_root, &story_id.0);
+let workflow = self
+.ctx
+.workflow
+.lock()
+.map_err(|e| bad_request(format!("Lock error: {e}")))?;
+let results = svc::get_test_results(&project_root, &story_id.0, &workflow);
Ok(Json(
-file_results.map(|r| TestResultsResponse::from_story_results(&r)),
+results.map(|r| TestResultsResponse::from_story_results(&r)),
))
}
@@ -486,26 +450,8 @@ impl AgentsApi {
            .get_project_root(&self.ctx.state)
            .map_err(bad_request)?;
-        let log_path = crate::agent_log::find_latest_log(&project_root, &story_id.0, &agent_name.0);
-        let Some(path) = log_path else {
-            return Ok(Json(AgentOutputResponse {
-                output: String::new(),
-            }));
-        };
-        let entries = crate::agent_log::read_log(&path).map_err(bad_request)?;
-        let output: String = entries
-            .iter()
-            .filter(|e| e.event.get("type").and_then(|t| t.as_str()) == Some("output"))
-            .filter_map(|e| {
-                e.event
-                    .get("text")
-                    .and_then(|t| t.as_str())
-                    .map(str::to_owned)
-            })
-            .collect();
+        let output = svc::get_agent_output(&project_root, &story_id.0, &agent_name.0)
+            .map_err(map_svc_error)?;
        Ok(Json(AgentOutputResponse { output }))
    }
@@ -519,10 +465,9 @@ impl AgentsApi {
            .get_project_root(&self.ctx.state)
            .map_err(bad_request)?;
-        let config = ProjectConfig::load(&project_root).map_err(bad_request)?;
-        worktree::remove_worktree_by_story_id(&project_root, &story_id.0, &config)
+        svc::remove_worktree(&project_root, &story_id.0)
            .await
-            .map_err(bad_request)?;
+            .map_err(map_svc_error)?;
        Ok(Json(true))
    }
@@ -542,39 +487,25 @@ impl AgentsApi {
            .get_project_root(&self.ctx.state)
            .map_err(bad_request)?;
-        let all_records = crate::agents::token_usage::read_all(&project_root)
-            .map_err(|e| bad_request(format!("Failed to read token usage: {e}")))?;
-        let mut agent_map: std::collections::HashMap<String, AgentCostEntry> =
-            std::collections::HashMap::new();
-        let mut total_cost_usd = 0.0_f64;
-        for record in all_records.into_iter().filter(|r| r.story_id == story_id.0) {
-            total_cost_usd += record.usage.total_cost_usd;
-            let entry = agent_map
-                .entry(record.agent_name.clone())
-                .or_insert_with(|| AgentCostEntry {
-                    agent_name: record.agent_name.clone(),
-                    model: record.model.clone(),
-                    input_tokens: 0,
-                    output_tokens: 0,
-                    cache_creation_input_tokens: 0,
-                    cache_read_input_tokens: 0,
-                    total_cost_usd: 0.0,
-                });
-            entry.input_tokens += record.usage.input_tokens;
-            entry.output_tokens += record.usage.output_tokens;
-            entry.cache_creation_input_tokens += record.usage.cache_creation_input_tokens;
-            entry.cache_read_input_tokens += record.usage.cache_read_input_tokens;
-            entry.total_cost_usd += record.usage.total_cost_usd;
-        }
-        let mut agents: Vec<AgentCostEntry> = agent_map.into_values().collect();
-        agents.sort_by(|a, b| a.agent_name.cmp(&b.agent_name));
+        let summary =
+            svc::get_work_item_token_cost(&project_root, &story_id.0).map_err(map_svc_error)?;
+        let agents = summary
+            .agents
+            .into_iter()
+            .map(|a| AgentCostEntry {
+                agent_name: a.agent_name,
+                model: a.model,
+                input_tokens: a.input_tokens,
+                output_tokens: a.output_tokens,
+                cache_creation_input_tokens: a.cache_creation_input_tokens,
+                cache_read_input_tokens: a.cache_read_input_tokens,
+                total_cost_usd: a.total_cost_usd,
+            })
+            .collect();
        Ok(Json(TokenCostResponse {
-            total_cost_usd,
+            total_cost_usd: summary.total_cost_usd,
            agents,
        }))
    }
@@ -590,8 +521,7 @@ impl AgentsApi {
            .get_project_root(&self.ctx.state)
            .map_err(bad_request)?;
-        let records = crate::agents::token_usage::read_all(&project_root)
-            .map_err(|e| bad_request(format!("Failed to read token usage: {e}")))?;
+        let records = svc::get_all_token_usage(&project_root).map_err(map_svc_error)?;
        let response_records: Vec<TokenUsageRecordResponse> = records
            .into_iter()
@@ -618,6 +548,7 @@ impl AgentsApi {
mod tests {
    use super::*;
    use crate::agents::AgentStatus;
+    use std::path;
    use tempfile::TempDir;

    fn make_work_dirs(tmp: &TempDir) -> path::PathBuf {
@@ -632,7 +563,7 @@ mod tests {
    fn story_is_archived_false_when_file_absent() {
        let tmp = TempDir::new().unwrap();
        let root = make_work_dirs(&tmp);
-        assert!(!story_is_archived(&root, "79_story_foo"));
+        assert!(!svc::is_archived(&root, "79_story_foo"));
    }
    #[test]
@@ -644,7 +575,7 @@ mod tests {
            "---\nname: test\n---\n",
        )
        .unwrap();
-        assert!(story_is_archived(&root, "79_story_foo"));
+        assert!(svc::is_archived(&root, "79_story_foo"));
    }
    #[test]
@@ -656,7 +587,7 @@ mod tests {
            "---\nname: test\n---\n",
        )
        .unwrap();
-        assert!(story_is_archived(&root, "79_story_foo"));
+        assert!(svc::is_archived(&root, "79_story_foo"));
    }
    #[tokio::test]
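The fast-path refactor in the test-results handler above relies on a common Rust idiom: take the mutex inside a block expression and clone the value out, so the guard is dropped before the slower fallback work runs. A minimal self-contained sketch of that pattern, with illustrative names rather than the real handler types:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Fast path: the lock guard lives only inside the block expression, so it is
// released before the (slower) fallback below — no explicit drop() needed.
fn lookup(results: &Mutex<HashMap<String, String>>, id: &str) -> String {
    let in_memory = {
        let guard = results.lock().unwrap();
        guard.get(id).cloned()
    }; // guard dropped here
    if let Some(r) = in_memory {
        return r;
    }
    // Slow path: e.g. read from disk; the lock is no longer held.
    format!("fallback:{id}")
}

fn main() {
    let m = Mutex::new(HashMap::from([("a".to_string(), "hit".to_string())]));
    assert_eq!(lookup(&m, "a"), "hit");
    assert_eq!(lookup(&m, "b"), "fallback:b");
}
```

Scoping the guard this way also avoids the original version's `drop(workflow);`, which clippy tends to flag when the lock could simply go out of scope.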
+176
@@ -81,7 +81,9 @@ async fn dispatch_command(
        "start" => dispatch_start(args, project_root, agents).await,
        "delete" => dispatch_delete(args, project_root, agents).await,
        "rebuild" => dispatch_rebuild(project_root, agents).await,
+        "rmtree" => dispatch_rmtree(args, project_root, agents).await,
        "timer" => dispatch_timer(args, project_root).await,
+        "htop" => dispatch_htop(args, agents).await,
        // All other commands go through the synchronous command registry.
        _ => dispatch_sync(cmd, args, project_root, agents),
    }
@@ -203,6 +205,24 @@ async fn dispatch_delete(
    .await
}
async fn dispatch_rmtree(
args: &str,
project_root: &std::path::Path,
agents: &Arc<crate::agents::AgentPool>,
) -> String {
let number_str = args.trim();
if number_str.is_empty() || !number_str.chars().all(|c| c.is_ascii_digit()) {
return "Usage: `/rmtree <number>` (e.g. `/rmtree 42`)".to_string();
}
crate::chat::transport::matrix::rmtree::handle_rmtree(
"web-ui",
number_str,
project_root,
agents,
)
.await
}
async fn dispatch_rebuild(
    project_root: &std::path::Path,
    agents: &Arc<crate::agents::AgentPool>,
@@ -230,6 +250,34 @@ async fn dispatch_timer(args: &str, project_root: &std::path::Path) -> String {
    crate::chat::timer::handle_timer_command(timer_cmd, &store, project_root).await
}
/// Handle the `htop` command from the web UI.
///
/// The web UI uses a one-shot HTTP request, so live updates are not possible
/// here. Returns a static snapshot of the process dashboard. For `htop stop`,
/// returns a helpful message (no persistent session state exists in the web UI).
async fn dispatch_htop(args: &str, agents: &Arc<crate::agents::AgentPool>) -> String {
use crate::chat::transport::matrix::htop::{HtopCommand, build_htop_message};
// Re-use the existing parser by constructing a synthetic message.
let synthetic = if args.is_empty() {
"__web_ui__ htop".to_string()
} else {
format!("__web_ui__ htop {args}")
};
match crate::chat::transport::matrix::htop::extract_htop_command(
&synthetic,
"__web_ui__",
"@__web_ui__:localhost",
) {
Some(HtopCommand::Stop) => "No active htop session in the web UI. \
Live sessions are only supported in chat transports (Matrix, Slack, Discord)."
.to_string(),
Some(HtopCommand::Start { duration_secs }) => build_htop_message(agents, 0, duration_secs),
None => build_htop_message(agents, 0, 300),
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
@@ -349,6 +397,134 @@ mod tests {
        );
    }
// -- htop (web-UI slash-command path) ------------------------------------
#[tokio::test]
async fn htop_returns_dashboard_not_unknown_command() {
let dir = TempDir::new().unwrap();
let api = test_api(&dir);
let body = BotCommandRequest {
command: "htop".to_string(),
args: String::new(),
};
let result = api.run_command(Json(body)).await;
assert!(result.is_ok());
let resp = result.unwrap().0;
assert!(
!resp.response.contains("Unknown command"),
"htop should not return 'Unknown command': {}",
resp.response
);
assert!(
resp.response.contains("htop"),
"htop response should contain 'htop': {}",
resp.response
);
}
#[tokio::test]
async fn htop_with_duration_returns_dashboard() {
let dir = TempDir::new().unwrap();
let api = test_api(&dir);
let body = BotCommandRequest {
command: "htop".to_string(),
args: "10m".to_string(),
};
let result = api.run_command(Json(body)).await;
assert!(result.is_ok());
let resp = result.unwrap().0;
assert!(
!resp.response.contains("Unknown command"),
"htop 10m should not return 'Unknown command': {}",
resp.response
);
}
#[tokio::test]
async fn htop_stop_returns_response_not_unknown_command() {
let dir = TempDir::new().unwrap();
let api = test_api(&dir);
let body = BotCommandRequest {
command: "htop".to_string(),
args: "stop".to_string(),
};
let result = api.run_command(Json(body)).await;
assert!(result.is_ok());
let resp = result.unwrap().0;
assert!(
!resp.response.contains("Unknown command"),
"htop stop should not return 'Unknown command': {}",
resp.response
);
}
// -- rmtree ----------------------------------------------------------------
#[tokio::test]
async fn rmtree_without_number_returns_usage() {
let dir = TempDir::new().unwrap();
let api = test_api(&dir);
let body = BotCommandRequest {
command: "rmtree".to_string(),
args: String::new(),
};
let result = api.run_command(Json(body)).await;
assert!(result.is_ok());
let resp = result.unwrap().0;
assert!(
resp.response.contains("Usage"),
"expected usage hint for bare /rmtree: {}",
resp.response
);
}
#[tokio::test]
async fn rmtree_with_non_numeric_arg_returns_usage() {
let dir = TempDir::new().unwrap();
let api = test_api(&dir);
let body = BotCommandRequest {
command: "rmtree".to_string(),
args: "foo".to_string(),
};
let result = api.run_command(Json(body)).await;
assert!(result.is_ok());
let resp = result.unwrap().0;
assert!(
resp.response.contains("Usage"),
"expected usage hint for /rmtree foo: {}",
resp.response
);
}
#[tokio::test]
async fn rmtree_does_not_return_unknown_command() {
let dir = TempDir::new().unwrap();
let api = test_api(&dir);
let body = BotCommandRequest {
command: "rmtree".to_string(),
args: "999".to_string(),
};
let result = api.run_command(Json(body)).await;
assert!(result.is_ok());
let resp = result.unwrap().0;
assert!(
!resp.response.contains("Unknown command"),
"/rmtree should not return 'Unknown command': {}",
resp.response
);
}
// -- htop bot-command path (regression: htop must remain in command registry) --
#[test]
fn htop_is_registered_in_bot_command_registry() {
let commands = crate::chat::commands::commands();
assert!(
commands.iter().any(|c| c.name == "htop"),
"htop must be registered in the bot command registry so /help lists it"
);
}
    #[tokio::test]
    async fn run_command_requires_project_root() {
        // Create a context with no project root set.
+341
@@ -0,0 +1,341 @@
//! Per-project event buffer and `GET /api/events` HTTP endpoint.
//!
//! The gateway polls `/api/events?since={ts_ms}` on each registered project
//! server to aggregate cross-project pipeline notifications into a single
//! gateway chat channel. Each project server buffers up to 500 events in
//! memory and serves them via this endpoint.
use crate::io::watcher::WatcherEvent;
use poem::web::{Data, Query};
use poem::{Response, handler, http::StatusCode};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
/// Maximum number of events retained in the in-memory buffer.
const MAX_BUFFER_SIZE: usize = 500;
/// A pipeline event stored in the event buffer with a timestamp.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StoredEvent {
/// A work item transitioned between pipeline stages.
StageTransition {
/// Work item ID (e.g. `"42_story_my_feature"`).
story_id: String,
/// The stage the item moved FROM (display name, e.g. `"Current"`).
from_stage: String,
/// The stage the item moved TO (directory key, e.g. `"3_qa"`).
to_stage: String,
/// Unix timestamp in milliseconds when this event was recorded.
timestamp_ms: u64,
},
/// A merge operation failed for a story.
MergeFailure {
/// Work item ID (e.g. `"42_story_my_feature"`).
story_id: String,
/// Human-readable description of the failure.
reason: String,
/// Unix timestamp in milliseconds when this event was recorded.
timestamp_ms: u64,
},
/// A story was blocked (e.g. retry limit exceeded).
StoryBlocked {
/// Work item ID (e.g. `"42_story_my_feature"`).
story_id: String,
/// Human-readable reason the story was blocked.
reason: String,
/// Unix timestamp in milliseconds when this event was recorded.
timestamp_ms: u64,
},
}
impl StoredEvent {
/// Returns the `timestamp_ms` field common to all event variants.
pub fn timestamp_ms(&self) -> u64 {
match self {
StoredEvent::StageTransition { timestamp_ms, .. } => *timestamp_ms,
StoredEvent::MergeFailure { timestamp_ms, .. } => *timestamp_ms,
StoredEvent::StoryBlocked { timestamp_ms, .. } => *timestamp_ms,
}
}
}
/// Shared, thread-safe ring buffer of recent pipeline events.
///
/// Wrapped in `Arc` so it can be shared between the background subscriber
/// task and the HTTP handler. The inner `Mutex` guards the `VecDeque`.
#[derive(Clone, Debug)]
pub struct EventBuffer(Arc<Mutex<VecDeque<StoredEvent>>>);
impl EventBuffer {
/// Create a new, empty event buffer.
pub fn new() -> Self {
EventBuffer(Arc::new(Mutex::new(VecDeque::new())))
}
/// Append an event to the buffer, evicting the oldest entry if the buffer
/// exceeds [`MAX_BUFFER_SIZE`].
pub fn push(&self, event: StoredEvent) {
let mut buf = self.0.lock().unwrap();
if buf.len() >= MAX_BUFFER_SIZE {
buf.pop_front();
}
buf.push_back(event);
}
/// Return all events whose `timestamp_ms` is strictly greater than `since_ms`.
pub fn events_since(&self, since_ms: u64) -> Vec<StoredEvent> {
let buf = self.0.lock().unwrap();
buf.iter()
.filter(|e| e.timestamp_ms() > since_ms)
.cloned()
.collect()
}
}
impl Default for EventBuffer {
fn default() -> Self {
Self::new()
}
}
/// Returns the current Unix timestamp in milliseconds.
fn now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
/// Spawn a background task that consumes [`WatcherEvent`] broadcasts and
/// stores relevant events in `buffer`.
///
/// Only [`WatcherEvent::WorkItem`] (with a known `from_stage`),
/// [`WatcherEvent::MergeFailure`], and [`WatcherEvent::StoryBlocked`]
/// variants are stored. All other variants are silently ignored.
pub fn subscribe_to_watcher(buffer: EventBuffer, mut rx: broadcast::Receiver<WatcherEvent>) {
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(WatcherEvent::WorkItem {
stage,
item_id,
from_stage,
..
}) => {
// Only store genuine transitions (from_stage is known).
if let Some(from) = from_stage {
buffer.push(StoredEvent::StageTransition {
story_id: item_id,
from_stage: from,
to_stage: stage,
timestamp_ms: now_ms(),
});
}
}
Ok(WatcherEvent::MergeFailure { story_id, reason }) => {
buffer.push(StoredEvent::MergeFailure {
story_id,
reason,
timestamp_ms: now_ms(),
});
}
Ok(WatcherEvent::StoryBlocked { story_id, reason }) => {
buffer.push(StoredEvent::StoryBlocked {
story_id,
reason,
timestamp_ms: now_ms(),
});
}
Ok(_) => {} // Ignore all other event types.
Err(broadcast::error::RecvError::Lagged(n)) => {
crate::slog!("[events] Subscriber lagged, skipped {n} events");
}
Err(broadcast::error::RecvError::Closed) => {
crate::slog!("[events] Watcher channel closed; stopping event subscriber");
break;
}
}
}
});
}
/// Query parameters for `GET /api/events`.
#[derive(Deserialize)]
pub struct EventsQuery {
/// Return only events with `timestamp_ms` strictly greater than this value.
/// Defaults to `0` (return all buffered events).
#[serde(default)]
pub since: u64,
}
/// `GET /api/events?since={ts_ms}`
///
/// Returns a JSON array of [`StoredEvent`] objects recorded after `since` ms.
/// The gateway polls this endpoint on each registered project server to build
/// an aggregated cross-project notification stream.
#[handler]
pub fn events_handler(
Query(params): Query<EventsQuery>,
Data(buffer): Data<&EventBuffer>,
) -> Response {
let events = buffer.events_since(params.since);
let body = serde_json::to_vec(&events).unwrap_or_else(|_| b"[]".to_vec());
Response::builder()
.status(StatusCode::OK)
.header(poem::http::header::CONTENT_TYPE, "application/json")
.body(body)
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::sync::broadcast;
#[test]
fn event_buffer_push_and_retrieve() {
let buf = EventBuffer::new();
buf.push(StoredEvent::MergeFailure {
story_id: "42_story_x".to_string(),
reason: "conflict".to_string(),
timestamp_ms: 1000,
});
buf.push(StoredEvent::StoryBlocked {
story_id: "43_story_y".to_string(),
reason: "retry limit".to_string(),
timestamp_ms: 2000,
});
let all = buf.events_since(0);
assert_eq!(all.len(), 2);
let after_1000 = buf.events_since(1000);
assert_eq!(after_1000.len(), 1);
assert!(matches!(after_1000[0], StoredEvent::StoryBlocked { .. }));
}
#[test]
fn event_buffer_evicts_oldest_when_full() {
let buf = EventBuffer::new();
for i in 0..MAX_BUFFER_SIZE + 1 {
buf.push(StoredEvent::MergeFailure {
story_id: format!("{i}_story_x"),
reason: "x".to_string(),
timestamp_ms: i as u64,
});
}
// Buffer must not exceed MAX_BUFFER_SIZE.
assert_eq!(buf.events_since(0).len(), MAX_BUFFER_SIZE);
// Oldest entry (timestamp_ms == 0) should have been evicted.
assert!(buf.events_since(0).iter().all(|e| e.timestamp_ms() > 0));
}
#[test]
fn stage_transition_timestamp_ms_accessor() {
let e = StoredEvent::StageTransition {
story_id: "1".to_string(),
from_stage: "2_current".to_string(),
to_stage: "3_qa".to_string(),
timestamp_ms: 9999,
};
assert_eq!(e.timestamp_ms(), 9999);
}
#[tokio::test]
async fn subscribe_to_watcher_stores_work_item_with_from_stage() {
let buf = EventBuffer::new();
let (tx, rx) = broadcast::channel(16);
subscribe_to_watcher(buf.clone(), rx);
tx.send(crate::io::watcher::WatcherEvent::WorkItem {
stage: "3_qa".to_string(),
item_id: "42_story_foo".to_string(),
action: "qa".to_string(),
commit_msg: "huskies: qa 42_story_foo".to_string(),
from_stage: Some("2_current".to_string()),
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let events = buf.events_since(0);
assert_eq!(events.len(), 1);
assert!(matches!(events[0], StoredEvent::StageTransition { .. }));
if let StoredEvent::StageTransition {
ref story_id,
ref from_stage,
ref to_stage,
..
} = events[0]
{
assert_eq!(story_id, "42_story_foo");
assert_eq!(from_stage, "2_current");
assert_eq!(to_stage, "3_qa");
}
}
#[tokio::test]
async fn subscribe_to_watcher_ignores_work_item_without_from_stage() {
let buf = EventBuffer::new();
let (tx, rx) = broadcast::channel(16);
subscribe_to_watcher(buf.clone(), rx);
// Synthetic event: no from_stage.
tx.send(crate::io::watcher::WatcherEvent::WorkItem {
stage: "2_current".to_string(),
item_id: "99_story_syn".to_string(),
action: "start".to_string(),
commit_msg: "huskies: start 99_story_syn".to_string(),
from_stage: None,
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(buf.events_since(0).len(), 0);
}
#[tokio::test]
async fn subscribe_to_watcher_stores_merge_failure() {
let buf = EventBuffer::new();
let (tx, rx) = broadcast::channel(16);
subscribe_to_watcher(buf.clone(), rx);
tx.send(crate::io::watcher::WatcherEvent::MergeFailure {
story_id: "42_story_foo".to_string(),
reason: "merge conflict".to_string(),
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let events = buf.events_since(0);
assert_eq!(events.len(), 1);
assert!(matches!(events[0], StoredEvent::MergeFailure { .. }));
}
#[tokio::test]
async fn subscribe_to_watcher_stores_story_blocked() {
let buf = EventBuffer::new();
let (tx, rx) = broadcast::channel(16);
subscribe_to_watcher(buf.clone(), rx);
tx.send(crate::io::watcher::WatcherEvent::StoryBlocked {
story_id: "43_story_bar".to_string(),
reason: "retry limit exceeded".to_string(),
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let events = buf.events_since(0);
assert_eq!(events.len(), 1);
assert!(matches!(events[0], StoredEvent::StoryBlocked { .. }));
}
}
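One way to read the endpoint contract above: because `events_since` filters on strictly greater-than, a gateway poller can advance its cursor to the largest `timestamp_ms` it has received and never see duplicates. A small sketch of that cursor logic against a stand-in buffer (shrunk to 3 entries for illustration; the real `EventBuffer` holds 500 and stores full `StoredEvent` values, not bare timestamps):

```rust
use std::collections::VecDeque;

const MAX_BUFFER_SIZE: usize = 3; // shrunk from 500 for the demo

// Minimal stand-in for the server-side ring buffer, keyed by timestamp_ms.
struct Buffer(VecDeque<u64>);

impl Buffer {
    fn push(&mut self, ts: u64) {
        if self.0.len() >= MAX_BUFFER_SIZE {
            self.0.pop_front(); // evict oldest, as the real EventBuffer does
        }
        self.0.push_back(ts);
    }
    // `since` is strictly greater-than, matching `events_since`.
    fn events_since(&self, since: u64) -> Vec<u64> {
        self.0.iter().copied().filter(|&t| t > since).collect()
    }
}

fn main() {
    let mut buf = Buffer(VecDeque::new());
    for ts in [100, 200, 300, 400] {
        buf.push(ts);
    }
    // 100 was evicted; a poller whose cursor is 200 gets only newer events.
    assert_eq!(buf.events_since(0), vec![200, 300, 400]);
    assert_eq!(buf.events_since(200), vec![300, 400]);

    // Gateway cursor: advance to the max timestamp received, so the next
    // poll's strictly-greater filter skips already-seen events.
    let seen = buf.events_since(200);
    let cursor = seen.iter().copied().max().unwrap_or(200);
    assert_eq!(cursor, 400);
    assert!(buf.events_since(cursor).is_empty());
}
```

The trade-off of the bounded buffer is visible here too: a poller that falls more than 500 events behind silently misses the evicted ones.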
+1 -1
@@ -86,7 +86,7 @@ pub(super) fn tool_list_agents(ctx: &AppContext) -> Result<String, String> {
        .filter(|a| {
            project_root
                .as_deref()
-                .map(|root| !crate::http::agents::story_is_archived(root, &a.story_id))
+                .map(|root| !crate::service::agents::is_archived(root, &a.story_id))
                .unwrap_or(true)
        })
        .map(|a| json!({
+16 -2
@@ -7,6 +7,7 @@ pub mod bot_command;
pub mod bot_config;
pub mod chat;
pub mod context;
+pub mod events;
pub mod health;
pub mod io;
pub mod mcp;
@@ -68,6 +69,7 @@ pub fn build_routes(
    whatsapp_ctx: Option<Arc<WhatsAppWebhookContext>>,
    slack_ctx: Option<Arc<SlackWebhookContext>>,
    port: u16,
+    event_buffer: Option<events::EventBuffer>,
) -> impl poem::Endpoint {
    let ctx_arc = std::sync::Arc::new(ctx);
@@ -103,6 +105,10 @@ pub fn build_routes(
        .at("/", get(assets::embedded_index))
        .at("/*path", get(assets::embedded_file));

+    if let Some(buf) = event_buffer {
+        route = route.at("/api/events", get(events::events_handler).data(buf));
+    }

    if let Some(wa_ctx) = whatsapp_ctx {
        route = route.at(
            "/webhook/whatsapp",
@@ -302,7 +308,7 @@ mod tests {
    fn build_routes_constructs_without_panic() {
        let tmp = tempfile::tempdir().unwrap();
        let ctx = context::AppContext::new_test(tmp.path().to_path_buf());
-        let _endpoint = build_routes(ctx, None, None, 3001);
+        let _endpoint = build_routes(ctx, None, None, 3001, None);
    }

    #[test]
@@ -311,6 +317,14 @@ mod tests {
        // ensuring the port parameter flows through to OAuthState.
        let tmp = tempfile::tempdir().unwrap();
        let ctx = context::AppContext::new_test(tmp.path().to_path_buf());
-        let _endpoint = build_routes(ctx, None, None, 9999);
+        let _endpoint = build_routes(ctx, None, None, 9999, None);
    }
#[test]
fn build_routes_with_event_buffer_constructs_without_panic() {
let tmp = tempfile::tempdir().unwrap();
let ctx = context::AppContext::new_test(tmp.path().to_path_buf());
let buf = events::EventBuffer::new();
let _endpoint = build_routes(ctx, None, None, 3001, Some(buf));
} }
}
+15 -1
@@ -20,6 +20,7 @@ mod llm;
pub mod log_buffer;
pub(crate) mod pipeline_state;
pub mod rebuild;
+mod service;
mod state;
mod store;
mod workflow;
@@ -544,6 +545,8 @@ async fn main() -> Result<(), std::io::Error> {
    let watcher_rx_for_whatsapp = watcher_tx.subscribe();
    let watcher_rx_for_slack = watcher_tx.subscribe();
    let watcher_rx_for_discord = watcher_tx.subscribe();
+    // Subscribe to watcher events for the per-project event buffer (gateway polling).
+    let watcher_rx_for_events = watcher_tx.subscribe();

    // Wrap perm_rx in Arc<Mutex> so it can be shared with both the WebSocket
    // handler (via AppContext) and the Matrix bot.
    let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
@@ -802,7 +805,18 @@ async fn main() -> Result<(), std::io::Error> {
        test_jobs: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
    };

-    let app = build_routes(ctx, whatsapp_ctx.clone(), slack_ctx.clone(), port);
+    // Create the per-project event buffer and subscribe it to the watcher channel
+    // so that pipeline events are buffered for the gateway's `/api/events` poller.
+    let event_buffer = crate::http::events::EventBuffer::new();
+    crate::http::events::subscribe_to_watcher(event_buffer.clone(), watcher_rx_for_events);
+    let app = build_routes(
+        ctx,
+        whatsapp_ctx.clone(),
+        slack_ctx.clone(),
+        port,
+        Some(event_buffer),
+    );

    // Unified 1-second background tick loop: fires due timers, detects orphaned
    // agents (watchdog), and promotes done→archived items (sweep). Replaces the
+190
@@ -0,0 +1,190 @@
//! Agent I/O wrappers — the ONLY place in `service/agents/` that may perform
//! filesystem reads, process invocations, or other side effects.
//!
//! Every function here is a thin adapter over an existing lower-level call.
//! No business logic lives here; all branching belongs in the pure topic files
//! or in `mod.rs`.
use crate::agent_log::{self, LogEntry};
use crate::agents::token_usage::{self, TokenUsageRecord};
use crate::config::ProjectConfig;
use crate::worktree::{self, WorktreeListEntry};
use std::path::Path;
use super::Error;
/// Return `true` if the story's `.md` file exists in `5_done/` or `6_archived/`.
pub fn is_archived(project_root: &Path, story_id: &str) -> bool {
let work = project_root.join(".huskies").join("work");
let filename = format!("{story_id}.md");
work.join("5_done").join(&filename).exists() || work.join("6_archived").join(&filename).exists()
}
/// Read and return all log entries for the most recent session of an agent.
///
/// Returns `Ok(vec![])` when no log file exists yet.
pub fn read_agent_log(
project_root: &Path,
story_id: &str,
agent_name: &str,
) -> Result<Vec<LogEntry>, Error> {
let log_path = agent_log::find_latest_log(project_root, story_id, agent_name);
let Some(path) = log_path else {
return Ok(Vec::new());
};
agent_log::read_log(&path).map_err(Error::Io)
}
/// Read all token usage records from the persistent JSONL file.
///
/// Returns an empty vec when the file does not yet exist.
pub fn read_token_records(project_root: &Path) -> Result<Vec<TokenUsageRecord>, Error> {
token_usage::read_all(project_root).map_err(Error::Io)
}
/// Load the project configuration from `project.toml`.
///
/// Falls back to default config when the file is absent.
pub fn load_config(project_root: &Path) -> Result<ProjectConfig, Error> {
ProjectConfig::load(project_root).map_err(Error::Config)
}
/// List all worktrees under `.huskies/worktrees/`.
pub fn list_worktrees(project_root: &Path) -> Result<Vec<WorktreeListEntry>, Error> {
worktree::list_worktrees(project_root).map_err(Error::Io)
}
/// Remove the git worktree for a story by ID.
///
/// Loads the project config to honour teardown commands. Returns an error if
/// the worktree directory does not exist.
pub async fn remove_worktree(project_root: &Path, story_id: &str) -> Result<(), Error> {
let config = load_config(project_root)?;
worktree::remove_worktree_by_story_id(project_root, story_id, &config)
.await
.map_err(Error::Worktree)
}
/// Read test results persisted in a story's markdown file.
///
/// Returns `None` when the story has no test results section.
pub fn read_test_results_from_file(
project_root: &Path,
story_id: &str,
) -> Option<crate::workflow::StoryTestResults> {
crate::http::workflow::read_test_results_from_story_file(project_root, story_id)
}
/// Read a work item file from a pipeline stage directory.
///
/// Returns `Ok(Some(content))` when found, `Ok(None)` when absent.
pub fn read_work_item_from_stage(
work_dir: &std::path::Path,
stage_dir: &str,
filename: &str,
) -> Result<Option<String>, Error> {
let file_path = work_dir.join(stage_dir).join(filename);
if file_path.exists() {
let content = std::fs::read_to_string(&file_path)
.map_err(|e| Error::Io(format!("Failed to read work item: {e}")))?;
Ok(Some(content))
} else {
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn make_work_dirs(tmp: &TempDir) {
for stage in &["5_done", "6_archived"] {
std::fs::create_dir_all(tmp.path().join(".huskies").join("work").join(stage)).unwrap();
}
}
// ── is_archived ───────────────────────────────────────────────────────────
#[test]
fn is_archived_false_when_file_absent() {
let tmp = TempDir::new().unwrap();
make_work_dirs(&tmp);
assert!(!is_archived(tmp.path(), "42_story_foo"));
}
#[test]
fn is_archived_true_when_in_5_done() {
let tmp = TempDir::new().unwrap();
make_work_dirs(&tmp);
std::fs::write(
tmp.path().join(".huskies/work/5_done/42_story_foo.md"),
"---\nname: test\n---\n",
)
.unwrap();
assert!(is_archived(tmp.path(), "42_story_foo"));
}
#[test]
fn is_archived_true_when_in_6_archived() {
let tmp = TempDir::new().unwrap();
make_work_dirs(&tmp);
std::fs::write(
tmp.path().join(".huskies/work/6_archived/42_story_foo.md"),
"---\nname: test\n---\n",
)
.unwrap();
assert!(is_archived(tmp.path(), "42_story_foo"));
}
// ── read_agent_log ────────────────────────────────────────────────────────
#[test]
fn read_agent_log_returns_empty_when_no_log() {
let tmp = TempDir::new().unwrap();
let entries = read_agent_log(tmp.path(), "42_story_foo", "coder-1").unwrap();
assert!(entries.is_empty());
}
// ── read_token_records ────────────────────────────────────────────────────
#[test]
fn read_token_records_returns_empty_when_no_file() {
let tmp = TempDir::new().unwrap();
let records = read_token_records(tmp.path()).unwrap();
assert!(records.is_empty());
}
// ── load_config ───────────────────────────────────────────────────────────
#[test]
fn load_config_returns_default_when_no_file() {
let tmp = TempDir::new().unwrap();
std::fs::create_dir_all(tmp.path().join(".huskies")).unwrap();
let config = load_config(tmp.path()).unwrap();
// Default config has one "default" agent
assert_eq!(config.agent.len(), 1);
assert_eq!(config.agent[0].name, "default");
}
// ── list_worktrees ────────────────────────────────────────────────────────
#[test]
fn list_worktrees_empty_when_no_dir() {
let tmp = TempDir::new().unwrap();
let entries = list_worktrees(tmp.path()).unwrap();
assert!(entries.is_empty());
}
#[test]
fn list_worktrees_returns_subdirs() {
let tmp = TempDir::new().unwrap();
let wt_dir = tmp.path().join(".huskies").join("worktrees");
std::fs::create_dir_all(wt_dir.join("42_story_foo")).unwrap();
std::fs::create_dir_all(wt_dir.join("43_story_bar")).unwrap();
let mut entries = list_worktrees(tmp.path()).unwrap();
entries.sort_by(|a, b| a.story_id.cmp(&b.story_id));
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].story_id, "42_story_foo");
assert_eq!(entries[1].story_id, "43_story_bar");
}
}
+476
@@ -0,0 +1,476 @@
//! Agent service — public API for the agent domain.
//!
//! This module orchestrates calls to `io.rs` (side effects) and the pure
//! topic modules (`selection`, `token`) to implement the full agent service
//! surface. HTTP handlers call these functions instead of reaching directly
//! into `AgentPool` or the filesystem.
//!
//! Conventions: `docs/architecture/service-modules.md`
mod io;
pub mod selection;
pub mod token;
use crate::agents::AgentInfo;
use crate::agents::AgentPool;
use crate::agents::token_usage::TokenUsageRecord;
use crate::config::ProjectConfig;
use crate::workflow::StoryTestResults;
use crate::worktree::{WorktreeInfo, WorktreeListEntry};
use std::path::Path;
pub use io::is_archived;
pub use token::TokenCostSummary;
// ── Error type ────────────────────────────────────────────────────────────────
/// Typed errors returned by `service::agents` functions.
///
/// HTTP handlers map these to specific status codes — see the conventions doc
/// for the full mapping table.
#[derive(Debug)]
pub enum Error {
/// No agent with the given name/story exists in the pool.
AgentNotFound(String),
/// No work item found for the requested story ID.
WorkItemNotFound(String),
/// A worktree operation failed.
Worktree(String),
/// Project configuration could not be loaded.
Config(String),
/// A filesystem or I/O operation failed.
Io(String),
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AgentNotFound(msg) => write!(f, "Agent not found: {msg}"),
Self::WorkItemNotFound(msg) => write!(f, "Work item not found: {msg}"),
Self::Worktree(msg) => write!(f, "Worktree error: {msg}"),
Self::Config(msg) => write!(f, "Config error: {msg}"),
Self::Io(msg) => write!(f, "I/O error: {msg}"),
}
}
}
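The conventions doc owns the authoritative variant-to-status table. As a hedged sketch (the specific codes and the `status_code` helper below are assumptions for illustration, not part of this change), the mapping a handler performs might look like:

```rust
// Illustrative only: a standalone mirror of the service `Error` enum and a
// hypothetical status-code mapping. The real table lives in
// docs/architecture/service-modules.md.
#[derive(Debug)]
enum ServiceError {
    AgentNotFound(String),
    WorkItemNotFound(String),
    Worktree(String),
    Config(String),
    Io(String),
}

// Assumed mapping: "not found" variants become 404, everything else 500.
fn status_code(err: &ServiceError) -> u16 {
    match err {
        ServiceError::AgentNotFound(_) | ServiceError::WorkItemNotFound(_) => 404,
        ServiceError::Worktree(_) | ServiceError::Config(_) | ServiceError::Io(_) => 500,
    }
}

fn main() {
    assert_eq!(status_code(&ServiceError::AgentNotFound("coder-9".into())), 404);
    assert_eq!(status_code(&ServiceError::Io("disk full".into())), 500);
    println!("mapping ok");
}
```

Exhaustive matching means adding a new `Error` variant forces every handler adapter to choose a status code explicitly, which is the point of the typed error over a bare `String`.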
// ── Shared service types ─────────────────────────────────────────────────────
/// Content and metadata for a work-item (story) file.
#[derive(Debug, Clone)]
pub struct WorkItemContent {
pub content: String,
pub stage: String,
pub name: Option<String>,
pub agent: Option<String>,
}
/// A single entry in the project's configured agent roster.
#[derive(Debug, Clone)]
pub struct AgentConfigEntry {
pub name: String,
pub role: String,
pub stage: Option<String>,
pub model: Option<String>,
pub allowed_tools: Option<Vec<String>>,
pub max_turns: Option<u32>,
pub max_budget_usd: Option<f64>,
}
// ── Public API ────────────────────────────────────────────────────────────────
/// Start an agent for a story.
///
/// Takes only what it needs: the pool (for spawning) and the project root
/// (for config and worktree creation). Does not touch `AppContext`.
pub async fn start_agent(
pool: &AgentPool,
project_root: &Path,
story_id: &str,
agent_name: Option<&str>,
resume_context: Option<&str>,
session_id_to_resume: Option<String>,
) -> Result<AgentInfo, Error> {
pool.start_agent(
project_root,
story_id,
agent_name,
resume_context,
session_id_to_resume,
)
.await
.map_err(Error::AgentNotFound)
}
/// Stop a running agent.
pub async fn stop_agent(
pool: &AgentPool,
project_root: &Path,
story_id: &str,
agent_name: &str,
) -> Result<(), Error> {
pool.stop_agent(project_root, story_id, agent_name)
.await
.map_err(Error::AgentNotFound)
}
/// List all agents, optionally filtering out those belonging to archived stories.
///
/// When `project_root` is `None` the archive filter is skipped and all agents
/// are returned (safe default when the server is not yet fully configured).
pub fn list_agents(pool: &AgentPool, project_root: Option<&Path>) -> Result<Vec<AgentInfo>, Error> {
let agents = pool.list_agents().map_err(Error::Io)?;
match project_root {
Some(root) => Ok(selection::filter_non_archived(agents, |id| {
io::is_archived(root, id)
})),
None => Ok(agents),
}
}
/// Create a git worktree for a story.
pub async fn create_worktree(
pool: &AgentPool,
project_root: &Path,
story_id: &str,
) -> Result<WorktreeInfo, Error> {
pool.create_worktree(project_root, story_id)
.await
.map_err(Error::Worktree)
}
/// List all worktrees under `.huskies/worktrees/`.
pub fn list_worktrees(project_root: &Path) -> Result<Vec<WorktreeListEntry>, Error> {
io::list_worktrees(project_root)
}
/// Remove the git worktree for a story.
pub async fn remove_worktree(project_root: &Path, story_id: &str) -> Result<(), Error> {
io::remove_worktree(project_root, story_id).await
}
/// Get the configured agent roster from `project.toml`.
pub fn get_agent_config(project_root: &Path) -> Result<Vec<AgentConfigEntry>, Error> {
let config = io::load_config(project_root)?;
Ok(config_to_entries(&config))
}
/// Reload and return the project's agent configuration.
///
/// Semantically identical to `get_agent_config`; provided as a distinct
/// function so callers can express intent (UI "Reload" button).
pub fn reload_config(project_root: &Path) -> Result<Vec<AgentConfigEntry>, Error> {
get_agent_config(project_root)
}
/// Get the concatenated output text for an agent's most recent session.
///
/// Returns an empty string when no log file exists yet.
pub fn get_agent_output(
project_root: &Path,
story_id: &str,
agent_name: &str,
) -> Result<String, Error> {
let entries = io::read_agent_log(project_root, story_id, agent_name)?;
Ok(selection::collect_output_text(&entries))
}
/// Get the markdown content and metadata for a work item.
///
/// Searches all pipeline stage directories, falling back to the CRDT content
/// store when no file is present on disk. Returns `Error::WorkItemNotFound`
/// when neither source has the item.
pub fn get_work_item_content(
project_root: &Path,
story_id: &str,
) -> Result<WorkItemContent, Error> {
let stages = [
("1_backlog", "backlog"),
("2_current", "current"),
("3_qa", "qa"),
("4_merge", "merge"),
("5_done", "done"),
("6_archived", "archived"),
];
let work_dir = project_root.join(".huskies").join("work");
let filename = format!("{story_id}.md");
for (stage_dir, stage_name) in &stages {
if let Some(content) = io::read_work_item_from_stage(&work_dir, stage_dir, &filename)? {
let metadata = crate::io::story_metadata::parse_front_matter(&content).ok();
return Ok(WorkItemContent {
content,
stage: stage_name.to_string(),
name: metadata.as_ref().and_then(|m| m.name.clone()),
agent: metadata.and_then(|m| m.agent),
});
}
}
// CRDT-only fallback
if let Some(content) = crate::db::read_content(story_id) {
let item = crate::pipeline_state::read_typed(story_id)
.map_err(|e| Error::Io(format!("Pipeline read error: {e}")))?;
let stage = item
.as_ref()
.map(|i| match &i.stage {
crate::pipeline_state::Stage::Backlog => "backlog",
crate::pipeline_state::Stage::Coding => "current",
crate::pipeline_state::Stage::Qa => "qa",
crate::pipeline_state::Stage::Merge { .. } => "merge",
crate::pipeline_state::Stage::Done { .. } => "done",
crate::pipeline_state::Stage::Archived { .. } => "archived",
})
.unwrap_or("unknown")
.to_string();
let metadata = crate::io::story_metadata::parse_front_matter(&content).ok();
return Ok(WorkItemContent {
content,
stage,
name: metadata.as_ref().and_then(|m| m.name.clone()),
agent: metadata.and_then(|m| m.agent),
});
}
Err(Error::WorkItemNotFound(format!(
"Work item not found: {story_id}"
)))
}
/// Get test results for a work item.
///
/// Checks in-memory workflow state first (fast path), then falls back to
/// results persisted in the story file.
pub fn get_test_results(
project_root: &Path,
story_id: &str,
workflow: &crate::workflow::WorkflowState,
) -> Option<StoryTestResults> {
if let Some(results) = workflow.results.get(story_id) {
return Some(results.clone());
}
io::read_test_results_from_file(project_root, story_id)
}
/// Get the aggregated token cost for a specific story.
pub fn get_work_item_token_cost(
project_root: &Path,
story_id: &str,
) -> Result<TokenCostSummary, Error> {
let records = io::read_token_records(project_root)?;
Ok(token::aggregate_for_story(&records, story_id))
}
/// Get all token usage records across all stories.
pub fn get_all_token_usage(project_root: &Path) -> Result<Vec<TokenUsageRecord>, Error> {
io::read_token_records(project_root)
}
// ── Helpers ───────────────────────────────────────────────────────────────────
fn config_to_entries(config: &ProjectConfig) -> Vec<AgentConfigEntry> {
config
.agent
.iter()
.map(|a| AgentConfigEntry {
name: a.name.clone(),
role: a.role.clone(),
stage: a.stage.clone(),
model: a.model.clone(),
allowed_tools: a.allowed_tools.clone(),
max_turns: a.max_turns,
max_budget_usd: a.max_budget_usd,
})
.collect()
}
// ── Integration tests ─────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use crate::agents::AgentStatus;
use std::sync::Arc;
use tempfile::TempDir;
fn make_pool(tmp: &TempDir) -> Arc<AgentPool> {
let (tx, _) = tokio::sync::broadcast::channel(64);
let pool = AgentPool::new(3001, tx);
let state = crate::state::SessionState::default();
*state.project_root.lock().unwrap() = Some(tmp.path().to_path_buf());
Arc::new(pool)
}
fn make_work_dirs(tmp: &TempDir) {
for stage in &["5_done", "6_archived"] {
std::fs::create_dir_all(tmp.path().join(".huskies").join("work").join(stage)).unwrap();
}
}
fn make_stage_dirs(tmp: &TempDir) {
for stage in &[
"1_backlog",
"2_current",
"3_qa",
"4_merge",
"5_done",
"6_archived",
] {
std::fs::create_dir_all(tmp.path().join(".huskies").join("work").join(stage)).unwrap();
}
}
fn make_project_toml(tmp: &TempDir, content: &str) {
let sk_dir = tmp.path().join(".huskies");
std::fs::create_dir_all(&sk_dir).unwrap();
std::fs::write(sk_dir.join("project.toml"), content).unwrap();
}
// ── list_agents ───────────────────────────────────────────────────────────
#[tokio::test]
async fn list_agents_excludes_archived_stories() {
let tmp = TempDir::new().unwrap();
make_work_dirs(&tmp);
std::fs::write(
tmp.path()
.join(".huskies/work/6_archived/79_story_archived.md"),
"---\nname: archived\n---\n",
)
.unwrap();
let pool = make_pool(&tmp);
pool.inject_test_agent("79_story_archived", "coder-1", AgentStatus::Completed);
pool.inject_test_agent("80_story_active", "coder-1", AgentStatus::Running);
let agents = list_agents(&pool, Some(tmp.path())).unwrap();
assert!(!agents.iter().any(|a| a.story_id == "79_story_archived"));
assert!(agents.iter().any(|a| a.story_id == "80_story_active"));
}
#[tokio::test]
async fn list_agents_includes_all_when_no_project_root() {
let tmp = TempDir::new().unwrap();
let pool = make_pool(&tmp);
pool.inject_test_agent("42_story_whatever", "coder-1", AgentStatus::Completed);
let agents = list_agents(&pool, None).unwrap();
assert!(agents.iter().any(|a| a.story_id == "42_story_whatever"));
}
// ── get_agent_config ──────────────────────────────────────────────────────
#[test]
fn get_agent_config_returns_default_when_no_toml() {
let tmp = TempDir::new().unwrap();
std::fs::create_dir_all(tmp.path().join(".huskies")).unwrap();
let entries = get_agent_config(tmp.path()).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].name, "default");
}
#[test]
fn get_agent_config_returns_configured_agents() {
let tmp = TempDir::new().unwrap();
make_project_toml(
&tmp,
r#"
[[agent]]
name = "coder-1"
role = "Full-stack engineer"
model = "sonnet"
max_turns = 30
max_budget_usd = 5.0
"#,
);
let entries = get_agent_config(tmp.path()).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].name, "coder-1");
assert_eq!(entries[0].model, Some("sonnet".to_string()));
assert_eq!(entries[0].max_turns, Some(30));
}
// ── get_agent_output ──────────────────────────────────────────────────────
#[test]
fn get_agent_output_returns_empty_when_no_log() {
let tmp = TempDir::new().unwrap();
let output = get_agent_output(tmp.path(), "42_story_foo", "coder-1").unwrap();
assert_eq!(output, "");
}
// ── get_work_item_content ─────────────────────────────────────────────────
#[test]
fn get_work_item_content_reads_from_backlog() {
let tmp = TempDir::new().unwrap();
make_stage_dirs(&tmp);
std::fs::write(
tmp.path().join(".huskies/work/1_backlog/42_story_foo.md"),
"---\nname: \"Foo Story\"\n---\n\nSome content.",
)
.unwrap();
let item = get_work_item_content(tmp.path(), "42_story_foo").unwrap();
assert!(item.content.contains("Some content."));
assert_eq!(item.stage, "backlog");
assert_eq!(item.name, Some("Foo Story".to_string()));
}
#[test]
fn get_work_item_content_returns_not_found_for_absent_story() {
let tmp = TempDir::new().unwrap();
make_stage_dirs(&tmp);
let result = get_work_item_content(tmp.path(), "99_story_nonexistent");
assert!(matches!(result, Err(Error::WorkItemNotFound(_))));
}
// ── get_work_item_token_cost ──────────────────────────────────────────────
#[test]
fn get_work_item_token_cost_returns_zero_when_no_records() {
let tmp = TempDir::new().unwrap();
let summary = get_work_item_token_cost(tmp.path(), "42_story_foo").unwrap();
assert_eq!(summary.total_cost_usd, 0.0);
assert!(summary.agents.is_empty());
}
// ── get_all_token_usage ───────────────────────────────────────────────────
#[test]
fn get_all_token_usage_returns_empty_when_no_file() {
let tmp = TempDir::new().unwrap();
let records = get_all_token_usage(tmp.path()).unwrap();
assert!(records.is_empty());
}
// ── get_test_results ──────────────────────────────────────────────────────
#[test]
fn get_test_results_returns_none_when_no_results() {
let tmp = TempDir::new().unwrap();
let workflow = crate::workflow::WorkflowState::default();
let result = get_test_results(tmp.path(), "42_story_foo", &workflow);
assert!(result.is_none());
}
#[test]
fn get_test_results_returns_in_memory_results_first() {
let tmp = TempDir::new().unwrap();
let mut workflow = crate::workflow::WorkflowState::default();
workflow
.record_test_results_validated(
"42_story_foo".to_string(),
vec![crate::workflow::TestCaseResult {
name: "test1".to_string(),
status: crate::workflow::TestStatus::Pass,
details: None,
}],
vec![],
)
.unwrap();
let result =
get_test_results(tmp.path(), "42_story_foo", &workflow).expect("should have results");
assert_eq!(result.unit.len(), 1);
assert_eq!(result.unit[0].name, "test1");
}
}
+171
@@ -0,0 +1,171 @@
//! Pure agent selection and filtering logic — no I/O, no side effects.
//!
//! All functions in this module are pure: they take data, transform it, and
//! return a result without touching the filesystem, network, or any mutable
//! global state. This makes them fast to test without tempdirs or async runtimes.
use crate::agent_log::LogEntry;
use crate::agents::AgentInfo;
/// Filter a list of agents, removing any whose story is archived.
///
/// `is_archived` is a predicate injected by the caller — typically a closure
/// over the project root that calls `io::is_archived`. This keeps the function
/// pure: it never touches the filesystem itself.
pub fn filter_non_archived<F>(agents: Vec<AgentInfo>, is_archived: F) -> Vec<AgentInfo>
where
F: Fn(&str) -> bool,
{
agents
.into_iter()
.filter(|info| !is_archived(&info.story_id))
.collect()
}
/// Concatenate the text of all `output` events from an agent log.
///
/// Non-output events (status, done, error, agent_json, thinking) are silently
/// skipped. Returns an empty string when `entries` is empty or contains no
/// output events.
pub fn collect_output_text(entries: &[LogEntry]) -> String {
entries
.iter()
.filter(|e| e.event.get("type").and_then(|t| t.as_str()) == Some("output"))
.filter_map(|e| {
e.event
.get("text")
.and_then(|t| t.as_str())
.map(str::to_owned)
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::agents::AgentStatus;
fn make_agent(story_id: &str) -> AgentInfo {
AgentInfo {
story_id: story_id.to_string(),
agent_name: "coder-1".to_string(),
status: AgentStatus::Running,
session_id: None,
worktree_path: None,
base_branch: None,
completion: None,
log_session_id: None,
throttled: false,
}
}
fn make_log_entry(event_type: &str, text: Option<&str>) -> LogEntry {
let mut obj = serde_json::Map::new();
obj.insert(
"type".to_string(),
serde_json::Value::String(event_type.to_string()),
);
if let Some(t) = text {
obj.insert("text".to_string(), serde_json::Value::String(t.to_string()));
}
LogEntry {
timestamp: "2024-01-01T00:00:00Z".to_string(),
event: serde_json::Value::Object(obj),
}
}
// ── filter_non_archived ───────────────────────────────────────────────────
#[test]
fn filter_keeps_non_archived_agents() {
let agents = vec![make_agent("10_active"), make_agent("11_active")];
let result = filter_non_archived(agents, |_| false);
assert_eq!(result.len(), 2);
}
#[test]
fn filter_removes_archived_agents() {
let agents = vec![make_agent("10_archived"), make_agent("11_active")];
let result = filter_non_archived(agents, |id| id == "10_archived");
assert_eq!(result.len(), 1);
assert_eq!(result[0].story_id, "11_active");
}
#[test]
fn filter_removes_all_when_all_archived() {
let agents = vec![make_agent("10_a"), make_agent("11_b")];
let result = filter_non_archived(agents, |_| true);
assert!(result.is_empty());
}
#[test]
fn filter_returns_empty_for_empty_input() {
let result = filter_non_archived(vec![], |_| false);
assert!(result.is_empty());
}
#[test]
fn filter_preserves_order() {
let agents = vec![
make_agent("1_a"),
make_agent("2_b"),
make_agent("3_c"),
make_agent("4_d"),
];
let result = filter_non_archived(agents, |id| id == "2_b");
assert_eq!(result.len(), 3);
assert_eq!(result[0].story_id, "1_a");
assert_eq!(result[1].story_id, "3_c");
assert_eq!(result[2].story_id, "4_d");
}
// ── collect_output_text ───────────────────────────────────────────────────
#[test]
fn collect_output_text_empty_entries() {
let result = collect_output_text(&[]);
assert_eq!(result, "");
}
#[test]
fn collect_output_text_skips_non_output_events() {
let entries = vec![
make_log_entry("status", Some("running")),
make_log_entry("done", None),
];
let result = collect_output_text(&entries);
assert_eq!(result, "");
}
#[test]
fn collect_output_text_concatenates_output_events() {
let entries = vec![
make_log_entry("output", Some("Hello ")),
make_log_entry("output", Some("world\n")),
];
let result = collect_output_text(&entries);
assert_eq!(result, "Hello world\n");
}
#[test]
fn collect_output_text_skips_output_without_text_field() {
let entry = LogEntry {
timestamp: "2024-01-01T00:00:00Z".to_string(),
event: serde_json::json!({"type": "output"}),
};
let result = collect_output_text(&[entry]);
assert_eq!(result, "");
}
#[test]
fn collect_output_text_mixed_event_types() {
let entries = vec![
make_log_entry("status", Some("running")),
make_log_entry("output", Some("line1\n")),
make_log_entry("agent_json", None),
make_log_entry("output", Some("line2\n")),
make_log_entry("done", None),
];
let result = collect_output_text(&entries);
assert_eq!(result, "line1\nline2\n");
}
}
+160
@@ -0,0 +1,160 @@
//! Pure token usage aggregation — no I/O, no side effects.
//!
//! Functions here take slices of `TokenUsageRecord` (already loaded by `io.rs`)
//! and compute summaries. Tests cover every branch without touching the filesystem.
use crate::agents::token_usage::TokenUsageRecord;
use std::collections::HashMap;
/// Per-agent cost breakdown entry.
#[derive(Debug, Clone, PartialEq)]
pub struct AgentTokenCost {
pub agent_name: String,
pub model: Option<String>,
pub input_tokens: u64,
pub output_tokens: u64,
pub cache_creation_input_tokens: u64,
pub cache_read_input_tokens: u64,
pub total_cost_usd: f64,
}
/// Aggregated token cost for a story.
#[derive(Debug, Clone, PartialEq)]
pub struct TokenCostSummary {
pub total_cost_usd: f64,
pub agents: Vec<AgentTokenCost>,
}
/// Aggregate token usage records for a single story.
///
/// Records for other stories are ignored. The returned `agents` list is sorted
/// alphabetically by `agent_name` for deterministic output. Returns a zero-cost
/// summary when no records match the given `story_id`.
pub fn aggregate_for_story(records: &[TokenUsageRecord], story_id: &str) -> TokenCostSummary {
let mut agent_map: HashMap<String, AgentTokenCost> = HashMap::new();
let mut total_cost_usd = 0.0_f64;
for record in records.iter().filter(|r| r.story_id == story_id) {
total_cost_usd += record.usage.total_cost_usd;
let entry = agent_map
.entry(record.agent_name.clone())
.or_insert_with(|| AgentTokenCost {
agent_name: record.agent_name.clone(),
model: record.model.clone(),
input_tokens: 0,
output_tokens: 0,
cache_creation_input_tokens: 0,
cache_read_input_tokens: 0,
total_cost_usd: 0.0,
});
entry.input_tokens += record.usage.input_tokens;
entry.output_tokens += record.usage.output_tokens;
entry.cache_creation_input_tokens += record.usage.cache_creation_input_tokens;
entry.cache_read_input_tokens += record.usage.cache_read_input_tokens;
entry.total_cost_usd += record.usage.total_cost_usd;
}
let mut agents: Vec<AgentTokenCost> = agent_map.into_values().collect();
agents.sort_by(|a, b| a.agent_name.cmp(&b.agent_name));
TokenCostSummary {
total_cost_usd,
agents,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::agents::TokenUsage;
fn make_record(story_id: &str, agent: &str, cost: f64) -> TokenUsageRecord {
TokenUsageRecord {
story_id: story_id.to_string(),
agent_name: agent.to_string(),
timestamp: "2024-01-01T00:00:00Z".to_string(),
model: None,
usage: TokenUsage {
input_tokens: 100,
output_tokens: 50,
cache_creation_input_tokens: 10,
cache_read_input_tokens: 20,
total_cost_usd: cost,
},
}
}
#[test]
fn aggregate_returns_zero_when_no_records() {
let summary = aggregate_for_story(&[], "42_story_foo");
assert_eq!(summary.total_cost_usd, 0.0);
assert!(summary.agents.is_empty());
}
#[test]
fn aggregate_filters_to_story_id() {
let records = vec![
make_record("42_story_foo", "coder-1", 1.0),
make_record("99_story_other", "coder-1", 5.0),
];
let summary = aggregate_for_story(&records, "42_story_foo");
assert!((summary.total_cost_usd - 1.0).abs() < f64::EPSILON);
assert_eq!(summary.agents.len(), 1);
}
#[test]
fn aggregate_sums_tokens_per_agent() {
let records = vec![
make_record("42_story_foo", "coder-1", 1.0),
make_record("42_story_foo", "coder-1", 2.0),
];
let summary = aggregate_for_story(&records, "42_story_foo");
assert!((summary.total_cost_usd - 3.0).abs() < f64::EPSILON);
assert_eq!(summary.agents.len(), 1);
assert_eq!(summary.agents[0].input_tokens, 200);
assert_eq!(summary.agents[0].output_tokens, 100);
assert!((summary.agents[0].total_cost_usd - 3.0).abs() < f64::EPSILON);
}
#[test]
fn aggregate_splits_by_agent() {
let records = vec![
make_record("42_story_foo", "coder-1", 1.0),
make_record("42_story_foo", "qa", 0.5),
];
let summary = aggregate_for_story(&records, "42_story_foo");
assert!((summary.total_cost_usd - 1.5).abs() < f64::EPSILON);
assert_eq!(summary.agents.len(), 2);
// sorted alphabetically
assert_eq!(summary.agents[0].agent_name, "coder-1");
assert_eq!(summary.agents[1].agent_name, "qa");
}
#[test]
fn aggregate_sorts_agents_alphabetically() {
let records = vec![
make_record("42_story_foo", "z-agent", 1.0),
make_record("42_story_foo", "a-agent", 1.0),
make_record("42_story_foo", "m-agent", 1.0),
];
let summary = aggregate_for_story(&records, "42_story_foo");
assert_eq!(summary.agents[0].agent_name, "a-agent");
assert_eq!(summary.agents[1].agent_name, "m-agent");
assert_eq!(summary.agents[2].agent_name, "z-agent");
}
#[test]
fn aggregate_returns_zero_when_no_matching_story() {
let records = vec![make_record("99_other", "coder-1", 5.0)];
let summary = aggregate_for_story(&records, "42_story_foo");
assert_eq!(summary.total_cost_usd, 0.0);
assert!(summary.agents.is_empty());
}
#[test]
fn aggregate_preserves_model_from_first_record() {
let mut r = make_record("42_story_foo", "coder-1", 1.0);
r.model = Some("claude-sonnet".to_string());
let summary = aggregate_for_story(&[r], "42_story_foo");
assert_eq!(summary.agents[0].model, Some("claude-sonnet".to_string()));
}
}
+8
@@ -0,0 +1,8 @@
//! Service layer — domain logic extracted from HTTP handlers.
//!
//! Each sub-module follows the conventions documented in
//! `docs/architecture/service-modules.md`:
//! - `mod.rs` orchestrates and owns the typed `Error` type
//! - `io.rs` is the only file that performs side effects
//! - Topic-named pure files contain branching logic with no I/O
pub mod agents;
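The convention above can be sketched as a miniature module with the same split (all names here are hypothetical, not taken from the codebase):

```rust
// Illustrative only: a toy service module following the mod.rs / io.rs /
// pure-topic convention described above.
use std::path::Path;

mod io {
    use std::path::Path;
    // The only function with side effects: it checks the filesystem.
    pub fn marker_exists(path: &Path) -> bool {
        path.exists()
    }
}

mod selection {
    // Pure: takes data plus an injected predicate, touches no I/O itself.
    pub fn keep_unmarked(ids: Vec<String>, is_marked: impl Fn(&str) -> bool) -> Vec<String> {
        ids.into_iter().filter(|id| !is_marked(id)).collect()
    }
}

// mod.rs-level orchestration: wires the I/O edge into the pure core.
pub fn unmarked_ids(root: &Path, ids: Vec<String>) -> Vec<String> {
    selection::keep_unmarked(ids, |id| io::marker_exists(&root.join(id)))
}

fn main() {
    // A root that should not exist, so no id is filtered out.
    let kept = unmarked_ids(
        Path::new("/nonexistent_xyz_dir"),
        vec!["a".to_string(), "b".to_string()],
    );
    assert_eq!(kept.len(), 2);
    println!("kept {} ids", kept.len());
}
```

The pure module is testable with plain closures, while only the orchestration layer needs a real filesystem, mirroring how `service::agents` injects `io::is_archived` into `selection::filter_non_archived`.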
+85
@@ -200,6 +200,36 @@ prompt = "You are working on story {{story_id}} ..."
system_prompt = "You are a senior full-stack engineer ..."</code></pre> system_prompt = "You are a senior full-stack engineer ..."</code></pre>
<p>To use this agent for a specific story, add <code>agent: opus</code> to the story's front matter, or run <code>start &lt;number&gt; opus</code> in chat.</p> <p>To use this agent for a specific story, add <code>agent: opus</code> to the story's front matter, or run <code>start &lt;number&gt; opus</code> in chat.</p>
<h2 id="agent-md">Project-local agent prompt (<code>.huskies/AGENT.md</code>)</h2>
<p>Place a file at <code>.huskies/AGENT.md</code> in your project root to append project-specific guidance to every agent's initial prompt at spawn time.</p>
<h3>How it works</h3>
<ul>
<li>Huskies reads <code>.huskies/AGENT.md</code> each time an agent is spawned — no caching, no restart required.</li>
<li>The file content is appended <em>after</em> the baked-in agent prompt, so project guidance refines core instructions without overriding them.</li>
<li>Applies to all agent roles: coder, QA, mergemaster, and supervisor.</li>
<li>If the file is missing or empty, agents spawn normally — no warnings, no errors.</li>
<li>When the file exists and is non-empty, a single <code>INFO</code> log line is emitted showing the file path and byte count.</li>
</ul>
<h3>Ordering</h3>
<ol>
<li>Baked-in agent prompt (from <code>agents.toml</code> or <code>project.toml</code>)</li>
<li>Project-local content from <code>.huskies/AGENT.md</code></li>
<li>Resume context (only on agent restart after a gate failure)</li>
</ol>
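<p>Put together (a sketch — the exact separators between sections are an assumption, not documented here), the assembled prompt reads roughly as:</p>
<pre><code>&lt;baked-in agent prompt from agents.toml or project.toml&gt;

&lt;contents of .huskies/AGENT.md&gt;

&lt;resume context, only when restarting after a gate failure&gt;</code></pre>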
<h3>Example</h3>
<pre><code># .huskies/AGENT.md
## Documentation
Docs live in `website/docs/*.html`, not Markdown files.
Edit the relevant .html file when a story asks for documentation.
## Quality gates
Run `cargo clippy -- -D warnings` before committing. Zero warnings allowed.</code></pre>
<p>Edit the file at any time — the next agent spawn picks up the latest content automatically.</p>
<h2 id="bot-toml">bot.toml</h2> <h2 id="bot-toml">bot.toml</h2>
<p>Chat transport configuration. Lives at <code>.huskies/bot.toml</code>. This file is gitignored as it contains credentials. Copy the appropriate example file to get started:</p> <p>Chat transport configuration. Lives at <code>.huskies/bot.toml</code>. This file is gitignored as it contains credentials. Copy the appropriate example file to get started:</p>
<pre><code>cp .huskies/bot.toml.matrix.example .huskies/bot.toml</code></pre> <pre><code>cp .huskies/bot.toml.matrix.example .huskies/bot.toml</code></pre>
@@ -217,6 +247,61 @@ system_prompt = "You are a senior full-stack engineer ..."</code></pre>
<tr><td>history_size</td><td>Optional. Maximum conversation turns to remember per room/user (default: 20).</td></tr> <tr><td>history_size</td><td>Optional. Maximum conversation turns to remember per room/user (default: 20).</td></tr>
</tbody> </tbody>
</table> </table>
<h2 id="gateway-aggregated-stream">Gateway: aggregated chat stream</h2>
<p>When running <code>huskies --gateway</code>, you can configure a single bot that receives pipeline notifications from <strong>all</strong> registered projects. Events are prefixed with <code>[project-name]</code> so you can tell them apart in one shared room.</p>
<p>The aggregated stream is configured entirely in the <strong>gateway's</strong> <code>.huskies/bot.toml</code> — no per-project bot config is required and no per-project files need to change when you add a new project to <code>projects.toml</code>.</p>
<h3>Enabling the aggregated stream</h3>
<p>Add or edit <code>&lt;gateway-config-dir&gt;/.huskies/bot.toml</code> and set <code>enabled = true</code>. The gateway bot will automatically poll every project listed in <code>projects.toml</code> and forward events to the configured rooms.</p>
<pre><code># &lt;gateway-config-dir&gt;/.huskies/bot.toml
enabled = true
transport = "matrix"
homeserver = "https://matrix.example.com"
username = "@gateway-bot:example.com"
password = "secret"
room_ids = ["!gateway-room:example.com"]
allowed_users = ["@you:example.com"]
# Gateway-specific: poll interval and on/off switch
aggregated_notifications_poll_interval_secs = 5 # default
aggregated_notifications_enabled = true # default</code></pre>
<h3>Aggregated stream settings</h3>
<table>
<thead>
<tr><th>Key</th><th>Type</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr>
<td>aggregated_notifications_enabled</td>
<td>bool</td>
<td><code>true</code></td>
<td>Set to <code>false</code> to disable the aggregated stream without disabling the gateway bot entirely. Per-project configs are never consulted.</td>
</tr>
<tr>
<td>aggregated_notifications_poll_interval_secs</td>
<td>integer</td>
<td><code>5</code></td>
<td>How often (in seconds) the gateway polls each project's <code>/api/events</code> endpoint. Lower values reduce notification latency at the cost of more frequent polling requests.</td>
</tr>
</tbody>
</table>
<h3>No-duplicate guarantee</h3>
<p>Per-project bots and the gateway aggregated stream send to different rooms — they are independent. Events from a per-project bot go to that project's rooms; events from the gateway stream go to the gateway rooms. The same event will never appear twice in either room.</p>
<h3>Unreachable projects</h3>
<p>If a per-project server is temporarily unreachable, the gateway logs a warning and skips that project for the current poll cycle. All other projects continue to deliver notifications normally. No configuration change is required — the poller retries on the next interval.</p>
<h3>Supported event types</h3>
<p>The aggregated stream delivers the following event types, each prefixed with the project name:</p>
<ul>
<li><strong>Stage transitions</strong> — story created, agent started, QA requested, QA approved/rejected, merge succeeded (all pipeline stage moves)</li>
<li><strong>Merge failures</strong> — merge failed with a reason</li>
<li><strong>Story blocked</strong> — story blocked after exceeding retry limit</li>
</ul>
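<p>For illustration only (the exact message wording is an assumption, not part of this reference), prefixed events in the shared gateway room might look like:</p>
<pre><code>[website] QA approved: 42_story_foo
[backend] Merge failed: 17_story_bar (merge conflict)
[backend] Story blocked: 17_story_bar (retry limit exceeded)</code></pre>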
</main> </main>
</div> </div>
+7
@@ -200,6 +200,13 @@
<tr><td>discord_allowed_users</td><td>Optional. Discord user IDs allowed to interact. When absent, all users in configured channels can interact.</td></tr> <tr><td>discord_allowed_users</td><td>Optional. Discord user IDs allowed to interact. When absent, all users in configured channels can interact.</td></tr>
</tbody> </tbody>
</table> </table>
<h2 id="gateway-aggregated">Gateway: aggregated notifications</h2>
<p>When using <code>huskies --gateway</code>, you can configure the gateway bot to receive notifications from <strong>all</strong> registered projects in a single room. Events are prefixed with <code>[project-name]</code>.</p>
<p>No additional transport is required — the gateway aggregated stream works with any of the transports above. Configure the gateway's <code>.huskies/bot.toml</code> with your transport credentials and set <code>aggregated_notifications_enabled = true</code> (the default). See <a href="configuration.html#gateway-aggregated-stream">Configuration → Gateway aggregated stream</a> for the full reference.</p>
<div class="note">
<strong>No per-project changes needed:</strong> Adding a new project to <code>projects.toml</code> does not require editing per-project bot configs — the gateway picks it up automatically.
</div>
</main> </main>
</div> </div>