"""Show upcoming AtB bus departures on mpv's on-screen display or on stdout.

Usage: pass a stop place name from STOP_PLACES as the first command-line
argument. A missing or unknown name falls back to DEFAULT_STOP_PLACE.
"""
import json
import os
import socket
import subprocess  # noqa: F401  (no longer used in this section; kept for other code)
import sys

try:
    import requests
except ImportError:
    # Keeps the formatting and socket helpers importable without the
    # third-party package; main() reports the missing dependency.
    requests = None

# See github project: https://github.com/mpolden/atb
ATB_API_URL = "https://mpolden.no/atb/v2/departures/"

# Stop place codes found through https://stoppested.entur.org/
STOP_PLACES = {
    "gløshaugen": "44085",
    "høgskoleringen": "42029",
    "hesthagen": "41620",
}

# Stop used when no (or an unknown) stop place name is given.
DEFAULT_STOP_PLACE = "gløshaugen"

# mpv must be running with its ipc-server option pointing at this path.
MPV_SOCKET = "mpv_ipc.socket"

# Duration argument passed to mpv's show-text command.
OSD_DURATION_MS = 10000

# Upper bound, in seconds, on waiting for the departures API.
REQUEST_TIMEOUT_S = 10


def format_departure(departure_dict: dict) -> str:
    """Format one departure from mpolden's atb api as a single line of text.

    Args:
        departure_dict: One departure entry. The keys "line",
            "scheduledDepartureTime", "destination", "isRealtimeData" and
            "isGoingTowardsCentrum" are read.

    Returns:
        A tab-separated summary: departure time, line, destination,
        whether the time is realtime or aimed, and the direction
        relative to the city centre.

    Raises:
        KeyError: If one of the keys listed above is missing.
        IndexError: If the timestamp contains no "T" separator.
    """
    line = departure_dict["line"]
    # Keep the clock part only: drop the date before "T" and any
    # fractional seconds after ".".
    timestamp = departure_dict["scheduledDepartureTime"]
    departure_time = timestamp.split("T")[1].split(".")[0]
    destination = departure_dict["destination"]
    source = "realtime" if departure_dict["isRealtimeData"] else "aimed"
    if departure_dict["isGoingTowardsCentrum"]:
        direction = "til Midtbyen"
    else:
        direction = "fra Midtbyen"
    return f"{departure_time}: {line}\t{destination}\t ({source},\t{direction})"


def write_to_socket(s: str, mpv_socket) -> None:
    """Show the string s on mpv's on screen display.

    The show-text command is sent as one JSON line directly over the
    UNIX socket at `mpv_socket`. No shell is involved, so quotes,
    backslashes and newlines in `s` reach mpv unchanged.

    Args:
        s: Text to display.
        mpv_socket: Path of mpv's open ipc socket.

    Raises:
        OSError: If the socket cannot be connected to or written.
    """
    command = {"command": ["show-text", str(s), OSD_DURATION_MS]}
    # Each message on the ipc socket is terminated by a newline.
    payload = (json.dumps(command) + "\n").encode("utf-8")
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as connection:
        connection.connect(os.fspath(mpv_socket))
        connection.sendall(payload)


def resolve_stop_code(argv) -> str:
    """Return the stop place code selected by the command-line arguments.

    Args:
        argv: Argument list shaped like sys.argv; index 1 is the stop name.

    Returns:
        The code for the named stop, or the code of DEFAULT_STOP_PLACE
        when the name is missing or not in STOP_PLACES.
    """
    name = argv[1] if len(argv) > 1 else DEFAULT_STOP_PLACE
    return STOP_PLACES.get(name, STOP_PLACES[DEFAULT_STOP_PLACE])


def _fetch_departures(stop_code: str) -> list:
    """Fetch the departures for stop_code, ordered by scheduled time."""
    response = requests.get(ATB_API_URL + stop_code, timeout=REQUEST_TIMEOUT_S)
    # Fail on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    departures = response.json()["departures"]
    return sorted(departures, key=lambda d: d["scheduledDepartureTime"])


def main(argv=None) -> int:
    """Fetch departures and show them in mpv, or print them to stdout.

    Args:
        argv: Argument list shaped like sys.argv; defaults to sys.argv.

    Returns:
        Process exit status: 0 on success, 1 when the departures could
        not be fetched.
    """
    if argv is None:
        argv = sys.argv
    if requests is None:
        print("error: the 'requests' package is required", file=sys.stderr)
        return 1

    try:
        departures = _fetch_departures(resolve_stop_code(argv))
    except requests.RequestException as err:
        print(f"error: could not fetch departures: {err}", file=sys.stderr)
        return 1

    lines = [format_departure(departure) for departure in departures]

    if os.path.exists(MPV_SOCKET):
        try:
            write_to_socket("\n".join(lines), MPV_SOCKET)
            return 0
        except OSError as err:
            # A socket file can outlive mpv; fall back to stdout below.
            print(f"warning: could not reach mpv at {MPV_SOCKET}: {err}",
                  file=sys.stderr)

    for text in lines:
        print(text)
    return 0


if __name__ == "__main__":
    sys.exit(main())