Thread/Thread: replacement library for GThread

This commit is contained in:
Max Kellermann 2013-10-17 18:42:14 +02:00
parent f6d74012b7
commit 8e676db633
21 changed files with 309 additions and 97 deletions

View File

@ -274,6 +274,7 @@ libthread_a_SOURCES = \
src/thread/PosixCond.hxx \
src/thread/WindowsCond.hxx \
src/thread/GLibCond.hxx \
src/thread/Thread.cxx src/thread/Thread.hxx \
src/thread/Id.hxx
# System library
@ -1120,6 +1121,7 @@ test_run_input_LDADD = \
libconf.a \
libutil.a \
libevent.a \
libthread.a \
libsystem.a \
libfs.a \
$(GLIB_LIBS)
@ -1138,6 +1140,7 @@ test_visit_archive_LDADD = \
libconf.a \
libutil.a \
libevent.a \
libthread.a \
libsystem.a \
libfs.a \
$(GLIB_LIBS)
@ -1160,6 +1163,7 @@ test_dump_text_file_LDADD = \
libevent.a \
libfs.a \
libsystem.a \
libthread.a \
libutil.a \
$(GLIB_LIBS)
test_dump_text_file_SOURCES = test/dump_text_file.cxx \
@ -1177,6 +1181,7 @@ test_dump_playlist_LDADD = \
$(TAG_LIBS) \
libconf.a \
libevent.a \
libthread.a \
libsystem.a \
libfs.a \
libutil.a \
@ -1206,6 +1211,7 @@ test_run_decoder_LDADD = \
$(TAG_LIBS) \
libconf.a \
libevent.a \
libthread.a \
libsystem.a \
libfs.a \
libutil.a \
@ -1229,6 +1235,7 @@ test_read_tags_LDADD = \
$(TAG_LIBS) \
libconf.a \
libevent.a \
libthread.a \
libsystem.a \
libfs.a \
libutil.a \
@ -1289,6 +1296,7 @@ test_run_encoder_LDADD = \
$(TAG_LIBS) \
libconf.a \
libpcm.a \
libthread.a \
libsystem.a \
libfs.a \
libutil.a \
@ -1356,6 +1364,7 @@ test_run_output_LDADD = $(MPD_LIBS) \
libevent.a \
libfs.a \
libsystem.a \
libthread.a \
libutil.a \
$(GLIB_LIBS)
test_run_output_SOURCES = test/run_output.cxx \

View File

@ -28,6 +28,8 @@
#include "util/Error.hxx"
#include "Log.hxx"
#include <glib.h>
#include <assert.h>
#include <sys/types.h>
#ifdef WIN32

View File

@ -27,8 +27,7 @@
#include <assert.h>
decoder_control::decoder_control()
:thread(nullptr),
state(DecoderState::STOP),
:state(DecoderState::STOP),
command(DecoderCommand::NONE),
song(nullptr),
replay_gain_db(0), replay_gain_prev_db(0),
@ -124,13 +123,12 @@ decoder_control::Seek(double where)
void
decoder_control::Quit()
{
assert(thread != nullptr);
assert(thread.IsDefined());
quit = true;
LockAsynchronousCommand(DecoderCommand::STOP);
g_thread_join(thread);
thread = nullptr;
thread.Join();
}
void

View File

