Compare commits

..

3 Commits

Author SHA1 Message Date
Oystein Kristoffer Tveit 752c89c8ed
WIP: add tests 2024-04-19 00:59:23 +02:00
Oystein Kristoffer Tveit ea12dbec5b
nix support 2024-04-19 00:59:22 +02:00
Oystein Kristoffer Tveit deb45a4570
Make entire project async
This moves all communication with Mpv's unix socket into another tokio
task, and uses message passing through clonable mpsc channels to receive
commands to execute, and to send responses.
2024-04-19 00:59:22 +02:00
4 changed files with 247 additions and 14 deletions

View File

@ -414,7 +414,9 @@ impl Mpv {
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response,
Ok(MpvIpcResponse(response)) => response.and_then(|value| {
value.ok_or(Error(ErrorCode::MissingValue))
}),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
@ -605,7 +607,7 @@ impl Mpv {
/// Run a custom command.
/// This should only be used if the desired command is not implemented
/// with [MpvCommand].
pub async fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<Value, Error> {
pub async fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<Option<Value>, Error> {
let command = Vec::from(
[command]
.iter()
@ -732,7 +734,7 @@ impl Mpv {
})?
}
};
self.set_property("loop-playlist", enabled).await
self.set_property("loo-playlist", enabled).await
}
pub async fn set_mute(&self, option: Switch) -> Result<(), Error> {

View File

@ -24,7 +24,7 @@ pub(crate) enum MpvIpcCommand {
}
#[derive(Debug, Clone)]
pub(crate) struct MpvIpcResponse(pub(crate) Result<Value, Error>);
pub(crate) struct MpvIpcResponse(pub(crate) Result<Option<Value>, Error>);
impl MpvIpc {
pub(crate) fn new(
@ -38,7 +38,7 @@ impl MpvIpc {
}
}
pub(crate) async fn send_command(&mut self, command: &[&str]) -> Result<Value, Error> {
pub(crate) async fn send_command(&mut self, command: &[&str]) -> Result<Option<Value>, Error> {
let lock = self.socket_lock.lock().await;
// START CRITICAL SECTION
let ipc_command = json!({ "command": command });
@ -70,7 +70,7 @@ impl MpvIpc {
}
pub(crate) async fn get_mpv_property(&mut self, property: &str) -> Result<Value, Error> {
pub(crate) async fn get_mpv_property(&mut self, property: &str) -> Result<Option<Value>, Error> {
self.send_command(&["get_property", property]).await
}
@ -78,7 +78,7 @@ impl MpvIpc {
&mut self,
property: &str,
value: Value,
) -> Result<Value, Error> {
) -> Result<Option<Value>, Error> {
let str_value = match &value {
Value::String(s) => s,
v => &serde_json::to_string(&v).unwrap()
@ -91,12 +91,12 @@ impl MpvIpc {
&mut self,
id: isize,
property: &str,
) -> Result<Value, Error> {
) -> Result<Option<Value>, Error> {
self.send_command(&["observe_property", &id.to_string(), property])
.await
}
pub(crate) async fn unobserve_property(&mut self, id: isize) -> Result<Value, Error> {
pub(crate) async fn unobserve_property(&mut self, id: isize) -> Result<Option<Value>, Error> {
self.send_command(&["unobserve_property", &id.to_string()])
.await
}
@ -133,7 +133,7 @@ impl MpvIpc {
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::Exit => {
tx.send(MpvIpcResponse(Ok(Value::Null))).unwrap();
tx.send(MpvIpcResponse(Ok(None))).unwrap();
return Ok(());
}
}
@ -143,14 +143,14 @@ impl MpvIpc {
}
}
fn parse_mpv_response_data(value: Value) -> Result<Value, Error> {
fn parse_mpv_response_data(value: Value) -> Result<Option<Value>, Error> {
log::trace!("Parsing mpv response data: {:?}", value);
let result = value
.as_object()
.map(|o| (o.get("error").and_then(|e| e.as_str()), o.get("data")))
.ok_or(Error(ErrorCode::UnexpectedValue))
.and_then(|(error, data)| match error {
Some("success") => data.ok_or(Error(ErrorCode::UnexpectedValue)),
Some("success") => Ok(data),
Some(e) => Err(Error(ErrorCode::MpvError(e.to_string()))),
None => Err(Error(ErrorCode::UnexpectedValue)),
});
@ -158,5 +158,5 @@ fn parse_mpv_response_data(value: Value) -> Result<Value, Error> {
Ok(v) => log::trace!("Successfully parsed mpv response data: {:?}", v),
Err(e) => log::trace!("Error parsing mpv response data: {:?}", e),
}
result.map(|v| v.clone())
result.map(|opt| opt.map(|val| val.clone()))
}

View File

@ -144,7 +144,7 @@ async fn test_get_property_simultaneous_requests() {
let mpv_poller_3 = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(2)).await;
let maybe_volume = mpv_clone_3.get_property::<f64>("non_existent").await;
let maybe_volume = mpv_clone_3.get_property::<f64>("nonexistent").await;
assert_eq!(
maybe_volume,
Err(Error(ErrorCode::MpvError(

231
tests/set_property.rs Normal file
View File

@ -0,0 +1,231 @@
use std::{panic, time::Duration};
use futures::{stream::FuturesUnordered, SinkExt, StreamExt};
use mpvipc::{Error, ErrorCode, Mpv, Playlist, PlaylistEntry};
use serde_json::{json, Value};
use test_log::test;
use tokio::{net::UnixStream, task::JoinHandle};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
fn test_socket(answers: Vec<String>) -> (UnixStream, JoinHandle<Result<(), LinesCodecError>>) {
let (socket, server) = UnixStream::pair().unwrap();
let join_handle = tokio::spawn(async move {
let mut framed = Framed::new(socket, LinesCodec::new());
for answer in answers {
framed.next().await;
framed.send(answer).await?;
}
Ok(())
});
(server, join_handle)
}
#[test(tokio::test)]
async fn test_set_property_successful() {
let (server, join_handle) = test_socket(vec![
json!({ "data": null, "request_id": 0, "error": "success" }).to_string(),
]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let volume = mpv.set_property("volume", 64.0).await;
assert!(volume.is_ok());
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_set_property_broken_pipe() {
let (server, join_handle) = test_socket(vec![]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let maybe_set_volume = mpv.set_property("volume", 64.0).await;
assert_eq!(
maybe_set_volume,
Err(Error(ErrorCode::ConnectError(
"Broken pipe (os error 32)".to_owned()
)))
);
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_set_property_wrong_type() {
let (server, join_handle) = test_socket(vec![
json!({"request_id":0,"error":"unsupported format for accessing property"}).to_string(),
]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let maybe_volume = mpv.set_property::<bool>("volume", true).await;
assert_eq!(
maybe_volume,
Err(Error(ErrorCode::MpvError(
"unsupported format for accessing property".to_owned()
)))
);
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_get_property_error() {
let (server, join_handle) = test_socket(vec![
json!({"request_id":0,"error":"property not found"}).to_string(),
]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let maybe_volume = mpv.set_property("nonexistent", true).await;
assert_eq!(
maybe_volume,
Err(Error(ErrorCode::MpvError(
"property not found".to_owned()
)))
);
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_set_property_simultaneous_requests() {
let (socket, server) = UnixStream::pair().unwrap();
let mpv_handle: JoinHandle<Result<(), LinesCodecError>> = tokio::spawn(async move {
let mut framed = Framed::new(socket, LinesCodec::new());
while let Some(request) = framed.next().await {
match serde_json::from_str::<Value>(&request.unwrap()) {
Ok(json) => {
let property = json["command"][1].as_str().unwrap();
let value = &json["command"][2];
log::info!("Received set property command: {:?} => {:?}", property, value);
match property {
"volume" => {
let response =
json!({ "request_id": 0, "error": "success" })
.to_string();
framed.send(response).await.unwrap();
}
"pause" => {
let response =
json!({ "request_id": 0, "error": "success" })
.to_string();
framed.send(response).await.unwrap();
}
_ => {
let response =
json!({ "error":"property not found", "request_id": 0 })
.to_string();
framed.send(response).await.unwrap();
}
}
}
Err(_) => {}
}
}
Ok(())
});
let mpv = Mpv::connect_socket(server).await.unwrap();
let mpv_clone_1 = mpv.clone();
let mpv_poller_1 = tokio::spawn(async move {
loop {
let status = mpv_clone_1.set_property("volume", 100).await;
assert_eq!(status, Ok(()));
}
});
let mpv_clone_2 = mpv.clone();
let mpv_poller_2 = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(1)).await;
let status = mpv_clone_2.set_property("pause", false).await;
assert_eq!(status, Ok(()));
}
});
let mpv_clone_3 = mpv.clone();
let mpv_poller_3 = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(2)).await;
let maybe_volume = mpv_clone_3.set_property("nonexistent", "a").await;
assert_eq!(
maybe_volume,
Err(Error(ErrorCode::MpvError(
"property not found".to_owned()
)))
);
}
});
let mut tasks = FuturesUnordered::new();
tasks.push(mpv_handle);
tasks.push(mpv_poller_1);
tasks.push(mpv_poller_2);
tasks.push(mpv_poller_3);
if tokio::time::timeout(Duration::from_millis(200), tasks.next())
.await
.is_ok()
{
panic!("One of the pollers quit unexpectedly");
};
}
#[test(tokio::test)]
async fn test_get_playlist() {
let expected = Playlist(vec![
PlaylistEntry {
id: 0,
filename: "file1".to_string(),
title: "title1".to_string(),
current: false,
},
PlaylistEntry {
id: 1,
filename: "file2".to_string(),
title: "title2".to_string(),
current: true,
},
PlaylistEntry {
id: 2,
filename: "file3".to_string(),
title: "title3".to_string(),
current: false,
},
]);
let (server, join_handle) = test_socket(vec![json!({
"data": expected.0.iter().map(|entry| {
json!({
"filename": entry.filename,
"title": entry.title,
"current": entry.current
})
}).collect::<Vec<Value>>(),
"request_id": 0,
"error": "success"
})
.to_string()]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let playlist = mpv.get_playlist().await.unwrap();
assert_eq!(playlist, expected);
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_get_playlist_empty() {
let (server, join_handle) = test_socket(vec![
json!({ "data": [], "request_id": 0, "error": "success" }).to_string(),
]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let playlist = mpv.get_playlist().await.unwrap();
assert_eq!(playlist, Playlist(vec![]));
join_handle.await.unwrap().unwrap();
}