From 78e228f9c405c039ec0756cf7f183166b93abba1 Mon Sep 17 00:00:00 2001 From: h7x4 Date: Fri, 2 Jan 2026 20:24:53 +0900 Subject: [PATCH] tests/misc: test unobserving an event --- .../event_property_parser.rs | 115 +----------------- tests/integration_tests/misc.rs | 49 +++++++- tests/integration_tests/util.rs | 108 +++++++++++++++- 3 files changed, 159 insertions(+), 113 deletions(-) diff --git a/tests/integration_tests/event_property_parser.rs b/tests/integration_tests/event_property_parser.rs index f1e94e8..240e18d 100644 --- a/tests/integration_tests/event_property_parser.rs +++ b/tests/integration_tests/event_property_parser.rs @@ -1,118 +1,11 @@ -use futures::stream::StreamExt; -use mpvipc_async::{Event, Mpv, MpvError, MpvExt, Property, parse_property}; -use thiserror::Error; -use tokio::time::sleep; -use tokio::time::{Duration, timeout}; - use test_log::test; +use tokio::time::Duration; +use tokio::time::sleep; + +use mpvipc_async::{MpvError, MpvExt, Property}; use super::*; -const MPV_CHANNEL_ID: u64 = 1337; - -#[derive(Error, Debug)] -enum PropertyCheckingThreadError { - #[error("Unexpected property: {0:?}")] - UnexpectedPropertyError(Property), - - #[error(transparent)] - MpvError(#[from] MpvError), -} - -/// This function will create an ongoing tokio task that collects [`Event::PropertyChange`] events, -/// and parses them into [`Property`]s. It will then run the property through the provided -/// closure, and return an error if the closure returns false. -/// -/// The returned cancellation token can be used to stop the task. -fn create_interruptable_event_property_checking_thread( - mpv: Mpv, - on_property: T, -) -> ( - tokio::task::JoinHandle>, - tokio_util::sync::CancellationToken, -) -where - T: Fn(Property) -> bool + Send + 'static, -{ - let cancellation_token = tokio_util::sync::CancellationToken::new(); - let cancellation_token_clone = cancellation_token.clone(); - let handle = tokio::spawn(async move { - let mut events = mpv.get_event_stream().await; - - loop { - tokio::select! { - event = events.next() => { - match event { - Some(Ok(event)) => { - match event { - Event::PropertyChange { id: Some(MPV_CHANNEL_ID), name, data } => { - let property = parse_property(&name, data).unwrap(); - if !on_property(property.clone()) { - return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property)) - } - } - _ => { - log::trace!("Received unrelated event, ignoring: {:?}", event); - } - } - } - Some(Err(err)) => return Err(err.into()), - None => return Ok(()), - } - } - _ = cancellation_token_clone.cancelled() => return Ok(()), - } - } - }); - - (handle, cancellation_token) -} - -/// This helper function will gracefully shut down both the event checking thread and the mpv process. -/// It will also return an error if the event checking thread happened to panic, or if it times out -/// The timeout is hardcoded to 500ms. -async fn graceful_shutdown( - cancellation_token: tokio_util::sync::CancellationToken, - handle: tokio::task::JoinHandle>, - mpv: Mpv, - mut proc: tokio::process::Child, -) -> Result<(), MpvError> { - cancellation_token.cancel(); - - match timeout(Duration::from_millis(500), handle).await { - Ok(Ok(Ok(()))) => {} - Ok(Ok(Err(err))) => match err { - PropertyCheckingThreadError::UnexpectedPropertyError(property) => { - return Err(MpvError::Other(format!( - "Unexpected property: {:?}", - property - ))); - } - PropertyCheckingThreadError::MpvError(err) => return Err(err), - }, - Ok(Err(_)) => { - return Err(MpvError::InternalConnectionError( - "Event checking thread timed out".to_owned(), - )); - } - Err(_) => { - return Err(MpvError::InternalConnectionError( - "Event checking thread panicked".to_owned(), - )); - } - } - - mpv.kill().await?; - proc.wait().await.map_err(|err| { - MpvError::InternalConnectionError(format!( - "Failed to wait for mpv process to exit: {}", - err - )) - })?; - - Ok(()) -} - /// Test correct parsing of different values of the "pause" property #[test(tokio::test)] #[cfg(target_family = "unix")] diff --git a/tests/integration_tests/misc.rs b/tests/integration_tests/misc.rs index 2f73b71..54fcca6 100644 --- a/tests/integration_tests/misc.rs +++ b/tests/integration_tests/misc.rs @@ -1,4 +1,9 @@ -use mpvipc_async::{MpvError, MpvExt}; +use std::time::Duration; + +use test_log::test; +use tokio::time::sleep; + +use mpvipc_async::{MpvError, MpvExt, Property}; use super::*; @@ -60,3 +65,45 @@ async fn test_get_nonexistent_property() -> Result<(), MpvError> { Ok(()) } + +#[test(tokio::test)] +#[cfg(target_family = "unix")] +async fn test_unobserve_property() -> Result<(), MpvError> { + let (proc, mpv) = spawn_headless_mpv().await?; + + mpv.observe_property(MPV_CHANNEL_ID, "pause").await?; + + let (handle, cancellation_token) = create_interruptable_event_property_checking_thread( + mpv.clone(), + |property| match property { + Property::Pause(_) => { + log::debug!("{:?}", property); + true + } + _ => false, + }, + ); + + sleep(Duration::from_millis(5)).await; + mpv.set_property("pause", true).await?; + sleep(Duration::from_millis(5)).await; + + cancellation_token.cancel(); + check_property_thread_result(handle).await?; + + mpv.unobserve_property(MPV_CHANNEL_ID).await?; + + let (handle, cancellation_token) = + create_interruptable_event_property_checking_thread(mpv.clone(), |_property| { + // We should not receive any properties after unobserving + false + }); + + sleep(Duration::from_millis(5)).await; + mpv.set_property("pause", false).await?; + sleep(Duration::from_millis(5)).await; + + graceful_shutdown(cancellation_token, handle, mpv, proc).await?; + + Ok(()) +} diff --git a/tests/integration_tests/util.rs b/tests/integration_tests/util.rs index b087619..be7145c 100644 --- a/tests/integration_tests/util.rs +++ b/tests/integration_tests/util.rs @@ -1,10 +1,13 @@ use std::{path::Path, time::Duration}; -use mpvipc_async::{Mpv, MpvError}; +use thiserror::Error; use tokio::{ process::{Child, Command}, time::{sleep, timeout}, }; +use tokio_stream::StreamExt; + +use mpvipc_async::{Event, Mpv, MpvError, MpvExt, Property, parse_property}; #[cfg(target_family = "unix")] pub async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> { @@ -41,3 +44,106 @@ pub async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> { let mpv = Mpv::connect(socket_path.to_str().unwrap()).await?; Ok((process_handle, mpv)) } + +pub const MPV_CHANNEL_ID: u64 = 1337; + +#[derive(Error, Debug)] +pub enum PropertyCheckingThreadError { + #[error("Unexpected property: {0:?}")] + UnexpectedPropertyError(Property), + + #[error(transparent)] + MpvError(#[from] MpvError), +} + +/// This function will create an ongoing tokio task that collects [`Event::PropertyChange`] events, +/// and parses them into [`Property`]s. It will then run the property through the provided +/// closure, and return an error if the closure returns false. +/// +/// The returned cancellation token can be used to stop the task. +pub fn create_interruptable_event_property_checking_thread( + mpv: Mpv, + on_property: T, +) -> ( + tokio::task::JoinHandle>, + tokio_util::sync::CancellationToken, +) +where + T: Fn(Property) -> bool + Send + 'static, +{ + let cancellation_token = tokio_util::sync::CancellationToken::new(); + let cancellation_token_clone = cancellation_token.clone(); + let handle = tokio::spawn(async move { + let mut events = mpv.get_event_stream().await; + + loop { + tokio::select! { + event = events.next() => { + match event { + Some(Ok(event)) => { + match event { + Event::PropertyChange { id: Some(MPV_CHANNEL_ID), name, data } => { + let property = parse_property(&name, data).unwrap(); + if !on_property(property.clone()) { + return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property)) + } + } + _ => { + log::trace!("Received unrelated event, ignoring: {:?}", event); + } + } + } + Some(Err(err)) => return Err(err.into()), + None => return Ok(()), + } + } + _ = cancellation_token_clone.cancelled() => return Ok(()), + } + } + }); + + (handle, cancellation_token) +} + +pub async fn check_property_thread_result( + handle: tokio::task::JoinHandle>, +) -> Result<(), MpvError> { + timeout(Duration::from_millis(500), handle) + .await + .map_err(|_| { + MpvError::InternalConnectionError("Event checking thread timed out".to_owned()) + })? + .map_err(|_| { + MpvError::InternalConnectionError("Event checking thread panicked".to_owned()) + })? + .map_err(|err| match err { + PropertyCheckingThreadError::UnexpectedPropertyError(property) => { + MpvError::Other(format!("Unexpected property: {:?}", property)) + } + PropertyCheckingThreadError::MpvError(err) => err, + }) +} + +/// This helper function will gracefully shut down both the event checking thread and the mpv process. +/// It will also return an error if the event checking thread happened to panic, or if it times out +/// The timeout is hardcoded to 500ms. +pub async fn graceful_shutdown( + cancellation_token: tokio_util::sync::CancellationToken, + handle: tokio::task::JoinHandle>, + mpv: Mpv, + mut proc: tokio::process::Child, +) -> Result<(), MpvError> { + cancellation_token.cancel(); + + check_property_thread_result(handle).await?; + + mpv.kill().await?; + proc.wait().await.map_err(|err| { + MpvError::InternalConnectionError(format!( + "Failed to wait for mpv process to exit: {}", + err + )) + })?; + + Ok(()) +}