Compare commits

...

3 Commits

Author SHA1 Message Date
Oystein Kristoffer Tveit 7470a59404
WIP: add tests 2024-04-19 00:13:27 +02:00
Oystein Kristoffer Tveit dbfe543f59
nix support 2024-04-19 00:12:05 +02:00
Oystein Kristoffer Tveit 61e20f4923
Make entire project async
This moves all communication with Mpv's unix socket into another tokio
task, and uses message passing through clonable mpsc channels to receive
commands to execute, and to send responses.
2024-04-19 00:12:04 +02:00
9 changed files with 964 additions and 646 deletions

View File

@ -8,11 +8,20 @@ homepage = "https://gitlab.com/mpv-ipc/mpvipc"
repository = "https://gitlab.com/mpv-ipc/mpvipc"
documentation = "https://docs.rs/mpvipc/"
edition = "2021"
rust-version = "1.75"
[dependencies]
serde_json = "1.0.104"
log = "0.4.19"
serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.37.0", features = ["sync", "macros", "rt", "net"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
futures = "0.3.30"
[dev-dependencies]
env_logger = "0.10.0"
test-log = "0.2.15"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "time"] }
[lib]
doctest = false

View File

@ -1,15 +1,16 @@
use env_logger;
use mpvipc::{Error as MpvError, Mpv};
fn main() -> Result<(), MpvError> {
#[tokio::main]
async fn main() -> Result<(), MpvError> {
env_logger::init();
let mpv = Mpv::connect("/tmp/mpv.sock")?;
let meta = mpv.get_metadata()?;
let mpv = Mpv::connect("/tmp/mpv.sock").await?;
let meta = mpv.get_metadata().await?;
println!("metadata: {:?}", meta);
let playlist = mpv.get_playlist()?;
let playlist = mpv.get_playlist().await?;
println!("playlist: {:?}", playlist);
let playback_time: f64 = mpv.get_property("playback-time")?;
let playback_time: f64 = mpv.get_property("playback-time").await?;
println!("playback-time: {}", playback_time);
Ok(())
}

View File

@ -11,58 +11,60 @@ fn seconds_to_hms(total: f64) -> String {
format!("{:02}:{:02}:{:02}", hours, minutes, seconds)
}
fn main() -> Result<(), Error> {
#[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::init();
let mut mpv = Mpv::connect("/tmp/mpv.sock")?;
let mut mpv = Mpv::connect("/tmp/mpv.sock").await?;
let mut pause = false;
let mut playback_time = std::f64::NAN;
let mut duration = std::f64::NAN;
mpv.observe_property(1, "path")?;
mpv.observe_property(2, "pause")?;
mpv.observe_property(3, "playback-time")?;
mpv.observe_property(4, "duration")?;
mpv.observe_property(5, "metadata")?;
mpv.observe_property(1, "path").await?;
mpv.observe_property(2, "pause").await?;
mpv.observe_property(3, "playback-time").await?;
mpv.observe_property(4, "duration").await?;
mpv.observe_property(5, "metadata").await?;
loop {
let event = mpv.event_listen()?;
match event {
Event::PropertyChange { id: _, property } => match property {
Property::Path(Some(value)) => println!("\nPlaying: {}", value),
Property::Path(None) => (),
Property::Pause(value) => pause = value,
Property::PlaybackTime(Some(value)) => playback_time = value,
Property::PlaybackTime(None) => playback_time = std::f64::NAN,
Property::Duration(Some(value)) => duration = value,
Property::Duration(None) => duration = std::f64::NAN,
Property::Metadata(Some(value)) => {
println!("File tags:");
if let Some(MpvDataType::String(value)) = value.get("ARTIST") {
println!(" Artist: {}", value);
}
if let Some(MpvDataType::String(value)) = value.get("ALBUM") {
println!(" Album: {}", value);
}
if let Some(MpvDataType::String(value)) = value.get("TITLE") {
println!(" Title: {}", value);
}
if let Some(MpvDataType::String(value)) = value.get("TRACK") {
println!(" Track: {}", value);
}
}
Property::Metadata(None) => (),
Property::Unknown { name: _, data: _ } => (),
},
Event::Shutdown => return Ok(()),
Event::Unimplemented => panic!("Unimplemented event"),
_ => (),
}
print!(
"{}{} / {} ({:.0}%)\r",
if pause { "(Paused) " } else { "" },
seconds_to_hms(playback_time),
seconds_to_hms(duration),
100. * playback_time / duration
);
io::stdout().flush().unwrap();
// TODO:
// let event = mpv.event_listen()?;
// match event {
// Event::PropertyChange { id: _, property } => match property {
// Property::Path(Some(value)) => println!("\nPlaying: {}", value),
// Property::Path(None) => (),
// Property::Pause(value) => pause = value,
// Property::PlaybackTime(Some(value)) => playback_time = value,
// Property::PlaybackTime(None) => playback_time = std::f64::NAN,
// Property::Duration(Some(value)) => duration = value,
// Property::Duration(None) => duration = std::f64::NAN,
// Property::Metadata(Some(value)) => {
// println!("File tags:");
// if let Some(MpvDataType::String(value)) = value.get("ARTIST") {
// println!(" Artist: {}", value);
// }
// if let Some(MpvDataType::String(value)) = value.get("ALBUM") {
// println!(" Album: {}", value);
// }
// if let Some(MpvDataType::String(value)) = value.get("TITLE") {
// println!(" Title: {}", value);
// }
// if let Some(MpvDataType::String(value)) = value.get("TRACK") {
// println!(" Track: {}", value);
// }
// }
// Property::Metadata(None) => (),
// Property::Unknown { name: _, data: _ } => (),
// },
// Event::Shutdown => return Ok(()),
// Event::Unimplemented => panic!("Unimplemented event"),
// _ => (),
// }
// print!(
// "{}{} / {} ({:.0}%)\r",
// if pause { "(Paused) " } else { "" },
// seconds_to_hms(playback_time),
// seconds_to_hms(duration),
// 100. * playback_time / duration
// );
// io::stdout().flush().unwrap();
}
}

66
flake.lock Normal file
View File

@ -0,0 +1,66 @@
{
"nodes": {
"fenix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1713421495,
"narHash": "sha256-5vVF9W1tJT+WdfpWAEG76KywktKDAW/71mVmNHEHjac=",
"owner": "nix-community",
"repo": "fenix",
"rev": "fd47b1f9404fae02a4f38bd9f4b12bad7833c96b",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "fenix",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1713248628,
"narHash": "sha256-NLznXB5AOnniUtZsyy/aPWOk8ussTuePp2acb9U+ISA=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "5672bc9dbf9d88246ddab5ac454e82318d094bb8",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"fenix": "fenix",
"nixpkgs": "nixpkgs"
}
},
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1713373173,
"narHash": "sha256-octd9BFY9G/Gbr4KfwK4itZp4Lx+qvJeRRcYnN+dEH8=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "46702ffc1a02a2ac153f1d1ce619ec917af8f3a6",
"type": "github"
},
"original": {
"owner": "rust-lang",
"ref": "nightly",
"repo": "rust-analyzer",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

36
flake.nix Normal file
View File

@ -0,0 +1,36 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
fenix.url = "github:nix-community/fenix";
fenix.inputs.nixpkgs.follows = "nixpkgs";
};
outputs = { self, nixpkgs, fenix }@inputs:
let
systems = [
"x86_64-linux"
"aarch64-linux"
"aarch64-darwin"
];
forAllSystems = f: nixpkgs.lib.genAttrs systems (system: let
toolchain = fenix.packages.${system}.complete;
pkgs = import nixpkgs {
inherit system;
overlays = [
(_: super: let pkgs = fenix.inputs.nixpkgs.legacyPackages.${system}; in fenix.overlays.default pkgs pkgs)
];
};
in f system pkgs toolchain);
in {
devShell = forAllSystems (system: pkgs: toolchain: pkgs.mkShell {
packages = [
(toolchain.withComponents [
"cargo" "rustc" "rustfmt" "clippy"
])
pkgs.mpv
];
RUST_SRC_PATH = "${toolchain.rust-src}/lib/rustlib/src/rust/";
});
};
}

