server/rwhod: add both sender and receiver task
Some checks failed
Build and test / check (push) Failing after 57s
Build and test / build (push) Successful in 1m12s
Build and test / test (push) Failing after 2m4s
Build and test / docs (push) Failing after 2m15s

This commit is contained in:
2026-01-05 00:31:20 +09:00
parent dabc54a943
commit 50665fe07b
5 changed files with 326 additions and 93 deletions

148
Cargo.lock generated
View File

@@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "aho-corasick"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
dependencies = [
"memchr",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
@@ -391,6 +400,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.179"
@@ -409,6 +424,15 @@ version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]]
name = "memchr"
version = "2.7.6"
@@ -448,6 +472,15 @@ dependencies = [
"memoffset",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "num-conv"
version = "0.1.0"
@@ -557,6 +590,23 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "regex-automata"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "roowho2"
version = "0.1.0"
@@ -567,6 +617,8 @@ dependencies = [
"clap",
"nix",
"tokio",
"tracing",
"tracing-subscriber",
"uucore",
]
@@ -631,12 +683,31 @@ dependencies = [
"syn",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b"
dependencies = [
"errno",
"libc",
]
[[package]]
name = "siphasher"
version = "1.0.1"
@@ -706,6 +777,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
]
[[package]]
name = "time"
version = "0.3.44"
@@ -759,6 +839,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.61.2",
@@ -775,6 +856,67 @@ dependencies = [
"syn",
]
[[package]]
name = "tracing"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
name = "type-map"
version = "0.5.1"
@@ -853,6 +995,12 @@ dependencies = [
"quote",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"

View File

@@ -19,7 +19,9 @@ bytes = "1.11.0"
chrono = { version = "0.4.42", features = ["serde"] }
clap = { version = "4.5.53", features = ["derive"] }
nix = { version = "0.30.1", features = ["hostname", "net"] }
tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread"] }
tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] }
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
# onc-rpc = "0.3.2"
# sd-notify = "0.4.5"
# serde = { version = "1.0.228", features = ["derive"] }

View File

@@ -1,41 +1,48 @@
use std::net::SocketAddrV4;
use std::{
collections::HashMap,
net::{Ipv4Addr, SocketAddrV4},
sync::Arc,
};
use anyhow::Context;
use chrono::Timelike;
use roowho2_lib::proto::{Whod, WhodStatusUpdate};
const RWHOD_BROADCAST_PORT: u16 = 513;
use roowho2_lib::server::rwhod::{
RWHOD_BROADCAST_PORT, rwhod_packet_receiver_task, rwhod_packet_sender_task,
};
use tokio::sync::RwLock;
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let addr = SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, RWHOD_BROADCAST_PORT);
let socket = tokio::net::UdpSocket::bind(addr).await?;
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, RWHOD_BROADCAST_PORT);
tracing::info!("Binding RWHOD socket to {}", addr);
let socket = Arc::new(tokio::net::UdpSocket::bind(addr).await?);
socket.set_broadcast(true)?;
let mut buf = [0u8; Whod::MAX_SIZE];
loop {
let (len, src) = socket.recv_from(&mut buf).await?;
if len < Whod::HEADER_SIZE {
eprintln!(
"Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)",
Whod::HEADER_SIZE
);
continue;
let interfaces = roowho2_lib::server::rwhod::determine_relevant_interfaces()?;
let sender_task = rwhod_packet_sender_task(socket.clone(), interfaces);
let status_store = Arc::new(RwLock::new(HashMap::new()));
let receiver_task = rwhod_packet_receiver_task(socket.clone(), status_store);
tokio::select! {
res = sender_task => {
if let Err(err) = res {
eprintln!("RWHOD sender task error: {}", err);
}
}
res = receiver_task => {
if let Err(err) = res {
eprintln!("RWHOD receiver task error: {}", err);
}
}
_ = tokio::signal::ctrl_c() => {
println!("Received Ctrl-C, shutting down.");
}
let result: WhodStatusUpdate = Whod::from_bytes(&buf[..len])
.context("Failed to parse whod packet")?
.try_into()
.map(|mut status_update: WhodStatusUpdate| {
let timestamp = chrono::Utc::now()
.with_nanosecond(0)
.unwrap_or(chrono::Utc::now());
status_update.recvtime = Some(timestamp);
status_update
})
.map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e))?;
println!("Received whod packet from {src}:\n{result:#?}");
buf = [0u8; Whod::MAX_SIZE];
}
Ok(())
}

