diff --git a/Cargo.toml b/Cargo.toml index b4a3ec3..bc0907b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,11 +8,19 @@ homepage = "https://gitlab.com/mpv-ipc/mpvipc" repository = "https://gitlab.com/mpv-ipc/mpvipc" documentation = "https://docs.rs/mpvipc/" edition = "2021" +rust-version = "1.75" [dependencies] serde_json = "1.0.104" log = "0.4.19" serde = { version = "1.0.197", features = ["derive"] } +# TODO: downscale features once implementation is stable +tokio = { version = "1.37.0", features = ["full"] } +tokio-util = { version = "0.7.10", features = ["codec"] } +futures = "0.3.30" [dev-dependencies] env_logger = "0.10.0" + +[lib] +doctest = false \ No newline at end of file diff --git a/examples/fetch_state.rs b/examples/fetch_state.rs index e085275..63b9619 100644 --- a/examples/fetch_state.rs +++ b/examples/fetch_state.rs @@ -1,15 +1,16 @@ use env_logger; use mpvipc::{Error as MpvError, Mpv}; -fn main() -> Result<(), MpvError> { +#[tokio::main] +async fn main() -> Result<(), MpvError> { env_logger::init(); - let mpv = Mpv::connect("/tmp/mpv.sock")?; - let meta = mpv.get_metadata()?; + let mpv = Mpv::connect("/tmp/mpv.sock").await?; + let meta = mpv.get_metadata().await?; println!("metadata: {:?}", meta); - let playlist = mpv.get_playlist()?; + let playlist = mpv.get_playlist().await?; println!("playlist: {:?}", playlist); - let playback_time: f64 = mpv.get_property("playback-time")?; + let playback_time: f64 = mpv.get_property("playback-time").await?; println!("playback-time: {}", playback_time); Ok(()) } diff --git a/examples/media_player.rs b/examples/media_player.rs index 3e44516..70ffd20 100644 --- a/examples/media_player.rs +++ b/examples/media_player.rs @@ -11,58 +11,60 @@ fn seconds_to_hms(total: f64) -> String { format!("{:02}:{:02}:{:02}", hours, minutes, seconds) } -fn main() -> Result<(), Error> { +#[tokio::main] +async fn main() -> Result<(), Error> { env_logger::init(); - let mut mpv = Mpv::connect("/tmp/mpv.sock")?; + let mut mpv = Mpv::connect("/tmp/mpv.sock").await?; let mut pause = false; let mut playback_time = std::f64::NAN; let mut duration = std::f64::NAN; - mpv.observe_property(1, "path")?; - mpv.observe_property(2, "pause")?; - mpv.observe_property(3, "playback-time")?; - mpv.observe_property(4, "duration")?; - mpv.observe_property(5, "metadata")?; + mpv.observe_property(1, "path").await?; + mpv.observe_property(2, "pause").await?; + mpv.observe_property(3, "playback-time").await?; + mpv.observe_property(4, "duration").await?; + mpv.observe_property(5, "metadata").await?; loop { - let event = mpv.event_listen()?; - match event { - Event::PropertyChange { id: _, property } => match property { - Property::Path(Some(value)) => println!("\nPlaying: {}", value), - Property::Path(None) => (), - Property::Pause(value) => pause = value, - Property::PlaybackTime(Some(value)) => playback_time = value, - Property::PlaybackTime(None) => playback_time = std::f64::NAN, - Property::Duration(Some(value)) => duration = value, - Property::Duration(None) => duration = std::f64::NAN, - Property::Metadata(Some(value)) => { - println!("File tags:"); - if let Some(MpvDataType::String(value)) = value.get("ARTIST") { - println!(" Artist: {}", value); - } - if let Some(MpvDataType::String(value)) = value.get("ALBUM") { - println!(" Album: {}", value); - } - if let Some(MpvDataType::String(value)) = value.get("TITLE") { - println!(" Title: {}", value); - } - if let Some(MpvDataType::String(value)) = value.get("TRACK") { - println!(" Track: {}", value); - } - } - Property::Metadata(None) => (), - Property::Unknown { name: _, data: _ } => (), - }, - Event::Shutdown => return Ok(()), - Event::Unimplemented => panic!("Unimplemented event"), - _ => (), - } - print!( - "{}{} / {} ({:.0}%)\r", - if pause { "(Paused) " } else { "" }, - seconds_to_hms(playback_time), - seconds_to_hms(duration), - 100. * playback_time / duration - ); - io::stdout().flush().unwrap(); + // TODO: + // let event = mpv.event_listen()?; + // match event { + // Event::PropertyChange { id: _, property } => match property { + // Property::Path(Some(value)) => println!("\nPlaying: {}", value), + // Property::Path(None) => (), + // Property::Pause(value) => pause = value, + // Property::PlaybackTime(Some(value)) => playback_time = value, + // Property::PlaybackTime(None) => playback_time = std::f64::NAN, + // Property::Duration(Some(value)) => duration = value, + // Property::Duration(None) => duration = std::f64::NAN, + // Property::Metadata(Some(value)) => { + // println!("File tags:"); + // if let Some(MpvDataType::String(value)) = value.get("ARTIST") { + // println!(" Artist: {}", value); + // } + // if let Some(MpvDataType::String(value)) = value.get("ALBUM") { + // println!(" Album: {}", value); + // } + // if let Some(MpvDataType::String(value)) = value.get("TITLE") { + // println!(" Title: {}", value); + // } + // if let Some(MpvDataType::String(value)) = value.get("TRACK") { + // println!(" Track: {}", value); + // } + // } + // Property::Metadata(None) => (), + // Property::Unknown { name: _, data: _ } => (), + // }, + // Event::Shutdown => return Ok(()), + // Event::Unimplemented => panic!("Unimplemented event"), + // _ => (), + // } + // print!( + // "{}{} / {} ({:.0}%)\r", + // if pause { "(Paused) " } else { "" }, + // seconds_to_hms(playback_time), + // seconds_to_hms(duration), + // 100. * playback_time / duration + // ); + // io::stdout().flush().unwrap(); } } diff --git a/src/api.rs b/src/api.rs index d70bef4..6efa1dc 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,14 +1,13 @@ use serde::{Deserialize, Serialize}; -use serde_json::json; -use std::collections::HashMap; -use std::fmt::{self, Display}; -use std::io::{BufReader, Read}; -use std::os::unix::net::UnixStream; - -use crate::ipc::{ - get_mpv_property, get_mpv_property_string, listen, listen_raw, observe_mpv_property, - run_mpv_command, set_mpv_property, unobserve_mpv_property, +use serde_json::Value; +use std::{ + collections::HashMap, + fmt::{self, Display}, }; +use tokio::{net::UnixStream, sync::oneshot}; + +use crate::ipc::{MpvIpc, MpvIpcCommand, MpvIpcResponse}; +use crate::message_parser::TypeHandler; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Event { @@ -126,7 +125,7 @@ pub enum Switch { Toggle, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum ErrorCode { MpvError(String), JsonParseError(String), @@ -144,7 +143,7 @@ pub enum ErrorCode { ValueDoesNotContainUsize, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PlaylistEntry { pub id: usize, pub filename: String, @@ -152,52 +151,59 @@ pub struct PlaylistEntry { pub current: bool, } -pub struct Mpv { - pub(crate) stream: UnixStream, - pub(crate) reader: BufReader, - pub(crate) name: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Playlist(pub Vec); -#[derive(Debug, Clone, Serialize, Deserialize)] +pub trait GetPropertyTypeHandler: Sized { + async fn get_property_generic(instance: &Mpv, property: &str) -> Result; +} + +impl GetPropertyTypeHandler for T +where + T: TypeHandler, +{ + async fn get_property_generic(instance: &Mpv, property: &str) -> Result { + instance + .get_property_value(property) + .await + .and_then(T::get_value) + } +} + +pub trait SetPropertyTypeHandler { + async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>; +} + +impl SetPropertyTypeHandler for T +where + T: Serialize, +{ + async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error> { + let (res_tx, res_rx) = oneshot::channel(); + let value = serde_json::to_value(value) + .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?; + instance + .command_sender + .send(( + MpvIpcCommand::SetProperty(property.to_owned(), value), + res_tx, + )) + .await + .map_err(|_| { + Error(ErrorCode::ConnectError( + "Failed to send command".to_string(), + )) + })?; + match res_rx.await { + Ok(MpvIpcResponse::ValueResponse(_)) => Ok(()), + _ => Err(Error(ErrorCode::UnexpectedResult)), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Error(pub ErrorCode); -impl Drop for Mpv { - fn drop(&mut self) { - self.disconnect(); - } -} - -impl fmt::Debug for Mpv { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_tuple("Mpv").field(&self.name).finish() - } -} - -impl Clone for Mpv { - fn clone(&self) -> Self { - let stream = self.stream.try_clone().expect("cloning UnixStream"); - let cloned_stream = stream.try_clone().expect("cloning UnixStream"); - Mpv { - stream, - reader: BufReader::new(cloned_stream), - name: self.name.clone(), - } - } - - fn clone_from(&mut self, source: &Self) { - let stream = source.stream.try_clone().expect("cloning UnixStream"); - let cloned_stream = stream.try_clone().expect("cloning UnixStream"); - *self = Mpv { - stream, - reader: BufReader::new(cloned_stream), - name: source.name.clone(), - } - } -} - impl Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { Display::fmt(&self.0, f) @@ -241,116 +247,79 @@ impl Display for ErrorCode { } } -pub trait GetPropertyTypeHandler: Sized { - fn get_property_generic(instance: &Mpv, property: &str) -> Result; +#[derive(Clone)] +pub struct Mpv { + command_sender: tokio::sync::mpsc::Sender<(MpvIpcCommand, oneshot::Sender)>, } -impl GetPropertyTypeHandler for bool { - fn get_property_generic(instance: &Mpv, property: &str) -> Result { - get_mpv_property::(instance, property) - } -} +// impl Drop for Mpv { +// fn drop(&mut self) { +// self.disconnect(); +// } +// } -impl GetPropertyTypeHandler for String { - fn get_property_generic(instance: &Mpv, property: &str) -> Result { - get_mpv_property::(instance, property) - } -} - -impl GetPropertyTypeHandler for f64 { - fn get_property_generic(instance: &Mpv, property: &str) -> Result { - get_mpv_property::(instance, property) - } -} - -impl GetPropertyTypeHandler for usize { - fn get_property_generic(instance: &Mpv, property: &str) -> Result { - get_mpv_property::(instance, property) - } -} - -impl GetPropertyTypeHandler for Vec { - fn get_property_generic(instance: &Mpv, property: &str) -> Result, Error> { - get_mpv_property::>(instance, property) - } -} - -impl GetPropertyTypeHandler for HashMap { - fn get_property_generic( - instance: &Mpv, - property: &str, - ) -> Result, Error> { - get_mpv_property::>(instance, property) - } -} - -pub trait SetPropertyTypeHandler { - fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>; -} - -impl SetPropertyTypeHandler for bool { - fn set_property_generic(instance: &Mpv, property: &str, value: bool) -> Result<(), Error> { - set_mpv_property(instance, property, json!(value)) - } -} - -impl SetPropertyTypeHandler for String { - fn set_property_generic(instance: &Mpv, property: &str, value: String) -> Result<(), Error> { - set_mpv_property(instance, property, json!(value)) - } -} - -impl SetPropertyTypeHandler for f64 { - fn set_property_generic(instance: &Mpv, property: &str, value: f64) -> Result<(), Error> { - set_mpv_property(instance, property, json!(value)) - } -} - -impl SetPropertyTypeHandler for usize { - fn set_property_generic(instance: &Mpv, property: &str, value: usize) -> Result<(), Error> { - set_mpv_property(instance, property, json!(value)) +impl fmt::Debug for Mpv { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: + fmt.debug_struct("Mpv") + .field("name", &"TODO: name") + .finish() } } impl Mpv { - pub fn connect(socket: &str) -> Result { - match UnixStream::connect(socket) { - Ok(stream) => { - let cloned_stream = stream.try_clone().expect("cloning UnixStream"); - return Ok(Mpv { - stream, - reader: BufReader::new(cloned_stream), - name: String::from(socket), - }); - } + pub async fn connect(socket_path: &str) -> Result { + log::debug!("Connecting to mpv socket at {}", socket_path); + + let socket = match UnixStream::connect(socket_path).await { + Ok(stream) => Ok(stream), Err(internal_error) => Err(Error(ErrorCode::ConnectError(internal_error.to_string()))), + }?; + + Self::connect_socket(socket).await + } + + pub async fn connect_socket(socket: UnixStream) -> Result { + let (com_tx, com_rx) = tokio::sync::mpsc::channel(100); + let ipc = MpvIpc::new(socket, com_rx); + + log::debug!("Starting IPC handler"); + tokio::spawn(ipc.run()); + + Ok(Mpv { + command_sender: com_tx, + }) + } + + pub async fn disconnect(&self) -> Result<(), Error> { + let (res_tx, res_rx) = oneshot::channel(); + self.command_sender + .send((MpvIpcCommand::Exit, res_tx)) + .await + .map_err(|_| { + Error(ErrorCode::ConnectError( + "Failed to send command".to_string(), + )) + })?; + match res_rx.await { + Ok(MpvIpcResponse::ResultResponse(result)) => result, + _ => Err(Error(ErrorCode::UnexpectedResult)), } } - pub fn disconnect(&self) { - let mut stream = &self.stream; - stream - .shutdown(std::net::Shutdown::Both) - .expect("socket disconnect"); - let mut buffer = [0; 32]; - for _ in 0..stream.bytes().count() { - stream.read(&mut buffer[..]).unwrap(); - } - } + // pub fn get_stream_ref(&self) -> &UnixStream { + // &self.stream + // } - pub fn get_stream_ref(&self) -> &UnixStream { - &self.stream - } - - pub fn get_metadata(&self) -> Result, Error> { - match get_mpv_property(self, "metadata") { + pub async fn get_metadata(&self) -> Result, Error> { + match self.get_property("metadata").await { Ok(map) => Ok(map), Err(err) => Err(err), } } - pub fn get_playlist(&self) -> Result { - match get_mpv_property::>(self, "playlist") { + pub async fn get_playlist(&self) -> Result { + match self.get_property::>("playlist").await { Ok(entries) => Ok(Playlist(entries)), Err(msg) => Err(msg), } @@ -375,15 +344,18 @@ impl Mpv { /// # Example /// ``` /// use mpvipc::{Mpv, Error}; - /// fn main() -> Result<(), Error> { + /// async fn main() -> Result<(), Error> { /// let mpv = Mpv::connect("/tmp/mpvsocket")?; - /// let paused: bool = mpv.get_property("pause")?; - /// let title: String = mpv.get_property("media-title")?; + /// let paused: bool = mpv.get_property("pause").await?; + /// let title: String = mpv.get_property("media-title").await?; /// Ok(()) /// } /// ``` - pub fn get_property(&self, property: &str) -> Result { - T::get_property_generic(self, property) + pub async fn get_property( + &self, + property: &str, + ) -> Result { + T::get_property_generic(self, property).await } /// # Description @@ -405,12 +377,24 @@ impl Mpv { /// Ok(()) /// } /// ``` - pub fn get_property_string(&self, property: &str) -> Result { - get_mpv_property_string(self, property) + pub async fn get_property_value(&self, property: &str) -> Result { + let (res_tx, res_rx) = oneshot::channel(); + self.command_sender + .send((MpvIpcCommand::GetProperty(property.to_owned()), res_tx)) + .await + .map_err(|_| { + Error(ErrorCode::ConnectError( + "Failed to send command".to_string(), + )) + })?; + match res_rx.await { + Ok(MpvIpcResponse::ValueResponse(response)) => response, + _ => Err(Error(ErrorCode::UnexpectedResult)), + } } - pub fn kill(&self) -> Result<(), Error> { - self.run_command(MpvCommand::Quit) + pub async fn kill(&self) -> Result<(), Error> { + self.run_command(MpvCommand::Quit).await } /// # Description @@ -426,42 +410,44 @@ impl Mpv { /// println!("{:?}", event); /// } /// ``` - pub fn event_listen(&mut self) -> Result { - listen(self) + // pub fn event_listen(&mut self) -> Result { + // listen(self) + // } + + // pub fn event_listen_raw(&mut self) -> String { + // listen_raw(self) + // } + + pub async fn next(&self) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistNext).await } - pub fn event_listen_raw(&mut self) -> String { - listen_raw(self) - } - - pub fn next(&self) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistNext) - } - - pub fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> { + pub async fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> { self.run_command(MpvCommand::Observe { - id: id, + id, property: property.to_string(), }) + .await } - pub fn unobserve_property(&self, id: isize) -> Result<(), Error> { - self.run_command(MpvCommand::Unobserve(id)) + pub async fn unobserve_property(&self, id: isize) -> Result<(), Error> { + self.run_command(MpvCommand::Unobserve(id)).await } - pub fn pause(&self) -> Result<(), Error> { - set_mpv_property(self, "pause", json!(true)) + pub async fn pause(&self) -> Result<(), Error> { + self.set_property("pause", true).await } - pub fn prev(&self) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistPrev) + pub async fn prev(&self) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistPrev).await } - pub fn restart(&self) -> Result<(), Error> { + pub async fn restart(&self) -> Result<(), Error> { self.run_command(MpvCommand::Seek { seconds: 0f64, option: SeekOptions::Absolute, }) + .await } /// # Description @@ -490,183 +476,241 @@ impl Mpv { /// Ok(()) /// } /// ``` - pub fn run_command(&self, command: MpvCommand) -> Result<(), Error> { - match command { - MpvCommand::LoadFile { file, option } => run_mpv_command( - self, - "loadfile", - &[ - file.as_ref(), - match option { - PlaylistAddOptions::Append => "append", - PlaylistAddOptions::Replace => "replace", - }, - ], - ), - MpvCommand::LoadList { file, option } => run_mpv_command( - self, - "loadlist", - &[ - file.as_ref(), - match option { - PlaylistAddOptions::Append => "append", - PlaylistAddOptions::Replace => "replace", - }, - ], - ), - MpvCommand::Observe { id, property } => observe_mpv_property(self, &id, &property), - MpvCommand::PlaylistClear => run_mpv_command(self, "playlist-clear", &[]), + pub async fn run_command(&self, command: MpvCommand) -> Result<(), Error> { + log::trace!("Running command: {:?}", command); + let result = match command { + MpvCommand::LoadFile { file, option } => { + self.run_command_raw( + "loadfile", + &[ + file.as_ref(), + match option { + PlaylistAddOptions::Append => "append", + PlaylistAddOptions::Replace => "replace", + }, + ], + ) + .await + } + MpvCommand::LoadList { file, option } => { + self.run_command_raw( + "loadlist", + &[ + file.as_ref(), + match option { + PlaylistAddOptions::Append => "append", + PlaylistAddOptions::Replace => "replace", + }, + ], + ) + .await + } + MpvCommand::Observe { id, property } => { + let (res_tx, res_rx) = oneshot::channel(); + self.command_sender + .send((MpvIpcCommand::ObserveProperty(id, property), res_tx)) + .await + .map_err(|_| { + Error(ErrorCode::ConnectError( + "Failed to send command".to_string(), + )) + })?; + match res_rx.await { + Ok(MpvIpcResponse::ResultResponse(result)) => result, + _ => Err(Error(ErrorCode::UnexpectedResult)), + } + } + MpvCommand::PlaylistClear => self.run_command_raw("playlist-clear", &[]).await, MpvCommand::PlaylistMove { from, to } => { - run_mpv_command(self, "playlist-move", &[&from.to_string(), &to.to_string()]) + self.run_command_raw("playlist-move", &[&from.to_string(), &to.to_string()]) + .await } - MpvCommand::PlaylistNext => run_mpv_command(self, "playlist-next", &[]), - MpvCommand::PlaylistPrev => run_mpv_command(self, "playlist-prev", &[]), + MpvCommand::PlaylistNext => self.run_command_raw("playlist-next", &[]).await, + MpvCommand::PlaylistPrev => self.run_command_raw("playlist-prev", &[]).await, MpvCommand::PlaylistRemove(id) => { - run_mpv_command(self, "playlist-remove", &[&id.to_string()]) + self.run_command_raw("playlist-remove", &[&id.to_string()]) + .await } - MpvCommand::PlaylistShuffle => run_mpv_command(self, "playlist-shuffle", &[]), - MpvCommand::Quit => run_mpv_command(self, "quit", &[]), + MpvCommand::PlaylistShuffle => self.run_command_raw("playlist-shuffle", &[]).await, + MpvCommand::Quit => self.run_command_raw("quit", &[]).await, MpvCommand::ScriptMessage(args) => { let str_args: Vec<_> = args.iter().map(String::as_str).collect(); - run_mpv_command(self, "script-message", &str_args) + self.run_command_raw("script-message", &str_args).await } MpvCommand::ScriptMessageTo { target, args } => { let mut cmd_args: Vec<_> = vec![target.as_str()]; let mut str_args: Vec<_> = args.iter().map(String::as_str).collect(); cmd_args.append(&mut str_args); - run_mpv_command(self, "script-message-to", &cmd_args) + self.run_command_raw("script-message-to", &cmd_args).await } - MpvCommand::Seek { seconds, option } => run_mpv_command( - self, - "seek", - &[ - &seconds.to_string(), - match option { - SeekOptions::Absolute => "absolute", - SeekOptions::Relative => "relative", - SeekOptions::AbsolutePercent => "absolute-percent", - SeekOptions::RelativePercent => "relative-percent", - }, - ], - ), - MpvCommand::Stop => run_mpv_command(self, "stop", &[]), - MpvCommand::Unobserve(id) => unobserve_mpv_property(self, &id), - } + MpvCommand::Seek { seconds, option } => { + self.run_command_raw( + "seek", + &[ + &seconds.to_string(), + match option { + SeekOptions::Absolute => "absolute", + SeekOptions::Relative => "relative", + SeekOptions::AbsolutePercent => "absolute-percent", + SeekOptions::RelativePercent => "relative-percent", + }, + ], + ) + .await + } + MpvCommand::Stop => self.run_command_raw("stop", &[]).await, + MpvCommand::Unobserve(id) => { + let (res_tx, res_rx) = oneshot::channel(); + self.command_sender + .send((MpvIpcCommand::UnobserveProperty(id), res_tx)) + .await + .unwrap(); + match res_rx.await { + Ok(MpvIpcResponse::ResultResponse(result)) => result, + _ => Err(Error(ErrorCode::UnexpectedResult)), + } + } + }; + log::trace!("Command result: {:?}", result); + result } /// Run a custom command. /// This should only be used if the desired command is not implemented /// with [MpvCommand]. - pub fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<(), Error> { - run_mpv_command(self, command, args) + pub async fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<(), Error> { + let command = Vec::from( + [command] + .iter() + .chain(args.iter()) + .map(|s| s.to_string()) + .collect::>() + .as_slice(), + ); + let (res_tx, res_rx) = oneshot::channel(); + self.command_sender + .send((MpvIpcCommand::Command(command), res_tx)) + .await + .map_err(|_| { + Error(ErrorCode::ConnectError( + "Failed to send command".to_string(), + )) + })?; + + match res_rx.await { + Ok(MpvIpcResponse::ResultResponse(result)) => result, + _ => Err(Error(ErrorCode::UnexpectedResult)), + } } - pub fn playlist_add( + pub async fn playlist_add( &self, file: &str, file_type: PlaylistAddTypeOptions, option: PlaylistAddOptions, ) -> Result<(), Error> { match file_type { - PlaylistAddTypeOptions::File => self.run_command(MpvCommand::LoadFile { - file: file.to_string(), - option, - }), + PlaylistAddTypeOptions::File => { + self.run_command(MpvCommand::LoadFile { + file: file.to_string(), + option, + }) + .await + } - PlaylistAddTypeOptions::Playlist => self.run_command(MpvCommand::LoadList { - file: file.to_string(), - option, - }), + PlaylistAddTypeOptions::Playlist => { + self.run_command(MpvCommand::LoadList { + file: file.to_string(), + option, + }) + .await + } } } - pub fn playlist_clear(&self) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistClear) + pub async fn playlist_clear(&self) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistClear).await } - pub fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> { + pub async fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> { self.run_command(MpvCommand::PlaylistMove { from, to }) + .await } - pub fn playlist_play_id(&self, id: usize) -> Result<(), Error> { - set_mpv_property(self, "playlist-pos", json!(id)) + pub async fn playlist_play_id(&self, id: usize) -> Result<(), Error> { + self.set_property("playlist-pos", id).await } - pub fn playlist_play_next(&self, id: usize) -> Result<(), Error> { - match get_mpv_property::(self, "playlist-pos") { - Ok(current_id) => self.run_command(MpvCommand::PlaylistMove { - from: id, - to: current_id + 1, - }), + pub async fn playlist_play_next(&self, id: usize) -> Result<(), Error> { + match self.get_property::("playlist-pos").await { + Ok(current_id) => { + self.run_command(MpvCommand::PlaylistMove { + from: id, + to: current_id + 1, + }) + .await + } Err(msg) => Err(msg), } } - pub fn playlist_remove_id(&self, id: usize) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistRemove(id)) + pub async fn playlist_remove_id(&self, id: usize) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistRemove(id)).await } - pub fn playlist_shuffle(&self) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistShuffle) + pub async fn playlist_shuffle(&self) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistShuffle).await } - pub fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> { - self.run_command(MpvCommand::Seek { seconds, option }) + pub async fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> { + self.run_command(MpvCommand::Seek { seconds, option }).await } - pub fn set_loop_file(&self, option: Switch) -> Result<(), Error> { + pub async fn set_loop_file(&self, option: Switch) -> Result<(), Error> { let mut enabled = false; match option { Switch::On => enabled = true, Switch::Off => {} - Switch::Toggle => match get_mpv_property_string(self, "loop-file") { - Ok(value) => match value.as_ref() { - "false" => { - enabled = true; - } - _ => { - enabled = false; - } - }, + Switch::Toggle => match self.get_property_value("loop-file").await { + Ok(Value::Bool(b)) => { + enabled = b; + } + Ok(_) => return Err(Error(ErrorCode::ValueDoesNotContainBool)), Err(msg) => return Err(msg), }, } - set_mpv_property(self, "loop-file", json!(enabled)) + self.set_property("loop-file", enabled).await } - pub fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> { + pub async fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> { let mut enabled = false; match option { Switch::On => enabled = true, Switch::Off => {} - Switch::Toggle => match get_mpv_property_string(self, "loop-playlist") { - Ok(value) => match value.as_ref() { - "false" => { - enabled = true; - } - _ => { - enabled = false; - } - }, + Switch::Toggle => match self.get_property_value("loop-playlist").await { + Ok(Value::Bool(b)) => { + enabled = b; + } + Ok(_) => return Err(Error(ErrorCode::ValueDoesNotContainBool)), Err(msg) => return Err(msg), }, } - set_mpv_property(self, "loop-playlist", json!(enabled)) + self.set_property("loop-playlist", enabled).await } - pub fn set_mute(&self, option: Switch) -> Result<(), Error> { + pub async fn set_mute(&self, option: Switch) -> Result<(), Error> { let mut enabled = false; match option { Switch::On => enabled = true, Switch::Off => {} - Switch::Toggle => match get_mpv_property::(self, "mute") { + Switch::Toggle => match self.get_property::("mute").await { Ok(value) => { enabled = !value; } Err(msg) => return Err(msg), }, } - set_mpv_property(self, "mute", json!(enabled)) + self.set_property("mute", enabled).await } /// # Description @@ -687,63 +731,67 @@ impl Mpv { /// # Example /// ``` /// use mpvipc::{Mpv, Error}; - /// fn main() -> Result<(), Error> { + /// fn async main() -> Result<(), Error> { /// let mpv = Mpv::connect("/tmp/mpvsocket")?; - /// mpv.set_property("pause", true)?; + /// mpv.set_property("pause", true).await?; /// Ok(()) /// } /// ``` - pub fn set_property>( + pub async fn set_property>( &self, property: &str, value: T, ) -> Result<(), Error> { - T::set_property_generic(self, property, value) + T::set_property_generic(self, property, value).await } - pub fn set_speed(&self, input_speed: f64, option: NumberChangeOptions) -> Result<(), Error> { - match get_mpv_property::(self, "speed") { + pub async fn set_speed( + &self, + input_speed: f64, + option: NumberChangeOptions, + ) -> Result<(), Error> { + match self.get_property::("speed").await { Ok(speed) => match option { NumberChangeOptions::Increase => { - set_mpv_property(self, "speed", json!(speed + input_speed)) + self.set_property("speed", speed + input_speed).await } NumberChangeOptions::Decrease => { - set_mpv_property(self, "speed", json!(speed - input_speed)) + self.set_property("speed", speed - input_speed).await } - NumberChangeOptions::Absolute => { - set_mpv_property(self, "speed", json!(input_speed)) - } + NumberChangeOptions::Absolute => self.set_property("speed", input_speed).await, }, Err(msg) => Err(msg), } } - pub fn set_volume(&self, input_volume: f64, option: NumberChangeOptions) -> Result<(), Error> { - match get_mpv_property::(self, "volume") { + pub async fn set_volume( + &self, + input_volume: f64, + option: NumberChangeOptions, + ) -> Result<(), Error> { + match self.get_property::("volume").await { Ok(volume) => match option { NumberChangeOptions::Increase => { - set_mpv_property(self, "volume", json!(volume + input_volume)) + self.set_property("volume", volume + input_volume).await } NumberChangeOptions::Decrease => { - set_mpv_property(self, "volume", json!(volume - input_volume)) + self.set_property("volume", volume - input_volume).await } - NumberChangeOptions::Absolute => { - set_mpv_property(self, "volume", json!(input_volume)) - } + NumberChangeOptions::Absolute => self.set_property("volume", input_volume).await, }, Err(msg) => Err(msg), } } - pub fn stop(&self) -> Result<(), Error> { - self.run_command(MpvCommand::Stop) + pub async fn stop(&self) -> Result<(), Error> { + self.run_command(MpvCommand::Stop).await } - pub fn toggle(&self) -> Result<(), Error> { - run_mpv_command(self, "cycle", &["pause"]) + pub async fn toggle(&self) -> Result<(), Error> { + self.run_command_raw("cycle", &["pause"]).await } } diff --git a/src/ipc.rs b/src/ipc.rs index a7ce542..d896757 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -1,248 +1,145 @@ -use crate::message_parser::TypeHandler; - -use self::message_parser::extract_mpv_response_data; -use self::message_parser::json_array_to_playlist; -use self::message_parser::json_array_to_vec; -use self::message_parser::json_map_to_hashmap; - use super::*; -use log::{debug, warn}; -use serde_json::json; -use serde_json::Value; -use std::io::BufRead; -use std::io::BufReader; -use std::io::Write; +use futures::{SinkExt, StreamExt}; +use serde_json::{json, Value}; +use std::mem; +use tokio::net::UnixStream; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{oneshot, Mutex}; +use tokio_util::codec::{Framed, LinesCodec}; -pub fn get_mpv_property(instance: &Mpv, property: &str) -> Result { - let ipc_string = json!({"command": ["get_property", property]}); - match serde_json::from_str::(&send_command_sync(instance, ipc_string)) { - Ok(val) => T::get_value(val), - Err(why) => Err(Error(ErrorCode::JsonParseError(why.to_string()))), - } +pub(crate) struct MpvIpc { + socket: Framed, + command_channel: Receiver<(MpvIpcCommand, oneshot::Sender)>, + socket_lock: Mutex<()>, } -pub fn get_mpv_property_string(instance: &Mpv, property: &str) -> Result { - let ipc_string = json!({"command": ["get_property", property]}); - let val = serde_json::from_str::(&send_command_sync(instance, ipc_string)) - .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?; - - let data = extract_mpv_response_data(&val)?; - - match data { - Value::Bool(b) => Ok(b.to_string()), - Value::Number(ref n) => Ok(n.to_string()), - Value::String(ref s) => Ok(s.to_string()), - Value::Array(ref array) => Ok(format!("{:?}", array)), - Value::Object(ref map) => Ok(format!("{:?}", map)), - Value::Null => Err(Error(ErrorCode::MissingValue)), - } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum MpvIpcCommand { + Command(Vec), + GetProperty(String), + SetProperty(String, Value), + ObserveProperty(isize, String), + UnobserveProperty(isize), + Exit, } -fn validate_mpv_response(response: &str) -> Result<(), Error> { - serde_json::from_str::(response) - .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string()))) - .and_then(|value| extract_mpv_response_data(&value).map(|_| ())) +#[derive(Debug, Clone)] +pub(crate) enum MpvIpcResponse { + ValueResponse(Result), + ResultResponse(Result<(), Error>), } -pub fn set_mpv_property(instance: &Mpv, property: &str, value: Value) -> Result<(), Error> { - let ipc_string = json!({ - "command": ["set_property", property, value] - }); - - let response = &send_command_sync(instance, ipc_string); - validate_mpv_response(response) -} - -pub fn run_mpv_command(instance: &Mpv, command: &str, args: &[&str]) -> Result<(), Error> { - let mut ipc_string = json!({ - "command": [command] - }); - if let Value::Array(args_array) = &mut ipc_string["command"] { - for arg in args { - args_array.push(json!(arg)); +impl MpvIpc { + pub(crate) fn new( + socket: UnixStream, + command_channel: Receiver<(MpvIpcCommand, oneshot::Sender)>, + ) -> Self { + MpvIpc { + socket: Framed::new(socket, LinesCodec::new()), + command_channel, + socket_lock: Mutex::new(()), } } - let response = &send_command_sync(instance, ipc_string); - validate_mpv_response(response) -} - -pub fn observe_mpv_property(instance: &Mpv, id: &isize, property: &str) -> Result<(), Error> { - let ipc_string = json!({ - "command": ["observe_property", id, property] - }); - - let response = &send_command_sync(instance, ipc_string); - validate_mpv_response(response) -} - -pub fn unobserve_mpv_property(instance: &Mpv, id: &isize) -> Result<(), Error> { - let ipc_string = json!({ - "command": ["unobserve_property", id] - }); - - let response = &send_command_sync(instance, ipc_string); - validate_mpv_response(response) -} - -fn try_convert_property(name: &str, id: usize, data: MpvDataType) -> Event { - let property = match name { - "path" => match data { - MpvDataType::String(value) => Property::Path(Some(value)), - MpvDataType::Null => Property::Path(None), - _ => unimplemented!(), - }, - "pause" => match data { - MpvDataType::Bool(value) => Property::Pause(value), - _ => unimplemented!(), - }, - "playback-time" => match data { - MpvDataType::Double(value) => Property::PlaybackTime(Some(value)), - MpvDataType::Null => Property::PlaybackTime(None), - _ => unimplemented!(), - }, - "duration" => match data { - MpvDataType::Double(value) => Property::Duration(Some(value)), - MpvDataType::Null => Property::Duration(None), - _ => unimplemented!(), - }, - "metadata" => match data { - MpvDataType::HashMap(value) => Property::Metadata(Some(value)), - MpvDataType::Null => Property::Metadata(None), - _ => unimplemented!(), - }, - _ => { - warn!("Property {} not implemented", name); - Property::Unknown { - name: name.to_string(), - data, - } - } - }; - Event::PropertyChange { id, property } -} - -pub fn listen(instance: &mut Mpv) -> Result { - let mut e; - // sometimes we get responses unrelated to events, so we read a new line until we receive one - // with an event field - let name = loop { - let mut response = String::new(); - instance.reader.read_line(&mut response).unwrap(); - response = response.trim_end().to_string(); - debug!("Event: {}", response); - - e = serde_json::from_str::(&response) + pub(crate) async fn send_command(&mut self, command: &[&str]) -> Result { + let lock = self.socket_lock.lock().await; + // START CRITICAL SECTION + let ipc_command = json!({ "command": command }); + let ipc_command_str = serde_json::to_string(&ipc_command) .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?; - match e["event"] { - Value::String(ref name) => break name, - _ => { - // It was not an event - try again - debug!("Bad response: {:?}", response) + log::trace!("Sending command: {}", ipc_command_str); + + self.socket + .send(ipc_command_str) + .await + .map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))?; + + let response = self + .socket + .next() + .await + .ok_or(Error(ErrorCode::MissingValue))? + .map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))?; + + // END CRITICAL SECTION + mem::drop(lock); + + log::trace!("Received response: {}", response); + + serde_json::from_str::(&response) + .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string()))) + } + + pub(crate) async fn get_mpv_property(&mut self, property: &str) -> Result { + self.send_command(&["get_property", property]).await + } + + pub(crate) async fn set_mpv_property( + &mut self, + property: &str, + value: Value, + ) -> Result<(), Error> { + let str_value = serde_json::to_string(&value).unwrap(); + self.send_command(&["set_property", property, &str_value]) + .await + .map(|_| ()) + } + + pub(crate) async fn observe_property( + &mut self, + id: isize, + property: &str, + ) -> Result<(), Error> { + self.send_command(&["observe_property", &id.to_string(), property]) + .await + .map(|_| ()) + } + + pub(crate) async fn unobserve_property(&mut self, id: isize) -> Result<(), Error> { + self.send_command(&["unobserve_property", &id.to_string()]) + .await + .map(|_| ()) + } + + pub(crate) async fn run(mut self) -> Result<(), Error> { + loop { + tokio::select! { + Some(event) = self.socket.next() => { + log::trace!("Handling event: {:?}", event); + // TODO: handle event + } + Some((cmd, tx)) = self.command_channel.recv() => { + log::trace!("Handling command: {:?}", cmd); + match cmd { + MpvIpcCommand::Command(command) => { + let refs = command.iter().map(|s| s.as_str()).collect::>(); + let response = self.send_command(refs.as_slice()).await; + tx.send(MpvIpcResponse::ValueResponse(response)).unwrap() + } + MpvIpcCommand::GetProperty(property) => { + let response = self.get_mpv_property(&property).await; + tx.send(MpvIpcResponse::ValueResponse(response)).unwrap() + } + MpvIpcCommand::SetProperty(property, value) => { + let response = self.set_mpv_property(&property, value).await; + tx.send(MpvIpcResponse::ResultResponse(response)).unwrap() + } + MpvIpcCommand::ObserveProperty(id, property) => { + let response = self.observe_property(id, &property).await; + tx.send(MpvIpcResponse::ResultResponse(response)).unwrap() + } + MpvIpcCommand::UnobserveProperty(id) => { + let response = self.unobserve_property(id).await; + tx.send(MpvIpcResponse::ResultResponse(response)).unwrap() + } + MpvIpcCommand::Exit => { + tx.send(MpvIpcResponse::ResultResponse(Ok(()))).unwrap(); + return Ok(()); + } + } + } } } - }; - - let event = match name.as_str() { - "shutdown" => Event::Shutdown, - "start-file" => Event::StartFile, - "file-loaded" => Event::FileLoaded, - "seek" => Event::Seek, - "playback-restart" => Event::PlaybackRestart, - "idle" => Event::Idle, - "tick" => Event::Tick, - "video-reconfig" => Event::VideoReconfig, - "audio-reconfig" => Event::AudioReconfig, - "tracks-changed" => Event::TracksChanged, - "track-switched" => Event::TrackSwitched, - "pause" => Event::Pause, - "unpause" => Event::Unpause, - "metadata-update" => Event::MetadataUpdate, - "chapter-change" => Event::ChapterChange, - "end-file" => Event::EndFile, - "property-change" => { - let name = match e["name"] { - Value::String(ref n) => Ok(n.to_string()), - _ => Err(Error(ErrorCode::JsonContainsUnexptectedType)), - }?; - - let id: usize = match e["id"] { - Value::Number(ref n) => n.as_u64().unwrap() as usize, - _ => 0, - }; - - let data: MpvDataType = match e["data"] { - Value::String(ref n) => MpvDataType::String(n.to_string()), - - Value::Array(ref a) => { - if name == "playlist".to_string() { - MpvDataType::Playlist(Playlist(json_array_to_playlist(a))) - } else { - MpvDataType::Array(json_array_to_vec(a)) - } - } - - Value::Bool(b) => MpvDataType::Bool(b), - - Value::Number(ref n) => { - if n.is_u64() { - MpvDataType::Usize(n.as_u64().unwrap() as usize) - } else if n.is_f64() { - MpvDataType::Double(n.as_f64().unwrap()) - } else { - return Err(Error(ErrorCode::JsonContainsUnexptectedType)); - } - } - - Value::Object(ref m) => MpvDataType::HashMap(json_map_to_hashmap(m)), - - Value::Null => MpvDataType::Null, - }; - - try_convert_property(name.as_ref(), id, data) - } - "client-message" => { - let args = match e["args"] { - Value::Array(ref a) => json_array_to_vec(a) - .iter() - .map(|arg| match arg { - MpvDataType::String(s) => Ok(s.to_owned()), - _ => Err(Error(ErrorCode::JsonContainsUnexptectedType)), - }) - .collect::, _>>(), - _ => return Err(Error(ErrorCode::JsonContainsUnexptectedType)), - }?; - Event::ClientMessage { args } - } - _ => Event::Unimplemented, - }; - Ok(event) -} - -pub fn listen_raw(instance: &mut Mpv) -> String { - let mut response = String::new(); - instance.reader.read_line(&mut response).unwrap(); - response.trim_end().to_string() -} - -fn send_command_sync(instance: &Mpv, command: Value) -> String { - let stream = &instance.stream; - match serde_json::to_writer(stream, &command) { - Err(why) => panic!("Error: Could not write to socket: {}", why), - Ok(_) => { - let mut stream = stream; - stream.write_all(b"\n").unwrap(); - let mut response = String::new(); - { - let mut reader = BufReader::new(stream); - while !response.contains("\"error\":") { - response.clear(); - reader.read_line(&mut response).unwrap(); - } - } - debug!("Response: {}", response.trim_end()); - response - } } } diff --git a/src/message_parser.rs b/src/message_parser.rs index 764add9..ca4cac2 100644 --- a/src/message_parser.rs +++ b/src/message_parser.rs @@ -9,8 +9,9 @@ pub trait TypeHandler: Sized { fn as_string(&self) -> String; } -pub(crate) fn extract_mpv_response_data(value: &Value) -> Result<&Value, Error> { - value +pub(crate) fn parse_mpv_response_data(value: &Value) -> Result<&Value, Error> { + log::trace!("Parsing mpv response data: {:?}", value); + let result = value .as_object() .map(|o| (o.get("error").and_then(|e| e.as_str()), o.get("data"))) .ok_or(Error(ErrorCode::UnexpectedValue)) @@ -18,12 +19,17 @@ pub(crate) fn extract_mpv_response_data(value: &Value) -> Result<&Value, Error> Some("success") => data.ok_or(Error(ErrorCode::UnexpectedValue)), Some(e) => Err(Error(ErrorCode::MpvError(e.to_string()))), None => Err(Error(ErrorCode::UnexpectedValue)), - }) + }); + match &result { + Ok(v) => log::trace!("Successfully parsed mpv response data: {:?}", v), + Err(e) => log::trace!("Error parsing mpv response data: {:?}", e), + } + result } impl TypeHandler for String { fn get_value(value: Value) -> Result { - extract_mpv_response_data(&value) + parse_mpv_response_data(&value) .and_then(|d| { d.as_str() .ok_or(Error(ErrorCode::ValueDoesNotContainString)) @@ -38,7 +44,7 @@ impl TypeHandler for String { impl TypeHandler for bool { fn get_value(value: Value) -> Result { - extract_mpv_response_data(&value) + parse_mpv_response_data(&value) .and_then(|d| d.as_bool().ok_or(Error(ErrorCode::ValueDoesNotContainBool))) } @@ -53,7 +59,7 @@ impl TypeHandler for bool { impl TypeHandler for f64 { fn get_value(value: Value) -> Result { - extract_mpv_response_data(&value) + parse_mpv_response_data(&value) .and_then(|d| d.as_f64().ok_or(Error(ErrorCode::ValueDoesNotContainF64))) } @@ -64,7 +70,7 @@ impl TypeHandler for f64 { impl TypeHandler for usize { fn get_value(value: Value) -> Result { - extract_mpv_response_data(&value) + parse_mpv_response_data(&value) .and_then(|d| d.as_u64().ok_or(Error(ErrorCode::ValueDoesNotContainUsize))) .map(|u| u as usize) } @@ -76,7 +82,7 @@ impl TypeHandler for usize { impl TypeHandler for HashMap { fn get_value(value: Value) -> Result, Error> { - extract_mpv_response_data(&value) + parse_mpv_response_data(&value) .and_then(|d| { d.as_object() .ok_or(Error(ErrorCode::ValueDoesNotContainHashMap)) @@ -91,7 +97,7 @@ impl TypeHandler for HashMap { impl TypeHandler for Vec { fn get_value(value: Value) -> Result, Error> { - extract_mpv_response_data(&value) + parse_mpv_response_data(&value) .and_then(|d| { d.as_array() .ok_or(Error(ErrorCode::ValueDoesNotContainPlaylist))