move stuff to

This commit is contained in:
2025-02-22 21:48:26 +01:00
parent 3c4f6ccf8c
commit 944bf92150
75 changed files with 1 additions and 0 deletions

23
src/worblehat-frontend/.gitignore vendored Normal file
View File

@@ -0,0 +1,23 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.
# dependencies
/node_modules
/.pnp
.pnp.js
# testing
/coverage
# production
/build
# misc
.DS_Store
.env.local
.env.development.local
.env.test.local
.env.production.local
npm-debug.log*
yarn-debug.log*
yarn-error.log*

View File

@@ -0,0 +1,50 @@
{
"name": "worblehat",
"version": "0.1.0",
"private": true,
"dependencies": {
"@emotion/react": "^11.11.0",
"@emotion/styled": "^11.11.0",
"@mui/icons-material": "^5.11.16",
"@mui/material": "^5.12.3",
"@mui/types": "^7.2.4",
"@testing-library/jest-dom": "^5.16.5",
"@testing-library/react": "^14.0.0",
"@testing-library/user-event": "^14.4.3",
"@types/jest": "^29.5.1",
"@types/node": "^20.1.0",
"@types/react": "^18.2.6",
"@types/react-dom": "^18.2.4",
"@types/react-router-dom": "^5.3.3",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-router-dom": "^6.11.1",
"react-scripts": "^5.0.1",
"typescript": "^5.0.4",
"web-vitals": "^3.3.1"
},
"scripts": {
"start": "react-scripts start",
"build": "react-scripts build",
"test": "react-scripts test",
"eject": "react-scripts eject"
},
"eslintConfig": {
"extends": [
"react-app",
"react-app/jest"
]
},
"browserslist": {
"production": [
">0.2%",
"not dead",
"not op_mini all"
],
"development": [
"last 1 chrome version",
"last 1 firefox version",
"last 1 safari version"
]
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.8 KiB

View File

@@ -0,0 +1,43 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<link rel="icon" href="%PUBLIC_URL%/favicon.ico" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="theme-color" content="#000000" />
<meta
name="description"
content="Web site created using create-react-app"
/>
<link rel="apple-touch-icon" href="%PUBLIC_URL%/logo192.png" />
<!--
manifest.json provides metadata used when your web app is installed on a
user's mobile device or desktop. See https://developers.google.com/web/fundamentals/web-app-manifest/
-->
<link rel="manifest" href="%PUBLIC_URL%/manifest.json" />
<!--
Notice the use of %PUBLIC_URL% in the tags above.
It will be replaced with the URL of the `public` folder during the build.
Only files inside the `public` folder can be referenced from the HTML.
Unlike "/favicon.ico" or "favicon.ico", "%PUBLIC_URL%/favicon.ico" will
work correctly both with client-side routing and a non-root public URL.
Learn how to configure a non-root public URL by running `npm run build`.
-->
<title>React App</title>
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>
<div id="root"></div>
<!--
This HTML file is a template.
If you open it directly in the browser, you will see an empty page.
You can add webfonts, meta tags, or analytics to this file.
The build step will place the bundled scripts into the <body> tag.
To begin the development, run `npm start` or `yarn start`.
To create a production bundle, use `npm run build` or `yarn build`.
-->
</body>
</html>

View File

@@ -0,0 +1,25 @@
{
"short_name": "React App",
"name": "Create React App Sample",
"icons": [
{
"src": "favicon.ico",
"sizes": "64x64 32x32 24x24 16x16",
"type": "image/x-icon"
},
{
"src": "logo192.png",
"type": "image/png",
"sizes": "192x192"
},
{
"src": "logo512.png",
"type": "image/png",
"sizes": "512x512"
}
],
"start_url": ".",
"display": "standalone",
"theme_color": "#000000",
"background_color": "#ffffff"
}

View File

@@ -0,0 +1,25 @@
import React from 'react';
import { BrowserRouter as Router, Routes, Route, Link } from "react-router-dom";
import FrontPage from './FrontPage';
import BooksPage from './Books';
import NavBar from './components/NavBar';
function App() {
return (
<div className="App">¨
<NavBar />
<Router>
<Routes>
{/* Sett opp en navbar ellerno */}
<Route path="/" element={<FrontPage />} />
<Route path="/books" element={<BooksPage />} />
</Routes>
</Router>
</div>
);
}
export default App;

View File

@@ -0,0 +1,23 @@
import React from 'react';
import Book from './components/Book';
import { useEffect } from 'react';
function BooksPage() {
const [books, setBooks] = React.useState<string[]>([]);
useEffect(() => {
setBooks(["The Hobbit", "The Lord of the Rings", "The Silmarillion"]);
}, []);
return (
<div>
<h1>
Books Page!
</h1>
<ul>
{books.map((title) => <Book title={title} />)}
</ul>
</div>
);
}
export default BooksPage;

View File

@@ -0,0 +1,16 @@
import React from 'react';
import Button from '@mui/material/Button';
function FrontPage() {
return (
<div>
<h1>
Worblehat Frontpage
</h1>
<Button variant="contained" color="primary">Trykk her</Button>
</div>
);
}
export default FrontPage;

View File

@@ -0,0 +1,15 @@
import React from 'react';
interface BookProps {
title: string;
}
function Book(props: BookProps) {
return (
<li>
{props.title}
</li>
);
}
export default Book;

View File

@@ -0,0 +1,33 @@
// Kopiert fra https://mui.com/material-ui/react-app-bar/
import React from 'react';
import AppBar from '@mui/material/AppBar';
import Box from '@mui/material/Box';
import Toolbar from '@mui/material/Toolbar';
import Typography from '@mui/material/Typography';
import Button from '@mui/material/Button';
import IconButton from '@mui/material/IconButton';
import MenuIcon from '@mui/icons-material/Menu';
export default function NavBar() {
return (
<Box sx={{ flexGrow: 1 }}>
<AppBar position="static">
<Toolbar>
<IconButton
size="large"
edge="start"
color="inherit"
aria-label="menu"
sx={{ mr: 2 }}
>
<MenuIcon />
</IconButton>
<Typography variant="h6" component="div" sx={{ flexGrow: 1 }}>
News
</Typography>
<Button color="inherit">Login</Button>
</Toolbar>
</AppBar>
</Box>
);
}

View File

@@ -0,0 +1,13 @@
body {
margin: 0;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen',
'Ubuntu', 'Cantarell', 'Fira Sans', 'Droid Sans', 'Helvetica Neue',
sans-serif;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
code {
font-family: source-code-pro, Menlo, Monaco, Consolas, 'Courier New',
monospace;
}

View File

@@ -0,0 +1,13 @@
import React from 'react';
import ReactDOM from 'react-dom/client';
import './index.css';
import App from './App';
const root = ReactDOM.createRoot(
document.getElementById('root') as HTMLElement
);
root.render(
<React.StrictMode>
<App />
</React.StrictMode>
);

View File

@@ -0,0 +1 @@
/// <reference types="react-scripts" />

View File

@@ -0,0 +1,26 @@
{
"compilerOptions": {
"target": "es5",
"lib": [
"dom",
"dom.iterable",
"esnext"
],
"allowJs": true,
"skipLibCheck": true,
"esModuleInterop": true,
"allowSyntheticDefaultImports": true,
"strict": true,
"forceConsistentCasingInFileNames": true,
"noFallthroughCasesInSwitch": true,
"module": "esnext",
"moduleResolution": "node",
"resolveJsonModule": true,
"isolatedModules": true,
"noEmit": true,
"jsx": "react-jsx"
},
"include": [
"src"
]
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1 @@
from .main import main

View File

@@ -0,0 +1 @@
from .main import WorblehatCli

271
src/worblehat/cli/main.py Normal file
View File

@@ -0,0 +1,271 @@
from textwrap import dedent
from sqlalchemy import (
event,
select,
)
from sqlalchemy.orm import Session
from worblehat.services import (
create_bookcase_item_from_isbn,
is_valid_isbn,
)
from worblehat.models import *
from .prompt_utils import *
from .subclis import (
AdvancedOptionsCli,
BookcaseItemCli,
select_bookcase_shelf,
SearchCli,
)
# TODO: Category seems to have been forgotten. Maybe relevant interactivity should be added?
# However, is there anyone who are going to search by category rather than just look in
# the shelves?
class WorblehatCli(NumberedCmd):
def __init__(self, sql_session: Session):
super().__init__()
self.sql_session = sql_session
self.sql_session_dirty = False
@event.listens_for(self.sql_session, 'after_flush')
def mark_session_as_dirty(*_):
self.sql_session_dirty = True
self.prompt_header = f'(unsaved changes)'
@event.listens_for(self.sql_session, 'after_commit')
@event.listens_for(self.sql_session, 'after_rollback')
def mark_session_as_clean(*_):
self.sql_session_dirty = False
self.prompt_header = None
@classmethod
def run_with_safe_exit_wrapper(cls, sql_session: Session):
tool = cls(sql_session)
while True:
try:
tool.cmdloop()
except KeyboardInterrupt:
if not tool.sql_session_dirty:
exit(0)
try:
print()
if prompt_yes_no('Are you sure you want to exit without saving?', default=False):
raise KeyboardInterrupt
except KeyboardInterrupt:
if tool.sql_session is not None:
tool.sql_session.rollback()
exit(0)
def do_show_bookcase(self, arg: str):
bookcase_selector = InteractiveItemSelector(
cls = Bookcase,
sql_session = self.sql_session,
)
bookcase_selector.cmdloop()
bookcase = bookcase_selector.result
for shelf in bookcase.shelfs:
print(shelf.short_str())
for item in shelf.items:
print(f' {item.name} - {item.amount} copies')
def do_show_borrowed_queued(self, _: str):
borrowed_items = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(BookcaseItemBorrowing.delivered.is_(None))
.order_by(BookcaseItemBorrowing.end_time),
).all()
if len(borrowed_items) == 0:
print('No borrowed items found.')
else:
print('Borrowed items:')
for item in borrowed_items:
print(f'- {item.username} - {item.item.name} - to be delivered by {item.end_time.strftime("%Y-%m-%d")}')
print()
queued_items = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.order_by(BookcaseItemBorrowingQueue.entered_queue_time),
).all()
if len(queued_items) == 0:
print('No queued items found.')
else:
print('Users in queue:')
for item in queued_items:
print(f'- {item.username} - {item.item.name} - entered queue at {item.entered_queue_time.strftime("%Y-%m-%d")}')
def _create_bookcase_item(self, isbn: str):
bookcase_item = create_bookcase_item_from_isbn(isbn, self.sql_session)
if bookcase_item is None:
print(f'Could not find data about item with ISBN {isbn} online.')
print(f'If you think this is not due to a bug, please add the book to openlibrary.org before continuing.')
return
else:
print(dedent(f"""
Found item:
title: {bookcase_item.name}
authors: {', '.join(a.name for a in bookcase_item.authors)}
language: {bookcase_item.language}
"""))
print('Please select the bookcase where the item is placed:')
bookcase_selector = InteractiveItemSelector(
cls = Bookcase,
sql_session = self.sql_session,
)
bookcase_selector.cmdloop()
bookcase = bookcase_selector.result
bookcase_item.shelf = select_bookcase_shelf(bookcase, self.sql_session)
print('Please select the items media type:')
media_type_selector = InteractiveItemSelector(
cls = MediaType,
sql_session = self.sql_session,
default = self.sql_session.scalars(
select(MediaType)
.where(MediaType.name.ilike("book")),
).one(),
)
media_type_selector.cmdloop()
bookcase_item.media_type = media_type_selector.result
username = input('Who owns this book? [PVV]> ')
if username != '':
bookcase_item.owner = username
self.sql_session.add(bookcase_item)
self.sql_session.flush()
def default(self, isbn: str):
isbn = isbn.strip()
if not is_valid_isbn(isbn):
super()._default(isbn)
return
if (existing_item := self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.isbn == isbn)
.join(BookcaseItemBorrowing)
.join(BookcaseItemBorrowingQueue)
).one_or_none()) is not None:
print(f'\nFound existing item for isbn "{isbn}"')
BookcaseItemCli(
sql_session = self.sql_session,
bookcase_item = existing_item,
).cmdloop()
return
if prompt_yes_no(f"Could not find item with ISBN '{isbn}'.\nWould you like to create it?", default=True):
self._create_bookcase_item(isbn)
def do_search(self, _: str):
search_cli = SearchCli(self.sql_session)
search_cli.cmdloop()
if search_cli.result is not None:
BookcaseItemCli(
sql_session = self.sql_session,
bookcase_item = search_cli.result,
).cmdloop()
def do_show_slabbedasker(self, _: str):
slubberter = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.join(BookcaseItem)
.where(
BookcaseItemBorrowing.end_time < datetime.now(),
BookcaseItemBorrowing.delivered.is_(None),
)
.order_by(
BookcaseItemBorrowing.end_time,
),
).all()
if len(slubberter) == 0:
print('No slubberts found. Life is good.')
return
for slubbert in slubberter:
print('Slubberter:')
print(f'- {slubbert.username} - {slubbert.item.name} - {slubbert.end_time.strftime("%Y-%m-%d")}')
def do_advanced(self, _: str):
AdvancedOptionsCli(self.sql_session).cmdloop()
def do_save(self, _:str):
if not self.sql_session_dirty:
print('No changes to save.')
return
self.sql_session.commit()
def do_abort(self, _:str):
if not self.sql_session_dirty:
print('No changes to abort.')
return
self.sql_session.rollback()
def do_exit(self, _: str):
if self.sql_session_dirty:
if prompt_yes_no('Would you like to save your changes?'):
self.sql_session.commit()
else:
self.sql_session.rollback()
exit(0)
funcs = {
0: {
'f': default,
'doc': 'Choose / Add item with its ISBN',
},
1: {
'f': do_search,
'doc': 'Search',
},
2: {
'f': do_show_bookcase,
'doc': 'Show a bookcase, and its items',
},
3: {
'f': do_show_borrowed_queued,
'doc': 'Show borrowed/queued items',
},
4: {
'f': do_show_slabbedasker,
'doc': 'Show slabbedasker',
},
5: {
'f': do_save,
'doc': 'Save changes',
},
6: {
'f': do_abort,
'doc': 'Abort changes',
},
7: {
'f': do_advanced,
'doc': 'Advanced options',
},
9: {
'f': do_exit,
'doc': 'Exit',
},
}

