input/async: add method GetEventLoop()

This commit is contained in:
Max Kellermann 2017-01-25 23:15:52 +01:00
parent 611ce6e756
commit ecbad638f1
2 changed files with 11 additions and 6 deletions

View File

@ -80,6 +80,10 @@ public:
virtual ~AsyncInputStream(); virtual ~AsyncInputStream();
EventLoop &GetEventLoop() {
return deferred_resume.GetEventLoop();
}
/* virtual methods from InputStream */ /* virtual methods from InputStream */
void Check() final; void Check() final;
bool IsEOF() final; bool IsEOF() final;

View File

@ -31,6 +31,7 @@
#include "config/Block.hxx" #include "config/Block.hxx"
#include "tag/TagBuilder.hxx" #include "tag/TagBuilder.hxx"
#include "event/Call.hxx" #include "event/Call.hxx"
#include "event/Loop.hxx"
#include "IOThread.hxx" #include "IOThread.hxx"
#include "util/ASCII.hxx" #include "util/ASCII.hxx"
#include "util/StringUtil.hxx" #include "util/StringUtil.hxx"
@ -137,7 +138,7 @@ static constexpr Domain curl_domain("curl");
void void
CurlInputStream::DoResume() CurlInputStream::DoResume()
{ {
assert(io_thread_inside()); assert(GetEventLoop().IsInside());
mutex.unlock(); mutex.unlock();
request->Resume(); request->Resume();
@ -147,7 +148,7 @@ CurlInputStream::DoResume()
void void
CurlInputStream::FreeEasy() CurlInputStream::FreeEasy()
{ {
assert(io_thread_inside()); assert(GetEventLoop().IsInside());
if (request == nullptr) if (request == nullptr)
return; return;
@ -161,7 +162,7 @@ CurlInputStream::FreeEasy()
void void
CurlInputStream::FreeEasyIndirect() CurlInputStream::FreeEasyIndirect()
{ {
BlockingCall(io_thread_get(), [this](){ BlockingCall(GetEventLoop(), [this](){
FreeEasy(); FreeEasy();
curl_global->InvalidateSockets(); curl_global->InvalidateSockets();
}); });
@ -171,7 +172,7 @@ void
CurlInputStream::OnHeaders(unsigned status, CurlInputStream::OnHeaders(unsigned status,
std::multimap<std::string, std::string> &&headers) std::multimap<std::string, std::string> &&headers)
{ {
assert(io_thread_inside()); assert(GetEventLoop().IsInside());
assert(!postponed_exception); assert(!postponed_exception);
if (status < 200 || status >= 300) if (status < 200 || status >= 300)
@ -410,7 +411,7 @@ CurlInputStream::DoSeek(offset_type new_offset)
const ScopeUnlock unlock(mutex); const ScopeUnlock unlock(mutex);
BlockingCall(io_thread_get(), [this, new_offset](){ BlockingCall(GetEventLoop(), [this, new_offset](){
SeekInternal(new_offset); SeekInternal(new_offset);
}); });
} }
@ -421,7 +422,7 @@ CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond)
CurlInputStream *c = new CurlInputStream(url, mutex, cond); CurlInputStream *c = new CurlInputStream(url, mutex, cond);
try { try {
BlockingCall(io_thread_get(), [c](){ BlockingCall(c->GetEventLoop(), [c](){
c->InitEasy(); c->InitEasy();
}); });
} catch (...) { } catch (...) {