diff --git a/src/worblehat/__init__.py b/src/worblehat/__init__.py index b668da9..21db616 100644 --- a/src/worblehat/__init__.py +++ b/src/worblehat/__init__.py @@ -1 +1,3 @@ -from .main import main \ No newline at end of file +from .main import main + +__all__ = ["main"] diff --git a/src/worblehat/cli/__init__.py b/src/worblehat/cli/__init__.py index b55ba85..4dfbc6a 100644 --- a/src/worblehat/cli/__init__.py +++ b/src/worblehat/cli/__init__.py @@ -1 +1,3 @@ -from .main import WorblehatCli \ No newline at end of file +from .main import WorblehatCli + +__all__ = ["WorblehatCli"] diff --git a/src/worblehat/cli/main.py b/src/worblehat/cli/main.py index 1d06adb..1f68328 100644 --- a/src/worblehat/cli/main.py +++ b/src/worblehat/cli/main.py @@ -30,19 +30,20 @@ from .subclis import ( # However, is there anyone who are going to search by category rather than just look in # the shelves? + class WorblehatCli(NumberedCmd): def __init__(self, sql_session: Session): super().__init__() self.sql_session = sql_session self.sql_session_dirty = False - @event.listens_for(self.sql_session, 'after_flush') + @event.listens_for(self.sql_session, "after_flush") def mark_session_as_dirty(*_): self.sql_session_dirty = True - self.prompt_header = f'(unsaved changes)' + self.prompt_header = "(unsaved changes)" - @event.listens_for(self.sql_session, 'after_commit') - @event.listens_for(self.sql_session, 'after_rollback') + @event.listens_for(self.sql_session, "after_commit") + @event.listens_for(self.sql_session, "after_rollback") def mark_session_as_clean(*_): self.sql_session_dirty = False self.prompt_header = None @@ -57,19 +58,20 @@ class WorblehatCli(NumberedCmd): if not tool.sql_session_dirty: exit(0) try: - print() - if prompt_yes_no('Are you sure you want to exit without saving?', default=False): - raise KeyboardInterrupt + print() + if prompt_yes_no( + "Are you sure you want to exit without saving?", default=False + ): + raise KeyboardInterrupt except KeyboardInterrupt: if tool.sql_session is not None: tool.sql_session.rollback() exit(0) - def do_show_bookcase(self, arg: str): bookcase_selector = InteractiveItemSelector( - cls = Bookcase, - sql_session = self.sql_session, + cls=Bookcase, + sql_session=self.sql_session, ) bookcase_selector.cmdloop() bookcase = bookcase_selector.result @@ -77,8 +79,7 @@ class WorblehatCli(NumberedCmd): for shelf in bookcase.shelfs: print(shelf.short_str()) for item in shelf.items: - print(f' {item.name} - {item.amount} copies') - + print(f" {item.name} - {item.amount} copies") def do_show_borrowed_queued(self, _: str): borrowed_items = self.sql_session.scalars( @@ -88,105 +89,114 @@ class WorblehatCli(NumberedCmd): ).all() if len(borrowed_items) == 0: - print('No borrowed items found.') + print("No borrowed items found.") else: - print('Borrowed items:') - for item in borrowed_items: - print(f'- {item.username} - {item.item.name} - to be delivered by {item.end_time.strftime("%Y-%m-%d")}') + print("Borrowed items:") + for item in borrowed_items: + print( + f"- {item.username} - {item.item.name} - to be delivered by {item.end_time.strftime('%Y-%m-%d')}" + ) print() queued_items = self.sql_session.scalars( - select(BookcaseItemBorrowingQueue) - .order_by(BookcaseItemBorrowingQueue.entered_queue_time), + select(BookcaseItemBorrowingQueue).order_by( + BookcaseItemBorrowingQueue.entered_queue_time + ), ).all() if len(queued_items) == 0: - print('No queued items found.') + print("No queued items found.") else: - print('Users in queue:') - for item in queued_items: - print(f'- {item.username} - {item.item.name} - entered queue at {item.entered_queue_time.strftime("%Y-%m-%d")}') - + print("Users in queue:") + for item in queued_items: + print( + f"- {item.username} - {item.item.name} - entered queue at {item.entered_queue_time.strftime('%Y-%m-%d')}" + ) def _create_bookcase_item(self, isbn: str): bookcase_item = create_bookcase_item_from_isbn(isbn, self.sql_session) if bookcase_item is None: - print(f'Could not find data about item with ISBN {isbn} online.') - print(f'If you think this is not due to a bug, please add the book to openlibrary.org before continuing.') + print(f"Could not find data about item with ISBN {isbn} online.") + print( + "If you think this is not due to a bug, please add the book to openlibrary.org before continuing." + ) return else: - print(dedent(f""" + print( + dedent(f""" Found item: title: {bookcase_item.name} - authors: {', '.join(a.name for a in bookcase_item.authors)} + authors: {", ".join(a.name for a in bookcase_item.authors)} language: {bookcase_item.language} - """)) + """) + ) - print('Please select the bookcase where the item is placed:') + print("Please select the bookcase where the item is placed:") bookcase_selector = InteractiveItemSelector( - cls = Bookcase, - sql_session = self.sql_session, + cls=Bookcase, + sql_session=self.sql_session, ) bookcase_selector.cmdloop() bookcase = bookcase_selector.result bookcase_item.shelf = select_bookcase_shelf(bookcase, self.sql_session) - print('Please select the items media type:') + print("Please select the items media type:") media_type_selector = InteractiveItemSelector( - cls = MediaType, - sql_session = self.sql_session, - default = self.sql_session.scalars( - select(MediaType) - .where(MediaType.name.ilike("book")), + cls=MediaType, + sql_session=self.sql_session, + default=self.sql_session.scalars( + select(MediaType).where(MediaType.name.ilike("book")), ).one(), ) media_type_selector.cmdloop() bookcase_item.media_type = media_type_selector.result - username = input('Who owns this book? [PVV]> ') - if username != '': + username = input("Who owns this book? [PVV]> ") + if username != "": bookcase_item.owner = username self.sql_session.add(bookcase_item) self.sql_session.flush() - def default(self, isbn: str): isbn = isbn.strip() if not is_valid_isbn(isbn): super()._default(isbn) return - if (existing_item := self.sql_session.scalars( - select(BookcaseItem) - .where(BookcaseItem.isbn == isbn) - .join(BookcaseItemBorrowing) - .join(BookcaseItemBorrowingQueue) - ).one_or_none()) is not None: + if ( + existing_item := self.sql_session.scalars( + select(BookcaseItem) + .where(BookcaseItem.isbn == isbn) + .join(BookcaseItemBorrowing) + .join(BookcaseItemBorrowingQueue) + ).one_or_none() + ) is not None: print(f'\nFound existing item for isbn "{isbn}"') BookcaseItemCli( - sql_session = self.sql_session, - bookcase_item = existing_item, + sql_session=self.sql_session, + bookcase_item=existing_item, ).cmdloop() return - if prompt_yes_no(f"Could not find item with ISBN '{isbn}'.\nWould you like to create it?", default=True): + if prompt_yes_no( + f"Could not find item with ISBN '{isbn}'.\nWould you like to create it?", + default=True, + ): self._create_bookcase_item(isbn) - def do_search(self, _: str): search_cli = SearchCli(self.sql_session) search_cli.cmdloop() if search_cli.result is not None: BookcaseItemCli( - sql_session = self.sql_session, - bookcase_item = search_cli.result, + sql_session=self.sql_session, + bookcase_item=search_cli.result, ).cmdloop() - def do_show_slabbedasker(self, _: str): slubberter = self.sql_session.scalars( select(BookcaseItemBorrowing) @@ -201,76 +211,73 @@ class WorblehatCli(NumberedCmd): ).all() if len(slubberter) == 0: - print('No slubberts found. Life is good.') + print("No slubberts found. Life is good.") return for slubbert in slubberter: - print('Slubberter:') - print(f'- {slubbert.username} - {slubbert.item.name} - {slubbert.end_time.strftime("%Y-%m-%d")}') - + print("Slubberter:") + print( + f"- {slubbert.username} - {slubbert.item.name} - {slubbert.end_time.strftime('%Y-%m-%d')}" + ) def do_advanced(self, _: str): AdvancedOptionsCli(self.sql_session).cmdloop() - - def do_save(self, _:str): + def do_save(self, _: str): if not self.sql_session_dirty: - print('No changes to save.') + print("No changes to save.") return self.sql_session.commit() - - def do_abort(self, _:str): + def do_abort(self, _: str): if not self.sql_session_dirty: - print('No changes to abort.') + print("No changes to abort.") return self.sql_session.rollback() - def do_exit(self, _: str): if self.sql_session_dirty: - if prompt_yes_no('Would you like to save your changes?'): + if prompt_yes_no("Would you like to save your changes?"): self.sql_session.commit() else: self.sql_session.rollback() exit(0) - funcs = { 0: { - 'f': default, - 'doc': 'Choose / Add item with its ISBN', + "f": default, + "doc": "Choose / Add item with its ISBN", }, 1: { - 'f': do_search, - 'doc': 'Search', + "f": do_search, + "doc": "Search", }, 2: { - 'f': do_show_bookcase, - 'doc': 'Show a bookcase, and its items', + "f": do_show_bookcase, + "doc": "Show a bookcase, and its items", }, 3: { - 'f': do_show_borrowed_queued, - 'doc': 'Show borrowed/queued items', + "f": do_show_borrowed_queued, + "doc": "Show borrowed/queued items", }, 4: { - 'f': do_show_slabbedasker, - 'doc': 'Show slabbedasker', + "f": do_show_slabbedasker, + "doc": "Show slabbedasker", }, 5: { - 'f': do_save, - 'doc': 'Save changes', + "f": do_save, + "doc": "Save changes", }, 6: { - 'f': do_abort, - 'doc': 'Abort changes', + "f": do_abort, + "doc": "Abort changes", }, 7: { - 'f': do_advanced, - 'doc': 'Advanced options', + "f": do_advanced, + "doc": "Advanced options", }, 9: { - 'f': do_exit, - 'doc': 'Exit', + "f": do_exit, + "doc": "Exit", }, } diff --git a/src/worblehat/cli/subclis/__init__.py b/src/worblehat/cli/subclis/__init__.py index 68eef45..69ade3e 100644 --- a/src/worblehat/cli/subclis/__init__.py +++ b/src/worblehat/cli/subclis/__init__.py @@ -1,4 +1,11 @@ from .advanced_options import AdvancedOptionsCli from .bookcase_item import BookcaseItemCli from .bookcase_shelf_selector import select_bookcase_shelf -from .search import SearchCli \ No newline at end of file +from .search import SearchCli + +__all__ = [ + "AdvancedOptionsCli", + "BookcaseItemCli", + "select_bookcase_shelf", + "SearchCli", +] diff --git a/src/worblehat/cli/subclis/advanced_options.py b/src/worblehat/cli/subclis/advanced_options.py index 49580c7..ebb78b7 100644 --- a/src/worblehat/cli/subclis/advanced_options.py +++ b/src/worblehat/cli/subclis/advanced_options.py @@ -4,81 +4,84 @@ from sqlalchemy.orm import Session from libdib.repl import ( InteractiveItemSelector, NumberedCmd, - format_date, - prompt_yes_no, ) from worblehat.models import Bookcase, BookcaseShelf + class AdvancedOptionsCli(NumberedCmd): def __init__(self, sql_session: Session): super().__init__() self.sql_session = sql_session - def do_add_bookcase(self, _: str): while True: - name = input('Name of bookcase> ') - if name == '': - print('Error: name cannot be empty') + name = input("Name of bookcase> ") + if name == "": + print("Error: name cannot be empty") continue - if self.sql_session.scalars( - select(Bookcase) - .where(Bookcase.name == name) - ).one_or_none() is not None: - print(f'Error: a bookcase with name {name} already exists') + if ( + self.sql_session.scalars( + select(Bookcase).where(Bookcase.name == name) + ).one_or_none() + is not None + ): + print(f"Error: a bookcase with name {name} already exists") continue break - description = input('Description of bookcase> ') - if description == '': + description = input("Description of bookcase> ") + if description == "": description = None bookcase = Bookcase(name, description) self.sql_session.add(bookcase) self.sql_session.flush() - def do_add_bookcase_shelf(self, arg: str): bookcase_selector = InteractiveItemSelector( - cls = Bookcase, - sql_session = self.sql_session, + cls=Bookcase, + sql_session=self.sql_session, ) bookcase_selector.cmdloop() bookcase = bookcase_selector.result while True: - column = input('Column> ') + column = input("Column> ") try: column = int(column) except ValueError: - print('Error: column must be a number') + print("Error: column must be a number") continue break while True: - row = input('Row> ') + row = input("Row> ") try: row = int(row) except ValueError: - print('Error: row must be a number') + print("Error: row must be a number") continue break - if self.sql_session.scalars( - select(BookcaseShelf) - .where( - BookcaseShelf.bookcase == bookcase, - BookcaseShelf.column == column, - BookcaseShelf.row == row, + if ( + self.sql_session.scalars( + select(BookcaseShelf).where( + BookcaseShelf.bookcase == bookcase, + BookcaseShelf.column == column, + BookcaseShelf.row == row, + ) + ).one_or_none() + is not None + ): + print( + f"Error: a bookshelf in bookcase {bookcase.name} with position c{column}-r{row} already exists" ) - ).one_or_none() is not None: - print(f'Error: a bookshelf in bookcase {bookcase.name} with position c{column}-r{row} already exists') return - description = input('Description> ') - if description == '': + description = input("Description> ") + if description == "": description = None shelf = BookcaseShelf( @@ -90,15 +93,14 @@ class AdvancedOptionsCli(NumberedCmd): self.sql_session.add(shelf) self.sql_session.flush() - def do_list_bookcases(self, _: str): bookcase_shelfs = self.sql_session.scalars( select(BookcaseShelf) .join(Bookcase) .order_by( - Bookcase.name, - BookcaseShelf.column, - BookcaseShelf.row, + Bookcase.name, + BookcaseShelf.column, + BookcaseShelf.row, ) ).all() @@ -108,28 +110,26 @@ class AdvancedOptionsCli(NumberedCmd): print(shelf.bookcase.short_str()) bookcase_uid = shelf.bookcase.uid - print(f' {shelf.short_str()} - {sum(i.amount for i in shelf.items)} items') - + print(f" {shelf.short_str()} - {sum(i.amount for i in shelf.items)} items") def do_done(self, _: str): return True - funcs = { 1: { - 'f': do_add_bookcase, - 'doc': 'Add bookcase', + "f": do_add_bookcase, + "doc": "Add bookcase", }, 2: { - 'f': do_add_bookcase_shelf, - 'doc': 'Add bookcase shelf', + "f": do_add_bookcase_shelf, + "doc": "Add bookcase shelf", }, 3: { - 'f': do_list_bookcases, - 'doc': 'List all bookcases', + "f": do_list_bookcases, + "doc": "List all bookcases", }, 9: { - 'f': do_done, - 'doc': 'Done', + "f": do_done, + "doc": "Done", }, } diff --git a/src/worblehat/cli/subclis/bookcase_item.py b/src/worblehat/cli/subclis/bookcase_item.py index a86c93d..44abb25 100644 --- a/src/worblehat/cli/subclis/bookcase_item.py +++ b/src/worblehat/cli/subclis/bookcase_item.py @@ -27,16 +27,18 @@ from worblehat.services.config import Config from .bookcase_shelf_selector import select_bookcase_shelf + def _selected_bookcase_item_prompt(bookcase_item: BookcaseItem) -> str: amount_borrowed = len(bookcase_item.borrowings) - return dedent(f''' + return dedent(f""" Item: {bookcase_item.name} ISBN: {bookcase_item.isbn} - Authors: {', '.join(a.name for a in bookcase_item.authors)} + Authors: {", ".join(a.name for a in bookcase_item.authors)} Bookcase: {bookcase_item.shelf.bookcase.short_str()} Shelf: {bookcase_item.shelf.short_str()} Amount: {bookcase_item.amount - amount_borrowed}/{bookcase_item.amount} - ''') + """) + class BookcaseItemCli(NumberedCmd): def __init__(self, sql_session: Session, bookcase_item: BookcaseItem): @@ -44,12 +46,10 @@ class BookcaseItemCli(NumberedCmd): self.sql_session = sql_session self.bookcase_item = bookcase_item - @property def prompt_header(self) -> str: return _selected_bookcase_item_prompt(self.bookcase_item) - def do_update_data(self, _: str): item = create_bookcase_item_from_isbn(self.sql_session, self.bookcase_item.isbn) self.bookcase_item.name = item.name @@ -58,80 +58,83 @@ class BookcaseItemCli(NumberedCmd): self.bookcase_item.language = item.language self.sql_session.flush() - def do_edit(self, arg: str): EditBookcaseCli(self.sql_session, self.bookcase_item, self).cmdloop() - @staticmethod def _prompt_username() -> str: while True: - username = input('Username: ') - if prompt_yes_no(f'Is {username} correct?', default = True): + username = input("Username: ") + if prompt_yes_no(f"Is {username} correct?", default=True): return username - def _has_active_borrowing(self, username: str) -> bool: - return self.sql_session.scalars( - select(BookcaseItemBorrowing) - .where( - BookcaseItemBorrowing.username == username, - BookcaseItemBorrowing.item == self.bookcase_item, - BookcaseItemBorrowing.delivered.is_(None), - ) - ).one_or_none() is not None - + return ( + self.sql_session.scalars( + select(BookcaseItemBorrowing).where( + BookcaseItemBorrowing.username == username, + BookcaseItemBorrowing.item == self.bookcase_item, + BookcaseItemBorrowing.delivered.is_(None), + ) + ).one_or_none() + is not None + ) def _has_borrowing_queue_item(self, username: str) -> bool: - return self.sql_session.scalars( - select(BookcaseItemBorrowingQueue) - .where( - BookcaseItemBorrowingQueue.username == username, - BookcaseItemBorrowingQueue.item == self.bookcase_item, - ) - ).one_or_none() is not None - + return ( + self.sql_session.scalars( + select(BookcaseItemBorrowingQueue).where( + BookcaseItemBorrowingQueue.username == username, + BookcaseItemBorrowingQueue.item == self.bookcase_item, + ) + ).one_or_none() + is not None + ) def do_borrow(self, _: str): active_borrowings = self.sql_session.scalars( select(BookcaseItemBorrowing) .where( - BookcaseItemBorrowing.item == self.bookcase_item, - BookcaseItemBorrowing.delivered.is_(None), + BookcaseItemBorrowing.item == self.bookcase_item, + BookcaseItemBorrowing.delivered.is_(None), ) .order_by(BookcaseItemBorrowing.end_time) ).all() if len(active_borrowings) >= self.bookcase_item.amount: - print('This item is currently not available') + print("This item is currently not available") print() - print('Active borrowings:') + print("Active borrowings:") for b in active_borrowings: - print(f' {b.username} - Until {format_date(b.end_time)}') + print(f" {b.username} - Until {format_date(b.end_time)}") if len(self.bookcase_item.borrowing_queue) > 0: - print('Borrowing queue:') + print("Borrowing queue:") for i, b in enumerate(self.bookcase_item.borrowing_queue): - print(f' {i + 1} - {b.username}') + print(f" {i + 1} - {b.username}") print() - if not prompt_yes_no('Would you like to enter the borrowing queue?', default = True): + if not prompt_yes_no( + "Would you like to enter the borrowing queue?", default=True + ): return username = self._prompt_username() if self._has_active_borrowing(username): - print('You already have an active borrowing') + print("You already have an active borrowing") return if self._has_borrowing_queue_item(username): - print('You are already in the borrowing queue') + print("You are already in the borrowing queue") return - borrowing_queue_item = BookcaseItemBorrowingQueue(username, self.bookcase_item) + borrowing_queue_item = BookcaseItemBorrowingQueue( + username, self.bookcase_item + ) self.sql_session.add(borrowing_queue_item) - print(f'{username} entered the queue!') + print(f"{username} entered the queue!") return username = self._prompt_username() @@ -139,33 +142,38 @@ class BookcaseItemCli(NumberedCmd): borrowing_item = BookcaseItemBorrowing(username, self.bookcase_item) self.sql_session.add(borrowing_item) self.sql_session.flush() - print(f'Successfully borrowed the item. Please deliver it back by {format_date(borrowing_item.end_time)}') + print( + f"Successfully borrowed the item. Please deliver it back by {format_date(borrowing_item.end_time)}" + ) def do_deliver(self, _: str): borrowings = self.sql_session.scalars( select(BookcaseItemBorrowing) - .join(BookcaseItem, BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid) + .join( + BookcaseItem, + BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid, + ) .where(BookcaseItem.isbn == self.bookcase_item.isbn) .order_by(BookcaseItemBorrowing.username) ).all() if len(borrowings) == 0: - print('No one seems to have borrowed this item') + print("No one seems to have borrowed this item") return - print('Borrowers:') + print("Borrowers:") for i, b in enumerate(borrowings): - print(f' {i + 1}) {b.username}') + print(f" {i + 1}) {b.username}") while True: try: - selection = int(input('> ')) + selection = int(input("> ")) except ValueError: - print('Error: selection must be an integer') + print("Error: selection must be an integer") continue if selection < 1 or selection > len(borrowings): - print('Error: selection out of range') + print("Error: selection out of range") continue break @@ -173,19 +181,21 @@ class BookcaseItemCli(NumberedCmd): borrowing = borrowings[selection - 1] borrowing.delivered = datetime.now() self.sql_session.flush() - print(f'Successfully delivered the item for {borrowing.username}') - + print(f"Successfully delivered the item for {borrowing.username}") def do_extend_borrowing(self, _: str): borrowings = self.sql_session.scalars( select(BookcaseItemBorrowing) - .join(BookcaseItem, BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid) + .join( + BookcaseItem, + BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid, + ) .where(BookcaseItem.isbn == self.bookcase_item.isbn) .order_by(BookcaseItemBorrowing.username) ).all() if len(borrowings) == 0: - print('No one seems to have borrowed this item') + print("No one seems to have borrowed this item") return borrowing_queue = self.sql_session.scalars( @@ -198,61 +208,68 @@ class BookcaseItemCli(NumberedCmd): ).all() if len(borrowing_queue) != 0: - print('Sorry, you cannot extend the borrowing because there are people waiting in the queue') - print('Borrowing queue:') - for i, b in enumerate(borrowing_queue): - print(f' {i + 1}) {b.username}') - return + print( + "Sorry, you cannot extend the borrowing because there are people waiting in the queue" + ) + print("Borrowing queue:") + for i, b in enumerate(borrowing_queue): + print(f" {i + 1}) {b.username}") + return - print('Who are you?') + print("Who are you?") selector = NumberedItemSelector( - items = list(borrowings), - stringify = lambda b: f'{b.username} - Until {format_date(b.end_time)}', + items=list(borrowings), + stringify=lambda b: f"{b.username} - Until {format_date(b.end_time)}", ) selector.cmdloop() if selector.result is None: return borrowing = selector.result - borrowing.end_time = datetime.now() + timedelta(days=int(Config['deadline_daemon.days_before_queue_position_expires'])) + borrowing.end_time = datetime.now() + timedelta( + days=int(Config["deadline_daemon.days_before_queue_position_expires"]) + ) self.sql_session.flush() - print(f'Successfully extended the borrowing for {borrowing.username} until {format_date(borrowing.end_time)}') - + print( + f"Successfully extended the borrowing for {borrowing.username} until {format_date(borrowing.end_time)}" + ) def do_done(self, _: str): return True - funcs = { 1: { - 'f': do_borrow, - 'doc': 'Borrow', + "f": do_borrow, + "doc": "Borrow", }, 2: { - 'f': do_deliver, - 'doc': 'Deliver', + "f": do_deliver, + "doc": "Deliver", }, 3: { - 'f': do_extend_borrowing, - 'doc': 'Extend borrowing', + "f": do_extend_borrowing, + "doc": "Extend borrowing", }, 4: { - 'f': do_edit, - 'doc': 'Edit', + "f": do_edit, + "doc": "Edit", }, 5: { - 'f': do_update_data, - 'doc': 'Pull updated data from online databases', + "f": do_update_data, + "doc": "Pull updated data from online databases", }, 9: { - 'f': do_done, - 'doc': 'Done', + "f": do_done, + "doc": "Done", }, } + class EditBookcaseCli(NumberedCmd): - def __init__(self, sql_session: Session, bookcase_item: BookcaseItem, parent: BookcaseItemCli): + def __init__( + self, sql_session: Session, bookcase_item: BookcaseItem, parent: BookcaseItemCli + ): super().__init__() self.sql_session = sql_session self.bookcase_item = bookcase_item @@ -260,54 +277,56 @@ class EditBookcaseCli(NumberedCmd): @property def prompt_header(self) -> str: - return _selected_bookcase_item_prompt(self.bookcase_item) + return _selected_bookcase_item_prompt(self.bookcase_item) def do_name(self, _: str): while True: - name = input('New name> ') - if name == '': - print('Error: name cannot be empty') + name = input("New name> ") + if name == "": + print("Error: name cannot be empty") continue - if self.sql_session.scalars( - select(BookcaseItem) - .where(BookcaseItem.name == name) - ).one_or_none() is not None: - print(f'Error: an item with name {name} already exists') + if ( + self.sql_session.scalars( + select(BookcaseItem).where(BookcaseItem.name == name) + ).one_or_none() + is not None + ): + print(f"Error: an item with name {name} already exists") continue break self.bookcase_item.name = name self.sql_session.flush() - def do_isbn(self, _: str): while True: - isbn = input('New ISBN> ') - if isbn == '': - print('Error: ISBN cannot be empty') + isbn = input("New ISBN> ") + if isbn == "": + print("Error: ISBN cannot be empty") continue if not is_valid_isbn(isbn): - print('Error: ISBN is not valid') + print("Error: ISBN is not valid") continue - if self.sql_session.scalars( - select(BookcaseItem) - .where(BookcaseItem.isbn == isbn) - ).one_or_none() is not None: - print(f'Error: an item with ISBN {isbn} already exists') + if ( + self.sql_session.scalars( + select(BookcaseItem).where(BookcaseItem.isbn == isbn) + ).one_or_none() + is not None + ): + print(f"Error: an item with ISBN {isbn} already exists") continue break self.bookcase_item.isbn = isbn - if prompt_yes_no('Update data from online databases?'): - self.parent.do_update_data('') + if prompt_yes_no("Update data from online databases?"): + self.parent.do_update_data("") self.sql_session.flush() - def do_language(self, _: str): language_selector = InteractiveItemSelector( Language, @@ -317,7 +336,6 @@ class EditBookcaseCli(NumberedCmd): self.bookcase_item.language = language_selector.result self.sql_session.flush() - def do_media_type(self, _: str): media_type_selector = InteractiveItemSelector( MediaType, @@ -327,24 +345,24 @@ class EditBookcaseCli(NumberedCmd): self.bookcase_item.media_type = media_type_selector.result self.sql_session.flush() - def do_amount(self, _: str): - while (new_amount := input(f'New amount [{self.bookcase_item.amount}]> ')) != '': + while ( + new_amount := input(f"New amount [{self.bookcase_item.amount}]> ") + ) != "": try: new_amount = int(new_amount) except ValueError: - print('Error: amount must be an integer') + print("Error: amount must be an integer") continue if new_amount < 1: - print('Error: amount must be greater than 0') + print("Error: amount must be greater than 0") continue break self.bookcase_item.amount = new_amount self.sql_session.flush() - def do_shelf(self, _: str): bookcase_selector = InteractiveItemSelector( Bookcase, @@ -358,38 +376,36 @@ class EditBookcaseCli(NumberedCmd): self.bookcase_item.shelf = shelf self.sql_session.flush() - def do_done(self, _: str): return True - funcs = { 1: { - 'f': do_name, - 'doc': 'Change name', + "f": do_name, + "doc": "Change name", }, 2: { - 'f': do_isbn, - 'doc': 'Change ISBN', + "f": do_isbn, + "doc": "Change ISBN", }, 3: { - 'f': do_language, - 'doc': 'Change language', + "f": do_language, + "doc": "Change language", }, 4: { - 'f': do_media_type, - 'doc': 'Change media type', + "f": do_media_type, + "doc": "Change media type", }, 5: { - 'f': do_amount, - 'doc': 'Change amount', + "f": do_amount, + "doc": "Change amount", }, 6: { - 'f': do_shelf, - 'doc': 'Change shelf', + "f": do_shelf, + "doc": "Change shelf", }, 9: { - 'f': do_done, - 'doc': 'Done', + "f": do_done, + "doc": "Done", }, } diff --git a/src/worblehat/cli/subclis/bookcase_shelf_selector.py b/src/worblehat/cli/subclis/bookcase_shelf_selector.py index 534a13e..31cbb43 100644 --- a/src/worblehat/cli/subclis/bookcase_shelf_selector.py +++ b/src/worblehat/cli/subclis/bookcase_shelf_selector.py @@ -8,16 +8,17 @@ from worblehat.models import ( BookcaseShelf, ) + def select_bookcase_shelf( bookcase: Bookcase, sql_session: Session, - prompt: str = "Please select the shelf where the item is placed (col-row):" + prompt: str = "Please select the shelf where the item is placed (col-row):", ) -> BookcaseShelf: def __complete_bookshelf_selection(session: Session, cls: type, arg: str): - args = arg.split('-') + args = arg.split("-") query = select(cls.row, cls.column).where(cls.bookcase == bookcase) try: - if arg != '' and len(args) > 0: + if arg != "" and len(args) > 0: query = query.where(cls.column == int(args[0])) if len(args) > 1: query = query.where(cls.row == int(args[1])) @@ -25,21 +26,20 @@ def select_bookcase_shelf( return [] result = session.execute(query).all() - return [f"{c}-{r}" for r,c in result] + return [f"{c}-{r}" for r, c in result] print(prompt) bookcase_shelf_selector = InteractiveItemSelector( - cls = BookcaseShelf, - sql_session = sql_session, - execute_selection = lambda session, cls, arg: session.scalars( - select(cls) - .where( - cls.bookcase == bookcase, - cls.column == int(arg.split('-')[0]), - cls.row == int(arg.split('-')[1]), + cls=BookcaseShelf, + sql_session=sql_session, + execute_selection=lambda session, cls, arg: session.scalars( + select(cls).where( + cls.bookcase == bookcase, + cls.column == int(arg.split("-")[0]), + cls.row == int(arg.split("-")[1]), ) ).all(), - complete_selection = __complete_bookshelf_selection, + complete_selection=__complete_bookshelf_selection, ) bookcase_shelf_selector.cmdloop() diff --git a/src/worblehat/cli/subclis/search.py b/src/worblehat/cli/subclis/search.py index 32382f2..9f956d4 100644 --- a/src/worblehat/cli/subclis/search.py +++ b/src/worblehat/cli/subclis/search.py @@ -15,54 +15,51 @@ class SearchCli(NumberedCmd): self.sql_session = sql_session self.result = None - def do_search_all(self, _: str): - print('TODO: Implement search all') - + print("TODO: Implement search all") def do_search_title(self, _: str): - while (input_text := input('Enter title: ')) == '': + while (input_text := input("Enter title: ")) == "": pass items = self.sql_session.scalars( - select(BookcaseItem) - .where(BookcaseItem.name.ilike(f'%{input_text}%')), + select(BookcaseItem).where(BookcaseItem.name.ilike(f"%{input_text}%")), ).all() if len(items) == 0: - print('No items found.') + print("No items found.") return selector = NumberedItemSelector( - items = items, - stringify = lambda item: f"{item.name} ({item.isbn})", + items=items, + stringify=lambda item: f"{item.name} ({item.isbn})", ) selector.cmdloop() if selector.result is not None: self.result = selector.result return True - def do_search_author(self, _: str): - while (input_text := input('Enter author name: ')) == '': + while (input_text := input("Enter author name: ")) == "": pass author = self.sql_session.scalars( - select(Author) - .where(Author.name.ilike(f'%{input_text}%')), + select(Author).where(Author.name.ilike(f"%{input_text}%")), ).all() if len(author) == 0: - print('No authors found.') + print("No authors found.") return elif len(author) == 1: selected_author = author[0] - print('Found author:') - print(f" {selected_author.name} ({sum(item.amount for item in selected_author.items)} items)") + print("Found author:") + print( + f" {selected_author.name} ({sum(item.amount for item in selected_author.items)} items)" + ) else: selector = NumberedItemSelector( - items = author, - stringify = lambda author: f"{author.name} ({sum(item.amount for item in author.items)} items)", + items=author, + stringify=lambda author: f"{author.name} ({sum(item.amount for item in author.items)} items)", ) selector.cmdloop() if selector.result is None: @@ -70,77 +67,73 @@ class SearchCli(NumberedCmd): selected_author = selector.result selector = NumberedItemSelector( - items = list(selected_author.items), - stringify = lambda item: f"{item.name} ({item.isbn})", + items=list(selected_author.items), + stringify=lambda item: f"{item.name} ({item.isbn})", ) selector.cmdloop() if selector.result is not None: self.result = selector.result return True - def do_search_owner(self, _: str): - while (input_text := input('Enter username: ')) == '': + while (input_text := input("Enter username: ")) == "": pass users = self.sql_session.scalars( select(BookcaseItem.owner) - .where(BookcaseItem.owner.ilike(f'%{input_text}%')) + .where(BookcaseItem.owner.ilike(f"%{input_text}%")) .distinct(), ).all() if len(users) == 0: - print('No users found.') + print("No users found.") return elif len(users) == 1: selected_user = users[0] - print('Found user:') + print("Found user:") print(f" {selected_user}") else: - selector = NumberedItemSelector(items = users) + selector = NumberedItemSelector(items=users) selector.cmdloop() if selector.result is None: return selected_user = selector.result items = self.sql_session.scalars( - select(BookcaseItem) - .where(BookcaseItem.owner == selected_user), + select(BookcaseItem).where(BookcaseItem.owner == selected_user), ).all() selector = NumberedItemSelector( - items = items, - stringify = lambda item: f"{item.name} ({item.isbn})", + items=items, + stringify=lambda item: f"{item.name} ({item.isbn})", ) selector.cmdloop() if selector.result is not None: self.result = selector.result return True - def do_done(self, _: str): return True - funcs = { 1: { - 'f': do_search_all, - 'doc': 'Search everything', + "f": do_search_all, + "doc": "Search everything", }, 2: { - 'f': do_search_title, - 'doc': 'Search by title', + "f": do_search_title, + "doc": "Search by title", }, 3: { - 'f': do_search_author, - 'doc': 'Search by author', + "f": do_search_author, + "doc": "Search by author", }, 4: { - 'f': do_search_owner, - 'doc': 'Search by owner', + "f": do_search_owner, + "doc": "Search by owner", }, 9: { - 'f': do_done, - 'doc': 'Done', + "f": do_done, + "doc": "Done", }, } diff --git a/src/worblehat/deadline_daemon/__init__.py b/src/worblehat/deadline_daemon/__init__.py index b648064..1a55859 100644 --- a/src/worblehat/deadline_daemon/__init__.py +++ b/src/worblehat/deadline_daemon/__init__.py @@ -1 +1,3 @@ -from .main import DeadlineDaemon \ No newline at end of file +from .main import DeadlineDaemon + +__all__ = ["DeadlineDaemon"] diff --git a/src/worblehat/deadline_daemon/main.py b/src/worblehat/deadline_daemon/main.py index 7b03bcb..8b00ad3 100644 --- a/src/worblehat/deadline_daemon/main.py +++ b/src/worblehat/deadline_daemon/main.py @@ -14,9 +14,10 @@ from worblehat.models import ( from worblehat.services.email import send_email + class DeadlineDaemon: def __init__(self, sql_session: Session): - if not Config['deadline_daemon.enabled']: + if not Config["deadline_daemon.enabled"]: return self.sql_session = sql_session @@ -26,7 +27,7 @@ class DeadlineDaemon: ).one_or_none() if self.last_run is None: - logging.info('No previous run found, assuming this is the first run') + logging.info("No previous run found, assuming this is the first run") self.last_run = DeadlineDaemonLastRunDatetime(time=datetime.now()) self.sql_session.add(self.last_run) self.sql_session.commit() @@ -34,15 +35,14 @@ class DeadlineDaemon: self.last_run_datetime = self.last_run.time self.current_run_datetime = datetime.now() - def run(self): - logging.info('Deadline daemon started') - if not Config['deadline_daemon.enabled']: - logging.warn('Deadline daemon disabled, exiting') + logging.info("Deadline daemon started") + if not Config["deadline_daemon.enabled"]: + logging.warn("Deadline daemon disabled, exiting") return - if Config['deadline_daemon.dryrun']: - logging.warn('Running in dryrun mode') + if Config["deadline_daemon.dryrun"]: + logging.warn("Running in dryrun mode") self.send_close_deadline_reminder_mails() self.send_overdue_mails() @@ -58,78 +58,91 @@ class DeadlineDaemon: ################### def _send_close_deadline_mail(self, borrowing: BookcaseItemBorrowing): - logging.info(f'Sending close deadline mail to {borrowing.username}@pvv.ntnu.no.') + logging.info( + f"Sending close deadline mail to {borrowing.username}@pvv.ntnu.no." + ) send_email( - f'{borrowing.username}@pvv.ntnu.no', - 'Reminder - Your borrowing deadline is approaching', - dedent(f''' + f"{borrowing.username}@pvv.ntnu.no", + "Reminder - Your borrowing deadline is approaching", + dedent( + f""" Your borrowing deadline for the following item is approaching: {borrowing.item.name} Please return the item by {borrowing.end_time.strftime("%a %b %d, %Y")} - ''', + """, ).strip(), ) - def _send_overdue_mail(self, borrowing: BookcaseItemBorrowing): - logging.info(f'Sending overdue mail to {borrowing.username}@pvv.ntnu.no for {borrowing.item.isbn} - {borrowing.end_time.strftime("%a %b %d, %Y")}') + logging.info( + f"Sending overdue mail to {borrowing.username}@pvv.ntnu.no for {borrowing.item.isbn} - {borrowing.end_time.strftime('%a %b %d, %Y')}" + ) send_email( - f'{borrowing.username}@pvv.ntnu.no', - 'Your deadline has passed', - dedent(f''' + f"{borrowing.username}@pvv.ntnu.no", + "Your deadline has passed", + dedent( + f""" Your delivery deadline for the following item has passed: {borrowing.item.name} Please return the item as soon as possible. - ''', + """, ).strip(), ) - def _send_newly_available_mail(self, queue_item: BookcaseItemBorrowingQueue): - logging.info(f'Sending newly available mail to {queue_item.username}') + logging.info(f"Sending newly available mail to {queue_item.username}") - days_before_queue_expires = Config['deadline_daemon.days_before_queue_position_expires'] + days_before_queue_expires = Config[ + "deadline_daemon.days_before_queue_position_expires" + ] # TODO: calculate and format the date of when the queue position expires in the mail. send_email( - f'{queue_item.username}@pvv.ntnu.no', - 'An item you have queued for is now available', - dedent(f''' + f"{queue_item.username}@pvv.ntnu.no", + "An item you have queued for is now available", + dedent( + f""" The following item is now available for you to borrow: {queue_item.item.name} Please pick up the item within {days_before_queue_expires} days. - ''', + """, ).strip(), ) - - def _send_expiring_queue_position_mail(self, queue_position: BookcaseItemBorrowingQueue, day: int): - logging.info(f'Sending queue position expiry reminder to {queue_position.username}@pvv.ntnu.no.') + def _send_expiring_queue_position_mail( + self, queue_position: BookcaseItemBorrowingQueue, day: int + ): + logging.info( + f"Sending queue position expiry reminder to {queue_position.username}@pvv.ntnu.no." + ) send_email( - f'{queue_position.username}@pvv.ntnu.no', - 'Reminder - Your queue position expiry deadline is approaching', - dedent(f''' + f"{queue_position.username}@pvv.ntnu.no", + "Reminder - Your queue position expiry deadline is approaching", + dedent( + f""" Your queue position expiry deadline for the following item is approaching: {queue_position.item.name} Please borrow the item by {(queue_position.item_became_available_time + timedelta(days=day)).strftime("%a %b %d, %Y")} - ''', + """, ).strip(), ) - - - def _send_queue_position_expired_mail(self, queue_position: BookcaseItemBorrowingQueue): + + def _send_queue_position_expired_mail( + self, queue_position: BookcaseItemBorrowingQueue + ): send_email( - f'{queue_position.username}@pvv.ntnu.no', - 'Your queue position has expired', - dedent(f''' + f"{queue_position.username}@pvv.ntnu.no", + "Your queue position has expired", + dedent( + f""" Your queue position for the following item has expired: {queue_position.item.name} @@ -137,7 +150,7 @@ class DeadlineDaemon: You can queue for the item again at any time, but you will be placed at the back of the queue. There are currently {len(queue_position.item.borrowing_queue)} users in the queue. - ''', + """, ).strip(), ) @@ -146,30 +159,32 @@ class DeadlineDaemon: ################## def _sql_subtract_date(self, x: datetime, y: timedelta): - if self.sql_session.bind.dialect.name == 'sqlite': + if self.sql_session.bind.dialect.name == "sqlite": # SQLite does not support timedelta in queries - return func.datetime(x, f'-{y.days} days') - elif self.sql_session.bind.dialect.name == 'postgresql': + return func.datetime(x, f"-{y.days} days") + elif self.sql_session.bind.dialect.name == "postgresql": return x - y else: - raise NotImplementedError(f'Unsupported dialect: {self.sql_session.bind.dialect.name}') - + raise NotImplementedError( + f"Unsupported dialect: {self.sql_session.bind.dialect.name}" + ) def send_close_deadline_reminder_mails(self): - logging.info('Sending mails for items with a closing deadline') + logging.info("Sending mails for items with a closing deadline") # TODO: This should be int-parsed and validated before the daemon started - days = [int(d) for d in Config['deadline_daemon.warn_days_before_borrowing_deadline']] + days = [ + int(d) + for d in Config["deadline_daemon.warn_days_before_borrowing_deadline"] + ] for day in days: borrowings_to_remind = self.sql_session.scalars( - select(BookcaseItemBorrowing) - .where( + select(BookcaseItemBorrowing).where( self._sql_subtract_date( BookcaseItemBorrowing.end_time, timedelta(days=day), - ) - .between( + ).between( self.last_run_datetime, self.current_run_datetime, ), @@ -179,13 +194,11 @@ class DeadlineDaemon: for borrowing in borrowings_to_remind: self._send_close_deadline_mail(borrowing) - def send_overdue_mails(self): - logging.info('Sending mails for overdue items') + logging.info("Sending mails for overdue items") to_remind = self.sql_session.scalars( - select(BookcaseItemBorrowing) - .where( + select(BookcaseItemBorrowing).where( BookcaseItemBorrowing.end_time < self.current_run_datetime, BookcaseItemBorrowing.delivered.is_(None), ) @@ -194,15 +207,15 @@ class DeadlineDaemon: for borrowing in to_remind: self._send_overdue_mail(borrowing) - def send_newly_available_mails(self): - logging.info('Sending mails about newly available items') + logging.info("Sending mails about newly available items") newly_available = self.sql_session.scalars( select(BookcaseItemBorrowingQueue) .join( BookcaseItemBorrowing, - BookcaseItemBorrowing.fk_bookcase_item_uid == BookcaseItemBorrowingQueue.fk_bookcase_item_uid, + BookcaseItemBorrowing.fk_bookcase_item_uid + == BookcaseItemBorrowingQueue.fk_bookcase_item_uid, ) .where( BookcaseItemBorrowingQueue.expired.is_(False), @@ -217,31 +230,38 @@ class DeadlineDaemon: ).all() for queue_item in newly_available: - logging.info(f'Adding user {queue_item.username} to queue for {queue_item.item.name}') + logging.info( + f"Adding user {queue_item.username} to queue for {queue_item.item.name}" + ) queue_item.item_became_available_time = self.current_run_datetime self.sql_session.commit() self._send_newly_available_mail(queue_item) - def send_expiring_queue_position_mails(self): - logging.info('Sending mails about queue positions which are expiring soon') - logging.warning('Not implemented') + logging.info("Sending mails about queue positions which are expiring soon") + logging.warning("Not implemented") - days = [int(d) for d in Config['deadline_daemon.warn_days_before_expiring_queue_position_deadline']] + days = [ + int(d) + for d in Config[ + "deadline_daemon.warn_days_before_expiring_queue_position_deadline" + ] + ] for day in days: queue_positions_to_remind = self.sql_session.scalars( select(BookcaseItemBorrowingQueue) .join( BookcaseItemBorrowing, - BookcaseItemBorrowing.fk_bookcase_item_uid == BookcaseItemBorrowingQueue.fk_bookcase_item_uid, + BookcaseItemBorrowing.fk_bookcase_item_uid + == BookcaseItemBorrowingQueue.fk_bookcase_item_uid, ) .where( self._sql_subtract_date( - BookcaseItemBorrowingQueue.item_became_available_time + timedelta(days=day), + BookcaseItemBorrowingQueue.item_became_available_time + + timedelta(days=day), timedelta(days=day), - ) - .between( + ).between( self.last_run_datetime, self.current_run_datetime, ), @@ -251,29 +271,34 @@ class DeadlineDaemon: for queue_position in queue_positions_to_remind: self._send_expiring_queue_position_mail(queue_position, day) - def auto_expire_queue_positions(self): - logging.info('Expiring queue positions which are too old') + logging.info("Expiring queue positions which are too old") - queue_position_expiry_days = int(Config['deadline_daemon.days_before_queue_position_expires']) + queue_position_expiry_days = int( + Config["deadline_daemon.days_before_queue_position_expires"] + ) overdue_queue_positions = self.sql_session.scalars( - select(BookcaseItemBorrowingQueue) - .where( - BookcaseItemBorrowingQueue.item_became_available_time + timedelta(days=queue_position_expiry_days) < self.current_run_datetime, + select(BookcaseItemBorrowingQueue).where( + BookcaseItemBorrowingQueue.item_became_available_time + + timedelta(days=queue_position_expiry_days) + < self.current_run_datetime, BookcaseItemBorrowingQueue.expired.is_(False), ), ).all() for queue_position in overdue_queue_positions: - logging.info(f'Expiring queue position for {queue_position.username} for item {queue_position.item.name}') + logging.info( + f"Expiring queue position for {queue_position.username} for item {queue_position.item.name}" + ) queue_position.expired = True next_queue_position = self.sql_session.scalars( select(BookcaseItemBorrowingQueue) .where( - BookcaseItemBorrowingQueue.fk_bookcase_item_uid == queue_position.fk_bookcase_item_uid, + BookcaseItemBorrowingQueue.fk_bookcase_item_uid + == queue_position.fk_bookcase_item_uid, BookcaseItemBorrowingQueue.item_became_available_time.is_(None), ) .order_by(BookcaseItemBorrowingQueue.entered_queue_time) @@ -283,9 +308,13 @@ class DeadlineDaemon: self._send_queue_position_expired_mail(queue_position) if next_queue_position is not None: - next_queue_position.item_became_available_time = self.current_run_datetime + next_queue_position.item_became_available_time = ( + self.current_run_datetime + ) - logging.info(f'Next user in queue for item {next_queue_position.item.name} is {next_queue_position.username}') + logging.info( + f"Next user in queue for item {next_queue_position.item.name} is {next_queue_position.username}" + ) self._send_newly_available_mail(next_queue_position) - self.sql_session.commit() \ No newline at end of file + self.sql_session.commit() diff --git a/src/worblehat/devscripts/__init__.py b/src/worblehat/devscripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/worblehat/devscripts/seed_content_for_deadline_daemon.py b/src/worblehat/devscripts/seed_content_for_deadline_daemon.py index 54ba746..5384fed 100644 --- a/src/worblehat/devscripts/seed_content_for_deadline_daemon.py +++ b/src/worblehat/devscripts/seed_content_for_deadline_daemon.py @@ -1,10 +1,10 @@ from datetime import datetime, timedelta from worblehat.models import ( - BookcaseItem, - BookcaseItemBorrowing, - BookcaseItemBorrowingQueue, - DeadlineDaemonLastRunDatetime, + BookcaseItem, + BookcaseItemBorrowing, + BookcaseItemBorrowingQueue, + DeadlineDaemonLastRunDatetime, ) from worblehat.services.config import Config @@ -13,105 +13,115 @@ from .seed_test_data import main as seed_test_data_main def clear_db(sql_session): - sql_session.query(BookcaseItemBorrowingQueue).delete() - sql_session.query(BookcaseItemBorrowing).delete() - sql_session.query(DeadlineDaemonLastRunDatetime).delete() - sql_session.commit() + sql_session.query(BookcaseItemBorrowingQueue).delete() + sql_session.query(BookcaseItemBorrowing).delete() + sql_session.query(DeadlineDaemonLastRunDatetime).delete() + sql_session.commit() + # NOTE: feel free to change this function to suit your needs # it's just a quick and dirty way to get some data into the database # for testing the deadline daemon - oysteikt 2024 def main(sql_session): - borrow_warning_days = [timedelta(days=int(d)) for d in Config['deadline_daemon.warn_days_before_borrowing_deadline']] - queue_warning_days = [timedelta(days=int(d)) for d in Config['deadline_daemon.warn_days_before_expiring_queue_position_deadline']] - queue_expire_days = int(Config['deadline_daemon.days_before_queue_position_expires']) - - clear_db(sql_session) - seed_test_data_main(sql_session) - - books = sql_session.query(BookcaseItem).all() - - last_run_datetime = datetime.now() - timedelta(days=16) - last_run = DeadlineDaemonLastRunDatetime(last_run_datetime) - sql_session.add(last_run) - - # Create at least one item that is borrowed and not supposed to be returned yet - borrowing = BookcaseItemBorrowing( - item=books[0], - username='test_borrower_still_borrowing', - ) - borrowing.start_time = last_run_datetime - timedelta(days=1) - borrowing.end_time = datetime.now() - timedelta(days=6) - sql_session.add(borrowing) - - # Create at least one item that is borrowed and is supposed to be returned soon - borrowing = BookcaseItemBorrowing( - item=books[1], - username='test_borrower_return_soon', - ) - borrowing.start_time = last_run_datetime - timedelta(days=1) - borrowing.end_time = datetime.now() - timedelta(days=2) - sql_session.add(borrowing) - - # Create at least one item that is borrowed and is overdue - borrowing = BookcaseItemBorrowing( - item=books[2], - username='test_borrower_overdue', - ) - borrowing.start_time = datetime.now() - timedelta(days=1) - borrowing.end_time = datetime.now() + timedelta(days=1) - sql_session.add(borrowing) - - # Create at least one item that is in the queue and is not supposed to be borrowed yet - queue_item = BookcaseItemBorrowingQueue( - item=books[3], - username='test_queue_user_still_waiting', - ) - queue_item.entered_queue_time = last_run_datetime - timedelta(days=1) - borrowing = BookcaseItemBorrowing( - item=books[3], - username='test_borrower_return_soon', - ) - borrowing.start_time = last_run_datetime - timedelta(days=1) - borrowing.end_time = datetime.now() - timedelta(days=2) - sql_session.add(queue_item) - sql_session.add(borrowing) - - # Create at least three items that is in the queue and two items were just returned - for i in range(3): - queue_item = BookcaseItemBorrowingQueue( - item=books[4 + i], - username=f'test_queue_user_{i}', + borrow_warning_days = [ + timedelta(days=int(d)) + for d in Config["deadline_daemon.warn_days_before_borrowing_deadline"] + ] + queue_warning_days = [ + timedelta(days=int(d)) + for d in Config[ + "deadline_daemon.warn_days_before_expiring_queue_position_deadline" + ] + ] + queue_expire_days = int( + Config["deadline_daemon.days_before_queue_position_expires"] ) - sql_session.add(queue_item) - for i in range(3): + clear_db(sql_session) + seed_test_data_main(sql_session) + + books = sql_session.query(BookcaseItem).all() + + last_run_datetime = datetime.now() - timedelta(days=16) + last_run = DeadlineDaemonLastRunDatetime(last_run_datetime) + sql_session.add(last_run) + + # Create at least one item that is borrowed and not supposed to be returned yet borrowing = BookcaseItemBorrowing( - item=books[4 + i], - username=f'test_borrower_returned_{i}', + item=books[0], + username="test_borrower_still_borrowing", ) - borrowing.start_time = last_run_datetime - timedelta(days=2) - borrowing.end_time = datetime.now() + timedelta(days=1) - - if i != 2: - borrowing.delivered = datetime.now() - timedelta(days=1) - + borrowing.start_time = last_run_datetime - timedelta(days=1) + borrowing.end_time = datetime.now() - timedelta(days=6) sql_session.add(borrowing) - # Create at least one item that has been in the queue for so long that the queue position should expire - queue_item = BookcaseItemBorrowingQueue( - item=books[7], - username='test_queue_user_expired', - ) - queue_item.entered_queue_time = datetime.now() - timedelta(days=15) + # Create at least one item that is borrowed and is supposed to be returned soon + borrowing = BookcaseItemBorrowing( + item=books[1], + username="test_borrower_return_soon", + ) + borrowing.start_time = last_run_datetime - timedelta(days=1) + borrowing.end_time = datetime.now() - timedelta(days=2) + sql_session.add(borrowing) - # Create at least one item that has been in the queue for so long that the queue position should expire, - # but the queue person has already been notified - queue_item = BookcaseItemBorrowingQueue( - item=books[8], - username='test_queue_user_expired_notified', - ) - queue_item.entered_queue_time = datetime.now() - timedelta(days=15) + # Create at least one item that is borrowed and is overdue + borrowing = BookcaseItemBorrowing( + item=books[2], + username="test_borrower_overdue", + ) + borrowing.start_time = datetime.now() - timedelta(days=1) + borrowing.end_time = datetime.now() + timedelta(days=1) + sql_session.add(borrowing) - sql_session.commit() + # Create at least one item that is in the queue and is not supposed to be borrowed yet + queue_item = BookcaseItemBorrowingQueue( + item=books[3], + username="test_queue_user_still_waiting", + ) + queue_item.entered_queue_time = last_run_datetime - timedelta(days=1) + borrowing = BookcaseItemBorrowing( + item=books[3], + username="test_borrower_return_soon", + ) + borrowing.start_time = last_run_datetime - timedelta(days=1) + borrowing.end_time = datetime.now() - timedelta(days=2) + sql_session.add(queue_item) + sql_session.add(borrowing) + # Create at least three items that is in the queue and two items were just returned + for i in range(3): + queue_item = BookcaseItemBorrowingQueue( + item=books[4 + i], + username=f"test_queue_user_{i}", + ) + sql_session.add(queue_item) + + for i in range(3): + borrowing = BookcaseItemBorrowing( + item=books[4 + i], + username=f"test_borrower_returned_{i}", + ) + borrowing.start_time = last_run_datetime - timedelta(days=2) + borrowing.end_time = datetime.now() + timedelta(days=1) + + if i != 2: + borrowing.delivered = datetime.now() - timedelta(days=1) + + sql_session.add(borrowing) + + # Create at least one item that has been in the queue for so long that the queue position should expire + queue_item = BookcaseItemBorrowingQueue( + item=books[7], + username="test_queue_user_expired", + ) + queue_item.entered_queue_time = datetime.now() - timedelta(days=15) + + # Create at least one item that has been in the queue for so long that the queue position should expire, + # but the queue person has already been notified + queue_item = BookcaseItemBorrowingQueue( + item=books[8], + username="test_queue_user_expired_notified", + ) + queue_item.entered_queue_time = datetime.now() - timedelta(days=15) + + sql_session.commit() diff --git a/src/worblehat/devscripts/seed_test_data.py b/src/worblehat/devscripts/seed_test_data.py index 02cda53..6189a7c 100644 --- a/src/worblehat/devscripts/seed_test_data.py +++ b/src/worblehat/devscripts/seed_test_data.py @@ -1,19 +1,18 @@ import csv from pathlib import Path -from datetime import datetime, timedelta from worblehat.models import ( - Bookcase, - BookcaseItem, - BookcaseShelf, - MediaType, - Language, + Bookcase, + BookcaseItem, + BookcaseShelf, + MediaType, + Language, ) -from worblehat.services.config import Config -CSV_FILE = Path(__file__).parent.parent.parent / 'data' / 'arbeidsrom_smal_hylle_5.csv' +CSV_FILE = Path(__file__).parent.parent.parent / "data" / "arbeidsrom_smal_hylle_5.csv" + def clear_db(sql_session): sql_session.query(BookcaseItem).delete() @@ -23,45 +22,46 @@ def clear_db(sql_session): sql_session.query(Language).delete() sql_session.commit() + def main(sql_session): clear_db(sql_session) media_type = MediaType( - name='Book', - description='A book', + name="Book", + description="A book", ) sql_session.add(media_type) language = Language( - name='Norwegian', - iso639_1_code='no', + name="Norwegian", + iso639_1_code="no", ) sql_session.add(language) seed_case = Bookcase( - name='seed_case', - description='test bookcase with test data', + name="seed_case", + description="test bookcase with test data", ) sql_session.add(seed_case) seed_shelf_1 = BookcaseShelf( - row=1, - column=1, - bookcase=seed_case, - description='test shelf with test data 1', + row=1, + column=1, + bookcase=seed_case, + description="test shelf with test data 1", ) seed_shelf_2 = BookcaseShelf( - row=2, - column=1, - bookcase=seed_case, - description='test shelf with test data 2', + row=2, + column=1, + bookcase=seed_case, + description="test shelf with test data 2", ) sql_session.add(seed_shelf_1) sql_session.add(seed_shelf_2) bookcase_items = [] with open(CSV_FILE) as csv_file: - csv_reader = csv.reader(csv_file, delimiter=',') + csv_reader = csv.reader(csv_file, delimiter=",") next(csv_reader) for row in csv_reader: diff --git a/src/worblehat/flaskapp/api/__init__.py b/src/worblehat/flaskapp/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/worblehat/flaskapp/blueprints/__init__.py b/src/worblehat/flaskapp/blueprints/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/worblehat/flaskapp/blueprints/main.py b/src/worblehat/flaskapp/blueprints/main.py index 800d369..65ccb05 100644 --- a/src/worblehat/flaskapp/blueprints/main.py +++ b/src/worblehat/flaskapp/blueprints/main.py @@ -2,10 +2,12 @@ from flask import Blueprint, render_template main = Blueprint("main", __name__, template_folder="main") -@main.route('/') + +@main.route("/") def index(): return render_template("main/index.html") + @main.route("/login") def login(): - return render_template("main/login.html") \ No newline at end of file + return render_template("main/login.html") diff --git a/src/worblehat/flaskapp/database.py b/src/worblehat/flaskapp/database.py index 2e1eeb6..f0b13d6 100644 --- a/src/worblehat/flaskapp/database.py +++ b/src/worblehat/flaskapp/database.py @@ -1,3 +1,3 @@ from flask_sqlalchemy import SQLAlchemy -db = SQLAlchemy() \ No newline at end of file +db = SQLAlchemy() diff --git a/src/worblehat/flaskapp/flaskapp.py b/src/worblehat/flaskapp/flaskapp.py index fd32147..adda8ef 100644 --- a/src/worblehat/flaskapp/flaskapp.py +++ b/src/worblehat/flaskapp/flaskapp.py @@ -10,18 +10,19 @@ from worblehat.services.config import Config from .blueprints.main import main from .database import db + def create_app(args: dict[str, any] | None = None): app = Flask(__name__) - app.config.update(Config['flask']) + app.config.update(Config["flask"]) app.config.update(Config._config) - app.config['SQLALCHEMY_DATABASE_URI'] = Config.db_string() - app.config['SQLALCHEMY_ECHO'] = Config['logging.debug_sql'] + app.config["SQLALCHEMY_DATABASE_URI"] = Config.db_string() + app.config["SQLALCHEMY_ECHO"] = Config["logging.debug_sql"] db.init_app(app) with app.app_context(): - if not inspect(db.engine).has_table('Bookcase'): + if not inspect(db.engine).has_table("Bookcase"): Base.metadata.create_all(db.engine) seed_data() @@ -31,12 +32,13 @@ def create_app(args: dict[str, any] | None = None): return app + def configure_admin(app): - admin = Admin(app, name='Worblehat', template_mode='bootstrap3') + admin = Admin(app, name="Worblehat", template_mode="bootstrap3") admin.add_view(ModelView(Author, db.session)) admin.add_view(ModelView(Bookcase, db.session)) admin.add_view(ModelView(BookcaseItem, db.session)) admin.add_view(ModelView(BookcaseShelf, db.session)) admin.add_view(ModelView(Category, db.session)) admin.add_view(ModelView(Language, db.session)) - admin.add_view(ModelView(MediaType, db.session)) \ No newline at end of file + admin.add_view(ModelView(MediaType, db.session)) diff --git a/src/worblehat/flaskapp/wsgi_dev.py b/src/worblehat/flaskapp/wsgi_dev.py index 4dbab90..6d37861 100644 --- a/src/worblehat/flaskapp/wsgi_dev.py +++ b/src/worblehat/flaskapp/wsgi_dev.py @@ -1,18 +1,19 @@ from werkzeug import run_simple -from worblehat.services.config import Config from .flaskapp import create_app + def main(): app = create_app() run_simple( - hostname = 'localhost', - port = 5000, - application = app, - use_debugger = True, - use_reloader = True, + hostname="localhost", + port=5000, + application=app, + use_debugger=True, + use_reloader=True, ) -if __name__ == '__main__': - main() \ No newline at end of file + +if __name__ == "__main__": + main() diff --git a/src/worblehat/flaskapp/wsgi_prod.py b/src/worblehat/flaskapp/wsgi_prod.py index 7f64acc..4c1c0db 100644 --- a/src/worblehat/flaskapp/wsgi_prod.py +++ b/src/worblehat/flaskapp/wsgi_prod.py @@ -1,8 +1,10 @@ from .flaskapp import create_app + def main(): app = create_app() app.run() -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/src/worblehat/main.py b/src/worblehat/main.py index 9f261e9..8798dc6 100644 --- a/src/worblehat/main.py +++ b/src/worblehat/main.py @@ -18,7 +18,8 @@ from .flaskapp.wsgi_prod import main as flask_prod_main def _print_version() -> None: from worblehat import __version__ - print(f'Worblehat version {__version__}') + + print(f"Worblehat version {__version__}") def _connect_to_database(**engine_args) -> Session: @@ -26,7 +27,7 @@ def _connect_to_database(**engine_args) -> Session: engine = create_engine(Config.db_string(), **engine_args) sql_session = Session(engine) except Exception as err: - print('Error: could not connect to database.') + print("Error: could not connect to database.") print(err) exit(1) @@ -38,51 +39,55 @@ def main(): args = arg_parser.parse_args() Config.load_configuration(vars(args)) - if Config['logging.debug']: - logging.basicConfig(encoding='utf-8', level=logging.DEBUG) + if Config["logging.debug"]: + logging.basicConfig(encoding="utf-8", level=logging.DEBUG) else: - logging.basicConfig(encoding='utf-8', level=logging.INFO) + logging.basicConfig(encoding="utf-8", level=logging.INFO) if args.version: _print_version() exit(0) if args.print_config: - print(f'Configuration:\n{pformat(vars(args))}') + print(f"Configuration:\n{pformat(vars(args))}") exit(0) - if args.command == 'deadline-daemon': - sql_session = _connect_to_database(echo=Config['logging.debug_sql']) + if args.command == "deadline-daemon": + sql_session = _connect_to_database(echo=Config["logging.debug_sql"]) DeadlineDaemon(sql_session).run() exit(0) - if args.command == 'cli': - sql_session = _connect_to_database(echo=Config['logging.debug_sql']) + if args.command == "cli": + sql_session = _connect_to_database(echo=Config["logging.debug_sql"]) WorblehatCli.run_with_safe_exit_wrapper(sql_session) exit(0) - if args.command == 'devscripts': - sql_session = _connect_to_database(echo=Config['logging.debug_sql']) - if args.script == 'seed-content-for-deadline-daemon': + if args.command == "devscripts": + sql_session = _connect_to_database(echo=Config["logging.debug_sql"]) + if args.script == "seed-content-for-deadline-daemon": from .devscripts.seed_content_for_deadline_daemon import main + main(sql_session) - elif args.script == 'seed-test-data': + elif args.script == "seed-test-data": from .devscripts.seed_test_data import main + main(sql_session) else: print(devscripts_arg_parser.format_help()) exit(1) exit(0) - if args.command == 'flask-dev': + if args.command == "flask-dev": flask_dev_main() exit(0) - if args.command == 'flask-prod': - if Config['logging.debug'] or Config['logging.debug_sql']: - logging.warn('Debug mode is enabled for the production server. This is not recommended.') + if args.command == "flask-prod": + if Config["logging.debug"] or Config["logging.debug_sql"]: + logging.warn( + "Debug mode is enabled for the production server. This is not recommended." + ) flask_prod_main() exit(0) print(arg_parser.format_help()) - exit(1) \ No newline at end of file + exit(1) diff --git a/src/worblehat/models/Author.py b/src/worblehat/models/Author.py index d2b0608..a286bff 100644 --- a/src/worblehat/models/Author.py +++ b/src/worblehat/models/Author.py @@ -1,13 +1,8 @@ from __future__ import annotations from typing import TYPE_CHECKING -from sqlalchemy import ( - Integer, - ForeignKey, -) from sqlalchemy.orm import ( Mapped, - mapped_column, relationship, ) @@ -21,14 +16,15 @@ from .xref_tables import Item_Author if TYPE_CHECKING: from .BookcaseItem import BookcaseItem + class Author(Base, UidMixin, UniqueNameMixin): items: Mapped[set[BookcaseItem]] = relationship( - secondary = Item_Author.__table__, - back_populates = 'authors', + secondary=Item_Author.__table__, + back_populates="authors", ) def __init__( self, name: str, ): - self.name = name \ No newline at end of file + self.name = name diff --git a/src/worblehat/models/Base.py b/src/worblehat/models/Base.py index e879c23..f0764fe 100644 --- a/src/worblehat/models/Base.py +++ b/src/worblehat/models/Base.py @@ -9,6 +9,7 @@ from sqlalchemy.orm.collections import ( InstrumentedSet, ) + class Base(DeclarativeBase): metadata = MetaData( naming_convention={ @@ -16,7 +17,7 @@ class Base(DeclarativeBase): "uq": "uq_%(table_name)s_%(column_0_name)s", "ck": "ck_%(table_name)s_`%(constraint_name)s`", "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", - "pk": "pk_%(table_name)s" + "pk": "pk_%(table_name)s", } ) @@ -26,15 +27,18 @@ class Base(DeclarativeBase): def __repr__(self) -> str: columns = ", ".join( - f"{k}={repr(v)}" for k, v in self.__dict__.items() if not any([ - k.startswith("_"), - - # Ensure that we don't try to print out the entire list of - # relationships, which could create an infinite loop - isinstance(v, Base), - isinstance(v, InstrumentedList), - isinstance(v, InstrumentedSet), - isinstance(v, InstrumentedDict), - ]) + f"{k}={repr(v)}" + for k, v in self.__dict__.items() + if not any( + [ + k.startswith("_"), + # Ensure that we don't try to print out the entire list of + # relationships, which could create an infinite loop + isinstance(v, Base), + isinstance(v, InstrumentedList), + isinstance(v, InstrumentedSet), + isinstance(v, InstrumentedDict), + ] + ) ) - return f"<{self.__class__.__name__}({columns})>" \ No newline at end of file + return f"<{self.__class__.__name__}({columns})>" diff --git a/src/worblehat/models/Bookcase.py b/src/worblehat/models/Bookcase.py index 51f3828..266de4e 100644 --- a/src/worblehat/models/Bookcase.py +++ b/src/worblehat/models/Bookcase.py @@ -13,13 +13,15 @@ from .mixins import ( UidMixin, UniqueNameMixin, ) + if TYPE_CHECKING: from .BookcaseShelf import BookcaseShelf + class Bookcase(Base, UidMixin, UniqueNameMixin): description: Mapped[str | None] = mapped_column(Text) - shelfs: Mapped[list[BookcaseShelf]] = relationship(back_populates='bookcase') + shelfs: Mapped[list[BookcaseShelf]] = relationship(back_populates="bookcase") def __init__( self, @@ -32,6 +34,5 @@ class Bookcase(Base, UidMixin, UniqueNameMixin): def short_str(self) -> str: result = self.name if self.description is not None: - result += f' [{self.description}]' + result += f" [{self.description}]" return result - diff --git a/src/worblehat/models/BookcaseItem.py b/src/worblehat/models/BookcaseItem.py index e907f1f..d40eff6 100644 --- a/src/worblehat/models/BookcaseItem.py +++ b/src/worblehat/models/BookcaseItem.py @@ -2,11 +2,10 @@ from __future__ import annotations from typing import TYPE_CHECKING from sqlalchemy import ( - ForeignKey, - Integer, - SmallInteger, - String, - Text, + ForeignKey, + SmallInteger, + String, + Text, ) from sqlalchemy.orm import ( Mapped, @@ -17,12 +16,12 @@ from sqlalchemy.orm import ( from .Base import Base from .mixins import ( UidMixin, - UniqueNameMixin, ) from .xref_tables import ( Item_Category, Item_Author, ) + if TYPE_CHECKING: from .Author import Author from .BookcaseItemBorrowing import BookcaseItemBorrowing @@ -34,36 +33,39 @@ if TYPE_CHECKING: from worblehat.flaskapp.database import db + class BookcaseItem(Base, UidMixin): isbn: Mapped[int] = mapped_column(String, unique=True, index=True) name: Mapped[str] = mapped_column(Text, index=True) - owner: Mapped[str] = mapped_column(String, default='PVV') + owner: Mapped[str] = mapped_column(String, default="PVV") amount: Mapped[int] = mapped_column(SmallInteger, default=1) - fk_media_type_uid: Mapped[int] = mapped_column(ForeignKey('MediaType.uid')) - fk_bookcase_shelf_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseShelf.uid')) - fk_language_uid: Mapped[int | None] = mapped_column(ForeignKey('Language.uid')) + fk_media_type_uid: Mapped[int] = mapped_column(ForeignKey("MediaType.uid")) + fk_bookcase_shelf_uid: Mapped[int] = mapped_column(ForeignKey("BookcaseShelf.uid")) + fk_language_uid: Mapped[int | None] = mapped_column(ForeignKey("Language.uid")) - media_type: Mapped[MediaType] = relationship(back_populates='items') - shelf: Mapped[BookcaseShelf] = relationship(back_populates='items') + media_type: Mapped[MediaType] = relationship(back_populates="items") + shelf: Mapped[BookcaseShelf] = relationship(back_populates="items") language: Mapped[Language] = relationship() - borrowings: Mapped[set[BookcaseItemBorrowing]] = relationship(back_populates='item') - borrowing_queue: Mapped[set[BookcaseItemBorrowingQueue]] = relationship(back_populates='item') + borrowings: Mapped[set[BookcaseItemBorrowing]] = relationship(back_populates="item") + borrowing_queue: Mapped[set[BookcaseItemBorrowingQueue]] = relationship( + back_populates="item" + ) categories: Mapped[set[Category]] = relationship( - secondary = Item_Category.__table__, - back_populates = 'items', + secondary=Item_Category.__table__, + back_populates="items", ) authors: Mapped[set[Author]] = relationship( - secondary = Item_Author.__table__, - back_populates = 'items', + secondary=Item_Author.__table__, + back_populates="items", ) def __init__( self, name: str, isbn: int | None = None, - owner: str = 'PVV', + owner: str = "PVV", ): self.name = name self.isbn = isbn @@ -76,4 +78,4 @@ class BookcaseItem(Base, UidMixin): This method defaults to using the flask_sqlalchemy session. It will not work outside of a request context, unless another session is provided. """ - return sql_session.query(cls).where(cls.isbn == isbn).one_or_none() \ No newline at end of file + return sql_session.query(cls).where(cls.isbn == isbn).one_or_none() diff --git a/src/worblehat/models/BookcaseItemBorrowing.py b/src/worblehat/models/BookcaseItemBorrowing.py index 9cf65e6..ac1f3a1 100644 --- a/src/worblehat/models/BookcaseItemBorrowing.py +++ b/src/worblehat/models/BookcaseItemBorrowing.py @@ -3,7 +3,6 @@ from typing import TYPE_CHECKING from datetime import datetime, timedelta from sqlalchemy import ( - Boolean, ForeignKey, String, DateTime, @@ -16,18 +15,24 @@ from sqlalchemy.orm import ( from .Base import Base from .mixins import UidMixin + if TYPE_CHECKING: from .BookcaseItem import BookcaseItem + class BookcaseItemBorrowing(Base, UidMixin): username: Mapped[str] = mapped_column(String) start_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now()) - end_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now() + timedelta(days=30)) + end_time: Mapped[datetime] = mapped_column( + DateTime, default=datetime.now() + timedelta(days=30) + ) delivered: Mapped[datetime | None] = mapped_column(DateTime, default=None) - fk_bookcase_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), index=True) + fk_bookcase_item_uid: Mapped[int] = mapped_column( + ForeignKey("BookcaseItem.uid"), index=True + ) - item: Mapped[BookcaseItem] = relationship(back_populates='borrowings') + item: Mapped[BookcaseItem] = relationship(back_populates="borrowings") def __init__( self, @@ -37,4 +42,4 @@ class BookcaseItemBorrowing(Base, UidMixin): self.username = username self.item = item self.start_time = datetime.now() - self.end_time = datetime.now() + timedelta(days=30) \ No newline at end of file + self.end_time = datetime.now() + timedelta(days=30) diff --git a/src/worblehat/models/BookcaseItemBorrowingQueue.py b/src/worblehat/models/BookcaseItemBorrowingQueue.py index 88bb739..357b788 100644 --- a/src/worblehat/models/BookcaseItemBorrowingQueue.py +++ b/src/worblehat/models/BookcaseItemBorrowingQueue.py @@ -16,18 +16,24 @@ from sqlalchemy.orm import ( from .Base import Base from .mixins import UidMixin + if TYPE_CHECKING: from .BookcaseItem import BookcaseItem + class BookcaseItemBorrowingQueue(Base, UidMixin): username: Mapped[str] = mapped_column(String) - entered_queue_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now()) + entered_queue_time: Mapped[datetime] = mapped_column( + DateTime, default=datetime.now() + ) item_became_available_time: Mapped[datetime | None] = mapped_column(DateTime) expired = mapped_column(Boolean, default=False) - fk_bookcase_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), index=True) + fk_bookcase_item_uid: Mapped[int] = mapped_column( + ForeignKey("BookcaseItem.uid"), index=True + ) - item: Mapped[BookcaseItem] = relationship(back_populates='borrowing_queue') + item: Mapped[BookcaseItem] = relationship(back_populates="borrowing_queue") def __init__( self, @@ -36,4 +42,4 @@ class BookcaseItemBorrowingQueue(Base, UidMixin): ): self.username = username self.item = item - self.entered_queue_time = datetime.now() \ No newline at end of file + self.entered_queue_time = datetime.now() diff --git a/src/worblehat/models/BookcaseShelf.py b/src/worblehat/models/BookcaseShelf.py index 09762ec..b96c874 100644 --- a/src/worblehat/models/BookcaseShelf.py +++ b/src/worblehat/models/BookcaseShelf.py @@ -2,7 +2,6 @@ from __future__ import annotations from typing import TYPE_CHECKING from sqlalchemy import ( - Integer, ForeignKey, SmallInteger, Text, @@ -16,6 +15,7 @@ from sqlalchemy.orm import ( from .Base import Base from .mixins import UidMixin + if TYPE_CHECKING: from .Bookcase import Bookcase from .BookcaseItem import BookcaseItem @@ -23,22 +23,23 @@ if TYPE_CHECKING: # NOTE: Booshelfs are 0 indexed for both rows and columns, # where cell 0-0 is placed in the lower right corner. + class BookcaseShelf(Base, UidMixin): __table_args__ = ( UniqueConstraint( - 'column', - 'fk_bookcase_uid', - 'row', + "column", + "fk_bookcase_uid", + "row", ), ) description: Mapped[str | None] = mapped_column(Text) row: Mapped[int] = mapped_column(SmallInteger) column: Mapped[int] = mapped_column(SmallInteger) - fk_bookcase_uid: Mapped[int] = mapped_column(ForeignKey('Bookcase.uid')) + fk_bookcase_uid: Mapped[int] = mapped_column(ForeignKey("Bookcase.uid")) - bookcase: Mapped[Bookcase] = relationship(back_populates='shelfs') - items: Mapped[set[BookcaseItem]] = relationship(back_populates='shelf') + bookcase: Mapped[Bookcase] = relationship(back_populates="shelfs") + items: Mapped[set[BookcaseItem]] = relationship(back_populates="shelf") def __init__( self, @@ -53,7 +54,7 @@ class BookcaseShelf(Base, UidMixin): self.description = description def short_str(self) -> str: - result = f'{self.column}-{self.row}' + result = f"{self.column}-{self.row}" if self.description is not None: - result += f' [{self.description}]' - return result \ No newline at end of file + result += f" [{self.description}]" + return result diff --git a/src/worblehat/models/Category.py b/src/worblehat/models/Category.py index 776028b..ee04cd3 100644 --- a/src/worblehat/models/Category.py +++ b/src/worblehat/models/Category.py @@ -14,15 +14,17 @@ from .mixins import ( UniqueNameMixin, ) from .xref_tables import Item_Category + if TYPE_CHECKING: from .BookcaseItem import BookcaseItem + class Category(Base, UidMixin, UniqueNameMixin): description: Mapped[str | None] = mapped_column(Text) items: Mapped[set[BookcaseItem]] = relationship( secondary=Item_Category.__table__, - back_populates='categories', + back_populates="categories", ) def __init__( @@ -31,4 +33,4 @@ class Category(Base, UidMixin, UniqueNameMixin): description: str | None = None, ): self.name = name - self.description = description \ No newline at end of file + self.description = description diff --git a/src/worblehat/models/DeadlineDaemonLastRunDatetime.py b/src/worblehat/models/DeadlineDaemonLastRunDatetime.py index 4122e74..0d96bd5 100644 --- a/src/worblehat/models/DeadlineDaemonLastRunDatetime.py +++ b/src/worblehat/models/DeadlineDaemonLastRunDatetime.py @@ -12,11 +12,12 @@ from sqlalchemy.orm import ( from .Base import Base + class DeadlineDaemonLastRunDatetime(Base): __table_args__ = ( CheckConstraint( - 'uid = true', - name = 'single_row_only', + "uid = true", + name="single_row_only", ), ) uid: Mapped[bool] = mapped_column(Boolean, primary_key=True, default=True) @@ -24,4 +25,4 @@ class DeadlineDaemonLastRunDatetime(Base): def __init__(self, time: datetime | None = None): if time is not None: - self.time = time \ No newline at end of file + self.time = time diff --git a/src/worblehat/models/Language.py b/src/worblehat/models/Language.py index c470bc2..7ca40d5 100644 --- a/src/worblehat/models/Language.py +++ b/src/worblehat/models/Language.py @@ -11,6 +11,7 @@ from sqlalchemy.orm import ( from .Base import Base from .mixins import UidMixin, UniqueNameMixin + class Language(Base, UidMixin, UniqueNameMixin): iso639_1_code: Mapped[str] = mapped_column(String(2), unique=True, index=True) diff --git a/src/worblehat/models/MediaType.py b/src/worblehat/models/MediaType.py index 94b1904..ff2fa42 100644 --- a/src/worblehat/models/MediaType.py +++ b/src/worblehat/models/MediaType.py @@ -10,13 +10,15 @@ from sqlalchemy.orm import ( from .Base import Base from .mixins import UidMixin, UniqueNameMixin + if TYPE_CHECKING: from .BookcaseItem import BookcaseItem + class MediaType(Base, UidMixin, UniqueNameMixin): description: Mapped[str | None] = mapped_column(Text) - items: Mapped[set[BookcaseItem]] = relationship(back_populates='media_type') + items: Mapped[set[BookcaseItem]] = relationship(back_populates="media_type") def __init__( self, @@ -25,5 +27,3 @@ class MediaType(Base, UidMixin, UniqueNameMixin): ): self.name = name self.description = description - - diff --git a/src/worblehat/models/__init__.py b/src/worblehat/models/__init__.py index 320e42b..d207888 100644 --- a/src/worblehat/models/__init__.py +++ b/src/worblehat/models/__init__.py @@ -8,4 +8,18 @@ from .BookcaseShelf import BookcaseShelf from .Category import Category from .DeadlineDaemonLastRunDatetime import DeadlineDaemonLastRunDatetime from .Language import Language -from .MediaType import MediaType \ No newline at end of file +from .MediaType import MediaType + +__all__ = [ + "Author", + "Base", + "Bookcase", + "BookcaseItem", + "BookcaseItemBorrowing", + "BookcaseItemBorrowingQueue", + "BookcaseShelf", + "Category", + "DeadlineDaemonLastRunDatetime", + "Language", + "MediaType", +] diff --git a/src/worblehat/models/migrations/env.py b/src/worblehat/models/migrations/env.py index 662a6b6..04d228d 100644 --- a/src/worblehat/models/migrations/env.py +++ b/src/worblehat/models/migrations/env.py @@ -1,5 +1,4 @@ from alembic import context -from flask import current_app from logging.config import fileConfig from sqlalchemy import engine_from_config from sqlalchemy import pool @@ -14,7 +13,8 @@ if config.config_file_name is not None: Config.load_configuration({}) -config.set_main_option('sqlalchemy.url', Config.db_string()) +config.set_main_option("sqlalchemy.url", Config.db_string()) + # This will make sure alembic doesn't generate empty migrations # https://stackoverflow.com/questions/70203927/how-to-prevent-alembic-revision-autogenerate-from-making-revision-file-if-it-h @@ -23,7 +23,8 @@ def _process_revision_directives(context, revision, directives): script = directives[0] if script.upgrade_ops.is_empty(): directives[:] = [] - print('No changes in schema detected. Not generating migration.') + print("No changes in schema detected. Not generating migration.") + def run_migrations_online() -> None: connectable = engine_from_config( @@ -36,11 +37,9 @@ def run_migrations_online() -> None: context.configure( connection=connection, target_metadata=Base.metadata, - # Extended type checking with alembic when generating migrations # https://alembic.sqlalchemy.org/en/latest/autogenerate.html#what-does-autogenerate-detect-and-what-does-it-not-detect compare_type=True, - # This is required for ALTER TABLE to work with sqlite. # It should have no effect on postgreSQL # https://alembic.sqlalchemy.org/en/latest/batch.html @@ -51,6 +50,7 @@ def run_migrations_online() -> None: with context.begin_transaction(): context.run_migrations() + # We don't have any good reasons to generate raw sql migrations, # so the `run_migrations_offline` has been removed run_migrations_online() diff --git a/src/worblehat/models/migrations/versions/2024-07-31T2107_7dfbf8a8dec8_initial_migration.py b/src/worblehat/models/migrations/versions/2024-07-31T2107_7dfbf8a8dec8_initial_migration.py index faedb69..3e1f8ee 100644 --- a/src/worblehat/models/migrations/versions/2024-07-31T2107_7dfbf8a8dec8_initial_migration.py +++ b/src/worblehat/models/migrations/versions/2024-07-31T2107_7dfbf8a8dec8_initial_migration.py @@ -1,16 +1,17 @@ """initial_migration Revision ID: 7dfbf8a8dec8 -Revises: +Revises: Create Date: 2024-07-31 21:07:13.434012 """ + from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '7dfbf8a8dec8' +revision = "7dfbf8a8dec8" down_revision = None branch_labels = None depends_on = None @@ -18,166 +19,243 @@ depends_on = None def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.create_table('Author', - sa.Column('uid', sa.Integer(), nullable=False), - sa.Column('name', sa.Text(), nullable=False), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_Author')) + op.create_table( + "Author", + sa.Column("uid", sa.Integer(), nullable=False), + sa.Column("name", sa.Text(), nullable=False), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_Author")), ) - with op.batch_alter_table('Author', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_Author_name'), ['name'], unique=True) + with op.batch_alter_table("Author", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_Author_name"), ["name"], unique=True) - op.create_table('Bookcase', - sa.Column('description', sa.Text(), nullable=True), - sa.Column('uid', sa.Integer(), nullable=False), - sa.Column('name', sa.Text(), nullable=False), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_Bookcase')) + op.create_table( + "Bookcase", + sa.Column("description", sa.Text(), nullable=True), + sa.Column("uid", sa.Integer(), nullable=False), + sa.Column("name", sa.Text(), nullable=False), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_Bookcase")), ) - with op.batch_alter_table('Bookcase', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_Bookcase_name'), ['name'], unique=True) + with op.batch_alter_table("Bookcase", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_Bookcase_name"), ["name"], unique=True) - op.create_table('Category', - sa.Column('description', sa.Text(), nullable=True), - sa.Column('uid', sa.Integer(), nullable=False), - sa.Column('name', sa.Text(), nullable=False), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_Category')) + op.create_table( + "Category", + sa.Column("description", sa.Text(), nullable=True), + sa.Column("uid", sa.Integer(), nullable=False), + sa.Column("name", sa.Text(), nullable=False), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_Category")), ) - with op.batch_alter_table('Category', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_Category_name'), ['name'], unique=True) + with op.batch_alter_table("Category", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_Category_name"), ["name"], unique=True) - op.create_table('DeadlineDaemonLastRunDatetime', - sa.Column('uid', sa.Boolean(), nullable=False), - sa.Column('time', sa.DateTime(), nullable=False), - sa.CheckConstraint('uid = true', name=op.f('ck_DeadlineDaemonLastRunDatetime_`single_row_only`')), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_DeadlineDaemonLastRunDatetime')) + op.create_table( + "DeadlineDaemonLastRunDatetime", + sa.Column("uid", sa.Boolean(), nullable=False), + sa.Column("time", sa.DateTime(), nullable=False), + sa.CheckConstraint( + "uid = true", + name=op.f("ck_DeadlineDaemonLastRunDatetime_`single_row_only`"), + ), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_DeadlineDaemonLastRunDatetime")), ) - op.create_table('Language', - sa.Column('iso639_1_code', sa.String(length=2), nullable=False), - sa.Column('uid', sa.Integer(), nullable=False), - sa.Column('name', sa.Text(), nullable=False), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_Language')) + op.create_table( + "Language", + sa.Column("iso639_1_code", sa.String(length=2), nullable=False), + sa.Column("uid", sa.Integer(), nullable=False), + sa.Column("name", sa.Text(), nullable=False), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_Language")), ) - with op.batch_alter_table('Language', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_Language_iso639_1_code'), ['iso639_1_code'], unique=True) - batch_op.create_index(batch_op.f('ix_Language_name'), ['name'], unique=True) + with op.batch_alter_table("Language", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_Language_iso639_1_code"), ["iso639_1_code"], unique=True + ) + batch_op.create_index(batch_op.f("ix_Language_name"), ["name"], unique=True) - op.create_table('MediaType', - sa.Column('description', sa.Text(), nullable=True), - sa.Column('uid', sa.Integer(), nullable=False), - sa.Column('name', sa.Text(), nullable=False), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_MediaType')) + op.create_table( + "MediaType", + sa.Column("description", sa.Text(), nullable=True), + sa.Column("uid", sa.Integer(), nullable=False), + sa.Column("name", sa.Text(), nullable=False), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_MediaType")), ) - with op.batch_alter_table('MediaType', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_MediaType_name'), ['name'], unique=True) + with op.batch_alter_table("MediaType", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_MediaType_name"), ["name"], unique=True) - op.create_table('BookcaseShelf', - sa.Column('description', sa.Text(), nullable=True), - sa.Column('row', sa.SmallInteger(), nullable=False), - sa.Column('column', sa.SmallInteger(), nullable=False), - sa.Column('fk_bookcase_uid', sa.Integer(), nullable=False), - sa.Column('uid', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint(['fk_bookcase_uid'], ['Bookcase.uid'], name=op.f('fk_BookcaseShelf_fk_bookcase_uid_Bookcase')), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseShelf')), - sa.UniqueConstraint('column', 'fk_bookcase_uid', 'row', name=op.f('uq_BookcaseShelf_column')) + op.create_table( + "BookcaseShelf", + sa.Column("description", sa.Text(), nullable=True), + sa.Column("row", sa.SmallInteger(), nullable=False), + sa.Column("column", sa.SmallInteger(), nullable=False), + sa.Column("fk_bookcase_uid", sa.Integer(), nullable=False), + sa.Column("uid", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["fk_bookcase_uid"], + ["Bookcase.uid"], + name=op.f("fk_BookcaseShelf_fk_bookcase_uid_Bookcase"), + ), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseShelf")), + sa.UniqueConstraint( + "column", "fk_bookcase_uid", "row", name=op.f("uq_BookcaseShelf_column") + ), ) - op.create_table('BookcaseItem', - sa.Column('isbn', sa.String(), nullable=False), - sa.Column('name', sa.Text(), nullable=False), - sa.Column('owner', sa.String(), nullable=False), - sa.Column('amount', sa.SmallInteger(), nullable=False), - sa.Column('fk_media_type_uid', sa.Integer(), nullable=False), - sa.Column('fk_bookcase_shelf_uid', sa.Integer(), nullable=False), - sa.Column('fk_language_uid', sa.Integer(), nullable=True), - sa.Column('uid', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint(['fk_bookcase_shelf_uid'], ['BookcaseShelf.uid'], name=op.f('fk_BookcaseItem_fk_bookcase_shelf_uid_BookcaseShelf')), - sa.ForeignKeyConstraint(['fk_language_uid'], ['Language.uid'], name=op.f('fk_BookcaseItem_fk_language_uid_Language')), - sa.ForeignKeyConstraint(['fk_media_type_uid'], ['MediaType.uid'], name=op.f('fk_BookcaseItem_fk_media_type_uid_MediaType')), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseItem')) + op.create_table( + "BookcaseItem", + sa.Column("isbn", sa.String(), nullable=False), + sa.Column("name", sa.Text(), nullable=False), + sa.Column("owner", sa.String(), nullable=False), + sa.Column("amount", sa.SmallInteger(), nullable=False), + sa.Column("fk_media_type_uid", sa.Integer(), nullable=False), + sa.Column("fk_bookcase_shelf_uid", sa.Integer(), nullable=False), + sa.Column("fk_language_uid", sa.Integer(), nullable=True), + sa.Column("uid", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["fk_bookcase_shelf_uid"], + ["BookcaseShelf.uid"], + name=op.f("fk_BookcaseItem_fk_bookcase_shelf_uid_BookcaseShelf"), + ), + sa.ForeignKeyConstraint( + ["fk_language_uid"], + ["Language.uid"], + name=op.f("fk_BookcaseItem_fk_language_uid_Language"), + ), + sa.ForeignKeyConstraint( + ["fk_media_type_uid"], + ["MediaType.uid"], + name=op.f("fk_BookcaseItem_fk_media_type_uid_MediaType"), + ), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseItem")), ) - with op.batch_alter_table('BookcaseItem', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_BookcaseItem_isbn'), ['isbn'], unique=True) - batch_op.create_index(batch_op.f('ix_BookcaseItem_name'), ['name'], unique=False) + with op.batch_alter_table("BookcaseItem", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_BookcaseItem_isbn"), ["isbn"], unique=True) + batch_op.create_index( + batch_op.f("ix_BookcaseItem_name"), ["name"], unique=False + ) - op.create_table('BookcaseItemBorrowing', - sa.Column('username', sa.String(), nullable=False), - sa.Column('start_time', sa.DateTime(), nullable=False), - sa.Column('end_time', sa.DateTime(), nullable=False), - sa.Column('delivered', sa.DateTime(), nullable=True), - sa.Column('fk_bookcase_item_uid', sa.Integer(), nullable=False), - sa.Column('uid', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint(['fk_bookcase_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_BookcaseItemBorrowing_fk_bookcase_item_uid_BookcaseItem')), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseItemBorrowing')) + op.create_table( + "BookcaseItemBorrowing", + sa.Column("username", sa.String(), nullable=False), + sa.Column("start_time", sa.DateTime(), nullable=False), + sa.Column("end_time", sa.DateTime(), nullable=False), + sa.Column("delivered", sa.DateTime(), nullable=True), + sa.Column("fk_bookcase_item_uid", sa.Integer(), nullable=False), + sa.Column("uid", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["fk_bookcase_item_uid"], + ["BookcaseItem.uid"], + name=op.f("fk_BookcaseItemBorrowing_fk_bookcase_item_uid_BookcaseItem"), + ), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseItemBorrowing")), ) - with op.batch_alter_table('BookcaseItemBorrowing', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_BookcaseItemBorrowing_fk_bookcase_item_uid'), ['fk_bookcase_item_uid'], unique=False) + with op.batch_alter_table("BookcaseItemBorrowing", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_BookcaseItemBorrowing_fk_bookcase_item_uid"), + ["fk_bookcase_item_uid"], + unique=False, + ) - op.create_table('BookcaseItemBorrowingQueue', - sa.Column('username', sa.String(), nullable=False), - sa.Column('entered_queue_time', sa.DateTime(), nullable=False), - sa.Column('item_became_available_time', sa.DateTime(), nullable=True), - sa.Column('expired', sa.Boolean(), nullable=True), - sa.Column('fk_bookcase_item_uid', sa.Integer(), nullable=False), - sa.Column('uid', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint(['fk_bookcase_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_BookcaseItemBorrowingQueue_fk_bookcase_item_uid_BookcaseItem')), - sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseItemBorrowingQueue')) + op.create_table( + "BookcaseItemBorrowingQueue", + sa.Column("username", sa.String(), nullable=False), + sa.Column("entered_queue_time", sa.DateTime(), nullable=False), + sa.Column("item_became_available_time", sa.DateTime(), nullable=True), + sa.Column("expired", sa.Boolean(), nullable=True), + sa.Column("fk_bookcase_item_uid", sa.Integer(), nullable=False), + sa.Column("uid", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["fk_bookcase_item_uid"], + ["BookcaseItem.uid"], + name=op.f( + "fk_BookcaseItemBorrowingQueue_fk_bookcase_item_uid_BookcaseItem" + ), + ), + sa.PrimaryKeyConstraint("uid", name=op.f("pk_BookcaseItemBorrowingQueue")), ) - with op.batch_alter_table('BookcaseItemBorrowingQueue', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid'), ['fk_bookcase_item_uid'], unique=False) + with op.batch_alter_table("BookcaseItemBorrowingQueue", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid"), + ["fk_bookcase_item_uid"], + unique=False, + ) - op.create_table('Item_Author', - sa.Column('fk_item_uid', sa.Integer(), nullable=False), - sa.Column('fk_author_uid', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint(['fk_author_uid'], ['Author.uid'], name=op.f('fk_Item_Author_fk_author_uid_Author')), - sa.ForeignKeyConstraint(['fk_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_Item_Author_fk_item_uid_BookcaseItem')), - sa.PrimaryKeyConstraint('fk_item_uid', 'fk_author_uid', name=op.f('pk_Item_Author')) + op.create_table( + "Item_Author", + sa.Column("fk_item_uid", sa.Integer(), nullable=False), + sa.Column("fk_author_uid", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["fk_author_uid"], + ["Author.uid"], + name=op.f("fk_Item_Author_fk_author_uid_Author"), + ), + sa.ForeignKeyConstraint( + ["fk_item_uid"], + ["BookcaseItem.uid"], + name=op.f("fk_Item_Author_fk_item_uid_BookcaseItem"), + ), + sa.PrimaryKeyConstraint( + "fk_item_uid", "fk_author_uid", name=op.f("pk_Item_Author") + ), ) - op.create_table('Item_Category', - sa.Column('fk_item_uid', sa.Integer(), nullable=False), - sa.Column('fk_category_uid', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint(['fk_category_uid'], ['Category.uid'], name=op.f('fk_Item_Category_fk_category_uid_Category')), - sa.ForeignKeyConstraint(['fk_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_Item_Category_fk_item_uid_BookcaseItem')), - sa.PrimaryKeyConstraint('fk_item_uid', 'fk_category_uid', name=op.f('pk_Item_Category')) + op.create_table( + "Item_Category", + sa.Column("fk_item_uid", sa.Integer(), nullable=False), + sa.Column("fk_category_uid", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["fk_category_uid"], + ["Category.uid"], + name=op.f("fk_Item_Category_fk_category_uid_Category"), + ), + sa.ForeignKeyConstraint( + ["fk_item_uid"], + ["BookcaseItem.uid"], + name=op.f("fk_Item_Category_fk_item_uid_BookcaseItem"), + ), + sa.PrimaryKeyConstraint( + "fk_item_uid", "fk_category_uid", name=op.f("pk_Item_Category") + ), ) # ### end Alembic commands ### def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.drop_table('Item_Category') - op.drop_table('Item_Author') - with op.batch_alter_table('BookcaseItemBorrowingQueue', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid')) + op.drop_table("Item_Category") + op.drop_table("Item_Author") + with op.batch_alter_table("BookcaseItemBorrowingQueue", schema=None) as batch_op: + batch_op.drop_index( + batch_op.f("ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid") + ) - op.drop_table('BookcaseItemBorrowingQueue') - with op.batch_alter_table('BookcaseItemBorrowing', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_BookcaseItemBorrowing_fk_bookcase_item_uid')) + op.drop_table("BookcaseItemBorrowingQueue") + with op.batch_alter_table("BookcaseItemBorrowing", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_BookcaseItemBorrowing_fk_bookcase_item_uid")) - op.drop_table('BookcaseItemBorrowing') - with op.batch_alter_table('BookcaseItem', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_BookcaseItem_name')) - batch_op.drop_index(batch_op.f('ix_BookcaseItem_isbn')) + op.drop_table("BookcaseItemBorrowing") + with op.batch_alter_table("BookcaseItem", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_BookcaseItem_name")) + batch_op.drop_index(batch_op.f("ix_BookcaseItem_isbn")) - op.drop_table('BookcaseItem') - op.drop_table('BookcaseShelf') - with op.batch_alter_table('MediaType', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_MediaType_name')) + op.drop_table("BookcaseItem") + op.drop_table("BookcaseShelf") + with op.batch_alter_table("MediaType", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_MediaType_name")) - op.drop_table('MediaType') - with op.batch_alter_table('Language', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_Language_name')) - batch_op.drop_index(batch_op.f('ix_Language_iso639_1_code')) + op.drop_table("MediaType") + with op.batch_alter_table("Language", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_Language_name")) + batch_op.drop_index(batch_op.f("ix_Language_iso639_1_code")) - op.drop_table('Language') - op.drop_table('DeadlineDaemonLastRunDatetime') - with op.batch_alter_table('Category', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_Category_name')) + op.drop_table("Language") + op.drop_table("DeadlineDaemonLastRunDatetime") + with op.batch_alter_table("Category", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_Category_name")) - op.drop_table('Category') - with op.batch_alter_table('Bookcase', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_Bookcase_name')) + op.drop_table("Category") + with op.batch_alter_table("Bookcase", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_Bookcase_name")) - op.drop_table('Bookcase') - with op.batch_alter_table('Author', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_Author_name')) + op.drop_table("Bookcase") + with op.batch_alter_table("Author", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_Author_name")) - op.drop_table('Author') + op.drop_table("Author") # ### end Alembic commands ### diff --git a/src/worblehat/models/mixins/UidMixin.py b/src/worblehat/models/mixins/UidMixin.py index 4673b34..33451a8 100644 --- a/src/worblehat/models/mixins/UidMixin.py +++ b/src/worblehat/models/mixins/UidMixin.py @@ -9,6 +9,7 @@ from sqlalchemy.orm import ( from worblehat.flaskapp.database import db + class UidMixin(object): uid: Mapped[int] = mapped_column(Integer, primary_key=True) @@ -28,4 +29,4 @@ class UidMixin(object): This method defaults to using the flask_sqlalchemy session. It will not work outside of a request context, unless another session is provided. """ - return sql_session.query(cls).where(cls.uid == uid).one_or_404() \ No newline at end of file + return sql_session.query(cls).where(cls.uid == uid).one_or_404() diff --git a/src/worblehat/models/mixins/UniqueNameMixin.py b/src/worblehat/models/mixins/UniqueNameMixin.py index cf35147..14f0bc4 100644 --- a/src/worblehat/models/mixins/UniqueNameMixin.py +++ b/src/worblehat/models/mixins/UniqueNameMixin.py @@ -9,6 +9,7 @@ from sqlalchemy.orm import ( from worblehat.flaskapp.database import db + class UniqueNameMixin(object): name: Mapped[str] = mapped_column(Text, unique=True, index=True) @@ -28,4 +29,4 @@ class UniqueNameMixin(object): This method defaults to using the flask_sqlalchemy session. It will not work outside of a request context, unless another session is provided. """ - return sql_session.query(cls).where(cls.name == name).one_or_404() \ No newline at end of file + return sql_session.query(cls).where(cls.name == name).one_or_404() diff --git a/src/worblehat/models/mixins/XrefMixin.py b/src/worblehat/models/mixins/XrefMixin.py index ea9d510..68e7cb9 100644 --- a/src/worblehat/models/mixins/XrefMixin.py +++ b/src/worblehat/models/mixins/XrefMixin.py @@ -1,6 +1,7 @@ from sqlalchemy.orm import declared_attr + class XrefMixin(object): @declared_attr.directive def __tablename__(cls) -> str: - return f'xref_{cls.__name__.lower()}' + return f"xref_{cls.__name__.lower()}" diff --git a/src/worblehat/models/mixins/__init__.py b/src/worblehat/models/mixins/__init__.py index 190d20a..cea7123 100644 --- a/src/worblehat/models/mixins/__init__.py +++ b/src/worblehat/models/mixins/__init__.py @@ -1,2 +1,4 @@ from .UidMixin import UidMixin from .UniqueNameMixin import UniqueNameMixin + +__all__ = ["UidMixin", "UniqueNameMixin"] diff --git a/src/worblehat/models/xref_tables/Item_Author.py b/src/worblehat/models/xref_tables/Item_Author.py index 502b383..1958452 100644 --- a/src/worblehat/models/xref_tables/Item_Author.py +++ b/src/worblehat/models/xref_tables/Item_Author.py @@ -1,5 +1,4 @@ from sqlalchemy import ( - Integer, ForeignKey, ) from sqlalchemy.orm import ( @@ -10,6 +9,11 @@ from sqlalchemy.orm import ( from ..Base import Base from ..mixins.XrefMixin import XrefMixin + class Item_Author(Base, XrefMixin): - fk_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), primary_key=True) - fk_author_uid: Mapped[int] = mapped_column(ForeignKey('Author.uid'), primary_key=True) \ No newline at end of file + fk_item_uid: Mapped[int] = mapped_column( + ForeignKey("BookcaseItem.uid"), primary_key=True + ) + fk_author_uid: Mapped[int] = mapped_column( + ForeignKey("Author.uid"), primary_key=True + ) diff --git a/src/worblehat/models/xref_tables/Item_Category.py b/src/worblehat/models/xref_tables/Item_Category.py index 4912b87..58c7046 100644 --- a/src/worblehat/models/xref_tables/Item_Category.py +++ b/src/worblehat/models/xref_tables/Item_Category.py @@ -1,5 +1,4 @@ from sqlalchemy import ( - Integer, ForeignKey, ) from sqlalchemy.orm import ( @@ -10,6 +9,11 @@ from sqlalchemy.orm import ( from ..Base import Base from ..mixins.XrefMixin import XrefMixin + class Item_Category(Base, XrefMixin): - fk_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), primary_key=True) - fk_category_uid: Mapped[int] = mapped_column(ForeignKey('Category.uid'), primary_key=True) \ No newline at end of file + fk_item_uid: Mapped[int] = mapped_column( + ForeignKey("BookcaseItem.uid"), primary_key=True + ) + fk_category_uid: Mapped[int] = mapped_column( + ForeignKey("Category.uid"), primary_key=True + ) diff --git a/src/worblehat/models/xref_tables/__init__.py b/src/worblehat/models/xref_tables/__init__.py index 0d2dd16..d58dd6d 100644 --- a/src/worblehat/models/xref_tables/__init__.py +++ b/src/worblehat/models/xref_tables/__init__.py @@ -1,2 +1,7 @@ from .Item_Author import Item_Author -from .Item_Category import Item_Category \ No newline at end of file +from .Item_Category import Item_Category + +__all__ = [ + "Item_Author", + "Item_Category", +] diff --git a/src/worblehat/services/__init__.py b/src/worblehat/services/__init__.py index 9311630..639546d 100644 --- a/src/worblehat/services/__init__.py +++ b/src/worblehat/services/__init__.py @@ -1,6 +1,6 @@ from .argument_parser import ( - arg_parser, - devscripts_arg_parser, + arg_parser, + devscripts_arg_parser, ) from .bookcase_item import ( create_bookcase_item_from_isbn, @@ -8,4 +8,14 @@ from .bookcase_item import ( ) from .config import Config from .email import send_email -from .seed_test_data import seed_data \ No newline at end of file +from .seed_test_data import seed_data + +__all__ = [ + "arg_parser", + "devscripts_arg_parser", + "Config", + "create_bookcase_item_from_isbn", + "is_valid_isbn", + "send_email", + "seed_data", +] diff --git a/src/worblehat/services/argument_parser.py b/src/worblehat/services/argument_parser.py index fafd168..db313f0 100644 --- a/src/worblehat/services/argument_parser.py +++ b/src/worblehat/services/argument_parser.py @@ -1,66 +1,69 @@ from argparse import ArgumentParser from pathlib import Path + def _is_valid_file(parser: ArgumentParser, arg: str) -> Path: path = Path(arg) if not path.is_file(): - parser.error(f'The file {arg} does not exist!') + parser.error(f"The file {arg} does not exist!") return path arg_parser = ArgumentParser( - description = 'Worblehat library management system', + description="Worblehat library management system", ) -subparsers = arg_parser.add_subparsers(dest='command') +subparsers = arg_parser.add_subparsers(dest="command") subparsers.add_parser( - 'deadline-daemon', - help = 'Initialize a single pass of the daemon which sends deadline emails', + "deadline-daemon", + help="Initialize a single pass of the daemon which sends deadline emails", ) subparsers.add_parser( - 'cli', - help = 'Start the command line interface', + "cli", + help="Start the command line interface", ) subparsers.add_parser( - 'flask-dev', - help = 'Start the web interface in development mode', + "flask-dev", + help="Start the web interface in development mode", ) subparsers.add_parser( - 'flask-prod', - help = 'Start the web interface in production mode', + "flask-prod", + help="Start the web interface in production mode", ) -devscripts_arg_parser = subparsers.add_parser('devscripts', help='Run development scripts') -devscripts_subparsers = devscripts_arg_parser.add_subparsers(dest='script') +devscripts_arg_parser = subparsers.add_parser( + "devscripts", help="Run development scripts" +) +devscripts_subparsers = devscripts_arg_parser.add_subparsers(dest="script") devscripts_subparsers.add_parser( - 'seed-test-data', - help = 'Seed test data in the database', + "seed-test-data", + help="Seed test data in the database", ) devscripts_subparsers.add_parser( - 'seed-content-for-deadline-daemon', - help = 'Seed data tailorded for testing the deadline daemon, into the database', + "seed-content-for-deadline-daemon", + help="Seed data tailorded for testing the deadline daemon, into the database", ) arg_parser.add_argument( - '-V', - '--version', - action = 'store_true', - help = 'Print version and exit', + "-V", + "--version", + action="store_true", + help="Print version and exit", ) arg_parser.add_argument( - '-c', - '--config', + "-c", + "--config", type=lambda x: _is_valid_file(arg_parser, x), - help = 'Path to config file', - dest = 'config_file', - metavar = 'FILE', + help="Path to config file", + dest="config_file", + metavar="FILE", ) arg_parser.add_argument( - '-p', - '--print-config', - action = 'store_true', - help = 'Print configuration and quit', + "-p", + "--print-config", + action="store_true", + help="Print configuration and quit", ) diff --git a/src/worblehat/services/bookcase_item.py b/src/worblehat/services/bookcase_item.py index a6f5841..f1f0555 100644 --- a/src/worblehat/services/bookcase_item.py +++ b/src/worblehat/services/bookcase_item.py @@ -10,22 +10,27 @@ from ..models import ( Language, ) + def is_valid_pvv_isbn(isbn: str) -> bool: - try: - int(isbn) - except ValueError: - return False - return len(isbn) == 8 + try: + int(isbn) + except ValueError: + return False + return len(isbn) == 8 def is_valid_isbn(isbn: str) -> bool: - return any([ - isbnlib.is_isbn10(isbn), - isbnlib.is_isbn13(isbn), - ]) + return any( + [ + isbnlib.is_isbn10(isbn), + isbnlib.is_isbn13(isbn), + ] + ) -def create_bookcase_item_from_isbn(isbn: str, sql_session: Session) -> BookcaseItem | None: +def create_bookcase_item_from_isbn( + isbn: str, sql_session: Session +) -> BookcaseItem | None: """ This function fetches metadata for the given ISBN and creates a BookcaseItem from it. It does so using a database connection to connect it to the correct authors and language @@ -43,18 +48,17 @@ def create_bookcase_item_from_isbn(isbn: str, sql_session: Session) -> BookcaseI metadata = metadata[0] bookcase_item = BookcaseItem( - name = metadata.title, - isbn = int(isbn.replace('-', '')), + name=metadata.title, + isbn=int(isbn.replace("-", "")), ) if len(authors := metadata.authors) > 0: for author in authors: bookcase_item.authors.add(Author(author)) - if (language := metadata.language): + if language := metadata.language: bookcase_item.language = sql_session.scalars( - select(Language) - .where(Language.iso639_1_code == language) + select(Language).where(Language.iso639_1_code == language) ).one() - return bookcase_item \ No newline at end of file + return bookcase_item diff --git a/src/worblehat/services/config.py b/src/worblehat/services/config.py index 5b4b199..bf4e6aa 100644 --- a/src/worblehat/services/config.py +++ b/src/worblehat/services/config.py @@ -18,30 +18,31 @@ class Config: _config = None _expected_config_file_locations = [ - Path('./config.toml'), - Path('~/.config/worblehat/config.toml'), - Path('/var/lib/worblehat/config.toml'), + Path("./config.toml"), + Path("~/.config/worblehat/config.toml"), + Path("/var/lib/worblehat/config.toml"), ] def __class_getitem__(cls, name: str) -> Any: if cls._config is None: - raise RuntimeError('Configuration not loaded, call Config.load_configuration() first.') + raise RuntimeError( + "Configuration not loaded, call Config.load_configuration() first." + ) __config = cls._config - for attr in name.split('.'): + for attr in name.split("."): __config = __config.get(attr) if __config is None: - raise AttributeError(f'No such attribute: {name}') + raise AttributeError(f"No such attribute: {name}") return __config @staticmethod def read_password(password_field: str) -> str: - if Path(password_field).is_file(): - with open(password_field, 'r') as f: - return f.read() - else: - return password_field - + if Path(password_field).is_file(): + with open(password_field, "r") as f: + return f.read() + else: + return password_field @classmethod def _locate_configuration_file(cls) -> Path | None: @@ -49,48 +50,46 @@ class Config: if path.is_file(): return path - @classmethod - def _load_configuration_from_file(cls, config_file_path: str | None) -> dict[str, any]: + def _load_configuration_from_file( + cls, config_file_path: str | None + ) -> dict[str, any]: if config_file_path is None: config_file_path = cls._locate_configuration_file() if config_file_path is None: - print('Error: could not locate configuration file.') + print("Error: could not locate configuration file.") exit(1) - with open(config_file_path, 'rb') as config_file: + with open(config_file_path, "rb") as config_file: args = tomllib.load(config_file) return args - @classmethod def db_string(cls) -> str: - db_type = cls._config.get('database').get('type') + db_type = cls._config.get("database").get("type") - if db_type == 'sqlite': - path = Path(cls._config.get('database').get('sqlite').get('path')) + if db_type == "sqlite": + path = Path(cls._config.get("database").get("sqlite").get("path")) return f"sqlite:///{path.absolute()}" - elif db_type == 'postgresql': - db_config = cls._config.get('database').get('postgresql') - hostname = db_config.get('hostname') - port = db_config.get('port') - username = db_config.get('username') - password = cls.read_password(db_config.get('password')) - database = db_config.get('database') + elif db_type == "postgresql": + db_config = cls._config.get("database").get("postgresql") + hostname = db_config.get("hostname") + port = db_config.get("port") + username = db_config.get("username") + password = cls.read_password(db_config.get("password")) + database = db_config.get("database") return f"psycopg2+postgresql://{username}:{password}@{hostname}:{port}/{database}" else: print(f"Error: unknown database type '{db_config.get('type')}'") exit(1) - @classmethod def debug(cls) -> str: return pformat(cls._config) - @classmethod def load_configuration(cls, args: dict[str, any]) -> dict[str, any]: - cls._config = cls._load_configuration_from_file(args.get('config_file')) \ No newline at end of file + cls._config = cls._load_configuration_from_file(args.get("config_file")) diff --git a/src/worblehat/services/email.py b/src/worblehat/services/email.py index 83c55cc..0194784 100644 --- a/src/worblehat/services/email.py +++ b/src/worblehat/services/email.py @@ -10,26 +10,26 @@ from .config import Config def send_email(to: str, subject: str, body: str): msg = MIMEMultipart() - msg['From'] = Config['smtp.from'] - msg['To'] = to - if Config['smtp.subject_prefix']: - msg['Subject'] = f"{Config['smtp.subject_prefix']} {subject}" + msg["From"] = Config["smtp.from"] + msg["To"] = to + if Config["smtp.subject_prefix"]: + msg["Subject"] = f"{Config['smtp.subject_prefix']} {subject}" else: - msg['Subject'] = subject - msg.attach(MIMEText(body, 'plain')) + msg["Subject"] = subject + msg.attach(MIMEText(body, "plain")) - if Config['smtp.enabled'] and not Config['deadline_daemon.dryrun']: + if Config["smtp.enabled"] and not Config["deadline_daemon.dryrun"]: try: - with smtplib.SMTP(Config['smtp.host'], Config['smtp.port']) as server: + with smtplib.SMTP(Config["smtp.host"], Config["smtp.port"]) as server: server.starttls() server.login( - Config['smtp.username'], - Config.read_password(Config['smtp.password']), + Config["smtp.username"], + Config.read_password(Config["smtp.password"]), ) - server.sendmail(Config['smtp.from'], to, msg.as_string()) + server.sendmail(Config["smtp.from"], to, msg.as_string()) except Exception as err: - print('Error: could not send email.') + print("Error: could not send email.") print(err) else: - print('Debug: Email sending is disabled, so the following email was not sent:') - print(indent(msg.as_string(), ' ')) \ No newline at end of file + print("Debug: Email sending is disabled, so the following email was not sent:") + print(indent(msg.as_string(), " ")) diff --git a/src/worblehat/services/metadata_fetchers/BookMetadata.py b/src/worblehat/services/metadata_fetchers/BookMetadata.py index 5616948..812281e 100644 --- a/src/worblehat/services/metadata_fetchers/BookMetadata.py +++ b/src/worblehat/services/metadata_fetchers/BookMetadata.py @@ -3,26 +3,29 @@ from typing import Set # TODO: Add more languages -LANGUAGES: set[str] = set([ - "no", - "en", - "de", - "fr", - "es", - "it", - "sv", - "da", - "fi", - "ru", - "zh", - "ja", - "ko", -]) +LANGUAGES: set[str] = set( + [ + "no", + "en", + "de", + "fr", + "es", + "it", + "sv", + "da", + "fi", + "ru", + "zh", + "ja", + "ko", + ] +) @dataclass class BookMetadata: """A class representing metadata for a book.""" + isbn: str title: str # The source of the metadata provider @@ -35,28 +38,30 @@ class BookMetadata: def to_dict(self) -> dict[str, any]: return { - 'isbn': self.isbn, - 'title': self.title, - 'source': self.metadata_source_id(), - 'authors': set() if self.authors is None else self.authors, - 'language': self.language, - 'publish_date': self.publish_date, - 'num_pages': self.num_pages, - 'subjects': set() if self.subjects is None else self.subjects + "isbn": self.isbn, + "title": self.title, + "source": self.metadata_source_id(), + "authors": set() if self.authors is None else self.authors, + "language": self.language, + "publish_date": self.publish_date, + "num_pages": self.num_pages, + "subjects": set() if self.subjects is None else self.subjects, } def validate(self) -> None: if not self.isbn: - raise ValueError('Missing ISBN') + raise ValueError("Missing ISBN") if not self.title: - raise ValueError('Missing title') + raise ValueError("Missing title") if not self.source: - raise ValueError('Missing source') + raise ValueError("Missing source") if not self.authors: - raise ValueError('Missing authors') + raise ValueError("Missing authors") if self.language is not None and self.language not in LANGUAGES: - raise ValueError(f'Invalid language: {self.language}. Consider adding it to the LANGUAGES set if you think this is a mistake.') + raise ValueError( + f"Invalid language: {self.language}. Consider adding it to the LANGUAGES set if you think this is a mistake." + ) if self.num_pages is not None and self.num_pages < 0: - raise ValueError(f'Invalid number of pages: {self.num_pages}') + raise ValueError(f"Invalid number of pages: {self.num_pages}") diff --git a/src/worblehat/services/metadata_fetchers/BookMetadataFetcher.py b/src/worblehat/services/metadata_fetchers/BookMetadataFetcher.py index fe289e5..9356f94 100644 --- a/src/worblehat/services/metadata_fetchers/BookMetadataFetcher.py +++ b/src/worblehat/services/metadata_fetchers/BookMetadataFetcher.py @@ -1,7 +1,8 @@ -#base fetcher. +# base fetcher. from abc import ABC, abstractmethod from .BookMetadata import BookMetadata + class BookMetadataFetcher(ABC): """ A base class for metadata fetchers. @@ -17,4 +18,4 @@ class BookMetadataFetcher(ABC): @abstractmethod def fetch_metadata(cls, isbn: str) -> BookMetadata | None: """Tries to fetch metadata for the given ISBN.""" - pass \ No newline at end of file + pass diff --git a/src/worblehat/services/metadata_fetchers/GoogleBooksFetcher.py b/src/worblehat/services/metadata_fetchers/GoogleBooksFetcher.py index 31182da..54273b3 100644 --- a/src/worblehat/services/metadata_fetchers/GoogleBooksFetcher.py +++ b/src/worblehat/services/metadata_fetchers/GoogleBooksFetcher.py @@ -11,14 +11,14 @@ from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata class GoogleBooksFetcher(BookMetadataFetcher): @classmethod def metadata_source_id(_cls) -> str: - return "google_books" + return "google_books" @classmethod def fetch_metadata(cls, isbn: str) -> BookMetadata | None: try: jsonInput = requests.get( - f"https://www.googleapis.com/books/v1/volumes", - params = {"q": f"isbn:{isbn}"}, + "https://www.googleapis.com/books/v1/volumes", + params={"q": f"isbn:{isbn}"}, ).json() data = jsonInput.get("items")[0].get("volumeInfo") @@ -34,18 +34,18 @@ class GoogleBooksFetcher(BookMetadataFetcher): return None return BookMetadata( - isbn = isbn, - title = title, - source = cls.metadata_source_id(), - authors = authors, - language = languages, - publish_date = publishDate, - num_pages = numberOfPages, - subjects = subjects, + isbn=isbn, + title=title, + source=cls.metadata_source_id(), + authors=authors, + language=languages, + publish_date=publishDate, + num_pages=numberOfPages, + subjects=subjects, ) -if __name__ == '__main__': - book_data = GoogleBooksFetcher.fetch_metadata('0132624788') +if __name__ == "__main__": + book_data = GoogleBooksFetcher.fetch_metadata("0132624788") book_data.validate() - print(book_data) \ No newline at end of file + print(book_data) diff --git a/src/worblehat/services/metadata_fetchers/OpenLibraryFetcher.py b/src/worblehat/services/metadata_fetchers/OpenLibraryFetcher.py index bebaf76..9b784ce 100644 --- a/src/worblehat/services/metadata_fetchers/OpenLibraryFetcher.py +++ b/src/worblehat/services/metadata_fetchers/OpenLibraryFetcher.py @@ -15,7 +15,7 @@ LANGUAGE_MAP = { class OpenLibraryFetcher(BookMetadataFetcher): @classmethod def metadata_source_id(_cls) -> str: - return "open_library" + return "open_library" @classmethod def fetch_metadata(cls, isbn: str) -> BookMetadata | None: @@ -25,8 +25,12 @@ class OpenLibraryFetcher(BookMetadataFetcher): author_keys = jsonInput.get("authors") or [] author_names = set() for author_key in author_keys: - key = author_key.get('key') - author_name = requests.get(f"https://openlibrary.org/{key}.json").json().get("name") + key = author_key.get("key") + author_name = ( + requests.get(f"https://openlibrary.org/{key}.json") + .json() + .get("name") + ) author_names.add(author_name) title = jsonInput.get("title") @@ -37,25 +41,30 @@ class OpenLibraryFetcher(BookMetadataFetcher): numberOfPages = int(numberOfPages) language_key = jsonInput.get("languages")[0].get("key") - language = requests.get(f"https://openlibrary.org/{language_key}.json").json().get("identifiers").get("iso_639_1")[0] + language = ( + requests.get(f"https://openlibrary.org/{language_key}.json") + .json() + .get("identifiers") + .get("iso_639_1")[0] + ) subjects = set(jsonInput.get("subjects") or []) except Exception: return None return BookMetadata( - isbn = isbn, - title = title, - source = cls.metadata_source_id(), - authors = author_names, - language = language, - publish_date = publishDate, - num_pages = numberOfPages, - subjects = subjects, + isbn=isbn, + title=title, + source=cls.metadata_source_id(), + authors=author_names, + language=language, + publish_date=publishDate, + num_pages=numberOfPages, + subjects=subjects, ) -if __name__ == '__main__': - book_data = OpenLibraryFetcher.fetch_metadata('9788205530751') +if __name__ == "__main__": + book_data = OpenLibraryFetcher.fetch_metadata("9788205530751") book_data.validate() - print(book_data) \ No newline at end of file + print(book_data) diff --git a/src/worblehat/services/metadata_fetchers/OutlandScraperFetcher.py b/src/worblehat/services/metadata_fetchers/OutlandScraperFetcher.py index e0ddc9f..f50e5b6 100644 --- a/src/worblehat/services/metadata_fetchers/OutlandScraperFetcher.py +++ b/src/worblehat/services/metadata_fetchers/OutlandScraperFetcher.py @@ -30,7 +30,7 @@ LANGUAGE_MAP = { class OutlandScraperFetcher(BookMetadataFetcher): @classmethod def metadata_source_id(_cls) -> str: - return "outland_scraper" + return "outland_scraper" @classmethod def fetch_metadata(cls, isbn: str) -> BookMetadata | None: @@ -50,7 +50,7 @@ class OutlandScraperFetcher(BookMetadataFetcher): title = soup.find_all("span", class_="base")[0].text releaseDate = soup.find_all("span", class_="release-date")[0].text.strip() - releaseDate = releaseDate[-4:] # only keep year + releaseDate = releaseDate[-4:] # only keep year bookData = { "Title": title, @@ -67,7 +67,7 @@ class OutlandScraperFetcher(BookMetadataFetcher): "NumberOfPages": "Antall Sider", "Genre": "Sjanger", "Language": "Språk", - "Subjects": "Serie" + "Subjects": "Serie", } for value in data: @@ -92,18 +92,18 @@ class OutlandScraperFetcher(BookMetadataFetcher): return None return BookMetadata( - isbn = isbn, - title = bookData.get('Title'), - source = cls.metadata_source_id(), - authors = bookData.get('Authors'), - language = bookData.get('Language'), - publish_date = bookData.get('PublishDate'), - num_pages = bookData.get('NumberOfPages'), - subjects = bookData.get('Subjects'), + isbn=isbn, + title=bookData.get("Title"), + source=cls.metadata_source_id(), + authors=bookData.get("Authors"), + language=bookData.get("Language"), + publish_date=bookData.get("PublishDate"), + num_pages=bookData.get("NumberOfPages"), + subjects=bookData.get("Subjects"), ) -if __name__ == '__main__': - book_data = OutlandScraperFetcher.fetch_metadata('9781947808225') +if __name__ == "__main__": + book_data = OutlandScraperFetcher.fetch_metadata("9781947808225") book_data.validate() - print(book_data) \ No newline at end of file + print(book_data) diff --git a/src/worblehat/services/metadata_fetchers/__init__.py b/src/worblehat/services/metadata_fetchers/__init__.py index c489f15..f7ecf33 100644 --- a/src/worblehat/services/metadata_fetchers/__init__.py +++ b/src/worblehat/services/metadata_fetchers/__init__.py @@ -1 +1,3 @@ -from .book_metadata_fetcher import fetch_metadata_from_multiple_sources \ No newline at end of file +from .book_metadata_fetcher import fetch_metadata_from_multiple_sources + +__all__ = ["fetch_metadata_from_multiple_sources"] diff --git a/src/worblehat/services/metadata_fetchers/book_metadata_fetcher.py b/src/worblehat/services/metadata_fetchers/book_metadata_fetcher.py index 6915a4f..6d86f5f 100644 --- a/src/worblehat/services/metadata_fetchers/book_metadata_fetcher.py +++ b/src/worblehat/services/metadata_fetchers/book_metadata_fetcher.py @@ -10,7 +10,9 @@ from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadat from worblehat.services.metadata_fetchers.GoogleBooksFetcher import GoogleBooksFetcher from worblehat.services.metadata_fetchers.OpenLibraryFetcher import OpenLibraryFetcher -from worblehat.services.metadata_fetchers.OutlandScraperFetcher import OutlandScraperFetcher +from worblehat.services.metadata_fetchers.OutlandScraperFetcher import ( + OutlandScraperFetcher, +) # The order of these fetchers determines the priority of the sources. @@ -46,14 +48,16 @@ def fetch_metadata_from_multiple_sources(isbn: str, strict=False) -> list[BookMe The results are always ordered in the same way as the fetchers are listed in the FETCHERS list. """ - isbn = isbn.replace('-', '').replace('_', '').strip().lower() + isbn = isbn.replace("-", "").replace("_", "").strip().lower() if len(isbn) != 10 and len(isbn) != 13 and not isbn.isnumeric(): - raise ValueError('Invalid ISBN') + raise ValueError("Invalid ISBN") results: list[BookMetadata] = [] with ThreadPoolExecutor() as executor: - futures = [executor.submit(fetcher.fetch_metadata, isbn) for fetcher in FETCHERS] + futures = [ + executor.submit(fetcher.fetch_metadata, isbn) for fetcher in FETCHERS + ] for future in futures: result = future.result() @@ -67,14 +71,15 @@ def fetch_metadata_from_multiple_sources(isbn: str, strict=False) -> list[BookMe if strict: raise e else: - print(f'Invalid metadata: {e}') + print(f"Invalid metadata: {e}") results.remove(result) return sort_metadata_by_priority(results) -if __name__ == '__main__': +if __name__ == "__main__": from pprint import pprint - isbn = '0132624788' + + isbn = "0132624788" metadata = fetch_metadata_from_multiple_sources(isbn) - pprint(metadata) \ No newline at end of file + pprint(metadata) diff --git a/src/worblehat/services/seed_test_data.py b/src/worblehat/services/seed_test_data.py index 021816e..034c231 100644 --- a/src/worblehat/services/seed_test_data.py +++ b/src/worblehat/services/seed_test_data.py @@ -17,20 +17,27 @@ from ..models import ( MediaType, ) + def seed_data(sql_session: Session = db.session): media_types = [ - MediaType(name='Book', description='A physical book'), - MediaType(name='Comic', description='A comic book'), - MediaType(name='Video Game', description='A digital game for computers or games consoles'), - MediaType(name='Tabletop Game', description='A physical game with cards, boards or similar') + MediaType(name="Book", description="A physical book"), + MediaType(name="Comic", description="A comic book"), + MediaType( + name="Video Game", + description="A digital game for computers or games consoles", + ), + MediaType( + name="Tabletop Game", + description="A physical game with cards, boards or similar", + ), ] bookcases = [ - Bookcase(name='Unnamed A', description='White case across dibbler'), - Bookcase(name='Unnamed B', description='Math case in the working room'), - Bookcase(name='Unnamed C', description='Large case in the working room'), - Bookcase(name='Unnamed D', description='White comics case in the hallway'), - Bookcase(name='Unnamed E', description='Wooden comics case in the hallway'), + Bookcase(name="Unnamed A", description="White case across dibbler"), + Bookcase(name="Unnamed B", description="Math case in the working room"), + Bookcase(name="Unnamed C", description="Large case in the working room"), + Bookcase(name="Unnamed D", description="White comics case in the hallway"), + Bookcase(name="Unnamed E", description="Wooden comics case in the hallway"), ] shelfs = [ @@ -39,82 +46,110 @@ def seed_data(sql_session: Session = db.session): BookcaseShelf(row=2, column=0, bookcase=bookcases[0]), BookcaseShelf(row=3, column=0, bookcase=bookcases[0], description="Hacking"), BookcaseShelf(row=4, column=0, bookcase=bookcases[0], description="Hacking"), - BookcaseShelf(row=0, column=1, bookcase=bookcases[0]), BookcaseShelf(row=1, column=1, bookcase=bookcases[0]), BookcaseShelf(row=2, column=1, bookcase=bookcases[0], description="DOS"), - BookcaseShelf(row=3, column=1, bookcase=bookcases[0], description="Food for thought"), + BookcaseShelf( + row=3, column=1, bookcase=bookcases[0], description="Food for thought" + ), BookcaseShelf(row=4, column=1, bookcase=bookcases[0], description="CPP"), - BookcaseShelf(row=0, column=2, bookcase=bookcases[0]), BookcaseShelf(row=1, column=2, bookcase=bookcases[0]), BookcaseShelf(row=2, column=2, bookcase=bookcases[0], description="E = mc2"), BookcaseShelf(row=3, column=2, bookcase=bookcases[0], description="OBJECTION!"), BookcaseShelf(row=4, column=2, bookcase=bookcases[0], description="/home"), - BookcaseShelf(row=0, column=3, bookcase=bookcases[0]), - BookcaseShelf(row=1, column=3, bookcase=bookcases[0], description="Big indonisian island"), + BookcaseShelf( + row=1, column=3, bookcase=bookcases[0], description="Big indonisian island" + ), BookcaseShelf(row=2, column=3, bookcase=bookcases[0]), - BookcaseShelf(row=3, column=3, bookcase=bookcases[0], description="Div science"), + BookcaseShelf( + row=3, column=3, bookcase=bookcases[0], description="Div science" + ), BookcaseShelf(row=4, column=3, bookcase=bookcases[0], description="/home"), - BookcaseShelf(row=0, column=4, bookcase=bookcases[0]), BookcaseShelf(row=1, column=4, bookcase=bookcases[0]), - BookcaseShelf(row=2, column=4, bookcase=bookcases[0], description="(not) computer vision"), - BookcaseShelf(row=3, column=4, bookcase=bookcases[0], description="Low voltage"), + BookcaseShelf( + row=2, column=4, bookcase=bookcases[0], description="(not) computer vision" + ), + BookcaseShelf( + row=3, column=4, bookcase=bookcases[0], description="Low voltage" + ), BookcaseShelf(row=4, column=4, bookcase=bookcases[0], description="/home"), - BookcaseShelf(row=0, column=5, bookcase=bookcases[0]), BookcaseShelf(row=1, column=5, bookcase=bookcases[0]), BookcaseShelf(row=2, column=5, bookcase=bookcases[0], description="/home"), BookcaseShelf(row=3, column=5, bookcase=bookcases[0], description="/home"), - BookcaseShelf(row=0, column=0, bookcase=bookcases[1]), - BookcaseShelf(row=1, column=0, bookcase=bookcases[1], description="Kjellerarealer og komodovaraner"), + BookcaseShelf( + row=1, + column=0, + bookcase=bookcases[1], + description="Kjellerarealer og komodovaraner", + ), BookcaseShelf(row=2, column=0, bookcase=bookcases[1]), BookcaseShelf(row=3, column=0, bookcase=bookcases[1], description="Quick mafs"), BookcaseShelf(row=4, column=0, bookcase=bookcases[1]), - BookcaseShelf(row=0, column=0, bookcase=bookcases[2]), BookcaseShelf(row=1, column=0, bookcase=bookcases[2]), BookcaseShelf(row=2, column=0, bookcase=bookcases[2], description="AI"), BookcaseShelf(row=3, column=0, bookcase=bookcases[2], description="X86"), BookcaseShelf(row=4, column=0, bookcase=bookcases[2], description="Humanoira"), - BookcaseShelf(row=5, column=0, bookcase=bookcases[2], description="Hvem monterte rørforsterker?"), - + BookcaseShelf( + row=5, + column=0, + bookcase=bookcases[2], + description="Hvem monterte rørforsterker?", + ), BookcaseShelf(row=0, column=1, bookcase=bookcases[2]), BookcaseShelf(row=1, column=1, bookcase=bookcases[2], description="Div data"), BookcaseShelf(row=2, column=1, bookcase=bookcases[2], description="Chemistry"), - BookcaseShelf(row=3, column=1, bookcase=bookcases[2], description="Soviet Phys. Techn. Phys"), - BookcaseShelf(row=4, column=1, bookcase=bookcases[2], description="Digitalteknikk"), + BookcaseShelf( + row=3, + column=1, + bookcase=bookcases[2], + description="Soviet Phys. Techn. Phys", + ), + BookcaseShelf( + row=4, column=1, bookcase=bookcases[2], description="Digitalteknikk" + ), BookcaseShelf(row=5, column=1, bookcase=bookcases[2], description="Material"), - BookcaseShelf(row=0, column=2, bookcase=bookcases[2]), - BookcaseShelf(row=1, column=2, bookcase=bookcases[2], description="Assembler / APL"), + BookcaseShelf( + row=1, column=2, bookcase=bookcases[2], description="Assembler / APL" + ), BookcaseShelf(row=2, column=2, bookcase=bookcases[2], description="Internet"), BookcaseShelf(row=3, column=2, bookcase=bookcases[2], description="Algorithms"), - BookcaseShelf(row=4, column=2, bookcase=bookcases[2], description="Soviet Physics Jetp"), - BookcaseShelf(row=5, column=2, bookcase=bookcases[2], description="Død og pine"), - + BookcaseShelf( + row=4, column=2, bookcase=bookcases[2], description="Soviet Physics Jetp" + ), + BookcaseShelf( + row=5, column=2, bookcase=bookcases[2], description="Død og pine" + ), BookcaseShelf(row=0, column=3, bookcase=bookcases[2]), BookcaseShelf(row=1, column=3, bookcase=bookcases[2], description="Web"), - BookcaseShelf(row=2, column=3, bookcase=bookcases[2], description="Div languages"), + BookcaseShelf( + row=2, column=3, bookcase=bookcases[2], description="Div languages" + ), BookcaseShelf(row=3, column=3, bookcase=bookcases[2], description="Python"), BookcaseShelf(row=4, column=3, bookcase=bookcases[2], description="D&D Minis"), BookcaseShelf(row=5, column=3, bookcase=bookcases[2], description="Perl"), - BookcaseShelf(row=0, column=4, bookcase=bookcases[2]), - BookcaseShelf(row=1, column=4, bookcase=bookcases[2], description="Knuth on programming"), - BookcaseShelf(row=2, column=4, bookcase=bookcases[2], description="Div languages"), - BookcaseShelf(row=3, column=4, bookcase=bookcases[2], description="Typesetting"), + BookcaseShelf( + row=1, column=4, bookcase=bookcases[2], description="Knuth on programming" + ), + BookcaseShelf( + row=2, column=4, bookcase=bookcases[2], description="Div languages" + ), + BookcaseShelf( + row=3, column=4, bookcase=bookcases[2], description="Typesetting" + ), BookcaseShelf(row=4, column=4, bookcase=bookcases[2]), - BookcaseShelf(row=0, column=0, bookcase=bookcases[3]), BookcaseShelf(row=0, column=1, bookcase=bookcases[3]), BookcaseShelf(row=0, column=2, bookcase=bookcases[3]), BookcaseShelf(row=0, column=3, bookcase=bookcases[3]), BookcaseShelf(row=0, column=4, bookcase=bookcases[3]), - BookcaseShelf(row=0, column=0, bookcase=bookcases[4]), BookcaseShelf(row=0, column=1, bookcase=bookcases[4]), BookcaseShelf(row=0, column=2, bookcase=bookcases[4]), @@ -132,24 +167,24 @@ def seed_data(sql_session: Session = db.session): ] book1 = BookcaseItem( - name = "The Art of Computer Programming", - isbn = "9780201896831", + name="The Art of Computer Programming", + isbn="9780201896831", ) book1.authors.add(authors[0]) book1.media_type = media_types[0] book1.shelf = shelfs[59] book2 = BookcaseItem( - name = "Harry Potter and the Philosopher's Stone", - isbn = "9780747532743", + name="Harry Potter and the Philosopher's Stone", + isbn="9780747532743", ) book2.authors.add(authors[1]) book2.media_type = media_types[0] book2.shelf = shelfs[-1] book_owned_by_other_user = BookcaseItem( - name = "Book owned by other user", - isbn = "9780747532744", + name="Book owned by other user", + isbn="9780747532744", ) book_owned_by_other_user.owner = "other_user" @@ -158,8 +193,8 @@ def seed_data(sql_session: Session = db.session): book_owned_by_other_user.shelf = shelfs[-2] borrowed_book_more_available = BookcaseItem( - name = "Borrowed book with more available", - isbn = "9780747532745", + name="Borrowed book with more available", + isbn="9780747532745", ) borrowed_book_more_available.authors.add(authors[5]) borrowed_book_more_available.media_type = media_types[0] @@ -167,24 +202,24 @@ def seed_data(sql_session: Session = db.session): borrowed_book_more_available.amount = 2 borrowed_book_no_more_available = BookcaseItem( - name = "Borrowed book with no more available", - isbn = "9780747532746", + name="Borrowed book with no more available", + isbn="9780747532746", ) borrowed_book_no_more_available.authors.add(authors[5]) borrowed_book_no_more_available.media_type = media_types[0] borrowed_book_no_more_available.shelf = shelfs[-3] borrowed_book_people_in_queue = BookcaseItem( - name = "Borrowed book with people in queue", - isbn = "9780747532747", + name="Borrowed book with people in queue", + isbn="9780747532747", ) borrowed_book_people_in_queue.authors.add(authors[5]) borrowed_book_people_in_queue.media_type = media_types[0] borrowed_book_people_in_queue.shelf = shelfs[-3] borrowed_book_by_slabbedask = BookcaseItem( - name = "Borrowed book by slabbedask", - isbn = "9780747532748", + name="Borrowed book by slabbedask", + isbn="9780747532748", ) borrowed_book_by_slabbedask.authors.add(authors[5]) borrowed_book_by_slabbedask.media_type = media_types[0] @@ -216,9 +251,9 @@ def seed_data(sql_session: Session = db.session): BookcaseItemBorrowingQueue(username="user", item=borrowed_book_people_in_queue), ] - with open(Path(__file__).parent.parent.parent / 'data' / 'iso639_1.csv') as f: - reader = csv.reader(f) - languages = [Language(name, code) for (code, name) in reader] + with open(Path(__file__).parent.parent.parent / "data" / "iso639_1.csv") as f: + reader = csv.reader(f) + languages = [Language(name, code) for (code, name) in reader] sql_session.add_all(media_types) sql_session.add_all(bookcases) @@ -229,4 +264,4 @@ def seed_data(sql_session: Session = db.session): sql_session.add_all(borrowings) sql_session.add_all(queue) sql_session.commit() - print("Added test media types, bookcases and shelfs.") \ No newline at end of file + print("Added test media types, bookcases and shelfs.")