Misc changes (see commit log) #3

Merged
oysteikt merged 9 commits from rework-error-messages into master 2024-05-04 00:50:33 +02:00
6 changed files with 291 additions and 99 deletions
Showing only changes of commit f50b4defc1 - Show all commits

View File

@ -1,99 +0,0 @@
use mpvipc::{MpvError, Mpv, MpvExt};
use std::path::Path;
use tokio::{
process::{Child, Command},
time::{sleep, timeout, Duration},
};
#[cfg(target_family = "unix")]
async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> {
let socket_path_str = format!("/tmp/mpv-ipc-{}", uuid::Uuid::new_v4());
let socket_path = Path::new(&socket_path_str);
let process_handle = Command::new("mpv")
.arg("--no-config")
.arg("--idle")
.arg("--no-video")
.arg("--no-audio")
.arg(format!(
"--input-ipc-server={}",
&socket_path.to_str().unwrap()
))
.spawn()
.expect("Failed to start mpv");
if timeout(Duration::from_millis(500), async {
while !&socket_path.exists() {
sleep(Duration::from_millis(10)).await;
}
})
.await
.is_err()
{
panic!("Failed to create mpv socket at {:?}", &socket_path);
}
let mpv = Mpv::connect(socket_path.to_str().unwrap()).await.unwrap();
Ok((process_handle, mpv))
}
#[tokio::test]
#[cfg(target_family = "unix")]
async fn test_get_mpv_version() {
let (mut proc, mpv) = spawn_headless_mpv().await.unwrap();
let version: String = mpv.get_property("mpv-version").await.unwrap();
assert!(version.starts_with("mpv"));
mpv.kill().await.unwrap();
proc.kill().await.unwrap();
}
#[tokio::test]
#[cfg(target_family = "unix")]
async fn test_set_property() {
let (mut proc, mpv) = spawn_headless_mpv().await.unwrap();
mpv.set_property("pause", true).await.unwrap();
let paused: bool = mpv.get_property("pause").await.unwrap();
assert!(paused);
mpv.kill().await.unwrap();
proc.kill().await.unwrap();
}
#[tokio::test]
#[cfg(target_family = "unix")]
async fn test_events() {
use futures::stream::StreamExt;
let (mut proc, mpv) = spawn_headless_mpv().await.unwrap();
mpv.observe_property(1337, "pause").await.unwrap();
let mut events = mpv.get_event_stream().await;
let event_checking_thread = tokio::spawn(async move {
loop {
let event = events.next().await.unwrap().unwrap();
if let (1337, property) = mpvipc::parse_event_property(event).unwrap() {
assert_eq!(property, mpvipc::Property::Pause(true));
break;
}
}
});
tokio::time::sleep(Duration::from_millis(10)).await;
mpv.set_property("pause", true).await.unwrap();
if tokio::time::timeout(
tokio::time::Duration::from_millis(500),
event_checking_thread,
)
.await
.is_err()
{
panic!("Event checking thread timed out");
}
mpv.kill().await.unwrap();
proc.kill().await.unwrap();
}

View File

