thread/Thread: use BoundMethod

This commit is contained in:
Max Kellermann 2017-02-10 22:41:11 +01:00
parent 752ff12c37
commit 8649ea3d6f
18 changed files with 99 additions and 120 deletions

View File

@ -27,12 +27,17 @@
#include <assert.h> #include <assert.h>
static struct { static struct IOThread {
Mutex mutex; Mutex mutex;
Cond cond; Cond cond;
EventLoop *loop; EventLoop *loop;
Thread thread; Thread thread;
IOThread():thread(BIND_THIS_METHOD(Run)) {}
private:
void Run() noexcept;
} io; } io;
void void
@ -44,15 +49,15 @@ io_thread_run(void)
io.loop->Run(); io.loop->Run();
} }
static void inline void
io_thread_func(gcc_unused void *arg) IOThread::Run() noexcept
{ {
SetThreadName("io"); SetThreadName("io");
/* lock+unlock to synchronize with io_thread_start(), to be /* lock+unlock to synchronize with io_thread_start(), to be
sure that io.thread is set */ sure that io.thread is set */
io.mutex.lock(); mutex.lock();
io.mutex.unlock(); mutex.unlock();
io_thread_run(); io_thread_run();
} }
@ -73,7 +78,7 @@ io_thread_start()
assert(!io.thread.IsDefined()); assert(!io.thread.IsDefined());
const std::lock_guard<Mutex> protect(io.mutex); const std::lock_guard<Mutex> protect(io.mutex);
io.thread.Start(io_thread_func, nullptr); io.thread.Start();
} }
void void

View File