View File

@@ -0,0 +1,213 @@
from cmd import Cmd
from datetime import datetime
from typing import Any, Callable
from sqlalchemy import select
from sqlalchemy.orm import Session
def prompt_yes_no(question: str, default: bool | None = None) -> bool:
prompt = {
None: '[y/n]',
True: '[Y/n]',
False: '[y/N]',
}[default]
while not any([
(answer := input(f'{question} {prompt} ').lower()) in ('y','n'),
(default != None and answer.strip() == '')
]):
pass
return {
'y': True,
'n': False,
'': default,
}[answer]
def format_date(date: datetime):
return date.strftime("%a %b %d, %Y")
class InteractiveItemSelector(Cmd):
def __init__(
self,
cls: type,
sql_session: Session,
execute_selection: Callable[[Session, type, str], list[Any]] = lambda session, cls, arg: session.scalars(
select(cls)
.where(cls.name == arg),
).all(),
complete_selection: Callable[[Session, type, str], list[str]] = lambda session, cls, text: session.scalars(
select(cls.name)
.where(cls.name.ilike(f'{text}%')),
).all(),
default: Any | None = None,
):
"""
This is a utility class for prompting the user to select an
item from the database. The default functions assumes that
the item has a name attribute, and that the name is unique.
However, this can be overridden by passing in custom functions.
"""
super().__init__()
self.cls = cls
self.sql_session = sql_session
self.execute_selection = execute_selection
self.complete_selection = complete_selection
self.default_item = default
self.result = None
if default is not None:
self.prompt = f'Select {cls.__name__} [{default.name}]> '
else:
self.prompt = f'Select {cls.__name__}> '
def emptyline(self) -> bool:
if self.default_item is not None:
self.result = self.default_item
return True
def default(self, arg: str):
result = self.execute_selection(self.sql_session, self.cls, arg)
if len(result) != 1:
print(f'No such {self.cls.__name__} found: {arg}')
return
self.result = result[0]
return True
# TODO: Override this function to not act as an argument completer
# but to complete the entire value name
def completedefault(self, text: str, line: str, *_) -> list[str]:
return []
def completenames(self, text: str, *_) -> list[str]:
x = self.complete_selection(self.sql_session, self.cls, text)
return x
class NumberedCmd(Cmd):
"""
This is a utility class for creating a numbered command line.
It will automatically generate a prompt that lists all the
available commands, and will automatically call the correct
function based on the user input.
If the user input is not a number, it will call the default
function, which can be overridden by the subclass.
Example:
```
class MyCmd(NumberedCmd):
def __init__(self):
super().__init__()
def do_foo(self, arg: str):
pass
def do_bar(self, arg: str):
pass
funcs = {
1: {
'f': do_foo,
'doc': 'do foo',
},
2: {
'f': do_bar,
'doc': 'do bar',
},
}
```
"""
prompt_header: str | None = None
funcs: dict[int, dict[str, str | Callable[[Any, str], bool | None]]]
def __init__(self):
super().__init__()
def _generate_usage_list(self) -> str:
result = ''
for i, func in self.funcs.items():
if i == 0:
i = '*'
result += f'{i}) {func["doc"]}\n'
return result
def _default(self, arg: str):
try:
i = int(arg)
self.funcs[i]
except (ValueError, KeyError):
return
return self.funcs[i]['f'](self, arg)
def default(self, arg: str):
return self._default(arg)
def _postcmd(self, stop: bool, _: str) -> bool:
if not stop:
print()
print('-----------------')
print()
return stop
def postcmd(self, stop: bool, line: str) -> bool:
return self._postcmd(stop, line)
@property
def prompt(self):
result = ''
if self.prompt_header != None:
result += self.prompt_header + '\n'
result += self._generate_usage_list()
if self.lastcmd == '':
result += f'> '
else:
result += f'[{self.lastcmd}]> '
return result
class NumberedItemSelector(NumberedCmd):
def __init__(
self,
items: list[Any],
stringify: Callable[[Any], str] = lambda x: str(x),
):
super().__init__()
self.items = items
self.stringify = stringify
self.result = None
self.funcs = {
i: {
'f': self._select_item,
'doc': self.stringify(item),
}
for i, item in enumerate(items, start=1)
}
def _select_item(self, *a):
self.result = self.items[int(self.lastcmd)-1]
return True

View File

@@ -0,0 +1,4 @@
from .advanced_options import AdvancedOptionsCli
from .bookcase_item import BookcaseItemCli
from .bookcase_shelf_selector import select_bookcase_shelf
from .search import SearchCli

View File

@@ -0,0 +1,135 @@
from sqlalchemy import select
from sqlalchemy.orm import Session
from worblehat.cli.prompt_utils import (
InteractiveItemSelector,
NumberedCmd,
format_date,
prompt_yes_no,
)
from worblehat.models import Bookcase, BookcaseShelf
class AdvancedOptionsCli(NumberedCmd):
def __init__(self, sql_session: Session):
super().__init__()
self.sql_session = sql_session
def do_add_bookcase(self, _: str):
while True:
name = input('Name of bookcase> ')
if name == '':
print('Error: name cannot be empty')
continue
if self.sql_session.scalars(
select(Bookcase)
.where(Bookcase.name == name)
).one_or_none() is not None:
print(f'Error: a bookcase with name {name} already exists')
continue
break
description = input('Description of bookcase> ')
if description == '':
description = None
bookcase = Bookcase(name, description)
self.sql_session.add(bookcase)
self.sql_session.flush()
def do_add_bookcase_shelf(self, arg: str):
bookcase_selector = InteractiveItemSelector(
cls = Bookcase,
sql_session = self.sql_session,
)
bookcase_selector.cmdloop()
bookcase = bookcase_selector.result
while True:
column = input('Column> ')
try:
column = int(column)
except ValueError:
print('Error: column must be a number')
continue
break
while True:
row = input('Row> ')
try:
row = int(row)
except ValueError:
print('Error: row must be a number')
continue
break
if self.sql_session.scalars(
select(BookcaseShelf)
.where(
BookcaseShelf.bookcase == bookcase,
BookcaseShelf.column == column,
BookcaseShelf.row == row,
)
).one_or_none() is not None:
print(f'Error: a bookshelf in bookcase {bookcase.name} with position c{column}-r{row} already exists')
return
description = input('Description> ')
if description == '':
description = None
shelf = BookcaseShelf(
row,
column,
bookcase,
description,
)
self.sql_session.add(shelf)
self.sql_session.flush()
def do_list_bookcases(self, _: str):
bookcase_shelfs = self.sql_session.scalars(
select(BookcaseShelf)
.join(Bookcase)
.order_by(
Bookcase.name,
BookcaseShelf.column,
BookcaseShelf.row,
)
).all()
bookcase_uid = None
for shelf in bookcase_shelfs:
if shelf.bookcase.uid != bookcase_uid:
print(shelf.bookcase.short_str())
bookcase_uid = shelf.bookcase.uid
print(f' {shelf.short_str()} - {sum(i.amount for i in shelf.items)} items')
def do_done(self, _: str):
return True
funcs = {
1: {
'f': do_add_bookcase,
'doc': 'Add bookcase',
},
2: {
'f': do_add_bookcase_shelf,
'doc': 'Add bookcase shelf',
},
3: {
'f': do_list_bookcases,
'doc': 'List all bookcases',
},
9: {
'f': do_done,
'doc': 'Done',
},
}

View File

@@ -0,0 +1,396 @@
from datetime import datetime, timedelta
from textwrap import dedent
from sqlalchemy import select
from sqlalchemy.orm import Session
from worblehat.cli.prompt_utils import (
InteractiveItemSelector,
NumberedCmd,
NumberedItemSelector,
format_date,
prompt_yes_no,
)
from worblehat.models import (
Bookcase,
BookcaseItem,
BookcaseItemBorrowing,
BookcaseItemBorrowingQueue,
Language,
MediaType,
)
from worblehat.services.bookcase_item import (
create_bookcase_item_from_isbn,
is_valid_isbn,
)
from worblehat.services.config import Config
from .bookcase_shelf_selector import select_bookcase_shelf
def _selected_bookcase_item_prompt(bookcase_item: BookcaseItem) -> str:
amount_borrowed = len(bookcase_item.borrowings)
return dedent(f'''
Item: {bookcase_item.name}
ISBN: {bookcase_item.isbn}
Authors: {', '.join(a.name for a in bookcase_item.authors)}
Bookcase: {bookcase_item.shelf.bookcase.short_str()}
Shelf: {bookcase_item.shelf.short_str()}
Amount: {bookcase_item.amount - amount_borrowed}/{bookcase_item.amount}
''')
class BookcaseItemCli(NumberedCmd):
def __init__(self, sql_session: Session, bookcase_item: BookcaseItem):
super().__init__()
self.sql_session = sql_session
self.bookcase_item = bookcase_item
@property
def prompt_header(self) -> str:
return _selected_bookcase_item_prompt(self.bookcase_item)
def do_update_data(self, _: str):
item = create_bookcase_item_from_isbn(self.sql_session, self.bookcase_item.isbn)
self.bookcase_item.name = item.name
# TODO: Remove any old authors
self.bookcase_item.authors = item.authors
self.bookcase_item.language = item.language
self.sql_session.flush()
def do_edit(self, arg: str):
EditBookcaseCli(self.sql_session, self.bookcase_item, self).cmdloop()
@staticmethod
def _prompt_username() -> str:
while True:
username = input('Username: ')
if prompt_yes_no(f'Is {username} correct?', default = True):
return username
def _has_active_borrowing(self, username: str) -> bool:
return self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(
BookcaseItemBorrowing.username == username,
BookcaseItemBorrowing.item == self.bookcase_item,
BookcaseItemBorrowing.delivered.is_(None),
)
).one_or_none() is not None
def _has_borrowing_queue_item(self, username: str) -> bool:
return self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.where(
BookcaseItemBorrowingQueue.username == username,
BookcaseItemBorrowingQueue.item == self.bookcase_item,
)
).one_or_none() is not None
def do_borrow(self, _: str):
active_borrowings = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(
BookcaseItemBorrowing.item == self.bookcase_item,
BookcaseItemBorrowing.delivered.is_(None),
)
.order_by(BookcaseItemBorrowing.end_time)
).all()
if len(active_borrowings) >= self.bookcase_item.amount:
print('This item is currently not available')
print()
print('Active borrowings:')
for b in active_borrowings:
print(f' {b.username} - Until {format_date(b.end_time)}')
if len(self.bookcase_item.borrowing_queue) > 0:
print('Borrowing queue:')
for i, b in enumerate(self.bookcase_item.borrowing_queue):
print(f' {i + 1} - {b.username}')
print()
if not prompt_yes_no('Would you like to enter the borrowing queue?', default = True):
return
username = self._prompt_username()
if self._has_active_borrowing(username):
print('You already have an active borrowing')
return
if self._has_borrowing_queue_item(username):
print('You are already in the borrowing queue')
return
borrowing_queue_item = BookcaseItemBorrowingQueue(username, self.bookcase_item)
self.sql_session.add(borrowing_queue_item)
print(f'{username} entered the queue!')
return
username = self._prompt_username()
borrowing_item = BookcaseItemBorrowing(username, self.bookcase_item)
self.sql_session.add(borrowing_item)
self.sql_session.flush()
print(f'Successfully borrowed the item. Please deliver it back by {format_date(borrowing_item.end_time)}')
def do_deliver(self, _: str):
borrowings = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.join(BookcaseItem, BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid)
.where(BookcaseItem.isbn == self.bookcase_item.isbn)
.order_by(BookcaseItemBorrowing.username)
).all()
if len(borrowings) == 0:
print('No one seems to have borrowed this item')
return
print('Borrowers:')
for i, b in enumerate(borrowings):
print(f' {i + 1}) {b.username}')
while True:
try:
selection = int(input('> '))
except ValueError:
print('Error: selection must be an integer')
continue
if selection < 1 or selection > len(borrowings):
print('Error: selection out of range')
continue
break
borrowing = borrowings[selection - 1]
borrowing.delivered = datetime.now()
self.sql_session.flush()
print(f'Successfully delivered the item for {borrowing.username}')
def do_extend_borrowing(self, _: str):
borrowings = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.join(BookcaseItem, BookcaseItem.uid == BookcaseItemBorrowing.fk_bookcase_item_uid)
.where(BookcaseItem.isbn == self.bookcase_item.isbn)
.order_by(BookcaseItemBorrowing.username)
).all()
if len(borrowings) == 0:
print('No one seems to have borrowed this item')
return
borrowing_queue = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.where(
BookcaseItemBorrowingQueue.item == self.bookcase_item,
BookcaseItemBorrowingQueue.item_became_available_time == None,
)
.order_by(BookcaseItemBorrowingQueue.entered_queue_time)
).all()
if len(borrowing_queue) != 0:
print('Sorry, you cannot extend the borrowing because there are people waiting in the queue')
print('Borrowing queue:')
for i, b in enumerate(borrowing_queue):
print(f' {i + 1}) {b.username}')
return
print('Who are you?')
selector = NumberedItemSelector(
items = list(borrowings),
stringify = lambda b: f'{b.username} - Until {format_date(b.end_time)}',
)
selector.cmdloop()
if selector.result is None:
return
borrowing = selector.result
borrowing.end_time = datetime.now() + timedelta(days=int(Config['deadline_daemon.days_before_queue_position_expires']))
self.sql_session.flush()
print(f'Successfully extended the borrowing for {borrowing.username} until {format_date(borrowing.end_time)}')
def do_done(self, _: str):
return True
funcs = {
1: {
'f': do_borrow,
'doc': 'Borrow',
},
2: {
'f': do_deliver,
'doc': 'Deliver',
},
3: {
'f': do_extend_borrowing,
'doc': 'Extend borrowing',
},
4: {
'f': do_edit,
'doc': 'Edit',
},
5: {
'f': do_update_data,
'doc': 'Pull updated data from online databases',
},
9: {
'f': do_done,
'doc': 'Done',
},
}
class EditBookcaseCli(NumberedCmd):
def __init__(self, sql_session: Session, bookcase_item: BookcaseItem, parent: BookcaseItemCli):
super().__init__()
self.sql_session = sql_session
self.bookcase_item = bookcase_item
self.parent = parent
@property
def prompt_header(self) -> str:
return _selected_bookcase_item_prompt(self.bookcase_item)
def do_name(self, _: str):
while True:
name = input('New name> ')
if name == '':
print('Error: name cannot be empty')
continue
if self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.name == name)
).one_or_none() is not None:
print(f'Error: an item with name {name} already exists')
continue
break
self.bookcase_item.name = name
self.sql_session.flush()
def do_isbn(self, _: str):
while True:
isbn = input('New ISBN> ')
if isbn == '':
print('Error: ISBN cannot be empty')
continue
if not is_valid_isbn(isbn):
print('Error: ISBN is not valid')
continue
if self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.isbn == isbn)
).one_or_none() is not None:
print(f'Error: an item with ISBN {isbn} already exists')
continue
break
self.bookcase_item.isbn = isbn
if prompt_yes_no('Update data from online databases?'):
self.parent.do_update_data('')
self.sql_session.flush()
def do_language(self, _: str):
language_selector = InteractiveItemSelector(
Language,
self.sql_session,
)
self.bookcase_item.language = language_selector.result
self.sql_session.flush()
def do_media_type(self, _: str):
media_type_selector = InteractiveItemSelector(
MediaType,
self.sql_session,
)
self.bookcase_item.media_type = media_type_selector.result
self.sql_session.flush()
def do_amount(self, _: str):
while (new_amount := input(f'New amount [{self.bookcase_item.amount}]> ')) != '':
try:
new_amount = int(new_amount)
except ValueError:
print('Error: amount must be an integer')
continue
if new_amount < 1:
print('Error: amount must be greater than 0')
continue
break
self.bookcase_item.amount = new_amount
self.sql_session.flush()
def do_shelf(self, _: str):
bookcase_selector = InteractiveItemSelector(
Bookcase,
self.sql_session,
)
bookcase_selector.cmdloop()
bookcase = bookcase_selector.result
shelf = select_bookcase_shelf(bookcase, self.sql_session)
self.bookcase_item.shelf = shelf
self.sql_session.flush()
def do_done(self, _: str):
return True
funcs = {
1: {
'f': do_name,
'doc': 'Change name',
},
2: {
'f': do_isbn,
'doc': 'Change ISBN',
},
3: {
'f': do_language,
'doc': 'Change language',
},
4: {
'f': do_media_type,
'doc': 'Change media type',
},
5: {
'f': do_amount,
'doc': 'Change amount',
},
6: {
'f': do_shelf,
'doc': 'Change shelf',
},
9: {
'f': do_done,
'doc': 'Done',
},
}

