fixup! WIP

This commit is contained in:
2025-12-09 20:39:01 +09:00
parent 2e66a9a4b0
commit 60fa6529ee
4 changed files with 316 additions and 21 deletions

View File

@@ -247,6 +247,15 @@ class Transaction(Base):
product_count: Mapped[int | None] = mapped_column(Integer)
"""
The amount of products being added or bought.
This is always relative to the existing stock.
- `ADD_PRODUCT` increases the stock by this amount.
- `BUY_PRODUCT` decreases the stock by this amount.
- `ADJUST_STOCK` increases or decreases the stock by this amount,
depending on whether the amount is positive or negative.
"""
penalty_threshold: Mapped[int | None] = mapped_column(Integer, nullable=True)
@@ -546,3 +555,21 @@ class Transaction(Base):
transfer_user_id=transfer_user_id,
message=message,
)
@classmethod
def throw_product(
cls: type[Self],
user_id: int,
product_id: int,
product_count: int,
time: datetime | None = None,
message: str | None = None,
) -> Self:
return cls(
time=time,
type_=TransactionType.THROW_PRODUCT,
user_id=user_id,
product_id=product_id,
product_count=product_count,
message=message,
)

View File

@@ -6,6 +6,7 @@ __all__ = [
"current_interest",
"current_penalty",
"joint_buy_product",
"product_owners",
"product_price",
"product_price_log",
"product_stock",
@@ -15,7 +16,6 @@ __all__ = [
"transaction_log",
"user_balance",
"user_balance_log",
# "users_owning_product",
]
# from .add_product import add_product
@@ -25,6 +25,7 @@ from .adjust_penalty import adjust_penalty
from .current_interest import current_interest
from .current_penalty import current_penalty
from .joint_buy_product import joint_buy_product
from .product_owners import product_owners
from .product_price import product_price, product_price_log
from .product_stock import product_stock
@@ -33,4 +34,3 @@ from .search_product import search_product
from .search_user import search_user
from .transaction_log import transaction_log
from .user_balance import user_balance, user_balance_log
# from .users_owning_product import users_owning_product

View File

@@ -3,7 +3,9 @@ from datetime import datetime
from sqlalchemy import (
CTE,
and_,
asc,
case,
func,
literal,
select,
)
@@ -39,7 +41,13 @@ def _product_owners_query(
# Subset of transactions that we'll want to iterate over.
trx_subset = (
select(Transaction)
select(
func.row_number().over(order_by=asc(Transaction.time)).label("i"),
Transaction.time,
Transaction.id,
Transaction.type_,
Transaction.user_id,
Transaction.product_count, )
.where(
Transaction.type_.in_(
[
@@ -63,7 +71,7 @@ def _product_owners_query(
literal(None).label("transaction_id"),
literal(None).label("user_id"),
literal(0).label("product_count"),
product_stock.as_scalar().label("products_left_to_account_for"),
product_stock.scalar_subquery().label("products_left_to_account_for"),
)
recursive_cte = initial_element.cte(name=cte_name, recursive=True)
@@ -73,7 +81,7 @@ def _product_owners_query(
trx_subset.c.i,
trx_subset.c.time,
trx_subset.c.id.label("transaction_id"),
# Who added the product
# Who added the product (if any)
case(
# Someone adds the product -> they own it
(
@@ -82,7 +90,7 @@ def _product_owners_query(
),
else_=None,
).label("user_id"),
# How many products did they add
# How many products did they add (if any)
case(
# Someone adds the product -> they added a certain amount of products
(trx_subset.c.type_ == TransactionType.ADD_PRODUCT, trx_subset.c.product_count),
@@ -110,15 +118,19 @@ def _product_owners_query(
TransactionType.THROW_PRODUCT,
]
),
recursive_cte.c.products_left_to_account_for + trx_subset.c.product_count,
recursive_cte.c.products_left_to_account_for - trx_subset.c.product_count,
),
# Someone adjusts the stock ->
# If adjusted upwards -> products owned by nobody, decrease products left to account for
# If adjusted downwards -> products taken away from owners, decrease products left to account for
(
trx_subset.c.type_ == TransactionType.ADJUST_STOCK,
(trx_subset.c.type_ == TransactionType.ADJUST_STOCK) and (trx_subset.c.product_count > 0),
recursive_cte.c.products_left_to_account_for - trx_subset.c.product_count,
),
(
(trx_subset.c.type_ == TransactionType.ADJUST_STOCK) and (trx_subset.c.product_count < 0),
recursive_cte.c.products_left_to_account_for + trx_subset.c.product_count,
),
else_=recursive_cte.c.products_left_to_account_for,
).label("products_left_to_account_for"),
)
@@ -151,13 +163,20 @@ def product_owners(
use_cache=use_cache,
until=until,
)
result = sql_session.scalars(
db_result = sql_session.execute(
select(
recursive_cte.c.user_id,
recursive_cte.c.product_count,
User,
)
.distinct()
.join(User, User.id == recursive_cte.c.user_id)
.order_by(recursive_cte.c.i.desc())
).all()
result: list[User | None] = []
for user_count, user in db_result:
result.extend([user] * user_count)
# redistribute the user counts to a list of users
return list(result)

View File

@@ -1,12 +1,261 @@
from sqlalchemy.orm import Session
def test_product_owners_no_transactions(sql_session: Session) -> None: ...
def test_product_owners_add_products(sql_session: Session) -> None: ...
def test_product_owners_add_and_buy_products(sql_session: Session) -> None: ...
def test_product_owners_add_and_throw_products(sql_session: Session) -> None: ...
def test_product_owners_multiple_users(sql_session: Session) -> None: ...
def test_product_owners_adjust_stock_down(sql_session: Session) -> None: ...
def test_product_owners_adjust_stock_up(sql_session: Session) -> None: ...
def test_product_owners_negative_stock(sql_session: Session) -> None: ...
def test_product_owners_add_products_from_negative_stock(sql_session: Session) -> None: ...
def test_product_owners_interleaved_users(sql_session: Session) -> None: ...
from dibbler.models import Product, User
from dibbler.models.Transaction import Transaction
from dibbler.queries import product_owners
def insert_test_data(sql_session: Session) -> tuple[Product, User]:
user = User("testuser")
product = Product("1234567890123", "Test Product")
sql_session.add(user)
sql_session.add(product)
sql_session.commit()
return product, user
def test_product_owners_no_transactions(sql_session: Session) -> None:
product, _ = insert_test_data(sql_session)
owners = product_owners(sql_session, product)
assert owners == []
def test_product_owners_add_products(sql_session: Session) -> None:
product, user = insert_test_data(sql_session)
transactions = [
Transaction.add_product(
user_id=user.id,
product_id=product.id,
amount=30,
per_product=10,
product_count=3,
)
]
sql_session.add_all(transactions)
sql_session.commit()
owners = product_owners(sql_session, product)
assert owners == [user, user, user]
def test_product_owners_add_and_buy_products(sql_session: Session) -> None:
product, user = insert_test_data(sql_session)
transactions = [
Transaction.add_product(
user_id=user.id,
product_id=product.id,
amount=30,
per_product=10,
product_count=3,
),
Transaction.buy_product(
user_id=user.id,
product_id=product.id,
product_count=1,
),
]
sql_session.add_all(transactions)
sql_session.commit()
owners = product_owners(sql_session, product)
assert owners == [user, user]
def test_product_owners_add_and_throw_products(sql_session: Session) -> None:
product, user = insert_test_data(sql_session)
transactions = [
Transaction.add_product(
user_id=user.id,
product_id=product.id,
amount=40,
per_product=10,
product_count=4,
),
Transaction.throw_product(
user_id=user.id,
product_id=product.id,
product_count=2,
),
]
sql_session.add_all(transactions)
sql_session.commit()
owners = product_owners(sql_session, product)
assert owners == [user, user]
def test_product_owners_multiple_users(sql_session: Session) -> None:
product, user1 = insert_test_data(sql_session)
user2 = User("testuser2")
sql_session.add(user2)
sql_session.commit()
transactions = [
Transaction.add_product(
user_id=user1.id,
product_id=product.id,
amount=20,
per_product=10,
product_count=2,
),
Transaction.add_product(
user_id=user2.id,
product_id=product.id,
amount=30,
per_product=10,
product_count=3,
),
]
sql_session.add_all(transactions)
sql_session.commit()
owners = product_owners(sql_session, product)
assert owners == [user2, user2, user2, user1, user1]
def test_product_owners_adjust_stock_down(sql_session: Session) -> None:
product, user = insert_test_data(sql_session)
transactions = [
Transaction.add_product(
user_id=user.id,
product_id=product.id,
amount=50,
per_product=10,
product_count=5,
),
Transaction.adjust_stock(
user_id=user.id,
product_id=product.id,
product_count=-2,
),
]
sql_session.add_all(transactions)
sql_session.commit()
owners = product_owners(sql_session, product)
assert owners == [user, user, user]
def test_product_owners_adjust_stock_up(sql_session: Session) -> None:
product, user = insert_test_data(sql_session)
transactions = [
Transaction.add_product(
user_id=user.id,
product_id=product.id,
amount=20,
per_product=10,
product_count=2,
),
Transaction.adjust_stock(
user_id=user.id,
product_id=product.id,
product_count=3,
),
]
sql_session.add_all(transactions)
sql_session.commit()
owners = product_owners(sql_session, product)
assert owners == [user, user, None, None, None]
def test_product_owners_negative_stock(sql_session: Session) -> None:
product, user = insert_test_data(sql_session)
transactions = [
Transaction.add_product(
user_id=user.id,
product_id=product.id,
amount=10,
per_product=10,
product_count=1,
),
Transaction.buy_product(
user_id=user.id,
product_id=product.id,
product_count=2,
),
]
sql_session.add_all(transactions)
sql_session.commit()
owners = product_owners(sql_session, product)
assert owners == []
def test_product_owners_add_products_from_negative_stock(sql_session: Session) -> None:
product, user = insert_test_data(sql_session)
transactions = [
Transaction.buy_product(
user_id=user.id,
product_id=product.id,
product_count=2,
),
Transaction.add_product(
user_id=user.id,
product_id=product.id,
amount=30,
per_product=10,
product_count=3,
),
]
sql_session.add_all(transactions)
sql_session.commit()
owners = product_owners(sql_session, product)
assert owners == [user]
def test_product_owners_interleaved_users(sql_session: Session) -> None:
product, user1 = insert_test_data(sql_session)
user2 = User("testuser2")
sql_session.add(user2)
sql_session.commit()
transactions = [
Transaction.add_product(
user_id=user1.id,
product_id=product.id,
amount=20,
per_product=10,
product_count=2,
),
Transaction.add_product(
user_id=user2.id,
product_id=product.id,
amount=30,
per_product=10,
product_count=3,
),
Transaction.buy_product(
user_id=user1.id,
product_id=product.id,
product_count=1,
),
Transaction.add_product(
user_id=user1.id,
product_id=product.id,
amount=10,
per_product=10,
product_count=1,
),
]
sql_session.add_all(transactions)
sql_session.commit()
owners = product_owners(sql_session, product)
assert owners == [user1, user2, user2, user1, user1]