init commit
This commit is contained in:
commit
4ad365a2b4
|
@ -0,0 +1 @@
|
|||
__pycache__
|
|
@ -0,0 +1,5 @@
|
|||
# Alarmed
|
||||
|
||||
The alarmed daemon.
|
||||
|
||||
Lightweight package for keeping track of alarms that run custom commands when they go off.
|
|
@ -0,0 +1,23 @@
|
|||
from setuptools import setup
|
||||
|
||||
with open("README", 'r') as f:
|
||||
long_description = f.read()
|
||||
|
||||
setup(
|
||||
name = 'alarmed',
|
||||
version = '1.0',
|
||||
description = 'A tool to keep track of alarms, and make them execute commands',
|
||||
license = 'MIT',
|
||||
long_description = long_description,
|
||||
author ='h7x4',
|
||||
author_email ='h7x4abk3g@protonmail.com',
|
||||
url ="https://www.github.com/h7x4ABk3g/alarmed",
|
||||
packages = ['alarmed'],
|
||||
install_requires = ['xdg', 'python-daemon'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'alarme = src.alarme:main',
|
||||
'alarmed = src.alarmed:main',
|
||||
]
|
||||
},
|
||||
)
|
|
@ -0,0 +1,47 @@
|
|||
from datetime import datetime
|
||||
from json import dump, load
|
||||
from uuid import uuid4
|
||||
from os import walk, path
|
||||
|
||||
from common import PREVIOUS_ALARM_DIR, ALARM_DIR, list_files
|
||||
|
||||
class Alarm:
|
||||
def __init__(self, timestamp, comment=None, command=None, id=None):
|
||||
self.timestamp = timestamp
|
||||
self.comment = comment
|
||||
self.command = command
|
||||
self.id = id or self.generateId()
|
||||
|
||||
def __str__(self):
|
||||
return str(self.__dict__)
|
||||
|
||||
@classmethod
|
||||
def readFromFile(cls, path):
|
||||
with open(path) as file:
|
||||
data = load(file)
|
||||
alarm = cls(data['timestamp'], data['comment'], data['command'], data['id'])
|
||||
return alarm
|
||||
|
||||
def writeToFile(self, path):
|
||||
with open(path, 'w') as file:
|
||||
dump(self.__dict__, file)
|
||||
|
||||
def generateId(self):
|
||||
alarm_files = list_files(ALARM_DIR)
|
||||
prev_alarm_files = list_files(PREVIOUS_ALARM_DIR)
|
||||
|
||||
i = 1
|
||||
while hex(i)[2:].upper() in alarm_files + prev_alarm_files:
|
||||
i += 1
|
||||
|
||||
return hex(i)[2:].upper()
|
||||
|
||||
def getTimeLeft(self):
|
||||
timeleft = datetime.fromtimestamp(self.timestamp) - datetime.now()
|
||||
hs = str(timeleft.seconds // 3600).zfill(2)
|
||||
ms = str((timeleft.seconds // 60) % 60).zfill(2)
|
||||
ss = str(timeleft.seconds % 60).zfill(2)
|
||||
if timeleft.days:
|
||||
return f'{timeleft.days} days, {hs}:{ms}:{ss}'
|
||||
else:
|
||||
return f'{hs}:{ms}:{ss}'
|
|
@ -0,0 +1,138 @@
|
|||
import argparse
|
||||
from datetime import date, time, datetime, timedelta
|
||||
from os import walk, path, replace
|
||||
from pathlib import Path
|
||||
|
||||
from Alarm import Alarm
|
||||
from common import ALARM_DIR, PREVIOUS_ALARM_DIR, print_error, list_files
|
||||
|
||||
def get_time_str(file):
|
||||
return str(file)
|
||||
|
||||
def print_alarms(args):
|
||||
files = list_files(ALARM_DIR)
|
||||
|
||||
if files == []:
|
||||
print('No alarms')
|
||||
return
|
||||
|
||||
if args.id:
|
||||
filepath = path.join(ALARM_DIR, args.id)
|
||||
if not path.exists(filepath):
|
||||
print_error(f'Alarm with id {args.id} does not exist')
|
||||
alarms = [Alarm.readFromFile(filepath)]
|
||||
else:
|
||||
alarms = [Alarm.readFromFile(path.join(ALARM_DIR, file)) for file in files]
|
||||
|
||||
print(args.sep.join(alarm.getTimeLeft() for alarm in alarms))
|
||||
|
||||
##### Time input parsers
|
||||
|
||||
def parse_clock_time(t): # ex. "-t 18:20"
|
||||
time_offset = time(hour=int(t[0:2]), minute=int(t[3:5]))
|
||||
time_now = datetime.now()
|
||||
time_now_delta = timedelta(hours=time_now.hour, minutes=time_now.minute, seconds=time_now.second)
|
||||
time_offset_delta = timedelta(hours=time_offset.hour, minutes=time_offset.minute, seconds=time_offset.second)
|
||||
|
||||
datetime_today = datetime.combine(date.today(), time(second=0))
|
||||
|
||||
if time_offset_delta > time_now_delta:
|
||||
alarm_time = datetime_today + time_offset_delta
|
||||
else:
|
||||
alarm_time = datetime_today + timedelta(days=1) + time_offset_delta
|
||||
|
||||
return alarm_time
|
||||
|
||||
def parse_seconds_time(t): # ex. "-s 3600"
|
||||
return datetime.now() + timedelta(seconds=int(t))
|
||||
|
||||
def choose_gui_time():
|
||||
print('Function not implemented yet')
|
||||
exit(1)
|
||||
|
||||
def parse_default_time(t): # ex. "00:40:10"
|
||||
time_until_alarm = timedelta(hours=int(t[0:2]), minutes=int(t[3:5]), seconds=int(t[7:9]))
|
||||
return datetime.now() + time_until_alarm
|
||||
|
||||
#####
|
||||
|
||||
def set_alarm(args):
|
||||
if args.is_clock_time:
|
||||
alarm_time = parse_clock_time(args.time)
|
||||
elif args.is_seconds:
|
||||
alarm_time = parse_seconds_time(args.time)
|
||||
elif args.is_gui:
|
||||
alarm_time = choose_gui_time()
|
||||
else:
|
||||
alarm_time = parse_default_time(args.time)
|
||||
|
||||
alarm = Alarm(alarm_time.timestamp())
|
||||
print('Made alarm - ' + str(alarm))
|
||||
# print(alarm.getTimeLeft())
|
||||
alarm.writeToFile(path.join(ALARM_DIR, alarm.id))
|
||||
|
||||
def deactivate_alarm(args):
|
||||
file_path = path.join(ALARM_DIR, args.id)
|
||||
if not path.exists(file_path):
|
||||
print(f'[ERROR] alarm with ID "{args.id}" does not exist')
|
||||
exit(1)
|
||||
|
||||
replace(file_path, path.join(PREVIOUS_ALARM_DIR, args.id))
|
||||
|
||||
def main():
|
||||
argparser = argparse.ArgumentParser(description='Get and set alarms for the \'Alarmed\' daemon.')
|
||||
argparser.set_defaults(func=lambda _: argparser.print_help())
|
||||
|
||||
# ------------------------------------------------------------------------ #
|
||||
|
||||
subargparsers = argparser.add_subparsers(
|
||||
title='subcommands',
|
||||
description='Run \'alarme <subcommand> --help\' for more information',
|
||||
help='subcommand description')
|
||||
|
||||
# ------------------------------------------------------------------------ #
|
||||
|
||||
get_parser = subargparsers.add_parser('get', help='print out the currently active alarms')
|
||||
get_parser.add_argument('-i', '--id', metavar='ID',
|
||||
help='specify the id of the alarm to print')
|
||||
|
||||
get_parser.set_defaults(sep=' | ')
|
||||
get_parser.set_defaults(func=print_alarms)
|
||||
|
||||
# ------------------------------------------------------------------------ #
|
||||
|
||||
set_parser = subargparsers.add_parser('set', help='make a new alarm')
|
||||
|
||||
set_time_format_group = set_parser.add_mutually_exclusive_group()
|
||||
set_time_format_group.add_argument('-s', '--seconds', dest='is_seconds', action='store_true',
|
||||
help='specify the remaining time as seconds')
|
||||
set_time_format_group.add_argument('-t', '--to', dest='is_clock_time', action='store_true',
|
||||
help='specify a clock time for the alarm to go off. If the clock is earlier than the current time, it wraps around to tomorrow. Format: "HH:MM"')
|
||||
set_time_format_group.add_argument('-g', '--gui', dest='is_gui', action='store_true',
|
||||
help='use a curses based gui to choose the time')
|
||||
|
||||
set_parser.add_argument('time', metavar='TIME', help='Time until or for the alarm to go off. Default format: "HH:MM:SS"')
|
||||
|
||||
set_parser.add_argument('-c', '--command', metavar='CMD',
|
||||
help='specify a command to run as the alarm goes off')
|
||||
set_parser.add_argument('-m', '--message', metavar='MSG',
|
||||
help='add a comment as to what the alarm means')
|
||||
|
||||
set_parser.set_defaults(func=set_alarm)
|
||||
|
||||
# ------------------------------------------------------------------------ #
|
||||
|
||||
deactivation_parser = subargparsers.add_parser('deactivate', help='deactivate one of the alarms')
|
||||
deactivation_parser.add_argument('-g', '--gui', dest='is_gui', action='store_true',
|
||||
help='use a curses based gui to choose an alarm to remove')
|
||||
deactivation_parser.add_argument('id', metavar='ID')
|
||||
deactivation_parser.set_defaults(func=deactivate_alarm)
|
||||
|
||||
# ------------------------------------------------------------------------ #
|
||||
|
||||
args = argparser.parse_args()
|
||||
|
||||
args.func(args)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -0,0 +1,60 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
from daemon import DaemonContext
|
||||
from xdg import XDG_DATA_HOME, XDG_CONFIG_HOME
|
||||
from datetime import datetime
|
||||
from os import walk, replace, path
|
||||
from subprocess import run
|
||||
from time import sleep
|
||||
from pathlib import Path
|
||||
|
||||
from Alarm import Alarm
|
||||
from common import ALARM_DIR, PREVIOUS_ALARM_DIR, DEFAULT_ALARM_COMMAND, list_files
|
||||
|
||||
# TODO: Make this function modify global state in common.py
|
||||
def init_data_folders(datadir=XDG_DATA_HOME, configdir=XDG_CONFIG_HOME):
|
||||
datadir.joinpath('alarmed/alarms').mkdir(parents=True, exist_ok=True)
|
||||
datadir.joinpath('alarmed/previous_alarms').mkdir(parents=True, exist_ok=True)
|
||||
configdir.joinpath('alarmed').mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def is_finished(file):
|
||||
alarm = Alarm.readFromFile(path.join(ALARM_DIR, file))
|
||||
return alarm.timestamp <= datetime.now().timestamp()
|
||||
|
||||
async def execute_alarm_command(alarm):
|
||||
run(alarm.command or DEFAULT_ALARM_COMMAND.replace('%I', alarm.id).replace('%C', alarm.comment or ''), shell=True)
|
||||
|
||||
def move_finished_alarm(alarm_file, finished_alarm_dir=PREVIOUS_ALARM_DIR):
|
||||
replace(
|
||||
path.join(ALARM_DIR, alarm_file),
|
||||
path.join(finished_alarm_dir, alarm_file)
|
||||
)
|
||||
|
||||
async def execute_finished_alarms():
|
||||
files = list_files(ALARM_DIR)
|
||||
|
||||
if files == []:
|
||||
return
|
||||
|
||||
finished_alarms = [file for file in files if is_finished(file)]
|
||||
|
||||
for file in finished_alarms:
|
||||
alarm = Alarm.readFromFile(path.join(ALARM_DIR, file))
|
||||
print(f'[{datetime.now().strftime("%H:%M")}] Executing alarm with ID {alarm.id}')
|
||||
move_finished_alarm(file)
|
||||
asyncio.create_task(execute_alarm_command(alarm))
|
||||
|
||||
async def alarm_check_loop():
|
||||
while True:
|
||||
await execute_finished_alarms()
|
||||
sleep(1)
|
||||
|
||||
async def main():
|
||||
# with DaemonContext():
|
||||
init_data_folders()
|
||||
await alarm_check_loop()
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
|
@ -0,0 +1,18 @@
|
|||
from pathlib import Path
|
||||
from xdg import XDG_DATA_HOME, XDG_CONFIG_HOME
|
||||
from os import walk
|
||||
|
||||
ALARM_DIR = XDG_DATA_HOME.joinpath('alarmed/alarms').absolute()
|
||||
PREVIOUS_ALARM_DIR = XDG_DATA_HOME.joinpath('alarmed/previous_alarms').absolute()
|
||||
DEFAULT_ALARM_COMMAND = 'notify-send "Alarm %I: %C"'
|
||||
|
||||
def print_error(msg):
|
||||
print('\033[31m[ERROR]\033[0m ' + msg)
|
||||
|
||||
def list_files(dir):
|
||||
try:
|
||||
_,_,files = next(walk(dir))
|
||||
except StopIteration:
|
||||
files = []
|
||||
|
||||
return files
|
|
@ -0,0 +1,26 @@
|
|||
import curses
|
||||
|
||||
from common import ALARM_DIR, list_files
|
||||
|
||||
def choose_from_list(l):
|
||||
screen = curses.initscr()
|
||||
screen.addstr(0, 0, "Hello 1 from position (0, 0)")
|
||||
screen.addstr(3, 1, "Hello 2 from (3, 1)")
|
||||
screen.addstr(4, 4, "X")
|
||||
screen.addch(5, 5, "Y")
|
||||
screen.refresh()
|
||||
|
||||
curses.napms(2000)
|
||||
curses.endwin()
|
||||
pass
|
||||
|
||||
def choose_clock():
|
||||
pass
|
||||
|
||||
def choose_alarm_id():
|
||||
ids = list_files(ALARM_DIR)
|
||||
chosen_id = choose_from_list(ids)
|
||||
return chosen_id
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(choose_alarm_id())
|
Reference in New Issue