fixup! WIP
All checks were successful
Run tests / run-tests (push) Successful in 1m25s

This commit is contained in:
2025-12-09 14:43:45 +09:00
parent c6ecb6fae9
commit e7453d0fdd
6 changed files with 196 additions and 7 deletions

0
dibbler/lib/__init__.py Normal file
View File

View File

@@ -0,0 +1,129 @@
from dibbler.models import Transaction, TransactionType
from dibbler.models.Transaction import EXPECTED_FIELDS
def render_transaction_log(transaction_log: list[Transaction]) -> str:
"""
Renders a transaction log as a pretty, human-readable string.
"""
aggregated_log = _aggregate_joint_transactions(transaction_log)
lines = []
for i, transaction in enumerate(aggregated_log):
if isinstance(transaction, list):
inner_lines = []
is_last = i == len(aggregated_log) - 1
lines.append(_render_transaction(transaction[0], is_last))
for j, sub_transaction in enumerate(transaction[1:]):
is_last_inner = j == len(transaction) - 2
line = _render_transaction(sub_transaction, is_last_inner)
inner_lines.append(line)
indented_inner_lines = _indent_lines(inner_lines, is_last=is_last)
lines.extend(indented_inner_lines)
else:
is_last = i == len(aggregated_log) - 1
line = _render_transaction(transaction, is_last)
lines.append(line)
return "\n".join(lines)
def _aggregate_joint_transactions(
transactions: list[Transaction],
) -> list[Transaction | list[Transaction]]:
aggregated: list[Transaction | list[Transaction]] = []
i = 0
while i < len(transactions):
current = transactions[i]
# The aggregation is running backwards, so it will hit JOINT transactions first
if current.type_ == TransactionType.JOINT:
joint_transactions = [current]
j = i
while j < len(transactions):
j += 1
next_transaction = transactions[j]
if next_transaction.type_ == TransactionType.JOINT_BUY_PRODUCT:
joint_transactions.append(next_transaction)
else:
break
aggregated.append(joint_transactions)
i = j # Skip processed transactions
elif current.type_ == TransactionType.JOINT:
# Empty joint transaction?
i += 1
continue
else:
aggregated.append(current)
i += 1
return aggregated
def _indent_lines(lines: list[str], is_last: bool = False) -> list[str]:
indented_lines = []
for line in lines:
if is_last:
indented_lines.append(" " + line)
else:
indented_lines.append("" + line)
return indented_lines
def _render_transaction(transaction: Transaction, is_last: bool) -> str:
match transaction.type_:
case TransactionType.ADD_PRODUCT:
line = f"ADD_PRODUCT({transaction.id}, {transaction.user.name}"
for field in EXPECTED_FIELDS[TransactionType.ADD_PRODUCT]:
value = getattr(transaction, field)
line += f", {field}={value}"
line += ")"
case TransactionType.BUY_PRODUCT:
line = f"BUY_PRODUCT({transaction.id}, {transaction.user.name}"
for field in EXPECTED_FIELDS[TransactionType.BUY_PRODUCT]:
value = getattr(transaction, field)
line += f", {field}={value}"
line += ")"
case TransactionType.ADJUST_STOCK:
line = f"ADJUST_STOCK({transaction.id}, {transaction.user.name}"
for field in EXPECTED_FIELDS[TransactionType.ADJUST_STOCK]:
value = getattr(transaction, field)
line += f", {field}={value}"
line += ")"
case TransactionType.ADJUST_PENALTY:
line = f"ADJUST_PENALTY({transaction.id}, {transaction.user.name}"
for field in EXPECTED_FIELDS[TransactionType.ADJUST_PENALTY]:
value = getattr(transaction, field)
line += f", {field}={value}"
line += ")"
case TransactionType.ADJUST_INTEREST:
line = f"ADJUST_INTEREST({transaction.id}, {transaction.user.name}"
for field in EXPECTED_FIELDS[TransactionType.ADJUST_INTEREST]:
value = getattr(transaction, field)
line += f", {field}={value}"
line += ")"
case TransactionType.ADJUST_BALANCE:
line = f"ADJUST_BALANCE({transaction.id}, {transaction.user.name}"
for field in EXPECTED_FIELDS[TransactionType.ADJUST_BALANCE]:
value = getattr(transaction, field)
line += f", {field}={value}"
line += ")"
case TransactionType.JOINT:
line = f"JOINT({transaction.id}, {transaction.user.name}"
for field in EXPECTED_FIELDS[TransactionType.JOINT]:
value = getattr(transaction, field)
line += f", {field}={value}"
line += ")"
case TransactionType.JOINT_BUY_PRODUCT:
line = f"JOINT_BUY_PRODUCT({transaction.id}, {transaction.user.name}"
for field in EXPECTED_FIELDS[TransactionType.JOINT_BUY_PRODUCT]:
value = getattr(transaction, field)
line += f", {field}={value}"
line += ")"
case _:
line = (
f"UNKNOWN[{transaction.type_}](id={transaction.id}, user_id={transaction.user_id})"
)
return "└─ " + line if is_last else "├─ " + line

View File

