huskies: merge 676_bug_apply_and_persist_silently_drops_ops_when_persist_channel_send_fails

This commit is contained in:
dave
2026-04-27 14:32:42 +00:00
parent cbb0a50729
commit ea872fa01c
+83 -20
View File
@@ -308,10 +308,15 @@ where
let raw_op = op_fn(state); let raw_op = op_fn(state);
let signed = raw_op.sign(&state.keypair); let signed = raw_op.sign(&state.keypair);
state.crdt.apply(signed.clone()); state.crdt.apply(signed.clone());
if let Err(e) = state.persist_tx.send(signed.clone()) { if state.persist_tx.send(signed.clone()).is_err() {
crate::slog_error!( let op_type = if signed.inner.is_deleted {
"[crdt] Failed to send op to persist task: {e}; persist task may be dead. \ "Delete"
In-memory state is now ahead of persisted state." } else {
"Insert"
};
let seq = signed.inner.seq;
crate::slog_warn!(
"[crdt_persist] persist channel send failed: op_type={op_type} seq={seq}"
); );
} }
@@ -488,7 +493,7 @@ mod tests {
} }
#[test] #[test]
fn persist_tx_send_failure_logs_error() { fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() {
let kp = make_keypair(); let kp = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&kp); let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let (persist_tx, persist_rx) = mpsc::unbounded_channel::<SignedOp>(); let (persist_tx, persist_rx) = mpsc::unbounded_channel::<SignedOp>();
@@ -506,7 +511,7 @@ mod tests {
drop(persist_rx); drop(persist_rx);
let item_json: JsonValue = json!({ let item_json: JsonValue = json!({
"story_id": "518_story_persist_fail", "story_id": "676_story_persist_fail",
"stage": "1_backlog", "stage": "1_backlog",
"name": "Persist Fail Test", "name": "Persist Fail Test",
"agent": "", "agent": "",
@@ -518,33 +523,91 @@ mod tests {
}) })
.into(); .into();
let before_errors = crate::log_buffer::global() let before_warns = crate::log_buffer::global()
.get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Error)) .get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Warn))
.len(); .len();
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
let error_entries = crate::log_buffer::global().get_recent_entries( let warn_entries = crate::log_buffer::global().get_recent_entries(
1000, 1000,
None, None,
Some(&crate::log_buffer::LogLevel::Error), Some(&crate::log_buffer::LogLevel::Warn),
); );
assert!( assert_eq!(
error_entries.len() > before_errors, warn_entries.len(),
"expected an ERROR log entry when persist_tx send fails, but none was added" before_warns + 1,
"expected exactly one WARN log entry when persist_tx send fails"
); );
let last_error = &error_entries[error_entries.len() - 1]; let warn = &warn_entries[warn_entries.len() - 1];
assert!( assert!(
last_error.message.contains("persist"), warn.message.contains("[crdt_persist]"),
"error message should mention persist: {}", "WARN message must be prefixed [crdt_persist]: {}",
last_error.message warn.message
); );
assert!( assert!(
last_error.message.contains("ahead") || last_error.message.contains("diverged"), warn.message.contains("op_type="),
"error message should note in-memory/persisted divergence: {}", "WARN message must include op_type: {}",
last_error.message warn.message
);
assert!(
warn.message.contains("seq="),
"WARN message must include seq: {}",
warn.message
);
}
#[test]
fn persist_tx_send_success_emits_no_warn() {
let kp = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let (persist_tx, _persist_rx) = mpsc::unbounded_channel::<SignedOp>();
let mut state = CrdtState {
crdt,
keypair: kp,
index: HashMap::new(),
node_index: HashMap::new(),
persist_tx,
lamport_floor: 0,
};
let item_json: JsonValue = json!({
"story_id": "676_story_happy_path",
"stage": "1_backlog",
"name": "Happy Path Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let before_warns = crate::log_buffer::global()
.get_recent_entries(
1000,
Some("[crdt_persist]"),
Some(&crate::log_buffer::LogLevel::Warn),
)
.len();
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
let after_warns = crate::log_buffer::global()
.get_recent_entries(
1000,
Some("[crdt_persist]"),
Some(&crate::log_buffer::LogLevel::Warn),
)
.len();
assert_eq!(
after_warns, before_warns,
"no [crdt_persist] WARN should be emitted when persist_tx send succeeds"
); );
} }