204 lines
4.9 KiB
C++
204 lines
4.9 KiB
C++
/*
|
|
* Copyright 2003-2017 The Music Player Daemon Project
|
|
* http://www.musicpd.org
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation; either version 2 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License along
|
|
* with this program; if not, write to the Free Software Foundation, Inc.,
|
|
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
*/
|
|
|
|
#ifndef _WORKQUEUE_H_INCLUDED_
|
|
#define _WORKQUEUE_H_INCLUDED_
|
|
|
|
#include "thread/Mutex.hxx"
|
|
#include "thread/Cond.hxx"
|
|
|
|
#include <assert.h>
|
|
#include <pthread.h>
|
|
|
|
#include <string>
|
|
#include <queue>
|
|
|
|
/* Informational logging hook.  It is defined empty here, so every
 * LOGINFO(...) invocation expands to nothing. */
#define LOGINFO(X)
|
|
/* Error logging hook.  It is defined empty here, so every LOGERR(...)
 * invocation expands to nothing and its arguments are never evaluated. */
#define LOGERR(X)
|
|
|
|
/**
|
|
* A WorkQueue manages the synchronisation around a queue of work items,
|
|
* where a number of client threads queue tasks and a number of worker
|
|
* threads take and execute them. The goal is to introduce some level
|
|
* of parallelism between the successive steps of a previously single
|
|
* threaded pipeline. For example data extraction / data preparation / index
|
|
* update, but this could have other uses.
|
|
*
|
|
* There is no individual task status return. In case of fatal error,
|
|
* the client or worker sets an end condition on the queue. A second
|
|
* queue could conceivably be used for returning individual task
|
|
* status.
|
|
*/
|
|
template <class T>
|
|
class WorkQueue {
|
|
// Configuration
|
|
const std::string name;
|
|
|
|
// Status
|
|
// Worker threads having called exit
|
|
unsigned n_workers_exited = 0;
|
|
bool ok = false;
|
|
|
|
unsigned n_threads = 0;
|
|
pthread_t *threads = nullptr;
|
|
|
|
// Synchronization
|
|
std::queue<T> queue;
|
|
Cond client_cond;
|
|
Cond worker_cond;
|
|
Mutex mutex;
|
|
|
|
public:
|
|
/** Create a WorkQueue
|
|
* @param _name for message printing
|
|
*/
|
|
explicit WorkQueue(const char *_name)
|
|
:name(_name)
|
|
{
|
|
}
|
|
|
|
~WorkQueue() {
|
|
setTerminateAndWait();
|
|
}
|
|
|
|
WorkQueue(const WorkQueue &) = delete;
|
|
WorkQueue &operator=(const WorkQueue &) = delete;
|
|
|
|
/** Start the worker threads.
|
|
*
|
|
* @param nworkers number of threads copies to start.
|
|
* @param workproc thread function. It should loop
|
|
* taking (QueueWorker::take()) and executing tasks.
|
|
* @param arg initial parameter to thread function.
|
|
* @return true if ok.
|
|
*/
|
|
bool start(unsigned nworkers, void *(*workproc)(void *), void *arg)
|
|
{
|
|
const std::lock_guard<Mutex> protect(mutex);
|
|
|
|
assert(nworkers > 0);
|
|
assert(!ok);
|
|
assert(n_threads == 0);
|
|
assert(threads == nullptr);
|
|
|
|
n_threads = nworkers;
|
|
threads = new pthread_t[n_threads];
|
|
|
|
for (unsigned i = 0; i < nworkers; i++) {
|
|
int err;
|
|
if ((err = pthread_create(&threads[i], 0, workproc, arg))) {
|
|
LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
|
|
name.c_str(), err));
|
|
return false;
|
|
}
|
|
}
|
|
|
|
ok = true;
|
|
return true;
|
|
}
|
|
|
|
/** Add item to work queue, called from client.
|
|
*
|
|
* Sleeps if there are already too many.
|
|
*/
|
|
template<typename U>
|
|
bool put(U &&u)
|
|
{
|
|
const std::lock_guard<Mutex> protect(mutex);
|
|
|
|
queue.emplace(std::forward<U>(u));
|
|
|
|
// Just wake one worker, there is only one new task.
|
|
worker_cond.signal();
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
/** Tell the workers to exit, and wait for them.
|
|
*/
|
|
void setTerminateAndWait()
|
|
{
|
|
const std::lock_guard<Mutex> protect(mutex);
|
|
|
|
// Wait for all worker threads to have called workerExit()
|
|
ok = false;
|
|
while (n_workers_exited < n_threads) {
|
|
worker_cond.broadcast();
|
|
client_cond.wait(mutex);
|
|
}
|
|
|
|
// Perform the thread joins and compute overall status
|
|
// Workers return (void*)1 if ok
|
|
for (unsigned i = 0; i < n_threads; ++i) {
|
|
void *status;
|
|
pthread_join(threads[i], &status);
|
|
}
|
|
|
|
delete[] threads;
|
|
threads = nullptr;
|
|
n_threads = 0;
|
|
|
|
// Reset to start state.
|
|
n_workers_exited = 0;
|
|
}
|
|
|
|
/** Take task from queue. Called from worker.
|
|
*
|
|
* Sleeps if there are not enough. Signal if we go to sleep on empty
|
|
* queue: client may be waiting for our going idle.
|
|
*/
|
|
bool take(T &tp)
|
|
{
|
|
const std::lock_guard<Mutex> protect(mutex);
|
|
|
|
if (!ok)
|
|
return false;
|
|
|
|
while (queue.empty()) {
|
|
worker_cond.wait(mutex);
|
|
if (!ok)
|
|
return false;
|
|
}
|
|
|
|
tp = std::move(queue.front());
|
|
queue.pop();
|
|
return true;
|
|
}
|
|
|
|
/** Advertise exit and abort queue. Called from worker
|
|
*
|
|
* This would happen after an unrecoverable error, or when
|
|
* the queue is terminated by the client. Workers never exit normally,
|
|
* except when the queue is shut down (at which point ok is set to
|
|
* false by the shutdown code anyway). The thread must return/exit
|
|
* immediately after calling this.
|
|
*/
|
|
void workerExit()
|
|
{
|
|
const std::lock_guard<Mutex> protect(mutex);
|
|
|
|
n_workers_exited++;
|
|
ok = false;
|
|
client_cond.broadcast();
|
|
}
|
|
};
|
|
|
|
#endif /* _WORKQUEUE_H_INCLUDED_ */
|