WIP: async mpv socket de/muxing broker thingamabob

This commit is contained in:
Oystein Kristoffer Tveit 2024-04-17 00:07:11 +02:00
parent f5c9674b78
commit 0a7a256f8d
Signed by: oysteikt
GPG Key ID: 9F2F7D8250F35146
2 changed files with 347 additions and 478 deletions

View File

@ -2,13 +2,12 @@ use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::{self, Display}; use std::fmt::{self, Display};
use std::io::{BufReader, Read};
use std::os::unix::net::UnixStream; use std::os::unix::net::UnixStream;
use std::path::Path;
use std::sync::{Arc, Mutex};
use crate::ipc::{ use crate::ipc::MpvIpc;
get_mpv_property, get_mpv_property_string, listen, listen_raw, observe_mpv_property, use crate::message_parser::TypeHandler;
run_mpv_command, set_mpv_property, unobserve_mpv_property,
};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Event { pub enum Event {
@ -152,18 +151,17 @@ pub struct PlaylistEntry {
pub current: bool, pub current: bool,
} }
pub struct Mpv {
pub(crate) stream: UnixStream,
pub(crate) reader: BufReader<UnixStream>,
pub(crate) name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Playlist(pub Vec<PlaylistEntry>); pub struct Playlist(pub Vec<PlaylistEntry>);
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Error(pub ErrorCode); pub struct Error(pub ErrorCode);
#[derive(Clone)]
pub struct Mpv {
pub(crate) ipc: Arc<Mutex<MpvIpc>>,
}
impl Drop for Mpv { impl Drop for Mpv {
fn drop(&mut self) { fn drop(&mut self) {
self.disconnect(); self.disconnect();
@ -172,31 +170,34 @@ impl Drop for Mpv {
impl fmt::Debug for Mpv { impl fmt::Debug for Mpv {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("Mpv").field(&self.name).finish() // TODO:
fmt.debug_struct("Mpv")
.field("name", &"TODO: name")
.finish()
} }
} }
impl Clone for Mpv { // impl Clone for Mpv {
fn clone(&self) -> Self { // fn clone(&self) -> Self {
let stream = self.stream.try_clone().expect("cloning UnixStream"); // let stream = self.stream.try_clone().expect("cloning UnixStream");
let cloned_stream = stream.try_clone().expect("cloning UnixStream"); // let cloned_stream = stream.try_clone().expect("cloning UnixStream");
Mpv { // Mpv {
stream, // stream,
reader: BufReader::new(cloned_stream), // reader: BufReader::new(cloned_stream),
name: self.name.clone(), // name: self.name.clone(),
} // }
} // }
fn clone_from(&mut self, source: &Self) { // fn clone_from(&mut self, source: &Self) {
let stream = source.stream.try_clone().expect("cloning UnixStream"); // let stream = source.stream.try_clone().expect("cloning UnixStream");
let cloned_stream = stream.try_clone().expect("cloning UnixStream"); // let cloned_stream = stream.try_clone().expect("cloning UnixStream");
*self = Mpv { // *self = Mpv {
stream, // stream,
reader: BufReader::new(cloned_stream), // reader: BufReader::new(cloned_stream),
name: source.name.clone(), // name: source.name.clone(),
} // }
} // }
} // }
impl Display for Error { impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@ -242,115 +243,85 @@ impl Display for ErrorCode {
} }
pub trait GetPropertyTypeHandler: Sized { pub trait GetPropertyTypeHandler: Sized {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<Self, Error>; async fn get_property_generic(instance: &Mpv, property: &str) -> Result<Self, Error>;
} }
impl GetPropertyTypeHandler for bool { impl<T> GetPropertyTypeHandler for T
fn get_property_generic(instance: &Mpv, property: &str) -> Result<bool, Error> { where
get_mpv_property::<bool>(instance, property) T: TypeHandler,
} {
} async fn get_property_generic(instance: &Mpv, property: &str) -> Result<T, Error> {
instance
impl GetPropertyTypeHandler for String { .ipc
fn get_property_generic(instance: &Mpv, property: &str) -> Result<String, Error> { .lock()
get_mpv_property::<String>(instance, property) .unwrap()
} .get_mpv_property::<T>(property)
} .await
impl GetPropertyTypeHandler for f64 {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<f64, Error> {
get_mpv_property::<f64>(instance, property)
}
}
impl GetPropertyTypeHandler for usize {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<usize, Error> {
get_mpv_property::<usize>(instance, property)
}
}
impl GetPropertyTypeHandler for Vec<PlaylistEntry> {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<Vec<PlaylistEntry>, Error> {
get_mpv_property::<Vec<PlaylistEntry>>(instance, property)
}
}
impl GetPropertyTypeHandler for HashMap<String, MpvDataType> {
fn get_property_generic(
instance: &Mpv,
property: &str,
) -> Result<HashMap<String, MpvDataType>, Error> {
get_mpv_property::<HashMap<String, MpvDataType>>(instance, property)
} }
} }
pub trait SetPropertyTypeHandler<T> { pub trait SetPropertyTypeHandler<T> {
fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>; async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>;
} }
impl SetPropertyTypeHandler<bool> for bool { impl<T> SetPropertyTypeHandler<T> for T
fn set_property_generic(instance: &Mpv, property: &str, value: bool) -> Result<(), Error> { where
set_mpv_property(instance, property, json!(value)) T: Serialize,
} {
} async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error> {
instance
impl SetPropertyTypeHandler<String> for String { .ipc
fn set_property_generic(instance: &Mpv, property: &str, value: String) -> Result<(), Error> { .lock()
set_mpv_property(instance, property, json!(value)) .unwrap()
} .set_mpv_property(property, json!(value))
} .await
impl SetPropertyTypeHandler<f64> for f64 {
fn set_property_generic(instance: &Mpv, property: &str, value: f64) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
}
}
impl SetPropertyTypeHandler<usize> for usize {
fn set_property_generic(instance: &Mpv, property: &str, value: usize) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
} }
} }
impl Mpv { impl Mpv {
pub fn connect(socket: &str) -> Result<Mpv, Error> { pub fn connect(socket_path: &str) -> Result<Mpv, Error> {
match UnixStream::connect(socket) { match UnixStream::connect(socket_path) {
Ok(stream) => { Ok(stream) => Ok(Mpv {
let cloned_stream = stream.try_clone().expect("cloning UnixStream"); ipc: Arc::new(Mutex::new(MpvIpc::new(
return Ok(Mpv { Path::new(socket_path).to_owned(),
stream, stream,
reader: BufReader::new(cloned_stream), ))),
name: String::from(socket), }),
});
}
Err(internal_error) => Err(Error(ErrorCode::ConnectError(internal_error.to_string()))), Err(internal_error) => Err(Error(ErrorCode::ConnectError(internal_error.to_string()))),
} }
} }
pub fn disconnect(&self) { pub fn disconnect(&self) {
let mut stream = &self.stream; let mut ipc = self.ipc.lock().unwrap();
stream ipc.socket
.shutdown(std::net::Shutdown::Both) .shutdown(std::net::Shutdown::Both)
.expect("socket disconnect"); .expect("socket disconnect");
let mut buffer = [0; 32]; let mut buffer = [0; 32];
for _ in 0..stream.bytes().count() { // TODO:
stream.read(&mut buffer[..]).unwrap(); // for _ in 0..ipc.socket.bytes().count() {
} // ipc.socket.read(&mut buffer[..]).unwrap();
// }
} }
pub fn get_stream_ref(&self) -> &UnixStream { // pub fn get_stream_ref(&self) -> &UnixStream {
&self.stream // &self.stream
} // }
pub fn get_metadata(&self) -> Result<HashMap<String, MpvDataType>, Error> { pub async fn get_metadata(&self) -> Result<HashMap<String, MpvDataType>, Error> {
match get_mpv_property(self, "metadata") { match self.ipc.lock().unwrap().get_mpv_property("metadata").await {
Ok(map) => Ok(map), Ok(map) => Ok(map),
Err(err) => Err(err), Err(err) => Err(err),
} }
} }
pub fn get_playlist(&self) -> Result<Playlist, Error> { pub async fn get_playlist(&self) -> Result<Playlist, Error> {
match get_mpv_property::<Vec<PlaylistEntry>>(self, "playlist") { match self
.ipc
.lock()
.unwrap()
.get_mpv_property::<Vec<PlaylistEntry>>("playlist")
.await
{
Ok(entries) => Ok(Playlist(entries)), Ok(entries) => Ok(Playlist(entries)),
Err(msg) => Err(msg), Err(msg) => Err(msg),
} }
@ -375,15 +346,18 @@ impl Mpv {
/// # Example /// # Example
/// ``` /// ```
/// use mpvipc::{Mpv, Error}; /// use mpvipc::{Mpv, Error};
/// fn main() -> Result<(), Error> { /// async fn main() -> Result<(), Error> {
/// let mpv = Mpv::connect("/tmp/mpvsocket")?; /// let mpv = Mpv::connect("/tmp/mpvsocket")?;
/// let paused: bool = mpv.get_property("pause")?; /// let paused: bool = mpv.get_property("pause").await?;
/// let title: String = mpv.get_property("media-title")?; /// let title: String = mpv.get_property("media-title").await?;
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn get_property<T: GetPropertyTypeHandler>(&self, property: &str) -> Result<T, Error> { pub async fn get_property<T: GetPropertyTypeHandler>(
T::get_property_generic(self, property) &self,
property: &str,
) -> Result<T, Error> {
T::get_property_generic(self, property).await
} }
/// # Description /// # Description
@ -405,12 +379,16 @@ impl Mpv {
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn get_property_string(&self, property: &str) -> Result<String, Error> { pub async fn get_property_string(&self, property: &str) -> Result<String, Error> {
get_mpv_property_string(self, property) self.ipc
.lock()
.unwrap()
.get_mpv_property_string(property)
.await
} }
pub fn kill(&self) -> Result<(), Error> { pub async fn kill(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Quit) self.run_command(MpvCommand::Quit).await
} }
/// # Description /// # Description
@ -426,42 +404,44 @@ impl Mpv {
/// println!("{:?}", event); /// println!("{:?}", event);
/// } /// }
/// ``` /// ```
pub fn event_listen(&mut self) -> Result<Event, Error> { // pub fn event_listen(&mut self) -> Result<Event, Error> {
listen(self) // listen(self)
// }
// pub fn event_listen_raw(&mut self) -> String {
// listen_raw(self)
// }
pub async fn next(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistNext).await
} }
pub fn event_listen_raw(&mut self) -> String { pub async fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> {
listen_raw(self)
}
pub fn next(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistNext)
}
pub fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> {
self.run_command(MpvCommand::Observe { self.run_command(MpvCommand::Observe {
id: id, id: id,
property: property.to_string(), property: property.to_string(),
}) })
.await
} }
pub fn unobserve_property(&self, id: isize) -> Result<(), Error> { pub async fn unobserve_property(&self, id: isize) -> Result<(), Error> {
self.run_command(MpvCommand::Unobserve(id)) self.run_command(MpvCommand::Unobserve(id)).await
} }
pub fn pause(&self) -> Result<(), Error> { pub async fn pause(&self) -> Result<(), Error> {
set_mpv_property(self, "pause", json!(true)) self.set_property("pause", true).await
} }
pub fn prev(&self) -> Result<(), Error> { pub async fn prev(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistPrev) self.run_command(MpvCommand::PlaylistPrev).await
} }
pub fn restart(&self) -> Result<(), Error> { pub async fn restart(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Seek { self.run_command(MpvCommand::Seek {
seconds: 0f64, seconds: 0f64,
option: SeekOptions::Absolute, option: SeekOptions::Absolute,
}) })
.await
} }
/// # Description /// # Description
@ -490,136 +470,162 @@ impl Mpv {
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn run_command(&self, command: MpvCommand) -> Result<(), Error> { pub async fn run_command(&self, command: MpvCommand) -> Result<(), Error> {
match command { match command {
MpvCommand::LoadFile { file, option } => run_mpv_command( MpvCommand::LoadFile { file, option } => {
self, self.run_command_raw(
"loadfile", "loadfile",
&[ &[
file.as_ref(), file.as_ref(),
match option { match option {
PlaylistAddOptions::Append => "append", PlaylistAddOptions::Append => "append",
PlaylistAddOptions::Replace => "replace", PlaylistAddOptions::Replace => "replace",
}, },
], ],
), )
MpvCommand::LoadList { file, option } => run_mpv_command( .await
self, }
"loadlist", MpvCommand::LoadList { file, option } => {
&[ self.run_command_raw(
file.as_ref(), "loadlist",
match option { &[
PlaylistAddOptions::Append => "append", file.as_ref(),
PlaylistAddOptions::Replace => "replace", match option {
}, PlaylistAddOptions::Append => "append",
], PlaylistAddOptions::Replace => "replace",
), },
MpvCommand::Observe { id, property } => observe_mpv_property(self, &id, &property), ],
MpvCommand::PlaylistClear => run_mpv_command(self, "playlist-clear", &[]), )
.await
}
MpvCommand::Observe { id, property } => {
self.ipc
.lock()
.unwrap()
.observe_property(id, &property)
.await
}
MpvCommand::PlaylistClear => self.run_command_raw("playlist-clear", &[]).await,
MpvCommand::PlaylistMove { from, to } => { MpvCommand::PlaylistMove { from, to } => {
run_mpv_command(self, "playlist-move", &[&from.to_string(), &to.to_string()]) self.run_command_raw("playlist-move", &[&from.to_string(), &to.to_string()])
.await
} }
MpvCommand::PlaylistNext => run_mpv_command(self, "playlist-next", &[]), MpvCommand::PlaylistNext => self.run_command_raw("playlist-next", &[]).await,
MpvCommand::PlaylistPrev => run_mpv_command(self, "playlist-prev", &[]), MpvCommand::PlaylistPrev => self.run_command_raw("playlist-prev", &[]).await,
MpvCommand::PlaylistRemove(id) => { MpvCommand::PlaylistRemove(id) => {
run_mpv_command(self, "playlist-remove", &[&id.to_string()]) self.run_command_raw("playlist-remove", &[&id.to_string()])
.await
} }
MpvCommand::PlaylistShuffle => run_mpv_command(self, "playlist-shuffle", &[]), MpvCommand::PlaylistShuffle => self.run_command_raw("playlist-shuffle", &[]).await,
MpvCommand::Quit => run_mpv_command(self, "quit", &[]), MpvCommand::Quit => self.run_command_raw("quit", &[]).await,
MpvCommand::ScriptMessage(args) => { MpvCommand::ScriptMessage(args) => {
let str_args: Vec<_> = args.iter().map(String::as_str).collect(); let str_args: Vec<_> = args.iter().map(String::as_str).collect();
run_mpv_command(self, "script-message", &str_args) self.run_command_raw("script-message", &str_args).await
} }
MpvCommand::ScriptMessageTo { target, args } => { MpvCommand::ScriptMessageTo { target, args } => {
let mut cmd_args: Vec<_> = vec![target.as_str()]; let mut cmd_args: Vec<_> = vec![target.as_str()];
let mut str_args: Vec<_> = args.iter().map(String::as_str).collect(); let mut str_args: Vec<_> = args.iter().map(String::as_str).collect();
cmd_args.append(&mut str_args); cmd_args.append(&mut str_args);
run_mpv_command(self, "script-message-to", &cmd_args) self.run_command_raw("script-message-to", &cmd_args).await
} }
MpvCommand::Seek { seconds, option } => run_mpv_command( MpvCommand::Seek { seconds, option } => {
self, self.run_command_raw(
"seek", "seek",
&[ &[
&seconds.to_string(), &seconds.to_string(),
match option { match option {
SeekOptions::Absolute => "absolute", SeekOptions::Absolute => "absolute",
SeekOptions::Relative => "relative", SeekOptions::Relative => "relative",
SeekOptions::AbsolutePercent => "absolute-percent", SeekOptions::AbsolutePercent => "absolute-percent",
SeekOptions::RelativePercent => "relative-percent", SeekOptions::RelativePercent => "relative-percent",
}, },
], ],
), )
MpvCommand::Stop => run_mpv_command(self, "stop", &[]), .await
MpvCommand::Unobserve(id) => unobserve_mpv_property(self, &id), }
MpvCommand::Stop => self.run_command_raw("stop", &[]).await,
MpvCommand::Unobserve(id) => self.ipc.lock().unwrap().unobserve_property(id).await,
} }
} }
/// Run a custom command. /// Run a custom command.
/// This should only be used if the desired command is not implemented /// This should only be used if the desired command is not implemented
/// with [MpvCommand]. /// with [MpvCommand].
pub fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<(), Error> { pub async fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<(), Error> {
run_mpv_command(self, command, args) let command = [&[command], args].concat();
self.ipc.lock().unwrap().send_command(&command).await?;
Ok(())
} }
pub fn playlist_add( pub async fn playlist_add(
&self, &self,
file: &str, file: &str,
file_type: PlaylistAddTypeOptions, file_type: PlaylistAddTypeOptions,
option: PlaylistAddOptions, option: PlaylistAddOptions,
) -> Result<(), Error> { ) -> Result<(), Error> {
match file_type { match file_type {
PlaylistAddTypeOptions::File => self.run_command(MpvCommand::LoadFile { PlaylistAddTypeOptions::File => {
file: file.to_string(), self.run_command(MpvCommand::LoadFile {
option, file: file.to_string(),
}), option,
})
.await
}
PlaylistAddTypeOptions::Playlist => self.run_command(MpvCommand::LoadList { PlaylistAddTypeOptions::Playlist => {
file: file.to_string(), self.run_command(MpvCommand::LoadList {
option, file: file.to_string(),
}), option,
})
.await
}
} }
} }
pub fn playlist_clear(&self) -> Result<(), Error> { pub async fn playlist_clear(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistClear) self.run_command(MpvCommand::PlaylistClear).await
} }
pub fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> { pub async fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistMove { from, to }) self.run_command(MpvCommand::PlaylistMove { from, to })
.await
} }
pub fn playlist_play_id(&self, id: usize) -> Result<(), Error> { pub async fn playlist_play_id(&self, id: usize) -> Result<(), Error> {
set_mpv_property(self, "playlist-pos", json!(id)) self.set_property("playlist-pos", id).await
} }
pub fn playlist_play_next(&self, id: usize) -> Result<(), Error> { pub async fn playlist_play_next(&self, id: usize) -> Result<(), Error> {
match get_mpv_property::<usize>(self, "playlist-pos") { match self.get_property::<usize>("playlist-pos").await {
Ok(current_id) => self.run_command(MpvCommand::PlaylistMove { Ok(current_id) => {
from: id, self.run_command(MpvCommand::PlaylistMove {
to: current_id + 1, from: id,
}), to: current_id + 1,
})
.await
}
Err(msg) => Err(msg), Err(msg) => Err(msg),
} }
} }
pub fn playlist_remove_id(&self, id: usize) -> Result<(), Error> { pub async fn playlist_remove_id(&self, id: usize) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistRemove(id)) self.run_command(MpvCommand::PlaylistRemove(id)).await
} }
pub fn playlist_shuffle(&self) -> Result<(), Error> { pub async fn playlist_shuffle(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistShuffle) self.run_command(MpvCommand::PlaylistShuffle).await
} }
pub fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> { pub async fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> {
self.run_command(MpvCommand::Seek { seconds, option }) self.run_command(MpvCommand::Seek { seconds, option }).await
} }
pub fn set_loop_file(&self, option: Switch) -> Result<(), Error> { pub async fn set_loop_file(&self, option: Switch) -> Result<(), Error> {
let mut enabled = false; let mut enabled = false;
match option { match option {
Switch::On => enabled = true, Switch::On => enabled = true,
Switch::Off => {} Switch::Off => {}
Switch::Toggle => match get_mpv_property_string(self, "loop-file") { Switch::Toggle => match self.get_property_string("loop-file").await {
Ok(value) => match value.as_ref() { Ok(value) => match value.as_ref() {
"false" => { "false" => {
enabled = true; enabled = true;
@ -631,15 +637,15 @@ impl Mpv {
Err(msg) => return Err(msg), Err(msg) => return Err(msg),
}, },
} }
set_mpv_property(self, "loop-file", json!(enabled)) self.set_property("loop-file", enabled).await
} }
pub fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> { pub async fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> {
let mut enabled = false; let mut enabled = false;
match option { match option {
Switch::On => enabled = true, Switch::On => enabled = true,
Switch::Off => {} Switch::Off => {}
Switch::Toggle => match get_mpv_property_string(self, "loop-playlist") { Switch::Toggle => match self.get_property_string("loop-playlist").await {
Ok(value) => match value.as_ref() { Ok(value) => match value.as_ref() {
"false" => { "false" => {
enabled = true; enabled = true;
@ -651,22 +657,22 @@ impl Mpv {
Err(msg) => return Err(msg), Err(msg) => return Err(msg),
}, },
} }
set_mpv_property(self, "loop-playlist", json!(enabled)) self.set_property("loop-playlist", enabled).await
} }
pub fn set_mute(&self, option: Switch) -> Result<(), Error> { pub async fn set_mute(&self, option: Switch) -> Result<(), Error> {
let mut enabled = false; let mut enabled = false;
match option { match option {
Switch::On => enabled = true, Switch::On => enabled = true,
Switch::Off => {} Switch::Off => {}
Switch::Toggle => match get_mpv_property::<bool>(self, "mute") { Switch::Toggle => match self.get_property::<bool>("mute").await {
Ok(value) => { Ok(value) => {
enabled = !value; enabled = !value;
} }
Err(msg) => return Err(msg), Err(msg) => return Err(msg),
}, },
} }
set_mpv_property(self, "mute", json!(enabled)) self.set_property("mute", enabled).await
} }
/// # Description /// # Description
@ -687,63 +693,67 @@ impl Mpv {
/// # Example /// # Example
/// ``` /// ```
/// use mpvipc::{Mpv, Error}; /// use mpvipc::{Mpv, Error};
/// fn main() -> Result<(), Error> { /// fn async main() -> Result<(), Error> {
/// let mpv = Mpv::connect("/tmp/mpvsocket")?; /// let mpv = Mpv::connect("/tmp/mpvsocket")?;
/// mpv.set_property("pause", true)?; /// mpv.set_property("pause", true).await?;
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn set_property<T: SetPropertyTypeHandler<T>>( pub async fn set_property<T: SetPropertyTypeHandler<T>>(
&self, &self,
property: &str, property: &str,
value: T, value: T,
) -> Result<(), Error> { ) -> Result<(), Error> {
T::set_property_generic(self, property, value) T::set_property_generic(self, property, value).await
} }
pub fn set_speed(&self, input_speed: f64, option: NumberChangeOptions) -> Result<(), Error> { pub async fn set_speed(
match get_mpv_property::<f64>(self, "speed") { &self,
input_speed: f64,
option: NumberChangeOptions,
) -> Result<(), Error> {
match self.get_property::<f64>("speed").await {
Ok(speed) => match option { Ok(speed) => match option {
NumberChangeOptions::Increase => { NumberChangeOptions::Increase => {
set_mpv_property(self, "speed", json!(speed + input_speed)) self.set_property("speed", speed + input_speed).await
} }
NumberChangeOptions::Decrease => { NumberChangeOptions::Decrease => {
set_mpv_property(self, "speed", json!(speed - input_speed)) self.set_property("speed", speed - input_speed).await
} }
NumberChangeOptions::Absolute => { NumberChangeOptions::Absolute => self.set_property("speed", input_speed).await,
set_mpv_property(self, "speed", json!(input_speed))
}
}, },
Err(msg) => Err(msg), Err(msg) => Err(msg),
} }
} }
pub fn set_volume(&self, input_volume: f64, option: NumberChangeOptions) -> Result<(), Error> { pub async fn set_volume(
match get_mpv_property::<f64>(self, "volume") { &self,
input_volume: f64,
option: NumberChangeOptions,
) -> Result<(), Error> {
match self.get_property::<f64>("volume").await {
Ok(volume) => match option { Ok(volume) => match option {
NumberChangeOptions::Increase => { NumberChangeOptions::Increase => {
set_mpv_property(self, "volume", json!(volume + input_volume)) self.set_property("volume", volume + input_volume).await
} }
NumberChangeOptions::Decrease => { NumberChangeOptions::Decrease => {
set_mpv_property(self, "volume", json!(volume - input_volume)) self.set_property("volume", volume - input_volume).await
} }
NumberChangeOptions::Absolute => { NumberChangeOptions::Absolute => self.set_property("volume", input_volume).await,
set_mpv_property(self, "volume", json!(input_volume))
}
}, },
Err(msg) => Err(msg), Err(msg) => Err(msg),
} }
} }
pub fn stop(&self) -> Result<(), Error> { pub async fn stop(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Stop) self.run_command(MpvCommand::Stop).await
} }
pub fn toggle(&self) -> Result<(), Error> { pub async fn toggle(&self) -> Result<(), Error> {
run_mpv_command(self, "cycle", &["pause"]) self.run_command_raw("cycle", &["pause"]).await
} }
} }

View File

@ -1,248 +1,107 @@
use crate::message_parser::TypeHandler; use crate::message_parser::TypeHandler;
use self::message_parser::extract_mpv_response_data; use self::message_parser::extract_mpv_response_data;
use self::message_parser::json_array_to_playlist;
use self::message_parser::json_array_to_vec;
use self::message_parser::json_map_to_hashmap;
use super::*; use super::*;
use log::{debug, warn}; use serde_json::{json, Value};
use serde_json::json; use std::{
use serde_json::Value; io::{BufRead, BufReader, Write},
use std::io::BufRead; os::unix::net::UnixStream,
use std::io::BufReader; path::PathBuf,
use std::io::Write; sync::mpsc,
};
pub fn get_mpv_property<T: TypeHandler>(instance: &Mpv, property: &str) -> Result<T, Error> { pub(crate) struct MpvIpc {
let ipc_string = json!({"command": ["get_property", property]}); pub(crate) socket_path: PathBuf,
match serde_json::from_str::<Value>(&send_command_sync(instance, ipc_string)) { pub(crate) socket: UnixStream,
Ok(val) => T::get_value(val),
Err(why) => Err(Error(ErrorCode::JsonParseError(why.to_string()))), command_queue_sender: mpsc::Sender<(mpsc::Sender<Result<Value, Error>>, Value)>,
} command_queue_receiver: mpsc::Receiver<(mpsc::Sender<Result<Value, Error>>, Value)>,
} }
pub fn get_mpv_property_string(instance: &Mpv, property: &str) -> Result<String, Error> { impl MpvIpc {
let ipc_string = json!({"command": ["get_property", property]}); pub(crate) fn new(socket_path: PathBuf, socket: UnixStream) -> Self {
let val = serde_json::from_str::<Value>(&send_command_sync(instance, ipc_string)) let (command_queue_sender, command_queue_receiver) = mpsc::channel();
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?; MpvIpc {
socket_path,
let data = extract_mpv_response_data(&val)?; socket,
command_queue_sender,
match data { command_queue_receiver,
Value::Bool(b) => Ok(b.to_string()),
Value::Number(ref n) => Ok(n.to_string()),
Value::String(ref s) => Ok(s.to_string()),
Value::Array(ref array) => Ok(format!("{:?}", array)),
Value::Object(ref map) => Ok(format!("{:?}", map)),
Value::Null => Err(Error(ErrorCode::MissingValue)),
}
}
fn validate_mpv_response(response: &str) -> Result<(), Error> {
serde_json::from_str::<Value>(response)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))
.and_then(|value| extract_mpv_response_data(&value).map(|_| ()))
}
pub fn set_mpv_property(instance: &Mpv, property: &str, value: Value) -> Result<(), Error> {
let ipc_string = json!({
"command": ["set_property", property, value]
});
let response = &send_command_sync(instance, ipc_string);
validate_mpv_response(response)
}
pub fn run_mpv_command(instance: &Mpv, command: &str, args: &[&str]) -> Result<(), Error> {
let mut ipc_string = json!({
"command": [command]
});
if let Value::Array(args_array) = &mut ipc_string["command"] {
for arg in args {
args_array.push(json!(arg));
} }
} }
let response = &send_command_sync(instance, ipc_string); // TODO: handle events
validate_mpv_response(response) pub(crate) async fn run(&self) {
} let mut stream = &self.socket;
let mut reader = BufReader::new(stream);
pub fn observe_mpv_property(instance: &Mpv, id: &isize, property: &str) -> Result<(), Error> {
let ipc_string = json!({
"command": ["observe_property", id, property]
});
let response = &send_command_sync(instance, ipc_string);
validate_mpv_response(response)
}
pub fn unobserve_mpv_property(instance: &Mpv, id: &isize) -> Result<(), Error> {
let ipc_string = json!({
"command": ["unobserve_property", id]
});
let response = &send_command_sync(instance, ipc_string);
validate_mpv_response(response)
}
fn try_convert_property(name: &str, id: usize, data: MpvDataType) -> Event {
let property = match name {
"path" => match data {
MpvDataType::String(value) => Property::Path(Some(value)),
MpvDataType::Null => Property::Path(None),
_ => unimplemented!(),
},
"pause" => match data {
MpvDataType::Bool(value) => Property::Pause(value),
_ => unimplemented!(),
},
"playback-time" => match data {
MpvDataType::Double(value) => Property::PlaybackTime(Some(value)),
MpvDataType::Null => Property::PlaybackTime(None),
_ => unimplemented!(),
},
"duration" => match data {
MpvDataType::Double(value) => Property::Duration(Some(value)),
MpvDataType::Null => Property::Duration(None),
_ => unimplemented!(),
},
"metadata" => match data {
MpvDataType::HashMap(value) => Property::Metadata(Some(value)),
MpvDataType::Null => Property::Metadata(None),
_ => unimplemented!(),
},
_ => {
warn!("Property {} not implemented", name);
Property::Unknown {
name: name.to_string(),
data,
}
}
};
Event::PropertyChange { id, property }
}
pub fn listen(instance: &mut Mpv) -> Result<Event, Error> {
let mut e;
// sometimes we get responses unrelated to events, so we read a new line until we receive one
// with an event field
let name = loop {
let mut response = String::new(); let mut response = String::new();
instance.reader.read_line(&mut response).unwrap(); loop {
response = response.trim_end().to_string(); let (tx, ipc_command) = self.command_queue_receiver.recv().unwrap();
debug!("Event: {}", response); let command = serde_json::to_string(&ipc_command).unwrap();
e = serde_json::from_str::<Value>(&response) stream.write_all(command.as_bytes()).unwrap();
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?;
match e["event"] {
Value::String(ref name) => break name,
_ => {
// It was not an event - try again
debug!("Bad response: {:?}", response)
}
}
};
let event = match name.as_str() {
"shutdown" => Event::Shutdown,
"start-file" => Event::StartFile,
"file-loaded" => Event::FileLoaded,
"seek" => Event::Seek,
"playback-restart" => Event::PlaybackRestart,
"idle" => Event::Idle,
"tick" => Event::Tick,
"video-reconfig" => Event::VideoReconfig,
"audio-reconfig" => Event::AudioReconfig,
"tracks-changed" => Event::TracksChanged,
"track-switched" => Event::TrackSwitched,
"pause" => Event::Pause,
"unpause" => Event::Unpause,
"metadata-update" => Event::MetadataUpdate,
"chapter-change" => Event::ChapterChange,
"end-file" => Event::EndFile,
"property-change" => {
let name = match e["name"] {
Value::String(ref n) => Ok(n.to_string()),
_ => Err(Error(ErrorCode::JsonContainsUnexptectedType)),
}?;
let id: usize = match e["id"] {
Value::Number(ref n) => n.as_u64().unwrap() as usize,
_ => 0,
};
let data: MpvDataType = match e["data"] {
Value::String(ref n) => MpvDataType::String(n.to_string()),
Value::Array(ref a) => {
if name == "playlist".to_string() {
MpvDataType::Playlist(Playlist(json_array_to_playlist(a)))
} else {
MpvDataType::Array(json_array_to_vec(a))
}
}
Value::Bool(b) => MpvDataType::Bool(b),
Value::Number(ref n) => {
if n.is_u64() {
MpvDataType::Usize(n.as_u64().unwrap() as usize)
} else if n.is_f64() {
MpvDataType::Double(n.as_f64().unwrap())
} else {
return Err(Error(ErrorCode::JsonContainsUnexptectedType));
}
}
Value::Object(ref m) => MpvDataType::HashMap(json_map_to_hashmap(m)),
Value::Null => MpvDataType::Null,
};
try_convert_property(name.as_ref(), id, data)
}
"client-message" => {
let args = match e["args"] {
Value::Array(ref a) => json_array_to_vec(a)
.iter()
.map(|arg| match arg {
MpvDataType::String(s) => Ok(s.to_owned()),
_ => Err(Error(ErrorCode::JsonContainsUnexptectedType)),
})
.collect::<Result<Vec<_>, _>>(),
_ => return Err(Error(ErrorCode::JsonContainsUnexptectedType)),
}?;
Event::ClientMessage { args }
}
_ => Event::Unimplemented,
};
Ok(event)
}
pub fn listen_raw(instance: &mut Mpv) -> String {
let mut response = String::new();
instance.reader.read_line(&mut response).unwrap();
response.trim_end().to_string()
}
fn send_command_sync(instance: &Mpv, command: Value) -> String {
let stream = &instance.stream;
match serde_json::to_writer(stream, &command) {
Err(why) => panic!("Error: Could not write to socket: {}", why),
Ok(_) => {
let mut stream = stream;
stream.write_all(b"\n").unwrap(); stream.write_all(b"\n").unwrap();
let mut response = String::new(); response.clear();
{ reader.read_line(&mut response).unwrap();
let mut reader = BufReader::new(stream);
while !response.contains("\"error\":") { let response = serde_json::from_str::<Value>(&response)
response.clear(); .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))
reader.read_line(&mut response).unwrap(); .and_then(|value| extract_mpv_response_data(&value).map(|data| data.to_owned()));
}
} tx.send(response).unwrap();
debug!("Response: {}", response.trim_end());
response
} }
} }
pub(crate) async fn send_command(&self, command: &[&str]) -> Result<Value, Error> {
let ipc_command = json!({ "command": command });
log::trace!("Sending command: {:?}", ipc_command);
let (tx, rx) = mpsc::channel();
self.command_queue_sender.send((tx, ipc_command)).unwrap();
let result = rx.recv().unwrap();
log::trace!("Received response: {:?}", result);
result
}
pub(crate) async fn get_mpv_property<T: TypeHandler>(
&self,
property: &str,
) -> Result<T, Error> {
self.send_command(&["get_property", property])
.await
.and_then(|val| T::get_value(val))
}
pub(crate) async fn get_mpv_property_string(&self, property: &str) -> Result<String, Error> {
let val = self.send_command(&["get_property", property]).await?;
match extract_mpv_response_data(&val)? {
Value::Bool(b) => Ok(b.to_string()),
Value::Number(ref n) => Ok(n.to_string()),
Value::String(ref s) => Ok(s.to_string()),
Value::Array(ref array) => Ok(format!("{:?}", array)),
Value::Object(ref map) => Ok(format!("{:?}", map)),
Value::Null => Err(Error(ErrorCode::MissingValue)),
}
}
pub(crate) async fn set_mpv_property(&self, property: &str, value: Value) -> Result<(), Error> {
let str_value = serde_json::to_string(&value).unwrap();
self.send_command(&["set_property", property, &str_value])
.await
.map(|_| ())
}
pub(crate) async fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> {
self.send_command(&["observe_property", &id.to_string(), property])
.await
.map(|_| ())
}
pub(crate) async fn unobserve_property(&self, id: isize) -> Result<(), Error> {
self.send_command(&["unobserve_property", &id.to_string()])
.await
.map(|_| ())
}
} }