@ -43,6 +43,7 @@ UpdateService::UpdateService(EventLoop &_loop, SimpleDatabase &_db,
:DeferredMonitor(_loop), :DeferredMonitor(_loop),
db(_db), storage(_storage), db(_db), storage(_storage),
listener(_listener), listener(_listener),
update_thread(BIND_THIS_METHOD(Task)),
update_task_id(0), update_task_id(0),
walk(nullptr) walk(nullptr)
{ {
@ -140,13 +141,6 @@ UpdateService::Task()
DeferredMonitor::Schedule(); DeferredMonitor::Schedule();
} }
void
UpdateService::Task(void *ctx)
{
UpdateService &service = *(UpdateService *)ctx;
return service.Task();
}
void void
UpdateService::StartThread(UpdateQueueItem &&i) UpdateService::StartThread(UpdateQueueItem &&i)
{ {
@ -158,7 +152,7 @@ UpdateService::StartThread(UpdateQueueItem &&i)
next = std::move(i); next = std::move(i);
walk = new UpdateWalk(GetEventLoop(), listener, *next.storage); walk = new UpdateWalk(GetEventLoop(), listener, *next.storage);
update_thread.Start(Task, this); update_thread.Start();
FormatDebug(update_domain, FormatDebug(update_domain,
"spawned thread for update job id %i", next.id); "spawned thread for update job id %i", next.id);

View File

@ -98,7 +98,6 @@ private:
/* the update thread */ /* the update thread */
void Task(); void Task();
static void Task(void *ctx);
void StartThread(UpdateQueueItem &&i); void StartThread(UpdateQueueItem &&i);

View File

@ -30,7 +30,8 @@
DecoderControl::DecoderControl(Mutex &_mutex, Cond &_client_cond, DecoderControl::DecoderControl(Mutex &_mutex, Cond &_client_cond,
const AudioFormat _configured_audio_format, const AudioFormat _configured_audio_format,
const ReplayGainConfig &_replay_gain_config) const ReplayGainConfig &_replay_gain_config)
:mutex(_mutex), client_cond(_client_cond), :thread(BIND_THIS_METHOD(RunThread)),
mutex(_mutex), client_cond(_client_cond),
configured_audio_format(_configured_audio_format), configured_audio_format(_configured_audio_format),
replay_gain_config(_replay_gain_config) {} replay_gain_config(_replay_gain_config) {}

View File

@ -415,6 +415,9 @@ public:
* mixramp_start/mixramp_end. * mixramp_start/mixramp_end.
*/ */
void CycleMixRamp(); void CycleMixRamp();
private:
void RunThread();
}; };
#endif #endif

View File

@ -513,30 +513,28 @@ try {
dc.client_cond.signal(); dc.client_cond.signal();
} }
static void void
decoder_task(void *arg) DecoderControl::RunThread()
{ {
DecoderControl &dc = *(DecoderControl *)arg;
SetThreadName("decoder"); SetThreadName("decoder");
const std::lock_guard<Mutex> protect(dc.mutex); const std::lock_guard<Mutex> protect(mutex);
do { do {
assert(dc.state == DecoderState::STOP || assert(state == DecoderState::STOP ||
dc.state == DecoderState::ERROR); state == DecoderState::ERROR);
switch (dc.command) { switch (command) {
case DecoderCommand::START: case DecoderCommand::START:
dc.CycleMixRamp(); CycleMixRamp();
dc.replay_gain_prev_db = dc.replay_gain_db; replay_gain_prev_db = replay_gain_db;
dc.replay_gain_db = 0; replay_gain_db = 0;
decoder_run(dc); decoder_run(*this);
if (dc.state == DecoderState::ERROR) { if (state == DecoderState::ERROR) {
try { try {
std::rethrow_exception(dc.error); std::rethrow_exception(error);
} catch (const std::exception &e) { } catch (const std::exception &e) {
LogError(e); LogError(e);
} catch (...) { } catch (...) {
@ -552,20 +550,20 @@ decoder_task(void *arg)
/* we need to clear the pipe here; usually the /* we need to clear the pipe here; usually the
PlayerThread is responsible, but it is not PlayerThread is responsible, but it is not
aware that the decoder has finished */ aware that the decoder has finished */
dc.pipe->Clear(*dc.buffer); pipe->Clear(*buffer);
decoder_run(dc); decoder_run(*this);
break; break;
case DecoderCommand::STOP: case DecoderCommand::STOP:
dc.CommandFinishedLocked(); CommandFinishedLocked();
break; break;
case DecoderCommand::NONE: case DecoderCommand::NONE:
dc.Wait(); Wait();
break; break;
} }
} while (dc.command != DecoderCommand::NONE || !dc.quit); } while (command != DecoderCommand::NONE || !quit);
} }
void void
@ -574,5 +572,5 @@ decoder_thread_start(DecoderControl &dc)
assert(!dc.thread.IsDefined()); assert(!dc.thread.IsDefined());
dc.quit = false; dc.quit = false;
dc.thread.Start(decoder_task, &dc); dc.thread.Start();
} }

View File

@ -54,10 +54,10 @@ ThreadInputStream::Start()
assert(p != nullptr); assert(p != nullptr);
buffer = new CircularBuffer<uint8_t>((uint8_t *)p, buffer_size); buffer = new CircularBuffer<uint8_t>((uint8_t *)p, buffer_size);
thread.Start(ThreadFunc, this); thread.Start();
} }
inline void void
ThreadInputStream::ThreadFunc() ThreadInputStream::ThreadFunc()
{ {
FormatThreadName("input:%s", plugin); FormatThreadName("input:%s", plugin);
@ -107,13 +107,6 @@ ThreadInputStream::ThreadFunc()
Close(); Close();
} }
void
ThreadInputStream::ThreadFunc(void *ctx)
{
ThreadInputStream &tis = *(ThreadInputStream *)ctx;
tis.ThreadFunc();
}
void void
ThreadInputStream::Check() ThreadInputStream::Check()
{ {

View File

@ -73,6 +73,7 @@ public:
size_t _buffer_size) size_t _buffer_size)
:InputStream(_uri, _mutex, _cond), :InputStream(_uri, _mutex, _cond),
plugin(_plugin), plugin(_plugin),
thread(BIND_THIS_METHOD(ThreadFunc)),
buffer_size(_buffer_size) {} buffer_size(_buffer_size) {}
virtual ~ThreadInputStream(); virtual ~ThreadInputStream();
@ -138,7 +139,6 @@ protected:
private: private:
void ThreadFunc(); void ThreadFunc();
static void ThreadFunc(void *ctx);
}; };
#endif #endif

View File

