It brokey
This commit is contained in:
parent
9c93da087d
commit
fe79e3da4b
|
@ -2,8 +2,14 @@ class Bank(val allowedAttempts: Integer = 3) {
|
||||||
|
|
||||||
private val transactionsQueue: TransactionQueue = new TransactionQueue()
|
private val transactionsQueue: TransactionQueue = new TransactionQueue()
|
||||||
private val processedTransactions: TransactionQueue = new TransactionQueue()
|
private val processedTransactions: TransactionQueue = new TransactionQueue()
|
||||||
|
private var processingThreadsStarted = false;
|
||||||
|
private val processingThreads: List[Thread] =
|
||||||
|
(1 to 1).map(_ => new Thread {
|
||||||
|
override def run = processTransactions
|
||||||
|
}).toList
|
||||||
|
|
||||||
def addTransactionToQueue(from: Account, to: Account, amount: Double): Unit = {
|
def addTransactionToQueue(from: Account, to: Account, amount: Double): Unit = {
|
||||||
|
printf("[%s]: Added transaction to queue\n", Thread.currentThread().toString())
|
||||||
transactionsQueue.push(new Transaction(
|
transactionsQueue.push(new Transaction(
|
||||||
transactionsQueue,
|
transactionsQueue,
|
||||||
processedTransactions,
|
processedTransactions,
|
||||||
|
@ -13,30 +19,39 @@ class Bank(val allowedAttempts: Integer = 3) {
|
||||||
10,
|
10,
|
||||||
))
|
))
|
||||||
|
|
||||||
Main.thread(processTransaction(transactionsQueue.pop()))
|
if (!processingThreadsStarted) {
|
||||||
|
processingThreads.foreach(t => {
|
||||||
|
t.start
|
||||||
|
print("Starting processing thread\n")
|
||||||
|
})
|
||||||
|
processingThreadsStarted = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// TODO
|
// TODO
|
||||||
// project task 2
|
// project task 2
|
||||||
// create a new transaction object and put it in the queue
|
// create a new transaction object and put it in the queue
|
||||||
// spawn a thread that calls processTransactions
|
// spawn a thread that calls processTransactions
|
||||||
|
|
||||||
// There are mixed instructions for this method.
|
// This function is a worker that continuously
|
||||||
// It's called `processTransactions`, indicating that it should
|
// pops elements from the queue and processes them.
|
||||||
// process all lists, the part in the assigment pdf indicates this as well.
|
// Multiple of these can be run on separate threads.
|
||||||
// However the comment below is written as if there is only one transaction to
|
private def processTransactions: Unit = {
|
||||||
// be processed, and the fact that `addTransactionToQueue` calls this method every
|
if (transactionsQueue.isEmpty) {
|
||||||
// time something is added, supports that theory as well.
|
Thread.sleep(50)
|
||||||
// We just went with the most logical option...
|
} else {
|
||||||
private def processTransactions(trx: Transaction): Unit = {
|
val trx = transactionsQueue.pop
|
||||||
// thread = Main.thread(trx)
|
|
||||||
// thread.join()
|
Main.thread(trx.run).join()
|
||||||
trx()
|
|
||||||
if (trx.status == TransactionStatus.PENDING && trx.attempt < trx.allowedAttemps) {
|
if (trx.status == TransactionStatus.PENDING) {
|
||||||
processTransactions(trx)
|
transactionsQueue.push(trx);
|
||||||
} else {
|
} else {
|
||||||
processedTransactions.push(trx);
|
processedTransactions.push(trx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
processTransactions
|
||||||
|
}
|
||||||
// TODO
|
// TODO
|
||||||
// project task 2
|
// project task 2
|
||||||
// Function that pops a transaction from the queue
|
// Function that pops a transaction from the queue
|
||||||
|
|
|
@ -16,16 +16,16 @@ class TransactionQueue {
|
||||||
def pop: Transaction = {queue.synchronized(queue.dequeue())}
|
def pop: Transaction = {queue.synchronized(queue.dequeue())}
|
||||||
|
|
||||||
// Return whether the queue is empty
|
// Return whether the queue is empty
|
||||||
def isEmpty: Boolean = {queue.synchronized(queue.isEmpty())}
|
def isEmpty: Boolean = {queue.synchronized(queue.isEmpty)}
|
||||||
|
|
||||||
// Add new element to the back of the queue
|
// Add new element to the back of the queue
|
||||||
def push(t: Transaction): Unit = {queue.synchronized(queue.push(t))}
|
def push(t: Transaction): Unit = {queue.synchronized(queue.enqueue(t))}
|
||||||
|
|
||||||
// Return the first element from the queue without removing it
|
// Return the first element from the queue without removing it
|
||||||
def peek: Transaction = {queue.synchronized(queue.front())}
|
def peek: Transaction = {queue.synchronized(queue.front)}
|
||||||
|
|
||||||
// Return an iterator to allow you to iterate over the queue
|
// Return an iterator to allow you to iterate over the queue
|
||||||
def iterator: Iterator[Transaction] = {queue.synchronized(queue.iterator())}
|
def iterator: Iterator[Transaction] = {queue.synchronized(queue.iterator)}
|
||||||
}
|
}
|
||||||
|
|
||||||
class Transaction(val transactionsQueue: TransactionQueue,
|
class Transaction(val transactionsQueue: TransactionQueue,
|
||||||
|
@ -39,22 +39,31 @@ class Transaction(val transactionsQueue: TransactionQueue,
|
||||||
var attempt = 0
|
var attempt = 0
|
||||||
|
|
||||||
override def run: Unit = {
|
override def run: Unit = {
|
||||||
|
def doTransaction(): Unit = {
|
||||||
def doTransaction() = {
|
|
||||||
// TODO - project task 3
|
// TODO - project task 3
|
||||||
// Extend this method to satisfy requirements.
|
// Extend this method to satisfy requirements.
|
||||||
from withdraw amount
|
if (from.withdraw(amount).isRight) {
|
||||||
to deposit amount
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO - project task 3
|
if (to.deposit(amount).isRight) {
|
||||||
// make the code below thread safe
|
if (from.deposit(amount).isRight) print("oof")
|
||||||
if (status == TransactionStatus.PENDING) {
|
return
|
||||||
doTransaction
|
|
||||||
Thread.sleep(50) // you might want this to make more room for
|
|
||||||
// new transactions to be added to the queue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
status = TransactionStatus.SUCCESS
|
||||||
|
}
|
||||||
|
|
||||||
|
attempt += 1
|
||||||
|
if (status != TransactionStatus.PENDING) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
from.synchronized(to.synchronized(doTransaction))
|
||||||
|
|
||||||
|
Thread.sleep(50)
|
||||||
|
|
||||||
|
if (attempt >= allowedAttemps) {
|
||||||
|
status = TransactionStatus.FAILED
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue