2024-12-01 01:55:29 +01:00
|
|
|
import json
|
|
|
|
import requests
|
|
|
|
import sys
|
|
|
|
import os
|
|
|
|
import subprocess
|
|
|
|
|
|
|
|
# See github project: https://github.com/mpolden/atb
|
|
|
|
ATB_API_URL = "https://mpolden.no/atb/v2/departures/"
|
|
|
|
# Stop place codes found through https://stoppested.entur.org/
|
|
|
|
STOP_PLACES = {
|
|
|
|
"gløshaugen": "44085",
|
|
|
|
"høgskoleringen": "42029",
|
|
|
|
"hesthagen": "41620"
|
|
|
|
}
|
|
|
|
|
2024-12-12 23:07:32 +01:00
|
|
|
def get_departures(stop_place: str, atb_api_url: str = ATB_API_URL):
|
|
|
|
return requests.get(atb_api_url + stop_place).json()["departures"]
|
|
|
|
|
2024-12-01 01:55:29 +01:00
|
|
|
def format_departure(departure_dict: dict) -> str:
|
|
|
|
"""Formats the json supplied by mpolden's atb api to a single line of text
|
|
|
|
quickly summarizing the scheduled bus.
|
|
|
|
"""
|
|
|
|
line = departure_dict["line"]
|
|
|
|
departure_time = departure_dict["scheduledDepartureTime"].split("T")[1].split(".")[0]
|
|
|
|
destination = departure_dict["destination"]
|
|
|
|
is_real_time = departure_dict["isRealtimeData"]
|
|
|
|
towards_city_center = departure_dict["isGoingTowardsCentrum"]
|
|
|
|
|
|
|
|
# return f"{departure_time}: {line}\t({'R' if is_real_time else 'S'}, {'I' if towards_city_center else 'O'}) {destination}"
|
|
|
|
return f"{departure_time}: {line}\t{destination}\t ({'realtime' if is_real_time else 'aimed'},\t{'til Midtbyen' if towards_city_center else 'fra Midtbyen'})"
|
|
|
|
|
|
|
|
|
|
|
|
def write_to_socket(s: str, mpv_socket) -> None:
|
|
|
|
"""Writes the string s to the mpv on screen display through
|
|
|
|
the open mpv ipc socket located at the path `mpv_socket`.
|
|
|
|
"""
|
|
|
|
cmd_dict = {"command": ["show-text", f"{s}", 10000]}
|
|
|
|
mpv_cmd = json.dumps(cmd_dict)
|
|
|
|
cmd = f'echo \'{mpv_cmd}\' | socat - ./{mpv_socket}'
|
|
|
|
subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
2024-12-12 23:07:32 +01:00
|
|
|
try:
|
|
|
|
input_stop_place = sys.argv[1]
|
|
|
|
except:
|
|
|
|
input_stop_place = "no input was given"
|
|
|
|
|
2024-12-01 01:55:29 +01:00
|
|
|
if input_stop_place in STOP_PLACES.keys():
|
|
|
|
active_stop_place = STOP_PLACES[input_stop_place]
|
|
|
|
else:
|
|
|
|
active_stop_place = STOP_PLACES["gløshaugen"]
|
|
|
|
|
2024-12-12 23:07:32 +01:00
|
|
|
departures = get_departures(active_stop_place)
|
2024-12-01 01:55:29 +01:00
|
|
|
sorted_departures = sorted(departures, key = lambda d: d["scheduledDepartureTime"])
|
|
|
|
|
|
|
|
# ensure mpv is running with the ipc-server socket set in MPV_SOCKET
|
|
|
|
MPV_SOCKET = "mpv_ipc.socket"
|
|
|
|
if os.path.exists(MPV_SOCKET):
|
|
|
|
write_to_socket(
|
|
|
|
f"{'\n'.join([
|
|
|
|
format_departure(sorted_departures[i])
|
|
|
|
for i in range(len(sorted_departures))
|
|
|
|
])}", MPV_SOCKET)
|
|
|
|
else:
|
|
|
|
for dep in sorted_departures:
|
|
|
|
print(format_departure(dep))
|
|
|
|
|