server/rwhod: add both sender and receiver task
Some checks failed
Build and test / build (push) Successful in 1m8s
Build and test / check (push) Failing after 1m8s
Build and test / test (push) Failing after 1m25s
Build and test / docs (push) Failing after 1m46s

This commit is contained in:
2026-01-05 00:31:20 +09:00
parent dabc54a943
commit 2396c999bf
4 changed files with 108 additions and 32 deletions

11
Cargo.lock generated
View File

@@ -637,6 +637,16 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b"
dependencies = [
"errno",
"libc",
]
[[package]]
name = "siphasher"
version = "1.0.1"
@@ -759,6 +769,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.61.2",

View File

@@ -19,7 +19,7 @@ bytes = "1.11.0"
chrono = { version = "0.4.42", features = ["serde"] }
clap = { version = "4.5.53", features = ["derive"] }
nix = { version = "0.30.1", features = ["hostname", "net"] }
tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread"] }
tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] }
# onc-rpc = "0.3.2"
# sd-notify = "0.4.5"
# serde = { version = "1.0.228", features = ["derive"] }

View File

@@ -1,41 +1,37 @@
use std::net::SocketAddrV4;
use std::{collections::HashMap, net::SocketAddrV4, sync::Arc};
use anyhow::Context;
use chrono::Timelike;
use roowho2_lib::proto::{Whod, WhodStatusUpdate};
use roowho2_lib::server::rwhod::{rwhod_packet_receiver_task, rwhod_packet_sender_task};
use tokio::sync::RwLock;
const RWHOD_BROADCAST_PORT: u16 = 513;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let addr = SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, RWHOD_BROADCAST_PORT);
let socket = tokio::net::UdpSocket::bind(addr).await?;
let socket = Arc::new(tokio::net::UdpSocket::bind(addr).await?);
socket.set_broadcast(true)?;
let mut buf = [0u8; Whod::MAX_SIZE];
loop {
let (len, src) = socket.recv_from(&mut buf).await?;
if len < Whod::HEADER_SIZE {
eprintln!(
"Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)",
Whod::HEADER_SIZE
);
continue;
let interfaces = roowho2_lib::server::rwhod::determine_relevant_interfaces()?;
let sender_task = rwhod_packet_sender_task(socket.clone(), interfaces);
let status_store = Arc::new(RwLock::new(HashMap::new()));
let receiver_task = rwhod_packet_receiver_task(socket.clone(), status_store);
tokio::select! {
res = sender_task => {
if let Err(err) = res {
eprintln!("RWHOD sender task error: {}", err);
}
}
res = receiver_task => {
if let Err(err) = res {
eprintln!("RWHOD receiver task error: {}", err);
}
}
_ = tokio::signal::ctrl_c() => {
println!("Received Ctrl-C, shutting down.");
}
let result: WhodStatusUpdate = Whod::from_bytes(&buf[..len])
.context("Failed to parse whod packet")?
.try_into()
.map(|mut status_update: WhodStatusUpdate| {
let timestamp = chrono::Utc::now()
.with_nanosecond(0)
.unwrap_or(chrono::Utc::now());
status_update.recvtime = Some(timestamp);
status_update
})
.map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e))?;
println!("Received whod packet from {src}:\n{result:#?}");
buf = [0u8; Whod::MAX_SIZE];
}
Ok(())
}

View File

@@ -1,5 +1,11 @@
use std::{collections::HashSet, net::IpAddr, path::Path};
use std::{
collections::{HashMap, HashSet},
net::IpAddr,
path::Path,
sync::Arc,
};
use anyhow::Context;
use chrono::{Duration, Timelike};
use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat};
use uucore::utmpx::Utmpx;
@@ -120,7 +126,7 @@ pub fn determine_relevant_interfaces() -> anyhow::Result<Vec<RwhodSendTarget>> {
}
pub async fn send_rwhod_packet_to_interface(
socket: &mut tokio::net::UdpSocket,
socket: Arc<tokio::net::UdpSocket>,
interface: &RwhodSendTarget,
packet: &Whod,
) -> anyhow::Result<()> {
@@ -139,6 +145,69 @@ pub async fn send_rwhod_packet_to_interface(
Ok(())
}
pub async fn rwhod_packet_receiver_task(
socket: Arc<tokio::net::UdpSocket>,
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
) -> anyhow::Result<()> {
let mut buf = [0u8; Whod::MAX_SIZE];
loop {
let (len, src) = socket.recv_from(&mut buf).await?;
if len < Whod::HEADER_SIZE {
eprintln!(
"Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)",
Whod::HEADER_SIZE
);
continue;
}
let result = Whod::from_bytes(&buf[..len])
.context("Failed to parse whod packet")?
.try_into()
.map(|mut status_update: WhodStatusUpdate| {
let timestamp = chrono::Utc::now()
.with_nanosecond(0)
.unwrap_or(chrono::Utc::now());
status_update.recvtime = Some(timestamp);
status_update
})
.map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e));
match result {
Ok(status_update) => {
let mut store = whod_status_store.write().await;
store.insert(src.ip(), status_update);
}
Err(e) => {
eprintln!("Error processing whod packet from {src}: {}", e);
}
}
}
}
pub async fn rwhod_packet_sender_task(
socket: Arc<tokio::net::UdpSocket>,
interfaces: Vec<RwhodSendTarget>,
) -> anyhow::Result<()> {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
loop {
interval.tick().await;
let packet = generate_rwhod_status_update()?;
for interface in &interfaces {
if let Err(e) = send_rwhod_packet_to_interface(socket.clone(), interface, &packet).await
{
eprintln!(
"Failed to send rwhod packet on interface {}: {}",
interface.name, e
);
}
}
}
}
// TODO: implement receiving rwhod packets from other hosts
// TODO: implement storing and loading rwhod packets to/from file