@@ -21,6 +21,7 @@ subparsers.add_parser("loop", help="Run the dibbler loop")
subparsers.add_parser("create-db", help="Create the database")
subparsers.add_parser("slabbedasker", help="Find out who is slabbedasker")
subparsers.add_parser("seed-data", help="Fill with mock data")
subparsers.add_parser("transaction-log", help="Print transaction log")
def main():
@@ -47,6 +48,11 @@ def main():
seed_test_data.main()
elif args.subcommand == "transaction-log":
import dibbler.subcommands.transaction_log as transaction_log
transaction_log.main()
if __name__ == "__main__":
main()

View File

@@ -56,7 +56,7 @@ _DYNAMIC_FIELDS: set[str] = {
"transfer_user_id",
}
_EXPECTED_FIELDS: dict[TransactionType, set[str]] = {
EXPECTED_FIELDS: dict[TransactionType, set[str]] = {
TransactionType.ADD_PRODUCT: {"amount", "per_product", "product_count", "product_id"},
TransactionType.ADJUST_BALANCE: {"amount"},
TransactionType.ADJUST_INTEREST: {"interest_rate_percent"},
@@ -68,7 +68,7 @@ _EXPECTED_FIELDS: dict[TransactionType, set[str]] = {
TransactionType.TRANSFER: {"amount", "transfer_user_id"},
}
assert all(x <= _DYNAMIC_FIELDS for x in _EXPECTED_FIELDS.values()), (
assert all(x <= _DYNAMIC_FIELDS for x in EXPECTED_FIELDS.values()), (
"All expected fields must be part of _DYNAMIC_FIELDS."
)
@@ -95,7 +95,7 @@ class Transaction(Base):
__table_args__ = (
*[
_transaction_type_field_constraints(transaction_type, expected_fields)
for transaction_type, expected_fields in _EXPECTED_FIELDS.items()
for transaction_type, expected_fields in EXPECTED_FIELDS.items()
],
CheckConstraint(
or_(
@@ -119,7 +119,7 @@ class Transaction(Base):
Not used for anything else than identifying the transaction in the database.
"""
time: Mapped[datetime] = mapped_column(DateTime, unique=True)
time: Mapped[datetime] = mapped_column(DateTime)
"""
The time when the transaction took place.
@@ -296,11 +296,11 @@ class Transaction(Base):
if self.amount == 0:
raise ValueError("Amount must not be zero.")
for field in _EXPECTED_FIELDS[self.type_]:
for field in EXPECTED_FIELDS[self.type_]:
if getattr(self, field) is None:
raise ValueError(f"{field} must not be None for {self.type_.value} transactions.")
for field in _DYNAMIC_FIELDS - _EXPECTED_FIELDS[self.type_]:
for field in _DYNAMIC_FIELDS - EXPECTED_FIELDS[self.type_]:
if getattr(self, field) is not None:
raise ValueError(f"{field} must be None for {self.type_.value} transactions.")
@@ -344,7 +344,7 @@ class Transaction(Base):
isinstance(v, InstrumentedList),
isinstance(v, InstrumentedSet),
isinstance(v, InstrumentedDict),
*[k in (_DYNAMIC_FIELDS - _EXPECTED_FIELDS[self.type_])],
*[k in (_DYNAMIC_FIELDS - EXPECTED_FIELDS[self.type_])],
]
)
)

View File

@@ -3,6 +3,7 @@ from pathlib import Path
from dibbler.db import Session
from dibbler.models import Product, Transaction, User
from dibbler.queries import joint_buy_product
JSON_FILE = Path(__file__).parent.parent.parent / "mock_data.json"
@@ -28,9 +29,11 @@ def main():
# Add users
user1 = User("Test User 1")
user2 = User("Test User 2")
user3 = User("Test User 3")
sql_session.add(user1)
sql_session.add(user2)
sql_session.add(user3)
sql_session.commit()
# Add products
@@ -73,5 +76,41 @@ def main():
),
]
sql_session.add_all(transactions)
sql_session.flush()
joint_buy_product(
sql_session,
time=datetime(2023, 10, 1, 12, 0, 2),
instigator=user1,
product_count=1,
users=[user1, user2, user3],
product=product2,
)
joint_buy_product(
sql_session,
time=datetime(2023, 10, 1, 13, 0, 2),
instigator=user3,
product_count=2,
users=[user2, user3],
product=product2,
)
transactions = [
Transaction.buy_product(
time=datetime(2023, 10, 2, 14, 0, 0),
product_count=1,
user_id=user1.id,
product_id=product1.id,
),
Transaction.buy_product(
time=datetime(2023, 10, 2, 14, 0, 1),
product_count=1,
user_id=user2.id,
product_id=product2.id,
),
]
sql_session.add_all(transactions)
sql_session.commit()

View File

@@ -0,0 +1,15 @@
from dibbler.db import Session
from dibbler.queries import transaction_log
from dibbler.lib.render_transaction_log import render_transaction_log
def main() -> None:
sql_session = Session()
result = transaction_log(sql_session)
rendered = render_transaction_log(result)
print(rendered)
if __name__ == "__main__":
main()