lib/upnp/Discovery: use CURL instead of UpnpDownloadUrlItem()

We can do CURL requests asynchronously, and we don't need a
synchronous WorkQueue thread for that.

This allows parallelizing lookups and allows immediate cancellation.
This commit is contained in:
Max Kellermann 2017-01-26 09:34:53 +01:00
parent 28a2d41b85
commit c8f7a859ea
7 changed files with 157 additions and 297 deletions

View File

@ -254,7 +254,6 @@ UPNP_SOURCES = \
src/lib/upnp/Callback.hxx \ src/lib/upnp/Callback.hxx \
src/lib/upnp/Util.cxx src/lib/upnp/Util.hxx \ src/lib/upnp/Util.cxx src/lib/upnp/Util.hxx \
src/lib/upnp/UniqueIxml.hxx \ src/lib/upnp/UniqueIxml.hxx \
src/lib/upnp/WorkQueue.hxx \
src/lib/upnp/Action.hxx src/lib/upnp/Action.hxx
ALSA_SOURCES = \ ALSA_SOURCES = \
@ -1784,9 +1783,14 @@ test_DumpDatabase_SOURCES = test/DumpDatabase.cxx \
src/DetachedSong.cxx \ src/DetachedSong.cxx \
src/TagSave.cxx \ src/TagSave.cxx \
src/SongFilter.cxx src/SongFilter.cxx
test_DumpDatabase_CPPFLAGS = $(AM_CPPFLAGS)
if ENABLE_UPNP if ENABLE_UPNP
test_DumpDatabase_SOURCES += src/lib/expat/ExpatParser.cxx test_DumpDatabase_SOURCES += \
$(CURL_SOURCES) \
src/lib/expat/ExpatParser.cxx
test_DumpDatabase_CPPFLAGS += $(CURL_CFLAGS)
test_DumpDatabase_LDADD += $(CURL_LIBS)
endif endif
test_run_storage_LDADD = \ test_run_storage_LDADD = \

View File

