treewide: lint and format

This commit is contained in:
2026-02-12 17:32:11 +09:00
parent e5d4a846d2
commit 5e94e1a902
42 changed files with 401 additions and 323 deletions

View File

@@ -50,3 +50,37 @@ cleanmigrations = "git clean -f worblehat/models/migrations/versions"
[tool.uv.sources]
libdib = { git = "https://git.pvv.ntnu.no/Projects/libdib.git" }
[tool.black]
line-length = 100
[tool.ruff]
line-length = 100
[tool.ruff.lint]
select = [
"A", # flake8-builtins
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"COM", # flake8-commas
"ANN",
# "E", # pycodestyle
# "F", # Pyflakes
"FA", # flake8-future-annotations
"I", # isort
"S", # flake8-bandit
"ICN", # flake8-import-conventions
"ISC", # flake8-implicit-str-concat
# "N", # pep8-naming
"PTH", # flake8-use-pathlib
"RET", # flake8-return
"SIM", # flake8-simplify
"TC", # flake8-type-checking
"UP", # pyupgrade
"YTT", # flake8-2020
]
ignore = [
# "E501", # line too long
"S101", # assert detected
# "S311", # non-cryptographic random generator
]

View File

@@ -1,30 +1,28 @@
from textwrap import dedent
from datetime import datetime
from textwrap import dedent
from libdib.repl import (
InteractiveItemSelector,
NumberedCmd,
prompt_yes_no,
)
from sqlalchemy import (
event,
select,
)
from sqlalchemy.orm import Session
from libdib.repl import (
NumberedCmd,
InteractiveItemSelector,
prompt_yes_no,
)
from worblehat.models import *
from worblehat.services import (
create_bookcase_item_from_isbn,
is_valid_isbn,
)
from worblehat.models import *
from .subclis import (
AdvancedOptionsCli,
BookcaseItemCli,
select_bookcase_shelf,
SearchCli,
select_bookcase_shelf,
)
# TODO: Category seems to have been forgotten. Maybe relevant interactivity should be added?
@@ -33,24 +31,24 @@ from .subclis import (
class WorblehatCli(NumberedCmd):
def __init__(self, sql_session: Session):
def __init__(self, sql_session: Session) -> None:
super().__init__()
self.sql_session = sql_session
self.sql_session_dirty = False
@event.listens_for(self.sql_session, "after_flush")
def mark_session_as_dirty(*_):
def mark_session_as_dirty(*_) -> None:
self.sql_session_dirty = True
self.prompt_header = "(unsaved changes)"
@event.listens_for(self.sql_session, "after_commit")
@event.listens_for(self.sql_session, "after_rollback")
def mark_session_as_clean(*_):
def mark_session_as_clean(*_) -> None:
self.sql_session_dirty = False
self.prompt_header = None
@classmethod
def run_with_safe_exit_wrapper(cls, sql_session: Session):
def run_with_safe_exit_wrapper(cls, sql_session: Session) -> None:
tool = cls(sql_session)
while True:
try:
@@ -61,7 +59,8 @@ class WorblehatCli(NumberedCmd):
try:
print()
if prompt_yes_no(
"Are you sure you want to exit without saving?", default=False
"Are you sure you want to exit without saving?",
default=False,
):
raise KeyboardInterrupt
except KeyboardInterrupt:
@@ -69,7 +68,7 @@ class WorblehatCli(NumberedCmd):
tool.sql_session.rollback()
exit(0)
def do_show_bookcase(self, arg: str):
def do_show_bookcase(self, arg: str) -> None:
bookcase_selector = InteractiveItemSelector(
cls=Bookcase,
sql_session=self.sql_session,
@@ -82,7 +81,7 @@ class WorblehatCli(NumberedCmd):
for item in shelf.items:
print(f" {item.name} - {item.amount} copies")
def do_show_borrowed_queued(self, _: str):
def do_show_borrowed_queued(self, _: str) -> None:
borrowed_items = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(BookcaseItemBorrowing.delivered.is_(None))
@@ -95,14 +94,14 @@ class WorblehatCli(NumberedCmd):
print("Borrowed items:")
for item in borrowed_items:
print(
f"- {item.username} - {item.item.name} - to be delivered by {item.end_time.strftime('%Y-%m-%d')}"
f"- {item.username} - {item.item.name} - to be delivered by {item.end_time.strftime('%Y-%m-%d')}",
)
print()
queued_items = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue).order_by(
BookcaseItemBorrowingQueue.entered_queue_time
BookcaseItemBorrowingQueue.entered_queue_time,
),
).all()
@@ -112,26 +111,25 @@ class WorblehatCli(NumberedCmd):
print("Users in queue:")
for item in queued_items:
print(
f"- {item.username} - {item.item.name} - entered queue at {item.entered_queue_time.strftime('%Y-%m-%d')}"
f"- {item.username} - {item.item.name} - entered queue at {item.entered_queue_time.strftime('%Y-%m-%d')}",
)
def _create_bookcase_item(self, isbn: str):
def _create_bookcase_item(self, isbn: str) -> None:
bookcase_item = create_bookcase_item_from_isbn(isbn, self.sql_session)
if bookcase_item is None:
print(f"Could not find data about item with ISBN {isbn} online.")
print(
"If you think this is not due to a bug, please add the book to openlibrary.org before continuing."
"If you think this is not due to a bug, please add the book to openlibrary.org before continuing.",
)
return
else:
print(
dedent(f"""
print(
dedent(f"""
Found item:
title: {bookcase_item.name}
authors: {", ".join(a.name for a in bookcase_item.authors)}
language: {bookcase_item.language}
""")
)
"""),
)
print("Please select the bookcase where the item is placed:")
bookcase_selector = InteractiveItemSelector(
@@ -162,7 +160,7 @@ class WorblehatCli(NumberedCmd):
self.sql_session.add(bookcase_item)
self.sql_session.flush()
def default(self, isbn: str):
def default(self, isbn: str) -> None:
isbn = isbn.strip()
if not is_valid_isbn(isbn):
super()._default(isbn)
@@ -173,7 +171,7 @@ class WorblehatCli(NumberedCmd):
select(BookcaseItem)
.where(BookcaseItem.isbn == isbn)
.join(BookcaseItemBorrowing)
.join(BookcaseItemBorrowingQueue)
.join(BookcaseItemBorrowingQueue),
).one_or_none()
) is not None:
print(f'\nFound existing item for isbn "{isbn}"')
@@ -189,7 +187,7 @@ class WorblehatCli(NumberedCmd):
):
self._create_bookcase_item(isbn)
def do_search(self, _: str):
def do_search(self, _: str) -> None:
search_cli = SearchCli(self.sql_session)
search_cli.cmdloop()
if search_cli.result is not None:
@@ -198,7 +196,7 @@ class WorblehatCli(NumberedCmd):
bookcase_item=search_cli.result,
).cmdloop()
def do_show_slabbedasker(self, _: str):
def do_show_slabbedasker(self, _: str) -> None:
slubberter = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.join(BookcaseItem)
@@ -218,25 +216,25 @@ class WorblehatCli(NumberedCmd):
for slubbert in slubberter:
print("Slubberter:")
print(
f"- {slubbert.username} - {slubbert.item.name} - {slubbert.end_time.strftime('%Y-%m-%d')}"
f"- {slubbert.username} - {slubbert.item.name} - {slubbert.end_time.strftime('%Y-%m-%d')}",
)
def do_advanced(self, _: str):
def do_advanced(self, _: str) -> None:
AdvancedOptionsCli(self.sql_session).cmdloop()
def do_save(self, _: str):
def do_save(self, _: str) -> None:
if not self.sql_session_dirty:
print("No changes to save.")
return
self.sql_session.commit()
def do_abort(self, _: str):
def do_abort(self, _: str) -> None:
if not self.sql_session_dirty:
print("No changes to abort.")
return
self.sql_session.rollback()
def do_exit(self, _: str):
def do_exit(self, _: str) -> None:
if self.sql_session_dirty:
if prompt_yes_no("Would you like to save your changes?"):
self.sql_session.commit()

