fixup! WIP: caching
Run tests / run-tests (push) Failing after 2m59s
Run benchmarks / run-tests (push) Successful in 34m3s

This commit is contained in:
2026-05-31 23:36:26 +09:00
parent bf3628bcce
commit 4611540069
2 changed files with 207 additions and 45 deletions
+106 -44
View File
@@ -1,18 +1,21 @@
from datetime import datetime
from typing import Tuple
from sqlalchemy import (
BindParameter,
Select,
and_,
bindparam,
case,
func,
or_,
select,
)
from sqlalchemy.orm import Session
from dibbler.models import (
LastCacheTransaction,
Product,
ProductCache,
Transaction,
TransactionType,
)
@@ -30,9 +33,6 @@ def _product_stock_query(
The inner query for calculating the product stock.
"""
if use_cache:
print("WARNING: Using cache for product stock query is not implemented yet.")
if isinstance(product_id, int):
product_id = BindParameter("product_id", value=product_id)
@@ -49,50 +49,112 @@ def _product_stock_query(
else:
until_transaction_id = None
query = select(
func.sum(
case(
(
Transaction.type_ == TransactionType.ADD_PRODUCT.as_literal_column(),
Transaction.product_count,
),
(
Transaction.type_ == TransactionType.ADJUST_STOCK.as_literal_column(),
Transaction.product_count,
),
(
Transaction.type_ == TransactionType.BUY_PRODUCT.as_literal_column(),
-Transaction.product_count,
),
(
Transaction.type_ == TransactionType.JOINT.as_literal_column(),
-Transaction.product_count,
),
(
Transaction.type_ == TransactionType.THROW_PRODUCT.as_literal_column(),
-Transaction.product_count,
),
else_=0,
),
).label("stock"),
).where(
Transaction.type_.in_(
[
TransactionType.ADD_PRODUCT.as_literal_column(),
TransactionType.ADJUST_STOCK.as_literal_column(),
TransactionType.BUY_PRODUCT.as_literal_column(),
TransactionType.JOINT.as_literal_column(),
TransactionType.THROW_PRODUCT.as_literal_column(),
],
stock_delta = case(
(
Transaction.type_ == TransactionType.ADD_PRODUCT.as_literal_column(),
Transaction.product_count,
),
Transaction.product_id == product_id,
until_filter(
until_time=until_time,
until_transaction_id=until_transaction_id,
until_inclusive=until_inclusive,
(
Transaction.type_ == TransactionType.ADJUST_STOCK.as_literal_column(),
Transaction.product_count,
),
(
Transaction.type_ == TransactionType.BUY_PRODUCT.as_literal_column(),
-Transaction.product_count,
),
(
Transaction.type_ == TransactionType.JOINT.as_literal_column(),
-Transaction.product_count,
),
(
Transaction.type_ == TransactionType.THROW_PRODUCT.as_literal_column(),
-Transaction.product_count,
),
else_=0,
)
if use_cache:
latest_cache = (
select(
ProductCache.stock.label("stock"),
Transaction.time.label("transaction_time"),
Transaction.id.label("transaction_id"),
)
.select_from(ProductCache)
.join(
LastCacheTransaction,
ProductCache.last_cache_transaction_id == LastCacheTransaction.id,
)
.join(Transaction, LastCacheTransaction.transaction_id == Transaction.id)
.where(
ProductCache.product_id == product_id,
until_filter(
until_time=until_time,
until_transaction_id=until_transaction_id,
until_inclusive=until_inclusive,
transaction_time=Transaction.time,
),
)
.order_by(Transaction.time.desc(), Transaction.id.desc(), ProductCache.id.desc())
.limit(1)
.subquery("latest_product_cache")
)
latest_cache_stock = select(latest_cache.c.stock).scalar_subquery()
latest_cache_time = select(latest_cache.c.transaction_time).scalar_subquery()
latest_cache_transaction_id = select(latest_cache.c.transaction_id).scalar_subquery()
query = select(
(
func.coalesce(latest_cache_stock, 0)
+ func.coalesce(func.sum(stock_delta), 0)
).label("stock"),
).where(
Transaction.type_.in_(
[
TransactionType.ADD_PRODUCT.as_literal_column(),
TransactionType.ADJUST_STOCK.as_literal_column(),
TransactionType.BUY_PRODUCT.as_literal_column(),
TransactionType.JOINT.as_literal_column(),
TransactionType.THROW_PRODUCT.as_literal_column(),
],
),
Transaction.product_id == product_id,
until_filter(
until_time=until_time,
until_transaction_id=until_transaction_id,
until_inclusive=until_inclusive,
),
or_(
latest_cache_time.is_(None),
Transaction.time > latest_cache_time,
and_(
Transaction.time == latest_cache_time,
Transaction.id > latest_cache_transaction_id,
),
),
)
else:
query = select(
func.coalesce(func.sum(stock_delta), 0).label("stock"),
).where(
Transaction.type_.in_(
[
TransactionType.ADD_PRODUCT.as_literal_column(),
TransactionType.ADJUST_STOCK.as_literal_column(),
TransactionType.BUY_PRODUCT.as_literal_column(),
TransactionType.JOINT.as_literal_column(),
TransactionType.THROW_PRODUCT.as_literal_column(),
],
),
Transaction.product_id == product_id,
until_filter(
until_time=until_time,
until_transaction_id=until_transaction_id,
until_inclusive=until_inclusive,
),
)
return query
+101 -1
View File
@@ -3,7 +3,8 @@ from datetime import datetime, timedelta
import pytest
from sqlalchemy.orm import Session
from dibbler.models import Product, Transaction, User
from dibbler.models import Product, ProductCache, Transaction, User
from dibbler.models.LastCacheTransaction import LastCacheTransaction
from dibbler.queries import joint_buy_product, product_stock
from tests.helpers import assert_id_order_similar_to_time_order, assign_times
@@ -283,3 +284,102 @@ def test_product_stock_until_time(sql_session: Session) -> None:
)
== 1
)
def test_product_stock_uses_cached_checkpoint_as_base(sql_session: Session) -> None:
user, product = insert_test_data(sql_session)
transactions = [
Transaction.add_product(
amount=50,
per_product=10,
user_id=user.id,
product_id=product.id,
product_count=5,
),
Transaction.buy_product(
user_id=user.id,
product_id=product.id,
product_count=2,
),
Transaction.adjust_stock(
user_id=user.id,
product_id=product.id,
product_count=4,
),
]
assign_times(transactions)
sql_session.add_all(transactions)
sql_session.commit()
checkpoint = LastCacheTransaction(transaction_id=transactions[1].id)
sql_session.add(checkpoint)
sql_session.flush()
sql_session.add(
ProductCache(
product_id=product.id,
stock=100,
price=10,
last_cache_transaction_id=checkpoint.id,
),
)
sql_session.commit()
assert product_stock(sql_session, product, use_cache=False) == 5 - 2 + 4
assert product_stock(sql_session, product, use_cache=True) == 100 + 4
def test_product_stock_ignores_cache_after_until_time(sql_session: Session) -> None:
user, product = insert_test_data(sql_session)
transactions = [
Transaction.add_product(
amount=50,
per_product=10,
user_id=user.id,
product_id=product.id,
product_count=5,
),
Transaction.buy_product(
user_id=user.id,
product_id=product.id,
product_count=2,
),
Transaction.adjust_stock(
user_id=user.id,
product_id=product.id,
product_count=4,
),
]
assign_times(transactions)
sql_session.add_all(transactions)
sql_session.commit()
checkpoint = LastCacheTransaction(transaction_id=transactions[2].id)
sql_session.add(checkpoint)
sql_session.flush()
sql_session.add(
ProductCache(
product_id=product.id,
stock=100,
price=10,
last_cache_transaction_id=checkpoint.id,
),
)
sql_session.commit()
assert (
product_stock(
sql_session,
product,
use_cache=True,
until_time=transactions[1].time,
)
== 5 - 2
)