/*
 * Copyright (C) 2003-2014 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_

#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"

#include <pthread.h>
#include <time.h>

#include <string>
#include <queue>
#include <list>

//#include "debuglog.h"
#define LOGINFO(X)
#define LOGERR(X)

/**
 * A WorkQueue manages the synchronisation around a queue of work items,
 * where a number of client threads queue tasks and a number of worker
 * threads take and execute them. The goal is to introduce some level
 * of parallelism between the successive steps of a previously
 * single-threaded pipeline, for example data extraction / data
 * preparation / index update, but this could have other uses.
 *
 * There is no individual task status return. In case of fatal error,
 * the client or worker sets an end condition on the queue. A second
 * queue could conceivably be used for returning individual task
 * status.
 *
 * A commented usage sketch follows the class definition below.
 */
template <class T>
class WorkQueue {
    // Configuration
    const std::string name;
    const size_t high;
    const size_t low;

    // Status
    // Worker threads having called exit
    unsigned n_workers_exited;
    bool ok;

    std::list<pthread_t> threads;

    // Synchronization
    std::queue<T> queue;
    Cond client_cond;
    Cond worker_cond;
    Mutex mutex;
    // Client/Worker threads currently waiting for a job
    unsigned n_clients_waiting;
    unsigned n_workers_waiting;

public:
    /** Create a WorkQueue
     * @param _name name used for message printing
     * @param hi number of tasks on the queue before clients block. Default 0
     *    meaning no limit. hi == -1 means that the queue is disabled.
     * @param lo minimum count of tasks before a worker starts. Default 1.
     */
    WorkQueue(const char *_name, size_t hi = 0, size_t lo = 1)
        :name(_name), high(hi), low(lo),
         n_workers_exited(0),
         ok(true),
         n_clients_waiting(0), n_workers_waiting(0)
    {
    }

    ~WorkQueue() {
        setTerminateAndWait();
    }

    /** Start the worker threads.
     *
     * @param nworkers number of worker threads to start.
     * @param workproc thread function. It should loop
     *      taking (WorkQueue::take()) and executing tasks.
     * @param arg initial parameter to thread function.
     * @return true if ok.
     */
    bool start(int nworkers, void *(*workproc)(void *), void *arg)
    {
        const ScopeLock protect(mutex);

        for (int i = 0; i < nworkers; i++) {
            int err;
            pthread_t thr;
            if ((err = pthread_create(&thr, 0, workproc, arg))) {
                LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
                        name.c_str(), err));
                return false;
            }
            threads.push_back(thr);
        }
        return true;
    }

    /** Add item to work queue, called from client.
     *
     * Sleeps if there are already too many.
     */
    bool put(T t)
    {
        const ScopeLock protect(mutex);

        if (!IsOK()) {
            LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n",
                    name.c_str()));
            return false;
        }

        while (IsOK() && high > 0 && queue.size() >= high) {
            // Keep the order: we test IsOK() AFTER the sleep...
            n_clients_waiting++;
            client_cond.wait(mutex);
            if (!IsOK()) {
                n_clients_waiting--;
                return false;
            }
            n_clients_waiting--;
        }

        queue.push(t);
        if (n_workers_waiting > 0) {
            // Just wake one worker, there is only one new task.
            worker_cond.signal();
        }

        return true;
    }

    /**
     * Wait until the queue is inactive. Called from the client.
     *
     * Waits until the task queue is empty and the workers are all
     * back sleeping. Used by the client to wait for all current work
     * to be completed, when it needs to perform work that couldn't be
     * done in parallel with the workers' tasks, or before shutting
     * down. Work can be resumed after calling this. Note that the
     * only thread which can call it safely is the client just above
     * (which can control the task flow), else there could still be
     * tasks in the intermediate queues.
     * To rephrase: there is no guarantee on return that the queue is actually
     * idle EXCEPT if the caller knows that no jobs are still being created.
     * It would be possible to transform this into a safe call if some kind
     * of suspend condition was set on the queue by waitIdle(), to be reset by
     * some kind of "resume" call. This is not currently the case.
     */
    bool waitIdle()
    {
        const ScopeLock protect(mutex);

        if (!IsOK()) {
            LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n",
                    name.c_str()));
            return false;
        }

        // We're done when the queue is empty AND all workers are back
        // waiting for a task.
        while (IsOK() && (queue.size() > 0 ||
                          n_workers_waiting != threads.size())) {
            n_clients_waiting++;
            client_cond.wait(mutex);
            n_clients_waiting--;
        }

        return IsOK();
    }

    /** Tell the workers to exit, and wait for them.
     *
     * Does not bother about tasks possibly remaining on the queue, so
     * should be called after waitIdle() for an orderly shutdown.
     */
    void setTerminateAndWait()
    {
        const ScopeLock protect(mutex);

        if (threads.empty())
            // Already called ?
            return;

        // Wait for all worker threads to have called workerExit()
        ok = false;
        while (n_workers_exited < threads.size()) {
            worker_cond.broadcast();
            n_clients_waiting++;
            client_cond.wait(mutex);
            n_clients_waiting--;
        }

        // Perform the thread joins and compute overall status
        // Workers return (void*)1 if ok
        while (!threads.empty()) {
            void *status;
            auto thread = threads.front();
            pthread_join(thread, &status);
            threads.pop_front();
        }

        // Reset to start state.
        n_workers_exited = n_clients_waiting = n_workers_waiting = 0;
        ok = true;
    }

    /** Take a task from the queue. Called from a worker.
     *
     * Sleeps if there are not enough queued tasks. Signals if we go to
     * sleep on an empty queue: the client may be waiting for us to go idle.
     */
    bool take(T &tp)
    {
        const ScopeLock protect(mutex);

        if (!IsOK()) {
            return false;
        }

        while (IsOK() && queue.size() < low) {
            n_workers_waiting++;
            if (queue.empty())
                client_cond.broadcast();
            worker_cond.wait(mutex);
            if (!IsOK()) {
                // !ok is a normal condition when shutting down
                n_workers_waiting--;
                return false;
            }
            n_workers_waiting--;
        }

        tp = queue.front();
        queue.pop();
        if (n_clients_waiting > 0) {
            // No reason to wake up more than one client thread
            client_cond.signal();
        }
        return true;
    }

    /** Advertise exit and abort the queue. Called from a worker.
     *
     * This would happen after an unrecoverable error, or when
     * the queue is terminated by the client. Workers never exit normally,
     * except when the queue is shut down (at which point ok is set to
     * false by the shutdown code anyway). The thread must return/exit
     * immediately after calling this.
     */
    void workerExit()
    {
        const ScopeLock protect(mutex);

        n_workers_exited++;
        ok = false;
        client_cond.broadcast();
    }

private:
    bool IsOK()
    {
        return ok && n_workers_exited == 0 && !threads.empty();
    }
};
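
/*
 * Usage sketch (illustrative only, not part of this header): the task type
 * "Job" and the functions "worker()" and "run_client()" below are
 * hypothetical, shown to make the intended client/worker wiring concrete.
 * A worker loops on take(), calls workerExit() before returning, and
 * returns (void*)1 on success; the client starts the pool, put()s work,
 * then drains and shuts down with waitIdle() / setTerminateAndWait().
 *
 *   struct Job { std::string path; };
 *
 *   static void *worker(void *arg)
 *   {
 *       auto *queue = static_cast<WorkQueue<Job *> *>(arg);
 *       Job *job;
 *       while (queue->take(job)) {
 *           // ... process *job ...
 *           delete job;
 *       }
 *       queue->workerExit();
 *       return (void *)1;
 *   }
 *
 *   static void run_client()
 *   {
 *       WorkQueue<Job *> queue("jobs", 10);
 *       if (!queue.start(4, worker, &queue))
 *           return;
 *       for (int i = 0; i < 100; ++i)
 *           queue.put(new Job{std::to_string(i)});
 *       queue.waitIdle();            // wait for queued jobs to be processed
 *       queue.setTerminateAndWait(); // stop and join the workers
 *   }
 */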

#endif /* _WORKQUEUE_H_INCLUDED_ */