From 8b42e27e3374de9ce1ab985a04c25c0132cfe0fa Mon Sep 17 00:00:00 2001 From: Peder Bergebakken Sundt Date: Sat, 19 Feb 2022 23:04:29 +0100 Subject: [PATCH] MPV class cleanup --- grzegorz/mpv.py | 91 ++++++++++++++++++++++++++++++++++----------- grzegorz/nyasync.py | 9 +++-- 2 files changed, 75 insertions(+), 25 deletions(-) diff --git a/grzegorz/mpv.py b/grzegorz/mpv.py index 763d51a..5307212 100644 --- a/grzegorz/mpv.py +++ b/grzegorz/mpv.py @@ -1,24 +1,30 @@ import os import asyncio import json -from shlex import quote +import time +import shlex +import traceback from typing import List, Optional, Union +from pathlib import Path from . import nyasync class MPV: - _ipc_endpoint = 'mpv_ipc.socket' + # TODO: move this to /tmp or /var/run ? + # TODO: make it configurable with an env variable? + _ipc_endpoint = Path("mpv_ipc.socket") def __init__(self): self.requests = nyasync.Queue() self.responses = nyasync.Queue() self.events = nyasync.Queue() - async def run(self): - self.proc = await asyncio.create_subprocess_exec( + @classmethod + def mpv_command(cls) -> List[str]: + return [ 'mpv', - '--input-ipc-server=' + quote(self._ipc_endpoint), + f'--input-ipc-server={str(cls._ipc_endpoint)}', '--idle', '--force-window', '--fullscreen', @@ -26,41 +32,79 @@ class MPV: '--load-unsafe-playlists', '--keep-open', # Keep last frame of video on end of video #'--no-input-default-bindings', + ] + + async def run(self, is_restarted=False, **kw): + if self._ipc_endpoint.is_socket(): + print("Socket found, try connecting instead of starting our own mpv!") + self.proc = None # we do not own the socket + await self.connect(**kw) + else: + print("Starting mpv...") + self.proc = await asyncio.create_subprocess_exec(*self.mpv_command()) + await asyncio.gather( + self.ensure_running(), + self.connect(**kw), ) - while self.is_running(): + async def connect(self, *, timeout=10): + t = time.time() + while self.is_running and time.time() - t < timeout: try: - self.ipc_conn = await nyasync.unix_connection(self._ipc_endpoint) + self.ipc_conn = await nyasync.UnixConnection.from_path(str(self._ipc_endpoint)) break except (FileNotFoundError, ConnectionRefusedError): continue await asyncio.sleep(0.1) else: - raise Exception("MPV died before socket connected") - - self._future = asyncio.gather( - self.ensure_running(), + if time.time() - t >= timeout: + #raise TimeoutError + + # assume the socket is dead, and start our own instance + print("Socket not responding. Will try deleting it and start mpv ourselves!") + self._ipc_endpoint.unlink() + return await self.run() + else: + raise Exception("MPV died before socket connected") + + print("Connected to mpv!") + # TODO: in this state we are unable to detect if the connection is lost + + self._future_connect = asyncio.gather( self.process_outgoing(), self.process_incomming(), ) - await self._future + await self._future_connect - def _cleanup(self): - if os.path.exists(self._ipc_endpoint): - os.remove(self._ipc_endpoint) - self._future.cancel()#reduces a lot of errors on exit + def _cleanup_connection(self): + assert self.proc is not None # we must own the socket + self._future_connect.cancel() # reduces a lot of errors on exit + if self._ipc_endpoint.is_socket(): + self._ipc_endpoint.unlink() - def is_running(self): - return self.proc.returncode is None + @property + def is_running(self) -> bool: + if self.proc is None: # we do not own the socket + # TODO: can i check the read and writer? + return self._ipc_endpoint.is_socket() + else: + return self.proc.returncode is None async def ensure_running(self): await self.proc.wait() - self._cleanup() - raise Exception("MPV died unexpectedly") + print("MPV suddenly stopped...") + self._cleanup_connection() + await self.run() async def process_outgoing(self): async for request in self.requests: - self.ipc_conn.write(json.dumps(request).encode('utf-8')) + try: + encoded = json.dumps(request).encode('utf-8') + except Exception as e: + print("Unencodable request:", request) + traceback.print_exception(e) + continue + self.ipc_conn.write(encoded) self.ipc_conn.write(b'\n') async def process_incomming(self): @@ -207,3 +251,8 @@ class MPVControl: async def playlist_set_looping(self, value: bool): resp = await self.send_request({"command":["set_property", "loop-playlist", "inf" if value else "no"]}) return resp["error"] == "success" + + +# CLI entrypoint +def print_mpv_command(): + print(*map(shlex.quote, MPV.mpv_command())) diff --git a/grzegorz/nyasync.py b/grzegorz/nyasync.py index fd684e3..cc51ba4 100644 --- a/grzegorz/nyasync.py +++ b/grzegorz/nyasync.py @@ -53,15 +53,16 @@ class Condition: with await self.monitor: self.monitor.notify_all() -async def unix_connection(path): - endpoints = await asyncio.open_unix_connection(path) - return UnixConnection(*endpoints) - class UnixConnection: def __init__(self, reader: StreamReader, writer: StreamWriter): self.reader: StreamReader = reader self.writer: StreamWriter = writer + @classmethod + async def from_path(cls, path): + endpoints = await asyncio.open_unix_connection(path) + return cls(*endpoints) + def __aiter__(self): return self.reader.__aiter__()