use futures::{stream::StreamExt, Stream}; use mpvipc::{parse_event_property, Event, Mpv, MpvError, MpvExt}; use thiserror::Error; use tokio::time::sleep; use tokio::time::{timeout, Duration}; use test_log::test; use super::*; const MPV_CHANNEL_ID: usize = 1337; #[derive(Error, Debug)] enum PropertyCheckingThreadError { #[error("Unexpected property: {0:?}")] UnexpectedPropertyError(mpvipc::Property), #[error(transparent)] MpvError(#[from] MpvError), } fn create_interruptable_event_property_checking_thread( mut events: impl Stream> + Unpin + Send + 'static, on_property: T, ) -> ( tokio::task::JoinHandle>, tokio_util::sync::CancellationToken, ) where T: Fn(mpvipc::Property) -> bool + Send + 'static, { let cancellation_token = tokio_util::sync::CancellationToken::new(); let cancellation_token_clone = cancellation_token.clone(); let handle = tokio::spawn(async move { loop { tokio::select! { event = events.next() => { match event { Some(Ok(event)) => { match event { Event::PropertyChange { id: MPV_CHANNEL_ID, .. } => { let property = parse_event_property(event).unwrap().1; if !on_property(property.clone()) { return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property)) } } _ => { log::trace!("Received unrelated event, ignoring: {:?}", event); } } } Some(Err(err)) => return Err(err.into()), None => return Ok(()), } } _ = cancellation_token_clone.cancelled() => return Ok(()), } } }); (handle, cancellation_token) } async fn graceful_shutdown( cancellation_token: tokio_util::sync::CancellationToken, handle: tokio::task::JoinHandle>, mpv: Mpv, mut proc: tokio::process::Child, ) -> Result<(), MpvError> { cancellation_token.cancel(); match timeout(Duration::from_millis(500), handle).await { Ok(Ok(Ok(()))) => {} Ok(Ok(Err(err))) => match err { PropertyCheckingThreadError::UnexpectedPropertyError(property) => { return Err(MpvError::Other(format!( "Unexpected property: {:?}", property ))); } PropertyCheckingThreadError::MpvError(err) => return Err(err), }, Ok(Err(_)) => { return Err(MpvError::InternalConnectionError( "Event checking thread timed out".to_owned(), )); } Err(_) => { return Err(MpvError::InternalConnectionError( "Event checking thread panicked".to_owned(), )); } } mpv.kill().await?; proc.wait().await.map_err(|err| { MpvError::InternalConnectionError(format!( "Failed to wait for mpv process to exit: {}", err )) })?; Ok(()) } #[test(tokio::test)] #[cfg(target_family = "unix")] async fn test_highlevel_event_pause() -> Result<(), MpvError> { let (proc, mpv) = spawn_headless_mpv().await?; mpv.observe_property(MPV_CHANNEL_ID, "pause").await?; let events = mpv.get_event_stream().await; let (handle, cancellation_token) = create_interruptable_event_property_checking_thread(events, |property| match property { mpvipc::Property::Pause(_) => { log::debug!("{:?}", property); true } _ => false, }); sleep(Duration::from_millis(5)).await; mpv.set_property("pause", false).await?; sleep(Duration::from_millis(5)).await; mpv.set_property("pause", true).await?; sleep(Duration::from_millis(5)).await; graceful_shutdown(cancellation_token, handle, mpv, proc).await?; Ok(()) } #[test(tokio::test)] #[cfg(target_family = "unix")] async fn test_highlevel_event_volume() -> Result<(), MpvError> { let (proc, mpv) = spawn_headless_mpv().await?; mpv.observe_property(1337, "volume").await?; let events = mpv.get_event_stream().await; let (handle, cancellation_token) = create_interruptable_event_property_checking_thread(events, |property| match property { mpvipc::Property::Volume(_) => { log::trace!("{:?}", property); true } _ => false, }); sleep(Duration::from_millis(5)).await; mpv.set_property("volume", 100.0).await?; sleep(Duration::from_millis(5)).await; mpv.set_property("volume", 40).await?; sleep(Duration::from_millis(5)).await; mpv.set_property("volume", 0.0).await?; sleep(Duration::from_millis(5)).await; graceful_shutdown(cancellation_token, handle, mpv, proc).await?; Ok(()) } #[test(tokio::test)] #[cfg(target_family = "unix")] async fn test_highlevel_event_mute() -> Result<(), MpvError> { let (proc, mpv) = spawn_headless_mpv().await?; mpv.observe_property(1337, "mute").await?; let events = mpv.get_event_stream().await; let (handle, cancellation_token) = create_interruptable_event_property_checking_thread(events, |property| match property { mpvipc::Property::Mute(_) => { log::trace!("{:?}", property); true } _ => false, }); sleep(Duration::from_millis(5)).await; mpv.set_property("mute", true).await?; sleep(Duration::from_millis(5)).await; mpv.set_property("mute", false).await?; sleep(Duration::from_millis(5)).await; graceful_shutdown(cancellation_token, handle, mpv, proc).await?; Ok(()) } #[test(tokio::test)] #[cfg(target_family = "unix")] async fn test_highlevel_event_duration() -> Result<(), MpvError> { let (proc, mpv) = spawn_headless_mpv().await?; mpv.observe_property(1337, "duration").await?; let events = mpv.get_event_stream().await; let (handle, cancellation_token) = create_interruptable_event_property_checking_thread(events, |property| match property { mpvipc::Property::Duration(_) => { log::trace!("{:?}", property); true } _ => false, }); sleep(Duration::from_millis(5)).await; mpv.set_property("pause", true).await?; sleep(Duration::from_millis(5)).await; mpv.set_property("pause", false).await?; sleep(Duration::from_millis(5)).await; graceful_shutdown(cancellation_token, handle, mpv, proc).await?; Ok(()) }