View File

@@ -0,0 +1,45 @@
from sqlalchemy import select
from sqlalchemy.orm import Session
from worblehat.cli.prompt_utils import InteractiveItemSelector
from worblehat.models import (
Bookcase,
BookcaseShelf,
)
def select_bookcase_shelf(
bookcase: Bookcase,
sql_session: Session,
prompt: str = "Please select the shelf where the item is placed (col-row):"
) -> BookcaseShelf:
def __complete_bookshelf_selection(session: Session, cls: type, arg: str):
args = arg.split('-')
query = select(cls.row, cls.column).where(cls.bookcase == bookcase)
try:
if arg != '' and len(args) > 0:
query = query.where(cls.column == int(args[0]))
if len(args) > 1:
query = query.where(cls.row == int(args[1]))
except ValueError:
return []
result = session.execute(query).all()
return [f"{c}-{r}" for r,c in result]
print(prompt)
bookcase_shelf_selector = InteractiveItemSelector(
cls = BookcaseShelf,
sql_session = sql_session,
execute_selection = lambda session, cls, arg: session.scalars(
select(cls)
.where(
cls.bookcase == bookcase,
cls.column == int(arg.split('-')[0]),
cls.row == int(arg.split('-')[1]),
)
).all(),
complete_selection = __complete_bookshelf_selection,
)
bookcase_shelf_selector.cmdloop()
return bookcase_shelf_selector.result

View File

@@ -0,0 +1,146 @@
from sqlalchemy import select
from sqlalchemy.orm import Session
from worblehat.cli.prompt_utils import (
NumberedCmd,
NumberedItemSelector,
)
from worblehat.models import Author, BookcaseItem
class SearchCli(NumberedCmd):
def __init__(self, sql_session: Session):
super().__init__()
self.sql_session = sql_session
self.result = None
def do_search_all(self, _: str):
print('TODO: Implement search all')
def do_search_title(self, _: str):
while (input_text := input('Enter title: ')) == '':
pass
items = self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.name.ilike(f'%{input_text}%')),
).all()
if len(items) == 0:
print('No items found.')
return
selector = NumberedItemSelector(
items = items,
stringify = lambda item: f"{item.name} ({item.isbn})",
)
selector.cmdloop()
if selector.result is not None:
self.result = selector.result
return True
def do_search_author(self, _: str):
while (input_text := input('Enter author name: ')) == '':
pass
author = self.sql_session.scalars(
select(Author)
.where(Author.name.ilike(f'%{input_text}%')),
).all()
if len(author) == 0:
print('No authors found.')
return
elif len(author) == 1:
selected_author = author[0]
print('Found author:')
print(f" {selected_author.name} ({sum(item.amount for item in selected_author.items)} items)")
else:
selector = NumberedItemSelector(
items = author,
stringify = lambda author: f"{author.name} ({sum(item.amount for item in author.items)} items)",
)
selector.cmdloop()
if selector.result is None:
return
selected_author = selector.result
selector = NumberedItemSelector(
items = list(selected_author.items),
stringify = lambda item: f"{item.name} ({item.isbn})",
)
selector.cmdloop()
if selector.result is not None:
self.result = selector.result
return True
def do_search_owner(self, _: str):
while (input_text := input('Enter username: ')) == '':
pass
users = self.sql_session.scalars(
select(BookcaseItem.owner)
.where(BookcaseItem.owner.ilike(f'%{input_text}%'))
.distinct(),
).all()
if len(users) == 0:
print('No users found.')
return
elif len(users) == 1:
selected_user = users[0]
print('Found user:')
print(f" {selected_user}")
else:
selector = NumberedItemSelector(items = users)
selector.cmdloop()
if selector.result is None:
return
selected_user = selector.result
items = self.sql_session.scalars(
select(BookcaseItem)
.where(BookcaseItem.owner == selected_user),
).all()
selector = NumberedItemSelector(
items = items,
stringify = lambda item: f"{item.name} ({item.isbn})",
)
selector.cmdloop()
if selector.result is not None:
self.result = selector.result
return True
def do_done(self, _: str):
return True
funcs = {
1: {
'f': do_search_all,
'doc': 'Search everything',
},
2: {
'f': do_search_title,
'doc': 'Search by title',
},
3: {
'f': do_search_author,
'doc': 'Search by author',
},
4: {
'f': do_search_owner,
'doc': 'Search by owner',
},
9: {
'f': do_done,
'doc': 'Done',
},
}

View File

@@ -0,0 +1 @@
from .main import DeadlineDaemon

View File

@@ -0,0 +1,291 @@
import logging
from datetime import datetime, timedelta
from textwrap import dedent
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from worblehat.services.config import Config
from worblehat.models import (
BookcaseItemBorrowing,
DeadlineDaemonLastRunDatetime,
BookcaseItemBorrowingQueue,
)
from worblehat.services.email import send_email
class DeadlineDaemon:
def __init__(self, sql_session: Session):
if not Config['deadline_daemon.enabled']:
return
self.sql_session = sql_session
self.last_run = self.sql_session.scalars(
select(DeadlineDaemonLastRunDatetime),
).one_or_none()
if self.last_run is None:
logging.info('No previous run found, assuming this is the first run')
self.last_run = DeadlineDaemonLastRunDatetime(time=datetime.now())
self.sql_session.add(self.last_run)
self.sql_session.commit()
self.last_run_datetime = self.last_run.time
self.current_run_datetime = datetime.now()
def run(self):
logging.info('Deadline daemon started')
if not Config['deadline_daemon.enabled']:
logging.warn('Deadline daemon disabled, exiting')
return
if Config['deadline_daemon.dryrun']:
logging.warn('Running in dryrun mode')
self.send_close_deadline_reminder_mails()
self.send_overdue_mails()
self.send_newly_available_mails()
self.send_expiring_queue_position_mails()
self.auto_expire_queue_positions()
self.last_run.time = self.current_run_datetime
self.sql_session.commit()
###################
# EMAIL TEMPLATES #
###################
def _send_close_deadline_mail(self, borrowing: BookcaseItemBorrowing):
logging.info(f'Sending close deadline mail to {borrowing.username}@pvv.ntnu.no.')
send_email(
f'{borrowing.username}@pvv.ntnu.no',
'Reminder - Your borrowing deadline is approaching',
dedent(f'''
Your borrowing deadline for the following item is approaching:
{borrowing.item.name}
Please return the item by {borrowing.end_time.strftime("%a %b %d, %Y")}
''',
).strip(),
)
def _send_overdue_mail(self, borrowing: BookcaseItemBorrowing):
logging.info(f'Sending overdue mail to {borrowing.username}@pvv.ntnu.no for {borrowing.item.isbn} - {borrowing.end_time.strftime("%a %b %d, %Y")}')
send_email(
f'{borrowing.username}@pvv.ntnu.no',
'Your deadline has passed',
dedent(f'''
Your delivery deadline for the following item has passed:
{borrowing.item.name}
Please return the item as soon as possible.
''',
).strip(),
)
def _send_newly_available_mail(self, queue_item: BookcaseItemBorrowingQueue):
logging.info(f'Sending newly available mail to {queue_item.username}')
days_before_queue_expires = Config['deadline_daemon.days_before_queue_position_expires']
# TODO: calculate and format the date of when the queue position expires in the mail.
send_email(
f'{queue_item.username}@pvv.ntnu.no',
'An item you have queued for is now available',
dedent(f'''
The following item is now available for you to borrow:
{queue_item.item.name}
Please pick up the item within {days_before_queue_expires} days.
''',
).strip(),
)
def _send_expiring_queue_position_mail(self, queue_position: BookcaseItemBorrowingQueue, day: int):
logging.info(f'Sending queue position expiry reminder to {queue_position.username}@pvv.ntnu.no.')
send_email(
f'{queue_position.username}@pvv.ntnu.no',
'Reminder - Your queue position expiry deadline is approaching',
dedent(f'''
Your queue position expiry deadline for the following item is approaching:
{queue_position.item.name}
Please borrow the item by {(queue_position.item_became_available_time + timedelta(days=day)).strftime("%a %b %d, %Y")}
''',
).strip(),
)
def _send_queue_position_expired_mail(self, queue_position: BookcaseItemBorrowingQueue):
send_email(
f'{queue_position.username}@pvv.ntnu.no',
'Your queue position has expired',
dedent(f'''
Your queue position for the following item has expired:
{queue_position.item.name}
You can queue for the item again at any time, but you will be placed at the back of the queue.
There are currently {len(queue_position.item.borrowing_queue)} users in the queue.
''',
).strip(),
)
##################
# EMAIL ROUTINES #
##################
def _sql_subtract_date(self, x: datetime, y: timedelta):
if self.sql_session.bind.dialect.name == 'sqlite':
# SQLite does not support timedelta in queries
return func.datetime(x, f'-{y.days} days')
elif self.sql_session.bind.dialect.name == 'postgresql':
return x - y
else:
raise NotImplementedError(f'Unsupported dialect: {self.sql_session.bind.dialect.name}')
def send_close_deadline_reminder_mails(self):
logging.info('Sending mails for items with a closing deadline')
# TODO: This should be int-parsed and validated before the daemon started
days = [int(d) for d in Config['deadline_daemon.warn_days_before_borrowing_deadline']]
for day in days:
borrowings_to_remind = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(
self._sql_subtract_date(
BookcaseItemBorrowing.end_time,
timedelta(days=day),
)
.between(
self.last_run_datetime,
self.current_run_datetime,
),
BookcaseItemBorrowing.delivered.is_(None),
),
).all()
for borrowing in borrowings_to_remind:
self._send_close_deadline_mail(borrowing)
def send_overdue_mails(self):
logging.info('Sending mails for overdue items')
to_remind = self.sql_session.scalars(
select(BookcaseItemBorrowing)
.where(
BookcaseItemBorrowing.end_time < self.current_run_datetime,
BookcaseItemBorrowing.delivered.is_(None),
)
).all()
for borrowing in to_remind:
self._send_overdue_mail(borrowing)
def send_newly_available_mails(self):
logging.info('Sending mails about newly available items')
newly_available = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.join(
BookcaseItemBorrowing,
BookcaseItemBorrowing.fk_bookcase_item_uid == BookcaseItemBorrowingQueue.fk_bookcase_item_uid,
)
.where(
BookcaseItemBorrowingQueue.expired.is_(False),
BookcaseItemBorrowing.delivered.is_not(None),
BookcaseItemBorrowing.delivered.between(
self.last_run_datetime,
self.current_run_datetime,
),
)
.order_by(BookcaseItemBorrowingQueue.entered_queue_time)
.group_by(BookcaseItemBorrowingQueue.fk_bookcase_item_uid)
).all()
for queue_item in newly_available:
logging.info(f'Adding user {queue_item.username} to queue for {queue_item.item.name}')
queue_item.item_became_available_time = self.current_run_datetime
self.sql_session.commit()
self._send_newly_available_mail(queue_item)
def send_expiring_queue_position_mails(self):
logging.info('Sending mails about queue positions which are expiring soon')
logging.warning('Not implemented')
days = [int(d) for d in Config['deadline_daemon.warn_days_before_expiring_queue_position_deadline']]
for day in days:
queue_positions_to_remind = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.join(
BookcaseItemBorrowing,
BookcaseItemBorrowing.fk_bookcase_item_uid == BookcaseItemBorrowingQueue.fk_bookcase_item_uid,
)
.where(
self._sql_subtract_date(
BookcaseItemBorrowingQueue.item_became_available_time + timedelta(days=day),
timedelta(days=day),
)
.between(
self.last_run_datetime,
self.current_run_datetime,
),
),
).all()
for queue_position in queue_positions_to_remind:
self._send_expiring_queue_position_mail(queue_position, day)
def auto_expire_queue_positions(self):
logging.info('Expiring queue positions which are too old')
queue_position_expiry_days = int(Config['deadline_daemon.days_before_queue_position_expires'])
overdue_queue_positions = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.where(
BookcaseItemBorrowingQueue.item_became_available_time + timedelta(days=queue_position_expiry_days) < self.current_run_datetime,
BookcaseItemBorrowingQueue.expired.is_(False),
),
).all()
for queue_position in overdue_queue_positions:
logging.info(f'Expiring queue position for {queue_position.username} for item {queue_position.item.name}')
queue_position.expired = True
next_queue_position = self.sql_session.scalars(
select(BookcaseItemBorrowingQueue)
.where(
BookcaseItemBorrowingQueue.fk_bookcase_item_uid == queue_position.fk_bookcase_item_uid,
BookcaseItemBorrowingQueue.item_became_available_time.is_(None),
)
.order_by(BookcaseItemBorrowingQueue.entered_queue_time)
.limit(1),
).one_or_none()
self._send_queue_position_expired_mail(queue_position)
if next_queue_position is not None:
next_queue_position.item_became_available_time = self.current_run_datetime
logging.info(f'Next user in queue for item {next_queue_position.item.name} is {next_queue_position.username}')
self._send_newly_available_mail(next_queue_position)
self.sql_session.commit()

