212 lines
6.9 KiB
Rust
212 lines
6.9 KiB
Rust
|
|
//! RPC method registry for the `/crdt-sync` WebSocket multiplexer.
|
||
|
|
//!
|
||
|
|
//! Incoming [`RpcFrame::RpcRequest`] frames are dispatched through this
|
||
|
|
//! registry. Each method handler is a plain function that accepts a
|
||
|
|
//! `serde_json::Value` parameter bag and returns a `serde_json::Value` result.
|
||
|
|
//!
|
||
|
|
//! # Registering handlers
|
||
|
|
//!
|
||
|
|
//! Add a new entry to the `HANDLERS` static slice:
|
||
|
|
//!
|
||
|
|
//! ```rust,ignore
|
||
|
|
//! ("my.method", handle_my_method as Handler),
|
||
|
|
//! ```
|
||
|
|
//!
|
||
|
|
//! # Unknown methods
|
||
|
|
//!
|
||
|
|
//! [`dispatch`] returns `Err("NOT_FOUND")` for any method not present in the
|
||
|
|
//! registry. The caller should translate this into an
|
||
|
|
//! [`RpcFrame::RpcResponse`] with `ok: false, code: "NOT_FOUND"`.
|
||
|
|
|
||
|
|
use serde_json::Value;
|
||
|
|
|
||
|
|
use super::wire::RpcFrame;
|
||
|
|
|
||
|
|
/// Signature for a synchronous RPC method handler.
|
||
|
|
pub(super) type Handler = fn(Value) -> Value;
|
||
|
|
|
||
|
|
/// Static registry mapping method names to handlers.
|
||
|
|
///
|
||
|
|
/// Add new handlers here. The registry is a plain slice — linear scan is
|
||
|
|
/// fine for the small number of methods expected.
|
||
|
|
static HANDLERS: &[(&str, Handler)] = &[("health.check", handle_health_check)];
|
||
|
|
|
||
|
|
/// Handler for the `health.check` method.
|
||
|
|
///
|
||
|
|
/// Returns `{"status": "ok"}` unconditionally. Used as a smoke test to
|
||
|
|
/// verify that the RPC multiplexer is wired up correctly.
|
||
|
|
fn handle_health_check(_params: Value) -> Value {
|
||
|
|
serde_json::json!({"status": "ok"})
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Dispatch an incoming RPC method call to the registered handler.
|
||
|
|
///
|
||
|
|
/// Returns `Ok(result)` on success or `Err("NOT_FOUND")` if no handler is
|
||
|
|
/// registered for `method`.
|
||
|
|
pub(super) fn dispatch(method: &str, params: Value) -> Result<Value, &'static str> {
|
||
|
|
for (name, handler) in HANDLERS {
|
||
|
|
if *name == method {
|
||
|
|
return Ok(handler(params));
|
||
|
|
}
|
||
|
|
}
|
||
|
|
Err("NOT_FOUND")
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Try to parse `text` as an [`RpcFrame::RpcRequest`] and, if successful,
|
||
|
|
/// dispatch it and return the corresponding [`RpcFrame::RpcResponse`].
|
||
|
|
///
|
||
|
|
/// Returns `None` if the text is not a valid `rpc_request` frame (i.e. it
|
||
|
|
/// should be forwarded to the CRDT sync handler instead).
|
||
|
|
pub(super) fn try_handle_rpc_text(text: &str) -> Option<RpcFrame> {
|
||
|
|
let frame: RpcFrame = serde_json::from_str(text).ok()?;
|
||
|
|
match frame {
|
||
|
|
RpcFrame::RpcRequest {
|
||
|
|
version,
|
||
|
|
correlation_id,
|
||
|
|
method,
|
||
|
|
params,
|
||
|
|
..
|
||
|
|
} => {
|
||
|
|
let response = match dispatch(&method, params) {
|
||
|
|
Ok(result) => RpcFrame::RpcResponse {
|
||
|
|
version,
|
||
|
|
correlation_id,
|
||
|
|
ok: true,
|
||
|
|
result: Some(result),
|
||
|
|
error: None,
|
||
|
|
code: None,
|
||
|
|
},
|
||
|
|
Err(code) => RpcFrame::RpcResponse {
|
||
|
|
version,
|
||
|
|
correlation_id,
|
||
|
|
ok: false,
|
||
|
|
result: None,
|
||
|
|
error: Some(format!("unknown method: {method}")),
|
||
|
|
code: Some(code.to_string()),
|
||
|
|
},
|
||
|
|
};
|
||
|
|
Some(response)
|
||
|
|
}
|
||
|
|
// Response frames arriving on our socket are not requests — nothing to send back.
|
||
|
|
RpcFrame::RpcResponse { .. } => None,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize an `rpc_request` frame (version 1, empty params) for the
    /// given correlation id, TTL and method name.
    fn request_text(correlation_id: &str, ttl_ms: u64, method: &str) -> String {
        let frame = serde_json::json!({
            "kind": "rpc_request",
            "version": 1,
            "correlation_id": correlation_id,
            "ttl_ms": ttl_ms,
            "method": method,
            "params": {}
        });
        serde_json::to_string(&frame).expect("a JSON value always serializes")
    }

    #[test]
    fn health_check_returns_ok_status() {
        let value = dispatch("health.check", serde_json::json!({}))
            .expect("health.check must be registered");
        assert_eq!(value["status"], "ok");
    }

    #[test]
    fn unknown_method_returns_not_found() {
        let outcome = dispatch("nonexistent.method", serde_json::json!({}));
        assert_eq!(outcome, Err("NOT_FOUND"));
    }

    #[test]
    fn try_handle_rpc_text_health_check() {
        let text = request_text("test-corr-1", 5000, "health.check");
        let reply = try_handle_rpc_text(&text).expect("must produce a response");
        if let RpcFrame::RpcResponse {
            ok,
            correlation_id,
            result,
            code,
            ..
        } = reply
        {
            assert!(ok, "health.check must succeed");
            assert_eq!(correlation_id, "test-corr-1");
            let payload = result.expect("a successful response must carry a result");
            assert_eq!(payload["status"], "ok");
            assert!(code.is_none());
        } else {
            panic!("Expected RpcResponse");
        }
    }

    #[test]
    fn try_handle_rpc_text_unknown_method_returns_not_found() {
        let text = request_text("test-corr-2", 1000, "no.such.method");
        let reply =
            try_handle_rpc_text(&text).expect("must produce a response for unknown method");
        if let RpcFrame::RpcResponse { ok, code, .. } = reply {
            assert!(!ok, "unknown method must not succeed");
            assert_eq!(code.as_deref(), Some("NOT_FOUND"));
        } else {
            panic!("Expected RpcResponse");
        }
    }

    #[test]
    fn try_handle_rpc_text_ignores_non_rpc_frames() {
        // A SyncMessage::Bulk frame is not an RPC request and must yield no reply.
        let bulk_frame = r#"{"type":"bulk","ops":[]}"#;
        assert!(try_handle_rpc_text(bulk_frame).is_none());
    }

    #[test]
    fn try_handle_rpc_text_ignores_rpc_response_frames() {
        // An inbound rpc_response (e.g. the answer to one of our own outbound
        // requests) must not provoke a reply of its own.
        let inbound = serde_json::json!({
            "kind": "rpc_response",
            "version": 1,
            "correlation_id": "x",
            "ok": true,
            "result": {"status": "ok"}
        });
        let text = serde_json::to_string(&inbound).expect("a JSON value always serializes");
        assert!(try_handle_rpc_text(&text).is_none());
    }

    #[test]
    fn try_handle_rpc_text_ignores_invalid_json() {
        assert!(try_handle_rpc_text("not json at all").is_none());
    }

    #[test]
    fn rpc_response_correlation_id_mirrors_request() {
        let text = request_text("mirror-me", 500, "health.check");
        let reply = try_handle_rpc_text(&text).unwrap();
        if let RpcFrame::RpcResponse { correlation_id, .. } = reply {
            assert_eq!(correlation_id, "mirror-me");
        } else {
            panic!("Expected RpcResponse");
        }
    }
}
|