diff --git a/Makefile.am b/Makefile.am index 61b442b75..cd1311029 100644 --- a/Makefile.am +++ b/Makefile.am @@ -254,7 +254,6 @@ UPNP_SOURCES = \ src/lib/upnp/Callback.hxx \ src/lib/upnp/Util.cxx src/lib/upnp/Util.hxx \ src/lib/upnp/UniqueIxml.hxx \ - src/lib/upnp/WorkQueue.hxx \ src/lib/upnp/Action.hxx ALSA_SOURCES = \ @@ -1784,9 +1783,14 @@ test_DumpDatabase_SOURCES = test/DumpDatabase.cxx \ src/DetachedSong.cxx \ src/TagSave.cxx \ src/SongFilter.cxx +test_DumpDatabase_CPPFLAGS = $(AM_CPPFLAGS) if ENABLE_UPNP -test_DumpDatabase_SOURCES += src/lib/expat/ExpatParser.cxx +test_DumpDatabase_SOURCES += \ + $(CURL_SOURCES) \ + src/lib/expat/ExpatParser.cxx +test_DumpDatabase_CPPFLAGS += $(CURL_CFLAGS) +test_DumpDatabase_LDADD += $(CURL_LIBS) endif test_run_storage_LDADD = \ diff --git a/configure.ac b/configure.ac index cd7f23aa3..afe98f779 100644 --- a/configure.ac +++ b/configure.ac @@ -799,6 +799,9 @@ dnl ---------------------------------- libupnp --------------------------------- MPD_ENABLE_AUTO_PKG_DEPENDS(upnp, UPNP, [libupnp], [UPnP client support], [libupnp not found], [], [enable_database], [Database support is disabled], [ + MPD_DEPENDS([enable_upnp], [enable_curl], + [UPnP client support], + [UPnP requires CURL]) MPD_DEPENDS([enable_upnp], [enable_expat], [UPnP client support], [UPnP requires expat]) diff --git a/src/db/plugins/upnp/UpnpDatabasePlugin.cxx b/src/db/plugins/upnp/UpnpDatabasePlugin.cxx index 669bc37ef..6f30771c5 100644 --- a/src/db/plugins/upnp/UpnpDatabasePlugin.cxx +++ b/src/db/plugins/upnp/UpnpDatabasePlugin.cxx @@ -69,11 +69,14 @@ public: }; class UpnpDatabase : public Database { + EventLoop &event_loop; UpnpClient_Handle handle; UPnPDeviceDirectory *discovery; public: - UpnpDatabase():Database(upnp_db_plugin) {} + explicit UpnpDatabase(EventLoop &_event_loop) + :Database(upnp_db_plugin), + event_loop(_event_loop) {} static Database *Create(EventLoop &main_event_loop, EventLoop &io_event_loop, @@ -140,11 +143,11 @@ private: }; Database * -UpnpDatabase::Create(EventLoop &, EventLoop &, +UpnpDatabase::Create(EventLoop &, EventLoop &io_event_loop, gcc_unused DatabaseListener &listener, const ConfigBlock &) { - return new UpnpDatabase(); + return new UpnpDatabase(io_event_loop); } void @@ -152,7 +155,7 @@ UpnpDatabase::Open() { UpnpClientGlobalInit(handle); - discovery = new UPnPDeviceDirectory(handle); + discovery = new UPnPDeviceDirectory(event_loop, handle); try { discovery->Start(); } catch (...) { diff --git a/src/lib/upnp/Discovery.cxx b/src/lib/upnp/Discovery.cxx index 267c90ff5..675e43239 100644 --- a/src/lib/upnp/Discovery.cxx +++ b/src/lib/upnp/Discovery.cxx @@ -21,6 +21,9 @@ #include "Discovery.hxx" #include "ContentDirectoryService.hxx" #include "Log.hxx" +#include "lib/curl/Global.hxx" +#include "event/Call.hxx" +#include "util/DeleteDisposer.hxx" #include "util/ScopeExit.hxx" #include "util/RuntimeError.hxx" @@ -29,6 +32,74 @@ #include #include +UPnPDeviceDirectory::Downloader::Downloader(UPnPDeviceDirectory &_parent, + const Upnp_Discovery &disco) + :parent(_parent), + id(disco.DeviceId), url(disco.Location), + expires(std::chrono::seconds(disco.Expires)), + request(*parent.curl, url.c_str(), *this) +{ + parent.downloaders.push_back(*this); +} + +void +UPnPDeviceDirectory::Downloader::Start() +{ + auto &event_loop = parent.curl->GetEventLoop(); + + BlockingCall(event_loop, [this](){ + request.Start(); + }); +} + +void +UPnPDeviceDirectory::Downloader::Destroy() +{ + parent.downloaders.erase_and_dispose(parent.downloaders.iterator_to(*this), + DeleteDisposer()); +} + +void +UPnPDeviceDirectory::Downloader::OnHeaders(unsigned status, + std::multimap &&) +{ + if (status != 200) { + Destroy(); + return; + } +} + +void +UPnPDeviceDirectory::Downloader::OnData(ConstBuffer _data) +{ + data.append((const char *)_data.data, _data.size); +} + +void +UPnPDeviceDirectory::Downloader::OnEnd() +{ + AtScopeExit(this) { Destroy(); }; + + ContentDirectoryDescriptor d(std::move(id), + std::chrono::steady_clock::now(), + expires); + + try { + d.Parse(url.c_str(), data.c_str()); + } catch (const std::exception &e) { + LogError(e); + } + + parent.LockAdd(std::move(d)); +} + +void +UPnPDeviceDirectory::Downloader::OnError(std::exception_ptr e) +{ + LogError(e); + Destroy(); +} + // The service type string we are looking for. static constexpr char ContentDirectorySType[] = "urn:schemas-upnp-org:service:ContentDirectory:1"; @@ -106,60 +177,23 @@ UPnPDeviceDirectory::LockRemove(const std::string &id) } } -inline void -UPnPDeviceDirectory::Explore() -{ - for (;;) { - std::unique_ptr tsk; - if (!queue.take(tsk)) { - queue.workerExit(); - return; - } - - // Device signals its existence and well-being. Perform the - // UPnP "description" phase by downloading and decoding the - // description document. - char *buf; - // LINE_SIZE is defined by libupnp's upnp.h... - char contentType[LINE_SIZE]; - int code = UpnpDownloadUrlItem(tsk->url.c_str(), &buf, contentType); - if (code != UPNP_E_SUCCESS) { - continue; - } - - AtScopeExit(buf){ free(buf); }; - - // Update or insert the device - ContentDirectoryDescriptor d(std::move(tsk->device_id), - std::chrono::steady_clock::now(), - tsk->expires); - - try { - d.Parse(tsk->url, buf); - } catch (const std::exception &e) { - LogError(e); - } - - LockAdd(std::move(d)); - } -} - -void * -UPnPDeviceDirectory::Explore(void *ctx) -{ - UPnPDeviceDirectory &directory = *(UPnPDeviceDirectory *)ctx; - directory.Explore(); - return (void*)1; -} - inline int UPnPDeviceDirectory::OnAlive(Upnp_Discovery *disco) { if (isMSDevice(disco->DeviceType) || isCDService(disco->ServiceType)) { - DiscoveredTask *tp = new DiscoveredTask(disco); - if (queue.put(tp)) - return UPNP_E_FINISH; + auto *downloader = new Downloader(*this, *disco); + + try { + downloader->Start(); + } catch (...) { + BlockingCall(curl->GetEventLoop(), [downloader](){ + downloader->Destroy(); + }); + + LogError(std::current_exception()); + return UPNP_E_SUCCESS; + } } return UPNP_E_SUCCESS; @@ -226,25 +260,26 @@ UPnPDeviceDirectory::ExpireDevices() Search(); } -UPnPDeviceDirectory::UPnPDeviceDirectory(UpnpClient_Handle _handle, +UPnPDeviceDirectory::UPnPDeviceDirectory(EventLoop &event_loop, + UpnpClient_Handle _handle, UPnPDiscoveryListener *_listener) - :handle(_handle), - listener(_listener), - queue("DiscoveredQueue") + :curl(event_loop), handle(_handle), + listener(_listener) { } UPnPDeviceDirectory::~UPnPDeviceDirectory() { /* this destructor exists here just so it won't get inlined */ + + BlockingCall(curl->GetEventLoop(), [this](){ + downloaders.clear_and_dispose(DeleteDisposer()); + }); } void UPnPDeviceDirectory::Start() { - if (!queue.start(1, Explore, this)) - throw std::runtime_error("Discover work queue start failed"); - Search(); } diff --git a/src/lib/upnp/Discovery.hxx b/src/lib/upnp/Discovery.hxx index 131ffb3c6..e61f1e399 100644 --- a/src/lib/upnp/Discovery.hxx +++ b/src/lib/upnp/Discovery.hxx @@ -22,12 +22,16 @@ #include "Callback.hxx" #include "Device.hxx" -#include "WorkQueue.hxx" +#include "lib/curl/Init.hxx" +#include "lib/curl/Handler.hxx" +#include "lib/curl/Request.hxx" #include "thread/Mutex.hxx" #include "Compiler.h" #include +#include + #include #include #include @@ -49,22 +53,6 @@ public: * for now, but this could be made more general, by removing the filtering. */ class UPnPDeviceDirectory final : UpnpCallback { - /** - * Each appropriate discovery event (executing in a libupnp thread - * context) queues the following task object for processing by the - * discovery thread. - */ - struct DiscoveredTask { - std::string url; - std::string device_id; - std::chrono::steady_clock::duration expires; - - DiscoveredTask(const Upnp_Discovery *disco) - :url(disco->Location), - device_id(disco->DeviceId), - expires(std::chrono::seconds(disco->Expires)) {} - }; - /** * Descriptor for one device having a Content Directory * service found on the network. @@ -93,12 +81,47 @@ class UPnPDeviceDirectory final : UpnpCallback { } }; + class Downloader final + : public boost::intrusive::list_base_hook>, + CurlResponseHandler { + + UPnPDeviceDirectory &parent; + + std::string id; + const std::string url; + const std::chrono::steady_clock::duration expires; + + CurlRequest request; + + std::string data; + + public: + Downloader(UPnPDeviceDirectory &_parent, + const Upnp_Discovery &disco); + + void Start(); + void Destroy(); + + private: + /* virtual methods from CurlResponseHandler */ + void OnHeaders(unsigned status, + std::multimap &&headers) override; + void OnData(ConstBuffer data) override; + void OnEnd() override; + void OnError(std::exception_ptr e) override; + }; + + CurlInit curl; + const UpnpClient_Handle handle; UPnPDiscoveryListener *const listener; Mutex mutex; + + boost::intrusive::list> downloaders; + std::list directories; - WorkQueue> queue; /** * The UPnP device search timeout, which should actually be @@ -113,7 +136,7 @@ class UPnPDeviceDirectory final : UpnpCallback { std::chrono::steady_clock::time_point last_search = std::chrono::steady_clock::time_point(); public: - UPnPDeviceDirectory(UpnpClient_Handle _handle, + UPnPDeviceDirectory(EventLoop &event_loop, UpnpClient_Handle _handle, UPnPDiscoveryListener *_listener=nullptr); ~UPnPDeviceDirectory(); @@ -145,14 +168,6 @@ private: void LockAdd(ContentDirectoryDescriptor &&d); void LockRemove(const std::string &id); - /** - * Worker routine for the discovery queue. Get messages about - * devices appearing and disappearing, and update the - * directory pool accordingly. - */ - static void *Explore(void *); - void Explore(); - int OnAlive(Upnp_Discovery *disco); int OnByeBye(Upnp_Discovery *disco); diff --git a/src/lib/upnp/WorkQueue.hxx b/src/lib/upnp/WorkQueue.hxx deleted file mode 100644 index b503e9956..000000000 --- a/src/lib/upnp/WorkQueue.hxx +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright 2003-2017 The Music Player Daemon Project - * http://www.musicpd.org - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#ifndef _WORKQUEUE_H_INCLUDED_ -#define _WORKQUEUE_H_INCLUDED_ - -#include "thread/Mutex.hxx" -#include "thread/Cond.hxx" - -#include -#include - -#include -#include - -#define LOGINFO(X) -#define LOGERR(X) - -/** - * A WorkQueue manages the synchronisation around a queue of work items, - * where a number of client threads queue tasks and a number of worker - * threads take and execute them. The goal is to introduce some level - * of parallelism between the successive steps of a previously single - * threaded pipeline. For example data extraction / data preparation / index - * update, but this could have other uses. - * - * There is no individual task status return. In case of fatal error, - * the client or worker sets an end condition on the queue. A second - * queue could conceivably be used for returning individual task - * status. - */ -template -class WorkQueue { - // Configuration - const std::string name; - - // Status - // Worker threads having called exit - unsigned n_workers_exited = 0; - bool ok = false; - - unsigned n_threads = 0; - pthread_t *threads = nullptr; - - // Synchronization - std::queue queue; - Cond client_cond; - Cond worker_cond; - Mutex mutex; - -public: - /** Create a WorkQueue - * @param _name for message printing - */ - explicit WorkQueue(const char *_name) - :name(_name) - { - } - - ~WorkQueue() { - setTerminateAndWait(); - } - - WorkQueue(const WorkQueue &) = delete; - WorkQueue &operator=(const WorkQueue &) = delete; - - /** Start the worker threads. - * - * @param nworkers number of threads copies to start. - * @param workproc thread function. It should loop - * taking (QueueWorker::take()) and executing tasks. - * @param arg initial parameter to thread function. - * @return true if ok. - */ - bool start(unsigned nworkers, void *(*workproc)(void *), void *arg) - { - const std::lock_guard protect(mutex); - - assert(nworkers > 0); - assert(!ok); - assert(n_threads == 0); - assert(threads == nullptr); - - ok = true; - n_threads = nworkers; - threads = new pthread_t[n_threads]; - - for (unsigned i = 0; i < nworkers; i++) { - int err; - if ((err = pthread_create(&threads[i], 0, workproc, arg))) { - LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n", - name.c_str(), err)); - return false; - } - } - - return true; - } - - /** Add item to work queue, called from client. - * - * Sleeps if there are already too many. - */ - template - bool put(U &&u) - { - const std::lock_guard protect(mutex); - - queue.emplace(std::forward(u)); - - // Just wake one worker, there is only one new task. - worker_cond.signal(); - - return true; - } - - - /** Tell the workers to exit, and wait for them. - */ - void setTerminateAndWait() - { - const std::lock_guard protect(mutex); - - // Wait for all worker threads to have called workerExit() - ok = false; - while (n_workers_exited < n_threads) { - worker_cond.broadcast(); - client_cond.wait(mutex); - } - - // Perform the thread joins and compute overall status - // Workers return (void*)1 if ok - for (unsigned i = 0; i < n_threads; ++i) { - void *status; - pthread_join(threads[i], &status); - } - - delete[] threads; - threads = nullptr; - n_threads = 0; - - // Reset to start state. - n_workers_exited = 0; - } - - /** Take task from queue. Called from worker. - * - * Sleeps if there are not enough. Signal if we go to sleep on empty - * queue: client may be waiting for our going idle. - */ - bool take(T &tp) - { - const std::lock_guard protect(mutex); - - if (!ok) - return false; - - while (queue.empty()) { - worker_cond.wait(mutex); - if (!ok) - return false; - } - - tp = std::move(queue.front()); - queue.pop(); - return true; - } - - /** Advertise exit and abort queue. Called from worker - * - * This would happen after an unrecoverable error, or when - * the queue is terminated by the client. Workers never exit normally, - * except when the queue is shut down (at which point ok is set to - * false by the shutdown code anyway). The thread must return/exit - * immediately after calling this. - */ - void workerExit() - { - const std::lock_guard protect(mutex); - - n_workers_exited++; - ok = false; - client_cond.broadcast(); - } -}; - -#endif /* _WORKQUEUE_H_INCLUDED_ */ diff --git a/src/neighbor/plugins/UpnpNeighborPlugin.cxx b/src/neighbor/plugins/UpnpNeighborPlugin.cxx index 34e14514f..559d87221 100644 --- a/src/neighbor/plugins/UpnpNeighborPlugin.cxx +++ b/src/neighbor/plugins/UpnpNeighborPlugin.cxx @@ -53,11 +53,14 @@ class UpnpNeighborExplorer final } }; + EventLoop &event_loop; + UPnPDeviceDirectory *discovery; public: - UpnpNeighborExplorer(NeighborListener &_listener) - :NeighborExplorer(_listener) {} + UpnpNeighborExplorer(EventLoop &_event_loop, + NeighborListener &_listener) + :NeighborExplorer(_listener), event_loop(_event_loop) {} /* virtual methods from class NeighborExplorer */ void Open() override; @@ -76,7 +79,7 @@ UpnpNeighborExplorer::Open() UpnpClient_Handle handle; UpnpClientGlobalInit(handle); - discovery = new UPnPDeviceDirectory(handle, this); + discovery = new UPnPDeviceDirectory(event_loop, handle, this); try { discovery->Start(); @@ -126,11 +129,11 @@ UpnpNeighborExplorer::LostUPnP(const ContentDirectoryService &service) } static NeighborExplorer * -upnp_neighbor_create(gcc_unused EventLoop &loop, +upnp_neighbor_create(EventLoop &event_loop, NeighborListener &listener, gcc_unused const ConfigBlock &block) { - return new UpnpNeighborExplorer(listener); + return new UpnpNeighborExplorer(event_loop, listener); } const NeighborPlugin upnp_neighbor_plugin = {