server: split off varlink-related code to separate module
Some checks failed
Build and test / check (push) Failing after 40s
Build and test / build (push) Successful in 1m37s
Build and test / test (push) Successful in 1m56s
Build and test / docs (push) Successful in 3m47s

This commit is contained in:
2026-01-09 04:47:19 +09:00
parent def1ff330c
commit dfa0f5a406
6 changed files with 127 additions and 112 deletions

View File

@@ -10,8 +10,9 @@ use anyhow::Context;
use clap::Parser;
use roowho2_lib::{
proto::WhodStatusUpdate,
server::rwhod::{
rwhod_client_server_task, rwhod_packet_receiver_task, rwhod_packet_sender_task,
server::{
rwhod::{rwhod_packet_receiver_task, rwhod_packet_sender_task},
varlink_api::varlink_client_server_task,
},
};
use tokio::{net::UdpSocket, sync::RwLock};
@@ -131,7 +132,7 @@ async fn client_server(
unsafe { std::os::unix::net::UnixListener::from_raw_fd(socket_fd.as_raw_fd()) };
std_socket.set_nonblocking(true)?;
let zlink_listener = zlink::unix::Listener::try_from(OwnedFd::from(std_socket))?;
let client_server_task = rwhod_client_server_task(zlink_listener, whod_status_store);
let client_server_task = varlink_client_server_task(zlink_listener, whod_status_store);
client_server_task.await?;

View File

@@ -2,7 +2,7 @@ use anyhow::Context;
use chrono::{Duration, Utc};
use clap::Parser;
use roowho2_lib::{proto::WhodStatusUpdate, server::rwhod::RwhodClientProxy};
use roowho2_lib::{proto::WhodStatusUpdate, server::varlink_api::RwhodClientProxy};
/// Show host status of local machines.
///

View File

@@ -1,6 +1,6 @@
use anyhow::Context;
use clap::Parser;
use roowho2_lib::{proto::WhodUserEntry, server::rwhod::RwhodClientProxy};
use roowho2_lib::{proto::WhodUserEntry, server::varlink_api::RwhodClientProxy};
/// Check who is logged in on local machines.
///

View File

@@ -1,2 +1,3 @@
pub mod config;
pub mod rwhod;
pub mod varlink_api;

View File

@@ -8,9 +8,7 @@ use std::{
use anyhow::Context;
use chrono::{DateTime, Duration, Timelike, Utc};
use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat};
use serde::{Deserialize, Serialize};
use uucore::utmpx::Utmpx;
use zlink::{ReplyError, service::MethodReply};
use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry};
@@ -119,7 +117,7 @@ pub fn determine_relevant_interfaces() -> anyhow::Result<Vec<RwhodSendTarget>> {
None => None,
}
})
/* keep first occurrence per interface name */
// keep first occurrence per interface name
.scan(HashSet::new(), |seen, n| {
if seen.insert(n.name.clone()) {
Some(n)
@@ -230,107 +228,3 @@ pub async fn rwhod_packet_sender_task(
}
}
}
#[zlink::proxy("no.ntnu.pvv.roowho2.rwhod")]
pub trait RwhodClientProxy {
async fn rwho(
&mut self,
all: bool,
) -> zlink::Result<Result<Vec<(String, WhodUserEntry)>, RwhodClientError>>;
async fn ruptime(&mut self) -> zlink::Result<Result<Vec<WhodStatusUpdate>, RwhodClientError>>;
}
#[derive(Debug, Deserialize)]
#[serde(tag = "method", content = "parameters")]
pub enum RwhodClientRequest {
#[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Rwho")]
Rwho {
/// Retrieve all users, even those that have been idle for a long time.
all: bool,
},
#[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Ruptime")]
Ruptime,
}
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum RwhodClientResponse {
Rwho(Vec<(String, WhodUserEntry)>),
Ruptime(Vec<WhodStatusUpdate>),
}
#[derive(Debug, ReplyError)]
#[zlink(interface = "no.ntnu.pvv.roowho2.rwhod")]
pub enum RwhodClientError {
InvalidRequest,
}
#[derive(Debug, Clone)]
pub struct RwhodClientServer {
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
}
impl RwhodClientServer {
pub fn new(
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
) -> Self {
Self { whod_status_store }
}
}
impl zlink::Service for RwhodClientServer {
type MethodCall<'de> = RwhodClientRequest;
type ReplyParams<'se> = RwhodClientResponse;
type ReplyStreamParams = ();
type ReplyStream = futures_util::stream::Empty<zlink::Reply<()>>;
type ReplyError<'se> = RwhodClientError;
async fn handle<'ser, 'de: 'ser, Sock: zlink::connection::Socket>(
&'ser mut self,
call: zlink::Call<Self::MethodCall<'de>>,
_conn: &mut zlink::Connection<Sock>,
) -> MethodReply<Self::ReplyParams<'ser>, Self::ReplyStream, Self::ReplyError<'ser>> {
match call.method() {
// TODO: handle 'all' parameter
RwhodClientRequest::Rwho { .. } => {
let store = self.whod_status_store.read().await;
let mut all_user_entries = Vec::new();
for status_update in store.values() {
all_user_entries.extend_from_slice(
&status_update
.users
.iter()
.map(|user| (status_update.hostname.clone(), user.clone()))
.collect::<Vec<(String, WhodUserEntry)>>(),
);
}
MethodReply::Single(Some(RwhodClientResponse::Rwho(all_user_entries)))
}
RwhodClientRequest::Ruptime => {
let store = self.whod_status_store.read().await;
let all_status_updates = store.values().cloned().collect();
MethodReply::Single(Some(RwhodClientResponse::Ruptime(all_status_updates)))
}
}
}
}
pub async fn rwhod_client_server_task(
socket: zlink::unix::Listener,
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
) -> anyhow::Result<()> {
let service = RwhodClientServer::new(whod_status_store);
let server = zlink::Server::new(socket, service);
tracing::info!("Starting Rwhod client API server");
server
.run()
.await
.context("Rwhod client API server failed")?;
Ok(())
}

