src/output: Move event and spsc_queue into thread object

This commit is contained in:
Shen-Ta Hsieh 2020-12-02 07:33:40 +08:00 committed by Max Kellermann
parent 2974737746
commit db7caa2dac
1 changed files with 33 additions and 39 deletions

View File

@ -118,15 +118,14 @@ inline constexpr const unsigned int kErrorId = -1;
class WasapiOutputThread : public Thread {
public:
enum class Status : uint32_t { FINISH, PLAY, PAUSE };
WasapiOutputThread(std::shared_ptr<WinEvent> _event, ComPtr<IAudioClient> _client,
WasapiOutputThread(IAudioClient *_client,
ComPtr<IAudioRenderClient> &&_render_client,
const UINT32 _frame_size, const UINT32 _buffer_size_in_frames,
bool _is_exclusive,
boost::lockfree::spsc_queue<BYTE> &_spsc_buffer)
: Thread(BIND_THIS_METHOD(Work)), event(std::move(_event)),
client(std::move(_client)), render_client(std::move(_render_client)),
frame_size(_frame_size), buffer_size_in_frames(_buffer_size_in_frames),
is_exclusive(_is_exclusive), spsc_buffer(_spsc_buffer) {}
bool _is_exclusive)
: Thread(BIND_THIS_METHOD(Work)), client(_client),
render_client(std::move(_render_client)), frame_size(_frame_size),
buffer_size_in_frames(_buffer_size_in_frames), is_exclusive(_is_exclusive),
spsc_buffer(_buffer_size_in_frames * 4 * _frame_size) {}
void Finish() noexcept { return SetStatus(Status::FINISH); }
void Play() noexcept { return SetStatus(Status::PLAY); }
void Pause() noexcept { return SetStatus(Status::PAUSE); }
@ -144,13 +143,13 @@ public:
}
private:
std::shared_ptr<WinEvent> event;
ComPtr<IAudioClient> client;
friend class WasapiOutput;
WinEvent event;
IAudioClient *client;
ComPtr<IAudioRenderClient> render_client;
const UINT32 frame_size;
const UINT32 buffer_size_in_frames;
bool is_exclusive;
boost::lockfree::spsc_queue<BYTE> &spsc_buffer;
alignas(BOOST_LOCKFREE_CACHELINE_BYTES) std::atomic<Status> status =
Status::PAUSE;
alignas(BOOST_LOCKFREE_CACHELINE_BYTES) struct {
@ -162,10 +161,11 @@ private:
Cond cond;
std::exception_ptr error_ptr = nullptr;
} error{};
boost::lockfree::spsc_queue<BYTE> spsc_buffer;
void SetStatus(Status s) noexcept {
status.store(s);
event->Set();
event.Set();
}
void Work() noexcept;
};
@ -203,13 +203,11 @@ private:
bool enumerate_devices;
std::string device_config;
std::vector<std::pair<unsigned int, AllocatedString>> device_desc;
std::shared_ptr<WinEvent> event;
ComPtr<IMMDeviceEnumerator> enumerator;
ComPtr<IMMDevice> device;
ComPtr<IAudioClient> client;
WAVEFORMATEXTENSIBLE device_format;
std::unique_ptr<WasapiOutputThread> thread;
std::unique_ptr<boost::lockfree::spsc_queue<BYTE>> spsc_buffer;
std::optional<WasapiOutputThread> thread;
std::size_t watermark;
friend bool wasapi_is_exclusive(WasapiOutput &output) noexcept;
@ -248,7 +246,7 @@ void WasapiOutputThread::Work() noexcept {
COM com{true};
while (true) {
try {
event->Wait(INFINITE);
event.Wait(INFINITE);
Status current_state = status.load();
if (current_state == Status::FINISH) {
@ -330,13 +328,10 @@ void WasapiOutput::DoDisable() noexcept {
err.what());
}
thread.reset();
spsc_buffer.reset();
client.reset();
}
device.reset();
enumerator.reset();
com.reset();
event.reset();
}
/// run inside COMWorkerThread
@ -442,11 +437,6 @@ void WasapiOutput::DoOpen(AudioFormat &audio_format) {
throw FormatHResultError(result, "Unable to get new render client");
}
result = client->SetEventHandle(event->handle());
if (FAILED(result)) {
throw FormatHResultError(result, "Unable to set event handler");
}
UINT32 buffer_size_in_frames;
result = client->GetBufferSize(&buffer_size_in_frames);
if (FAILED(result)) {
@ -455,11 +445,14 @@ void WasapiOutput::DoOpen(AudioFormat &audio_format) {
}
watermark = buffer_size_in_frames * 3 * FrameSize();
spsc_buffer = std::make_unique<boost::lockfree::spsc_queue<BYTE>>(
buffer_size_in_frames * 4 * FrameSize());
thread = std::make_unique<WasapiOutputThread>(
event, client, std::move(render_client), FrameSize(),
buffer_size_in_frames, is_exclusive, *spsc_buffer);
thread.emplace(client.get(), std::move(render_client), FrameSize(),
buffer_size_in_frames, is_exclusive);
result = client->SetEventHandle(thread->event.handle());
if (FAILED(result)) {
throw FormatHResultError(result, "Unable to set event handler");
}
thread->Start();
}
@ -472,7 +465,6 @@ void WasapiOutput::Close() noexcept {
thread.reset();
client.reset();
}).get();
spsc_buffer.reset();
}
std::chrono::steady_clock::duration WasapiOutput::Delay() const noexcept {
@ -480,7 +472,9 @@ std::chrono::steady_clock::duration WasapiOutput::Delay() const noexcept {
return std::chrono::steady_clock::duration::zero();
}
const size_t data_size = spsc_buffer->read_available();
assert(thread);
const size_t data_size = thread->spsc_buffer.read_available();
const size_t delay_size = std::max(data_size, watermark) - watermark;
using s = std::chrono::seconds;
@ -490,13 +484,11 @@ std::chrono::steady_clock::duration WasapiOutput::Delay() const noexcept {
}
size_t WasapiOutput::Play(const void *chunk, size_t size) {
if (!client || !thread) {
return 0;
}
assert(thread);
do {
const size_t consumed_size =
spsc_buffer->push(static_cast<const BYTE *>(chunk), size);
thread->spsc_buffer.push(static_cast<const BYTE *>(chunk), size);
if (consumed_size == 0) {
assert(is_started);
thread->WaitWrite();
@ -523,11 +515,6 @@ size_t WasapiOutput::Play(const void *chunk, size_t size) {
} while (true);
}
void WasapiOutput::Drain() {
spsc_buffer->consume_all([](auto &&) {});
thread->CheckException();
}
bool WasapiOutput::Pause() {
if (!client || !thread) {
return false;
@ -549,6 +536,13 @@ bool WasapiOutput::Pause() {
return true;
}
void WasapiOutput::Drain() {
assert(thread);
thread->spsc_buffer.consume_all([](auto &&) {});
thread->CheckException();
}
/// run inside COMWorkerThread
void WasapiOutput::OpenDevice() {
enumerator.CoCreateInstance(__uuidof(MMDeviceEnumerator), nullptr,