treewide: format and lint

This commit is contained in:
Oystein Kristoffer Tveit 2025-03-22 21:15:15 +01:00
parent a7ff594548
commit 46f6de5d61
Signed by: oysteikt
GPG Key ID: 9F2F7D8250F35146
55 changed files with 1269 additions and 987 deletions

@ -1 +1,3 @@
from .main import main
from .main import main
__all__ = ["main"]

@ -1 +1,3 @@
from .main import WorblehatCli
from .main import WorblehatCli
__all__ = ["WorblehatCli"]

@ -30,19 +30,20 @@ from .subclis import (
# However, is there anyone who are going to search by category rather than just look in
# the shelves?
class WorblehatCli(NumberedCmd):
def __init__(self, sql_session: Session):
super().__init__()
self.sql_session = sql_session
self.sql_session_dirty = False
@event.listens_for(self.sql_session, 'after_flush')
@event.listens_for(self.sql_session, "after_flush")
def mark_session_as_dirty(*_):
self.sql_session_dirty = True
self.prompt_header = f'(unsaved changes)'
self.prompt_header = "(unsaved changes)"
@event.listens_for(self.sql_session, 'after_commit')
@event.listens_for(self.sql_session, 'after_rollback')
@event.listens_for(self.sql_session, "after_commit")
@event.listens_for(self.sql_session, "after_rollback")
def mark_session_as_clean(*_):
self.sql_session_dirty = False
self.prompt_header = None
@ -57,19 +58,20 @@ class WorblehatCli(NumberedCmd):
if not tool.sql_session_dirty:
exit(0)
try:
print()
if prompt_yes_no('Are you sure you want to exit without saving?', default=False):
raise KeyboardInterrupt
print()
if prompt_yes_no(
"Are you sure you want to exit without saving?", default=False
):
raise KeyboardInterrupt
except KeyboardInterrupt:
if tool.sql_session is not None:
tool.sql_session.rollback()
exit(0)
def do_show_bookcase(self, arg: str):
bookcase_selector = InteractiveItemSelector(
cls = Bookcase,
sql_session = self.sql_session,
cls=Bookcase,
sql_session=self.sql_session,
)
bookcase_selector.cmdloop()
bookcase = bookcase_selector.result
@ -77,8 +79,7 @@ class WorblehatCli(NumberedCmd):
for shelf in bookcase.shelfs:
print(shelf.short_str())
for item in shelf.items:
print(f' {item.name} - {item.amount} copies')
print(f" {item.name} - {item.amount} copies")
def do_show_borrowed_queued(self, _: str):
borrowed_items = self.sql_session.scalars(
@ -88,105 +89,114 @@ class WorblehatCli(NumberedCmd):
).all()
if len(borrowed_items) == 0:
print('No borrowed items found.')
print("No borrowed items found.")
else:
print('Borrowed items:')
for item in borrowed_items:
print(f'- {item.username} - {item.item.name} - to be delivered by {item.end_time.strftime("%Y-%m-%d")}')
print("Borrowed items:")
for item in borrowed_items:
print(
f"- {item.username} - {item.item.name} - to be delivered by {item.end_time.strftime('%Y-%m-%d')}"
)
print()
queued_items = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.order_by(BookcaseItemBorrowingQueue.entered_queue_time),
select(BookcaseItemBorrowingQueue).order_by(
BookcaseItemBorrowingQueue.entered_queue_time
),
).all()
if len(queued_items) == 0:
print('No queued items found.')
print("No queued items found.")
else:
print('Users in queue:')
for item in queued_items:
print(f'- {item.username} - {item.item.name} - entered queue at {item.entered_queue_time.strftime("%Y-%m-%d")}')
print("Users in queue:")
for item in queued_items:
print(
f"- {item.username} - {item.item.name} - entered queue at {item.entered_queue_time.strftime('%Y-%m-%d')}"
)
def _create_bookcase_item(self, isbn: str):
bookcase_item = create_bookcase_item_from_isbn(isbn, self.sql_session)
if bookcase_item is None:
print(f'Could not find data about item with ISBN {isbn} online.')
print(f'If you think this is not due to a bug, please add the book to openlibrary.org before continuing.')
print(f"Could not find data about item with ISBN {isbn} online.")
print(
"If you think this is not due to a bug, please add the book to openlibrary.org before continuing."
)
return
else:
print(dedent(f"""
print(
dedent(f"""
Found item:
title: {bookcase_item.name}
authors: {', '.join(a.name for a in bookcase_item.authors)}
authors: {", ".join(a.name for a in bookcase_item.authors)}
language: {bookcase_item.language}
"""))
""")
)
print('Please select the bookcase where the item is placed:')
print("Please select the bookcase where the item is placed:")
bookcase_selector = InteractiveItemSelector(
cls = Bookcase,
sql_session = self.sql_session,
cls=Bookcase,
sql_session=self.sql_session,
)
bookcase_selector.cmdloop()
bookcase = bookcase_selector.result
bookcase_item.shelf = select_bookcase_shelf(bookcase, self.sql_session)
print('Please select the items media type:')
print("Please select the items media type:")
media_type_selector = InteractiveItemSelector(
cls = MediaType,
sql_session = self.sql_session,
default = self.sql_session.scalars(
select(MediaType)
.where(MediaType.name.ilike("book")),
cls=MediaType,
sql_session=self.sql_session,
default=self.sql_session.scalars(
select(MediaType).where(MediaType.name.ilike("book")),
).one(),
)
media_type_selector.cmdloop()
bookcase_item.media_type = media_type_selector.result
username = input('Who owns this book? [PVV]> ')
if username != '':
username = input("Who owns this book? [PVV]> ")
if username != "":
bookcase_item.owner = username
self.sql_session.add(bookcase_item)
self.sql_session.flush()
def default(self, isbn: str):
isbn = isbn.strip()
if not is_valid_isbn(isbn):
super()._default(isbn)
return
if (existing_item := self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.isbn == isbn)
.join(BookcaseItemBorrowing)
.join(BookcaseItemBorrowingQueue)
).one_or_none()) is not None:
if (
existing_item := self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.isbn == isbn)
.join(BookcaseItemBorrowing)
.join(BookcaseItemBorrowingQueue)
).one_or_none()
) is not None:
print(f'\nFound existing item for isbn "{isbn}"')
BookcaseItemCli(
sql_session = self.sql_session,
bookcase_item = existing_item,
sql_session=self.sql_session,
bookcase_item=existing_item,
).cmdloop()
return
if prompt_yes_no(f"Could not find item with ISBN '{isbn}'.\nWould you like to create it?", default=True):
if prompt_yes_no(
f"Could not find item with ISBN '{isbn}'.\nWould you like to create it?",
default=True,
):
self._create_bookcase_item(isbn)
def do_search(self, _: str):
search_cli = SearchCli(self.sql_session)
search_cli.cmdloop()
if search_cli.result is not None:
BookcaseItemCli(
sql_session = self.sql_session,
bookcase_item = search_cli.result,
sql_session=self.sql_session,
bookcase_item=search_cli.result,
).cmdloop()
def do_show_slabbedasker(self, _: str):
slubberter = self.sql_session.scalars(
select(BookcaseItemBorrowing)
@ -201,76 +211,73 @@ class WorblehatCli(NumberedCmd):
).all()
if len(slubberter) == 0:
print('No slubberts found. Life is good.')
print("No slubberts found. Life is good.")
return
for slubbert in slubberter:
print('Slubberter:')
print(f'- {slubbert.username} - {slubbert.item.name} - {slubbert.end_time.strftime("%Y-%m-%d")}')
print("Slubberter:")
print(
f"- {slubbert.username} - {slubbert.item.name} - {slubbert.end_time.strftime('%Y-%m-%d')}"
)
def do_advanced(self, _: str):
AdvancedOptionsCli(self.sql_session).cmdloop()
def do_save(self, _:str):
def do_save(self, _: str):
if not self.sql_session_dirty:
print('No changes to save.')
print("No changes to save.")
return
self.sql_session.commit()
def do_abort(self, _:str):
def do_abort(self, _: str):
if not self.sql_session_dirty:
print('No changes to abort.')
print("No changes to abort.")
return
self.sql_session.rollback()
def do_exit(self, _: str):
if self.sql_session_dirty:
if prompt_yes_no('Would you like to save your changes?'):
if prompt_yes_no("Would you like to save your changes?"):
self.sql_session.commit()
else:
self.sql_session.rollback()
exit(0)
funcs = {
0: {
'f': default,
'doc': 'Choose / Add item with its ISBN',
"f": default,
"doc": "Choose / Add item with its ISBN",
},
1: {
'f': do_search,
'doc': 'Search',
"f": do_search,
"doc": "Search",
},
2: {
'f': do_show_bookcase,
'doc': 'Show a bookcase, and its items',
"f": do_show_bookcase,
"doc": "Show a bookcase, and its items",
},
3: {
'f': do_show_borrowed_queued,
'doc': 'Show borrowed/queued items',
"f": do_show_borrowed_queued,
"doc": "Show borrowed/queued items",
},
4: {
'f': do_show_slabbedasker,
'doc': 'Show slabbedasker',
"f": do_show_slabbedasker,
"doc": "Show slabbedasker",
},
5: {
'f': do_save,
'doc': 'Save changes',
"f": do_save,
"doc": "Save changes",
},
6: {
'f': do_abort,
'doc': 'Abort changes',
"f": do_abort,
"doc": "Abort changes",
},
7: {
'f': do_advanced,
'doc': 'Advanced options',
"f": do_advanced,
"doc": "Advanced options",
},
9: {
'f': do_exit,
'doc': 'Exit',
"f": do_exit,
"doc": "Exit",
},
}

@ -1,4 +1,11 @@
from .advanced_options import AdvancedOptionsCli
from .bookcase_item import BookcaseItemCli
from .bookcase_shelf_selector import select_bookcase_shelf
from .search import SearchCli
from .search import SearchCli
__all__ = [
"AdvancedOptionsCli",
"BookcaseItemCli",
"select_bookcase_shelf",
"SearchCli",
]

