4 Commits

29 changed files with 370 additions and 555 deletions

View File

@@ -1,2 +0,0 @@
[registries]
pvv-git = { index = "sparse+https://git.pvv.ntnu.no/api/packages/Grzegorz/cargo/" }

View File

@@ -7,9 +7,9 @@ on:
jobs: jobs:
build: build:
runs-on: debian-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v6 - uses: actions/checkout@v4
- name: Install rust toolchain - name: Install rust toolchain
uses: dtolnay/rust-toolchain@stable uses: dtolnay/rust-toolchain@stable
@@ -18,9 +18,9 @@ jobs:
run: cargo build --all-features --verbose --release run: cargo build --all-features --verbose --release
check: check:
runs-on: debian-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v6 - uses: actions/checkout@v4
- name: Install rust toolchain - name: Install rust toolchain
uses: dtolnay/rust-toolchain@stable uses: dtolnay/rust-toolchain@stable
@@ -34,9 +34,9 @@ jobs:
run: cargo clippy --all-features -- --deny warnings run: cargo clippy --all-features -- --deny warnings
test: test:
runs-on: debian-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v6 - uses: actions/checkout@v4
- name: Install cargo binstall - name: Install cargo binstall
uses: cargo-bins/cargo-binstall@main uses: cargo-bins/cargo-binstall@main
@@ -88,13 +88,13 @@ jobs:
target: ${{ gitea.ref_name }}/coverage/ target: ${{ gitea.ref_name }}/coverage/
username: gitea-web username: gitea-web
ssh-key: ${{ secrets.WEB_SYNC_SSH_KEY }} ssh-key: ${{ secrets.WEB_SYNC_SSH_KEY }}
host: pages.pvv.ntnu.no host: bekkalokk.pvv.ntnu.no
known-hosts: "pages.pvv.ntnu.no ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIH2QjfFB+city1SYqltkVqWACfo1j37k+oQQfj13mtgg" known-hosts: "bekkalokk.pvv.ntnu.no ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIEI6VSaDrMG8+flg4/AeHlAFIen8RUzWh6URQKqFegSx"
docs: docs:
runs-on: debian-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v6 - uses: actions/checkout@v4
- name: Install rust toolchain - name: Install rust toolchain
uses: dtolnay/rust-toolchain@stable uses: dtolnay/rust-toolchain@stable
@@ -109,5 +109,5 @@ jobs:
target: ${{ gitea.ref_name }}/docs/ target: ${{ gitea.ref_name }}/docs/
username: gitea-web username: gitea-web
ssh-key: ${{ secrets.WEB_SYNC_SSH_KEY }} ssh-key: ${{ secrets.WEB_SYNC_SSH_KEY }}
host: pages.pvv.ntnu.no host: bekkalokk.pvv.ntnu.no
known-hosts: "pages.pvv.ntnu.no ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIH2QjfFB+city1SYqltkVqWACfo1j37k+oQQfj13mtgg" known-hosts: "bekkalokk.pvv.ntnu.no ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIEI6VSaDrMG8+flg4/AeHlAFIen8RUzWh6URQKqFegSx"

3
.gitignore vendored
View File

