Make entire project async

This moves all communication with Mpv's unix socket into another tokio
task, and uses message passing through clonable mpsc channels to receive
commands to execute, and to send responses.
This commit is contained in:
Oystein Kristoffer Tveit 2024-04-17 00:07:11 +02:00
parent f5c9674b78
commit deb45a4570
Signed by: oysteikt
GPG Key ID: 9F2F7D8250F35146
6 changed files with 638 additions and 646 deletions

View File

@ -8,11 +8,19 @@ homepage = "https://gitlab.com/mpv-ipc/mpvipc"
repository = "https://gitlab.com/mpv-ipc/mpvipc" repository = "https://gitlab.com/mpv-ipc/mpvipc"
documentation = "https://docs.rs/mpvipc/" documentation = "https://docs.rs/mpvipc/"
edition = "2021" edition = "2021"
rust-version = "1.75"
[dependencies] [dependencies]
serde_json = "1.0.104" serde_json = "1.0.104"
log = "0.4.19" log = "0.4.19"
serde = { version = "1.0.197", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.37.0", features = ["sync", "macros", "rt", "net"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
futures = "0.3.30"
[dev-dependencies] [dev-dependencies]
env_logger = "0.10.0" env_logger = "0.10.0"
tokio = { version = "1.37.0", features = ["rt-multi-thread"] }
[lib]
doctest = false

View File

@ -1,15 +1,16 @@
use env_logger; use env_logger;
use mpvipc::{Error as MpvError, Mpv}; use mpvipc::{Error as MpvError, Mpv};
fn main() -> Result<(), MpvError> { #[tokio::main]
async fn main() -> Result<(), MpvError> {
env_logger::init(); env_logger::init();
let mpv = Mpv::connect("/tmp/mpv.sock")?; let mpv = Mpv::connect("/tmp/mpv.sock").await?;
let meta = mpv.get_metadata()?; let meta = mpv.get_metadata().await?;
println!("metadata: {:?}", meta); println!("metadata: {:?}", meta);
let playlist = mpv.get_playlist()?; let playlist = mpv.get_playlist().await?;
println!("playlist: {:?}", playlist); println!("playlist: {:?}", playlist);
let playback_time: f64 = mpv.get_property("playback-time")?; let playback_time: f64 = mpv.get_property("playback-time").await?;
println!("playback-time: {}", playback_time); println!("playback-time: {}", playback_time);
Ok(()) Ok(())
} }

View File

@ -11,58 +11,60 @@ fn seconds_to_hms(total: f64) -> String {
format!("{:02}:{:02}:{:02}", hours, minutes, seconds) format!("{:02}:{:02}:{:02}", hours, minutes, seconds)
} }
fn main() -> Result<(), Error> { #[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::init(); env_logger::init();
let mut mpv = Mpv::connect("/tmp/mpv.sock")?; let mut mpv = Mpv::connect("/tmp/mpv.sock").await?;
let mut pause = false; let mut pause = false;
let mut playback_time = std::f64::NAN; let mut playback_time = std::f64::NAN;
let mut duration = std::f64::NAN; let mut duration = std::f64::NAN;
mpv.observe_property(1, "path")?; mpv.observe_property(1, "path").await?;
mpv.observe_property(2, "pause")?; mpv.observe_property(2, "pause").await?;
mpv.observe_property(3, "playback-time")?; mpv.observe_property(3, "playback-time").await?;
mpv.observe_property(4, "duration")?; mpv.observe_property(4, "duration").await?;
mpv.observe_property(5, "metadata")?; mpv.observe_property(5, "metadata").await?;
loop { loop {
let event = mpv.event_listen()?; // TODO:
match event { // let event = mpv.event_listen()?;
Event::PropertyChange { id: _, property } => match property { // match event {
Property::Path(Some(value)) => println!("\nPlaying: {}", value), // Event::PropertyChange { id: _, property } => match property {
Property::Path(None) => (), // Property::Path(Some(value)) => println!("\nPlaying: {}", value),
Property::Pause(value) => pause = value, // Property::Path(None) => (),
Property::PlaybackTime(Some(value)) => playback_time = value, // Property::Pause(value) => pause = value,
Property::PlaybackTime(None) => playback_time = std::f64::NAN, // Property::PlaybackTime(Some(value)) => playback_time = value,
Property::Duration(Some(value)) => duration = value, // Property::PlaybackTime(None) => playback_time = std::f64::NAN,
Property::Duration(None) => duration = std::f64::NAN, // Property::Duration(Some(value)) => duration = value,
Property::Metadata(Some(value)) => { // Property::Duration(None) => duration = std::f64::NAN,
println!("File tags:"); // Property::Metadata(Some(value)) => {
if let Some(MpvDataType::String(value)) = value.get("ARTIST") { // println!("File tags:");
println!(" Artist: {}", value); // if let Some(MpvDataType::String(value)) = value.get("ARTIST") {
} // println!(" Artist: {}", value);
if let Some(MpvDataType::String(value)) = value.get("ALBUM") { // }
println!(" Album: {}", value); // if let Some(MpvDataType::String(value)) = value.get("ALBUM") {
} // println!(" Album: {}", value);
if let Some(MpvDataType::String(value)) = value.get("TITLE") { // }
println!(" Title: {}", value); // if let Some(MpvDataType::String(value)) = value.get("TITLE") {
} // println!(" Title: {}", value);
if let Some(MpvDataType::String(value)) = value.get("TRACK") { // }
println!(" Track: {}", value); // if let Some(MpvDataType::String(value)) = value.get("TRACK") {
} // println!(" Track: {}", value);
} // }
Property::Metadata(None) => (), // }
Property::Unknown { name: _, data: _ } => (), // Property::Metadata(None) => (),
}, // Property::Unknown { name: _, data: _ } => (),
Event::Shutdown => return Ok(()), // },
Event::Unimplemented => panic!("Unimplemented event"), // Event::Shutdown => return Ok(()),
_ => (), // Event::Unimplemented => panic!("Unimplemented event"),
} // _ => (),
print!( // }
"{}{} / {} ({:.0}%)\r", // print!(
if pause { "(Paused) " } else { "" }, // "{}{} / {} ({:.0}%)\r",
seconds_to_hms(playback_time), // if pause { "(Paused) " } else { "" },
seconds_to_hms(duration), // seconds_to_hms(playback_time),
100. * playback_time / duration // seconds_to_hms(duration),
); // 100. * playback_time / duration
io::stdout().flush().unwrap(); // );
// io::stdout().flush().unwrap();
} }
} }

