output/httpd/Page: use std::shared_ptr instead of class RefCount

This commit is contained in:
Max Kellermann 2017-02-19 20:12:30 +01:00
parent 8b1931072a
commit 26e4a40cc7
8 changed files with 42 additions and 139 deletions

View File

@ -33,16 +33,6 @@
HttpdClient::~HttpdClient()
{
if (state == RESPONSE) {
if (current_page != nullptr)
current_page->Unref();
ClearQueue();
}
if (metadata)
metadata->Unref();
if (IsDefined())
BufferedSocket::Close();
}
@ -217,15 +207,13 @@ HttpdClient::ClearQueue()
assert(state == RESPONSE);
while (!pages.empty()) {
Page *page = pages.front();
pages.pop();
#ifndef NDEBUG
auto &page = pages.front();
assert(queue_size >= page->GetSize());
queue_size -= page->GetSize();
#endif
page->Unref();
pages.pop();
}
assert(queue_size == 0);
@ -372,8 +360,7 @@ HttpdClient::TryWrite()
metadata_fill += nbytes;
if (current_position >= current_page->GetSize()) {
current_page->Unref();
current_page = nullptr;
current_page.reset();
if (pages.empty())
/* all pages are sent: remove the
@ -386,7 +373,7 @@ HttpdClient::TryWrite()
}
void
HttpdClient::PushPage(Page *page)
HttpdClient::PushPage(PagePtr page)
{
if (state != RESPONSE)
/* the client is still writing the HTTP request */
@ -398,25 +385,18 @@ HttpdClient::PushPage(Page *page)
ClearQueue();
}
page->Ref();
pages.push(page);
queue_size += page->GetSize();
pages.emplace(std::move(page));
ScheduleWrite();
}
void
HttpdClient::PushMetaData(Page *page)
HttpdClient::PushMetaData(PagePtr page)
{
assert(page != nullptr);
if (metadata) {
metadata->Unref();
metadata = nullptr;
}
page->Ref();
metadata = page;
metadata = std::move(page);
metadata_sent = false;
}

View File

@ -20,6 +20,7 @@
#ifndef MPD_OUTPUT_HTTPD_CLIENT_HXX
#define MPD_OUTPUT_HTTPD_CLIENT_HXX
#include "Page.hxx"
#include "event/BufferedSocket.hxx"
#include "Compiler.h"
@ -32,7 +33,6 @@
#include <stddef.h>
class HttpdOutput;
class Page;
class HttpdClient final
: BufferedSocket,
@ -59,7 +59,7 @@ class HttpdClient final
/**
* A queue of #Page objects to be sent to the client.
*/
std::queue<Page *, std::list<Page *>> pages;
std::queue<PagePtr, std::list<PagePtr>> pages;
/**
* The sum of all page sizes in #pages.
@ -69,7 +69,7 @@ class HttpdClient final
/**
* The #page which is currently being sent to the client.
*/
Page *current_page;
PagePtr current_page;
/**
* The amount of bytes which were already sent from
@ -113,7 +113,7 @@ class HttpdClient final
/**
* The metadata as #Page which is currently being sent to the client.
*/
Page *metadata;
PagePtr metadata;
/*
* The amount of bytes which were already sent from the metadata.
@ -178,12 +178,12 @@ public:
/**
* Appends a page to the client's queue.
*/
void PushPage(Page *page);
void PushPage(PagePtr page);
/**
* Sends the passed metadata.
*/
void PushMetaData(Page *page);
void PushMetaData(PagePtr page);
private:
void ClearQueue();

View File

