server/rwhod: add both sender and receiver task
This commit is contained in:
148
Cargo.lock
generated
148
Cargo.lock
generated
@@ -2,6 +2,15 @@
|
||||
# It is not intended for manual editing.
|
||||
version = 4
|
||||
|
||||
[[package]]
|
||||
name = "aho-corasick"
|
||||
version = "1.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "android_system_properties"
|
||||
version = "0.1.5"
|
||||
@@ -391,6 +400,12 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy_static"
|
||||
version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.179"
|
||||
@@ -409,6 +424,15 @@ version = "0.4.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
|
||||
dependencies = [
|
||||
"regex-automata",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.7.6"
|
||||
@@ -448,6 +472,15 @@ dependencies = [
|
||||
"memoffset",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.50.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
|
||||
dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-conv"
|
||||
version = "0.1.0"
|
||||
@@ -557,6 +590,23 @@ dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.4.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.8.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
|
||||
|
||||
[[package]]
|
||||
name = "roowho2"
|
||||
version = "0.1.0"
|
||||
@@ -567,6 +617,8 @@ dependencies = [
|
||||
"clap",
|
||||
"nix",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uucore",
|
||||
]
|
||||
|
||||
@@ -631,12 +683,31 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sharded-slab"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "shlex"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b"
|
||||
dependencies = [
|
||||
"errno",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "siphasher"
|
||||
version = "1.0.1"
|
||||
@@ -706,6 +777,15 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thread_local"
|
||||
version = "1.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.3.44"
|
||||
@@ -759,6 +839,7 @@ dependencies = [
|
||||
"libc",
|
||||
"mio",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.61.2",
|
||||
@@ -775,6 +856,67 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.44"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
|
||||
dependencies = [
|
||||
"pin-project-lite",
|
||||
"tracing-attributes",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-attributes"
|
||||
version = "0.1.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.36"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"valuable",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
|
||||
dependencies = [
|
||||
"log",
|
||||
"once_cell",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.3.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex-automata",
|
||||
"sharded-slab",
|
||||
"smallvec",
|
||||
"thread_local",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "type-map"
|
||||
version = "0.5.1"
|
||||
@@ -853,6 +995,12 @@ dependencies = [
|
||||
"quote",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "valuable"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.11.1+wasi-snapshot-preview1"
|
||||
|
||||
@@ -19,7 +19,9 @@ bytes = "1.11.0"
|
||||
chrono = { version = "0.4.42", features = ["serde"] }
|
||||
clap = { version = "4.5.53", features = ["derive"] }
|
||||
nix = { version = "0.30.1", features = ["hostname", "net"] }
|
||||
tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread"] }
|
||||
tokio = { version = "1.49.0", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] }
|
||||
tracing = "0.1.44"
|
||||
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
|
||||
# onc-rpc = "0.3.2"
|
||||
# sd-notify = "0.4.5"
|
||||
# serde = { version = "1.0.228", features = ["derive"] }
|
||||
|
||||
@@ -1,41 +1,40 @@
|
||||
use std::net::SocketAddrV4;
|
||||
use std::{collections::HashMap, net::SocketAddrV4, sync::Arc};
|
||||
|
||||
use anyhow::Context;
|
||||
use chrono::Timelike;
|
||||
use roowho2_lib::proto::{Whod, WhodStatusUpdate};
|
||||
use roowho2_lib::server::rwhod::{rwhod_packet_receiver_task, rwhod_packet_sender_task};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
const RWHOD_BROADCAST_PORT: u16 = 513;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let addr = SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, RWHOD_BROADCAST_PORT);
|
||||
let socket = tokio::net::UdpSocket::bind(addr).await?;
|
||||
tracing::info!("Binding RWHOD socket to {}", addr);
|
||||
let socket = Arc::new(tokio::net::UdpSocket::bind(addr).await?);
|
||||
socket.set_broadcast(true)?;
|
||||
|
||||
let mut buf = [0u8; Whod::MAX_SIZE];
|
||||
loop {
|
||||
let (len, src) = socket.recv_from(&mut buf).await?;
|
||||
if len < Whod::HEADER_SIZE {
|
||||
eprintln!(
|
||||
"Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)",
|
||||
Whod::HEADER_SIZE
|
||||
);
|
||||
continue;
|
||||
let interfaces = roowho2_lib::server::rwhod::determine_relevant_interfaces()?;
|
||||
let sender_task = rwhod_packet_sender_task(socket.clone(), interfaces);
|
||||
|
||||
let status_store = Arc::new(RwLock::new(HashMap::new()));
|
||||
let receiver_task = rwhod_packet_receiver_task(socket.clone(), status_store);
|
||||
|
||||
tokio::select! {
|
||||
res = sender_task => {
|
||||
if let Err(err) = res {
|
||||
eprintln!("RWHOD sender task error: {}", err);
|
||||
}
|
||||
}
|
||||
res = receiver_task => {
|
||||
if let Err(err) = res {
|
||||
eprintln!("RWHOD receiver task error: {}", err);
|
||||
}
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
println!("Received Ctrl-C, shutting down.");
|
||||
}
|
||||
let result: WhodStatusUpdate = Whod::from_bytes(&buf[..len])
|
||||
.context("Failed to parse whod packet")?
|
||||
.try_into()
|
||||
.map(|mut status_update: WhodStatusUpdate| {
|
||||
let timestamp = chrono::Utc::now()
|
||||
.with_nanosecond(0)
|
||||
.unwrap_or(chrono::Utc::now());
|
||||
status_update.recvtime = Some(timestamp);
|
||||
status_update
|
||||
})
|
||||
.map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e))?;
|
||||
|
||||
println!("Received whod packet from {src}:\n{result:#?}");
|
||||
|
||||
buf = [0u8; Whod::MAX_SIZE];
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
use std::{collections::HashSet, net::IpAddr, path::Path};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
net::IpAddr,
|
||||
path::Path,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use chrono::{Duration, Timelike};
|
||||
use nix::{ifaddrs::getifaddrs, net::if_::InterfaceFlags, sys::stat::stat};
|
||||
use uucore::utmpx::Utmpx;
|
||||
@@ -120,7 +126,7 @@ pub fn determine_relevant_interfaces() -> anyhow::Result<Vec<RwhodSendTarget>> {
|
||||
}
|
||||
|
||||
pub async fn send_rwhod_packet_to_interface(
|
||||
socket: &mut tokio::net::UdpSocket,
|
||||
socket: Arc<tokio::net::UdpSocket>,
|
||||
interface: &RwhodSendTarget,
|
||||
packet: &Whod,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -131,6 +137,12 @@ pub async fn send_rwhod_packet_to_interface(
|
||||
IpAddr::V6(addr) => std::net::SocketAddr::new(IpAddr::V6(addr), 0),
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
"Sending rwhod packet to interface {} at address {}",
|
||||
interface.name,
|
||||
target_addr
|
||||
);
|
||||
|
||||
socket
|
||||
.send_to(&serialized_packet, &target_addr)
|
||||
.await
|
||||
@@ -139,8 +151,75 @@ pub async fn send_rwhod_packet_to_interface(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: implement receiving rwhod packets from other hosts
|
||||
pub async fn rwhod_packet_receiver_task(
|
||||
socket: Arc<tokio::net::UdpSocket>,
|
||||
whod_status_store: Arc<tokio::sync::RwLock<HashMap<IpAddr, WhodStatusUpdate>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut buf = [0u8; Whod::MAX_SIZE];
|
||||
|
||||
// TODO: implement storing and loading rwhod packets to/from file
|
||||
loop {
|
||||
let (len, src) = socket.recv_from(&mut buf).await?;
|
||||
|
||||
tracing::debug!("Received rwhod packet of length {} bytes from {}", len, src);
|
||||
|
||||
if len < Whod::HEADER_SIZE {
|
||||
tracing::error!(
|
||||
"Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)",
|
||||
Whod::HEADER_SIZE
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let result = Whod::from_bytes(&buf[..len])
|
||||
.context("Failed to parse whod packet")?
|
||||
.try_into()
|
||||
.map(|mut status_update: WhodStatusUpdate| {
|
||||
let timestamp = chrono::Utc::now()
|
||||
.with_nanosecond(0)
|
||||
.unwrap_or(chrono::Utc::now());
|
||||
status_update.recvtime = Some(timestamp);
|
||||
status_update
|
||||
})
|
||||
.map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e));
|
||||
|
||||
match result {
|
||||
Ok(status_update) => {
|
||||
tracing::debug!("Processed whod packet from {src}: {:?}", status_update);
|
||||
|
||||
let mut store = whod_status_store.write().await;
|
||||
store.insert(src.ip(), status_update);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Error processing whod packet from {src}: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn rwhod_packet_sender_task(
|
||||
socket: Arc<tokio::net::UdpSocket>,
|
||||
interfaces: Vec<RwhodSendTarget>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let packet = generate_rwhod_status_update()?;
|
||||
|
||||
tracing::debug!("Generated rwhod packet: {:?}", packet);
|
||||
|
||||
for interface in &interfaces {
|
||||
if let Err(e) = send_rwhod_packet_to_interface(socket.clone(), interface, &packet).await
|
||||
{
|
||||
tracing::error!(
|
||||
"Failed to send rwhod packet on interface {}: {}",
|
||||
interface.name,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: implement protocol for cli - daemon communication
|
||||
|
||||
Reference in New Issue
Block a user