217 lines
6.9 KiB
Rust
217 lines
6.9 KiB
Rust
|
use futures::{stream::StreamExt, Stream};
|
||
|
use mpvipc::{parse_event_property, Event, Mpv, MpvError, MpvExt};
|
||
|
use thiserror::Error;
|
||
|
use tokio::time::sleep;
|
||
|
use tokio::time::{timeout, Duration};
|
||
|
|
||
|
use test_log::test;
|
||
|
|
||
|
use super::*;
|
||
|
|
||
|
/// Observer id used when registering property observers in these tests.
///
/// The event-checking task only inspects `Event::PropertyChange` events whose
/// `id` equals this value; every other event is ignored.
const MPV_CHANNEL_ID: usize = 1337;
|
||
|
|
||
|
#[derive(Error, Debug)]
|
||
|
enum PropertyCheckingThreadError {
|
||
|
#[error("Unexpected property: {0:?}")]
|
||
|
UnexpectedPropertyError(mpvipc::Property),
|
||
|
|
||
|
#[error(transparent)]
|
||
|
MpvError(#[from] MpvError),
|
||
|
}
|
||
|
|
||
|
fn create_interruptable_event_property_checking_thread<T>(
|
||
|
mut events: impl Stream<Item = Result<Event, MpvError>> + Unpin + Send + 'static,
|
||
|
on_property: T,
|
||
|
) -> (
|
||
|
tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
|
||
|
tokio_util::sync::CancellationToken,
|
||
|
)
|
||
|
where
|
||
|
T: Fn(mpvipc::Property) -> bool + Send + 'static,
|
||
|
{
|
||
|
let cancellation_token = tokio_util::sync::CancellationToken::new();
|
||
|
let cancellation_token_clone = cancellation_token.clone();
|
||
|
let handle = tokio::spawn(async move {
|
||
|
loop {
|
||
|
tokio::select! {
|
||
|
event = events.next() => {
|
||
|
match event {
|
||
|
Some(Ok(event)) => {
|
||
|
match event {
|
||
|
Event::PropertyChange { id: MPV_CHANNEL_ID, .. } => {
|
||
|
let property = parse_event_property(event).unwrap().1;
|
||
|
if !on_property(property.clone()) {
|
||
|
return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property))
|
||
|
}
|
||
|
}
|
||
|
_ => {
|
||
|
log::trace!("Received unrelated event, ignoring: {:?}", event);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
Some(Err(err)) => return Err(err.into()),
|
||
|
None => return Ok(()),
|
||
|
}
|
||
|
}
|
||
|
_ = cancellation_token_clone.cancelled() => return Ok(()),
|
||
|
}
|
||
|
}
|
||
|
});
|
||
|
|
||
|
(handle, cancellation_token)
|
||
|
}
|
||
|
|
||
|
async fn graceful_shutdown(
|
||
|
cancellation_token: tokio_util::sync::CancellationToken,
|
||
|
handle: tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
|
||
|
mpv: Mpv,
|
||
|
mut proc: tokio::process::Child,
|
||
|
) -> Result<(), MpvError> {
|
||
|
cancellation_token.cancel();
|
||
|
|
||
|
match timeout(Duration::from_millis(500), handle).await {
|
||
|
Ok(Ok(Ok(()))) => {}
|
||
|
Ok(Ok(Err(err))) => match err {
|
||
|
PropertyCheckingThreadError::UnexpectedPropertyError(property) => {
|
||
|
return Err(MpvError::Other(format!(
|
||
|
"Unexpected property: {:?}",
|
||
|
property
|
||
|
)));
|
||
|
}
|
||
|
PropertyCheckingThreadError::MpvError(err) => return Err(err),
|
||
|
},
|
||
|
Ok(Err(_)) => {
|
||
|
return Err(MpvError::InternalConnectionError(
|
||
|
"Event checking thread timed out".to_owned(),
|
||
|
));
|
||
|
}
|
||
|
Err(_) => {
|
||
|
return Err(MpvError::InternalConnectionError(
|
||
|
"Event checking thread panicked".to_owned(),
|
||
|
));
|
||
|
}
|
||
|
}
|
||
|
|
||
|
mpv.kill().await?;
|
||
|
proc.wait().await.map_err(|err| {
|
||
|
MpvError::InternalConnectionError(format!(
|
||
|
"Failed to wait for mpv process to exit: {}",
|
||
|
err
|
||
|
))
|
||
|
})?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
#[test(tokio::test)]
|
||
|
#[cfg(target_family = "unix")]
|
||
|
async fn test_highlevel_event_pause() -> Result<(), MpvError> {
|
||
|
let (proc, mpv) = spawn_headless_mpv().await?;
|
||
|
|
||
|
mpv.observe_property(MPV_CHANNEL_ID, "pause").await?;
|
||
|
|
||
|
let events = mpv.get_event_stream().await;
|
||
|
let (handle, cancellation_token) =
|
||
|
create_interruptable_event_property_checking_thread(events, |property| match property {
|
||
|
mpvipc::Property::Pause(_) => {
|
||
|
log::debug!("{:?}", property);
|
||
|
true
|
||
|
}
|
||
|
_ => false,
|
||
|
});
|
||
|
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
mpv.set_property("pause", false).await?;
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
mpv.set_property("pause", true).await?;
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
|
||
|
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
#[test(tokio::test)]
|
||
|
#[cfg(target_family = "unix")]
|
||
|
async fn test_highlevel_event_volume() -> Result<(), MpvError> {
|
||
|
let (proc, mpv) = spawn_headless_mpv().await?;
|
||
|
|
||
|
mpv.observe_property(1337, "volume").await?;
|
||
|
let events = mpv.get_event_stream().await;
|
||
|
let (handle, cancellation_token) =
|
||
|
create_interruptable_event_property_checking_thread(events, |property| match property {
|
||
|
mpvipc::Property::Volume(_) => {
|
||
|
log::trace!("{:?}", property);
|
||
|
true
|
||
|
}
|
||
|
_ => false,
|
||
|
});
|
||
|
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
mpv.set_property("volume", 100.0).await?;
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
mpv.set_property("volume", 40).await?;
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
mpv.set_property("volume", 0.0).await?;
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
|
||
|
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
#[test(tokio::test)]
|
||
|
#[cfg(target_family = "unix")]
|
||
|
async fn test_highlevel_event_mute() -> Result<(), MpvError> {
|
||
|
let (proc, mpv) = spawn_headless_mpv().await?;
|
||
|
|
||
|
mpv.observe_property(1337, "mute").await?;
|
||
|
let events = mpv.get_event_stream().await;
|
||
|
let (handle, cancellation_token) =
|
||
|
create_interruptable_event_property_checking_thread(events, |property| match property {
|
||
|
mpvipc::Property::Mute(_) => {
|
||
|
log::trace!("{:?}", property);
|
||
|
true
|
||
|
}
|
||
|
_ => false,
|
||
|
});
|
||
|
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
mpv.set_property("mute", true).await?;
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
mpv.set_property("mute", false).await?;
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
|
||
|
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
#[test(tokio::test)]
|
||
|
#[cfg(target_family = "unix")]
|
||
|
async fn test_highlevel_event_duration() -> Result<(), MpvError> {
|
||
|
let (proc, mpv) = spawn_headless_mpv().await?;
|
||
|
|
||
|
mpv.observe_property(1337, "duration").await?;
|
||
|
|
||
|
let events = mpv.get_event_stream().await;
|
||
|
let (handle, cancellation_token) =
|
||
|
create_interruptable_event_property_checking_thread(events, |property| match property {
|
||
|
mpvipc::Property::Duration(_) => {
|
||
|
log::trace!("{:?}", property);
|
||
|
true
|
||
|
}
|
||
|
_ => false,
|
||
|
});
|
||
|
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
mpv.set_property("pause", true).await?;
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
mpv.set_property("pause", false).await?;
|
||
|
sleep(Duration::from_millis(5)).await;
|
||
|
|
||
|
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|