server: deduplicate rwhod entries by hostname, misc cleanup
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::IpAddr,
|
||||
os::fd::{AsRawFd, FromRawFd, OwnedFd},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
@@ -8,16 +7,14 @@ use std::{
|
||||
|
||||
use anyhow::Context;
|
||||
use clap::Parser;
|
||||
use roowho2_lib::{
|
||||
proto::WhodStatusUpdate,
|
||||
server::{
|
||||
rwhod::{rwhod_packet_receiver_task, rwhod_packet_sender_task},
|
||||
varlink_api::varlink_client_server_task,
|
||||
},
|
||||
};
|
||||
use tokio::{net::UdpSocket, sync::RwLock};
|
||||
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
use roowho2_lib::server::{
|
||||
rwhod::{RwhodStatusStore, rwhod_packet_receiver_task, rwhod_packet_sender_task},
|
||||
varlink_api::varlink_client_server_task,
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(
|
||||
author = "Programvareverkstedet <projects@pvv.ntnu.no>",
|
||||
@@ -106,7 +103,7 @@ async fn ctrl_c_handler() -> anyhow::Result<()> {
|
||||
|
||||
async fn rwhod_server(
|
||||
socket: UdpSocket,
|
||||
whod_status_store: Arc<RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
|
||||
whod_status_store: RwhodStatusStore,
|
||||
) -> anyhow::Result<()> {
|
||||
let socket = Arc::new(socket);
|
||||
|
||||
@@ -125,7 +122,7 @@ async fn rwhod_server(
|
||||
|
||||
async fn client_server(
|
||||
socket_fd: OwnedFd,
|
||||
whod_status_store: Arc<RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
|
||||
whod_status_store: RwhodStatusStore,
|
||||
) -> anyhow::Result<()> {
|
||||
// SAFETY: see above
|
||||
let std_socket =
|
||||
|
||||
@@ -1,13 +1,23 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
net::IpAddr,
|
||||
net::{IpAddr, SocketAddr},
|
||||
path::Path,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use chrono::{DateTime, Duration, Timelike, Utc};
|
||||
use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat};
|
||||
use nix::{
|
||||
ifaddrs::getifaddrs,
|
||||
net::if_::InterfaceFlags,
|
||||
sys::{stat::stat, sysinfo::sysinfo},
|
||||
unistd::gethostname,
|
||||
};
|
||||
use tokio::{
|
||||
net::UdpSocket,
|
||||
sync::RwLock,
|
||||
time::{Duration as TokioDuration, interval},
|
||||
};
|
||||
use uucore::utmpx::Utmpx;
|
||||
|
||||
use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry};
|
||||
@@ -15,6 +25,8 @@ use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry};
|
||||
/// Default port for rwhod communication.
|
||||
pub const RWHOD_BROADCAST_PORT: u16 = 513;
|
||||
|
||||
pub type RwhodStatusStore = Arc<RwLock<HashMap<String, WhodStatusUpdate>>>;
|
||||
|
||||
/// Reads utmp entries to determine currently logged-in users.
|
||||
pub fn generate_rwhod_user_entries(now: DateTime<Utc>) -> anyhow::Result<Vec<WhodUserEntry>> {
|
||||
Utmpx::iter_all_records()
|
||||
@@ -51,10 +63,10 @@ pub fn generate_rwhod_user_entries(now: DateTime<Utc>) -> anyhow::Result<Vec<Who
|
||||
|
||||
/// Generate a rwhod status update packet representing the current system state.
|
||||
pub fn generate_rwhod_status_update() -> anyhow::Result<WhodStatusUpdate> {
|
||||
let sysinfo = nix::sys::sysinfo::sysinfo().unwrap();
|
||||
let sysinfo = sysinfo().unwrap();
|
||||
let load_average = sysinfo.load_average();
|
||||
let uptime = sysinfo.uptime();
|
||||
let hostname = nix::unistd::gethostname()?.to_str().unwrap().to_string();
|
||||
let hostname = gethostname()?.to_str().unwrap().to_string();
|
||||
let now = Utc::now().with_nanosecond(0).unwrap_or(Utc::now());
|
||||
|
||||
let result = WhodStatusUpdate::new(
|
||||
@@ -130,7 +142,7 @@ pub fn determine_relevant_interfaces() -> anyhow::Result<Vec<RwhodSendTarget>> {
|
||||
}
|
||||
|
||||
pub async fn send_rwhod_packet_to_interface(
|
||||
socket: Arc<tokio::net::UdpSocket>,
|
||||
socket: Arc<UdpSocket>,
|
||||
interface: &RwhodSendTarget,
|
||||
packet: &Whod,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -138,8 +150,8 @@ pub async fn send_rwhod_packet_to_interface(
|
||||
|
||||
// TODO: the old rwhod daemon doesn't actually ever listen to ipv6, maybe remove it
|
||||
let target_addr = match interface.addr {
|
||||
IpAddr::V4(addr) => std::net::SocketAddr::new(IpAddr::V4(addr), RWHOD_BROADCAST_PORT),
|
||||
IpAddr::V6(addr) => std::net::SocketAddr::new(IpAddr::V6(addr), RWHOD_BROADCAST_PORT),
|
||||
IpAddr::V4(addr) => SocketAddr::new(IpAddr::V4(addr), RWHOD_BROADCAST_PORT),
|
||||
IpAddr::V6(addr) => SocketAddr::new(IpAddr::V6(addr), RWHOD_BROADCAST_PORT),
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
@@ -157,8 +169,8 @@ pub async fn send_rwhod_packet_to_interface(
|
||||
}
|
||||
|
||||
pub async fn rwhod_packet_receiver_task(
|
||||
socket: Arc<tokio::net::UdpSocket>,
|
||||
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
|
||||
socket: Arc<UdpSocket>,
|
||||
whod_status_store: RwhodStatusStore,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut buf = [0u8; Whod::MAX_SIZE];
|
||||
|
||||
@@ -190,7 +202,7 @@ pub async fn rwhod_packet_receiver_task(
|
||||
tracing::debug!("Processed whod packet from {src}: {:?}", status_update);
|
||||
|
||||
let mut store = whod_status_store.write().await;
|
||||
store.insert(src.ip(), status_update);
|
||||
store.insert(status_update.hostname.clone(), status_update);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Error processing whod packet from {src}: {err}");
|
||||
@@ -200,10 +212,10 @@ pub async fn rwhod_packet_receiver_task(
|
||||
}
|
||||
|
||||
pub async fn rwhod_packet_sender_task(
|
||||
socket: Arc<tokio::net::UdpSocket>,
|
||||
socket: Arc<UdpSocket>,
|
||||
interfaces: Vec<RwhodSendTarget>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
|
||||
let mut interval = interval(TokioDuration::from_secs(60));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
@@ -1,15 +1,11 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::IpAddr,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::RwLock;
|
||||
use zlink::{ReplyError, service::MethodReply};
|
||||
|
||||
use crate::proto::{WhodStatusUpdate, WhodUserEntry};
|
||||
use crate::{
|
||||
proto::{WhodStatusUpdate, WhodUserEntry},
|
||||
server::rwhod::RwhodStatusStore,
|
||||
};
|
||||
|
||||
#[zlink::proxy("no.ntnu.pvv.roowho2.rwhod")]
|
||||
pub trait RwhodClientProxy {
|
||||
@@ -51,19 +47,42 @@ pub enum RwhodClientError {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RwhodClientServer {
|
||||
whod_status_store: Arc<RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
|
||||
pub struct Roowhoo2ClientServer {
|
||||
whod_status_store: RwhodStatusStore,
|
||||
}
|
||||
|
||||
impl RwhodClientServer {
|
||||
pub fn new(
|
||||
whod_status_store: Arc<RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
|
||||
) -> Self {
|
||||
impl Roowhoo2ClientServer {
|
||||
pub fn new(whod_status_store: RwhodStatusStore) -> Self {
|
||||
Self { whod_status_store }
|
||||
}
|
||||
}
|
||||
|
||||
impl zlink::Service for RwhodClientServer {
|
||||
impl Roowhoo2ClientServer {
|
||||
// TODO: handle 'all' parameter
|
||||
async fn handle_rwho_request(&self, _all: bool) -> RwhoResponse {
|
||||
let store = self.whod_status_store.read().await;
|
||||
|
||||
let mut all_user_entries = Vec::with_capacity(store.len());
|
||||
for status_update in store.values() {
|
||||
all_user_entries.extend_from_slice(
|
||||
&status_update
|
||||
.users
|
||||
.iter()
|
||||
.map(|user| (status_update.hostname.clone(), user.clone()))
|
||||
.collect::<Vec<(String, WhodUserEntry)>>(),
|
||||
);
|
||||
}
|
||||
|
||||
all_user_entries
|
||||
}
|
||||
|
||||
async fn handle_ruptime_request(&self) -> RuptimeResponse {
|
||||
let store = self.whod_status_store.read().await;
|
||||
store.values().cloned().collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl zlink::Service for Roowhoo2ClientServer {
|
||||
type MethodCall<'de> = RwhodClientRequest;
|
||||
type ReplyParams<'se> = RwhodClientResponse;
|
||||
type ReplyStreamParams = ();
|
||||
@@ -76,35 +95,21 @@ impl zlink::Service for RwhodClientServer {
|
||||
_conn: &mut zlink::Connection<Sock>,
|
||||
) -> MethodReply<Self::ReplyParams<'ser>, Self::ReplyStream, Self::ReplyError<'ser>> {
|
||||
match call.method() {
|
||||
// TODO: handle 'all' parameter
|
||||
RwhodClientRequest::Rwho { .. } => {
|
||||
let store = self.whod_status_store.read().await;
|
||||
let mut all_user_entries = Vec::new();
|
||||
for status_update in store.values() {
|
||||
all_user_entries.extend_from_slice(
|
||||
&status_update
|
||||
.users
|
||||
.iter()
|
||||
.map(|user| (status_update.hostname.clone(), user.clone()))
|
||||
.collect::<Vec<(String, WhodUserEntry)>>(),
|
||||
);
|
||||
}
|
||||
MethodReply::Single(Some(RwhodClientResponse::Rwho(all_user_entries)))
|
||||
}
|
||||
RwhodClientRequest::Ruptime => {
|
||||
let store = self.whod_status_store.read().await;
|
||||
let all_status_updates = store.values().cloned().collect();
|
||||
MethodReply::Single(Some(RwhodClientResponse::Ruptime(all_status_updates)))
|
||||
}
|
||||
RwhodClientRequest::Rwho { all } => MethodReply::Single(Some(
|
||||
RwhodClientResponse::Rwho(self.handle_rwho_request(*all).await),
|
||||
)),
|
||||
RwhodClientRequest::Ruptime => MethodReply::Single(Some(RwhodClientResponse::Ruptime(
|
||||
self.handle_ruptime_request().await,
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn varlink_client_server_task(
|
||||
socket: zlink::unix::Listener,
|
||||
whod_status_store: Arc<RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
|
||||
whod_status_store: RwhodStatusStore,
|
||||
) -> anyhow::Result<()> {
|
||||
let service = RwhodClientServer::new(whod_status_store);
|
||||
let service = Roowhoo2ClientServer::new(whod_status_store);
|
||||
|
||||
let server = zlink::Server::new(socket, service);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user