64 lines
2.3 KiB
Python
64 lines
2.3 KiB
Python
|
import json
|
||
|
import requests
|
||
|
import sys
|
||
|
import os
|
||
|
import subprocess
|
||
|
|
||
|
# See github project: https://github.com/mpolden/atb
ATB_API_URL = "https://mpolden.no/atb/v2/departures/"

# Stop place codes found through https://stoppested.entur.org/
# Keys are the stop names accepted as the first command-line argument;
# values are the codes appended to ATB_API_URL.
STOP_PLACES = {
    "gløshaugen": "44085",
    "høgskoleringen": "42029",
    "hesthagen": "41620",
}
|
||
|
|
||
|
def format_departure(departure_dict: dict) -> str:
    """Format one departure from mpolden's atb api as a single line of text.

    Reads the keys "line", "scheduledDepartureTime", "destination",
    "isRealtimeData" and "isGoingTowardsCentrum" from `departure_dict`.

    Returns a tab-separated summary of the scheduled bus, e.g.
    "12:34:00: 3<TAB>Lade<TAB> (realtime,<TAB>fra Midtbyen)".

    Raises KeyError if one of the keys is missing, and IndexError if the
    scheduled departure time contains no "T" separator.
    """
    line = departure_dict["line"]
    # Keep only the part between the "T" and the first "." -- the time of day
    # when the timestamp looks like "2021-03-01T12:34:00.000".
    departure_time = departure_dict["scheduledDepartureTime"].split("T")[1].split(".")[0]
    destination = departure_dict["destination"]
    data_source = "realtime" if departure_dict["isRealtimeData"] else "aimed"
    direction = "til Midtbyen" if departure_dict["isGoingTowardsCentrum"] else "fra Midtbyen"

    return f"{departure_time}: {line}\t{destination}\t ({data_source},\t{direction})"
|
||
|
|
||
|
|
||
|
def write_to_socket(s: str, mpv_socket) -> None:
    """Show the string `s` on the mpv on screen display.

    Builds a JSON `show-text` command (with 10000 as its duration argument)
    and sends it through `socat` to the open mpv ipc socket located at the
    path `mpv_socket`, taken relative to the current working directory.

    Raises FileNotFoundError if the `socat` executable is not installed.
    """
    mpv_cmd = json.dumps({"command": ["show-text", str(s), 10000]})

    # Hand the payload to socat on stdin instead of splicing it into a shell
    # command line: quotes in `s` can no longer break or inject into the
    # command, and the JSON escapes (\n, \t) are not reinterpreted by `echo`.
    subprocess.run(
        ["socat", "-", f"./{mpv_socket}"],
        # mpv's IPC protocol is line-delimited, so terminate the command
        # with a newline just like `echo` did.
        input=f"{mpv_cmd}\n".encode(),
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=False,
    )
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # The stop name is optional: without an argument, or with a name that is
    # not in STOP_PLACES, fall back to "gløshaugen".
    input_stop_place = sys.argv[1] if len(sys.argv) > 1 else ""
    active_stop_place = STOP_PLACES.get(input_stop_place, STOP_PLACES["gløshaugen"])

    # Without a timeout a stalled connection would hang the script forever;
    # raise_for_status turns an HTTP error into a clear exception instead of
    # a confusing JSON/key error further down.
    r = requests.get(ATB_API_URL + active_stop_place, timeout=10)
    r.raise_for_status()
    departures = r.json()["departures"]
    sorted_departures = sorted(departures, key=lambda d: d["scheduledDepartureTime"])

    # ensure mpv is running with the ipc-server socket set in MPV_SOCKET
    MPV_SOCKET = "mpv_ipc.socket"
    if os.path.exists(MPV_SOCKET):
        # One departure per line on the on screen display.
        write_to_socket(
            "\n".join(format_departure(dep) for dep in sorted_departures),
            MPV_SOCKET,
        )
    else:
        # No mpv socket in the working directory: print to stdout instead.
        for dep in sorted_departures:
            print(format_departure(dep))
|
||
|
|