Files
dibbler/tests/queries/test_user_balance.py

959 lines
25 KiB
Python

import math
from datetime import datetime
from pprint import pprint
import pytest
from sqlalchemy.orm import Session
from dibbler.models import Product, Transaction, User
from dibbler.models.Transaction import (
DEFAULT_INTEREST_RATE_PERCENT,
DEFAULT_PENALTY_MULTIPLIER_PERCENT,
)
from dibbler.queries import joint_buy_product, user_balance, user_balance_log
from dibbler.queries.user_balance import _joint_transaction_query, _non_joint_transaction_query
# TODO: see if we can use pytest_runtest_makereport to print the "user_balance_log"s
# only on failures instead of inlining it in every test function
def insert_test_data(sql_session: Session) -> tuple[User, User, User, Product]:
    """Persist three users and one product and hand them back to the test.

    The objects are added to ``sql_session`` and committed before being
    returned, in the order ``(user, user2, user3, product)``.
    """
    users = [User(name) for name in ("Test User", "Test User 2", "Test User 3")]
    product = Product("1234567890123", "Test Product")

    sql_session.add_all([*users, product])
    sql_session.commit()

    first, second, third = users
    return first, second, third, product
def _product_cost(
    per_product: int,
    product_count: int,
    interest_rate_percent: int = DEFAULT_INTEREST_RATE_PERCENT,
    apply_penalty: bool = False,
    penalty_multiplier_percent: int = DEFAULT_PENALTY_MULTIPLIER_PERCENT,
    joint_shares: int = 1,
    joint_total_shares: int = 1,
) -> int:
    """Compute the amount the tests expect a user to pay for a purchase.

    The price of ``product_count`` items at ``per_product`` each is scaled by
    the user's part of a joint purchase (``joint_shares / joint_total_shares``),
    then by the interest rate, and rounded up. When ``apply_penalty`` is set,
    that rounded amount is scaled by the penalty multiplier and rounded up
    once more.

    Both percentages are expressed so that 100 means "unchanged".

    NOTE(review): every step uses float arithmetic, so a product that is
    mathematically a whole number may land slightly above it and be rounded
    up by one. Presumably this mirrors the rounding of the query under test;
    confirm before switching to exact arithmetic.
    """
    share_of_purchase = per_product * product_count * joint_shares / joint_total_shares
    cost = math.ceil(share_of_purchase * (interest_rate_percent / 100))

    if not apply_penalty:
        return cost

    # The penalty is applied on top of the already rounded interest cost.
    return math.ceil(cost * (penalty_multiplier_percent / 100))
def test_non_joint_transaction_query(sql_session: Session) -> None:
    """Check which transactions ``_non_joint_transaction_query`` yields per user.

    Five transactions are stored: one balance adjustment for each of the
    first two users, a product added by the second user, a transfer from the
    first to the second user and a transfer from the second to the third.
    The query is then run once per user (with ``use_cache=False``) and the
    first column of each row is compared with the ids of the transactions in
    which that user is either ``user_id`` or ``transfer_user_id``, in
    insertion order.
    """
    user1, user2, user3, product = insert_test_data(sql_session)

    transactions = [
        Transaction.adjust_balance(
            time=datetime(2023, 10, 1, 10, 0, 0),
            user_id=user1.id,
            amount=100,
        ),
        Transaction.adjust_balance(
            time=datetime(2023, 10, 1, 10, 0, 0),
            user_id=user2.id,
            amount=50,
        ),
        Transaction.add_product(
            time=datetime(2023, 10, 1, 10, 0, 1),
            user_id=user2.id,
            amount=70,
            product_id=product.id,
            product_count=3,
            per_product=30,
        ),
        Transaction.transfer(
            time=datetime(2023, 10, 1, 10, 0, 2),
            user_id=user1.id,
            transfer_user_id=user2.id,
            amount=50,
        ),
        Transaction.transfer(
            time=datetime(2023, 10, 1, 10, 0, 3),
            user_id=user2.id,
            transfer_user_id=user3.id,
            amount=30,
        ),
    ]
    sql_session.add_all(transactions)
    sql_session.commit()

    def transaction_ids_for(user: User) -> list:
        """Run the query for ``user`` and return the first column of every row."""
        query = _non_joint_transaction_query(user_id=user.id, use_cache=False)
        return [row[0] for row in sql_session.execute(query).all()]

    adjust_user1, adjust_user2, add_product, user1_to_user2, user2_to_user3 = transactions

    assert transaction_ids_for(user1) == [adjust_user1.id, user1_to_user2.id]

    # The transfer from user1 shows up for user2 as well, as its receiver.
    assert transaction_ids_for(user2) == [
        adjust_user2.id,
        add_product.id,
        user1_to_user2.id,
        user2_to_user3.id,
    ]

    assert transaction_ids_for(user3) == [user2_to_user3.id]
