Optimize queries a bit
This commit is contained in:
@@ -139,6 +139,12 @@ class Transaction(Base):
|
||||
|
||||
# Speed up user transaction list / credit calculation
|
||||
Index("ix__transaction__user_id_time", "user_id", "time"),
|
||||
|
||||
# Speed up product price calculation
|
||||
Index("ix__transaction__product_id_time_type", "product_id", "time", "type"),
|
||||
|
||||
# Speed up transaction logs
|
||||
Index("ix__transaction__time_id", "time", "id"),
|
||||
)
|
||||
|
||||
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
||||
|
||||
@@ -42,27 +42,28 @@ def joint_buy_product(
|
||||
|
||||
# TODO: verify time is not behind last transaction's time
|
||||
|
||||
joint_transaction = Transaction.joint(
|
||||
user_id=instigator.id,
|
||||
product_id=product.id,
|
||||
product_count=product_count,
|
||||
time=time,
|
||||
message=message,
|
||||
)
|
||||
sql_session.add(joint_transaction)
|
||||
sql_session.flush() # Ensure joint_transaction gets an ID
|
||||
|
||||
transactions = [joint_transaction]
|
||||
|
||||
for user in users:
|
||||
buy_transaction = Transaction.joint_buy_product(
|
||||
user_id=user.id,
|
||||
joint_transaction_id=joint_transaction.id,
|
||||
with sql_session.begin(nested=True):
|
||||
joint_transaction = Transaction.joint(
|
||||
user_id=instigator.id,
|
||||
product_id=product.id,
|
||||
product_count=product_count,
|
||||
time=time,
|
||||
message=message,
|
||||
)
|
||||
sql_session.add(buy_transaction)
|
||||
transactions.append(buy_transaction)
|
||||
sql_session.add(joint_transaction)
|
||||
sql_session.flush() # Ensure joint_transaction gets an ID
|
||||
|
||||
sql_session.commit()
|
||||
transactions = [joint_transaction]
|
||||
|
||||
for user in users:
|
||||
buy_transaction = Transaction.joint_buy_product(
|
||||
user_id=user.id,
|
||||
joint_transaction_id=joint_transaction.id,
|
||||
time=time,
|
||||
message=message,
|
||||
)
|
||||
sql_session.add(buy_transaction)
|
||||
transactions.append(buy_transaction)
|
||||
|
||||
sql_session.commit()
|
||||
return transactions
|
||||
|
||||
@@ -93,7 +93,6 @@ def _product_owners_query(
|
||||
until_inclusive=until_inclusive,
|
||||
),
|
||||
)
|
||||
.order_by(Transaction.time.desc())
|
||||
.subquery(trx_subset_name)
|
||||
)
|
||||
|
||||
@@ -270,8 +269,6 @@ def product_owners(
|
||||
.order_by(recursive_cte.c.time.desc())
|
||||
).all()
|
||||
|
||||
print(db_result)
|
||||
|
||||
result: list[User | None] = []
|
||||
none_count = 0
|
||||
|
||||
|
||||
@@ -3,16 +3,19 @@ from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import (
|
||||
CTE,
|
||||
BindParameter,
|
||||
ColumnElement,
|
||||
Integer,
|
||||
and_,
|
||||
bindparam,
|
||||
case,
|
||||
cast,
|
||||
func,
|
||||
select,
|
||||
tuple_,
|
||||
)
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.orm import Session, aliased
|
||||
|
||||
from dibbler.models import (
|
||||
Product,
|
||||
@@ -28,6 +31,20 @@ from dibbler.queries.query_helpers import (
|
||||
)
|
||||
|
||||
|
||||
def cte_union(self, other, inner_fn=lambda x: x):
|
||||
inner = self.element.union_all(other)
|
||||
inner = inner_fn(inner)
|
||||
return CTE._construct(
|
||||
inner,
|
||||
name=self.name,
|
||||
recursive=self.recursive,
|
||||
nesting=self.nesting,
|
||||
_restates=self,
|
||||
_prefixes=self._prefixes,
|
||||
_suffixes=self._suffixes,
|
||||
)
|
||||
|
||||
|
||||
def _product_price_query(
|
||||
product_id: int | ColumnElement[int],
|
||||
use_cache: bool = True,
|
||||
@@ -62,26 +79,21 @@ def _product_price_query(
|
||||
|
||||
initial_element = select(
|
||||
CONST_ZERO.label("i"),
|
||||
CONST_ZERO.label("time"),
|
||||
CONST_NONE.label("transaction_id"),
|
||||
CONST_ZERO.label("time"),
|
||||
CONST_ZERO.label("price"),
|
||||
CONST_ZERO.label("product_count"),
|
||||
)
|
||||
|
||||
recursive_cte = initial_element.cte(name=cte_name, recursive=True)
|
||||
|
||||
# Subset of transactions that we'll want to iterate over.
|
||||
trx_subset = (
|
||||
select(
|
||||
func.row_number().over(order_by=Transaction.time.asc()).label("i"),
|
||||
Transaction.id,
|
||||
Transaction.time,
|
||||
Transaction.type_,
|
||||
Transaction.product_count,
|
||||
Transaction.per_product,
|
||||
)
|
||||
t1 = aliased(Transaction)
|
||||
|
||||
next_time = (
|
||||
select(func.min(t1.time))
|
||||
.where(
|
||||
Transaction.type_.in_(
|
||||
t1.product_id == product_id,
|
||||
t1.type_.in_(
|
||||
[
|
||||
TransactionType.BUY_PRODUCT.as_literal_column(),
|
||||
TransactionType.ADD_PRODUCT.as_literal_column(),
|
||||
@@ -89,84 +101,108 @@ def _product_price_query(
|
||||
TransactionType.JOINT.as_literal_column(),
|
||||
]
|
||||
),
|
||||
Transaction.product_id == product_id,
|
||||
tuple_(t1.time, t1.id) > tuple_(recursive_cte.c.time, recursive_cte.c.transaction_id),
|
||||
until_filter(
|
||||
until_time=until_time,
|
||||
until_transaction_id=until_transaction_id,
|
||||
until_inclusive=until_inclusive,
|
||||
transaction_time=t1.time,
|
||||
),
|
||||
)
|
||||
.order_by(Transaction.time.asc())
|
||||
.subquery(trx_subset_name)
|
||||
.correlate(recursive_cte)
|
||||
.scalar_subquery()
|
||||
)
|
||||
|
||||
next_id = (
|
||||
select(func.min(t1.id))
|
||||
.where(
|
||||
t1.product_id == product_id,
|
||||
t1.type_.in_(
|
||||
[
|
||||
TransactionType.BUY_PRODUCT.as_literal_column(),
|
||||
TransactionType.ADD_PRODUCT.as_literal_column(),
|
||||
TransactionType.ADJUST_STOCK.as_literal_column(),
|
||||
TransactionType.JOINT.as_literal_column(),
|
||||
]
|
||||
),
|
||||
tuple_(t1.time, t1.id) > tuple_(recursive_cte.c.time, recursive_cte.c.transaction_id),
|
||||
until_filter(
|
||||
until_time=until_time,
|
||||
until_transaction_id=until_transaction_id,
|
||||
until_inclusive=until_inclusive,
|
||||
transaction_time=t1.time,
|
||||
),
|
||||
t1.time == next_time,
|
||||
)
|
||||
.correlate(recursive_cte)
|
||||
.scalar_subquery()
|
||||
)
|
||||
|
||||
t_next = aliased(Transaction)
|
||||
|
||||
recursive_elements = (
|
||||
select(
|
||||
trx_subset.c.i,
|
||||
trx_subset.c.time,
|
||||
trx_subset.c.id.label("transaction_id"),
|
||||
(recursive_cte.c.i + CONST_ONE).label("i"),
|
||||
t_next.id.label("transaction_id"),
|
||||
t_next.time,
|
||||
case(
|
||||
# Someone buys the product -> price remains the same.
|
||||
(
|
||||
trx_subset.c.type_ == TransactionType.BUY_PRODUCT.as_literal_column(),
|
||||
recursive_cte.c.price,
|
||||
),
|
||||
# Someone adds the product -> price is recalculated based on
|
||||
# product count, previous price, and new price.
|
||||
(
|
||||
trx_subset.c.type_ == TransactionType.ADD_PRODUCT.as_literal_column(),
|
||||
t_next.type_ == TransactionType.ADD_PRODUCT.as_literal_column(),
|
||||
cast(
|
||||
func.ceil(
|
||||
(
|
||||
recursive_cte.c.price
|
||||
* func.max(recursive_cte.c.product_count, CONST_ZERO)
|
||||
+ trx_subset.c.per_product * trx_subset.c.product_count
|
||||
* cast(recursive_cte.c.product_count > CONST_ZERO, Integer)
|
||||
* recursive_cte.c.product_count
|
||||
+ t_next.per_product * t_next.product_count
|
||||
)
|
||||
/ (
|
||||
# The running product count can be negative if the accounting is bad.
|
||||
# This ensures that we never end up with negative prices or zero divisions
|
||||
# and other disastrous phenomena.
|
||||
func.max(recursive_cte.c.product_count, CONST_ZERO)
|
||||
+ trx_subset.c.product_count
|
||||
cast(recursive_cte.c.product_count > CONST_ZERO, Integer)
|
||||
* recursive_cte.c.product_count
|
||||
+ t_next.product_count
|
||||
)
|
||||
),
|
||||
Integer,
|
||||
),
|
||||
),
|
||||
# Someone adjusts the stock -> price remains the same.
|
||||
(
|
||||
trx_subset.c.type_ == TransactionType.ADJUST_STOCK.as_literal_column(),
|
||||
recursive_cte.c.price,
|
||||
),
|
||||
# Should never happen
|
||||
# BUY_PRODUCT
|
||||
# JOINT
|
||||
# ADJUST_STOCK
|
||||
else_=recursive_cte.c.price,
|
||||
).label("price"),
|
||||
case(
|
||||
# Someone buys the product -> product count is reduced.
|
||||
(
|
||||
trx_subset.c.type_ == TransactionType.BUY_PRODUCT.as_literal_column(),
|
||||
recursive_cte.c.product_count - trx_subset.c.product_count,
|
||||
),
|
||||
(
|
||||
trx_subset.c.type_ == TransactionType.JOINT.as_literal_column(),
|
||||
recursive_cte.c.product_count - trx_subset.c.product_count,
|
||||
),
|
||||
# Someone adds the product -> product count is increased.
|
||||
(
|
||||
trx_subset.c.type_ == TransactionType.ADD_PRODUCT.as_literal_column(),
|
||||
recursive_cte.c.product_count + trx_subset.c.product_count,
|
||||
),
|
||||
# Someone adjusts the stock -> product count is adjusted.
|
||||
(
|
||||
trx_subset.c.type_ == TransactionType.ADJUST_STOCK.as_literal_column(),
|
||||
recursive_cte.c.product_count + trx_subset.c.product_count,
|
||||
),
|
||||
# Should never happen
|
||||
else_=recursive_cte.c.product_count,
|
||||
(
|
||||
recursive_cte.c.product_count
|
||||
- (
|
||||
# Someone buys the product -> product count is reduced.
|
||||
cast(t_next.type_ == TransactionType.BUY_PRODUCT.as_literal_column(), Integer)
|
||||
* t_next.product_count
|
||||
)
|
||||
- (
|
||||
cast(t_next.type_ == TransactionType.JOINT.as_literal_column(), Integer)
|
||||
* t_next.product_count
|
||||
)
|
||||
+ (
|
||||
# Someone adds the product -> product count is increased.
|
||||
cast(t_next.type_ == TransactionType.ADD_PRODUCT.as_literal_column(), Integer)
|
||||
* t_next.product_count
|
||||
)
|
||||
+ (
|
||||
# Someone adjusts the stock -> product count is adjusted.
|
||||
cast(t_next.type_ == TransactionType.ADJUST_STOCK.as_literal_column(), Integer)
|
||||
* t_next.product_count
|
||||
)
|
||||
).label("product_count"),
|
||||
)
|
||||
.select_from(trx_subset)
|
||||
.where(trx_subset.c.i == recursive_cte.c.i + CONST_ONE)
|
||||
.select_from(recursive_cte)
|
||||
.join(
|
||||
t_next,
|
||||
onclause=and_(t_next.time == next_time, t_next.id == next_id),
|
||||
)
|
||||
)
|
||||
|
||||
return recursive_cte.union_all(recursive_elements)
|
||||
@@ -217,7 +253,7 @@ def product_price_log(
|
||||
Transaction,
|
||||
onclause=Transaction.id == recursive_cte.c.transaction_id,
|
||||
)
|
||||
.order_by(recursive_cte.c.i.asc())
|
||||
.order_by(recursive_cte.c.time.asc())
|
||||
).all()
|
||||
|
||||
if result is None:
|
||||
@@ -274,9 +310,15 @@ def product_price(
|
||||
# - product_count should never be negative (but this happens sometimes, so just a warning)
|
||||
# - price should never be negative
|
||||
|
||||
# TODO: remove the order_by
|
||||
# the order_by causes sqlite to build a temp b-tree, even though we
|
||||
# already have calculated the rows in the correct order, and only need
|
||||
# to take the last one. I think it *should* be possible to do this, but
|
||||
# I couldn't figure out a good way to do it.
|
||||
result = sql_session.scalars(
|
||||
select(recursive_cte.c.price)
|
||||
.order_by(recursive_cte.c.i.desc())
|
||||
.select_from(recursive_cte)
|
||||
.order_by(recursive_cte.c.time.desc())
|
||||
.limit(CONST_ONE)
|
||||
.offset(CONST_ZERO)
|
||||
).one_or_none()
|
||||
|
||||
@@ -238,7 +238,7 @@ def _product_cost_expression(
|
||||
trx_subset_name=trx_subset_name,
|
||||
)
|
||||
)
|
||||
.order_by(column("i").desc())
|
||||
.order_by(column("time").desc())
|
||||
.limit(CONST_ONE)
|
||||
.scalar_subquery()
|
||||
)
|
||||
@@ -319,7 +319,6 @@ def _user_balance_query(
|
||||
Transaction,
|
||||
onclause=Transaction.id == trx_subset_subset.c.id,
|
||||
)
|
||||
.order_by(Transaction.time.asc())
|
||||
.subquery(trx_subset_name)
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user