fixup! WIP

This commit is contained in:
Oystein Kristoffer Tveit 2024-12-15 18:00:28 +01:00
parent 1ff19c6e4e
commit 8c81d27500
Signed by: oysteikt
GPG Key ID: 9F2F7D8250F35146
3 changed files with 33 additions and 14 deletions

6
Cargo.lock generated
View File

@ -382,9 +382,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.20" version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
@ -1077,7 +1077,7 @@ dependencies = [
[[package]] [[package]]
name = "mpvipc-async" name = "mpvipc-async"
version = "0.1.0" version = "0.1.0"
source = "git+https://git.pvv.ntnu.no/Projects/mpvipc-async.git?rev=v0.1.0#467cac3c503887c4d6371ec5fdf1b23b3e0eb515" source = "git+https://git.pvv.ntnu.no/Projects/mpvipc-async.git?branch=main#5a74dd0b0251f406636b73043837d133a80a3ec7"
dependencies = [ dependencies = [
"futures", "futures",
"log", "log",

View File

@ -16,7 +16,7 @@ clap-verbosity-flag = "2.2.2"
env_logger = "0.10.0" env_logger = "0.10.0"
futures = "0.3.31" futures = "0.3.31"
log = "0.4.20" log = "0.4.20"
mpvipc-async = { git = "https://git.pvv.ntnu.no/Projects/mpvipc-async.git", rev = "v0.1.0" } mpvipc-async = { git = "https://git.pvv.ntnu.no/Projects/mpvipc-async.git", branch = "main" }
sd-notify = "0.4.3" sd-notify = "0.4.3"
serde = { version = "1.0.188", features = ["derive"] } serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105" serde_json = "1.0.105"

View File

@ -32,7 +32,9 @@ async fn websocket_handler(
State(mpv): State<Mpv>, State(mpv): State<Mpv>,
) -> impl IntoResponse { ) -> impl IntoResponse {
let mpv = mpv.clone(); let mpv = mpv.clone();
ws.on_upgrade(move |socket| handle_connection(socket, addr, mpv))
// TODO: get an id provisioned by the id pool
ws.on_upgrade(move |socket| handle_connection(socket, addr, mpv, 1))
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@ -72,6 +74,8 @@ const DEFAULT_PROPERTY_SUBSCRIBES: [&str; 6] = [
"loop-playlist", "loop-playlist",
"pause", "pause",
"playlist", "playlist",
// TODO: Although this is nice to see for the user, it might be more wise to use
// "percent-pos" for the internal slider value, as well as the setter API call.
"time-pos", "time-pos",
"volume", "volume",
]; ];
@ -92,7 +96,7 @@ async fn setup_default_subscribes(mpv: &Mpv) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv) { async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv, channel_id: u64) {
// TODO: There is an asynchronous gap between gathering the initial state and subscribing to the properties // TODO: There is an asynchronous gap between gathering the initial state and subscribing to the properties
// This could lead to missing events if they happen in that gap. Send initial state, but also ensure // This could lead to missing events if they happen in that gap. Send initial state, but also ensure
// that there is an additional "initial state" sent upon subscription to all properties to ensure that // that there is an additional "initial state" sent upon subscription to all properties to ensure that
@ -111,8 +115,9 @@ async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv) {
setup_default_subscribes(&mpv).await.unwrap(); setup_default_subscribes(&mpv).await.unwrap();
let mut event_stream = mpv.get_event_stream().await; let connection_loop_mpv = mpv.clone();
let connection_loop = tokio::spawn(async move { let connection_loop = tokio::spawn(async move {
let mut event_stream = connection_loop_mpv.get_event_stream().await;
loop { loop {
select! { select! {
message = socket.recv() => { message = socket.recv() => {
@ -148,13 +153,24 @@ async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv) {
Err(e) => return Err(anyhow::anyhow!("Error parsing message from {:?}: {:?}", addr, e)), Err(e) => return Err(anyhow::anyhow!("Error parsing message from {:?}: {:?}", addr, e)),
}; };
log::trace!("Handling command from {:?}: {:?}", addr, message_json);
// TODO: handle errors // TODO: handle errors
if let Ok(Some(response)) = handle_message(message_json, mpv.clone()).await { match handle_message(message_json, connection_loop_mpv.clone(), channel_id).await {
Ok(Some(response)) => {
log::trace!("Handled command from {:?} successfully, sending response", addr);
let message = Message::Text(json!({ let message = Message::Text(json!({
"type": "response", "type": "response",
"value": response, "value": response,
}).to_string()); }).to_string());
socket.send(message).await.unwrap(); socket.send(message).await.unwrap();
}
Ok(None) => {
log::trace!("Handled command from {:?} successfully", addr);
}
Err(e) => {
log::error!("Error handling message from {:?}: {:?}", addr, e);
}
} }
} }
event = event_stream.next() => { event = event_stream.next() => {
@ -181,11 +197,13 @@ async fn handle_connection(mut socket: WebSocket, addr: SocketAddr, mpv: Mpv) {
} }
}); });
connection_loop.await.unwrap().unwrap() connection_loop.await.unwrap().unwrap();
mpv.unobserve_property(channel_id).await.unwrap();
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type")] #[serde(tag = "type", rename_all = "snake_case")]
pub enum WSCommand { pub enum WSCommand {
Subscribe { property: String }, Subscribe { property: String },
UnsubscribeAll, UnsubscribeAll,
@ -204,18 +222,19 @@ pub enum WSCommand {
SetLooping { value: bool }, SetLooping { value: bool },
} }
async fn handle_message(message: Value, mpv: Mpv) -> anyhow::Result<Option<Value>> { async fn handle_message(message: Value, mpv: Mpv, channel_id: u64) -> anyhow::Result<Option<Value>> {
let command = let command =
serde_json::from_value::<WSCommand>(message).context("Failed to parse message")?; serde_json::from_value::<WSCommand>(message).context("Failed to parse message")?;
log::trace!("Successfully parsed message: {:?}", command);
match command { match command {
WSCommand::Subscribe { property } => { WSCommand::Subscribe { property } => {
// TODO: get an id provisioned by the id pool mpv.observe_property(channel_id, &property).await?;
mpv.observe_property(0, &property).await?;
Ok(None) Ok(None)
} }
WSCommand::UnsubscribeAll => { WSCommand::UnsubscribeAll => {
mpv.unobserve_property(0).await?; mpv.unobserve_property(channel_id).await?;
Ok(None) Ok(None)
} }
WSCommand::Load { url } => { WSCommand::Load { url } => {