diff --git a/src/bin/roowhod.rs b/src/bin/roowhod.rs index c3e7da4..dd8bfd3 100644 --- a/src/bin/roowhod.rs +++ b/src/bin/roowhod.rs @@ -10,8 +10,9 @@ use anyhow::Context; use clap::Parser; use roowho2_lib::{ proto::WhodStatusUpdate, - server::rwhod::{ - rwhod_client_server_task, rwhod_packet_receiver_task, rwhod_packet_sender_task, + server::{ + rwhod::{rwhod_packet_receiver_task, rwhod_packet_sender_task}, + varlink_api::varlink_client_server_task, }, }; use tokio::{net::UdpSocket, sync::RwLock}; @@ -131,7 +132,7 @@ async fn client_server( unsafe { std::os::unix::net::UnixListener::from_raw_fd(socket_fd.as_raw_fd()) }; std_socket.set_nonblocking(true)?; let zlink_listener = zlink::unix::Listener::try_from(OwnedFd::from(std_socket))?; - let client_server_task = rwhod_client_server_task(zlink_listener, whod_status_store); + let client_server_task = varlink_client_server_task(zlink_listener, whod_status_store); client_server_task.await?; diff --git a/src/bin/ruptime.rs b/src/bin/ruptime.rs index b53b598..90e1cca 100644 --- a/src/bin/ruptime.rs +++ b/src/bin/ruptime.rs @@ -2,7 +2,7 @@ use anyhow::Context; use chrono::{Duration, Utc}; use clap::Parser; -use roowho2_lib::{proto::WhodStatusUpdate, server::rwhod::RwhodClientProxy}; +use roowho2_lib::{proto::WhodStatusUpdate, server::varlink_api::RwhodClientProxy}; /// Show host status of local machines. /// diff --git a/src/bin/rwho.rs b/src/bin/rwho.rs index 6fa1e9a..1f01cc9 100644 --- a/src/bin/rwho.rs +++ b/src/bin/rwho.rs @@ -1,6 +1,6 @@ use anyhow::Context; use clap::Parser; -use roowho2_lib::{proto::WhodUserEntry, server::rwhod::RwhodClientProxy}; +use roowho2_lib::{proto::WhodUserEntry, server::varlink_api::RwhodClientProxy}; /// Check who is logged in on local machines. /// diff --git a/src/server.rs b/src/server.rs index 41da756..bf12f3f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,2 +1,3 @@ pub mod config; pub mod rwhod; +pub mod varlink_api; diff --git a/src/server/rwhod.rs b/src/server/rwhod.rs index 558e14e..2585741 100644 --- a/src/server/rwhod.rs +++ b/src/server/rwhod.rs @@ -8,9 +8,7 @@ use std::{ use anyhow::Context; use chrono::{DateTime, Duration, Timelike, Utc}; use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat}; -use serde::{Deserialize, Serialize}; use uucore::utmpx::Utmpx; -use zlink::{ReplyError, service::MethodReply}; use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry}; @@ -119,7 +117,7 @@ pub fn determine_relevant_interfaces() -> anyhow::Result> { None => None, } }) - /* keep first occurrence per interface name */ + // keep first occurrence per interface name .scan(HashSet::new(), |seen, n| { if seen.insert(n.name.clone()) { Some(n) @@ -230,107 +228,3 @@ pub async fn rwhod_packet_sender_task( } } } - -#[zlink::proxy("no.ntnu.pvv.roowho2.rwhod")] -pub trait RwhodClientProxy { - async fn rwho( - &mut self, - all: bool, - ) -> zlink::Result, RwhodClientError>>; - - async fn ruptime(&mut self) -> zlink::Result, RwhodClientError>>; -} - -#[derive(Debug, Deserialize)] -#[serde(tag = "method", content = "parameters")] -pub enum RwhodClientRequest { - #[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Rwho")] - Rwho { - /// Retrieve all users, even those that have been idle for a long time. - all: bool, - }, - - #[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Ruptime")] - Ruptime, -} - -#[derive(Debug, Serialize)] -#[serde(untagged)] -pub enum RwhodClientResponse { - Rwho(Vec<(String, WhodUserEntry)>), - Ruptime(Vec), -} - -#[derive(Debug, ReplyError)] -#[zlink(interface = "no.ntnu.pvv.roowho2.rwhod")] -pub enum RwhodClientError { - InvalidRequest, -} - -#[derive(Debug, Clone)] -pub struct RwhodClientServer { - whod_status_store: Arc>>, -} - -impl RwhodClientServer { - pub fn new( - whod_status_store: Arc>>, - ) -> Self { - Self { whod_status_store } - } -} - -impl zlink::Service for RwhodClientServer { - type MethodCall<'de> = RwhodClientRequest; - type ReplyParams<'se> = RwhodClientResponse; - type ReplyStreamParams = (); - type ReplyStream = futures_util::stream::Empty>; - type ReplyError<'se> = RwhodClientError; - - async fn handle<'ser, 'de: 'ser, Sock: zlink::connection::Socket>( - &'ser mut self, - call: zlink::Call>, - _conn: &mut zlink::Connection, - ) -> MethodReply, Self::ReplyStream, Self::ReplyError<'ser>> { - match call.method() { - // TODO: handle 'all' parameter - RwhodClientRequest::Rwho { .. } => { - let store = self.whod_status_store.read().await; - let mut all_user_entries = Vec::new(); - for status_update in store.values() { - all_user_entries.extend_from_slice( - &status_update - .users - .iter() - .map(|user| (status_update.hostname.clone(), user.clone())) - .collect::>(), - ); - } - MethodReply::Single(Some(RwhodClientResponse::Rwho(all_user_entries))) - } - RwhodClientRequest::Ruptime => { - let store = self.whod_status_store.read().await; - let all_status_updates = store.values().cloned().collect(); - MethodReply::Single(Some(RwhodClientResponse::Ruptime(all_status_updates))) - } - } - } -} - -pub async fn rwhod_client_server_task( - socket: zlink::unix::Listener, - whod_status_store: Arc>>, -) -> anyhow::Result<()> { - let service = RwhodClientServer::new(whod_status_store); - - let server = zlink::Server::new(socket, service); - - tracing::info!("Starting Rwhod client API server"); - - server - .run() - .await - .context("Rwhod client API server failed")?; - - Ok(()) -} diff --git a/src/server/varlink_api.rs b/src/server/varlink_api.rs new file mode 100644 index 0000000..1895b35 --- /dev/null +++ b/src/server/varlink_api.rs @@ -0,0 +1,119 @@ +use std::{ + collections::HashMap, + net::IpAddr, + sync::Arc, +}; + +use anyhow::Context; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use zlink::{ReplyError, service::MethodReply}; + +use crate::proto::{WhodStatusUpdate, WhodUserEntry}; + +#[zlink::proxy("no.ntnu.pvv.roowho2.rwhod")] +pub trait RwhodClientProxy { + async fn rwho( + &mut self, + all: bool, + ) -> zlink::Result, RwhodClientError>>; + + async fn ruptime(&mut self) -> zlink::Result, RwhodClientError>>; +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "method", content = "parameters")] +pub enum RwhodClientRequest { + #[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Rwho")] + Rwho { + /// Retrieve all users, even those that have been idle for a long time. + all: bool, + }, + + #[serde(rename = "no.ntnu.pvv.roowho2.rwhod.Ruptime")] + Ruptime, +} + +#[derive(Debug, Serialize)] +#[serde(untagged)] +pub enum RwhodClientResponse { + Rwho(RwhoResponse), + Ruptime(RuptimeResponse), +} + +pub type RwhoResponse = Vec<(String, WhodUserEntry)>; +pub type RuptimeResponse = Vec; + +#[derive(Debug, ReplyError)] +#[zlink(interface = "no.ntnu.pvv.roowho2.rwhod")] +pub enum RwhodClientError { + InvalidRequest, +} + +#[derive(Debug, Clone)] +pub struct RwhodClientServer { + whod_status_store: Arc>>, +} + +impl RwhodClientServer { + pub fn new( + whod_status_store: Arc>>, + ) -> Self { + Self { whod_status_store } + } +} + +impl zlink::Service for RwhodClientServer { + type MethodCall<'de> = RwhodClientRequest; + type ReplyParams<'se> = RwhodClientResponse; + type ReplyStreamParams = (); + type ReplyStream = futures_util::stream::Empty>; + type ReplyError<'se> = RwhodClientError; + + async fn handle<'ser, 'de: 'ser, Sock: zlink::connection::Socket>( + &'ser mut self, + call: zlink::Call>, + _conn: &mut zlink::Connection, + ) -> MethodReply, Self::ReplyStream, Self::ReplyError<'ser>> { + match call.method() { + // TODO: handle 'all' parameter + RwhodClientRequest::Rwho { .. } => { + let store = self.whod_status_store.read().await; + let mut all_user_entries = Vec::new(); + for status_update in store.values() { + all_user_entries.extend_from_slice( + &status_update + .users + .iter() + .map(|user| (status_update.hostname.clone(), user.clone())) + .collect::>(), + ); + } + MethodReply::Single(Some(RwhodClientResponse::Rwho(all_user_entries))) + } + RwhodClientRequest::Ruptime => { + let store = self.whod_status_store.read().await; + let all_status_updates = store.values().cloned().collect(); + MethodReply::Single(Some(RwhodClientResponse::Ruptime(all_status_updates))) + } + } + } +} + +pub async fn varlink_client_server_task( + socket: zlink::unix::Listener, + whod_status_store: Arc>>, +) -> anyhow::Result<()> { + let service = RwhodClientServer::new(whod_status_store); + + let server = zlink::Server::new(socket, service); + + tracing::info!("Starting Rwhod client API server"); + + server + .run() + .await + .context("Rwhod client API server failed")?; + + Ok(()) +}