diff --git a/Cargo.lock b/Cargo.lock index 0ba2fc1..96ecdf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -391,6 +400,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.179" @@ -409,6 +424,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.7.6" @@ -448,6 +472,15 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -557,6 +590,23 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex-automata" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" + [[package]] name = "roowho2" version = "0.1.0" @@ -567,6 +617,8 @@ dependencies = [ "clap", "nix", "tokio", + "tracing", + "tracing-subscriber", "uucore", ] @@ -631,12 +683,31 @@ dependencies = [ "syn", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "siphasher" version = "1.0.1" @@ -706,6 +777,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "time" version = "0.3.44" @@ -759,6 +839,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", @@ -775,6 +856,67 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "type-map" version = "0.5.1" @@ -853,6 +995,12 @@ dependencies = [ "quote", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index adec9f0..6918f5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,9 @@ bytes = "1.11.0" chrono = { version = "0.4.42", features = ["serde"] } clap = { version = "4.5.53", features = ["derive"] } nix = { version = "0.30.1", features = ["hostname", "net"] } -tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread"] } +tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] } +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } # onc-rpc = "0.3.2" # sd-notify = "0.4.5" # serde = { version = "1.0.228", features = ["derive"] } diff --git a/src/bin/roowhod.rs b/src/bin/roowhod.rs index bac400c..dc845d0 100644 --- a/src/bin/roowhod.rs +++ b/src/bin/roowhod.rs @@ -1,41 +1,48 @@ -use std::net::SocketAddrV4; +use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddrV4}, + sync::Arc, +}; -use anyhow::Context; -use chrono::Timelike; -use roowho2_lib::proto::{Whod, WhodStatusUpdate}; - -const RWHOD_BROADCAST_PORT: u16 = 513; +use roowho2_lib::server::rwhod::{ + RWHOD_BROADCAST_PORT, rwhod_packet_receiver_task, rwhod_packet_sender_task, +}; +use tokio::sync::RwLock; +use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt}; #[tokio::main] async fn main() -> anyhow::Result<()> { - let addr = SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, RWHOD_BROADCAST_PORT); - let socket = tokio::net::UdpSocket::bind(addr).await?; + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, RWHOD_BROADCAST_PORT); + tracing::info!("Binding RWHOD socket to {}", addr); + let socket = Arc::new(tokio::net::UdpSocket::bind(addr).await?); socket.set_broadcast(true)?; - let mut buf = [0u8; Whod::MAX_SIZE]; - loop { - let (len, src) = socket.recv_from(&mut buf).await?; - if len < Whod::HEADER_SIZE { - eprintln!( - "Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)", - Whod::HEADER_SIZE - ); - continue; + let interfaces = roowho2_lib::server::rwhod::determine_relevant_interfaces()?; + let sender_task = rwhod_packet_sender_task(socket.clone(), interfaces); + + let status_store = Arc::new(RwLock::new(HashMap::new())); + let receiver_task = rwhod_packet_receiver_task(socket.clone(), status_store); + + tokio::select! { + res = sender_task => { + if let Err(err) = res { + eprintln!("RWHOD sender task error: {}", err); + } + } + res = receiver_task => { + if let Err(err) = res { + eprintln!("RWHOD receiver task error: {}", err); + } + } + _ = tokio::signal::ctrl_c() => { + println!("Received Ctrl-C, shutting down."); } - let result: WhodStatusUpdate = Whod::from_bytes(&buf[..len]) - .context("Failed to parse whod packet")? - .try_into() - .map(|mut status_update: WhodStatusUpdate| { - let timestamp = chrono::Utc::now() - .with_nanosecond(0) - .unwrap_or(chrono::Utc::now()); - status_update.recvtime = Some(timestamp); - status_update - }) - .map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e))?; - - println!("Received whod packet from {src}:\n{result:#?}"); - - buf = [0u8; Whod::MAX_SIZE]; } + + Ok(()) } diff --git a/src/proto/rwhod_protocol.rs b/src/proto/rwhod_protocol.rs index 343043e..3828bcf 100644 --- a/src/proto/rwhod_protocol.rs +++ b/src/proto/rwhod_protocol.rs @@ -1,7 +1,7 @@ use std::array; use bytes::{Buf, BufMut, BytesMut}; -use chrono::Duration; +use chrono::{DateTime, Duration, Utc}; /// Classic C struct for utmp data for a single user session. /// @@ -120,7 +120,7 @@ impl Whod { } } - pub fn to_bytes(&self) -> [u8; Whod::MAX_SIZE] { + pub fn to_bytes(&self) -> Vec { let mut buf = BytesMut::with_capacity(Whod::MAX_SIZE); buf.put_u8(self.wd_vers); buf.put_u8(self.wd_type); @@ -133,18 +133,14 @@ impl Whod { buf.put_i32(self.wd_loadav[2]); buf.put_i32(self.wd_boottime); - for whoent in &self.wd_we { + for whoent in self.wd_we.iter().take_while(|entry| !entry.is_zeroed()) { buf.put_slice(&whoent.we_utmp.out_line); buf.put_slice(&whoent.we_utmp.out_name); buf.put_i32(whoent.we_utmp.out_time); buf.put_i32(whoent.we_idle); } - // SAFETY: this should never happen, Whod::MAX_SIZE is computed from the struct size - buf - .to_vec() - .try_into() - .expect("Buffer length mismatch, this should never happen") + buf.to_vec() } pub fn from_bytes(input: &[u8]) -> anyhow::Result { @@ -236,10 +232,10 @@ impl Whod { pub struct WhodStatusUpdate { // NOTE: there is only one defined packet type, so we just omit it here /// Timestamp by sender - pub sendtime: chrono::DateTime, + pub sendtime: DateTime, /// Timestamp applied by receiver - pub recvtime: Option>, + pub recvtime: Option>, /// Name of the host sending the status update (max 32 characters) pub hostname: String, @@ -252,7 +248,7 @@ pub struct WhodStatusUpdate { pub load_average_15_min: i32, /// Which time the system was booted - pub boot_time: chrono::DateTime, + pub boot_time: DateTime, /// List of users currently logged in to the host (max 42 entries) pub users: Vec, @@ -260,13 +256,13 @@ pub struct WhodStatusUpdate { impl WhodStatusUpdate { pub fn new( - sendtime: chrono::DateTime, - recvtime: Option>, + sendtime: DateTime, + recvtime: Option>, hostname: String, load_average_5_min: i32, load_average_10_min: i32, load_average_15_min: i32, - boot_time: chrono::DateTime, + boot_time: DateTime, users: Vec, ) -> Self { Self { @@ -295,7 +291,7 @@ pub struct WhodUserEntry { pub user_id: String, /// Time when the user logged in - pub login_time: chrono::DateTime, + pub login_time: DateTime, /// How long since the user last typed on the TTY pub idle_time: Duration, @@ -305,7 +301,7 @@ impl WhodUserEntry { pub fn new( tty: String, user_id: String, - login_time: chrono::DateTime, + login_time: DateTime, idle_time: Duration, ) -> Self { Self { @@ -339,11 +335,9 @@ impl TryFrom for WhodUserEntry { let user_id = String::from_utf8(value.we_utmp.out_name[..user_id_end].to_vec()) .map_err(|e| format!("Invalid UTF-8 in user ID: {}", e))?; - let login_time = chrono::DateTime::from_timestamp_secs(value.we_utmp.out_time as i64) - .ok_or(format!( - "Invalid login time timestamp: {}", - value.we_utmp.out_time - ))?; + let login_time = DateTime::from_timestamp_secs(value.we_utmp.out_time as i64).ok_or( + format!("Invalid login time timestamp: {}", value.we_utmp.out_time), + )?; Ok(WhodUserEntry { tty, @@ -365,15 +359,16 @@ impl TryFrom for WhodStatusUpdate { )); } - let sendtime = chrono::DateTime::from_timestamp_secs(value.wd_sendtime as i64).ok_or( - format!("Invalid send time timestamp: {}", value.wd_sendtime), - )?; + let sendtime = DateTime::from_timestamp_secs(value.wd_sendtime as i64).ok_or(format!( + "Invalid send time timestamp: {}", + value.wd_sendtime + ))?; let recvtime = if value.wd_recvtime == 0 { None } else { Some( - chrono::DateTime::from_timestamp_secs(value.wd_recvtime as i64).ok_or(format!( + DateTime::from_timestamp_secs(value.wd_recvtime as i64).ok_or(format!( "Invalid receive time timestamp: {}", value.wd_recvtime ))?, @@ -388,9 +383,10 @@ impl TryFrom for WhodStatusUpdate { let hostname = String::from_utf8(value.wd_hostname[..hostname_end].to_vec()) .map_err(|e| format!("Invalid UTF-8 in hostname: {}", e))?; - let boot_time = chrono::DateTime::from_timestamp_secs(value.wd_boottime as i64).ok_or( - format!("Invalid boot time timestamp: {}", value.wd_boottime), - )?; + let boot_time = DateTime::from_timestamp_secs(value.wd_boottime as i64).ok_or(format!( + "Invalid boot time timestamp: {}", + value.wd_boottime + ))?; let users = value .wd_we @@ -428,14 +424,12 @@ impl TryFrom for Whoent { let out_time = value .login_time .timestamp() - .max(i32::MAX as i64) - .min(i32::MIN as i64) as i32; + .clamp(i32::MIN as i64, i32::MAX as i64) as i32; let we_idle = value .idle_time .num_seconds() - .max(i32::MAX as i64) - .min(i32::MIN as i64) as i32; + .clamp(i32::MIN as i64, i32::MAX as i64) as i32; Ok(Whoent { we_utmp: Outmp { @@ -460,18 +454,16 @@ impl TryFrom for Whod { let wd_sendtime = value .sendtime .timestamp() - .max(i32::MAX as i64) - .min(i32::MIN as i64) as i32; + .clamp(i32::MIN as i64, i32::MAX as i64) as i32; let wd_recvtime = value.recvtime.map_or(0, |dt| { - dt.timestamp().max(i32::MAX as i64).min(i32::MIN as i64) as i32 + dt.timestamp().clamp(i32::MIN as i64, i32::MAX as i64) as i32 }); let wd_boottime = value .boot_time .timestamp() - .max(i32::MAX as i64) - .min(i32::MIN as i64) as i32; + .clamp(i32::MIN as i64, i32::MAX as i64) as i32; let wd_we = value .users diff --git a/src/server/rwhod.rs b/src/server/rwhod.rs index e1c32d7..2a5d361 100644 --- a/src/server/rwhod.rs +++ b/src/server/rwhod.rs @@ -1,35 +1,44 @@ -use std::{collections::HashSet, net::IpAddr, path::Path}; +use std::{ + collections::{HashMap, HashSet}, + net::IpAddr, + path::Path, + sync::Arc, +}; -use chrono::{Duration, Timelike}; +use anyhow::Context; +use chrono::{DateTime, Duration, Timelike, Utc}; use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat}; use uucore::utmpx::Utmpx; use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry}; +/// Default port for rwhod communication. +pub const RWHOD_BROADCAST_PORT: u16 = 513; + /// Reads utmp entries to determine currently logged-in users. -pub fn generate_rwhod_user_entries() -> anyhow::Result> { +pub fn generate_rwhod_user_entries(now: DateTime) -> anyhow::Result> { Utmpx::iter_all_records() .filter(|entry| entry.is_user_process()) .map(|entry| { let login_time = entry .login_time() .checked_to_utc() - .and_then(|t| { - chrono::DateTime::::from_timestamp_secs(t.unix_timestamp()) - }) + .and_then(|t| DateTime::::from_timestamp_secs(t.unix_timestamp())) .ok_or_else(|| anyhow::anyhow!("Failed to convert login time to UTC"))?; let idle_time = stat(&Path::new("/dev").join(entry.tty_device())) .ok() .and_then(|st| { - let last_active = - chrono::DateTime::::from_timestamp_secs(st.st_atime)?; - let now = chrono::Utc::now().with_nanosecond(0)?; - - Some(now - last_active) + let last_active = DateTime::::from_timestamp_secs(st.st_atime)?; + Some((now - last_active).max(Duration::zero())) }) .unwrap_or(Duration::zero()); + debug_assert!( + idle_time.num_seconds() >= 0, + "Idle time should never be negative" + ); + Ok(WhodUserEntry::new( entry.tty_device(), entry.user(), @@ -41,24 +50,23 @@ pub fn generate_rwhod_user_entries() -> anyhow::Result> { } /// Generate a rwhod status update packet representing the current system state. -pub fn generate_rwhod_status_update() -> anyhow::Result { +pub fn generate_rwhod_status_update() -> anyhow::Result { let sysinfo = nix::sys::sysinfo::sysinfo().unwrap(); let load_average = sysinfo.load_average(); let uptime = sysinfo.uptime(); let hostname = nix::unistd::gethostname()?.to_str().unwrap().to_string(); + let now = Utc::now().with_nanosecond(0).unwrap_or(Utc::now()); let result = WhodStatusUpdate::new( - chrono::Utc::now(), + now, None, hostname, (load_average.0 * 100.0).abs() as i32, (load_average.1 * 100.0).abs() as i32, (load_average.2 * 100.0).abs() as i32, - chrono::Utc::now() - uptime, - generate_rwhod_user_entries()?, - ) - .try_into() - .map_err(|e| anyhow::anyhow!("{}", e))?; + now - uptime, + generate_rwhod_user_entries(now)?, + ); Ok(result) } @@ -120,17 +128,24 @@ pub fn determine_relevant_interfaces() -> anyhow::Result> { } pub async fn send_rwhod_packet_to_interface( - socket: &mut tokio::net::UdpSocket, + socket: Arc, interface: &RwhodSendTarget, packet: &Whod, ) -> anyhow::Result<()> { let serialized_packet = packet.to_bytes(); + // TODO: the old rwhod daemon doesn't actually ever listen to ipv6, maybe remove it let target_addr = match interface.addr { - IpAddr::V4(addr) => std::net::SocketAddr::new(IpAddr::V4(addr), 0), - IpAddr::V6(addr) => std::net::SocketAddr::new(IpAddr::V6(addr), 0), + IpAddr::V4(addr) => std::net::SocketAddr::new(IpAddr::V4(addr), RWHOD_BROADCAST_PORT), + IpAddr::V6(addr) => std::net::SocketAddr::new(IpAddr::V6(addr), RWHOD_BROADCAST_PORT), }; + tracing::debug!( + "Sending rwhod packet to interface {} at address {}", + interface.name, + target_addr + ); + socket .send_to(&serialized_packet, &target_addr) .await @@ -139,8 +154,77 @@ pub async fn send_rwhod_packet_to_interface( Ok(()) } -// TODO: implement receiving rwhod packets from other hosts +pub async fn rwhod_packet_receiver_task( + socket: Arc, + whod_status_store: Arc>>, +) -> anyhow::Result<()> { + let mut buf = [0u8; Whod::MAX_SIZE]; -// TODO: implement storing and loading rwhod packets to/from file + loop { + let (len, src) = socket.recv_from(&mut buf).await?; + + tracing::debug!("Received rwhod packet of length {} bytes from {}", len, src); + + if len < Whod::HEADER_SIZE { + tracing::error!( + "Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)", + Whod::HEADER_SIZE + ); + continue; + } + + let result = Whod::from_bytes(&buf[..len]) + .context("Failed to parse whod packet")? + .try_into() + .map(|mut status_update: WhodStatusUpdate| { + let timestamp = Utc::now().with_nanosecond(0).unwrap_or(Utc::now()); + status_update.recvtime = Some(timestamp); + status_update + }) + .map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e)); + + match result { + Ok(status_update) => { + tracing::debug!("Processed whod packet from {src}: {:?}", status_update); + + let mut store = whod_status_store.write().await; + store.insert(src.ip(), status_update); + } + Err(err) => { + tracing::error!("Error processing whod packet from {src}: {err}"); + } + } + } +} + +pub async fn rwhod_packet_sender_task( + socket: Arc, + interfaces: Vec, +) -> anyhow::Result<()> { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); + + loop { + interval.tick().await; + + let status_update = generate_rwhod_status_update()?; + + tracing::debug!("Generated rwhod packet: {:?}", status_update); + + let packet = status_update + .try_into() + .map_err(|e| anyhow::anyhow!("{}", e))?; + + for interface in &interfaces { + if let Err(e) = send_rwhod_packet_to_interface(socket.clone(), interface, &packet).await + { + tracing::error!( + "Failed to send rwhod packet on interface {}: {}", + interface.name, + e + ); + } + } + } +} // TODO: implement protocol for cli - daemon communication