crystal-orb/crystal_orb.py

70 lines
2.5 KiB
Python

import json
import requests
import sys
import os
import subprocess
# Departures endpoint of mpolden's atb service.
# Project page: https://github.com/mpolden/atb
ATB_API_URL = "https://mpolden.no/atb/v2/departures/"

# Maps a human-friendly stop name to its stop place code.
# Codes were looked up through https://stoppested.entur.org/
STOP_PLACES = {
    "gløshaugen": "44085",
    "høgskoleringen": "42029",
    "hesthagen": "41620",
}
def get_departures(stop_place: str, atb_api_url: str = ATB_API_URL, timeout: float = 10.0):
    """Fetch the departures for a stop place from the atb api.

    Args:
        stop_place: Stop place code; it is appended directly to ``atb_api_url``.
        atb_api_url: Base URL of the departures endpoint (expected to end in "/").
        timeout: Seconds to wait for the server before giving up, so that an
            unresponsive server cannot hang the script indefinitely.

    Returns:
        The value stored under the ``"departures"`` key of the JSON response.

    Raises:
        requests.RequestException: On connection failure, timeout, or an HTTP
            error status.
        KeyError: If the JSON response has no ``"departures"`` key.
    """
    response = requests.get(atb_api_url + stop_place, timeout=timeout)
    # Fail with a clear HTTP error instead of an obscure JSON/KeyError later on.
    response.raise_for_status()
    return response.json()["departures"]
def format_departure(departure_dict: dict) -> str:
    """Render one departure record from mpolden's atb api as a single line.

    The line holds the scheduled time of day, the bus line, the destination,
    whether the data is realtime or aimed, and the direction relative to
    the city centre ("til Midtbyen" / "fra Midtbyen").
    """
    bus_line = departure_dict["line"]

    # The timestamp looks like "<date>T<time>.<fraction>"; keep only <time>.
    after_date = departure_dict["scheduledDepartureTime"].split("T")[1]
    clock = after_date.split(".")[0]

    headsign = departure_dict["destination"]

    if departure_dict["isRealtimeData"]:
        data_kind = "realtime"
    else:
        data_kind = "aimed"

    if departure_dict["isGoingTowardsCentrum"]:
        direction = "til Midtbyen"
    else:
        direction = "fra Midtbyen"

    return f"{clock}: {bus_line}\t{headsign}\t ({data_kind},\t{direction})"
def write_to_socket(s: str, mpv_socket) -> None:
    """Show the string s on the mpv on screen display.

    A ``show-text`` command (with the value 10000 as its duration argument)
    is encoded as one line of JSON and piped through ``socat`` into the mpv
    ipc socket located at the relative path ``mpv_socket``.

    The payload is handed to ``socat`` on stdin and the command is given as
    an argument list, so quotes or backslashes in ``s`` are never interpreted
    by a shell. The call waits for ``socat`` to finish (at most 5 seconds).

    Sending is best effort: if ``socat`` cannot be started or times out, a
    message is written to stderr and the function returns normally.
    """
    cmd_dict = {"command": ["show-text", f"{s}", 10000]}
    # The message is terminated with a newline, exactly as `echo` did before.
    payload = json.dumps(cmd_dict) + "\n"
    try:
        subprocess.run(
            ["socat", "-", f"./{mpv_socket}"],
            input=payload,
            text=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=5,
            check=False,
        )
    except (OSError, subprocess.SubprocessError) as err:
        print(f"could not send text to mpv: {err}", file=sys.stderr)
    return None
if __name__ == "__main__":
    # The first command line argument names the stop place. A missing or
    # unknown name falls back to "gløshaugen".
    try:
        input_stop_place = sys.argv[1]
    except IndexError:
        input_stop_place = "no input was given"
    active_stop_place = STOP_PLACES.get(input_stop_place, STOP_PLACES["gløshaugen"])

    departures = get_departures(active_stop_place)
    sorted_departures = sorted(departures, key=lambda d: d["scheduledDepartureTime"])
    departure_lines = [format_departure(dep) for dep in sorted_departures]

    # ensure mpv is running with the ipc-server socket set in MPV_SOCKET
    MPV_SOCKET = "mpv_ipc.socket"
    if os.path.exists(MPV_SOCKET):
        # Joined outside of an f-string: a backslash inside an f-string
        # expression is a syntax error before Python 3.12.
        write_to_socket("\n".join(departure_lines), MPV_SOCKET)
    else:
        # No mpv socket available: print the departures to stdout instead.
        for departure_line in departure_lines:
            print(departure_line)