@ -44,7 +44,6 @@ struct ConfigBlock;
class EventLoop;
class ServerSocket;
class HttpdClient;
class Page;
class PreparedEncoder;
class Encoder;
struct Tag;
@ -102,12 +101,12 @@ private:
/**
* The header page, which is sent to every client on connect.
*/
Page *header;
PagePtr header;
/**
* The metadata, which is sent to every client.
*/
Page *metadata;
PagePtr metadata;
/**
* The page queue, i.e. pages from the encoder to be
@ -115,7 +114,7 @@ private:
* pass pages from the OutputThread to the IOThread. It is
* protected by #mutex, and removing signals #cond.
*/
std::queue<Page *, std::list<Page *>> pages;
std::queue<PagePtr, std::list<PagePtr>> pages;
public:
/**
@ -234,14 +233,14 @@ public:
* Reads data from the encoder (as much as available) and
* returns it as a new #page object.
*/
Page *ReadPage();
PagePtr ReadPage();
/**
* Broadcasts a page struct to all clients.
*
* Mutext must not be locked.
*/
void BroadcastPage(Page *page);
void BroadcastPage(PagePtr page);
/**
* Broadcasts data from the encoder to all clients.

View File

@ -90,9 +90,6 @@ HttpdOutput::HttpdOutput(EventLoop &_loop, const ConfigBlock &block)
HttpdOutput::~HttpdOutput()
{
if (metadata != nullptr)
metadata->Unref();
delete prepared_encoder;
}
@ -147,13 +144,11 @@ HttpdOutput::RunDeferred()
const std::lock_guard<Mutex> protect(mutex);
while (!pages.empty()) {
Page *page = pages.front();
PagePtr page = std::move(pages.front());
pages.pop();
for (auto &client : clients)
client.PushPage(page);
page->Unref();
}
/* wake up the client that may be waiting for the queue to be
@ -204,7 +199,7 @@ HttpdOutput::OnAccept(int fd, SocketAddress address, gcc_unused int uid)
}
}
Page *
PagePtr
HttpdOutput::ReadPage()
{
if (unflushed_input >= 65536) {
@ -235,7 +230,7 @@ HttpdOutput::ReadPage()
if (size == 0)
return nullptr;
return Page::Copy(buffer, size);
return std::make_shared<Page>(buffer, size);
}
inline void
@ -282,8 +277,7 @@ HttpdOutput::Close()
clients.clear_and_dispose(DeleteDisposer());
});
if (header != nullptr)
header->Unref();
header.reset();
delete encoder;
}
@ -326,13 +320,12 @@ HttpdOutput::Delay() const
}
void
HttpdOutput::BroadcastPage(Page *page)
HttpdOutput::BroadcastPage(PagePtr page)
{
assert(page != nullptr);
mutex.lock();
pages.push(page);
page->Ref();
pages.emplace(std::move(page));
mutex.unlock();
DeferredMonitor::Schedule();
@ -346,7 +339,7 @@ HttpdOutput::BroadcastFromEncoder()
while (!pages.empty())
cond.wait(mutex);
Page *page;
PagePtr page;
while ((page = ReadPage()) != nullptr)
pages.push(page);
@ -418,19 +411,14 @@ HttpdOutput::SendTag(const Tag &tag)
used as the new "header" page, which is sent to all
new clients */
Page *page = ReadPage();
auto page = ReadPage();
if (page != nullptr) {
if (header != nullptr)
header->Unref();
header = page;
header = std::move(page);
BroadcastPage(page);
}
} else {
/* use Icy-Metadata */
if (metadata != nullptr)
metadata->Unref();
static constexpr TagType types[] = {
TAG_ALBUM, TAG_ARTIST, TAG_TITLE,
TAG_NUM_OF_ITEM_TYPES
@ -451,9 +439,8 @@ HttpdOutput::CancelAllClients()
const std::lock_guard<Mutex> protect(mutex);
while (!pages.empty()) {
Page *page = pages.front();
PagePtr page = std::move(pages.front());
pages.pop();
page->Unref();
}
for (auto &client : clients)

View File

@ -19,7 +19,6 @@
#include "config.h"
#include "IcyMetaDataServer.hxx"
#include "Page.hxx"
#include "tag/Tag.hxx"
#include "util/FormatString.hxx"
#include "util/AllocatedString.hxx"
@ -82,7 +81,7 @@ icy_server_metadata_string(const char *stream_title, const char* stream_url)
return icy_metadata;
}
Page *
PagePtr
icy_server_metadata_page(const Tag &tag, const TagType *types)
{
const char *tag_items[TAG_NUM_OF_ITEM_TYPES];
@ -113,5 +112,6 @@ icy_server_metadata_page(const Tag &tag, const TagType *types)
if (icy_string.IsNull())
return nullptr;
return Page::Copy(icy_string.c_str(), uint8_t(icy_string[0]) * 16 + 1);
return std::make_shared<Page>(icy_string.c_str(),
uint8_t(icy_string[0]) * 16 + 1);
}

View File

@ -20,10 +20,10 @@
#ifndef MPD_ICY_META_DATA_SERVER_HXX
#define MPD_ICY_META_DATA_SERVER_HXX
#include "Page.hxx"
#include "tag/Type.h"
struct Tag;
class Page;
template<typename T> class AllocatedString;
AllocatedString<char>
@ -31,7 +31,7 @@ icy_server_metadata_header(const char *name,
const char *genre, const char *url,
const char *content_type, int metaint);
Page *
PagePtr
icy_server_metadata_page(const Tag &tag, const TagType *types);
#endif

View File

@ -24,29 +24,8 @@
#include <string.h>
#include <stdlib.h>
Page *
Page::Create(size_t size)
Page::Page(const void *data, size_t size)
:buffer(size)
{
return new Page(size);
}
Page *
Page::Copy(const void *data, size_t size)
{
assert(data != nullptr);
Page *page = Create(size);
memcpy(&page->buffer.front(), data, size);
return page;
}
bool
Page::Unref()
{
bool unused = ref.Decrement();
if (unused)
delete this;
return unused;
memcpy(&buffer.front(), data, size);
}

View File

@ -17,17 +17,13 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
/** \file
*
* This is a library which manages reference counted buffers.
*/
#ifndef MPD_PAGE_HXX
#define MPD_PAGE_HXX
#include "util/RefCount.hxx"
#include "util/AllocatedArray.hxx"
#include <memory>
#include <stddef.h>
#include <stdint.h>
@ -37,54 +33,14 @@
* instances hold references to one buffer.
*/
class Page {
/**
* The number of references to this buffer. This library uses
* atomic functions to access it, i.e. no locks are required.
* As soon as this attribute reaches zero, the buffer is
* freed.
*/
RefCount ref;
AllocatedArray<uint8_t> buffer;
protected:
public:
explicit Page(size_t _size):buffer(_size) {}
explicit Page(AllocatedArray<uint8_t> &&_buffer)
:buffer(std::move(_buffer)) {}
~Page() = default;
/**
* Allocates a new #Page object, without filling the data
* element.
*/
static Page *Create(size_t size);
public:
/**
* Creates a new #page object, and copies data from the
* specified buffer. It is initialized with a reference count
* of 1.
*
* @param data the source buffer
* @param size the size of the source buffer
*/
static Page *Copy(const void *data, size_t size);
/**
* Increases the reference counter.
*/
void Ref() {
ref.Increment();
}
/**
* Decreases the reference counter. If it reaches zero, the #page is
* freed.
*
* @return true if the #page has been freed
*/
bool Unref();
Page(const void *data, size_t size);
size_t GetSize() const {
return buffer.size();
@ -95,4 +51,6 @@ public:
}
};
typedef std::shared_ptr<Page> PagePtr;
#endif