diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index 4f18782fe..9e749884f 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -22,20 +22,20 @@ #include "Domain.hxx" #include "tag/Tag.hxx" #include "thread/Cond.hxx" -#include "IOThread.hxx" +#include "event/Loop.hxx" #include #include #include -AsyncInputStream::AsyncInputStream(const char *_url, +AsyncInputStream::AsyncInputStream(EventLoop &event_loop, const char *_url, Mutex &_mutex, Cond &_cond, size_t _buffer_size, size_t _resume_at) :InputStream(_url, _mutex, _cond), - deferred_resume(io_thread_get(), BIND_THIS_METHOD(DeferredResume)), - deferred_seek(io_thread_get(), BIND_THIS_METHOD(DeferredSeek)), + deferred_resume(event_loop, BIND_THIS_METHOD(DeferredResume)), + deferred_seek(event_loop, BIND_THIS_METHOD(DeferredSeek)), allocation(_buffer_size), buffer((uint8_t *)allocation.get(), _buffer_size), resume_at(_resume_at), @@ -61,7 +61,7 @@ AsyncInputStream::SetTag(Tag *_tag) void AsyncInputStream::Pause() { - assert(io_thread_inside()); + assert(GetEventLoop().IsInside()); paused = true; } @@ -69,7 +69,7 @@ AsyncInputStream::Pause() inline void AsyncInputStream::Resume() { - assert(io_thread_inside()); + assert(GetEventLoop().IsInside()); if (paused) { paused = false; @@ -143,7 +143,7 @@ AsyncInputStream::Seek(offset_type new_offset) void AsyncInputStream::SeekDone() { - assert(io_thread_inside()); + assert(GetEventLoop().IsInside()); assert(IsSeekPending()); /* we may have reached end-of-file previously, and the @@ -174,7 +174,7 @@ AsyncInputStream::IsAvailable() size_t AsyncInputStream::Read(void *ptr, size_t read_size) { - assert(!io_thread_inside()); + assert(!GetEventLoop().IsInside()); /* wait for data */ CircularBuffer::Range r; diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx index dad8899ad..ade1ea622 100644 --- a/src/input/AsyncInputStream.hxx +++ b/src/input/AsyncInputStream.hxx @@ -73,7 +73,7 @@ public: * @param _buffer a buffer allocated with HugeAllocate(); the * destructor will free it using HugeFree() */ - AsyncInputStream(const char *_url, + AsyncInputStream(EventLoop &event_loop, const char *_url, Mutex &_mutex, Cond &_cond, size_t _buffer_size, size_t _resume_at); diff --git a/src/input/plugins/AlsaInputPlugin.cxx b/src/input/plugins/AlsaInputPlugin.cxx index 7c37ecec5..c16016c78 100644 --- a/src/input/plugins/AlsaInputPlugin.cxx +++ b/src/input/plugins/AlsaInputPlugin.cxx @@ -76,7 +76,7 @@ public: const char *_uri, Mutex &_mutex, Cond &_cond, const char *_device, snd_pcm_t *_handle, int _frame_size) - :AsyncInputStream(_uri, _mutex, _cond, + :AsyncInputStream(loop, _uri, _mutex, _cond, ALSA_MAX_BUFFERED, ALSA_RESUME_AT), MultiSocketMonitor(loop), DeferredMonitor(loop), diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx index 9c03c8a3e..6fcc75afd 100644 --- a/src/input/plugins/CurlInputPlugin.cxx +++ b/src/input/plugins/CurlInputPlugin.cxx @@ -74,8 +74,9 @@ struct CurlInputStream final : public AsyncInputStream, CurlResponseHandler { /** parser for icy-metadata */ IcyInputStream *icy; - CurlInputStream(const char *_url, Mutex &_mutex, Cond &_cond) - :AsyncInputStream(_url, _mutex, _cond, + CurlInputStream(EventLoop &event_loop, const char *_url, + Mutex &_mutex, Cond &_cond) + :AsyncInputStream(event_loop, _url, _mutex, _cond, CURL_MAX_BUFFERED, CURL_RESUME_AT), icy(new IcyInputStream(this)) { @@ -420,7 +421,8 @@ CurlInputStream::DoSeek(offset_type new_offset) inline InputStream * CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond) { - CurlInputStream *c = new CurlInputStream(url, mutex, cond); + CurlInputStream *c = new CurlInputStream(curl_global->GetEventLoop(), + url, mutex, cond); try { BlockingCall(c->GetEventLoop(), [c](){ diff --git a/src/input/plugins/NfsInputPlugin.cxx b/src/input/plugins/NfsInputPlugin.cxx index 320035a33..7ceb0dce2 100644 --- a/src/input/plugins/NfsInputPlugin.cxx +++ b/src/input/plugins/NfsInputPlugin.cxx @@ -48,7 +48,7 @@ class NfsInputStream final : public AsyncInputStream, NfsFileReader { public: NfsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond) - :AsyncInputStream(_uri, _mutex, _cond, + :AsyncInputStream(io_thread_get(), _uri, _mutex, _cond, NFS_MAX_BUFFERED, NFS_RESUME_AT), NfsFileReader(io_thread_get()),