From 0a7a256f8d7b96444434d2e5c91c5b0b19356dcb Mon Sep 17 00:00:00 2001 From: h7x4 Date: Wed, 17 Apr 2024 00:07:11 +0200 Subject: [PATCH] WIP: async mpv socket de/muxing broker thingamabob --- src/api.rs | 502 +++++++++++++++++++++++++++-------------------------- src/ipc.rs | 323 ++++++++++------------------------ 2 files changed, 347 insertions(+), 478 deletions(-) diff --git a/src/api.rs b/src/api.rs index d70bef4..ac7254e 100644 --- a/src/api.rs +++ b/src/api.rs @@ -2,13 +2,12 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use std::collections::HashMap; use std::fmt::{self, Display}; -use std::io::{BufReader, Read}; use std::os::unix::net::UnixStream; +use std::path::Path; +use std::sync::{Arc, Mutex}; -use crate::ipc::{ - get_mpv_property, get_mpv_property_string, listen, listen_raw, observe_mpv_property, - run_mpv_command, set_mpv_property, unobserve_mpv_property, -}; +use crate::ipc::MpvIpc; +use crate::message_parser::TypeHandler; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Event { @@ -152,18 +151,17 @@ pub struct PlaylistEntry { pub current: bool, } -pub struct Mpv { - pub(crate) stream: UnixStream, - pub(crate) reader: BufReader, - pub(crate) name: String, -} - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Playlist(pub Vec); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Error(pub ErrorCode); +#[derive(Clone)] +pub struct Mpv { + pub(crate) ipc: Arc>, +} + impl Drop for Mpv { fn drop(&mut self) { self.disconnect(); @@ -172,31 +170,34 @@ impl Drop for Mpv { impl fmt::Debug for Mpv { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_tuple("Mpv").field(&self.name).finish() + // TODO: + fmt.debug_struct("Mpv") + .field("name", &"TODO: name") + .finish() } } -impl Clone for Mpv { - fn clone(&self) -> Self { - let stream = self.stream.try_clone().expect("cloning UnixStream"); - let cloned_stream = stream.try_clone().expect("cloning UnixStream"); - Mpv { - stream, - reader: BufReader::new(cloned_stream), - name: self.name.clone(), - } - } +// impl Clone for Mpv { +// fn clone(&self) -> Self { +// let stream = self.stream.try_clone().expect("cloning UnixStream"); +// let cloned_stream = stream.try_clone().expect("cloning UnixStream"); +// Mpv { +// stream, +// reader: BufReader::new(cloned_stream), +// name: self.name.clone(), +// } +// } - fn clone_from(&mut self, source: &Self) { - let stream = source.stream.try_clone().expect("cloning UnixStream"); - let cloned_stream = stream.try_clone().expect("cloning UnixStream"); - *self = Mpv { - stream, - reader: BufReader::new(cloned_stream), - name: source.name.clone(), - } - } -} +// fn clone_from(&mut self, source: &Self) { +// let stream = source.stream.try_clone().expect("cloning UnixStream"); +// let cloned_stream = stream.try_clone().expect("cloning UnixStream"); +// *self = Mpv { +// stream, +// reader: BufReader::new(cloned_stream), +// name: source.name.clone(), +// } +// } +// } impl Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -242,115 +243,85 @@ impl Display for ErrorCode { } pub trait GetPropertyTypeHandler: Sized { - fn get_property_generic(instance: &Mpv, property: &str) -> Result; + async fn get_property_generic(instance: &Mpv, property: &str) -> Result; } -impl GetPropertyTypeHandler for bool { - fn get_property_generic(instance: &Mpv, property: &str) -> Result { - get_mpv_property::(instance, property) - } -} - -impl GetPropertyTypeHandler for String { - fn get_property_generic(instance: &Mpv, property: &str) -> Result { - get_mpv_property::(instance, property) - } -} - -impl GetPropertyTypeHandler for f64 { - fn get_property_generic(instance: &Mpv, property: &str) -> Result { - get_mpv_property::(instance, property) - } -} - -impl GetPropertyTypeHandler for usize { - fn get_property_generic(instance: &Mpv, property: &str) -> Result { - get_mpv_property::(instance, property) - } -} - -impl GetPropertyTypeHandler for Vec { - fn get_property_generic(instance: &Mpv, property: &str) -> Result, Error> { - get_mpv_property::>(instance, property) - } -} - -impl GetPropertyTypeHandler for HashMap { - fn get_property_generic( - instance: &Mpv, - property: &str, - ) -> Result, Error> { - get_mpv_property::>(instance, property) +impl GetPropertyTypeHandler for T +where + T: TypeHandler, +{ + async fn get_property_generic(instance: &Mpv, property: &str) -> Result { + instance + .ipc + .lock() + .unwrap() + .get_mpv_property::(property) + .await } } pub trait SetPropertyTypeHandler { - fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>; + async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>; } -impl SetPropertyTypeHandler for bool { - fn set_property_generic(instance: &Mpv, property: &str, value: bool) -> Result<(), Error> { - set_mpv_property(instance, property, json!(value)) - } -} - -impl SetPropertyTypeHandler for String { - fn set_property_generic(instance: &Mpv, property: &str, value: String) -> Result<(), Error> { - set_mpv_property(instance, property, json!(value)) - } -} - -impl SetPropertyTypeHandler for f64 { - fn set_property_generic(instance: &Mpv, property: &str, value: f64) -> Result<(), Error> { - set_mpv_property(instance, property, json!(value)) - } -} - -impl SetPropertyTypeHandler for usize { - fn set_property_generic(instance: &Mpv, property: &str, value: usize) -> Result<(), Error> { - set_mpv_property(instance, property, json!(value)) +impl SetPropertyTypeHandler for T +where + T: Serialize, +{ + async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error> { + instance + .ipc + .lock() + .unwrap() + .set_mpv_property(property, json!(value)) + .await } } impl Mpv { - pub fn connect(socket: &str) -> Result { - match UnixStream::connect(socket) { - Ok(stream) => { - let cloned_stream = stream.try_clone().expect("cloning UnixStream"); - return Ok(Mpv { + pub fn connect(socket_path: &str) -> Result { + match UnixStream::connect(socket_path) { + Ok(stream) => Ok(Mpv { + ipc: Arc::new(Mutex::new(MpvIpc::new( + Path::new(socket_path).to_owned(), stream, - reader: BufReader::new(cloned_stream), - name: String::from(socket), - }); - } + ))), + }), Err(internal_error) => Err(Error(ErrorCode::ConnectError(internal_error.to_string()))), } } pub fn disconnect(&self) { - let mut stream = &self.stream; - stream + let mut ipc = self.ipc.lock().unwrap(); + ipc.socket .shutdown(std::net::Shutdown::Both) .expect("socket disconnect"); let mut buffer = [0; 32]; - for _ in 0..stream.bytes().count() { - stream.read(&mut buffer[..]).unwrap(); - } + // TODO: + // for _ in 0..ipc.socket.bytes().count() { + // ipc.socket.read(&mut buffer[..]).unwrap(); + // } } - pub fn get_stream_ref(&self) -> &UnixStream { - &self.stream - } + // pub fn get_stream_ref(&self) -> &UnixStream { + // &self.stream + // } - pub fn get_metadata(&self) -> Result, Error> { - match get_mpv_property(self, "metadata") { + pub async fn get_metadata(&self) -> Result, Error> { + match self.ipc.lock().unwrap().get_mpv_property("metadata").await { Ok(map) => Ok(map), Err(err) => Err(err), } } - pub fn get_playlist(&self) -> Result { - match get_mpv_property::>(self, "playlist") { + pub async fn get_playlist(&self) -> Result { + match self + .ipc + .lock() + .unwrap() + .get_mpv_property::>("playlist") + .await + { Ok(entries) => Ok(Playlist(entries)), Err(msg) => Err(msg), } @@ -375,15 +346,18 @@ impl Mpv { /// # Example /// ``` /// use mpvipc::{Mpv, Error}; - /// fn main() -> Result<(), Error> { + /// async fn main() -> Result<(), Error> { /// let mpv = Mpv::connect("/tmp/mpvsocket")?; - /// let paused: bool = mpv.get_property("pause")?; - /// let title: String = mpv.get_property("media-title")?; + /// let paused: bool = mpv.get_property("pause").await?; + /// let title: String = mpv.get_property("media-title").await?; /// Ok(()) /// } /// ``` - pub fn get_property(&self, property: &str) -> Result { - T::get_property_generic(self, property) + pub async fn get_property( + &self, + property: &str, + ) -> Result { + T::get_property_generic(self, property).await } /// # Description @@ -405,12 +379,16 @@ impl Mpv { /// Ok(()) /// } /// ``` - pub fn get_property_string(&self, property: &str) -> Result { - get_mpv_property_string(self, property) + pub async fn get_property_string(&self, property: &str) -> Result { + self.ipc + .lock() + .unwrap() + .get_mpv_property_string(property) + .await } - pub fn kill(&self) -> Result<(), Error> { - self.run_command(MpvCommand::Quit) + pub async fn kill(&self) -> Result<(), Error> { + self.run_command(MpvCommand::Quit).await } /// # Description @@ -426,42 +404,44 @@ impl Mpv { /// println!("{:?}", event); /// } /// ``` - pub fn event_listen(&mut self) -> Result { - listen(self) + // pub fn event_listen(&mut self) -> Result { + // listen(self) + // } + + // pub fn event_listen_raw(&mut self) -> String { + // listen_raw(self) + // } + + pub async fn next(&self) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistNext).await } - pub fn event_listen_raw(&mut self) -> String { - listen_raw(self) - } - - pub fn next(&self) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistNext) - } - - pub fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> { + pub async fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> { self.run_command(MpvCommand::Observe { id: id, property: property.to_string(), }) + .await } - pub fn unobserve_property(&self, id: isize) -> Result<(), Error> { - self.run_command(MpvCommand::Unobserve(id)) + pub async fn unobserve_property(&self, id: isize) -> Result<(), Error> { + self.run_command(MpvCommand::Unobserve(id)).await } - pub fn pause(&self) -> Result<(), Error> { - set_mpv_property(self, "pause", json!(true)) + pub async fn pause(&self) -> Result<(), Error> { + self.set_property("pause", true).await } - pub fn prev(&self) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistPrev) + pub async fn prev(&self) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistPrev).await } - pub fn restart(&self) -> Result<(), Error> { + pub async fn restart(&self) -> Result<(), Error> { self.run_command(MpvCommand::Seek { seconds: 0f64, option: SeekOptions::Absolute, }) + .await } /// # Description @@ -490,136 +470,162 @@ impl Mpv { /// Ok(()) /// } /// ``` - pub fn run_command(&self, command: MpvCommand) -> Result<(), Error> { + pub async fn run_command(&self, command: MpvCommand) -> Result<(), Error> { match command { - MpvCommand::LoadFile { file, option } => run_mpv_command( - self, - "loadfile", - &[ - file.as_ref(), - match option { - PlaylistAddOptions::Append => "append", - PlaylistAddOptions::Replace => "replace", - }, - ], - ), - MpvCommand::LoadList { file, option } => run_mpv_command( - self, - "loadlist", - &[ - file.as_ref(), - match option { - PlaylistAddOptions::Append => "append", - PlaylistAddOptions::Replace => "replace", - }, - ], - ), - MpvCommand::Observe { id, property } => observe_mpv_property(self, &id, &property), - MpvCommand::PlaylistClear => run_mpv_command(self, "playlist-clear", &[]), + MpvCommand::LoadFile { file, option } => { + self.run_command_raw( + "loadfile", + &[ + file.as_ref(), + match option { + PlaylistAddOptions::Append => "append", + PlaylistAddOptions::Replace => "replace", + }, + ], + ) + .await + } + MpvCommand::LoadList { file, option } => { + self.run_command_raw( + "loadlist", + &[ + file.as_ref(), + match option { + PlaylistAddOptions::Append => "append", + PlaylistAddOptions::Replace => "replace", + }, + ], + ) + .await + } + MpvCommand::Observe { id, property } => { + self.ipc + .lock() + .unwrap() + .observe_property(id, &property) + .await + } + MpvCommand::PlaylistClear => self.run_command_raw("playlist-clear", &[]).await, MpvCommand::PlaylistMove { from, to } => { - run_mpv_command(self, "playlist-move", &[&from.to_string(), &to.to_string()]) + self.run_command_raw("playlist-move", &[&from.to_string(), &to.to_string()]) + .await } - MpvCommand::PlaylistNext => run_mpv_command(self, "playlist-next", &[]), - MpvCommand::PlaylistPrev => run_mpv_command(self, "playlist-prev", &[]), + MpvCommand::PlaylistNext => self.run_command_raw("playlist-next", &[]).await, + MpvCommand::PlaylistPrev => self.run_command_raw("playlist-prev", &[]).await, MpvCommand::PlaylistRemove(id) => { - run_mpv_command(self, "playlist-remove", &[&id.to_string()]) + self.run_command_raw("playlist-remove", &[&id.to_string()]) + .await } - MpvCommand::PlaylistShuffle => run_mpv_command(self, "playlist-shuffle", &[]), - MpvCommand::Quit => run_mpv_command(self, "quit", &[]), + MpvCommand::PlaylistShuffle => self.run_command_raw("playlist-shuffle", &[]).await, + MpvCommand::Quit => self.run_command_raw("quit", &[]).await, MpvCommand::ScriptMessage(args) => { let str_args: Vec<_> = args.iter().map(String::as_str).collect(); - run_mpv_command(self, "script-message", &str_args) + self.run_command_raw("script-message", &str_args).await } MpvCommand::ScriptMessageTo { target, args } => { let mut cmd_args: Vec<_> = vec![target.as_str()]; let mut str_args: Vec<_> = args.iter().map(String::as_str).collect(); cmd_args.append(&mut str_args); - run_mpv_command(self, "script-message-to", &cmd_args) + self.run_command_raw("script-message-to", &cmd_args).await } - MpvCommand::Seek { seconds, option } => run_mpv_command( - self, - "seek", - &[ - &seconds.to_string(), - match option { - SeekOptions::Absolute => "absolute", - SeekOptions::Relative => "relative", - SeekOptions::AbsolutePercent => "absolute-percent", - SeekOptions::RelativePercent => "relative-percent", - }, - ], - ), - MpvCommand::Stop => run_mpv_command(self, "stop", &[]), - MpvCommand::Unobserve(id) => unobserve_mpv_property(self, &id), + MpvCommand::Seek { seconds, option } => { + self.run_command_raw( + "seek", + &[ + &seconds.to_string(), + match option { + SeekOptions::Absolute => "absolute", + SeekOptions::Relative => "relative", + SeekOptions::AbsolutePercent => "absolute-percent", + SeekOptions::RelativePercent => "relative-percent", + }, + ], + ) + .await + } + MpvCommand::Stop => self.run_command_raw("stop", &[]).await, + MpvCommand::Unobserve(id) => self.ipc.lock().unwrap().unobserve_property(id).await, } } /// Run a custom command. /// This should only be used if the desired command is not implemented /// with [MpvCommand]. - pub fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<(), Error> { - run_mpv_command(self, command, args) + pub async fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<(), Error> { + let command = [&[command], args].concat(); + self.ipc.lock().unwrap().send_command(&command).await?; + Ok(()) } - pub fn playlist_add( + pub async fn playlist_add( &self, file: &str, file_type: PlaylistAddTypeOptions, option: PlaylistAddOptions, ) -> Result<(), Error> { match file_type { - PlaylistAddTypeOptions::File => self.run_command(MpvCommand::LoadFile { - file: file.to_string(), - option, - }), + PlaylistAddTypeOptions::File => { + self.run_command(MpvCommand::LoadFile { + file: file.to_string(), + option, + }) + .await + } - PlaylistAddTypeOptions::Playlist => self.run_command(MpvCommand::LoadList { - file: file.to_string(), - option, - }), + PlaylistAddTypeOptions::Playlist => { + self.run_command(MpvCommand::LoadList { + file: file.to_string(), + option, + }) + .await + } } } - pub fn playlist_clear(&self) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistClear) + pub async fn playlist_clear(&self) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistClear).await } - pub fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> { + pub async fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> { self.run_command(MpvCommand::PlaylistMove { from, to }) + .await } - pub fn playlist_play_id(&self, id: usize) -> Result<(), Error> { - set_mpv_property(self, "playlist-pos", json!(id)) + pub async fn playlist_play_id(&self, id: usize) -> Result<(), Error> { + self.set_property("playlist-pos", id).await } - pub fn playlist_play_next(&self, id: usize) -> Result<(), Error> { - match get_mpv_property::(self, "playlist-pos") { - Ok(current_id) => self.run_command(MpvCommand::PlaylistMove { - from: id, - to: current_id + 1, - }), + pub async fn playlist_play_next(&self, id: usize) -> Result<(), Error> { + match self.get_property::("playlist-pos").await { + Ok(current_id) => { + self.run_command(MpvCommand::PlaylistMove { + from: id, + to: current_id + 1, + }) + .await + } Err(msg) => Err(msg), } } - pub fn playlist_remove_id(&self, id: usize) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistRemove(id)) + pub async fn playlist_remove_id(&self, id: usize) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistRemove(id)).await } - pub fn playlist_shuffle(&self) -> Result<(), Error> { - self.run_command(MpvCommand::PlaylistShuffle) + pub async fn playlist_shuffle(&self) -> Result<(), Error> { + self.run_command(MpvCommand::PlaylistShuffle).await } - pub fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> { - self.run_command(MpvCommand::Seek { seconds, option }) + pub async fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> { + self.run_command(MpvCommand::Seek { seconds, option }).await } - pub fn set_loop_file(&self, option: Switch) -> Result<(), Error> { + pub async fn set_loop_file(&self, option: Switch) -> Result<(), Error> { let mut enabled = false; match option { Switch::On => enabled = true, Switch::Off => {} - Switch::Toggle => match get_mpv_property_string(self, "loop-file") { + Switch::Toggle => match self.get_property_string("loop-file").await { Ok(value) => match value.as_ref() { "false" => { enabled = true; @@ -631,15 +637,15 @@ impl Mpv { Err(msg) => return Err(msg), }, } - set_mpv_property(self, "loop-file", json!(enabled)) + self.set_property("loop-file", enabled).await } - pub fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> { + pub async fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> { let mut enabled = false; match option { Switch::On => enabled = true, Switch::Off => {} - Switch::Toggle => match get_mpv_property_string(self, "loop-playlist") { + Switch::Toggle => match self.get_property_string("loop-playlist").await { Ok(value) => match value.as_ref() { "false" => { enabled = true; @@ -651,22 +657,22 @@ impl Mpv { Err(msg) => return Err(msg), }, } - set_mpv_property(self, "loop-playlist", json!(enabled)) + self.set_property("loop-playlist", enabled).await } - pub fn set_mute(&self, option: Switch) -> Result<(), Error> { + pub async fn set_mute(&self, option: Switch) -> Result<(), Error> { let mut enabled = false; match option { Switch::On => enabled = true, Switch::Off => {} - Switch::Toggle => match get_mpv_property::(self, "mute") { + Switch::Toggle => match self.get_property::("mute").await { Ok(value) => { enabled = !value; } Err(msg) => return Err(msg), }, } - set_mpv_property(self, "mute", json!(enabled)) + self.set_property("mute", enabled).await } /// # Description @@ -687,63 +693,67 @@ impl Mpv { /// # Example /// ``` /// use mpvipc::{Mpv, Error}; - /// fn main() -> Result<(), Error> { + /// fn async main() -> Result<(), Error> { /// let mpv = Mpv::connect("/tmp/mpvsocket")?; - /// mpv.set_property("pause", true)?; + /// mpv.set_property("pause", true).await?; /// Ok(()) /// } /// ``` - pub fn set_property>( + pub async fn set_property>( &self, property: &str, value: T, ) -> Result<(), Error> { - T::set_property_generic(self, property, value) + T::set_property_generic(self, property, value).await } - pub fn set_speed(&self, input_speed: f64, option: NumberChangeOptions) -> Result<(), Error> { - match get_mpv_property::(self, "speed") { + pub async fn set_speed( + &self, + input_speed: f64, + option: NumberChangeOptions, + ) -> Result<(), Error> { + match self.get_property::("speed").await { Ok(speed) => match option { NumberChangeOptions::Increase => { - set_mpv_property(self, "speed", json!(speed + input_speed)) + self.set_property("speed", speed + input_speed).await } NumberChangeOptions::Decrease => { - set_mpv_property(self, "speed", json!(speed - input_speed)) + self.set_property("speed", speed - input_speed).await } - NumberChangeOptions::Absolute => { - set_mpv_property(self, "speed", json!(input_speed)) - } + NumberChangeOptions::Absolute => self.set_property("speed", input_speed).await, }, Err(msg) => Err(msg), } } - pub fn set_volume(&self, input_volume: f64, option: NumberChangeOptions) -> Result<(), Error> { - match get_mpv_property::(self, "volume") { + pub async fn set_volume( + &self, + input_volume: f64, + option: NumberChangeOptions, + ) -> Result<(), Error> { + match self.get_property::("volume").await { Ok(volume) => match option { NumberChangeOptions::Increase => { - set_mpv_property(self, "volume", json!(volume + input_volume)) + self.set_property("volume", volume + input_volume).await } NumberChangeOptions::Decrease => { - set_mpv_property(self, "volume", json!(volume - input_volume)) + self.set_property("volume", volume - input_volume).await } - NumberChangeOptions::Absolute => { - set_mpv_property(self, "volume", json!(input_volume)) - } + NumberChangeOptions::Absolute => self.set_property("volume", input_volume).await, }, Err(msg) => Err(msg), } } - pub fn stop(&self) -> Result<(), Error> { - self.run_command(MpvCommand::Stop) + pub async fn stop(&self) -> Result<(), Error> { + self.run_command(MpvCommand::Stop).await } - pub fn toggle(&self) -> Result<(), Error> { - run_mpv_command(self, "cycle", &["pause"]) + pub async fn toggle(&self) -> Result<(), Error> { + self.run_command_raw("cycle", &["pause"]).await } } diff --git a/src/ipc.rs b/src/ipc.rs index a7ce542..a4b7c3f 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -1,248 +1,107 @@ use crate::message_parser::TypeHandler; use self::message_parser::extract_mpv_response_data; -use self::message_parser::json_array_to_playlist; -use self::message_parser::json_array_to_vec; -use self::message_parser::json_map_to_hashmap; use super::*; -use log::{debug, warn}; -use serde_json::json; -use serde_json::Value; -use std::io::BufRead; -use std::io::BufReader; -use std::io::Write; +use serde_json::{json, Value}; +use std::{ + io::{BufRead, BufReader, Write}, + os::unix::net::UnixStream, + path::PathBuf, + sync::mpsc, +}; -pub fn get_mpv_property(instance: &Mpv, property: &str) -> Result { - let ipc_string = json!({"command": ["get_property", property]}); - match serde_json::from_str::(&send_command_sync(instance, ipc_string)) { - Ok(val) => T::get_value(val), - Err(why) => Err(Error(ErrorCode::JsonParseError(why.to_string()))), - } +pub(crate) struct MpvIpc { + pub(crate) socket_path: PathBuf, + pub(crate) socket: UnixStream, + + command_queue_sender: mpsc::Sender<(mpsc::Sender>, Value)>, + command_queue_receiver: mpsc::Receiver<(mpsc::Sender>, Value)>, } -pub fn get_mpv_property_string(instance: &Mpv, property: &str) -> Result { - let ipc_string = json!({"command": ["get_property", property]}); - let val = serde_json::from_str::(&send_command_sync(instance, ipc_string)) - .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?; - - let data = extract_mpv_response_data(&val)?; - - match data { - Value::Bool(b) => Ok(b.to_string()), - Value::Number(ref n) => Ok(n.to_string()), - Value::String(ref s) => Ok(s.to_string()), - Value::Array(ref array) => Ok(format!("{:?}", array)), - Value::Object(ref map) => Ok(format!("{:?}", map)), - Value::Null => Err(Error(ErrorCode::MissingValue)), - } -} - -fn validate_mpv_response(response: &str) -> Result<(), Error> { - serde_json::from_str::(response) - .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string()))) - .and_then(|value| extract_mpv_response_data(&value).map(|_| ())) -} - -pub fn set_mpv_property(instance: &Mpv, property: &str, value: Value) -> Result<(), Error> { - let ipc_string = json!({ - "command": ["set_property", property, value] - }); - - let response = &send_command_sync(instance, ipc_string); - validate_mpv_response(response) -} - -pub fn run_mpv_command(instance: &Mpv, command: &str, args: &[&str]) -> Result<(), Error> { - let mut ipc_string = json!({ - "command": [command] - }); - if let Value::Array(args_array) = &mut ipc_string["command"] { - for arg in args { - args_array.push(json!(arg)); +impl MpvIpc { + pub(crate) fn new(socket_path: PathBuf, socket: UnixStream) -> Self { + let (command_queue_sender, command_queue_receiver) = mpsc::channel(); + MpvIpc { + socket_path, + socket, + command_queue_sender, + command_queue_receiver, } } - let response = &send_command_sync(instance, ipc_string); - validate_mpv_response(response) -} - -pub fn observe_mpv_property(instance: &Mpv, id: &isize, property: &str) -> Result<(), Error> { - let ipc_string = json!({ - "command": ["observe_property", id, property] - }); - - let response = &send_command_sync(instance, ipc_string); - validate_mpv_response(response) -} - -pub fn unobserve_mpv_property(instance: &Mpv, id: &isize) -> Result<(), Error> { - let ipc_string = json!({ - "command": ["unobserve_property", id] - }); - - let response = &send_command_sync(instance, ipc_string); - validate_mpv_response(response) -} - -fn try_convert_property(name: &str, id: usize, data: MpvDataType) -> Event { - let property = match name { - "path" => match data { - MpvDataType::String(value) => Property::Path(Some(value)), - MpvDataType::Null => Property::Path(None), - _ => unimplemented!(), - }, - "pause" => match data { - MpvDataType::Bool(value) => Property::Pause(value), - _ => unimplemented!(), - }, - "playback-time" => match data { - MpvDataType::Double(value) => Property::PlaybackTime(Some(value)), - MpvDataType::Null => Property::PlaybackTime(None), - _ => unimplemented!(), - }, - "duration" => match data { - MpvDataType::Double(value) => Property::Duration(Some(value)), - MpvDataType::Null => Property::Duration(None), - _ => unimplemented!(), - }, - "metadata" => match data { - MpvDataType::HashMap(value) => Property::Metadata(Some(value)), - MpvDataType::Null => Property::Metadata(None), - _ => unimplemented!(), - }, - _ => { - warn!("Property {} not implemented", name); - Property::Unknown { - name: name.to_string(), - data, - } - } - }; - Event::PropertyChange { id, property } -} - -pub fn listen(instance: &mut Mpv) -> Result { - let mut e; - // sometimes we get responses unrelated to events, so we read a new line until we receive one - // with an event field - let name = loop { + // TODO: handle events + pub(crate) async fn run(&self) { + let mut stream = &self.socket; + let mut reader = BufReader::new(stream); let mut response = String::new(); - instance.reader.read_line(&mut response).unwrap(); - response = response.trim_end().to_string(); - debug!("Event: {}", response); + loop { + let (tx, ipc_command) = self.command_queue_receiver.recv().unwrap(); + let command = serde_json::to_string(&ipc_command).unwrap(); - e = serde_json::from_str::(&response) - .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?; - - match e["event"] { - Value::String(ref name) => break name, - _ => { - // It was not an event - try again - debug!("Bad response: {:?}", response) - } - } - }; - - let event = match name.as_str() { - "shutdown" => Event::Shutdown, - "start-file" => Event::StartFile, - "file-loaded" => Event::FileLoaded, - "seek" => Event::Seek, - "playback-restart" => Event::PlaybackRestart, - "idle" => Event::Idle, - "tick" => Event::Tick, - "video-reconfig" => Event::VideoReconfig, - "audio-reconfig" => Event::AudioReconfig, - "tracks-changed" => Event::TracksChanged, - "track-switched" => Event::TrackSwitched, - "pause" => Event::Pause, - "unpause" => Event::Unpause, - "metadata-update" => Event::MetadataUpdate, - "chapter-change" => Event::ChapterChange, - "end-file" => Event::EndFile, - "property-change" => { - let name = match e["name"] { - Value::String(ref n) => Ok(n.to_string()), - _ => Err(Error(ErrorCode::JsonContainsUnexptectedType)), - }?; - - let id: usize = match e["id"] { - Value::Number(ref n) => n.as_u64().unwrap() as usize, - _ => 0, - }; - - let data: MpvDataType = match e["data"] { - Value::String(ref n) => MpvDataType::String(n.to_string()), - - Value::Array(ref a) => { - if name == "playlist".to_string() { - MpvDataType::Playlist(Playlist(json_array_to_playlist(a))) - } else { - MpvDataType::Array(json_array_to_vec(a)) - } - } - - Value::Bool(b) => MpvDataType::Bool(b), - - Value::Number(ref n) => { - if n.is_u64() { - MpvDataType::Usize(n.as_u64().unwrap() as usize) - } else if n.is_f64() { - MpvDataType::Double(n.as_f64().unwrap()) - } else { - return Err(Error(ErrorCode::JsonContainsUnexptectedType)); - } - } - - Value::Object(ref m) => MpvDataType::HashMap(json_map_to_hashmap(m)), - - Value::Null => MpvDataType::Null, - }; - - try_convert_property(name.as_ref(), id, data) - } - "client-message" => { - let args = match e["args"] { - Value::Array(ref a) => json_array_to_vec(a) - .iter() - .map(|arg| match arg { - MpvDataType::String(s) => Ok(s.to_owned()), - _ => Err(Error(ErrorCode::JsonContainsUnexptectedType)), - }) - .collect::, _>>(), - _ => return Err(Error(ErrorCode::JsonContainsUnexptectedType)), - }?; - Event::ClientMessage { args } - } - _ => Event::Unimplemented, - }; - Ok(event) -} - -pub fn listen_raw(instance: &mut Mpv) -> String { - let mut response = String::new(); - instance.reader.read_line(&mut response).unwrap(); - response.trim_end().to_string() -} - -fn send_command_sync(instance: &Mpv, command: Value) -> String { - let stream = &instance.stream; - match serde_json::to_writer(stream, &command) { - Err(why) => panic!("Error: Could not write to socket: {}", why), - Ok(_) => { - let mut stream = stream; + stream.write_all(command.as_bytes()).unwrap(); stream.write_all(b"\n").unwrap(); - let mut response = String::new(); - { - let mut reader = BufReader::new(stream); - while !response.contains("\"error\":") { - response.clear(); - reader.read_line(&mut response).unwrap(); - } - } - debug!("Response: {}", response.trim_end()); - response + response.clear(); + reader.read_line(&mut response).unwrap(); + + let response = serde_json::from_str::(&response) + .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string()))) + .and_then(|value| extract_mpv_response_data(&value).map(|data| data.to_owned())); + + tx.send(response).unwrap(); } } -} + + pub(crate) async fn send_command(&self, command: &[&str]) -> Result { + let ipc_command = json!({ "command": command }); + + log::trace!("Sending command: {:?}", ipc_command); + let (tx, rx) = mpsc::channel(); + self.command_queue_sender.send((tx, ipc_command)).unwrap(); + + let result = rx.recv().unwrap(); + log::trace!("Received response: {:?}", result); + + result + } + + pub(crate) async fn get_mpv_property( + &self, + property: &str, + ) -> Result { + self.send_command(&["get_property", property]) + .await + .and_then(|val| T::get_value(val)) + } + + pub(crate) async fn get_mpv_property_string(&self, property: &str) -> Result { + let val = self.send_command(&["get_property", property]).await?; + match extract_mpv_response_data(&val)? { + Value::Bool(b) => Ok(b.to_string()), + Value::Number(ref n) => Ok(n.to_string()), + Value::String(ref s) => Ok(s.to_string()), + Value::Array(ref array) => Ok(format!("{:?}", array)), + Value::Object(ref map) => Ok(format!("{:?}", map)), + Value::Null => Err(Error(ErrorCode::MissingValue)), + } + } + + pub(crate) async fn set_mpv_property(&self, property: &str, value: Value) -> Result<(), Error> { + let str_value = serde_json::to_string(&value).unwrap(); + self.send_command(&["set_property", property, &str_value]) + .await + .map(|_| ()) + } + + pub(crate) async fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> { + self.send_command(&["observe_property", &id.to_string(), property]) + .await + .map(|_| ()) + } + + pub(crate) async fn unobserve_property(&self, id: isize) -> Result<(), Error> { + self.send_command(&["unobserve_property", &id.to_string()]) + .await + .map(|_| ()) + } +} \ No newline at end of file