View File

@ -1,14 +1,13 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::Value;
use std::collections::HashMap; use std::{
use std::fmt::{self, Display}; collections::HashMap,
use std::io::{BufReader, Read}; fmt::{self, Display},
use std::os::unix::net::UnixStream;
use crate::ipc::{
get_mpv_property, get_mpv_property_string, listen, listen_raw, observe_mpv_property,
run_mpv_command, set_mpv_property, unobserve_mpv_property,
}; };
use tokio::{net::UnixStream, sync::oneshot};
use crate::ipc::{MpvIpc, MpvIpcCommand, MpvIpcResponse};
use crate::message_parser::TypeHandler;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Event { pub enum Event {
@ -80,6 +79,10 @@ pub enum MpvCommand {
Unobserve(isize), Unobserve(isize),
} }
trait IntoRawCommandPart {
fn into_raw_command_part(self) -> String;
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MpvDataType { pub enum MpvDataType {
Array(Vec<MpvDataType>), Array(Vec<MpvDataType>),
@ -99,12 +102,31 @@ pub enum NumberChangeOptions {
Decrease, Decrease,
} }
impl IntoRawCommandPart for NumberChangeOptions {
fn into_raw_command_part(self) -> String {
match self {
NumberChangeOptions::Absolute => "absolute".to_string(),
NumberChangeOptions::Increase => "increase".to_string(),
NumberChangeOptions::Decrease => "decrease".to_string(),
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum PlaylistAddOptions { pub enum PlaylistAddOptions {
Replace, Replace,
Append, Append,
} }
impl IntoRawCommandPart for PlaylistAddOptions {
fn into_raw_command_part(self) -> String {
match self {
PlaylistAddOptions::Replace => "replace".to_string(),
PlaylistAddOptions::Append => "append".to_string(),
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum PlaylistAddTypeOptions { pub enum PlaylistAddTypeOptions {
File, File,
@ -119,6 +141,17 @@ pub enum SeekOptions {
AbsolutePercent, AbsolutePercent,
} }
impl IntoRawCommandPart for SeekOptions {
fn into_raw_command_part(self) -> String {
match self {
SeekOptions::Relative => "relative".to_string(),
SeekOptions::Absolute => "absolute".to_string(),
SeekOptions::RelativePercent => "relative-percent".to_string(),
SeekOptions::AbsolutePercent => "absolute-percent".to_string(),
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum Switch { pub enum Switch {
On, On,
@ -126,7 +159,7 @@ pub enum Switch {
Toggle, Toggle,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorCode { pub enum ErrorCode {
MpvError(String), MpvError(String),
JsonParseError(String), JsonParseError(String),
@ -144,7 +177,7 @@ pub enum ErrorCode {
ValueDoesNotContainUsize, ValueDoesNotContainUsize,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PlaylistEntry { pub struct PlaylistEntry {
pub id: usize, pub id: usize,
pub filename: String, pub filename: String,
@ -152,52 +185,64 @@ pub struct PlaylistEntry {
pub current: bool, pub current: bool,
} }
pub struct Mpv { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) stream: UnixStream,
pub(crate) reader: BufReader<UnixStream>,
pub(crate) name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Playlist(pub Vec<PlaylistEntry>); pub struct Playlist(pub Vec<PlaylistEntry>);
#[derive(Debug, Clone, Serialize, Deserialize)] pub trait GetPropertyTypeHandler: Sized {
// TODO: fix this
#[allow(async_fn_in_trait)]
async fn get_property_generic(instance: &Mpv, property: &str) -> Result<Self, Error>;
}
impl<T> GetPropertyTypeHandler for T
where
T: TypeHandler,
{
async fn get_property_generic(instance: &Mpv, property: &str) -> Result<T, Error> {
instance
.get_property_value(property)
.await
.and_then(T::get_value)
}
}
pub trait SetPropertyTypeHandler<T> {
// TODO: fix this
#[allow(async_fn_in_trait)]
async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>;
}
impl<T> SetPropertyTypeHandler<T> for T
where
T: Serialize,
{
async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error> {
let (res_tx, res_rx) = oneshot::channel();
let value = serde_json::to_value(value)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?;
instance
.command_sender
.send((
MpvIpcCommand::SetProperty(property.to_owned(), value),
res_tx,
))
.await
.map_err(|_| {
Error(ErrorCode::ConnectError(
"Failed to send command".to_string(),
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response.map(|_| ()),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Error(pub ErrorCode); pub struct Error(pub ErrorCode);
impl Drop for Mpv {
fn drop(&mut self) {
self.disconnect();
}
}
impl fmt::Debug for Mpv {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("Mpv").field(&self.name).finish()
}
}
impl Clone for Mpv {
fn clone(&self) -> Self {
let stream = self.stream.try_clone().expect("cloning UnixStream");
let cloned_stream = stream.try_clone().expect("cloning UnixStream");
Mpv {
stream,
reader: BufReader::new(cloned_stream),
name: self.name.clone(),
}
}
fn clone_from(&mut self, source: &Self) {
let stream = source.stream.try_clone().expect("cloning UnixStream");
let cloned_stream = stream.try_clone().expect("cloning UnixStream");
*self = Mpv {
stream,
reader: BufReader::new(cloned_stream),
name: source.name.clone(),
}
}
}
impl Display for Error { impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(&self.0, f) Display::fmt(&self.0, f)
@ -241,119 +286,69 @@ impl Display for ErrorCode {
} }
} }
pub trait GetPropertyTypeHandler: Sized { #[derive(Clone)]
fn get_property_generic(instance: &Mpv, property: &str) -> Result<Self, Error>; pub struct Mpv {
command_sender: tokio::sync::mpsc::Sender<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
} }
impl GetPropertyTypeHandler for bool { impl fmt::Debug for Mpv {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<bool, Error> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
get_mpv_property::<bool>(instance, property) fmt.debug_struct("Mpv").finish()
}
}
impl GetPropertyTypeHandler for String {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<String, Error> {
get_mpv_property::<String>(instance, property)
}
}
impl GetPropertyTypeHandler for f64 {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<f64, Error> {
get_mpv_property::<f64>(instance, property)
}
}
impl GetPropertyTypeHandler for usize {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<usize, Error> {
get_mpv_property::<usize>(instance, property)
}
}
impl GetPropertyTypeHandler for Vec<PlaylistEntry> {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<Vec<PlaylistEntry>, Error> {
get_mpv_property::<Vec<PlaylistEntry>>(instance, property)
}
}
impl GetPropertyTypeHandler for HashMap<String, MpvDataType> {
fn get_property_generic(
instance: &Mpv,
property: &str,
) -> Result<HashMap<String, MpvDataType>, Error> {
get_mpv_property::<HashMap<String, MpvDataType>>(instance, property)
}
}
pub trait SetPropertyTypeHandler<T> {
fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>;
}
impl SetPropertyTypeHandler<bool> for bool {
fn set_property_generic(instance: &Mpv, property: &str, value: bool) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
}
}
impl SetPropertyTypeHandler<String> for String {
fn set_property_generic(instance: &Mpv, property: &str, value: String) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
}
}
impl SetPropertyTypeHandler<f64> for f64 {
fn set_property_generic(instance: &Mpv, property: &str, value: f64) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
}
}
impl SetPropertyTypeHandler<usize> for usize {
fn set_property_generic(instance: &Mpv, property: &str, value: usize) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
} }
} }
impl Mpv { impl Mpv {
pub fn connect(socket: &str) -> Result<Mpv, Error> { pub async fn connect(socket_path: &str) -> Result<Mpv, Error> {
match UnixStream::connect(socket) { log::debug!("Connecting to mpv socket at {}", socket_path);
Ok(stream) => {
let cloned_stream = stream.try_clone().expect("cloning UnixStream"); let socket = match UnixStream::connect(socket_path).await {
return Ok(Mpv { Ok(stream) => Ok(stream),
stream,
reader: BufReader::new(cloned_stream),
name: String::from(socket),
});
}
Err(internal_error) => Err(Error(ErrorCode::ConnectError(internal_error.to_string()))), Err(internal_error) => Err(Error(ErrorCode::ConnectError(internal_error.to_string()))),
}?;
Self::connect_socket(socket).await
}
pub async fn connect_socket(socket: UnixStream) -> Result<Mpv, Error> {
let (com_tx, com_rx) = tokio::sync::mpsc::channel(100);
let ipc = MpvIpc::new(socket, com_rx);
log::debug!("Starting IPC handler");
tokio::spawn(ipc.run());
Ok(Mpv {
command_sender: com_tx,
})
}
pub async fn disconnect(&self) -> Result<(), Error> {
let (res_tx, res_rx) = oneshot::channel();
self.command_sender
.send((MpvIpcCommand::Exit, res_tx))
.await
.map_err(|_| {
Error(ErrorCode::ConnectError(
"Failed to send command".to_string(),
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response.map(|_| ()),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
} }
} }
pub fn disconnect(&self) { // pub fn get_stream_ref(&self) -> &UnixStream {
let mut stream = &self.stream; // &self.stream
stream // }
.shutdown(std::net::Shutdown::Both)
.expect("socket disconnect"); pub async fn get_metadata(&self) -> Result<HashMap<String, MpvDataType>, Error> {
let mut buffer = [0; 32]; self.get_property("metadata").await
for _ in 0..stream.bytes().count() {
stream.read(&mut buffer[..]).unwrap();
}
} }
pub fn get_stream_ref(&self) -> &UnixStream { pub async fn get_playlist(&self) -> Result<Playlist, Error> {
&self.stream self.get_property::<Vec<PlaylistEntry>>("playlist")
} .await
.map(|entries| Playlist(entries))
pub fn get_metadata(&self) -> Result<HashMap<String, MpvDataType>, Error> {
match get_mpv_property(self, "metadata") {
Ok(map) => Ok(map),
Err(err) => Err(err),
}
}
pub fn get_playlist(&self) -> Result<Playlist, Error> {
match get_mpv_property::<Vec<PlaylistEntry>>(self, "playlist") {
Ok(entries) => Ok(Playlist(entries)),
Err(msg) => Err(msg),
}
} }
/// # Description /// # Description
@ -375,15 +370,18 @@ impl Mpv {
/// # Example /// # Example
/// ``` /// ```
/// use mpvipc::{Mpv, Error}; /// use mpvipc::{Mpv, Error};
/// fn main() -> Result<(), Error> { /// async fn main() -> Result<(), Error> {
/// let mpv = Mpv::connect("/tmp/mpvsocket")?; /// let mpv = Mpv::connect("/tmp/mpvsocket")?;
/// let paused: bool = mpv.get_property("pause")?; /// let paused: bool = mpv.get_property("pause").await?;
/// let title: String = mpv.get_property("media-title")?; /// let title: String = mpv.get_property("media-title").await?;
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn get_property<T: GetPropertyTypeHandler>(&self, property: &str) -> Result<T, Error> { pub async fn get_property<T: GetPropertyTypeHandler>(
T::get_property_generic(self, property) &self,
property: &str,
) -> Result<T, Error> {
T::get_property_generic(self, property).await
} }
/// # Description /// # Description
@ -405,12 +403,26 @@ impl Mpv {
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn get_property_string(&self, property: &str) -> Result<String, Error> { pub async fn get_property_value(&self, property: &str) -> Result<Value, Error> {
get_mpv_property_string(self, property) let (res_tx, res_rx) = oneshot::channel();
self.command_sender
.send((MpvIpcCommand::GetProperty(property.to_owned()), res_tx))
.await
.map_err(|_| {
Error(ErrorCode::ConnectError(
"Failed to send command".to_string(),
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response.and_then(|value| {
value.ok_or(Error(ErrorCode::MissingValue))
}),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
} }
pub fn kill(&self) -> Result<(), Error> { pub async fn kill(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Quit) self.run_command(MpvCommand::Quit).await
} }
/// # Description /// # Description
@ -426,42 +438,44 @@ impl Mpv {
/// println!("{:?}", event); /// println!("{:?}", event);
/// } /// }
/// ``` /// ```
pub fn event_listen(&mut self) -> Result<Event, Error> { // pub fn event_listen(&mut self) -> Result<Event, Error> {
listen(self) // listen(self)
// }
// pub fn event_listen_raw(&mut self) -> String {
// listen_raw(self)
// }
pub async fn next(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistNext).await
} }
pub fn event_listen_raw(&mut self) -> String { pub async fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> {
listen_raw(self)
}
pub fn next(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistNext)
}
pub fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> {
self.run_command(MpvCommand::Observe { self.run_command(MpvCommand::Observe {
id: id, id,
property: property.to_string(), property: property.to_string(),
}) })
.await
} }
pub fn unobserve_property(&self, id: isize) -> Result<(), Error> { pub async fn unobserve_property(&self, id: isize) -> Result<(), Error> {
self.run_command(MpvCommand::Unobserve(id)) self.run_command(MpvCommand::Unobserve(id)).await
} }
pub fn pause(&self) -> Result<(), Error> { pub async fn pause(&self) -> Result<(), Error> {
set_mpv_property(self, "pause", json!(true)) self.set_property("pause", true).await
} }
pub fn prev(&self) -> Result<(), Error> { pub async fn prev(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistPrev) self.run_command(MpvCommand::PlaylistPrev).await
} }
pub fn restart(&self) -> Result<(), Error> { pub async fn restart(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Seek { self.run_command(MpvCommand::Seek {
seconds: 0f64, seconds: 0f64,
option: SeekOptions::Absolute, option: SeekOptions::Absolute,
}) })
.await
} }
/// # Description /// # Description
@ -490,183 +504,254 @@ impl Mpv {
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn run_command(&self, command: MpvCommand) -> Result<(), Error> { pub async fn run_command(&self, command: MpvCommand) -> Result<(), Error> {
match command { log::trace!("Running command: {:?}", command);
MpvCommand::LoadFile { file, option } => run_mpv_command( let result = match command {
self, MpvCommand::LoadFile { file, option } => {
"loadfile", self.run_command_raw_ignore_value(
&[ "loadfile",
file.as_ref(), &[file.as_ref(), option.into_raw_command_part().as_str()],
match option { )
PlaylistAddOptions::Append => "append", .await
PlaylistAddOptions::Replace => "replace", }
}, MpvCommand::LoadList { file, option } => {
], self.run_command_raw_ignore_value(
), "loadlist",
MpvCommand::LoadList { file, option } => run_mpv_command( &[file.as_ref(), option.into_raw_command_part().as_str()],
self, )
"loadlist", .await
&[ }
file.as_ref(), MpvCommand::Observe { id, property } => {
match option { let (res_tx, res_rx) = oneshot::channel();
PlaylistAddOptions::Append => "append", self.command_sender
PlaylistAddOptions::Replace => "replace", .send((MpvIpcCommand::ObserveProperty(id, property), res_tx))
}, .await
], .map_err(|_| {
), Error(ErrorCode::ConnectError(
MpvCommand::Observe { id, property } => observe_mpv_property(self, &id, &property), "Failed to send command".to_string(),
MpvCommand::PlaylistClear => run_mpv_command(self, "playlist-clear", &[]), ))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response.map(|_| ()),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
MpvCommand::PlaylistClear => {
self.run_command_raw_ignore_value("playlist-clear", &[])
.await
}
MpvCommand::PlaylistMove { from, to } => { MpvCommand::PlaylistMove { from, to } => {
run_mpv_command(self, "playlist-move", &[&from.to_string(), &to.to_string()]) self.run_command_raw_ignore_value(
"playlist-move",
&[&from.to_string(), &to.to_string()],
)
.await
}
MpvCommand::PlaylistNext => {
self.run_command_raw_ignore_value("playlist-next", &[])
.await
}
MpvCommand::PlaylistPrev => {
self.run_command_raw_ignore_value("playlist-prev", &[])
.await
} }
MpvCommand::PlaylistNext => run_mpv_command(self, "playlist-next", &[]),
MpvCommand::PlaylistPrev => run_mpv_command(self, "playlist-prev", &[]),
MpvCommand::PlaylistRemove(id) => { MpvCommand::PlaylistRemove(id) => {
run_mpv_command(self, "playlist-remove", &[&id.to_string()]) self.run_command_raw_ignore_value("playlist-remove", &[&id.to_string()])
.await
} }
MpvCommand::PlaylistShuffle => run_mpv_command(self, "playlist-shuffle", &[]), MpvCommand::PlaylistShuffle => {
MpvCommand::Quit => run_mpv_command(self, "quit", &[]), self.run_command_raw_ignore_value("playlist-shuffle", &[])
.await
}
MpvCommand::Quit => self.run_command_raw_ignore_value("quit", &[]).await,
MpvCommand::ScriptMessage(args) => { MpvCommand::ScriptMessage(args) => {
let str_args: Vec<_> = args.iter().map(String::as_str).collect(); let str_args: Vec<_> = args.iter().map(String::as_str).collect();
run_mpv_command(self, "script-message", &str_args) self.run_command_raw_ignore_value("script-message", &str_args)
.await
} }
MpvCommand::ScriptMessageTo { target, args } => { MpvCommand::ScriptMessageTo { target, args } => {
let mut cmd_args: Vec<_> = vec![target.as_str()]; let mut cmd_args: Vec<_> = vec![target.as_str()];
let mut str_args: Vec<_> = args.iter().map(String::as_str).collect(); let mut str_args: Vec<_> = args.iter().map(String::as_str).collect();
cmd_args.append(&mut str_args); cmd_args.append(&mut str_args);
run_mpv_command(self, "script-message-to", &cmd_args) self.run_command_raw_ignore_value("script-message-to", &cmd_args)
.await
} }
MpvCommand::Seek { seconds, option } => run_mpv_command( MpvCommand::Seek { seconds, option } => {
self, self.run_command_raw_ignore_value(
"seek", "seek",
&[ &[
&seconds.to_string(), &seconds.to_string(),
match option { option.into_raw_command_part().as_str(),
SeekOptions::Absolute => "absolute", ],
SeekOptions::Relative => "relative", )
SeekOptions::AbsolutePercent => "absolute-percent", .await
SeekOptions::RelativePercent => "relative-percent", }
}, MpvCommand::Stop => self.run_command_raw_ignore_value("stop", &[]).await,
], MpvCommand::Unobserve(id) => {
), let (res_tx, res_rx) = oneshot::channel();
MpvCommand::Stop => run_mpv_command(self, "stop", &[]), self.command_sender
MpvCommand::Unobserve(id) => unobserve_mpv_property(self, &id), .send((MpvIpcCommand::UnobserveProperty(id), res_tx))
} .await
.unwrap();
match res_rx.await {
Ok(MpvIpcResponse(response)) => response.map(|_| ()),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
};
log::trace!("Command result: {:?}", result);
result
} }
/// Run a custom command. /// Run a custom command.
/// This should only be used if the desired command is not implemented /// This should only be used if the desired command is not implemented
/// with [MpvCommand]. /// with [MpvCommand].
pub fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<(), Error> { pub async fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<Option<Value>, Error> {
run_mpv_command(self, command, args) let command = Vec::from(
[command]
.iter()
.chain(args.iter())
.map(|s| s.to_string())
.collect::<Vec<String>>()
.as_slice(),
);
let (res_tx, res_rx) = oneshot::channel();
self.command_sender
.send((MpvIpcCommand::Command(command), res_tx))
.await
.map_err(|_| {
Error(ErrorCode::ConnectError(
"Failed to send command".to_string(),
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response,
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
} }
pub fn playlist_add( async fn run_command_raw_ignore_value(
&self,
command: &str,
args: &[&str],
) -> Result<(), Error> {
self.run_command_raw(command, args).await.map(|_| ())
}
pub async fn playlist_add(
&self, &self,
file: &str, file: &str,
file_type: PlaylistAddTypeOptions, file_type: PlaylistAddTypeOptions,
option: PlaylistAddOptions, option: PlaylistAddOptions,
) -> Result<(), Error> { ) -> Result<(), Error> {
match file_type { match file_type {
PlaylistAddTypeOptions::File => self.run_command(MpvCommand::LoadFile { PlaylistAddTypeOptions::File => {
file: file.to_string(), self.run_command(MpvCommand::LoadFile {
option, file: file.to_string(),
}), option,
})
.await
}
PlaylistAddTypeOptions::Playlist => self.run_command(MpvCommand::LoadList { PlaylistAddTypeOptions::Playlist => {
file: file.to_string(), self.run_command(MpvCommand::LoadList {
option, file: file.to_string(),
}), option,
})
.await
}
} }
} }
pub fn playlist_clear(&self) -> Result<(), Error> { pub async fn playlist_clear(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistClear) self.run_command(MpvCommand::PlaylistClear).await
} }
pub fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> { pub async fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistMove { from, to }) self.run_command(MpvCommand::PlaylistMove { from, to })
.await
} }
pub fn playlist_play_id(&self, id: usize) -> Result<(), Error> { pub async fn playlist_play_id(&self, id: usize) -> Result<(), Error> {
set_mpv_property(self, "playlist-pos", json!(id)) self.set_property("playlist-pos", id).await
} }
pub fn playlist_play_next(&self, id: usize) -> Result<(), Error> { pub async fn playlist_play_next(&self, id: usize) -> Result<(), Error> {
match get_mpv_property::<usize>(self, "playlist-pos") { match self.get_property::<usize>("playlist-pos").await {
Ok(current_id) => self.run_command(MpvCommand::PlaylistMove { Ok(current_id) => {
from: id, self.run_command(MpvCommand::PlaylistMove {
to: current_id + 1, from: id,
}), to: current_id + 1,
})
.await
}
Err(msg) => Err(msg), Err(msg) => Err(msg),
} }
} }
pub fn playlist_remove_id(&self, id: usize) -> Result<(), Error> { pub async fn playlist_remove_id(&self, id: usize) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistRemove(id)) self.run_command(MpvCommand::PlaylistRemove(id)).await
} }
pub fn playlist_shuffle(&self) -> Result<(), Error> { pub async fn playlist_shuffle(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistShuffle) self.run_command(MpvCommand::PlaylistShuffle).await
} }
pub fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> { pub async fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> {
self.run_command(MpvCommand::Seek { seconds, option }) self.run_command(MpvCommand::Seek { seconds, option }).await
} }
pub fn set_loop_file(&self, option: Switch) -> Result<(), Error> { pub async fn set_loop_file(&self, option: Switch) -> Result<(), Error> {
let mut enabled = false; let enabled = match option {
match option { Switch::On => "inf",
Switch::On => enabled = true, Switch::Off => "no",
Switch::Off => {} Switch::Toggle => {
Switch::Toggle => match get_mpv_property_string(self, "loop-file") { self.get_property::<String>("loop-file")
Ok(value) => match value.as_ref() { .await
"false" => { .map(|s| match s.as_str() {
enabled = true; "inf" => "no",
} "no" => "inf",
_ => { _ => "no",
enabled = false; })?
} }
}, };
Err(msg) => return Err(msg), self.set_property("loop-file", enabled).await
},
}
set_mpv_property(self, "loop-file", json!(enabled))
} }
pub fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> { pub async fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> {
let mut enabled = false; let enabled = match option {
match option { Switch::On => "inf",
Switch::On => enabled = true, Switch::Off => "no",
Switch::Off => {} Switch::Toggle => {
Switch::Toggle => match get_mpv_property_string(self, "loop-playlist") { self.get_property::<String>("loop-playlist")
Ok(value) => match value.as_ref() { .await
"false" => { .map(|s| match s.as_str() {
enabled = true; "inf" => "no",
} "no" => "inf",
_ => { _ => "no",
enabled = false; })?
} }
}, };
Err(msg) => return Err(msg), self.set_property("loo-playlist", enabled).await
},
}
set_mpv_property(self, "loop-playlist", json!(enabled))
} }
pub fn set_mute(&self, option: Switch) -> Result<(), Error> { pub async fn set_mute(&self, option: Switch) -> Result<(), Error> {
let mut enabled = false; let enabled = match option {
match option { Switch::On => "yes",
Switch::On => enabled = true, Switch::Off => "no",
Switch::Off => {} Switch::Toggle => {
Switch::Toggle => match get_mpv_property::<bool>(self, "mute") { self.get_property::<String>("mute")
Ok(value) => { .await
enabled = !value; .map(|s| match s.as_str() {
} "yes" => "no",
Err(msg) => return Err(msg), "no" => "yes",
}, _ => "no",
} })?
set_mpv_property(self, "mute", json!(enabled)) }
};
self.set_property("mute", enabled).await
} }
/// # Description /// # Description
@ -687,63 +772,67 @@ impl Mpv {
/// # Example /// # Example
/// ``` /// ```
/// use mpvipc::{Mpv, Error}; /// use mpvipc::{Mpv, Error};
/// fn main() -> Result<(), Error> { /// fn async main() -> Result<(), Error> {
/// let mpv = Mpv::connect("/tmp/mpvsocket")?; /// let mpv = Mpv::connect("/tmp/mpvsocket")?;
/// mpv.set_property("pause", true)?; /// mpv.set_property("pause", true).await?;
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn set_property<T: SetPropertyTypeHandler<T>>( pub async fn set_property<T: SetPropertyTypeHandler<T>>(
&self, &self,
property: &str, property: &str,
value: T, value: T,
) -> Result<(), Error> { ) -> Result<(), Error> {
T::set_property_generic(self, property, value) T::set_property_generic(self, property, value).await
} }
pub fn set_speed(&self, input_speed: f64, option: NumberChangeOptions) -> Result<(), Error> { pub async fn set_speed(
match get_mpv_property::<f64>(self, "speed") { &self,
input_speed: f64,
option: NumberChangeOptions,
) -> Result<(), Error> {
match self.get_property::<f64>("speed").await {
Ok(speed) => match option { Ok(speed) => match option {
NumberChangeOptions::Increase => { NumberChangeOptions::Increase => {
set_mpv_property(self, "speed", json!(speed + input_speed)) self.set_property("speed", speed + input_speed).await
} }
NumberChangeOptions::Decrease => { NumberChangeOptions::Decrease => {
set_mpv_property(self, "speed", json!(speed - input_speed)) self.set_property("speed", speed - input_speed).await
} }
NumberChangeOptions::Absolute => { NumberChangeOptions::Absolute => self.set_property("speed", input_speed).await,
set_mpv_property(self, "speed", json!(input_speed))
}
}, },
Err(msg) => Err(msg), Err(msg) => Err(msg),
} }
} }
pub fn set_volume(&self, input_volume: f64, option: NumberChangeOptions) -> Result<(), Error> { pub async fn set_volume(
match get_mpv_property::<f64>(self, "volume") { &self,
input_volume: f64,
option: NumberChangeOptions,
) -> Result<(), Error> {
match self.get_property::<f64>("volume").await {
Ok(volume) => match option { Ok(volume) => match option {
NumberChangeOptions::Increase => { NumberChangeOptions::Increase => {
set_mpv_property(self, "volume", json!(volume + input_volume)) self.set_property("volume", volume + input_volume).await
} }
NumberChangeOptions::Decrease => { NumberChangeOptions::Decrease => {
set_mpv_property(self, "volume", json!(volume - input_volume)) self.set_property("volume", volume - input_volume).await
} }
NumberChangeOptions::Absolute => { NumberChangeOptions::Absolute => self.set_property("volume", input_volume).await,
set_mpv_property(self, "volume", json!(input_volume))
}
}, },
Err(msg) => Err(msg), Err(msg) => Err(msg),
} }
} }
pub fn stop(&self) -> Result<(), Error> { pub async fn stop(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Stop) self.run_command(MpvCommand::Stop).await
} }
pub fn toggle(&self) -> Result<(), Error> { pub async fn toggle(&self) -> Result<(), Error> {
run_mpv_command(self, "cycle", &["pause"]) self.run_command_raw("cycle", &["pause"]).await.map(|_| ())
} }
} }

View File

@ -1,248 +1,162 @@
use crate::message_parser::TypeHandler;
use self::message_parser::extract_mpv_response_data;
use self::message_parser::json_array_to_playlist;
use self::message_parser::json_array_to_vec;
use self::message_parser::json_map_to_hashmap;
use super::*; use super::*;
use log::{debug, warn}; use futures::{SinkExt, StreamExt};
use serde_json::json; use serde_json::{json, Value};
use serde_json::Value; use std::mem;
use std::io::BufRead; use tokio::net::UnixStream;
use std::io::BufReader; use tokio::sync::mpsc::Receiver;
use std::io::Write; use tokio::sync::{oneshot, Mutex};
use tokio_util::codec::{Framed, LinesCodec};
pub fn get_mpv_property<T: TypeHandler>(instance: &Mpv, property: &str) -> Result<T, Error> { pub(crate) struct MpvIpc {
let ipc_string = json!({"command": ["get_property", property]}); socket: Framed<UnixStream, LinesCodec>,
match serde_json::from_str::<Value>(&send_command_sync(instance, ipc_string)) { command_channel: Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
Ok(val) => T::get_value(val), socket_lock: Mutex<()>,
Err(why) => Err(Error(ErrorCode::JsonParseError(why.to_string()))),
}
} }
pub fn get_mpv_property_string(instance: &Mpv, property: &str) -> Result<String, Error> { #[derive(Debug, Clone, PartialEq, Eq)]
let ipc_string = json!({"command": ["get_property", property]}); pub(crate) enum MpvIpcCommand {
let val = serde_json::from_str::<Value>(&send_command_sync(instance, ipc_string)) Command(Vec<String>),
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?; GetProperty(String),
SetProperty(String, Value),
let data = extract_mpv_response_data(&val)?; ObserveProperty(isize, String),
UnobserveProperty(isize),
match data { Exit,
Value::Bool(b) => Ok(b.to_string()),
Value::Number(ref n) => Ok(n.to_string()),
Value::String(ref s) => Ok(s.to_string()),
Value::Array(ref array) => Ok(format!("{:?}", array)),
Value::Object(ref map) => Ok(format!("{:?}", map)),
Value::Null => Err(Error(ErrorCode::MissingValue)),
}
} }
fn validate_mpv_response(response: &str) -> Result<(), Error> { #[derive(Debug, Clone)]
serde_json::from_str::<Value>(response) pub(crate) struct MpvIpcResponse(pub(crate) Result<Option<Value>, Error>);
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))
.and_then(|value| extract_mpv_response_data(&value).map(|_| ()))
}
pub fn set_mpv_property(instance: &Mpv, property: &str, value: Value) -> Result<(), Error> { impl MpvIpc {
let ipc_string = json!({ pub(crate) fn new(
"command": ["set_property", property, value] socket: UnixStream,
}); command_channel: Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
) -> Self {
let response = &send_command_sync(instance, ipc_string); MpvIpc {
validate_mpv_response(response) socket: Framed::new(socket, LinesCodec::new()),
} command_channel,
socket_lock: Mutex::new(()),
pub fn run_mpv_command(instance: &Mpv, command: &str, args: &[&str]) -> Result<(), Error> {
let mut ipc_string = json!({
"command": [command]
});
if let Value::Array(args_array) = &mut ipc_string["command"] {
for arg in args {
args_array.push(json!(arg));
} }
} }
let response = &send_command_sync(instance, ipc_string); pub(crate) async fn send_command(&mut self, command: &[&str]) -> Result<Option<Value>, Error> {
validate_mpv_response(response) let lock = self.socket_lock.lock().await;
} // START CRITICAL SECTION
let ipc_command = json!({ "command": command });
pub fn observe_mpv_property(instance: &Mpv, id: &isize, property: &str) -> Result<(), Error> { let ipc_command_str = serde_json::to_string(&ipc_command)
let ipc_string = json!({
"command": ["observe_property", id, property]
});
let response = &send_command_sync(instance, ipc_string);
validate_mpv_response(response)
}
pub fn unobserve_mpv_property(instance: &Mpv, id: &isize) -> Result<(), Error> {
let ipc_string = json!({
"command": ["unobserve_property", id]
});
let response = &send_command_sync(instance, ipc_string);
validate_mpv_response(response)
}
fn try_convert_property(name: &str, id: usize, data: MpvDataType) -> Event {
let property = match name {
"path" => match data {
MpvDataType::String(value) => Property::Path(Some(value)),
MpvDataType::Null => Property::Path(None),
_ => unimplemented!(),
},
"pause" => match data {
MpvDataType::Bool(value) => Property::Pause(value),
_ => unimplemented!(),
},
"playback-time" => match data {
MpvDataType::Double(value) => Property::PlaybackTime(Some(value)),
MpvDataType::Null => Property::PlaybackTime(None),
_ => unimplemented!(),
},
"duration" => match data {
MpvDataType::Double(value) => Property::Duration(Some(value)),
MpvDataType::Null => Property::Duration(None),
_ => unimplemented!(),
},
"metadata" => match data {
MpvDataType::HashMap(value) => Property::Metadata(Some(value)),
MpvDataType::Null => Property::Metadata(None),
_ => unimplemented!(),
},
_ => {
warn!("Property {} not implemented", name);
Property::Unknown {
name: name.to_string(),
data,
}
}
};
Event::PropertyChange { id, property }
}
pub fn listen(instance: &mut Mpv) -> Result<Event, Error> {
let mut e;
// sometimes we get responses unrelated to events, so we read a new line until we receive one
// with an event field
let name = loop {
let mut response = String::new();
instance.reader.read_line(&mut response).unwrap();
response = response.trim_end().to_string();
debug!("Event: {}", response);
e = serde_json::from_str::<Value>(&response)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?; .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?;
match e["event"] { log::trace!("Sending command: {}", ipc_command_str);
Value::String(ref name) => break name,
_ => { self.socket
// It was not an event - try again .send(ipc_command_str)
debug!("Bad response: {:?}", response) .await
.map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))?;
let response = self
.socket
.next()
.await
.ok_or(Error(ErrorCode::MissingValue))?
.map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))?;
// END CRITICAL SECTION
mem::drop(lock);
log::trace!("Received response: {}", response);
serde_json::from_str::<Value>(&response)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))
.and_then(parse_mpv_response_data)
}
pub(crate) async fn get_mpv_property(&mut self, property: &str) -> Result<Option<Value>, Error> {
self.send_command(&["get_property", property]).await
}
pub(crate) async fn set_mpv_property(
&mut self,
property: &str,
value: Value,
) -> Result<Option<Value>, Error> {
let str_value = match &value {
Value::String(s) => s,
v => &serde_json::to_string(&v).unwrap()
};
self.send_command(&["set_property", property, &str_value])
.await
}
pub(crate) async fn observe_property(
&mut self,
id: isize,
property: &str,
) -> Result<Option<Value>, Error> {
self.send_command(&["observe_property", &id.to_string(), property])
.await
}
pub(crate) async fn unobserve_property(&mut self, id: isize) -> Result<Option<Value>, Error> {
self.send_command(&["unobserve_property", &id.to_string()])
.await
}
pub(crate) async fn run(mut self) -> Result<(), Error> {
loop {
tokio::select! {
Some(event) = self.socket.next() => {
log::trace!("Handling event: {:?}", serde_json::from_str::<Value>(&event.unwrap()).unwrap());
// TODO: handle event
}
Some((cmd, tx)) = self.command_channel.recv() => {
log::trace!("Handling command: {:?}", cmd);
match cmd {
MpvIpcCommand::Command(command) => {
let refs = command.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
let response = self.send_command(refs.as_slice()).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::GetProperty(property) => {
let response = self.get_mpv_property(&property).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::SetProperty(property, value) => {
let response = self.set_mpv_property(&property, value).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::ObserveProperty(id, property) => {
let response = self.observe_property(id, &property).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::UnobserveProperty(id) => {
let response = self.unobserve_property(id).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::Exit => {
tx.send(MpvIpcResponse(Ok(None))).unwrap();
return Ok(());
}
}
}
} }
} }
};
let event = match name.as_str() {
"shutdown" => Event::Shutdown,
"start-file" => Event::StartFile,
"file-loaded" => Event::FileLoaded,
"seek" => Event::Seek,
"playback-restart" => Event::PlaybackRestart,
"idle" => Event::Idle,
"tick" => Event::Tick,
"video-reconfig" => Event::VideoReconfig,
"audio-reconfig" => Event::AudioReconfig,
"tracks-changed" => Event::TracksChanged,
"track-switched" => Event::TrackSwitched,
"pause" => Event::Pause,
"unpause" => Event::Unpause,
"metadata-update" => Event::MetadataUpdate,
"chapter-change" => Event::ChapterChange,
"end-file" => Event::EndFile,
"property-change" => {
let name = match e["name"] {
Value::String(ref n) => Ok(n.to_string()),
_ => Err(Error(ErrorCode::JsonContainsUnexptectedType)),
}?;
let id: usize = match e["id"] {
Value::Number(ref n) => n.as_u64().unwrap() as usize,
_ => 0,
};
let data: MpvDataType = match e["data"] {
Value::String(ref n) => MpvDataType::String(n.to_string()),
Value::Array(ref a) => {
if name == "playlist".to_string() {
MpvDataType::Playlist(Playlist(json_array_to_playlist(a)))
} else {
MpvDataType::Array(json_array_to_vec(a))
}
}
Value::Bool(b) => MpvDataType::Bool(b),
Value::Number(ref n) => {
if n.is_u64() {
MpvDataType::Usize(n.as_u64().unwrap() as usize)
} else if n.is_f64() {
MpvDataType::Double(n.as_f64().unwrap())
} else {
return Err(Error(ErrorCode::JsonContainsUnexptectedType));
}
}
Value::Object(ref m) => MpvDataType::HashMap(json_map_to_hashmap(m)),
Value::Null => MpvDataType::Null,
};
try_convert_property(name.as_ref(), id, data)
}
"client-message" => {
let args = match e["args"] {
Value::Array(ref a) => json_array_to_vec(a)
.iter()
.map(|arg| match arg {
MpvDataType::String(s) => Ok(s.to_owned()),
_ => Err(Error(ErrorCode::JsonContainsUnexptectedType)),
})
.collect::<Result<Vec<_>, _>>(),
_ => return Err(Error(ErrorCode::JsonContainsUnexptectedType)),
}?;
Event::ClientMessage { args }
}
_ => Event::Unimplemented,
};
Ok(event)
}
pub fn listen_raw(instance: &mut Mpv) -> String {
let mut response = String::new();
instance.reader.read_line(&mut response).unwrap();
response.trim_end().to_string()
}
fn send_command_sync(instance: &Mpv, command: Value) -> String {
let stream = &instance.stream;
match serde_json::to_writer(stream, &command) {
Err(why) => panic!("Error: Could not write to socket: {}", why),
Ok(_) => {
let mut stream = stream;
stream.write_all(b"\n").unwrap();
let mut response = String::new();
{
let mut reader = BufReader::new(stream);
while !response.contains("\"error\":") {
response.clear();
reader.read_line(&mut response).unwrap();
}
}
debug!("Response: {}", response.trim_end());
response
}
} }
} }
fn parse_mpv_response_data(value: Value) -> Result<Option<Value>, Error> {
log::trace!("Parsing mpv response data: {:?}", value);
let result = value
.as_object()
.map(|o| (o.get("error").and_then(|e| e.as_str()), o.get("data")))
.ok_or(Error(ErrorCode::UnexpectedValue))
.and_then(|(error, data)| match error {
Some("success") => Ok(data),
Some(e) => Err(Error(ErrorCode::MpvError(e.to_string()))),
None => Err(Error(ErrorCode::UnexpectedValue)),
});
match &result {
Ok(v) => log::trace!("Successfully parsed mpv response data: {:?}", v),
Err(e) => log::trace!("Error parsing mpv response data: {:?}", e),
}
result.map(|opt| opt.map(|val| val.clone()))
}

View File

@ -9,25 +9,11 @@ pub trait TypeHandler: Sized {
fn as_string(&self) -> String; fn as_string(&self) -> String;
} }
pub(crate) fn extract_mpv_response_data(value: &Value) -> Result<&Value, Error> {
value
.as_object()
.map(|o| (o.get("error").and_then(|e| e.as_str()), o.get("data")))
.ok_or(Error(ErrorCode::UnexpectedValue))
.and_then(|(error, data)| match error {
Some("success") => data.ok_or(Error(ErrorCode::UnexpectedValue)),
Some(e) => Err(Error(ErrorCode::MpvError(e.to_string()))),
None => Err(Error(ErrorCode::UnexpectedValue)),
})
}
impl TypeHandler for String { impl TypeHandler for String {
fn get_value(value: Value) -> Result<String, Error> { fn get_value(value: Value) -> Result<String, Error> {
extract_mpv_response_data(&value) value
.and_then(|d| { .as_str()
d.as_str() .ok_or(Error(ErrorCode::ValueDoesNotContainString))
.ok_or(Error(ErrorCode::ValueDoesNotContainString))
})
.map(|s| s.to_string()) .map(|s| s.to_string())
} }
@ -38,8 +24,7 @@ impl TypeHandler for String {
impl TypeHandler for bool { impl TypeHandler for bool {
fn get_value(value: Value) -> Result<bool, Error> { fn get_value(value: Value) -> Result<bool, Error> {
extract_mpv_response_data(&value) value.as_bool().ok_or(Error(ErrorCode::ValueDoesNotContainBool))
.and_then(|d| d.as_bool().ok_or(Error(ErrorCode::ValueDoesNotContainBool)))
} }
fn as_string(&self) -> String { fn as_string(&self) -> String {
@ -53,8 +38,7 @@ impl TypeHandler for bool {
impl TypeHandler for f64 { impl TypeHandler for f64 {
fn get_value(value: Value) -> Result<f64, Error> { fn get_value(value: Value) -> Result<f64, Error> {
extract_mpv_response_data(&value) value.as_f64().ok_or(Error(ErrorCode::ValueDoesNotContainF64))
.and_then(|d| d.as_f64().ok_or(Error(ErrorCode::ValueDoesNotContainF64)))
} }
fn as_string(&self) -> String { fn as_string(&self) -> String {
@ -64,9 +48,9 @@ impl TypeHandler for f64 {
impl TypeHandler for usize { impl TypeHandler for usize {
fn get_value(value: Value) -> Result<usize, Error> { fn get_value(value: Value) -> Result<usize, Error> {
extract_mpv_response_data(&value) value.as_u64()
.and_then(|d| d.as_u64().ok_or(Error(ErrorCode::ValueDoesNotContainUsize)))
.map(|u| u as usize) .map(|u| u as usize)
.ok_or(Error(ErrorCode::ValueDoesNotContainUsize))
} }
fn as_string(&self) -> String { fn as_string(&self) -> String {
@ -76,11 +60,8 @@ impl TypeHandler for usize {
impl TypeHandler for HashMap<String, MpvDataType> { impl TypeHandler for HashMap<String, MpvDataType> {
fn get_value(value: Value) -> Result<HashMap<String, MpvDataType>, Error> { fn get_value(value: Value) -> Result<HashMap<String, MpvDataType>, Error> {
extract_mpv_response_data(&value) value.as_object()
.and_then(|d| {
d.as_object()
.ok_or(Error(ErrorCode::ValueDoesNotContainHashMap)) .ok_or(Error(ErrorCode::ValueDoesNotContainHashMap))
})
.map(json_map_to_hashmap) .map(json_map_to_hashmap)
} }
@ -91,11 +72,8 @@ impl TypeHandler for HashMap<String, MpvDataType> {
impl TypeHandler for Vec<PlaylistEntry> { impl TypeHandler for Vec<PlaylistEntry> {
fn get_value(value: Value) -> Result<Vec<PlaylistEntry>, Error> { fn get_value(value: Value) -> Result<Vec<PlaylistEntry>, Error> {
extract_mpv_response_data(&value) value.as_array()
.and_then(|d| {
d.as_array()
.ok_or(Error(ErrorCode::ValueDoesNotContainPlaylist)) .ok_or(Error(ErrorCode::ValueDoesNotContainPlaylist))
})
.map(json_array_to_playlist) .map(json_array_to_playlist)
} }