From 8c81d27500c1170589d683297aad85250152597e Mon Sep 17 00:00:00 2001 From: h7x4 Date: Sun, 15 Dec 2024 18:00:28 +0100 Subject: [PATCH] fixup! WIP --- Cargo.lock | 6 +++--- Cargo.toml | 2 +- src/api/websocket_v1.rs | 39 +++++++++++++++++++++++++++++---------- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b8144d..e8d4c93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,9 +382,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.20" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto-common" @@ -1077,7 +1077,7 @@ dependencies = [ [[package]] name = "mpvipc-async" version = "0.1.0" -source = "git+https://git.pvv.ntnu.no/Projects/mpvipc-async.git?rev=v0.1.0#467cac3c503887c4d6371ec5fdf1b23b3e0eb515" +source = "git+https://git.pvv.ntnu.no/Projects/mpvipc-async.git?branch=main#5a74dd0b0251f406636b73043837d133a80a3ec7" dependencies = [ "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index 0565726..a8f2c57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ clap-verbosity-flag = "2.2.2" env_logger = "0.10.0" futures = "0.3.31" log = "0.4.20" -mpvipc-async = { git = "https://git.pvv.ntnu.no/Projects/mpvipc-async.git", rev = "v0.1.0" } +mpvipc-async = { git = "https://git.pvv.ntnu.no/Projects/mpvipc-async.git", branch = "main" } sd-notify = "0.4.3" serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.105" diff --git a/src/api/websocket_v1.rs b/src/api/websocket_v1.rs index 6c83557..f547926 100644 --- a/src/api/websocket_v1.rs +++ b/src/api/websocket_v1.rs @@ -32,7 +32,9 @@ async fn websocket_handler( State(mpv): State, ) -> impl IntoResponse { let mpv = mpv.clone(); - ws.on_upgrade(move |socket| handle_connection(socket, addr, mpv)) + + // TODO: get an id provisioned by the id pool + ws.on_upgrade(move |socket| handle_connection(socket, addr, mpv, 1)) } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -72,6 +74,8 @@ const DEFAULT_PROPERTY_SUBSCRIBES: [&str; 6] = [ "loop-playlist", "pause", "playlist", + // TODO: Although this is nice to see for the user, it might be more wise to use + // "percent-pos" for the internal slider value, as well as the setter API call. "time-pos", "volume", ]; @@ -92,7 +96,7 @@ async fn setup_default_subscribes(mpv: &Mpv) -> anyhow::Result<()> { Ok(()) } -async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv) { +async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv, channel_id: u64) { // TODO: There is an asynchronous gap between gathering the initial state and subscribing to the properties // This could lead to missing events if they happen in that gap. Send initial state, but also ensure // that there is an additional "initial state" sent upon subscription to all properties to ensure that @@ -111,8 +115,9 @@ async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv) { setup_default_subscribes(&mpv).await.unwrap(); - let mut event_stream = mpv.get_event_stream().await; + let connection_loop_mpv = mpv.clone(); let connection_loop = tokio::spawn(async move { + let mut event_stream = connection_loop_mpv.get_event_stream().await; loop { select! { message = socket.recv() => { @@ -148,13 +153,24 @@ async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv) { Err(e) => return Err(anyhow::anyhow!("Error parsing message from {:?}: {:?}", addr, e)), }; + log::trace!("Handling command from {:?}: {:?}", addr, message_json); + // TODO: handle errors - if let Ok(Some(response)) = handle_message(message_json, mpv.clone()).await { + match handle_message(message_json, connection_loop_mpv.clone(), channel_id).await { + Ok(Some(response)) => { + log::trace!("Handled command from {:?} successfully, sending response", addr); let message = Message::Text(json!({ "type": "response", "value": response, }).to_string()); socket.send(message).await.unwrap(); + } + Ok(None) => { + log::trace!("Handled command from {:?} successfully", addr); + } + Err(e) => { + log::error!("Error handling message from {:?}: {:?}", addr, e); + } } } event = event_stream.next() => { @@ -181,11 +197,13 @@ async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv) { } }); - connection_loop.await.unwrap().unwrap() + connection_loop.await.unwrap().unwrap(); + + mpv.unobserve_property(channel_id).await.unwrap(); } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(tag = "type")] +#[serde(tag = "type", rename_all = "snake_case")] pub enum WSCommand { Subscribe { property: String }, UnsubscribeAll, @@ -204,18 +222,19 @@ pub enum WSCommand { SetLooping { value: bool }, } -async fn handle_message(message: Value, mpv: Mpv) -> anyhow::Result> { +async fn handle_message(message: Value, mpv: Mpv, channel_id: u64) -> anyhow::Result> { let command = serde_json::from_value::(message).context("Failed to parse message")?; + log::trace!("Successfully parsed message: {:?}", command); + match command { WSCommand::Subscribe { property } => { - // TODO: get an id provisioned by the id pool - mpv.observe_property(0, &property).await?; + mpv.observe_property(channel_id, &property).await?; Ok(None) } WSCommand::UnsubscribeAll => { - mpv.unobserve_property(0).await?; + mpv.unobserve_property(channel_id).await?; Ok(None) } WSCommand::Load { url } => {