event/FullyBufferedSocket: try to write without extra roundtrip
Postpone the write using IdleMonitor instead of scheduling a write event. This reduces the number of system calls, because we don't need to register and unregister the write event in epoll.
This commit is contained in:
parent
5b213b0504
commit
422b8472fe
@ -42,7 +42,8 @@ FullyBufferedSocket::DirectWrite(const void *data, size_t length)
|
|||||||
if (IsSocketErrorAgain(code))
|
if (IsSocketErrorAgain(code))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
Cancel();
|
IdleMonitor::Cancel();
|
||||||
|
BufferedSocket::Cancel();
|
||||||
|
|
||||||
if (IsSocketErrorClosed(code))
|
if (IsSocketErrorClosed(code))
|
||||||
OnSocketClosed();
|
OnSocketClosed();
|
||||||
@ -61,6 +62,7 @@ FullyBufferedSocket::Flush()
|
|||||||
size_t length;
|
size_t length;
|
||||||
const void *data = output.Read(&length);
|
const void *data = output.Read(&length);
|
||||||
if (data == nullptr) {
|
if (data == nullptr) {
|
||||||
|
IdleMonitor::Cancel();
|
||||||
CancelWrite();
|
CancelWrite();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -71,8 +73,10 @@ FullyBufferedSocket::Flush()
|
|||||||
|
|
||||||
output.Consume(nbytes);
|
output.Consume(nbytes);
|
||||||
|
|
||||||
if (output.IsEmpty())
|
if (output.IsEmpty()) {
|
||||||
|
IdleMonitor::Cancel();
|
||||||
CancelWrite();
|
CancelWrite();
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -82,6 +86,9 @@ FullyBufferedSocket::Write(const void *data, size_t length)
|
|||||||
{
|
{
|
||||||
assert(IsDefined());
|
assert(IsDefined());
|
||||||
|
|
||||||
|
if (length == 0)
|
||||||
|
return true;
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
/* TODO: disabled because this would add overhead on some callers (the ones that often), but it may be useful */
|
/* TODO: disabled because this would add overhead on some callers (the ones that often), but it may be useful */
|
||||||
|
|
||||||
@ -98,6 +105,8 @@ FullyBufferedSocket::Write(const void *data, size_t length)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
const bool was_empty = output.IsEmpty();
|
||||||
|
|
||||||
if (!output.Append(data, length)) {
|
if (!output.Append(data, length)) {
|
||||||
// TODO
|
// TODO
|
||||||
static constexpr Domain buffered_socket_domain("buffered_socket");
|
static constexpr Domain buffered_socket_domain("buffered_socket");
|
||||||
@ -107,30 +116,31 @@ FullyBufferedSocket::Write(const void *data, size_t length)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ScheduleWrite();
|
if (was_empty)
|
||||||
|
IdleMonitor::Schedule();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
FullyBufferedSocket::OnSocketReady(unsigned flags)
|
FullyBufferedSocket::OnSocketReady(unsigned flags)
|
||||||
{
|
{
|
||||||
const bool was_empty = output.IsEmpty();
|
|
||||||
if (!BufferedSocket::OnSocketReady(flags))
|
|
||||||
return false;
|
|
||||||
|
|
||||||
if (was_empty && !output.IsEmpty())
|
|
||||||
/* just in case the OnSocketInput() method has added
|
|
||||||
data to the output buffer: try to send it now
|
|
||||||
instead of waiting for the next event loop
|
|
||||||
iteration */
|
|
||||||
flags |= WRITE;
|
|
||||||
|
|
||||||
if (flags & WRITE) {
|
if (flags & WRITE) {
|
||||||
assert(!output.IsEmpty());
|
assert(!output.IsEmpty());
|
||||||
|
assert(!IdleMonitor::IsActive());
|
||||||
|
|
||||||
if (!Flush())
|
if (!Flush())
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!BufferedSocket::OnSocketReady(flags))
|
||||||
|
return false;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
FullyBufferedSocket::OnIdle()
|
||||||
|
{
|
||||||
|
if (Flush() && !output.IsEmpty())
|
||||||
|
ScheduleWrite();
|
||||||
|
}
|
||||||
|
@ -22,24 +22,29 @@
|
|||||||
|
|
||||||
#include "check.h"
|
#include "check.h"
|
||||||
#include "BufferedSocket.hxx"
|
#include "BufferedSocket.hxx"
|
||||||
|
#include "IdleMonitor.hxx"
|
||||||
#include "util/PeakBuffer.hxx"
|
#include "util/PeakBuffer.hxx"
|
||||||
#include "Compiler.h"
|
#include "Compiler.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A #BufferedSocket specialization that adds an output buffer.
|
* A #BufferedSocket specialization that adds an output buffer.
|
||||||
*/
|
*/
|
||||||
class FullyBufferedSocket : protected BufferedSocket {
|
class FullyBufferedSocket : protected BufferedSocket, private IdleMonitor {
|
||||||
PeakBuffer output;
|
PeakBuffer output;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
FullyBufferedSocket(int _fd, EventLoop &_loop,
|
FullyBufferedSocket(int _fd, EventLoop &_loop,
|
||||||
size_t normal_size, size_t peak_size=0)
|
size_t normal_size, size_t peak_size=0)
|
||||||
:BufferedSocket(_fd, _loop),
|
:BufferedSocket(_fd, _loop), IdleMonitor(_loop),
|
||||||
output(normal_size, peak_size) {
|
output(normal_size, peak_size) {
|
||||||
}
|
}
|
||||||
|
|
||||||
using BufferedSocket::IsDefined;
|
using BufferedSocket::IsDefined;
|
||||||
using BufferedSocket::Close;
|
|
||||||
|
void Close() {
|
||||||
|
IdleMonitor::Cancel();
|
||||||
|
BufferedSocket::Close();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ssize_t DirectWrite(const void *data, size_t length);
|
ssize_t DirectWrite(const void *data, size_t length);
|
||||||
@ -58,6 +63,7 @@ protected:
|
|||||||
bool Write(const void *data, size_t length);
|
bool Write(const void *data, size_t length);
|
||||||
|
|
||||||
virtual bool OnSocketReady(unsigned flags) override;
|
virtual bool OnSocketReady(unsigned flags) override;
|
||||||
|
virtual void OnIdle() override;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
Loading…
Reference in New Issue
Block a user