View File

@@ -0,0 +1,117 @@
from datetime import datetime, timedelta
from worblehat.models import (
BookcaseItem,
BookcaseItemBorrowing,
BookcaseItemBorrowingQueue,
DeadlineDaemonLastRunDatetime,
)
from worblehat.services.config import Config
from .seed_test_data import main as seed_test_data_main
def clear_db(sql_session):
sql_session.query(BookcaseItemBorrowingQueue).delete()
sql_session.query(BookcaseItemBorrowing).delete()
sql_session.query(DeadlineDaemonLastRunDatetime).delete()
sql_session.commit()
# NOTE: feel free to change this function to suit your needs
# it's just a quick and dirty way to get some data into the database
# for testing the deadline daemon - oysteikt 2024
def main(sql_session):
borrow_warning_days = [timedelta(days=int(d)) for d in Config['deadline_daemon.warn_days_before_borrowing_deadline']]
queue_warning_days = [timedelta(days=int(d)) for d in Config['deadline_daemon.warn_days_before_expiring_queue_position_deadline']]
queue_expire_days = int(Config['deadline_daemon.days_before_queue_position_expires'])
clear_db(sql_session)
seed_test_data_main(sql_session)
books = sql_session.query(BookcaseItem).all()
last_run_datetime = datetime.now() - timedelta(days=16)
last_run = DeadlineDaemonLastRunDatetime(last_run_datetime)
sql_session.add(last_run)
# Create at least one item that is borrowed and not supposed to be returned yet
borrowing = BookcaseItemBorrowing(
item=books[0],
username='test_borrower_still_borrowing',
)
borrowing.start_time = last_run_datetime - timedelta(days=1)
borrowing.end_time = datetime.now() - timedelta(days=6)
sql_session.add(borrowing)
# Create at least one item that is borrowed and is supposed to be returned soon
borrowing = BookcaseItemBorrowing(
item=books[1],
username='test_borrower_return_soon',
)
borrowing.start_time = last_run_datetime - timedelta(days=1)
borrowing.end_time = datetime.now() - timedelta(days=2)
sql_session.add(borrowing)
# Create at least one item that is borrowed and is overdue
borrowing = BookcaseItemBorrowing(
item=books[2],
username='test_borrower_overdue',
)
borrowing.start_time = datetime.now() - timedelta(days=1)
borrowing.end_time = datetime.now() + timedelta(days=1)
sql_session.add(borrowing)
# Create at least one item that is in the queue and is not supposed to be borrowed yet
queue_item = BookcaseItemBorrowingQueue(
item=books[3],
username='test_queue_user_still_waiting',
)
queue_item.entered_queue_time = last_run_datetime - timedelta(days=1)
borrowing = BookcaseItemBorrowing(
item=books[3],
username='test_borrower_return_soon',
)
borrowing.start_time = last_run_datetime - timedelta(days=1)
borrowing.end_time = datetime.now() - timedelta(days=2)
sql_session.add(queue_item)
sql_session.add(borrowing)
# Create at least three items that is in the queue and two items were just returned
for i in range(3):
queue_item = BookcaseItemBorrowingQueue(
item=books[4 + i],
username=f'test_queue_user_{i}',
)
sql_session.add(queue_item)
for i in range(3):
borrowing = BookcaseItemBorrowing(
item=books[4 + i],
username=f'test_borrower_returned_{i}',
)
borrowing.start_time = last_run_datetime - timedelta(days=2)
borrowing.end_time = datetime.now() + timedelta(days=1)
if i != 2:
borrowing.delivered = datetime.now() - timedelta(days=1)
sql_session.add(borrowing)
# Create at least one item that has been in the queue for so long that the queue position should expire
queue_item = BookcaseItemBorrowingQueue(
item=books[7],
username='test_queue_user_expired',
)
queue_item.entered_queue_time = datetime.now() - timedelta(days=15)
# Create at least one item that has been in the queue for so long that the queue position should expire,
# but the queue person has already been notified
queue_item = BookcaseItemBorrowingQueue(
item=books[8],
username='test_queue_user_expired_notified',
)
queue_item.entered_queue_time = datetime.now() - timedelta(days=15)
sql_session.commit()

View File

@@ -0,0 +1,87 @@
import csv
from pathlib import Path
from datetime import datetime, timedelta
from worblehat.models import (
Bookcase,
BookcaseItem,
BookcaseShelf,
MediaType,
Language,
)
from worblehat.services.config import Config
CSV_FILE = Path(__file__).parent.parent.parent / 'data' / 'arbeidsrom_smal_hylle_5.csv'
def clear_db(sql_session):
sql_session.query(BookcaseItem).delete()
sql_session.query(BookcaseShelf).delete()
sql_session.query(Bookcase).delete()
sql_session.query(MediaType).delete()
sql_session.query(Language).delete()
sql_session.commit()
def main(sql_session):
clear_db(sql_session)
media_type = MediaType(
name='Book',
description='A book',
)
sql_session.add(media_type)
language = Language(
name='Norwegian',
iso639_1_code='no',
)
sql_session.add(language)
seed_case = Bookcase(
name='seed_case',
description='test bookcase with test data',
)
sql_session.add(seed_case)
seed_shelf_1 = BookcaseShelf(
row=1,
column=1,
bookcase=seed_case,
description='test shelf with test data 1',
)
seed_shelf_2 = BookcaseShelf(
row=2,
column=1,
bookcase=seed_case,
description='test shelf with test data 2',
)
sql_session.add(seed_shelf_1)
sql_session.add(seed_shelf_2)
bookcase_items = []
with open(CSV_FILE) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
next(csv_reader)
for row in csv_reader:
item = BookcaseItem(
isbn=row[0],
name=row[1],
)
item.media_type = media_type
item.language = language
bookcase_items.append(item)
half = len(bookcase_items) // 2
first_half = bookcase_items[:half]
second_half = bookcase_items[half:]
for item in first_half:
seed_shelf_1.items.add(item)
for item in second_half:
seed_shelf_2.items.add(item)
sql_session.add_all(bookcase_items)
sql_session.commit()

View File

View File

View File

@@ -0,0 +1,11 @@
from flask import Blueprint, render_template
main = Blueprint("main", __name__, template_folder="main")
@main.route('/')
def index():
return render_template("main/index.html")
@main.route("/login")
def login():
return render_template("main/login.html")

View File

@@ -0,0 +1,3 @@
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

View File

@@ -0,0 +1,42 @@
from flask import Flask
from flask_admin import Admin
from flask_admin.contrib.sqla import ModelView
from sqlalchemy import inspect
from worblehat.models import *
from worblehat.services.seed_test_data import seed_data
from worblehat.services.config import Config
from .blueprints.main import main
from .database import db
def create_app(args: dict[str, any] | None = None):
app = Flask(__name__)
app.config.update(Config['flask'])
app.config.update(Config._config)
app.config['SQLALCHEMY_DATABASE_URI'] = Config.db_string()
app.config['SQLALCHEMY_ECHO'] = Config['logging.debug_sql']
db.init_app(app)
with app.app_context():
if not inspect(db.engine).has_table('Bookcase'):
Base.metadata.create_all(db.engine)
seed_data()
configure_admin(app)
app.register_blueprint(main)
return app
def configure_admin(app):
admin = Admin(app, name='Worblehat', template_mode='bootstrap3')
admin.add_view(ModelView(Author, db.session))
admin.add_view(ModelView(Bookcase, db.session))
admin.add_view(ModelView(BookcaseItem, db.session))
admin.add_view(ModelView(BookcaseShelf, db.session))
admin.add_view(ModelView(Category, db.session))
admin.add_view(ModelView(Language, db.session))
admin.add_view(ModelView(MediaType, db.session))

View File

@@ -0,0 +1,17 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>
{% block title %} Worblehat {% endblock %}
</title>
</head>
<body>
<script src="https://cdn.tailwindcss.com"></script>
{% block main %}
{% endblock %}
</body>
</html>

View File

@@ -0,0 +1,19 @@
{% extends 'base.html' %}
{% block main %}
<div class="py-8 px-8 max-w-sm my-8 mx-auto bg-white rounded-xl shadow-lg space-y-2 sm:py-4 sm:flex sm:items-center sm:space-y-0 sm:space-x-6">
<div class="text-center space-y-2 sm:text-left">
<div class="space-y-0.5">
<p class="text-lg text-black font-semibold">
Worblehat
</p>
<p class="text-slate-500 font-medium">
PVV Library
</p>
</div>
<a href="{{ url_for('main.login') }}" class="px-4 py-1 text-sm text-purple-600 font-semibold rounded-full border border-purple-200 hover:text-white hover:bg-purple-600 hover:border-transparent focus:outline-none focus:ring-2 focus:ring-purple-600 focus:ring-offset-2">Demo</button>
</div>
</div>
{% endblock %}

View File

