From be7b7025d57f2eb38fda7cfa79ecda8a3905f38d Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 21:22:55 +0000 Subject: [PATCH] huskies: merge 742_refactor_read_rpc_frame_multiplexer_on_crdt_sync_websocket --- server/src/crdt_sync/client.rs | 9 ++ server/src/crdt_sync/mod.rs | 1 + server/src/crdt_sync/rpc.rs | 211 +++++++++++++++++++++++++++++++++ server/src/crdt_sync/server.rs | 8 ++ server/src/crdt_sync/wire.rs | 165 ++++++++++++++++++++++++++ 5 files changed, 394 insertions(+) create mode 100644 server/src/crdt_sync/rpc.rs diff --git a/server/src/crdt_sync/client.rs b/server/src/crdt_sync/client.rs index 1a11fd1f..a3c60650 100644 --- a/server/src/crdt_sync/client.rs +++ b/server/src/crdt_sync/client.rs @@ -11,6 +11,7 @@ use crate::slog_warn; use super::auth; use super::dispatch::{handle_incoming_binary, handle_incoming_text}; +use super::rpc::try_handle_rpc_text; use super::wire::{AuthMessage, ChallengeMessage, HelloMessage, ServerAuthMessage, SyncMessage}; use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS}; @@ -363,6 +364,14 @@ pub(crate) async fn connect_and_sync(url: &str, token: Option<&str>) -> Result<( if !flush_ok { break; } + } else if let Some(rpc_resp) = try_handle_rpc_text(text.as_ref()) { + // RPC request from the peer — dispatch and reply. + use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; + if let Ok(json) = serde_json::to_string(&rpc_resp) + && sink.send(TungsteniteMsg::Text(json.into())).await.is_err() + { + break; + } } else { handle_incoming_text(text.as_ref()); } diff --git a/server/src/crdt_sync/mod.rs b/server/src/crdt_sync/mod.rs index 6c1efb0e..9d129805 100644 --- a/server/src/crdt_sync/mod.rs +++ b/server/src/crdt_sync/mod.rs @@ -61,6 +61,7 @@ mod auth; mod client; mod dispatch; mod handshake; +mod rpc; mod server; mod wire; diff --git a/server/src/crdt_sync/rpc.rs b/server/src/crdt_sync/rpc.rs new file mode 100644 index 00000000..f1642659 --- /dev/null +++ b/server/src/crdt_sync/rpc.rs @@ -0,0 +1,211 @@ +//! RPC method registry for the `/crdt-sync` WebSocket multiplexer. +//! +//! Incoming [`RpcFrame::RpcRequest`] frames are dispatched through this +//! registry. Each method handler is a plain function that accepts a +//! `serde_json::Value` parameter bag and returns a `serde_json::Value` result. +//! +//! # Registering handlers +//! +//! Add a new entry to the `HANDLERS` static slice: +//! +//! ```rust,ignore +//! ("my.method", handle_my_method as Handler), +//! ``` +//! +//! # Unknown methods +//! +//! [`dispatch`] returns `Err("NOT_FOUND")` for any method not present in the +//! registry. The caller should translate this into an +//! [`RpcFrame::RpcResponse`] with `ok: false, code: "NOT_FOUND"`. + +use serde_json::Value; + +use super::wire::RpcFrame; + +/// Signature for a synchronous RPC method handler. +pub(super) type Handler = fn(Value) -> Value; + +/// Static registry mapping method names to handlers. +/// +/// Add new handlers here. The registry is a plain slice — linear scan is +/// fine for the small number of methods expected. +static HANDLERS: &[(&str, Handler)] = &[("health.check", handle_health_check)]; + +/// Handler for the `health.check` method. +/// +/// Returns `{"status": "ok"}` unconditionally. Used as a smoke test to +/// verify that the RPC multiplexer is wired up correctly. +fn handle_health_check(_params: Value) -> Value { + serde_json::json!({"status": "ok"}) +} + +/// Dispatch an incoming RPC method call to the registered handler. +/// +/// Returns `Ok(result)` on success or `Err("NOT_FOUND")` if no handler is +/// registered for `method`. +pub(super) fn dispatch(method: &str, params: Value) -> Result { + for (name, handler) in HANDLERS { + if *name == method { + return Ok(handler(params)); + } + } + Err("NOT_FOUND") +} + +/// Try to parse `text` as an [`RpcFrame::RpcRequest`] and, if successful, +/// dispatch it and return the corresponding [`RpcFrame::RpcResponse`]. +/// +/// Returns `None` if the text is not a valid `rpc_request` frame (i.e. it +/// should be forwarded to the CRDT sync handler instead). +pub(super) fn try_handle_rpc_text(text: &str) -> Option { + let frame: RpcFrame = serde_json::from_str(text).ok()?; + match frame { + RpcFrame::RpcRequest { + version, + correlation_id, + method, + params, + .. + } => { + let response = match dispatch(&method, params) { + Ok(result) => RpcFrame::RpcResponse { + version, + correlation_id, + ok: true, + result: Some(result), + error: None, + code: None, + }, + Err(code) => RpcFrame::RpcResponse { + version, + correlation_id, + ok: false, + result: None, + error: Some(format!("unknown method: {method}")), + code: Some(code.to_string()), + }, + }; + Some(response) + } + // Response frames arriving on our socket are not requests — nothing to send back. + RpcFrame::RpcResponse { .. } => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn health_check_returns_ok_status() { + let result = dispatch("health.check", serde_json::json!({})); + assert!(result.is_ok()); + let val = result.unwrap(); + assert_eq!(val["status"], "ok"); + } + + #[test] + fn unknown_method_returns_not_found() { + let result = dispatch("nonexistent.method", serde_json::json!({})); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "NOT_FOUND"); + } + + #[test] + fn try_handle_rpc_text_health_check() { + let req = serde_json::json!({ + "kind": "rpc_request", + "version": 1, + "correlation_id": "test-corr-1", + "ttl_ms": 5000, + "method": "health.check", + "params": {} + }); + let text = serde_json::to_string(&req).unwrap(); + let resp = try_handle_rpc_text(&text).expect("must produce a response"); + match resp { + RpcFrame::RpcResponse { + ok, + correlation_id, + result, + code, + .. + } => { + assert!(ok, "health.check must succeed"); + assert_eq!(correlation_id, "test-corr-1"); + assert!(result.is_some()); + assert_eq!(result.unwrap()["status"], "ok"); + assert!(code.is_none()); + } + _ => panic!("Expected RpcResponse"), + } + } + + #[test] + fn try_handle_rpc_text_unknown_method_returns_not_found() { + let req = serde_json::json!({ + "kind": "rpc_request", + "version": 1, + "correlation_id": "test-corr-2", + "ttl_ms": 1000, + "method": "no.such.method", + "params": {} + }); + let text = serde_json::to_string(&req).unwrap(); + let resp = try_handle_rpc_text(&text).expect("must produce a response for unknown method"); + match resp { + RpcFrame::RpcResponse { ok, code, .. } => { + assert!(!ok, "unknown method must not succeed"); + assert_eq!(code.unwrap(), "NOT_FOUND"); + } + _ => panic!("Expected RpcResponse"), + } + } + + #[test] + fn try_handle_rpc_text_ignores_non_rpc_frames() { + // A SyncMessage::Bulk frame must not be treated as an RPC request. + let bulk = r#"{"type":"bulk","ops":[]}"#; + assert!(try_handle_rpc_text(bulk).is_none()); + } + + #[test] + fn try_handle_rpc_text_ignores_rpc_response_frames() { + // An incoming rpc_response (e.g. reply to our own outbound request) must + // not trigger a further response. + let resp = serde_json::json!({ + "kind": "rpc_response", + "version": 1, + "correlation_id": "x", + "ok": true, + "result": {"status": "ok"} + }); + let text = serde_json::to_string(&resp).unwrap(); + assert!(try_handle_rpc_text(&text).is_none()); + } + + #[test] + fn try_handle_rpc_text_ignores_invalid_json() { + assert!(try_handle_rpc_text("not json at all").is_none()); + } + + #[test] + fn rpc_response_correlation_id_mirrors_request() { + let req = serde_json::json!({ + "kind": "rpc_request", + "version": 1, + "correlation_id": "mirror-me", + "ttl_ms": 500, + "method": "health.check", + "params": {} + }); + let text = serde_json::to_string(&req).unwrap(); + let resp = try_handle_rpc_text(&text).unwrap(); + match resp { + RpcFrame::RpcResponse { correlation_id, .. } => { + assert_eq!(correlation_id, "mirror-me"); + } + _ => panic!("Expected RpcResponse"), + } + } +} diff --git a/server/src/crdt_sync/server.rs b/server/src/crdt_sync/server.rs index b01be4ba..104186ed 100644 --- a/server/src/crdt_sync/server.rs +++ b/server/src/crdt_sync/server.rs @@ -21,6 +21,7 @@ use crate::slog_warn; use super::auth::{REQUIRE_TOKEN, trusted_keys, validate_join_token}; use super::dispatch::{handle_incoming_binary, handle_incoming_text}; +use super::rpc::try_handle_rpc_text; use super::wire::{AuthMessage, ChallengeMessage, SyncMessage}; use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS}; @@ -269,6 +270,13 @@ pub async fn crdt_sync_handler( if !flush_ok { break; } + } else if let Some(rpc_resp) = try_handle_rpc_text(&text) { + // RPC request — dispatch to registry and send response. + if let Ok(json) = serde_json::to_string(&rpc_resp) + && sink.send(WsMessage::Text(json)).await.is_err() + { + break; + } } else { // Bulk state dump, legacy op frame, or clock frame. handle_incoming_text(&text); diff --git a/server/src/crdt_sync/wire.rs b/server/src/crdt_sync/wire.rs index c99336e3..45108eb6 100644 --- a/server/src/crdt_sync/wire.rs +++ b/server/src/crdt_sync/wire.rs @@ -59,6 +59,53 @@ pub(crate) enum SyncMessage { Ready, } +// ── RPC frame types ───────────────────────────────────────────────── + +/// Multiplexed RPC frames sent over the `/crdt-sync` WebSocket alongside CRDT +/// sync messages. The discriminator field is `"kind"` (not `"type"`) so that +/// these frames are unambiguously distinguishable from [`SyncMessage`] frames. +/// +/// The caller is responsible for synthesising a `TIMEOUT` response locally if +/// its `ttl_ms` elapses with no `rpc_response` arriving — the server only +/// produces `NOT_FOUND` for unknown methods, not timeout frames. +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub(crate) enum RpcFrame { + /// Outbound RPC call from a peer. + RpcRequest { + /// Protocol version (currently `1`). + version: u32, + /// Caller-generated opaque ID used to correlate the response. + correlation_id: String, + /// Caller's deadline in milliseconds. The server processes the call + /// regardless; the caller synthesises a local `TIMEOUT` if it expires. + ttl_ms: u64, + /// Dot-separated method name, e.g. `"health.check"`. + method: String, + /// Arbitrary JSON parameters passed to the method handler. + params: serde_json::Value, + }, + /// Response sent back to the caller. + RpcResponse { + /// Protocol version (mirrors the request version). + version: u32, + /// Matches the `correlation_id` of the originating `rpc_request`. + correlation_id: String, + /// `true` if the method executed successfully. + ok: bool, + /// Present on success (`ok == true`). + #[serde(skip_serializing_if = "Option::is_none")] + result: Option, + /// Human-readable error description (present on `ok == false`). + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, + /// Machine-readable error code (e.g. `"NOT_FOUND"`). Present on + /// `ok == false`. + #[serde(skip_serializing_if = "Option::is_none")] + code: Option, + }, +} + /// Crate-visible re-export of `SyncMessage` for backwards-compatibility testing. /// /// Used by `crdt_snapshot` tests to verify that snapshot messages are NOT @@ -157,4 +204,122 @@ mod tests { let deserialized: SyncMessage = serde_json::from_str(&json).unwrap(); assert!(matches!(deserialized, SyncMessage::Ready)); } + + #[test] + fn rpc_request_serialization_roundtrip() { + let frame = RpcFrame::RpcRequest { + version: 1, + correlation_id: "abc-123".to_string(), + ttl_ms: 5000, + method: "health.check".to_string(), + params: serde_json::json!({}), + }; + let json = serde_json::to_string(&frame).unwrap(); + assert!( + json.contains(r#""kind":"rpc_request""#), + "kind tag must be rpc_request" + ); + assert!(json.contains("abc-123")); + assert!(json.contains("health.check")); + let back: RpcFrame = serde_json::from_str(&json).unwrap(); + match back { + RpcFrame::RpcRequest { + version, + correlation_id, + ttl_ms, + method, + .. + } => { + assert_eq!(version, 1); + assert_eq!(correlation_id, "abc-123"); + assert_eq!(ttl_ms, 5000); + assert_eq!(method, "health.check"); + } + _ => panic!("Expected RpcRequest"), + } + } + + #[test] + fn rpc_response_ok_serialization_roundtrip() { + let frame = RpcFrame::RpcResponse { + version: 1, + correlation_id: "abc-123".to_string(), + ok: true, + result: Some(serde_json::json!({"status": "ok"})), + error: None, + code: None, + }; + let json = serde_json::to_string(&frame).unwrap(); + assert!(json.contains(r#""kind":"rpc_response""#)); + assert!(json.contains(r#""ok":true"#)); + // error and code must be absent + assert!( + !json.contains("error"), + "error field must be absent on success" + ); + assert!( + !json.contains("\"code\""), + "code field must be absent on success" + ); + let back: RpcFrame = serde_json::from_str(&json).unwrap(); + match back { + RpcFrame::RpcResponse { + ok, + result, + error, + code, + .. + } => { + assert!(ok); + assert!(result.is_some()); + assert!(error.is_none()); + assert!(code.is_none()); + } + _ => panic!("Expected RpcResponse"), + } + } + + #[test] + fn rpc_response_not_found_serialization_roundtrip() { + let frame = RpcFrame::RpcResponse { + version: 1, + correlation_id: "xyz-999".to_string(), + ok: false, + result: None, + error: Some("method not found".to_string()), + code: Some("NOT_FOUND".to_string()), + }; + let json = serde_json::to_string(&frame).unwrap(); + assert!(json.contains(r#""ok":false"#)); + assert!(json.contains("NOT_FOUND")); + assert!( + !json.contains("\"result\""), + "result must be absent on error" + ); + let back: RpcFrame = serde_json::from_str(&json).unwrap(); + match back { + RpcFrame::RpcResponse { + ok, result, code, .. + } => { + assert!(!ok); + assert!(result.is_none()); + assert_eq!(code.unwrap(), "NOT_FOUND"); + } + _ => panic!("Expected RpcResponse"), + } + } + + #[test] + fn rpc_request_not_parseable_as_sync_message() { + let json = r#"{"kind":"rpc_request","version":1,"correlation_id":"x","ttl_ms":1000,"method":"health.check","params":{}}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err(), "rpc_request must not parse as SyncMessage"); + } + + #[test] + fn sync_message_not_parseable_as_rpc_frame() { + let json = r#"{"type":"bulk","ops":[]}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err(), "SyncMessage must not parse as RpcFrame"); + } }