/*
 * Copyright 2003-2019 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "BufferingInputStream.hxx"
#include "thread/Cond.hxx"
#include "thread/Name.hxx"

#include <algorithm>
#include <exception>
#include <mutex>
#include <utility>

#include <string.h>

/**
 * Take ownership of the given input stream, share its mutex, size
 * the buffer according to the input's GetSize(), register this
 * object as the input's handler and start the buffering thread.
 */
BufferingInputStream::BufferingInputStream(InputStreamPtr _input)
	:input(std::move(_input)),
	 mutex(input->mutex),
	 thread(BIND_THIS_METHOD(RunThread)),
	 buffer(input->GetSize())
{
	input->SetHandler(this);

	thread.Start();
}

/**
 * Ask the buffering thread to stop (while holding the mutex) and
 * wait for it to finish.
 */
BufferingInputStream::~BufferingInputStream() noexcept
{
	{
		const std::lock_guard<Mutex> lock(mutex);
		stop = true;
		wake_cond.notify_one();
	}

	thread.Join();
}

/**
 * Rethrow a pending read error (without clearing it); otherwise
 * forward the check to the input, if it still exists.
 */
void
BufferingInputStream::Check()
{
	if (read_error)
		std::rethrow_exception(read_error);

	/* RunThread() clears "input" when it exits */
	if (input)
		input->Check();
}

/**
 * Move the read position to #new_offset.
 *
 * If the target is at or beyond size(), or the buffer already has
 * data there, only the offset is updated.  Otherwise the request
 * is handed to the buffering thread and this method blocks until
 * the thread has processed it.
 *
 * Throws the exception caught by the buffering thread while
 * seeking the input; in that case the offset is left unchanged.
 *
 * @param lock the caller's lock; it is released while waiting
 */
void
BufferingInputStream::Seek(std::unique_lock<Mutex> &lock, size_t new_offset)
{
	if (new_offset >= size()) {
		offset = new_offset;
		return;
	}

	auto r = buffer.Read(new_offset);
	if (r.HasData()) {
		/* nice, we already have some data at the desired
		   offset and this method call is a no-op */
		offset = new_offset;
		return;
	}

	/* hand the request over to the thread and wait until it has
	   cleared the "seek" flag */
	seek_offset = new_offset;
	seek = true;
	wake_cond.notify_one();

	client_cond.wait(lock, [this]{ return !seek; });

	if (seek_error)
		std::rethrow_exception(std::exchange(seek_error, {}));

	offset = new_offset;
}

/**
 * @return true if the read position equals size() or if the
 * buffer contains data at the read position
 */
bool
BufferingInputStream::IsAvailable() noexcept
{
	return offset == size() || buffer.Read(offset).HasData();
}

/**
 * Copy up to #s bytes from the buffer at the current read
 * position into #ptr and advance the read position.  Blocks until
 * the buffer has data at that position.
 *
 * Throws (and clears) a read error recorded by the buffering
 * thread if no data is available.
 *
 * @param lock the caller's lock; it is released while waiting
 * @return the number of bytes copied; 0 if the read position is
 * at or beyond size()
 */
size_t
BufferingInputStream::Read(std::unique_lock<Mutex> &lock, void *ptr, size_t s)
{
	if (offset >= size())
		return 0;

	while (true) {
		auto r = buffer.Read(offset);
		if (r.HasData()) {
			/* yay, we have some data */
			size_t nbytes = std::min(s, r.defined_buffer.size);
			memcpy(ptr, r.defined_buffer.data, nbytes);
			offset += nbytes;

			if (!IsAvailable()) {
				/* wake up the sleeping thread */
				idle = false;
				wake_cond.notify_one();
			}

			return nbytes;
		}

		if (read_error) {
			/* the thread is waiting for the error to be
			   consumed; wake it up */
			wake_cond.notify_one();
			std::rethrow_exception(std::exchange(read_error, {}));
		}

		if (idle) {
			/* wake up the sleeping thread */
			idle = false;
			wake_cond.notify_one();
		}

		client_cond.wait(lock);
	}
}