View File

@ -1,14 +1,13 @@
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::io::{BufReader, Read};
use std::os::unix::net::UnixStream;
use crate::ipc::{
get_mpv_property, get_mpv_property_string, listen, listen_raw, observe_mpv_property,
run_mpv_command, set_mpv_property, unobserve_mpv_property,
use serde_json::Value;
use std::{
collections::HashMap,
fmt::{self, Display},
};
use tokio::{net::UnixStream, sync::oneshot};
use crate::ipc::{MpvIpc, MpvIpcCommand, MpvIpcResponse};
use crate::message_parser::TypeHandler;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Event {
@ -80,6 +79,10 @@ pub enum MpvCommand {
Unobserve(isize),
}
trait IntoRawCommandPart {
fn into_raw_command_part(self) -> String;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MpvDataType {
Array(Vec<MpvDataType>),
@ -99,12 +102,31 @@ pub enum NumberChangeOptions {
Decrease,
}
impl IntoRawCommandPart for NumberChangeOptions {
fn into_raw_command_part(self) -> String {
match self {
NumberChangeOptions::Absolute => "absolute".to_string(),
NumberChangeOptions::Increase => "increase".to_string(),
NumberChangeOptions::Decrease => "decrease".to_string(),
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum PlaylistAddOptions {
Replace,
Append,
}
impl IntoRawCommandPart for PlaylistAddOptions {
fn into_raw_command_part(self) -> String {
match self {
PlaylistAddOptions::Replace => "replace".to_string(),
PlaylistAddOptions::Append => "append".to_string(),
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum PlaylistAddTypeOptions {
File,
@ -119,6 +141,17 @@ pub enum SeekOptions {
AbsolutePercent,
}
impl IntoRawCommandPart for SeekOptions {
fn into_raw_command_part(self) -> String {
match self {
SeekOptions::Relative => "relative".to_string(),
SeekOptions::Absolute => "absolute".to_string(),
SeekOptions::RelativePercent => "relative-percent".to_string(),
SeekOptions::AbsolutePercent => "absolute-percent".to_string(),
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum Switch {
On,
@ -126,7 +159,7 @@ pub enum Switch {
Toggle,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorCode {
MpvError(String),
JsonParseError(String),
@ -144,7 +177,7 @@ pub enum ErrorCode {
ValueDoesNotContainUsize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PlaylistEntry {
pub id: usize,
pub filename: String,
@ -152,52 +185,64 @@ pub struct PlaylistEntry {
pub current: bool,
}
pub struct Mpv {
pub(crate) stream: UnixStream,
pub(crate) reader: BufReader<UnixStream>,
pub(crate) name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Playlist(pub Vec<PlaylistEntry>);
#[derive(Debug, Clone, Serialize, Deserialize)]
pub trait GetPropertyTypeHandler: Sized {
// TODO: fix this
#[allow(async_fn_in_trait)]
async fn get_property_generic(instance: &Mpv, property: &str) -> Result<Self, Error>;
}
impl<T> GetPropertyTypeHandler for T
where
T: TypeHandler,
{
async fn get_property_generic(instance: &Mpv, property: &str) -> Result<T, Error> {
instance
.get_property_value(property)
.await
.and_then(T::get_value)
}
}
pub trait SetPropertyTypeHandler<T> {
// TODO: fix this
#[allow(async_fn_in_trait)]
async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>;
}
impl<T> SetPropertyTypeHandler<T> for T
where
T: Serialize,
{
async fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error> {
let (res_tx, res_rx) = oneshot::channel();
let value = serde_json::to_value(value)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?;
instance
.command_sender
.send((
MpvIpcCommand::SetProperty(property.to_owned(), value),
res_tx,
))
.await
.map_err(|_| {
Error(ErrorCode::ConnectError(
"Failed to send command".to_string(),
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response.map(|_| ()),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Error(pub ErrorCode);
impl Drop for Mpv {
fn drop(&mut self) {
self.disconnect();
}
}
impl fmt::Debug for Mpv {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("Mpv").field(&self.name).finish()
}
}
impl Clone for Mpv {
fn clone(&self) -> Self {
let stream = self.stream.try_clone().expect("cloning UnixStream");
let cloned_stream = stream.try_clone().expect("cloning UnixStream");
Mpv {
stream,
reader: BufReader::new(cloned_stream),
name: self.name.clone(),
}
}
fn clone_from(&mut self, source: &Self) {
let stream = source.stream.try_clone().expect("cloning UnixStream");
let cloned_stream = stream.try_clone().expect("cloning UnixStream");
*self = Mpv {
stream,
reader: BufReader::new(cloned_stream),
name: source.name.clone(),
}
}
}
impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(&self.0, f)
@ -241,119 +286,69 @@ impl Display for ErrorCode {
}
}
pub trait GetPropertyTypeHandler: Sized {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<Self, Error>;
#[derive(Clone)]
pub struct Mpv {
command_sender: tokio::sync::mpsc::Sender<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
}
impl GetPropertyTypeHandler for bool {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<bool, Error> {
get_mpv_property::<bool>(instance, property)
}
}
impl GetPropertyTypeHandler for String {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<String, Error> {
get_mpv_property::<String>(instance, property)
}
}
impl GetPropertyTypeHandler for f64 {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<f64, Error> {
get_mpv_property::<f64>(instance, property)
}
}
impl GetPropertyTypeHandler for usize {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<usize, Error> {
get_mpv_property::<usize>(instance, property)
}
}
impl GetPropertyTypeHandler for Vec<PlaylistEntry> {
fn get_property_generic(instance: &Mpv, property: &str) -> Result<Vec<PlaylistEntry>, Error> {
get_mpv_property::<Vec<PlaylistEntry>>(instance, property)
}
}
impl GetPropertyTypeHandler for HashMap<String, MpvDataType> {
fn get_property_generic(
instance: &Mpv,
property: &str,
) -> Result<HashMap<String, MpvDataType>, Error> {
get_mpv_property::<HashMap<String, MpvDataType>>(instance, property)
}
}
pub trait SetPropertyTypeHandler<T> {
fn set_property_generic(instance: &Mpv, property: &str, value: T) -> Result<(), Error>;
}
impl SetPropertyTypeHandler<bool> for bool {
fn set_property_generic(instance: &Mpv, property: &str, value: bool) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
}
}
impl SetPropertyTypeHandler<String> for String {
fn set_property_generic(instance: &Mpv, property: &str, value: String) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
}
}
impl SetPropertyTypeHandler<f64> for f64 {
fn set_property_generic(instance: &Mpv, property: &str, value: f64) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
}
}
impl SetPropertyTypeHandler<usize> for usize {
fn set_property_generic(instance: &Mpv, property: &str, value: usize) -> Result<(), Error> {
set_mpv_property(instance, property, json!(value))
impl fmt::Debug for Mpv {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Mpv").finish()
}
}
impl Mpv {
pub fn connect(socket: &str) -> Result<Mpv, Error> {
match UnixStream::connect(socket) {
Ok(stream) => {
let cloned_stream = stream.try_clone().expect("cloning UnixStream");
return Ok(Mpv {
stream,
reader: BufReader::new(cloned_stream),
name: String::from(socket),
});
}
pub async fn connect(socket_path: &str) -> Result<Mpv, Error> {
log::debug!("Connecting to mpv socket at {}", socket_path);
let socket = match UnixStream::connect(socket_path).await {
Ok(stream) => Ok(stream),
Err(internal_error) => Err(Error(ErrorCode::ConnectError(internal_error.to_string()))),
}?;
Self::connect_socket(socket).await
}
pub async fn connect_socket(socket: UnixStream) -> Result<Mpv, Error> {
let (com_tx, com_rx) = tokio::sync::mpsc::channel(100);
let ipc = MpvIpc::new(socket, com_rx);
log::debug!("Starting IPC handler");
tokio::spawn(ipc.run());
Ok(Mpv {
command_sender: com_tx,
})
}
pub async fn disconnect(&self) -> Result<(), Error> {
let (res_tx, res_rx) = oneshot::channel();
self.command_sender
.send((MpvIpcCommand::Exit, res_tx))
.await
.map_err(|_| {
Error(ErrorCode::ConnectError(
"Failed to send command".to_string(),
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response.map(|_| ()),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
pub fn disconnect(&self) {
let mut stream = &self.stream;
stream
.shutdown(std::net::Shutdown::Both)
.expect("socket disconnect");
let mut buffer = [0; 32];
for _ in 0..stream.bytes().count() {
stream.read(&mut buffer[..]).unwrap();
}
// pub fn get_stream_ref(&self) -> &UnixStream {
// &self.stream
// }
pub async fn get_metadata(&self) -> Result<HashMap<String, MpvDataType>, Error> {
self.get_property("metadata").await
}
pub fn get_stream_ref(&self) -> &UnixStream {
&self.stream
}
pub fn get_metadata(&self) -> Result<HashMap<String, MpvDataType>, Error> {
match get_mpv_property(self, "metadata") {
Ok(map) => Ok(map),
Err(err) => Err(err),
}
}
pub fn get_playlist(&self) -> Result<Playlist, Error> {
match get_mpv_property::<Vec<PlaylistEntry>>(self, "playlist") {
Ok(entries) => Ok(Playlist(entries)),
Err(msg) => Err(msg),
}
pub async fn get_playlist(&self) -> Result<Playlist, Error> {
self.get_property::<Vec<PlaylistEntry>>("playlist")
.await
.map(|entries| Playlist(entries))
}
/// # Description
@ -375,15 +370,18 @@ impl Mpv {
/// # Example
/// ```
/// use mpvipc::{Mpv, Error};
/// fn main() -> Result<(), Error> {
/// async fn main() -> Result<(), Error> {
/// let mpv = Mpv::connect("/tmp/mpvsocket")?;
/// let paused: bool = mpv.get_property("pause")?;
/// let title: String = mpv.get_property("media-title")?;
/// let paused: bool = mpv.get_property("pause").await?;
/// let title: String = mpv.get_property("media-title").await?;
/// Ok(())
/// }
/// ```
pub fn get_property<T: GetPropertyTypeHandler>(&self, property: &str) -> Result<T, Error> {
T::get_property_generic(self, property)
pub async fn get_property<T: GetPropertyTypeHandler>(
&self,
property: &str,
) -> Result<T, Error> {
T::get_property_generic(self, property).await
}
/// # Description
@ -405,12 +403,24 @@ impl Mpv {
/// Ok(())
/// }
/// ```
pub fn get_property_string(&self, property: &str) -> Result<String, Error> {
get_mpv_property_string(self, property)
pub async fn get_property_value(&self, property: &str) -> Result<Value, Error> {
let (res_tx, res_rx) = oneshot::channel();
self.command_sender
.send((MpvIpcCommand::GetProperty(property.to_owned()), res_tx))
.await
.map_err(|_| {
Error(ErrorCode::ConnectError(
"Failed to send command".to_string(),
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response,
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
pub fn kill(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Quit)
pub async fn kill(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Quit).await
}
/// # Description
@ -426,42 +436,44 @@ impl Mpv {
/// println!("{:?}", event);
/// }
/// ```
pub fn event_listen(&mut self) -> Result<Event, Error> {
listen(self)
// pub fn event_listen(&mut self) -> Result<Event, Error> {
// listen(self)
// }
// pub fn event_listen_raw(&mut self) -> String {
// listen_raw(self)
// }
pub async fn next(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistNext).await
}
pub fn event_listen_raw(&mut self) -> String {
listen_raw(self)
}
pub fn next(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistNext)
}
pub fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> {
pub async fn observe_property(&self, id: isize, property: &str) -> Result<(), Error> {
self.run_command(MpvCommand::Observe {
id: id,
id,
property: property.to_string(),
})
.await
}
pub fn unobserve_property(&self, id: isize) -> Result<(), Error> {
self.run_command(MpvCommand::Unobserve(id))
pub async fn unobserve_property(&self, id: isize) -> Result<(), Error> {
self.run_command(MpvCommand::Unobserve(id)).await
}
pub fn pause(&self) -> Result<(), Error> {
set_mpv_property(self, "pause", json!(true))
pub async fn pause(&self) -> Result<(), Error> {
self.set_property("pause", true).await
}
pub fn prev(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistPrev)
pub async fn prev(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistPrev).await
}
pub fn restart(&self) -> Result<(), Error> {
pub async fn restart(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Seek {
seconds: 0f64,
option: SeekOptions::Absolute,
})
.await
}
/// # Description
@ -490,183 +502,254 @@ impl Mpv {
/// Ok(())
/// }
/// ```
pub fn run_command(&self, command: MpvCommand) -> Result<(), Error> {
match command {
MpvCommand::LoadFile { file, option } => run_mpv_command(
self,
pub async fn run_command(&self, command: MpvCommand) -> Result<(), Error> {
log::trace!("Running command: {:?}", command);
let result = match command {
MpvCommand::LoadFile { file, option } => {
self.run_command_raw_ignore_value(
"loadfile",
&[
file.as_ref(),
match option {
PlaylistAddOptions::Append => "append",
PlaylistAddOptions::Replace => "replace",
},
],
),
MpvCommand::LoadList { file, option } => run_mpv_command(
self,
&[file.as_ref(), option.into_raw_command_part().as_str()],
)
.await
}
MpvCommand::LoadList { file, option } => {
self.run_command_raw_ignore_value(
"loadlist",
&[
file.as_ref(),
match option {
PlaylistAddOptions::Append => "append",
PlaylistAddOptions::Replace => "replace",
},
],
),
MpvCommand::Observe { id, property } => observe_mpv_property(self, &id, &property),
MpvCommand::PlaylistClear => run_mpv_command(self, "playlist-clear", &[]),
&[file.as_ref(), option.into_raw_command_part().as_str()],
)
.await
}
MpvCommand::Observe { id, property } => {
let (res_tx, res_rx) = oneshot::channel();
self.command_sender
.send((MpvIpcCommand::ObserveProperty(id, property), res_tx))
.await
.map_err(|_| {
Error(ErrorCode::ConnectError(
"Failed to send command".to_string(),
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response.map(|_| ()),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
MpvCommand::PlaylistClear => {
self.run_command_raw_ignore_value("playlist-clear", &[])
.await
}
MpvCommand::PlaylistMove { from, to } => {
run_mpv_command(self, "playlist-move", &[&from.to_string(), &to.to_string()])
self.run_command_raw_ignore_value(
"playlist-move",
&[&from.to_string(), &to.to_string()],
)
.await
}
MpvCommand::PlaylistNext => {
self.run_command_raw_ignore_value("playlist-next", &[])
.await
}
MpvCommand::PlaylistPrev => {
self.run_command_raw_ignore_value("playlist-prev", &[])
.await
}
MpvCommand::PlaylistNext => run_mpv_command(self, "playlist-next", &[]),
MpvCommand::PlaylistPrev => run_mpv_command(self, "playlist-prev", &[]),
MpvCommand::PlaylistRemove(id) => {
run_mpv_command(self, "playlist-remove", &[&id.to_string()])
self.run_command_raw_ignore_value("playlist-remove", &[&id.to_string()])
.await
}
MpvCommand::PlaylistShuffle => run_mpv_command(self, "playlist-shuffle", &[]),
MpvCommand::Quit => run_mpv_command(self, "quit", &[]),
MpvCommand::PlaylistShuffle => {
self.run_command_raw_ignore_value("playlist-shuffle", &[])
.await
}
MpvCommand::Quit => self.run_command_raw_ignore_value("quit", &[]).await,
MpvCommand::ScriptMessage(args) => {
let str_args: Vec<_> = args.iter().map(String::as_str).collect();
run_mpv_command(self, "script-message", &str_args)
self.run_command_raw_ignore_value("script-message", &str_args)
.await
}
MpvCommand::ScriptMessageTo { target, args } => {
let mut cmd_args: Vec<_> = vec![target.as_str()];
let mut str_args: Vec<_> = args.iter().map(String::as_str).collect();
cmd_args.append(&mut str_args);
run_mpv_command(self, "script-message-to", &cmd_args)
self.run_command_raw_ignore_value("script-message-to", &cmd_args)
.await
}
MpvCommand::Seek { seconds, option } => run_mpv_command(
self,
MpvCommand::Seek { seconds, option } => {
self.run_command_raw_ignore_value(
"seek",
&[
&seconds.to_string(),
match option {
SeekOptions::Absolute => "absolute",
SeekOptions::Relative => "relative",
SeekOptions::AbsolutePercent => "absolute-percent",
SeekOptions::RelativePercent => "relative-percent",
},
option.into_raw_command_part().as_str(),
],
),
MpvCommand::Stop => run_mpv_command(self, "stop", &[]),
MpvCommand::Unobserve(id) => unobserve_mpv_property(self, &id),
)
.await
}
MpvCommand::Stop => self.run_command_raw_ignore_value("stop", &[]).await,
MpvCommand::Unobserve(id) => {
let (res_tx, res_rx) = oneshot::channel();
self.command_sender
.send((MpvIpcCommand::UnobserveProperty(id), res_tx))
.await
.unwrap();
match res_rx.await {
Ok(MpvIpcResponse(response)) => response.map(|_| ()),
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
};
log::trace!("Command result: {:?}", result);
result
}
/// Run a custom command.
/// This should only be used if the desired command is not implemented
/// with [MpvCommand].
pub fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<(), Error> {
run_mpv_command(self, command, args)
pub async fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<Value, Error> {
let command = Vec::from(
[command]
.iter()
.chain(args.iter())
.map(|s| s.to_string())
.collect::<Vec<String>>()
.as_slice(),
);
let (res_tx, res_rx) = oneshot::channel();
self.command_sender
.send((MpvIpcCommand::Command(command), res_tx))
.await
.map_err(|_| {
Error(ErrorCode::ConnectError(
"Failed to send command".to_string(),
))
})?;
match res_rx.await {
Ok(MpvIpcResponse(response)) => response,
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
}
}
pub fn playlist_add(
async fn run_command_raw_ignore_value(
&self,
command: &str,
args: &[&str],
) -> Result<(), Error> {
self.run_command_raw(command, args).await.map(|_| ())
}
pub async fn playlist_add(
&self,
file: &str,
file_type: PlaylistAddTypeOptions,
option: PlaylistAddOptions,
) -> Result<(), Error> {
match file_type {
PlaylistAddTypeOptions::File => self.run_command(MpvCommand::LoadFile {
PlaylistAddTypeOptions::File => {
self.run_command(MpvCommand::LoadFile {
file: file.to_string(),
option,
}),
})
.await
}
PlaylistAddTypeOptions::Playlist => self.run_command(MpvCommand::LoadList {
PlaylistAddTypeOptions::Playlist => {
self.run_command(MpvCommand::LoadList {
file: file.to_string(),
option,
}),
})
.await
}
}
}
pub fn playlist_clear(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistClear)
pub async fn playlist_clear(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistClear).await
}
pub fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> {
pub async fn playlist_move_id(&self, from: usize, to: usize) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistMove { from, to })
.await
}
pub fn playlist_play_id(&self, id: usize) -> Result<(), Error> {
set_mpv_property(self, "playlist-pos", json!(id))
pub async fn playlist_play_id(&self, id: usize) -> Result<(), Error> {
self.set_property("playlist-pos", id).await
}
pub fn playlist_play_next(&self, id: usize) -> Result<(), Error> {
match get_mpv_property::<usize>(self, "playlist-pos") {
Ok(current_id) => self.run_command(MpvCommand::PlaylistMove {
pub async fn playlist_play_next(&self, id: usize) -> Result<(), Error> {
match self.get_property::<usize>("playlist-pos").await {
Ok(current_id) => {
self.run_command(MpvCommand::PlaylistMove {
from: id,
to: current_id + 1,
}),
})
.await
}
Err(msg) => Err(msg),
}
}
pub fn playlist_remove_id(&self, id: usize) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistRemove(id))
pub async fn playlist_remove_id(&self, id: usize) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistRemove(id)).await
}
pub fn playlist_shuffle(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistShuffle)
pub async fn playlist_shuffle(&self) -> Result<(), Error> {
self.run_command(MpvCommand::PlaylistShuffle).await
}
pub fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> {
self.run_command(MpvCommand::Seek { seconds, option })
pub async fn seek(&self, seconds: f64, option: SeekOptions) -> Result<(), Error> {
self.run_command(MpvCommand::Seek { seconds, option }).await
}
pub fn set_loop_file(&self, option: Switch) -> Result<(), Error> {
let mut enabled = false;
match option {
Switch::On => enabled = true,
Switch::Off => {}
Switch::Toggle => match get_mpv_property_string(self, "loop-file") {
Ok(value) => match value.as_ref() {
"false" => {
enabled = true;
pub async fn set_loop_file(&self, option: Switch) -> Result<(), Error> {
let enabled = match option {
Switch::On => "inf",
Switch::Off => "no",
Switch::Toggle => {
self.get_property::<String>("loop-file")
.await
.map(|s| match s.as_str() {
"inf" => "no",
"no" => "inf",
_ => "no",
})?
}
_ => {
enabled = false;
}
},
Err(msg) => return Err(msg),
},
}
set_mpv_property(self, "loop-file", json!(enabled))
};
self.set_property("loop-file", enabled).await
}
pub fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> {
let mut enabled = false;
match option {
Switch::On => enabled = true,
Switch::Off => {}
Switch::Toggle => match get_mpv_property_string(self, "loop-playlist") {
Ok(value) => match value.as_ref() {
"false" => {
enabled = true;
pub async fn set_loop_playlist(&self, option: Switch) -> Result<(), Error> {
let enabled = match option {
Switch::On => "inf",
Switch::Off => "no",
Switch::Toggle => {
self.get_property::<String>("loop-playlist")
.await
.map(|s| match s.as_str() {
"inf" => "no",
"no" => "inf",
_ => "no",
})?
}
_ => {
enabled = false;
}
},
Err(msg) => return Err(msg),
},
}
set_mpv_property(self, "loop-playlist", json!(enabled))
};
self.set_property("loop-playlist", enabled).await
}
pub fn set_mute(&self, option: Switch) -> Result<(), Error> {
let mut enabled = false;
match option {
Switch::On => enabled = true,
Switch::Off => {}
Switch::Toggle => match get_mpv_property::<bool>(self, "mute") {
Ok(value) => {
enabled = !value;
pub async fn set_mute(&self, option: Switch) -> Result<(), Error> {
let enabled = match option {
Switch::On => "yes",
Switch::Off => "no",
Switch::Toggle => {
self.get_property::<String>("mute")
.await
.map(|s| match s.as_str() {
"yes" => "no",
"no" => "yes",
_ => "no",
})?
}
Err(msg) => return Err(msg),
},
}
set_mpv_property(self, "mute", json!(enabled))
};
self.set_property("mute", enabled).await
}
/// # Description
@ -687,63 +770,67 @@ impl Mpv {
/// # Example
/// ```
/// use mpvipc::{Mpv, Error};
/// fn main() -> Result<(), Error> {
/// fn async main() -> Result<(), Error> {
/// let mpv = Mpv::connect("/tmp/mpvsocket")?;
/// mpv.set_property("pause", true)?;
/// mpv.set_property("pause", true).await?;
/// Ok(())
/// }
/// ```
pub fn set_property<T: SetPropertyTypeHandler<T>>(
pub async fn set_property<T: SetPropertyTypeHandler<T>>(
&self,
property: &str,
value: T,
) -> Result<(), Error> {
T::set_property_generic(self, property, value)
T::set_property_generic(self, property, value).await
}
pub fn set_speed(&self, input_speed: f64, option: NumberChangeOptions) -> Result<(), Error> {
match get_mpv_property::<f64>(self, "speed") {
pub async fn set_speed(
&self,
input_speed: f64,
option: NumberChangeOptions,
) -> Result<(), Error> {
match self.get_property::<f64>("speed").await {
Ok(speed) => match option {
NumberChangeOptions::Increase => {
set_mpv_property(self, "speed", json!(speed + input_speed))
self.set_property("speed", speed + input_speed).await
}
NumberChangeOptions::Decrease => {
set_mpv_property(self, "speed", json!(speed - input_speed))
self.set_property("speed", speed - input_speed).await
}
NumberChangeOptions::Absolute => {
set_mpv_property(self, "speed", json!(input_speed))
}
NumberChangeOptions::Absolute => self.set_property("speed", input_speed).await,
},
Err(msg) => Err(msg),
}
}
pub fn set_volume(&self, input_volume: f64, option: NumberChangeOptions) -> Result<(), Error> {
match get_mpv_property::<f64>(self, "volume") {
pub async fn set_volume(
&self,
input_volume: f64,
option: NumberChangeOptions,
) -> Result<(), Error> {
match self.get_property::<f64>("volume").await {
Ok(volume) => match option {
NumberChangeOptions::Increase => {
set_mpv_property(self, "volume", json!(volume + input_volume))
self.set_property("volume", volume + input_volume).await
}
NumberChangeOptions::Decrease => {
set_mpv_property(self, "volume", json!(volume - input_volume))
self.set_property("volume", volume - input_volume).await
}
NumberChangeOptions::Absolute => {
set_mpv_property(self, "volume", json!(input_volume))
}
NumberChangeOptions::Absolute => self.set_property("volume", input_volume).await,
},
Err(msg) => Err(msg),
}
}
pub fn stop(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Stop)
pub async fn stop(&self) -> Result<(), Error> {
self.run_command(MpvCommand::Stop).await
}
pub fn toggle(&self) -> Result<(), Error> {
run_mpv_command(self, "cycle", &["pause"])
pub async fn toggle(&self) -> Result<(), Error> {
self.run_command_raw("cycle", &["pause"]).await.map(|_| ())
}
}

View File

@ -1,248 +1,162 @@
use crate::message_parser::TypeHandler;
use self::message_parser::extract_mpv_response_data;
use self::message_parser::json_array_to_playlist;
use self::message_parser::json_array_to_vec;
use self::message_parser::json_map_to_hashmap;
use super::*;
use log::{debug, warn};
use serde_json::json;
use serde_json::Value;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Write;
use futures::{SinkExt, StreamExt};
use serde_json::{json, Value};
use std::mem;
use tokio::net::UnixStream;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{oneshot, Mutex};
use tokio_util::codec::{Framed, LinesCodec};
pub fn get_mpv_property<T: TypeHandler>(instance: &Mpv, property: &str) -> Result<T, Error> {
let ipc_string = json!({"command": ["get_property", property]});
match serde_json::from_str::<Value>(&send_command_sync(instance, ipc_string)) {
Ok(val) => T::get_value(val),
Err(why) => Err(Error(ErrorCode::JsonParseError(why.to_string()))),
}
pub(crate) struct MpvIpc {
socket: Framed<UnixStream, LinesCodec>,
command_channel: Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
socket_lock: Mutex<()>,
}
pub fn get_mpv_property_string(instance: &Mpv, property: &str) -> Result<String, Error> {
let ipc_string = json!({"command": ["get_property", property]});
let val = serde_json::from_str::<Value>(&send_command_sync(instance, ipc_string))
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum MpvIpcCommand {
Command(Vec<String>),
GetProperty(String),
SetProperty(String, Value),
ObserveProperty(isize, String),
UnobserveProperty(isize),
Exit,
}
#[derive(Debug, Clone)]
pub(crate) struct MpvIpcResponse(pub(crate) Result<Value, Error>);
impl MpvIpc {
pub(crate) fn new(
socket: UnixStream,
command_channel: Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
) -> Self {
MpvIpc {
socket: Framed::new(socket, LinesCodec::new()),
command_channel,
socket_lock: Mutex::new(()),
}
}
pub(crate) async fn send_command(&mut self, command: &[&str]) -> Result<Value, Error> {
let lock = self.socket_lock.lock().await;
// START CRITICAL SECTION
let ipc_command = json!({ "command": command });
let ipc_command_str = serde_json::to_string(&ipc_command)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?;
let data = extract_mpv_response_data(&val)?;
log::trace!("Sending command: {}", ipc_command_str);
match data {
Value::Bool(b) => Ok(b.to_string()),
Value::Number(ref n) => Ok(n.to_string()),
Value::String(ref s) => Ok(s.to_string()),
Value::Array(ref array) => Ok(format!("{:?}", array)),
Value::Object(ref map) => Ok(format!("{:?}", map)),
Value::Null => Err(Error(ErrorCode::MissingValue)),
}
}
self.socket
.send(ipc_command_str)
.await
.map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))?;
fn validate_mpv_response(response: &str) -> Result<(), Error> {
serde_json::from_str::<Value>(response)
let response = self
.socket
.next()
.await
.ok_or(Error(ErrorCode::MissingValue))?
.map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))?;
// END CRITICAL SECTION
mem::drop(lock);
log::trace!("Received response: {}", response);
serde_json::from_str::<Value>(&response)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))
.and_then(|value| extract_mpv_response_data(&value).map(|_| ()))
.and_then(parse_mpv_response_data)
}
pub(crate) async fn get_mpv_property(&mut self, property: &str) -> Result<Value, Error> {
self.send_command(&["get_property", property]).await
}
pub(crate) async fn set_mpv_property(
&mut self,
property: &str,
value: Value,
) -> Result<Value, Error> {
let str_value = match &value {
Value::String(s) => s,
v => &serde_json::to_string(&v).unwrap()
};
self.send_command(&["set_property", property, &str_value])
.await
}
pub(crate) async fn observe_property(
&mut self,
id: isize,
property: &str,
) -> Result<Value, Error> {
self.send_command(&["observe_property", &id.to_string(), property])
.await
}
pub(crate) async fn unobserve_property(&mut self, id: isize) -> Result<Value, Error> {
self.send_command(&["unobserve_property", &id.to_string()])
.await
}
pub(crate) async fn run(mut self) -> Result<(), Error> {
loop {
tokio::select! {
Some(event) = self.socket.next() => {
log::trace!("Handling event: {:?}", serde_json::from_str::<Value>(&event.unwrap()).unwrap());
// TODO: handle event
}
Some((cmd, tx)) = self.command_channel.recv() => {
log::trace!("Handling command: {:?}", cmd);
match cmd {
MpvIpcCommand::Command(command) => {
let refs = command.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
let response = self.send_command(refs.as_slice()).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::GetProperty(property) => {
let response = self.get_mpv_property(&property).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::SetProperty(property, value) => {
let response = self.set_mpv_property(&property, value).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::ObserveProperty(id, property) => {
let response = self.observe_property(id, &property).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::UnobserveProperty(id) => {
let response = self.unobserve_property(id).await;
tx.send(MpvIpcResponse(response)).unwrap()
}
MpvIpcCommand::Exit => {
tx.send(MpvIpcResponse(Ok(Value::Null))).unwrap();
return Ok(());
}
}
}
}
}
}
}
pub fn set_mpv_property(instance: &Mpv, property: &str, value: Value) -> Result<(), Error> {
let ipc_string = json!({
"command": ["set_property", property, value]
fn parse_mpv_response_data(value: Value) -> Result<Value, Error> {
log::trace!("Parsing mpv response data: {:?}", value);
let result = value
.as_object()
.map(|o| (o.get("error").and_then(|e| e.as_str()), o.get("data")))
.ok_or(Error(ErrorCode::UnexpectedValue))
.and_then(|(error, data)| match error {
Some("success") => data.ok_or(Error(ErrorCode::UnexpectedValue)),
Some(e) => Err(Error(ErrorCode::MpvError(e.to_string()))),
None => Err(Error(ErrorCode::UnexpectedValue)),
});
let response = &send_command_sync(instance, ipc_string);
validate_mpv_response(response)
}
pub fn run_mpv_command(instance: &Mpv, command: &str, args: &[&str]) -> Result<(), Error> {
let mut ipc_string = json!({
"command": [command]
});
if let Value::Array(args_array) = &mut ipc_string["command"] {
for arg in args {
args_array.push(json!(arg));
}
}
let response = &send_command_sync(instance, ipc_string);
validate_mpv_response(response)
}
pub fn observe_mpv_property(instance: &Mpv, id: &isize, property: &str) -> Result<(), Error> {
let ipc_string = json!({
"command": ["observe_property", id, property]
});
let response = &send_command_sync(instance, ipc_string);
validate_mpv_response(response)
}
pub fn unobserve_mpv_property(instance: &Mpv, id: &isize) -> Result<(), Error> {
let ipc_string = json!({
"command": ["unobserve_property", id]
});
let response = &send_command_sync(instance, ipc_string);
validate_mpv_response(response)
}
fn try_convert_property(name: &str, id: usize, data: MpvDataType) -> Event {
let property = match name {
"path" => match data {
MpvDataType::String(value) => Property::Path(Some(value)),
MpvDataType::Null => Property::Path(None),
_ => unimplemented!(),
},
"pause" => match data {
MpvDataType::Bool(value) => Property::Pause(value),
_ => unimplemented!(),
},
"playback-time" => match data {
MpvDataType::Double(value) => Property::PlaybackTime(Some(value)),
MpvDataType::Null => Property::PlaybackTime(None),
_ => unimplemented!(),
},
"duration" => match data {
MpvDataType::Double(value) => Property::Duration(Some(value)),
MpvDataType::Null => Property::Duration(None),
_ => unimplemented!(),
},
"metadata" => match data {
MpvDataType::HashMap(value) => Property::Metadata(Some(value)),
MpvDataType::Null => Property::Metadata(None),
_ => unimplemented!(),
},
_ => {
warn!("Property {} not implemented", name);
Property::Unknown {
name: name.to_string(),
data,
}
}
};
Event::PropertyChange { id, property }
}
pub fn listen(instance: &mut Mpv) -> Result<Event, Error> {
let mut e;
// sometimes we get responses unrelated to events, so we read a new line until we receive one
// with an event field
let name = loop {
let mut response = String::new();
instance.reader.read_line(&mut response).unwrap();
response = response.trim_end().to_string();
debug!("Event: {}", response);
e = serde_json::from_str::<Value>(&response)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))?;
match e["event"] {
Value::String(ref name) => break name,
_ => {
// It was not an event - try again
debug!("Bad response: {:?}", response)
}
}
};
let event = match name.as_str() {
"shutdown" => Event::Shutdown,
"start-file" => Event::StartFile,
"file-loaded" => Event::FileLoaded,
"seek" => Event::Seek,
"playback-restart" => Event::PlaybackRestart,
"idle" => Event::Idle,
"tick" => Event::Tick,
"video-reconfig" => Event::VideoReconfig,
"audio-reconfig" => Event::AudioReconfig,
"tracks-changed" => Event::TracksChanged,
"track-switched" => Event::TrackSwitched,
"pause" => Event::Pause,
"unpause" => Event::Unpause,
"metadata-update" => Event::MetadataUpdate,
"chapter-change" => Event::ChapterChange,
"end-file" => Event::EndFile,
"property-change" => {
let name = match e["name"] {
Value::String(ref n) => Ok(n.to_string()),
_ => Err(Error(ErrorCode::JsonContainsUnexptectedType)),
}?;
let id: usize = match e["id"] {
Value::Number(ref n) => n.as_u64().unwrap() as usize,
_ => 0,
};
let data: MpvDataType = match e["data"] {
Value::String(ref n) => MpvDataType::String(n.to_string()),
Value::Array(ref a) => {
if name == "playlist".to_string() {
MpvDataType::Playlist(Playlist(json_array_to_playlist(a)))
} else {
MpvDataType::Array(json_array_to_vec(a))
}
}
Value::Bool(b) => MpvDataType::Bool(b),
Value::Number(ref n) => {
if n.is_u64() {
MpvDataType::Usize(n.as_u64().unwrap() as usize)
} else if n.is_f64() {
MpvDataType::Double(n.as_f64().unwrap())
} else {
return Err(Error(ErrorCode::JsonContainsUnexptectedType));
}
}
Value::Object(ref m) => MpvDataType::HashMap(json_map_to_hashmap(m)),
Value::Null => MpvDataType::Null,
};
try_convert_property(name.as_ref(), id, data)
}
"client-message" => {
let args = match e["args"] {
Value::Array(ref a) => json_array_to_vec(a)
.iter()
.map(|arg| match arg {
MpvDataType::String(s) => Ok(s.to_owned()),
_ => Err(Error(ErrorCode::JsonContainsUnexptectedType)),
})
.collect::<Result<Vec<_>, _>>(),
_ => return Err(Error(ErrorCode::JsonContainsUnexptectedType)),
}?;
Event::ClientMessage { args }
}
_ => Event::Unimplemented,
};
Ok(event)
}
pub fn listen_raw(instance: &mut Mpv) -> String {
let mut response = String::new();
instance.reader.read_line(&mut response).unwrap();
response.trim_end().to_string()
}
fn send_command_sync(instance: &Mpv, command: Value) -> String {
let stream = &instance.stream;
match serde_json::to_writer(stream, &command) {
Err(why) => panic!("Error: Could not write to socket: {}", why),
Ok(_) => {
let mut stream = stream;
stream.write_all(b"\n").unwrap();
let mut response = String::new();
{
let mut reader = BufReader::new(stream);
while !response.contains("\"error\":") {
response.clear();
reader.read_line(&mut response).unwrap();
}
}
debug!("Response: {}", response.trim_end());
response
}
}
match &result {
Ok(v) => log::trace!("Successfully parsed mpv response data: {:?}", v),
Err(e) => log::trace!("Error parsing mpv response data: {:?}", e),
}
result.map(|v| v.clone())
}

View File

@ -9,25 +9,11 @@ pub trait TypeHandler: Sized {
fn as_string(&self) -> String;
}
pub(crate) fn extract_mpv_response_data(value: &Value) -> Result<&Value, Error> {
value
.as_object()
.map(|o| (o.get("error").and_then(|e| e.as_str()), o.get("data")))
.ok_or(Error(ErrorCode::UnexpectedValue))
.and_then(|(error, data)| match error {
Some("success") => data.ok_or(Error(ErrorCode::UnexpectedValue)),
Some(e) => Err(Error(ErrorCode::MpvError(e.to_string()))),
None => Err(Error(ErrorCode::UnexpectedValue)),
})
}
impl TypeHandler for String {
fn get_value(value: Value) -> Result<String, Error> {
extract_mpv_response_data(&value)
.and_then(|d| {
d.as_str()
value
.as_str()
.ok_or(Error(ErrorCode::ValueDoesNotContainString))
})
.map(|s| s.to_string())
}
@ -38,8 +24,7 @@ impl TypeHandler for String {
impl TypeHandler for bool {
fn get_value(value: Value) -> Result<bool, Error> {
extract_mpv_response_data(&value)
.and_then(|d| d.as_bool().ok_or(Error(ErrorCode::ValueDoesNotContainBool)))
value.as_bool().ok_or(Error(ErrorCode::ValueDoesNotContainBool))
}
fn as_string(&self) -> String {
@ -53,8 +38,7 @@ impl TypeHandler for bool {
impl TypeHandler for f64 {
fn get_value(value: Value) -> Result<f64, Error> {
extract_mpv_response_data(&value)
.and_then(|d| d.as_f64().ok_or(Error(ErrorCode::ValueDoesNotContainF64)))
value.as_f64().ok_or(Error(ErrorCode::ValueDoesNotContainF64))
}
fn as_string(&self) -> String {
@ -64,9 +48,9 @@ impl TypeHandler for f64 {
impl TypeHandler for usize {
fn get_value(value: Value) -> Result<usize, Error> {
extract_mpv_response_data(&value)
.and_then(|d| d.as_u64().ok_or(Error(ErrorCode::ValueDoesNotContainUsize)))
value.as_u64()
.map(|u| u as usize)
.ok_or(Error(ErrorCode::ValueDoesNotContainUsize))
}
fn as_string(&self) -> String {
@ -76,11 +60,8 @@ impl TypeHandler for usize {
impl TypeHandler for HashMap<String, MpvDataType> {
fn get_value(value: Value) -> Result<HashMap<String, MpvDataType>, Error> {
extract_mpv_response_data(&value)
.and_then(|d| {
d.as_object()
value.as_object()
.ok_or(Error(ErrorCode::ValueDoesNotContainHashMap))
})
.map(json_map_to_hashmap)
}
@ -91,11 +72,8 @@ impl TypeHandler for HashMap<String, MpvDataType> {
impl TypeHandler for Vec<PlaylistEntry> {
fn get_value(value: Value) -> Result<Vec<PlaylistEntry>, Error> {
extract_mpv_response_data(&value)
.and_then(|d| {
d.as_array()
value.as_array()
.ok_or(Error(ErrorCode::ValueDoesNotContainPlaylist))
})
.map(json_array_to_playlist)
}

225
tests/get_property.rs Normal file
View File

@ -0,0 +1,225 @@
use std::{panic, time::Duration};
use futures::{stream::FuturesUnordered, SinkExt, StreamExt};
use mpvipc::{Error, ErrorCode, Mpv, Playlist, PlaylistEntry};
use serde_json::{json, Value};
use test_log::test;
use tokio::{net::UnixStream, task::JoinHandle};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
fn test_socket(answers: Vec<String>) -> (UnixStream, JoinHandle<Result<(), LinesCodecError>>) {
let (socket, server) = UnixStream::pair().unwrap();
let join_handle = tokio::spawn(async move {
let mut framed = Framed::new(socket, LinesCodec::new());
for answer in answers {
framed.next().await;
framed.send(answer).await?;
}
Ok(())
});
(server, join_handle)
}
#[test(tokio::test)]
async fn test_get_property_successful() {
let (server, join_handle) = test_socket(vec![
json!({ "data": 100.0, "request_id": 0, "error": "success" }).to_string(),
]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let volume: f64 = mpv.get_property("volume").await.unwrap();
assert_eq!(volume, 100.0);
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_get_property_broken_pipe() {
let (server, join_handle) = test_socket(vec![]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let maybe_volume = mpv.get_property::<f64>("volume").await;
assert_eq!(
maybe_volume,
Err(Error(ErrorCode::ConnectError(
"Broken pipe (os error 32)".to_owned()
)))
);
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_get_property_wrong_type() {
let (server, join_handle) = test_socket(vec![
json!({ "data": 100.0, "request_id": 0, "error": "success" }).to_string(),
]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let maybe_volume = mpv.get_property::<bool>("volume").await;
assert_eq!(maybe_volume, Err(Error(ErrorCode::ValueDoesNotContainBool)));
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_get_property_error() {
let (server, join_handle) = test_socket(vec![
json!({ "error": "property unavailable", "request_id": 0 }).to_string(),
]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let maybe_volume = mpv.get_property::<f64>("volume").await;
assert_eq!(
maybe_volume,
Err(Error(ErrorCode::MpvError(
"property unavailable".to_owned()
)))
);
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_get_property_simultaneous_requests() {
let (socket, server) = UnixStream::pair().unwrap();
let mpv_handle: JoinHandle<Result<(), LinesCodecError>> = tokio::spawn(async move {
let mut framed = Framed::new(socket, LinesCodec::new());
while let Some(request) = framed.next().await {
match serde_json::from_str::<Value>(&request.unwrap()) {
Ok(json) => {
let property = json["command"][1].as_str().unwrap();
log::info!("Received request for property: {:?}", property);
match property {
"volume" => {
let response =
json!({ "data": 100.0, "request_id": 0, "error": "success" })
.to_string();
framed.send(response).await.unwrap();
}
"pause" => {
let response =
json!({ "data": true, "request_id": 0, "error": "success" })
.to_string();
framed.send(response).await.unwrap();
}
_ => {
let response =
json!({ "error": "property unavailable", "request_id": 0 })
.to_string();
framed.send(response).await.unwrap();
}
}
}
Err(_) => {}
}
}
Ok(())
});
let mpv = Mpv::connect_socket(server).await.unwrap();
let mpv_clone_1 = mpv.clone();
let mpv_poller_1 = tokio::spawn(async move {
loop {
let volume: f64 = mpv_clone_1.get_property("volume").await.unwrap();
assert_eq!(volume, 100.0);
}
});
let mpv_clone_2 = mpv.clone();
let mpv_poller_2 = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(1)).await;
let paused: bool = mpv_clone_2.get_property("pause").await.unwrap();
assert_eq!(paused, true);
}
});
let mpv_clone_3 = mpv.clone();
let mpv_poller_3 = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(2)).await;
let maybe_volume = mpv_clone_3.get_property::<f64>("non_existent").await;
assert_eq!(
maybe_volume,
Err(Error(ErrorCode::MpvError(
"property unavailable".to_owned()
)))
);
}
});
let mut tasks = FuturesUnordered::new();
tasks.push(mpv_handle);
tasks.push(mpv_poller_1);
tasks.push(mpv_poller_2);
tasks.push(mpv_poller_3);
if tokio::time::timeout(Duration::from_millis(200), tasks.next())
.await
.is_ok()
{
panic!("One of the pollers quit unexpectedly");
};
}
#[test(tokio::test)]
async fn test_get_playlist() {
let expected = Playlist(vec![
PlaylistEntry {
id: 0,
filename: "file1".to_string(),
title: "title1".to_string(),
current: false,
},
PlaylistEntry {
id: 1,
filename: "file2".to_string(),
title: "title2".to_string(),
current: true,
},
PlaylistEntry {
id: 2,
filename: "file3".to_string(),
title: "title3".to_string(),
current: false,
},
]);
let (server, join_handle) = test_socket(vec![json!({
"data": expected.0.iter().map(|entry| {
json!({
"filename": entry.filename,
"title": entry.title,
"current": entry.current
})
}).collect::<Vec<Value>>(),
"request_id": 0,
"error": "success"
})
.to_string()]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let playlist = mpv.get_playlist().await.unwrap();
assert_eq!(playlist, expected);
join_handle.await.unwrap().unwrap();
}
#[test(tokio::test)]
async fn test_get_playlist_empty() {
let (server, join_handle) = test_socket(vec![
json!({ "data": [], "request_id": 0, "error": "success" }).to_string(),
]);
let mpv = Mpv::connect_socket(server).await.unwrap();
let playlist = mpv.get_playlist().await.unwrap();
assert_eq!(playlist, Playlist(vec![]));
join_handle.await.unwrap().unwrap();
}