Compare commits

..

3 Commits

Author SHA1 Message Date
Oystein Kristoffer Tveit 9934b11766
nix: use unwrapped program by default in module 2024-10-20 23:06:32 +02:00
Oystein Kristoffer Tveit cfb6bbd7a7
default.nix: better source filtering 2024-10-20 23:06:32 +02:00
Oystein Kristoffer Tveit b1f8cf9ba2
systemd integration
- Add watchdog timeout support
- Add native journald logging support
- Add application state notifications
- Add verbosity flag
2024-10-20 23:06:32 +02:00
6 changed files with 192 additions and 31 deletions

29
Cargo.lock generated
View File

@ -284,6 +284,16 @@ dependencies = [
"clap_derive",
]
[[package]]
name = "clap-verbosity-flag"
version = "2.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e099138e1807662ff75e2cebe4ae2287add879245574489f9b1588eb5e5564ed"
dependencies = [
"clap",
"log",
]
[[package]]
name = "clap_builder"
version = "4.5.20"
@ -496,11 +506,14 @@ dependencies = [
"anyhow",
"axum",
"clap",
"clap-verbosity-flag",
"env_logger",
"log",
"mpvipc-async",
"sd-notify",
"serde",
"serde_json",
"systemd-journal-logger",
"tempfile",
"tokio",
"tower",
@ -983,6 +996,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sd-notify"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1be20c5f7f393ee700f8b2f28ea35812e4e212f40774b550cd2a93ea91684451"
[[package]]
name = "serde"
version = "1.0.210"
@ -1100,6 +1119,16 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "systemd-journal-logger"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c18918ae65f3d828ec9ab7f4714b8c564149045f47407e319dd25cadfaf9d0cf"
dependencies = [
"log",
"rustix",
]
[[package]]
name = "tempfile"
version = "3.13.0"

View File

@ -12,11 +12,14 @@ readme = "README.md"
anyhow = "1.0.82"
axum = { version = "0.6.20", features = ["macros"] }
clap = { version = "4.4.1", features = ["derive"] }
clap-verbosity-flag = "2.2.2"
env_logger = "0.10.0"
log = "0.4.20"
mpvipc-async = { git = "https://git.pvv.ntnu.no/oysteikt/mpvipc-async.git", rev = "v0.1.0" }
sd-notify = "0.4.3"
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105"
systemd-journal-logger = "2.2.0"
tempfile = "3.11.0"
tokio = { version = "1.32.0", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }

View File

@ -4,6 +4,7 @@
, rustPlatform
, makeWrapper
, mpv
, wrapped ? false
}:
rustPlatform.buildRustPackage rec {
@ -13,13 +14,20 @@ rustPlatform.buildRustPackage rec {
baseName = baseNameOf (toString path);
in !(lib.any (b: b) [
(!(lib.cleanSourceFilter path type))
(baseName == "target" && type == "directory")
(baseName == "nix" && type == "directory")
(baseName == "flake.nix" && type == "regular")
(baseName == "flake.lock" && type == "regular")
(type == "directory" && lib.elem baseName [
".direnv"
".git"
"target"
"result"
])
(type == "regular" && lib.elem baseName [
"flake.nix"
"default.nix"
"module.nix"
".envrc"
])
])) ./.;
nativeBuildInputs = [ makeWrapper ];
cargoLock = {
@ -29,7 +37,7 @@ rustPlatform.buildRustPackage rec {
};
};
postInstall = ''
postInstall = lib.optionalString wrapped ''
wrapProgram $out/bin/greg-ng \
--prefix PATH : '${lib.makeBinPath [ mpv ]}'
'';

View File

@ -35,7 +35,7 @@
apps = forAllSystems (system: pkgs: _: {
default = self.apps.${system}.greg-ng;
greg-ng = let
package = self.packages.${system}.greg-ng;
package = self.packages.${system}.greg-ng-wrapped;
in {
type = "app";
program = lib.getExe package;
@ -63,6 +63,9 @@
packages = forAllSystems (system: pkgs: _: {
default = self.packages.${system}.greg-ng;
greg-ng = pkgs.callPackage ./default.nix { };
greg-ng-wrapped = pkgs.callPackage ./default.nix {
wrapped = true;
};
});
} // {
nixosModules.default = ./module.nix;

View File

@ -14,7 +14,19 @@ in
enablePipewire = lib.mkEnableOption "pipewire" // { default = true; };
enableDebug = lib.mkEnableOption "debug logs";
logLevel = lib.mkOption {
type = lib.types.enum [ "quiet" "error" "warn" "info" "debug" "trace" ];
default = "debug";
description = "Log level.";
apply = level: {
"quiet" = "-q";
"error" = "";
"warn" = "-v";
"info" = "-vv";
"debug" = "-vvv";
"trace" = "-vvvv";
}.${level};
};
# TODO: create some better descriptions
settings = {
@ -87,12 +99,18 @@ in
description = "greg-ng, an mpv based media player";
wantedBy = [ "graphical-session.target" ];
partOf = [ "graphical-session.target" ];
environment.RUST_LOG = lib.mkIf cfg.enableDebug "greg_ng=trace,mpvipc=trace";
serviceConfig = {
Type = "simple";
ExecStart = "${lib.getExe cfg.package} ${lib.cli.toGNUCommandLineShell { } cfg.settings}";
Type = "notify";
ExecStart = let
args = lib.cli.toGNUCommandLineShell { } (cfg.settings // {
systemd = true;
});
in "${lib.getExe cfg.package} ${cfg.logLevel} ${args}";
Restart = "always";
RestartSec = 3;
WatchdogSec = lib.mkDefault 15;
TimeoutStartSec = lib.mkDefault 30;
RestrictAddressFamilies = [ "AF_UNIX" "AF_INET" "AF_INET6" ];
AmbientCapabilities = [ "" ];

View File

@ -1,8 +1,11 @@
use anyhow::Context;
use axum::{Router, Server};
use clap::Parser;
use clap_verbosity_flag::Verbosity;
use mpv_setup::{connect_to_mpv, create_mpv_config_file, show_grzegorz_image};
use mpvipc_async::Mpv;
use std::net::{IpAddr, SocketAddr};
use systemd_journal_logger::JournalLog;
use tempfile::NamedTempFile;
mod api;
@ -16,6 +19,12 @@ struct Args {
#[clap(short, long, default_value = "8008")]
port: u16,
#[command(flatten)]
verbose: Verbosity,
#[clap(long)]
systemd: bool,
#[clap(long, value_name = "PATH", default_value = "/run/mpv/mpv.sock")]
mpv_socket_path: String,
@ -40,6 +49,8 @@ struct MpvConnectionArgs<'a> {
force_auto_start: bool,
}
/// Helper function to resolve a hostname to an IP address.
/// Why is this not in the standard library? >:(
async fn resolve(host: &str) -> anyhow::Result<IpAddr> {
let addr = format!("{}:0", host);
let addresses = tokio::net::lookup_host(addr).await?;
@ -50,11 +61,71 @@ async fn resolve(host: &str) -> anyhow::Result<IpAddr> {
.ok_or_else(|| anyhow::anyhow!("Failed to resolve address"))
}
/// Helper function that spawns a tokio thread that
/// continuously sends a ping to systemd watchdog, if enabled.
async fn setup_systemd_watchdog_thread() -> anyhow::Result<()> {
let mut watchdog_microsecs: u64 = 0;
if sd_notify::watchdog_enabled(true, &mut watchdog_microsecs) {
watchdog_microsecs = watchdog_microsecs.div_ceil(2);
tokio::spawn(async move {
log::debug!(
"Starting systemd watchdog thread with {} millisecond interval",
watchdog_microsecs.div_ceil(1000)
);
loop {
tokio::time::sleep(tokio::time::Duration::from_micros(watchdog_microsecs)).await;
if let Err(err) = sd_notify::notify(false, &[sd_notify::NotifyState::Watchdog]) {
log::warn!("Failed to notify systemd watchdog: {}", err);
} else {
log::trace!("Ping sent to systemd watchdog");
}
}
});
} else {
log::info!("Watchdog not enabled, skipping");
}
Ok(())
}
async fn shutdown(mpv: Mpv, proc: Option<tokio::process::Child>) {
log::info!("Shutting down");
sd_notify::notify(false, &[sd_notify::NotifyState::Stopping])
.unwrap_or_else(|e| log::warn!("Failed to notify systemd that the service is stopping: {}", e));
mpv.disconnect()
.await
.unwrap_or_else(|e| log::warn!("Failed to disconnect from mpv: {}", e));
if let Some(mut proc) = proc {
proc.kill()
.await
.unwrap_or_else(|e| log::warn!("Failed to kill mpv process: {}", e));
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
let args = Args::parse();
let systemd_mode = args.systemd && sd_notify::booted().unwrap_or(false);
if systemd_mode {
JournalLog::new()
.context("Failed to initialize journald logging")?
.install()
.context("Failed to install journald logger")?;
log::set_max_level(args.verbose.log_level_filter());
log::debug!("Running with systemd integration");
setup_systemd_watchdog_thread().await?;
} else {
env_logger::Builder::new()
.filter_level(args.verbose.log_level_filter())
.init();
log::info!("Running without systemd integration");
}
let mpv_config_file = create_mpv_config_file(args.mpv_config_file)?;
let (mpv, proc) = connect_to_mpv(&MpvConnectionArgs {
@ -64,37 +135,65 @@ async fn main() -> anyhow::Result<()> {
auto_start: args.auto_start_mpv,
force_auto_start: args.force_auto_start,
})
.await?;
.await
.context("Failed to connect to mpv")?;
if let Err(e) = show_grzegorz_image(mpv.clone()).await {
log::warn!("Could not show Grzegorz image: {}", e);
}
let addr = SocketAddr::new(resolve(&args.host).await?, args.port);
log::info!("Starting API on {}", addr);
let addr = match resolve(&args.host)
.await
.context(format!("Failed to resolve address: {}", &args.host))
{
Ok(addr) => addr,
Err(e) => {
log::error!("{}", e);
shutdown(mpv, proc).await;
return Err(e);
}
};
let socket_addr = SocketAddr::new(addr, args.port);
log::info!("Starting API on {}", socket_addr);
let app = Router::new().nest("/api", api::rest_api_routes(mpv.clone()));
let server = match Server::try_bind(&socket_addr.clone())
.context(format!("Failed to bind API server to '{}'", &socket_addr))
{
Ok(server) => server,
Err(e) => {
log::error!("{}", e);
shutdown(mpv, proc).await;
return Err(e);
}
};
if systemd_mode {
match sd_notify::notify(false, &[sd_notify::NotifyState::Ready])
.context("Failed to notify systemd that the service is ready")
{
Ok(_) => log::trace!("Notified systemd that the service is ready"),
Err(e) => {
log::error!("{}", e);
shutdown(mpv, proc).await;
return Err(e);
}
}
}
if let Some(mut proc) = proc {
tokio::select! {
exit_status = proc.wait() => {
log::warn!("mpv process exited with status: {}", exit_status?);
mpv.disconnect().await?;
shutdown(mpv, Some(proc)).await;
}
_ = tokio::signal::ctrl_c() => {
log::info!("Received Ctrl-C, exiting");
mpv.disconnect().await?;
proc.kill().await?;
shutdown(mpv, Some(proc)).await;
}
result = async {
match Server::try_bind(&addr.clone()).context("Failed to bind server") {
Ok(server) => server.serve(app.into_make_service()).await.context("Failed to serve app"),
Err(err) => Err(err),
}
} => {
result = server.serve(app.into_make_service()) => {
log::info!("API server exited");
mpv.disconnect().await?;
proc.kill().await?;
shutdown(mpv, Some(proc)).await;
result?;
}
}
@ -102,11 +201,12 @@ async fn main() -> anyhow::Result<()> {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
log::info!("Received Ctrl-C, exiting");
mpv.disconnect().await?;
shutdown(mpv.clone(), None).await;
}
_ = Server::bind(&addr.clone()).serve(app.into_make_service()) => {
result = server.serve(app.into_make_service()) => {
log::info!("API server exited");
mpv.disconnect().await?;
shutdown(mpv.clone(), None).await;
result?;
}
}
}