@@ -0,0 +1,40 @@
{% extends 'base.html' %}
{% block main %}
<section class="bg-gray-50 dark:bg-gray-900">
<div class="flex flex-col items-center justify-center px-6 py-8 mx-auto md:h-screen lg:py-0">
<a href="#" class="flex items-center mb-6 text-2xl font-semibold text-gray-900 dark:text-white">
Logo goes here
</a>
<div class="w-full bg-white rounded-lg shadow dark:border md:mt-0 sm:max-w-md xl:p-0 dark:bg-gray-800 dark:border-gray-700">
<div class="p-6 space-y-4 md:space-y-6 sm:p-8">
<h1 class="text-xl font-bold leading-tight tracking-tight text-gray-900 md:text-2xl dark:text-white">
Sign in to your account
</h1>
<form class="space-y-4 md:space-y-6" action="#">
<div>
<label for="email" class="block mb-2 text-sm font-medium text-gray-900 dark:text-white">Your email</label>
<input type="email" name="email" id="email" class="bg-gray-50 border border-gray-300 text-gray-900 sm:text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-blue-500 dark:focus:border-blue-500" placeholder="name@pvv.ntnu.no" required="">
</div>
<div>
<label for="password" class="block mb-2 text-sm font-medium text-gray-900 dark:text-white">Password</label>
<input type="password" name="password" id="password" placeholder="••••••••" class="bg-gray-50 border border-gray-300 text-gray-900 sm:text-sm rounded-lg focus:ring-primary-600 focus:border-primary-600 block w-full p-2.5 dark:bg-gray-700 dark:border-gray-600 dark:placeholder-gray-400 dark:text-white dark:focus:ring-blue-500 dark:focus:border-blue-500" required="">
</div>
<div class="flex items-center justify-between">
<div class="flex items-start">
<div class="flex items-center h-5">
<input id="remember" aria-describedby="remember" type="checkbox" class="w-4 h-4 border border-gray-300 rounded bg-gray-50 focus:ring-3 focus:ring-primary-300 dark:bg-gray-700 dark:border-gray-600 dark:focus:ring-primary-600 dark:ring-offset-gray-800" required="">
</div>
<div class="ml-3 text-sm">
<label for="remember" class="text-gray-500 dark:text-gray-300">Remember me</label>
</div>
</div>
<a href="#" class="text-sm font-medium text-primary-600 hover:underline dark:text-primary-500">Forgot password?</a>
</div>
<button type="submit" class="w-full text-white bg-primary-600 hover:bg-primary-700 focus:ring-4 focus:outline-none focus:ring-primary-300 font-medium rounded-lg text-sm px-5 py-2.5 text-center dark:bg-primary-600 dark:hover:bg-primary-700 dark:focus:ring-primary-800">Sign in</button>
</form>
</div>
</div>
</div>
</section>
{% endblock %}

View File

@@ -0,0 +1,18 @@
from werkzeug import run_simple
from worblehat.services.config import Config
from .flaskapp import create_app
def main():
app = create_app()
run_simple(
hostname = 'localhost',
port = 5000,
application = app,
use_debugger = True,
use_reloader = True,
)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,8 @@
from .flaskapp import create_app
def main():
app = create_app()
app.run()
if __name__ == '__main__':
main()

88
src/worblehat/main.py Normal file
View File

@@ -0,0 +1,88 @@
import logging
from pprint import pformat
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from .services import (
Config,
arg_parser,
devscripts_arg_parser,
)
from .deadline_daemon import DeadlineDaemon
from .cli import WorblehatCli
from .flaskapp.wsgi_dev import main as flask_dev_main
from .flaskapp.wsgi_prod import main as flask_prod_main
def _print_version() -> None:
from worblehat import __version__
print(f'Worblehat version {__version__}')
def _connect_to_database(**engine_args) -> Session:
try:
engine = create_engine(Config.db_string(), **engine_args)
sql_session = Session(engine)
except Exception as err:
print('Error: could not connect to database.')
print(err)
exit(1)
print(f"Debug: Connected to database at '{Config.db_string()}'")
return sql_session
def main():
args = arg_parser.parse_args()
Config.load_configuration(vars(args))
if Config['logging.debug']:
logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
else:
logging.basicConfig(encoding='utf-8', level=logging.INFO)
if args.version:
_print_version()
exit(0)
if args.print_config:
print(f'Configuration:\n{pformat(vars(args))}')
exit(0)
if args.command == 'deadline-daemon':
sql_session = _connect_to_database(echo=Config['logging.debug_sql'])
DeadlineDaemon(sql_session).run()
exit(0)
if args.command == 'cli':
sql_session = _connect_to_database(echo=Config['logging.debug_sql'])
WorblehatCli.run_with_safe_exit_wrapper(sql_session)
exit(0)
if args.command == 'devscripts':
sql_session = _connect_to_database(echo=Config['logging.debug_sql'])
if args.script == 'seed-content-for-deadline-daemon':
from .devscripts.seed_content_for_deadline_daemon import main
main(sql_session)
elif args.script == 'seed-test-data':
from .devscripts.seed_test_data import main
main(sql_session)
else:
print(devscripts_arg_parser.format_help())
exit(1)
exit(0)
if args.command == 'flask-dev':
flask_dev_main()
exit(0)
if args.command == 'flask-prod':
if Config['logging.debug'] or Config['logging.debug_sql']:
logging.warn('Debug mode is enabled for the production server. This is not recommended.')
flask_prod_main()
exit(0)
print(arg_parser.format_help())
exit(1)

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
Integer,
ForeignKey,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .mixins import (
UidMixin,
UniqueNameMixin,
)
from .xref_tables import Item_Author
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class Author(Base, UidMixin, UniqueNameMixin):
items: Mapped[set[BookcaseItem]] = relationship(
secondary = Item_Author.__table__,
back_populates = 'authors',
)
def __init__(
self,
name: str,
):
self.name = name

View File

@@ -0,0 +1,40 @@
from sqlalchemy import MetaData
from sqlalchemy.orm import (
DeclarativeBase,
declared_attr,
)
from sqlalchemy.orm.collections import (
InstrumentedDict,
InstrumentedList,
InstrumentedSet,
)
class Base(DeclarativeBase):
metadata = MetaData(
naming_convention={
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_`%(constraint_name)s`",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
}
)
@declared_attr.directive
def __tablename__(cls) -> str:
return cls.__name__
def __repr__(self) -> str:
columns = ", ".join(
f"{k}={repr(v)}" for k, v in self.__dict__.items() if not any([
k.startswith("_"),
# Ensure that we don't try to print out the entire list of
# relationships, which could create an infinite loop
isinstance(v, Base),
isinstance(v, InstrumentedList),
isinstance(v, InstrumentedSet),
isinstance(v, InstrumentedDict),
])
)
return f"<{self.__class__.__name__}({columns})>"

View File

@@ -0,0 +1,37 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import Text
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .mixins import (
UidMixin,
UniqueNameMixin,
)
if TYPE_CHECKING:
from .BookcaseShelf import BookcaseShelf
class Bookcase(Base, UidMixin, UniqueNameMixin):
description: Mapped[str | None] = mapped_column(Text)
shelfs: Mapped[list[BookcaseShelf]] = relationship(back_populates='bookcase')
def __init__(
self,
name: str,
description: str | None = None,
):
self.name = name
self.description = description
def short_str(self) -> str:
result = self.name
if self.description is not None:
result += f' [{self.description}]'
return result

View File

@@ -0,0 +1,79 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
ForeignKey,
Integer,
SmallInteger,
String,
Text,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .mixins import (
UidMixin,
UniqueNameMixin,
)
from .xref_tables import (
Item_Category,
Item_Author,
)
if TYPE_CHECKING:
from .Author import Author
from .BookcaseItemBorrowing import BookcaseItemBorrowing
from .BookcaseItemBorrowingQueue import BookcaseItemBorrowingQueue
from .BookcaseShelf import BookcaseShelf
from .Category import Category
from .Language import Language
from .MediaType import MediaType
from worblehat.flaskapp.database import db
class BookcaseItem(Base, UidMixin):
isbn: Mapped[int] = mapped_column(String, unique=True, index=True)
name: Mapped[str] = mapped_column(Text, index=True)
owner: Mapped[str] = mapped_column(String, default='PVV')
amount: Mapped[int] = mapped_column(SmallInteger, default=1)
fk_media_type_uid: Mapped[int] = mapped_column(ForeignKey('MediaType.uid'))
fk_bookcase_shelf_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseShelf.uid'))
fk_language_uid: Mapped[int | None] = mapped_column(ForeignKey('Language.uid'))
media_type: Mapped[MediaType] = relationship(back_populates='items')
shelf: Mapped[BookcaseShelf] = relationship(back_populates='items')
language: Mapped[Language] = relationship()
borrowings: Mapped[set[BookcaseItemBorrowing]] = relationship(back_populates='item')
borrowing_queue: Mapped[set[BookcaseItemBorrowingQueue]] = relationship(back_populates='item')
categories: Mapped[set[Category]] = relationship(
secondary = Item_Category.__table__,
back_populates = 'items',
)
authors: Mapped[set[Author]] = relationship(
secondary = Item_Author.__table__,
back_populates = 'items',
)
def __init__(
self,
name: str,
isbn: int | None = None,
owner: str = 'PVV',
):
self.name = name
self.isbn = isbn
self.owner = owner
@classmethod
def get_by_isbn(cls, isbn: str, sql_session: Session = db.session) -> Self | None:
"""
NOTE:
This method defaults to using the flask_sqlalchemy session.
It will not work outside of a request context, unless another session is provided.
"""
return sql_session.query(cls).where(cls.isbn == isbn).one_or_none()

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from datetime import datetime, timedelta
from sqlalchemy import (
Boolean,
ForeignKey,
String,
DateTime,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .mixins import UidMixin
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class BookcaseItemBorrowing(Base, UidMixin):
username: Mapped[str] = mapped_column(String)
start_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now())
end_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now() + timedelta(days=30))
delivered: Mapped[datetime | None] = mapped_column(DateTime, default=None)
fk_bookcase_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), index=True)
item: Mapped[BookcaseItem] = relationship(back_populates='borrowings')
def __init__(
self,
username: str,
item: BookcaseItem,
):
self.username = username
self.item = item
self.start_time = datetime.now()
self.end_time = datetime.now() + timedelta(days=30)

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from datetime import datetime
from sqlalchemy import (
ForeignKey,
String,
DateTime,
Boolean,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .mixins import UidMixin
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class BookcaseItemBorrowingQueue(Base, UidMixin):
username: Mapped[str] = mapped_column(String)
entered_queue_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now())
item_became_available_time: Mapped[datetime | None] = mapped_column(DateTime)
expired = mapped_column(Boolean, default=False)
fk_bookcase_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), index=True)
item: Mapped[BookcaseItem] = relationship(back_populates='borrowing_queue')
def __init__(
self,
username: str,
item: BookcaseItem,
):
self.username = username
self.item = item
self.entered_queue_time = datetime.now()

View File

@@ -0,0 +1,59 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
Integer,
ForeignKey,
SmallInteger,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .mixins import UidMixin
if TYPE_CHECKING:
from .Bookcase import Bookcase
from .BookcaseItem import BookcaseItem
# NOTE: Booshelfs are 0 indexed for both rows and columns,
# where cell 0-0 is placed in the lower right corner.
class BookcaseShelf(Base, UidMixin):
__table_args__ = (
UniqueConstraint(
'column',
'fk_bookcase_uid',
'row',
),
)
description: Mapped[str | None] = mapped_column(Text)
row: Mapped[int] = mapped_column(SmallInteger)
column: Mapped[int] = mapped_column(SmallInteger)
fk_bookcase_uid: Mapped[int] = mapped_column(ForeignKey('Bookcase.uid'))
bookcase: Mapped[Bookcase] = relationship(back_populates='shelfs')
items: Mapped[set[BookcaseItem]] = relationship(back_populates='shelf')
def __init__(
self,
row: int,
column: int,
bookcase: Bookcase,
description: str | None = None,
):
self.row = row
self.column = column
self.bookcase = bookcase
self.description = description
def short_str(self) -> str:
result = f'{self.column}-{self.row}'
if self.description is not None:
result += f' [{self.description}]'
return result

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import Text
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .mixins import (
UidMixin,
UniqueNameMixin,
)
from .xref_tables import Item_Category
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class Category(Base, UidMixin, UniqueNameMixin):
description: Mapped[str | None] = mapped_column(Text)
items: Mapped[set[BookcaseItem]] = relationship(
secondary=Item_Category.__table__,
back_populates='categories',
)
def __init__(
self,
name: str,
description: str | None = None,
):
self.name = name
self.description = description

View File

@@ -0,0 +1,27 @@
from datetime import datetime
from sqlalchemy import (
CheckConstraint,
DateTime,
Boolean,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
)
from .Base import Base
class DeadlineDaemonLastRunDatetime(Base):
__table_args__ = (
CheckConstraint(
'uid = true',
name = 'single_row_only',
),
)
uid: Mapped[bool] = mapped_column(Boolean, primary_key=True, default=True)
time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now())
def __init__(self, time: datetime | None = None):
if time is not None:
self.time = time

View File

@@ -0,0 +1,23 @@
# from sqlalchemy import Column, Integer, String, ForeignKey, Boolean
# from sqlalchemy.orm import relationship
from sqlalchemy import String
from sqlalchemy.orm import (
Mapped,
mapped_column,
)
from .Base import Base
from .mixins import UidMixin, UniqueNameMixin
class Language(Base, UidMixin, UniqueNameMixin):
iso639_1_code: Mapped[str] = mapped_column(String(2), unique=True, index=True)
def __init__(
self,
name: str,
iso639_1_code: str,
):
self.name = name
self.iso639_1_code = iso639_1_code

