From 6c3a9e9e12a54483b8249930ebd289c17ed7b21f Mon Sep 17 00:00:00 2001 From: h7x4 Date: Fri, 9 Jan 2026 04:58:20 +0900 Subject: [PATCH] server: deduplicate rwhod entries by hostname, misc cleanup --- src/bin/roowhod.rs | 17 ++++----- src/server/rwhod.rs | 36 ++++++++++++------ src/server/varlink_api.rs | 79 +++++++++++++++++++++------------------ 3 files changed, 73 insertions(+), 59 deletions(-) diff --git a/src/bin/roowhod.rs b/src/bin/roowhod.rs index dd8bfd3..b197fed 100644 --- a/src/bin/roowhod.rs +++ b/src/bin/roowhod.rs @@ -1,6 +1,5 @@ use std::{ collections::HashMap, - net::IpAddr, os::fd::{AsRawFd, FromRawFd, OwnedFd}, path::PathBuf, sync::Arc, @@ -8,16 +7,14 @@ use std::{ use anyhow::Context; use clap::Parser; -use roowho2_lib::{ - proto::WhodStatusUpdate, - server::{ - rwhod::{rwhod_packet_receiver_task, rwhod_packet_sender_task}, - varlink_api::varlink_client_server_task, - }, -}; use tokio::{net::UdpSocket, sync::RwLock}; use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt}; +use roowho2_lib::server::{ + rwhod::{RwhodStatusStore, rwhod_packet_receiver_task, rwhod_packet_sender_task}, + varlink_api::varlink_client_server_task, +}; + #[derive(Parser)] #[command( author = "Programvareverkstedet ", @@ -106,7 +103,7 @@ async fn ctrl_c_handler() -> anyhow::Result<()> { async fn rwhod_server( socket: UdpSocket, - whod_status_store: Arc>>, + whod_status_store: RwhodStatusStore, ) -> anyhow::Result<()> { let socket = Arc::new(socket); @@ -125,7 +122,7 @@ async fn rwhod_server( async fn client_server( socket_fd: OwnedFd, - whod_status_store: Arc>>, + whod_status_store: RwhodStatusStore, ) -> anyhow::Result<()> { // SAFETY: see above let std_socket = diff --git a/src/server/rwhod.rs b/src/server/rwhod.rs index 2585741..f614264 100644 --- a/src/server/rwhod.rs +++ b/src/server/rwhod.rs @@ -1,13 +1,23 @@ use std::{ collections::{HashMap, HashSet}, - net::IpAddr, + net::{IpAddr, SocketAddr}, path::Path, sync::Arc, }; use anyhow::Context; use chrono::{DateTime, Duration, Timelike, Utc}; -use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat}; +use nix::{ + ifaddrs::getifaddrs, + net::if_::InterfaceFlags, + sys::{stat::stat, sysinfo::sysinfo}, + unistd::gethostname, +}; +use tokio::{ + net::UdpSocket, + sync::RwLock, + time::{Duration as TokioDuration, interval}, +}; use uucore::utmpx::Utmpx; use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry}; @@ -15,6 +25,8 @@ use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry}; /// Default port for rwhod communication. pub const RWHOD_BROADCAST_PORT: u16 = 513; +pub type RwhodStatusStore = Arc>>; + /// Reads utmp entries to determine currently logged-in users. pub fn generate_rwhod_user_entries(now: DateTime) -> anyhow::Result> { Utmpx::iter_all_records() @@ -51,10 +63,10 @@ pub fn generate_rwhod_user_entries(now: DateTime) -> anyhow::Result anyhow::Result { - let sysinfo = nix::sys::sysinfo::sysinfo().unwrap(); + let sysinfo = sysinfo().unwrap(); let load_average = sysinfo.load_average(); let uptime = sysinfo.uptime(); - let hostname = nix::unistd::gethostname()?.to_str().unwrap().to_string(); + let hostname = gethostname()?.to_str().unwrap().to_string(); let now = Utc::now().with_nanosecond(0).unwrap_or(Utc::now()); let result = WhodStatusUpdate::new( @@ -130,7 +142,7 @@ pub fn determine_relevant_interfaces() -> anyhow::Result> { } pub async fn send_rwhod_packet_to_interface( - socket: Arc, + socket: Arc, interface: &RwhodSendTarget, packet: &Whod, ) -> anyhow::Result<()> { @@ -138,8 +150,8 @@ pub async fn send_rwhod_packet_to_interface( // TODO: the old rwhod daemon doesn't actually ever listen to ipv6, maybe remove it let target_addr = match interface.addr { - IpAddr::V4(addr) => std::net::SocketAddr::new(IpAddr::V4(addr), RWHOD_BROADCAST_PORT), - IpAddr::V6(addr) => std::net::SocketAddr::new(IpAddr::V6(addr), RWHOD_BROADCAST_PORT), + IpAddr::V4(addr) => SocketAddr::new(IpAddr::V4(addr), RWHOD_BROADCAST_PORT), + IpAddr::V6(addr) => SocketAddr::new(IpAddr::V6(addr), RWHOD_BROADCAST_PORT), }; tracing::debug!( @@ -157,8 +169,8 @@ pub async fn send_rwhod_packet_to_interface( } pub async fn rwhod_packet_receiver_task( - socket: Arc, - whod_status_store: Arc>>, + socket: Arc, + whod_status_store: RwhodStatusStore, ) -> anyhow::Result<()> { let mut buf = [0u8; Whod::MAX_SIZE]; @@ -190,7 +202,7 @@ pub async fn rwhod_packet_receiver_task( tracing::debug!("Processed whod packet from {src}: {:?}", status_update); let mut store = whod_status_store.write().await; - store.insert(src.ip(), status_update); + store.insert(status_update.hostname.clone(), status_update); } Err(err) => { tracing::error!("Error processing whod packet from {src}: {err}"); @@ -200,10 +212,10 @@ pub async fn rwhod_packet_receiver_task( } pub async fn rwhod_packet_sender_task( - socket: Arc, + socket: Arc, interfaces: Vec, ) -> anyhow::Result<()> { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); + let mut interval = interval(TokioDuration::from_secs(60)); loop { interval.tick().await; diff --git a/src/server/varlink_api.rs b/src/server/varlink_api.rs index 1895b35..f007536 100644 --- a/src/server/varlink_api.rs +++ b/src/server/varlink_api.rs @@ -1,15 +1,11 @@ -use std::{ - collections::HashMap, - net::IpAddr, - sync::Arc, -}; - use anyhow::Context; use serde::{Deserialize, Serialize}; -use tokio::sync::RwLock; use zlink::{ReplyError, service::MethodReply}; -use crate::proto::{WhodStatusUpdate, WhodUserEntry}; +use crate::{ + proto::{WhodStatusUpdate, WhodUserEntry}, + server::rwhod::RwhodStatusStore, +}; #[zlink::proxy("no.ntnu.pvv.roowho2.rwhod")] pub trait RwhodClientProxy { @@ -51,19 +47,42 @@ pub enum RwhodClientError { } #[derive(Debug, Clone)] -pub struct RwhodClientServer { - whod_status_store: Arc>>, +pub struct Roowhoo2ClientServer { + whod_status_store: RwhodStatusStore, } -impl RwhodClientServer { - pub fn new( - whod_status_store: Arc>>, - ) -> Self { +impl Roowhoo2ClientServer { + pub fn new(whod_status_store: RwhodStatusStore) -> Self { Self { whod_status_store } } } -impl zlink::Service for RwhodClientServer { +impl Roowhoo2ClientServer { + // TODO: handle 'all' parameter + async fn handle_rwho_request(&self, _all: bool) -> RwhoResponse { + let store = self.whod_status_store.read().await; + + let mut all_user_entries = Vec::with_capacity(store.len()); + for status_update in store.values() { + all_user_entries.extend_from_slice( + &status_update + .users + .iter() + .map(|user| (status_update.hostname.clone(), user.clone())) + .collect::>(), + ); + } + + all_user_entries + } + + async fn handle_ruptime_request(&self) -> RuptimeResponse { + let store = self.whod_status_store.read().await; + store.values().cloned().collect() + } +} + +impl zlink::Service for Roowhoo2ClientServer { type MethodCall<'de> = RwhodClientRequest; type ReplyParams<'se> = RwhodClientResponse; type ReplyStreamParams = (); @@ -76,35 +95,21 @@ impl zlink::Service for RwhodClientServer { _conn: &mut zlink::Connection, ) -> MethodReply, Self::ReplyStream, Self::ReplyError<'ser>> { match call.method() { - // TODO: handle 'all' parameter - RwhodClientRequest::Rwho { .. } => { - let store = self.whod_status_store.read().await; - let mut all_user_entries = Vec::new(); - for status_update in store.values() { - all_user_entries.extend_from_slice( - &status_update - .users - .iter() - .map(|user| (status_update.hostname.clone(), user.clone())) - .collect::>(), - ); - } - MethodReply::Single(Some(RwhodClientResponse::Rwho(all_user_entries))) - } - RwhodClientRequest::Ruptime => { - let store = self.whod_status_store.read().await; - let all_status_updates = store.values().cloned().collect(); - MethodReply::Single(Some(RwhodClientResponse::Ruptime(all_status_updates))) - } + RwhodClientRequest::Rwho { all } => MethodReply::Single(Some( + RwhodClientResponse::Rwho(self.handle_rwho_request(*all).await), + )), + RwhodClientRequest::Ruptime => MethodReply::Single(Some(RwhodClientResponse::Ruptime( + self.handle_ruptime_request().await, + ))), } } } pub async fn varlink_client_server_task( socket: zlink::unix::Listener, - whod_status_store: Arc>>, + whod_status_store: RwhodStatusStore, ) -> anyhow::Result<()> { - let service = RwhodClientServer::new(whod_status_store); + let service = Roowhoo2ClientServer::new(whod_status_store); let server = zlink::Server::new(socket, service);