diff --git a/server/src/crdt_state/state.rs b/server/src/crdt_state/state.rs index 29e53f6a..620000ff 100644 --- a/server/src/crdt_state/state.rs +++ b/server/src/crdt_state/state.rs @@ -308,10 +308,15 @@ where let raw_op = op_fn(state); let signed = raw_op.sign(&state.keypair); state.crdt.apply(signed.clone()); - if let Err(e) = state.persist_tx.send(signed.clone()) { - crate::slog_error!( - "[crdt] Failed to send op to persist task: {e}; persist task may be dead. \ - In-memory state is now ahead of persisted state." + if state.persist_tx.send(signed.clone()).is_err() { + let op_type = if signed.inner.is_deleted { + "Delete" + } else { + "Insert" + }; + let seq = signed.inner.seq; + crate::slog_warn!( + "[crdt_persist] persist channel send failed: op_type={op_type} seq={seq}" ); } @@ -488,7 +493,7 @@ mod tests { } #[test] - fn persist_tx_send_failure_logs_error() { + fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() { let kp = make_keypair(); let crdt = BaseCrdt::::new(&kp); let (persist_tx, persist_rx) = mpsc::unbounded_channel::(); @@ -506,7 +511,7 @@ mod tests { drop(persist_rx); let item_json: JsonValue = json!({ - "story_id": "518_story_persist_fail", + "story_id": "676_story_persist_fail", "stage": "1_backlog", "name": "Persist Fail Test", "agent": "", @@ -518,33 +523,91 @@ mod tests { }) .into(); - let before_errors = crate::log_buffer::global() - .get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Error)) + let before_warns = crate::log_buffer::global() + .get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Warn)) .len(); apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); - let error_entries = crate::log_buffer::global().get_recent_entries( + let warn_entries = crate::log_buffer::global().get_recent_entries( 1000, None, - Some(&crate::log_buffer::LogLevel::Error), + Some(&crate::log_buffer::LogLevel::Warn), ); - assert!( - error_entries.len() > before_errors, - "expected an ERROR log entry when persist_tx send fails, but none was added" + assert_eq!( + warn_entries.len(), + before_warns + 1, + "expected exactly one WARN log entry when persist_tx send fails" ); - let last_error = &error_entries[error_entries.len() - 1]; + let warn = &warn_entries[warn_entries.len() - 1]; assert!( - last_error.message.contains("persist"), - "error message should mention persist: {}", - last_error.message + warn.message.contains("[crdt_persist]"), + "WARN message must be prefixed [crdt_persist]: {}", + warn.message ); assert!( - last_error.message.contains("ahead") || last_error.message.contains("diverged"), - "error message should note in-memory/persisted divergence: {}", - last_error.message + warn.message.contains("op_type="), + "WARN message must include op_type: {}", + warn.message + ); + assert!( + warn.message.contains("seq="), + "WARN message must include seq: {}", + warn.message + ); + } + + #[test] + fn persist_tx_send_success_emits_no_warn() { + let kp = make_keypair(); + let crdt = BaseCrdt::::new(&kp); + let (persist_tx, _persist_rx) = mpsc::unbounded_channel::(); + + let mut state = CrdtState { + crdt, + keypair: kp, + index: HashMap::new(), + node_index: HashMap::new(), + persist_tx, + lamport_floor: 0, + }; + + let item_json: JsonValue = json!({ + "story_id": "676_story_happy_path", + "stage": "1_backlog", + "name": "Happy Path Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let before_warns = crate::log_buffer::global() + .get_recent_entries( + 1000, + Some("[crdt_persist]"), + Some(&crate::log_buffer::LogLevel::Warn), + ) + .len(); + + apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); + + let after_warns = crate::log_buffer::global() + .get_recent_entries( + 1000, + Some("[crdt_persist]"), + Some(&crate::log_buffer::LogLevel::Warn), + ) + .len(); + + assert_eq!( + after_warns, before_warns, + "no [crdt_persist] WARN should be emitted when persist_tx send succeeds" ); }