db/simple: mount points
A SimpleDatabase instance can now "mount" other Database instances at certain locations. This is used to use a new SimpleDatabase instance for each storage mount (issued with the "mount" protocol command). Each such instance has its own database file, stored in the directory that is specified with the "cache_directory" option.
This commit is contained in:
@@ -42,6 +42,9 @@ UpdateWalk::MakeDirectoryIfModified(Directory &parent, const char *name,
|
||||
|
||||
// directory exists already
|
||||
if (directory != nullptr) {
|
||||
if (directory->IsMount())
|
||||
return nullptr;
|
||||
|
||||
if (directory->mtime == info.mtime && !walk_discard) {
|
||||
/* not modified */
|
||||
return nullptr;
|
||||
|
||||
@@ -21,12 +21,13 @@
|
||||
#include "Queue.hxx"
|
||||
|
||||
bool
|
||||
UpdateQueue::Push(const char *path, bool discard, unsigned id)
|
||||
UpdateQueue::Push(SimpleDatabase &db, Storage &storage,
|
||||
const char *path, bool discard, unsigned id)
|
||||
{
|
||||
if (update_queue.size() >= MAX_UPDATE_QUEUE_SIZE)
|
||||
return false;
|
||||
|
||||
update_queue.emplace_back(path, discard, id);
|
||||
update_queue.emplace_back(db, storage, path, discard, id);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -40,3 +41,27 @@ UpdateQueue::Pop()
|
||||
update_queue.pop_front();
|
||||
return i;
|
||||
}
|
||||
|
||||
void
|
||||
UpdateQueue::Erase(SimpleDatabase &db)
|
||||
{
|
||||
for (auto i = update_queue.begin(), end = update_queue.end();
|
||||
i != end;) {
|
||||
if (i->db == &db)
|
||||
i = update_queue.erase(i);
|
||||
else
|
||||
++i;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
UpdateQueue::Erase(Storage &storage)
|
||||
{
|
||||
for (auto i = update_queue.begin(), end = update_queue.end();
|
||||
i != end;) {
|
||||
if (i->storage == &storage)
|
||||
i = update_queue.erase(i);
|
||||
else
|
||||
++i;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,19 +21,30 @@
|
||||
#define MPD_UPDATE_QUEUE_HXX
|
||||
|
||||
#include "check.h"
|
||||
#include "Compiler.h"
|
||||
|
||||
#include <string>
|
||||
#include <list>
|
||||
|
||||
class SimpleDatabase;
|
||||
class Storage;
|
||||
|
||||
struct UpdateQueueItem {
|
||||
SimpleDatabase *db;
|
||||
Storage *storage;
|
||||
|
||||
std::string path_utf8;
|
||||
unsigned id;
|
||||
bool discard;
|
||||
|
||||
UpdateQueueItem():id(0) {}
|
||||
UpdateQueueItem(const char *_path, bool _discard,
|
||||
|
||||
UpdateQueueItem(SimpleDatabase &_db,
|
||||
Storage &_storage,
|
||||
const char *_path, bool _discard,
|
||||
unsigned _id)
|
||||
:path_utf8(_path), id(_id), discard(_discard) {}
|
||||
:db(&_db), storage(&_storage), path_utf8(_path),
|
||||
id(_id), discard(_discard) {}
|
||||
|
||||
bool IsDefined() const {
|
||||
return id != 0;
|
||||
@@ -46,13 +57,21 @@ class UpdateQueue {
|
||||
std::list<UpdateQueueItem> update_queue;
|
||||
|
||||
public:
|
||||
bool Push(const char *path, bool discard, unsigned id);
|
||||
gcc_nonnull_all
|
||||
bool Push(SimpleDatabase &db, Storage &storage,
|
||||
const char *path, bool discard, unsigned id);
|
||||
|
||||
UpdateQueueItem Pop();
|
||||
|
||||
void Clear() {
|
||||
update_queue.clear();
|
||||
}
|
||||
|
||||
gcc_nonnull_all
|
||||
void Erase(SimpleDatabase &db);
|
||||
|
||||
gcc_nonnull_all
|
||||
void Erase(Storage &storage);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
@@ -22,7 +22,10 @@
|
||||
#include "Walk.hxx"
|
||||
#include "UpdateDomain.hxx"
|
||||
#include "db/DatabaseListener.hxx"
|
||||
#include "db/DatabaseLock.hxx"
|
||||
#include "db/plugins/simple/SimpleDatabasePlugin.hxx"
|
||||
#include "db/plugins/simple/Directory.hxx"
|
||||
#include "storage/CompositeStorage.hxx"
|
||||
#include "Idle.hxx"
|
||||
#include "util/Error.hxx"
|
||||
#include "Log.hxx"
|
||||
@@ -39,7 +42,7 @@
|
||||
#include <assert.h>
|
||||
|
||||
UpdateService::UpdateService(EventLoop &_loop, SimpleDatabase &_db,
|
||||
Storage &_storage,
|
||||
CompositeStorage &_storage,
|
||||
DatabaseListener &_listener)
|
||||
:DeferredMonitor(_loop),
|
||||
db(_db), storage(_storage),
|
||||
@@ -71,6 +74,42 @@ UpdateService::CancelAllAsync()
|
||||
walk->Cancel();
|
||||
}
|
||||
|
||||
void
|
||||
UpdateService::CancelMount(const char *uri)
|
||||
{
|
||||
/* determine which (mounted) database will be updated and what
|
||||
storage will be scanned */
|
||||
|
||||
db_lock();
|
||||
const auto lr = db.GetRoot().LookupDirectory(uri);
|
||||
db_unlock();
|
||||
|
||||
if (!lr.directory->IsMount())
|
||||
return;
|
||||
|
||||
bool cancel_current = false;
|
||||
|
||||
Storage *storage2 = storage.GetMount(uri);
|
||||
if (storage2 != nullptr) {
|
||||
queue.Erase(*storage2);
|
||||
cancel_current = next.IsDefined() && next.storage == storage2;
|
||||
}
|
||||
|
||||
Database &_db2 = *lr.directory->mounted_database;
|
||||
if (_db2.IsPlugin(simple_db_plugin)) {
|
||||
SimpleDatabase &db2 = static_cast<SimpleDatabase &>(_db2);
|
||||
queue.Erase(db2);
|
||||
cancel_current |= next.IsDefined() && next.db == &db2;
|
||||
}
|
||||
|
||||
if (cancel_current && walk != nullptr) {
|
||||
walk->Cancel();
|
||||
|
||||
if (update_thread.IsDefined())
|
||||
update_thread.Join();
|
||||
}
|
||||
}
|
||||
|
||||
inline void
|
||||
UpdateService::Task()
|
||||
{
|
||||
@@ -84,12 +123,12 @@ UpdateService::Task()
|
||||
|
||||
SetThreadIdlePriority();
|
||||
|
||||
modified = walk->Walk(db.GetRoot(), next.path_utf8.c_str(),
|
||||
modified = walk->Walk(next.db->GetRoot(), next.path_utf8.c_str(),
|
||||
next.discard);
|
||||
|
||||
if (modified || !db.FileExists()) {
|
||||
if (modified || !next.db->FileExists()) {
|
||||
Error error;
|
||||
if (!db.Save(error))
|
||||
if (!next.db->Save(error))
|
||||
LogError(error, "Failed to save database");
|
||||
}
|
||||
|
||||
@@ -120,7 +159,7 @@ UpdateService::StartThread(UpdateQueueItem &&i)
|
||||
modified = false;
|
||||
|
||||
next = std::move(i);
|
||||
walk = new UpdateWalk(GetEventLoop(), listener, storage);
|
||||
walk = new UpdateWalk(GetEventLoop(), listener, *next.storage);
|
||||
|
||||
Error error;
|
||||
if (!update_thread.Start(Task, this, error))
|
||||
@@ -144,9 +183,52 @@ UpdateService::Enqueue(const char *path, bool discard)
|
||||
{
|
||||
assert(GetEventLoop().IsInsideOrNull());
|
||||
|
||||
/* determine which (mounted) database will be updated and what
|
||||
storage will be scanned */
|
||||
SimpleDatabase *db2;
|
||||
Storage *storage2;
|
||||
|
||||
db_lock();
|
||||
const auto lr = db.GetRoot().LookupDirectory(path);
|
||||
db_unlock();
|
||||
if (lr.directory->IsMount()) {
|
||||
/* follow the mountpoint, update the mounted
|
||||
database */
|
||||
|
||||
Database &_db2 = *lr.directory->mounted_database;
|
||||
if (!_db2.IsPlugin(simple_db_plugin))
|
||||
/* cannot update this type of database */
|
||||
return 0;
|
||||
|
||||
db2 = static_cast<SimpleDatabase *>(&_db2);
|
||||
|
||||
if (lr.uri == nullptr) {
|
||||
storage2 = storage.GetMount(path);
|
||||
path = "";
|
||||
} else {
|
||||
assert(lr.uri > path);
|
||||
assert(lr.uri < path + strlen(path));
|
||||
assert(lr.uri[-1] == '/');
|
||||
|
||||
const std::string mountpoint(path, lr.uri - 1);
|
||||
storage2 = storage.GetMount(mountpoint.c_str());
|
||||
path = lr.uri;
|
||||
}
|
||||
} else {
|
||||
/* use the "root" database/storage */
|
||||
|
||||
db2 = &db;
|
||||
storage2 = storage.GetMount("");
|
||||
}
|
||||
|
||||
if (storage2 == nullptr)
|
||||
/* no storage found at this mount point - should not
|
||||
happen */
|
||||
return 0;
|
||||
|
||||
if (progress != UPDATE_PROGRESS_IDLE) {
|
||||
const unsigned id = GenerateId();
|
||||
if (!queue.Push(path, discard, id))
|
||||
if (!queue.Push(*db2, *storage2, path, discard, id))
|
||||
return 0;
|
||||
|
||||
update_task_id = id;
|
||||
@@ -154,7 +236,7 @@ UpdateService::Enqueue(const char *path, bool discard)
|
||||
}
|
||||
|
||||
const unsigned id = update_task_id = GenerateId();
|
||||
StartThread(UpdateQueueItem(path, discard, id));
|
||||
StartThread(UpdateQueueItem(*db2, *storage2, path, discard, id));
|
||||
|
||||
idle_add(IDLE_UPDATE);
|
||||
|
||||
@@ -171,7 +253,10 @@ UpdateService::RunDeferred()
|
||||
assert(next.IsDefined());
|
||||
assert(walk != nullptr);
|
||||
|
||||
update_thread.Join();
|
||||
/* wait for thread to finish only if it wasn't cancelled by
|
||||
CancelMount() */
|
||||
if (update_thread.IsDefined())
|
||||
update_thread.Join();
|
||||
|
||||
delete walk;
|
||||
walk = nullptr;
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
class SimpleDatabase;
|
||||
class DatabaseListener;
|
||||
class UpdateWalk;
|
||||
class Storage;
|
||||
class CompositeStorage;
|
||||
|
||||
/**
|
||||
* This class manages the update queue and runs the update thread.
|
||||
@@ -41,7 +41,7 @@ class UpdateService final : DeferredMonitor {
|
||||
};
|
||||
|
||||
SimpleDatabase &db;
|
||||
Storage &storage;
|
||||
CompositeStorage &storage;
|
||||
|
||||
DatabaseListener &listener;
|
||||
|
||||
@@ -63,7 +63,7 @@ class UpdateService final : DeferredMonitor {
|
||||
|
||||
public:
|
||||
UpdateService(EventLoop &_loop, SimpleDatabase &_db,
|
||||
Storage &_storage,
|
||||
CompositeStorage &_storage,
|
||||
DatabaseListener &_listener);
|
||||
|
||||
~UpdateService();
|
||||
@@ -92,6 +92,13 @@ public:
|
||||
*/
|
||||
void CancelAllAsync();
|
||||
|
||||
/**
|
||||
* Cancel all updates for the given mount point. If an update
|
||||
* is already running for it, the method will wait for
|
||||
* cancellation to complete.
|
||||
*/
|
||||
void CancelMount(const char *uri);
|
||||
|
||||
private:
|
||||
/* virtual methods from class DeferredMonitor */
|
||||
virtual void RunDeferred() override;
|
||||
|
||||
@@ -397,8 +397,12 @@ UpdateWalk::DirectoryMakeChildChecked(Directory &parent,
|
||||
Directory *directory = parent.FindChild(name_utf8);
|
||||
db_unlock();
|
||||
|
||||
if (directory != nullptr)
|
||||
if (directory != nullptr) {
|
||||
if (directory->IsMount())
|
||||
directory = nullptr;
|
||||
|
||||
return directory;
|
||||
}
|
||||
|
||||
FileInfo info;
|
||||
if (!GetInfo(storage, uri_utf8, info) ||
|
||||
|
||||
Reference in New Issue
Block a user