From 46115400693100eb3255e4929ac9f592c2806537 Mon Sep 17 00:00:00 2001 From: h7x4 Date: Sun, 31 May 2026 23:36:26 +0900 Subject: [PATCH] fixup! WIP: caching --- dibbler/queries/product_stock.py | 150 ++++++++++++++++++++-------- tests/queries/test_product_stock.py | 102 ++++++++++++++++++- 2 files changed, 207 insertions(+), 45 deletions(-) diff --git a/dibbler/queries/product_stock.py b/dibbler/queries/product_stock.py index 606e7c5..c64373a 100644 --- a/dibbler/queries/product_stock.py +++ b/dibbler/queries/product_stock.py @@ -1,18 +1,21 @@ from datetime import datetime -from typing import Tuple from sqlalchemy import ( BindParameter, Select, + and_, bindparam, case, func, + or_, select, ) from sqlalchemy.orm import Session from dibbler.models import ( + LastCacheTransaction, Product, + ProductCache, Transaction, TransactionType, ) @@ -30,9 +33,6 @@ def _product_stock_query( The inner query for calculating the product stock. """ - if use_cache: - print("WARNING: Using cache for product stock query is not implemented yet.") - if isinstance(product_id, int): product_id = BindParameter("product_id", value=product_id) @@ -49,50 +49,112 @@ def _product_stock_query( else: until_transaction_id = None - query = select( - func.sum( - case( - ( - Transaction.type_ == TransactionType.ADD_PRODUCT.as_literal_column(), - Transaction.product_count, - ), - ( - Transaction.type_ == TransactionType.ADJUST_STOCK.as_literal_column(), - Transaction.product_count, - ), - ( - Transaction.type_ == TransactionType.BUY_PRODUCT.as_literal_column(), - -Transaction.product_count, - ), - ( - Transaction.type_ == TransactionType.JOINT.as_literal_column(), - -Transaction.product_count, - ), - ( - Transaction.type_ == TransactionType.THROW_PRODUCT.as_literal_column(), - -Transaction.product_count, - ), - else_=0, - ), - ).label("stock"), - ).where( - Transaction.type_.in_( - [ - TransactionType.ADD_PRODUCT.as_literal_column(), - TransactionType.ADJUST_STOCK.as_literal_column(), - TransactionType.BUY_PRODUCT.as_literal_column(), - TransactionType.JOINT.as_literal_column(), - TransactionType.THROW_PRODUCT.as_literal_column(), - ], + stock_delta = case( + ( + Transaction.type_ == TransactionType.ADD_PRODUCT.as_literal_column(), + Transaction.product_count, ), - Transaction.product_id == product_id, - until_filter( - until_time=until_time, - until_transaction_id=until_transaction_id, - until_inclusive=until_inclusive, + ( + Transaction.type_ == TransactionType.ADJUST_STOCK.as_literal_column(), + Transaction.product_count, ), + ( + Transaction.type_ == TransactionType.BUY_PRODUCT.as_literal_column(), + -Transaction.product_count, + ), + ( + Transaction.type_ == TransactionType.JOINT.as_literal_column(), + -Transaction.product_count, + ), + ( + Transaction.type_ == TransactionType.THROW_PRODUCT.as_literal_column(), + -Transaction.product_count, + ), + else_=0, ) + if use_cache: + latest_cache = ( + select( + ProductCache.stock.label("stock"), + Transaction.time.label("transaction_time"), + Transaction.id.label("transaction_id"), + ) + .select_from(ProductCache) + .join( + LastCacheTransaction, + ProductCache.last_cache_transaction_id == LastCacheTransaction.id, + ) + .join(Transaction, LastCacheTransaction.transaction_id == Transaction.id) + .where( + ProductCache.product_id == product_id, + until_filter( + until_time=until_time, + until_transaction_id=until_transaction_id, + until_inclusive=until_inclusive, + transaction_time=Transaction.time, + ), + ) + .order_by(Transaction.time.desc(), Transaction.id.desc(), ProductCache.id.desc()) + .limit(1) + .subquery("latest_product_cache") + ) + + latest_cache_stock = select(latest_cache.c.stock).scalar_subquery() + latest_cache_time = select(latest_cache.c.transaction_time).scalar_subquery() + latest_cache_transaction_id = select(latest_cache.c.transaction_id).scalar_subquery() + + query = select( + ( + func.coalesce(latest_cache_stock, 0) + + func.coalesce(func.sum(stock_delta), 0) + ).label("stock"), + ).where( + Transaction.type_.in_( + [ + TransactionType.ADD_PRODUCT.as_literal_column(), + TransactionType.ADJUST_STOCK.as_literal_column(), + TransactionType.BUY_PRODUCT.as_literal_column(), + TransactionType.JOINT.as_literal_column(), + TransactionType.THROW_PRODUCT.as_literal_column(), + ], + ), + Transaction.product_id == product_id, + until_filter( + until_time=until_time, + until_transaction_id=until_transaction_id, + until_inclusive=until_inclusive, + ), + or_( + latest_cache_time.is_(None), + Transaction.time > latest_cache_time, + and_( + Transaction.time == latest_cache_time, + Transaction.id > latest_cache_transaction_id, + ), + ), + ) + else: + query = select( + func.coalesce(func.sum(stock_delta), 0).label("stock"), + ).where( + Transaction.type_.in_( + [ + TransactionType.ADD_PRODUCT.as_literal_column(), + TransactionType.ADJUST_STOCK.as_literal_column(), + TransactionType.BUY_PRODUCT.as_literal_column(), + TransactionType.JOINT.as_literal_column(), + TransactionType.THROW_PRODUCT.as_literal_column(), + ], + ), + Transaction.product_id == product_id, + until_filter( + until_time=until_time, + until_transaction_id=until_transaction_id, + until_inclusive=until_inclusive, + ), + ) + return query diff --git a/tests/queries/test_product_stock.py b/tests/queries/test_product_stock.py index e06b24b..8ae0167 100644 --- a/tests/queries/test_product_stock.py +++ b/tests/queries/test_product_stock.py @@ -3,7 +3,8 @@ from datetime import datetime, timedelta import pytest from sqlalchemy.orm import Session -from dibbler.models import Product, Transaction, User +from dibbler.models import Product, ProductCache, Transaction, User +from dibbler.models.LastCacheTransaction import LastCacheTransaction from dibbler.queries import joint_buy_product, product_stock from tests.helpers import assert_id_order_similar_to_time_order, assign_times @@ -283,3 +284,102 @@ def test_product_stock_until_time(sql_session: Session) -> None: ) == 1 ) + + +def test_product_stock_uses_cached_checkpoint_as_base(sql_session: Session) -> None: + user, product = insert_test_data(sql_session) + + transactions = [ + Transaction.add_product( + amount=50, + per_product=10, + user_id=user.id, + product_id=product.id, + product_count=5, + ), + Transaction.buy_product( + user_id=user.id, + product_id=product.id, + product_count=2, + ), + Transaction.adjust_stock( + user_id=user.id, + product_id=product.id, + product_count=4, + ), + ] + + assign_times(transactions) + + sql_session.add_all(transactions) + sql_session.commit() + + checkpoint = LastCacheTransaction(transaction_id=transactions[1].id) + sql_session.add(checkpoint) + sql_session.flush() + + sql_session.add( + ProductCache( + product_id=product.id, + stock=100, + price=10, + last_cache_transaction_id=checkpoint.id, + ), + ) + sql_session.commit() + + assert product_stock(sql_session, product, use_cache=False) == 5 - 2 + 4 + assert product_stock(sql_session, product, use_cache=True) == 100 + 4 + + +def test_product_stock_ignores_cache_after_until_time(sql_session: Session) -> None: + user, product = insert_test_data(sql_session) + + transactions = [ + Transaction.add_product( + amount=50, + per_product=10, + user_id=user.id, + product_id=product.id, + product_count=5, + ), + Transaction.buy_product( + user_id=user.id, + product_id=product.id, + product_count=2, + ), + Transaction.adjust_stock( + user_id=user.id, + product_id=product.id, + product_count=4, + ), + ] + + assign_times(transactions) + + sql_session.add_all(transactions) + sql_session.commit() + + checkpoint = LastCacheTransaction(transaction_id=transactions[2].id) + sql_session.add(checkpoint) + sql_session.flush() + + sql_session.add( + ProductCache( + product_id=product.id, + stock=100, + price=10, + last_cache_transaction_id=checkpoint.id, + ), + ) + sql_session.commit() + + assert ( + product_stock( + sql_session, + product, + use_cache=True, + until_time=transactions[1].time, + ) + == 5 - 2 + )