@ -4,81 +4,84 @@ from sqlalchemy.orm import Session
from libdib.repl import (
InteractiveItemSelector,
NumberedCmd,
format_date,
prompt_yes_no,
)
from worblehat.models import Bookcase, BookcaseShelf
class AdvancedOptionsCli(NumberedCmd):
def __init__(self, sql_session: Session):
super().__init__()
self.sql_session = sql_session
def do_add_bookcase(self, _: str):
while True:
name = input('Name of bookcase> ')
if name == '':
print('Error: name cannot be empty')
name = input("Name of bookcase> ")
if name == "":
print("Error: name cannot be empty")
continue
if self.sql_session.scalars(
select(Bookcase)
.where(Bookcase.name == name)
).one_or_none() is not None:
print(f'Error: a bookcase with name {name} already exists')
if (
self.sql_session.scalars(
select(Bookcase).where(Bookcase.name == name)
).one_or_none()
is not None
):
print(f"Error: a bookcase with name {name} already exists")
continue
break
description = input('Description of bookcase> ')
if description == '':
description = input("Description of bookcase> ")
if description == "":
description = None
bookcase = Bookcase(name, description)
self.sql_session.add(bookcase)
self.sql_session.flush()
def do_add_bookcase_shelf(self, arg: str):
bookcase_selector = InteractiveItemSelector(
cls = Bookcase,
sql_session = self.sql_session,
cls=Bookcase,
sql_session=self.sql_session,
)
bookcase_selector.cmdloop()
bookcase = bookcase_selector.result
while True:
column = input('Column> ')
column = input("Column> ")
try:
column = int(column)
except ValueError:
print('Error: column must be a number')
print("Error: column must be a number")
continue
break
while True:
row = input('Row> ')
row = input("Row> ")
try:
row = int(row)
except ValueError:
print('Error: row must be a number')
print("Error: row must be a number")
continue
break
if self.sql_session.scalars(
select(BookcaseShelf)
.where(
BookcaseShelf.bookcase == bookcase,
BookcaseShelf.column == column,
BookcaseShelf.row == row,
if (
self.sql_session.scalars(
select(BookcaseShelf).where(
BookcaseShelf.bookcase == bookcase,
BookcaseShelf.column == column,
BookcaseShelf.row == row,
)
).one_or_none()
is not None
):
print(
f"Error: a bookshelf in bookcase {bookcase.name} with position c{column}-r{row} already exists"
)
).one_or_none() is not None:
print(f'Error: a bookshelf in bookcase {bookcase.name} with position c{column}-r{row} already exists')
return
description = input('Description> ')
if description == '':
description = input("Description> ")
if description == "":
description = None
shelf = BookcaseShelf(
@ -90,15 +93,14 @@ class AdvancedOptionsCli(NumberedCmd):
self.sql_session.add(shelf)
self.sql_session.flush()
def do_list_bookcases(self, _: str):
bookcase_shelfs = self.sql_session.scalars(
select(BookcaseShelf)
.join(Bookcase)
.order_by(
Bookcase.name,
BookcaseShelf.column,
BookcaseShelf.row,
Bookcase.name,
BookcaseShelf.column,
BookcaseShelf.row,
)
).all()
@ -108,28 +110,26 @@ class AdvancedOptionsCli(NumberedCmd):
print(shelf.bookcase.short_str())
bookcase_uid = shelf.bookcase.uid
print(f' {shelf.short_str()} - {sum(i.amount for i in shelf.items)} items')
print(f" {shelf.short_str()} - {sum(i.amount for i in shelf.items)} items")
def do_done(self, _: str):
return True
funcs = {
1: {
'f': do_add_bookcase,
'doc': 'Add bookcase',
"f": do_add_bookcase,
"doc": "Add bookcase",
},
2: {
'f': do_add_bookcase_shelf,
'doc': 'Add bookcase shelf',
"f": do_add_bookcase_shelf,
"doc": "Add bookcase shelf",
},
3: {
'f': do_list_bookcases,
'doc': 'List all bookcases',
"f": do_list_bookcases,
"doc": "List all bookcases",
},
9: {
'f': do_done,
'doc': 'Done',
"f": do_done,
"doc": "Done",
},
}

@ -27,16 +27,18 @@ from worblehat.services.config import Config
from .bookcase_shelf_selector import select_bookcase_shelf
def _selected_bookcase_item_prompt(bookcase_item: BookcaseItem) -> str:
amount_borrowed = len(bookcase_item.borrowings)
return dedent(f'''
return dedent(f"""
Item: {bookcase_item.name}
ISBN: {bookcase_item.isbn}
Authors: {', '.join(a.name for a in bookcase_item.authors)}
Authors: {", ".join(a.name for a in bookcase_item.authors)}
Bookcase: {bookcase_item.shelf.bookcase.short_str()}
Shelf: {bookcase_item.shelf.short_str()}
Amount: {bookcase_item.amount - amount_borrowed}/{bookcase_item.amount}
''')
""")
class BookcaseItemCli(NumberedCmd):
def __init__(self, sql_session: Session, bookcase_item: BookcaseItem):
@ -44,12 +46,10 @@ class BookcaseItemCli(NumberedCmd):
self.sql_session = sql_session
self.bookcase_item = bookcase_item
@property
def prompt_header(self) -> str:
return _selected_bookcase_item_prompt(self.bookcase_item)
def do_update_data(self, _: str):
item = create_bookcase_item_from_isbn(self.sql_session, self.bookcase_item.isbn)
self.bookcase_item.name = item.name
@ -58,80 +58,83 @@ class BookcaseItemCli(NumberedCmd):
self.bookcase_item.language = item.language
self.sql_session.flush()
def do_edit(self, arg: str):
EditBookcaseCli(self.sql_session, self.bookcase_item, self).cmdloop()
@staticmethod
def _prompt_username() -> str:
while True:
username = input('Username: ')
if prompt_yes_no(f'Is {username} correct?', default = True):
username = input("Username: ")
if prompt_yes_no(f"Is {username} correct?", default=True):
return username
def _has_active_borrowing(self, username: str) -> bool:
return self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(
BookcaseItemBorrowing.username == username,
BookcaseItemBorrowing.item == self.bookcase_item,
BookcaseItemBorrowing.delivered.is_(None),
)
).one_or_none() is not None
return (
self.sql_session.scalars(
select(BookcaseItemBorrowing).where(
BookcaseItemBorrowing.username == username,
BookcaseItemBorrowing.item == self.bookcase_item,
BookcaseItemBorrowing.delivered.is_(None),
)
).one_or_none()
is not None
)
def _has_borrowing_queue_item(self, username: str) -> bool:
return self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.where(
BookcaseItemBorrowingQueue.username == username,
BookcaseItemBorrowingQueue.item == self.bookcase_item,
)
).one_or_none() is not None
return (
self.sql_session.scalars(
select(BookcaseItemBorrowingQueue).where(
BookcaseItemBorrowingQueue.username == username,
BookcaseItemBorrowingQueue.item == self.bookcase_item,
)
).one_or_none()
is not None
)
def do_borrow(self, _: str):
active_borrowings = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(
BookcaseItemBorrowing.item == self.bookcase_item,
BookcaseItemBorrowing.delivered.is_(None),
BookcaseItemBorrowing.item == self.bookcase_item,
BookcaseItemBorrowing.delivered.is_(None),
)
.order_by(BookcaseItemBorrowing.end_time)
).all()
if len(active_borrowings) >= self.bookcase_item.amount:
print('This item is currently not available')
print("This item is currently not available")
print()
print('Active borrowings:')
print("Active borrowings:")
for b in active_borrowings:
print(f' {b.username} - Until {format_date(b.end_time)}')
print(f" {b.username} - Until {format_date(b.end_time)}")
if len(self.bookcase_item.borrowing_queue) > 0:
print('Borrowing queue:')
print("Borrowing queue:")
for i, b in enumerate(self.bookcase_item.borrowing_queue):
print(f' {i + 1} - {b.username}')
print(f" {i + 1} - {b.username}")
print()
if not prompt_yes_no('Would you like to enter the borrowing queue?', default = True):
if not prompt_yes_no(
"Would you like to enter the borrowing queue?", default=True
):
return
username = self._prompt_username()
if self._has_active_borrowing(username):
print('You already have an active borrowing')
print("You already have an active borrowing")
return
if self._has_borrowing_queue_item(username):
print('You are already in the borrowing queue')
print("You are already in the borrowing queue")
return
borrowing_queue_item = BookcaseItemBorrowingQueue(username, self.bookcase_item)
borrowing_queue_item = BookcaseItemBorrowingQueue(
username, self.bookcase_item
)
self.sql_session.add(borrowing_queue_item)
print(f'{username} entered the queue!')
print(f"{username} entered the queue!")
return
username = self._prompt_username()
@ -139,33 +142,38 @@ class BookcaseItemCli(NumberedCmd):
borrowing_item = BookcaseItemBorrowing(username, self.bookcase_item)
self.sql_session.add(borrowing_item)
self.sql_session.flush()
print(f'Successfully borrowed the item. Please deliver it back by {format_date(borrowing_item.end_time)}')
print(
f"Successfully borrowed the item. Please deliver it back by {format_date(borrowing_item.end_time)}"
)
def do_deliver(self, _: str):
borrowings = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.join(BookcaseItem, BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid)
.join(
BookcaseItem,
BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid,
)
.where(BookcaseItem.isbn == self.bookcase_item.isbn)
.order_by(BookcaseItemBorrowing.username)
).all()
if len(borrowings) == 0:
print('No one seems to have borrowed this item')
print("No one seems to have borrowed this item")
return
print('Borrowers:')
print("Borrowers:")
for i, b in enumerate(borrowings):
print(f' {i + 1}) {b.username}')
print(f" {i + 1}) {b.username}")
while True:
try:
selection = int(input('> '))
selection = int(input("> "))
except ValueError:
print('Error: selection must be an integer')
print("Error: selection must be an integer")
continue
if selection < 1 or selection > len(borrowings):
print('Error: selection out of range')
print("Error: selection out of range")
continue
break
@ -173,19 +181,21 @@ class BookcaseItemCli(NumberedCmd):
borrowing = borrowings[selection - 1]
borrowing.delivered = datetime.now()
self.sql_session.flush()
print(f'Successfully delivered the item for {borrowing.username}')
print(f"Successfully delivered the item for {borrowing.username}")
def do_extend_borrowing(self, _: str):
borrowings = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.join(BookcaseItem, BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid)
.join(
BookcaseItem,
BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid,
)
.where(BookcaseItem.isbn == self.bookcase_item.isbn)
.order_by(BookcaseItemBorrowing.username)
).all()
if len(borrowings) == 0:
print('No one seems to have borrowed this item')
print("No one seems to have borrowed this item")
return
borrowing_queue = self.sql_session.scalars(
@ -198,61 +208,68 @@ class BookcaseItemCli(NumberedCmd):
).all()
if len(borrowing_queue) != 0:
print('Sorry, you cannot extend the borrowing because there are people waiting in the queue')
print('Borrowing queue:')
for i, b in enumerate(borrowing_queue):
print(f' {i + 1}) {b.username}')
return
print(
"Sorry, you cannot extend the borrowing because there are people waiting in the queue"
)
print("Borrowing queue:")
for i, b in enumerate(borrowing_queue):
print(f" {i + 1}) {b.username}")
return
print('Who are you?')
print("Who are you?")
selector = NumberedItemSelector(
items = list(borrowings),
stringify = lambda b: f'{b.username} - Until {format_date(b.end_time)}',
items=list(borrowings),
stringify=lambda b: f"{b.username} - Until {format_date(b.end_time)}",
)
selector.cmdloop()
if selector.result is None:
return
borrowing = selector.result
borrowing.end_time = datetime.now() + timedelta(days=int(Config['deadline_daemon.days_before_queue_position_expires']))
borrowing.end_time = datetime.now() + timedelta(
days=int(Config["deadline_daemon.days_before_queue_position_expires"])
)
self.sql_session.flush()
print(f'Successfully extended the borrowing for {borrowing.username} until {format_date(borrowing.end_time)}')
print(
f"Successfully extended the borrowing for {borrowing.username} until {format_date(borrowing.end_time)}"
)
def do_done(self, _: str):
return True
funcs = {
1: {
'f': do_borrow,
'doc': 'Borrow',
"f": do_borrow,
"doc": "Borrow",
},
2: {
'f': do_deliver,
'doc': 'Deliver',
"f": do_deliver,
"doc": "Deliver",
},
3: {
'f': do_extend_borrowing,
'doc': 'Extend borrowing',
"f": do_extend_borrowing,
"doc": "Extend borrowing",
},
4: {
'f': do_edit,
'doc': 'Edit',
"f": do_edit,
"doc": "Edit",
},
5: {
'f': do_update_data,
'doc': 'Pull updated data from online databases',
"f": do_update_data,
"doc": "Pull updated data from online databases",
},
9: {
'f': do_done,
'doc': 'Done',
"f": do_done,
"doc": "Done",
},
}
class EditBookcaseCli(NumberedCmd):
def __init__(self, sql_session: Session, bookcase_item: BookcaseItem, parent: BookcaseItemCli):
def __init__(
self, sql_session: Session, bookcase_item: BookcaseItem, parent: BookcaseItemCli
):
super().__init__()
self.sql_session = sql_session
self.bookcase_item = bookcase_item
@ -260,54 +277,56 @@ class EditBookcaseCli(NumberedCmd):
@property
def prompt_header(self) -> str:
return _selected_bookcase_item_prompt(self.bookcase_item)
return _selected_bookcase_item_prompt(self.bookcase_item)
def do_name(self, _: str):
while True:
name = input('New name> ')
if name == '':
print('Error: name cannot be empty')
name = input("New name> ")
if name == "":
print("Error: name cannot be empty")
continue
if self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.name == name)
).one_or_none() is not None:
print(f'Error: an item with name {name} already exists')
if (
self.sql_session.scalars(
select(BookcaseItem).where(BookcaseItem.name == name)
).one_or_none()
is not None
):
print(f"Error: an item with name {name} already exists")
continue
break
self.bookcase_item.name = name
self.sql_session.flush()
def do_isbn(self, _: str):
while True:
isbn = input('New ISBN> ')
if isbn == '':
print('Error: ISBN cannot be empty')
isbn = input("New ISBN> ")
if isbn == "":
print("Error: ISBN cannot be empty")
continue
if not is_valid_isbn(isbn):
print('Error: ISBN is not valid')
print("Error: ISBN is not valid")
continue
if self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.isbn == isbn)
).one_or_none() is not None:
print(f'Error: an item with ISBN {isbn} already exists')
if (
self.sql_session.scalars(
select(BookcaseItem).where(BookcaseItem.isbn == isbn)
).one_or_none()
is not None
):
print(f"Error: an item with ISBN {isbn} already exists")
continue
break
self.bookcase_item.isbn = isbn
if prompt_yes_no('Update data from online databases?'):
self.parent.do_update_data('')
if prompt_yes_no("Update data from online databases?"):
self.parent.do_update_data("")
self.sql_session.flush()
def do_language(self, _: str):
language_selector = InteractiveItemSelector(
Language,
@ -317,7 +336,6 @@ class EditBookcaseCli(NumberedCmd):
self.bookcase_item.language = language_selector.result
self.sql_session.flush()
def do_media_type(self, _: str):
media_type_selector = InteractiveItemSelector(
MediaType,
@ -327,24 +345,24 @@ class EditBookcaseCli(NumberedCmd):
self.bookcase_item.media_type = media_type_selector.result
self.sql_session.flush()
def do_amount(self, _: str):
while (new_amount := input(f'New amount [{self.bookcase_item.amount}]> ')) != '':
while (
new_amount := input(f"New amount [{self.bookcase_item.amount}]> ")
) != "":
try:
new_amount = int(new_amount)
except ValueError:
print('Error: amount must be an integer')
print("Error: amount must be an integer")
continue
if new_amount < 1:
print('Error: amount must be greater than 0')
print("Error: amount must be greater than 0")
continue
break
self.bookcase_item.amount = new_amount
self.sql_session.flush()
def do_shelf(self, _: str):
bookcase_selector = InteractiveItemSelector(
Bookcase,
@ -358,38 +376,36 @@ class EditBookcaseCli(NumberedCmd):
self.bookcase_item.shelf = shelf
self.sql_session.flush()
def do_done(self, _: str):
return True
funcs = {
1: {
'f': do_name,
'doc': 'Change name',
"f": do_name,
"doc": "Change name",
},
2: {
'f': do_isbn,
'doc': 'Change ISBN',
"f": do_isbn,
"doc": "Change ISBN",
},
3: {
'f': do_language,
'doc': 'Change language',
"f": do_language,
"doc": "Change language",
},
4: {
'f': do_media_type,
'doc': 'Change media type',
"f": do_media_type,
"doc": "Change media type",
},
5: {
'f': do_amount,
'doc': 'Change amount',
"f": do_amount,
"doc": "Change amount",
},
6: {
'f': do_shelf,
'doc': 'Change shelf',
"f": do_shelf,
"doc": "Change shelf",
},
9: {
'f': do_done,
'doc': 'Done',
"f": do_done,
"doc": "Done",
},
}

@ -8,16 +8,17 @@ from worblehat.models import (
BookcaseShelf,
)
def select_bookcase_shelf(
bookcase: Bookcase,
sql_session: Session,
prompt: str = "Please select the shelf where the item is placed (col-row):"
prompt: str = "Please select the shelf where the item is placed (col-row):",
) -> BookcaseShelf:
def __complete_bookshelf_selection(session: Session, cls: type, arg: str):
args = arg.split('-')
args = arg.split("-")
query = select(cls.row, cls.column).where(cls.bookcase == bookcase)
try:
if arg != '' and len(args) > 0:
if arg != "" and len(args) > 0:
query = query.where(cls.column == int(args[0]))
if len(args) > 1:
query = query.where(cls.row == int(args[1]))
@ -25,21 +26,20 @@ def select_bookcase_shelf(
return []
result = session.execute(query).all()
return [f"{c}-{r}" for r,c in result]
return [f"{c}-{r}" for r, c in result]
print(prompt)
bookcase_shelf_selector = InteractiveItemSelector(
cls = BookcaseShelf,
sql_session = sql_session,
execute_selection = lambda session, cls, arg: session.scalars(
select(cls)
.where(
cls.bookcase == bookcase,
cls.column == int(arg.split('-')[0]),
cls.row == int(arg.split('-')[1]),
cls=BookcaseShelf,
sql_session=sql_session,
execute_selection=lambda session, cls, arg: session.scalars(
select(cls).where(
cls.bookcase == bookcase,
cls.column == int(arg.split("-")[0]),
cls.row == int(arg.split("-")[1]),
)
).all(),
complete_selection = __complete_bookshelf_selection,
complete_selection=__complete_bookshelf_selection,
)
bookcase_shelf_selector.cmdloop()

@ -15,54 +15,51 @@ class SearchCli(NumberedCmd):
self.sql_session = sql_session
self.result = None
def do_search_all(self, _: str):
print('TODO: Implement search all')
print("TODO: Implement search all")
def do_search_title(self, _: str):
while (input_text := input('Enter title: ')) == '':
while (input_text := input("Enter title: ")) == "":
pass
items = self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.name.ilike(f'%{input_text}%')),
select(BookcaseItem).where(BookcaseItem.name.ilike(f"%{input_text}%")),
).all()
if len(items) == 0:
print('No items found.')
print("No items found.")
return
selector = NumberedItemSelector(
items = items,
stringify = lambda item: f"{item.name} ({item.isbn})",
items=items,
stringify=lambda item: f"{item.name} ({item.isbn})",
)
selector.cmdloop()
if selector.result is not None:
self.result = selector.result
return True
def do_search_author(self, _: str):
while (input_text := input('Enter author name: ')) == '':
while (input_text := input("Enter author name: ")) == "":
pass
author = self.sql_session.scalars(
select(Author)
.where(Author.name.ilike(f'%{input_text}%')),
select(Author).where(Author.name.ilike(f"%{input_text}%")),
).all()
if len(author) == 0:
print('No authors found.')
print("No authors found.")
return
elif len(author) == 1:
selected_author = author[0]
print('Found author:')
print(f" {selected_author.name} ({sum(item.amount for item in selected_author.items)} items)")
print("Found author:")
print(
f" {selected_author.name} ({sum(item.amount for item in selected_author.items)} items)"
)
else:
selector = NumberedItemSelector(
items = author,
stringify = lambda author: f"{author.name} ({sum(item.amount for item in author.items)} items)",
items=author,
stringify=lambda author: f"{author.name} ({sum(item.amount for item in author.items)} items)",
)
selector.cmdloop()
if selector.result is None:
@ -70,77 +67,73 @@ class SearchCli(NumberedCmd):
selected_author = selector.result
selector = NumberedItemSelector(
items = list(selected_author.items),
stringify = lambda item: f"{item.name} ({item.isbn})",
items=list(selected_author.items),
stringify=lambda item: f"{item.name} ({item.isbn})",
)
selector.cmdloop()
if selector.result is not None:
self.result = selector.result
return True
def do_search_owner(self, _: str):
while (input_text := input('Enter username: ')) == '':
while (input_text := input("Enter username: ")) == "":
pass
users = self.sql_session.scalars(
select(BookcaseItem.owner)
.where(BookcaseItem.owner.ilike(f'%{input_text}%'))
.where(BookcaseItem.owner.ilike(f"%{input_text}%"))
.distinct(),
).all()
if len(users) == 0:
print('No users found.')
print("No users found.")
return
elif len(users) == 1:
selected_user = users[0]
print('Found user:')
print("Found user:")
print(f" {selected_user}")
else:
selector = NumberedItemSelector(items = users)
selector = NumberedItemSelector(items=users)
selector.cmdloop()
if selector.result is None:
return
selected_user = selector.result
items = self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.owner == selected_user),
select(BookcaseItem).where(BookcaseItem.owner == selected_user),
).all()
selector = NumberedItemSelector(
items = items,
stringify = lambda item: f"{item.name} ({item.isbn})",
items=items,
stringify=lambda item: f"{item.name} ({item.isbn})",
)
selector.cmdloop()
if selector.result is not None:
self.result = selector.result
return True
def do_done(self, _: str):
return True
funcs = {
1: {
'f': do_search_all,
'doc': 'Search everything',
"f": do_search_all,
"doc": "Search everything",
},
2: {
'f': do_search_title,
'doc': 'Search by title',
"f": do_search_title,
"doc": "Search by title",
},
3: {
'f': do_search_author,
'doc': 'Search by author',
"f": do_search_author,
"doc": "Search by author",
},
4: {
'f': do_search_owner,
'doc': 'Search by owner',
"f": do_search_owner,
"doc": "Search by owner",
},
9: {
'f': do_done,
'doc': 'Done',
"f": do_done,
"doc": "Done",
},
}

@ -1 +1,3 @@
from .main import DeadlineDaemon
from .main import DeadlineDaemon
__all__ = ["DeadlineDaemon"]

@ -14,9 +14,10 @@ from worblehat.models import (
from worblehat.services.email import send_email
class DeadlineDaemon:
def __init__(self, sql_session: Session):
if not Config['deadline_daemon.enabled']:
if not Config["deadline_daemon.enabled"]:
return
self.sql_session = sql_session
@ -26,7 +27,7 @@ class DeadlineDaemon:
).one_or_none()
if self.last_run is None:
logging.info('No previous run found, assuming this is the first run')
logging.info("No previous run found, assuming this is the first run")
self.last_run = DeadlineDaemonLastRunDatetime(time=datetime.now())
self.sql_session.add(self.last_run)
self.sql_session.commit()
@ -34,15 +35,14 @@ class DeadlineDaemon:
self.last_run_datetime = self.last_run.time
self.current_run_datetime = datetime.now()
def run(self):
logging.info('Deadline daemon started')
if not Config['deadline_daemon.enabled']:
logging.warn('Deadline daemon disabled, exiting')
logging.info("Deadline daemon started")
if not Config["deadline_daemon.enabled"]:
logging.warn("Deadline daemon disabled, exiting")
return
if Config['deadline_daemon.dryrun']:
logging.warn('Running in dryrun mode')
if Config["deadline_daemon.dryrun"]:
logging.warn("Running in dryrun mode")
self.send_close_deadline_reminder_mails()
self.send_overdue_mails()
@ -58,78 +58,91 @@ class DeadlineDaemon:
###################
def _send_close_deadline_mail(self, borrowing: BookcaseItemBorrowing):
logging.info(f'Sending close deadline mail to {borrowing.username}@pvv.ntnu.no.')
logging.info(
f"Sending close deadline mail to {borrowing.username}@pvv.ntnu.no."
)
send_email(
f'{borrowing.username}@pvv.ntnu.no',
'Reminder - Your borrowing deadline is approaching',
dedent(f'''
f"{borrowing.username}@pvv.ntnu.no",
"Reminder - Your borrowing deadline is approaching",
dedent(
f"""
Your borrowing deadline for the following item is approaching:
{borrowing.item.name}
Please return the item by {borrowing.end_time.strftime("%a %b %d, %Y")}
''',
""",
).strip(),
)
def _send_overdue_mail(self, borrowing: BookcaseItemBorrowing):
logging.info(f'Sending overdue mail to {borrowing.username}@pvv.ntnu.no for {borrowing.item.isbn} - {borrowing.end_time.strftime("%a %b %d, %Y")}')
logging.info(
f"Sending overdue mail to {borrowing.username}@pvv.ntnu.no for {borrowing.item.isbn} - {borrowing.end_time.strftime('%a %b %d, %Y')}"
)
send_email(
f'{borrowing.username}@pvv.ntnu.no',
'Your deadline has passed',
dedent(f'''
f"{borrowing.username}@pvv.ntnu.no",
"Your deadline has passed",
dedent(
f"""
Your delivery deadline for the following item has passed:
{borrowing.item.name}
Please return the item as soon as possible.
''',
""",
).strip(),
)
def _send_newly_available_mail(self, queue_item: BookcaseItemBorrowingQueue):
logging.info(f'Sending newly available mail to {queue_item.username}')
logging.info(f"Sending newly available mail to {queue_item.username}")
days_before_queue_expires = Config['deadline_daemon.days_before_queue_position_expires']
days_before_queue_expires = Config[
"deadline_daemon.days_before_queue_position_expires"
]
# TODO: calculate and format the date of when the queue position expires in the mail.
send_email(
f'{queue_item.username}@pvv.ntnu.no',
'An item you have queued for is now available',
dedent(f'''
f"{queue_item.username}@pvv.ntnu.no",
"An item you have queued for is now available",
dedent(
f"""
The following item is now available for you to borrow:
{queue_item.item.name}
Please pick up the item within {days_before_queue_expires} days.
''',
""",
).strip(),
)
def _send_expiring_queue_position_mail(self, queue_position: BookcaseItemBorrowingQueue, day: int):
logging.info(f'Sending queue position expiry reminder to {queue_position.username}@pvv.ntnu.no.')
def _send_expiring_queue_position_mail(
self, queue_position: BookcaseItemBorrowingQueue, day: int
):
logging.info(
f"Sending queue position expiry reminder to {queue_position.username}@pvv.ntnu.no."
)
send_email(
f'{queue_position.username}@pvv.ntnu.no',
'Reminder - Your queue position expiry deadline is approaching',
dedent(f'''
f"{queue_position.username}@pvv.ntnu.no",
"Reminder - Your queue position expiry deadline is approaching",
dedent(
f"""
Your queue position expiry deadline for the following item is approaching:
{queue_position.item.name}
Please borrow the item by {(queue_position.item_became_available_time + timedelta(days=day)).strftime("%a %b %d, %Y")}
''',
""",
).strip(),
)
def _send_queue_position_expired_mail(self, queue_position: BookcaseItemBorrowingQueue):
def _send_queue_position_expired_mail(
self, queue_position: BookcaseItemBorrowingQueue
):
send_email(
f'{queue_position.username}@pvv.ntnu.no',
'Your queue position has expired',
dedent(f'''
f"{queue_position.username}@pvv.ntnu.no",
"Your queue position has expired",
dedent(
f"""
Your queue position for the following item has expired:
{queue_position.item.name}
@ -137,7 +150,7 @@ class DeadlineDaemon:
You can queue for the item again at any time, but you will be placed at the back of the queue.
There are currently {len(queue_position.item.borrowing_queue)} users in the queue.
''',
""",
).strip(),
)
@ -146,30 +159,32 @@ class DeadlineDaemon:
##################
def _sql_subtract_date(self, x: datetime, y: timedelta):
if self.sql_session.bind.dialect.name == 'sqlite':
if self.sql_session.bind.dialect.name == "sqlite":
# SQLite does not support timedelta in queries
return func.datetime(x, f'-{y.days} days')
elif self.sql_session.bind.dialect.name == 'postgresql':
return func.datetime(x, f"-{y.days} days")
elif self.sql_session.bind.dialect.name == "postgresql":
return x - y
else:
raise NotImplementedError(f'Unsupported dialect: {self.sql_session.bind.dialect.name}')
raise NotImplementedError(
f"Unsupported dialect: {self.sql_session.bind.dialect.name}"
)
def send_close_deadline_reminder_mails(self):
logging.info('Sending mails for items with a closing deadline')
logging.info("Sending mails for items with a closing deadline")
# TODO: This should be int-parsed and validated before the daemon started
days = [int(d) for d in Config['deadline_daemon.warn_days_before_borrowing_deadline']]
days = [
int(d)
for d in Config["deadline_daemon.warn_days_before_borrowing_deadline"]
]
for day in days:
borrowings_to_remind = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(
select(BookcaseItemBorrowing).where(
self._sql_subtract_date(
BookcaseItemBorrowing.end_time,
timedelta(days=day),
)
.between(
).between(
self.last_run_datetime,
self.current_run_datetime,
),
@ -179,13 +194,11 @@ class DeadlineDaemon:
for borrowing in borrowings_to_remind:
self._send_close_deadline_mail(borrowing)
def send_overdue_mails(self):
logging.info('Sending mails for overdue items')
logging.info("Sending mails for overdue items")
to_remind = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(
select(BookcaseItemBorrowing).where(
BookcaseItemBorrowing.end_time < self.current_run_datetime,
BookcaseItemBorrowing.delivered.is_(None),
)
@ -194,15 +207,15 @@ class DeadlineDaemon:
for borrowing in to_remind:
self._send_overdue_mail(borrowing)
def send_newly_available_mails(self):
logging.info('Sending mails about newly available items')
logging.info("Sending mails about newly available items")
newly_available = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.join(
BookcaseItemBorrowing,
BookcaseItemBorrowing.fk_bookcase_item_uid == BookcaseItemBorrowingQueue.fk_bookcase_item_uid,
BookcaseItemBorrowing.fk_bookcase_item_uid
== BookcaseItemBorrowingQueue.fk_bookcase_item_uid,
)
.where(
BookcaseItemBorrowingQueue.expired.is_(False),
@ -217,31 +230,38 @@ class DeadlineDaemon:
).all()
for queue_item in newly_available:
logging.info(f'Adding user {queue_item.username} to queue for {queue_item.item.name}')
logging.info(
f"Adding user {queue_item.username} to queue for {queue_item.item.name}"
)
queue_item.item_became_available_time = self.current_run_datetime
self.sql_session.commit()
self._send_newly_available_mail(queue_item)
def send_expiring_queue_position_mails(self):
logging.info('Sending mails about queue positions which are expiring soon')
logging.warning('Not implemented')
logging.info("Sending mails about queue positions which are expiring soon")
logging.warning("Not implemented")
days = [int(d) for d in Config['deadline_daemon.warn_days_before_expiring_queue_position_deadline']]
days = [
int(d)
for d in Config[
"deadline_daemon.warn_days_before_expiring_queue_position_deadline"
]
]
for day in days:
queue_positions_to_remind = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.join(
BookcaseItemBorrowing,
BookcaseItemBorrowing.fk_bookcase_item_uid == BookcaseItemBorrowingQueue.fk_bookcase_item_uid,
BookcaseItemBorrowing.fk_bookcase_item_uid
== BookcaseItemBorrowingQueue.fk_bookcase_item_uid,
)
.where(
self._sql_subtract_date(
BookcaseItemBorrowingQueue.item_became_available_time + timedelta(days=day),
BookcaseItemBorrowingQueue.item_became_available_time
+ timedelta(days=day),
timedelta(days=day),
)
.between(
).between(
self.last_run_datetime,
self.current_run_datetime,
),
@ -251,29 +271,34 @@ class DeadlineDaemon:
for queue_position in queue_positions_to_remind:
self._send_expiring_queue_position_mail(queue_position, day)
def auto_expire_queue_positions(self):
logging.info('Expiring queue positions which are too old')
logging.info("Expiring queue positions which are too old")
queue_position_expiry_days = int(Config['deadline_daemon.days_before_queue_position_expires'])
queue_position_expiry_days = int(
Config["deadline_daemon.days_before_queue_position_expires"]
)
overdue_queue_positions = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.where(
BookcaseItemBorrowingQueue.item_became_available_time + timedelta(days=queue_position_expiry_days) < self.current_run_datetime,
select(BookcaseItemBorrowingQueue).where(
BookcaseItemBorrowingQueue.item_became_available_time
+ timedelta(days=queue_position_expiry_days)
< self.current_run_datetime,
BookcaseItemBorrowingQueue.expired.is_(False),
),
).all()
for queue_position in overdue_queue_positions:
logging.info(f'Expiring queue position for {queue_position.username} for item {queue_position.item.name}')
logging.info(
f"Expiring queue position for {queue_position.username} for item {queue_position.item.name}"
)
queue_position.expired = True
next_queue_position = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.where(
BookcaseItemBorrowingQueue.fk_bookcase_item_uid == queue_position.fk_bookcase_item_uid,
BookcaseItemBorrowingQueue.fk_bookcase_item_uid
== queue_position.fk_bookcase_item_uid,
BookcaseItemBorrowingQueue.item_became_available_time.is_(None),
)
.order_by(BookcaseItemBorrowingQueue.entered_queue_time)
@ -283,9 +308,13 @@ class DeadlineDaemon:
self._send_queue_position_expired_mail(queue_position)
if next_queue_position is not None:
next_queue_position.item_became_available_time = self.current_run_datetime
next_queue_position.item_became_available_time = (
self.current_run_datetime
)
logging.info(f'Next user in queue for item {next_queue_position.item.name} is {next_queue_position.username}')
logging.info(
f"Next user in queue for item {next_queue_position.item.name} is {next_queue_position.username}"
)
self._send_newly_available_mail(next_queue_position)
self.sql_session.commit()
self.sql_session.commit()

@ -1,10 +1,10 @@
from datetime import datetime, timedelta
from worblehat.models import (
BookcaseItem,
BookcaseItemBorrowing,
BookcaseItemBorrowingQueue,
DeadlineDaemonLastRunDatetime,
BookcaseItem,
BookcaseItemBorrowing,
BookcaseItemBorrowingQueue,
DeadlineDaemonLastRunDatetime,
)
from worblehat.services.config import Config
@ -13,105 +13,115 @@ from .seed_test_data import main as seed_test_data_main
def clear_db(sql_session):
sql_session.query(BookcaseItemBorrowingQueue).delete()
sql_session.query(BookcaseItemBorrowing).delete()
sql_session.query(DeadlineDaemonLastRunDatetime).delete()
sql_session.commit()
sql_session.query(BookcaseItemBorrowingQueue).delete()
sql_session.query(BookcaseItemBorrowing).delete()
sql_session.query(DeadlineDaemonLastRunDatetime).delete()
sql_session.commit()
# NOTE: feel free to change this function to suit your needs
# it's just a quick and dirty way to get some data into the database
# for testing the deadline daemon - oysteikt 2024
def main(sql_session):
borrow_warning_days = [timedelta(days=int(d)) for d in Config['deadline_daemon.warn_days_before_borrowing_deadline']]
queue_warning_days = [timedelta(days=int(d)) for d in Config['deadline_daemon.warn_days_before_expiring_queue_position_deadline']]
queue_expire_days = int(Config['deadline_daemon.days_before_queue_position_expires'])
clear_db(sql_session)
seed_test_data_main(sql_session)
books = sql_session.query(BookcaseItem).all()
last_run_datetime = datetime.now() - timedelta(days=16)
last_run = DeadlineDaemonLastRunDatetime(last_run_datetime)
sql_session.add(last_run)
# Create at least one item that is borrowed and not supposed to be returned yet
borrowing = BookcaseItemBorrowing(
item=books[0],
username='test_borrower_still_borrowing',
)
borrowing.start_time = last_run_datetime - timedelta(days=1)
borrowing.end_time = datetime.now() - timedelta(days=6)
sql_session.add(borrowing)
# Create at least one item that is borrowed and is supposed to be returned soon
borrowing = BookcaseItemBorrowing(
item=books[1],
username='test_borrower_return_soon',
)
borrowing.start_time = last_run_datetime - timedelta(days=1)
borrowing.end_time = datetime.now() - timedelta(days=2)
sql_session.add(borrowing)
# Create at least one item that is borrowed and is overdue
borrowing = BookcaseItemBorrowing(
item=books[2],
username='test_borrower_overdue',
)
borrowing.start_time = datetime.now() - timedelta(days=1)
borrowing.end_time = datetime.now() + timedelta(days=1)
sql_session.add(borrowing)
# Create at least one item that is in the queue and is not supposed to be borrowed yet
queue_item = BookcaseItemBorrowingQueue(
item=books[3],
username='test_queue_user_still_waiting',
)
queue_item.entered_queue_time = last_run_datetime - timedelta(days=1)
borrowing = BookcaseItemBorrowing(
item=books[3],
username='test_borrower_return_soon',
)
borrowing.start_time = last_run_datetime - timedelta(days=1)
borrowing.end_time = datetime.now() - timedelta(days=2)
sql_session.add(queue_item)
sql_session.add(borrowing)
# Create at least three items that is in the queue and two items were just returned
for i in range(3):
queue_item = BookcaseItemBorrowingQueue(
item=books[4 + i],
username=f'test_queue_user_{i}',
borrow_warning_days = [
timedelta(days=int(d))
for d in Config["deadline_daemon.warn_days_before_borrowing_deadline"]
]
queue_warning_days = [
timedelta(days=int(d))
for d in Config[
"deadline_daemon.warn_days_before_expiring_queue_position_deadline"
]
]
queue_expire_days = int(
Config["deadline_daemon.days_before_queue_position_expires"]
)
sql_session.add(queue_item)
for i in range(3):
clear_db(sql_session)
seed_test_data_main(sql_session)
books = sql_session.query(BookcaseItem).all()
last_run_datetime = datetime.now() - timedelta(days=16)
last_run = DeadlineDaemonLastRunDatetime(last_run_datetime)
sql_session.add(last_run)
# Create at least one item that is borrowed and not supposed to be returned yet
borrowing = BookcaseItemBorrowing(
item=books[4 + i],
username=f'test_borrower_returned_{i}',
item=books[0],
username="test_borrower_still_borrowing",
)
borrowing.start_time = last_run_datetime - timedelta(days=2)
borrowing.end_time = datetime.now() + timedelta(days=1)
if i != 2:
borrowing.delivered = datetime.now() - timedelta(days=1)
borrowing.start_time = last_run_datetime - timedelta(days=1)
borrowing.end_time = datetime.now() - timedelta(days=6)
sql_session.add(borrowing)
# Create at least one item that has been in the queue for so long that the queue position should expire
queue_item = BookcaseItemBorrowingQueue(
item=books[7],
username='test_queue_user_expired',
)
queue_item.entered_queue_time = datetime.now() - timedelta(days=15)
# Create at least one item that is borrowed and is supposed to be returned soon
borrowing = BookcaseItemBorrowing(
item=books[1],
username="test_borrower_return_soon",
)
borrowing.start_time = last_run_datetime - timedelta(days=1)
borrowing.end_time = datetime.now() - timedelta(days=2)
sql_session.add(borrowing)
# Create at least one item that has been in the queue for so long that the queue position should expire,
# but the queue person has already been notified
queue_item = BookcaseItemBorrowingQueue(
item=books[8],
username='test_queue_user_expired_notified',
)
queue_item.entered_queue_time = datetime.now() - timedelta(days=15)
# Create at least one item that is borrowed and is overdue
borrowing = BookcaseItemBorrowing(
item=books[2],
username="test_borrower_overdue",
)
borrowing.start_time = datetime.now() - timedelta(days=1)
borrowing.end_time = datetime.now() + timedelta(days=1)
sql_session.add(borrowing)
sql_session.commit()
# Create at least one item that is in the queue and is not supposed to be borrowed yet
queue_item = BookcaseItemBorrowingQueue(
item=books[3],
username="test_queue_user_still_waiting",
)
queue_item.entered_queue_time = last_run_datetime - timedelta(days=1)
borrowing = BookcaseItemBorrowing(
item=books[3],
username="test_borrower_return_soon",
)
borrowing.start_time = last_run_datetime - timedelta(days=1)
borrowing.end_time = datetime.now() - timedelta(days=2)
sql_session.add(queue_item)
sql_session.add(borrowing)
# Create at least three items that is in the queue and two items were just returned
for i in range(3):
queue_item = BookcaseItemBorrowingQueue(
item=books[4 + i],
username=f"test_queue_user_{i}",
)
sql_session.add(queue_item)
for i in range(3):
borrowing = BookcaseItemBorrowing(
item=books[4 + i],
username=f"test_borrower_returned_{i}",
)
borrowing.start_time = last_run_datetime - timedelta(days=2)
borrowing.end_time = datetime.now() + timedelta(days=1)
if i != 2:
borrowing.delivered = datetime.now() - timedelta(days=1)
sql_session.add(borrowing)
# Create at least one item that has been in the queue for so long that the queue position should expire
queue_item = BookcaseItemBorrowingQueue(
item=books[7],
username="test_queue_user_expired",
)
queue_item.entered_queue_time = datetime.now() - timedelta(days=15)
# Create at least one item that has been in the queue for so long that the queue position should expire,
# but the queue person has already been notified
queue_item = BookcaseItemBorrowingQueue(
item=books[8],
username="test_queue_user_expired_notified",
)
queue_item.entered_queue_time = datetime.now() - timedelta(days=15)
sql_session.commit()

@ -1,19 +1,18 @@
import csv
from pathlib import Path
from datetime import datetime, timedelta
from worblehat.models import (
Bookcase,
BookcaseItem,
BookcaseShelf,
MediaType,
Language,
Bookcase,
BookcaseItem,
BookcaseShelf,
MediaType,
Language,
)
from worblehat.services.config import Config
CSV_FILE = Path(__file__).parent.parent.parent / 'data' / 'arbeidsrom_smal_hylle_5.csv'
CSV_FILE = Path(__file__).parent.parent.parent / "data" / "arbeidsrom_smal_hylle_5.csv"
def clear_db(sql_session):
sql_session.query(BookcaseItem).delete()
@ -23,45 +22,46 @@ def clear_db(sql_session):
sql_session.query(Language).delete()
sql_session.commit()
def main(sql_session):
clear_db(sql_session)
media_type = MediaType(
name='Book',
description='A book',
name="Book",
description="A book",
)
sql_session.add(media_type)
language = Language(
name='Norwegian',
iso639_1_code='no',
name="Norwegian",
iso639_1_code="no",
)
sql_session.add(language)
seed_case = Bookcase(
name='seed_case',
description='test bookcase with test data',
name="seed_case",
description="test bookcase with test data",
)
sql_session.add(seed_case)
seed_shelf_1 = BookcaseShelf(
row=1,
column=1,
bookcase=seed_case,
description='test shelf with test data 1',
row=1,
column=1,
bookcase=seed_case,
description="test shelf with test data 1",
)
seed_shelf_2 = BookcaseShelf(
row=2,
column=1,
bookcase=seed_case,
description='test shelf with test data 2',
row=2,
column=1,
bookcase=seed_case,
description="test shelf with test data 2",
)
sql_session.add(seed_shelf_1)
sql_session.add(seed_shelf_2)
bookcase_items = []
with open(CSV_FILE) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
csv_reader = csv.reader(csv_file, delimiter=",")
next(csv_reader)
for row in csv_reader:

@ -2,10 +2,12 @@ from flask import Blueprint, render_template
main = Blueprint("main", __name__, template_folder="main")
@main.route('/')
@main.route("/")
def index():
return render_template("main/index.html")
@main.route("/login")
def login():
return render_template("main/login.html")
return render_template("main/login.html")

@ -1,3 +1,3 @@
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
db = SQLAlchemy()

@ -10,18 +10,19 @@ from worblehat.services.config import Config
from .blueprints.main import main
from .database import db
def create_app(args: dict[str, any] | None = None):
app = Flask(__name__)
app.config.update(Config['flask'])
app.config.update(Config["flask"])
app.config.update(Config._config)
app.config['SQLALCHEMY_DATABASE_URI'] = Config.db_string()
app.config['SQLALCHEMY_ECHO'] = Config['logging.debug_sql']
app.config["SQLALCHEMY_DATABASE_URI"] = Config.db_string()
app.config["SQLALCHEMY_ECHO"] = Config["logging.debug_sql"]
db.init_app(app)
with app.app_context():
if not inspect(db.engine).has_table('Bookcase'):
if not inspect(db.engine).has_table("Bookcase"):
Base.metadata.create_all(db.engine)
seed_data()
@ -31,12 +32,13 @@ def create_app(args: dict[str, any] | None = None):
return app
def configure_admin(app):
admin = Admin(app, name='Worblehat', template_mode='bootstrap3')
admin = Admin(app, name="Worblehat", template_mode="bootstrap3")
admin.add_view(ModelView(Author, db.session))
admin.add_view(ModelView(Bookcase, db.session))
admin.add_view(ModelView(BookcaseItem, db.session))
admin.add_view(ModelView(BookcaseShelf, db.session))
admin.add_view(ModelView(Category, db.session))
admin.add_view(ModelView(Language, db.session))
admin.add_view(ModelView(MediaType, db.session))
admin.add_view(ModelView(MediaType, db.session))

@ -1,18 +1,19 @@
from werkzeug import run_simple
from worblehat.services.config import Config
from .flaskapp import create_app
def main():
app = create_app()
run_simple(
hostname = 'localhost',
port = 5000,
application = app,
use_debugger = True,
use_reloader = True,
hostname="localhost",
port=5000,
application=app,
use_debugger=True,
use_reloader=True,
)
if __name__ == '__main__':
main()
if __name__ == "__main__":
main()

@ -1,8 +1,10 @@
from .flaskapp import create_app
def main():
app = create_app()
app.run()
if __name__ == '__main__':
if __name__ == "__main__":
main()

@ -18,7 +18,8 @@ from .flaskapp.wsgi_prod import main as flask_prod_main
def _print_version() -> None:
from worblehat import __version__
print(f'Worblehat version {__version__}')
print(f"Worblehat version {__version__}")
def _connect_to_database(**engine_args) -> Session:
@ -26,7 +27,7 @@ def _connect_to_database(**engine_args) -> Session:
engine = create_engine(Config.db_string(), **engine_args)
sql_session = Session(engine)
except Exception as err:
print('Error: could not connect to database.')
print("Error: could not connect to database.")
print(err)
exit(1)
@ -38,51 +39,55 @@ def main():
args = arg_parser.parse_args()
Config.load_configuration(vars(args))
if Config['logging.debug']:
logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
if Config["logging.debug"]:
logging.basicConfig(encoding="utf-8", level=logging.DEBUG)
else:
logging.basicConfig(encoding='utf-8', level=logging.INFO)
logging.basicConfig(encoding="utf-8", level=logging.INFO)
if args.version:
_print_version()
exit(0)
if args.print_config:
print(f'Configuration:\n{pformat(vars(args))}')
print(f"Configuration:\n{pformat(vars(args))}")
exit(0)
if args.command == 'deadline-daemon':
sql_session = _connect_to_database(echo=Config['logging.debug_sql'])
if args.command == "deadline-daemon":
sql_session = _connect_to_database(echo=Config["logging.debug_sql"])
DeadlineDaemon(sql_session).run()
exit(0)
if args.command == 'cli':
sql_session = _connect_to_database(echo=Config['logging.debug_sql'])
if args.command == "cli":
sql_session = _connect_to_database(echo=Config["logging.debug_sql"])
WorblehatCli.run_with_safe_exit_wrapper(sql_session)
exit(0)
if args.command == 'devscripts':
sql_session = _connect_to_database(echo=Config['logging.debug_sql'])
if args.script == 'seed-content-for-deadline-daemon':
if args.command == "devscripts":
sql_session = _connect_to_database(echo=Config["logging.debug_sql"])
if args.script == "seed-content-for-deadline-daemon":
from .devscripts.seed_content_for_deadline_daemon import main
main(sql_session)
elif args.script == 'seed-test-data':
elif args.script == "seed-test-data":
from .devscripts.seed_test_data import main
main(sql_session)
else:
print(devscripts_arg_parser.format_help())
exit(1)
exit(0)
if args.command == 'flask-dev':
if args.command == "flask-dev":
flask_dev_main()
exit(0)
if args.command == 'flask-prod':
if Config['logging.debug'] or Config['logging.debug_sql']:
logging.warn('Debug mode is enabled for the production server. This is not recommended.')
if args.command == "flask-prod":
if Config["logging.debug"] or Config["logging.debug_sql"]:
logging.warn(
"Debug mode is enabled for the production server. This is not recommended."
)
flask_prod_main()
exit(0)
print(arg_parser.format_help())
exit(1)
exit(1)

@ -1,13 +1,8 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
Integer,
ForeignKey,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
@ -21,14 +16,15 @@ from .xref_tables import Item_Author
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class Author(Base, UidMixin, UniqueNameMixin):
items: Mapped[set[BookcaseItem]] = relationship(
secondary = Item_Author.__table__,
back_populates = 'authors',
secondary=Item_Author.__table__,
back_populates="authors",
)
def __init__(
self,
name: str,
):
self.name = name
self.name = name

@ -9,6 +9,7 @@ from sqlalchemy.orm.collections import (
InstrumentedSet,
)
class Base(DeclarativeBase):
metadata = MetaData(
naming_convention={
@ -16,7 +17,7 @@ class Base(DeclarativeBase):
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_`%(constraint_name)s`",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
"pk": "pk_%(table_name)s",
}
)
@ -26,15 +27,18 @@ class Base(DeclarativeBase):
def __repr__(self) -> str:
columns = ", ".join(
f"{k}={repr(v)}" for k, v in self.__dict__.items() if not any([
k.startswith("_"),
# Ensure that we don't try to print out the entire list of
# relationships, which could create an infinite loop
isinstance(v, Base),
isinstance(v, InstrumentedList),
isinstance(v, InstrumentedSet),
isinstance(v, InstrumentedDict),
])
f"{k}={repr(v)}"
for k, v in self.__dict__.items()
if not any(
[
k.startswith("_"),
# Ensure that we don't try to print out the entire list of
# relationships, which could create an infinite loop
isinstance(v, Base),
isinstance(v, InstrumentedList),
isinstance(v, InstrumentedSet),
isinstance(v, InstrumentedDict),
]
)
)
return f"<{self.__class__.__name__}({columns})>"
return f"<{self.__class__.__name__}({columns})>"

@ -13,13 +13,15 @@ from .mixins import (
UidMixin,
UniqueNameMixin,
)
if TYPE_CHECKING:
from .BookcaseShelf import BookcaseShelf
class Bookcase(Base, UidMixin, UniqueNameMixin):
description: Mapped[str | None] = mapped_column(Text)
shelfs: Mapped[list[BookcaseShelf]] = relationship(back_populates='bookcase')
shelfs: Mapped[list[BookcaseShelf]] = relationship(back_populates="bookcase")
def __init__(
self,
@ -32,6 +34,5 @@ class Bookcase(Base, UidMixin, UniqueNameMixin):
def short_str(self) -> str:
result = self.name
if self.description is not None:
result += f' [{self.description}]'
result += f" [{self.description}]"
return result

@ -2,11 +2,10 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
ForeignKey,
Integer,
SmallInteger,
String,
Text,
ForeignKey,
SmallInteger,
String,
Text,
)
from sqlalchemy.orm import (
Mapped,
@ -17,12 +16,12 @@ from sqlalchemy.orm import (
from .Base import Base
from .mixins import (
UidMixin,
UniqueNameMixin,
)
from .xref_tables import (
Item_Category,
Item_Author,
)
if TYPE_CHECKING:
from .Author import Author
from .BookcaseItemBorrowing import BookcaseItemBorrowing
@ -34,36 +33,39 @@ if TYPE_CHECKING:
from worblehat.flaskapp.database import db
class BookcaseItem(Base, UidMixin):
isbn: Mapped[int] = mapped_column(String, unique=True, index=True)
name: Mapped[str] = mapped_column(Text, index=True)
owner: Mapped[str] = mapped_column(String, default='PVV')
owner: Mapped[str] = mapped_column(String, default="PVV")
amount: Mapped[int] = mapped_column(SmallInteger, default=1)
fk_media_type_uid: Mapped[int] = mapped_column(ForeignKey('MediaType.uid'))
fk_bookcase_shelf_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseShelf.uid'))
fk_language_uid: Mapped[int | None] = mapped_column(ForeignKey('Language.uid'))
fk_media_type_uid: Mapped[int] = mapped_column(ForeignKey("MediaType.uid"))
fk_bookcase_shelf_uid: Mapped[int] = mapped_column(ForeignKey("BookcaseShelf.uid"))
fk_language_uid: Mapped[int | None] = mapped_column(ForeignKey("Language.uid"))
media_type: Mapped[MediaType] = relationship(back_populates='items')
shelf: Mapped[BookcaseShelf] = relationship(back_populates='items')
media_type: Mapped[MediaType] = relationship(back_populates="items")
shelf: Mapped[BookcaseShelf] = relationship(back_populates="items")
language: Mapped[Language] = relationship()
borrowings: Mapped[set[BookcaseItemBorrowing]] = relationship(back_populates='item')
borrowing_queue: Mapped[set[BookcaseItemBorrowingQueue]] = relationship(back_populates='item')
borrowings: Mapped[set[BookcaseItemBorrowing]] = relationship(back_populates="item")
borrowing_queue: Mapped[set[BookcaseItemBorrowingQueue]] = relationship(
back_populates="item"
)
categories: Mapped[set[Category]] = relationship(
secondary = Item_Category.__table__,
back_populates = 'items',
secondary=Item_Category.__table__,
back_populates="items",
)
authors: Mapped[set[Author]] = relationship(
secondary = Item_Author.__table__,
back_populates = 'items',
secondary=Item_Author.__table__,
back_populates="items",
)
def __init__(
self,
name: str,
isbn: int | None = None,
owner: str = 'PVV',
owner: str = "PVV",
):
self.name = name
self.isbn = isbn
@ -76,4 +78,4 @@ class BookcaseItem(Base, UidMixin):
This method defaults to using the flask_sqlalchemy session.
It will not work outside of a request context, unless another session is provided.
"""
return sql_session.query(cls).where(cls.isbn == isbn).one_or_none()
return sql_session.query(cls).where(cls.isbn == isbn).one_or_none()

@ -3,7 +3,6 @@ from typing import TYPE_CHECKING
from datetime import datetime, timedelta
from sqlalchemy import (
Boolean,
ForeignKey,
String,
DateTime,
@ -16,18 +15,24 @@ from sqlalchemy.orm import (
from .Base import Base
from .mixins import UidMixin
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class BookcaseItemBorrowing(Base, UidMixin):
username: Mapped[str] = mapped_column(String)
start_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now())
end_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now() + timedelta(days=30))
end_time: Mapped[datetime] = mapped_column(
DateTime, default=datetime.now() + timedelta(days=30)
)
delivered: Mapped[datetime | None] = mapped_column(DateTime, default=None)
fk_bookcase_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), index=True)
fk_bookcase_item_uid: Mapped[int] = mapped_column(
ForeignKey("BookcaseItem.uid"), index=True
)
item: Mapped[BookcaseItem] = relationship(back_populates='borrowings')
item: Mapped[BookcaseItem] = relationship(back_populates="borrowings")
def __init__(
self,
@ -37,4 +42,4 @@ class BookcaseItemBorrowing(Base, UidMixin):
self.username = username
self.item = item
self.start_time = datetime.now()
self.end_time = datetime.now() + timedelta(days=30)
self.end_time = datetime.now() + timedelta(days=30)

@ -16,18 +16,24 @@ from sqlalchemy.orm import (
from .Base import Base
from .mixins import UidMixin
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class BookcaseItemBorrowingQueue(Base, UidMixin):
username: Mapped[str] = mapped_column(String)
entered_queue_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now())
entered_queue_time: Mapped[datetime] = mapped_column(
DateTime, default=datetime.now()
)
item_became_available_time: Mapped[datetime | None] = mapped_column(DateTime)
expired = mapped_column(Boolean, default=False)
fk_bookcase_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), index=True)
fk_bookcase_item_uid: Mapped[int] = mapped_column(
ForeignKey("BookcaseItem.uid"), index=True
)
item: Mapped[BookcaseItem] = relationship(back_populates='borrowing_queue')
item: Mapped[BookcaseItem] = relationship(back_populates="borrowing_queue")
def __init__(
self,
@ -36,4 +42,4 @@ class BookcaseItemBorrowingQueue(Base, UidMixin):
):
self.username = username
self.item = item
self.entered_queue_time = datetime.now()
self.entered_queue_time = datetime.now()

@ -2,7 +2,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
Integer,
ForeignKey,
SmallInteger,
Text,
@ -16,6 +15,7 @@ from sqlalchemy.orm import (
from .Base import Base
from .mixins import UidMixin
if TYPE_CHECKING:
from .Bookcase import Bookcase
from .BookcaseItem import BookcaseItem
@ -23,22 +23,23 @@ if TYPE_CHECKING:
# NOTE: Booshelfs are 0 indexed for both rows and columns,
# where cell 0-0 is placed in the lower right corner.
class BookcaseShelf(Base, UidMixin):
__table_args__ = (
UniqueConstraint(
'column',
'fk_bookcase_uid',
'row',
"column",
"fk_bookcase_uid",
"row",
),
)
description: Mapped[str | None] = mapped_column(Text)
row: Mapped[int] = mapped_column(SmallInteger)
column: Mapped[int] = mapped_column(SmallInteger)
fk_bookcase_uid: Mapped[int] = mapped_column(ForeignKey('Bookcase.uid'))
fk_bookcase_uid: Mapped[int] = mapped_column(ForeignKey("Bookcase.uid"))
bookcase: Mapped[Bookcase] = relationship(back_populates='shelfs')
items: Mapped[set[BookcaseItem]] = relationship(back_populates='shelf')
bookcase: Mapped[Bookcase] = relationship(back_populates="shelfs")
items: Mapped[set[BookcaseItem]] = relationship(back_populates="shelf")
def __init__(
self,
@ -53,7 +54,7 @@ class BookcaseShelf(Base, UidMixin):
self.description = description
def short_str(self) -> str:
result = f'{self.column}-{self.row}'
result = f"{self.column}-{self.row}"
if self.description is not None:
result += f' [{self.description}]'
return result
result += f" [{self.description}]"
return result

@ -14,15 +14,17 @@ from .mixins import (
UniqueNameMixin,
)
from .xref_tables import Item_Category
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class Category(Base, UidMixin, UniqueNameMixin):
description: Mapped[str | None] = mapped_column(Text)
items: Mapped[set[BookcaseItem]] = relationship(
secondary=Item_Category.__table__,
back_populates='categories',
back_populates="categories",
)
def __init__(
@ -31,4 +33,4 @@ class Category(Base, UidMixin, UniqueNameMixin):
description: str | None = None,
):
self.name = name
self.description = description
self.description = description

@ -12,11 +12,12 @@ from sqlalchemy.orm import (
from .Base import Base
class DeadlineDaemonLastRunDatetime(Base):
__table_args__ = (
CheckConstraint(
'uid = true',
name = 'single_row_only',
"uid = true",
name="single_row_only",
),
)
uid: Mapped[bool] = mapped_column(Boolean, primary_key=True, default=True)
@ -24,4 +25,4 @@ class DeadlineDaemonLastRunDatetime(Base):
def __init__(self, time: datetime | None = None):
if time is not None:
self.time = time
self.time = time

@ -11,6 +11,7 @@ from sqlalchemy.orm import (
from .Base import Base
from .mixins import UidMixin, UniqueNameMixin
class Language(Base, UidMixin, UniqueNameMixin):
iso639_1_code: Mapped[str] = mapped_column(String(2), unique=True, index=True)

@ -10,13 +10,15 @@ from sqlalchemy.orm import (
from .Base import Base
from .mixins import UidMixin, UniqueNameMixin
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class MediaType(Base, UidMixin, UniqueNameMixin):
description: Mapped[str | None] = mapped_column(Text)
items: Mapped[set[BookcaseItem]] = relationship(back_populates='media_type')
items: Mapped[set[BookcaseItem]] = relationship(back_populates="media_type")
def __init__(
self,
@ -25,5 +27,3 @@ class MediaType(Base, UidMixin, UniqueNameMixin):
):
self.name = name
self.description = description

@ -8,4 +8,18 @@ from .BookcaseShelf import BookcaseShelf
from .Category import Category
from .DeadlineDaemonLastRunDatetime import DeadlineDaemonLastRunDatetime
from .Language import Language
from .MediaType import MediaType
from .MediaType import MediaType
__all__ = [
"Author",
"Base",
"Bookcase",
"BookcaseItem",
"BookcaseItemBorrowing",
"BookcaseItemBorrowingQueue",
"BookcaseShelf",
"Category",
"DeadlineDaemonLastRunDatetime",
"Language",
"MediaType",
]

@ -1,5 +1,4 @@
from alembic import context
from flask import current_app
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
@ -14,7 +13,8 @@ if config.config_file_name is not None:
Config.load_configuration({})
config.set_main_option('sqlalchemy.url', Config.db_string())
config.set_main_option("sqlalchemy.url", Config.db_string())
# This will make sure alembic doesn't generate empty migrations
# https://stackoverflow.com/questions/70203927/how-to-prevent-alembic-revision-autogenerate-from-making-revision-file-if-it-h
@ -23,7 +23,8 @@ def _process_revision_directives(context, revision, directives):
script = directives[0]
if script.upgrade_ops.is_empty():
directives[:] = []
print('No changes in schema detected. Not generating migration.')
print("No changes in schema detected. Not generating migration.")
def run_migrations_online() -> None:
connectable = engine_from_config(
@ -36,11 +37,9 @@ def run_migrations_online() -> None:
context.configure(
connection=connection,
target_metadata=Base.metadata,
# Extended type checking with alembic when generating migrations
# https://alembic.sqlalchemy.org/en/latest/autogenerate.html#what-does-autogenerate-detect-and-what-does-it-not-detect
compare_type=True,
# This is required for ALTER TABLE to work with sqlite.
# It should have no effect on postgreSQL
# https://alembic.sqlalchemy.org/en/latest/batch.html
@ -51,6 +50,7 @@ def run_migrations_online() -> None:
with context.begin_transaction():
context.run_migrations()
# We don't have any good reasons to generate raw sql migrations,
# so the `run_migrations_offline` has been removed
run_migrations_online()

@ -1,16 +1,17 @@
"""initial_migration
Revision ID: 7dfbf8a8dec8
Revises:
Revises:
Create Date: 2024-07-31 21:07:13.434012
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7dfbf8a8dec8'
revision = "7dfbf8a8dec8"
down_revision = None
branch_labels = None
depends_on = None
@ -18,166 +19,243 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('Author',
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_Author'))
op.create_table(
"Author",
sa.Column("uid", sa.Integer(), nullable=False),
sa.Column("name", sa.Text(), nullable=False),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_Author")),
)
with op.batch_alter_table('Author', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_Author_name'), ['name'], unique=True)
with op.batch_alter_table("Author", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_Author_name"), ["name"], unique=True)
op.create_table('Bookcase',
sa.Column('description', sa.Text(), nullable=True),
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_Bookcase'))
op.create_table(
"Bookcase",
sa.Column("description", sa.Text(), nullable=True),
sa.Column("uid", sa.Integer(), nullable=False),
sa.Column("name", sa.Text(), nullable=False),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_Bookcase")),
)
with op.batch_alter_table('Bookcase', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_Bookcase_name'), ['name'], unique=True)
with op.batch_alter_table("Bookcase", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_Bookcase_name"), ["name"], unique=True)
op.create_table('Category',
sa.Column('description', sa.Text(), nullable=True),
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_Category'))
op.create_table(
"Category",
sa.Column("description", sa.Text(), nullable=True),
sa.Column("uid", sa.Integer(), nullable=False),
sa.Column("name", sa.Text(), nullable=False),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_Category")),
)
with op.batch_alter_table('Category', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_Category_name'), ['name'], unique=True)
with op.batch_alter_table("Category", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_Category_name"), ["name"], unique=True)
op.create_table('DeadlineDaemonLastRunDatetime',
sa.Column('uid', sa.Boolean(), nullable=False),
sa.Column('time', sa.DateTime(), nullable=False),
sa.CheckConstraint('uid = true', name=op.f('ck_DeadlineDaemonLastRunDatetime_`single_row_only`')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_DeadlineDaemonLastRunDatetime'))
op.create_table(
"DeadlineDaemonLastRunDatetime",
sa.Column("uid", sa.Boolean(), nullable=False),
sa.Column("time", sa.DateTime(), nullable=False),
sa.CheckConstraint(
"uid = true",
name=op.f("ck_DeadlineDaemonLastRunDatetime_`single_row_only`"),
),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_DeadlineDaemonLastRunDatetime")),
)
op.create_table('Language',
sa.Column('iso639_1_code', sa.String(length=2), nullable=False),
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_Language'))
op.create_table(
"Language",
sa.Column("iso639_1_code", sa.String(length=2), nullable=False),
sa.Column("uid", sa.Integer(), nullable=False),
sa.Column("name", sa.Text(), nullable=False),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_Language")),
)
with op.batch_alter_table('Language', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_Language_iso639_1_code'), ['iso639_1_code'], unique=True)
batch_op.create_index(batch_op.f('ix_Language_name'), ['name'], unique=True)
with op.batch_alter_table("Language", schema=None) as batch_op:
batch_op.create_index(
batch_op.f("ix_Language_iso639_1_code"), ["iso639_1_code"], unique=True
)
batch_op.create_index(batch_op.f("ix_Language_name"), ["name"], unique=True)
op.create_table('MediaType',
sa.Column('description', sa.Text(), nullable=True),
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_MediaType'))
op.create_table(
"MediaType",
sa.Column("description", sa.Text(), nullable=True),
sa.Column("uid", sa.Integer(), nullable=False),
sa.Column("name", sa.Text(), nullable=False),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_MediaType")),
)
with op.batch_alter_table('MediaType', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_MediaType_name'), ['name'], unique=True)
with op.batch_alter_table("MediaType", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_MediaType_name"), ["name"], unique=True)
op.create_table('BookcaseShelf',
sa.Column('description', sa.Text(), nullable=True),
sa.Column('row', sa.SmallInteger(), nullable=False),
sa.Column('column', sa.SmallInteger(), nullable=False),
sa.Column('fk_bookcase_uid', sa.Integer(), nullable=False),
sa.Column('uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_bookcase_uid'], ['Bookcase.uid'], name=op.f('fk_BookcaseShelf_fk_bookcase_uid_Bookcase')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseShelf')),
sa.UniqueConstraint('column', 'fk_bookcase_uid', 'row', name=op.f('uq_BookcaseShelf_column'))
op.create_table(
"BookcaseShelf",
sa.Column("description", sa.Text(), nullable=True),
sa.Column("row", sa.SmallInteger(), nullable=False),
sa.Column("column", sa.SmallInteger(), nullable=False),
sa.Column("fk_bookcase_uid", sa.Integer(), nullable=False),
sa.Column("uid", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["fk_bookcase_uid"],
["Bookcase.uid"],
name=op.f("fk_BookcaseShelf_fk_bookcase_uid_Bookcase"),
),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseShelf")),
sa.UniqueConstraint(
"column", "fk_bookcase_uid", "row", name=op.f("uq_BookcaseShelf_column")
),
)
op.create_table('BookcaseItem',
sa.Column('isbn', sa.String(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.Column('owner', sa.String(), nullable=False),
sa.Column('amount', sa.SmallInteger(), nullable=False),
sa.Column('fk_media_type_uid', sa.Integer(), nullable=False),
sa.Column('fk_bookcase_shelf_uid', sa.Integer(), nullable=False),
sa.Column('fk_language_uid', sa.Integer(), nullable=True),
sa.Column('uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_bookcase_shelf_uid'], ['BookcaseShelf.uid'], name=op.f('fk_BookcaseItem_fk_bookcase_shelf_uid_BookcaseShelf')),
sa.ForeignKeyConstraint(['fk_language_uid'], ['Language.uid'], name=op.f('fk_BookcaseItem_fk_language_uid_Language')),
sa.ForeignKeyConstraint(['fk_media_type_uid'], ['MediaType.uid'], name=op.f('fk_BookcaseItem_fk_media_type_uid_MediaType')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseItem'))
op.create_table(
"BookcaseItem",
sa.Column("isbn", sa.String(), nullable=False),
sa.Column("name", sa.Text(), nullable=False),
sa.Column("owner", sa.String(), nullable=False),
sa.Column("amount", sa.SmallInteger(), nullable=False),
sa.Column("fk_media_type_uid", sa.Integer(), nullable=False),
sa.Column("fk_bookcase_shelf_uid", sa.Integer(), nullable=False),
sa.Column("fk_language_uid", sa.Integer(), nullable=True),
sa.Column("uid", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["fk_bookcase_shelf_uid"],
["BookcaseShelf.uid"],
name=op.f("fk_BookcaseItem_fk_bookcase_shelf_uid_BookcaseShelf"),
),
sa.ForeignKeyConstraint(
["fk_language_uid"],
["Language.uid"],
name=op.f("fk_BookcaseItem_fk_language_uid_Language"),
),
sa.ForeignKeyConstraint(
["fk_media_type_uid"],
["MediaType.uid"],
name=op.f("fk_BookcaseItem_fk_media_type_uid_MediaType"),
),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseItem")),
)
with op.batch_alter_table('BookcaseItem', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_BookcaseItem_isbn'), ['isbn'], unique=True)
batch_op.create_index(batch_op.f('ix_BookcaseItem_name'), ['name'], unique=False)
with op.batch_alter_table("BookcaseItem", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_BookcaseItem_isbn"), ["isbn"], unique=True)
batch_op.create_index(
batch_op.f("ix_BookcaseItem_name"), ["name"], unique=False
)
op.create_table('BookcaseItemBorrowing',
sa.Column('username', sa.String(), nullable=False),
sa.Column('start_time', sa.DateTime(), nullable=False),
sa.Column('end_time', sa.DateTime(), nullable=False),
sa.Column('delivered', sa.DateTime(), nullable=True),
sa.Column('fk_bookcase_item_uid', sa.Integer(), nullable=False),
sa.Column('uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_bookcase_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_BookcaseItemBorrowing_fk_bookcase_item_uid_BookcaseItem')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseItemBorrowing'))
op.create_table(
"BookcaseItemBorrowing",
sa.Column("username", sa.String(), nullable=False),
sa.Column("start_time", sa.DateTime(), nullable=False),
sa.Column("end_time", sa.DateTime(), nullable=False),
sa.Column("delivered", sa.DateTime(), nullable=True),
sa.Column("fk_bookcase_item_uid", sa.Integer(), nullable=False),
sa.Column("uid", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["fk_bookcase_item_uid"],
["BookcaseItem.uid"],
name=op.f("fk_BookcaseItemBorrowing_fk_bookcase_item_uid_BookcaseItem"),
),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseItemBorrowing")),
)
with op.batch_alter_table('BookcaseItemBorrowing', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_BookcaseItemBorrowing_fk_bookcase_item_uid'), ['fk_bookcase_item_uid'], unique=False)
with op.batch_alter_table("BookcaseItemBorrowing", schema=None) as batch_op:
batch_op.create_index(
batch_op.f("ix_BookcaseItemBorrowing_fk_bookcase_item_uid"),
["fk_bookcase_item_uid"],
unique=False,
)
op.create_table('BookcaseItemBorrowingQueue',
sa.Column('username', sa.String(), nullable=False),
sa.Column('entered_queue_time', sa.DateTime(), nullable=False),
sa.Column('item_became_available_time', sa.DateTime(), nullable=True),
sa.Column('expired', sa.Boolean(), nullable=True),
sa.Column('fk_bookcase_item_uid', sa.Integer(), nullable=False),
sa.Column('uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_bookcase_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_BookcaseItemBorrowingQueue_fk_bookcase_item_uid_BookcaseItem')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseItemBorrowingQueue'))
op.create_table(
"BookcaseItemBorrowingQueue",
sa.Column("username", sa.String(), nullable=False),
sa.Column("entered_queue_time", sa.DateTime(), nullable=False),
sa.Column("item_became_available_time", sa.DateTime(), nullable=True),
sa.Column("expired", sa.Boolean(), nullable=True),
sa.Column("fk_bookcase_item_uid", sa.Integer(), nullable=False),
sa.Column("uid", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["fk_bookcase_item_uid"],
["BookcaseItem.uid"],
name=op.f(
"fk_BookcaseItemBorrowingQueue_fk_bookcase_item_uid_BookcaseItem"
),
),
sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseItemBorrowingQueue")),
)
with op.batch_alter_table('BookcaseItemBorrowingQueue', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid'), ['fk_bookcase_item_uid'], unique=False)
with op.batch_alter_table("BookcaseItemBorrowingQueue", schema=None) as batch_op:
batch_op.create_index(
batch_op.f("ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid"),
["fk_bookcase_item_uid"],
unique=False,
)
op.create_table('Item_Author',
sa.Column('fk_item_uid', sa.Integer(), nullable=False),
sa.Column('fk_author_uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_author_uid'], ['Author.uid'], name=op.f('fk_Item_Author_fk_author_uid_Author')),
sa.ForeignKeyConstraint(['fk_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_Item_Author_fk_item_uid_BookcaseItem')),
sa.PrimaryKeyConstraint('fk_item_uid', 'fk_author_uid', name=op.f('pk_Item_Author'))
op.create_table(
"Item_Author",
sa.Column("fk_item_uid", sa.Integer(), nullable=False),
sa.Column("fk_author_uid", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["fk_author_uid"],
["Author.uid"],
name=op.f("fk_Item_Author_fk_author_uid_Author"),
),
sa.ForeignKeyConstraint(
["fk_item_uid"],
["BookcaseItem.uid"],
name=op.f("fk_Item_Author_fk_item_uid_BookcaseItem"),
),
sa.PrimaryKeyConstraint(
"fk_item_uid", "fk_author_uid", name=op.f("pk_Item_Author")
),
)
op.create_table('Item_Category',
sa.Column('fk_item_uid', sa.Integer(), nullable=False),
sa.Column('fk_category_uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_category_uid'], ['Category.uid'], name=op.f('fk_Item_Category_fk_category_uid_Category')),
sa.ForeignKeyConstraint(['fk_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_Item_Category_fk_item_uid_BookcaseItem')),
sa.PrimaryKeyConstraint('fk_item_uid', 'fk_category_uid', name=op.f('pk_Item_Category'))
op.create_table(
"Item_Category",
sa.Column("fk_item_uid", sa.Integer(), nullable=False),
sa.Column("fk_category_uid", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["fk_category_uid"],
["Category.uid"],
name=op.f("fk_Item_Category_fk_category_uid_Category"),
),
sa.ForeignKeyConstraint(
["fk_item_uid"],
["BookcaseItem.uid"],
name=op.f("fk_Item_Category_fk_item_uid_BookcaseItem"),
),
sa.PrimaryKeyConstraint(
"fk_item_uid", "fk_category_uid", name=op.f("pk_Item_Category")
),
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('Item_Category')
op.drop_table('Item_Author')
with op.batch_alter_table('BookcaseItemBorrowingQueue', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid'))
op.drop_table("Item_Category")
op.drop_table("Item_Author")
with op.batch_alter_table("BookcaseItemBorrowingQueue", schema=None) as batch_op:
batch_op.drop_index(
batch_op.f("ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid")
)
op.drop_table('BookcaseItemBorrowingQueue')
with op.batch_alter_table('BookcaseItemBorrowing', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_BookcaseItemBorrowing_fk_bookcase_item_uid'))
op.drop_table("BookcaseItemBorrowingQueue")
with op.batch_alter_table("BookcaseItemBorrowing", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_BookcaseItemBorrowing_fk_bookcase_item_uid"))
op.drop_table('BookcaseItemBorrowing')
with op.batch_alter_table('BookcaseItem', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_BookcaseItem_name'))
batch_op.drop_index(batch_op.f('ix_BookcaseItem_isbn'))
op.drop_table("BookcaseItemBorrowing")
with op.batch_alter_table("BookcaseItem", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_BookcaseItem_name"))
batch_op.drop_index(batch_op.f("ix_BookcaseItem_isbn"))
op.drop_table('BookcaseItem')
op.drop_table('BookcaseShelf')
with op.batch_alter_table('MediaType', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_MediaType_name'))
op.drop_table("BookcaseItem")
op.drop_table("BookcaseShelf")
with op.batch_alter_table("MediaType", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_MediaType_name"))
op.drop_table('MediaType')
with op.batch_alter_table('Language', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_Language_name'))
batch_op.drop_index(batch_op.f('ix_Language_iso639_1_code'))
op.drop_table("MediaType")
with op.batch_alter_table("Language", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_Language_name"))
batch_op.drop_index(batch_op.f("ix_Language_iso639_1_code"))
op.drop_table('Language')
op.drop_table('DeadlineDaemonLastRunDatetime')
with op.batch_alter_table('Category', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_Category_name'))
op.drop_table("Language")
op.drop_table("DeadlineDaemonLastRunDatetime")
with op.batch_alter_table("Category", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_Category_name"))
op.drop_table('Category')
with op.batch_alter_table('Bookcase', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_Bookcase_name'))
op.drop_table("Category")
with op.batch_alter_table("Bookcase", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_Bookcase_name"))
op.drop_table('Bookcase')
with op.batch_alter_table('Author', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_Author_name'))
op.drop_table("Bookcase")
with op.batch_alter_table("Author", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_Author_name"))
op.drop_table('Author')
op.drop_table("Author")
# ### end Alembic commands ###

@ -9,6 +9,7 @@ from sqlalchemy.orm import (
from worblehat.flaskapp.database import db
class UidMixin(object):
uid: Mapped[int] = mapped_column(Integer, primary_key=True)
@ -28,4 +29,4 @@ class UidMixin(object):
This method defaults to using the flask_sqlalchemy session.
It will not work outside of a request context, unless another session is provided.
"""
return sql_session.query(cls).where(cls.uid == uid).one_or_404()
return sql_session.query(cls).where(cls.uid == uid).one_or_404()

@ -9,6 +9,7 @@ from sqlalchemy.orm import (
from worblehat.flaskapp.database import db
class UniqueNameMixin(object):
name: Mapped[str] = mapped_column(Text, unique=True, index=True)
@ -28,4 +29,4 @@ class UniqueNameMixin(object):
This method defaults to using the flask_sqlalchemy session.
It will not work outside of a request context, unless another session is provided.
"""
return sql_session.query(cls).where(cls.name == name).one_or_404()
return sql_session.query(cls).where(cls.name == name).one_or_404()

@ -1,6 +1,7 @@
from sqlalchemy.orm import declared_attr
class XrefMixin(object):
@declared_attr.directive
def __tablename__(cls) -> str:
return f'xref_{cls.__name__.lower()}'
return f"xref_{cls.__name__.lower()}"

@ -1,2 +1,4 @@
from .UidMixin import UidMixin
from .UniqueNameMixin import UniqueNameMixin
__all__ = ["UidMixin", "UniqueNameMixin"]

@ -1,5 +1,4 @@
from sqlalchemy import (
Integer,
ForeignKey,
)
from sqlalchemy.orm import (
@ -10,6 +9,11 @@ from sqlalchemy.orm import (
from ..Base import Base
from ..mixins.XrefMixin import XrefMixin
class Item_Author(Base, XrefMixin):
fk_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), primary_key=True)
fk_author_uid: Mapped[int] = mapped_column(ForeignKey('Author.uid'), primary_key=True)
fk_item_uid: Mapped[int] = mapped_column(
ForeignKey("BookcaseItem.uid"), primary_key=True
)
fk_author_uid: Mapped[int] = mapped_column(
ForeignKey("Author.uid"), primary_key=True
)

@ -1,5 +1,4 @@
from sqlalchemy import (
Integer,
ForeignKey,
)
from sqlalchemy.orm import (
@ -10,6 +9,11 @@ from sqlalchemy.orm import (
from ..Base import Base
from ..mixins.XrefMixin import XrefMixin
class Item_Category(Base, XrefMixin):
fk_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), primary_key=True)
fk_category_uid: Mapped[int] = mapped_column(ForeignKey('Category.uid'), primary_key=True)
fk_item_uid: Mapped[int] = mapped_column(
ForeignKey("BookcaseItem.uid"), primary_key=True
)
fk_category_uid: Mapped[int] = mapped_column(
ForeignKey("Category.uid"), primary_key=True
)

@ -1,2 +1,7 @@
from .Item_Author import Item_Author
from .Item_Category import Item_Category
from .Item_Category import Item_Category
__all__ = [
"Item_Author",
"Item_Category",
]

@ -1,6 +1,6 @@
from .argument_parser import (
arg_parser,
devscripts_arg_parser,
arg_parser,
devscripts_arg_parser,
)
from .bookcase_item import (
create_bookcase_item_from_isbn,
@ -8,4 +8,14 @@ from .bookcase_item import (
)
from .config import Config
from .email import send_email
from .seed_test_data import seed_data
from .seed_test_data import seed_data
__all__ = [
"arg_parser",
"devscripts_arg_parser",
"Config",
"create_bookcase_item_from_isbn",
"is_valid_isbn",
"send_email",
"seed_data",
]

@ -1,66 +1,69 @@
from argparse import ArgumentParser
from pathlib import Path
def _is_valid_file(parser: ArgumentParser, arg: str) -> Path:
path = Path(arg)
if not path.is_file():
parser.error(f'The file {arg} does not exist!')
parser.error(f"The file {arg} does not exist!")
return path
arg_parser = ArgumentParser(
description = 'Worblehat library management system',
description="Worblehat library management system",
)
subparsers = arg_parser.add_subparsers(dest='command')
subparsers = arg_parser.add_subparsers(dest="command")
subparsers.add_parser(
'deadline-daemon',
help = 'Initialize a single pass of the daemon which sends deadline emails',
"deadline-daemon",
help="Initialize a single pass of the daemon which sends deadline emails",
)
subparsers.add_parser(
'cli',
help = 'Start the command line interface',
"cli",
help="Start the command line interface",
)
subparsers.add_parser(
'flask-dev',
help = 'Start the web interface in development mode',
"flask-dev",
help="Start the web interface in development mode",
)
subparsers.add_parser(
'flask-prod',
help = 'Start the web interface in production mode',
"flask-prod",
help="Start the web interface in production mode",
)
devscripts_arg_parser = subparsers.add_parser('devscripts', help='Run development scripts')
devscripts_subparsers = devscripts_arg_parser.add_subparsers(dest='script')
devscripts_arg_parser = subparsers.add_parser(
"devscripts", help="Run development scripts"
)
devscripts_subparsers = devscripts_arg_parser.add_subparsers(dest="script")
devscripts_subparsers.add_parser(
'seed-test-data',
help = 'Seed test data in the database',
"seed-test-data",
help="Seed test data in the database",
)
devscripts_subparsers.add_parser(
'seed-content-for-deadline-daemon',
help = 'Seed data tailorded for testing the deadline daemon, into the database',
"seed-content-for-deadline-daemon",
help="Seed data tailorded for testing the deadline daemon, into the database",
)
arg_parser.add_argument(
'-V',
'--version',
action = 'store_true',
help = 'Print version and exit',
"-V",
"--version",
action="store_true",
help="Print version and exit",
)
arg_parser.add_argument(
'-c',
'--config',
"-c",
"--config",
type=lambda x: _is_valid_file(arg_parser, x),
help = 'Path to config file',
dest = 'config_file',
metavar = 'FILE',
help="Path to config file",
dest="config_file",
metavar="FILE",
)
arg_parser.add_argument(
'-p',
'--print-config',
action = 'store_true',
help = 'Print configuration and quit',
"-p",
"--print-config",
action="store_true",
help="Print configuration and quit",
)

@ -10,22 +10,27 @@ from ..models import (
Language,
)
def is_valid_pvv_isbn(isbn: str) -> bool:
try:
int(isbn)
except ValueError:
return False
return len(isbn) == 8
try:
int(isbn)
except ValueError:
return False
return len(isbn) == 8
def is_valid_isbn(isbn: str) -> bool:
return any([
isbnlib.is_isbn10(isbn),
isbnlib.is_isbn13(isbn),
])
return any(
[
isbnlib.is_isbn10(isbn),
isbnlib.is_isbn13(isbn),
]
)
def create_bookcase_item_from_isbn(isbn: str, sql_session: Session) -> BookcaseItem | None:
def create_bookcase_item_from_isbn(
isbn: str, sql_session: Session
) -> BookcaseItem | None:
"""
This function fetches metadata for the given ISBN and creates a BookcaseItem from it.
It does so using a database connection to connect it to the correct authors and language
@ -43,18 +48,17 @@ def create_bookcase_item_from_isbn(isbn: str, sql_session: Session) -> BookcaseI
metadata = metadata[0]
bookcase_item = BookcaseItem(
name = metadata.title,
isbn = int(isbn.replace('-', '')),
name=metadata.title,
isbn=int(isbn.replace("-", "")),
)
if len(authors := metadata.authors) > 0:
for author in authors:
bookcase_item.authors.add(Author(author))
if (language := metadata.language):
if language := metadata.language:
bookcase_item.language = sql_session.scalars(
select(Language)
.where(Language.iso639_1_code == language)
select(Language).where(Language.iso639_1_code == language)
).one()
return bookcase_item
return bookcase_item

@ -18,30 +18,31 @@ class Config:
_config = None
_expected_config_file_locations = [
Path('./config.toml'),
Path('~/.config/worblehat/config.toml'),
Path('/var/lib/worblehat/config.toml'),
Path("./config.toml"),
Path("~/.config/worblehat/config.toml"),
Path("/var/lib/worblehat/config.toml"),
]
def __class_getitem__(cls, name: str) -> Any:
if cls._config is None:
raise RuntimeError('Configuration not loaded, call Config.load_configuration() first.')
raise RuntimeError(
"Configuration not loaded, call Config.load_configuration() first."
)
__config = cls._config
for attr in name.split('.'):
for attr in name.split("."):
__config = __config.get(attr)
if __config is None:
raise AttributeError(f'No such attribute: {name}')
raise AttributeError(f"No such attribute: {name}")
return __config
@staticmethod
def read_password(password_field: str) -> str:
if Path(password_field).is_file():
with open(password_field, 'r') as f:
return f.read()
else:
return password_field
if Path(password_field).is_file():
with open(password_field, "r") as f:
return f.read()
else:
return password_field
@classmethod
def _locate_configuration_file(cls) -> Path | None:
@ -49,48 +50,46 @@ class Config:
if path.is_file():
return path
@classmethod
def _load_configuration_from_file(cls, config_file_path: str | None) -> dict[str, any]:
def _load_configuration_from_file(
cls, config_file_path: str | None
) -> dict[str, any]:
if config_file_path is None:
config_file_path = cls._locate_configuration_file()
if config_file_path is None:
print('Error: could not locate configuration file.')
print("Error: could not locate configuration file.")
exit(1)
with open(config_file_path, 'rb') as config_file:
with open(config_file_path, "rb") as config_file:
args = tomllib.load(config_file)
return args
@classmethod
def db_string(cls) -> str:
db_type = cls._config.get('database').get('type')
db_type = cls._config.get("database").get("type")
if db_type == 'sqlite':
path = Path(cls._config.get('database').get('sqlite').get('path'))
if db_type == "sqlite":
path = Path(cls._config.get("database").get("sqlite").get("path"))
return f"sqlite:///{path.absolute()}"
elif db_type == 'postgresql':
db_config = cls._config.get('database').get('postgresql')
hostname = db_config.get('hostname')
port = db_config.get('port')
username = db_config.get('username')
password = cls.read_password(db_config.get('password'))
database = db_config.get('database')
elif db_type == "postgresql":
db_config = cls._config.get("database").get("postgresql")
hostname = db_config.get("hostname")
port = db_config.get("port")
username = db_config.get("username")
password = cls.read_password(db_config.get("password"))
database = db_config.get("database")
return f"psycopg2+postgresql://{username}:{password}@{hostname}:{port}/{database}"
else:
print(f"Error: unknown database type '{db_config.get('type')}'")
exit(1)
@classmethod
def debug(cls) -> str:
return pformat(cls._config)
@classmethod
def load_configuration(cls, args: dict[str, any]) -> dict[str, any]:
cls._config = cls._load_configuration_from_file(args.get('config_file'))
cls._config = cls._load_configuration_from_file(args.get("config_file"))

@ -10,26 +10,26 @@ from .config import Config
def send_email(to: str, subject: str, body: str):
msg = MIMEMultipart()
msg['From'] = Config['smtp.from']
msg['To'] = to
if Config['smtp.subject_prefix']:
msg['Subject'] = f"{Config['smtp.subject_prefix']} {subject}"
msg["From"] = Config["smtp.from"]
msg["To"] = to
if Config["smtp.subject_prefix"]:
msg["Subject"] = f"{Config['smtp.subject_prefix']} {subject}"
else:
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
if Config['smtp.enabled'] and not Config['deadline_daemon.dryrun']:
if Config["smtp.enabled"] and not Config["deadline_daemon.dryrun"]:
try:
with smtplib.SMTP(Config['smtp.host'], Config['smtp.port']) as server:
with smtplib.SMTP(Config["smtp.host"], Config["smtp.port"]) as server:
server.starttls()
server.login(
Config['smtp.username'],
Config.read_password(Config['smtp.password']),
Config["smtp.username"],
Config.read_password(Config["smtp.password"]),
)
server.sendmail(Config['smtp.from'], to, msg.as_string())
server.sendmail(Config["smtp.from"], to, msg.as_string())
except Exception as err:
print('Error: could not send email.')
print("Error: could not send email.")
print(err)
else:
print('Debug: Email sending is disabled, so the following email was not sent:')
print(indent(msg.as_string(), ' '))
print("Debug: Email sending is disabled, so the following email was not sent:")
print(indent(msg.as_string(), " "))

@ -3,26 +3,29 @@ from typing import Set
# TODO: Add more languages
LANGUAGES: set[str] = set([
"no",
"en",
"de",
"fr",
"es",
"it",
"sv",
"da",
"fi",
"ru",
"zh",
"ja",
"ko",
])
LANGUAGES: set[str] = set(
[
"no",
"en",
"de",
"fr",
"es",
"it",
"sv",
"da",
"fi",
"ru",
"zh",
"ja",
"ko",
]
)
@dataclass
class BookMetadata:
"""A class representing metadata for a book."""
isbn: str
title: str
# The source of the metadata provider
@ -35,28 +38,30 @@ class BookMetadata:
def to_dict(self) -> dict[str, any]:
return {
'isbn': self.isbn,
'title': self.title,
'source': self.metadata_source_id(),
'authors': set() if self.authors is None else self.authors,
'language': self.language,
'publish_date': self.publish_date,
'num_pages': self.num_pages,
'subjects': set() if self.subjects is None else self.subjects
"isbn": self.isbn,
"title": self.title,
"source": self.metadata_source_id(),
"authors": set() if self.authors is None else self.authors,
"language": self.language,
"publish_date": self.publish_date,
"num_pages": self.num_pages,
"subjects": set() if self.subjects is None else self.subjects,
}
def validate(self) -> None:
if not self.isbn:
raise ValueError('Missing ISBN')
raise ValueError("Missing ISBN")
if not self.title:
raise ValueError('Missing title')
raise ValueError("Missing title")
if not self.source:
raise ValueError('Missing source')
raise ValueError("Missing source")
if not self.authors:
raise ValueError('Missing authors')
raise ValueError("Missing authors")
if self.language is not None and self.language not in LANGUAGES:
raise ValueError(f'Invalid language: {self.language}. Consider adding it to the LANGUAGES set if you think this is a mistake.')
raise ValueError(
f"Invalid language: {self.language}. Consider adding it to the LANGUAGES set if you think this is a mistake."
)
if self.num_pages is not None and self.num_pages < 0:
raise ValueError(f'Invalid number of pages: {self.num_pages}')
raise ValueError(f"Invalid number of pages: {self.num_pages}")

@ -1,7 +1,8 @@
#base fetcher.
# base fetcher.
from abc import ABC, abstractmethod
from .BookMetadata import BookMetadata
class BookMetadataFetcher(ABC):
"""
A base class for metadata fetchers.
@ -17,4 +18,4 @@ class BookMetadataFetcher(ABC):
@abstractmethod
def fetch_metadata(cls, isbn: str) -> BookMetadata | None:
"""Tries to fetch metadata for the given ISBN."""
pass
pass

@ -11,14 +11,14 @@ from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata
class GoogleBooksFetcher(BookMetadataFetcher):
@classmethod
def metadata_source_id(_cls) -> str:
return "google_books"
return "google_books"
@classmethod
def fetch_metadata(cls, isbn: str) -> BookMetadata | None:
try:
jsonInput = requests.get(
f"https://www.googleapis.com/books/v1/volumes",
params = {"q": f"isbn:{isbn}"},
"https://www.googleapis.com/books/v1/volumes",
params={"q": f"isbn:{isbn}"},
).json()
data = jsonInput.get("items")[0].get("volumeInfo")
@ -34,18 +34,18 @@ class GoogleBooksFetcher(BookMetadataFetcher):
return None
return BookMetadata(
isbn = isbn,
title = title,
source = cls.metadata_source_id(),
authors = authors,
language = languages,
publish_date = publishDate,
num_pages = numberOfPages,
subjects = subjects,
isbn=isbn,
title=title,
source=cls.metadata_source_id(),
authors=authors,
language=languages,
publish_date=publishDate,
num_pages=numberOfPages,
subjects=subjects,
)
if __name__ == '__main__':
book_data = GoogleBooksFetcher.fetch_metadata('0132624788')
if __name__ == "__main__":
book_data = GoogleBooksFetcher.fetch_metadata("0132624788")
book_data.validate()
print(book_data)
print(book_data)

@ -15,7 +15,7 @@ LANGUAGE_MAP = {
class OpenLibraryFetcher(BookMetadataFetcher):
@classmethod
def metadata_source_id(_cls) -> str:
return "open_library"
return "open_library"
@classmethod
def fetch_metadata(cls, isbn: str) -> BookMetadata | None:
@ -25,8 +25,12 @@ class OpenLibraryFetcher(BookMetadataFetcher):
author_keys = jsonInput.get("authors") or []
author_names = set()
for author_key in author_keys:
key = author_key.get('key')
author_name = requests.get(f"https://openlibrary.org/{key}.json").json().get("name")
key = author_key.get("key")
author_name = (
requests.get(f"https://openlibrary.org/{key}.json")
.json()
.get("name")
)
author_names.add(author_name)
title = jsonInput.get("title")
@ -37,25 +41,30 @@ class OpenLibraryFetcher(BookMetadataFetcher):
numberOfPages = int(numberOfPages)
language_key = jsonInput.get("languages")[0].get("key")
language = requests.get(f"https://openlibrary.org/{language_key}.json").json().get("identifiers").get("iso_639_1")[0]
language = (
requests.get(f"https://openlibrary.org/{language_key}.json")
.json()
.get("identifiers")
.get("iso_639_1")[0]
)
subjects = set(jsonInput.get("subjects") or [])
except Exception:
return None
return BookMetadata(
isbn = isbn,
title = title,
source = cls.metadata_source_id(),
authors = author_names,
language = language,
publish_date = publishDate,
num_pages = numberOfPages,
subjects = subjects,
isbn=isbn,
title=title,
source=cls.metadata_source_id(),
authors=author_names,
language=language,
publish_date=publishDate,
num_pages=numberOfPages,
subjects=subjects,
)
if __name__ == '__main__':
book_data = OpenLibraryFetcher.fetch_metadata('9788205530751')
if __name__ == "__main__":
book_data = OpenLibraryFetcher.fetch_metadata("9788205530751")
book_data.validate()
print(book_data)
print(book_data)

@ -30,7 +30,7 @@ LANGUAGE_MAP = {
class OutlandScraperFetcher(BookMetadataFetcher):
@classmethod
def metadata_source_id(_cls) -> str:
return "outland_scraper"
return "outland_scraper"
@classmethod
def fetch_metadata(cls, isbn: str) -> BookMetadata | None:
@ -50,7 +50,7 @@ class OutlandScraperFetcher(BookMetadataFetcher):
title = soup.find_all("span", class_="base")[0].text
releaseDate = soup.find_all("span", class_="release-date")[0].text.strip()
releaseDate = releaseDate[-4:] # only keep year
releaseDate = releaseDate[-4:] # only keep year
bookData = {
"Title": title,
@ -67,7 +67,7 @@ class OutlandScraperFetcher(BookMetadataFetcher):
"NumberOfPages": "Antall Sider",
"Genre": "Sjanger",
"Language": "Språk",
"Subjects": "Serie"
"Subjects": "Serie",
}
for value in data:
@ -92,18 +92,18 @@ class OutlandScraperFetcher(BookMetadataFetcher):
return None
return BookMetadata(
isbn = isbn,
title = bookData.get('Title'),
source = cls.metadata_source_id(),
authors = bookData.get('Authors'),
language = bookData.get('Language'),
publish_date = bookData.get('PublishDate'),
num_pages = bookData.get('NumberOfPages'),
subjects = bookData.get('Subjects'),
isbn=isbn,
title=bookData.get("Title"),
source=cls.metadata_source_id(),
authors=bookData.get("Authors"),
language=bookData.get("Language"),
publish_date=bookData.get("PublishDate"),
num_pages=bookData.get("NumberOfPages"),
subjects=bookData.get("Subjects"),
)
if __name__ == '__main__':
book_data = OutlandScraperFetcher.fetch_metadata('9781947808225')
if __name__ == "__main__":
book_data = OutlandScraperFetcher.fetch_metadata("9781947808225")
book_data.validate()
print(book_data)
print(book_data)

@ -1 +1,3 @@
from .book_metadata_fetcher import fetch_metadata_from_multiple_sources
from .book_metadata_fetcher import fetch_metadata_from_multiple_sources
__all__ = ["fetch_metadata_from_multiple_sources"]

@ -10,7 +10,9 @@ from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadat
from worblehat.services.metadata_fetchers.GoogleBooksFetcher import GoogleBooksFetcher
from worblehat.services.metadata_fetchers.OpenLibraryFetcher import OpenLibraryFetcher
from worblehat.services.metadata_fetchers.OutlandScraperFetcher import OutlandScraperFetcher
from worblehat.services.metadata_fetchers.OutlandScraperFetcher import (
OutlandScraperFetcher,
)
# The order of these fetchers determines the priority of the sources.
@ -46,14 +48,16 @@ def fetch_metadata_from_multiple_sources(isbn: str, strict=False) -> list[BookMe
The results are always ordered in the same way as the fetchers are listed in the FETCHERS list.
"""
isbn = isbn.replace('-', '').replace('_', '').strip().lower()
isbn = isbn.replace("-", "").replace("_", "").strip().lower()
if len(isbn) != 10 and len(isbn) != 13 and not isbn.isnumeric():
raise ValueError('Invalid ISBN')
raise ValueError("Invalid ISBN")
results: list[BookMetadata] = []
with ThreadPoolExecutor() as executor:
futures = [executor.submit(fetcher.fetch_metadata, isbn) for fetcher in FETCHERS]
futures = [
executor.submit(fetcher.fetch_metadata, isbn) for fetcher in FETCHERS
]
for future in futures:
result = future.result()
@ -67,14 +71,15 @@ def fetch_metadata_from_multiple_sources(isbn: str, strict=False) -> list[BookMe
if strict:
raise e
else:
print(f'Invalid metadata: {e}')
print(f"Invalid metadata: {e}")
results.remove(result)
return sort_metadata_by_priority(results)
if __name__ == '__main__':
if __name__ == "__main__":
from pprint import pprint
isbn = '0132624788'
isbn = "0132624788"
metadata = fetch_metadata_from_multiple_sources(isbn)
pprint(metadata)
pprint(metadata)

@ -17,20 +17,27 @@ from ..models import (
MediaType,
)
def seed_data(sql_session: Session = db.session):
media_types = [
MediaType(name='Book', description='A physical book'),
MediaType(name='Comic', description='A comic book'),
MediaType(name='Video Game', description='A digital game for computers or games consoles'),
MediaType(name='Tabletop Game', description='A physical game with cards, boards or similar')
MediaType(name="Book", description="A physical book"),
MediaType(name="Comic", description="A comic book"),
MediaType(
name="Video Game",
description="A digital game for computers or games consoles",
),
MediaType(
name="Tabletop Game",
description="A physical game with cards, boards or similar",
),
]
bookcases = [
Bookcase(name='Unnamed A', description='White case across dibbler'),
Bookcase(name='Unnamed B', description='Math case in the working room'),
Bookcase(name='Unnamed C', description='Large case in the working room'),
Bookcase(name='Unnamed D', description='White comics case in the hallway'),
Bookcase(name='Unnamed E', description='Wooden comics case in the hallway'),
Bookcase(name="Unnamed A", description="White case across dibbler"),
Bookcase(name="Unnamed B", description="Math case in the working room"),
Bookcase(name="Unnamed C", description="Large case in the working room"),
Bookcase(name="Unnamed D", description="White comics case in the hallway"),
Bookcase(name="Unnamed E", description="Wooden comics case in the hallway"),
]
shelfs = [
@ -39,82 +46,110 @@ def seed_data(sql_session: Session = db.session):
BookcaseShelf(row=2, column=0, bookcase=bookcases[0]),
BookcaseShelf(row=3, column=0, bookcase=bookcases[0], description="Hacking"),
BookcaseShelf(row=4, column=0, bookcase=bookcases[0], description="Hacking"),
BookcaseShelf(row=0, column=1, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=1, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=1, bookcase=bookcases[0], description="DOS"),
BookcaseShelf(row=3, column=1, bookcase=bookcases[0], description="Food for thought"),
BookcaseShelf(
row=3, column=1, bookcase=bookcases[0], description="Food for thought"
),
BookcaseShelf(row=4, column=1, bookcase=bookcases[0], description="CPP"),
BookcaseShelf(row=0, column=2, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=2, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=2, bookcase=bookcases[0], description="E = mc2"),
BookcaseShelf(row=3, column=2, bookcase=bookcases[0], description="OBJECTION!"),
BookcaseShelf(row=4, column=2, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=3, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=3, bookcase=bookcases[0], description="Big indonisian island"),
BookcaseShelf(
row=1, column=3, bookcase=bookcases[0], description="Big indonisian island"
),
BookcaseShelf(row=2, column=3, bookcase=bookcases[0]),
BookcaseShelf(row=3, column=3, bookcase=bookcases[0], description="Div science"),
BookcaseShelf(
row=3, column=3, bookcase=bookcases[0], description="Div science"
),
BookcaseShelf(row=4, column=3, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=4, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=4, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=4, bookcase=bookcases[0], description="(not) computer vision"),
BookcaseShelf(row=3, column=4, bookcase=bookcases[0], description="Low voltage"),
BookcaseShelf(
row=2, column=4, bookcase=bookcases[0], description="(not) computer vision"
),
BookcaseShelf(
row=3, column=4, bookcase=bookcases[0], description="Low voltage"
),
BookcaseShelf(row=4, column=4, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=5, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=5, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=5, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=3, column=5, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=0, bookcase=bookcases[1]),
BookcaseShelf(row=1, column=0, bookcase=bookcases[1], description="Kjellerarealer og komodovaraner"),
BookcaseShelf(
row=1,
column=0,
bookcase=bookcases[1],
description="Kjellerarealer og komodovaraner",
),
BookcaseShelf(row=2, column=0, bookcase=bookcases[1]),
BookcaseShelf(row=3, column=0, bookcase=bookcases[1], description="Quick mafs"),
BookcaseShelf(row=4, column=0, bookcase=bookcases[1]),
BookcaseShelf(row=0, column=0, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=0, bookcase=bookcases[2]),
BookcaseShelf(row=2, column=0, bookcase=bookcases[2], description="AI"),
BookcaseShelf(row=3, column=0, bookcase=bookcases[2], description="X86"),
BookcaseShelf(row=4, column=0, bookcase=bookcases[2], description="Humanoira"),
BookcaseShelf(row=5, column=0, bookcase=bookcases[2], description="Hvem monterte rørforsterker?"),
BookcaseShelf(
row=5,
column=0,
bookcase=bookcases[2],
description="Hvem monterte rørforsterker?",
),
BookcaseShelf(row=0, column=1, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=1, bookcase=bookcases[2], description="Div data"),
BookcaseShelf(row=2, column=1, bookcase=bookcases[2], description="Chemistry"),
BookcaseShelf(row=3, column=1, bookcase=bookcases[2], description="Soviet Phys. Techn. Phys"),
BookcaseShelf(row=4, column=1, bookcase=bookcases[2], description="Digitalteknikk"),
BookcaseShelf(
row=3,
column=1,
bookcase=bookcases[2],
description="Soviet Phys. Techn. Phys",
),
BookcaseShelf(
row=4, column=1, bookcase=bookcases[2], description="Digitalteknikk"
),
BookcaseShelf(row=5, column=1, bookcase=bookcases[2], description="Material"),
BookcaseShelf(row=0, column=2, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=2, bookcase=bookcases[2], description="Assembler / APL"),
BookcaseShelf(
row=1, column=2, bookcase=bookcases[2], description="Assembler / APL"
),
BookcaseShelf(row=2, column=2, bookcase=bookcases[2], description="Internet"),
BookcaseShelf(row=3, column=2, bookcase=bookcases[2], description="Algorithms"),
BookcaseShelf(row=4, column=2, bookcase=bookcases[2], description="Soviet Physics Jetp"),
BookcaseShelf(row=5, column=2, bookcase=bookcases[2], description="Død og pine"),
BookcaseShelf(
row=4, column=2, bookcase=bookcases[2], description="Soviet Physics Jetp"
),
BookcaseShelf(
row=5, column=2, bookcase=bookcases[2], description="Død og pine"
),
BookcaseShelf(row=0, column=3, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=3, bookcase=bookcases[2], description="Web"),
BookcaseShelf(row=2, column=3, bookcase=bookcases[2], description="Div languages"),
BookcaseShelf(
row=2, column=3, bookcase=bookcases[2], description="Div languages"
),
BookcaseShelf(row=3, column=3, bookcase=bookcases[2], description="Python"),
BookcaseShelf(row=4, column=3, bookcase=bookcases[2], description="D&D Minis"),
BookcaseShelf(row=5, column=3, bookcase=bookcases[2], description="Perl"),
BookcaseShelf(row=0, column=4, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=4, bookcase=bookcases[2], description="Knuth on programming"),
BookcaseShelf(row=2, column=4, bookcase=bookcases[2], description="Div languages"),
BookcaseShelf(row=3, column=4, bookcase=bookcases[2], description="Typesetting"),
BookcaseShelf(
row=1, column=4, bookcase=bookcases[2], description="Knuth on programming"
),
BookcaseShelf(
row=2, column=4, bookcase=bookcases[2], description="Div languages"
),
BookcaseShelf(
row=3, column=4, bookcase=bookcases[2], description="Typesetting"
),
BookcaseShelf(row=4, column=4, bookcase=bookcases[2]),
BookcaseShelf(row=0, column=0, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=1, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=2, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=3, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=4, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=0, bookcase=bookcases[4]),
BookcaseShelf(row=0, column=1, bookcase=bookcases[4]),
BookcaseShelf(row=0, column=2, bookcase=bookcases[4]),
@ -132,24 +167,24 @@ def seed_data(sql_session: Session = db.session):
]
book1 = BookcaseItem(
name = "The Art of Computer Programming",
isbn = "9780201896831",
name="The Art of Computer Programming",
isbn="9780201896831",
)
book1.authors.add(authors[0])
book1.media_type = media_types[0]
book1.shelf = shelfs[59]
book2 = BookcaseItem(
name = "Harry Potter and the Philosopher's Stone",
isbn = "9780747532743",
name="Harry Potter and the Philosopher's Stone",
isbn="9780747532743",
)
book2.authors.add(authors[1])
book2.media_type = media_types[0]
book2.shelf = shelfs[-1]
book_owned_by_other_user = BookcaseItem(
name = "Book owned by other user",
isbn = "9780747532744",
name="Book owned by other user",
isbn="9780747532744",
)
book_owned_by_other_user.owner = "other_user"
@ -158,8 +193,8 @@ def seed_data(sql_session: Session = db.session):
book_owned_by_other_user.shelf = shelfs[-2]
borrowed_book_more_available = BookcaseItem(
name = "Borrowed book with more available",
isbn = "9780747532745",
name="Borrowed book with more available",
isbn="9780747532745",
)
borrowed_book_more_available.authors.add(authors[5])
borrowed_book_more_available.media_type = media_types[0]
@ -167,24 +202,24 @@ def seed_data(sql_session: Session = db.session):
borrowed_book_more_available.amount = 2
borrowed_book_no_more_available = BookcaseItem(
name = "Borrowed book with no more available",
isbn = "9780747532746",
name="Borrowed book with no more available",
isbn="9780747532746",
)
borrowed_book_no_more_available.authors.add(authors[5])
borrowed_book_no_more_available.media_type = media_types[0]
borrowed_book_no_more_available.shelf = shelfs[-3]
borrowed_book_people_in_queue = BookcaseItem(
name = "Borrowed book with people in queue",
isbn = "9780747532747",
name="Borrowed book with people in queue",
isbn="9780747532747",
)
borrowed_book_people_in_queue.authors.add(authors[5])
borrowed_book_people_in_queue.media_type = media_types[0]
borrowed_book_people_in_queue.shelf = shelfs[-3]
borrowed_book_by_slabbedask = BookcaseItem(
name = "Borrowed book by slabbedask",
isbn = "9780747532748",
name="Borrowed book by slabbedask",
isbn="9780747532748",
)
borrowed_book_by_slabbedask.authors.add(authors[5])
borrowed_book_by_slabbedask.media_type = media_types[0]
@ -216,9 +251,9 @@ def seed_data(sql_session: Session = db.session):
BookcaseItemBorrowingQueue(username="user", item=borrowed_book_people_in_queue),
]
with open(Path(__file__).parent.parent.parent / 'data' / 'iso639_1.csv') as f:
reader = csv.reader(f)
languages = [Language(name, code) for (code, name) in reader]
with open(Path(__file__).parent.parent.parent / "data" / "iso639_1.csv") as f:
reader = csv.reader(f)
languages = [Language(name, code) for (code, name) in reader]
sql_session.add_all(media_types)
sql_session.add_all(bookcases)
@ -229,4 +264,4 @@ def seed_data(sql_session: Session = db.session):
sql_session.add_all(borrowings)
sql_session.add_all(queue)
sql_session.commit()
print("Added test media types, bookcases and shelfs.")
print("Added test media types, bookcases and shelfs.")