@ -799,6 +799,9 @@ dnl ---------------------------------- libupnp ---------------------------------
MPD_ENABLE_AUTO_PKG_DEPENDS(upnp, UPNP, [libupnp], MPD_ENABLE_AUTO_PKG_DEPENDS(upnp, UPNP, [libupnp],
[UPnP client support], [libupnp not found], [], [UPnP client support], [libupnp not found], [],
[enable_database], [Database support is disabled], [ [enable_database], [Database support is disabled], [
MPD_DEPENDS([enable_upnp], [enable_curl],
[UPnP client support],
[UPnP requires CURL])
MPD_DEPENDS([enable_upnp], [enable_expat], MPD_DEPENDS([enable_upnp], [enable_expat],
[UPnP client support], [UPnP client support],
[UPnP requires expat]) [UPnP requires expat])

View File

@ -69,11 +69,14 @@ public:
}; };
class UpnpDatabase : public Database { class UpnpDatabase : public Database {
EventLoop &event_loop;
UpnpClient_Handle handle; UpnpClient_Handle handle;
UPnPDeviceDirectory *discovery; UPnPDeviceDirectory *discovery;
public: public:
UpnpDatabase():Database(upnp_db_plugin) {} explicit UpnpDatabase(EventLoop &_event_loop)
:Database(upnp_db_plugin),
event_loop(_event_loop) {}
static Database *Create(EventLoop &main_event_loop, static Database *Create(EventLoop &main_event_loop,
EventLoop &io_event_loop, EventLoop &io_event_loop,
@ -140,11 +143,11 @@ private:
}; };
Database * Database *
UpnpDatabase::Create(EventLoop &, EventLoop &, UpnpDatabase::Create(EventLoop &, EventLoop &io_event_loop,
gcc_unused DatabaseListener &listener, gcc_unused DatabaseListener &listener,
const ConfigBlock &) const ConfigBlock &)
{ {
return new UpnpDatabase(); return new UpnpDatabase(io_event_loop);
} }
void void
@ -152,7 +155,7 @@ UpnpDatabase::Open()
{ {
UpnpClientGlobalInit(handle); UpnpClientGlobalInit(handle);
discovery = new UPnPDeviceDirectory(handle); discovery = new UPnPDeviceDirectory(event_loop, handle);
try { try {
discovery->Start(); discovery->Start();
} catch (...) { } catch (...) {

View File

@ -21,6 +21,9 @@
#include "Discovery.hxx" #include "Discovery.hxx"
#include "ContentDirectoryService.hxx" #include "ContentDirectoryService.hxx"
#include "Log.hxx" #include "Log.hxx"
#include "lib/curl/Global.hxx"
#include "event/Call.hxx"
#include "util/DeleteDisposer.hxx"
#include "util/ScopeExit.hxx" #include "util/ScopeExit.hxx"
#include "util/RuntimeError.hxx" #include "util/RuntimeError.hxx"
@ -29,6 +32,74 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
UPnPDeviceDirectory::Downloader::Downloader(UPnPDeviceDirectory &_parent,
const Upnp_Discovery &disco)
:parent(_parent),
id(disco.DeviceId), url(disco.Location),
expires(std::chrono::seconds(disco.Expires)),
request(*parent.curl, url.c_str(), *this)
{
parent.downloaders.push_back(*this);
}
void
UPnPDeviceDirectory::Downloader::Start()
{
auto &event_loop = parent.curl->GetEventLoop();
BlockingCall(event_loop, [this](){
request.Start();
});
}
void
UPnPDeviceDirectory::Downloader::Destroy()
{
parent.downloaders.erase_and_dispose(parent.downloaders.iterator_to(*this),
DeleteDisposer());
}
void
UPnPDeviceDirectory::Downloader::OnHeaders(unsigned status,
std::multimap<std::string, std::string> &&)
{
if (status != 200) {
Destroy();
return;
}
}
void
UPnPDeviceDirectory::Downloader::OnData(ConstBuffer<void> _data)
{
data.append((const char *)_data.data, _data.size);
}
void
UPnPDeviceDirectory::Downloader::OnEnd()
{
AtScopeExit(this) { Destroy(); };
ContentDirectoryDescriptor d(std::move(id),
std::chrono::steady_clock::now(),
expires);
try {
d.Parse(url.c_str(), data.c_str());
} catch (const std::exception &e) {
LogError(e);
}
parent.LockAdd(std::move(d));
}
void
UPnPDeviceDirectory::Downloader::OnError(std::exception_ptr e)
{
LogError(e);
Destroy();
}
// The service type string we are looking for. // The service type string we are looking for.
static constexpr char ContentDirectorySType[] = "urn:schemas-upnp-org:service:ContentDirectory:1"; static constexpr char ContentDirectorySType[] = "urn:schemas-upnp-org:service:ContentDirectory:1";
@ -106,60 +177,23 @@ UPnPDeviceDirectory::LockRemove(const std::string &id)
} }
} }
inline void
UPnPDeviceDirectory::Explore()
{
for (;;) {
std::unique_ptr<DiscoveredTask> tsk;
if (!queue.take(tsk)) {
queue.workerExit();
return;
}
// Device signals its existence and well-being. Perform the
// UPnP "description" phase by downloading and decoding the
// description document.
char *buf;
// LINE_SIZE is defined by libupnp's upnp.h...
char contentType[LINE_SIZE];
int code = UpnpDownloadUrlItem(tsk->url.c_str(), &buf, contentType);
if (code != UPNP_E_SUCCESS) {
continue;
}
AtScopeExit(buf){ free(buf); };
// Update or insert the device
ContentDirectoryDescriptor d(std::move(tsk->device_id),
std::chrono::steady_clock::now(),
tsk->expires);
try {
d.Parse(tsk->url, buf);
} catch (const std::exception &e) {
LogError(e);
}
LockAdd(std::move(d));
}
}
void *
UPnPDeviceDirectory::Explore(void *ctx)
{
UPnPDeviceDirectory &directory = *(UPnPDeviceDirectory *)ctx;
directory.Explore();
return (void*)1;
}
inline int inline int
UPnPDeviceDirectory::OnAlive(Upnp_Discovery *disco) UPnPDeviceDirectory::OnAlive(Upnp_Discovery *disco)
{ {
if (isMSDevice(disco->DeviceType) || if (isMSDevice(disco->DeviceType) ||
isCDService(disco->ServiceType)) { isCDService(disco->ServiceType)) {
DiscoveredTask *tp = new DiscoveredTask(disco); auto *downloader = new Downloader(*this, *disco);
if (queue.put(tp))
return UPNP_E_FINISH; try {
downloader->Start();
} catch (...) {
BlockingCall(curl->GetEventLoop(), [downloader](){
downloader->Destroy();
});
LogError(std::current_exception());
return UPNP_E_SUCCESS;
}
} }
return UPNP_E_SUCCESS; return UPNP_E_SUCCESS;
@ -226,25 +260,26 @@ UPnPDeviceDirectory::ExpireDevices()
Search(); Search();
} }
UPnPDeviceDirectory::UPnPDeviceDirectory(UpnpClient_Handle _handle, UPnPDeviceDirectory::UPnPDeviceDirectory(EventLoop &event_loop,
UpnpClient_Handle _handle,
UPnPDiscoveryListener *_listener) UPnPDiscoveryListener *_listener)
:handle(_handle), :curl(event_loop), handle(_handle),
listener(_listener), listener(_listener)
queue("DiscoveredQueue")
{ {
} }
UPnPDeviceDirectory::~UPnPDeviceDirectory() UPnPDeviceDirectory::~UPnPDeviceDirectory()
{ {
/* this destructor exists here just so it won't get inlined */ /* this destructor exists here just so it won't get inlined */
BlockingCall(curl->GetEventLoop(), [this](){
downloaders.clear_and_dispose(DeleteDisposer());
});
} }
void void
UPnPDeviceDirectory::Start() UPnPDeviceDirectory::Start()
{ {
if (!queue.start(1, Explore, this))
throw std::runtime_error("Discover work queue start failed");
Search(); Search();
} }

View File

@ -22,12 +22,16 @@
#include "Callback.hxx" #include "Callback.hxx"
#include "Device.hxx" #include "Device.hxx"
#include "WorkQueue.hxx" #include "lib/curl/Init.hxx"
#include "lib/curl/Handler.hxx"
#include "lib/curl/Request.hxx"
#include "thread/Mutex.hxx" #include "thread/Mutex.hxx"
#include "Compiler.h" #include "Compiler.h"
#include <upnp/upnp.h> #include <upnp/upnp.h>
#include <boost/intrusive/list.hpp>
#include <list> #include <list>
#include <vector> #include <vector>
#include <string> #include <string>
@ -49,22 +53,6 @@ public:
* for now, but this could be made more general, by removing the filtering. * for now, but this could be made more general, by removing the filtering.
*/ */
class UPnPDeviceDirectory final : UpnpCallback { class UPnPDeviceDirectory final : UpnpCallback {
/**
* Each appropriate discovery event (executing in a libupnp thread
* context) queues the following task object for processing by the
* discovery thread.
*/
struct DiscoveredTask {
std::string url;
std::string device_id;
std::chrono::steady_clock::duration expires;
DiscoveredTask(const Upnp_Discovery *disco)
:url(disco->Location),
device_id(disco->DeviceId),
expires(std::chrono::seconds(disco->Expires)) {}
};
/** /**
* Descriptor for one device having a Content Directory * Descriptor for one device having a Content Directory
* service found on the network. * service found on the network.
@ -93,12 +81,47 @@ class UPnPDeviceDirectory final : UpnpCallback {
} }
}; };
class Downloader final
: public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::normal_link>>,
CurlResponseHandler {
UPnPDeviceDirectory &parent;
std::string id;
const std::string url;
const std::chrono::steady_clock::duration expires;
CurlRequest request;
std::string data;
public:
Downloader(UPnPDeviceDirectory &_parent,
const Upnp_Discovery &disco);
void Start();
void Destroy();
private:
/* virtual methods from CurlResponseHandler */
void OnHeaders(unsigned status,
std::multimap<std::string, std::string> &&headers) override;
void OnData(ConstBuffer<void> data) override;
void OnEnd() override;
void OnError(std::exception_ptr e) override;
};
CurlInit curl;
const UpnpClient_Handle handle; const UpnpClient_Handle handle;
UPnPDiscoveryListener *const listener; UPnPDiscoveryListener *const listener;
Mutex mutex; Mutex mutex;
boost::intrusive::list<Downloader,
boost::intrusive::constant_time_size<false>> downloaders;
std::list<ContentDirectoryDescriptor> directories; std::list<ContentDirectoryDescriptor> directories;
WorkQueue<std::unique_ptr<DiscoveredTask>> queue;
/** /**
* The UPnP device search timeout, which should actually be * The UPnP device search timeout, which should actually be
@ -113,7 +136,7 @@ class UPnPDeviceDirectory final : UpnpCallback {
std::chrono::steady_clock::time_point last_search = std::chrono::steady_clock::time_point(); std::chrono::steady_clock::time_point last_search = std::chrono::steady_clock::time_point();
public: public:
UPnPDeviceDirectory(UpnpClient_Handle _handle, UPnPDeviceDirectory(EventLoop &event_loop, UpnpClient_Handle _handle,
UPnPDiscoveryListener *_listener=nullptr); UPnPDiscoveryListener *_listener=nullptr);
~UPnPDeviceDirectory(); ~UPnPDeviceDirectory();
@ -145,14 +168,6 @@ private:
void LockAdd(ContentDirectoryDescriptor &&d); void LockAdd(ContentDirectoryDescriptor &&d);
void LockRemove(const std::string &id); void LockRemove(const std::string &id);
/**
* Worker routine for the discovery queue. Get messages about
* devices appearing and disappearing, and update the
* directory pool accordingly.
*/
static void *Explore(void *);
void Explore();
int OnAlive(Upnp_Discovery *disco); int OnAlive(Upnp_Discovery *disco);
int OnByeBye(Upnp_Discovery *disco); int OnByeBye(Upnp_Discovery *disco);

View File

@ -1,203 +0,0 @@
/*
* Copyright 2003-2017 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_
#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"
#include <assert.h>
#include <pthread.h>
#include <string>
#include <queue>
#define LOGINFO(X)
#define LOGERR(X)
/**
* A WorkQueue manages the synchronisation around a queue of work items,
* where a number of client threads queue tasks and a number of worker
* threads take and execute them. The goal is to introduce some level
* of parallelism between the successive steps of a previously single
* threaded pipeline. For example data extraction / data preparation / index
* update, but this could have other uses.
*
* There is no individual task status return. In case of fatal error,
* the client or worker sets an end condition on the queue. A second
* queue could conceivably be used for returning individual task
* status.
*/
template <class T>
class WorkQueue {
// Configuration
const std::string name;
// Status
// Worker threads having called exit
unsigned n_workers_exited = 0;
bool ok = false;
unsigned n_threads = 0;
pthread_t *threads = nullptr;
// Synchronization
std::queue<T> queue;
Cond client_cond;
Cond worker_cond;
Mutex mutex;
public:
/** Create a WorkQueue
* @param _name for message printing
*/
explicit WorkQueue(const char *_name)
:name(_name)
{
}
~WorkQueue() {
setTerminateAndWait();
}
WorkQueue(const WorkQueue &) = delete;
WorkQueue &operator=(const WorkQueue &) = delete;
/** Start the worker threads.
*
* @param nworkers number of threads copies to start.
* @param workproc thread function. It should loop
* taking (QueueWorker::take()) and executing tasks.
* @param arg initial parameter to thread function.
* @return true if ok.
*/
bool start(unsigned nworkers, void *(*workproc)(void *), void *arg)
{
const std::lock_guard<Mutex> protect(mutex);
assert(nworkers > 0);
assert(!ok);
assert(n_threads == 0);
assert(threads == nullptr);
ok = true;
n_threads = nworkers;
threads = new pthread_t[n_threads];
for (unsigned i = 0; i < nworkers; i++) {
int err;
if ((err = pthread_create(&threads[i], 0, workproc, arg))) {
LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
name.c_str(), err));
return false;
}
}
return true;
}
/** Add item to work queue, called from client.
*
* Sleeps if there are already too many.
*/
template<typename U>
bool put(U &&u)
{
const std::lock_guard<Mutex> protect(mutex);
queue.emplace(std::forward<U>(u));
// Just wake one worker, there is only one new task.
worker_cond.signal();
return true;
}
/** Tell the workers to exit, and wait for them.
*/
void setTerminateAndWait()
{
const std::lock_guard<Mutex> protect(mutex);
// Wait for all worker threads to have called workerExit()
ok = false;
while (n_workers_exited < n_threads) {
worker_cond.broadcast();
client_cond.wait(mutex);
}
// Perform the thread joins and compute overall status
// Workers return (void*)1 if ok
for (unsigned i = 0; i < n_threads; ++i) {
void *status;
pthread_join(threads[i], &status);
}
delete[] threads;
threads = nullptr;
n_threads = 0;
// Reset to start state.
n_workers_exited = 0;
}
/** Take task from queue. Called from worker.
*
* Sleeps if there are not enough. Signal if we go to sleep on empty
* queue: client may be waiting for our going idle.
*/
bool take(T &tp)
{
const std::lock_guard<Mutex> protect(mutex);
if (!ok)
return false;
while (queue.empty()) {
worker_cond.wait(mutex);
if (!ok)
return false;
}
tp = std::move(queue.front());
queue.pop();
return true;
}
/** Advertise exit and abort queue. Called from worker
*
* This would happen after an unrecoverable error, or when
* the queue is terminated by the client. Workers never exit normally,
* except when the queue is shut down (at which point ok is set to
* false by the shutdown code anyway). The thread must return/exit
* immediately after calling this.
*/
void workerExit()
{
const std::lock_guard<Mutex> protect(mutex);
n_workers_exited++;
ok = false;
client_cond.broadcast();
}
};
#endif /* _WORKQUEUE_H_INCLUDED_ */

View File

@ -53,11 +53,14 @@ class UpnpNeighborExplorer final
} }
}; };
EventLoop &event_loop;
UPnPDeviceDirectory *discovery; UPnPDeviceDirectory *discovery;
public: public:
UpnpNeighborExplorer(NeighborListener &_listener) UpnpNeighborExplorer(EventLoop &_event_loop,
:NeighborExplorer(_listener) {} NeighborListener &_listener)
:NeighborExplorer(_listener), event_loop(_event_loop) {}
/* virtual methods from class NeighborExplorer */ /* virtual methods from class NeighborExplorer */
void Open() override; void Open() override;
@ -76,7 +79,7 @@ UpnpNeighborExplorer::Open()
UpnpClient_Handle handle; UpnpClient_Handle handle;
UpnpClientGlobalInit(handle); UpnpClientGlobalInit(handle);
discovery = new UPnPDeviceDirectory(handle, this); discovery = new UPnPDeviceDirectory(event_loop, handle, this);
try { try {
discovery->Start(); discovery->Start();
@ -126,11 +129,11 @@ UpnpNeighborExplorer::LostUPnP(const ContentDirectoryService &service)
} }
static NeighborExplorer * static NeighborExplorer *
upnp_neighbor_create(gcc_unused EventLoop &loop, upnp_neighbor_create(EventLoop &event_loop,
NeighborListener &listener, NeighborListener &listener,
gcc_unused const ConfigBlock &block) gcc_unused const ConfigBlock &block)
{ {
return new UpnpNeighborExplorer(listener); return new UpnpNeighborExplorer(event_loop, listener);
} }
const NeighborPlugin upnp_neighbor_plugin = { const NeighborPlugin upnp_neighbor_plugin = {