def test_joint_transaction_query(sql_session: Session) -> None:
    """Check the rows ``_joint_transaction_query`` yields for each user.

    Four joint purchases are made with different participant lists, where a
    user may be listed more than once. Each expected row is a triple starting
    with the id of the first transaction returned by ``joint_buy_product``;
    the two numbers that follow match how often the user appears in the
    participant list and the length of that list.
    """
    user1, user2, user3, product = insert_test_data(sql_session)

    def buy_jointly(instigator: User, users: list[User], product_count: int):
        """Make a joint purchase of the test product."""
        return joint_buy_product(
            sql_session,
            product=product,
            product_count=product_count,
            instigator=instigator,
            users=users,
        )

    def joint_rows_for(user: User) -> list:
        """Run the joint-transaction query for ``user`` and collect its rows."""
        query = _joint_transaction_query(user_id=user.id, use_cache=False)
        return list(sql_session.execute(query).all())

    j1 = buy_jointly(user1, [user1, user2], 3)
    j2 = buy_jointly(user1, [user1, user1, user2], 2)
    j3 = buy_jointly(user1, [user1, user3, user3], 2)
    j4 = buy_jointly(user2, [user2, user3, user3], 2)

    assert joint_rows_for(user1) == [
        (j1[0].id, 1, 2),
        (j2[0].id, 2, 3),
        (j3[0].id, 1, 3),
    ]

    assert joint_rows_for(user2) == [
        (j1[0].id, 1, 2),
        (j2[0].id, 1, 3),
        (j4[0].id, 1, 3),
    ]

    assert joint_rows_for(user3) == [
        (j3[0].id, 2, 3),
        (j4[0].id, 2, 3),
    ]
def test_user_balance_no_transactions(sql_session: Session) -> None:
    """A user without any transactions has a balance of zero."""
    user = insert_test_data(sql_session)[0]

    # Print the balance log as a debugging aid.
    pprint(user_balance_log(sql_session, user))

    assert user_balance(sql_session, user) == 0
def test_user_balance_basic_history(sql_session: Session) -> None:
    """A balance adjustment and an added product both add to the balance."""
    user, _, _, product = insert_test_data(sql_session)

    deposit = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        amount=100,
    )
    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 1),
        user_id=user.id,
        product_id=product.id,
        amount=27,
        per_product=27,
        product_count=1,
    )
    sql_session.add_all([deposit, restock])
    sql_session.commit()

    # Print the balance log as a debugging aid.
    pprint(user_balance_log(sql_session, user))

    expected_balance = 100 + 27
    assert user_balance(sql_session, user) == expected_balance
def test_user_balance_with_transfers(sql_session: Session) -> None:
    """Transfers are subtracted from the sender and added to the receiver."""
    user1, user2, *_ = insert_test_data(sql_session)

    deposit = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user1.id,
        amount=100,
    )
    user1_sends_50 = Transaction.transfer(
        time=datetime(2023, 10, 1, 10, 0, 1),
        user_id=user1.id,
        transfer_user_id=user2.id,
        amount=50,
    )
    user2_sends_30 = Transaction.transfer(
        time=datetime(2023, 10, 1, 10, 0, 2),
        user_id=user2.id,
        transfer_user_id=user1.id,
        amount=30,
    )
    sql_session.add_all([deposit, user1_sends_50, user2_sends_30])
    sql_session.commit()

    # Check each side of the transfers, printing the log before every check.
    pprint(user_balance_log(sql_session, user1))
    assert user_balance(sql_session, user1) == 100 - 50 + 30

    pprint(user_balance_log(sql_session, user2))
    assert user_balance(sql_session, user2) == 50 - 30
@pytest.mark.skip(reason="Not yet implemented")
def test_user_balance_complex_history(sql_session: Session) -> None:
    """Placeholder for a test with a complex transaction history (skipped)."""
def test_user_balance_penalty(sql_session: Session) -> None:
    """A purchase made after a large negative adjustment is charged with penalty.

    The expected price comes from ``_product_cost`` with ``apply_penalty=True``
    and the default penalty multiplier.
    """
    user, _, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27,
        per_product=27,
        product_count=1,
    )
    debt = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        amount=-200,
    )
    # Penalized, pays 2x the price (default penalty)
    penalized_purchase = Transaction.buy_product(
        time=datetime(2023, 10, 1, 12, 0, 0),
        user_id=user.id,
        product_id=product.id,
        product_count=1,
    )
    sql_session.add_all([restock, debt, penalized_purchase])
    sql_session.commit()

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    purchase_cost = _product_cost(27, 1, apply_penalty=True)
    assert balance == 27 - 200 - purchase_cost