View File

@@ -1,19 +1,19 @@
from sqlalchemy import select
from sqlalchemy.orm import Session
from libdib.repl import (
InteractiveItemSelector,
NumberedCmd,
)
from sqlalchemy import select
from sqlalchemy.orm import Session
from worblehat.models import Bookcase, BookcaseShelf
class AdvancedOptionsCli(NumberedCmd):
def __init__(self, sql_session: Session):
def __init__(self, sql_session: Session) -> None:
super().__init__()
self.sql_session = sql_session
def do_add_bookcase(self, _: str):
def do_add_bookcase(self, _: str) -> None:
while True:
name = input("Name of bookcase> ")
if name == "":
@@ -22,7 +22,7 @@ class AdvancedOptionsCli(NumberedCmd):
if (
self.sql_session.scalars(
select(Bookcase).where(Bookcase.name == name)
select(Bookcase).where(Bookcase.name == name),
).one_or_none()
is not None
):
@@ -39,13 +39,14 @@ class AdvancedOptionsCli(NumberedCmd):
self.sql_session.add(bookcase)
self.sql_session.flush()
def do_add_bookcase_shelf(self, arg: str):
def do_add_bookcase_shelf(self, arg: str) -> None:
bookcase_selector = InteractiveItemSelector(
cls=Bookcase,
sql_session=self.sql_session,
)
bookcase_selector.cmdloop()
bookcase = bookcase_selector.result
assert isinstance(bookcase, Bookcase)
while True:
column = input("Column> ")
@@ -71,12 +72,12 @@ class AdvancedOptionsCli(NumberedCmd):
BookcaseShelf.bookcase == bookcase,
BookcaseShelf.column == column,
BookcaseShelf.row == row,
)
),
).one_or_none()
is not None
):
print(
f"Error: a bookshelf in bookcase {bookcase.name} with position c{column}-r{row} already exists"
f"Error: a bookshelf in bookcase {bookcase.name} with position c{column}-r{row} already exists",
)
return
@@ -93,7 +94,7 @@ class AdvancedOptionsCli(NumberedCmd):
self.sql_session.add(shelf)
self.sql_session.flush()
def do_list_bookcases(self, _: str):
def do_list_bookcases(self, _: str) -> None:
bookcase_shelfs = self.sql_session.scalars(
select(BookcaseShelf)
.join(Bookcase)
@@ -101,7 +102,7 @@ class AdvancedOptionsCli(NumberedCmd):
Bookcase.name,
BookcaseShelf.column,
BookcaseShelf.row,
)
),
).all()
bookcase_uid = None
@@ -112,7 +113,7 @@ class AdvancedOptionsCli(NumberedCmd):
print(f" {shelf.short_str()} - {sum(i.amount for i in shelf.items)} items")
def do_done(self, _: str):
def do_done(self, _: str) -> bool:
return True
funcs = {

View File

@@ -41,7 +41,7 @@ def _selected_bookcase_item_prompt(bookcase_item: BookcaseItem) -> str:
class BookcaseItemCli(NumberedCmd):
def __init__(self, sql_session: Session, bookcase_item: BookcaseItem):
def __init__(self, sql_session: Session, bookcase_item: BookcaseItem) -> None:
super().__init__()
self.sql_session = sql_session
self.bookcase_item = bookcase_item
@@ -50,7 +50,7 @@ class BookcaseItemCli(NumberedCmd):
def prompt_header(self) -> str:
return _selected_bookcase_item_prompt(self.bookcase_item)
def do_update_data(self, _: str):
def do_update_data(self, _: str) -> None:
item = create_bookcase_item_from_isbn(
str(self.bookcase_item.isbn),
self.sql_session,
@@ -66,7 +66,7 @@ class BookcaseItemCli(NumberedCmd):
self.bookcase_item.language = item.language
self.sql_session.flush()
def do_edit(self, arg: str):
def do_edit(self, arg: str) -> None:
EditBookcaseCli(self.sql_session, self.bookcase_item, self).cmdloop()
@staticmethod
@@ -83,7 +83,7 @@ class BookcaseItemCli(NumberedCmd):
BookcaseItemBorrowing.username == username,
BookcaseItemBorrowing.item == self.bookcase_item,
BookcaseItemBorrowing.delivered.is_(None),
)
),
).one_or_none()
is not None
)
@@ -94,19 +94,19 @@ class BookcaseItemCli(NumberedCmd):
select(BookcaseItemBorrowingQueue).where(
BookcaseItemBorrowingQueue.username == username,
BookcaseItemBorrowingQueue.item == self.bookcase_item,
)
),
).one_or_none()
is not None
)
def do_borrow(self, _: str):
def do_borrow(self, _: str) -> None:
active_borrowings = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(
BookcaseItemBorrowing.item == self.bookcase_item,
BookcaseItemBorrowing.delivered.is_(None),
)
.order_by(BookcaseItemBorrowing.end_time)
.order_by(BookcaseItemBorrowing.end_time),
).all()
if len(active_borrowings) >= self.bookcase_item.amount:
@@ -125,7 +125,8 @@ class BookcaseItemCli(NumberedCmd):
print()
if not prompt_yes_no(
"Would you like to enter the borrowing queue?", default=True
"Would you like to enter the borrowing queue?",
default=True,
):
return
username = self._prompt_username()
@@ -139,7 +140,8 @@ class BookcaseItemCli(NumberedCmd):
return
borrowing_queue_item = BookcaseItemBorrowingQueue(
username, self.bookcase_item
username,
self.bookcase_item,
)
self.sql_session.add(borrowing_queue_item)
print(f"{username} entered the queue!")
@@ -151,10 +153,10 @@ class BookcaseItemCli(NumberedCmd):
self.sql_session.add(borrowing_item)
self.sql_session.flush()
print(
f"Successfully borrowed the item. Please deliver it back by {format_date(borrowing_item.end_time)}"
f"Successfully borrowed the item. Please deliver it back by {format_date(borrowing_item.end_time)}",
)
def do_deliver(self, _: str):
def do_deliver(self, _: str) -> None:
borrowings = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.join(
@@ -162,7 +164,7 @@ class BookcaseItemCli(NumberedCmd):
BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid,
)
.where(BookcaseItem.isbn == self.bookcase_item.isbn)
.order_by(BookcaseItemBorrowing.username)
.order_by(BookcaseItemBorrowing.username),
).all()
if len(borrowings) == 0:
@@ -191,7 +193,7 @@ class BookcaseItemCli(NumberedCmd):
self.sql_session.flush()
print(f"Successfully delivered the item for {borrowing.username}")
def do_extend_borrowing(self, _: str):
def do_extend_borrowing(self, _: str) -> None:
borrowings = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.join(
@@ -199,7 +201,7 @@ class BookcaseItemCli(NumberedCmd):
BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid,
)
.where(BookcaseItem.isbn == self.bookcase_item.isbn)
.order_by(BookcaseItemBorrowing.username)
.order_by(BookcaseItemBorrowing.username),
).all()
if len(borrowings) == 0:
@@ -212,12 +214,12 @@ class BookcaseItemCli(NumberedCmd):
BookcaseItemBorrowingQueue.item == self.bookcase_item,
BookcaseItemBorrowingQueue.item_became_available_time is None,
)
.order_by(BookcaseItemBorrowingQueue.entered_queue_time)
.order_by(BookcaseItemBorrowingQueue.entered_queue_time),
).all()
if len(borrowing_queue) != 0:
print(
"Sorry, you cannot extend the borrowing because there are people waiting in the queue"
"Sorry, you cannot extend the borrowing because there are people waiting in the queue",
)
print("Borrowing queue:")
for i, b in enumerate(borrowing_queue):
@@ -235,15 +237,15 @@ class BookcaseItemCli(NumberedCmd):
borrowing = selector.result
borrowing.end_time = datetime.now() + timedelta(
days=int(Config["deadline_daemon.days_before_queue_position_expires"])
days=int(Config["deadline_daemon.days_before_queue_position_expires"]),
)
self.sql_session.flush()
print(
f"Successfully extended the borrowing for {borrowing.username} until {format_date(borrowing.end_time)}"
f"Successfully extended the borrowing for {borrowing.username} until {format_date(borrowing.end_time)}",
)
def do_done(self, _: str):
def do_done(self, _: str) -> bool:
return True
funcs = {
@@ -275,9 +277,15 @@ class BookcaseItemCli(NumberedCmd):
class EditBookcaseCli(NumberedCmd):
bookcase_item: BookcaseItem
parent: BookcaseItemCli
def __init__(
self, sql_session: Session, bookcase_item: BookcaseItem, parent: BookcaseItemCli
):
self,
sql_session: Session,
bookcase_item: BookcaseItem,
parent: BookcaseItemCli,
) -> None:
super().__init__()
self.sql_session = sql_session
self.bookcase_item = bookcase_item
@@ -287,7 +295,7 @@ class EditBookcaseCli(NumberedCmd):
def prompt_header(self) -> str:
return _selected_bookcase_item_prompt(self.bookcase_item)
def do_name(self, _: str):
def do_name(self, _: str) -> None:
while True:
name = input("New name> ")
if name == "":
@@ -296,7 +304,7 @@ class EditBookcaseCli(NumberedCmd):
if (
self.sql_session.scalars(
select(BookcaseItem).where(BookcaseItem.name == name)
select(BookcaseItem).where(BookcaseItem.name == name),
).one_or_none()
is not None
):
@@ -307,7 +315,7 @@ class EditBookcaseCli(NumberedCmd):
self.bookcase_item.name = name
self.sql_session.flush()
def do_isbn(self, _: str):
def do_isbn(self, _: str) -> None:
while True:
isbn = input("New ISBN> ")
if isbn == "":
@@ -320,7 +328,7 @@ class EditBookcaseCli(NumberedCmd):
if (
self.sql_session.scalars(
select(BookcaseItem).where(BookcaseItem.isbn == isbn)
select(BookcaseItem).where(BookcaseItem.isbn == isbn),
).one_or_none()
is not None
):
@@ -335,7 +343,7 @@ class EditBookcaseCli(NumberedCmd):
self.parent.do_update_data("")
self.sql_session.flush()
def do_language(self, _: str):
def do_language(self, _: str) -> None:
language_selector = InteractiveItemSelector(
Language,
self.sql_session,
@@ -344,7 +352,7 @@ class EditBookcaseCli(NumberedCmd):
self.bookcase_item.language = language_selector.result
self.sql_session.flush()
def do_media_type(self, _: str):
def do_media_type(self, _: str) -> None:
media_type_selector = InteractiveItemSelector(
MediaType,
self.sql_session,
@@ -353,10 +361,8 @@ class EditBookcaseCli(NumberedCmd):
self.bookcase_item.media_type = media_type_selector.result
self.sql_session.flush()
def do_amount(self, _: str):
while (
new_amount := input(f"New amount [{self.bookcase_item.amount}]> ")
) != "":
def do_amount(self, _: str) -> None:
while (new_amount := input(f"New amount [{self.bookcase_item.amount}]> ")) != "":
try:
new_amount = int(new_amount)
except ValueError:
@@ -371,20 +377,21 @@ class EditBookcaseCli(NumberedCmd):
self.bookcase_item.amount = new_amount
self.sql_session.flush()
def do_shelf(self, _: str):
def do_shelf(self, _: str) -> None:
bookcase_selector = InteractiveItemSelector(
Bookcase,
self.sql_session,
)
bookcase_selector.cmdloop()
bookcase = bookcase_selector.result
assert isinstance(bookcase, Bookcase)
shelf = select_bookcase_shelf(bookcase, self.sql_session)
self.bookcase_item.shelf = shelf
self.sql_session.flush()
def do_done(self, _: str):
def do_done(self, _: str) -> bool:
return True
funcs = {

View File

@@ -1,8 +1,7 @@
from libdib.repl import InteractiveItemSelector
from sqlalchemy import select
from sqlalchemy.orm import Session
from libdib.repl import InteractiveItemSelector
from worblehat.models import (
Bookcase,
BookcaseShelf,
@@ -37,10 +36,12 @@ def select_bookcase_shelf(
cls.bookcase == bookcase,
cls.column == int(arg.split("-")[0]),
cls.row == int(arg.split("-")[1]),
)
),
).all(),
complete_selection=__complete_bookshelf_selection,
)
bookcase_shelf_selector.cmdloop()
return bookcase_shelf_selector.result
result = bookcase_shelf_selector.result
assert isinstance(result, BookcaseShelf)
return result

View File

@@ -1,24 +1,23 @@
from sqlalchemy import select
from sqlalchemy.orm import Session
from libdib.repl import (
NumberedCmd,
NumberedItemSelector,
)
from sqlalchemy import select
from sqlalchemy.orm import Session
from worblehat.models import Author, BookcaseItem
class SearchCli(NumberedCmd):
def __init__(self, sql_session: Session):
def __init__(self, sql_session: Session) -> None:
super().__init__()
self.sql_session = sql_session
self.result = None
def do_search_all(self, _: str):
def do_search_all(self, _: str) -> None:
print("TODO: Implement search all")
def do_search_title(self, _: str):
def do_search_title(self, _: str) -> bool | None:
while (input_text := input("Enter title: ")) == "":
pass
@@ -28,7 +27,7 @@ class SearchCli(NumberedCmd):
if len(items) == 0:
print("No items found.")
return
return None
selector = NumberedItemSelector(
items=items,
@@ -38,8 +37,9 @@ class SearchCli(NumberedCmd):
if selector.result is not None:
self.result = selector.result
return True
return None
def do_search_author(self, _: str):
def do_search_author(self, _: str) -> bool | None:
while (input_text := input("Enter author name: ")) == "":
pass
@@ -49,12 +49,12 @@ class SearchCli(NumberedCmd):
if len(author) == 0:
print("No authors found.")
return
elif len(author) == 1:
return None
if len(author) == 1:
selected_author = author[0]
print("Found author:")
print(
f" {selected_author.name} ({sum(item.amount for item in selected_author.items)} items)"
f" {selected_author.name} ({sum(item.amount for item in selected_author.items)} items)",
)
else:
selector = NumberedItemSelector(
@@ -63,7 +63,7 @@ class SearchCli(NumberedCmd):
)
selector.cmdloop()
if selector.result is None:
return
return None
selected_author = selector.result
selector = NumberedItemSelector(
@@ -74,8 +74,9 @@ class SearchCli(NumberedCmd):
if selector.result is not None:
self.result = selector.result
return True
return None
def do_search_owner(self, _: str):
def do_search_owner(self, _: str) -> bool | None:
while (input_text := input("Enter username: ")) == "":
pass
@@ -87,8 +88,8 @@ class SearchCli(NumberedCmd):
if len(users) == 0:
print("No users found.")
return
elif len(users) == 1:
return None
if len(users) == 1:
selected_user = users[0]
print("Found user:")
print(f" {selected_user}")
@@ -96,7 +97,7 @@ class SearchCli(NumberedCmd):
selector = NumberedItemSelector(items=users)
selector.cmdloop()
if selector.result is None:
return
return None
selected_user = selector.result
items = self.sql_session.scalars(
@@ -111,8 +112,9 @@ class SearchCli(NumberedCmd):
if selector.result is not None:
self.result = selector.result
return True
return None
def do_done(self, _: str):
def do_done(self, _: str) -> bool:
return True
funcs = {

View File

@@ -5,18 +5,17 @@ from textwrap import dedent
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from worblehat.services.config import Config
from worblehat.models import (
BookcaseItemBorrowing,
DeadlineDaemonLastRunDatetime,
BookcaseItemBorrowingQueue,
DeadlineDaemonLastRunDatetime,
)
from worblehat.services.config import Config
from worblehat.services.email import send_email
class DeadlineDaemon:
def __init__(self, sql_session: Session):
def __init__(self, sql_session: Session) -> None:
if not Config["deadline_daemon.enabled"]:
return
@@ -35,7 +34,7 @@ class DeadlineDaemon:
self.last_run_datetime = self.last_run.time
self.current_run_datetime = datetime.now()
def run(self):
def run(self) -> None:
logging.info("Deadline daemon started")
if not Config["deadline_daemon.enabled"]:
logging.warn("Deadline daemon disabled, exiting")
@@ -57,9 +56,9 @@ class DeadlineDaemon:
# EMAIL TEMPLATES #
###################
def _send_close_deadline_mail(self, borrowing: BookcaseItemBorrowing):
def _send_close_deadline_mail(self, borrowing: BookcaseItemBorrowing) -> None:
logging.info(
f"Sending close deadline mail to {borrowing.username}@pvv.ntnu.no."
f"Sending close deadline mail to {borrowing.username}@pvv.ntnu.no.",
)
send_email(
f"{borrowing.username}@pvv.ntnu.no",
@@ -67,17 +66,17 @@ class DeadlineDaemon:
dedent(
f"""
Your borrowing deadline for the following item is approaching:
{borrowing.item.name}
Please return the item by {borrowing.end_time.strftime("%a %b %d, %Y")}
""",
).strip(),
)
def _send_overdue_mail(self, borrowing: BookcaseItemBorrowing):
def _send_overdue_mail(self, borrowing: BookcaseItemBorrowing) -> None:
logging.info(
f"Sending overdue mail to {borrowing.username}@pvv.ntnu.no for {borrowing.item.isbn} - {borrowing.end_time.strftime('%a %b %d, %Y')}"
f"Sending overdue mail to {borrowing.username}@pvv.ntnu.no for {borrowing.item.isbn} - {borrowing.end_time.strftime('%a %b %d, %Y')}",
)
send_email(
f"{borrowing.username}@pvv.ntnu.no",
@@ -85,20 +84,18 @@ class DeadlineDaemon:
dedent(
f"""
Your delivery deadline for the following item has passed:
{borrowing.item.name}
Please return the item as soon as possible.
""",
).strip(),
)
def _send_newly_available_mail(self, queue_item: BookcaseItemBorrowingQueue):
def _send_newly_available_mail(self, queue_item: BookcaseItemBorrowingQueue) -> None:
logging.info(f"Sending newly available mail to {queue_item.username}")
days_before_queue_expires = Config[
"deadline_daemon.days_before_queue_position_expires"
]
days_before_queue_expires = Config["deadline_daemon.days_before_queue_position_expires"]
# TODO: calculate and format the date of when the queue position expires in the mail.
send_email(
@@ -116,10 +113,12 @@ class DeadlineDaemon:
)
def _send_expiring_queue_position_mail(
self, queue_position: BookcaseItemBorrowingQueue, day: int
):
self,
queue_position: BookcaseItemBorrowingQueue,
day: int,
) -> None:
logging.info(
f"Sending queue position expiry reminder to {queue_position.username}@pvv.ntnu.no."
f"Sending queue position expiry reminder to {queue_position.username}@pvv.ntnu.no.",
)
send_email(
f"{queue_position.username}@pvv.ntnu.no",
@@ -127,26 +126,27 @@ class DeadlineDaemon:
dedent(
f"""
Your queue position expiry deadline for the following item is approaching:
{queue_position.item.name}
Please borrow the item by {(queue_position.item_became_available_time + timedelta(days=day)).strftime("%a %b %d, %Y")}
""",
).strip(),
)
def _send_queue_position_expired_mail(
self, queue_position: BookcaseItemBorrowingQueue
):
self,
queue_position: BookcaseItemBorrowingQueue,
) -> None:
send_email(
f"{queue_position.username}@pvv.ntnu.no",
"Your queue position has expired",
dedent(
f"""
Your queue position for the following item has expired:
{queue_position.item.name}
You can queue for the item again at any time, but you will be placed at the back of the queue.
There are currently {len(queue_position.item.borrowing_queue)} users in the queue.
@@ -162,21 +162,17 @@ class DeadlineDaemon:
if self.sql_session.bind.dialect.name == "sqlite":
# SQLite does not support timedelta in queries
return func.datetime(x, f"-{y.days} days")
elif self.sql_session.bind.dialect.name == "postgresql":
if self.sql_session.bind.dialect.name == "postgresql":
return x - y
else:
raise NotImplementedError(
f"Unsupported dialect: {self.sql_session.bind.dialect.name}"
)
raise NotImplementedError(
f"Unsupported dialect: {self.sql_session.bind.dialect.name}",
)
def send_close_deadline_reminder_mails(self):
def send_close_deadline_reminder_mails(self) -> None:
logging.info("Sending mails for items with a closing deadline")
# TODO: This should be int-parsed and validated before the daemon started
days = [
int(d)
for d in Config["deadline_daemon.warn_days_before_borrowing_deadline"]
]
days = [int(d) for d in Config["deadline_daemon.warn_days_before_borrowing_deadline"]]
for day in days:
borrowings_to_remind = self.sql_session.scalars(
@@ -194,20 +190,20 @@ class DeadlineDaemon:
for borrowing in borrowings_to_remind:
self._send_close_deadline_mail(borrowing)
def send_overdue_mails(self):
def send_overdue_mails(self) -> None:
logging.info("Sending mails for overdue items")
to_remind = self.sql_session.scalars(
select(BookcaseItemBorrowing).where(
BookcaseItemBorrowing.end_time < self.current_run_datetime,
BookcaseItemBorrowing.delivered.is_(None),
)
),
).all()
for borrowing in to_remind:
self._send_overdue_mail(borrowing)
def send_newly_available_mails(self):
def send_newly_available_mails(self) -> None:
logging.info("Sending mails about newly available items")
newly_available = self.sql_session.scalars(
@@ -226,27 +222,25 @@ class DeadlineDaemon:
),
)
.order_by(BookcaseItemBorrowingQueue.entered_queue_time)
.group_by(BookcaseItemBorrowingQueue.fk_bookcase_item_uid)
.group_by(BookcaseItemBorrowingQueue.fk_bookcase_item_uid),
).all()
for queue_item in newly_available:
logging.info(
f"Adding user {queue_item.username} to queue for {queue_item.item.name}"
f"Adding user {queue_item.username} to queue for {queue_item.item.name}",
)
queue_item.item_became_available_time = self.current_run_datetime
self.sql_session.commit()
self._send_newly_available_mail(queue_item)
def send_expiring_queue_position_mails(self):
def send_expiring_queue_position_mails(self) -> None:
logging.info("Sending mails about queue positions which are expiring soon")
logging.warning("Not implemented")
days = [
int(d)
for d in Config[
"deadline_daemon.warn_days_before_expiring_queue_position_deadline"
]
for d in Config["deadline_daemon.warn_days_before_expiring_queue_position_deadline"]
]
for day in days:
queue_positions_to_remind = self.sql_session.scalars(
@@ -258,8 +252,7 @@ class DeadlineDaemon:
)
.where(
self._sql_subtract_date(
BookcaseItemBorrowingQueue.item_became_available_time
+ timedelta(days=day),
BookcaseItemBorrowingQueue.item_became_available_time + timedelta(days=day),
timedelta(days=day),
).between(
self.last_run_datetime,
@@ -271,11 +264,11 @@ class DeadlineDaemon:
for queue_position in queue_positions_to_remind:
self._send_expiring_queue_position_mail(queue_position, day)
def auto_expire_queue_positions(self):
def auto_expire_queue_positions(self) -> None:
logging.info("Expiring queue positions which are too old")
queue_position_expiry_days = int(
Config["deadline_daemon.days_before_queue_position_expires"]
Config["deadline_daemon.days_before_queue_position_expires"],
)
overdue_queue_positions = self.sql_session.scalars(
@@ -289,7 +282,7 @@ class DeadlineDaemon:
for queue_position in overdue_queue_positions:
logging.info(
f"Expiring queue position for {queue_position.username} for item {queue_position.item.name}"
f"Expiring queue position for {queue_position.username} for item {queue_position.item.name}",
)
queue_position.expired = True
@@ -308,12 +301,10 @@ class DeadlineDaemon:
self._send_queue_position_expired_mail(queue_position)
if next_queue_position is not None:
next_queue_position.item_became_available_time = (
self.current_run_datetime
)
next_queue_position.item_became_available_time = self.current_run_datetime
logging.info(
f"Next user in queue for item {next_queue_position.item.name} is {next_queue_position.username}"
f"Next user in queue for item {next_queue_position.item.name} is {next_queue_position.username}",
)
self._send_newly_available_mail(next_queue_position)

View File

@@ -1,18 +1,19 @@
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from worblehat.models import (
BookcaseItem,
BookcaseItemBorrowing,
BookcaseItemBorrowingQueue,
DeadlineDaemonLastRunDatetime,
)
from worblehat.services.config import Config
from .seed_test_data import main as seed_test_data_main
def clear_db(sql_session):
def clear_db(sql_session: Session) -> None:
sql_session.query(BookcaseItemBorrowingQueue).delete()
sql_session.query(BookcaseItemBorrowing).delete()
sql_session.query(DeadlineDaemonLastRunDatetime).delete()
@@ -22,19 +23,17 @@ def clear_db(sql_session):
# NOTE: feel free to change this function to suit your needs
# it's just a quick and dirty way to get some data into the database
# for testing the deadline daemon - oysteikt 2024
def main(sql_session):
def main(sql_session: Session) -> None:
borrow_warning_days = [
timedelta(days=int(d))
for d in Config["deadline_daemon.warn_days_before_borrowing_deadline"]
]
queue_warning_days = [
timedelta(days=int(d))
for d in Config[
"deadline_daemon.warn_days_before_expiring_queue_position_deadline"
]
for d in Config["deadline_daemon.warn_days_before_expiring_queue_position_deadline"]
]
queue_expire_days = int(
Config["deadline_daemon.days_before_queue_position_expires"]
Config["deadline_daemon.days_before_queue_position_expires"],
)
clear_db(sql_session)

View File

@@ -1,6 +1,8 @@
import csv
from pathlib import Path
from sqlalchemy.orm import Session
from worblehat.models import (
Bookcase,
BookcaseItem,
@@ -9,13 +11,11 @@ from worblehat.models import (
MediaType,
)
CSV_FILE = (
Path(__file__).parent.parent.parent.parent / "data" / "arbeidsrom_smal_hylle_5.csv"
)
CSV_FILE = Path(__file__).parent.parent.parent.parent / "data" / "arbeidsrom_smal_hylle_5.csv"
LANGUAGE_FILE = Path(__file__).parent.parent.parent.parent / "data" / "iso639_1.csv"
def clear_db(sql_session):
def clear_db(sql_session: Session) -> None:
sql_session.query(BookcaseItem).delete()
sql_session.query(BookcaseShelf).delete()
sql_session.query(Bookcase).delete()
@@ -24,7 +24,7 @@ def clear_db(sql_session):
sql_session.commit()
def main(sql_session):
def main(sql_session: Session) -> None:
clear_db(sql_session)
media_type = MediaType(
@@ -33,7 +33,7 @@ def main(sql_session):
)
sql_session.add(media_type)
with open(LANGUAGE_FILE, newline="") as langs:
with LANGUAGE_FILE.open(newline="") as langs:
t = csv.reader(langs, delimiter=",", quotechar="|")
for row in t:
language = Language(name=row[1], iso639_1_code=row[0])
@@ -61,13 +61,13 @@ def main(sql_session):
sql_session.add(seed_shelf_2)
bookcase_items = []
with open(CSV_FILE) as csv_file:
with CSV_FILE.open() as csv_file:
csv_reader = csv.reader(csv_file, delimiter=",")
next(csv_reader)
for row in csv_reader:
item = BookcaseItem(
isbn=int(row[0]),
isbn=row[0],
name=row[1],
)
item.media_type = media_type

View File

@@ -4,8 +4,8 @@ from flask_admin.contrib.sqla import ModelView
from sqlalchemy import inspect
from worblehat.models import *
from worblehat.services.seed_test_data import seed_data
from worblehat.services.config import Config
from worblehat.services.seed_test_data import seed_data
from .blueprints.main import main
from .database import db
@@ -33,7 +33,7 @@ def create_app(args: dict[str, any] | None = None):
return app
def configure_admin(app):
def configure_admin(app) -> None:
admin = Admin(app, name="Worblehat", template_mode="bootstrap3")
admin.add_view(ModelView(Author, db.session))
admin.add_view(ModelView(Bookcase, db.session))

View File

@@ -1,10 +1,9 @@
from werkzeug import run_simple
from .flaskapp import create_app
def main():
def main() -> None:
app = create_app()
run_simple(
hostname="localhost",

View File

@@ -1,7 +1,7 @@
from .flaskapp import create_app
def main():
def main() -> None:
app = create_app()
app.run()

View File

@@ -1,24 +1,24 @@
from worblehat.models import Base
import logging
from pprint import pformat
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from worblehat.models import Base
from .cli import WorblehatCli
from .deadline_daemon import DeadlineDaemon
from .flaskapp.wsgi_dev import main as flask_dev_main
from .flaskapp.wsgi_prod import main as flask_prod_main
from .services import (
Config,
arg_parser,
devscripts_arg_parser,
)
from .deadline_daemon import DeadlineDaemon
from .cli import WorblehatCli
from .flaskapp.wsgi_dev import main as flask_dev_main
from .flaskapp.wsgi_prod import main as flask_prod_main
def _print_version() -> None:
from importlib.metadata import version, PackageNotFoundError
from importlib.metadata import PackageNotFoundError, version
try:
__version__ = version("worblehat")
@@ -41,7 +41,7 @@ def _connect_to_database(**engine_args) -> Session:
return sql_session
def main():
def main() -> None:
args = arg_parser.parse_args()
Config.load_configuration(vars(args))
@@ -97,7 +97,7 @@ def main():
if args.command == "flask-prod":
if Config["logging.debug"] or Config["logging.debug_sql"]:
logging.warning(
"Debug mode is enabled for the production server. This is not recommended."
"Debug mode is enabled for the production server. This is not recommended.",
)
flask_prod_main()
exit(0)

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy.orm import (
@@ -26,5 +27,5 @@ class Author(Base, UidMixin, UniqueNameMixin):
def __init__(
self,
name: str,
):
) -> None:
self.name = name

View File

@@ -18,7 +18,7 @@ class Base(DeclarativeBase):
"ck": "ck_%(table_name)s_`%(constraint_name)s`",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}
},
)
@declared_attr.directive
@@ -38,7 +38,7 @@ class Base(DeclarativeBase):
isinstance(v, InstrumentedList),
isinstance(v, InstrumentedSet),
isinstance(v, InstrumentedDict),
]
],
)
)
return f"<{self.__class__.__name__}({columns})>"

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import Text
@@ -27,7 +28,7 @@ class Bookcase(Base, UidMixin, UniqueNameMixin):
self,
name: str,
description: str | None = None,
):
) -> None:
self.name = name
self.description = description

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
@@ -9,6 +10,7 @@ from sqlalchemy import (
)
from sqlalchemy.orm import (
Mapped,
Session,
mapped_column,
relationship,
)
@@ -18,8 +20,8 @@ from .mixins import (
UidMixin,
)
from .xref_tables import (
Item_Category,
Item_Author,
Item_Category,
)
if TYPE_CHECKING:
@@ -35,7 +37,7 @@ from worblehat.flaskapp.database import db
class BookcaseItem(Base, UidMixin):
isbn: Mapped[int] = mapped_column(String, unique=True, index=True)
isbn: Mapped[str] = mapped_column(String, unique=True, index=True)
name: Mapped[str] = mapped_column(Text, index=True)
owner: Mapped[str] = mapped_column(String, default="PVV")
amount: Mapped[int] = mapped_column(SmallInteger, default=1)
@@ -49,7 +51,7 @@ class BookcaseItem(Base, UidMixin):
language: Mapped[Language] = relationship()
borrowings: Mapped[set[BookcaseItemBorrowing]] = relationship(back_populates="item")
borrowing_queue: Mapped[set[BookcaseItemBorrowingQueue]] = relationship(
back_populates="item"
back_populates="item",
)
categories: Mapped[set[Category]] = relationship(
@@ -64,9 +66,9 @@ class BookcaseItem(Base, UidMixin):
def __init__(
self,
name: str,
isbn: int | None = None,
isbn: str | None = None,
owner: str = "PVV",
):
) -> None:
self.name = name
self.isbn = isbn
self.owner = owner

View File

@@ -1,11 +1,12 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from sqlalchemy import (
DateTime,
ForeignKey,
String,
DateTime,
)
from sqlalchemy.orm import (
Mapped,
@@ -24,12 +25,14 @@ class BookcaseItemBorrowing(Base, UidMixin):
username: Mapped[str] = mapped_column(String)
start_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now())
end_time: Mapped[datetime] = mapped_column(
DateTime, default=datetime.now() + timedelta(days=30)
DateTime,
default=datetime.now() + timedelta(days=30),
)
delivered: Mapped[datetime | None] = mapped_column(DateTime, default=None)
fk_bookcase_item_uid: Mapped[int] = mapped_column(
ForeignKey("BookcaseItem.uid"), index=True
ForeignKey("BookcaseItem.uid"),
index=True,
)
item: Mapped[BookcaseItem] = relationship(back_populates="borrowings")
@@ -38,7 +41,7 @@ class BookcaseItemBorrowing(Base, UidMixin):
self,
username: str,
item: BookcaseItem,
):
) -> None:
self.username = username
self.item = item
self.start_time = datetime.now()

View File

@@ -1,12 +1,13 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import (
Boolean,
DateTime,
ForeignKey,
String,
DateTime,
Boolean,
)
from sqlalchemy.orm import (
Mapped,
@@ -24,13 +25,15 @@ if TYPE_CHECKING:
class BookcaseItemBorrowingQueue(Base, UidMixin):
username: Mapped[str] = mapped_column(String)
entered_queue_time: Mapped[datetime] = mapped_column(
DateTime, default=datetime.now()
DateTime,
default=datetime.now(),
)
item_became_available_time: Mapped[datetime | None] = mapped_column(DateTime)
expired = mapped_column(Boolean, default=False)
fk_bookcase_item_uid: Mapped[int] = mapped_column(
ForeignKey("BookcaseItem.uid"), index=True
ForeignKey("BookcaseItem.uid"),
index=True,
)
item: Mapped[BookcaseItem] = relationship(back_populates="borrowing_queue")
@@ -39,7 +42,7 @@ class BookcaseItemBorrowingQueue(Base, UidMixin):
self,
username: str,
item: BookcaseItem,
):
) -> None:
self.username = username
self.item = item
self.entered_queue_time = datetime.now()

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
@@ -47,7 +48,7 @@ class BookcaseShelf(Base, UidMixin):
column: int,
bookcase: Bookcase,
description: str | None = None,
):
) -> None:
self.row = row
self.column = column
self.bookcase = bookcase

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import Text
@@ -31,6 +32,6 @@ class Category(Base, UidMixin, UniqueNameMixin):
self,
name: str,
description: str | None = None,
):
) -> None:
self.name = name
self.description = description

View File

@@ -1,9 +1,9 @@
from datetime import datetime
from sqlalchemy import (
Boolean,
CheckConstraint,
DateTime,
Boolean,
)
from sqlalchemy.orm import (
Mapped,
@@ -23,6 +23,6 @@ class DeadlineDaemonLastRunDatetime(Base):
uid: Mapped[bool] = mapped_column(Boolean, primary_key=True, default=True)
time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now())
def __init__(self, time: datetime | None = None):
def __init__(self, time: datetime | None = None) -> None:
if time is not None:
self.time = time

View File

@@ -7,7 +7,6 @@ from sqlalchemy.orm import (
mapped_column,
)
from .Base import Base
from .mixins import UidMixin, UniqueNameMixin
@@ -19,6 +18,6 @@ class Language(Base, UidMixin, UniqueNameMixin):
self,
name: str,
iso639_1_code: str,
):
) -> None:
self.name = name
self.iso639_1_code = iso639_1_code

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import Text
@@ -24,6 +25,6 @@ class MediaType(Base, UidMixin, UniqueNameMixin):
self,
name: str,
description: str | None = None,
):
) -> None:
self.name = name
self.description = description

View File

@@ -1,7 +1,7 @@
from alembic import context
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from sqlalchemy import engine_from_config, pool
from worblehat.models import Base
from worblehat.services.config import Config
@@ -12,8 +12,8 @@ if config.config_file_name is not None:
fileConfig(config.config_file_name)
config_attrs = {}
if (config_path := context.get_x_argument(as_dictionary=True).get('config', None)):
config_attrs['config_file'] = config_path
if config_path := context.get_x_argument(as_dictionary=True).get("config", None):
config_attrs["config_file"] = config_path
Config.load_configuration(config_attrs)
@@ -22,7 +22,7 @@ config.set_main_option("sqlalchemy.url", Config.db_string())
# This will make sure alembic doesn't generate empty migrations
# https://stackoverflow.com/questions/70203927/how-to-prevent-alembic-revision-autogenerate-from-making-revision-file-if-it-h
def _process_revision_directives(context, revision, directives):
def _process_revision_directives(context, revision, directives) -> None:
if config.cmd_opts.autogenerate:
script = directives[0]
if script.upgrade_ops.is_empty():

View File

@@ -6,9 +6,8 @@ Create Date: 2024-07-31 21:07:13.434012
"""
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "7dfbf8a8dec8"
@@ -67,7 +66,9 @@ def upgrade() -> None:
)
with op.batch_alter_table("Language", schema=None) as batch_op:
batch_op.create_index(
batch_op.f("ix_Language_iso639_1_code"), ["iso639_1_code"], unique=True
batch_op.f("ix_Language_iso639_1_code"),
["iso639_1_code"],
unique=True,
)
batch_op.create_index(batch_op.f("ix_Language_name"), ["name"], unique=True)
@@ -95,7 +96,10 @@ def upgrade() -> None:
),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseShelf")),
sa.UniqueConstraint(
"column", "fk_bookcase_uid", "row", name=op.f("uq_BookcaseShelf_column")
"column",
"fk_bookcase_uid",
"row",
name=op.f("uq_BookcaseShelf_column"),
),
)
op.create_table(
@@ -128,7 +132,9 @@ def upgrade() -> None:
with op.batch_alter_table("BookcaseItem", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_BookcaseItem_isbn"), ["isbn"], unique=True)
batch_op.create_index(
batch_op.f("ix_BookcaseItem_name"), ["name"], unique=False
batch_op.f("ix_BookcaseItem_name"),
["name"],
unique=False,
)
op.create_table(
@@ -165,7 +171,7 @@ def upgrade() -> None:
["fk_bookcase_item_uid"],
["BookcaseItem.uid"],
name=op.f(
"fk_BookcaseItemBorrowingQueue_fk_bookcase_item_uid_BookcaseItem"
"fk_BookcaseItemBorrowingQueue_fk_bookcase_item_uid_BookcaseItem",
),
),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseItemBorrowingQueue")),
@@ -192,7 +198,9 @@ def upgrade() -> None:
name=op.f("fk_Item_Author_fk_item_uid_BookcaseItem"),
),
sa.PrimaryKeyConstraint(
"fk_item_uid", "fk_author_uid", name=op.f("pk_Item_Author")
"fk_item_uid",
"fk_author_uid",
name=op.f("pk_Item_Author"),
),
)
op.create_table(
@@ -210,7 +218,9 @@ def upgrade() -> None:
name=op.f("fk_Item_Category_fk_item_uid_BookcaseItem"),
),
sa.PrimaryKeyConstraint(
"fk_item_uid", "fk_category_uid", name=op.f("pk_Item_Category")
"fk_item_uid",
"fk_category_uid",
name=op.f("pk_Item_Category"),
),
)
# ### end Alembic commands ###
@@ -222,7 +232,7 @@ def downgrade() -> None:
op.drop_table("Item_Author")
with op.batch_alter_table("BookcaseItemBorrowingQueue", schema=None) as batch_op:
batch_op.drop_index(
batch_op.f("ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid")
batch_op.f("ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid"),
)
op.drop_table("BookcaseItemBorrowingQueue")

View File

@@ -1,4 +1,4 @@
from typing_extensions import Self
from typing import Self
from sqlalchemy import Integer
from sqlalchemy.orm import (
@@ -10,7 +10,7 @@ from sqlalchemy.orm import (
from worblehat.flaskapp.database import db
class UidMixin(object):
class UidMixin:
uid: Mapped[int] = mapped_column(Integer, primary_key=True)
@classmethod

View File

@@ -1,4 +1,4 @@
from typing_extensions import Self
from typing import Self
from sqlalchemy import Text
from sqlalchemy.orm import (
@@ -10,7 +10,7 @@ from sqlalchemy.orm import (
from worblehat.flaskapp.database import db
class UniqueNameMixin(object):
class UniqueNameMixin:
name: Mapped[str] = mapped_column(Text, unique=True, index=True)
@classmethod

View File

@@ -1,7 +1,7 @@
from sqlalchemy.orm import declared_attr
class XrefMixin(object):
class XrefMixin:
@declared_attr.directive
def __tablename__(cls) -> str:
return f"xref_{cls.__name__.lower()}"

View File

@@ -12,8 +12,10 @@ from ..mixins.XrefMixin import XrefMixin
class Item_Author(Base, XrefMixin):
fk_item_uid: Mapped[int] = mapped_column(
ForeignKey("BookcaseItem.uid"), primary_key=True
ForeignKey("BookcaseItem.uid"),
primary_key=True,
)
fk_author_uid: Mapped[int] = mapped_column(
ForeignKey("Author.uid"), primary_key=True
ForeignKey("Author.uid"),
primary_key=True,
)

View File

@@ -12,8 +12,10 @@ from ..mixins.XrefMixin import XrefMixin
class Item_Category(Base, XrefMixin):
fk_item_uid: Mapped[int] = mapped_column(
ForeignKey("BookcaseItem.uid"), primary_key=True
ForeignKey("BookcaseItem.uid"),
primary_key=True,
)
fk_category_uid: Mapped[int] = mapped_column(
ForeignKey("Category.uid"), primary_key=True
ForeignKey("Category.uid"),
primary_key=True,
)

View File

@@ -37,7 +37,8 @@ subparsers.add_parser(
)
devscripts_arg_parser = subparsers.add_parser(
"devscripts", help="Run development scripts"
"devscripts",
help="Run development scripts",
)
devscripts_subparsers = devscripts_arg_parser.add_subparsers(dest="script")

View File

@@ -23,7 +23,7 @@ def is_valid_isbn(isbn: str) -> bool:
[
isbnlib.is_isbn10(isbn),
isbnlib.is_isbn13(isbn),
]
],
)
@@ -58,7 +58,7 @@ def create_bookcase_item_from_isbn(
if language := metadata.language:
bookcase_item.language = sql_session.scalars(
select(Language).where(Language.iso639_1_code == language)
select(Language).where(Language.iso639_1_code == language),
).one()
return bookcase_item

View File

@@ -1,7 +1,7 @@
from pathlib import Path
import tomllib
from typing import Any
from pathlib import Path
from pprint import pformat
from typing import Any
class Config:
@@ -26,7 +26,7 @@ class Config:
def __class_getitem__(cls, name: str) -> Any:
if cls._config is None:
raise RuntimeError(
"Configuration not loaded, call Config.load_configuration() first."
"Configuration not loaded, call Config.load_configuration() first.",
)
__config = cls._config
@@ -39,7 +39,7 @@ class Config:
@staticmethod
def read_password(password_field: str) -> str:
if Path(password_field).is_file():
with open(password_field, "r") as f:
with Path(password_field).open() as f:
return f.read()
else:
return password_field
@@ -49,10 +49,12 @@ class Config:
for path in cls._expected_config_file_locations:
if path.is_file():
return path
return None
@classmethod
def _load_configuration_from_file(
cls, config_file_path: str | None
cls,
config_file_path: str | None,
) -> dict[str, any]:
if config_file_path is None:
config_file_path = cls._locate_configuration_file()
@@ -61,10 +63,8 @@ class Config:
print("Error: could not locate configuration file.")
exit(1)
with open(config_file_path, "rb") as config_file:
args = tomllib.load(config_file)
return args
with config_file_path.open("rb") as config_file:
return tomllib.load(config_file)
@classmethod
def db_string(cls) -> str:
@@ -74,7 +74,7 @@ class Config:
path = Path(cls._config.get("database").get("sqlite").get("path"))
return f"sqlite:///{path.absolute()}"
elif db_type == "postgresql":
if db_type == "postgresql":
db_config = cls._config.get("database").get("postgresql")
host = db_config.get("host")
port = db_config.get("port")
@@ -83,11 +83,9 @@ class Config:
database = db_config.get("database")
if host.startswith("/"):
return f"postgresql+psycopg2://{username}:{password}@/{database}?host={host}"
else:
return f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}"
else:
print(f"Error: unknown database type '{db_config.get('type')}'")
exit(1)
return f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}"
print(f"Error: unknown database type '{db_config.get('type')}'")
exit(1)
@classmethod
def db_string_no_password(cls) -> str:
@@ -97,7 +95,7 @@ class Config:
path = Path(cls._config.get("database").get("sqlite").get("path"))
return f"sqlite:///{path.absolute()}"
elif db_type == "postgresql":
if db_type == "postgresql":
db_config = cls._config.get("database").get("postgresql")
host = db_config.get("host")
port = db_config.get("port")
@@ -105,11 +103,9 @@ class Config:
database = db_config.get("database")
if host.startswith("/"):
return f"postgresql+psycopg2://{username}:<password>@/{database}?host={host}"
else:
return f"postgresql+psycopg2://{username}:<password>@{host}:{port}/{database}"
else:
print(f"Error: unknown database type '{db_config.get('type')}'")
exit(1)
return f"postgresql+psycopg2://{username}:<password>@{host}:{port}/{database}"
print(f"Error: unknown database type '{db_config.get('type')}'")
exit(1)
@classmethod
def debug(cls) -> str:

View File

@@ -1,14 +1,12 @@
import smtplib
from textwrap import indent
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from textwrap import indent
from .config import Config
def send_email(to: str, subject: str, body: str):
def send_email(to: str, subject: str, body: str) -> None:
msg = MIMEMultipart()
msg["From"] = Config["smtp.from"]
msg["To"] = to

View File

@@ -1,25 +1,21 @@
from dataclasses import dataclass
from typing import Set
# TODO: Add more languages
LANGUAGES: set[str] = set(
[
"no",
"en",
"de",
"fr",
"es",
"it",
"sv",
"da",
"fi",
"ru",
"zh",
"ja",
"ko",
]
)
LANGUAGES: set[str] = {
"no",
"en",
"de",
"fr",
"es",
"it",
"sv",
"da",
"fi",
"ru",
"zh",
"ja",
"ko",
}
@dataclass
@@ -30,11 +26,11 @@ class BookMetadata:
title: str
# The source of the metadata provider
source: str
authors: Set[str]
authors: set[str]
language: str | None
publish_date: str | None
num_pages: int | None
subjects: Set[str]
subjects: set[str]
def to_dict(self) -> dict[str, any]:
return {
@@ -60,7 +56,7 @@ class BookMetadata:
if self.language is not None and self.language not in LANGUAGES:
raise ValueError(
f"Invalid language: {self.language}. Consider adding it to the LANGUAGES set if you think this is a mistake."
f"Invalid language: {self.language}. Consider adding it to the LANGUAGES set if you think this is a mistake.",
)
if self.num_pages is not None and self.num_pages < 0:

View File

@@ -1,5 +1,6 @@
# base fetcher.
from abc import ABC, abstractmethod
from .BookMetadata import BookMetadata

View File

@@ -4,8 +4,8 @@ A BookMetadataFetcher for the Google Books API.
import requests
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
class GoogleBooksFetcher(BookMetadataFetcher):

View File

@@ -4,8 +4,8 @@ A BookMetadataFetcher for the Open Library API.
import requests
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
LANGUAGE_MAP = {
"Norwegian": "no",
@@ -26,11 +26,7 @@ class OpenLibraryFetcher(BookMetadataFetcher):
author_names = set()
for author_key in author_keys:
key = author_key.get("key")
author_name = (
requests.get(f"https://openlibrary.org/{key}.json")
.json()
.get("name")
)
author_name = requests.get(f"https://openlibrary.org/{key}.json").json().get("name")
author_names.add(author_name)
title = jsonInput.get("title")

View File

@@ -2,13 +2,11 @@
A BookMetadataFetcher that webscrapes https://outland.no/
"""
import requests
from bs4 import BeautifulSoup
import requests
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
LANGUAGE_MAP = {
"Norsk": "no",

View File

@@ -7,14 +7,12 @@ from concurrent.futures import ThreadPoolExecutor
from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
from worblehat.services.metadata_fetchers.GoogleBooksFetcher import GoogleBooksFetcher
from worblehat.services.metadata_fetchers.OpenLibraryFetcher import OpenLibraryFetcher
from worblehat.services.metadata_fetchers.OutlandScraperFetcher import (
OutlandScraperFetcher,
)
# The order of these fetchers determines the priority of the sources.
# The first fetcher in the list has the highest priority.
FETCHERS: list[BookMetadataFetcher] = [
@@ -38,7 +36,7 @@ def sort_metadata_by_priority(metadata: list[BookMetadata]) -> list[BookMetadata
return sorted(metadata, key=lambda m: FETCHER_SOURCE_IDS.index(m.source))
def fetch_metadata_from_multiple_sources(isbn: str, strict=False) -> list[BookMetadata]:
def fetch_metadata_from_multiple_sources(isbn: str, strict: bool=False) -> list[BookMetadata]:
"""
Returns a list of metadata fetched from multiple sources.
@@ -55,9 +53,7 @@ def fetch_metadata_from_multiple_sources(isbn: str, strict=False) -> list[BookMe
results: list[BookMetadata] = []
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(fetcher.fetch_metadata, isbn) for fetcher in FETCHERS
]
futures = [executor.submit(fetcher.fetch_metadata, isbn) for fetcher in FETCHERS]
for future in futures:
result = future.result()
@@ -70,9 +66,8 @@ def fetch_metadata_from_multiple_sources(isbn: str, strict=False) -> list[BookMe
except ValueError as e:
if strict:
raise e
else:
print(f"Invalid metadata: {e}")
results.remove(result)
print(f"Invalid metadata: {e}")
results.remove(result)
return sort_metadata_by_priority(results)

View File

@@ -18,7 +18,7 @@ from ..models import (
)
def seed_data(sql_session: Session = db.session):
def seed_data(sql_session: Session = db.session) -> None:
media_types = [
MediaType(name="Book", description="A physical book"),
MediaType(name="Comic", description="A comic book"),
@@ -50,7 +50,10 @@ def seed_data(sql_session: Session = db.session):
BookcaseShelf(row=1, column=1, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=1, bookcase=bookcases[0], description="DOS"),
BookcaseShelf(
row=3, column=1, bookcase=bookcases[0], description="Food for thought"
row=3,
column=1,
bookcase=bookcases[0],
description="Food for thought",
),
BookcaseShelf(row=4, column=1, bookcase=bookcases[0], description="CPP"),
BookcaseShelf(row=0, column=2, bookcase=bookcases[0]),
@@ -60,20 +63,32 @@ def seed_data(sql_session: Session = db.session):
BookcaseShelf(row=4, column=2, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=3, bookcase=bookcases[0]),
BookcaseShelf(
row=1, column=3, bookcase=bookcases[0], description="Big indonisian island"
row=1,
column=3,
bookcase=bookcases[0],
description="Big indonisian island",
),
BookcaseShelf(row=2, column=3, bookcase=bookcases[0]),
BookcaseShelf(
row=3, column=3, bookcase=bookcases[0], description="Div science"
row=3,
column=3,
bookcase=bookcases[0],
description="Div science",
),
BookcaseShelf(row=4, column=3, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=4, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=4, bookcase=bookcases[0]),
BookcaseShelf(
row=2, column=4, bookcase=bookcases[0], description="(not) computer vision"
row=2,
column=4,
bookcase=bookcases[0],
description="(not) computer vision",
),
BookcaseShelf(
row=3, column=4, bookcase=bookcases[0], description="Low voltage"
row=3,
column=4,
bookcase=bookcases[0],
description="Low voltage",
),
BookcaseShelf(row=4, column=4, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=5, bookcase=bookcases[0]),
@@ -111,38 +126,62 @@ def seed_data(sql_session: Session = db.session):
description="Soviet Phys. Techn. Phys",
),
BookcaseShelf(
row=4, column=1, bookcase=bookcases[2], description="Digitalteknikk"
row=4,
column=1,
bookcase=bookcases[2],
description="Digitalteknikk",
),
BookcaseShelf(row=5, column=1, bookcase=bookcases[2], description="Material"),
BookcaseShelf(row=0, column=2, bookcase=bookcases[2]),
BookcaseShelf(
row=1, column=2, bookcase=bookcases[2], description="Assembler / APL"
row=1,
column=2,
bookcase=bookcases[2],
description="Assembler / APL",
),
BookcaseShelf(row=2, column=2, bookcase=bookcases[2], description="Internet"),
BookcaseShelf(row=3, column=2, bookcase=bookcases[2], description="Algorithms"),
BookcaseShelf(
row=4, column=2, bookcase=bookcases[2], description="Soviet Physics Jetp"
row=4,
column=2,
bookcase=bookcases[2],
description="Soviet Physics Jetp",
),
BookcaseShelf(
row=5, column=2, bookcase=bookcases[2], description="Død og pine"
row=5,
column=2,
bookcase=bookcases[2],
description="Død og pine",
),
BookcaseShelf(row=0, column=3, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=3, bookcase=bookcases[2], description="Web"),
BookcaseShelf(
row=2, column=3, bookcase=bookcases[2], description="Div languages"
row=2,
column=3,
bookcase=bookcases[2],
description="Div languages",
),
BookcaseShelf(row=3, column=3, bookcase=bookcases[2], description="Python"),
BookcaseShelf(row=4, column=3, bookcase=bookcases[2], description="D&D Minis"),
BookcaseShelf(row=5, column=3, bookcase=bookcases[2], description="Perl"),
BookcaseShelf(row=0, column=4, bookcase=bookcases[2]),
BookcaseShelf(
row=1, column=4, bookcase=bookcases[2], description="Knuth on programming"
row=1,
column=4,
bookcase=bookcases[2],
description="Knuth on programming",
),
BookcaseShelf(
row=2, column=4, bookcase=bookcases[2], description="Div languages"
row=2,
column=4,
bookcase=bookcases[2],
description="Div languages",
),
BookcaseShelf(
row=3, column=4, bookcase=bookcases[2], description="Typesetting"
row=3,
column=4,
bookcase=bookcases[2],
description="Typesetting",
),
BookcaseShelf(row=4, column=4, bookcase=bookcases[2]),
BookcaseShelf(row=0, column=0, bookcase=bookcases[3]),
@@ -251,7 +290,7 @@ def seed_data(sql_session: Session = db.session):
BookcaseItemBorrowingQueue(username="user", item=borrowed_book_people_in_queue),
]
with open(Path(__file__).parent.parent.parent / "data" / "iso639_1.csv") as f:
with (Path(__file__).parent.parent.parent / "data" / "iso639_1.csv").open() as f:
reader = csv.reader(f)
languages = [Language(name, code) for (code, name) in reader]