@ -69,7 +69,8 @@ class SmbclientNeighborExplorer final : public NeighborExplorer {
public: public:
SmbclientNeighborExplorer(NeighborListener &_listener) SmbclientNeighborExplorer(NeighborListener &_listener)
:NeighborExplorer(_listener) {} :NeighborExplorer(_listener),
thread(BIND_THIS_METHOD(ThreadFunc)) {}
/* virtual methods from class NeighborExplorer */ /* virtual methods from class NeighborExplorer */
void Open() override; void Open() override;
@ -79,14 +80,13 @@ public:
private: private:
void Run(); void Run();
void ThreadFunc(); void ThreadFunc();
static void ThreadFunc(void *ctx);
}; };
void void
SmbclientNeighborExplorer::Open() SmbclientNeighborExplorer::Open()
{ {
quit = false; quit = false;
thread.Start(ThreadFunc, this); thread.Start();
} }
void void
@ -239,6 +239,8 @@ SmbclientNeighborExplorer::Run()
inline void inline void
SmbclientNeighborExplorer::ThreadFunc() SmbclientNeighborExplorer::ThreadFunc()
{ {
SetThreadName("smbclient");
mutex.lock(); mutex.lock();
while (!quit) { while (!quit) {
@ -257,15 +259,6 @@ SmbclientNeighborExplorer::ThreadFunc()
mutex.unlock(); mutex.unlock();
} }
void
SmbclientNeighborExplorer::ThreadFunc(void *ctx)
{
SetThreadName("smbclient");
SmbclientNeighborExplorer &e = *(SmbclientNeighborExplorer *)ctx;
e.ThreadFunc();
}
static NeighborExplorer * static NeighborExplorer *
smbclient_neighbor_create(gcc_unused EventLoop &loop, smbclient_neighbor_create(gcc_unused EventLoop &loop,
NeighborListener &listener, NeighborListener &listener,

View File

@ -51,7 +51,8 @@
AudioOutput::AudioOutput(const AudioOutputPlugin &_plugin, AudioOutput::AudioOutput(const AudioOutputPlugin &_plugin,
const ConfigBlock &block) const ConfigBlock &block)
:plugin(_plugin) :plugin(_plugin),
thread(BIND_THIS_METHOD(Task))
{ {
assert(plugin.finish != nullptr); assert(plugin.finish != nullptr);
assert(plugin.open != nullptr); assert(plugin.open != nullptr);

View File

@ -515,7 +515,6 @@ private:
* The OutputThread. * The OutputThread.
*/ */
void Task(); void Task();
static void Task(void *arg);
}; };
/** /**

View File

@ -396,7 +396,7 @@ AudioOutput::Pause()
pause = false; pause = false;
} }
inline void void
AudioOutput::Task() AudioOutput::Task()
{ {
FormatThreadName("output:%s", name); FormatThreadName("output:%s", name);
@ -512,17 +512,10 @@ AudioOutput::Task()
} }
} }
void
AudioOutput::Task(void *arg)
{
AudioOutput *ao = (AudioOutput *)arg;
ao->Task();
}
void void
AudioOutput::StartThread() AudioOutput::StartThread()
{ {
assert(command == Command::NONE); assert(command == Command::NONE);
thread.Start(Task, this); thread.Start();
} }

View File

@ -37,6 +37,7 @@ PlayerControl::PlayerControl(PlayerListener &_listener,
buffer_chunks(_buffer_chunks), buffer_chunks(_buffer_chunks),
buffered_before_play(_buffered_before_play), buffered_before_play(_buffered_before_play),
configured_audio_format(_configured_audio_format), configured_audio_format(_configured_audio_format),
thread(BIND_THIS_METHOD(RunThread)),
replay_gain_config(_replay_gain_config) replay_gain_config(_replay_gain_config)
{ {
} }

View File

@ -537,6 +537,9 @@ public:
void ApplyEnabled() override { void ApplyEnabled() override {
LockUpdateAudio(); LockUpdateAudio();
} }
private:
void RunThread();
}; };
#endif #endif

View File

@ -1147,91 +1147,89 @@ do_play(PlayerControl &pc, DecoderControl &dc,
player.Run(); player.Run();
} }
static void void
player_task(void *arg) PlayerControl::RunThread()
{ {
PlayerControl &pc = *(PlayerControl *)arg;
SetThreadName("player"); SetThreadName("player");
DecoderControl dc(pc.mutex, pc.cond, DecoderControl dc(mutex, cond,
pc.configured_audio_format, configured_audio_format,
pc.replay_gain_config); replay_gain_config);
decoder_thread_start(dc); decoder_thread_start(dc);
MusicBuffer buffer(pc.buffer_chunks); MusicBuffer buffer(buffer_chunks);
pc.Lock(); Lock();
while (1) { while (1) {
switch (pc.command) { switch (command) {
case PlayerCommand::SEEK: case PlayerCommand::SEEK:
case PlayerCommand::QUEUE: case PlayerCommand::QUEUE:
assert(pc.next_song != nullptr); assert(next_song != nullptr);
pc.Unlock(); Unlock();
do_play(pc, dc, buffer); do_play(*this, dc, buffer);
pc.listener.OnPlayerSync(); listener.OnPlayerSync();
pc.Lock(); Lock();
break; break;
case PlayerCommand::STOP: case PlayerCommand::STOP:
pc.Unlock(); Unlock();
pc.outputs.Cancel(); outputs.Cancel();
pc.Lock(); Lock();
/* fall through */ /* fall through */
case PlayerCommand::PAUSE: case PlayerCommand::PAUSE:
delete pc.next_song; delete next_song;
pc.next_song = nullptr; next_song = nullptr;
pc.CommandFinished(); CommandFinished();
break; break;
case PlayerCommand::CLOSE_AUDIO: case PlayerCommand::CLOSE_AUDIO:
pc.Unlock(); Unlock();
pc.outputs.Release(); outputs.Release();
pc.Lock(); Lock();
pc.CommandFinished(); CommandFinished();
assert(buffer.IsEmptyUnsafe()); assert(buffer.IsEmptyUnsafe());
break; break;
case PlayerCommand::UPDATE_AUDIO: case PlayerCommand::UPDATE_AUDIO:
pc.Unlock(); Unlock();
pc.outputs.EnableDisable(); outputs.EnableDisable();
pc.Lock(); Lock();
pc.CommandFinished(); CommandFinished();
break; break;
case PlayerCommand::EXIT: case PlayerCommand::EXIT:
pc.Unlock(); Unlock();
dc.Quit(); dc.Quit();
pc.outputs.Close(); outputs.Close();
pc.LockCommandFinished(); LockCommandFinished();
return; return;
case PlayerCommand::CANCEL: case PlayerCommand::CANCEL:
delete pc.next_song; delete next_song;
pc.next_song = nullptr; next_song = nullptr;
pc.CommandFinished(); CommandFinished();
break; break;
case PlayerCommand::REFRESH: case PlayerCommand::REFRESH:
/* no-op when not playing */ /* no-op when not playing */
pc.CommandFinished(); CommandFinished();
break; break;
case PlayerCommand::NONE: case PlayerCommand::NONE:
pc.Wait(); Wait();
break; break;
} }
} }
@ -1242,5 +1240,5 @@ StartPlayerThread(PlayerControl &pc)
{ {
assert(!pc.thread.IsDefined()); assert(!pc.thread.IsDefined());
pc.thread.Start(player_task, &pc); pc.thread.Start();
} }

