MPV class cleanup

This commit is contained in:
Peder Bergebakken Sundt 2022-02-19 23:04:29 +01:00
parent fc33b9e1ae
commit 8b42e27e33
2 changed files with 75 additions and 25 deletions

View File

@ -1,24 +1,30 @@
import os
import asyncio
import json
from shlex import quote
import time
import shlex
import traceback
from typing import List, Optional, Union
from pathlib import Path
from . import nyasync
class MPV:
_ipc_endpoint = 'mpv_ipc.socket'
# TODO: move this to /tmp or /var/run ?
# TODO: make it configurable with an env variable?
_ipc_endpoint = Path("mpv_ipc.socket")
def __init__(self):
self.requests = nyasync.Queue()
self.responses = nyasync.Queue()
self.events = nyasync.Queue()
async def run(self):
self.proc = await asyncio.create_subprocess_exec(
@classmethod
def mpv_command(cls) -> List[str]:
return [
'mpv',
'--input-ipc-server=' + quote(self._ipc_endpoint),
f'--input-ipc-server={str(cls._ipc_endpoint)}',
'--idle',
'--force-window',
'--fullscreen',
@ -26,41 +32,79 @@ class MPV:
'--load-unsafe-playlists',
'--keep-open', # Keep last frame of video on end of video
#'--no-input-default-bindings',
]
async def run(self, is_restarted=False, **kw):
if self._ipc_endpoint.is_socket():
print("Socket found, try connecting instead of starting our own mpv!")
self.proc = None # we do not own the socket
await self.connect(**kw)
else:
print("Starting mpv...")
self.proc = await asyncio.create_subprocess_exec(*self.mpv_command())
await asyncio.gather(
self.ensure_running(),
self.connect(**kw),
)
while self.is_running():
async def connect(self, *, timeout=10):
t = time.time()
while self.is_running and time.time() - t < timeout:
try:
self.ipc_conn = await nyasync.unix_connection(self._ipc_endpoint)
self.ipc_conn = await nyasync.UnixConnection.from_path(str(self._ipc_endpoint))
break
except (FileNotFoundError, ConnectionRefusedError):
continue
await asyncio.sleep(0.1)
else:
raise Exception("MPV died before socket connected")
self._future = asyncio.gather(
self.ensure_running(),
if time.time() - t >= timeout:
#raise TimeoutError
# assume the socket is dead, and start our own instance
print("Socket not responding. Will try deleting it and start mpv ourselves!")
self._ipc_endpoint.unlink()
return await self.run()
else:
raise Exception("MPV died before socket connected")
print("Connected to mpv!")
# TODO: in this state we are unable to detect if the connection is lost
self._future_connect = asyncio.gather(
self.process_outgoing(),
self.process_incomming(),
)
await self._future
await self._future_connect
def _cleanup(self):
if os.path.exists(self._ipc_endpoint):
os.remove(self._ipc_endpoint)
self._future.cancel()#reduces a lot of errors on exit
def _cleanup_connection(self):
assert self.proc is not None # we must own the socket
self._future_connect.cancel() # reduces a lot of errors on exit
if self._ipc_endpoint.is_socket():
self._ipc_endpoint.unlink()
def is_running(self):
return self.proc.returncode is None
@property
def is_running(self) -> bool:
if self.proc is None: # we do not own the socket
# TODO: can i check the read and writer?
return self._ipc_endpoint.is_socket()
else:
return self.proc.returncode is None
async def ensure_running(self):
await self.proc.wait()
self._cleanup()
raise Exception("MPV died unexpectedly")
print("MPV suddenly stopped...")
self._cleanup_connection()
await self.run()
async def process_outgoing(self):
async for request in self.requests:
self.ipc_conn.write(json.dumps(request).encode('utf-8'))
try:
encoded = json.dumps(request).encode('utf-8')
except Exception as e:
print("Unencodable request:", request)
traceback.print_exception(e)
continue
self.ipc_conn.write(encoded)
self.ipc_conn.write(b'\n')
async def process_incomming(self):
@ -207,3 +251,8 @@ class MPVControl:
async def playlist_set_looping(self, value: bool):
resp = await self.send_request({"command":["set_property", "loop-playlist", "inf" if value else "no"]})
return resp["error"] == "success"
# CLI entrypoint
def print_mpv_command():
print(*map(shlex.quote, MPV.mpv_command()))

View File

@ -53,15 +53,16 @@ class Condition:
with await self.monitor:
self.monitor.notify_all()
async def unix_connection(path):
endpoints = await asyncio.open_unix_connection(path)
return UnixConnection(*endpoints)
class UnixConnection:
def __init__(self, reader: StreamReader, writer: StreamWriter):
self.reader: StreamReader = reader
self.writer: StreamWriter = writer
@classmethod
async def from_path(cls, path):
endpoints = await asyncio.open_unix_connection(path)
return cls(*endpoints)
def __aiter__(self):
return self.reader.__aiter__()