@@ -1,2 +1,5 @@
target target
Cargo.lock Cargo.lock
test_assets/*
!test_assets/.gitkeep

View File

@@ -1,7 +1,7 @@
GNU GENERAL PUBLIC LICENSE GNU GENERAL PUBLIC LICENSE
Version 3, 29 June 2007 Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/> Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed. of this license document, but changing it is not allowed.
@@ -645,7 +645,7 @@ the "copyright" line and a pointer to where the full notice is found.
GNU General Public License for more details. GNU General Public License for more details.
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
Also add information on how to contact you by electronic and paper mail. Also add information on how to contact you by electronic and paper mail.
@@ -664,11 +664,12 @@ might be different; for a GUI interface, you would use an "about box".
You should also get your employer (if you work as a programmer) or school, You should also get your employer (if you work as a programmer) or school,
if any, to sign a "copyright disclaimer" for the program, if necessary. if any, to sign a "copyright disclaimer" for the program, if necessary.
For more information on this, and how to apply and follow the GNU GPL, see For more information on this, and how to apply and follow the GNU GPL, see
<https://www.gnu.org/licenses/>. <http://www.gnu.org/licenses/>.
The GNU General Public License does not permit incorporating your program The GNU General Public License does not permit incorporating your program
into proprietary programs. If your program is a subroutine library, you into proprietary programs. If your program is a subroutine library, you
may consider it more useful to permit linking proprietary applications with may consider it more useful to permit linking proprietary applications with
the library. If this is what you want to do, use the GNU Lesser General the library. If this is what you want to do, use the GNU Lesser General
Public License instead of this License. But first, please read Public License instead of this License. But first, please read
<https://www.gnu.org/licenses/why-not-lgpl.html>. <http://www.gnu.org/philosophy/why-not-lgpl.html>.

View File

@@ -1,32 +1,32 @@
[package] [package]
name = "mpvipc-async" name = "mpvipc-async"
version = "0.2.0" version = "0.1.0"
authors = [ authors = [
"Jonas Frei <freijon@pm.me>", "Jonas Frei <freijon@pm.me>",
"Øystein Tveit <oysteikt@pvv.ntnu.no>" "Øystein Tveit <oysteikt@pvv.ntnu.no>"
] ]
description = "A small library which provides bindings to control existing mpv instances through sockets." description = "A small library which provides bindings to control existing mpv instances through sockets."
license = "GPL-3.0" license = "GPL-3.0"
repository = "https://git.pvv.ntnu.no/Grzegorz/mpvipc-async" repository = "https://git.pvv.ntnu.no/Projects/mpvipc-async"
documentation = "https://pages.pvv.ntnu.no/Grzegorz/mpvipc-async/main/docs/mpvipc_async/" documentation = "https://pages.pvv.ntnu.no/Projects/mpvipc-async/main/docs/mpvipc_async/"
edition = "2024" edition = "2021"
rust-version = "1.85.0" rust-version = "1.75"
[dependencies] [dependencies]
serde_json = "1.0.148" serde_json = "1.0.104"
log = "0.4.29" log = "0.4.19"
serde = { version = "1.0.228", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.48.0", features = ["sync", "macros", "rt", "net"] } tokio = { version = "1.37.0", features = ["sync", "macros", "rt", "net"] }
tokio-util = { version = "0.7.17", features = ["codec"] } tokio-util = { version = "0.7.10", features = ["codec"] }
futures = "0.3.31" futures = "0.3.30"
tokio-stream = { version = "0.1.17", features = ["sync"] } tokio-stream = { version = "0.1.15", features = ["sync"] }
thiserror = "2.0.17" thiserror = "1.0.59"
[dev-dependencies] [dev-dependencies]
env_logger = "0.11.8" env_logger = "0.10.0"
test-log = "0.2.19" test-log = "0.2.15"
tokio = { version = "1.48.0", features = ["rt-multi-thread", "time", "process"] } tokio = { version = "1.37.0", features = ["rt-multi-thread", "time", "process"] }
uuid = { version = "1.19.0", features = ["v4"] } uuid = { version = "1.8.0", features = ["v4"] }
[lib] [lib]
doctest = false doctest = false

View File

@@ -1,11 +1,10 @@
[![Coverage](https://pages.pvv.ntnu.no/Grzegorz/mpvipc-async/main/coverage/badges/for_the_badge.svg)](https://pages.pvv.ntnu.no/Grzegorz/mpvipc-async/main/coverage/src/) [![Coverage](https://pages.pvv.ntnu.no/Projects/mpvipc-async/main/coverage/badges/for_the_badge.svg)](https://pages.pvv.ntnu.no/Projects/mpvipc-async/main/coverage/src/)
[![Docs](https://img.shields.io/badge/docs-blue?style=for-the-badge&logo=rust)](https://pages.pvv.ntnu.no/Grzegorz/mpvipc-async/main/docs/mpvipc_async/) [![Docs](https://img.shields.io/badge/docs-blue?style=for-the-badge&logo=rust)](https://pages.pvv.ntnu.no/Projects/mpvipc-async/main/docs/mpvipc_async/)
# mpvipc-async # mpvipc-async
> [!NOTE] > **NOTE:** This is a fork of [gitlab.com/mpv-ipc/mpvipc](https://gitlab.com/mpv-ipc/mpvipc), which introduces a lot of changes to be able to use the library asynchronously with [tokio](https://github.com/tokio-rs/tokio).
> This is a fork of [gitlab.com/mpv-ipc/mpvipc](https://gitlab.com/mpv-ipc/mpvipc).
> The fork adds support for use in asynchronous contexts.
A small library which provides bindings to control existing mpv instances through sockets. A small library which provides bindings to control existing mpv instances through sockets.
@@ -35,5 +34,3 @@ async fn main() -> Result<(), MpvError> {
mpv.set_property("pause", !paused).await.expect("Error pausing"); mpv.set_property("pause", !paused).await.expect("Error pausing");
} }
``` ```
[You can find more examples in the `examples` directory](./examples)

View File

@@ -1,5 +1,5 @@
use futures::StreamExt; use futures::StreamExt;
use mpvipc_async::{Event, Mpv, MpvDataType, MpvError, MpvExt, Property, parse_property}; use mpvipc_async::{parse_property, Event, Mpv, MpvDataType, MpvError, MpvExt, Property};
fn seconds_to_hms(total: f64) -> String { fn seconds_to_hms(total: f64) -> String {
let total = total as u64; let total = total as u64;

54
flake.lock generated
View File

@@ -1,12 +1,33 @@
{ {
"nodes": { "nodes": {
"fenix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1713421495,
"narHash": "sha256-5vVF9W1tJT+WdfpWAEG76KywktKDAW/71mVmNHEHjac=",
"owner": "nix-community",
"repo": "fenix",
"rev": "fd47b1f9404fae02a4f38bd9f4b12bad7833c96b",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "fenix",
"type": "github"
}
},
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1767116409, "lastModified": 1713248628,
"narHash": "sha256-5vKw92l1GyTnjoLzEagJy5V5mDFck72LiQWZSOnSicw=", "narHash": "sha256-NLznXB5AOnniUtZsyy/aPWOk8ussTuePp2acb9U+ISA=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "cad22e7d996aea55ecab064e84834289143e44a0", "rev": "5672bc9dbf9d88246ddab5ac454e82318d094bb8",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -18,27 +39,24 @@
}, },
"root": { "root": {
"inputs": { "inputs": {
"nixpkgs": "nixpkgs", "fenix": "fenix",
"rust-overlay": "rust-overlay" "nixpkgs": "nixpkgs"
} }
}, },
"rust-overlay": { "rust-analyzer-src": {
"inputs": { "flake": false,
"nixpkgs": [
"nixpkgs"
]
},
"locked": { "locked": {
"lastModified": 1767322002, "lastModified": 1713373173,
"narHash": "sha256-yHKXXw2OWfIFsyTjduB4EyFwR0SYYF0hK8xI9z4NIn0=", "narHash": "sha256-octd9BFY9G/Gbr4KfwK4itZp4Lx+qvJeRRcYnN+dEH8=",
"owner": "oxalica", "owner": "rust-lang",
"repo": "rust-overlay", "repo": "rust-analyzer",
"rev": "03c6e38661c02a27ca006a284813afdc461e9f7e", "rev": "46702ffc1a02a2ac153f1d1ce619ec917af8f3a6",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "oxalica", "owner": "rust-lang",
"repo": "rust-overlay", "ref": "nightly",
"repo": "rust-analyzer",
"type": "github" "type": "github"
} }
} }

View File

@@ -1,12 +1,11 @@
{ {
inputs = { inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
fenix.url = "github:nix-community/fenix";
rust-overlay.url = "github:oxalica/rust-overlay"; fenix.inputs.nixpkgs.follows = "nixpkgs";
rust-overlay.inputs.nixpkgs.follows = "nixpkgs";
}; };
outputs = { self, nixpkgs, rust-overlay }@inputs: outputs = { self, nixpkgs, fenix }@inputs:
let let
systems = [ systems = [
"x86_64-linux" "x86_64-linux"
@@ -15,29 +14,26 @@
"aarch64-darwin" "aarch64-darwin"
]; ];
forAllSystems = f: nixpkgs.lib.genAttrs systems (system: let forAllSystems = f: nixpkgs.lib.genAttrs systems (system: let
toolchain = fenix.packages.${system}.complete;
pkgs = import nixpkgs { pkgs = import nixpkgs {
inherit system; inherit system;
overlays = [ overlays = [
(import rust-overlay) (_: super: let pkgs = fenix.inputs.nixpkgs.legacyPackages.${system}; in fenix.overlays.default pkgs pkgs)
]; ];
}; };
rust-bin = rust-overlay.lib.mkRustBin { } pkgs.buildPackages;
toolchain = rust-bin.stable.latest.default.override {
extensions = [ "rust-src" ];
};
in f system pkgs toolchain); in f system pkgs toolchain);
in { in {
devShell = forAllSystems (system: pkgs: toolchain: pkgs.mkShell { devShell = forAllSystems (system: pkgs: toolchain: pkgs.mkShell {
packages = with pkgs; [ packages = [
toolchain (toolchain.withComponents [
mpv "cargo" "rustc" "rustfmt" "clippy" "llvm-tools"
grcov ])
cargo-nextest pkgs.mpv
cargo-edit pkgs.grcov
pkgs.cargo-nextest
pkgs.ffmpeg
]; ];
RUST_SRC_PATH = "${toolchain.rust-src}/lib/rustlib/src/rust/library";
env.RUST_SRC_PATH = "${toolchain}/lib/rustlib/src/rust/library";
}); });
}; };
} }

View File

@@ -1 +0,0 @@
style_edition = "2024"

21
setup_test_assets.sh Executable file
View File

@@ -0,0 +1,21 @@
#!/usr/bin/env bash
set -euo pipefail
REQUIRED_COMMANDS=(
"git"
"ffmpeg"
)
for cmd in "${REQUIRED_COMMANDS[@]}"; do
if ! command -v "$cmd" &> /dev/null; then
echo "Command '$cmd' not found. Please install it and try again."
exit 1
fi
done
ROOT_DIR=$(git rev-parse --show-toplevel)
# Generate 30 seconds of 480p video with black background
ffmpeg -f lavfi -i color=c=black:s=640x480:d=30 -c:v libx264 -t 30 -pix_fmt yuv420p "$ROOT_DIR/test_assets/black-background-30s-480p.mp4"

View File

@@ -10,9 +10,9 @@ use tokio::{
}; };
use crate::{ use crate::{
Event, MpvError,
ipc::{MpvIpc, MpvIpcCommand, MpvIpcEvent, MpvIpcResponse}, ipc::{MpvIpc, MpvIpcCommand, MpvIpcEvent, MpvIpcResponse},
message_parser::TypeHandler, message_parser::TypeHandler,
Event, MpvError,
}; };
/// All possible commands that can be sent to mpv. /// All possible commands that can be sent to mpv.
@@ -95,7 +95,6 @@ pub(crate) trait IntoRawCommandPart {
/// Generic data type representing all possible data types that mpv can return. /// Generic data type representing all possible data types that mpv can return.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum MpvDataType { pub enum MpvDataType {
Array(Vec<MpvDataType>), Array(Vec<MpvDataType>),
Bool(bool), Bool(bool),
@@ -162,7 +161,7 @@ pub trait GetPropertyTypeHandler: Sized {
// TODO: fix this // TODO: fix this
#[allow(async_fn_in_trait)] #[allow(async_fn_in_trait)]
async fn get_property_generic(instance: &Mpv, property: &str) async fn get_property_generic(instance: &Mpv, property: &str)
-> Result<Option<Self>, MpvError>; -> Result<Option<Self>, MpvError>;
} }
impl<T> GetPropertyTypeHandler for T impl<T> GetPropertyTypeHandler for T
@@ -185,7 +184,7 @@ pub trait SetPropertyTypeHandler<T> {
// TODO: fix this // TODO: fix this
#[allow(async_fn_in_trait)] #[allow(async_fn_in_trait)]
async fn set_property_generic(instance: &Mpv, property: &str, value: T) async fn set_property_generic(instance: &Mpv, property: &str, value: T)
-> Result<(), MpvError>; -> Result<(), MpvError>;
} }
impl<T> SetPropertyTypeHandler<T> for T impl<T> SetPropertyTypeHandler<T> for T

View File

@@ -23,9 +23,7 @@ pub enum MpvError {
#[error("JsonParseError: {0}")] #[error("JsonParseError: {0}")]
JsonParseError(#[from] serde_json::Error), JsonParseError(#[from] serde_json::Error),
#[error( #[error("Mpv sent a value with an unexpected type:\nExpected {expected_type}, received {received:#?}")]
"Mpv sent a value with an unexpected type:\nExpected {expected_type}, received {received:#?}"
)]
ValueContainsUnexpectedType { ValueContainsUnexpectedType {
expected_type: String, expected_type: String,
received: Value, received: Value,

View File

@@ -5,39 +5,17 @@ use std::str::FromStr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use crate::{MpvDataType, MpvError, ipc::MpvIpcEvent, message_parser::json_to_value}; use crate::{ipc::MpvIpcEvent, message_parser::json_to_value, MpvDataType, MpvError};
/// Reason behind the `MPV_EVENT_END_FILE` event.
///
/// Ref: <https://mpv.io/manual/stable/#command-interface-mpv-event-end-file>
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")] #[serde(rename_all = "kebab-case")]
pub enum EventEndFileReason { pub enum EventEndFileReason {
/// The file has ended. This can (but doesn't have to) include
/// incomplete files or broken network connections under circumstances.
Eof, Eof,
/// Playback was ended by a command.
Stop, Stop,
/// Playback was ended by sending the quit command.
Quit, Quit,
/// An error happened. In this case, an `error` field is present with the error string.
Error, Error,
/// Happens with playlists and similar. For details, see
/// [`MPV_END_FILE_REASON_REDIRECT`](https://github.com/mpv-player/mpv/blob/72efbfd009a2b3259055133d74b88c81b1115ae1/include/mpv/client.h#L1493)
/// in the C API.
Redirect, Redirect,
/// Unknown. Normally doesn't happen, unless the Lua API is out of sync
/// with the C API. (Likewise, it could happen that your script gets reason
/// strings that did not exist yet at the time your script was written.)
Unknown, Unknown,
/// A catch-all enum variant in case `mpvipc-async` has not implemented the
/// returned error yet.
Unimplemented(String), Unimplemented(String),
} }
@@ -56,11 +34,6 @@ impl FromStr for EventEndFileReason {
} }
} }
/// The log level of a log message event.
///
/// Ref:
/// - <https://mpv.io/manual/stable/#command-interface-mpv-event-log-message>
/// - <https://mpv.io/manual/stable/#mp-msg-functions>
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")] #[serde(rename_all = "kebab-case")]
pub enum EventLogMessageLevel { pub enum EventLogMessageLevel {
@@ -71,9 +44,6 @@ pub enum EventLogMessageLevel {
Verbose, Verbose,
Debug, Debug,
Trace, Trace,
/// A catch-all enum variant in case `mpvipc-async` has not implemented the
/// returned log-level yet.
Unimplemented(String), Unimplemented(String),
} }
@@ -139,7 +109,7 @@ pub enum Event {
VideoReconfig, VideoReconfig,
AudioReconfig, AudioReconfig,
PropertyChange { PropertyChange {
id: Option<u64>, id: u64,
name: String, name: String,
data: Option<MpvDataType>, data: Option<MpvDataType>,
}, },
@@ -194,16 +164,16 @@ macro_rules! get_key_as {
macro_rules! get_optional_key_as { macro_rules! get_optional_key_as {
($as_type:ident, $key:expr, $event:ident) => {{ ($as_type:ident, $key:expr, $event:ident) => {{
match $event.get($key) { if let Some(tmp) = $event.get($key) {
Some(Value::Null) => None, Some(
Some(tmp) => Some(
tmp.$as_type() tmp.$as_type()
.ok_or(MpvError::ValueContainsUnexpectedType { .ok_or(MpvError::ValueContainsUnexpectedType {
expected_type: stringify!($as_type).strip_prefix("as_").unwrap().to_owned(), expected_type: stringify!($as_type).strip_prefix("as_").unwrap().to_owned(),
received: tmp.clone(), received: tmp.clone(),
})?, })?,
), )
None => None, } else {
None
} }
}}; }};
} }
@@ -326,7 +296,7 @@ fn parse_client_message(event: &Map<String, Value>) -> Result<Event, MpvError> {
} }
fn parse_property_change(event: &Map<String, Value>) -> Result<Event, MpvError> { fn parse_property_change(event: &Map<String, Value>) -> Result<Event, MpvError> {
let id = get_optional_key_as!(as_u64, "id", event); let id = get_key_as!(as_u64, "id", event);
let property_name = get_key_as!(as_str, "name", event); let property_name = get_key_as!(as_str, "name", event);
let data = event.get("data").map(json_to_value).transpose()?; let data = event.get("data").map(json_to_value).transpose()?;
@@ -336,195 +306,3 @@ fn parse_property_change(event: &Map<String, Value>) -> Result<Event, MpvError>
data, data,
}) })
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::ipc::MpvIpcEvent;
use serde_json::json;
#[test]
fn test_parse_simple_events() {
let simple_events = vec![
(json!({"event": "file-loaded"}), Event::FileLoaded),
(json!({"event": "seek"}), Event::Seek),
(json!({"event": "playback-restart"}), Event::PlaybackRestart),
(json!({"event": "shutdown"}), Event::Shutdown),
(json!({"event": "video-reconfig"}), Event::VideoReconfig),
(json!({"event": "audio-reconfig"}), Event::AudioReconfig),
(json!({"event": "tick"}), Event::Tick),
(json!({"event": "idle"}), Event::Idle),
(json!({"event": "tracks-changed"}), Event::TracksChanged),
(json!({"event": "track-switched"}), Event::TrackSwitched),
(json!({"event": "pause"}), Event::Pause),
(json!({"event": "unpause"}), Event::Unpause),
(json!({"event": "metadata-update"}), Event::MetadataUpdate),
(json!({"event": "chapter-change"}), Event::ChapterChange),
];
for (raw_event_json, expected_event) in simple_events {
let raw_event = MpvIpcEvent(raw_event_json);
let event = parse_event(raw_event).unwrap();
assert_eq!(event, expected_event);
}
}
#[test]
fn test_parse_start_file_event() {
let raw_event = MpvIpcEvent(json!({
"event": "start-file",
"playlist_entry_id": 1
}));
let event = parse_event(raw_event).unwrap();
assert_eq!(
event,
Event::StartFile {
playlist_entry_id: 1
}
);
}
#[test]
fn test_parse_end_file_event() {
let raw_event = MpvIpcEvent(json!({
"event": "end-file",
"reason": "eof",
"playlist_entry_id": 2,
"file_error": null,
"playlist_insert_id": 3,
"playlist_insert_num_entries": 5
}));
let event = parse_event(raw_event).unwrap();
assert_eq!(
event,
Event::EndFile {
reason: EventEndFileReason::Eof,
playlist_entry_id: 2,
file_error: None,
playlist_insert_id: Some(3),
playlist_insert_num_entries: Some(5)
}
);
let raw_event_with_error = MpvIpcEvent(json!({
"event": "end-file",
"reason": "error",
"playlist_entry_id": 4,
"file_error": "File not found",
}));
let event_with_error = parse_event(raw_event_with_error).unwrap();
assert_eq!(
event_with_error,
Event::EndFile {
reason: EventEndFileReason::Error,
playlist_entry_id: 4,
file_error: Some("File not found".to_string()),
playlist_insert_id: None,
playlist_insert_num_entries: None,
}
);
let raw_event_unimplemented = MpvIpcEvent(json!({
"event": "end-file",
"reason": "unknown-reason",
"playlist_entry_id": 5
}));
let event_unimplemented = parse_event(raw_event_unimplemented).unwrap();
assert_eq!(
event_unimplemented,
Event::EndFile {
reason: EventEndFileReason::Unimplemented("unknown-reason".to_string()),
playlist_entry_id: 5,
file_error: None,
playlist_insert_id: None,
playlist_insert_num_entries: None,
}
);
}
#[test]
fn test_parse_log_message_event() {
let raw_event = MpvIpcEvent(json!({
"event": "log-message",
"prefix": "mpv",
"level": "info",
"text": "This is a log message"
}));
let event = parse_event(raw_event).unwrap();
assert_eq!(
event,
Event::LogMessage {
prefix: "mpv".to_string(),
level: EventLogMessageLevel::Info,
text: "This is a log message".to_string(),
}
);
}
#[test]
fn test_parse_hook_event() {
let raw_event = MpvIpcEvent(json!({
"event": "hook",
"hook_id": 42
}));
let event = parse_event(raw_event).unwrap();
assert_eq!(event, Event::Hook { hook_id: 42 });
}
#[test]
fn test_parse_client_message_event() {
let raw_event = MpvIpcEvent(json!({
"event": "client-message",
"args": ["arg1", "arg2", "arg3"]
}));
let event = parse_event(raw_event).unwrap();
assert_eq!(
event,
Event::ClientMessage {
args: vec!["arg1".to_string(), "arg2".to_string(), "arg3".to_string()]
}
);
}
#[test]
fn test_parse_property_change_event() {
let raw_event = MpvIpcEvent(json!({
"event": "property-change",
"id": 1,
"name": "pause",
"data": true
}));
let event = parse_event(raw_event).unwrap();
assert_eq!(
event,
Event::PropertyChange {
id: Some(1),
name: "pause".to_string(),
data: Some(MpvDataType::Bool(true)),
}
);
}
#[test]
fn test_parse_unimplemented_event() {
let raw_event = MpvIpcEvent(json!({
"event": "some-unimplemented-event",
"some_key": "some_value"
}));
let event = parse_event(raw_event).unwrap();
assert_eq!(
event,
Event::Unimplemented(
json!({
"event": "some-unimplemented-event",
"some_key": "some_value"
})
.as_object()
.unwrap()
.to_owned()
)
);
}
}

View File

@@ -1,8 +1,8 @@
//! High-level API extension for [`Mpv`]. //! High-level API extension for [`Mpv`].
use crate::{ use crate::{
IntoRawCommandPart, LoopProperty, Mpv, MpvCommand, MpvDataType, MpvError, Playlist, parse_property, IntoRawCommandPart, LoopProperty, Mpv, MpvCommand, MpvDataType, MpvError,
PlaylistAddOptions, Property, SeekOptions, parse_property, Playlist, PlaylistAddOptions, Property, SeekOptions,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
@@ -323,9 +323,9 @@ impl MpvExt for Mpv {
Switch::Off => "yes", Switch::Off => "yes",
Switch::Toggle => { Switch::Toggle => {
if self.is_playing().await? { if self.is_playing().await? {
"yes"
} else {
"no" "no"
} else {
"yes"
} }
} }
}; };

View File

@@ -1,7 +1,7 @@
//! IPC handling thread/task. Handles communication between [`Mpv`](crate::Mpv) instances and mpv's unix socket //! IPC handling thread/task. Handles communication between [`Mpv`](crate::Mpv) instances and mpv's unix socket
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use serde_json::{Value, json}; use serde_json::{json, Value};
use tokio::{ use tokio::{
net::UnixStream, net::UnixStream,
sync::{broadcast, mpsc, oneshot}, sync::{broadcast, mpsc, oneshot},

View File

@@ -164,7 +164,7 @@ fn json_map_to_playlist_entry(
return Err(MpvError::ValueContainsUnexpectedType { return Err(MpvError::ValueContainsUnexpectedType {
expected_type: "String".to_owned(), expected_type: "String".to_owned(),
received: data.clone(), received: data.clone(),
}); })
} }
None => return Err(MpvError::MissingMpvData), None => return Err(MpvError::MissingMpvData),
}; };
@@ -174,7 +174,7 @@ fn json_map_to_playlist_entry(
return Err(MpvError::ValueContainsUnexpectedType { return Err(MpvError::ValueContainsUnexpectedType {
expected_type: "String".to_owned(), expected_type: "String".to_owned(),
received: data.clone(), received: data.clone(),
}); })
} }
None => None, None => None,
}; };
@@ -184,7 +184,7 @@ fn json_map_to_playlist_entry(
return Err(MpvError::ValueContainsUnexpectedType { return Err(MpvError::ValueContainsUnexpectedType {
expected_type: "bool".to_owned(), expected_type: "bool".to_owned(),
received: data.clone(), received: data.clone(),
}); })
} }
None => false, None => false,
}; };

View File

@@ -46,15 +46,11 @@ pub enum Property {
}, },
} }
/// Loop mode used by mpv for files and playlists.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")] #[serde(rename_all = "kebab-case")]
pub enum LoopProperty { pub enum LoopProperty {
/// Loop N times
N(usize), N(usize),
/// Loop infinitely
Inf, Inf,
/// Disable looping
No, No,
} }
@@ -72,7 +68,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "String".to_owned(), expected_type: "String".to_owned(),
received: data, received: data,
}); })
} }
None => { None => {
return Err(MpvError::MissingMpvData); return Err(MpvError::MissingMpvData);
@@ -87,7 +83,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "bool".to_owned(), expected_type: "bool".to_owned(),
received: data, received: data,
}); })
} }
None => { None => {
return Err(MpvError::MissingMpvData); return Err(MpvError::MissingMpvData);
@@ -103,7 +99,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "f64".to_owned(), expected_type: "f64".to_owned(),
received: data, received: data,
}); })
} }
}; };
Ok(Property::PlaybackTime(playback_time)) Ok(Property::PlaybackTime(playback_time))
@@ -116,7 +112,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "f64".to_owned(), expected_type: "f64".to_owned(),
received: data, received: data,
}); })
} }
}; };
Ok(Property::Duration(duration)) Ok(Property::Duration(duration))
@@ -129,7 +125,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "HashMap".to_owned(), expected_type: "HashMap".to_owned(),
received: data, received: data,
}); })
} }
}; };
Ok(Property::Metadata(metadata)) Ok(Property::Metadata(metadata))
@@ -142,7 +138,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "Array".to_owned(), expected_type: "Array".to_owned(),
received: data, received: data,
}); })
} }
}; };
Ok(Property::Playlist(playlist)) Ok(Property::Playlist(playlist))
@@ -157,7 +153,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "usize or -1".to_owned(), expected_type: "usize or -1".to_owned(),
received: data, received: data,
}); })
} }
}; };
Ok(Property::PlaylistPos(playlist_pos)) Ok(Property::PlaylistPos(playlist_pos))
@@ -214,7 +210,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "f64".to_owned(), expected_type: "f64".to_owned(),
received: data, received: data,
}); })
} }
None => None, None => None,
}; };
@@ -228,7 +224,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "f64".to_owned(), expected_type: "f64".to_owned(),
received: data, received: data,
}); })
} }
None => None, None => None,
}; };
@@ -241,7 +237,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "f64".to_owned(), expected_type: "f64".to_owned(),
received: data, received: data,
}); })
} }
None => { None => {
return Err(MpvError::MissingMpvData); return Err(MpvError::MissingMpvData);
@@ -256,7 +252,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "f64".to_owned(), expected_type: "f64".to_owned(),
received: data, received: data,
}); })
} }
None => { None => {
return Err(MpvError::MissingMpvData); return Err(MpvError::MissingMpvData);
@@ -271,7 +267,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "bool".to_owned(), expected_type: "bool".to_owned(),
received: data, received: data,
}); })
} }
None => { None => {
return Err(MpvError::MissingMpvData); return Err(MpvError::MissingMpvData);
@@ -286,7 +282,7 @@ pub fn parse_property(name: &str, data: Option<MpvDataType>) -> Result<Property,
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "bool".to_owned(), expected_type: "bool".to_owned(),
received: data, received: data,
}); })
} }
None => true, None => true,
}; };
@@ -309,7 +305,7 @@ fn mpv_data_to_playlist_entry(
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "String".to_owned(), expected_type: "String".to_owned(),
received: data.clone(), received: data.clone(),
}); })
} }
None => return Err(MpvError::MissingMpvData), None => return Err(MpvError::MissingMpvData),
}; };
@@ -319,7 +315,7 @@ fn mpv_data_to_playlist_entry(
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "String".to_owned(), expected_type: "String".to_owned(),
received: data.clone(), received: data.clone(),
}); })
} }
None => None, None => None,
}; };
@@ -329,7 +325,7 @@ fn mpv_data_to_playlist_entry(
return Err(MpvError::DataContainsUnexpectedType { return Err(MpvError::DataContainsUnexpectedType {
expected_type: "bool".to_owned(), expected_type: "bool".to_owned(),
received: data.clone(), received: data.clone(),
}); })
} }
None => false, None => false,
}; };

0
test_assets/.gitkeep Normal file
View File

View File

@@ -1,29 +1,133 @@
use test_log::test; use futures::{stream::StreamExt, Stream};
use tokio::time::Duration; use mpvipc_async::{parse_property, Event, Mpv, MpvError, MpvExt, Property};
use thiserror::Error;
use tokio::time::sleep; use tokio::time::sleep;
use tokio::time::{timeout, Duration};
use mpvipc_async::{MpvError, MpvExt, Property}; use test_log::test;
use super::*; use super::*;
const MPV_CHANNEL_ID: u64 = 1337;
#[derive(Error, Debug)]
enum PropertyCheckingThreadError {
#[error("Unexpected property: {0:?}")]
UnexpectedPropertyError(Property),
#[error(transparent)]
MpvError(#[from] MpvError),
}
/// This function will create an ongoing tokio task that collects [`Event::PropertyChange`] events,
/// and parses them into [`Property`]s. It will then run the property through the provided
/// closure, and return an error if the closure returns false.
///
/// The returned cancellation token can be used to stop the task.
fn create_interruptable_event_property_checking_thread<T>(
mut events: impl Stream<Item = Result<Event, MpvError>> + Unpin + Send + 'static,
on_property: T,
) -> (
tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
tokio_util::sync::CancellationToken,
)
where
T: Fn(Property) -> bool + Send + 'static,
{
let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
event = events.next() => {
match event {
Some(Ok(event)) => {
match event {
Event::PropertyChange { id: MPV_CHANNEL_ID, name, data } => {
let property = parse_property(&name, data).unwrap();
if !on_property(property.clone()) {
return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property))
}
}
_ => {
log::trace!("Received unrelated event, ignoring: {:?}", event);
}
}
}
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
_ = cancellation_token_clone.cancelled() => return Ok(()),
}
}
});
(handle, cancellation_token)
}
/// This helper function will gracefully shut down both the event checking thread and the mpv process.
/// It will also return an error if the event checking thread happened to panic, or if it times out
/// The timeout is hardcoded to 500ms.
async fn graceful_shutdown(
cancellation_token: tokio_util::sync::CancellationToken,
handle: tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
mpv: Mpv,
mut proc: tokio::process::Child,
) -> Result<(), MpvError> {
cancellation_token.cancel();
match timeout(Duration::from_millis(500), handle).await {
Ok(Ok(Ok(()))) => {}
Ok(Ok(Err(err))) => match err {
PropertyCheckingThreadError::UnexpectedPropertyError(property) => {
return Err(MpvError::Other(format!(
"Unexpected property: {:?}",
property
)));
}
PropertyCheckingThreadError::MpvError(err) => return Err(err),
},
Ok(Err(_)) => {
return Err(MpvError::InternalConnectionError(
"Event checking thread timed out".to_owned(),
));
}
Err(_) => {
return Err(MpvError::InternalConnectionError(
"Event checking thread panicked".to_owned(),
));
}
}
mpv.kill().await?;
proc.wait().await.map_err(|err| {
MpvError::InternalConnectionError(format!(
"Failed to wait for mpv process to exit: {}",
err
))
})?;
Ok(())
}
/// Test correct parsing of different values of the "pause" property /// Test correct parsing of different values of the "pause" property
#[test(tokio::test)] #[test(tokio::test)]
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
async fn test_highlevel_event_pause() -> Result<(), MpvError> { async fn test_highlevel_event_pause() -> Result<(), MpvError> {
let (proc, mpv) = spawn_headless_mpv().await?; let (proc, mpv) = spawn_mpv(true).await?;
mpv.observe_property(MPV_CHANNEL_ID, "pause").await?; mpv.observe_property(MPV_CHANNEL_ID, "pause").await?;
let (handle, cancellation_token) = create_interruptable_event_property_checking_thread( let events = mpv.get_event_stream().await;
mpv.clone(), let (handle, cancellation_token) =
|property| match property { create_interruptable_event_property_checking_thread(events, |property| match property {
Property::Pause(_) => { Property::Pause(_) => {
log::debug!("{:?}", property); log::debug!("{:?}", property);
true true
} }
_ => false, _ => false,
}, });
);
sleep(Duration::from_millis(5)).await; sleep(Duration::from_millis(5)).await;
mpv.set_property("pause", false).await?; mpv.set_property("pause", false).await?;
@@ -40,19 +144,18 @@ async fn test_highlevel_event_pause() -> Result<(), MpvError> {
#[test(tokio::test)] #[test(tokio::test)]
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
async fn test_highlevel_event_volume() -> Result<(), MpvError> { async fn test_highlevel_event_volume() -> Result<(), MpvError> {
let (proc, mpv) = spawn_headless_mpv().await?; let (proc, mpv) = spawn_mpv(true).await?;
mpv.observe_property(MPV_CHANNEL_ID, "volume").await?; mpv.observe_property(1337, "volume").await?;
let (handle, cancellation_token) = create_interruptable_event_property_checking_thread( let events = mpv.get_event_stream().await;
mpv.clone(), let (handle, cancellation_token) =
|property| match property { create_interruptable_event_property_checking_thread(events, |property| match property {
Property::Volume(_) => { Property::Volume(_) => {
log::trace!("{:?}", property); log::trace!("{:?}", property);
true true
} }
_ => false, _ => false,
}, });
);
sleep(Duration::from_millis(5)).await; sleep(Duration::from_millis(5)).await;
mpv.set_property("volume", 100.0).await?; mpv.set_property("volume", 100.0).await?;
@@ -71,19 +174,18 @@ async fn test_highlevel_event_volume() -> Result<(), MpvError> {
#[test(tokio::test)] #[test(tokio::test)]
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
async fn test_highlevel_event_mute() -> Result<(), MpvError> { async fn test_highlevel_event_mute() -> Result<(), MpvError> {
let (proc, mpv) = spawn_headless_mpv().await?; let (proc, mpv) = spawn_mpv(true).await?;
mpv.observe_property(MPV_CHANNEL_ID, "mute").await?; mpv.observe_property(1337, "mute").await?;
let (handle, cancellation_token) = create_interruptable_event_property_checking_thread( let events = mpv.get_event_stream().await;
mpv.clone(), let (handle, cancellation_token) =
|property| match property { create_interruptable_event_property_checking_thread(events, |property| match property {
Property::Mute(_) => { Property::Mute(_) => {
log::trace!("{:?}", property); log::trace!("{:?}", property);
true true
} }
_ => false, _ => false,
}, });
);
sleep(Duration::from_millis(5)).await; sleep(Duration::from_millis(5)).await;
mpv.set_property("mute", true).await?; mpv.set_property("mute", true).await?;
@@ -100,20 +202,19 @@ async fn test_highlevel_event_mute() -> Result<(), MpvError> {
#[test(tokio::test)] #[test(tokio::test)]
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
async fn test_highlevel_event_duration() -> Result<(), MpvError> { async fn test_highlevel_event_duration() -> Result<(), MpvError> {
let (proc, mpv) = spawn_headless_mpv().await?; let (proc, mpv) = spawn_mpv(true).await?;
mpv.observe_property(MPV_CHANNEL_ID, "duration").await?; mpv.observe_property(1337, "duration").await?;
let (handle, cancellation_token) = create_interruptable_event_property_checking_thread( let events = mpv.get_event_stream().await;
mpv.clone(), let (handle, cancellation_token) =
|property| match property { create_interruptable_event_property_checking_thread(events, |property| match property {
Property::Duration(_) => { Property::Duration(_) => {
log::trace!("{:?}", property); log::trace!("{:?}", property);
true true
} }
_ => false, _ => false,
}, });
);
sleep(Duration::from_millis(5)).await; sleep(Duration::from_millis(5)).await;
mpv.set_property("pause", true).await?; mpv.set_property("pause", true).await?;

View File

@@ -0,0 +1,35 @@
use super::util::{get_test_asset, spawn_mpv};
use mpvipc_async::{
MpvError, MpvExt, PlaylistAddOptions, PlaylistAddTypeOptions, SeekOptions, Switch,
};
#[tokio::test]
#[cfg(target_family = "unix")]
async fn test_seek() -> Result<(), MpvError> {
let (mut proc, mpv) = spawn_mpv(false).await.unwrap();
mpv.playlist_add(
&get_test_asset("black-background-30s-480p.mp4"),
PlaylistAddTypeOptions::File,
PlaylistAddOptions::Append,
)
.await?;
mpv.set_playback(Switch::On).await?;
mpv.set_playback(Switch::Off).await?;
// TODO: wait for property "seekable" to be true
mpv.seek(10.0, SeekOptions::Relative).await?;
let time_pos: f64 = mpv.get_property("time-pos").await?.unwrap();
assert_eq!(time_pos, 10.0);
mpv.seek(5.0, SeekOptions::Relative).await?;
let time_pos: f64 = mpv.get_property("time-pos").await?.unwrap();
assert_eq!(time_pos, 15.0);
mpv.kill().await.unwrap();
proc.kill().await.unwrap();
Ok(())
}

View File

@@ -1,16 +1,11 @@
use std::time::Duration; use mpvipc_async::{MpvError, MpvExt};
use test_log::test;
use tokio::time::sleep;
use mpvipc_async::{MpvError, MpvExt, Property};
use super::*; use super::*;
#[tokio::test] #[tokio::test]
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
async fn test_get_mpv_version() -> Result<(), MpvError> { async fn test_get_mpv_version() -> Result<(), MpvError> {
let (mut proc, mpv) = spawn_headless_mpv().await.unwrap(); let (mut proc, mpv) = spawn_mpv(true).await.unwrap();
let version: String = mpv.get_property("mpv-version").await?.unwrap(); let version: String = mpv.get_property("mpv-version").await?.unwrap();
assert!(version.starts_with("mpv")); assert!(version.starts_with("mpv"));
@@ -23,7 +18,7 @@ async fn test_get_mpv_version() -> Result<(), MpvError> {
#[tokio::test] #[tokio::test]
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
async fn test_set_property() -> Result<(), MpvError> { async fn test_set_property() -> Result<(), MpvError> {
let (mut proc, mpv) = spawn_headless_mpv().await.unwrap(); let (mut proc, mpv) = spawn_mpv(true).await.unwrap();
mpv.set_property("pause", true).await.unwrap(); mpv.set_property("pause", true).await.unwrap();
let paused: bool = mpv.get_property("pause").await?.unwrap(); let paused: bool = mpv.get_property("pause").await?.unwrap();
assert!(paused); assert!(paused);
@@ -37,7 +32,7 @@ async fn test_set_property() -> Result<(), MpvError> {
#[tokio::test] #[tokio::test]
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
async fn test_get_unavailable_property() -> Result<(), MpvError> { async fn test_get_unavailable_property() -> Result<(), MpvError> {
let (mut proc, mpv) = spawn_headless_mpv().await.unwrap(); let (mut proc, mpv) = spawn_mpv(true).await.unwrap();
let time_pos = mpv.get_property::<f64>("time-pos").await; let time_pos = mpv.get_property::<f64>("time-pos").await;
assert_eq!(time_pos, Ok(None)); assert_eq!(time_pos, Ok(None));
@@ -50,7 +45,7 @@ async fn test_get_unavailable_property() -> Result<(), MpvError> {
#[tokio::test] #[tokio::test]
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
async fn test_get_nonexistent_property() -> Result<(), MpvError> { async fn test_get_nonexistent_property() -> Result<(), MpvError> {
let (mut proc, mpv) = spawn_headless_mpv().await.unwrap(); let (mut proc, mpv) = spawn_mpv(true).await.unwrap();
let nonexistent = mpv.get_property::<f64>("nonexistent").await; let nonexistent = mpv.get_property::<f64>("nonexistent").await;
match nonexistent { match nonexistent {
@@ -65,45 +60,3 @@ async fn test_get_nonexistent_property() -> Result<(), MpvError> {
Ok(()) Ok(())
} }
#[test(tokio::test)]
#[cfg(target_family = "unix")]
async fn test_unobserve_property() -> Result<(), MpvError> {
let (proc, mpv) = spawn_headless_mpv().await?;
mpv.observe_property(MPV_CHANNEL_ID, "pause").await?;
let (handle, cancellation_token) = create_interruptable_event_property_checking_thread(
mpv.clone(),
|property| match property {
Property::Pause(_) => {
log::debug!("{:?}", property);
true
}
_ => false,
},
);
sleep(Duration::from_millis(5)).await;
mpv.set_property("pause", true).await?;
sleep(Duration::from_millis(5)).await;
cancellation_token.cancel();
check_property_thread_result(handle).await?;
mpv.unobserve_property(MPV_CHANNEL_ID).await?;
let (handle, cancellation_token) =
create_interruptable_event_property_checking_thread(mpv.clone(), |_property| {
// We should not receive any properties after unobserving
false
});
sleep(Duration::from_millis(5)).await;
mpv.set_property("pause", false).await?;
sleep(Duration::from_millis(5)).await;
graceful_shutdown(cancellation_token, handle, mpv, proc).await?;
Ok(())
}

View File

@@ -1,4 +1,5 @@
mod event_property_parser; mod event_property_parser;
mod highlevel_api;
mod misc; mod misc;
mod util; mod util;

View File

@@ -1,16 +1,40 @@
use std::{path::Path, time::Duration}; use std::{path::Path, time::Duration};
use thiserror::Error; use mpvipc_async::{Mpv, MpvError};
use tokio::{ use tokio::{
process::{Child, Command}, process::{Child, Command},
time::{sleep, timeout}, time::{sleep, timeout},
}; };
use tokio_stream::StreamExt;
use mpvipc_async::{Event, Mpv, MpvError, MpvExt, Property, parse_property}; pub fn assert_test_assets_exist() {
let test_data_dir = Path::new("test_assets");
if !test_data_dir.exists()
|| !test_data_dir.is_dir()
// `.gitkeep` should always be present, so there should be at least 2 entries
|| test_data_dir.read_dir().unwrap().count() <= 1
{
panic!(
"Test assets directory not found at {:?}, please run `./setup_test_assets.sh`",
test_data_dir
);
}
}
#[inline]
pub fn get_test_assets_dir() -> &'static Path {
Path::new("test_assets")
}
pub fn get_test_asset(file_name: &str) -> String {
assert_test_assets_exist();
let test_assets_dir = get_test_assets_dir();
let file_path = test_assets_dir.join(file_name);
file_path.to_str().unwrap().to_string()
}
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
pub async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> { pub async fn spawn_mpv(headless: bool) -> Result<(Child, Mpv), MpvError> {
let socket_path_str = format!("/tmp/mpv-ipc-{}", uuid::Uuid::new_v4()); let socket_path_str = format!("/tmp/mpv-ipc-{}", uuid::Uuid::new_v4());
let socket_path = Path::new(&socket_path_str); let socket_path = Path::new(&socket_path_str);
@@ -18,8 +42,11 @@ pub async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> {
let process_handle = Command::new("mpv") let process_handle = Command::new("mpv")
.arg("--no-config") .arg("--no-config")
.arg("--idle") .arg("--idle")
.arg("--no-video") .args(if headless {
.arg("--no-audio") vec!["--no-video", "--no-audio"]
} else {
vec![]
})
.arg(format!( .arg(format!(
"--input-ipc-server={}", "--input-ipc-server={}",
&socket_path.to_str().unwrap() &socket_path.to_str().unwrap()
@@ -44,107 +71,3 @@ pub async fn spawn_headless_mpv() -> Result<(Child, Mpv), MpvError> {
let mpv = Mpv::connect(socket_path.to_str().unwrap()).await?; let mpv = Mpv::connect(socket_path.to_str().unwrap()).await?;
Ok((process_handle, mpv)) Ok((process_handle, mpv))
} }
/// The channel ID used for property observation in tests
pub const MPV_CHANNEL_ID: u64 = 1337;
#[derive(Error, Debug)]
pub enum PropertyCheckingThreadError {
#[error("Unexpected property: {0:?}")]
UnexpectedPropertyError(Property),
#[error(transparent)]
MpvError(#[from] MpvError),
}
/// This function will create an ongoing tokio task that collects [`Event::PropertyChange`] events,
/// and parses them into [`Property`]s. It will then run the property through the provided
/// closure, and return an error if the closure returns false.
///
/// The returned cancellation token can be used to stop the task.
pub fn create_interruptable_event_property_checking_thread<T>(
mpv: Mpv,
on_property: T,
) -> (
tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
tokio_util::sync::CancellationToken,
)
where
T: Fn(Property) -> bool + Send + 'static,
{
let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move {
let mut events = mpv.get_event_stream().await;
loop {
tokio::select! {
event = events.next() => {
match event {
Some(Ok(event)) => {
match event {
Event::PropertyChange { id: Some(MPV_CHANNEL_ID), name, data } => {
let property = parse_property(&name, data).unwrap();
if !on_property(property.clone()) {
return Err(PropertyCheckingThreadError::UnexpectedPropertyError(property))
}
}
_ => {
log::trace!("Received unrelated event, ignoring: {:?}", event);
}
}
}
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
_ = cancellation_token_clone.cancelled() => return Ok(()),
}
}
});
(handle, cancellation_token)
}
pub async fn check_property_thread_result(
handle: tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
) -> Result<(), MpvError> {
timeout(Duration::from_millis(500), handle)
.await
.map_err(|_| {
MpvError::InternalConnectionError("Event checking thread timed out".to_owned())
})?
.map_err(|_| {
MpvError::InternalConnectionError("Event checking thread panicked".to_owned())
})?
.map_err(|err| match err {
PropertyCheckingThreadError::UnexpectedPropertyError(property) => {
MpvError::Other(format!("Unexpected property: {:?}", property))
}
PropertyCheckingThreadError::MpvError(err) => err,
})
}
/// This helper function will gracefully shut down both the event checking thread and the mpv process.
/// It will also return an error if the event checking thread happened to panic, or if it times out
/// The timeout is hardcoded to 500ms.
pub async fn graceful_shutdown(
cancellation_token: tokio_util::sync::CancellationToken,
handle: tokio::task::JoinHandle<Result<(), PropertyCheckingThreadError>>,
mpv: Mpv,
mut proc: tokio::process::Child,
) -> Result<(), MpvError> {
cancellation_token.cancel();
check_property_thread_result(handle).await?;
mpv.kill().await?;
proc.wait().await.map_err(|err| {
MpvError::InternalConnectionError(format!(
"Failed to wait for mpv process to exit: {}",
err
))
})?;
Ok(())
}

View File

@@ -1,4 +1,4 @@
use futures::{SinkExt, stream::StreamExt}; use futures::{stream::StreamExt, SinkExt};
use mpvipc_async::{Event, Mpv, MpvDataType, MpvExt}; use mpvipc_async::{Event, Mpv, MpvDataType, MpvExt};
use serde_json::json; use serde_json::json;
use test_log::test; use test_log::test;
@@ -52,7 +52,7 @@ async fn test_observe_event_successful() {
assert_eq!( assert_eq!(
event, event,
Event::PropertyChange { Event::PropertyChange {
id: Some(1), id: 1,
name: "volume".to_string(), name: "volume".to_string(),
data: Some(MpvDataType::Double(64.0)) data: Some(MpvDataType::Double(64.0))
} }

View File

@@ -1,8 +1,8 @@
use std::{panic, time::Duration}; use std::{panic, time::Duration};
use futures::{SinkExt, StreamExt, stream::FuturesUnordered}; use futures::{stream::FuturesUnordered, SinkExt, StreamExt};
use mpvipc_async::{Mpv, MpvError, MpvExt, Playlist, PlaylistEntry}; use mpvipc_async::{Mpv, MpvError, MpvExt, Playlist, PlaylistEntry};
use serde_json::{Value, json}; use serde_json::{json, Value};
use test_log::test; use test_log::test;
use tokio::{net::UnixStream, task::JoinHandle}; use tokio::{net::UnixStream, task::JoinHandle};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
@@ -196,20 +196,18 @@ async fn test_get_playlist() -> Result<(), MpvError> {
}, },
]); ]);
let (server, join_handle) = test_socket(vec![ let (server, join_handle) = test_socket(vec![json!({
"data": expected.0.iter().map(|entry| {
json!({ json!({
"data": expected.0.iter().map(|entry| { "filename": entry.filename,
json!({ "title": entry.title,
"filename": entry.filename, "current": entry.current
"title": entry.title,
"current": entry.current
})
}).collect::<Vec<Value>>(),
"request_id": 0,
"error": "success"
}) })
.to_string(), }).collect::<Vec<Value>>(),
]); "request_id": 0,
"error": "success"
})
.to_string()]);
let mpv = Mpv::connect_socket(server).await?; let mpv = Mpv::connect_socket(server).await?;
let playlist = mpv.get_playlist().await?; let playlist = mpv.get_playlist().await?;

View File

@@ -1,8 +1,8 @@
use std::{panic, time::Duration}; use std::{panic, time::Duration};
use futures::{SinkExt, StreamExt, stream::FuturesUnordered}; use futures::{stream::FuturesUnordered, SinkExt, StreamExt};
use mpvipc_async::{Mpv, MpvError}; use mpvipc_async::{Mpv, MpvError};
use serde_json::{Value, json}; use serde_json::{json, Value};
use test_log::test; use test_log::test;
use tokio::{net::UnixStream, task::JoinHandle}; use tokio::{net::UnixStream, task::JoinHandle};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};