def test_user_balance_changing_penalty(sql_session: Session) -> None:
    """Each purchase uses the penalty multiplier in effect when it is made.

    The first purchase is expected to cost the default penalty price, the
    second one the price with the multiplier set to 300 in between.
    """
    user, _, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27,
        per_product=27,
        product_count=1,
    )
    debt = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        amount=-200,
    )
    # Penalized, pays 2x the price (default penalty)
    purchase_default_penalty = Transaction.buy_product(
        time=datetime(2023, 10, 1, 12, 0, 0),
        user_id=user.id,
        product_id=product.id,
        product_count=1,
    )
    raise_penalty = Transaction.adjust_penalty(
        time=datetime(2023, 10, 1, 13, 0, 0),
        user_id=user.id,
        penalty_multiplier_percent=300,
        penalty_threshold=-100,
    )
    # Penalized, pays 3x the price
    purchase_raised_penalty = Transaction.buy_product(
        time=datetime(2023, 10, 1, 14, 0, 0),
        user_id=user.id,
        product_id=product.id,
        product_count=1,
    )
    sql_session.add_all(
        [
            restock,
            debt,
            purchase_default_penalty,
            raise_penalty,
            purchase_raised_penalty,
        ],
    )
    sql_session.commit()

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    first_cost = _product_cost(27, 1, apply_penalty=True)
    second_cost = _product_cost(27, 1, apply_penalty=True, penalty_multiplier_percent=300)
    assert balance == 27 - 200 - first_cost - second_cost
def test_user_balance_interest(sql_session: Session) -> None:
    """A purchase made after the interest rate is set to 110 costs 110% of the price."""
    user, _, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27,
        per_product=27,
        product_count=1,
    )
    set_interest = Transaction.adjust_interest(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        interest_rate_percent=110,
    )
    purchase = Transaction.buy_product(
        time=datetime(2023, 10, 1, 12, 0, 0),
        user_id=user.id,
        product_id=product.id,
        product_count=1,
    )
    sql_session.add_all([restock, set_interest, purchase])
    sql_session.commit()

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    purchase_cost = _product_cost(27, 1, interest_rate_percent=110)
    assert balance == 27 - purchase_cost
def test_user_balance_changing_interest(sql_session: Session) -> None:
    """Each purchase uses the interest rate in effect when it is made.

    The rate is set to 110 before the first purchase and to 120 before the
    second one.
    """
    user, _, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27 * 3,
        per_product=27,
        product_count=3,
    )
    interest_to_110 = Transaction.adjust_interest(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        interest_rate_percent=110,
    )
    # Pays 1.1x the price
    purchase_at_110 = Transaction.buy_product(
        time=datetime(2023, 10, 1, 12, 0, 0),
        user_id=user.id,
        product_id=product.id,
        product_count=1,
    )
    interest_to_120 = Transaction.adjust_interest(
        time=datetime(2023, 10, 1, 13, 0, 0),
        user_id=user.id,
        interest_rate_percent=120,
    )
    # Pays 1.2x the price
    purchase_at_120 = Transaction.buy_product(
        time=datetime(2023, 10, 1, 14, 0, 0),
        user_id=user.id,
        product_id=product.id,
        product_count=1,
    )
    sql_session.add_all(
        [
            restock,
            interest_to_110,
            purchase_at_110,
            interest_to_120,
            purchase_at_120,
        ],
    )
    sql_session.commit()

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    cost_at_110 = _product_cost(27, 1, interest_rate_percent=110)
    cost_at_120 = _product_cost(27, 1, interest_rate_percent=120)
    assert balance == 27 * 3 - cost_at_110 - cost_at_120
def test_user_balance_penalty_interest_combined(sql_session: Session) -> None:
    """A purchase can be charged with both interest and penalty.

    The interest rate is set to 110 and the balance is pushed down by 200
    before the purchase; the expected price applies both the interest rate
    and the default penalty.
    """
    user, _, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27,
        per_product=27,
        product_count=1,
    )
    set_interest = Transaction.adjust_interest(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        interest_rate_percent=110,
    )
    debt = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 12, 0, 0),
        user_id=user.id,
        amount=-200,
    )
    # Penalized, pays 2x the price (default penalty)
    # Pays 1.1x the price
    purchase = Transaction.buy_product(
        time=datetime(2023, 10, 1, 13, 0, 0),
        user_id=user.id,
        product_id=product.id,
        product_count=1,
    )
    sql_session.add_all([restock, set_interest, debt, purchase])
    sql_session.commit()

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    purchase_cost = _product_cost(
        27,
        1,
        interest_rate_percent=110,
        apply_penalty=True,
    )
    assert balance == 27 - 200 - purchase_cost
