Create event stream

This commit is contained in:
Oystein Kristoffer Tveit 2024-04-19 01:36:11 +02:00
parent c039eafa1e
commit aacde5df12
Signed by: oysteikt
GPG Key ID: 9F2F7D8250F35146
4 changed files with 266 additions and 32 deletions

View File

@ -17,6 +17,7 @@ serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.37.0", features = ["sync", "macros", "rt", "net"] } tokio = { version = "1.37.0", features = ["sync", "macros", "rt", "net"] }
tokio-util = { version = "0.7.10", features = ["codec"] } tokio-util = { version = "0.7.10", features = ["codec"] }
futures = "0.3.30" futures = "0.3.30"
tokio-stream = { version = "0.1.15", features = ["sync"] }
[dev-dependencies] [dev-dependencies]
env_logger = "0.10.0" env_logger = "0.10.0"

View File

@ -1,12 +1,16 @@
use futures::StreamExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use std::{ use std::{
collections::HashMap, collections::HashMap,
fmt::{self, Display}, fmt::{self, Display},
}; };
use tokio::{net::UnixStream, sync::oneshot}; use tokio::{
net::UnixStream,
sync::{broadcast, mpsc, oneshot},
};
use crate::ipc::{MpvIpc, MpvIpcCommand, MpvIpcResponse}; use crate::ipc::{MpvIpc, MpvIpcCommand, MpvIpcEvent, MpvIpcResponse};
use crate::message_parser::TypeHandler; use crate::message_parser::TypeHandler;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -288,7 +292,8 @@ impl Display for ErrorCode {
#[derive(Clone)] #[derive(Clone)]
pub struct Mpv { pub struct Mpv {
command_sender: tokio::sync::mpsc::Sender<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>, command_sender: mpsc::Sender<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
broadcast_channel: broadcast::Sender<MpvIpcEvent>,
} }
impl fmt::Debug for Mpv { impl fmt::Debug for Mpv {
@ -310,14 +315,16 @@ impl Mpv {
} }
pub async fn connect_socket(socket: UnixStream) -> Result<Mpv, Error> { pub async fn connect_socket(socket: UnixStream) -> Result<Mpv, Error> {
let (com_tx, com_rx) = tokio::sync::mpsc::channel(100); let (com_tx, com_rx) = mpsc::channel(100);
let ipc = MpvIpc::new(socket, com_rx); let (ev_tx, _) = broadcast::channel(100);
let ipc = MpvIpc::new(socket, com_rx, ev_tx.clone());
log::debug!("Starting IPC handler"); log::debug!("Starting IPC handler");
tokio::spawn(ipc.run()); tokio::spawn(ipc.run());
Ok(Mpv { Ok(Mpv {
command_sender: com_tx, command_sender: com_tx,
broadcast_channel: ev_tx,
}) })
} }
@ -337,9 +344,118 @@ impl Mpv {
} }
} }
// pub fn get_stream_ref(&self) -> &UnixStream { pub async fn get_event_stream(&self) -> impl futures::Stream<Item = Result<Event, Error>> {
// &self.stream tokio_stream::wrappers::BroadcastStream::new(self.broadcast_channel.subscribe())
// } .map(|event| {
match event {
Ok(event) => Mpv::map_event(event),
Err(_) => Err(Error(ErrorCode::ConnectError("Failed to receive event".to_string()))),
}
})
}
fn map_event(raw_event: MpvIpcEvent) -> Result<Event, Error> {
let MpvIpcEvent(event) = raw_event;
event
.as_object()
.ok_or(Error(ErrorCode::JsonContainsUnexptectedType))
.and_then(|event| {
let event_name = event
.get("event")
.ok_or(Error(ErrorCode::MissingValue))?
.as_str()
.ok_or(Error(ErrorCode::ValueDoesNotContainString))?;
match event_name {
"shutdown" => Ok(Event::Shutdown),
"start-file" => Ok(Event::StartFile),
"end-file" => Ok(Event::EndFile),
"file-loaded" => Ok(Event::FileLoaded),
"tracks-changed" => Ok(Event::TracksChanged),
"track-switched" => Ok(Event::TrackSwitched),
"idle" => Ok(Event::Idle),
"pause" => Ok(Event::Pause),
"unpause" => Ok(Event::Unpause),
"tick" => Ok(Event::Tick),
"video-reconfig" => Ok(Event::VideoReconfig),
"audio-reconfig" => Ok(Event::AudioReconfig),
"metadata-update" => Ok(Event::MetadataUpdate),
"seek" => Ok(Event::Seek),
"playback-restart" => Ok(Event::PlaybackRestart),
"property-change" => {
let id = event
.get("id")
.ok_or(Error(ErrorCode::MissingValue))?
.as_u64()
.ok_or(Error(ErrorCode::ValueDoesNotContainUsize))?
as usize;
let property_name = event
.get("name")
.ok_or(Error(ErrorCode::MissingValue))?
.as_str()
.ok_or(Error(ErrorCode::ValueDoesNotContainString))?;
match property_name {
"path" => {
let path = event
.get("data")
.ok_or(Error(ErrorCode::MissingValue))?
.as_str()
.map(|s| s.to_string());
Ok(Event::PropertyChange {
id,
property: Property::Path(path),
})
}
"pause" => {
let pause = event
.get("data")
.ok_or(Error(ErrorCode::MissingValue))?
.as_bool()
.ok_or(Error(ErrorCode::ValueDoesNotContainBool))?;
Ok(Event::PropertyChange {
id,
property: Property::Pause(pause),
})
}
// TODO: missing cases
_ => {
let data = event
.get("data")
.ok_or(Error(ErrorCode::MissingValue))?
.clone();
Ok(Event::PropertyChange {
id,
property: Property::Unknown {
name: property_name.to_string(),
// TODO: fix
data: MpvDataType::Double(data.as_f64().unwrap_or(0.0)),
},
})
}
}
}
"chapter-change" => Ok(Event::ChapterChange),
"client-message" => {
let args = event
.get("args")
.ok_or(Error(ErrorCode::MissingValue))?
.as_array()
.ok_or(Error(ErrorCode::ValueDoesNotContainString))?
.iter()
.map(|arg| {
arg.as_str()
.ok_or(Error(ErrorCode::ValueDoesNotContainString))
.map(|s| s.to_string())
})
.collect::<Result<Vec<String>, Error>>()?;
Ok(Event::ClientMessage { args })
}
_ => Ok(Event::Unimplemented),
}
})
}
pub async fn get_metadata(&self) -> Result<HashMap<String, MpvDataType>, Error> { pub async fn get_metadata(&self) -> Result<HashMap<String, MpvDataType>, Error> {
self.get_property("metadata").await self.get_property("metadata").await
@ -414,9 +530,9 @@ impl Mpv {
)) ))
})?; })?;
match res_rx.await { match res_rx.await {
Ok(MpvIpcResponse(response)) => response.and_then(|value| { Ok(MpvIpcResponse(response)) => {
value.ok_or(Error(ErrorCode::MissingValue)) response.and_then(|value| value.ok_or(Error(ErrorCode::MissingValue)))
}), }
Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))), Err(err) => Err(Error(ErrorCode::ConnectError(err.to_string()))),
} }
} }
@ -607,7 +723,11 @@ impl Mpv {
/// Run a custom command. /// Run a custom command.
/// This should only be used if the desired command is not implemented /// This should only be used if the desired command is not implemented
/// with [MpvCommand]. /// with [MpvCommand].
pub async fn run_command_raw(&self, command: &str, args: &[&str]) -> Result<Option<Value>, Error> { pub async fn run_command_raw(
&self,
command: &str,
args: &[&str],
) -> Result<Option<Value>, Error> {
let command = Vec::from( let command = Vec::from(
[command] [command]
.iter() .iter()

View File

@ -3,14 +3,15 @@ use futures::{SinkExt, StreamExt};
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::mem; use std::mem;
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc;
use tokio::sync::{oneshot, Mutex}; use tokio::sync::{broadcast, oneshot, Mutex};
use tokio_util::codec::{Framed, LinesCodec}; use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
pub(crate) struct MpvIpc { pub(crate) struct MpvIpc {
socket: Framed<UnixStream, LinesCodec>, socket: Framed<UnixStream, LinesCodec>,
command_channel: Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>, command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
socket_lock: Mutex<()>, socket_lock: Mutex<()>,
event_channel: broadcast::Sender<MpvIpcEvent>,
} }
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
@ -26,19 +27,24 @@ pub(crate) enum MpvIpcCommand {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct MpvIpcResponse(pub(crate) Result<Option<Value>, Error>); pub(crate) struct MpvIpcResponse(pub(crate) Result<Option<Value>, Error>);
#[derive(Debug, Clone)]
pub(crate) struct MpvIpcEvent(pub(crate) Value);
impl MpvIpc { impl MpvIpc {
pub(crate) fn new( pub(crate) fn new(
socket: UnixStream, socket: UnixStream,
command_channel: Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>, command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
event_channel: broadcast::Sender<MpvIpcEvent>,
) -> Self { ) -> Self {
MpvIpc { MpvIpc {
socket: Framed::new(socket, LinesCodec::new()), socket: Framed::new(socket, LinesCodec::new()),
command_channel, command_channel,
socket_lock: Mutex::new(()), socket_lock: Mutex::new(()),
event_channel,
} }
} }
pub(crate) async fn send_command(&mut self, command: &[&str]) -> Result<Option<Value>, Error> { pub(crate) async fn send_command(&mut self, command: &[Value]) -> Result<Option<Value>, Error> {
let lock = self.socket_lock.lock().await; let lock = self.socket_lock.lock().await;
// START CRITICAL SECTION // START CRITICAL SECTION
let ipc_command = json!({ "command": command }); let ipc_command = json!({ "command": command });
@ -67,11 +73,14 @@ impl MpvIpc {
serde_json::from_str::<Value>(&response) serde_json::from_str::<Value>(&response)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string()))) .map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))
.and_then(parse_mpv_response_data) .and_then(parse_mpv_response_data)
} }
pub(crate) async fn get_mpv_property(&mut self, property: &str) -> Result<Option<Value>, Error> { pub(crate) async fn get_mpv_property(
self.send_command(&["get_property", property]).await &mut self,
property: &str,
) -> Result<Option<Value>, Error> {
self.send_command(&[json!("get_property"), json!(property)])
.await
} }
pub(crate) async fn set_mpv_property( pub(crate) async fn set_mpv_property(
@ -79,11 +88,11 @@ impl MpvIpc {
property: &str, property: &str,
value: Value, value: Value,
) -> Result<Option<Value>, Error> { ) -> Result<Option<Value>, Error> {
let str_value = match &value { // let str_value = match &value {
Value::String(s) => s, // Value::String(s) => s,
v => &serde_json::to_string(&v).unwrap() // v => &serde_json::to_string(&v).unwrap(),
}; // };
self.send_command(&["set_property", property, &str_value]) self.send_command(&[json!("set_property"), json!(property), value])
.await .await
} }
@ -92,27 +101,52 @@ impl MpvIpc {
id: isize, id: isize,
property: &str, property: &str,
) -> Result<Option<Value>, Error> { ) -> Result<Option<Value>, Error> {
self.send_command(&["observe_property", &id.to_string(), property]) self.send_command(&[json!("observe_property"), json!(id), json!(property)])
.await .await
} }
pub(crate) async fn unobserve_property(&mut self, id: isize) -> Result<Option<Value>, Error> { pub(crate) async fn unobserve_property(&mut self, id: isize) -> Result<Option<Value>, Error> {
self.send_command(&["unobserve_property", &id.to_string()]) self.send_command(&[json!("unobserve_property"), json!(id)])
.await .await
} }
async fn handle_event(&mut self, event: Result<String, LinesCodecError>) {
let parsed_event = event
.as_ref()
.map_err(|why| Error(ErrorCode::ConnectError(why.to_string())))
.and_then(|event| {
serde_json::from_str::<Value>(&event)
.map_err(|why| Error(ErrorCode::JsonParseError(why.to_string())))
});
match parsed_event {
Ok(event) => {
log::trace!("Parsed event: {:?}", event);
if let Err(broadcast::error::SendError(_)) =
self.event_channel.send(MpvIpcEvent(event.to_owned()))
{
log::trace!("Failed to send event to channel, ignoring");
}
}
Err(e) => {
log::trace!("Error parsing event, ignoring:\n {:?}\n {:?}", event, e);
}
}
}
pub(crate) async fn run(mut self) -> Result<(), Error> { pub(crate) async fn run(mut self) -> Result<(), Error> {
loop { loop {
tokio::select! { tokio::select! {
Some(event) = self.socket.next() => { Some(event) = self.socket.next() => {
log::trace!("Handling event: {:?}", serde_json::from_str::<Value>(&event.unwrap()).unwrap()); log::trace!("Got event: {:?}", event);
// TODO: handle event // TODO: error handling
self.handle_event(event).await;
} }
Some((cmd, tx)) = self.command_channel.recv() => { Some((cmd, tx)) = self.command_channel.recv() => {
log::trace!("Handling command: {:?}", cmd); log::trace!("Handling command: {:?}", cmd);
match cmd { match cmd {
MpvIpcCommand::Command(command) => { MpvIpcCommand::Command(command) => {
let refs = command.iter().map(|s| s.as_str()).collect::<Vec<&str>>(); let refs = command.iter().map(|s| json!(s)).collect::<Vec<Value>>();
let response = self.send_command(refs.as_slice()).await; let response = self.send_command(refs.as_slice()).await;
tx.send(MpvIpcResponse(response)).unwrap() tx.send(MpvIpcResponse(response)).unwrap()
} }

79
tests/events.rs Normal file
View File

@ -0,0 +1,79 @@
use std::panic;
use futures::{stream::StreamExt, SinkExt};
use mpvipc::{Mpv, MpvDataType, Property};
use serde_json::json;
use test_log::test;
use tokio::{net::UnixStream, task::JoinHandle};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
use mpvipc::Event;
fn test_socket(
answers: Vec<(bool, String)>,
) -> (UnixStream, JoinHandle<Result<(), LinesCodecError>>) {
let (socket, server) = UnixStream::pair().unwrap();
let join_handle = tokio::spawn(async move {
let mut framed = Framed::new(socket, LinesCodec::new());
for (unsolicited, answer) in answers {
if !unsolicited {
framed.next().await;
}
framed.send(answer).await?;
}
Ok(())
});
(server, join_handle)
}
#[test(tokio::test)]
async fn test_observe_event_successful() {
let (server, join_handle) = test_socket(vec![
(
false,
json!({ "request_id": 0, "error": "success" }).to_string(),
),
(
false,
json!({ "request_id": 0, "error": "success" }).to_string(),
),
(
true,
json!({ "data": 64.0, "event": "property-change", "id": 1, "name": "volume" })
.to_string(),
),
]);
let mpv = Mpv::connect_socket(server).await.unwrap();
mpv.observe_property(1, "volume").await.unwrap();
let mpv2 = mpv.clone();
tokio::spawn(async move {
let event = mpv2.get_event_stream().await.next().await.unwrap().unwrap();
let property = match event {
Event::PropertyChange { id, property } => {
assert_eq!(id, 1);
property
}
err => panic!("{:?}", err),
};
let data = match property {
Property::Unknown { name, data } => {
assert_eq!(name, "volume");
data
}
err => panic!("{:?}", err),
};
match data {
MpvDataType::Double(data) => assert_eq!(data, 64.0),
err => panic!("{:?}", err),
}
});
mpv.set_property("volume", 64.0).await.unwrap();
join_handle.await.unwrap().unwrap();
}