huskies: merge 518_story_apply_and_persist_should_log_when_persist_tx_send_fails_instead_of_silently_dropping_the_op
This commit is contained in:
@@ -269,7 +269,12 @@ where
|
|||||||
let raw_op = op_fn(state);
|
let raw_op = op_fn(state);
|
||||||
let signed = raw_op.sign(&state.keypair);
|
let signed = raw_op.sign(&state.keypair);
|
||||||
state.crdt.apply(signed.clone());
|
state.crdt.apply(signed.clone());
|
||||||
let _ = state.persist_tx.send(signed.clone());
|
if let Err(e) = state.persist_tx.send(signed.clone()) {
|
||||||
|
crate::slog_error!(
|
||||||
|
"[crdt] Failed to send op to persist task: {e}; persist task may be dead. \
|
||||||
|
In-memory state is now ahead of persisted state."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Track in ALL_OPS and broadcast to sync peers.
|
// Track in ALL_OPS and broadcast to sync peers.
|
||||||
if let Ok(json) = serde_json::to_string(&signed)
|
if let Ok(json) = serde_json::to_string(&signed)
|
||||||
@@ -434,8 +439,13 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Persist the op (fire-and-forget).
|
// Persist the op.
|
||||||
let _ = state.persist_tx.send(op.clone());
|
if let Err(e) = state.persist_tx.send(op.clone()) {
|
||||||
|
crate::slog_error!(
|
||||||
|
"[crdt] Failed to send remote op to persist task: {e}; persist task may be dead. \
|
||||||
|
In-memory state is now ahead of persisted state."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Track in ALL_OPS.
|
// Track in ALL_OPS.
|
||||||
if let Ok(json) = serde_json::to_string(&op)
|
if let Ok(json) = serde_json::to_string(&op)
|
||||||
@@ -1394,4 +1404,60 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Story 518: persist_tx send failure logging ───────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn persist_tx_send_failure_logs_error() {
|
||||||
|
let kp = make_keypair();
|
||||||
|
let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||||
|
let (persist_tx, persist_rx) = mpsc::unbounded_channel::<SignedOp>();
|
||||||
|
|
||||||
|
let mut state = CrdtState {
|
||||||
|
crdt,
|
||||||
|
keypair: kp,
|
||||||
|
index: HashMap::new(),
|
||||||
|
persist_tx,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Drop the receiver so that the next send fails immediately.
|
||||||
|
drop(persist_rx);
|
||||||
|
|
||||||
|
let item_json: JsonValue = json!({
|
||||||
|
"story_id": "518_story_persist_fail",
|
||||||
|
"stage": "1_backlog",
|
||||||
|
"name": "Persist Fail Test",
|
||||||
|
"agent": "",
|
||||||
|
"retry_count": 0.0,
|
||||||
|
"blocked": false,
|
||||||
|
"depends_on": "",
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
|
||||||
|
let before_errors = crate::log_buffer::global()
|
||||||
|
.get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Error))
|
||||||
|
.len();
|
||||||
|
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
|
||||||
|
|
||||||
|
let error_entries = crate::log_buffer::global()
|
||||||
|
.get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Error));
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
error_entries.len() > before_errors,
|
||||||
|
"expected an ERROR log entry when persist_tx send fails, but none was added"
|
||||||
|
);
|
||||||
|
|
||||||
|
let last_error = &error_entries[error_entries.len() - 1];
|
||||||
|
assert!(
|
||||||
|
last_error.message.contains("persist"),
|
||||||
|
"error message should mention persist: {}",
|
||||||
|
last_error.message
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
last_error.message.contains("ahead") || last_error.message.contains("diverged"),
|
||||||
|
"error message should note in-memory/persisted divergence: {}",
|
||||||
|
last_error.message
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user