Start converting the scanner tool into a cli tool
The scanner tool is about to become a fullfledged repl, much in the same manner as dibbler. Changes: - Rename `tools/scanner.py` -> `cli/main.py` - Create abstraction layer for Cmd with numbered commands - Create submenu for items - Lots of functionality edits - Fill out more todos in REAMDE
This commit is contained in:
parent
c445cb2dbd
commit
3525e84576
25
README.md
25
README.md
|
@ -35,7 +35,7 @@ This project uses [poetry][poetry] as its buildtool as of May 2023.
|
|||
```console
|
||||
$ poetry install
|
||||
$ poetry run alembic migrate
|
||||
$ poetry run scanner
|
||||
$ poetry run cli
|
||||
$ poetry run dev
|
||||
```
|
||||
|
||||
|
@ -46,11 +46,26 @@ See `worblehat/config.py` for configurable settings.
|
|||
## TODO List
|
||||
|
||||
- [ ] High priority:
|
||||
- [ ] Book ingestion tool in order to create the database in a quick manner. It should pull data from online ISBN databases (this is almost done)
|
||||
- [ ] A database with all of PVVs books should be created
|
||||
- [ ] Ability to request book loans for PVV members, e.g. through Dibblers interface.
|
||||
- [X] Data ingestion logic, that will pull data from online databases based on ISBN.
|
||||
- [ ] Cli version of the program (this is currently being worked on).
|
||||
- [ ] Web version of the program
|
||||
- [ ] Setting up a database with all of PVVs books
|
||||
- [ ] Creating database with user and pw
|
||||
- [ ] Model all bookshelfs
|
||||
- [ ] Scan in all books
|
||||
- [ ] Inner workings
|
||||
- [X] Ability to create and update bookcases
|
||||
- [X] Ability to create and update bookcase shelfs
|
||||
- [~] Ability to create and update bookcase items
|
||||
- [ ] Ability to search for books
|
||||
- [ ] Ability to request book loans for PVV members
|
||||
- [ ] Ability to queue book loans for PVV members
|
||||
- [ ] Ability to be notified when books are available
|
||||
- [ ] Ability to be notified when deadlines are due
|
||||
- [ ] Ascii art of monkey
|
||||
- [ ] Low priority:
|
||||
- [ ] Ability for PVV members to request book loans through the PVV website
|
||||
- [ ] Ability for PVV members to search for books through the PVV website
|
||||
- [ ] Discussion
|
||||
- [ ] Should this project run in a separate tty-instance on Dibblers interface, or should they share the tty with some kind of switching ability?
|
||||
- [ ] Should this project run in a separate tty-instance on Dibblers interface, or should they share the tty with some kind of switching ability?
|
||||
After some discussion with other PVV members, we came up with an idea where we run the programs in separate ttys, and use a set of large mechanical switches connected to a QMK-flashed microcontroller to switch between them.
|
|
@ -15,7 +15,7 @@
|
|||
in {
|
||||
default = self.apps.${system}.dev;
|
||||
dev = app "${self.packages.${system}.worblehat}/bin/dev";
|
||||
scanner = app "${self.packages.${system}.worblehat}/bin/scanner";
|
||||
cli = app "${self.packages.${system}.worblehat}/bin/cli";
|
||||
};
|
||||
|
||||
packages.${system} = {
|
||||
|
|
|
@ -21,7 +21,7 @@ sqlalchemy = "^2.0.8"
|
|||
werkzeug = "^2.3.3"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
scanner = "worblehat.tools.scanner:main"
|
||||
cli = "worblehat.cli.main:main"
|
||||
dev = "worblehat.wsgi_dev:main"
|
||||
|
||||
[build-system]
|
||||
|
|
|
@ -0,0 +1,352 @@
|
|||
from textwrap import dedent
|
||||
|
||||
from sqlalchemy import (
|
||||
create_engine,
|
||||
event,
|
||||
select,
|
||||
)
|
||||
from sqlalchemy.orm import (
|
||||
Session,
|
||||
)
|
||||
from worblehat.cli.subclis.bookcase_item import BookcaseItemCli
|
||||
|
||||
from worblehat.services.bookcase_item import (
|
||||
create_bookcase_item_from_isbn,
|
||||
is_valid_isbn,
|
||||
)
|
||||
|
||||
from .prompt_utils import *
|
||||
|
||||
from worblehat.config import Config
|
||||
from worblehat.models import *
|
||||
|
||||
# TODO: Category seems to have been forgotten. Maybe relevant interactivity should be added?
|
||||
# However, is there anyone who are going to search by category rather than just look in
|
||||
# the shelves?
|
||||
|
||||
class WorblehatCli(NumberedCmd):
|
||||
sql_session: Session
|
||||
sql_session_dirty: bool = False
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
try:
|
||||
engine = create_engine(Config.SQLALCHEMY_DATABASE_URI)
|
||||
self.sql_session = Session(engine)
|
||||
except Exception:
|
||||
print('Error: could not connect to database.')
|
||||
exit(1)
|
||||
|
||||
@event.listens_for(self.sql_session, 'after_flush')
|
||||
def mark_session_as_dirty(*_):
|
||||
self.sql_session_dirty = True
|
||||
self.prompt_header = f'(unsaved changes)'
|
||||
|
||||
@event.listens_for(self.sql_session, 'after_commit')
|
||||
@event.listens_for(self.sql_session, 'after_rollback')
|
||||
def mark_session_as_clean(*_):
|
||||
self.sql_session_dirty = False
|
||||
self.prompt_header = None
|
||||
|
||||
print(f"Debug: Connected to database at '{Config.SQLALCHEMY_DATABASE_URI}'")
|
||||
|
||||
|
||||
def do_list_bookcases(self, _: str):
|
||||
bookcase_shelfs = self.sql_session.scalars(
|
||||
select(BookcaseShelf)
|
||||
.join(Bookcase)
|
||||
.order_by(
|
||||
Bookcase.name,
|
||||
BookcaseShelf.column,
|
||||
BookcaseShelf.row,
|
||||
)
|
||||
).all()
|
||||
|
||||
bookcase_uid = None
|
||||
for shelf in bookcase_shelfs:
|
||||
if shelf.bookcase.uid != bookcase_uid:
|
||||
print(shelf.bookcase.name)
|
||||
bookcase_uid = shelf.bookcase.uid
|
||||
|
||||
name = f"r{shelf.row}-c{shelf.column}"
|
||||
if shelf.description is not None:
|
||||
name += f" [{shelf.description}]"
|
||||
|
||||
print(f' {name} - {sum(i.amount for i in shelf.items)} items')
|
||||
|
||||
|
||||
def do_show_bookcase(self, arg: str):
|
||||
bookcase_selector = InteractiveItemSelector(
|
||||
cls = Bookcase,
|
||||
sql_session = self.sql_session,
|
||||
)
|
||||
bookcase_selector.cmdloop()
|
||||
bookcase = bookcase_selector.result
|
||||
|
||||
for shelf in bookcase.shelfs:
|
||||
name = f"r{shelf.row}-c{shelf.column}"
|
||||
if shelf.description is not None:
|
||||
name += f" [{shelf.description}]"
|
||||
print(name)
|
||||
for item in shelf.items:
|
||||
print(f' {item.name} - {item.amount} copies')
|
||||
|
||||
|
||||
def do_add_bookcase(self, _: str):
|
||||
while True:
|
||||
name = input('Name of bookcase> ')
|
||||
if name == '':
|
||||
print('Error: name cannot be empty')
|
||||
continue
|
||||
|
||||
if self.sql_session.scalars(
|
||||
select(Bookcase)
|
||||
.where(Bookcase.name == name)
|
||||
).one_or_none() is not None:
|
||||
print(f'Error: a bookcase with name {name} already exists')
|
||||
continue
|
||||
|
||||
break
|
||||
|
||||
description = input('Description of bookcase> ')
|
||||
if description == '':
|
||||
description = None
|
||||
|
||||
bookcase = Bookcase(name, description)
|
||||
self.sql_session.add(bookcase)
|
||||
self.sql_session.flush()
|
||||
|
||||
|
||||
def do_add_bookcase_shelf(self, arg: str):
|
||||
bookcase_selector = InteractiveItemSelector(
|
||||
cls = Bookcase,
|
||||
sql_session = self.sql_session,
|
||||
)
|
||||
bookcase_selector.cmdloop()
|
||||
bookcase = bookcase_selector.result
|
||||
|
||||
while True:
|
||||
row = input('Row> ')
|
||||
try:
|
||||
row = int(row)
|
||||
except ValueError:
|
||||
print('Error: row must be a number')
|
||||
continue
|
||||
break
|
||||
|
||||
while True:
|
||||
column = input('Column> ')
|
||||
try:
|
||||
column = int(column)
|
||||
except ValueError:
|
||||
print('Error: column must be a number')
|
||||
continue
|
||||
break
|
||||
|
||||
if self.sql_session.scalars(
|
||||
select(BookcaseShelf)
|
||||
.where(
|
||||
BookcaseShelf.bookcase == bookcase,
|
||||
BookcaseShelf.row == row,
|
||||
BookcaseShelf.column == column,
|
||||
)
|
||||
).one_or_none() is not None:
|
||||
print(f'Error: a bookshelf in bookcase {bookcase.name} with position {row}-{column} already exists')
|
||||
return
|
||||
|
||||
description = input('Description> ')
|
||||
if description == '':
|
||||
description = None
|
||||
|
||||
shelf = BookcaseShelf(
|
||||
row,
|
||||
column,
|
||||
bookcase,
|
||||
description,
|
||||
)
|
||||
self.sql_session.add(shelf)
|
||||
self.sql_session.flush()
|
||||
|
||||
|
||||
def _create_bookcase_item(self, isbn: str):
|
||||
bookcase_item = create_bookcase_item_from_isbn(isbn, self.sql_session)
|
||||
if bookcase_item is None:
|
||||
print(f'Could not find data about item with isbn {isbn} online.')
|
||||
print(f'If you think this is not due to a bug, please add the book to openlibrary.org before continuing.')
|
||||
return
|
||||
else:
|
||||
print(dedent(f"""
|
||||
Found item:
|
||||
title: {bookcase_item.name}
|
||||
authors: {', '.join(a.name for a in bookcase_item.authors)}
|
||||
language: {bookcase_item.language}
|
||||
"""))
|
||||
|
||||
print('Please select the bookcase where the item is placed:')
|
||||
bookcase_selector = InteractiveItemSelector(
|
||||
cls = Bookcase,
|
||||
sql_session = self.sql_session,
|
||||
)
|
||||
bookcase_selector.cmdloop()
|
||||
bookcase = bookcase_selector.result
|
||||
|
||||
def __complete_bookshelf_selection(session: Session, cls: type, arg: str):
|
||||
args = arg.split('-')
|
||||
query = select(cls.row, cls.column).where(cls.bookcase == bookcase)
|
||||
try:
|
||||
if arg != '' and len(args) > 0:
|
||||
query = query.where(cls.row == int(args[0]))
|
||||
if len(args) > 1:
|
||||
query = query.where(cls.column == int(args[1]))
|
||||
except ValueError:
|
||||
return []
|
||||
|
||||
result = session.execute(query).all()
|
||||
return [f"{r}-{c}" for r,c in result]
|
||||
|
||||
print('Please select the shelf where the item is placed:')
|
||||
bookcase_shelf_selector = InteractiveItemSelector(
|
||||
cls = BookcaseShelf,
|
||||
sql_session = self.sql_session,
|
||||
execute_selection = lambda session, cls, arg: session.scalars(
|
||||
select(cls)
|
||||
.where(
|
||||
cls.bookcase == bookcase,
|
||||
cls.column == int(arg.split('-')[1]),
|
||||
cls.row == int(arg.split('-')[0]),
|
||||
)
|
||||
).all(),
|
||||
complete_selection = __complete_bookshelf_selection,
|
||||
)
|
||||
|
||||
bookcase_shelf_selector.cmdloop()
|
||||
bookcase_item.shelf = bookcase_shelf_selector.result
|
||||
|
||||
print('Please select the items media type:')
|
||||
media_type_selector = InteractiveItemSelector(
|
||||
cls = MediaType,
|
||||
sql_session = self.sql_session,
|
||||
default = self.sql_session.scalars(
|
||||
select(MediaType)
|
||||
.where(MediaType.name.ilike("book")),
|
||||
).one(),
|
||||
)
|
||||
|
||||
media_type_selector.cmdloop()
|
||||
bookcase_item.media_type = media_type_selector.result
|
||||
|
||||
username = input('Who owns this book? [PVV]> ')
|
||||
if username != '':
|
||||
bookcase_item.owner = username
|
||||
|
||||
self.sql_session.add(bookcase_item)
|
||||
self.sql_session.flush()
|
||||
|
||||
|
||||
def default(self, isbn: str):
|
||||
isbn = isbn.strip()
|
||||
if not is_valid_isbn(isbn):
|
||||
super()._default(isbn)
|
||||
return
|
||||
|
||||
if (existing_item := self.sql_session.scalars(
|
||||
select(BookcaseItem)
|
||||
.where(BookcaseItem.isbn == isbn)
|
||||
).one_or_none()) is not None:
|
||||
print('Found existing BookcaseItem:', existing_item)
|
||||
BookcaseItemCli(
|
||||
sql_session = self.sql_session,
|
||||
bookcase_item = existing_item,
|
||||
).cmdloop()
|
||||
return
|
||||
|
||||
if prompt_yes_no(f"Could not find item with isbn '{isbn}'.\nWould you like to create it?", default=True):
|
||||
self._create_bookcase_item(isbn)
|
||||
|
||||
|
||||
def do_search(self, _: str):
|
||||
print('TODO: implement search')
|
||||
|
||||
def do_save(self, _:str):
|
||||
if not self.sql_session_dirty:
|
||||
print('No changes to save.')
|
||||
return
|
||||
self.sql_session.commit()
|
||||
|
||||
|
||||
def do_abort(self, _:str):
|
||||
if not self.sql_session_dirty:
|
||||
print('No changes to abort.')
|
||||
return
|
||||
self.sql_session.rollback()
|
||||
|
||||
|
||||
def do_exit(self, _: str):
|
||||
if self.sql_session_dirty:
|
||||
if prompt_yes_no('Would you like to save your changes?'):
|
||||
self.sql_session.commit()
|
||||
else:
|
||||
self.sql_session.rollback()
|
||||
exit(0)
|
||||
|
||||
|
||||
funcs = {
|
||||
0: {
|
||||
'f': default,
|
||||
'doc': 'Choose / Add item with its ISBN',
|
||||
},
|
||||
1: {
|
||||
'f': do_list_bookcases,
|
||||
'doc': 'List all bookcases',
|
||||
},
|
||||
2: {
|
||||
'f': do_search,
|
||||
'doc': 'Search',
|
||||
},
|
||||
3: {
|
||||
'f': do_show_bookcase,
|
||||
'doc': 'Show a bookcase, and its items',
|
||||
},
|
||||
4: {
|
||||
'f': do_add_bookcase,
|
||||
'doc': 'Add a new bookcase',
|
||||
},
|
||||
5: {
|
||||
'f': do_add_bookcase_shelf,
|
||||
'doc': 'Add a new bookshelf',
|
||||
},
|
||||
6: {
|
||||
'f': do_save,
|
||||
'doc': 'Save changes',
|
||||
},
|
||||
7: {
|
||||
'f': do_abort,
|
||||
'doc': 'Abort changes',
|
||||
},
|
||||
8: {
|
||||
'f': do_exit,
|
||||
'doc': 'Exit',
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
tool = WorblehatCli()
|
||||
while True:
|
||||
try:
|
||||
tool.cmdloop()
|
||||
except KeyboardInterrupt:
|
||||
if not tool.sql_session_dirty:
|
||||
exit(0)
|
||||
try:
|
||||
print()
|
||||
if prompt_yes_no('Are you sure you want to exit without saving?', default=False):
|
||||
raise KeyboardInterrupt
|
||||
except KeyboardInterrupt:
|
||||
if tool.sql_session is not None:
|
||||
tool.sql_session.rollback()
|
||||
exit(0)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -0,0 +1,182 @@
|
|||
from cmd import Cmd
|
||||
from typing import Any, Callable
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
def prompt_yes_no(question: str, default: bool | None = None) -> bool:
|
||||
prompt = {
|
||||
None: '[y/n]',
|
||||
True: '[Y/n]',
|
||||
False: '[y/N]',
|
||||
}[default]
|
||||
|
||||
while not any([
|
||||
(answer := input(f'{question} {prompt} ').lower()) in ('y','n'),
|
||||
(default != None and answer.strip() == '')
|
||||
]):
|
||||
pass
|
||||
|
||||
return {
|
||||
'y': True,
|
||||
'n': False,
|
||||
'': default,
|
||||
}[answer]
|
||||
|
||||
|
||||
class InteractiveItemSelector(Cmd):
|
||||
def __init__(
|
||||
self,
|
||||
cls: type,
|
||||
sql_session: Session,
|
||||
execute_selection: Callable[[Session, type, str], list[Any]] = lambda session, cls, arg: session.scalars(
|
||||
select(cls)
|
||||
.where(cls.name == arg),
|
||||
).all(),
|
||||
complete_selection: Callable[[Session, type, str], list[str]] = lambda session, cls, text: session.scalars(
|
||||
select(cls.name)
|
||||
.where(cls.name.ilike(f'{text}%')),
|
||||
).all(),
|
||||
default: Any | None = None,
|
||||
):
|
||||
"""
|
||||
This is a utility class for prompting the user to select an
|
||||
item from the database. The default functions assumes that
|
||||
the item has a name attribute, and that the name is unique.
|
||||
However, this can be overridden by passing in custom functions.
|
||||
"""
|
||||
|
||||
super().__init__()
|
||||
|
||||
self.cls = cls
|
||||
self.sql_session = sql_session
|
||||
self.execute_selection = execute_selection
|
||||
self.complete_selection = complete_selection
|
||||
self.default_item = default
|
||||
|
||||
if default is not None:
|
||||
self.prompt = f'Select {cls.__name__} [{default.name}]> '
|
||||
else:
|
||||
self.prompt = f'Select {cls.__name__}> '
|
||||
|
||||
|
||||
def emptyline(self) -> bool:
|
||||
if self.default_item is not None:
|
||||
self.result = self.default_item
|
||||
return True
|
||||
|
||||
|
||||
def default(self, arg: str):
|
||||
result = self.execute_selection(self.sql_session, self.cls, arg)
|
||||
|
||||
if len(result) != 1:
|
||||
print(f'No such {self.cls.__name__} found: {arg}')
|
||||
return
|
||||
|
||||
self.result = result[0]
|
||||
return True
|
||||
|
||||
# TODO: Override this function to not act as an argument completer
|
||||
# but to complete the entire value name
|
||||
def completedefault(self, text: str, line: str, *_) -> list[str]:
|
||||
return []
|
||||
|
||||
def completenames(self, text: str, *_) -> list[str]:
|
||||
x = self.complete_selection(self.sql_session, self.cls, text)
|
||||
return x
|
||||
|
||||
|
||||
class NumberedCmd(Cmd):
|
||||
"""
|
||||
This is a utility class for creating a numbered command line.
|
||||
|
||||
It will automatically generate a prompt that lists all the
|
||||
available commands, and will automatically call the correct
|
||||
function based on the user input.
|
||||
|
||||
If the user input is not a number, it will call the default
|
||||
function, which can be overridden by the subclass.
|
||||
|
||||
Example:
|
||||
```
|
||||
class MyCmd(NumberedCmd):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def do_foo(self, arg: str):
|
||||
pass
|
||||
|
||||
def do_bar(self, arg: str):
|
||||
pass
|
||||
|
||||
funcs = {
|
||||
1: {
|
||||
'f': do_foo,
|
||||
'doc': 'do foo',
|
||||
},
|
||||
2: {
|
||||
'f': do_bar,
|
||||
'doc': 'do bar',
|
||||
},
|
||||
}
|
||||
```
|
||||
"""
|
||||
|
||||
|
||||
prompt_header: str | None = None
|
||||
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
|
||||
@classmethod
|
||||
def _generate_usage_list(cls) -> str:
|
||||
result = ''
|
||||
for i, func in cls.funcs.items():
|
||||
if i == 0:
|
||||
i = '*'
|
||||
result += f'{i}) {func["doc"]}\n'
|
||||
return result
|
||||
|
||||
|
||||
def _default(self, arg: str):
|
||||
try:
|
||||
i = int(arg)
|
||||
self.funcs[i]
|
||||
except (ValueError, KeyError):
|
||||
return
|
||||
|
||||
self.funcs[i]['f'](self, arg)
|
||||
|
||||
|
||||
def default(self, arg: str):
|
||||
self._default(arg)
|
||||
|
||||
|
||||
def _postcmd(self, stop: bool, line: str) -> bool:
|
||||
print()
|
||||
print('-----------------')
|
||||
print()
|
||||
return stop
|
||||
|
||||
|
||||
def postcmd(self, stop: bool, line: str) -> bool:
|
||||
return self._postcmd(stop, line)
|
||||
|
||||
|
||||
@property
|
||||
def prompt(self):
|
||||
result = ''
|
||||
|
||||
if self.prompt_header != None:
|
||||
result += self.prompt_header + '\n'
|
||||
|
||||
result += self._generate_usage_list()
|
||||
|
||||
if self.lastcmd == '':
|
||||
result += f'> '
|
||||
else:
|
||||
result += f'[{self.lastcmd}]> '
|
||||
|
||||
return result
|
|
@ -0,0 +1,163 @@
|
|||
from textwrap import dedent
|
||||
from sqlalchemy import select
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from worblehat.cli.prompt_utils import (
|
||||
InteractiveItemSelector,
|
||||
NumberedCmd,
|
||||
prompt_yes_no,
|
||||
)
|
||||
from worblehat.models import (
|
||||
BookcaseItem,
|
||||
Language,
|
||||
MediaType,
|
||||
)
|
||||
from worblehat.services.bookcase_item import (
|
||||
create_bookcase_item_from_isbn,
|
||||
is_valid_isbn,
|
||||
)
|
||||
|
||||
class BookcaseItemCli(NumberedCmd):
|
||||
def __init__(self, sql_session: Session, bookcase_item: BookcaseItem):
|
||||
super().__init__()
|
||||
self.sql_session = sql_session
|
||||
self.bookcase_item = bookcase_item
|
||||
|
||||
def do_show(self, _: str):
|
||||
print(dedent(f"""
|
||||
Bookcase Item:
|
||||
Name: {self.bookcase_item.name}
|
||||
ISBN: {self.bookcase_item.isbn}
|
||||
Amount: {self.bookcase_item.amount}
|
||||
Shelf: {self.bookcase_item.shelf.column}-{self.bookcase_item.shelf.row}
|
||||
Description: {self.bookcase_item.shelf.description}
|
||||
"""))
|
||||
|
||||
def do_update_data(self, _: str):
|
||||
item = create_bookcase_item_from_isbn(self.sql_session, self.bookcase_item.isbn)
|
||||
self.bookcase_item.name = item.name
|
||||
# TODO: Remove any old authors
|
||||
self.bookcase_item.authors = item.authors
|
||||
self.bookcase_item.language = item.language
|
||||
|
||||
def do_edit(self, arg: str):
|
||||
EditBookcaseCli(self.sql_session, self.bookcase_item, self).cmdloop()
|
||||
|
||||
def do_loan(self, arg: str):
|
||||
print('TODO: implement loan')
|
||||
|
||||
def do_exit(self, _: str):
|
||||
return True
|
||||
|
||||
funcs = {
|
||||
1: {
|
||||
'f': do_show,
|
||||
'doc': 'Show bookcase item',
|
||||
},
|
||||
2: {
|
||||
'f': do_update_data,
|
||||
'doc': 'Pull updated data from online databases',
|
||||
},
|
||||
3: {
|
||||
'f': do_edit,
|
||||
'doc': 'Edit',
|
||||
},
|
||||
4: {
|
||||
'f': do_loan,
|
||||
'doc': 'Loan bookcase item',
|
||||
},
|
||||
5: {
|
||||
'f': do_exit,
|
||||
'doc': 'Exit',
|
||||
},
|
||||
}
|
||||
|
||||
class EditBookcaseCli(NumberedCmd):
|
||||
def __init__(self, sql_session: Session, bookcase_item: BookcaseItem, parent: BookcaseItemCli):
|
||||
super().__init__()
|
||||
self.sql_session = sql_session
|
||||
self.bookcase_item = bookcase_item
|
||||
self.parent = parent
|
||||
|
||||
|
||||
def do_name(self, _: str):
|
||||
while True:
|
||||
name = input('New name> ')
|
||||
if name == '':
|
||||
print('Error: name cannot be empty')
|
||||
continue
|
||||
|
||||
if self.sql_session.scalars(
|
||||
select(BookcaseItem)
|
||||
.where(BookcaseItem.name == name)
|
||||
).one_or_none() is not None:
|
||||
print(f'Error: an item with name {name} already exists')
|
||||
continue
|
||||
|
||||
break
|
||||
self.bookcase_item.name = name
|
||||
|
||||
|
||||
def do_isbn(self, _: str):
|
||||
while True:
|
||||
isbn = input('New ISBN> ')
|
||||
if isbn == '':
|
||||
print('Error: ISBN cannot be empty')
|
||||
continue
|
||||
|
||||
if not is_valid_isbn(isbn):
|
||||
print('Error: ISBN is not valid')
|
||||
continue
|
||||
|
||||
if self.sql_session.scalars(
|
||||
select(BookcaseItem)
|
||||
.where(BookcaseItem.isbn == isbn)
|
||||
).one_or_none() is not None:
|
||||
print(f'Error: an item with ISBN {isbn} already exists')
|
||||
continue
|
||||
|
||||
break
|
||||
|
||||
self.bookcase_item.isbn = isbn
|
||||
|
||||
if prompt_yes_no('Update data from online databases?'):
|
||||
self.parent.do_update_data('')
|
||||
|
||||
|
||||
def do_language(self, _: str):
|
||||
language_selector = InteractiveItemSelector(
|
||||
Language,
|
||||
self.sql_session,
|
||||
)
|
||||
|
||||
self.bookcase_item.language = language_selector.result
|
||||
|
||||
|
||||
def do_media_type(self, _: str):
|
||||
media_type_selector = InteractiveItemSelector(
|
||||
MediaType,
|
||||
self.sql_session,
|
||||
)
|
||||
|
||||
self.bookcase_item.media_type = media_type_selector.result
|
||||
|
||||
|
||||
def do_amount(self, _: str):
|
||||
while (new_amount := input(f'New amount [{self.bookcase_item.amount}]> ')) != '':
|
||||
try:
|
||||
new_amount = int(new_amount)
|
||||
except ValueError:
|
||||
print('Error: amount must be an integer')
|
||||
continue
|
||||
|
||||
if new_amount < 1:
|
||||
print('Error: amount must be greater than 0')
|
||||
continue
|
||||
|
||||
break
|
||||
self.bookcase_item.amount = new_amount
|
||||
|
||||
|
||||
def do_exit():
|
||||
return True
|
|
@ -36,9 +36,9 @@ class BookcaseItem(Base, UidMixin, UniqueNameMixin):
|
|||
owner: Mapped[str] = mapped_column(String, default='PVV')
|
||||
amount: Mapped[int] = mapped_column(SmallInteger, default=1)
|
||||
|
||||
fk_media_type_uid: Mapped[int] = mapped_column(Integer, ForeignKey('MediaType.uid'))
|
||||
fk_bookcase_shelf_uid: Mapped[int | None] = mapped_column(Integer, ForeignKey('BookcaseShelf.uid'))
|
||||
fk_language_uid: Mapped[int] = mapped_column(Integer, ForeignKey('Language.uid'))
|
||||
fk_media_type_uid: Mapped[int] = mapped_column(ForeignKey('MediaType.uid'))
|
||||
fk_bookcase_shelf_uid: Mapped[int | None] = mapped_column(ForeignKey('BookcaseShelf.uid'))
|
||||
fk_language_uid: Mapped[int | None] = mapped_column(ForeignKey('Language.uid'))
|
||||
|
||||
media_type: Mapped[MediaType] = relationship(back_populates='items')
|
||||
shelf: Mapped[BookcaseShelf] = relationship(back_populates='items')
|
||||
|
|
|
@ -35,7 +35,7 @@ class BookcaseShelf(Base, UidMixin):
|
|||
row: Mapped[int] = mapped_column(SmallInteger)
|
||||
column: Mapped[int] = mapped_column(SmallInteger)
|
||||
|
||||
fk_bookcase_uid: Mapped[int] = mapped_column(Integer, ForeignKey('Bookcase.uid'))
|
||||
fk_bookcase_uid: Mapped[int] = mapped_column(ForeignKey('Bookcase.uid'))
|
||||
|
||||
bookcase: Mapped[Bookcase] = relationship(back_populates='shelfs')
|
||||
items: Mapped[set[BookcaseItem]] = relationship(back_populates='shelf')
|
||||
|
|
|
@ -1,274 +0,0 @@
|
|||
from cmd import Cmd
|
||||
from typing import Any
|
||||
from textwrap import dedent
|
||||
|
||||
from sqlalchemy import (
|
||||
create_engine,
|
||||
select,
|
||||
)
|
||||
from sqlalchemy.orm import (
|
||||
Session,
|
||||
)
|
||||
|
||||
from worblehat.services.bookcase_item import (
|
||||
create_bookcase_item_from_isbn,
|
||||
is_valid_isbn,
|
||||
)
|
||||
|
||||
from ..config import Config
|
||||
from ..models import *
|
||||
|
||||
|
||||
def _prompt_yes_no(question: str, default: bool | None = None) -> bool:
|
||||
prompt = {
|
||||
None: '[y/n]',
|
||||
True: '[Y/n]',
|
||||
False: '[y/N]',
|
||||
}[default]
|
||||
|
||||
while not any([
|
||||
(answer := input(f'{question} {prompt} ').lower()) in ('y','n'),
|
||||
(default != None and answer.strip() == '')
|
||||
]):
|
||||
pass
|
||||
|
||||
return {
|
||||
'y': True,
|
||||
'n': False,
|
||||
'': default,
|
||||
}[answer]
|
||||
|
||||
|
||||
class _InteractiveItemSelector(Cmd):
|
||||
def __init__(
|
||||
self,
|
||||
cls: type,
|
||||
sql_session: Session,
|
||||
default: Any | None = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.cls = cls
|
||||
self.sql_session = sql_session
|
||||
self.default_item = default
|
||||
if default is not None:
|
||||
self.prompt = f'Select {cls.__name__} [{default.name}]> '
|
||||
else:
|
||||
self.prompt = f'Select {cls.__name__}> '
|
||||
|
||||
def default(self, arg: str):
|
||||
if arg == '' and self.default_item is not None:
|
||||
self.result = self.default_item
|
||||
return True
|
||||
|
||||
result = self.sql_session.scalars(
|
||||
select(self.cls)
|
||||
.where(self.cls.name == arg),
|
||||
).all()
|
||||
|
||||
if len(result) != 1:
|
||||
print(f'No such {self.cls.__name__} found: {arg}')
|
||||
return
|
||||
|
||||
self.result = result[0]
|
||||
return True
|
||||
|
||||
def completenames(self, text: str, *_) -> list[str]:
|
||||
return self.sql_session.scalars(
|
||||
select(self.cls.name)
|
||||
.where(self.cls.name.like(f'{text}%')),
|
||||
).all()
|
||||
|
||||
|
||||
class BookScanTool(Cmd):
|
||||
prompt = '> '
|
||||
intro = """
|
||||
Welcome to the worblehat scanner tool.
|
||||
Start by entering a command, or entering an ISBN of a new item.
|
||||
Type "help" to see list of commands
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
try:
|
||||
engine = create_engine(Config.SQLALCHEMY_DATABASE_URI)
|
||||
self.sql_session = Session(engine)
|
||||
except Exception:
|
||||
print('Error: could not connect to database.')
|
||||
exit(1)
|
||||
|
||||
print(f"Connected to database at '{Config.SQLALCHEMY_DATABASE_URI}'")
|
||||
|
||||
|
||||
def do_list_bookcases(self, _: str):
|
||||
"""Usage: list_bookcases"""
|
||||
bookcase_shelfs = self.sql_session.scalars(
|
||||
select(BookcaseShelf)
|
||||
.join(Bookcase)
|
||||
.order_by(
|
||||
Bookcase.name,
|
||||
BookcaseShelf.column,
|
||||
BookcaseShelf.row,
|
||||
)
|
||||
).all()
|
||||
|
||||
bookcase_uid = None
|
||||
for shelf in bookcase_shelfs:
|
||||
if shelf.bookcase.uid != bookcase_uid:
|
||||
print(shelf.bookcase.name)
|
||||
bookcase_uid = shelf.bookcase.uid
|
||||
|
||||
name = f"r{shelf.row}-c{shelf.column}"
|
||||
if shelf.description is not None:
|
||||
name += f" [{shelf.description}]"
|
||||
|
||||
print(f' {name} - {sum(i.amount for i in shelf.items)} items')
|
||||
|
||||
|
||||
def do_add_bookcase(self, arg: str):
|
||||
"""Usage: add_bookcase <name> [description]"""
|
||||
arg = arg.split(' ')
|
||||
if len(arg) < 1:
|
||||
print('Usage: add_bookcase <name> [description]')
|
||||
return
|
||||
|
||||
name = arg[0]
|
||||
description = ' '.join(arg[1:])
|
||||
|
||||
if self.sql_session.scalars(
|
||||
select(Bookcase)
|
||||
.where(Bookcase.name == name)
|
||||
).one_or_none() is not None:
|
||||
print(f'Error: a bookcase with name {name} already exists')
|
||||
return
|
||||
|
||||
bookcase = Bookcase(name, description)
|
||||
self.sql_session.add(bookcase)
|
||||
|
||||
def do_add_bookcase_shelf(self, arg: str):
|
||||
"""Usage: add_bookcase_shelf <bookcase_name> <row>-<column> [description]"""
|
||||
arg = arg.split(' ')
|
||||
if len(arg) < 2:
|
||||
print('Usage: add_bookcase_shelf <bookcase_name> <row>-<column> [description]')
|
||||
return
|
||||
|
||||
bookcase_name = arg[0]
|
||||
row, column = [int(x) for x in arg[1].split('-')]
|
||||
description = ' '.join(arg[2:])
|
||||
|
||||
if (bookcase := self.sql_session.scalars(
|
||||
select(Bookcase)
|
||||
.where(Bookcase.name == bookcase_name)
|
||||
).one_or_none()) is None:
|
||||
print(f'Error: Could not find bookcase with name {bookcase_name}')
|
||||
return
|
||||
|
||||
if self.sql_session.scalars(
|
||||
select(BookcaseShelf)
|
||||
.where(
|
||||
BookcaseShelf.bookcase == bookcase,
|
||||
BookcaseShelf.row == row,
|
||||
BookcaseShelf.column == column,
|
||||
)
|
||||
).one_or_none() is not None:
|
||||
print(f'Error: a bookshelf in bookcase {bookcase.name} with position {row}-{column} already exists')
|
||||
return
|
||||
|
||||
shelf = BookcaseShelf(
|
||||
row,
|
||||
column,
|
||||
bookcase,
|
||||
description,
|
||||
)
|
||||
self.sql_session.add(shelf)
|
||||
|
||||
|
||||
def do_list_bookcase_items(self, _: str):
|
||||
"""Usage: list_bookcase_items"""
|
||||
self.columnize([repr(bi) for bi in self.bookcase_items])
|
||||
|
||||
|
||||
def default(self, isbn: str):
|
||||
isbn = isbn.strip()
|
||||
if not is_valid_isbn(isbn):
|
||||
print(f'"{isbn}" is not a valid isbn')
|
||||
return
|
||||
|
||||
if (existing_item := self.sql_session.scalars(
|
||||
select(BookcaseItem)
|
||||
.where(BookcaseItem.isbn == isbn)
|
||||
).one_or_none()) is not None:
|
||||
print('Found existing BookcaseItem:', existing_item)
|
||||
print(f'There are currently {existing_item.amount} of these in the system.')
|
||||
if _prompt_yes_no('Would you like to add another?', default=True):
|
||||
existing_item.amount += 1
|
||||
return
|
||||
|
||||
bookcase_item = create_bookcase_item_from_isbn(isbn, self.sql_session)
|
||||
if bookcase_item is None:
|
||||
print(f'Could not find data about item with isbn {isbn} online.')
|
||||
print(f'If you think this is not due to a bug, please add the book to openlibrary.org before continuing.')
|
||||
return
|
||||
else:
|
||||
print(dedent(f"""
|
||||
Found item:
|
||||
title: {bookcase_item.name}
|
||||
authors: {', '.join(a.name for a in bookcase_item.authors)}
|
||||
language: {bookcase_item.language}
|
||||
"""))
|
||||
|
||||
print('Please select the shelf where the item is placed:')
|
||||
bookcase_shelf_selector = _InteractiveItemSelector(
|
||||
cls = BookcaseShelf,
|
||||
sql_session = self.sql_session,
|
||||
)
|
||||
|
||||
bookcase_shelf_selector.cmdloop()
|
||||
bookcase_item.shelf = bookcase_shelf_selector.result
|
||||
|
||||
print('Please select the items media type:')
|
||||
media_type_selector = _InteractiveItemSelector(
|
||||
cls = MediaType,
|
||||
sql_session = self.sql_session,
|
||||
default = self.sql_session.scalars(
|
||||
select(MediaType)
|
||||
.where(MediaType.name.ilike("book")),
|
||||
).one(),
|
||||
)
|
||||
|
||||
media_type_selector.cmdloop()
|
||||
bookcase_item.media_type = media_type_selector.result
|
||||
|
||||
username = input('Who owns this book? [PVV]: ')
|
||||
if username != '':
|
||||
bookcase_item.owner = username
|
||||
|
||||
self.sql_session.add(bookcase_item)
|
||||
|
||||
|
||||
def do_exit(self, _: str):
|
||||
"""Usage: exit"""
|
||||
if _prompt_yes_no('Would you like to save your changes?'):
|
||||
self.sql_session.commit()
|
||||
else:
|
||||
self.sql_session.rollback()
|
||||
exit(0)
|
||||
|
||||
|
||||
def main():
|
||||
tool = BookScanTool()
|
||||
while True:
|
||||
try:
|
||||
tool.cmdloop()
|
||||
except KeyboardInterrupt:
|
||||
try:
|
||||
print()
|
||||
if _prompt_yes_no('Are you sure you want to exit without saving?', default=False):
|
||||
raise KeyboardInterrupt
|
||||
except KeyboardInterrupt:
|
||||
if tool.sql_session is not None:
|
||||
tool.sql_session.rollback()
|
||||
exit(0)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in New Issue