def test_user_balance_joint_transaction_single_user(sql_session: Session) -> None:
    """A joint purchase with a single participant charges that user in full."""
    user, _, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27 * 3,
        per_product=27,
        product_count=3,
    )
    sql_session.add_all([restock])
    sql_session.commit()

    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user],
        product=product,
        product_count=2,
    )

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    own_share = _product_cost(
        27,
        2,
        joint_shares=1,
        joint_total_shares=1,
    )
    assert balance == (27 * 3) - own_share
def test_user_balance_joint_transactions_multiple_users(sql_session: Session) -> None:
    """With three participants the instigator pays one third of the purchase."""
    user, user2, user3, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27 * 3,
        per_product=27,
        product_count=3,
    )
    sql_session.add_all([restock])
    sql_session.commit()

    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user, user2, user3],
        product=product,
        product_count=2,
    )

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    own_share = _product_cost(
        27,
        2,
        joint_shares=1,
        joint_total_shares=3,
    )
    assert balance == (27 * 3) - own_share
def test_user_balance_joint_transactions_multiple_times_self(sql_session: Session) -> None:
    """A user listed twice among three participants pays two thirds."""
    user, user2, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27 * 3,
        per_product=27,
        product_count=3,
    )
    sql_session.add_all([restock])
    sql_session.commit()

    # The instigator appears twice in the participant list.
    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user, user, user2],
        product=product,
        product_count=2,
    )

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    own_share = _product_cost(
        27,
        2,
        joint_shares=2,
        joint_total_shares=3,
    )
    assert balance == (27 * 3) - own_share
def test_user_balance_joint_transactions_interest(sql_session: Session) -> None:
    """The user's half of a joint purchase is charged with the interest rate of 110."""
    user, user2, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27 * 3,
        per_product=27,
        product_count=3,
    )
    set_interest = Transaction.adjust_interest(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        interest_rate_percent=110,
    )
    sql_session.add_all([restock, set_interest])
    sql_session.commit()

    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user, user2],
        product=product,
        product_count=2,
    )

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    own_share = _product_cost(
        27,
        2,
        joint_shares=1,
        joint_total_shares=2,
        interest_rate_percent=110,
    )
    assert balance == (27 * 3) - own_share
def test_user_balance_joint_transactions_changing_interest(sql_session: Session) -> None:
    """Each joint purchase uses the interest rate stored before it is made.

    The rate is set to 110 before the first joint purchase and to 120 before
    the second; the user holds one of two shares in both.
    """
    user, user2, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27 * 4,
        per_product=27,
        product_count=4,
    )
    # The purchase that follows pays 1.1x the price
    interest_to_110 = Transaction.adjust_interest(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        interest_rate_percent=110,
    )
    sql_session.add_all([restock, interest_to_110])
    sql_session.commit()

    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user, user2],
        product=product,
        product_count=2,
    )

    # The purchase that follows pays 1.2x the price
    interest_to_120 = Transaction.adjust_interest(
        time=datetime(2023, 10, 1, 12, 0, 0),
        user_id=user.id,
        interest_rate_percent=120,
    )
    sql_session.add_all([interest_to_120])
    sql_session.commit()

    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user, user2],
        product=product,
        product_count=1,
    )

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    share_at_110 = _product_cost(
        27,
        2,
        joint_shares=1,
        joint_total_shares=2,
        interest_rate_percent=110,
    )
    share_at_120 = _product_cost(
        27,
        1,
        joint_shares=1,
        joint_total_shares=2,
        interest_rate_percent=120,
    )
    assert balance == (27 * 4) - share_at_110 - share_at_120
def test_user_balance_joint_transactions_penalty(sql_session: Session) -> None:
    """The user's half of a joint purchase is charged with the default penalty.

    The balance is pushed down by 200 before the joint purchase is made.
    """
    user, user2, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27 * 3,
        per_product=27,
        product_count=3,
    )
    debt = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        amount=-200,
    )
    sql_session.add_all([restock, debt])
    sql_session.commit()

    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user, user2],
        product=product,
        product_count=2,
    )

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    own_share = _product_cost(
        27,
        2,
        joint_shares=1,
        joint_total_shares=2,
        apply_penalty=True,
    )
    assert balance == (27 * 3) - 200 - own_share