@ -0,0 +1,216 @@
use futures::{stream::StreamExt, Stream};
use mpvipc::{parse_event_property, Event, Mpv, MpvError, MpvExt};
use thiserror::Error;
use tokio::time::sleep;
use tokio::time::{timeout, Duration};
use test_log::test;
use super::*;
const MPV_CHANNEL_ID: usize = 1337;
#[derive(Error, Debug)]
enum PropertyCheckingThreadError {
#[error("Unexpected property: {0:?}")]
UnexpectedPropertyError(mpvipc::Property),
#[error(transparent)]
MpvError(#[from] MpvError),
}
fn create_interruptable_event_property_checking_thread<T>(
mut events: impl Stream<Item = Result<Event, MpvError>> + Unpin + Send + 'static,
on_property: T,
) -> (
tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
tokio_util::sync::CancellationToken,
)
where
T: Fn(mpvipc::Property) -> bool + Send + 'static,
{
let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
event = events.next() => {
match event {
Some(Ok(event)) => {
match event {
Event::PropertyChange { id: MPV_CHANNEL_ID, .. } => {
let property = parse_event_property(event).unwrap().1;
if !on_property(property.clone()) {
return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property))
}
}
_ => {
log::trace!("Received unrelated event, ignoring: {:?}", event);
}
}
}
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
_ = cancellation_token_clone.cancelled() => return Ok(()),
}
}
});
(handle, cancellation_token)
}
async fn graceful_shutdown(
cancellation_token: tokio_util::sync::CancellationToken,
handle: tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
mpv: Mpv,
mut proc: tokio::process::Child,
) -> Result<(), MpvError> {
cancellation_token.cancel();
match timeout(Duration::from_millis(500), handle).await {
Ok(Ok(Ok(()))) => {}
Ok(Ok(Err(err))) => match err {
PropertyCheckingThreadError::UnexpectedPropertyError(property) => {
return Err(MpvError::Other(format!(
"Unexpected property: {:?}",
property
)));
}
PropertyCheckingThreadError::MpvError(err) => return Err(err),
},
Ok(Err(_)) => {
return Err(MpvError::InternalConnectionError(
"Event checking thread timed out".to_owned(),
));
}
Err(_) => {
return Err(MpvError::InternalConnectionError(
"Event checking thread panicked".to_owned(),
));
}
}
mpv.kill().await?;
proc.wait().await.map_err(|err| {
MpvError::InternalConnectionError(format!(
"Failed to wait for mpv process to exit: {}",
err
))
})?;
Ok(())
}
#[test(tokio::test)]
#[cfg(target_family = "unix")]
async fn test_highlevel_event_pause() -> Result<(), MpvError> {
let (proc, mpv) = spawn_headless_mpv().await?;
mpv.observe_property(MPV_CHANNEL_ID, "pause").await?;
let events = mpv.get_event_stream().await;
let (handle, cancellation_token) =
create_interruptable_event_property_checking_thread(events, |property| match property {
mpvipc::Property::Pause(_) => {
log::debug!("{:?}", property);
true
}
_ => false,
});
sleep(Duration::from_millis(5)).await;
mpv.set_property("pause", false).await?;
sleep(Duration::from_millis(5)).await;
mpv.set_property("pause", true).await?;
sleep(Duration::from_millis(5)).await;
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
Ok(())
}
#[test(tokio::test)]
#[cfg(target_family = "unix")]
async fn test_highlevel_event_volume() -> Result<(), MpvError> {
let (proc, mpv) = spawn_headless_mpv().await?;
mpv.observe_property(1337, "volume").await?;
let events = mpv.get_event_stream().await;
let (handle, cancellation_token) =
create_interruptable_event_property_checking_thread(events, |property| match property {
mpvipc::Property::Volume(_) => {
log::trace!("{:?}", property);
true
}
_ => false,
});
sleep(Duration::from_millis(5)).await;
mpv.set_property("volume", 100.0).await?;
sleep(Duration::from_millis(5)).await;
mpv.set_property("volume", 40).await?;
sleep(Duration::from_millis(5)).await;
mpv.set_property("volume", 0.0).await?;
sleep(Duration::from_millis(5)).await;
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
Ok(())
}
#[test(tokio::test)]
#[cfg(target_family = "unix")]
async fn test_highlevel_event_mute() -> Result<(), MpvError> {
let (proc, mpv) = spawn_headless_mpv().await?;
mpv.observe_property(1337, "mute").await?;
let events = mpv.get_event_stream().await;
let (handle, cancellation_token) =
create_interruptable_event_property_checking_thread(events, |property| match property {
mpvipc::Property::Mute(_) => {
log::trace!("{:?}", property);
true
}
_ => false,
});
sleep(Duration::from_millis(5)).await;
mpv.set_property("mute", true).await?;
sleep(Duration::from_millis(5)).await;
mpv.set_property("mute", false).await?;
sleep(Duration::from_millis(5)).await;
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
Ok(())
}
#[test(tokio::test)]
#[cfg(target_family = "unix")]
async fn test_highlevel_event_duration() -> Result<(), MpvError> {
let (proc, mpv) = spawn_headless_mpv().await?;
mpv.observe_property(1337, "duration").await?;
let events = mpv.get_event_stream().await;
let (handle, cancellation_token) =
create_interruptable_event_property_checking_thread(events, |property| match property {
mpvipc::Property::Duration(_) => {
log::trace!("{:?}", property);
true
}
_ => false,
});
sleep(Duration::from_millis(5)).await;
mpv.set_property("pause", true).await?;
sleep(Duration::from_millis(5)).await;
mpv.set_property("pause", false).await?;
sleep(Duration::from_millis(5)).await;
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
Ok(())
}

26
tests/integration/misc.rs Normal file
View File

@ -0,0 +1,26 @@
use mpvipc::MpvExt;
use super::*;
#[tokio::test]
#[cfg(target_family = "unix")]
async fn test_get_mpv_version() {
let (mut proc, mpv) = spawn_headless_mpv().await.unwrap();
let version: String = mpv.get_property("mpv-version").await.unwrap();
assert!(version.starts_with("mpv"));
mpv.kill().await.unwrap();
proc.kill().await.unwrap();
}
#[tokio::test]
#[cfg(target_family = "unix")]
async fn test_set_property() {
let (mut proc, mpv) = spawn_headless_mpv().await.unwrap();
mpv.set_property("pause", true).await.unwrap();
let paused: bool = mpv.get_property("pause").await.unwrap();
assert!(paused);
mpv.kill().await.unwrap();
proc.kill().await.unwrap();
}

5
tests/integration/mod.rs Normal file
View File

@ -0,0 +1,5 @@
mod event_property_parser;
mod util;
mod misc;
use util::*;

43
tests/integration/util.rs Normal file
View File

@ -0,0 +1,43 @@
use std::{path::Path, time::Duration};
use mpvipc::{Mpv, MpvError};
use tokio::{
process::{Child, Command},
time::{sleep, timeout},
};
#[cfg(target_family = "unix")]
pub async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> {
let socket_path_str = format!("/tmp/mpv-ipc-{}", uuid::Uuid::new_v4());
let socket_path = Path::new(&socket_path_str);
// TODO: Verify that `mpv` exists in `PATH``
let process_handle = Command::new("mpv")
.arg("--no-config")
.arg("--idle")
.arg("--no-video")
.arg("--no-audio")
.arg(format!(
"--input-ipc-server={}",
&socket_path.to_str().unwrap()
))
.kill_on_drop(true)
.spawn()
.expect("Failed to start mpv");
timeout(Duration::from_millis(500), async {
while !&socket_path.exists() {
sleep(Duration::from_millis(10)).await;
}
})
.await
.map_err(|_| {
MpvError::MpvSocketConnectionError(format!(
"Failed to create mpv socket at {:?}, timed out waiting for socket file to be created",
&socket_path
))
})?;
let mpv = Mpv::connect(socket_path.to_str().unwrap()).await?;
Ok((process_handle, mpv))
}

1
tests/mod.rs Normal file
View File

@ -0,0 +1 @@
mod integration;