From 6d0d8cf9cf07a5d0905f715be9a79f5de86e8b37 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Mon, 1 Jan 2018 19:22:45 +0100 Subject: [PATCH] filter/Filter: add virtual method Flush() This will be used by filters which have internal buffers which need to be flushed at the end, e.g. the "soxr" resampler. --- Makefile.am | 2 +- src/filter/Filter.cxx | 27 +++++++++++++ src/filter/Filter.hxx | 6 +++ src/filter/Observer.cxx | 4 ++ src/filter/plugins/ChainFilterPlugin.cxx | 49 +++++++++++++++++++++--- src/output/Source.cxx | 8 ++++ src/output/Source.hxx | 5 +++ src/output/Thread.cxx | 31 +++++++++++++++ test/run_convert.cxx | 9 +++++ 9 files changed, 134 insertions(+), 7 deletions(-) create mode 100644 src/filter/Filter.cxx diff --git a/Makefile.am b/Makefile.am index 42611701d..15c829861 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1644,7 +1644,7 @@ libfilter_api_a_SOURCES = \ src/filter/Observer.cxx src/filter/Observer.hxx \ src/filter/FilterPlugin.hxx \ src/filter/Prepared.hxx \ - src/filter/Filter.hxx + src/filter/Filter.cxx src/filter/Filter.hxx libfilter_plugins_a_SOURCES = \ src/AudioCompress/config.h \ diff --git a/src/filter/Filter.cxx b/src/filter/Filter.cxx new file mode 100644 index 000000000..6a291cde9 --- /dev/null +++ b/src/filter/Filter.cxx @@ -0,0 +1,27 @@ +/* + * Copyright 2003-2017 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "Filter.hxx" +#include "util/ConstBuffer.hxx" + +ConstBuffer +Filter::Flush() +{ + return nullptr; +} diff --git a/src/filter/Filter.hxx b/src/filter/Filter.hxx index 6af002adc..7d714df79 100644 --- a/src/filter/Filter.hxx +++ b/src/filter/Filter.hxx @@ -62,6 +62,12 @@ public: * or Reset() call) */ virtual ConstBuffer FilterPCM(ConstBuffer src) = 0; + + /** + * Flush pending data and return it. This should be called + * repepatedly until it returns nullptr. + */ + virtual ConstBuffer Flush(); }; #endif diff --git a/src/filter/Observer.cxx b/src/filter/Observer.cxx index d51d6329e..a56efc70f 100644 --- a/src/filter/Observer.cxx +++ b/src/filter/Observer.cxx @@ -75,6 +75,10 @@ public: ConstBuffer FilterPCM(ConstBuffer src) override { return filter->FilterPCM(src); } + + ConstBuffer Flush() noexcept override { + return filter->Flush(); + } }; Filter * diff --git a/src/filter/plugins/ChainFilterPlugin.cxx b/src/filter/plugins/ChainFilterPlugin.cxx index 6fa273729..d95ae3768 100644 --- a/src/filter/plugins/ChainFilterPlugin.cxx +++ b/src/filter/plugins/ChainFilterPlugin.cxx @@ -43,6 +43,11 @@ class ChainFilter final : public Filter { std::list children; + /** + * The child which will be flushed in the next Flush() call. + */ + std::list::iterator flushing = children.end(); + public: explicit ChainFilter(AudioFormat _audio_format) :Filter(_audio_format) {} @@ -54,11 +59,20 @@ public: assert(out_audio_format.IsValid()); children.emplace_back(name, std::move(filter)); + + RewindFlush(); } /* virtual methods from class Filter */ void Reset() noexcept override; ConstBuffer FilterPCM(ConstBuffer src) override; + ConstBuffer Flush() override; + +private: + void RewindFlush() { + flushing = children.begin(); + } + }; class PreparedChainFilter final : public PreparedFilter { @@ -118,21 +132,44 @@ PreparedChainFilter::Open(AudioFormat &in_audio_format) void ChainFilter::Reset() noexcept { + RewindFlush(); + for (auto &child : children) child.filter->Reset(); } +template +static ConstBuffer +ApplyFilterChain(I begin, I end, ConstBuffer src) +{ + for (auto i = begin; i != end; ++i) + /* feed the output of the previous filter as input + into the current one */ + src = i->filter->FilterPCM(src); + + return src; +} + ConstBuffer ChainFilter::FilterPCM(ConstBuffer src) { - for (auto &child : children) { - /* feed the output of the previous filter as input - into the current one */ - src = child.filter->FilterPCM(src); - } + RewindFlush(); /* return the output of the last filter */ - return src; + return ApplyFilterChain(children.begin(), children.end(), src); +} + +ConstBuffer +ChainFilter::Flush() +{ + for (auto end = children.end(); flushing != end; ++flushing) { + auto data = flushing->filter->Flush(); + if (!data.IsNull()) + return ApplyFilterChain(std::next(flushing), end, + data); + } + + return nullptr; } std::unique_ptr diff --git a/src/output/Source.cxx b/src/output/Source.cxx index d6c110a38..fdaff77f2 100644 --- a/src/output/Source.cxx +++ b/src/output/Source.cxx @@ -244,3 +244,11 @@ AudioOutputSource::ConsumeData(size_t nbytes) noexcept if (pending_data.empty()) pipe.Consume(*std::exchange(current_chunk, nullptr)); } + +ConstBuffer +AudioOutputSource::Flush() +{ + return filter + ? filter->Flush() + : nullptr; +} diff --git a/src/output/Source.hxx b/src/output/Source.hxx index 1b26a3378..435ce52b2 100644 --- a/src/output/Source.hxx +++ b/src/output/Source.hxx @@ -195,6 +195,11 @@ public: pipe.ClearTail(chunk); } + /** + * Wrapper for Filter::Flush(). + */ + ConstBuffer Flush(); + private: void OpenFilter(AudioFormat audio_format, PreparedFilter *prepared_replay_gain_filter, diff --git a/src/output/Thread.cxx b/src/output/Thread.cxx index ac382864a..bad9e0a73 100644 --- a/src/output/Thread.cxx +++ b/src/output/Thread.cxx @@ -362,11 +362,42 @@ AudioOutputControl::InternalPause() noexcept skip_delay = true; } +static void +PlayFull(FilteredAudioOutput &output, ConstBuffer _buffer) +{ + auto buffer = ConstBuffer::FromVoid(_buffer); + + while (!buffer.empty()) { + size_t nbytes = output.Play(buffer.data, buffer.size); + assert(nbytes > 0); + + buffer.skip_front(nbytes); + } + +} + inline void AudioOutputControl::InternalDrain() noexcept { const ScopeUnlock unlock(mutex); + try { + /* flush the filter and play its remaining output */ + + while (true) { + auto buffer = source.Flush(); + if (buffer.IsNull()) + break; + + PlayFull(*output, buffer); + } + } catch (...) { + FormatError(std::current_exception(), + "Failed to flush filter on %s", GetLogName()); + InternalCloseError(std::current_exception()); + return; + } + output->Drain(); } diff --git a/test/run_convert.cxx b/test/run_convert.cxx index 1d11e44d0..64ca98bcc 100644 --- a/test/run_convert.cxx +++ b/test/run_convert.cxx @@ -86,6 +86,15 @@ try { output.size); } + while (true) { + auto output = state.Flush(); + if (output.IsNull()) + break; + + gcc_unused ssize_t ignored = write(1, output.data, + output.size); + } + state.Close(); return EXIT_SUCCESS;