This commit is contained in:
2025-05-06 17:09:30 +02:00
parent 4a4f0e6947
commit de3e83a64e
12 changed files with 283 additions and 236 deletions

View File

@@ -10,12 +10,18 @@ from sqlalchemy.orm.collections import (
)
def _pascal_case_to_snake_case(name: str) -> str:
return "".join(
["_" + i.lower() if i.isupper() else i for i in name]
).lstrip("_")
class Base(DeclarativeBase):
metadata = MetaData(
naming_convention={
"ix": "ix_%(column_0_label)s",
"ix": "ix_%(table_name)s_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_`%(constraint_name)s`",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}
@@ -23,7 +29,7 @@ class Base(DeclarativeBase):
@declared_attr.directive
def __tablename__(cls) -> str:
return cls.__name__
return _pascal_case_to_snake_case(cls.__name__)
def __repr__(self) -> str:
columns = ", ".join(

View File

@@ -5,43 +5,99 @@ from sqlalchemy import (
Boolean,
Integer,
String,
func,
select,
)
from sqlalchemy.orm import (
Mapped,
Session,
mapped_column,
relationship,
)
from .Base import Base
from .Transaction import Transaction
from .TransactionType import TransactionType
import dibbler.models.User as user
if TYPE_CHECKING:
from .PurchaseEntry import PurchaseEntry
from .UserProducts import UserProducts
# if TYPE_CHECKING:
# from .PurchaseEntry import PurchaseEntry
# from .UserProducts import UserProducts
class Product(Base):
__tablename__ = "products"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
product_id: Mapped[int] = mapped_column(Integer, primary_key=True)
bar_code: Mapped[str] = mapped_column(String(13))
name: Mapped[str] = mapped_column(String(45))
price: Mapped[int] = mapped_column(Integer)
stock: Mapped[int] = mapped_column(Integer)
# price: Mapped[int] = mapped_column(Integer)
# stock: Mapped[int] = mapped_column(Integer)
hidden: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
purchases: Mapped[set[PurchaseEntry]] = relationship(back_populates="product")
users: Mapped[set[UserProducts]] = relationship(back_populates="product")
# - count (virtual)
def stock(self, sql_session: Session) -> int:
"""
Returns the number of products in stock.
"""
bar_code_re = r"[0-9]+"
name_re = r".+"
name_length = 45
added_products = sql_session.scalars(
select(func.sum(Transaction.product_count))
.where(
Transaction.type == TransactionType.ADD_PRODUCT,
Transaction.product_id == self.id,
)
).one_or_none()
def __init__(self, bar_code, name, price, stock=0, hidden=False):
self.name = name
self.bar_code = bar_code
self.price = price
self.stock = stock
self.hidden = hidden
bought_products = sql_session.scalars(
select(func.sum(Transaction.product_count))
.where(
Transaction.type == TransactionType.BUY_PRODUCT,
Transaction.product_id == self.id,
)
).one_or_none()
def __str__(self):
return self.name
return (added_products or 0) - (bought_products or 0)
def remaining_with_exact_price(self, sql_session: Session) -> list[int]:
"""
Retrieves the remaining products with their exact price as they were bought.
"""
stock = self.stock(sql_session)
# TODO: only retrieve as many transactions as exists in the stock
last_added = sql_session.scalars(
select(Transaction)
.where(
Transaction.type == TransactionType.ADD_PRODUCT,
Transaction.product_id == self.id,
)
.order_by(Transaction.time.desc())
).all()
# result = []
# while stock > 0 and last_added:
...
def price(self, sql_session: Session) -> int:
"""
Returns the price of the product.
Average price over the last bought products.
"""
remaining = self.remaining_with_exact_price(sql_session)
if not remaining:
return 0
prices = [price for price in remaining]
return sum(prices) // len(prices)
def owned_by_user(self, sql_session: Session) -> dict[user.User, int]:
"""
Returns an overview of how many of the remaining products are owned by which user.
"""
...

View File

@@ -1,70 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from datetime import datetime
import math
from sqlalchemy import (
DateTime,
Integer,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .Transaction import Transaction
if TYPE_CHECKING:
from .PurchaseEntry import PurchaseEntry
class Purchase(Base):
__tablename__ = "purchases"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
time: Mapped[datetime] = mapped_column(DateTime)
price: Mapped[int] = mapped_column(Integer)
transactions: Mapped[set[Transaction]] = relationship(
back_populates="purchase", order_by="Transaction.user_name"
)
entries: Mapped[set[PurchaseEntry]] = relationship(back_populates="purchase")
def __init__(self):
pass
def is_complete(self):
return len(self.transactions) > 0 and len(self.entries) > 0
def price_per_transaction(self, round_up=True):
if round_up:
return int(math.ceil(float(self.price) / len(self.transactions)))
else:
return int(math.floor(float(self.price) / len(self.transactions)))
def set_price(self, round_up=True):
self.price = 0
for entry in self.entries:
self.price += entry.amount * entry.product.price
if len(self.transactions) > 0:
for t in self.transactions:
t.amount = self.price_per_transaction(round_up=round_up)
def perform_purchase(self, ignore_penalty=False, round_up=True):
self.time = datetime.datetime.now()
self.set_price(round_up=round_up)
for t in self.transactions:
t.perform_transaction(ignore_penalty=ignore_penalty)
for entry in self.entries:
entry.product.stock -= entry.amount
def perform_soft_purchase(self, price, round_up=True):
self.time = datetime.datetime.now()
self.price = price
for t in self.transactions:
t.amount = self.price_per_transaction(round_up=round_up)
for t in self.transactions:
t.perform_transaction()

View File

@@ -1,37 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
Integer,
ForeignKey,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
if TYPE_CHECKING:
from .Product import Product
from .Purchase import Purchase
class PurchaseEntry(Base):
__tablename__ = "purchase_entries"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
amount: Mapped[int] = mapped_column(Integer)
product_id: Mapped[int] = mapped_column(ForeignKey("products.product_id"))
purchase_id: Mapped[int] = mapped_column(ForeignKey("purchases.id"))
product: Mapped[Product] = relationship(lazy="joined")
purchase: Mapped[Purchase] = relationship(lazy="joined")
def __init__(self, purchase, product, amount):
self.product = product
self.product_bar_code = product.bar_code
self.purchase = purchase
self.amount = amount

View File

@@ -4,49 +4,86 @@ from typing import TYPE_CHECKING
from datetime import datetime
from sqlalchemy import (
Boolean,
DateTime,
ForeignKey,
Integer,
String,
CheckConstraint
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from sqlalchemy.sql.schema import Index
from .Base import Base
from .TransactionType import TransactionType
if TYPE_CHECKING:
from .User import User
from .Purchase import Purchase
from .Product import Product
# TODO: allow for joint transactions?
# dibbler allows joint transactions (e.g. buying more than one product at once, several people buying the same product, etc.)
# instead of having the software split the transactions up, making them hard to reconnect,
# maybe we should add some sort of joint transaction id field to allow multiple transactions to be grouped together?
class Transaction(Base):
__tablename__ = "transactions"
__table_args__ = (
CheckConstraint(
f'type != \'{TransactionType.TRANSFER}\' OR transfer_user_id IS NOT NULL',
name='trx_type_transfer_required_fields',
),
CheckConstraint(
f'type != \'{TransactionType.ADD_PRODUCT}\' OR (product_id IS NOT NULL AND per_product IS NOT NULL AND product_count IS NOT NULL)',
name='trx_type_add_product_required_fields',
),
CheckConstraint(
f'type != \'{TransactionType.BUY_PRODUCT}\' OR (product_id IS NOT NULL AND product_count IS NOT NULL)',
name='trx_type_buy_product_required_fields',
),
# Speed up product count calculation
Index('product_user_time', 'product_id', 'user_id', 'time'),
# Speed up product owner calculation
Index('user_product_time', 'user_id', 'product_id', 'time'),
# Speed up user transaction list / credit calculation
Index('user_time', 'user_id', 'time'),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
time: Mapped[datetime] = mapped_column(DateTime)
# The type of transaction
type: Mapped[TransactionType] = mapped_column(TransactionType)
# The amount of money being added or subtracted from the user's credit
amount: Mapped[int] = mapped_column(Integer)
penalty: Mapped[int] = mapped_column(Integer)
description: Mapped[str | None] = mapped_column(String(50))
user_name: Mapped[str] = mapped_column(ForeignKey("users.name"))
purchase_id: Mapped[int | None] = mapped_column(ForeignKey("purchases.id"))
# If buying products, is the user penalized for having too low credit?
penalty: Mapped[Boolean] = mapped_column(Boolean, default=False)
user: Mapped[User] = relationship(lazy="joined")
purchase: Mapped[Purchase] = relationship(lazy="joined")
# If adding products, how much is each product worth
per_product: Mapped[int | None] = mapped_column(Integer)
def __init__(self, user, amount=0, description=None, purchase=None, penalty=1):
self.user = user
self.amount = amount
self.description = description
self.purchase = purchase
self.penalty = penalty
# The user who performs the transaction
user_id: Mapped[int] = mapped_column(ForeignKey("user.id"))
user : Mapped[User] = relationship(lazy="joined", foreign_keys=[user_id])
def perform_transaction(self, ignore_penalty=False):
self.time = datetime.datetime.now()
if not ignore_penalty:
self.amount *= self.penalty
self.user.credit -= self.amount
# Receiving user when moving credit from one user to another
transfer_user_id: Mapped[int | None] = mapped_column(ForeignKey("user.id"))
transfer_user: Mapped[User | None] = relationship(lazy="joined", foreign_keys=[transfer_user_id])
# The product that is either being added or bought
product_id: Mapped[int | None] = mapped_column(ForeignKey("product.id"))
product: Mapped[Product | None] = relationship(lazy="joined")
# The amount of products being added or bought
product_count: Mapped[int | None] = mapped_column(Integer)
# TODO: create a constructor for every transaction type (as well as a generic one)

View File

@@ -0,0 +1,12 @@
from sqlalchemy.sql.sqltypes import Enum
class TransactionType(Enum):
"""
Enum for transaction types.
"""
ADJUST = "adjust"
TRANSFER = "transfer"
ADD_PRODUCT = "add_product"
BUY_PRODUCT = "buy_product"

View File

@@ -2,48 +2,124 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
func,
select,
column,
Integer,
String,
)
from sqlalchemy.orm import (
Session,
Mapped,
mapped_column,
relationship,
)
from .Base import Base
from .TransactionType import TransactionType
from .Transaction import Transaction
if TYPE_CHECKING:
from .UserProducts import UserProducts
from .Transaction import Transaction
import dibbler.models.Product as product
class User(Base):
__tablename__ = "users"
name: Mapped[str] = mapped_column(String(10), primary_key=True)
credit: Mapped[str] = mapped_column(Integer)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = mapped_column(String(20), unique=True)
card: Mapped[str | None] = mapped_column(String(20))
rfid: Mapped[str | None] = mapped_column(String(20))
products: Mapped[set[UserProducts]] = relationship(back_populates="user")
transactions: Mapped[set[Transaction]] = relationship(back_populates="user")
def credit(self, sql_session: Session) -> int:
"""
Returns the current credit of the user.
"""
name_re = r"[a-z]+"
card_re = r"(([Nn][Tt][Nn][Uu])?[0-9]+)?"
rfid_re = r"[0-9a-fA-F]*"
result = sql_session.scalars(
select(
# TODO: clearly define and fix the sign of the amount
- column("transfers_to_other_users")
+ column("transfers_to_self")
+ column("add_products")
- column("buy_products")
).select_from(
select(
func.sum(Transaction.amount)
.label("transfers_to_other_users")
.where(
Transaction.user_id == self.id,
Transaction.type == TransactionType.TRANSFER,
Transaction.transfer_user_id != self.id,
)
)
.subquery(),
select(
func.sum(Transaction.amount)
.label("transfers_to_self")
.where(
Transaction.transfer_user_id == self.id,
Transaction.type == TransactionType.TRANSFER,
Transaction.user_id != self.id,
)
)
.subquery(),
select(
func.sum(Transaction.amount)
.label("add_products")
.where(
Transaction.user_id == self.id,
Transaction.type == TransactionType.ADD_PRODUCT,
)
)
.subquery(),
select(
func.sum(Transaction.amount)
.label("buy_products")
.where(
Transaction.user_id == self.id,
Transaction.type == TransactionType.BUY_PRODUCT,
)
).subquery(),
)
).one_or_none()
def __init__(self, name, card, rfid=None, credit=0):
self.name = name
if card == "":
card = None
self.card = card
if rfid == "":
rfid = None
self.rfid = rfid
self.credit = credit
return result
def __str__(self):
return self.name
def products(self, sql_session: Session) -> list[tuple[product.Product, int]]:
"""
Returns the products that the user has put into the system (and has not been purchased yet)
"""
def is_anonymous(self):
return self.card == "11122233"
...
def transactions(self, sql_session: Session) -> list[Transaction]:
"""
Returns the transactions of the user.
"""
return list(sql_session.scalars(
select(Transaction)
.where(Transaction.user_id == self.id)
.order_by(
Transaction.time.desc()
)
).all())
# name_re = r"[a-z]+"
# card_re = r"(([Nn][Tt][Nn][Uu])?[0-9]+)?"
# rfid_re = r"[0-9a-fA-F]*"
# def __init__(self, name, card, rfid=None, credit=0):
# self.name = name
# if card == "":
# card = None
# self.card = card
# if rfid == "":
# rfid = None
# self.rfid = rfid
# self.credit = credit
# def __str__(self):
# return self.name
# def is_anonymous(self):
# return self.card == "11122233"

View File

@@ -1,31 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import (
Integer,
ForeignKey,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from .Base import Base
if TYPE_CHECKING:
from .User import User
from .Product import Product
class UserProducts(Base):
__tablename__ = "user_products"
user_name: Mapped[str] = mapped_column(ForeignKey("users.name"), primary_key=True)
product_id: Mapped[int] = mapped_column(ForeignKey("products.product_id"), primary_key=True)
count: Mapped[int] = mapped_column(Integer)
sign: Mapped[int] = mapped_column(Integer)
user: Mapped[User] = relationship()
product: Mapped[Product] = relationship()

View File

@@ -1,17 +1,11 @@
__all__ = [
"Base",
"Product",
"Purchase",
"PurchaseEntry",
"Transaction",
"User",
"UserProducts",
]
from .Base import Base
from .Product import Product
from .Purchase import Purchase
from .PurchaseEntry import PurchaseEntry
from .Transaction import Transaction
from .User import User
from .UserProducts import UserProducts

View File

@@ -19,30 +19,31 @@ def clear_db(session):
def main():
session = Session()
clear_db(session)
product_items = []
user_items = []
with open(JSON_FILE) as f:
json_obj = json.load(f)
# product_items = []
# user_items = []
for product in json_obj["products"]:
product_item = Product(
bar_code=product["bar_code"],
name=product["name"],
price=product["price"],
stock=product["stock"],
)
product_items.append(product_item)
# with open(JSON_FILE) as f:
# json_obj = json.load(f)
for user in json_obj["users"]:
user_item = User(
name=user["name"],
card=user["card"],
rfid=user["rfid"],
credit=user["credit"],
)
user_items.append(user_item)
# for product in json_obj["products"]:
# product_item = Product(
# bar_code=product["bar_code"],
# name=product["name"],
# price=product["price"],
# stock=product["stock"],
# )
# product_items.append(product_item)
session.add_all(product_items)
session.add_all(user_items)
session.commit()
# for user in json_obj["users"]:
# user_item = User(
# name=user["name"],
# card=user["card"],
# rfid=user["rfid"],
# credit=user["credit"],
# )
# user_items.append(user_item)
# session.add_all(product_items)
# session.add_all(user_items)
# session.commit()

View File

@@ -6,7 +6,7 @@ input_encoding = 'utf8'
[database]
# url = "postgresql://robertem@127.0.0.1/pvvvv"
url = "sqlite:///test.db"
url = sqlite:///test.db
[limits]
low_credit_warning_limit = -100

View File

@@ -13,6 +13,9 @@ python3Packages.buildPythonApplication {
# https://github.com/NixOS/nixpkgs/issues/285234
dontCheckRuntimeDeps = true;
doCheck = false;
pythonImportsCheck = [];
nativeBuildInputs = with python3Packages; [ setuptools ];
propagatedBuildInputs = with python3Packages; [
brother-ql