From 9f8640888f2cd46ccfa4697944bea92c9650808d Mon Sep 17 00:00:00 2001 From: h7x4 Date: Fri, 3 May 2024 17:20:00 +0200 Subject: [PATCH] add some tests for event property parser --- tests/integration.rs | 99 ---------- tests/integration/event_property_parser.rs | 216 +++++++++++++++++++++ tests/integration/misc.rs | 26 +++ tests/integration/mod.rs | 5 + tests/integration/util.rs | 43 ++++ tests/mod.rs | 1 + 6 files changed, 291 insertions(+), 99 deletions(-) delete mode 100644 tests/integration.rs create mode 100644 tests/integration/event_property_parser.rs create mode 100644 tests/integration/misc.rs create mode 100644 tests/integration/mod.rs create mode 100644 tests/integration/util.rs create mode 100644 tests/mod.rs diff --git a/tests/integration.rs b/tests/integration.rs deleted file mode 100644 index a75b323..0000000 --- a/tests/integration.rs +++ /dev/null @@ -1,99 +0,0 @@ -use mpvipc::{MpvError, Mpv, MpvExt}; -use std::path::Path; -use tokio::{ - process::{Child, Command}, - time::{sleep, timeout, Duration}, -}; - -#[cfg(target_family = "unix")] -async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> { - let socket_path_str = format!("/tmp/mpv-ipc-{}", uuid::Uuid::new_v4()); - let socket_path = Path::new(&socket_path_str); - - let process_handle = Command::new("mpv") - .arg("--no-config") - .arg("--idle") - .arg("--no-video") - .arg("--no-audio") - .arg(format!( - "--input-ipc-server={}", - &socket_path.to_str().unwrap() - )) - .spawn() - .expect("Failed to start mpv"); - - if timeout(Duration::from_millis(500), async { - while !&socket_path.exists() { - sleep(Duration::from_millis(10)).await; - } - }) - .await - .is_err() - { - panic!("Failed to create mpv socket at {:?}", &socket_path); - } - - let mpv = Mpv::connect(socket_path.to_str().unwrap()).await.unwrap(); - Ok((process_handle, mpv)) -} - -#[tokio::test] -#[cfg(target_family = "unix")] -async fn test_get_mpv_version() { - let (mut proc, mpv) = spawn_headless_mpv().await.unwrap(); - let version: String = mpv.get_property("mpv-version").await.unwrap(); - assert!(version.starts_with("mpv")); - - mpv.kill().await.unwrap(); - proc.kill().await.unwrap(); -} - -#[tokio::test] -#[cfg(target_family = "unix")] -async fn test_set_property() { - let (mut proc, mpv) = spawn_headless_mpv().await.unwrap(); - mpv.set_property("pause", true).await.unwrap(); - let paused: bool = mpv.get_property("pause").await.unwrap(); - assert!(paused); - - mpv.kill().await.unwrap(); - proc.kill().await.unwrap(); -} - -#[tokio::test] -#[cfg(target_family = "unix")] -async fn test_events() { - use futures::stream::StreamExt; - - let (mut proc, mpv) = spawn_headless_mpv().await.unwrap(); - - mpv.observe_property(1337, "pause").await.unwrap(); - - let mut events = mpv.get_event_stream().await; - let event_checking_thread = tokio::spawn(async move { - loop { - let event = events.next().await.unwrap().unwrap(); - if let (1337, property) = mpvipc::parse_event_property(event).unwrap() { - assert_eq!(property, mpvipc::Property::Pause(true)); - break; - } - } - }); - - tokio::time::sleep(Duration::from_millis(10)).await; - - mpv.set_property("pause", true).await.unwrap(); - - if tokio::time::timeout( - tokio::time::Duration::from_millis(500), - event_checking_thread, - ) - .await - .is_err() - { - panic!("Event checking thread timed out"); - } - - mpv.kill().await.unwrap(); - proc.kill().await.unwrap(); -} diff --git a/tests/integration/event_property_parser.rs b/tests/integration/event_property_parser.rs new file mode 100644 index 0000000..6981e97 --- /dev/null +++ b/tests/integration/event_property_parser.rs @@ -0,0 +1,216 @@ +use futures::{stream::StreamExt, Stream}; +use mpvipc::{parse_event_property, Event, Mpv, MpvError, MpvExt}; +use thiserror::Error; +use tokio::time::sleep; +use tokio::time::{timeout, Duration}; + +use test_log::test; + +use super::*; + +const MPV_CHANNEL_ID: usize = 1337; + +#[derive(Error, Debug)] +enum PropertyCheckingThreadError { + #[error("Unexpected property: {0:?}")] + UnexpectedPropertyError(mpvipc::Property), + + #[error(transparent)] + MpvError(#[from] MpvError), +} + +fn create_interruptable_event_property_checking_thread( + mut events: impl Stream> + Unpin + Send + 'static, + on_property: T, +) -> ( + tokio::task::JoinHandle>, + tokio_util::sync::CancellationToken, +) +where + T: Fn(mpvipc::Property) -> bool + Send + 'static, +{ + let cancellation_token = tokio_util::sync::CancellationToken::new(); + let cancellation_token_clone = cancellation_token.clone(); + let handle = tokio::spawn(async move { + loop { + tokio::select! { + event = events.next() => { + match event { + Some(Ok(event)) => { + match event { + Event::PropertyChange { id: MPV_CHANNEL_ID, .. } => { + let property = parse_event_property(event).unwrap().1; + if !on_property(property.clone()) { + return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property)) + } + } + _ => { + log::trace!("Received unrelated event, ignoring: {:?}", event); + } + } + } + Some(Err(err)) => return Err(err.into()), + None => return Ok(()), + } + } + _ = cancellation_token_clone.cancelled() => return Ok(()), + } + } + }); + + (handle, cancellation_token) +} + +async fn graceful_shutdown( + cancellation_token: tokio_util::sync::CancellationToken, + handle: tokio::task::JoinHandle>, + mpv: Mpv, + mut proc: tokio::process::Child, +) -> Result<(), MpvError> { + cancellation_token.cancel(); + + match timeout(Duration::from_millis(500), handle).await { + Ok(Ok(Ok(()))) => {} + Ok(Ok(Err(err))) => match err { + PropertyCheckingThreadError::UnexpectedPropertyError(property) => { + return Err(MpvError::Other(format!( + "Unexpected property: {:?}", + property + ))); + } + PropertyCheckingThreadError::MpvError(err) => return Err(err), + }, + Ok(Err(_)) => { + return Err(MpvError::InternalConnectionError( + "Event checking thread timed out".to_owned(), + )); + } + Err(_) => { + return Err(MpvError::InternalConnectionError( + "Event checking thread panicked".to_owned(), + )); + } + } + + mpv.kill().await?; + proc.wait().await.map_err(|err| { + MpvError::InternalConnectionError(format!( + "Failed to wait for mpv process to exit: {}", + err + )) + })?; + + Ok(()) +} + +#[test(tokio::test)] +#[cfg(target_family = "unix")] +async fn test_highlevel_event_pause() -> Result<(), MpvError> { + let (proc, mpv) = spawn_headless_mpv().await?; + + mpv.observe_property(MPV_CHANNEL_ID, "pause").await?; + + let events = mpv.get_event_stream().await; + let (handle, cancellation_token) = + create_interruptable_event_property_checking_thread(events, |property| match property { + mpvipc::Property::Pause(_) => { + log::debug!("{:?}", property); + true + } + _ => false, + }); + + sleep(Duration::from_millis(5)).await; + mpv.set_property("pause", false).await?; + sleep(Duration::from_millis(5)).await; + mpv.set_property("pause", true).await?; + sleep(Duration::from_millis(5)).await; + + graceful_shutdown(cancellation_token, handle, mpv, proc).await?; + + Ok(()) +} + +#[test(tokio::test)] +#[cfg(target_family = "unix")] +async fn test_highlevel_event_volume() -> Result<(), MpvError> { + let (proc, mpv) = spawn_headless_mpv().await?; + + mpv.observe_property(1337, "volume").await?; + let events = mpv.get_event_stream().await; + let (handle, cancellation_token) = + create_interruptable_event_property_checking_thread(events, |property| match property { + mpvipc::Property::Volume(_) => { + log::trace!("{:?}", property); + true + } + _ => false, + }); + + sleep(Duration::from_millis(5)).await; + mpv.set_property("volume", 100.0).await?; + sleep(Duration::from_millis(5)).await; + mpv.set_property("volume", 40).await?; + sleep(Duration::from_millis(5)).await; + mpv.set_property("volume", 0.0).await?; + sleep(Duration::from_millis(5)).await; + + graceful_shutdown(cancellation_token, handle, mpv, proc).await?; + + Ok(()) +} + +#[test(tokio::test)] +#[cfg(target_family = "unix")] +async fn test_highlevel_event_mute() -> Result<(), MpvError> { + let (proc, mpv) = spawn_headless_mpv().await?; + + mpv.observe_property(1337, "mute").await?; + let events = mpv.get_event_stream().await; + let (handle, cancellation_token) = + create_interruptable_event_property_checking_thread(events, |property| match property { + mpvipc::Property::Mute(_) => { + log::trace!("{:?}", property); + true + } + _ => false, + }); + + sleep(Duration::from_millis(5)).await; + mpv.set_property("mute", true).await?; + sleep(Duration::from_millis(5)).await; + mpv.set_property("mute", false).await?; + sleep(Duration::from_millis(5)).await; + + graceful_shutdown(cancellation_token, handle, mpv, proc).await?; + + Ok(()) +} + +#[test(tokio::test)] +#[cfg(target_family = "unix")] +async fn test_highlevel_event_duration() -> Result<(), MpvError> { + let (proc, mpv) = spawn_headless_mpv().await?; + + mpv.observe_property(1337, "duration").await?; + + let events = mpv.get_event_stream().await; + let (handle, cancellation_token) = + create_interruptable_event_property_checking_thread(events, |property| match property { + mpvipc::Property::Duration(_) => { + log::trace!("{:?}", property); + true + } + _ => false, + }); + + sleep(Duration::from_millis(5)).await; + mpv.set_property("pause", true).await?; + sleep(Duration::from_millis(5)).await; + mpv.set_property("pause", false).await?; + sleep(Duration::from_millis(5)).await; + + graceful_shutdown(cancellation_token, handle, mpv, proc).await?; + + Ok(()) +} diff --git a/tests/integration/misc.rs b/tests/integration/misc.rs new file mode 100644 index 0000000..8049f78 --- /dev/null +++ b/tests/integration/misc.rs @@ -0,0 +1,26 @@ +use mpvipc::MpvExt; + +use super::*; + +#[tokio::test] +#[cfg(target_family = "unix")] +async fn test_get_mpv_version() { + let (mut proc, mpv) = spawn_headless_mpv().await.unwrap(); + let version: String = mpv.get_property("mpv-version").await.unwrap(); + assert!(version.starts_with("mpv")); + + mpv.kill().await.unwrap(); + proc.kill().await.unwrap(); +} + +#[tokio::test] +#[cfg(target_family = "unix")] +async fn test_set_property() { + let (mut proc, mpv) = spawn_headless_mpv().await.unwrap(); + mpv.set_property("pause", true).await.unwrap(); + let paused: bool = mpv.get_property("pause").await.unwrap(); + assert!(paused); + + mpv.kill().await.unwrap(); + proc.kill().await.unwrap(); +} \ No newline at end of file diff --git a/tests/integration/mod.rs b/tests/integration/mod.rs new file mode 100644 index 0000000..212fb84 --- /dev/null +++ b/tests/integration/mod.rs @@ -0,0 +1,5 @@ +mod event_property_parser; +mod util; +mod misc; + +use util::*; \ No newline at end of file diff --git a/tests/integration/util.rs b/tests/integration/util.rs new file mode 100644 index 0000000..1de4bad --- /dev/null +++ b/tests/integration/util.rs @@ -0,0 +1,43 @@ +use std::{path::Path, time::Duration}; + +use mpvipc::{Mpv, MpvError}; +use tokio::{ + process::{Child, Command}, + time::{sleep, timeout}, +}; + +#[cfg(target_family = "unix")] +pub async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> { + let socket_path_str = format!("/tmp/mpv-ipc-{}", uuid::Uuid::new_v4()); + let socket_path = Path::new(&socket_path_str); + + // TODO: Verify that `mpv` exists in `PATH`` + let process_handle = Command::new("mpv") + .arg("--no-config") + .arg("--idle") + .arg("--no-video") + .arg("--no-audio") + .arg(format!( + "--input-ipc-server={}", + &socket_path.to_str().unwrap() + )) + .kill_on_drop(true) + .spawn() + .expect("Failed to start mpv"); + + timeout(Duration::from_millis(500), async { + while !&socket_path.exists() { + sleep(Duration::from_millis(10)).await; + } + }) + .await + .map_err(|_| { + MpvError::MpvSocketConnectionError(format!( + "Failed to create mpv socket at {:?}, timed out waiting for socket file to be created", + &socket_path + )) + })?; + + let mpv = Mpv::connect(socket_path.to_str().unwrap()).await?; + Ok((process_handle, mpv)) +} diff --git a/tests/mod.rs b/tests/mod.rs new file mode 100644 index 0000000..605134b --- /dev/null +++ b/tests/mod.rs @@ -0,0 +1 @@ +mod integration; \ No newline at end of file