input/buffering: use notify_all() instead of notify_one()
More preparations to support multiple readers.
This commit is contained in:
parent
c2dd6808e1
commit
19e4672a54
@ -39,7 +39,7 @@ BufferingInputStream::~BufferingInputStream() noexcept
|
|||||||
{
|
{
|
||||||
const std::lock_guard<Mutex> lock(mutex);
|
const std::lock_guard<Mutex> lock(mutex);
|
||||||
stop = true;
|
stop = true;
|
||||||
wake_cond.notify_one();
|
wake_cond.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
thread.Join();
|
thread.Join();
|
||||||
@ -73,7 +73,7 @@ BufferingInputStream::Seek(std::unique_lock<Mutex> &lock, size_t new_offset)
|
|||||||
|
|
||||||
seek_offset = new_offset;
|
seek_offset = new_offset;
|
||||||
seek = true;
|
seek = true;
|
||||||
wake_cond.notify_one();
|
wake_cond.notify_all();
|
||||||
|
|
||||||
client_cond.wait(lock, [this]{ return !seek; });
|
client_cond.wait(lock, [this]{ return !seek; });
|
||||||
|
|
||||||
@ -105,7 +105,7 @@ BufferingInputStream::Read(std::unique_lock<Mutex> &lock, void *ptr, size_t s)
|
|||||||
|
|
||||||
if (!IsAvailable()) {
|
if (!IsAvailable()) {
|
||||||
/* wake up the sleeping thread */
|
/* wake up the sleeping thread */
|
||||||
wake_cond.notify_one();
|
wake_cond.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
return nbytes;
|
return nbytes;
|
||||||
@ -150,7 +150,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
}
|
}
|
||||||
|
|
||||||
seek = false;
|
seek = false;
|
||||||
client_cond.notify_one();
|
client_cond.notify_all();
|
||||||
} else if (offset != input->GetOffset() && !IsAvailable()) {
|
} else if (offset != input->GetOffset() && !IsAvailable()) {
|
||||||
/* a past Seek() call was a no-op because data
|
/* a past Seek() call was a no-op because data
|
||||||
was already available at that position, but
|
was already available at that position, but
|
||||||
@ -172,7 +172,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
own InputStream interface) is in
|
own InputStream interface) is in
|
||||||
"read" mode */
|
"read" mode */
|
||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
client_cond.notify_one();
|
client_cond.notify_all();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -191,7 +191,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
input->Seek(lock, new_offset);
|
input->Seek(lock, new_offset);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
client_cond.notify_one();
|
client_cond.notify_all();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -215,7 +215,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
input->Seek(lock, new_offset);
|
input->Seek(lock, new_offset);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
client_cond.notify_one();
|
client_cond.notify_all();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -230,7 +230,7 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
input->Seek(lock, offset);
|
input->Seek(lock, offset);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
client_cond.notify_one();
|
client_cond.notify_all();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -246,12 +246,12 @@ BufferingInputStream::RunThread() noexcept
|
|||||||
read_offset + nbytes);
|
read_offset + nbytes);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
read_error = std::current_exception();
|
read_error = std::current_exception();
|
||||||
client_cond.notify_one();
|
client_cond.notify_all();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
client_cond.notify_one();
|
client_cond.notify_all();
|
||||||
OnBufferAvailable();
|
OnBufferAvailable();
|
||||||
} else
|
} else
|
||||||
wake_cond.wait(lock);
|
wake_cond.wait(lock);
|
||||||
|
Loading…
Reference in New Issue
Block a user