119
src/server/varlink_api.rs Normal file
View File

@@ -0,0 +1,119 @@
use std::{
collections::HashMap,
net::IpAddr,
sync::Arc,
};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use zlink::{ReplyError, service::MethodReply};
use crate::proto::{WhodStatusUpdate, WhodUserEntry};
#[zlink::proxy("no.ntnu.pvv.roowho2.rwhod")]
pub trait RwhodClientProxy {
async fn rwho(
&mut self,
all: bool,
) -> zlink::Result<Result<Vec<(String, WhodUserEntry)>, RwhodClientError>>;
async fn ruptime(&mut self) -> zlink::Result<Result<Vec<WhodStatusUpdate>, RwhodClientError>>;
}
#[derive(Debug, Deserialize)]
#[serde(tag = "method", content = "parameters")]
pub enum RwhodClientRequest {
#[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Rwho")]
Rwho {
/// Retrieve all users, even those that have been idle for a long time.
all: bool,
},
#[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Ruptime")]
Ruptime,
}
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum RwhodClientResponse {
Rwho(RwhoResponse),
Ruptime(RuptimeResponse),
}
pub type RwhoResponse = Vec<(String, WhodUserEntry)>;
pub type RuptimeResponse = Vec<WhodStatusUpdate>;
#[derive(Debug, ReplyError)]
#[zlink(interface = "no.ntnu.pvv.roowho2.rwhod")]
pub enum RwhodClientError {
InvalidRequest,
}
#[derive(Debug, Clone)]
pub struct RwhodClientServer {
whod_status_store: Arc<RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
}
impl RwhodClientServer {
pub fn new(
whod_status_store: Arc<RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
) -> Self {
Self { whod_status_store }
}
}
impl zlink::Service for RwhodClientServer {
type MethodCall<'de> = RwhodClientRequest;
type ReplyParams<'se> = RwhodClientResponse;
type ReplyStreamParams = ();
type ReplyStream = futures_util::stream::Empty<zlink::Reply<()>>;
type ReplyError<'se> = RwhodClientError;
async fn handle<'ser, 'de: 'ser, Sock: zlink::connection::Socket>(
&'ser mut self,
call: zlink::Call<Self::MethodCall<'de>>,
_conn: &mut zlink::Connection<Sock>,
) -> MethodReply<Self::ReplyParams<'ser>, Self::ReplyStream, Self::ReplyError<'ser>> {
match call.method() {
// TODO: handle 'all' parameter
RwhodClientRequest::Rwho { .. } => {
let store = self.whod_status_store.read().await;
let mut all_user_entries = Vec::new();
for status_update in store.values() {
all_user_entries.extend_from_slice(
&status_update
.users
.iter()
.map(|user| (status_update.hostname.clone(), user.clone()))
.collect::<Vec<(String, WhodUserEntry)>>(),
);
}
MethodReply::Single(Some(RwhodClientResponse::Rwho(all_user_entries)))
}
RwhodClientRequest::Ruptime => {
let store = self.whod_status_store.read().await;
let all_status_updates = store.values().cloned().collect();
MethodReply::Single(Some(RwhodClientResponse::Ruptime(all_status_updates)))
}
}
}
}
pub async fn varlink_client_server_task(
socket: zlink::unix::Listener,
whod_status_store: Arc<RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
) -> anyhow::Result<()> {
let service = RwhodClientServer::new(whod_status_store);
let server = zlink::Server::new(socket, service);
tracing::info!("Starting Rwhod client API server");
server
.run()
.await
.context("Rwhod client API server failed")?;
Ok(())
}