Compare commits
4 Commits
a352264efe
...
6fafd79c3e
Author | SHA1 | Date | |
---|---|---|---|
6fafd79c3e | |||
a07f58551d | |||
331d3bb608 | |||
034e923bb7 |
@ -13,6 +13,9 @@ STOP_PLACES = {
|
|||||||
"hesthagen": "41620"
|
"hesthagen": "41620"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def get_departures(stop_place: str, atb_api_url: str = ATB_API_URL):
|
||||||
|
return requests.get(atb_api_url + stop_place).json()["departures"]
|
||||||
|
|
||||||
def format_departure(departure_dict: dict) -> str:
|
def format_departure(departure_dict: dict) -> str:
|
||||||
"""Formats the json supplied by mpolden's atb api to a single line of text
|
"""Formats the json supplied by mpolden's atb api to a single line of text
|
||||||
quickly summarizing the scheduled bus.
|
quickly summarizing the scheduled bus.
|
||||||
@ -39,14 +42,17 @@ def write_to_socket(s: str, mpv_socket) -> None:
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
input_stop_place = sys.argv[1]
|
try:
|
||||||
|
input_stop_place = sys.argv[1]
|
||||||
|
except:
|
||||||
|
input_stop_place = "no input was given"
|
||||||
|
|
||||||
if input_stop_place in STOP_PLACES.keys():
|
if input_stop_place in STOP_PLACES.keys():
|
||||||
active_stop_place = STOP_PLACES[input_stop_place]
|
active_stop_place = STOP_PLACES[input_stop_place]
|
||||||
else:
|
else:
|
||||||
active_stop_place = STOP_PLACES["gløshaugen"]
|
active_stop_place = STOP_PLACES["gløshaugen"]
|
||||||
|
|
||||||
r = requests.get(ATB_API_URL + active_stop_place)
|
departures = get_departures(active_stop_place)
|
||||||
departures = r.json()["departures"]
|
|
||||||
sorted_departures = sorted(departures, key = lambda d: d["scheduledDepartureTime"])
|
sorted_departures = sorted(departures, key = lambda d: d["scheduledDepartureTime"])
|
||||||
|
|
||||||
# ensure mpv is running with the ipc-server socket set in MPV_SOCKET
|
# ensure mpv is running with the ipc-server socket set in MPV_SOCKET
|
||||||
|
162
crystal_orb_cli.py
Normal file
162
crystal_orb_cli.py
Normal file
@ -0,0 +1,162 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
import dataclasses
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from crystal_orb import get_departures
|
||||||
|
from crystal_orb import STOP_PLACES
|
||||||
|
|
||||||
|
def pad(s, total_length, justification="l"):
|
||||||
|
if len(s) >= total_length:
|
||||||
|
return s[0:total_length]
|
||||||
|
else:
|
||||||
|
surplus = total_length - len(s)
|
||||||
|
out_str = " " * total_length
|
||||||
|
if justification == "l":
|
||||||
|
return s + " "*surplus
|
||||||
|
elif justification == "m":
|
||||||
|
return " "*(surplus//2) + s + " "*(surplus - surplus//2)
|
||||||
|
elif justification == "r":
|
||||||
|
return " "*surplus + s
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Justification was {justification}, which is not l, m or r.")
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Departure:
|
||||||
|
line: int
|
||||||
|
departure_time: datetime
|
||||||
|
destination: str
|
||||||
|
is_realtime: bool
|
||||||
|
towards_midtbyen: bool
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
self.departure_time = datetime.fromisoformat(self.departure_time)
|
||||||
|
|
||||||
|
def colorize(self):
|
||||||
|
out_str = ""
|
||||||
|
out_str += "\033[2;20;35m" + str(self.departure_time.time()) + "\033[0m"
|
||||||
|
out_str += ": "
|
||||||
|
out_str += pad(str(self.line), 4, justification="r")
|
||||||
|
out_str += " "
|
||||||
|
out_str += "\033[3;37m" + pad(self.destination, 35+19) + "\033[0m"
|
||||||
|
out_str += " ("
|
||||||
|
if self.is_realtime: out_str += "\033[0;32mR\033[0m"
|
||||||
|
else: out_str += " "
|
||||||
|
|
||||||
|
out_str += ", "
|
||||||
|
if self.towards_midtbyen: out_str += "\033[0;32mTo \033[0m"
|
||||||
|
else: out_str += "\033[0;31mFrom\033[0m"
|
||||||
|
|
||||||
|
out_str += ")"
|
||||||
|
return out_str
|
||||||
|
|
||||||
|
|
||||||
|
def get_departure_as_classes(stop_place: str):
|
||||||
|
departures = get_departures(STOP_PLACES[stop_place])
|
||||||
|
departures = [
|
||||||
|
Departure(
|
||||||
|
d["line"],
|
||||||
|
d["scheduledDepartureTime"],
|
||||||
|
d["destination"],
|
||||||
|
d["isRealtimeData"],
|
||||||
|
d["isGoingTowardsCentrum"])
|
||||||
|
for d in departures
|
||||||
|
]
|
||||||
|
return sorted(departures, key = lambda d: d.departure_time)
|
||||||
|
|
||||||
|
def print_colorized_departures(stop_place: str):
|
||||||
|
departures = get_departure_as_classes(stop_place)
|
||||||
|
for dep in departures:
|
||||||
|
print(dep.colorize())
|
||||||
|
|
||||||
|
def print_help_info():
|
||||||
|
print(
|
||||||
|
"""Help info for the crystal orb cli.
|
||||||
|
Possible commands:
|
||||||
|
<stop place>
|
||||||
|
query the api for all busses from the given stop place
|
||||||
|
-h or --help
|
||||||
|
Show this screen
|
||||||
|
-l or --list-stop-places
|
||||||
|
List the admissible stop places.
|
||||||
|
This is configured in the STOP_PLACES dict in crystal_orb.py
|
||||||
|
-n or --next
|
||||||
|
Limit the response to only the next bus matching the query
|
||||||
|
-b <line> or --bus <line>
|
||||||
|
Limit the response to only the supplied bus line
|
||||||
|
-s <place> or --stop <place>
|
||||||
|
Change the stop place to the supplied place
|
||||||
|
-t or --to
|
||||||
|
Limit the response to buses going into town
|
||||||
|
-f or --from
|
||||||
|
Limit the response to buses going away from town""")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
args = sys.argv
|
||||||
|
# If no arguments are passed, we just print the default stop
|
||||||
|
if len(args) < 2:
|
||||||
|
print_colorized_departures("gløshaugen")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
if "-h" in args or "--help" in args:
|
||||||
|
print_help_info()
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# Values for control flow and processing on the available departures
|
||||||
|
stop_place = "gløshaugen"
|
||||||
|
bus_line = None
|
||||||
|
towards_city_center = None
|
||||||
|
take_one = False
|
||||||
|
|
||||||
|
# Parse input arguments to the program
|
||||||
|
for i, arg in enumerate(args):
|
||||||
|
if i == 1 and arg[0] != "-": # First argument is taken as the stop place
|
||||||
|
stop_place = arg.lower()
|
||||||
|
elif arg == "-n" or arg == "--next": # Limits to only giving "the next bus" matching the query
|
||||||
|
take_one = True
|
||||||
|
elif arg == "-b" or arg == "--bus": # Limit to specific bus line
|
||||||
|
try:
|
||||||
|
bus_line = args[i+1]
|
||||||
|
except:
|
||||||
|
print("Bus line not supplied to -b or --bus.")
|
||||||
|
elif arg == "--list-stop-places" or arg == "-l": # Just print stop places
|
||||||
|
print("Admissible stop places:")
|
||||||
|
for place in STOP_PLACES.keys():
|
||||||
|
print(f"\t{place}")
|
||||||
|
sys.exit()
|
||||||
|
elif arg == "-t" or arg == "--to": # Limit to buses going to town
|
||||||
|
towards_city_center = True
|
||||||
|
elif arg == "-f" or arg == "--from": # Limit to buses leaving town
|
||||||
|
towards_city_center = False
|
||||||
|
elif arg == "-s" or arg == "--stop": # Change stop
|
||||||
|
try:
|
||||||
|
stop_place = args[i+1].lower()
|
||||||
|
except:
|
||||||
|
print("Stop place not supplied to -s or --stop")
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Arguments are parsed, we can execute the query
|
||||||
|
try:
|
||||||
|
deps = get_departure_as_classes(stop_place)
|
||||||
|
except:
|
||||||
|
print(f"Stop {stop_place} not admissible. Run --list-stop-places for help.")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
if bus_line is not None:
|
||||||
|
deps = list(filter(lambda d: str(d.line) == bus_line, deps))
|
||||||
|
if towards_city_center is not None:
|
||||||
|
if towards_city_center:
|
||||||
|
deps = list(filter(lambda d: d.towards_midtbyen, deps))
|
||||||
|
else:
|
||||||
|
deps = list(filter(lambda d: not d.towards_midtbyen, deps))
|
||||||
|
|
||||||
|
if take_one:
|
||||||
|
print(deps[0].colorize())
|
||||||
|
else:
|
||||||
|
for dep in deps:
|
||||||
|
print(dep.colorize())
|
||||||
|
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user