View File

@@ -0,0 +1,29 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import Text
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .mixins import UidMixin, UniqueNameMixin
if TYPE_CHECKING:
from .BookcaseItem import BookcaseItem
class MediaType(Base, UidMixin, UniqueNameMixin):
description: Mapped[str | None] = mapped_column(Text)
items: Mapped[set[BookcaseItem]] = relationship(back_populates='media_type')
def __init__(
self,
name: str,
description: str | None = None,
):
self.name = name
self.description = description

View File

@@ -0,0 +1,11 @@
from .Author import Author
from .Base import Base
from .Bookcase import Bookcase
from .BookcaseItem import BookcaseItem
from .BookcaseItemBorrowing import BookcaseItemBorrowing
from .BookcaseItemBorrowingQueue import BookcaseItemBorrowingQueue
from .BookcaseShelf import BookcaseShelf
from .Category import Category
from .DeadlineDaemonLastRunDatetime import DeadlineDaemonLastRunDatetime
from .Language import Language
from .MediaType import MediaType

View File

@@ -0,0 +1,56 @@
from alembic import context
from flask import current_app
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from worblehat.models import Base
from worblehat.services.config import Config
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
Config.load_configuration({})
config.set_main_option('sqlalchemy.url', Config.db_string())
# This will make sure alembic doesn't generate empty migrations
# https://stackoverflow.com/questions/70203927/how-to-prevent-alembic-revision-autogenerate-from-making-revision-file-if-it-h
def _process_revision_directives(context, revision, directives):
if config.cmd_opts.autogenerate:
script = directives[0]
if script.upgrade_ops.is_empty():
directives[:] = []
print('No changes in schema detected. Not generating migration.')
def run_migrations_online() -> None:
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=Base.metadata,
# Extended type checking with alembic when generating migrations
# https://alembic.sqlalchemy.org/en/latest/autogenerate.html#what-does-autogenerate-detect-and-what-does-it-not-detect
compare_type=True,
# This is required for ALTER TABLE to work with sqlite.
# It should have no effect on postgreSQL
# https://alembic.sqlalchemy.org/en/latest/batch.html
render_as_batch=True,
process_revision_directives=_process_revision_directives,
)
with context.begin_transaction():
context.run_migrations()
# We don't have any good reasons to generate raw sql migrations,
# so the `run_migrations_offline` has been removed
run_migrations_online()

View File

