diff --git a/Cargo.toml b/Cargo.toml index f2425bd..10f5267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ serde = { version = "1.0.197", features = ["derive"] } tokio = { version = "1.37.0", features = ["sync", "macros", "rt", "net"] } tokio-util = { version = "0.7.10", features = ["codec"] } futures = "0.3.30" +tokio-stream = { version = "0.1.15", features = ["sync"] } [dev-dependencies] env_logger = "0.10.0" diff --git a/src/api.rs b/src/api.rs index 7f00ad2..6382575 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,12 +1,16 @@ +use futures::StreamExt; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::{ collections::HashMap, fmt::{self, Display}, }; -use tokio::{net::UnixStream, sync::oneshot}; +use tokio::{ + net::UnixStream, + sync::{broadcast, mpsc, oneshot}, +}; -use crate::ipc::{MpvIpc, MpvIpcCommand, MpvIpcResponse}; +use crate::ipc::{MpvIpc, MpvIpcCommand, MpvIpcEvent, MpvIpcResponse}; use crate::message_parser::TypeHandler; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -288,7 +292,8 @@ impl Display for ErrorCode { #[derive(Clone)] pub struct Mpv { - command_sender: tokio::sync::mpsc::Sender<(MpvIpcCommand, oneshot::Sender)>, + command_sender: mpsc::Sender<(MpvIpcCommand, oneshot::Sender)>, + broadcast_channel: broadcast::Sender, } impl fmt::Debug for Mpv { @@ -310,14 +315,16 @@ impl Mpv { } pub async fn connect_socket(socket: UnixStream) -> Result { - let (com_tx, com_rx) = tokio::sync::mpsc::channel(100); - let ipc = MpvIpc::new(socket, com_rx); + let (com_tx, com_rx) = mpsc::channel(100); + let (ev_tx, _) = broadcast::channel(100); + let ipc = MpvIpc::new(socket, com_rx, ev_tx.clone()); log::debug!("Starting IPC handler"); tokio::spawn(ipc.run()); Ok(Mpv { command_sender: com_tx, + broadcast_channel: ev_tx, }) } @@ -337,9 +344,118 @@ impl Mpv { } } - // pub fn get_stream_ref(&self) -> &UnixStream { - // &self.stream - // } + pub async fn get_event_stream(&self) -> impl futures::Stream> { + tokio_stream::wrappers::BroadcastStream::new(self.broadcast_channel.subscribe()) + .map(|event| { + match event { + Ok(event) => Mpv::map_event(event), + Err(_) => Err(Error(ErrorCode::ConnectError("Failed to receive event".to_string()))), + } + }) + } + + fn map_event(raw_event: MpvIpcEvent) -> Result { + let MpvIpcEvent(event) = raw_event; + + event + .as_object() + .ok_or(Error(ErrorCode::JsonContainsUnexptectedType)) + .and_then(|event| { + let event_name = event + .get("event") + .ok_or(Error(ErrorCode::MissingValue))? + .as_str() + .ok_or(Error(ErrorCode::ValueDoesNotContainString))?; + + match event_name { + "shutdown" => Ok(Event::Shutdown), + "start-file" => Ok(Event::StartFile), + "end-file" => Ok(Event::EndFile), + "file-loaded" => Ok(Event::FileLoaded), + "tracks-changed" => Ok(Event::TracksChanged), + "track-switched" => Ok(Event::TrackSwitched), + "idle" => Ok(Event::Idle), + "pause" => Ok(Event::Pause), + "unpause" => Ok(Event::Unpause), + "tick" => Ok(Event::Tick), + "video-reconfig" => Ok(Event::VideoReconfig), + "audio-reconfig" => Ok(Event::AudioReconfig), + "metadata-update" => Ok(Event::MetadataUpdate), + "seek" => Ok(Event::Seek), + "playback-restart" => Ok(Event::PlaybackRestart), + "property-change" => { + let id = event + .get("id") + .ok_or(Error(ErrorCode::MissingValue))? + .as_u64() + .ok_or(Error(ErrorCode::ValueDoesNotContainUsize))? + as usize; + let property_name = event + .get("name") + .ok_or(Error(ErrorCode::MissingValue))? + .as_str() + .ok_or(Error(ErrorCode::ValueDoesNotContainString))?; + + match property_name { + "path" => { + let path = event + .get("data") + .ok_or(Error(ErrorCode::MissingValue))? + .as_str() + .map(|s| s.to_string()); + Ok(Event::PropertyChange { + id, + property: Property::Path(path), + }) + } + "pause" => { + let pause = event + .get("data") + .ok_or(Error(ErrorCode::MissingValue))? + .as_bool() + .ok_or(Error(ErrorCode::ValueDoesNotContainBool))?; + Ok(Event::PropertyChange { + id, + property: Property::Pause(pause), + }) + } + // TODO: missing cases + _ => { + let data = event + .get("data") + .ok_or(Error(ErrorCode::MissingValue))? + .clone(); + Ok(Event::PropertyChange { + id, + property: Property::Unknown { + name: property_name.to_string(), + // TODO: fix + data: MpvDataType::Double(data.as_f64().unwrap_or(0.0)), + }, + }) + } + } + } + "chapter-change" => Ok(Event::ChapterChange), + "client-message" => { + let args = event + .get("args") + .ok_or(Error(ErrorCode::MissingValue))? + .as_array() + .ok_or(Error(ErrorCode::ValueDoesNotContainString))? + .iter() + .map(|arg| { + arg.as_str() + .ok_or(Error(ErrorCode::ValueDoesNotContainString)) + .map(|s| s.to_string()) + }) + .collect::, Error>>()?; + Ok(Event::ClientMessage { args }) + } + _ => Ok(Event::Unimplemented), + } + }) + } pub async fn get_metadata(&self) -> Result, Error> { self.get_property("metadata").await @@ -414,9 +530,9 @@ impl Mpv { )) })?; match res_rx.await { - Ok(MpvIpcResponse(response)) => response.and_then(|value| { - value.ok_or(Error(ErrorCode::MissingValue)) - }), + Ok(MpvIpcResponse(response)) => { + response.and_then(|value| value.ok_or(Error(ErrorCode::MissingValue))) + } Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))), } } @@ -607,7 +723,11 @@ impl Mpv { /// Run a custom command. /// This should only be used if the desired command is not implemented /// with [MpvCommand]. - pub async fn run_command_raw(&self, command: &str, args: &[&str]) -> Result, Error> { + pub async fn run_command_raw( + &self, + command: &str, + args: &[&str], + ) -> Result, Error> { let command = Vec::from( [command] .iter() diff --git a/src/ipc.rs b/src/ipc.rs index ef545b6..d91a7b4 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -3,14 +3,15 @@ use futures::{SinkExt, StreamExt}; use serde_json::{json, Value}; use std::mem; use tokio::net::UnixStream; -use tokio::sync::mpsc::Receiver; -use tokio::sync::{oneshot, Mutex}; -use tokio_util::codec::{Framed, LinesCodec}; +use tokio::sync::mpsc; +use tokio::sync::{broadcast, oneshot, Mutex}; +use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; pub(crate) struct MpvIpc { socket: Framed, - command_channel: Receiver<(MpvIpcCommand, oneshot::Sender)>, + command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender)>, socket_lock: Mutex<()>, + event_channel: broadcast::Sender, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -26,19 +27,24 @@ pub(crate) enum MpvIpcCommand { #[derive(Debug, Clone)] pub(crate) struct MpvIpcResponse(pub(crate) Result, Error>); +#[derive(Debug, Clone)] +pub(crate) struct MpvIpcEvent(pub(crate) Value); + impl MpvIpc { pub(crate) fn new( socket: UnixStream, - command_channel: Receiver<(MpvIpcCommand, oneshot::Sender)>, + command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender)>, + event_channel: broadcast::Sender, ) -> Self { MpvIpc { socket: Framed::new(socket, LinesCodec::new()), command_channel, socket_lock: Mutex::new(()), + event_channel, } } - pub(crate) async fn send_command(&mut self, command: &[&str]) -> Result, Error> { + pub(crate) async fn send_command(&mut self, command: &[Value]) -> Result, Error> { let lock = self.socket_lock.lock().await; // START CRITICAL SECTION let ipc_command = json!({ "command": command }); @@ -67,11 +73,14 @@ impl MpvIpc { serde_json::from_str::(&response) .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string()))) .and_then(parse_mpv_response_data) - } - pub(crate) async fn get_mpv_property(&mut self, property: &str) -> Result, Error> { - self.send_command(&["get_property", property]).await + pub(crate) async fn get_mpv_property( + &mut self, + property: &str, + ) -> Result, Error> { + self.send_command(&[json!("get_property"), json!(property)]) + .await } pub(crate) async fn set_mpv_property( @@ -79,11 +88,11 @@ impl MpvIpc { property: &str, value: Value, ) -> Result, Error> { - let str_value = match &value { - Value::String(s) => s, - v => &serde_json::to_string(&v).unwrap() - }; - self.send_command(&["set_property", property, &str_value]) + // let str_value = match &value { + // Value::String(s) => s, + // v => &serde_json::to_string(&v).unwrap(), + // }; + self.send_command(&[json!("set_property"), json!(property), value]) .await } @@ -92,27 +101,52 @@ impl MpvIpc { id: isize, property: &str, ) -> Result, Error> { - self.send_command(&["observe_property", &id.to_string(), property]) + self.send_command(&[json!("observe_property"), json!(id), json!(property)]) .await } pub(crate) async fn unobserve_property(&mut self, id: isize) -> Result, Error> { - self.send_command(&["unobserve_property", &id.to_string()]) + self.send_command(&[json!("unobserve_property"), json!(id)]) .await } + async fn handle_event(&mut self, event: Result) { + let parsed_event = event + .as_ref() + .map_err(|why| Error(ErrorCode::ConnectError(why.to_string()))) + .and_then(|event| { + serde_json::from_str::(&event) + .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string()))) + }); + + match parsed_event { + Ok(event) => { + log::trace!("Parsed event: {:?}", event); + if let Err(broadcast::error::SendError(_)) = + self.event_channel.send(MpvIpcEvent(event.to_owned())) + { + log::trace!("Failed to send event to channel, ignoring"); + } + } + Err(e) => { + log::trace!("Error parsing event, ignoring:\n {:?}\n {:?}", event, e); + } + } + } + pub(crate) async fn run(mut self) -> Result<(), Error> { loop { tokio::select! { Some(event) = self.socket.next() => { - log::trace!("Handling event: {:?}", serde_json::from_str::(&event.unwrap()).unwrap()); - // TODO: handle event + log::trace!("Got event: {:?}", event); + // TODO: error handling + self.handle_event(event).await; } Some((cmd, tx)) = self.command_channel.recv() => { log::trace!("Handling command: {:?}", cmd); match cmd { MpvIpcCommand::Command(command) => { - let refs = command.iter().map(|s| s.as_str()).collect::>(); + let refs = command.iter().map(|s| json!(s)).collect::>(); let response = self.send_command(refs.as_slice()).await; tx.send(MpvIpcResponse(response)).unwrap() } @@ -159,4 +193,4 @@ fn parse_mpv_response_data(value: Value) -> Result, Error> { Err(e) => log::trace!("Error parsing mpv response data: {:?}", e), } result.map(|opt| opt.map(|val| val.clone())) -} \ No newline at end of file +} diff --git a/tests/events.rs b/tests/events.rs new file mode 100644 index 0000000..de0733f --- /dev/null +++ b/tests/events.rs @@ -0,0 +1,79 @@ +use std::panic; + +use futures::{stream::StreamExt, SinkExt}; +use mpvipc::{Mpv, MpvDataType, Property}; +use serde_json::json; +use test_log::test; +use tokio::{net::UnixStream, task::JoinHandle}; +use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; + +use mpvipc::Event; + +fn test_socket( + answers: Vec<(bool, String)>, +) -> (UnixStream, JoinHandle>) { + let (socket, server) = UnixStream::pair().unwrap(); + let join_handle = tokio::spawn(async move { + let mut framed = Framed::new(socket, LinesCodec::new()); + for (unsolicited, answer) in answers { + if !unsolicited { + framed.next().await; + } + framed.send(answer).await?; + } + Ok(()) + }); + + (server, join_handle) +} + +#[test(tokio::test)] +async fn test_observe_event_successful() { + let (server, join_handle) = test_socket(vec![ + ( + false, + json!({ "request_id": 0, "error": "success" }).to_string(), + ), + ( + false, + json!({ "request_id": 0, "error": "success" }).to_string(), + ), + ( + true, + json!({ "data": 64.0, "event": "property-change", "id": 1, "name": "volume" }) + .to_string(), + ), + ]); + + let mpv = Mpv::connect_socket(server).await.unwrap(); + + mpv.observe_property(1, "volume").await.unwrap(); + + let mpv2 = mpv.clone(); + tokio::spawn(async move { + let event = mpv2.get_event_stream().await.next().await.unwrap().unwrap(); + + let property = match event { + Event::PropertyChange { id, property } => { + assert_eq!(id, 1); + property + } + err => panic!("{:?}", err), + }; + let data = match property { + Property::Unknown { name, data } => { + assert_eq!(name, "volume"); + data + } + err => panic!("{:?}", err), + }; + match data { + MpvDataType::Double(data) => assert_eq!(data, 64.0), + err => panic!("{:?}", err), + } + }); + + mpv.set_property("volume", 64.0).await.unwrap(); + + join_handle.await.unwrap().unwrap(); +}