From db7caa2dac03ec114b99ec7d49d10d6f81b73115 Mon Sep 17 00:00:00 2001 From: Shen-Ta Hsieh Date: Wed, 2 Dec 2020 07:33:40 +0800 Subject: [PATCH] src/output: Move event and spsc_queue into thread object --- src/output/plugins/WasapiOutputPlugin.cxx | 72 +++++++++++------------ 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/src/output/plugins/WasapiOutputPlugin.cxx b/src/output/plugins/WasapiOutputPlugin.cxx index 8c52bf15f..30eeb2c75 100644 --- a/src/output/plugins/WasapiOutputPlugin.cxx +++ b/src/output/plugins/WasapiOutputPlugin.cxx @@ -118,15 +118,14 @@ inline constexpr const unsigned int kErrorId = -1; class WasapiOutputThread : public Thread { public: enum class Status : uint32_t { FINISH, PLAY, PAUSE }; - WasapiOutputThread(std::shared_ptr _event, ComPtr _client, + WasapiOutputThread(IAudioClient *_client, ComPtr &&_render_client, const UINT32 _frame_size, const UINT32 _buffer_size_in_frames, - bool _is_exclusive, - boost::lockfree::spsc_queue &_spsc_buffer) - : Thread(BIND_THIS_METHOD(Work)), event(std::move(_event)), - client(std::move(_client)), render_client(std::move(_render_client)), - frame_size(_frame_size), buffer_size_in_frames(_buffer_size_in_frames), - is_exclusive(_is_exclusive), spsc_buffer(_spsc_buffer) {} + bool _is_exclusive) + : Thread(BIND_THIS_METHOD(Work)), client(_client), + render_client(std::move(_render_client)), frame_size(_frame_size), + buffer_size_in_frames(_buffer_size_in_frames), is_exclusive(_is_exclusive), + spsc_buffer(_buffer_size_in_frames * 4 * _frame_size) {} void Finish() noexcept { return SetStatus(Status::FINISH); } void Play() noexcept { return SetStatus(Status::PLAY); } void Pause() noexcept { return SetStatus(Status::PAUSE); } @@ -144,13 +143,13 @@ public: } private: - std::shared_ptr event; - ComPtr client; + friend class WasapiOutput; + WinEvent event; + IAudioClient *client; ComPtr render_client; const UINT32 frame_size; const UINT32 buffer_size_in_frames; bool is_exclusive; - boost::lockfree::spsc_queue &spsc_buffer; alignas(BOOST_LOCKFREE_CACHELINE_BYTES) std::atomic status = Status::PAUSE; alignas(BOOST_LOCKFREE_CACHELINE_BYTES) struct { @@ -162,10 +161,11 @@ private: Cond cond; std::exception_ptr error_ptr = nullptr; } error{}; + boost::lockfree::spsc_queue spsc_buffer; void SetStatus(Status s) noexcept { status.store(s); - event->Set(); + event.Set(); } void Work() noexcept; }; @@ -203,13 +203,11 @@ private: bool enumerate_devices; std::string device_config; std::vector> device_desc; - std::shared_ptr event; ComPtr enumerator; ComPtr device; ComPtr client; WAVEFORMATEXTENSIBLE device_format; - std::unique_ptr thread; - std::unique_ptr> spsc_buffer; + std::optional thread; std::size_t watermark; friend bool wasapi_is_exclusive(WasapiOutput &output) noexcept; @@ -248,7 +246,7 @@ void WasapiOutputThread::Work() noexcept { COM com{true}; while (true) { try { - event->Wait(INFINITE); + event.Wait(INFINITE); Status current_state = status.load(); if (current_state == Status::FINISH) { @@ -330,13 +328,10 @@ void WasapiOutput::DoDisable() noexcept { err.what()); } thread.reset(); - spsc_buffer.reset(); client.reset(); } device.reset(); enumerator.reset(); - com.reset(); - event.reset(); } /// run inside COMWorkerThread @@ -442,11 +437,6 @@ void WasapiOutput::DoOpen(AudioFormat &audio_format) { throw FormatHResultError(result, "Unable to get new render client"); } - result = client->SetEventHandle(event->handle()); - if (FAILED(result)) { - throw FormatHResultError(result, "Unable to set event handler"); - } - UINT32 buffer_size_in_frames; result = client->GetBufferSize(&buffer_size_in_frames); if (FAILED(result)) { @@ -455,11 +445,14 @@ void WasapiOutput::DoOpen(AudioFormat &audio_format) { } watermark = buffer_size_in_frames * 3 * FrameSize(); - spsc_buffer = std::make_unique>( - buffer_size_in_frames * 4 * FrameSize()); - thread = std::make_unique( - event, client, std::move(render_client), FrameSize(), - buffer_size_in_frames, is_exclusive, *spsc_buffer); + thread.emplace(client.get(), std::move(render_client), FrameSize(), + buffer_size_in_frames, is_exclusive); + + result = client->SetEventHandle(thread->event.handle()); + if (FAILED(result)) { + throw FormatHResultError(result, "Unable to set event handler"); + } + thread->Start(); } @@ -472,7 +465,6 @@ void WasapiOutput::Close() noexcept { thread.reset(); client.reset(); }).get(); - spsc_buffer.reset(); } std::chrono::steady_clock::duration WasapiOutput::Delay() const noexcept { @@ -480,7 +472,9 @@ std::chrono::steady_clock::duration WasapiOutput::Delay() const noexcept { return std::chrono::steady_clock::duration::zero(); } - const size_t data_size = spsc_buffer->read_available(); + assert(thread); + + const size_t data_size = thread->spsc_buffer.read_available(); const size_t delay_size = std::max(data_size, watermark) - watermark; using s = std::chrono::seconds; @@ -490,13 +484,11 @@ std::chrono::steady_clock::duration WasapiOutput::Delay() const noexcept { } size_t WasapiOutput::Play(const void *chunk, size_t size) { - if (!client || !thread) { - return 0; - } + assert(thread); do { const size_t consumed_size = - spsc_buffer->push(static_cast(chunk), size); + thread->spsc_buffer.push(static_cast(chunk), size); if (consumed_size == 0) { assert(is_started); thread->WaitWrite(); @@ -523,11 +515,6 @@ size_t WasapiOutput::Play(const void *chunk, size_t size) { } while (true); } -void WasapiOutput::Drain() { - spsc_buffer->consume_all([](auto &&) {}); - thread->CheckException(); -} - bool WasapiOutput::Pause() { if (!client || !thread) { return false; @@ -549,6 +536,13 @@ bool WasapiOutput::Pause() { return true; } +void WasapiOutput::Drain() { + assert(thread); + + thread->spsc_buffer.consume_all([](auto &&) {}); + thread->CheckException(); +} + /// run inside COMWorkerThread void WasapiOutput::OpenDevice() { enumerator.CoCreateInstance(__uuidof(MMDeviceEnumerator), nullptr,