From 2396c999bfa6335dd513fb21c7ab882e1833cddb Mon Sep 17 00:00:00 2001 From: h7x4 Date: Mon, 5 Jan 2026 00:31:20 +0900 Subject: [PATCH] server/rwhod: add both sender and receiver task --- Cargo.lock | 11 +++++++ Cargo.toml | 2 +- src/bin/roowhod.rs | 54 ++++++++++++++++----------------- src/server/rwhod.rs | 73 +++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 108 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ba2fc1..1ea36e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -637,6 +637,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "siphasher" version = "1.0.1" @@ -759,6 +769,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index adec9f0..03b9fbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ bytes = "1.11.0" chrono = { version = "0.4.42", features = ["serde"] } clap = { version = "4.5.53", features = ["derive"] } nix = { version = "0.30.1", features = ["hostname", "net"] } -tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread"] } +tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] } # onc-rpc = "0.3.2" # sd-notify = "0.4.5" # serde = { version = "1.0.228", features = ["derive"] } diff --git a/src/bin/roowhod.rs b/src/bin/roowhod.rs index bac400c..0fa9368 100644 --- a/src/bin/roowhod.rs +++ b/src/bin/roowhod.rs @@ -1,41 +1,37 @@ -use std::net::SocketAddrV4; +use std::{collections::HashMap, net::SocketAddrV4, sync::Arc}; -use anyhow::Context; -use chrono::Timelike; -use roowho2_lib::proto::{Whod, WhodStatusUpdate}; +use roowho2_lib::server::rwhod::{rwhod_packet_receiver_task, rwhod_packet_sender_task}; +use tokio::sync::RwLock; const RWHOD_BROADCAST_PORT: u16 = 513; #[tokio::main] async fn main() -> anyhow::Result<()> { let addr = SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, RWHOD_BROADCAST_PORT); - let socket = tokio::net::UdpSocket::bind(addr).await?; + let socket = Arc::new(tokio::net::UdpSocket::bind(addr).await?); socket.set_broadcast(true)?; - let mut buf = [0u8; Whod::MAX_SIZE]; - loop { - let (len, src) = socket.recv_from(&mut buf).await?; - if len < Whod::HEADER_SIZE { - eprintln!( - "Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)", - Whod::HEADER_SIZE - ); - continue; + let interfaces = roowho2_lib::server::rwhod::determine_relevant_interfaces()?; + let sender_task = rwhod_packet_sender_task(socket.clone(), interfaces); + + let status_store = Arc::new(RwLock::new(HashMap::new())); + let receiver_task = rwhod_packet_receiver_task(socket.clone(), status_store); + + tokio::select! { + res = sender_task => { + if let Err(err) = res { + eprintln!("RWHOD sender task error: {}", err); + } + } + res = receiver_task => { + if let Err(err) = res { + eprintln!("RWHOD receiver task error: {}", err); + } + } + _ = tokio::signal::ctrl_c() => { + println!("Received Ctrl-C, shutting down."); } - let result: WhodStatusUpdate = Whod::from_bytes(&buf[..len]) - .context("Failed to parse whod packet")? - .try_into() - .map(|mut status_update: WhodStatusUpdate| { - let timestamp = chrono::Utc::now() - .with_nanosecond(0) - .unwrap_or(chrono::Utc::now()); - status_update.recvtime = Some(timestamp); - status_update - }) - .map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e))?; - - println!("Received whod packet from {src}:\n{result:#?}"); - - buf = [0u8; Whod::MAX_SIZE]; } + + Ok(()) } diff --git a/src/server/rwhod.rs b/src/server/rwhod.rs index e1c32d7..87f65fa 100644 --- a/src/server/rwhod.rs +++ b/src/server/rwhod.rs @@ -1,5 +1,11 @@ -use std::{collections::HashSet, net::IpAddr, path::Path}; +use std::{ + collections::{HashMap, HashSet}, + net::IpAddr, + path::Path, + sync::Arc, +}; +use anyhow::Context; use chrono::{Duration, Timelike}; use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat}; use uucore::utmpx::Utmpx; @@ -120,7 +126,7 @@ pub fn determine_relevant_interfaces() -> anyhow::Result> { } pub async fn send_rwhod_packet_to_interface( - socket: &mut tokio::net::UdpSocket, + socket: Arc, interface: &RwhodSendTarget, packet: &Whod, ) -> anyhow::Result<()> { @@ -139,6 +145,69 @@ pub async fn send_rwhod_packet_to_interface( Ok(()) } +pub async fn rwhod_packet_receiver_task( + socket: Arc, + whod_status_store: Arc>>, +) -> anyhow::Result<()> { + let mut buf = [0u8; Whod::MAX_SIZE]; + + loop { + let (len, src) = socket.recv_from(&mut buf).await?; + if len < Whod::HEADER_SIZE { + eprintln!( + "Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)", + Whod::HEADER_SIZE + ); + continue; + } + + let result = Whod::from_bytes(&buf[..len]) + .context("Failed to parse whod packet")? + .try_into() + .map(|mut status_update: WhodStatusUpdate| { + let timestamp = chrono::Utc::now() + .with_nanosecond(0) + .unwrap_or(chrono::Utc::now()); + status_update.recvtime = Some(timestamp); + status_update + }) + .map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e)); + + match result { + Ok(status_update) => { + let mut store = whod_status_store.write().await; + store.insert(src.ip(), status_update); + } + Err(e) => { + eprintln!("Error processing whod packet from {src}: {}", e); + } + } + } +} + +pub async fn rwhod_packet_sender_task( + socket: Arc, + interfaces: Vec, +) -> anyhow::Result<()> { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); + + loop { + interval.tick().await; + + let packet = generate_rwhod_status_update()?; + + for interface in &interfaces { + if let Err(e) = send_rwhod_packet_to_interface(socket.clone(), interface, &packet).await + { + eprintln!( + "Failed to send rwhod packet on interface {}: {}", + interface.name, e + ); + } + } + } +} + // TODO: implement receiving rwhod packets from other hosts // TODO: implement storing and loading rwhod packets to/from file