//! IPC handling thread/task. Handles communication between [`Mpv`](crate::Mpv) instances and mpv's unix socket use futures::{SinkExt, StreamExt}; use serde_json::{json, Value}; use tokio::{ net::UnixStream, sync::{broadcast, mpsc, oneshot}, }; use tokio_util::codec::{Framed, LinesCodec}; use crate::MpvError; /// Container for all state that regards communication with the mpv IPC socket /// and message passing with [`Mpv`](crate::Mpv) controllers. pub(crate) struct MpvIpc { socket: Framed, command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender)>, event_channel: broadcast::Sender, } /// Commands that can be sent to [`MpvIpc`] #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum MpvIpcCommand { Command(Vec), GetProperty(String), SetProperty(String, Value), ObserveProperty(usize, String), UnobserveProperty(usize), Exit, } /// [`MpvIpc`]'s response to a [`MpvIpcCommand`]. #[derive(Debug)] pub(crate) struct MpvIpcResponse(pub(crate) Result, MpvError>); /// A deserialized and partially parsed event from mpv. #[derive(Debug, Clone)] pub(crate) struct MpvIpcEvent(pub(crate) Value); impl MpvIpc { pub(crate) fn new( socket: UnixStream, command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender)>, event_channel: broadcast::Sender, ) -> Self { MpvIpc { socket: Framed::new(socket, LinesCodec::new()), command_channel, event_channel, } } pub(crate) async fn send_command( &mut self, command: &[Value], ) -> Result, MpvError> { let ipc_command = json!({ "command": command }); let ipc_command_str = serde_json::to_string(&ipc_command).map_err(MpvError::JsonParseError)?; log::trace!("Sending command: {}", ipc_command_str); self.socket .send(ipc_command_str) .await .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))?; let response = loop { let response = self .socket .next() .await .ok_or(MpvError::MpvSocketConnectionError( "Could not receive response from mpv".to_owned(), ))? .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))?; let parsed_response = serde_json::from_str::(&response).map_err(MpvError::JsonParseError); if parsed_response .as_ref() .ok() .and_then(|v| v.as_object().map(|o| o.contains_key("event"))) .unwrap_or(false) { self.handle_event(parsed_response).await; } else { break parsed_response; } }; log::trace!("Received response: {:?}", response); parse_mpv_response_data(response?) } pub(crate) async fn get_mpv_property( &mut self, property: &str, ) -> Result, MpvError> { self.send_command(&[json!("get_property"), json!(property)]) .await } pub(crate) async fn set_mpv_property( &mut self, property: &str, value: Value, ) -> Result, MpvError> { self.send_command(&[json!("set_property"), json!(property), value]) .await } pub(crate) async fn observe_property( &mut self, id: usize, property: &str, ) -> Result, MpvError> { self.send_command(&[json!("observe_property"), json!(id), json!(property)]) .await } pub(crate) async fn unobserve_property( &mut self, id: usize, ) -> Result, MpvError> { self.send_command(&[json!("unobserve_property"), json!(id)]) .await } async fn handle_event(&mut self, event: Result) { match &event { Ok(event) => { log::trace!("Parsed event: {:?}", event); if let Err(broadcast::error::SendError(_)) = self.event_channel.send(MpvIpcEvent(event.to_owned())) { log::trace!("Failed to send event to channel, ignoring"); } } Err(e) => { log::trace!("Error parsing event, ignoring:\n {:?}\n {:?}", &event, e); } } } pub(crate) async fn run(mut self) -> Result<(), MpvError> { loop { tokio::select! { Some(event) = self.socket.next() => { log::trace!("Got event: {:?}", event); let parsed_event = event .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string())) .and_then(|event| serde_json::from_str::(&event) .map_err(MpvError::JsonParseError)); self.handle_event(parsed_event).await; } Some((cmd, tx)) = self.command_channel.recv() => { log::trace!("Handling command: {:?}", cmd); match cmd { MpvIpcCommand::Command(command) => { let refs = command.iter().map(|s| json!(s)).collect::>(); let response = self.send_command(refs.as_slice()).await; tx.send(MpvIpcResponse(response)).unwrap() } MpvIpcCommand::GetProperty(property) => { let response = self.get_mpv_property(&property).await; tx.send(MpvIpcResponse(response)).unwrap() } MpvIpcCommand::SetProperty(property, value) => { let response = self.set_mpv_property(&property, value).await; tx.send(MpvIpcResponse(response)).unwrap() } MpvIpcCommand::ObserveProperty(id, property) => { let response = self.observe_property(id, &property).await; tx.send(MpvIpcResponse(response)).unwrap() } MpvIpcCommand::UnobserveProperty(id) => { let response = self.unobserve_property(id).await; tx.send(MpvIpcResponse(response)).unwrap() } MpvIpcCommand::Exit => { tx.send(MpvIpcResponse(Ok(None))).unwrap(); return Ok(()); } } } } } } } /// This function does the most basic JSON parsing and error handling /// for status codes and errors that all responses from mpv are /// expected to contain. fn parse_mpv_response_data(value: Value) -> Result, MpvError> { log::trace!("Parsing mpv response data: {:?}", value); let result = value .as_object() .ok_or(MpvError::ValueContainsUnexpectedType { expected_type: "object".to_string(), received: value.clone(), }) .and_then(|o| { let error = o .get("error") .ok_or(MpvError::MissingKeyInObject { key: "error".to_string(), map: o.clone(), })? .as_str() .ok_or(MpvError::ValueContainsUnexpectedType { expected_type: "string".to_string(), received: o.get("error").unwrap().clone(), })?; let data = o.get("data"); Ok((error, data)) }) .and_then(|(error, data)| match error { "success" => Ok(data), err => Err(MpvError::MpvError(err.to_string())), }); match &result { Ok(v) => log::trace!("Successfully parsed mpv response data: {:?}", v), Err(e) => log::trace!("Error parsing mpv response data: {:?}", e), } result.map(|opt| opt.cloned()) }