From 3525e84576622c2428c75ccd3a45949498d6df29 Mon Sep 17 00:00:00 2001 From: h7x4 Date: Sat, 6 May 2023 02:30:24 +0200 Subject: [PATCH] Start converting the scanner tool into a cli tool The scanner tool is about to become a fullfledged repl, much in the same manner as dibbler. Changes: - Rename `tools/scanner.py` -> `cli/main.py` - Create abstraction layer for Cmd with numbered commands - Create submenu for items - Lots of functionality edits - Fill out more todos in REAMDE --- README.md | 25 +- flake.nix | 2 +- pyproject.toml | 2 +- worblehat/cli/__init__.py | 0 worblehat/cli/main.py | 352 +++++++++++++++++++++++++ worblehat/cli/prompt_utils.py | 182 +++++++++++++ worblehat/cli/subclis/__init__.py | 0 worblehat/cli/subclis/bookcase_item.py | 163 ++++++++++++ worblehat/models/BookcaseItem.py | 6 +- worblehat/models/BookcaseShelf.py | 2 +- worblehat/tools/scanner.py | 274 ------------------- 11 files changed, 723 insertions(+), 285 deletions(-) create mode 100644 worblehat/cli/__init__.py create mode 100644 worblehat/cli/main.py create mode 100644 worblehat/cli/prompt_utils.py create mode 100644 worblehat/cli/subclis/__init__.py create mode 100644 worblehat/cli/subclis/bookcase_item.py delete mode 100644 worblehat/tools/scanner.py diff --git a/README.md b/README.md index e4c0d23..4779e60 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ This project uses [poetry][poetry] as its buildtool as of May 2023. ```console $ poetry install $ poetry run alembic migrate -$ poetry run scanner +$ poetry run cli $ poetry run dev ``` @@ -46,11 +46,26 @@ See `worblehat/config.py` for configurable settings. ## TODO List - [ ] High priority: - - [ ] Book ingestion tool in order to create the database in a quick manner. It should pull data from online ISBN databases (this is almost done) - - [ ] A database with all of PVVs books should be created - - [ ] Ability to request book loans for PVV members, e.g. through Dibblers interface. + - [X] Data ingestion logic, that will pull data from online databases based on ISBN. + - [ ] Cli version of the program (this is currently being worked on). + - [ ] Web version of the program + - [ ] Setting up a database with all of PVVs books + - [ ] Creating database with user and pw + - [ ] Model all bookshelfs + - [ ] Scan in all books + - [ ] Inner workings + - [X] Ability to create and update bookcases + - [X] Ability to create and update bookcase shelfs + - [~] Ability to create and update bookcase items + - [ ] Ability to search for books + - [ ] Ability to request book loans for PVV members + - [ ] Ability to queue book loans for PVV members + - [ ] Ability to be notified when books are available + - [ ] Ability to be notified when deadlines are due + - [ ] Ascii art of monkey - [ ] Low priority: - [ ] Ability for PVV members to request book loans through the PVV website - [ ] Ability for PVV members to search for books through the PVV website - [ ] Discussion - - [ ] Should this project run in a separate tty-instance on Dibblers interface, or should they share the tty with some kind of switching ability? \ No newline at end of file + - [ ] Should this project run in a separate tty-instance on Dibblers interface, or should they share the tty with some kind of switching ability? + After some discussion with other PVV members, we came up with an idea where we run the programs in separate ttys, and use a set of large mechanical switches connected to a QMK-flashed microcontroller to switch between them. \ No newline at end of file diff --git a/flake.nix b/flake.nix index 776e212..300d064 100644 --- a/flake.nix +++ b/flake.nix @@ -15,7 +15,7 @@ in { default = self.apps.${system}.dev; dev = app "${self.packages.${system}.worblehat}/bin/dev"; - scanner = app "${self.packages.${system}.worblehat}/bin/scanner"; + cli = app "${self.packages.${system}.worblehat}/bin/cli"; }; packages.${system} = { diff --git a/pyproject.toml b/pyproject.toml index 376bd25..22ba0d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ sqlalchemy = "^2.0.8" werkzeug = "^2.3.3" [tool.poetry.scripts] -scanner = "worblehat.tools.scanner:main" +cli = "worblehat.cli.main:main" dev = "worblehat.wsgi_dev:main" [build-system] diff --git a/worblehat/cli/__init__.py b/worblehat/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/worblehat/cli/main.py b/worblehat/cli/main.py new file mode 100644 index 0000000..de8e5f4 --- /dev/null +++ b/worblehat/cli/main.py @@ -0,0 +1,352 @@ +from textwrap import dedent + +from sqlalchemy import ( + create_engine, + event, + select, +) +from sqlalchemy.orm import ( + Session, +) +from worblehat.cli.subclis.bookcase_item import BookcaseItemCli + +from worblehat.services.bookcase_item import ( + create_bookcase_item_from_isbn, + is_valid_isbn, +) + +from .prompt_utils import * + +from worblehat.config import Config +from worblehat.models import * + +# TODO: Category seems to have been forgotten. Maybe relevant interactivity should be added? +# However, is there anyone who are going to search by category rather than just look in +# the shelves? + +class WorblehatCli(NumberedCmd): + sql_session: Session + sql_session_dirty: bool = False + + def __init__(self): + super().__init__() + + try: + engine = create_engine(Config.SQLALCHEMY_DATABASE_URI) + self.sql_session = Session(engine) + except Exception: + print('Error: could not connect to database.') + exit(1) + + @event.listens_for(self.sql_session, 'after_flush') + def mark_session_as_dirty(*_): + self.sql_session_dirty = True + self.prompt_header = f'(unsaved changes)' + + @event.listens_for(self.sql_session, 'after_commit') + @event.listens_for(self.sql_session, 'after_rollback') + def mark_session_as_clean(*_): + self.sql_session_dirty = False + self.prompt_header = None + + print(f"Debug: Connected to database at '{Config.SQLALCHEMY_DATABASE_URI}'") + + + def do_list_bookcases(self, _: str): + bookcase_shelfs = self.sql_session.scalars( + select(BookcaseShelf) + .join(Bookcase) + .order_by( + Bookcase.name, + BookcaseShelf.column, + BookcaseShelf.row, + ) + ).all() + + bookcase_uid = None + for shelf in bookcase_shelfs: + if shelf.bookcase.uid != bookcase_uid: + print(shelf.bookcase.name) + bookcase_uid = shelf.bookcase.uid + + name = f"r{shelf.row}-c{shelf.column}" + if shelf.description is not None: + name += f" [{shelf.description}]" + + print(f' {name} - {sum(i.amount for i in shelf.items)} items') + + + def do_show_bookcase(self, arg: str): + bookcase_selector = InteractiveItemSelector( + cls = Bookcase, + sql_session = self.sql_session, + ) + bookcase_selector.cmdloop() + bookcase = bookcase_selector.result + + for shelf in bookcase.shelfs: + name = f"r{shelf.row}-c{shelf.column}" + if shelf.description is not None: + name += f" [{shelf.description}]" + print(name) + for item in shelf.items: + print(f' {item.name} - {item.amount} copies') + + + def do_add_bookcase(self, _: str): + while True: + name = input('Name of bookcase> ') + if name == '': + print('Error: name cannot be empty') + continue + + if self.sql_session.scalars( + select(Bookcase) + .where(Bookcase.name == name) + ).one_or_none() is not None: + print(f'Error: a bookcase with name {name} already exists') + continue + + break + + description = input('Description of bookcase> ') + if description == '': + description = None + + bookcase = Bookcase(name, description) + self.sql_session.add(bookcase) + self.sql_session.flush() + + + def do_add_bookcase_shelf(self, arg: str): + bookcase_selector = InteractiveItemSelector( + cls = Bookcase, + sql_session = self.sql_session, + ) + bookcase_selector.cmdloop() + bookcase = bookcase_selector.result + + while True: + row = input('Row> ') + try: + row = int(row) + except ValueError: + print('Error: row must be a number') + continue + break + + while True: + column = input('Column> ') + try: + column = int(column) + except ValueError: + print('Error: column must be a number') + continue + break + + if self.sql_session.scalars( + select(BookcaseShelf) + .where( + BookcaseShelf.bookcase == bookcase, + BookcaseShelf.row == row, + BookcaseShelf.column == column, + ) + ).one_or_none() is not None: + print(f'Error: a bookshelf in bookcase {bookcase.name} with position {row}-{column} already exists') + return + + description = input('Description> ') + if description == '': + description = None + + shelf = BookcaseShelf( + row, + column, + bookcase, + description, + ) + self.sql_session.add(shelf) + self.sql_session.flush() + + + def _create_bookcase_item(self, isbn: str): + bookcase_item = create_bookcase_item_from_isbn(isbn, self.sql_session) + if bookcase_item is None: + print(f'Could not find data about item with isbn {isbn} online.') + print(f'If you think this is not due to a bug, please add the book to openlibrary.org before continuing.') + return + else: + print(dedent(f""" + Found item: + title: {bookcase_item.name} + authors: {', '.join(a.name for a in bookcase_item.authors)} + language: {bookcase_item.language} + """)) + + print('Please select the bookcase where the item is placed:') + bookcase_selector = InteractiveItemSelector( + cls = Bookcase, + sql_session = self.sql_session, + ) + bookcase_selector.cmdloop() + bookcase = bookcase_selector.result + + def __complete_bookshelf_selection(session: Session, cls: type, arg: str): + args = arg.split('-') + query = select(cls.row, cls.column).where(cls.bookcase == bookcase) + try: + if arg != '' and len(args) > 0: + query = query.where(cls.row == int(args[0])) + if len(args) > 1: + query = query.where(cls.column == int(args[1])) + except ValueError: + return [] + + result = session.execute(query).all() + return [f"{r}-{c}" for r,c in result] + + print('Please select the shelf where the item is placed:') + bookcase_shelf_selector = InteractiveItemSelector( + cls = BookcaseShelf, + sql_session = self.sql_session, + execute_selection = lambda session, cls, arg: session.scalars( + select(cls) + .where( + cls.bookcase == bookcase, + cls.column == int(arg.split('-')[1]), + cls.row == int(arg.split('-')[0]), + ) + ).all(), + complete_selection = __complete_bookshelf_selection, + ) + + bookcase_shelf_selector.cmdloop() + bookcase_item.shelf = bookcase_shelf_selector.result + + print('Please select the items media type:') + media_type_selector = InteractiveItemSelector( + cls = MediaType, + sql_session = self.sql_session, + default = self.sql_session.scalars( + select(MediaType) + .where(MediaType.name.ilike("book")), + ).one(), + ) + + media_type_selector.cmdloop() + bookcase_item.media_type = media_type_selector.result + + username = input('Who owns this book? [PVV]> ') + if username != '': + bookcase_item.owner = username + + self.sql_session.add(bookcase_item) + self.sql_session.flush() + + + def default(self, isbn: str): + isbn = isbn.strip() + if not is_valid_isbn(isbn): + super()._default(isbn) + return + + if (existing_item := self.sql_session.scalars( + select(BookcaseItem) + .where(BookcaseItem.isbn == isbn) + ).one_or_none()) is not None: + print('Found existing BookcaseItem:', existing_item) + BookcaseItemCli( + sql_session = self.sql_session, + bookcase_item = existing_item, + ).cmdloop() + return + + if prompt_yes_no(f"Could not find item with isbn '{isbn}'.\nWould you like to create it?", default=True): + self._create_bookcase_item(isbn) + + + def do_search(self, _: str): + print('TODO: implement search') + + def do_save(self, _:str): + if not self.sql_session_dirty: + print('No changes to save.') + return + self.sql_session.commit() + + + def do_abort(self, _:str): + if not self.sql_session_dirty: + print('No changes to abort.') + return + self.sql_session.rollback() + + + def do_exit(self, _: str): + if self.sql_session_dirty: + if prompt_yes_no('Would you like to save your changes?'): + self.sql_session.commit() + else: + self.sql_session.rollback() + exit(0) + + + funcs = { + 0: { + 'f': default, + 'doc': 'Choose / Add item with its ISBN', + }, + 1: { + 'f': do_list_bookcases, + 'doc': 'List all bookcases', + }, + 2: { + 'f': do_search, + 'doc': 'Search', + }, + 3: { + 'f': do_show_bookcase, + 'doc': 'Show a bookcase, and its items', + }, + 4: { + 'f': do_add_bookcase, + 'doc': 'Add a new bookcase', + }, + 5: { + 'f': do_add_bookcase_shelf, + 'doc': 'Add a new bookshelf', + }, + 6: { + 'f': do_save, + 'doc': 'Save changes', + }, + 7: { + 'f': do_abort, + 'doc': 'Abort changes', + }, + 8: { + 'f': do_exit, + 'doc': 'Exit', + }, + } + + +def main(): + tool = WorblehatCli() + while True: + try: + tool.cmdloop() + except KeyboardInterrupt: + if not tool.sql_session_dirty: + exit(0) + try: + print() + if prompt_yes_no('Are you sure you want to exit without saving?', default=False): + raise KeyboardInterrupt + except KeyboardInterrupt: + if tool.sql_session is not None: + tool.sql_session.rollback() + exit(0) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/worblehat/cli/prompt_utils.py b/worblehat/cli/prompt_utils.py new file mode 100644 index 0000000..a5eaf46 --- /dev/null +++ b/worblehat/cli/prompt_utils.py @@ -0,0 +1,182 @@ +from cmd import Cmd +from typing import Any, Callable + +from sqlalchemy import select +from sqlalchemy.orm import Session + +def prompt_yes_no(question: str, default: bool | None = None) -> bool: + prompt = { + None: '[y/n]', + True: '[Y/n]', + False: '[y/N]', + }[default] + + while not any([ + (answer := input(f'{question} {prompt} ').lower()) in ('y','n'), + (default != None and answer.strip() == '') + ]): + pass + + return { + 'y': True, + 'n': False, + '': default, + }[answer] + + +class InteractiveItemSelector(Cmd): + def __init__( + self, + cls: type, + sql_session: Session, + execute_selection: Callable[[Session, type, str], list[Any]] = lambda session, cls, arg: session.scalars( + select(cls) + .where(cls.name == arg), + ).all(), + complete_selection: Callable[[Session, type, str], list[str]] = lambda session, cls, text: session.scalars( + select(cls.name) + .where(cls.name.ilike(f'{text}%')), + ).all(), + default: Any | None = None, + ): + """ + This is a utility class for prompting the user to select an + item from the database. The default functions assumes that + the item has a name attribute, and that the name is unique. + However, this can be overridden by passing in custom functions. + """ + + super().__init__() + + self.cls = cls + self.sql_session = sql_session + self.execute_selection = execute_selection + self.complete_selection = complete_selection + self.default_item = default + + if default is not None: + self.prompt = f'Select {cls.__name__} [{default.name}]> ' + else: + self.prompt = f'Select {cls.__name__}> ' + + + def emptyline(self) -> bool: + if self.default_item is not None: + self.result = self.default_item + return True + + + def default(self, arg: str): + result = self.execute_selection(self.sql_session, self.cls, arg) + + if len(result) != 1: + print(f'No such {self.cls.__name__} found: {arg}') + return + + self.result = result[0] + return True + + # TODO: Override this function to not act as an argument completer + # but to complete the entire value name + def completedefault(self, text: str, line: str, *_) -> list[str]: + return [] + + def completenames(self, text: str, *_) -> list[str]: + x = self.complete_selection(self.sql_session, self.cls, text) + return x + + +class NumberedCmd(Cmd): + """ + This is a utility class for creating a numbered command line. + + It will automatically generate a prompt that lists all the + available commands, and will automatically call the correct + function based on the user input. + + If the user input is not a number, it will call the default + function, which can be overridden by the subclass. + + Example: + ``` + class MyCmd(NumberedCmd): + def __init__(self): + super().__init__() + + def do_foo(self, arg: str): + pass + + def do_bar(self, arg: str): + pass + + funcs = { + 1: { + 'f': do_foo, + 'doc': 'do foo', + }, + 2: { + 'f': do_bar, + 'doc': 'do bar', + }, + } + ``` + """ + + + prompt_header: str | None = None + + + def __init__(self): + super().__init__() + + + @classmethod + def _generate_usage_list(cls) -> str: + result = '' + for i, func in cls.funcs.items(): + if i == 0: + i = '*' + result += f'{i}) {func["doc"]}\n' + return result + + + def _default(self, arg: str): + try: + i = int(arg) + self.funcs[i] + except (ValueError, KeyError): + return + + self.funcs[i]['f'](self, arg) + + + def default(self, arg: str): + self._default(arg) + + + def _postcmd(self, stop: bool, line: str) -> bool: + print() + print('-----------------') + print() + return stop + + + def postcmd(self, stop: bool, line: str) -> bool: + return self._postcmd(stop, line) + + + @property + def prompt(self): + result = '' + + if self.prompt_header != None: + result += self.prompt_header + '\n' + + result += self._generate_usage_list() + + if self.lastcmd == '': + result += f'> ' + else: + result += f'[{self.lastcmd}]> ' + + return result \ No newline at end of file diff --git a/worblehat/cli/subclis/__init__.py b/worblehat/cli/subclis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/worblehat/cli/subclis/bookcase_item.py b/worblehat/cli/subclis/bookcase_item.py new file mode 100644 index 0000000..2775eb8 --- /dev/null +++ b/worblehat/cli/subclis/bookcase_item.py @@ -0,0 +1,163 @@ +from textwrap import dedent +from sqlalchemy import select + +from sqlalchemy.orm import Session + +from worblehat.cli.prompt_utils import ( + InteractiveItemSelector, + NumberedCmd, + prompt_yes_no, +) +from worblehat.models import ( + BookcaseItem, + Language, + MediaType, +) +from worblehat.services.bookcase_item import ( + create_bookcase_item_from_isbn, + is_valid_isbn, +) + +class BookcaseItemCli(NumberedCmd): + def __init__(self, sql_session: Session, bookcase_item: BookcaseItem): + super().__init__() + self.sql_session = sql_session + self.bookcase_item = bookcase_item + + def do_show(self, _: str): + print(dedent(f""" + Bookcase Item: + Name: {self.bookcase_item.name} + ISBN: {self.bookcase_item.isbn} + Amount: {self.bookcase_item.amount} + Shelf: {self.bookcase_item.shelf.column}-{self.bookcase_item.shelf.row} + Description: {self.bookcase_item.shelf.description} + """)) + + def do_update_data(self, _: str): + item = create_bookcase_item_from_isbn(self.sql_session, self.bookcase_item.isbn) + self.bookcase_item.name = item.name + # TODO: Remove any old authors + self.bookcase_item.authors = item.authors + self.bookcase_item.language = item.language + + def do_edit(self, arg: str): + EditBookcaseCli(self.sql_session, self.bookcase_item, self).cmdloop() + + def do_loan(self, arg: str): + print('TODO: implement loan') + + def do_exit(self, _: str): + return True + + funcs = { + 1: { + 'f': do_show, + 'doc': 'Show bookcase item', + }, + 2: { + 'f': do_update_data, + 'doc': 'Pull updated data from online databases', + }, + 3: { + 'f': do_edit, + 'doc': 'Edit', + }, + 4: { + 'f': do_loan, + 'doc': 'Loan bookcase item', + }, + 5: { + 'f': do_exit, + 'doc': 'Exit', + }, + } + +class EditBookcaseCli(NumberedCmd): + def __init__(self, sql_session: Session, bookcase_item: BookcaseItem, parent: BookcaseItemCli): + super().__init__() + self.sql_session = sql_session + self.bookcase_item = bookcase_item + self.parent = parent + + + def do_name(self, _: str): + while True: + name = input('New name> ') + if name == '': + print('Error: name cannot be empty') + continue + + if self.sql_session.scalars( + select(BookcaseItem) + .where(BookcaseItem.name == name) + ).one_or_none() is not None: + print(f'Error: an item with name {name} already exists') + continue + + break + self.bookcase_item.name = name + + + def do_isbn(self, _: str): + while True: + isbn = input('New ISBN> ') + if isbn == '': + print('Error: ISBN cannot be empty') + continue + + if not is_valid_isbn(isbn): + print('Error: ISBN is not valid') + continue + + if self.sql_session.scalars( + select(BookcaseItem) + .where(BookcaseItem.isbn == isbn) + ).one_or_none() is not None: + print(f'Error: an item with ISBN {isbn} already exists') + continue + + break + + self.bookcase_item.isbn = isbn + + if prompt_yes_no('Update data from online databases?'): + self.parent.do_update_data('') + + + def do_language(self, _: str): + language_selector = InteractiveItemSelector( + Language, + self.sql_session, + ) + + self.bookcase_item.language = language_selector.result + + + def do_media_type(self, _: str): + media_type_selector = InteractiveItemSelector( + MediaType, + self.sql_session, + ) + + self.bookcase_item.media_type = media_type_selector.result + + + def do_amount(self, _: str): + while (new_amount := input(f'New amount [{self.bookcase_item.amount}]> ')) != '': + try: + new_amount = int(new_amount) + except ValueError: + print('Error: amount must be an integer') + continue + + if new_amount < 1: + print('Error: amount must be greater than 0') + continue + + break + self.bookcase_item.amount = new_amount + + + def do_exit(): + return True diff --git a/worblehat/models/BookcaseItem.py b/worblehat/models/BookcaseItem.py index 2ed1a3b..b073704 100644 --- a/worblehat/models/BookcaseItem.py +++ b/worblehat/models/BookcaseItem.py @@ -36,9 +36,9 @@ class BookcaseItem(Base, UidMixin, UniqueNameMixin): owner: Mapped[str] = mapped_column(String, default='PVV') amount: Mapped[int] = mapped_column(SmallInteger, default=1) - fk_media_type_uid: Mapped[int] = mapped_column(Integer, ForeignKey('MediaType.uid')) - fk_bookcase_shelf_uid: Mapped[int | None] = mapped_column(Integer, ForeignKey('BookcaseShelf.uid')) - fk_language_uid: Mapped[int] = mapped_column(Integer, ForeignKey('Language.uid')) + fk_media_type_uid: Mapped[int] = mapped_column(ForeignKey('MediaType.uid')) + fk_bookcase_shelf_uid: Mapped[int | None] = mapped_column(ForeignKey('BookcaseShelf.uid')) + fk_language_uid: Mapped[int | None] = mapped_column(ForeignKey('Language.uid')) media_type: Mapped[MediaType] = relationship(back_populates='items') shelf: Mapped[BookcaseShelf] = relationship(back_populates='items') diff --git a/worblehat/models/BookcaseShelf.py b/worblehat/models/BookcaseShelf.py index 69bea55..fb21248 100644 --- a/worblehat/models/BookcaseShelf.py +++ b/worblehat/models/BookcaseShelf.py @@ -35,7 +35,7 @@ class BookcaseShelf(Base, UidMixin): row: Mapped[int] = mapped_column(SmallInteger) column: Mapped[int] = mapped_column(SmallInteger) - fk_bookcase_uid: Mapped[int] = mapped_column(Integer, ForeignKey('Bookcase.uid')) + fk_bookcase_uid: Mapped[int] = mapped_column(ForeignKey('Bookcase.uid')) bookcase: Mapped[Bookcase] = relationship(back_populates='shelfs') items: Mapped[set[BookcaseItem]] = relationship(back_populates='shelf') diff --git a/worblehat/tools/scanner.py b/worblehat/tools/scanner.py deleted file mode 100644 index e4444e8..0000000 --- a/worblehat/tools/scanner.py +++ /dev/null @@ -1,274 +0,0 @@ -from cmd import Cmd -from typing import Any -from textwrap import dedent - -from sqlalchemy import ( - create_engine, - select, -) -from sqlalchemy.orm import ( - Session, -) - -from worblehat.services.bookcase_item import ( - create_bookcase_item_from_isbn, - is_valid_isbn, -) - -from ..config import Config -from ..models import * - - -def _prompt_yes_no(question: str, default: bool | None = None) -> bool: - prompt = { - None: '[y/n]', - True: '[Y/n]', - False: '[y/N]', - }[default] - - while not any([ - (answer := input(f'{question} {prompt} ').lower()) in ('y','n'), - (default != None and answer.strip() == '') - ]): - pass - - return { - 'y': True, - 'n': False, - '': default, - }[answer] - - -class _InteractiveItemSelector(Cmd): - def __init__( - self, - cls: type, - sql_session: Session, - default: Any | None = None, - ): - super().__init__() - self.cls = cls - self.sql_session = sql_session - self.default_item = default - if default is not None: - self.prompt = f'Select {cls.__name__} [{default.name}]> ' - else: - self.prompt = f'Select {cls.__name__}> ' - - def default(self, arg: str): - if arg == '' and self.default_item is not None: - self.result = self.default_item - return True - - result = self.sql_session.scalars( - select(self.cls) - .where(self.cls.name == arg), - ).all() - - if len(result) != 1: - print(f'No such {self.cls.__name__} found: {arg}') - return - - self.result = result[0] - return True - - def completenames(self, text: str, *_) -> list[str]: - return self.sql_session.scalars( - select(self.cls.name) - .where(self.cls.name.like(f'{text}%')), - ).all() - - -class BookScanTool(Cmd): - prompt = '> ' - intro = """ -Welcome to the worblehat scanner tool. -Start by entering a command, or entering an ISBN of a new item. -Type "help" to see list of commands - """ - - def __init__(self): - super().__init__() - - try: - engine = create_engine(Config.SQLALCHEMY_DATABASE_URI) - self.sql_session = Session(engine) - except Exception: - print('Error: could not connect to database.') - exit(1) - - print(f"Connected to database at '{Config.SQLALCHEMY_DATABASE_URI}'") - - - def do_list_bookcases(self, _: str): - """Usage: list_bookcases""" - bookcase_shelfs = self.sql_session.scalars( - select(BookcaseShelf) - .join(Bookcase) - .order_by( - Bookcase.name, - BookcaseShelf.column, - BookcaseShelf.row, - ) - ).all() - - bookcase_uid = None - for shelf in bookcase_shelfs: - if shelf.bookcase.uid != bookcase_uid: - print(shelf.bookcase.name) - bookcase_uid = shelf.bookcase.uid - - name = f"r{shelf.row}-c{shelf.column}" - if shelf.description is not None: - name += f" [{shelf.description}]" - - print(f' {name} - {sum(i.amount for i in shelf.items)} items') - - - def do_add_bookcase(self, arg: str): - """Usage: add_bookcase [description]""" - arg = arg.split(' ') - if len(arg) < 1: - print('Usage: add_bookcase [description]') - return - - name = arg[0] - description = ' '.join(arg[1:]) - - if self.sql_session.scalars( - select(Bookcase) - .where(Bookcase.name == name) - ).one_or_none() is not None: - print(f'Error: a bookcase with name {name} already exists') - return - - bookcase = Bookcase(name, description) - self.sql_session.add(bookcase) - - def do_add_bookcase_shelf(self, arg: str): - """Usage: add_bookcase_shelf - [description]""" - arg = arg.split(' ') - if len(arg) < 2: - print('Usage: add_bookcase_shelf - [description]') - return - - bookcase_name = arg[0] - row, column = [int(x) for x in arg[1].split('-')] - description = ' '.join(arg[2:]) - - if (bookcase := self.sql_session.scalars( - select(Bookcase) - .where(Bookcase.name == bookcase_name) - ).one_or_none()) is None: - print(f'Error: Could not find bookcase with name {bookcase_name}') - return - - if self.sql_session.scalars( - select(BookcaseShelf) - .where( - BookcaseShelf.bookcase == bookcase, - BookcaseShelf.row == row, - BookcaseShelf.column == column, - ) - ).one_or_none() is not None: - print(f'Error: a bookshelf in bookcase {bookcase.name} with position {row}-{column} already exists') - return - - shelf = BookcaseShelf( - row, - column, - bookcase, - description, - ) - self.sql_session.add(shelf) - - - def do_list_bookcase_items(self, _: str): - """Usage: list_bookcase_items""" - self.columnize([repr(bi) for bi in self.bookcase_items]) - - - def default(self, isbn: str): - isbn = isbn.strip() - if not is_valid_isbn(isbn): - print(f'"{isbn}" is not a valid isbn') - return - - if (existing_item := self.sql_session.scalars( - select(BookcaseItem) - .where(BookcaseItem.isbn == isbn) - ).one_or_none()) is not None: - print('Found existing BookcaseItem:', existing_item) - print(f'There are currently {existing_item.amount} of these in the system.') - if _prompt_yes_no('Would you like to add another?', default=True): - existing_item.amount += 1 - return - - bookcase_item = create_bookcase_item_from_isbn(isbn, self.sql_session) - if bookcase_item is None: - print(f'Could not find data about item with isbn {isbn} online.') - print(f'If you think this is not due to a bug, please add the book to openlibrary.org before continuing.') - return - else: - print(dedent(f""" - Found item: - title: {bookcase_item.name} - authors: {', '.join(a.name for a in bookcase_item.authors)} - language: {bookcase_item.language} - """)) - - print('Please select the shelf where the item is placed:') - bookcase_shelf_selector = _InteractiveItemSelector( - cls = BookcaseShelf, - sql_session = self.sql_session, - ) - - bookcase_shelf_selector.cmdloop() - bookcase_item.shelf = bookcase_shelf_selector.result - - print('Please select the items media type:') - media_type_selector = _InteractiveItemSelector( - cls = MediaType, - sql_session = self.sql_session, - default = self.sql_session.scalars( - select(MediaType) - .where(MediaType.name.ilike("book")), - ).one(), - ) - - media_type_selector.cmdloop() - bookcase_item.media_type = media_type_selector.result - - username = input('Who owns this book? [PVV]: ') - if username != '': - bookcase_item.owner = username - - self.sql_session.add(bookcase_item) - - - def do_exit(self, _: str): - """Usage: exit""" - if _prompt_yes_no('Would you like to save your changes?'): - self.sql_session.commit() - else: - self.sql_session.rollback() - exit(0) - - -def main(): - tool = BookScanTool() - while True: - try: - tool.cmdloop() - except KeyboardInterrupt: - try: - print() - if _prompt_yes_no('Are you sure you want to exit without saving?', default=False): - raise KeyboardInterrupt - except KeyboardInterrupt: - if tool.sql_session is not None: - tool.sql_session.rollback() - exit(0) - -if __name__ == '__main__': - main() \ No newline at end of file