server/rwhod: varlink shenanigans
Some checks failed
Build and test / check (push) Failing after 1m19s
Build and test / build (push) Successful in 1m24s
Build and test / test (push) Successful in 2m47s
Build and test / docs (push) Successful in 3m2s

This commit is contained in:
2026-01-05 16:48:06 +09:00
parent 4f78b1ed1e
commit 0defac7a9f
6 changed files with 448 additions and 12 deletions

View File

@@ -4,8 +4,10 @@ use std::{
sync::Arc,
};
use anyhow::Context;
use roowho2_lib::server::rwhod::{
RWHOD_BROADCAST_PORT, rwhod_packet_receiver_task, rwhod_packet_sender_task,
RWHOD_BROADCAST_PORT, rwhod_client_server_task, rwhod_packet_receiver_task,
rwhod_packet_sender_task,
};
use tokio::sync::RwLock;
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
@@ -18,15 +20,26 @@ async fn main() -> anyhow::Result<()> {
.init();
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, RWHOD_BROADCAST_PORT);
tracing::info!("Binding RWHOD socket to {}", addr);
let socket = Arc::new(tokio::net::UdpSocket::bind(addr).await?);
socket.set_broadcast(true)?;
tracing::debug!("Binding RWHOD socket to {}", addr);
let socket = tokio::net::UdpSocket::bind(addr)
.await
.context("Failed to bind RWHOD UDP socket")
.and_then(|socket| {
socket.set_broadcast(true)?;
Ok(socket)
})
.context("Failed to enable broadcast on RWHOD UDP socket")
.map(Arc::new)?;
let interfaces = roowho2_lib::server::rwhod::determine_relevant_interfaces()?;
let sender_task = rwhod_packet_sender_task(socket.clone(), interfaces);
let status_store = Arc::new(RwLock::new(HashMap::new()));
let receiver_task = rwhod_packet_receiver_task(socket.clone(), status_store);
let receiver_task = rwhod_packet_receiver_task(socket.clone(), status_store.clone());
tracing::debug!("Binding RWHOD client-server socket at /run/roowho2/rwhod.socket");
let client_server_socket = zlink::unix::bind("/run/roowho2/rwhod.varlink")?;
let client_server_task = rwhod_client_server_task(client_server_socket, status_store.clone());
tokio::select! {
res = sender_task => {
@@ -39,6 +52,11 @@ async fn main() -> anyhow::Result<()> {
eprintln!("RWHOD receiver task error: {}", err);
}
}
res = client_server_task => {
if let Err(err) = res {
eprintln!("RWHOD client-server task error: {}", err);
}
}
_ = tokio::signal::ctrl_c() => {
println!("Received Ctrl-C, shutting down.");
}

View File

@@ -1,4 +1,5 @@
use clap::Parser;
use roowho2_lib::server::rwhod::RwhodClientProxy;
/// Check who is logged in on local machines.
///
@@ -20,7 +21,18 @@ pub struct Args {
json: bool,
}
fn main() {
let _args = Args::parse();
unimplemented!()
#[tokio::main]
async fn main() {
let args = Args::parse();
let mut conn = zlink::unix::connect("/run/roowho2/rwhod.varlink")
.await
.expect("Failed to connect to rwhod server");
let reply = conn
.rwho(args.all)
.await
.expect("Failed to send rwho request");
println!("{:?}", reply);
}

View File

@@ -1,5 +1,6 @@
use std::array;
use serde::{Deserialize, Serialize};
use bytes::{Buf, BufMut, BytesMut};
use chrono::{DateTime, Duration, Utc};
@@ -239,7 +240,7 @@ pub type LoadAverage = (i32, i32, i32);
///
/// This struct is intended for easier use in Rust code, with proper types and dynamic arrays.
/// It can be converted to and from the low-level [`Whod`] struct used for network transmission.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WhodStatusUpdate {
// NOTE: there is only one defined packet type, so we just omit it here
/// Timestamp by sender
@@ -285,7 +286,7 @@ impl WhodStatusUpdate {
///
/// This struct is intended for easier use in Rust code, with proper types.
/// It can be converted to and from the low-level [`Whoent`] struct used for network transmission.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WhodUserEntry {
/// TTY name (max 8 characters)
pub tty: String,

View File

@@ -8,7 +8,9 @@ use std::{
use anyhow::Context;
use chrono::{DateTime, Duration, Timelike, Utc};
use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat};
use serde::{Deserialize, Serialize};
use uucore::utmpx::Utmpx;
use zlink::{ReplyError, service::MethodReply};
use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry};
@@ -229,4 +231,100 @@ pub async fn rwhod_packet_sender_task(
}
}
// TODO: implement protocol for cli - daemon communication
#[zlink::proxy("no.ntnu.pvv.roowho2.rwhod")]
pub trait RwhodClientProxy {
async fn rwho(
&mut self,
all: bool,
) -> zlink::Result<Result<Vec<WhodUserEntry>, RwhodClientError>>;
async fn ruptime(&mut self) -> zlink::Result<Result<Vec<WhodStatusUpdate>, RwhodClientError>>;
}
#[derive(Debug, Deserialize)]
#[serde(tag = "method", content = "parameters")]
pub enum RwhodClientRequest {
#[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Rwho")]
Rwho {
/// Retrieve all users, even those that have been idle for a long time.
all: bool,
},
#[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Ruptime")]
Ruptime,
}
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum RwhodClientResponse {
Rwho(Vec<WhodUserEntry>),
Ruptime(Vec<WhodStatusUpdate>),
}
#[derive(Debug, ReplyError)]
#[zlink(interface = "no.ntnu.pvv.roowho2.rwhod")]
pub enum RwhodClientError {
InvalidRequest,
}
#[derive(Debug, Clone)]
pub struct RwhodClientServer {
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
}
impl<'a> RwhodClientServer {
pub fn new(
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
) -> Self {
Self { whod_status_store }
}
}
impl zlink::Service for RwhodClientServer {
type MethodCall<'de> = RwhodClientRequest;
type ReplyParams<'se> = RwhodClientResponse;
type ReplyStreamParams = ();
type ReplyStream = futures_util::stream::Empty<zlink::Reply<()>>;
type ReplyError<'se> = RwhodClientError;
async fn handle<'ser, 'de: 'ser, Sock: zlink::connection::Socket>(
&'ser mut self,
call: zlink::Call<Self::MethodCall<'de>>,
_conn: &mut zlink::Connection<Sock>,
) -> MethodReply<Self::ReplyParams<'ser>, Self::ReplyStream, Self::ReplyError<'ser>> {
match call.method() {
// TODO: handle 'all' parameter
RwhodClientRequest::Rwho { .. } => {
let store = self.whod_status_store.read().await;
let mut all_user_entries = Vec::new();
for status_update in store.values() {
all_user_entries.extend_from_slice(&status_update.users);
}
MethodReply::Single(Some(RwhodClientResponse::Rwho(all_user_entries)))
}
RwhodClientRequest::Ruptime => {
let store = self.whod_status_store.read().await;
let all_status_updates = store.values().cloned().collect();
MethodReply::Single(Some(RwhodClientResponse::Ruptime(all_status_updates)))
}
}
}
}
pub async fn rwhod_client_server_task(
socket: zlink::unix::Listener,
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
) -> anyhow::Result<()> {
let service = RwhodClientServer::new(whod_status_store);
let server = zlink::Server::new(socket, service);
tracing::info!("Starting Rwhod client API server");
server
.run()
.await
.context("Rwhod client API server failed")?;
Ok(())
}