/**
 * Find the first buffer range which has not been filled yet.
 *
 * @return the offset of the first hole, or INVALID_OFFSET if the
 * buffer is filled up to size()
 */
size_t
BufferingInputStream::FindFirstHole() const noexcept
{
	auto r = buffer.Read(0);
	if (r.undefined_size > 0)
		/* a hole at the beginning */
		return 0;

	if (r.defined_buffer.size < size())
		/* a hole in the middle */
		return r.defined_buffer.size;

	/* the file has been read completely */
	return INVALID_OFFSET;
}

/**
 * The buffering thread.  Holding the mutex, it serves seek
 * requests from Seek(), reads from the input into the buffer and
 * seeks the input to wherever data is still missing.  It
 * terminates when "stop" is set or when the buffer has been
 * filled completely, and then destructs the input.
 */
void
BufferingInputStream::RunThread() noexcept
{
	SetThreadName("buffering");

	std::unique_lock<Mutex> lock(mutex);

	/* seek the input on our own initiative (i.e. not requested
	   by Seek()); a failure is really a seek error, but we
	   register it as a read_error, because seek_error is only
	   checked by Seek(), and our frontend (our own InputStream
	   interface) is in "read" mode */
	const auto seek_input = [this, &lock](auto where) {
		try {
			input->Seek(lock, where);
		} catch (...) {
			read_error = std::current_exception();
			client_cond.notify_one();
			OnBufferAvailable();
		}
	};

	while (!stop) {
		if (seek) {
			/* a seek requested by Seek(): errors go to
			   seek_error, which Seek() rethrows */
			try {
				input->Seek(lock, seek_offset);
			} catch (...) {
				seek_error = std::current_exception();
			}

			idle = false;
			seek = false;
			read_error = {};
			client_cond.notify_one();
		} else if (read_error || idle) {
			/* wait for client to consume the read error
			   (or to wake us up from idle) */
			wake_cond.wait(lock);
		} else if (offset != input->GetOffset() && !IsAvailable()) {
			/* a past Seek() call was a no-op because data
			   was already available at that position, but
			   now we've reached a new position where
			   there is no more data in the buffer, and
			   our input is reading somewhere else (maybe
			   stuck at the end of the file); to find a
			   way out, we now seek our input to our
			   reading position to be able to fill our
			   buffer */
			seek_input(offset);
		} else if (input->IsEOF()) {
			/* our input has reached its end: prepare
			   reading the first remaining hole */

			size_t new_offset = FindFirstHole();
			if (new_offset == INVALID_OFFSET) {
				/* the file has been read completely */
				break;
			}

			/* seek to the first hole */
			seek_input(new_offset);
		} else if (input->IsAvailable()) {
			const auto read_offset = input->GetOffset();
			auto w = buffer.Write(read_offset);

			if (w.empty()) {
				if (IsAvailable()) {
					/* we still have enough data
					   for the next Read() - sleep
					   until we need more data */
					idle = true;
				} else {
					/* we need more data at our
					   current position, because
					   the next Read() will stall
					   - seek our input to our
					   offset to prepare filling
					   the buffer from there */
					seek_input(offset);
				}

				continue;
			}

			try {
				size_t nbytes = input->Read(lock,
							    w.data, w.size);
				buffer.Commit(read_offset,
					      read_offset + nbytes);
			} catch (...) {
				read_error = std::current_exception();
			}

			/* tell the client about new data or about
			   the error */
			client_cond.notify_one();
			OnBufferAvailable();
		} else
			wake_cond.wait(lock);
	}

	/* clear the "input" attribute while holding the mutex */
	auto _input = std::move(input);

	/* the mutex must be unlocked while an InputStream can be
	   destructed */
	lock.unlock();

	/* and now actually destruct the InputStream */
	_input.reset();
}