def test_user_balance_joint_transactions_changing_penalty(sql_session: Session) -> None:
    """Each joint purchase uses the penalty settings stored before it is made.

    The first joint purchase is expected to cost the default penalty price;
    the penalty multiplier is then set to 300 before the second one. The
    user holds one of two shares in both.
    """
    user, user2, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27 * 3,
        per_product=27,
        product_count=3,
    )
    debt = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        amount=-200,
    )
    sql_session.add_all([restock, debt])
    sql_session.commit()

    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user, user2],
        product=product,
        product_count=2,
    )

    raise_penalty = Transaction.adjust_penalty(
        time=datetime(2023, 10, 1, 12, 0, 0),
        user_id=user.id,
        penalty_multiplier_percent=300,
        penalty_threshold=-100,
    )
    sql_session.add_all([raise_penalty])
    sql_session.commit()

    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user, user2],
        product=product,
        product_count=1,
    )

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    share_default_penalty = _product_cost(
        27,
        2,
        joint_shares=1,
        joint_total_shares=2,
        apply_penalty=True,
    )
    share_raised_penalty = _product_cost(
        27,
        1,
        joint_shares=1,
        joint_total_shares=2,
        apply_penalty=True,
        penalty_multiplier_percent=300,
    )
    assert balance == (27 * 3) - 200 - share_default_penalty - share_raised_penalty
def test_user_balance_joint_transactions_penalty_interest_combined(
    sql_session: Session,
) -> None:
    """A joint purchase can be charged with both interest and penalty.

    The interest rate is set to 110 and the balance is pushed down by 200
    before the joint purchase; the user holds one of two shares.
    """
    user, user2, _, product = insert_test_data(sql_session)

    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        product_id=product.id,
        amount=27 * 3,
        per_product=27,
        product_count=3,
    )
    set_interest = Transaction.adjust_interest(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        interest_rate_percent=110,
    )
    debt = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 12, 0, 0),
        user_id=user.id,
        amount=-200,
    )
    sql_session.add_all([restock, set_interest, debt])
    sql_session.commit()

    joint_buy_product(
        sql_session,
        instigator=user,
        users=[user, user2],
        product=product,
        product_count=2,
    )

    pprint(user_balance_log(sql_session, user))

    balance = user_balance(sql_session, user)
    own_share = _product_cost(
        27,
        2,
        joint_shares=1,
        joint_total_shares=2,
        interest_rate_percent=110,
        apply_penalty=True,
    )
    assert balance == (27 * 3) - 200 - own_share
def test_user_balance_until_time(sql_session: Session) -> None:
    """``until_time`` leaves out transactions that happen after the cutoff.

    The cutoff of 10:30 lies between the second and the third transaction,
    so the final adjustment of 50 must not be part of the balance.
    """
    user, _, _, product = insert_test_data(sql_session)

    deposit = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        amount=100,
    )
    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 1),
        user_id=user.id,
        product_id=product.id,
        amount=27,
        per_product=27,
        product_count=1,
    )
    late_deposit = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        amount=50,
    )
    sql_session.add_all([deposit, restock, late_deposit])
    sql_session.commit()

    pprint(user_balance_log(sql_session, user, until_time=datetime(2023, 10, 1, 10, 30, 0)))

    balance_at_cutoff = user_balance(
        sql_session,
        user,
        until_time=datetime(2023, 10, 1, 10, 30, 0),
    )
    assert balance_at_cutoff == 100 + 27
def test_user_balance_until_transaction(sql_session: Session) -> None:
    """``until_transaction`` stops the balance at the given transaction.

    The second of three transactions is passed as the cutoff; the expected
    balance counts the first two and leaves out the final adjustment of 50.
    """
    user, _, _, product = insert_test_data(sql_session)

    deposit = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 10, 0, 0),
        user_id=user.id,
        amount=100,
    )
    restock = Transaction.add_product(
        time=datetime(2023, 10, 1, 10, 0, 1),
        user_id=user.id,
        product_id=product.id,
        amount=27,
        per_product=27,
        product_count=1,
    )
    late_deposit = Transaction.adjust_balance(
        time=datetime(2023, 10, 1, 11, 0, 0),
        user_id=user.id,
        amount=50,
    )
    sql_session.add_all([deposit, restock, late_deposit])
    sql_session.commit()

    pprint(user_balance_log(sql_session, user, until_transaction=restock))

    balance_at_cutoff = user_balance(sql_session, user, until_transaction=restock)
    assert balance_at_cutoff == 100 + 27
@pytest.mark.skip(reason="Not yet implemented")
def test_user_balance_throw_away_products(sql_session: Session) -> None:
    """Placeholder for a test about thrown-away products (skipped)."""