cleanup and type hints

This commit is contained in:
Peder Bergebakken Sundt 2022-02-19 23:00:44 +01:00
parent 1b9c723e06
commit b9968d88e3
5 changed files with 76 additions and 42 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
*.pyc *.pyc
__pycache__ __pycache__
config.py config.py
*.socket

View File

@ -1,8 +1,7 @@
import asyncio from sanic import Request, Blueprint, response
from sanic import Blueprint, response
from sanic_openapi import doc from sanic_openapi import doc
from functools import wraps from functools import wraps
from . import mpv from .mpv import MPVControl
from .playlist_data import PlaylistDataCache from .playlist_data import PlaylistDataCache
bp = Blueprint("grzegorz-api", strict_slashes=True) bp = Blueprint("grzegorz-api", strict_slashes=True)
@ -13,7 +12,7 @@ bp = Blueprint("grzegorz-api", strict_slashes=True)
def response_json(func): def response_json(func):
@wraps(func) @wraps(func)
async def newfunc(*args, **kwargs): async def newfunc(*args, **kwargs):
try: try:
request = args[0] request = args[0]
mpv_control = request.app.config["mpv_control"] mpv_control = request.app.config["mpv_control"]
out = await func(*args, mpv_control, **kwargs) out = await func(*args, mpv_control, **kwargs)
@ -37,15 +36,17 @@ def response_text(func):
return response.text(body) return response.text(body)
return newfunc return newfunc
class APIError(Exception): pass class APIError(Exception):
pass
# singleton
PLAYLIST_DATA_CACHE = PlaylistDataCache(auto_fetch_data=True) PLAYLIST_DATA_CACHE = PlaylistDataCache(auto_fetch_data=True)
#routes: #routes:
@bp.get("") @bp.get("")
@doc.exclude(True) @doc.exclude(True)
@response_text @response_text
async def root(request): async def root(request: Request):
return "Hello friend, I hope you're having a lovely day" return "Hello friend, I hope you're having a lovely day"
@bp.post("/load") @bp.post("/load")
@ -53,7 +54,7 @@ async def root(request):
@doc.consumes({"path": doc.String("Link to the resource to enqueue")}, required=True) @doc.consumes({"path": doc.String("Link to the resource to enqueue")}, required=True)
@doc.consumes({"body":doc.Dictionary(description="Any data you want stored with the queued item")}, location="body") @doc.consumes({"body":doc.Dictionary(description="Any data you want stored with the queued item")}, location="body")
@response_json @response_json
async def loadfile(request, mpv_control): async def loadfile(request: Request, mpv_control: MPVControl):
if "path" not in request.args: if "path" not in request.args:
raise APIError("No query parameter \"path\" provided") raise APIError("No query parameter \"path\" provided")
if request.json: if request.json:
@ -64,7 +65,7 @@ async def loadfile(request, mpv_control):
@bp.get("/play") @bp.get("/play")
@doc.summary("Check whether the player is paused or playing") @doc.summary("Check whether the player is paused or playing")
@response_json @response_json
async def play_get(request, mpv_control): async def play_get(request: Request, mpv_control: MPVControl):
value = await mpv_control.pause_get() == False value = await mpv_control.pause_get() == False
return locals() return locals()
@ -72,7 +73,7 @@ async def play_get(request, mpv_control):
@doc.summary("Set whether the player is paused or playing") @doc.summary("Set whether the player is paused or playing")
@doc.consumes({"play": doc.Boolean("Whether to be playing or not")}) @doc.consumes({"play": doc.Boolean("Whether to be playing or not")})
@response_json @response_json
async def play_set(request, mpv_control): async def play_set(request: Request, mpv_control: MPVControl):
if "play" not in request.args: if "play" not in request.args:
raise APIError("No query parameter \"play\" provided") raise APIError("No query parameter \"play\" provided")
success = await mpv_control \ success = await mpv_control \
@ -82,7 +83,7 @@ async def play_set(request, mpv_control):
@bp.get("/volume") @bp.get("/volume")
@doc.summary("Get the current player volume") @doc.summary("Get the current player volume")
@response_json @response_json
async def volume_get(request, mpv_control): async def volume_get(request: Request, mpv_control: MPVControl):
value = await mpv_control.volume_get() value = await mpv_control.volume_get()
return locals() return locals()
@ -90,7 +91,7 @@ async def volume_get(request, mpv_control):
@doc.summary("Set the player volume") @doc.summary("Set the player volume")
@doc.consumes({"volume": doc.Integer("A number between 0 and 100")}) @doc.consumes({"volume": doc.Integer("A number between 0 and 100")})
@response_json @response_json
async def volume_set(request, mpv_control): async def volume_set(request: Request, mpv_control: MPVControl):
if "volume" not in request.args: if "volume" not in request.args:
raise APIError("No query parameter \"volume\" provided") raise APIError("No query parameter \"volume\" provided")
success = await mpv_control \ success = await mpv_control \
@ -100,7 +101,7 @@ async def volume_set(request, mpv_control):
@bp.get("/time") @bp.get("/time")
@doc.summary("Get current playback position") @doc.summary("Get current playback position")
@response_json @response_json
async def time_get(request, mpv_control): async def time_get(request: Request, mpv_control: MPVControl):
value = { value = {
"current": await mpv_control.time_pos_get(), "current": await mpv_control.time_pos_get(),
"left": await mpv_control.time_remaining_get(), "left": await mpv_control.time_remaining_get(),
@ -112,7 +113,7 @@ async def time_get(request, mpv_control):
@doc.summary("Set playback position") @doc.summary("Set playback position")
@doc.consumes({"pos": doc.Float("Seconds to seek to"), "pos": doc.Integer("Percent to seek to")}) @doc.consumes({"pos": doc.Float("Seconds to seek to"), "pos": doc.Integer("Percent to seek to")})
@response_json @response_json
async def time_set(request, mpv_control): async def time_set(request: Request, mpv_control: MPVControl):
if "pos" in request.args: if "pos" in request.args:
success = await mpv_control.seek_absolute(float(request.args["pos"][0])) success = await mpv_control.seek_absolute(float(request.args["pos"][0]))
elif "percent" in request.args: elif "percent" in request.args:
@ -124,7 +125,7 @@ async def time_set(request, mpv_control):
@bp.get("/playlist") @bp.get("/playlist")
@doc.summary("Get the current playlist") @doc.summary("Get the current playlist")
@response_json @response_json
async def playlist_get(request, mpv_control): async def playlist_get(request: Request, mpv_control: MPVControl):
value = await mpv_control.playlist_get() value = await mpv_control.playlist_get()
value = list(PLAYLIST_DATA_CACHE.add_data_to_playlist(value)) value = list(PLAYLIST_DATA_CACHE.add_data_to_playlist(value))
for i, v in enumerate(value): for i, v in enumerate(value):
@ -137,14 +138,14 @@ async def playlist_get(request, mpv_control):
@bp.post("/playlist/next") @bp.post("/playlist/next")
@doc.summary("Skip to the next item in the playlist") @doc.summary("Skip to the next item in the playlist")
@response_json @response_json
async def playlist_next(request, mpv_control): async def playlist_next(request: Request, mpv_control: MPVControl):
success = await mpv_control.playlist_next() success = await mpv_control.playlist_next()
return locals() return locals()
@bp.post("/playlist/previous") @bp.post("/playlist/previous")
@doc.summary("Go back to the previous item in the playlist") @doc.summary("Go back to the previous item in the playlist")
@response_json @response_json
async def playlist_previous(request, mpv_control): async def playlist_previous(request: Request, mpv_control: MPVControl):
success = await mpv_control.playlist_prev() success = await mpv_control.playlist_prev()
return locals() return locals()
@ -152,7 +153,7 @@ async def playlist_previous(request, mpv_control):
@doc.summary("Go chosen item in the playlist") @doc.summary("Go chosen item in the playlist")
@doc.consumes({"index": doc.Integer("The 0 indexed playlist item to go to")}, required=True) @doc.consumes({"index": doc.Integer("The 0 indexed playlist item to go to")}, required=True)
@response_json @response_json
async def playlist_goto(request, mpv_control): async def playlist_goto(request: Request, mpv_control: MPVControl):
if "index" not in request.args: if "index" not in request.args:
raise APIError("Missing the required parameter: \"index\"") raise APIError("Missing the required parameter: \"index\"")
success = await mpv_control.playlist_goto( success = await mpv_control.playlist_goto(
@ -163,7 +164,7 @@ async def playlist_goto(request, mpv_control):
@doc.summary("Clears single item or whole playlist") @doc.summary("Clears single item or whole playlist")
@doc.consumes({"index": doc.Integer("Index to item in playlist to remove. If unset, the whole playlist is cleared")}) @doc.consumes({"index": doc.Integer("Index to item in playlist to remove. If unset, the whole playlist is cleared")})
@response_json @response_json
async def playlist_remove_or_clear(request, mpv_control): async def playlist_remove_or_clear(request: Request, mpv_control: MPVControl):
if "index" in request.args: if "index" in request.args:
success = await mpv_control.playlist_remove(int(request.args["index"][0])) success = await mpv_control.playlist_remove(int(request.args["index"][0]))
action = f"remove #{request.args['index'][0]}" action = f"remove #{request.args['index'][0]}"
@ -183,7 +184,7 @@ async def playlist_remove_or_clear(request, mpv_control):
@doc.consumes({"index2": int}, required=True) @doc.consumes({"index2": int}, required=True)
@doc.consumes({"index1": int}, required=True) @doc.consumes({"index1": int}, required=True)
@response_json @response_json
async def playlist_move(request, mpv_control): async def playlist_move(request: Request, mpv_control: MPVControl):
if "index1" not in request.args or "index2" not in request.args: if "index1" not in request.args or "index2" not in request.args:
raise APIError( raise APIError(
"Missing at least one of the required query " "Missing at least one of the required query "
@ -196,14 +197,14 @@ async def playlist_move(request, mpv_control):
@bp.post("/playlist/shuffle") @bp.post("/playlist/shuffle")
@doc.summary("Clears single item or whole playlist") @doc.summary("Clears single item or whole playlist")
@response_json @response_json
async def playlist_shuffle(request, mpv_control): async def playlist_shuffle(request: Request, mpv_control: MPVControl):
success = await mpv_control.playlist_shuffle() success = await mpv_control.playlist_shuffle()
return locals() return locals()
@bp.get("/playlist/loop") @bp.get("/playlist/loop")
@doc.summary("See whether it loops the playlist or not") @doc.summary("See whether it loops the playlist or not")
@response_json @response_json
async def playlist_get_looping(request, mpv_control): async def playlist_get_looping(request: Request, mpv_control: MPVControl):
value = await mpv_control.playlist_get_looping() value = await mpv_control.playlist_get_looping()
return locals() return locals()
@ -211,7 +212,7 @@ async def playlist_get_looping(request, mpv_control):
@doc.summary("Sets whether to loop the playlist or not") @doc.summary("Sets whether to loop the playlist or not")
@doc.consumes({"loop": doc.Boolean("Whether to be looping or not")}, required=True) @doc.consumes({"loop": doc.Boolean("Whether to be looping or not")}, required=True)
@response_json @response_json
async def playlist_set_looping(request, mpv_control): async def playlist_set_looping(request: Request, mpv_control: MPVControl):
if "loop" not in request.args: if "loop" not in request.args:
raise APIError("Missing the required parameter: \"loop\"") raise APIError("Missing the required parameter: \"loop\"")
success = await mpv_control.playlist_set_looping( success = await mpv_control.playlist_set_looping(

View File

@ -2,9 +2,11 @@ import os
import asyncio import asyncio
import json import json
from shlex import quote from shlex import quote
from typing import List, Optional, Union
from . import nyasync from . import nyasync
class MPV: class MPV:
_ipc_endpoint = 'mpv_ipc.socket' _ipc_endpoint = 'mpv_ipc.socket'
@ -69,7 +71,8 @@ class MPV:
else: # response else: # response
self.responses.put_nowait(msg) self.responses.put_nowait(msg)
class MPVError(Exception): pass class MPVError(Exception):
pass
class MPVControl: class MPVControl:
def __init__(self): def __init__(self):
@ -84,6 +87,7 @@ class MPVControl:
async def process_events(self): async def process_events(self):
async for event in self.mpv.events: async for event in self.mpv.events:
# TODO: print?
pass pass
async def send_request(self, msg): async def send_request(self, msg):
@ -93,9 +97,11 @@ class MPVControl:
# is the safest option. # is the safest option.
self.mpv.requests.put_nowait(msg) self.mpv.requests.put_nowait(msg)
return await self.mpv.responses.get() return await self.mpv.responses.get()
#other commands: #other commands:
async def wake_screen(self): async def wake_screen(self):
# TODO: use this
# TODO: wayland counterpart
p = await asyncio.create_subprocess_exec( p = await asyncio.create_subprocess_exec(
"xset", "xset",
"-display", "-display",
@ -105,75 +111,99 @@ class MPVControl:
"on" "on"
) )
code = await process.wait() code = await process.wait()
#Shorthand command requests: #Shorthand command requests:
async def loadfile(self, file):#appends to playlist and start playback if paused
async def loadfile(self, file: Union[str, Path]):
"appends to playlist and start playback if paused"
if isinstance(file, Path):
file = str(file)
resp = await self.send_request({"command":["loadfile", file, "append-play"]}) resp = await self.send_request({"command":["loadfile", file, "append-play"]})
return resp["error"] == "success" return resp["error"] == "success"
async def pause_get(self): async def pause_get(self):
resp = await self.send_request({"command":["get_property", "pause"]}) resp = await self.send_request({"command":["get_property", "pause"]})
if "error" in resp and resp["error"] != "success": if "error" in resp and resp["error"] != "success":
raise MPVError("Unable to get whether paused or not: " + resp["error"]) raise MPVError("Unable to get whether paused or not: " + resp["error"])
return resp["data"] if "data" in resp else None return resp["data"] if "data" in resp else None
async def pause_set(self, state):
async def pause_set(self, state: bool):
resp = await self.send_request({"command":["set_property", "pause", bool(state)]}) resp = await self.send_request({"command":["set_property", "pause", bool(state)]})
return resp["error"] == "success" return resp["error"] == "success"
async def volume_get(self): async def volume_get(self):
resp = await self.send_request({"command":["get_property", "volume"]}) resp = await self.send_request({"command":["get_property", "volume"]})
if "error" in resp and resp["error"] != "success": if "error" in resp and resp["error"] != "success":
raise MPVError("Unable to get volume! " + resp["error"]) raise MPVError("Unable to get volume! " + resp["error"])
return resp["data"] if "data" in resp else None return resp["data"] if "data" in resp else None
async def volume_set(self, volume):
async def volume_set(self, volume: int):
resp = await self.send_request({"command":["set_property", "volume", volume]}) resp = await self.send_request({"command":["set_property", "volume", volume]})
return resp["error"] == "success" return resp["error"] == "success"
async def time_pos_get(self): async def time_pos_get(self):
resp = await self.send_request({"command":["get_property", "time-pos"]}) resp = await self.send_request({"command":["get_property", "time-pos"]})
if "error" in resp and resp["error"] != "success": if "error" in resp and resp["error"] != "success":
raise MPVError("Unable to get time pos: " + resp["error"]) raise MPVError("Unable to get time pos: " + resp["error"])
return resp["data"] if "data" in resp else None return resp["data"] if "data" in resp else None
async def time_remaining_get(self): async def time_remaining_get(self):
resp = await self.send_request({"command":["get_property", "time-remaining"]}) resp = await self.send_request({"command":["get_property", "time-remaining"]})
if "error" in resp and resp["error"] != "success": if "error" in resp and resp["error"] != "success":
raise MPVError("Unable to get time left:" + resp["error"]) raise MPVError("Unable to get time left:" + resp["error"])
return resp["data"] if "data" in resp else None return resp["data"] if "data" in resp else None
async def seek_absolute(self, seconds):
async def seek_absolute(self, seconds: float):
resp = await self.send_request({"command":["seek", seconds, "absolute"]}) resp = await self.send_request({"command":["seek", seconds, "absolute"]})
return resp["data"] if "data" in resp else None return resp["data"] if "data" in resp else None
async def seek_relative(self, seconds):
async def seek_relative(self, seconds: float):
resp = await self.send_request({"command":["seek", seconds, "relative"]}) resp = await self.send_request({"command":["seek", seconds, "relative"]})
return resp["data"] if "data" in resp else None return resp["data"] if "data" in resp else None
async def seek_percent(self, percent):
async def seek_percent(self, percent: float):
resp = await self.send_request({"command":["seek", percent, "absolute-percent"]}) resp = await self.send_request({"command":["seek", percent, "absolute-percent"]})
return resp["data"] if "data" in resp else None return resp["data"] if "data" in resp else None
async def playlist_get(self): async def playlist_get(self):
resp = await self.send_request({"command":["get_property", "playlist"]}) resp = await self.send_request({"command":["get_property", "playlist"]})
if "error" in resp and resp["error"] != "success": if "error" in resp and resp["error"] != "success":
raise MPVError("Unable to get playlist:" + resp["error"]) raise MPVError("Unable to get playlist:" + resp["error"])
return resp["data"] if "data" in resp else None return resp["data"] if "data" in resp else None
async def playlist_next(self): async def playlist_next(self):
resp = await self.send_request({"command":["playlist-next", "weak"]}) resp = await self.send_request({"command":["playlist-next", "weak"]})
return resp["error"] == "success" return resp["error"] == "success"
async def playlist_prev(self): async def playlist_prev(self):
resp = await self.send_request({"command":["playlist-prev", "weak"]}) resp = await self.send_request({"command":["playlist-prev", "weak"]})
return resp["error"] == "success" return resp["error"] == "success"
async def playlist_goto(self, index): async def playlist_goto(self, index):
resp = await self.send_request({"command":["set_property", "playlist-pos", index]}) resp = await self.send_request({"command":["set_property", "playlist-pos", index]})
return resp["error"] == "success" return resp["error"] == "success"
async def playlist_clear(self): async def playlist_clear(self):
resp = await self.send_request({"command":["playlist-clear"]}) resp = await self.send_request({"command":["playlist-clear"]})
return resp["error"] == "success" return resp["error"] == "success"
async def playlist_remove(self, index=None):
async def playlist_remove(self, index: Optional[int] = None):
resp = await self.send_request({"command":["playlist-remove", "current" if index==None else index]}) resp = await self.send_request({"command":["playlist-remove", "current" if index==None else index]})
return resp["error"] == "success" return resp["error"] == "success"
async def playlist_move(self, index1, index2):
async def playlist_move(self, index1: int, index2: int):
resp = await self.send_request({"command":["playlist-move", index1, index2]}) resp = await self.send_request({"command":["playlist-move", index1, index2]})
return resp["error"] == "success" return resp["error"] == "success"
async def playlist_shuffle(self): async def playlist_shuffle(self):
resp = await self.send_request({"command":["playlist-shuffle"]}) resp = await self.send_request({"command":["playlist-shuffle"]})
return resp["error"] == "success" return resp["error"] == "success"
async def playlist_get_looping(self): async def playlist_get_looping(self):
resp = await self.send_request({"command":["get_property", "loop-playlist"]}) resp = await self.send_request({"command":["get_property", "loop-playlist"]})
return resp["data"] == "inf" if "data" in resp else False return resp["data"] == "inf" if "data" in resp else False
async def playlist_set_looping(self, value):
async def playlist_set_looping(self, value: bool):
resp = await self.send_request({"command":["set_property", "loop-playlist", "inf" if value else "no"]}) resp = await self.send_request({"command":["set_property", "loop-playlist", "inf" if value else "no"]})
return resp["error"] == "success" return resp["error"] == "success"

View File

@ -1,4 +1,5 @@
import asyncio import asyncio
from asyncio.streams import StreamReader, StreamWriter
def ify(func): def ify(func):
"""Decorate func to run async in default executor""" """Decorate func to run async in default executor"""
@ -57,9 +58,9 @@ async def unix_connection(path):
return UnixConnection(*endpoints) return UnixConnection(*endpoints)
class UnixConnection: class UnixConnection:
def __init__(self, reader, writer): def __init__(self, reader: StreamReader, writer: StreamWriter):
self.reader = reader self.reader: StreamReader = reader
self.writer = writer self.writer: StreamWriter = writer
def __aiter__(self): def __aiter__(self):
return self.reader.__aiter__() return self.reader.__aiter__()

View File

@ -1,4 +1,3 @@
import asyncio
from .metadatafetch import get_metadata from .metadatafetch import get_metadata
from . import nyasync from . import nyasync
@ -8,12 +7,14 @@ class PlaylistDataCache:
self.filepath_data_map = {} self.filepath_data_map = {}
self.auto_fetch_data = auto_fetch_data self.auto_fetch_data = auto_fetch_data
self.jobs = None self.jobs = None
def add_data(self, filepath, data=None): def add_data(self, filepath, data=None):
if data: if data:
self.filepath_data_map[filepath] = data self.filepath_data_map[filepath] = data
async def run(self): async def run(self):
if not self.auto_fetch_data: return if not self.auto_fetch_data: return
self.jobs = nyasync.Queue() self.jobs = nyasync.Queue()
async for filename in self.jobs: async for filename in self.jobs:
print("Fetching metadata for ", repr(filename)) print("Fetching metadata for ", repr(filename))
@ -22,9 +23,10 @@ class PlaylistDataCache:
if filename in self.filepath_data_map: if filename in self.filepath_data_map:
self.filepath_data_map[filename].update(data) self.filepath_data_map[filename].update(data)
del self.filepath_data_map[filename]["fetching"] del self.filepath_data_map[filename]["fetching"]
def add_data_to_playlist(self, playlist): def add_data_to_playlist(self, playlist):
seen = set() seen = set()
for item in playlist: for item in playlist:
if "filename" in item: if "filename" in item:
seen.add(item["filename"]) seen.add(item["filename"])
@ -41,8 +43,7 @@ class PlaylistDataCache:
yield new_item yield new_item
continue continue
yield item yield item
not_seen = set(self.filepath_data_map.keys()) - seen not_seen = set(self.filepath_data_map.keys()) - seen
for name in not_seen: for name in not_seen:
del self.filepath_data_map[name] del self.filepath_data_map[name]