huskies: merge 742_refactor_read_rpc_frame_multiplexer_on_crdt_sync_websocket

This commit is contained in:
dave
2026-04-27 21:22:55 +00:00
parent c1bb5888a8
commit be7b7025d5
5 changed files with 394 additions and 0 deletions
+9
View File
@@ -11,6 +11,7 @@ use crate::slog_warn;
use super::auth; use super::auth;
use super::dispatch::{handle_incoming_binary, handle_incoming_text}; use super::dispatch::{handle_incoming_binary, handle_incoming_text};
use super::rpc::try_handle_rpc_text;
use super::wire::{AuthMessage, ChallengeMessage, HelloMessage, ServerAuthMessage, SyncMessage}; use super::wire::{AuthMessage, ChallengeMessage, HelloMessage, ServerAuthMessage, SyncMessage};
use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS}; use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS};
@@ -363,6 +364,14 @@ pub(crate) async fn connect_and_sync(url: &str, token: Option<&str>) -> Result<(
if !flush_ok { if !flush_ok {
break; break;
} }
} else if let Some(rpc_resp) = try_handle_rpc_text(text.as_ref()) {
// RPC request from the peer — dispatch and reply.
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
if let Ok(json) = serde_json::to_string(&rpc_resp)
&& sink.send(TungsteniteMsg::Text(json.into())).await.is_err()
{
break;
}
} else { } else {
handle_incoming_text(text.as_ref()); handle_incoming_text(text.as_ref());
} }
+1
View File
@@ -61,6 +61,7 @@ mod auth;
mod client; mod client;
mod dispatch; mod dispatch;
mod handshake; mod handshake;
mod rpc;
mod server; mod server;
mod wire; mod wire;
+211
View File
@@ -0,0 +1,211 @@
//! RPC method registry for the `/crdt-sync` WebSocket multiplexer.
//!
//! Incoming [`RpcFrame::RpcRequest`] frames are dispatched through this
//! registry. Each method handler is a plain function that accepts a
//! `serde_json::Value` parameter bag and returns a `serde_json::Value` result.
//!
//! # Registering handlers
//!
//! Add a new entry to the `HANDLERS` static slice:
//!
//! ```rust,ignore
//! ("my.method", handle_my_method as Handler),
//! ```
//!
//! # Unknown methods
//!
//! [`dispatch`] returns `Err("NOT_FOUND")` for any method not present in the
//! registry. The caller should translate this into an
//! [`RpcFrame::RpcResponse`] with `ok: false, code: "NOT_FOUND"`.
use serde_json::Value;
use super::wire::RpcFrame;
/// Signature for a synchronous RPC method handler.
pub(super) type Handler = fn(Value) -> Value;
/// Static registry mapping method names to handlers.
///
/// Add new handlers here. The registry is a plain slice — linear scan is
/// fine for the small number of methods expected.
static HANDLERS: &[(&str, Handler)] = &[("health.check", handle_health_check)];
/// Handler for the `health.check` method.
///
/// Returns `{"status": "ok"}` unconditionally. Used as a smoke test to
/// verify that the RPC multiplexer is wired up correctly.
fn handle_health_check(_params: Value) -> Value {
serde_json::json!({"status": "ok"})
}
/// Dispatch an incoming RPC method call to the registered handler.
///
/// Returns `Ok(result)` on success or `Err("NOT_FOUND")` if no handler is
/// registered for `method`.
pub(super) fn dispatch(method: &str, params: Value) -> Result<Value, &'static str> {
for (name, handler) in HANDLERS {
if *name == method {
return Ok(handler(params));
}
}
Err("NOT_FOUND")
}
/// Try to parse `text` as an [`RpcFrame::RpcRequest`] and, if successful,
/// dispatch it and return the corresponding [`RpcFrame::RpcResponse`].
///
/// Returns `None` if the text is not a valid `rpc_request` frame (i.e. it
/// should be forwarded to the CRDT sync handler instead).
pub(super) fn try_handle_rpc_text(text: &str) -> Option<RpcFrame> {
let frame: RpcFrame = serde_json::from_str(text).ok()?;
match frame {
RpcFrame::RpcRequest {
version,
correlation_id,
method,
params,
..
} => {
let response = match dispatch(&method, params) {
Ok(result) => RpcFrame::RpcResponse {
version,
correlation_id,
ok: true,
result: Some(result),
error: None,
code: None,
},
Err(code) => RpcFrame::RpcResponse {
version,
correlation_id,
ok: false,
result: None,
error: Some(format!("unknown method: {method}")),
code: Some(code.to_string()),
},
};
Some(response)
}
// Response frames arriving on our socket are not requests — nothing to send back.
RpcFrame::RpcResponse { .. } => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn health_check_returns_ok_status() {
let result = dispatch("health.check", serde_json::json!({}));
assert!(result.is_ok());
let val = result.unwrap();
assert_eq!(val["status"], "ok");
}
#[test]
fn unknown_method_returns_not_found() {
let result = dispatch("nonexistent.method", serde_json::json!({}));
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "NOT_FOUND");
}
#[test]
fn try_handle_rpc_text_health_check() {
let req = serde_json::json!({
"kind": "rpc_request",
"version": 1,
"correlation_id": "test-corr-1",
"ttl_ms": 5000,
"method": "health.check",
"params": {}
});
let text = serde_json::to_string(&req).unwrap();
let resp = try_handle_rpc_text(&text).expect("must produce a response");
match resp {
RpcFrame::RpcResponse {
ok,
correlation_id,
result,
code,
..
} => {
assert!(ok, "health.check must succeed");
assert_eq!(correlation_id, "test-corr-1");
assert!(result.is_some());
assert_eq!(result.unwrap()["status"], "ok");
assert!(code.is_none());
}
_ => panic!("Expected RpcResponse"),
}
}
#[test]
fn try_handle_rpc_text_unknown_method_returns_not_found() {
let req = serde_json::json!({
"kind": "rpc_request",
"version": 1,
"correlation_id": "test-corr-2",
"ttl_ms": 1000,
"method": "no.such.method",
"params": {}
});
let text = serde_json::to_string(&req).unwrap();
let resp = try_handle_rpc_text(&text).expect("must produce a response for unknown method");
match resp {
RpcFrame::RpcResponse { ok, code, .. } => {
assert!(!ok, "unknown method must not succeed");
assert_eq!(code.unwrap(), "NOT_FOUND");
}
_ => panic!("Expected RpcResponse"),
}
}
#[test]
fn try_handle_rpc_text_ignores_non_rpc_frames() {
// A SyncMessage::Bulk frame must not be treated as an RPC request.
let bulk = r#"{"type":"bulk","ops":[]}"#;
assert!(try_handle_rpc_text(bulk).is_none());
}
#[test]
fn try_handle_rpc_text_ignores_rpc_response_frames() {
// An incoming rpc_response (e.g. reply to our own outbound request) must
// not trigger a further response.
let resp = serde_json::json!({
"kind": "rpc_response",
"version": 1,
"correlation_id": "x",
"ok": true,
"result": {"status": "ok"}
});
let text = serde_json::to_string(&resp).unwrap();
assert!(try_handle_rpc_text(&text).is_none());
}
#[test]
fn try_handle_rpc_text_ignores_invalid_json() {
assert!(try_handle_rpc_text("not json at all").is_none());
}
#[test]
fn rpc_response_correlation_id_mirrors_request() {
let req = serde_json::json!({
"kind": "rpc_request",
"version": 1,
"correlation_id": "mirror-me",
"ttl_ms": 500,
"method": "health.check",
"params": {}
});
let text = serde_json::to_string(&req).unwrap();
let resp = try_handle_rpc_text(&text).unwrap();
match resp {
RpcFrame::RpcResponse { correlation_id, .. } => {
assert_eq!(correlation_id, "mirror-me");
}
_ => panic!("Expected RpcResponse"),
}
}
}
+8
View File
@@ -21,6 +21,7 @@ use crate::slog_warn;
use super::auth::{REQUIRE_TOKEN, trusted_keys, validate_join_token}; use super::auth::{REQUIRE_TOKEN, trusted_keys, validate_join_token};
use super::dispatch::{handle_incoming_binary, handle_incoming_text}; use super::dispatch::{handle_incoming_binary, handle_incoming_text};
use super::rpc::try_handle_rpc_text;
use super::wire::{AuthMessage, ChallengeMessage, SyncMessage}; use super::wire::{AuthMessage, ChallengeMessage, SyncMessage};
use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS}; use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS};
@@ -269,6 +270,13 @@ pub async fn crdt_sync_handler(
if !flush_ok { if !flush_ok {
break; break;
} }
} else if let Some(rpc_resp) = try_handle_rpc_text(&text) {
// RPC request — dispatch to registry and send response.
if let Ok(json) = serde_json::to_string(&rpc_resp)
&& sink.send(WsMessage::Text(json)).await.is_err()
{
break;
}
} else { } else {
// Bulk state dump, legacy op frame, or clock frame. // Bulk state dump, legacy op frame, or clock frame.
handle_incoming_text(&text); handle_incoming_text(&text);
+165
View File
@@ -59,6 +59,53 @@ pub(crate) enum SyncMessage {
Ready, Ready,
} }
// ── RPC frame types ─────────────────────────────────────────────────
/// Multiplexed RPC frames sent over the `/crdt-sync` WebSocket alongside CRDT
/// sync messages. The discriminator field is `"kind"` (not `"type"`) so that
/// these frames are unambiguously distinguishable from [`SyncMessage`] frames.
///
/// The caller is responsible for synthesising a `TIMEOUT` response locally if
/// its `ttl_ms` elapses with no `rpc_response` arriving — the server only
/// produces `NOT_FOUND` for unknown methods, not timeout frames.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub(crate) enum RpcFrame {
/// Outbound RPC call from a peer.
RpcRequest {
/// Protocol version (currently `1`).
version: u32,
/// Caller-generated opaque ID used to correlate the response.
correlation_id: String,
/// Caller's deadline in milliseconds. The server processes the call
/// regardless; the caller synthesises a local `TIMEOUT` if it expires.
ttl_ms: u64,
/// Dot-separated method name, e.g. `"health.check"`.
method: String,
/// Arbitrary JSON parameters passed to the method handler.
params: serde_json::Value,
},
/// Response sent back to the caller.
RpcResponse {
/// Protocol version (mirrors the request version).
version: u32,
/// Matches the `correlation_id` of the originating `rpc_request`.
correlation_id: String,
/// `true` if the method executed successfully.
ok: bool,
/// Present on success (`ok == true`).
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<serde_json::Value>,
/// Human-readable error description (present on `ok == false`).
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
/// Machine-readable error code (e.g. `"NOT_FOUND"`). Present on
/// `ok == false`.
#[serde(skip_serializing_if = "Option::is_none")]
code: Option<String>,
},
}
/// Crate-visible re-export of `SyncMessage` for backwards-compatibility testing. /// Crate-visible re-export of `SyncMessage` for backwards-compatibility testing.
/// ///
/// Used by `crdt_snapshot` tests to verify that snapshot messages are NOT /// Used by `crdt_snapshot` tests to verify that snapshot messages are NOT
@@ -157,4 +204,122 @@ mod tests {
let deserialized: SyncMessage = serde_json::from_str(&json).unwrap(); let deserialized: SyncMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(deserialized, SyncMessage::Ready)); assert!(matches!(deserialized, SyncMessage::Ready));
} }
#[test]
fn rpc_request_serialization_roundtrip() {
let frame = RpcFrame::RpcRequest {
version: 1,
correlation_id: "abc-123".to_string(),
ttl_ms: 5000,
method: "health.check".to_string(),
params: serde_json::json!({}),
};
let json = serde_json::to_string(&frame).unwrap();
assert!(
json.contains(r#""kind":"rpc_request""#),
"kind tag must be rpc_request"
);
assert!(json.contains("abc-123"));
assert!(json.contains("health.check"));
let back: RpcFrame = serde_json::from_str(&json).unwrap();
match back {
RpcFrame::RpcRequest {
version,
correlation_id,
ttl_ms,
method,
..
} => {
assert_eq!(version, 1);
assert_eq!(correlation_id, "abc-123");
assert_eq!(ttl_ms, 5000);
assert_eq!(method, "health.check");
}
_ => panic!("Expected RpcRequest"),
}
}
#[test]
fn rpc_response_ok_serialization_roundtrip() {
let frame = RpcFrame::RpcResponse {
version: 1,
correlation_id: "abc-123".to_string(),
ok: true,
result: Some(serde_json::json!({"status": "ok"})),
error: None,
code: None,
};
let json = serde_json::to_string(&frame).unwrap();
assert!(json.contains(r#""kind":"rpc_response""#));
assert!(json.contains(r#""ok":true"#));
// error and code must be absent
assert!(
!json.contains("error"),
"error field must be absent on success"
);
assert!(
!json.contains("\"code\""),
"code field must be absent on success"
);
let back: RpcFrame = serde_json::from_str(&json).unwrap();
match back {
RpcFrame::RpcResponse {
ok,
result,
error,
code,
..
} => {
assert!(ok);
assert!(result.is_some());
assert!(error.is_none());
assert!(code.is_none());
}
_ => panic!("Expected RpcResponse"),
}
}
#[test]
fn rpc_response_not_found_serialization_roundtrip() {
let frame = RpcFrame::RpcResponse {
version: 1,
correlation_id: "xyz-999".to_string(),
ok: false,
result: None,
error: Some("method not found".to_string()),
code: Some("NOT_FOUND".to_string()),
};
let json = serde_json::to_string(&frame).unwrap();
assert!(json.contains(r#""ok":false"#));
assert!(json.contains("NOT_FOUND"));
assert!(
!json.contains("\"result\""),
"result must be absent on error"
);
let back: RpcFrame = serde_json::from_str(&json).unwrap();
match back {
RpcFrame::RpcResponse {
ok, result, code, ..
} => {
assert!(!ok);
assert!(result.is_none());
assert_eq!(code.unwrap(), "NOT_FOUND");
}
_ => panic!("Expected RpcResponse"),
}
}
#[test]
fn rpc_request_not_parseable_as_sync_message() {
let json = r#"{"kind":"rpc_request","version":1,"correlation_id":"x","ttl_ms":1000,"method":"health.check","params":{}}"#;
let result: Result<SyncMessage, _> = serde_json::from_str(json);
assert!(result.is_err(), "rpc_request must not parse as SyncMessage");
}
#[test]
fn sync_message_not_parseable_as_rpc_frame() {
let json = r#"{"type":"bulk","ops":[]}"#;
let result: Result<RpcFrame, _> = serde_json::from_str(json);
assert!(result.is_err(), "SyncMessage must not parse as RpcFrame");
}
} }