input/BufferedInputStream: new wrapper for moving plugin to thread
This commit is contained in:
		| @@ -1024,6 +1024,7 @@ libinput_a_SOURCES = \ | ||||
| 	src/input/InputStream.cxx src/input/InputStream.hxx \ | ||||
| 	src/input/InputPlugin.hxx \ | ||||
| 	src/input/TextInputStream.cxx src/input/TextInputStream.hxx \ | ||||
| 	src/input/ThreadInputStream.cxx src/input/ThreadInputStream.hxx \ | ||||
| 	src/input/plugins/RewindInputPlugin.cxx src/input/plugins/RewindInputPlugin.hxx \ | ||||
| 	src/input/plugins/FileInputPlugin.cxx src/input/plugins/FileInputPlugin.hxx | ||||
|  | ||||
|   | ||||
							
								
								
									
										204
									
								
								src/input/ThreadInputStream.cxx
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										204
									
								
								src/input/ThreadInputStream.cxx
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,204 @@ | ||||
| /* | ||||
|  * Copyright (C) 2003-2014 The Music Player Daemon Project | ||||
|  * http://www.musicpd.org | ||||
|  * | ||||
|  * This program is free software; you can redistribute it and/or modify | ||||
|  * it under the terms of the GNU General Public License as published by | ||||
|  * the Free Software Foundation; either version 2 of the License, or | ||||
|  * (at your option) any later version. | ||||
|  * | ||||
|  * This program is distributed in the hope that it will be useful, | ||||
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|  * GNU General Public License for more details. | ||||
|  * | ||||
|  * You should have received a copy of the GNU General Public License along | ||||
|  * with this program; if not, write to the Free Software Foundation, Inc., | ||||
|  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||||
|  */ | ||||
|  | ||||
| #include "config.h" | ||||
| #include "ThreadInputStream.hxx" | ||||
| #include "InputPlugin.hxx" | ||||
| #include "thread/Name.hxx" | ||||
| #include "util/CircularBuffer.hxx" | ||||
| #include "util/HugeAllocator.hxx" | ||||
|  | ||||
| #include <assert.h> | ||||
| #include <string.h> | ||||
|  | ||||
| ThreadInputStream::~ThreadInputStream() | ||||
| { | ||||
| 	if (buffer != nullptr) { | ||||
| 		buffer->Clear(); | ||||
| 		HugeFree(buffer->Write().data, buffer_size); | ||||
| 		delete buffer; | ||||
| 	} | ||||
| } | ||||
|  | ||||
| InputStream * | ||||
| ThreadInputStream::Start(Error &error) | ||||
| { | ||||
| 	assert(buffer == nullptr); | ||||
|  | ||||
| 	void *p = HugeAllocate(buffer_size); | ||||
| 	if (p == nullptr) { | ||||
| 		error.SetErrno(); | ||||
| 		return nullptr; | ||||
| 	} | ||||
|  | ||||
| 	buffer = new CircularBuffer<uint8_t>((uint8_t *)p, buffer_size); | ||||
|  | ||||
| 	if (!thread.Start(ThreadFunc, this, error)) | ||||
| 		return nullptr; | ||||
|  | ||||
| 	return &base; | ||||
| } | ||||
|  | ||||
| inline void | ||||
| ThreadInputStream::ThreadFunc() | ||||
| { | ||||
| 	FormatThreadName("input:%s", base.plugin.name); | ||||
|  | ||||
| 	base.mutex.lock(); | ||||
| 	if (!Open(postponed_error)) { | ||||
| 		base.cond.broadcast(); | ||||
| 		base.mutex.unlock(); | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	/* we're ready, tell it to our client */ | ||||
| 	base.ready = true; | ||||
| 	base.cond.broadcast(); | ||||
|  | ||||
| 	while (!close) { | ||||
| 		assert(!postponed_error.IsDefined()); | ||||
|  | ||||
| 		auto w = buffer->Write(); | ||||
| 		if (w.IsEmpty()) { | ||||
| 			wake_cond.wait(base.mutex); | ||||
| 		} else { | ||||
| 			base.mutex.unlock(); | ||||
|  | ||||
| 			Error error; | ||||
| 			size_t nbytes = Read(w.data, w.size, error); | ||||
|  | ||||
| 			base.mutex.lock(); | ||||
| 			base.cond.broadcast(); | ||||
|  | ||||
| 			if (nbytes == 0) { | ||||
| 				eof = true; | ||||
| 				postponed_error = std::move(error); | ||||
| 				break; | ||||
| 			} | ||||
|  | ||||
| 			buffer->Append(nbytes); | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	base.mutex.unlock(); | ||||
|  | ||||
| 	Close(); | ||||
| } | ||||
|  | ||||
| void | ||||
| ThreadInputStream::ThreadFunc(void *ctx) | ||||
| { | ||||
| 	ThreadInputStream &tis = *(ThreadInputStream *)ctx; | ||||
| 	tis.ThreadFunc(); | ||||
| } | ||||
|  | ||||
| inline bool | ||||
| ThreadInputStream::Check2(Error &error) | ||||
| { | ||||
| 	if (postponed_error.IsDefined()) { | ||||
| 		error = std::move(postponed_error); | ||||
| 		return false; | ||||
| 	} | ||||
|  | ||||
| 	return true; | ||||
| } | ||||
|  | ||||
| bool | ||||
| ThreadInputStream::Check(InputStream *is, Error &error) | ||||
| { | ||||
| 	return Cast(is)->Check2(error); | ||||
| } | ||||
|  | ||||
| inline bool | ||||
| ThreadInputStream::Available2() | ||||
| { | ||||
| 	return !buffer->IsEmpty() || eof || postponed_error.IsDefined(); | ||||
| } | ||||
|  | ||||
| bool | ||||
| ThreadInputStream::Available(InputStream *is) | ||||
| { | ||||
| 	return Cast(is)->Available2(); | ||||
| } | ||||
|  | ||||
| inline size_t | ||||
| ThreadInputStream::Read2(void *ptr, size_t size, Error &error) | ||||
| { | ||||
| 	while (true) { | ||||
| 		if (postponed_error.IsDefined()) { | ||||
| 			error = std::move(postponed_error); | ||||
| 			return 0; | ||||
| 		} | ||||
|  | ||||
| 		auto r = buffer->Read(); | ||||
| 		if (!r.IsEmpty()) { | ||||
| 			size_t nbytes = std::min(size, r.size); | ||||
| 			memcpy(ptr, r.data, nbytes); | ||||
| 			buffer->Consume(nbytes); | ||||
| 			wake_cond.broadcast(); | ||||
| 			base.offset += nbytes; | ||||
| 			return nbytes; | ||||
| 		} | ||||
|  | ||||
| 		if (eof) | ||||
| 			return 0; | ||||
|  | ||||
| 		base.cond.wait(base.mutex); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| size_t | ||||
| ThreadInputStream::Read(InputStream *is, void *ptr, size_t size, | ||||
| 			Error &error) | ||||
| { | ||||
| 	return Cast(is)->Read2(ptr, size, error); | ||||
| } | ||||
|  | ||||
| inline void | ||||
| ThreadInputStream::Close2() | ||||
| { | ||||
| 	base.mutex.lock(); | ||||
| 	close = true; | ||||
| 	wake_cond.signal(); | ||||
| 	base.mutex.unlock(); | ||||
|  | ||||
| 	Cancel(); | ||||
|  | ||||
| 	thread.Join(); | ||||
|  | ||||
| 	delete this; | ||||
| } | ||||
|  | ||||
| void | ||||
| ThreadInputStream::Close(InputStream *is) | ||||
| { | ||||
| 	Cast(is)->Close2(); | ||||
| } | ||||
|  | ||||
| inline bool | ||||
| ThreadInputStream::IsEOF2() | ||||
| { | ||||
| 	return eof; | ||||
| } | ||||
|  | ||||
| bool | ||||
| ThreadInputStream::IsEOF(InputStream *is) | ||||
| { | ||||
| 	return Cast(is)->IsEOF2(); | ||||
| } | ||||
							
								
								
									
										181
									
								
								src/input/ThreadInputStream.hxx
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										181
									
								
								src/input/ThreadInputStream.hxx
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,181 @@ | ||||
| /* | ||||
|  * Copyright (C) 2003-2014 The Music Player Daemon Project | ||||
|  * http://www.musicpd.org | ||||
|  * | ||||
|  * This program is free software; you can redistribute it and/or modify | ||||
|  * it under the terms of the GNU General Public License as published by | ||||
|  * the Free Software Foundation; either version 2 of the License, or | ||||
|  * (at your option) any later version. | ||||
|  * | ||||
|  * This program is distributed in the hope that it will be useful, | ||||
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|  * GNU General Public License for more details. | ||||
|  * | ||||
|  * You should have received a copy of the GNU General Public License along | ||||
|  * with this program; if not, write to the Free Software Foundation, Inc., | ||||
|  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||||
|  */ | ||||
|  | ||||
| #ifndef MPD_THREAD_INPUT_STREAM_HXX | ||||
| #define MPD_THREAD_INPUT_STREAM_HXX | ||||
|  | ||||
| #include "check.h" | ||||
| #include "InputStream.hxx" | ||||
| #include "thread/Thread.hxx" | ||||
| #include "thread/Cond.hxx" | ||||
| #include "util/Cast.hxx" | ||||
| #include "util/Error.hxx" | ||||
|  | ||||
| #include <stdint.h> | ||||
|  | ||||
| template<typename T> class CircularBuffer; | ||||
|  | ||||
| /** | ||||
|  * Helper class for moving InputStream implementations with blocking | ||||
|  * backend library implementation to a dedicated thread.  Data is | ||||
|  * being read into a ring buffer, and that buffer is then consumed by | ||||
|  * another thread using the regular #InputStream API.  This class | ||||
|  * manages the thread and the buffer. | ||||
|  * | ||||
|  * This works only for "streams": unknown length, no seeking, no tags. | ||||
|  */ | ||||
| class ThreadInputStream { | ||||
| 	InputStream base; | ||||
|  | ||||
| 	Thread thread; | ||||
|  | ||||
| 	/** | ||||
| 	 * Signalled when the thread shall be woken up: when data from | ||||
| 	 * the buffer has been consumed and when the stream shall be | ||||
| 	 * closed. | ||||
| 	 */ | ||||
| 	Cond wake_cond; | ||||
|  | ||||
| 	Error postponed_error; | ||||
|  | ||||
| 	const size_t buffer_size; | ||||
| 	CircularBuffer<uint8_t> *buffer; | ||||
|  | ||||
| 	/** | ||||
| 	 * Shall the stream be closed? | ||||
| 	 */ | ||||
| 	bool close; | ||||
|  | ||||
| 	/** | ||||
| 	 * Has the end of the stream been seen by the thread? | ||||
| 	 */ | ||||
| 	bool eof; | ||||
|  | ||||
| public: | ||||
| 	ThreadInputStream(const InputPlugin &_plugin, | ||||
| 			  const char *_uri, Mutex &_mutex, Cond &_cond, | ||||
| 			  size_t _buffer_size) | ||||
| 		:base(_plugin, _uri, _mutex, _cond), | ||||
| 		 buffer_size(_buffer_size), | ||||
| 		 buffer(nullptr), | ||||
| 		 close(false), eof(false) {} | ||||
|  | ||||
| 	virtual ~ThreadInputStream(); | ||||
|  | ||||
| 	/** | ||||
| 	 * Initialize the object and start the thread. | ||||
| 	 * | ||||
| 	 * @return false on error | ||||
| 	 */ | ||||
| 	InputStream *Start(Error &error); | ||||
|  | ||||
| protected: | ||||
| 	void Lock() { | ||||
| 		base.mutex.lock(); | ||||
| 	} | ||||
|  | ||||
| 	void Unlock() { | ||||
| 		base.mutex.unlock(); | ||||
| 	} | ||||
|  | ||||
| 	const char *GetURI() const { | ||||
| 		assert(thread.IsInside()); | ||||
|  | ||||
| 		return base.uri.c_str(); | ||||
| 	} | ||||
|  | ||||
| 	void SetMimeType(const char *mime) { | ||||
| 		assert(thread.IsInside()); | ||||
|  | ||||
| 		base.mime = mime; | ||||
| 	} | ||||
|  | ||||
| 	/* to be implemented by the plugin */ | ||||
|  | ||||
| 	/** | ||||
| 	 * Optional initialization after entering the thread.  After | ||||
| 	 * this returns with success, the InputStream::ready flag is | ||||
| 	 * set. | ||||
| 	 * | ||||
| 	 * The #InputStream is locked.  Unlock/relock it if you do a | ||||
| 	 * blocking operation. | ||||
| 	 */ | ||||
| 	virtual bool Open(gcc_unused Error &error) { | ||||
| 		return true; | ||||
| 	} | ||||
|  | ||||
| 	/** | ||||
| 	 * Read from the stream. | ||||
| 	 * | ||||
| 	 * The #InputStream is not locked. | ||||
| 	 * | ||||
| 	 * @return 0 on end-of-file or on error | ||||
| 	 */ | ||||
| 	virtual size_t Read(void *ptr, size_t size, Error &error) = 0; | ||||
|  | ||||
| 	/** | ||||
| 	 * Optional deinitialization before leaving the thread. | ||||
| 	 * | ||||
| 	 * The #InputStream is not locked. | ||||
| 	 */ | ||||
| 	virtual void Close() {} | ||||
|  | ||||
| 	/** | ||||
| 	 * Called from the client thread to cancel a Read() inside the | ||||
| 	 * thread. | ||||
| 	 * | ||||
| 	 * The #InputStream is not locked. | ||||
| 	 */ | ||||
| 	virtual void Cancel() {} | ||||
|  | ||||
| private: | ||||
|  | ||||
| #if GCC_CHECK_VERSION(4,6) || defined(__clang__) | ||||
| #pragma GCC diagnostic push | ||||
| #pragma GCC diagnostic ignored "-Winvalid-offsetof" | ||||
| #endif | ||||
|  | ||||
| 	static constexpr ThreadInputStream *Cast(InputStream *is) { | ||||
| 		return ContainerCast(is, ThreadInputStream, base); | ||||
| 	} | ||||
|  | ||||
| #if GCC_CHECK_VERSION(4,6) || defined(__clang__) | ||||
| #pragma GCC diagnostic pop | ||||
| #endif | ||||
|  | ||||
| 	void ThreadFunc(); | ||||
| 	static void ThreadFunc(void *ctx); | ||||
|  | ||||
| 	bool Check2(Error &error); | ||||
| 	bool Available2(); | ||||
| 	size_t Read2(void *ptr, size_t size, Error &error); | ||||
| 	void Close2(); | ||||
| 	bool IsEOF2(); | ||||
|  | ||||
| public: | ||||
| 	/* InputPlugin callbacks */ | ||||
| 	static bool Check(InputStream *is, Error &error); | ||||
| 	static bool Available(InputStream *is); | ||||
| 	static size_t Read(InputStream *is, void *ptr, size_t size, | ||||
| 			   Error &error); | ||||
| 	static void Close(InputStream *is); | ||||
| 	static bool IsEOF(InputStream *is); | ||||
| }; | ||||
|  | ||||
| #endif | ||||
		Reference in New Issue
	
	Block a user
	 Max Kellermann
					Max Kellermann