huskies: merge 960

This commit is contained in:
dave
2026-05-13 13:17:46 +00:00
parent a47fbc4179
commit 77dc09668c
14 changed files with 138 additions and 193 deletions
+10 -5
View File
@@ -163,21 +163,26 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
// Detect and broadcast stage transitions.
for (sid, &idx) in &state.index {
let new_stage = match state.crdt.doc.items[idx].stage.view() {
let new_stage_str = match state.crdt.doc.items[idx].stage.view() {
JsonValue::String(s) => s,
_ => continue,
};
let old_stage = pre_stages.get(sid).cloned();
let changed = old_stage.as_deref() != Some(&new_stage);
let old_stage_str = pre_stages.get(sid).cloned();
let changed = old_stage_str.as_deref() != Some(&new_stage_str);
if changed {
// Storage seam: convert the raw CRDT stage strings to typed Stage values here.
let Some(to_stage) = crate::pipeline_state::Stage::from_dir(&new_stage_str) else {
continue;
};
let from_stage = old_stage_str.and_then(|s| crate::pipeline_state::Stage::from_dir(&s));
let name = match state.crdt.doc.items[idx].name.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
emit_event(CrdtEvent {
story_id: sid.clone(),
from_stage: old_stage,
to_stage: new_stage,
from_stage,
to_stage,
name,
});
}
+9 -3
View File
@@ -124,7 +124,10 @@ async fn subscribe_receives_stage_transition_events() {
let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on insert");
assert_eq!(evt.story_id, "906_story_subscribe");
assert!(evt.from_stage.is_none());
assert_eq!(evt.to_stage, "backlog");
assert!(matches!(
evt.to_stage,
crate::pipeline_state::Stage::Backlog
));
// Update stage — emit_event fires again with the real from_stage.
write_item_str(
@@ -141,8 +144,11 @@ async fn subscribe_receives_stage_transition_events() {
let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on stage change");
assert_eq!(evt.story_id, "906_story_subscribe");
assert_eq!(evt.from_stage.as_deref(), Some("backlog"));
assert_eq!(evt.to_stage, "coding");
assert!(matches!(
evt.from_stage,
Some(crate::pipeline_state::Stage::Backlog)
));
assert!(matches!(evt.to_stage, crate::pipeline_state::Stage::Coding));
}
#[tokio::test]
+16 -12
View File
@@ -12,9 +12,9 @@ pub struct CrdtEvent {
/// Work item ID (e.g. `"42_story_my_feature"`).
pub story_id: String,
/// The stage the item was in before this transition, or `None` for new items.
pub from_stage: Option<String>,
pub from_stage: Option<crate::pipeline_state::Stage>,
/// The stage the item is now in.
pub to_stage: String,
pub to_stage: crate::pipeline_state::Stage,
/// Human-readable story name from the CRDT document.
pub name: Option<String>,
}
@@ -536,13 +536,16 @@ mod tests {
fn crdt_event_has_expected_fields() {
let evt = CrdtEvent {
story_id: "42_story_foo".to_string(),
from_stage: Some("1_backlog".to_string()),
to_stage: "2_current".to_string(),
from_stage: Some(crate::pipeline_state::Stage::Backlog),
to_stage: crate::pipeline_state::Stage::Coding,
name: Some("Foo Feature".to_string()),
};
assert_eq!(evt.story_id, "42_story_foo");
assert_eq!(evt.from_stage.as_deref(), Some("1_backlog"));
assert_eq!(evt.to_stage, "2_current");
assert!(matches!(
evt.from_stage,
Some(crate::pipeline_state::Stage::Backlog)
));
assert!(matches!(evt.to_stage, crate::pipeline_state::Stage::Coding));
assert_eq!(evt.name.as_deref(), Some("Foo Feature"));
}
@@ -551,7 +554,7 @@ mod tests {
let evt = CrdtEvent {
story_id: "10_story_bar".to_string(),
from_stage: None,
to_stage: "1_backlog".to_string(),
to_stage: crate::pipeline_state::Stage::Backlog,
name: None,
};
let cloned = evt.clone();
@@ -569,7 +572,7 @@ mod tests {
emit_event(CrdtEvent {
story_id: "99_story_noop".to_string(),
from_stage: None,
to_stage: "1_backlog".to_string(),
to_stage: crate::pipeline_state::Stage::Backlog,
name: None,
});
}
@@ -686,19 +689,20 @@ mod tests {
#[test]
fn crdt_event_broadcast_channel_round_trip() {
use crate::pipeline_state::Stage;
let (tx, mut rx) = broadcast::channel::<CrdtEvent>(16);
let evt = CrdtEvent {
story_id: "70_story_broadcast".to_string(),
from_stage: Some("1_backlog".to_string()),
to_stage: "2_current".to_string(),
from_stage: Some(Stage::Backlog),
to_stage: Stage::Coding,
name: Some("Broadcast Test".to_string()),
};
tx.send(evt).unwrap();
let received = rx.try_recv().unwrap();
assert_eq!(received.story_id, "70_story_broadcast");
assert_eq!(received.from_stage.as_deref(), Some("1_backlog"));
assert_eq!(received.to_stage, "2_current");
assert!(matches!(received.from_stage, Some(Stage::Backlog)));
assert!(matches!(received.to_stage, Stage::Coding));
assert_eq!(received.name.as_deref(), Some("Broadcast Test"));
}
}
+5 -3
View File
@@ -276,10 +276,12 @@ pub fn write_item(
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
// Storage seam: convert the old raw CRDT stage string to a typed Stage.
let from_stage = old_stage.and_then(|s| Stage::from_dir(&s));
emit_event(CrdtEvent {
story_id: story_id.to_string(),
from_stage: old_stage,
to_stage: stage_str.to_string(),
from_stage,
to_stage: stage.clone(),
name: current_name,
});
}
@@ -333,7 +335,7 @@ pub fn write_item(
emit_event(CrdtEvent {
story_id: story_id.to_string(),
from_stage: None,
to_stage: stage_str.to_string(),
to_stage: stage.clone(),
name: name.map(String::from),
});
}