View File

@@ -1,7 +1,7 @@
use std::array;
use bytes::{Buf, BufMut, BytesMut};
use chrono::Duration;
use chrono::{DateTime, Duration, Utc};
/// Classic C struct for utmp data for a single user session.
///
@@ -120,7 +120,7 @@ impl Whod {
}
}
pub fn to_bytes(&self) -> [u8; Whod::MAX_SIZE] {
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = BytesMut::with_capacity(Whod::MAX_SIZE);
buf.put_u8(self.wd_vers);
buf.put_u8(self.wd_type);
@@ -133,18 +133,14 @@ impl Whod {
buf.put_i32(self.wd_loadav[2]);
buf.put_i32(self.wd_boottime);
for whoent in &self.wd_we {
for whoent in self.wd_we.iter().take_while(|entry| !entry.is_zeroed()) {
buf.put_slice(&whoent.we_utmp.out_line);
buf.put_slice(&whoent.we_utmp.out_name);
buf.put_i32(whoent.we_utmp.out_time);
buf.put_i32(whoent.we_idle);
}
// SAFETY: this should never happen, Whod::MAX_SIZE is computed from the struct size
buf
.to_vec()
.try_into()
.expect("Buffer length mismatch, this should never happen")
buf.to_vec()
}
pub fn from_bytes(input: &[u8]) -> anyhow::Result<Self> {
@@ -236,10 +232,10 @@ impl Whod {
pub struct WhodStatusUpdate {
// NOTE: there is only one defined packet type, so we just omit it here
/// Timestamp by sender
pub sendtime: chrono::DateTime<chrono::Utc>,
pub sendtime: DateTime<Utc>,
/// Timestamp applied by receiver
pub recvtime: Option<chrono::DateTime<chrono::Utc>>,
pub recvtime: Option<DateTime<Utc>>,
/// Name of the host sending the status update (max 32 characters)
pub hostname: String,
@@ -252,7 +248,7 @@ pub struct WhodStatusUpdate {
pub load_average_15_min: i32,
/// Which time the system was booted
pub boot_time: chrono::DateTime<chrono::Utc>,
pub boot_time: DateTime<Utc>,
/// List of users currently logged in to the host (max 42 entries)
pub users: Vec<WhodUserEntry>,
@@ -260,13 +256,13 @@ pub struct WhodStatusUpdate {
impl WhodStatusUpdate {
pub fn new(
sendtime: chrono::DateTime<chrono::Utc>,
recvtime: Option<chrono::DateTime<chrono::Utc>>,
sendtime: DateTime<Utc>,
recvtime: Option<DateTime<Utc>>,
hostname: String,
load_average_5_min: i32,
load_average_10_min: i32,
load_average_15_min: i32,
boot_time: chrono::DateTime<chrono::Utc>,
boot_time: DateTime<Utc>,
users: Vec<WhodUserEntry>,
) -> Self {
Self {
@@ -295,7 +291,7 @@ pub struct WhodUserEntry {
pub user_id: String,
/// Time when the user logged in
pub login_time: chrono::DateTime<chrono::Utc>,
pub login_time: DateTime<Utc>,
/// How long since the user last typed on the TTY
pub idle_time: Duration,
@@ -305,7 +301,7 @@ impl WhodUserEntry {
pub fn new(
tty: String,
user_id: String,
login_time: chrono::DateTime<chrono::Utc>,
login_time: DateTime<Utc>,
idle_time: Duration,
) -> Self {
Self {
@@ -339,11 +335,9 @@ impl TryFrom<Whoent> for WhodUserEntry {
let user_id = String::from_utf8(value.we_utmp.out_name[..user_id_end].to_vec())
.map_err(|e| format!("Invalid UTF-8 in user ID: {}", e))?;
let login_time = chrono::DateTime::from_timestamp_secs(value.we_utmp.out_time as i64)
.ok_or(format!(
"Invalid login time timestamp: {}",
value.we_utmp.out_time
))?;
let login_time = DateTime::from_timestamp_secs(value.we_utmp.out_time as i64).ok_or(
format!("Invalid login time timestamp: {}", value.we_utmp.out_time),
)?;
Ok(WhodUserEntry {
tty,
@@ -365,15 +359,16 @@ impl TryFrom<Whod> for WhodStatusUpdate {
));
}
let sendtime = chrono::DateTime::from_timestamp_secs(value.wd_sendtime as i64).ok_or(
format!("Invalid send time timestamp: {}", value.wd_sendtime),
)?;
let sendtime = DateTime::from_timestamp_secs(value.wd_sendtime as i64).ok_or(format!(
"Invalid send time timestamp: {}",
value.wd_sendtime
))?;
let recvtime = if value.wd_recvtime == 0 {
None
} else {
Some(
chrono::DateTime::from_timestamp_secs(value.wd_recvtime as i64).ok_or(format!(
DateTime::from_timestamp_secs(value.wd_recvtime as i64).ok_or(format!(
"Invalid receive time timestamp: {}",
value.wd_recvtime
))?,
@@ -388,9 +383,10 @@ impl TryFrom<Whod> for WhodStatusUpdate {
let hostname = String::from_utf8(value.wd_hostname[..hostname_end].to_vec())
.map_err(|e| format!("Invalid UTF-8 in hostname: {}", e))?;
let boot_time = chrono::DateTime::from_timestamp_secs(value.wd_boottime as i64).ok_or(
format!("Invalid boot time timestamp: {}", value.wd_boottime),
)?;
let boot_time = DateTime::from_timestamp_secs(value.wd_boottime as i64).ok_or(format!(
"Invalid boot time timestamp: {}",
value.wd_boottime
))?;
let users = value
.wd_we
@@ -428,14 +424,12 @@ impl TryFrom<WhodUserEntry> for Whoent {
let out_time = value
.login_time
.timestamp()
.max(i32::MAX as i64)
.min(i32::MIN as i64) as i32;
.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
let we_idle = value
.idle_time
.num_seconds()
.max(i32::MAX as i64)
.min(i32::MIN as i64) as i32;
.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
Ok(Whoent {
we_utmp: Outmp {
@@ -460,18 +454,16 @@ impl TryFrom<WhodStatusUpdate> for Whod {
let wd_sendtime = value
.sendtime
.timestamp()
.max(i32::MAX as i64)
.min(i32::MIN as i64) as i32;
.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
let wd_recvtime = value.recvtime.map_or(0, |dt| {
dt.timestamp().max(i32::MAX as i64).min(i32::MIN as i64) as i32
dt.timestamp().clamp(i32::MIN as i64, i32::MAX as i64) as i32
});
let wd_boottime = value
.boot_time
.timestamp()
.max(i32::MAX as i64)
.min(i32::MIN as i64) as i32;
.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
let wd_we = value
.users

View File

@@ -1,35 +1,44 @@
use std::{collections::HashSet, net::IpAddr, path::Path};
use std::{
collections::{HashMap, HashSet},
net::IpAddr,
path::Path,
sync::Arc,
};
use chrono::{Duration, Timelike};
use anyhow::Context;
use chrono::{DateTime, Duration, Timelike, Utc};
use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat};
use uucore::utmpx::Utmpx;
use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry};
/// Default port for rwhod communication.
pub const RWHOD_BROADCAST_PORT: u16 = 513;
/// Reads utmp entries to determine currently logged-in users.
pub fn generate_rwhod_user_entries() -> anyhow::Result<Vec<WhodUserEntry>> {
pub fn generate_rwhod_user_entries(now: DateTime<Utc>) -> anyhow::Result<Vec<WhodUserEntry>> {
Utmpx::iter_all_records()
.filter(|entry| entry.is_user_process())
.map(|entry| {
let login_time = entry
.login_time()
.checked_to_utc()
.and_then(|t| {
chrono::DateTime::<chrono::Utc>::from_timestamp_secs(t.unix_timestamp())
})
.and_then(|t| DateTime::<Utc>::from_timestamp_secs(t.unix_timestamp()))
.ok_or_else(|| anyhow::anyhow!("Failed to convert login time to UTC"))?;
let idle_time = stat(&Path::new("/dev").join(entry.tty_device()))
.ok()
.and_then(|st| {
let last_active =
chrono::DateTime::<chrono::Utc>::from_timestamp_secs(st.st_atime)?;
let now = chrono::Utc::now().with_nanosecond(0)?;
Some(now - last_active)
let last_active = DateTime::<Utc>::from_timestamp_secs(st.st_atime)?;
Some((now - last_active).max(Duration::zero()))
})
.unwrap_or(Duration::zero());
debug_assert!(
idle_time.num_seconds() >= 0,
"Idle time should never be negative"
);
Ok(WhodUserEntry::new(
entry.tty_device(),
entry.user(),
@@ -41,24 +50,23 @@ pub fn generate_rwhod_user_entries() -> anyhow::Result<Vec<WhodUserEntry>> {
}
/// Generate a rwhod status update packet representing the current system state.
pub fn generate_rwhod_status_update() -> anyhow::Result<Whod> {
pub fn generate_rwhod_status_update() -> anyhow::Result<WhodStatusUpdate> {
let sysinfo = nix::sys::sysinfo::sysinfo().unwrap();
let load_average = sysinfo.load_average();
let uptime = sysinfo.uptime();
let hostname = nix::unistd::gethostname()?.to_str().unwrap().to_string();
let now = Utc::now().with_nanosecond(0).unwrap_or(Utc::now());
let result = WhodStatusUpdate::new(
chrono::Utc::now(),
now,
None,
hostname,
(load_average.0 * 100.0).abs() as i32,
(load_average.1 * 100.0).abs() as i32,
(load_average.2 * 100.0).abs() as i32,
chrono::Utc::now() - uptime,
generate_rwhod_user_entries()?,
)
.try_into()
.map_err(|e| anyhow::anyhow!("{}", e))?;
now - uptime,
generate_rwhod_user_entries(now)?,
);
Ok(result)
}
@@ -120,17 +128,24 @@ pub fn determine_relevant_interfaces() -> anyhow::Result<Vec<RwhodSendTarget>> {
}
pub async fn send_rwhod_packet_to_interface(
socket: &mut tokio::net::UdpSocket,
socket: Arc<tokio::net::UdpSocket>,
interface: &RwhodSendTarget,
packet: &Whod,
) -> anyhow::Result<()> {
let serialized_packet = packet.to_bytes();
// TODO: the old rwhod daemon doesn't actually ever listen to ipv6, maybe remove it
let target_addr = match interface.addr {
IpAddr::V4(addr) => std::net::SocketAddr::new(IpAddr::V4(addr), 0),
IpAddr::V6(addr) => std::net::SocketAddr::new(IpAddr::V6(addr), 0),
IpAddr::V4(addr) => std::net::SocketAddr::new(IpAddr::V4(addr), RWHOD_BROADCAST_PORT),
IpAddr::V6(addr) => std::net::SocketAddr::new(IpAddr::V6(addr), RWHOD_BROADCAST_PORT),
};
tracing::debug!(
"Sending rwhod packet to interface {} at address {}",
interface.name,
target_addr
);
socket
.send_to(&serialized_packet, &target_addr)
.await
@@ -139,8 +154,77 @@ pub async fn send_rwhod_packet_to_interface(
Ok(())
}
// TODO: implement receiving rwhod packets from other hosts
pub async fn rwhod_packet_receiver_task(
socket: Arc<tokio::net::UdpSocket>,
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
) -> anyhow::Result<()> {
let mut buf = [0u8; Whod::MAX_SIZE];
// TODO: implement storing and loading rwhod packets to/from file
loop {
let (len, src) = socket.recv_from(&mut buf).await?;
tracing::debug!("Received rwhod packet of length {} bytes from {}", len, src);
if len < Whod::HEADER_SIZE {
tracing::error!(
"Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)",
Whod::HEADER_SIZE
);
continue;
}
let result = Whod::from_bytes(&buf[..len])
.context("Failed to parse whod packet")?
.try_into()
.map(|mut status_update: WhodStatusUpdate| {
let timestamp = Utc::now().with_nanosecond(0).unwrap_or(Utc::now());
status_update.recvtime = Some(timestamp);
status_update
})
.map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e));
match result {
Ok(status_update) => {
tracing::debug!("Processed whod packet from {src}: {:?}", status_update);
let mut store = whod_status_store.write().await;
store.insert(src.ip(), status_update);
}
Err(err) => {
tracing::error!("Error processing whod packet from {src}: {err}");
}
}
}
}
pub async fn rwhod_packet_sender_task(
socket: Arc<tokio::net::UdpSocket>,
interfaces: Vec<RwhodSendTarget>,
) -> anyhow::Result<()> {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
loop {
interval.tick().await;
let status_update = generate_rwhod_status_update()?;
tracing::debug!("Generated rwhod packet: {:?}", status_update);
let packet = status_update
.try_into()
.map_err(|e| anyhow::anyhow!("{}", e))?;
for interface in &interfaces {
if let Err(e) = send_rwhod_packet_to_interface(socket.clone(), interface, &packet).await
{
tracing::error!(
"Failed to send rwhod packet on interface {}: {}",
interface.name,
e
);
}
}
}
}
// TODO: implement protocol for cli - daemon communication