From 0defac7a9ffa3c76b9eeb4a61c45f06448ab59ef Mon Sep 17 00:00:00 2001 From: h7x4 Date: Mon, 5 Jan 2026 16:48:06 +0900 Subject: [PATCH] server/rwhod: varlink shenanigans --- Cargo.lock | 305 ++++++++++++++++++++++++++++++++++++ Cargo.toml | 4 +- src/bin/roowhod.rs | 28 +++- src/bin/rwho.rs | 18 ++- src/proto/rwhod_protocol.rs | 5 +- src/server/rwhod.rs | 100 +++++++++++- 6 files changed, 448 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96ecdf6..939d05d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,48 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-io" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" +dependencies = [ + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "windows-sys 0.61.2", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -183,12 +225,27 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "deranged" version = "0.5.5" @@ -231,6 +288,33 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "find-msvc-tools" version = "0.1.6" @@ -282,6 +366,68 @@ dependencies = [ "thiserror", ] +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "glob" version = "0.3.3" @@ -294,6 +440,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "iana-time-zone" version = "0.1.64" @@ -526,6 +678,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "phf" version = "0.13.1" @@ -551,6 +709,26 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "polling" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "portable-atomic" version = "1.13.0" @@ -615,11 +793,14 @@ dependencies = [ "bytes", "chrono", "clap", + "futures-util", "nix", + "serde", "tokio", "tracing", "tracing-subscriber", "uucore", + "zlink", ] [[package]] @@ -647,6 +828,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "ryu" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" + [[package]] name = "self_cell" version = "1.2.2" @@ -683,6 +870,19 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.148" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3084b546a1dd6289475996f182a22aba973866ea8e8b02c51d9f46b1336a22da" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -714,6 +914,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + [[package]] name = "smallvec" version = "1.15.1" @@ -836,12 +1042,14 @@ version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86" dependencies = [ + "bytes", "libc", "mio", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.61.2", ] @@ -856,6 +1064,31 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tracing" version = "0.1.44" @@ -1218,3 +1451,75 @@ dependencies = [ "serde", "zerofrom", ] + +[[package]] +name = "zlink" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04baab6c44f6c5f33dd26dffabe2c6473d9a93a080c3424865df068d4e76f58a" +dependencies = [ + "zlink-smol", + "zlink-tokio", +] + +[[package]] +name = "zlink-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487e09febc08bcbac32cee2c26e85779351f711146db0176a3f61a3fea1c955f" +dependencies = [ + "futures-util", + "itoa", + "libc", + "pin-project-lite", + "rustix", + "ryu", + "serde", + "serde_json", + "tracing", + "zlink-macros", +] + +[[package]] +name = "zlink-macros" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6a10a1ed09222634dde2db7055226eafae571389ce57dae3707bd2a76f6dc7b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zlink-smol" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72ec157812fcde1a3f45fea27db5a6fa1868ecbf5e11644166ab4caf5e3546dd" +dependencies = [ + "async-broadcast", + "async-channel", + "async-io", + "futures-lite", + "futures-util", + "pin-project-lite", + "zlink-core", +] + +[[package]] +name = "zlink-tokio" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ab6f490a817bcbbc67d82b22427f1fafd9764da04a6b9499cf73e7ef68f4482" +dependencies = [ + "futures-util", + "tokio", + "tokio-stream", + "zlink-core", +] + +[[package]] +name = "zmij" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30e0d8dffbae3d840f64bda38e28391faef673a7b5a6017840f2a106c8145868" diff --git a/Cargo.toml b/Cargo.toml index 6918f5f..685fd30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,15 +18,17 @@ anyhow = "1.0.100" bytes = "1.11.0" chrono = { version = "0.4.42", features = ["serde"] } clap = { version = "4.5.53", features = ["derive"] } +futures-util = "0.3.31" nix = { version = "0.30.1", features = ["hostname", "net"] } +serde = { version = "1.0.228", features = ["derive"] } tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] } tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } # onc-rpc = "0.3.2" # sd-notify = "0.4.5" -# serde = { version = "1.0.228", features = ["derive"] } # serde_json = "1.0.148" uucore = { version = "0.5.0", features = ["utmpx"] } +zlink = { version = "0.2.0", features = ["introspection"] } [lib] name = "roowho2_lib" diff --git a/src/bin/roowhod.rs b/src/bin/roowhod.rs index dc845d0..6b9ee27 100644 --- a/src/bin/roowhod.rs +++ b/src/bin/roowhod.rs @@ -4,8 +4,10 @@ use std::{ sync::Arc, }; +use anyhow::Context; use roowho2_lib::server::rwhod::{ - RWHOD_BROADCAST_PORT, rwhod_packet_receiver_task, rwhod_packet_sender_task, + RWHOD_BROADCAST_PORT, rwhod_client_server_task, rwhod_packet_receiver_task, + rwhod_packet_sender_task, }; use tokio::sync::RwLock; use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt}; @@ -18,15 +20,26 @@ async fn main() -> anyhow::Result<()> { .init(); let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, RWHOD_BROADCAST_PORT); - tracing::info!("Binding RWHOD socket to {}", addr); - let socket = Arc::new(tokio::net::UdpSocket::bind(addr).await?); - socket.set_broadcast(true)?; + tracing::debug!("Binding RWHOD socket to {}", addr); + let socket = tokio::net::UdpSocket::bind(addr) + .await + .context("Failed to bind RWHOD UDP socket") + .and_then(|socket| { + socket.set_broadcast(true)?; + Ok(socket) + }) + .context("Failed to enable broadcast on RWHOD UDP socket") + .map(Arc::new)?; let interfaces = roowho2_lib::server::rwhod::determine_relevant_interfaces()?; let sender_task = rwhod_packet_sender_task(socket.clone(), interfaces); let status_store = Arc::new(RwLock::new(HashMap::new())); - let receiver_task = rwhod_packet_receiver_task(socket.clone(), status_store); + let receiver_task = rwhod_packet_receiver_task(socket.clone(), status_store.clone()); + + tracing::debug!("Binding RWHOD client-server socket at /run/roowho2/rwhod.socket"); + let client_server_socket = zlink::unix::bind("/run/roowho2/rwhod.varlink")?; + let client_server_task = rwhod_client_server_task(client_server_socket, status_store.clone()); tokio::select! { res = sender_task => { @@ -39,6 +52,11 @@ async fn main() -> anyhow::Result<()> { eprintln!("RWHOD receiver task error: {}", err); } } + res = client_server_task => { + if let Err(err) = res { + eprintln!("RWHOD client-server task error: {}", err); + } + } _ = tokio::signal::ctrl_c() => { println!("Received Ctrl-C, shutting down."); } diff --git a/src/bin/rwho.rs b/src/bin/rwho.rs index fec79be..3aab587 100644 --- a/src/bin/rwho.rs +++ b/src/bin/rwho.rs @@ -1,4 +1,5 @@ use clap::Parser; +use roowho2_lib::server::rwhod::RwhodClientProxy; /// Check who is logged in on local machines. /// @@ -20,7 +21,18 @@ pub struct Args { json: bool, } -fn main() { - let _args = Args::parse(); - unimplemented!() +#[tokio::main] +async fn main() { + let args = Args::parse(); + + let mut conn = zlink::unix::connect("/run/roowho2/rwhod.varlink") + .await + .expect("Failed to connect to rwhod server"); + + let reply = conn + .rwho(args.all) + .await + .expect("Failed to send rwho request"); + + println!("{:?}", reply); } diff --git a/src/proto/rwhod_protocol.rs b/src/proto/rwhod_protocol.rs index 9f0f28c..880867c 100644 --- a/src/proto/rwhod_protocol.rs +++ b/src/proto/rwhod_protocol.rs @@ -1,5 +1,6 @@ use std::array; +use serde::{Deserialize, Serialize}; use bytes::{Buf, BufMut, BytesMut}; use chrono::{DateTime, Duration, Utc}; @@ -239,7 +240,7 @@ pub type LoadAverage = (i32, i32, i32); /// /// This struct is intended for easier use in Rust code, with proper types and dynamic arrays. /// It can be converted to and from the low-level [`Whod`] struct used for network transmission. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct WhodStatusUpdate { // NOTE: there is only one defined packet type, so we just omit it here /// Timestamp by sender @@ -285,7 +286,7 @@ impl WhodStatusUpdate { /// /// This struct is intended for easier use in Rust code, with proper types. /// It can be converted to and from the low-level [`Whoent`] struct used for network transmission. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct WhodUserEntry { /// TTY name (max 8 characters) pub tty: String, diff --git a/src/server/rwhod.rs b/src/server/rwhod.rs index 6f8baeb..4af5a4b 100644 --- a/src/server/rwhod.rs +++ b/src/server/rwhod.rs @@ -8,7 +8,9 @@ use std::{ use anyhow::Context; use chrono::{DateTime, Duration, Timelike, Utc}; use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat}; +use serde::{Deserialize, Serialize}; use uucore::utmpx::Utmpx; +use zlink::{ReplyError, service::MethodReply}; use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry}; @@ -229,4 +231,100 @@ pub async fn rwhod_packet_sender_task( } } -// TODO: implement protocol for cli - daemon communication +#[zlink::proxy("no.ntnu.pvv.roowho2.rwhod")] +pub trait RwhodClientProxy { + async fn rwho( + &mut self, + all: bool, + ) -> zlink::Result, RwhodClientError>>; + + async fn ruptime(&mut self) -> zlink::Result, RwhodClientError>>; +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "method", content = "parameters")] +pub enum RwhodClientRequest { + #[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Rwho")] + Rwho { + /// Retrieve all users, even those that have been idle for a long time. + all: bool, + }, + + #[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Ruptime")] + Ruptime, +} + +#[derive(Debug, Serialize)] +#[serde(untagged)] +pub enum RwhodClientResponse { + Rwho(Vec), + Ruptime(Vec), +} + +#[derive(Debug, ReplyError)] +#[zlink(interface = "no.ntnu.pvv.roowho2.rwhod")] +pub enum RwhodClientError { + InvalidRequest, +} + +#[derive(Debug, Clone)] +pub struct RwhodClientServer { + whod_status_store: Arc>>, +} + +impl<'a> RwhodClientServer { + pub fn new( + whod_status_store: Arc>>, + ) -> Self { + Self { whod_status_store } + } +} + +impl zlink::Service for RwhodClientServer { + type MethodCall<'de> = RwhodClientRequest; + type ReplyParams<'se> = RwhodClientResponse; + type ReplyStreamParams = (); + type ReplyStream = futures_util::stream::Empty>; + type ReplyError<'se> = RwhodClientError; + + async fn handle<'ser, 'de: 'ser, Sock: zlink::connection::Socket>( + &'ser mut self, + call: zlink::Call>, + _conn: &mut zlink::Connection, + ) -> MethodReply, Self::ReplyStream, Self::ReplyError<'ser>> { + match call.method() { + // TODO: handle 'all' parameter + RwhodClientRequest::Rwho { .. } => { + let store = self.whod_status_store.read().await; + let mut all_user_entries = Vec::new(); + for status_update in store.values() { + all_user_entries.extend_from_slice(&status_update.users); + } + MethodReply::Single(Some(RwhodClientResponse::Rwho(all_user_entries))) + } + RwhodClientRequest::Ruptime => { + let store = self.whod_status_store.read().await; + let all_status_updates = store.values().cloned().collect(); + MethodReply::Single(Some(RwhodClientResponse::Ruptime(all_status_updates))) + } + } + } +} + +pub async fn rwhod_client_server_task( + socket: zlink::unix::Listener, + whod_status_store: Arc>>, +) -> anyhow::Result<()> { + let service = RwhodClientServer::new(whod_status_store); + + let server = zlink::Server::new(socket, service); + + tracing::info!("Starting Rwhod client API server"); + + server + .run() + .await + .context("Rwhod client API server failed")?; + + Ok(()) +}