@ -24,6 +24,7 @@
#include "AudioFormat.hxx"
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "thread/Thread.hxx"
#include "util/Error.hxx"
#include <assert.h>
@ -37,7 +38,6 @@
struct Song;
class MusicBuffer;
class MusicPipe;
typedef struct _GThread GThread;
enum class DecoderState : uint8_t {
STOP = 0,
@ -54,9 +54,10 @@ enum class DecoderState : uint8_t {
};
struct decoder_control {
/** the handle of the decoder thread, or NULL if the decoder
thread isn't running */
GThread *thread;
/**
* The handle of the decoder thread.
*/
Thread thread;
/**
* This lock protects #state and #command.

View File

@ -440,8 +440,8 @@ decoder_run(struct decoder_control *dc)
}
static gpointer
decoder_task(gpointer arg)
static void
decoder_task(void *arg)
{
struct decoder_control *dc = (struct decoder_control *)arg;
@ -476,23 +476,16 @@ decoder_task(gpointer arg)
} while (dc->command != DecoderCommand::NONE || !dc->quit);
dc->Unlock();
return NULL;
}
void
decoder_thread_start(struct decoder_control *dc)
{
assert(dc->thread == NULL);
assert(!dc->thread.IsDefined());
dc->quit = false;
#if GLIB_CHECK_VERSION(2,32,0)
dc->thread = g_thread_new("thread", decoder_task, dc);
#else
GError *e = NULL;
dc->thread = g_thread_create(decoder_task, dc, true, &e);
if (dc->thread == NULL)
FatalError("Failed to spawn decoder task", e);
#endif
Error error;
if (!dc->thread.Start(decoder_task, dc, error))
FatalError(error);
}

View File

@ -21,10 +21,10 @@
#include "IOThread.hxx"
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "thread/Thread.hxx"
#include "event/Loop.hxx"
#include "system/FatalError.hxx"
#include <glib.h>
#include "util/Error.hxx"
#include <assert.h>
@ -33,7 +33,7 @@ static struct {
Cond cond;
EventLoop *loop;
GThread *thread;
Thread thread;
} io;
void
@ -45,8 +45,8 @@ io_thread_run(void)
io.loop->Run();
}
static gpointer
io_thread_func(gcc_unused gpointer arg)
static void
io_thread_func(gcc_unused void *arg)
{
/* lock+unlock to synchronize with io_thread_start(), to be
sure that io.thread is set */
@ -54,14 +54,13 @@ io_thread_func(gcc_unused gpointer arg)
io.mutex.unlock();
io_thread_run();
return NULL;
}
void
io_thread_init(void)
{
assert(io.loop == NULL);
assert(io.thread == NULL);
assert(!io.thread.IsDefined());
io.loop = new EventLoop();
}
@ -70,18 +69,13 @@ void
io_thread_start()
{
assert(io.loop != NULL);
assert(io.thread == NULL);
assert(!io.thread.IsDefined());
const ScopeLock protect(io.mutex);
#if GLIB_CHECK_VERSION(2,32,0)
io.thread = g_thread_new("io", io_thread_func, nullptr);
#else
GError *error = nullptr;
io.thread = g_thread_create(io_thread_func, NULL, true, &error);
if (io.thread == NULL)
Error error;
if (!io.thread.Start(io_thread_func, nullptr, error))
FatalError(error);
#endif
}
void
@ -95,10 +89,9 @@ io_thread_quit(void)
void
io_thread_deinit(void)
{
if (io.thread != NULL) {
if (io.thread.IsDefined()) {
io_thread_quit();
g_thread_join(io.thread);
io.thread.Join();
}
delete io.loop;
@ -115,5 +108,5 @@ io_thread_get()
bool
io_thread_inside(void)
{
return io.thread != NULL && g_thread_self() == io.thread;
return io.thread.IsInside();
}

View File

@ -33,6 +33,8 @@
#include "ConfigOption.hxx"
#include "notify.hxx"
#include <glib.h>
#include <assert.h>
#include <string.h>

View File

@ -107,7 +107,7 @@ audio_output_set_replay_gain_mode(struct audio_output *ao,
void
audio_output_enable(struct audio_output *ao)
{
if (ao->thread == NULL) {
if (!ao->thread.IsDefined()) {
if (ao->plugin->enable == NULL) {
/* don't bother to start the thread now if the
device doesn't even have a enable() method;
@ -125,7 +125,7 @@ audio_output_enable(struct audio_output *ao)
void
audio_output_disable(struct audio_output *ao)
{
if (ao->thread == NULL) {
if (!ao->thread.IsDefined()) {
if (ao->plugin->disable == NULL)
ao->really_enabled = false;
else
@ -184,7 +184,7 @@ audio_output_open(struct audio_output *ao,
ao->pipe = &mp;
if (ao->thread == NULL)
if (!ao->thread.IsDefined())
audio_output_thread_start(ao);
ao_command(ao, ao->open ? AO_COMMAND_REOPEN : AO_COMMAND_OPEN);
@ -322,11 +322,10 @@ void audio_output_finish(struct audio_output *ao)
assert(ao->fail_timer == NULL);
if (ao->thread != NULL) {
if (ao->thread.IsDefined()) {
assert(ao->allow_play);
ao_lock_command(ao, AO_COMMAND_KILL);
g_thread_join(ao->thread);
ao->thread = NULL;
ao->thread.Join();
}
audio_output_free(ao);

View File

@ -30,7 +30,7 @@ ao_base_finish(struct audio_output *ao)
{
assert(!ao->open);
assert(ao->fail_timer == NULL);
assert(ao->thread == NULL);
assert(!ao->thread.IsDefined());
if (ao->mixer != NULL)
mixer_free(ao->mixer);
@ -45,7 +45,7 @@ audio_output_free(struct audio_output *ao)
{
assert(!ao->open);
assert(ao->fail_timer == NULL);
assert(ao->thread == NULL);
assert(!ao->thread.IsDefined());
ao_plugin_finish(ao);
}

View File

@ -202,7 +202,6 @@ ao_base_init(struct audio_output *ao,
"Failed to initialize filter chain for '%s'",
ao->name);
ao->thread = NULL;
ao->command = AO_COMMAND_NONE;
ao->mixer = NULL;

View File

@ -24,6 +24,7 @@
#include "pcm/PcmBuffer.hxx"
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "thread/Thread.hxx"
#include <time.h>
@ -31,7 +32,6 @@ class Error;
class Filter;
class MusicPipe;
struct config_param;
typedef struct _GThread GThread;
typedef struct _GTimer GTimer;
enum audio_output_command {
@ -200,7 +200,7 @@ struct audio_output {
* The thread handle, or NULL if the output thread isn't
* running.
*/
GThread *thread;
Thread thread;
/**
* The next command to be performed by the output thread.

View File

@ -562,7 +562,8 @@ static void ao_pause(struct audio_output *ao)
ao->pause = false;
}
static gpointer audio_output_task(gpointer arg)
static void
audio_output_task(void *arg)
{
struct audio_output *ao = (struct audio_output *)arg;
@ -647,7 +648,7 @@ static gpointer audio_output_task(gpointer arg)
ao->chunk = NULL;
ao_command_finished(ao);
ao->mutex.unlock();
return NULL;
return;
}
if (ao->open && ao->allow_play && ao_play(ao))
@ -664,11 +665,7 @@ void audio_output_thread_start(struct audio_output *ao)
{
assert(ao->command == AO_COMMAND_NONE);
#if GLIB_CHECK_VERSION(2,32,0)
ao->thread = g_thread_new("output", audio_output_task, ao);
#else
GError *e = nullptr;
if (!(ao->thread = g_thread_create(audio_output_task, ao, true, &e)))
FatalError("Failed to spawn output task", e);
#endif
Error error;
if (!ao->thread.Start(audio_output_task, ao, error))
FatalError(error);
}

View File

@ -32,6 +32,8 @@
#include "AudioFormat.hxx"
#include "ReplayGainConfig.hxx"
#include <glib.h>
#define COMMAND_STATUS_STATE "state"
#define COMMAND_STATUS_REPEAT "repeat"
#define COMMAND_STATUS_SINGLE "single"

View File

@ -23,6 +23,8 @@
#include "Song.hxx"
#include "DecoderControl.hxx"
#include <glib.h>
#include <cmath>
#include <assert.h>
@ -31,7 +33,6 @@ player_control::player_control(unsigned _buffer_chunks,
unsigned _buffered_before_play)
:buffer_chunks(_buffer_chunks),
buffered_before_play(_buffered_before_play),
thread(nullptr),
command(PlayerCommand::NONE),
state(PlayerState::STOP),
error_type(PlayerError::NONE),
@ -100,11 +101,10 @@ player_control::UpdateAudio()
void
player_control::Kill()
{
assert(thread != NULL);
assert(thread.IsDefined());
LockSynchronousCommand(PlayerCommand::EXIT);
g_thread_join(thread);
thread = NULL;
thread.Join();
idle_add(IDLE_PLAYER);
}

View File

@ -23,10 +23,9 @@
#include "AudioFormat.hxx"
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include "thread/Thread.hxx"
#include "util/Error.hxx"
#include <glib.h>
#include <stdint.h>
struct Song;
@ -95,9 +94,10 @@ struct player_control {
unsigned int buffered_before_play;
/** the handle of the player thread, or NULL if the player
thread isn't running */
GThread *thread;
/**
* The handle of the player thread.
*/
Thread thread;
/**
* This lock protects #command, #state, #error.
@ -199,7 +199,7 @@ struct player_control {
* prior to calling this function.
*/
void Wait() {
assert(thread == g_thread_self());
assert(thread.IsInside());
cond.wait(mutex);
}
@ -210,7 +210,7 @@ struct player_control {
* Caller must lock the object.
*/
void ClientSignal() {
assert(thread == g_thread_self());
assert(thread.IsInside());
client_cond.signal();
}
@ -222,7 +222,7 @@ struct player_control {
* Caller must lock the object.
*/
void ClientWait() {
assert(thread != g_thread_self());
assert(!thread.IsInside());
client_cond.wait(mutex);
}

View File

@ -1096,8 +1096,8 @@ do_play(player_control &pc, decoder_control &dc,
player.Run();
}
static gpointer
player_task(gpointer arg)
static void
player_task(void *arg)
{
player_control &pc = *(player_control *)arg;
@ -1163,7 +1163,7 @@ player_task(gpointer arg)
audio_output_all_close();
player_command_finished(pc);
return nullptr;
return;
case PlayerCommand::CANCEL:
if (pc.next_song != nullptr) {
@ -1189,14 +1189,9 @@ player_task(gpointer arg)
void
player_create(player_control &pc)
{
assert(pc.thread == nullptr);
assert(!pc.thread.IsDefined());
#if GLIB_CHECK_VERSION(2,32,0)
pc.thread = g_thread_new("player", player_task, &pc);
#else
GError *e = nullptr;
pc.thread = g_thread_create(player_task, &pc, true, &e);
if (pc.thread == nullptr)
FatalError("Failed to spawn player task", e);
#endif
Error error;
if (!pc.thread.Start(player_task, &pc, error))
FatalError(error);
}

View File

@ -33,6 +33,8 @@
#include "ConfigOption.hxx"
#include "Log.hxx"
#include <glib.h>
#include <string.h>
#include <stdlib.h>

View File

@ -35,6 +35,8 @@
#include "util/Error.hxx"
#include "fs/Path.hxx"
#include <glib.h>
#include <string.h>
enum command_return

View File

@ -34,6 +34,7 @@
#include "Instance.hxx"
#include "system/FatalError.hxx"
#include "thread/Id.hxx"
#include "thread/Thread.hxx"
#include <glib.h>
@ -47,7 +48,7 @@ static enum update_progress {
static bool modified;
static GThread *update_thr;
static Thread update_thread;
static const unsigned update_task_id_max = 1 << 15;
@ -62,7 +63,8 @@ isUpdatingDB(void)
return (progress != UPDATE_PROGRESS_IDLE) ? update_task_id : 0;
}
static void * update_task(void *_path)
static void
update_task(void *_path)
{
const char *path = (const char *)_path;
@ -87,7 +89,6 @@ static void * update_task(void *_path)
progress = UPDATE_PROGRESS_DONE;
GlobalEvents::Emit(GlobalEvents::UPDATE);
return NULL;
}
static void
@ -98,14 +99,9 @@ spawn_update_task(const char *path)
progress = UPDATE_PROGRESS_RUNNING;
modified = false;
#if GLIB_CHECK_VERSION(2,32,0)
update_thr = g_thread_new("update", update_task, g_strdup(path));
#else
GError *e = NULL;
update_thr = g_thread_create(update_task, g_strdup(path), TRUE, &e);
if (update_thr == NULL)
FatalError("Failed to spawn update task", e);
#endif
Error error;
if (!update_thread.Start(update_task, g_strdup(path), error))
FatalError(error);
if (++update_task_id > update_task_id_max)
update_task_id = 1;
@ -147,7 +143,7 @@ static void update_finished_event(void)
assert(progress == UPDATE_PROGRESS_DONE);
g_thread_join(update_thr);
update_thread.Join();
idle_add(IDLE_UPDATE);

108
src/thread/Thread.cxx Normal file
View File

@ -0,0 +1,108 @@
/*
* Copyright (C) 2003-2013 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "config.h"
#include "Thread.hxx"
#include "util/Error.hxx"
bool
Thread::Start(void (*_f)(void *ctx), void *_ctx, Error &error)
{
assert(!IsDefined());
f = _f;
ctx = _ctx;
#ifdef WIN32
handle = ::CreateThread(nullptr, 0, ThreadProc, this, 0, &id);
if (handle == nullptr) {
error.SetLastError("Failed to create thread");
return false;
}
#else
#ifndef NDEBUG
creating = true;
#endif
int e = pthread_create(&handle, nullptr, ThreadProc, this);
if (e != 0) {
#ifndef NDEBUG
creating = false;
#endif
error.SetErrno(e, "Failed to create thread");
return false;
}
defined = true;
#ifndef NDEBUG
creating = false;
#endif
#endif
return true;
}
void
Thread::Join()
{
assert(IsDefined());
assert(!IsInside());
#ifdef WIN32
::WaitForSingleObject(handle, INFINITE);
::CloseHandle(handle);
handle = nullptr;
#else
pthread_join(handle, nullptr);
defined = false;
#endif
}
#ifdef WIN32
DWORD WINAPI
Thread::ThreadProc(LPVOID ctx)
{
Thread &thread = *(Thread *)ctx;
thread.f(thread.ctx);
return 0;
}
#else
void *
Thread::ThreadProc(void *ctx)
{
Thread &thread = *(Thread *)ctx;
#ifndef NDEBUG
/* this works around a race condition that causes an assertion
failure due to IsInside() spuriously returning false right
after the thread has been created, and the calling thread
hasn't initialised "defined" yet */
thread.defined = true;
#endif
thread.f(thread.ctx);
return nullptr;
}
#endif

114
src/thread/Thread.hxx Normal file
View File

@ -0,0 +1,114 @@
/*
* Copyright (C) 2003-2013 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef MPD_THREAD_HXX
#define MPD_THREAD_HXX
#include "check.h"
#include "Compiler.h"
#ifdef WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif
#include <assert.h>
class Error;
class Thread {
#ifdef WIN32
HANDLE handle;
DWORD id;
#else
pthread_t handle;
bool defined;
#ifndef NDEBUG
/**
* The thread is currently being created. This is a workaround for
* IsInside(), which may return false until pthread_create() has
* initialised the #handle.
*/
bool creating;
#endif
#endif
void (*f)(void *ctx);
void *ctx;
public:
#ifdef WIN32
Thread():handle(nullptr) {}
#else
Thread():defined(false) {
#ifndef NDEBUG
creating = false;
#endif
}
#endif
Thread(const Thread &) = delete;
#ifndef NDEBUG
virtual ~Thread() {
/* all Thread objects must be destructed manually by calling
Join(), to clean up */
assert(!IsDefined());
}
#endif
bool IsDefined() const {
#ifdef WIN32
return handle != nullptr;
#else
return defined;
#endif
}
/**
* Check if this thread is the current thread.
*/
gcc_pure
bool IsInside() const {
#ifdef WIN32
return GetCurrentThreadId() == id;
#else
#ifdef NDEBUG
constexpr bool creating = false;
#endif
return IsDefined() && (creating ||
pthread_equal(pthread_self(), handle));
#endif
}
bool Start(void (*f)(void *ctx), void *ctx, Error &error);
void Join();
private:
#ifdef WIN32
static DWORD WINAPI ThreadProc(LPVOID ctx);
#else
static void *ThreadProc(void *ctx);
#endif
};
#endif