@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,183 @@
"""initial_migration
Revision ID: 7dfbf8a8dec8
Revises:
Create Date: 2024-07-31 21:07:13.434012
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7dfbf8a8dec8'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('Author',
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_Author'))
)
with op.batch_alter_table('Author', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_Author_name'), ['name'], unique=True)
op.create_table('Bookcase',
sa.Column('description', sa.Text(), nullable=True),
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_Bookcase'))
)
with op.batch_alter_table('Bookcase', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_Bookcase_name'), ['name'], unique=True)
op.create_table('Category',
sa.Column('description', sa.Text(), nullable=True),
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_Category'))
)
with op.batch_alter_table('Category', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_Category_name'), ['name'], unique=True)
op.create_table('DeadlineDaemonLastRunDatetime',
sa.Column('uid', sa.Boolean(), nullable=False),
sa.Column('time', sa.DateTime(), nullable=False),
sa.CheckConstraint('uid = true', name=op.f('ck_DeadlineDaemonLastRunDatetime_`single_row_only`')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_DeadlineDaemonLastRunDatetime'))
)
op.create_table('Language',
sa.Column('iso639_1_code', sa.String(length=2), nullable=False),
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_Language'))
)
with op.batch_alter_table('Language', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_Language_iso639_1_code'), ['iso639_1_code'], unique=True)
batch_op.create_index(batch_op.f('ix_Language_name'), ['name'], unique=True)
op.create_table('MediaType',
sa.Column('description', sa.Text(), nullable=True),
sa.Column('uid', sa.Integer(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_MediaType'))
)
with op.batch_alter_table('MediaType', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_MediaType_name'), ['name'], unique=True)
op.create_table('BookcaseShelf',
sa.Column('description', sa.Text(), nullable=True),
sa.Column('row', sa.SmallInteger(), nullable=False),
sa.Column('column', sa.SmallInteger(), nullable=False),
sa.Column('fk_bookcase_uid', sa.Integer(), nullable=False),
sa.Column('uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_bookcase_uid'], ['Bookcase.uid'], name=op.f('fk_BookcaseShelf_fk_bookcase_uid_Bookcase')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseShelf')),
sa.UniqueConstraint('column', 'fk_bookcase_uid', 'row', name=op.f('uq_BookcaseShelf_column'))
)
op.create_table('BookcaseItem',
sa.Column('isbn', sa.String(), nullable=False),
sa.Column('name', sa.Text(), nullable=False),
sa.Column('owner', sa.String(), nullable=False),
sa.Column('amount', sa.SmallInteger(), nullable=False),
sa.Column('fk_media_type_uid', sa.Integer(), nullable=False),
sa.Column('fk_bookcase_shelf_uid', sa.Integer(), nullable=False),
sa.Column('fk_language_uid', sa.Integer(), nullable=True),
sa.Column('uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_bookcase_shelf_uid'], ['BookcaseShelf.uid'], name=op.f('fk_BookcaseItem_fk_bookcase_shelf_uid_BookcaseShelf')),
sa.ForeignKeyConstraint(['fk_language_uid'], ['Language.uid'], name=op.f('fk_BookcaseItem_fk_language_uid_Language')),
sa.ForeignKeyConstraint(['fk_media_type_uid'], ['MediaType.uid'], name=op.f('fk_BookcaseItem_fk_media_type_uid_MediaType')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseItem'))
)
with op.batch_alter_table('BookcaseItem', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_BookcaseItem_isbn'), ['isbn'], unique=True)
batch_op.create_index(batch_op.f('ix_BookcaseItem_name'), ['name'], unique=False)
op.create_table('BookcaseItemBorrowing',
sa.Column('username', sa.String(), nullable=False),
sa.Column('start_time', sa.DateTime(), nullable=False),
sa.Column('end_time', sa.DateTime(), nullable=False),
sa.Column('delivered', sa.DateTime(), nullable=True),
sa.Column('fk_bookcase_item_uid', sa.Integer(), nullable=False),
sa.Column('uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_bookcase_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_BookcaseItemBorrowing_fk_bookcase_item_uid_BookcaseItem')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseItemBorrowing'))
)
with op.batch_alter_table('BookcaseItemBorrowing', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_BookcaseItemBorrowing_fk_bookcase_item_uid'), ['fk_bookcase_item_uid'], unique=False)
op.create_table('BookcaseItemBorrowingQueue',
sa.Column('username', sa.String(), nullable=False),
sa.Column('entered_queue_time', sa.DateTime(), nullable=False),
sa.Column('item_became_available_time', sa.DateTime(), nullable=True),
sa.Column('expired', sa.Boolean(), nullable=True),
sa.Column('fk_bookcase_item_uid', sa.Integer(), nullable=False),
sa.Column('uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_bookcase_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_BookcaseItemBorrowingQueue_fk_bookcase_item_uid_BookcaseItem')),
sa.PrimaryKeyConstraint('uid', name=op.f('pk_BookcaseItemBorrowingQueue'))
)
with op.batch_alter_table('BookcaseItemBorrowingQueue', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid'), ['fk_bookcase_item_uid'], unique=False)
op.create_table('Item_Author',
sa.Column('fk_item_uid', sa.Integer(), nullable=False),
sa.Column('fk_author_uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_author_uid'], ['Author.uid'], name=op.f('fk_Item_Author_fk_author_uid_Author')),
sa.ForeignKeyConstraint(['fk_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_Item_Author_fk_item_uid_BookcaseItem')),
sa.PrimaryKeyConstraint('fk_item_uid', 'fk_author_uid', name=op.f('pk_Item_Author'))
)
op.create_table('Item_Category',
sa.Column('fk_item_uid', sa.Integer(), nullable=False),
sa.Column('fk_category_uid', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['fk_category_uid'], ['Category.uid'], name=op.f('fk_Item_Category_fk_category_uid_Category')),
sa.ForeignKeyConstraint(['fk_item_uid'], ['BookcaseItem.uid'], name=op.f('fk_Item_Category_fk_item_uid_BookcaseItem')),
sa.PrimaryKeyConstraint('fk_item_uid', 'fk_category_uid', name=op.f('pk_Item_Category'))
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('Item_Category')
op.drop_table('Item_Author')
with op.batch_alter_table('BookcaseItemBorrowingQueue', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_BookcaseItemBorrowingQueue_fk_bookcase_item_uid'))
op.drop_table('BookcaseItemBorrowingQueue')
with op.batch_alter_table('BookcaseItemBorrowing', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_BookcaseItemBorrowing_fk_bookcase_item_uid'))
op.drop_table('BookcaseItemBorrowing')
with op.batch_alter_table('BookcaseItem', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_BookcaseItem_name'))
batch_op.drop_index(batch_op.f('ix_BookcaseItem_isbn'))
op.drop_table('BookcaseItem')
op.drop_table('BookcaseShelf')
with op.batch_alter_table('MediaType', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_MediaType_name'))
op.drop_table('MediaType')
with op.batch_alter_table('Language', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_Language_name'))
batch_op.drop_index(batch_op.f('ix_Language_iso639_1_code'))
op.drop_table('Language')
op.drop_table('DeadlineDaemonLastRunDatetime')
with op.batch_alter_table('Category', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_Category_name'))
op.drop_table('Category')
with op.batch_alter_table('Bookcase', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_Bookcase_name'))
op.drop_table('Bookcase')
with op.batch_alter_table('Author', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_Author_name'))
op.drop_table('Author')
# ### end Alembic commands ###

View File

@@ -0,0 +1,31 @@
from typing_extensions import Self
from sqlalchemy import Integer
from sqlalchemy.orm import (
Mapped,
Session,
mapped_column,
)
from worblehat.flaskapp.database import db
class UidMixin(object):
uid: Mapped[int] = mapped_column(Integer, primary_key=True)
@classmethod
def get_by_uid(cls, uid: int, sql_session: Session = db.session) -> Self | None:
"""
NOTE:
This method defaults to using the flask_sqlalchemy session.
It will not work outside of a request context, unless another session is provided.
"""
return sql_session.query(cls).where(cls.uid == uid).one_or_none()
@classmethod
def get_by_uid_or_404(cls, uid: int, sql_session: Session = db.session) -> Self:
"""
NOTE:
This method defaults to using the flask_sqlalchemy session.
It will not work outside of a request context, unless another session is provided.
"""
return sql_session.query(cls).where(cls.uid == uid).one_or_404()

View File

@@ -0,0 +1,31 @@
from typing_extensions import Self
from sqlalchemy import Text
from sqlalchemy.orm import (
Mapped,
Session,
mapped_column,
)
from worblehat.flaskapp.database import db
class UniqueNameMixin(object):
name: Mapped[str] = mapped_column(Text, unique=True, index=True)
@classmethod
def get_by_name(cls, name: str, sql_session: Session = db.session) -> Self | None:
"""
NOTE:
This method defaults to using the flask_sqlalchemy session.
It will not work outside of a request context, unless another session is provided.
"""
return sql_session.query(cls).where(cls.name == name).one_or_none()
@classmethod
def get_by_uid_or_404(cls, name: str, sql_session: Session = db.session) -> Self:
"""
NOTE:
This method defaults to using the flask_sqlalchemy session.
It will not work outside of a request context, unless another session is provided.
"""
return sql_session.query(cls).where(cls.name == name).one_or_404()

View File

@@ -0,0 +1,6 @@
from sqlalchemy.orm import declared_attr
class XrefMixin(object):
@declared_attr.directive
def __tablename__(cls) -> str:
return f'xref_{cls.__name__.lower()}'

View File

@@ -0,0 +1,2 @@
from .UidMixin import UidMixin
from .UniqueNameMixin import UniqueNameMixin

View File

@@ -0,0 +1,15 @@
from sqlalchemy import (
Integer,
ForeignKey,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
)
from ..Base import Base
from ..mixins.XrefMixin import XrefMixin
class Item_Author(Base, XrefMixin):
fk_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), primary_key=True)
fk_author_uid: Mapped[int] = mapped_column(ForeignKey('Author.uid'), primary_key=True)

View File

@@ -0,0 +1,15 @@
from sqlalchemy import (
Integer,
ForeignKey,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
)
from ..Base import Base
from ..mixins.XrefMixin import XrefMixin
class Item_Category(Base, XrefMixin):
fk_item_uid: Mapped[int] = mapped_column(ForeignKey('BookcaseItem.uid'), primary_key=True)
fk_category_uid: Mapped[int] = mapped_column(ForeignKey('Category.uid'), primary_key=True)

View File

@@ -0,0 +1,2 @@
from .Item_Author import Item_Author
from .Item_Category import Item_Category

View File

@@ -0,0 +1,11 @@
from .argument_parser import (
arg_parser,
devscripts_arg_parser,
)
from .bookcase_item import (
create_bookcase_item_from_isbn,
is_valid_isbn,
)
from .config import Config
from .email import send_email
from .seed_test_data import seed_data

View File

@@ -0,0 +1,66 @@
from argparse import ArgumentParser
from pathlib import Path
def _is_valid_file(parser: ArgumentParser, arg: str) -> Path:
path = Path(arg)
if not path.is_file():
parser.error(f'The file {arg} does not exist!')
return path
arg_parser = ArgumentParser(
description = 'Worblehat library management system',
)
subparsers = arg_parser.add_subparsers(dest='command')
subparsers.add_parser(
'deadline-daemon',
help = 'Initialize a single pass of the daemon which sends deadline emails',
)
subparsers.add_parser(
'cli',
help = 'Start the command line interface',
)
subparsers.add_parser(
'flask-dev',
help = 'Start the web interface in development mode',
)
subparsers.add_parser(
'flask-prod',
help = 'Start the web interface in production mode',
)
devscripts_arg_parser = subparsers.add_parser('devscripts', help='Run development scripts')
devscripts_subparsers = devscripts_arg_parser.add_subparsers(dest='script')
devscripts_subparsers.add_parser(
'seed-test-data',
help = 'Seed test data in the database',
)
devscripts_subparsers.add_parser(
'seed-content-for-deadline-daemon',
help = 'Seed data tailorded for testing the deadline daemon, into the database',
)
arg_parser.add_argument(
'-V',
'--version',
action = 'store_true',
help = 'Print version and exit',
)
arg_parser.add_argument(
'-c',
'--config',
type=lambda x: _is_valid_file(arg_parser, x),
help = 'Path to config file',
dest = 'config_file',
metavar = 'FILE',
)
arg_parser.add_argument(
'-p',
'--print-config',
action = 'store_true',
help = 'Print configuration and quit',
)

View File

@@ -0,0 +1,60 @@
import isbnlib
from sqlalchemy import select
from sqlalchemy.orm import Session
from .metadata_fetchers import fetch_metadata_from_multiple_sources
from ..models import (
Author,
BookcaseItem,
Language,
)
def is_valid_pvv_isbn(isbn: str) -> bool:
try:
int(isbn)
except ValueError:
return False
return len(isbn) == 8
def is_valid_isbn(isbn: str) -> bool:
return any([
isbnlib.is_isbn10(isbn),
isbnlib.is_isbn13(isbn),
])
def create_bookcase_item_from_isbn(isbn: str, sql_session: Session) -> BookcaseItem | None:
"""
This function fetches metadata for the given ISBN and creates a BookcaseItem from it.
It does so using a database connection to connect it to the correct authors and language
through the sql ORM.
If no metadata is found, None is returned.
Please not that the returned BookcaseItem will likely not be fully populated with the required
data, such as the book's location in the library, and the owner of the book, etc.
"""
metadata = fetch_metadata_from_multiple_sources(isbn)
if len(metadata) == 0:
return None
metadata = metadata[0]
bookcase_item = BookcaseItem(
name = metadata.title,
isbn = int(isbn.replace('-', '')),
)
if len(authors := metadata.authors) > 0:
for author in authors:
bookcase_item.authors.add(Author(author))
if (language := metadata.language):
bookcase_item.language = sql_session.scalars(
select(Language)
.where(Language.iso639_1_code == language)
).one()
return bookcase_item

View File

@@ -0,0 +1,96 @@
from pathlib import Path
import tomllib
from typing import Any
from pprint import pformat
class Config:
"""
This class is a singleton which holds the configuration for the
application. It is initialized by calling `Config.load_configuration()`
with a dictionary of arguments. The arguments are usually the result
of calling `vars(arg_parser.parse_args())` where `arg_parser` i s the
argument parser from `worblehat/services/argument_parser.py`.
The class also provides some utility functions for accessing several
kinds of values that depend on the configuration.
"""
_config = None
_expected_config_file_locations = [
Path('./config.toml'),
Path('~/.config/worblehat/config.toml'),
Path('/var/lib/worblehat/config.toml'),
]
def __class_getitem__(cls, name: str) -> Any:
if cls._config is None:
raise RuntimeError('Configuration not loaded, call Config.load_configuration() first.')
__config = cls._config
for attr in name.split('.'):
__config = __config.get(attr)
if __config is None:
raise AttributeError(f'No such attribute: {name}')
return __config
@staticmethod
def read_password(password_field: str) -> str:
if Path(password_field).is_file():
with open(password_field, 'r') as f:
return f.read()
else:
return password_field
@classmethod
def _locate_configuration_file(cls) -> Path | None:
for path in cls._expected_config_file_locations:
if path.is_file():
return path
@classmethod
def _load_configuration_from_file(cls, config_file_path: str | None) -> dict[str, any]:
if config_file_path is None:
config_file_path = cls._locate_configuration_file()
if config_file_path is None:
print('Error: could not locate configuration file.')
exit(1)
with open(config_file_path, 'rb') as config_file:
args = tomllib.load(config_file)
return args
@classmethod
def db_string(cls) -> str:
db_type = cls._config.get('database').get('type')
if db_type == 'sqlite':
path = Path(cls._config.get('database').get('sqlite').get('path'))
return f"sqlite:///{path.absolute()}"
elif db_type == 'postgresql':
db_config = cls._config.get('database').get('postgresql')
hostname = db_config.get('hostname')
port = db_config.get('port')
username = db_config.get('username')
password = cls.read_password(db_config.get('password'))
database = db_config.get('database')
return f"psycopg2+postgresql://{username}:{password}@{hostname}:{port}/{database}"
else:
print(f"Error: unknown database type '{db_config.get('type')}'")
exit(1)
@classmethod
def debug(cls) -> str:
return pformat(cls._config)
@classmethod
def load_configuration(cls, args: dict[str, any]) -> dict[str, any]:
cls._config = cls._load_configuration_from_file(args.get('config_file'))

View File

@@ -0,0 +1,35 @@
import smtplib
from textwrap import indent
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from .config import Config
def send_email(to: str, subject: str, body: str):
msg = MIMEMultipart()
msg['From'] = Config['smtp.from']
msg['To'] = to
if Config['smtp.subject_prefix']:
msg['Subject'] = f"{Config['smtp.subject_prefix']} {subject}"
else:
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
if Config['smtp.enabled'] and not Config['deadline_daemon.dryrun']:
try:
with smtplib.SMTP(Config['smtp.host'], Config['smtp.port']) as server:
server.starttls()
server.login(
Config['smtp.username'],
Config.read_password(Config['smtp.password']),
)
server.sendmail(Config['smtp.from'], to, msg.as_string())
except Exception as err:
print('Error: could not send email.')
print(err)
else:
print('Debug: Email sending is disabled, so the following email was not sent:')
print(indent(msg.as_string(), ' '))

View File

@@ -0,0 +1,62 @@
from dataclasses import dataclass
from typing import Set
# TODO: Add more languages
LANGUAGES: set[str] = set([
"no",
"en",
"de",
"fr",
"es",
"it",
"sv",
"da",
"fi",
"ru",
"zh",
"ja",
"ko",
])
@dataclass
class BookMetadata:
"""A class representing metadata for a book."""
isbn: str
title: str
# The source of the metadata provider
source: str
authors: Set[str]
language: str | None
publish_date: str | None
num_pages: int | None
subjects: Set[str]
def to_dict(self) -> dict[str, any]:
return {
'isbn': self.isbn,
'title': self.title,
'source': self.metadata_source_id(),
'authors': set() if self.authors is None else self.authors,
'language': self.language,
'publish_date': self.publish_date,
'num_pages': self.num_pages,
'subjects': set() if self.subjects is None else self.subjects
}
def validate(self) -> None:
if not self.isbn:
raise ValueError('Missing ISBN')
if not self.title:
raise ValueError('Missing title')
if not self.source:
raise ValueError('Missing source')
if not self.authors:
raise ValueError('Missing authors')
if self.language is not None and self.language not in LANGUAGES:
raise ValueError(f'Invalid language: {self.language}. Consider adding it to the LANGUAGES set if you think this is a mistake.')
if self.num_pages is not None and self.num_pages < 0:
raise ValueError(f'Invalid number of pages: {self.num_pages}')

View File

@@ -0,0 +1,20 @@
#base fetcher.
from abc import ABC, abstractmethod
from .BookMetadata import BookMetadata
class BookMetadataFetcher(ABC):
"""
A base class for metadata fetchers.
"""
@classmethod
@abstractmethod
def metadata_source_id(cls) -> str:
"""Returns a unique identifier for the metadata source, to identify where the metadata came from."""
pass
@classmethod
@abstractmethod
def fetch_metadata(cls, isbn: str) -> BookMetadata | None:
"""Tries to fetch metadata for the given ISBN."""
pass

View File

@@ -0,0 +1,51 @@
"""
A BookMetadataFetcher for the Google Books API.
"""
import requests
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata
class GoogleBooksFetcher(BookMetadataFetcher):
@classmethod
def metadata_source_id(_cls) -> str:
return "google_books"
@classmethod
def fetch_metadata(cls, isbn: str) -> BookMetadata | None:
try:
jsonInput = requests.get(
f"https://www.googleapis.com/books/v1/volumes",
params = {"q": f"isbn:{isbn}"},
).json()
data = jsonInput.get("items")[0].get("volumeInfo")
authors = set(data.get("authors") or [])
title = data.get("title")
publishDate = data.get("publish_date")
numberOfPages = data.get("number_of_pages")
if numberOfPages:
numberOfPages = int(numberOfPages)
subjects = set(data.get("categories") or [])
languages = data.get("languages")
except Exception:
return None
return BookMetadata(
isbn = isbn,
title = title,
source = cls.metadata_source_id(),
authors = authors,
language = languages,
publish_date = publishDate,
num_pages = numberOfPages,
subjects = subjects,
)
if __name__ == '__main__':
book_data = GoogleBooksFetcher.fetch_metadata('0132624788')
book_data.validate()
print(book_data)

View File

@@ -0,0 +1,61 @@
"""
A BookMetadataFetcher for the Open Library API.
"""
import requests
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata
LANGUAGE_MAP = {
"Norwegian": "no",
}
class OpenLibraryFetcher(BookMetadataFetcher):
@classmethod
def metadata_source_id(_cls) -> str:
return "open_library"
@classmethod
def fetch_metadata(cls, isbn: str) -> BookMetadata | None:
try:
jsonInput = requests.get(f"https://openlibrary.org/isbn/{isbn}.json").json()
author_keys = jsonInput.get("authors") or []
author_names = set()
for author_key in author_keys:
key = author_key.get('key')
author_name = requests.get(f"https://openlibrary.org/{key}.json").json().get("name")
author_names.add(author_name)
title = jsonInput.get("title")
publishDate = jsonInput.get("publish_date")
numberOfPages = jsonInput.get("number_of_pages")
if numberOfPages:
numberOfPages = int(numberOfPages)
language_key = jsonInput.get("languages")[0].get("key")
language = requests.get(f"https://openlibrary.org/{language_key}.json").json().get("identifiers").get("iso_639_1")[0]
subjects = set(jsonInput.get("subjects") or [])
except Exception:
return None
return BookMetadata(
isbn = isbn,
title = title,
source = cls.metadata_source_id(),
authors = author_names,
language = language,
publish_date = publishDate,
num_pages = numberOfPages,
subjects = subjects,
)
if __name__ == '__main__':
book_data = OpenLibraryFetcher.fetch_metadata('9788205530751')
book_data.validate()
print(book_data)

View File

@@ -0,0 +1,109 @@
"""
A BookMetadataFetcher that webscrapes https://outland.no/
"""
from bs4 import BeautifulSoup
import requests
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata
LANGUAGE_MAP = {
"Norsk": "no",
"Engelsk": "en",
"Tysk": "de",
"Fransk": "fr",
"Spansk": "es",
"Italiensk": "it",
"Svensk": "sv",
"Dansk": "da",
"Finsk": "fi",
"Russisk": "ru",
"Kinesisk": "zh",
"Japansk": "ja",
"Koreansk": "ko",
}
class OutlandScraperFetcher(BookMetadataFetcher):
@classmethod
def metadata_source_id(_cls) -> str:
return "outland_scraper"
@classmethod
def fetch_metadata(cls, isbn: str) -> BookMetadata | None:
try:
# Find the link to the product page
response = requests.get(f"https://outland.no/{isbn}")
soup = BeautifulSoup(response.content, "html.parser")
soup = soup.find_all("a", class_="product-item-link")
href = soup[0].get("href")
# Find the metadata on the product page
response = requests.get(href)
soup = BeautifulSoup(response.content, "html.parser")
data = soup.find_all("td", class_="col data")
# Collect the metadata
title = soup.find_all("span", class_="base")[0].text
releaseDate = soup.find_all("span", class_="release-date")[0].text.strip()
releaseDate = releaseDate[-4:] # only keep year
bookData = {
"Title": title,
"PublishDate": releaseDate,
"Authors": None,
"NumberOfPages": None,
"Genre": None,
"Language": None,
"Subjects": None,
}
dataKeyMap = {
"Authors": "Forfattere",
"NumberOfPages": "Antall Sider",
"Genre": "Sjanger",
"Language": "Språk",
"Subjects": "Serie"
}
for value in data:
for key in dataKeyMap:
if str(value).lower().__contains__(dataKeyMap[key].lower()):
bookData[key] = value.text
break
if bookData["Language"] is not None:
bookData["Language"] = LANGUAGE_MAP.get(bookData["Language"])
if bookData["Authors"] is not None:
bookData["Authors"] = set(bookData["Authors"].split(", "))
if bookData["Subjects"] is not None:
bookData["Subjects"] = set(bookData["Subjects"].split(", "))
if bookData["NumberOfPages"] is not None:
bookData["NumberOfPages"] = int(bookData["NumberOfPages"])
except Exception:
return None
return BookMetadata(
isbn = isbn,
title = bookData.get('Title'),
source = cls.metadata_source_id(),
authors = bookData.get('Authors'),
language = bookData.get('Language'),
publish_date = bookData.get('PublishDate'),
num_pages = bookData.get('NumberOfPages'),
subjects = bookData.get('Subjects'),
)
if __name__ == '__main__':
book_data = OutlandScraperFetcher.fetch_metadata('9781947808225')
book_data.validate()
print(book_data)

View File

@@ -0,0 +1 @@
from .book_metadata_fetcher import fetch_metadata_from_multiple_sources

View File

@@ -0,0 +1,80 @@
"""
this module contains the fetch_book_metadata() function which fetches book metadata from multiple sources in threads and returns the higest ranked non-None result.
"""
from concurrent.futures import ThreadPoolExecutor
from worblehat.services.metadata_fetchers.BookMetadata import BookMetadata
from worblehat.services.metadata_fetchers.BookMetadataFetcher import BookMetadataFetcher
from worblehat.services.metadata_fetchers.GoogleBooksFetcher import GoogleBooksFetcher
from worblehat.services.metadata_fetchers.OpenLibraryFetcher import OpenLibraryFetcher
from worblehat.services.metadata_fetchers.OutlandScraperFetcher import OutlandScraperFetcher
# The order of these fetchers determines the priority of the sources.
# The first fetcher in the list has the highest priority.
FETCHERS: list[BookMetadataFetcher] = [
OpenLibraryFetcher,
GoogleBooksFetcher,
OutlandScraperFetcher,
]
FETCHER_SOURCE_IDS: list[str] = [fetcher.metadata_source_id() for fetcher in FETCHERS]
def sort_metadata_by_priority(metadata: list[BookMetadata]) -> list[BookMetadata]:
"""
Sorts the given metadata by the priority of the sources.
The order of the metadata is the same as the order of the sources in the FETCHERS list.
"""
# Note that this function is O(n^2) but the number of fetchers is small so it's fine.
return sorted(metadata, key=lambda m: FETCHER_SOURCE_IDS.index(m.source))
def fetch_metadata_from_multiple_sources(isbn: str, strict=False) -> list[BookMetadata]:
"""
Returns a list of metadata fetched from multiple sources.
Sources that does not have metadata for the given ISBN will be ignored.
There is no guarantee that there will be any metadata.
The results are always ordered in the same way as the fetchers are listed in the FETCHERS list.
"""
isbn = isbn.replace('-', '').replace('_', '').strip().lower()
if len(isbn) != 10 and len(isbn) != 13 and not isbn.isnumeric():
raise ValueError('Invalid ISBN')
results: list[BookMetadata] = []
with ThreadPoolExecutor() as executor:
futures = [executor.submit(fetcher.fetch_metadata, isbn) for fetcher in FETCHERS]
for future in futures:
result = future.result()
if result is not None:
results.append(result)
for result in results:
try:
result.validate()
except ValueError as e:
if strict:
raise e
else:
print(f'Invalid metadata: {e}')
results.remove(result)
return sort_metadata_by_priority(results)
if __name__ == '__main__':
from pprint import pprint
isbn = '0132624788'
metadata = fetch_metadata_from_multiple_sources(isbn)
pprint(metadata)

View File

@@ -0,0 +1,232 @@
import csv
from datetime import datetime, timedelta
from pathlib import Path
from sqlalchemy.orm import Session
from worblehat.flaskapp.database import db
from ..models import (
Author,
Bookcase,
BookcaseItem,
BookcaseItemBorrowing,
BookcaseItemBorrowingQueue,
BookcaseShelf,
Language,
MediaType,
)
def seed_data(sql_session: Session = db.session):
media_types = [
MediaType(name='Book', description='A physical book'),
MediaType(name='Comic', description='A comic book'),
MediaType(name='Video Game', description='A digital game for computers or games consoles'),
MediaType(name='Tabletop Game', description='A physical game with cards, boards or similar')
]
bookcases = [
Bookcase(name='Unnamed A', description='White case across dibbler'),
Bookcase(name='Unnamed B', description='Math case in the working room'),
Bookcase(name='Unnamed C', description='Large case in the working room'),
Bookcase(name='Unnamed D', description='White comics case in the hallway'),
Bookcase(name='Unnamed E', description='Wooden comics case in the hallway'),
]
shelfs = [
BookcaseShelf(row=0, column=0, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=0, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=0, bookcase=bookcases[0]),
BookcaseShelf(row=3, column=0, bookcase=bookcases[0], description="Hacking"),
BookcaseShelf(row=4, column=0, bookcase=bookcases[0], description="Hacking"),
BookcaseShelf(row=0, column=1, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=1, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=1, bookcase=bookcases[0], description="DOS"),
BookcaseShelf(row=3, column=1, bookcase=bookcases[0], description="Food for thought"),
BookcaseShelf(row=4, column=1, bookcase=bookcases[0], description="CPP"),
BookcaseShelf(row=0, column=2, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=2, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=2, bookcase=bookcases[0], description="E = mc2"),
BookcaseShelf(row=3, column=2, bookcase=bookcases[0], description="OBJECTION!"),
BookcaseShelf(row=4, column=2, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=3, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=3, bookcase=bookcases[0], description="Big indonisian island"),
BookcaseShelf(row=2, column=3, bookcase=bookcases[0]),
BookcaseShelf(row=3, column=3, bookcase=bookcases[0], description="Div science"),
BookcaseShelf(row=4, column=3, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=4, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=4, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=4, bookcase=bookcases[0], description="(not) computer vision"),
BookcaseShelf(row=3, column=4, bookcase=bookcases[0], description="Low voltage"),
BookcaseShelf(row=4, column=4, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=5, bookcase=bookcases[0]),
BookcaseShelf(row=1, column=5, bookcase=bookcases[0]),
BookcaseShelf(row=2, column=5, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=3, column=5, bookcase=bookcases[0], description="/home"),
BookcaseShelf(row=0, column=0, bookcase=bookcases[1]),
BookcaseShelf(row=1, column=0, bookcase=bookcases[1], description="Kjellerarealer og komodovaraner"),
BookcaseShelf(row=2, column=0, bookcase=bookcases[1]),
BookcaseShelf(row=3, column=0, bookcase=bookcases[1], description="Quick mafs"),
BookcaseShelf(row=4, column=0, bookcase=bookcases[1]),
BookcaseShelf(row=0, column=0, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=0, bookcase=bookcases[2]),
BookcaseShelf(row=2, column=0, bookcase=bookcases[2], description="AI"),
BookcaseShelf(row=3, column=0, bookcase=bookcases[2], description="X86"),
BookcaseShelf(row=4, column=0, bookcase=bookcases[2], description="Humanoira"),
BookcaseShelf(row=5, column=0, bookcase=bookcases[2], description="Hvem monterte rørforsterker?"),
BookcaseShelf(row=0, column=1, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=1, bookcase=bookcases[2], description="Div data"),
BookcaseShelf(row=2, column=1, bookcase=bookcases[2], description="Chemistry"),
BookcaseShelf(row=3, column=1, bookcase=bookcases[2], description="Soviet Phys. Techn. Phys"),
BookcaseShelf(row=4, column=1, bookcase=bookcases[2], description="Digitalteknikk"),
BookcaseShelf(row=5, column=1, bookcase=bookcases[2], description="Material"),
BookcaseShelf(row=0, column=2, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=2, bookcase=bookcases[2], description="Assembler / APL"),
BookcaseShelf(row=2, column=2, bookcase=bookcases[2], description="Internet"),
BookcaseShelf(row=3, column=2, bookcase=bookcases[2], description="Algorithms"),
BookcaseShelf(row=4, column=2, bookcase=bookcases[2], description="Soviet Physics Jetp"),
BookcaseShelf(row=5, column=2, bookcase=bookcases[2], description="Død og pine"),
BookcaseShelf(row=0, column=3, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=3, bookcase=bookcases[2], description="Web"),
BookcaseShelf(row=2, column=3, bookcase=bookcases[2], description="Div languages"),
BookcaseShelf(row=3, column=3, bookcase=bookcases[2], description="Python"),
BookcaseShelf(row=4, column=3, bookcase=bookcases[2], description="D&D Minis"),
BookcaseShelf(row=5, column=3, bookcase=bookcases[2], description="Perl"),
BookcaseShelf(row=0, column=4, bookcase=bookcases[2]),
BookcaseShelf(row=1, column=4, bookcase=bookcases[2], description="Knuth on programming"),
BookcaseShelf(row=2, column=4, bookcase=bookcases[2], description="Div languages"),
BookcaseShelf(row=3, column=4, bookcase=bookcases[2], description="Typesetting"),
BookcaseShelf(row=4, column=4, bookcase=bookcases[2]),
BookcaseShelf(row=0, column=0, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=1, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=2, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=3, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=4, bookcase=bookcases[3]),
BookcaseShelf(row=0, column=0, bookcase=bookcases[4]),
BookcaseShelf(row=0, column=1, bookcase=bookcases[4]),
BookcaseShelf(row=0, column=2, bookcase=bookcases[4]),
BookcaseShelf(row=0, column=3, bookcase=bookcases[4]),
BookcaseShelf(row=0, column=4, bookcase=bookcases[4], description="Religion"),
]
authors = [
Author(name="Donald E. Knuth"),
Author(name="J.K. Rowling"),
Author(name="J.R.R. Tolkien"),
Author(name="George R.R. Martin"),
Author(name="Stephen King"),
Author(name="Agatha Christie"),
]
book1 = BookcaseItem(
name = "The Art of Computer Programming",
isbn = "9780201896831",
)
book1.authors.add(authors[0])
book1.media_type = media_types[0]
book1.shelf = shelfs[59]
book2 = BookcaseItem(
name = "Harry Potter and the Philosopher's Stone",
isbn = "9780747532743",
)
book2.authors.add(authors[1])
book2.media_type = media_types[0]
book2.shelf = shelfs[-1]
book_owned_by_other_user = BookcaseItem(
name = "Book owned by other user",
isbn = "9780747532744",
)
book_owned_by_other_user.owner = "other_user"
book_owned_by_other_user.authors.add(authors[4])
book_owned_by_other_user.media_type = media_types[0]
book_owned_by_other_user.shelf = shelfs[-2]
borrowed_book_more_available = BookcaseItem(
name = "Borrowed book with more available",
isbn = "9780747532745",
)
borrowed_book_more_available.authors.add(authors[5])
borrowed_book_more_available.media_type = media_types[0]
borrowed_book_more_available.shelf = shelfs[-3]
borrowed_book_more_available.amount = 2
borrowed_book_no_more_available = BookcaseItem(
name = "Borrowed book with no more available",
isbn = "9780747532746",
)
borrowed_book_no_more_available.authors.add(authors[5])
borrowed_book_no_more_available.media_type = media_types[0]
borrowed_book_no_more_available.shelf = shelfs[-3]
borrowed_book_people_in_queue = BookcaseItem(
name = "Borrowed book with people in queue",
isbn = "9780747532747",
)
borrowed_book_people_in_queue.authors.add(authors[5])
borrowed_book_people_in_queue.media_type = media_types[0]
borrowed_book_people_in_queue.shelf = shelfs[-3]
borrowed_book_by_slabbedask = BookcaseItem(
name = "Borrowed book by slabbedask",
isbn = "9780747532748",
)
borrowed_book_by_slabbedask.authors.add(authors[5])
borrowed_book_by_slabbedask.media_type = media_types[0]
borrowed_book_by_slabbedask.shelf = shelfs[-3]
books = [
book1,
book2,
book_owned_by_other_user,
borrowed_book_more_available,
borrowed_book_no_more_available,
borrowed_book_people_in_queue,
]
slabbedask_borrowing = BookcaseItemBorrowing(
username="slabbedask",
item=borrowed_book_more_available,
)
slabbedask_borrowing.end_time = datetime.now() - timedelta(days=1)
borrowings = [
BookcaseItemBorrowing(username="user", item=borrowed_book_more_available),
BookcaseItemBorrowing(username="user", item=borrowed_book_no_more_available),
BookcaseItemBorrowing(username="user", item=borrowed_book_people_in_queue),
slabbedask_borrowing,
]
queue = [
BookcaseItemBorrowingQueue(username="user", item=borrowed_book_people_in_queue),
]
with open(Path(__file__).parent.parent.parent / 'data' / 'iso639_1.csv') as f:
reader = csv.reader(f)
languages = [Language(name, code) for (code, name) in reader]
sql_session.add_all(media_types)
sql_session.add_all(bookcases)
sql_session.add_all(shelfs)
sql_session.add_all(languages)
sql_session.add_all(authors)
sql_session.add_all(books)
sql_session.add_all(borrowings)
sql_session.add_all(queue)
sql_session.commit()
print("Added test media types, bookcases and shelfs.")