tests/misc: test unobserving an event
This commit is contained in:
@@ -1,118 +1,11 @@
|
||||
use futures::stream::StreamExt;
|
||||
use mpvipc_async::{Event, Mpv, MpvError, MpvExt, Property, parse_property};
|
||||
use thiserror::Error;
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::{Duration, timeout};
|
||||
|
||||
use test_log::test;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use mpvipc_async::{MpvError, MpvExt, Property};
|
||||
|
||||
use super::*;
|
||||
|
||||
const MPV_CHANNEL_ID: u64 = 1337;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
enum PropertyCheckingThreadError {
|
||||
#[error("Unexpected property: {0:?}")]
|
||||
UnexpectedPropertyError(Property),
|
||||
|
||||
#[error(transparent)]
|
||||
MpvError(#[from] MpvError),
|
||||
}
|
||||
|
||||
/// This function will create an ongoing tokio task that collects [`Event::PropertyChange`] events,
|
||||
/// and parses them into [`Property`]s. It will then run the property through the provided
|
||||
/// closure, and return an error if the closure returns false.
|
||||
///
|
||||
/// The returned cancellation token can be used to stop the task.
|
||||
fn create_interruptable_event_property_checking_thread<T>(
|
||||
mpv: Mpv,
|
||||
on_property: T,
|
||||
) -> (
|
||||
tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
|
||||
tokio_util::sync::CancellationToken,
|
||||
)
|
||||
where
|
||||
T: Fn(Property) -> bool + Send + 'static,
|
||||
{
|
||||
let cancellation_token = tokio_util::sync::CancellationToken::new();
|
||||
let cancellation_token_clone = cancellation_token.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut events = mpv.get_event_stream().await;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = events.next() => {
|
||||
match event {
|
||||
Some(Ok(event)) => {
|
||||
match event {
|
||||
Event::PropertyChange { id: Some(MPV_CHANNEL_ID), name, data } => {
|
||||
let property = parse_property(&name, data).unwrap();
|
||||
if !on_property(property.clone()) {
|
||||
return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property))
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
log::trace!("Received unrelated event, ignoring: {:?}", event);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(err)) => return Err(err.into()),
|
||||
None => return Ok(()),
|
||||
}
|
||||
}
|
||||
_ = cancellation_token_clone.cancelled() => return Ok(()),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(handle, cancellation_token)
|
||||
}
|
||||
|
||||
/// This helper function will gracefully shut down both the event checking thread and the mpv process.
|
||||
/// It will also return an error if the event checking thread happened to panic, or if it times out
|
||||
/// The timeout is hardcoded to 500ms.
|
||||
async fn graceful_shutdown(
|
||||
cancellation_token: tokio_util::sync::CancellationToken,
|
||||
handle: tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
|
||||
mpv: Mpv,
|
||||
mut proc: tokio::process::Child,
|
||||
) -> Result<(), MpvError> {
|
||||
cancellation_token.cancel();
|
||||
|
||||
match timeout(Duration::from_millis(500), handle).await {
|
||||
Ok(Ok(Ok(()))) => {}
|
||||
Ok(Ok(Err(err))) => match err {
|
||||
PropertyCheckingThreadError::UnexpectedPropertyError(property) => {
|
||||
return Err(MpvError::Other(format!(
|
||||
"Unexpected property: {:?}",
|
||||
property
|
||||
)));
|
||||
}
|
||||
PropertyCheckingThreadError::MpvError(err) => return Err(err),
|
||||
},
|
||||
Ok(Err(_)) => {
|
||||
return Err(MpvError::InternalConnectionError(
|
||||
"Event checking thread timed out".to_owned(),
|
||||
));
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(MpvError::InternalConnectionError(
|
||||
"Event checking thread panicked".to_owned(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
mpv.kill().await?;
|
||||
proc.wait().await.map_err(|err| {
|
||||
MpvError::InternalConnectionError(format!(
|
||||
"Failed to wait for mpv process to exit: {}",
|
||||
err
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test correct parsing of different values of the "pause" property
|
||||
#[test(tokio::test)]
|
||||
#[cfg(target_family = "unix")]
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
use mpvipc_async::{MpvError, MpvExt};
|
||||
use std::time::Duration;
|
||||
|
||||
use test_log::test;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use mpvipc_async::{MpvError, MpvExt, Property};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -60,3 +65,45 @@ async fn test_get_nonexistent_property() -> Result<(), MpvError> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test(tokio::test)]
|
||||
#[cfg(target_family = "unix")]
|
||||
async fn test_unobserve_property() -> Result<(), MpvError> {
|
||||
let (proc, mpv) = spawn_headless_mpv().await?;
|
||||
|
||||
mpv.observe_property(MPV_CHANNEL_ID, "pause").await?;
|
||||
|
||||
let (handle, cancellation_token) = create_interruptable_event_property_checking_thread(
|
||||
mpv.clone(),
|
||||
|property| match property {
|
||||
Property::Pause(_) => {
|
||||
log::debug!("{:?}", property);
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
},
|
||||
);
|
||||
|
||||
sleep(Duration::from_millis(5)).await;
|
||||
mpv.set_property("pause", true).await?;
|
||||
sleep(Duration::from_millis(5)).await;
|
||||
|
||||
cancellation_token.cancel();
|
||||
check_property_thread_result(handle).await?;
|
||||
|
||||
mpv.unobserve_property(MPV_CHANNEL_ID).await?;
|
||||
|
||||
let (handle, cancellation_token) =
|
||||
create_interruptable_event_property_checking_thread(mpv.clone(), |_property| {
|
||||
// We should not receive any properties after unobserving
|
||||
false
|
||||
});
|
||||
|
||||
sleep(Duration::from_millis(5)).await;
|
||||
mpv.set_property("pause", false).await?;
|
||||
sleep(Duration::from_millis(5)).await;
|
||||
|
||||
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
use std::{path::Path, time::Duration};
|
||||
|
||||
use mpvipc_async::{Mpv, MpvError};
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
process::{Child, Command},
|
||||
time::{sleep, timeout},
|
||||
};
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
use mpvipc_async::{Event, Mpv, MpvError, MpvExt, Property, parse_property};
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
pub async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> {
|
||||
@@ -41,3 +44,106 @@ pub async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> {
|
||||
let mpv = Mpv::connect(socket_path.to_str().unwrap()).await?;
|
||||
Ok((process_handle, mpv))
|
||||
}
|
||||
|
||||
pub const MPV_CHANNEL_ID: u64 = 1337;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum PropertyCheckingThreadError {
|
||||
#[error("Unexpected property: {0:?}")]
|
||||
UnexpectedPropertyError(Property),
|
||||
|
||||
#[error(transparent)]
|
||||
MpvError(#[from] MpvError),
|
||||
}
|
||||
|
||||
/// This function will create an ongoing tokio task that collects [`Event::PropertyChange`] events,
|
||||
/// and parses them into [`Property`]s. It will then run the property through the provided
|
||||
/// closure, and return an error if the closure returns false.
|
||||
///
|
||||
/// The returned cancellation token can be used to stop the task.
|
||||
pub fn create_interruptable_event_property_checking_thread<T>(
|
||||
mpv: Mpv,
|
||||
on_property: T,
|
||||
) -> (
|
||||
tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
|
||||
tokio_util::sync::CancellationToken,
|
||||
)
|
||||
where
|
||||
T: Fn(Property) -> bool + Send + 'static,
|
||||
{
|
||||
let cancellation_token = tokio_util::sync::CancellationToken::new();
|
||||
let cancellation_token_clone = cancellation_token.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut events = mpv.get_event_stream().await;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = events.next() => {
|
||||
match event {
|
||||
Some(Ok(event)) => {
|
||||
match event {
|
||||
Event::PropertyChange { id: Some(MPV_CHANNEL_ID), name, data } => {
|
||||
let property = parse_property(&name, data).unwrap();
|
||||
if !on_property(property.clone()) {
|
||||
return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property))
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
log::trace!("Received unrelated event, ignoring: {:?}", event);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(err)) => return Err(err.into()),
|
||||
None => return Ok(()),
|
||||
}
|
||||
}
|
||||
_ = cancellation_token_clone.cancelled() => return Ok(()),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(handle, cancellation_token)
|
||||
}
|
||||
|
||||
pub async fn check_property_thread_result(
|
||||
handle: tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
|
||||
) -> Result<(), MpvError> {
|
||||
timeout(Duration::from_millis(500), handle)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
MpvError::InternalConnectionError("Event checking thread timed out".to_owned())
|
||||
})?
|
||||
.map_err(|_| {
|
||||
MpvError::InternalConnectionError("Event checking thread panicked".to_owned())
|
||||
})?
|
||||
.map_err(|err| match err {
|
||||
PropertyCheckingThreadError::UnexpectedPropertyError(property) => {
|
||||
MpvError::Other(format!("Unexpected property: {:?}", property))
|
||||
}
|
||||
PropertyCheckingThreadError::MpvError(err) => err,
|
||||
})
|
||||
}
|
||||
|
||||
/// This helper function will gracefully shut down both the event checking thread and the mpv process.
|
||||
/// It will also return an error if the event checking thread happened to panic, or if it times out
|
||||
/// The timeout is hardcoded to 500ms.
|
||||
pub async fn graceful_shutdown(
|
||||
cancellation_token: tokio_util::sync::CancellationToken,
|
||||
handle: tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
|
||||
mpv: Mpv,
|
||||
mut proc: tokio::process::Child,
|
||||
) -> Result<(), MpvError> {
|
||||
cancellation_token.cancel();
|
||||
|
||||
check_property_thread_result(handle).await?;
|
||||
|
||||
mpv.kill().await?;
|
||||
proc.wait().await.map_err(|err| {
|
||||
MpvError::InternalConnectionError(format!(
|
||||
"Failed to wait for mpv process to exit: {}",
|
||||
err
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user