From b1f8cf9ba26a8c69a8fa0cc564545e340667ed3c Mon Sep 17 00:00:00 2001 From: h7x4 Date: Sun, 20 Oct 2024 22:05:56 +0200 Subject: [PATCH] systemd integration - Add watchdog timeout support - Add native journald logging support - Add application state notifications - Add verbosity flag --- Cargo.lock | 29 +++++++++++ Cargo.toml | 3 ++ module.nix | 26 ++++++++-- src/main.rs | 140 ++++++++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 174 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6aba7c6..b629b78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -284,6 +284,16 @@ dependencies = [ "clap_derive", ] +[[package]] +name = "clap-verbosity-flag" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e099138e1807662ff75e2cebe4ae2287add879245574489f9b1588eb5e5564ed" +dependencies = [ + "clap", + "log", +] + [[package]] name = "clap_builder" version = "4.5.20" @@ -496,11 +506,14 @@ dependencies = [ "anyhow", "axum", "clap", + "clap-verbosity-flag", "env_logger", "log", "mpvipc-async", + "sd-notify", "serde", "serde_json", + "systemd-journal-logger", "tempfile", "tokio", "tower", @@ -983,6 +996,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sd-notify" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1be20c5f7f393ee700f8b2f28ea35812e4e212f40774b550cd2a93ea91684451" + [[package]] name = "serde" version = "1.0.210" @@ -1100,6 +1119,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "systemd-journal-logger" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c18918ae65f3d828ec9ab7f4714b8c564149045f47407e319dd25cadfaf9d0cf" +dependencies = [ + "log", + "rustix", +] + [[package]] name = "tempfile" version = "3.13.0" diff --git a/Cargo.toml b/Cargo.toml index e40c614..d0f71a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,14 @@ readme = "README.md" anyhow = "1.0.82" axum = { version = "0.6.20", features = ["macros"] } clap = { version = "4.4.1", features = ["derive"] } +clap-verbosity-flag = "2.2.2" env_logger = "0.10.0" log = "0.4.20" mpvipc-async = { git = "https://git.pvv.ntnu.no/oysteikt/mpvipc-async.git", rev = "v0.1.0" } +sd-notify = "0.4.3" serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.105" +systemd-journal-logger = "2.2.0" tempfile = "3.11.0" tokio = { version = "1.32.0", features = ["full"] } tower = { version = "0.4.13", features = ["full"] } diff --git a/module.nix b/module.nix index 50b7bc9..2e8df5d 100644 --- a/module.nix +++ b/module.nix @@ -14,7 +14,19 @@ in enablePipewire = lib.mkEnableOption "pipewire" // { default = true; }; - enableDebug = lib.mkEnableOption "debug logs"; + logLevel = lib.mkOption { + type = lib.types.enum [ "quiet" "error" "warn" "info" "debug" "trace" ]; + default = "debug"; + description = "Log level."; + apply = level: { + "quiet" = "-q"; + "error" = ""; + "warn" = "-v"; + "info" = "-vv"; + "debug" = "-vvv"; + "trace" = "-vvvv"; + }.${level}; + }; # TODO: create some better descriptions settings = { @@ -87,12 +99,18 @@ in description = "greg-ng, an mpv based media player"; wantedBy = [ "graphical-session.target" ]; partOf = [ "graphical-session.target" ]; - environment.RUST_LOG = lib.mkIf cfg.enableDebug "greg_ng=trace,mpvipc=trace"; serviceConfig = { - Type = "simple"; - ExecStart = "${lib.getExe cfg.package} ${lib.cli.toGNUCommandLineShell { } cfg.settings}"; + Type = "notify"; + ExecStart = let + args = lib.cli.toGNUCommandLineShell { } (cfg.settings // { + systemd = true; + }); + in "${lib.getExe cfg.package} ${cfg.logLevel} ${args}"; + Restart = "always"; RestartSec = 3; + WatchdogSec = lib.mkDefault 15; + TimeoutStartSec = lib.mkDefault 30; RestrictAddressFamilies = [ "AF_UNIX" "AF_INET" "AF_INET6" ]; AmbientCapabilities = [ "" ]; diff --git a/src/main.rs b/src/main.rs index 9def7d5..26468df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,11 @@ use anyhow::Context; use axum::{Router, Server}; use clap::Parser; +use clap_verbosity_flag::Verbosity; use mpv_setup::{connect_to_mpv, create_mpv_config_file, show_grzegorz_image}; +use mpvipc_async::Mpv; use std::net::{IpAddr, SocketAddr}; +use systemd_journal_logger::JournalLog; use tempfile::NamedTempFile; mod api; @@ -16,6 +19,12 @@ struct Args { #[clap(short, long, default_value = "8008")] port: u16, + #[command(flatten)] + verbose: Verbosity, + + #[clap(long)] + systemd: bool, + #[clap(long, value_name = "PATH", default_value = "/run/mpv/mpv.sock")] mpv_socket_path: String, @@ -40,6 +49,8 @@ struct MpvConnectionArgs<'a> { force_auto_start: bool, } +/// Helper function to resolve a hostname to an IP address. +/// Why is this not in the standard library? >:( async fn resolve(host: &str) -> anyhow::Result { let addr = format!("{}:0", host); let addresses = tokio::net::lookup_host(addr).await?; @@ -50,11 +61,71 @@ async fn resolve(host: &str) -> anyhow::Result { .ok_or_else(|| anyhow::anyhow!("Failed to resolve address")) } +/// Helper function that spawns a tokio thread that +/// continuously sends a ping to systemd watchdog, if enabled. +async fn setup_systemd_watchdog_thread() -> anyhow::Result<()> { + let mut watchdog_microsecs: u64 = 0; + if sd_notify::watchdog_enabled(true, &mut watchdog_microsecs) { + watchdog_microsecs = watchdog_microsecs.div_ceil(2); + tokio::spawn(async move { + log::debug!( + "Starting systemd watchdog thread with {} millisecond interval", + watchdog_microsecs.div_ceil(1000) + ); + loop { + tokio::time::sleep(tokio::time::Duration::from_micros(watchdog_microsecs)).await; + if let Err(err) = sd_notify::notify(false, &[sd_notify::NotifyState::Watchdog]) { + log::warn!("Failed to notify systemd watchdog: {}", err); + } else { + log::trace!("Ping sent to systemd watchdog"); + } + } + }); + } else { + log::info!("Watchdog not enabled, skipping"); + } + Ok(()) +} + +async fn shutdown(mpv: Mpv, proc: Option) { + log::info!("Shutting down"); + sd_notify::notify(false, &[sd_notify::NotifyState::Stopping]) + .unwrap_or_else(|e| log::warn!("Failed to notify systemd that the service is stopping: {}", e)); + + mpv.disconnect() + .await + .unwrap_or_else(|e| log::warn!("Failed to disconnect from mpv: {}", e)); + if let Some(mut proc) = proc { + proc.kill() + .await + .unwrap_or_else(|e| log::warn!("Failed to kill mpv process: {}", e)); + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { - env_logger::init(); let args = Args::parse(); + let systemd_mode = args.systemd && sd_notify::booted().unwrap_or(false); + if systemd_mode { + JournalLog::new() + .context("Failed to initialize journald logging")? + .install() + .context("Failed to install journald logger")?; + + log::set_max_level(args.verbose.log_level_filter()); + + log::debug!("Running with systemd integration"); + + setup_systemd_watchdog_thread().await?; + } else { + env_logger::Builder::new() + .filter_level(args.verbose.log_level_filter()) + .init(); + + log::info!("Running without systemd integration"); + } + let mpv_config_file = create_mpv_config_file(args.mpv_config_file)?; let (mpv, proc) = connect_to_mpv(&MpvConnectionArgs { @@ -64,37 +135,65 @@ async fn main() -> anyhow::Result<()> { auto_start: args.auto_start_mpv, force_auto_start: args.force_auto_start, }) - .await?; + .await + .context("Failed to connect to mpv")?; if let Err(e) = show_grzegorz_image(mpv.clone()).await { - log::warn!("Could not show Grzegorz image: {}", e); + log::warn!("Could not show Grzegorz image: {}", e); } - let addr = SocketAddr::new(resolve(&args.host).await?, args.port); - log::info!("Starting API on {}", addr); + let addr = match resolve(&args.host) + .await + .context(format!("Failed to resolve address: {}", &args.host)) + { + Ok(addr) => addr, + Err(e) => { + log::error!("{}", e); + shutdown(mpv, proc).await; + return Err(e); + } + }; + let socket_addr = SocketAddr::new(addr, args.port); + log::info!("Starting API on {}", socket_addr); let app = Router::new().nest("/api", api::rest_api_routes(mpv.clone())); + let server = match Server::try_bind(&socket_addr.clone()) + .context(format!("Failed to bind API server to '{}'", &socket_addr)) + { + Ok(server) => server, + Err(e) => { + log::error!("{}", e); + shutdown(mpv, proc).await; + return Err(e); + } + }; + + if systemd_mode { + match sd_notify::notify(false, &[sd_notify::NotifyState::Ready]) + .context("Failed to notify systemd that the service is ready") + { + Ok(_) => log::trace!("Notified systemd that the service is ready"), + Err(e) => { + log::error!("{}", e); + shutdown(mpv, proc).await; + return Err(e); + } + } + } if let Some(mut proc) = proc { tokio::select! { exit_status = proc.wait() => { log::warn!("mpv process exited with status: {}", exit_status?); - mpv.disconnect().await?; + shutdown(mpv, Some(proc)).await; } _ = tokio::signal::ctrl_c() => { log::info!("Received Ctrl-C, exiting"); - mpv.disconnect().await?; - proc.kill().await?; + shutdown(mpv, Some(proc)).await; } - result = async { - match Server::try_bind(&addr.clone()).context("Failed to bind server") { - Ok(server) => server.serve(app.into_make_service()).await.context("Failed to serve app"), - Err(err) => Err(err), - } - } => { + result = server.serve(app.into_make_service()) => { log::info!("API server exited"); - mpv.disconnect().await?; - proc.kill().await?; + shutdown(mpv, Some(proc)).await; result?; } } @@ -102,11 +201,12 @@ async fn main() -> anyhow::Result<()> { tokio::select! { _ = tokio::signal::ctrl_c() => { log::info!("Received Ctrl-C, exiting"); - mpv.disconnect().await?; + shutdown(mpv.clone(), None).await; } - _ = Server::bind(&addr.clone()).serve(app.into_make_service()) => { - log::info!("API server exited"); - mpv.disconnect().await?; + result = server.serve(app.into_make_service()) => { + log::info!("API server exited"); + shutdown(mpv.clone(), None).await; + result?; } } }