View File

@ -25,14 +25,11 @@
#include "java/Global.hxx" #include "java/Global.hxx"
#endif #endif
bool void
Thread::Start(void (*_f)(void *ctx), void *_ctx) Thread::Start()
{ {
assert(!IsDefined()); assert(!IsDefined());
f = _f;
ctx = _ctx;
#ifdef _WIN32 #ifdef _WIN32
handle = ::CreateThread(nullptr, 0, ThreadProc, this, 0, &id); handle = ::CreateThread(nullptr, 0, ThreadProc, this, 0, &id);
if (handle == nullptr) if (handle == nullptr)
@ -56,8 +53,6 @@ Thread::Start(void (*_f)(void *ctx), void *_ctx)
creating = false; creating = false;
#endif #endif
#endif #endif
return true;
} }
void void
@ -89,7 +84,7 @@ Thread::Run()
#endif #endif
#endif #endif
f(ctx); f();
#ifdef ANDROID #ifdef ANDROID
Java::DetachCurrentThread(); Java::DetachCurrentThread();

View File

@ -21,6 +21,7 @@
#define MPD_THREAD_HXX #define MPD_THREAD_HXX
#include "check.h" #include "check.h"
#include "util/BindMethod.hxx"
#include "Compiler.h" #include "Compiler.h"
#ifdef _WIN32 #ifdef _WIN32
@ -32,6 +33,9 @@
#include <assert.h> #include <assert.h>
class Thread { class Thread {
typedef BoundMethod<void()> Function;
const Function f;
#ifdef _WIN32 #ifdef _WIN32
HANDLE handle = nullptr; HANDLE handle = nullptr;
DWORD id; DWORD id;
@ -49,11 +53,8 @@ class Thread {
#endif #endif
#endif #endif
void (*f)(void *ctx);
void *ctx;
public: public:
Thread() = default; explicit Thread(Function _f):f(_f) {}
Thread(const Thread &) = delete; Thread(const Thread &) = delete;
@ -89,7 +90,7 @@ public:
#endif #endif
} }
bool Start(void (*f)(void *ctx), void *ctx); void Start();
void Join(); void Join();
private: private:

View File

@ -44,6 +44,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
void AudioOutput::Task() {}
class DummyAudioOutputClient final : public AudioOutputClient { class DummyAudioOutputClient final : public AudioOutputClient {
public: public:
/* virtual methods from AudioOutputClient */ /* virtual methods from AudioOutputClient */