systemd integration
- Add watchdog timeout support - Add native journald logging support - Add application state notifications - Add verbosity flag
This commit is contained in:
parent
64bcef4307
commit
b1f8cf9ba2
29
Cargo.lock
generated
29
Cargo.lock
generated
@ -284,6 +284,16 @@ dependencies = [
|
||||
"clap_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap-verbosity-flag"
|
||||
version = "2.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e099138e1807662ff75e2cebe4ae2287add879245574489f9b1588eb5e5564ed"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.5.20"
|
||||
@ -496,11 +506,14 @@ dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"clap",
|
||||
"clap-verbosity-flag",
|
||||
"env_logger",
|
||||
"log",
|
||||
"mpvipc-async",
|
||||
"sd-notify",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"systemd-journal-logger",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tower",
|
||||
@ -983,6 +996,12 @@ version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "sd-notify"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1be20c5f7f393ee700f8b2f28ea35812e4e212f40774b550cd2a93ea91684451"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.210"
|
||||
@ -1100,6 +1119,16 @@ version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
|
||||
|
||||
[[package]]
|
||||
name = "systemd-journal-logger"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c18918ae65f3d828ec9ab7f4714b8c564149045f47407e319dd25cadfaf9d0cf"
|
||||
dependencies = [
|
||||
"log",
|
||||
"rustix",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tempfile"
|
||||
version = "3.13.0"
|
||||
|
@ -12,11 +12,14 @@ readme = "README.md"
|
||||
anyhow = "1.0.82"
|
||||
axum = { version = "0.6.20", features = ["macros"] }
|
||||
clap = { version = "4.4.1", features = ["derive"] }
|
||||
clap-verbosity-flag = "2.2.2"
|
||||
env_logger = "0.10.0"
|
||||
log = "0.4.20"
|
||||
mpvipc-async = { git = "https://git.pvv.ntnu.no/oysteikt/mpvipc-async.git", rev = "v0.1.0" }
|
||||
sd-notify = "0.4.3"
|
||||
serde = { version = "1.0.188", features = ["derive"] }
|
||||
serde_json = "1.0.105"
|
||||
systemd-journal-logger = "2.2.0"
|
||||
tempfile = "3.11.0"
|
||||
tokio = { version = "1.32.0", features = ["full"] }
|
||||
tower = { version = "0.4.13", features = ["full"] }
|
||||
|
26
module.nix
26
module.nix
@ -14,7 +14,19 @@ in
|
||||
|
||||
enablePipewire = lib.mkEnableOption "pipewire" // { default = true; };
|
||||
|
||||
enableDebug = lib.mkEnableOption "debug logs";
|
||||
logLevel = lib.mkOption {
|
||||
type = lib.types.enum [ "quiet" "error" "warn" "info" "debug" "trace" ];
|
||||
default = "debug";
|
||||
description = "Log level.";
|
||||
apply = level: {
|
||||
"quiet" = "-q";
|
||||
"error" = "";
|
||||
"warn" = "-v";
|
||||
"info" = "-vv";
|
||||
"debug" = "-vvv";
|
||||
"trace" = "-vvvv";
|
||||
}.${level};
|
||||
};
|
||||
|
||||
# TODO: create some better descriptions
|
||||
settings = {
|
||||
@ -87,12 +99,18 @@ in
|
||||
description = "greg-ng, an mpv based media player";
|
||||
wantedBy = [ "graphical-session.target" ];
|
||||
partOf = [ "graphical-session.target" ];
|
||||
environment.RUST_LOG = lib.mkIf cfg.enableDebug "greg_ng=trace,mpvipc=trace";
|
||||
serviceConfig = {
|
||||
Type = "simple";
|
||||
ExecStart = "${lib.getExe cfg.package} ${lib.cli.toGNUCommandLineShell { } cfg.settings}";
|
||||
Type = "notify";
|
||||
ExecStart = let
|
||||
args = lib.cli.toGNUCommandLineShell { } (cfg.settings // {
|
||||
systemd = true;
|
||||
});
|
||||
in "${lib.getExe cfg.package} ${cfg.logLevel} ${args}";
|
||||
|
||||
Restart = "always";
|
||||
RestartSec = 3;
|
||||
WatchdogSec = lib.mkDefault 15;
|
||||
TimeoutStartSec = lib.mkDefault 30;
|
||||
|
||||
RestrictAddressFamilies = [ "AF_UNIX" "AF_INET" "AF_INET6" ];
|
||||
AmbientCapabilities = [ "" ];
|
||||
|
140
src/main.rs
140
src/main.rs
@ -1,8 +1,11 @@
|
||||
use anyhow::Context;
|
||||
use axum::{Router, Server};
|
||||
use clap::Parser;
|
||||
use clap_verbosity_flag::Verbosity;
|
||||
use mpv_setup::{connect_to_mpv, create_mpv_config_file, show_grzegorz_image};
|
||||
use mpvipc_async::Mpv;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use systemd_journal_logger::JournalLog;
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
mod api;
|
||||
@ -16,6 +19,12 @@ struct Args {
|
||||
#[clap(short, long, default_value = "8008")]
|
||||
port: u16,
|
||||
|
||||
#[command(flatten)]
|
||||
verbose: Verbosity,
|
||||
|
||||
#[clap(long)]
|
||||
systemd: bool,
|
||||
|
||||
#[clap(long, value_name = "PATH", default_value = "/run/mpv/mpv.sock")]
|
||||
mpv_socket_path: String,
|
||||
|
||||
@ -40,6 +49,8 @@ struct MpvConnectionArgs<'a> {
|
||||
force_auto_start: bool,
|
||||
}
|
||||
|
||||
/// Helper function to resolve a hostname to an IP address.
|
||||
/// Why is this not in the standard library? >:(
|
||||
async fn resolve(host: &str) -> anyhow::Result<IpAddr> {
|
||||
let addr = format!("{}:0", host);
|
||||
let addresses = tokio::net::lookup_host(addr).await?;
|
||||
@ -50,11 +61,71 @@ async fn resolve(host: &str) -> anyhow::Result<IpAddr> {
|
||||
.ok_or_else(|| anyhow::anyhow!("Failed to resolve address"))
|
||||
}
|
||||
|
||||
/// Helper function that spawns a tokio thread that
|
||||
/// continuously sends a ping to systemd watchdog, if enabled.
|
||||
async fn setup_systemd_watchdog_thread() -> anyhow::Result<()> {
|
||||
let mut watchdog_microsecs: u64 = 0;
|
||||
if sd_notify::watchdog_enabled(true, &mut watchdog_microsecs) {
|
||||
watchdog_microsecs = watchdog_microsecs.div_ceil(2);
|
||||
tokio::spawn(async move {
|
||||
log::debug!(
|
||||
"Starting systemd watchdog thread with {} millisecond interval",
|
||||
watchdog_microsecs.div_ceil(1000)
|
||||
);
|
||||
loop {
|
||||
tokio::time::sleep(tokio::time::Duration::from_micros(watchdog_microsecs)).await;
|
||||
if let Err(err) = sd_notify::notify(false, &[sd_notify::NotifyState::Watchdog]) {
|
||||
log::warn!("Failed to notify systemd watchdog: {}", err);
|
||||
} else {
|
||||
log::trace!("Ping sent to systemd watchdog");
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
log::info!("Watchdog not enabled, skipping");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn shutdown(mpv: Mpv, proc: Option<tokio::process::Child>) {
|
||||
log::info!("Shutting down");
|
||||
sd_notify::notify(false, &[sd_notify::NotifyState::Stopping])
|
||||
.unwrap_or_else(|e| log::warn!("Failed to notify systemd that the service is stopping: {}", e));
|
||||
|
||||
mpv.disconnect()
|
||||
.await
|
||||
.unwrap_or_else(|e| log::warn!("Failed to disconnect from mpv: {}", e));
|
||||
if let Some(mut proc) = proc {
|
||||
proc.kill()
|
||||
.await
|
||||
.unwrap_or_else(|e| log::warn!("Failed to kill mpv process: {}", e));
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
env_logger::init();
|
||||
let args = Args::parse();
|
||||
|
||||
let systemd_mode = args.systemd && sd_notify::booted().unwrap_or(false);
|
||||
if systemd_mode {
|
||||
JournalLog::new()
|
||||
.context("Failed to initialize journald logging")?
|
||||
.install()
|
||||
.context("Failed to install journald logger")?;
|
||||
|
||||
log::set_max_level(args.verbose.log_level_filter());
|
||||
|
||||
log::debug!("Running with systemd integration");
|
||||
|
||||
setup_systemd_watchdog_thread().await?;
|
||||
} else {
|
||||
env_logger::Builder::new()
|
||||
.filter_level(args.verbose.log_level_filter())
|
||||
.init();
|
||||
|
||||
log::info!("Running without systemd integration");
|
||||
}
|
||||
|
||||
let mpv_config_file = create_mpv_config_file(args.mpv_config_file)?;
|
||||
|
||||
let (mpv, proc) = connect_to_mpv(&MpvConnectionArgs {
|
||||
@ -64,37 +135,65 @@ async fn main() -> anyhow::Result<()> {
|
||||
auto_start: args.auto_start_mpv,
|
||||
force_auto_start: args.force_auto_start,
|
||||
})
|
||||
.await?;
|
||||
.await
|
||||
.context("Failed to connect to mpv")?;
|
||||
|
||||
if let Err(e) = show_grzegorz_image(mpv.clone()).await {
|
||||
log::warn!("Could not show Grzegorz image: {}", e);
|
||||
log::warn!("Could not show Grzegorz image: {}", e);
|
||||
}
|
||||
|
||||
let addr = SocketAddr::new(resolve(&args.host).await?, args.port);
|
||||
log::info!("Starting API on {}", addr);
|
||||
let addr = match resolve(&args.host)
|
||||
.await
|
||||
.context(format!("Failed to resolve address: {}", &args.host))
|
||||
{
|
||||
Ok(addr) => addr,
|
||||
Err(e) => {
|
||||
log::error!("{}", e);
|
||||
shutdown(mpv, proc).await;
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
let socket_addr = SocketAddr::new(addr, args.port);
|
||||
log::info!("Starting API on {}", socket_addr);
|
||||
|
||||
let app = Router::new().nest("/api", api::rest_api_routes(mpv.clone()));
|
||||
let server = match Server::try_bind(&socket_addr.clone())
|
||||
.context(format!("Failed to bind API server to '{}'", &socket_addr))
|
||||
{
|
||||
Ok(server) => server,
|
||||
Err(e) => {
|
||||
log::error!("{}", e);
|
||||
shutdown(mpv, proc).await;
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
if systemd_mode {
|
||||
match sd_notify::notify(false, &[sd_notify::NotifyState::Ready])
|
||||
.context("Failed to notify systemd that the service is ready")
|
||||
{
|
||||
Ok(_) => log::trace!("Notified systemd that the service is ready"),
|
||||
Err(e) => {
|
||||
log::error!("{}", e);
|
||||
shutdown(mpv, proc).await;
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(mut proc) = proc {
|
||||
tokio::select! {
|
||||
exit_status = proc.wait() => {
|
||||
log::warn!("mpv process exited with status: {}", exit_status?);
|
||||
mpv.disconnect().await?;
|
||||
shutdown(mpv, Some(proc)).await;
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
log::info!("Received Ctrl-C, exiting");
|
||||
mpv.disconnect().await?;
|
||||
proc.kill().await?;
|
||||
shutdown(mpv, Some(proc)).await;
|
||||
}
|
||||
result = async {
|
||||
match Server::try_bind(&addr.clone()).context("Failed to bind server") {
|
||||
Ok(server) => server.serve(app.into_make_service()).await.context("Failed to serve app"),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
} => {
|
||||
result = server.serve(app.into_make_service()) => {
|
||||
log::info!("API server exited");
|
||||
mpv.disconnect().await?;
|
||||
proc.kill().await?;
|
||||
shutdown(mpv, Some(proc)).await;
|
||||
result?;
|
||||
}
|
||||
}
|
||||
@ -102,11 +201,12 @@ async fn main() -> anyhow::Result<()> {
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
log::info!("Received Ctrl-C, exiting");
|
||||
mpv.disconnect().await?;
|
||||
shutdown(mpv.clone(), None).await;
|
||||
}
|
||||
_ = Server::bind(&addr.clone()).serve(app.into_make_service()) => {
|
||||
log::info!("API server exited");
|
||||
mpv.disconnect().await?;
|
||||
result = server.serve(app.into_make_service()) => {
|
||||
log::info!("API server exited");
|
||||
shutdown(mpv.clone(), None).await;
|
||||
result?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user