Refactor send_diffs making it progressive
When a slave is many diffs behind, send these in batches of up to 50, then handle other slaves. This also implements a fast-path that makes incremental diffs faster when the log has not rolled over. Related code cleanup.
This commit is contained in:

committed by
Nico Williams

parent
96fd393d29
commit
d0211ef475
@@ -159,6 +159,16 @@ struct slave {
|
||||
size_t offset;
|
||||
int hlen;
|
||||
} input;
|
||||
/*
|
||||
* Continuation for fair diff sending we send N entries at a time.
|
||||
*/
|
||||
struct {
|
||||
off_t off_next_version; /* offset in log of next diff */
|
||||
uint32_t initial_version; /* at time of previous diff */
|
||||
uint32_t initial_tstamp; /* at time of previous diff */
|
||||
uint32_t last_version_sent;
|
||||
int more; /* need to send more diffs */
|
||||
} next_diff;
|
||||
struct slave *next;
|
||||
};
|
||||
|
||||
@@ -541,7 +551,14 @@ have_tail(slave *s)
|
||||
return s->tail.header.length || s->tail.packet.length || s->tail.dump;
|
||||
}
|
||||
|
||||
static int
|
||||
more_diffs(slave *s)
|
||||
{
|
||||
return s->next_diff.more;
|
||||
}
|
||||
|
||||
#define SEND_COMPLETE_MAX_RECORDS 50
|
||||
#define SEND_DIFFS_MAX_RECORDS 50
|
||||
|
||||
static int
|
||||
send_tail(krb5_context context, slave *s)
|
||||
@@ -645,9 +662,9 @@ ewouldblock:
|
||||
}
|
||||
|
||||
static int
|
||||
send_complete (krb5_context context, slave *s, const char *database,
|
||||
uint32_t current_version, uint32_t oldest_version,
|
||||
uint32_t initial_log_tstamp)
|
||||
send_complete(krb5_context context, slave *s, const char *database,
|
||||
uint32_t current_version, uint32_t oldest_version,
|
||||
uint32_t initial_log_tstamp)
|
||||
{
|
||||
krb5_error_code ret;
|
||||
krb5_storage *dump = NULL;
|
||||
@@ -838,21 +855,8 @@ send_are_you_there (krb5_context context, slave *s)
|
||||
}
|
||||
|
||||
static int
|
||||
send_diffs (kadm5_server_context *server_context, slave *s, int log_fd,
|
||||
const char *database, uint32_t current_version,
|
||||
uint32_t current_tstamp)
|
||||
diffready(krb5_context context, slave *s)
|
||||
{
|
||||
krb5_context context = server_context->context;
|
||||
krb5_storage *sp;
|
||||
uint32_t ver, initial_version, initial_version2;
|
||||
uint32_t initial_tstamp, initial_tstamp2;
|
||||
enum kadm_ops op;
|
||||
uint32_t len;
|
||||
off_t right, left;
|
||||
krb5_ssize_t bytes;
|
||||
krb5_data data;
|
||||
int ret = 0;
|
||||
|
||||
/*
|
||||
* Don't send any diffs until slave has sent an I_HAVE telling us the
|
||||
* initial version number!
|
||||
@@ -861,197 +865,363 @@ send_diffs (kadm5_server_context *server_context, slave *s, int log_fd,
|
||||
return 0;
|
||||
|
||||
if (s->flags & SLAVE_F_DEAD) {
|
||||
krb5_warnx(context, "not sending diffs to dead slave %s", s->name);
|
||||
if (verbose)
|
||||
krb5_warnx(context, "not sending diffs to dead slave %s", s->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Write any remainder of previous write, if we can. If we'd block we'll
|
||||
* return EWOULDBLOCK.
|
||||
*/
|
||||
ret = send_tail(context, s);
|
||||
if (ret)
|
||||
return ret;
|
||||
/* Write any remainder of previous write, if we can. */
|
||||
if (send_tail(context, s) != 0)
|
||||
return 0;
|
||||
|
||||
if (s->version == current_version) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int
|
||||
nodiffs(krb5_context context, slave *s, uint32_t current_version)
|
||||
{
|
||||
krb5_storage *sp;
|
||||
krb5_data data;
|
||||
int ret;
|
||||
|
||||
if (s->version < current_version)
|
||||
return 0;
|
||||
|
||||
/*
|
||||
* If we had sent a partial diff, and now they're caught up, then there's
|
||||
* no more.
|
||||
*/
|
||||
s->next_diff.more = 0;
|
||||
|
||||
if (verbose)
|
||||
krb5_warnx(context, "slave %s version %ld already sent", s->name,
|
||||
(long)s->version);
|
||||
sp = krb5_storage_emem();
|
||||
if (sp == NULL)
|
||||
krb5_errx(context, IPROPD_RESTART, "krb5_storage_from_mem");
|
||||
ret = krb5_store_uint32(sp, YOU_HAVE_LAST_VERSION);
|
||||
if (ret == 0)
|
||||
ret = krb5_storage_to_data(sp, &data);
|
||||
krb5_storage_free(sp);
|
||||
if (ret == 0)
|
||||
ret = mk_priv_tail(context, s, &data);
|
||||
if (ret == 0)
|
||||
ret = send_tail(context, s);
|
||||
return ret;
|
||||
sp = krb5_storage_emem();
|
||||
if (sp == NULL)
|
||||
krb5_errx(context, IPROPD_RESTART, "krb5_storage_from_mem");
|
||||
|
||||
ret = krb5_store_uint32(sp, YOU_HAVE_LAST_VERSION);
|
||||
if (ret == 0) {
|
||||
krb5_data_zero(&data);
|
||||
ret = krb5_storage_to_data(sp, &data);
|
||||
}
|
||||
krb5_storage_free(sp);
|
||||
if (ret == 0) {
|
||||
ret = mk_priv_tail(context, s, &data);
|
||||
krb5_data_free(&data);
|
||||
}
|
||||
if (ret == 0)
|
||||
send_tail(context, s);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Lock the log and return initial version and timestamp
|
||||
*/
|
||||
static int
|
||||
get_first(kadm5_server_context *server_context, int log_fd,
|
||||
uint32_t *initial_verp, uint32_t *initial_timep)
|
||||
{
|
||||
krb5_context context = server_context->context;
|
||||
int ret;
|
||||
|
||||
/*
|
||||
* We don't want to perform tight retry loops on log access errors, so on
|
||||
* error mark the slave dead. The slave reconnect after a delay...
|
||||
*/
|
||||
if (flock(log_fd, LOCK_SH) == -1) {
|
||||
krb5_warn(context, errno, "could not obtain shared lock on log file");
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = kadm5_log_get_version_fd(server_context, log_fd, LOG_VERSION_FIRST,
|
||||
initial_verp, initial_timep);
|
||||
if (ret != 0) {
|
||||
flock(log_fd, LOCK_UN);
|
||||
krb5_warnx(context, "could not read initial log entry");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*-
|
||||
* Find the left end of the diffs in the log we want to send.
|
||||
*
|
||||
* - On success, return a positive offset to the first new entry, retaining
|
||||
* a read lock on the log file.
|
||||
* - On error, return a negative offset, with the lock released.
|
||||
* - If we simply find no successor entry in the log, return zero
|
||||
* with the lock released, which indicates that fallback to send_complete()
|
||||
* is needed.
|
||||
*/
|
||||
static off_t
|
||||
get_left(kadm5_server_context *server_context, slave *s, krb5_storage *sp,
|
||||
int log_fd, uint32_t current_version,
|
||||
uint32_t *initial_verp, uint32_t *initial_timep)
|
||||
{
|
||||
krb5_context context = server_context->context;
|
||||
off_t pos;
|
||||
off_t left;
|
||||
int ret;
|
||||
|
||||
for (;;) {
|
||||
uint32_t ver = s->version;
|
||||
|
||||
/* This acquires a read lock on success */
|
||||
ret = get_first(server_context, log_fd,
|
||||
initial_verp, initial_timep);
|
||||
if (ret != 0)
|
||||
return -1;
|
||||
|
||||
/* When the slave version is out of range, send the whole database. */
|
||||
if (ver == 0 || ver < *initial_verp || ver > current_version) {
|
||||
flock(log_fd, LOCK_UN);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Avoid seeking past the last committed record */
|
||||
if (kadm5_log_goto_end(server_context, sp) != 0 ||
|
||||
(pos = krb5_storage_seek(sp, 0, SEEK_CUR)) < 0)
|
||||
goto err;
|
||||
|
||||
/*
|
||||
* First try to see if we can find it quickly by seeking to the right
|
||||
* end of the previous diff sent.
|
||||
*/
|
||||
if (s->next_diff.last_version_sent > 0 &&
|
||||
s->next_diff.off_next_version > 0 &&
|
||||
s->next_diff.off_next_version < pos &&
|
||||
s->next_diff.initial_version == *initial_verp &&
|
||||
s->next_diff.initial_tstamp == *initial_timep) {
|
||||
/*
|
||||
* Sanity check that the left version matches what we wanted, the
|
||||
* log may have been truncated since.
|
||||
*/
|
||||
left = s->next_diff.off_next_version;
|
||||
if (krb5_storage_seek(sp, left, SEEK_SET) != left)
|
||||
goto err;
|
||||
if (kadm5_log_next(context, sp, &ver, NULL, NULL, NULL) == 0 &&
|
||||
ver == s->next_diff.last_version_sent + 1)
|
||||
return left;
|
||||
}
|
||||
|
||||
if (krb5_storage_seek(sp, pos, SEEK_SET) != pos)
|
||||
goto err;
|
||||
|
||||
/*
|
||||
* Drop the lock and try to find the left entry by seeking backward
|
||||
* from the end of the end of the log. If we succeed, re-acquire the
|
||||
* lock, update "next_diff", and retry the fast-path.
|
||||
*/
|
||||
flock(log_fd, LOCK_UN);
|
||||
|
||||
/* Slow path: seek backwards, entry by entry, from the end */
|
||||
for (;;) {
|
||||
enum kadm_ops op;
|
||||
uint32_t len;
|
||||
|
||||
ret = kadm5_log_previous(context, sp, &ver, NULL, &op, &len);
|
||||
if (ret)
|
||||
return -1;
|
||||
left = krb5_storage_seek(sp, -16, SEEK_CUR);
|
||||
if (left < 0)
|
||||
return left;
|
||||
if (ver == s->version + 1)
|
||||
break;
|
||||
|
||||
/*
|
||||
* We don't expect to reach the slave's version, unless the log
|
||||
* has been modified after we released the lock.
|
||||
*/
|
||||
if (ver == s->version) {
|
||||
krb5_warnx(context, "iprop log truncated while sending diffs "
|
||||
"to slave?? ver = %lu", (unsigned long)ver);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* If we've reached the uber record, send the complete database */
|
||||
if (left == 0 || (ver == 0 && op == kadm_nop))
|
||||
return 0;
|
||||
}
|
||||
assert(ver == s->version + 1);
|
||||
|
||||
/* Set up the fast-path pre-conditions */
|
||||
s->next_diff.last_version_sent = s->version;
|
||||
s->next_diff.off_next_version = left;
|
||||
s->next_diff.initial_version = *initial_verp;
|
||||
s->next_diff.initial_tstamp = *initial_timep;
|
||||
|
||||
/*
|
||||
* If we loop then we're hoping to hit the fast path so we can return a
|
||||
* non-zero, positive left offset with the lock held.
|
||||
*
|
||||
* We just updated the fast path pre-conditions, so unless a log
|
||||
* truncation event happens between the point where we dropped the lock
|
||||
* and the point where we rearcuire it above, we will hit the fast
|
||||
* path.
|
||||
*/
|
||||
}
|
||||
|
||||
return left;
|
||||
|
||||
err:
|
||||
flock(log_fd, LOCK_UN);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static off_t
|
||||
get_right(krb5_context context, int log_fd, krb5_storage *sp,
|
||||
int lastver, slave *s, off_t left, uint32_t *verp)
|
||||
{
|
||||
int ret = 0;
|
||||
int i = 0;
|
||||
uint32_t ver = s->version;
|
||||
off_t right = krb5_storage_seek(sp, left, SEEK_SET);
|
||||
|
||||
if (right <= 0) {
|
||||
flock(log_fd, LOCK_UN);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* The "lastver" bound should preclude us reaching EOF */
|
||||
for (; ret == 0 && i < SEND_DIFFS_MAX_RECORDS && ver < lastver; ++i) {
|
||||
uint32_t logver;
|
||||
|
||||
ret = kadm5_log_next(context, sp, &logver, NULL, NULL, NULL);
|
||||
if (logver != ++ver)
|
||||
ret = KADM5_LOG_CORRUPT;
|
||||
}
|
||||
|
||||
if (ret == 0)
|
||||
right = krb5_storage_seek(sp, 0, SEEK_CUR);
|
||||
else
|
||||
right = -1;
|
||||
if (right <= 0) {
|
||||
flock(log_fd, LOCK_UN);
|
||||
return -1;
|
||||
}
|
||||
*verp = ver;
|
||||
return right;
|
||||
}
|
||||
|
||||
static void
|
||||
send_diffs(kadm5_server_context *server_context, slave *s, int log_fd,
|
||||
const char *database, uint32_t current_version)
|
||||
{
|
||||
krb5_context context = server_context->context;
|
||||
krb5_storage *sp;
|
||||
uint32_t initial_version;
|
||||
uint32_t initial_tstamp;
|
||||
uint32_t ver;
|
||||
off_t left = 0;
|
||||
off_t right = 0;
|
||||
krb5_ssize_t bytes;
|
||||
krb5_data data;
|
||||
int ret = 0;
|
||||
|
||||
if (!diffready(context, s) || nodiffs(context, s, current_version))
|
||||
return;
|
||||
|
||||
if (verbose)
|
||||
krb5_warnx(context, "sending diffs to live-seeming slave %s", s->name);
|
||||
|
||||
/*
|
||||
* XXX The code that makes the diffs should be made a separate function,
|
||||
* then error handling (send_are_you_there() or slave_dead()) can be done
|
||||
* here.
|
||||
*/
|
||||
sp = krb5_storage_from_fd(log_fd);
|
||||
if (sp == NULL)
|
||||
krb5_err(context, IPROPD_RESTART_SLOW, ENOMEM,
|
||||
"send_diffs: out of memory");
|
||||
|
||||
if (flock(log_fd, LOCK_SH) == -1) {
|
||||
krb5_warn(context, errno, "could not obtain shared lock on log file");
|
||||
send_are_you_there(context, s);
|
||||
return errno;
|
||||
}
|
||||
ret = kadm5_log_get_version_fd(server_context, log_fd, LOG_VERSION_FIRST,
|
||||
&initial_version, &initial_tstamp);
|
||||
sp = kadm5_log_goto_end(server_context, log_fd);
|
||||
flock(log_fd, LOCK_UN);
|
||||
if (ret) {
|
||||
if (sp != NULL)
|
||||
krb5_storage_free(sp);
|
||||
krb5_warn(context, ret, "send_diffs: failed to read log");
|
||||
send_are_you_there(context, s);
|
||||
return ret;
|
||||
}
|
||||
if (sp == NULL) {
|
||||
send_are_you_there(context, s);
|
||||
krb5_warn(context, errno ? errno : EINVAL,
|
||||
"send_diffs: failed to read log");
|
||||
return errno ? errno : EINVAL;
|
||||
}
|
||||
/*
|
||||
* We're not holding any locks here, so we can't prevent truncations.
|
||||
*
|
||||
* We protect against this by re-checking that the initial version and
|
||||
* timestamp are the same before and after this loop.
|
||||
*/
|
||||
right = krb5_storage_seek(sp, 0, SEEK_CUR);
|
||||
if (right == (off_t)-1) {
|
||||
left = get_left(server_context, s, sp, log_fd, current_version,
|
||||
&initial_version, &initial_tstamp);
|
||||
if (left < 0) {
|
||||
krb5_storage_free(sp);
|
||||
send_are_you_there(context, s);
|
||||
return errno;
|
||||
}
|
||||
for (;;) {
|
||||
ret = kadm5_log_previous (context, sp, &ver, NULL, &op, &len);
|
||||
if (ret)
|
||||
krb5_err(context, IPROPD_RESTART, ret,
|
||||
"send_diffs: failed to find previous entry");
|
||||
left = krb5_storage_seek(sp, -16, SEEK_CUR);
|
||||
if (left == (off_t)-1) {
|
||||
krb5_storage_free(sp);
|
||||
send_are_you_there(context, s);
|
||||
return errno;
|
||||
}
|
||||
if (ver == s->version + 1)
|
||||
break;
|
||||
|
||||
/*
|
||||
* We don't expect to reach the slave's version, except when it is
|
||||
* starting empty with the uber record.
|
||||
*/
|
||||
if (ver == s->version && !(ver == 0 && op == kadm_nop)) {
|
||||
/*
|
||||
* This shouldn't happen, but recall we're not holding a lock on
|
||||
* the log.
|
||||
*/
|
||||
krb5_storage_free(sp);
|
||||
krb5_warnx(context, "iprop log truncated while sending diffs to "
|
||||
"slave?? ver = %lu", (unsigned long)ver);
|
||||
send_are_you_there(context, s);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* If we've reached the uber record, send the complete database */
|
||||
if (left == 0 || (ver == 0 && op == kadm_nop)) {
|
||||
krb5_storage_free(sp);
|
||||
krb5_warnx(context,
|
||||
"slave %s (version %lu) out of sync with master "
|
||||
"(first version in log %lu), sending complete database",
|
||||
s->name, (unsigned long)s->version, (unsigned long)ver);
|
||||
return send_complete (context, s, database, current_version, ver,
|
||||
initial_tstamp);
|
||||
}
|
||||
slave_dead(context, s);
|
||||
return;
|
||||
}
|
||||
|
||||
assert(ver == s->version + 1);
|
||||
if (left == 0) {
|
||||
/* Slave's version is not in the log, fall back on send_complete() */
|
||||
krb5_storage_free(sp);
|
||||
send_complete(context, s, database, current_version,
|
||||
initial_version, initial_tstamp);
|
||||
return;
|
||||
}
|
||||
|
||||
krb5_warnx(context,
|
||||
"syncing slave %s from version %lu to version %lu",
|
||||
s->name, (unsigned long)s->version,
|
||||
(unsigned long)current_version);
|
||||
/* We still hold the read lock, if right > 0 */
|
||||
right = get_right(server_context->context, log_fd, sp, current_version,
|
||||
s, left, &ver);
|
||||
if (right == left) {
|
||||
flock(log_fd, LOCK_UN);
|
||||
krb5_storage_free(sp);
|
||||
return;
|
||||
}
|
||||
if (right < left) {
|
||||
assert(right < 0);
|
||||
krb5_storage_free(sp);
|
||||
slave_dead(context, s);
|
||||
return;
|
||||
}
|
||||
|
||||
ret = krb5_data_alloc (&data, right - left + 4);
|
||||
if (krb5_storage_seek(sp, left, SEEK_SET) != left) {
|
||||
ret = errno ? errno : EIO;
|
||||
flock(log_fd, LOCK_UN);
|
||||
krb5_warn(context, ret, "send_diffs: krb5_storage_seek");
|
||||
krb5_storage_free(sp);
|
||||
slave_dead(context, s);
|
||||
return;
|
||||
}
|
||||
|
||||
ret = krb5_data_alloc(&data, right - left + 4);
|
||||
if (ret) {
|
||||
krb5_storage_free(sp);
|
||||
krb5_warn (context, ret, "send_diffs: krb5_data_alloc");
|
||||
send_are_you_there(context, s);
|
||||
return 1;
|
||||
flock(log_fd, LOCK_UN);
|
||||
krb5_warn(context, ret, "send_diffs: krb5_data_alloc");
|
||||
krb5_storage_free(sp);
|
||||
slave_dead(context, s);
|
||||
return;
|
||||
}
|
||||
|
||||
bytes = krb5_storage_read(sp, (char *)data.data + 4, data.length - 4);
|
||||
krb5_storage_free(sp);
|
||||
if (bytes != data.length - 4) {
|
||||
krb5_warnx(context, "iprop log truncated while sending diffs to "
|
||||
"slave?? ver = %lu", (unsigned long)ver);
|
||||
send_are_you_there(context, s);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that we have the same log initial version and timestamp now as
|
||||
* when we dropped the shared lock on the log file! Else we could be
|
||||
* sending garbage to the slave.
|
||||
*/
|
||||
if (flock(log_fd, LOCK_SH) == -1) {
|
||||
krb5_warn(context, errno, "could not obtain shared lock on log file");
|
||||
send_are_you_there(context, s);
|
||||
return 1;
|
||||
}
|
||||
ret = kadm5_log_get_version_fd(server_context, log_fd, LOG_VERSION_FIRST,
|
||||
&initial_version2, &initial_tstamp2);
|
||||
flock(log_fd, LOCK_UN);
|
||||
if (ret) {
|
||||
krb5_warn(context, ret,
|
||||
"send_diffs: failed to read log while producing diffs");
|
||||
send_are_you_there(context, s);
|
||||
return 1;
|
||||
}
|
||||
if (initial_version != initial_version2 ||
|
||||
initial_tstamp != initial_tstamp2) {
|
||||
krb5_warn(context, ret,
|
||||
"send_diffs: log truncated while producing diffs");
|
||||
send_are_you_there(context, s);
|
||||
return 1;
|
||||
}
|
||||
krb5_storage_free(sp);
|
||||
if (bytes != data.length - 4)
|
||||
krb5_errx(context, IPROPD_RESTART, "locked log truncated???");
|
||||
|
||||
sp = krb5_storage_from_data (&data);
|
||||
sp = krb5_storage_from_data(&data);
|
||||
if (sp == NULL) {
|
||||
krb5_warnx (context, "send_diffs: krb5_storage_from_data");
|
||||
send_are_you_there(context, s);
|
||||
return 1;
|
||||
krb5_err(context, IPROPD_RESTART_SLOW, ENOMEM, "out of memory");
|
||||
krb5_warnx(context, "send_diffs: krb5_storage_from_data");
|
||||
return;
|
||||
}
|
||||
krb5_store_uint32 (sp, FOR_YOU);
|
||||
krb5_store_uint32(sp, FOR_YOU);
|
||||
krb5_storage_free(sp);
|
||||
|
||||
ret = mk_priv_tail(context, s, &data);
|
||||
krb5_data_free(&data);
|
||||
if (ret == 0)
|
||||
if (ret == 0) {
|
||||
/* Save the fast-path continuation */
|
||||
s->next_diff.last_version_sent = ver;
|
||||
s->next_diff.off_next_version = right;
|
||||
s->next_diff.initial_version = initial_version;
|
||||
s->next_diff.initial_tstamp = initial_tstamp;
|
||||
s->next_diff.more = ver < current_version;
|
||||
ret = send_tail(context, s);
|
||||
|
||||
krb5_warnx(context,
|
||||
"syncing slave %s from version %lu to version %lu",
|
||||
s->name, (unsigned long)s->version,
|
||||
(unsigned long)ver);
|
||||
s->version = ver;
|
||||
}
|
||||
|
||||
if (ret && ret != EWOULDBLOCK) {
|
||||
krb5_warn(context, ret, "send_diffs: making or sending "
|
||||
krb5_warn(context, ret, "send_diffs: making or sending "
|
||||
"KRB-PRIV message");
|
||||
slave_dead(context, s);
|
||||
return 1;
|
||||
slave_dead(context, s);
|
||||
return;
|
||||
}
|
||||
slave_seen(s);
|
||||
|
||||
s->version = current_version;
|
||||
|
||||
krb5_warnx(context, "slave %s is now up to date (%u)", s->name, s->version);
|
||||
|
||||
return 0;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Sensible bound on slave message size */
|
||||
@@ -1125,9 +1295,8 @@ read_msg(krb5_context context, slave *s, krb5_data *out)
|
||||
}
|
||||
|
||||
static int
|
||||
process_msg (kadm5_server_context *server_context, slave *s, int log_fd,
|
||||
const char *database, uint32_t current_version,
|
||||
uint32_t current_tstamp)
|
||||
process_msg(kadm5_server_context *server_context, slave *s, int log_fd,
|
||||
const char *database, uint32_t current_version)
|
||||
{
|
||||
krb5_context context = server_context->context;
|
||||
int ret = 0;
|
||||
@@ -1201,8 +1370,7 @@ process_msg (kadm5_server_context *server_context, slave *s, int log_fd,
|
||||
}
|
||||
if ((s->version_ack = tmp) < s->version)
|
||||
break;
|
||||
ret = send_diffs(server_context, s, log_fd, database, current_version,
|
||||
current_tstamp);
|
||||
send_diffs(server_context, s, log_fd, database, current_version);
|
||||
break;
|
||||
case I_AM_HERE :
|
||||
if (verbose)
|
||||
@@ -1386,7 +1554,6 @@ main(int argc, char **argv)
|
||||
int log_fd;
|
||||
slave *slaves = NULL;
|
||||
uint32_t current_version = 0, old_version = 0;
|
||||
uint32_t current_tstamp = 0;
|
||||
krb5_keytab keytab;
|
||||
char **files;
|
||||
int aret;
|
||||
@@ -1481,7 +1648,7 @@ main(int argc, char **argv)
|
||||
krb5_err(context, 1, errno, "shared flock %s",
|
||||
server_context->log_context.log_file);
|
||||
kadm5_log_get_version_fd(server_context, log_fd, LOG_VERSION_LAST,
|
||||
¤t_version, ¤t_tstamp);
|
||||
¤t_version, NULL);
|
||||
flock(log_fd, LOCK_UN);
|
||||
|
||||
signal_fd = make_signal_socket (context);
|
||||
@@ -1522,7 +1689,7 @@ main(int argc, char **argv)
|
||||
if (p->flags & SLAVE_F_DEAD)
|
||||
continue;
|
||||
FD_SET(p->fd, &readset);
|
||||
if (have_tail(p))
|
||||
if (have_tail(p) || more_diffs(p))
|
||||
FD_SET(p->fd, &writeset);
|
||||
max_fd = max(max_fd, p->fd);
|
||||
}
|
||||
@@ -1556,7 +1723,7 @@ main(int argc, char **argv)
|
||||
krb5_err(context, IPROPD_RESTART, errno, "shared flock %s",
|
||||
server_context->log_context.log_file);
|
||||
kadm5_log_get_version_fd(server_context, log_fd, LOG_VERSION_LAST,
|
||||
¤t_version, ¤t_tstamp);
|
||||
¤t_version, NULL);
|
||||
flock(log_fd, LOCK_UN);
|
||||
}
|
||||
|
||||
@@ -1569,7 +1736,7 @@ main(int argc, char **argv)
|
||||
krb5_err(context, IPROPD_RESTART, errno,
|
||||
"could not lock log file");
|
||||
kadm5_log_get_version_fd(server_context, log_fd, LOG_VERSION_LAST,
|
||||
¤t_version, ¤t_tstamp);
|
||||
¤t_version, NULL);
|
||||
flock(log_fd, LOCK_UN);
|
||||
|
||||
if (current_version > old_version) {
|
||||
@@ -1581,8 +1748,8 @@ main(int argc, char **argv)
|
||||
for (p = slaves; p != NULL; p = p->next) {
|
||||
if (p->flags & SLAVE_F_DEAD)
|
||||
continue;
|
||||
send_diffs (server_context, p, log_fd, database,
|
||||
current_version, current_tstamp);
|
||||
send_diffs(server_context, p, log_fd, database,
|
||||
current_version);
|
||||
}
|
||||
old_version = current_version;
|
||||
}
|
||||
@@ -1613,7 +1780,7 @@ main(int argc, char **argv)
|
||||
krb5_err(context, IPROPD_RESTART, errno, "shared flock %s",
|
||||
server_context->log_context.log_file);
|
||||
kadm5_log_get_version_fd(server_context, log_fd, LOG_VERSION_LAST,
|
||||
¤t_version, ¤t_tstamp);
|
||||
¤t_version, NULL);
|
||||
flock(log_fd, LOCK_UN);
|
||||
if (current_version != old_version) {
|
||||
/*
|
||||
@@ -1636,8 +1803,8 @@ main(int argc, char **argv)
|
||||
for (p = slaves; p != NULL; p = p->next) {
|
||||
if (p->flags & SLAVE_F_DEAD)
|
||||
continue;
|
||||
send_diffs (server_context, p, log_fd, database,
|
||||
current_version, current_tstamp);
|
||||
send_diffs(server_context, p, log_fd, database,
|
||||
current_version);
|
||||
}
|
||||
} else {
|
||||
if (verbose)
|
||||
@@ -1650,10 +1817,10 @@ main(int argc, char **argv)
|
||||
for (p = slaves; p != NULL; p = p->next) {
|
||||
if (!(p->flags & SLAVE_F_DEAD) &&
|
||||
FD_ISSET(p->fd, &writeset) &&
|
||||
have_tail(p) &&
|
||||
send_tail(context, p) == 0) {
|
||||
(void) send_diffs(server_context, p, log_fd, database,
|
||||
current_version, current_tstamp);
|
||||
((have_tail(p) && send_tail(context, p) == 0) ||
|
||||
(!have_tail(p) && more_diffs(p)))) {
|
||||
send_diffs(server_context, p, log_fd, database,
|
||||
current_version);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1664,7 +1831,7 @@ main(int argc, char **argv)
|
||||
--ret;
|
||||
assert(ret >= 0);
|
||||
ret = process_msg(server_context, p, log_fd, database,
|
||||
current_version, current_tstamp);
|
||||
current_version);
|
||||
if (ret && ret != EWOULDBLOCK)
|
||||
slave_dead(context, p);
|
||||
} else if (slave_gone_p (p))
|
||||
|
@@ -66,6 +66,7 @@ EXPORTS
|
||||
;! kadm5_log_signal_socket
|
||||
kadm5_log_signal_socket_info ;!
|
||||
kadm5_log_previous
|
||||
kadm5_log_goto_first
|
||||
kadm5_log_goto_end
|
||||
kadm5_log_foreach
|
||||
kadm5_log_get_version_fd
|
||||
@@ -78,6 +79,7 @@ EXPORTS
|
||||
kadm5_log_init_nb
|
||||
kadm5_log_init_nolock
|
||||
kadm5_log_init_sharedlock
|
||||
kadm5_log_next
|
||||
kadm5_log_nop
|
||||
kadm5_log_truncate
|
||||
kadm5_log_modify
|
||||
|
189
lib/kadm5/log.c
189
lib/kadm5/log.c
@@ -464,7 +464,6 @@ get_max_log_size(krb5_context context)
|
||||
}
|
||||
|
||||
static kadm5_ret_t truncate_if_needed(kadm5_server_context *);
|
||||
static krb5_storage *log_goto_first(kadm5_server_context *, int);
|
||||
|
||||
/*
|
||||
* Get the version and timestamp metadata of either the first, or last
|
||||
@@ -474,7 +473,7 @@ static krb5_storage *log_goto_first(kadm5_server_context *, int);
|
||||
* uber record which must be 0, or else we need to upgrade the log.
|
||||
*
|
||||
* If `which' is LOG_VERSION_FIRST, then this gets the metadata for the
|
||||
* logically first entry past the uberblock, or returns HEIM_EOF if
|
||||
* logically first entry past the uberblock, or returns HEIM_ERR_EOF if
|
||||
* only the uber record is present.
|
||||
*
|
||||
* If `which' is LOG_VERSION_LAST, then this gets metadata for the last
|
||||
@@ -504,37 +503,35 @@ kadm5_log_get_version_fd(kadm5_server_context *server_context, int fd,
|
||||
*ver = 0;
|
||||
*tstamp = 0;
|
||||
|
||||
sp = krb5_storage_from_fd(fd);
|
||||
if (sp == NULL)
|
||||
return errno ? errno : ENOMEM;
|
||||
|
||||
switch (which) {
|
||||
case LOG_VERSION_LAST:
|
||||
sp = kadm5_log_goto_end(server_context, fd);
|
||||
if (sp == NULL)
|
||||
return errno;
|
||||
ret = get_version_prev(sp, ver, tstamp);
|
||||
krb5_storage_free(sp);
|
||||
ret = kadm5_log_goto_end(server_context, sp);
|
||||
if (ret == 0)
|
||||
ret = get_version_prev(sp, ver, tstamp);
|
||||
break;
|
||||
case LOG_VERSION_FIRST:
|
||||
sp = log_goto_first(server_context, fd);
|
||||
if (sp == NULL)
|
||||
return errno;
|
||||
ret = get_header(sp, LOG_DOPEEK, ver, tstamp, NULL, NULL);
|
||||
krb5_storage_free(sp);
|
||||
ret = kadm5_log_goto_first(server_context, sp);
|
||||
if (ret == 0)
|
||||
ret = get_header(sp, LOG_DOPEEK, ver, tstamp, NULL, NULL);
|
||||
break;
|
||||
case LOG_VERSION_UBER:
|
||||
sp = krb5_storage_from_fd(server_context->log_context.log_fd);
|
||||
if (sp == NULL)
|
||||
return errno;
|
||||
if (krb5_storage_seek(sp, 0, SEEK_SET) == 0)
|
||||
ret = get_header(sp, LOG_DOPEEK, ver, tstamp, &op, &len);
|
||||
else
|
||||
ret = errno;
|
||||
if (ret == 0 && (op != kadm_nop || len != LOG_UBER_LEN || *ver != 0))
|
||||
ret = KADM5_LOG_NEEDS_UPGRADE;
|
||||
krb5_storage_free(sp);
|
||||
break;
|
||||
default:
|
||||
return ENOTSUP;
|
||||
ret = ENOTSUP;
|
||||
break;
|
||||
}
|
||||
|
||||
krb5_storage_free(sp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -1830,12 +1827,14 @@ kadm5_log_recover(kadm5_server_context *context, enum kadm_recover_mode mode)
|
||||
replay_data.ver = 0;
|
||||
replay_data.mode = mode;
|
||||
|
||||
sp = kadm5_log_goto_end(context, context->log_context.log_fd);
|
||||
sp = krb5_storage_from_fd(context->log_context.log_fd);
|
||||
if (sp == NULL)
|
||||
return errno ? errno : EIO;
|
||||
ret = kadm5_log_goto_end(context, sp);
|
||||
|
||||
ret = kadm5_log_foreach(context, kadm_forward | kadm_unconfirmed,
|
||||
NULL, recover_replay, &replay_data);
|
||||
if (ret == 0)
|
||||
ret = kadm5_log_foreach(context, kadm_forward | kadm_unconfirmed,
|
||||
NULL, recover_replay, &replay_data);
|
||||
if (ret == 0 && mode == kadm_recover_commit && replay_data.count != 1)
|
||||
ret = KADM5_LOG_CORRUPT;
|
||||
krb5_storage_free(sp);
|
||||
@@ -1887,7 +1886,7 @@ kadm5_log_foreach(kadm5_server_context *context,
|
||||
*/
|
||||
sp = krb5_storage_from_fd(fd);
|
||||
if (sp == NULL)
|
||||
return errno;
|
||||
return errno ? errno : ENOMEM;
|
||||
|
||||
log_end = krb5_storage_seek(sp, 0, SEEK_END);
|
||||
if (log_end == -1 ||
|
||||
@@ -1898,9 +1897,12 @@ kadm5_log_foreach(kadm5_server_context *context,
|
||||
}
|
||||
} else {
|
||||
/* Get the end of the log based on the uber entry */
|
||||
sp = kadm5_log_goto_end(context, fd);
|
||||
sp = krb5_storage_from_fd(fd);
|
||||
if (sp == NULL)
|
||||
return errno;
|
||||
return errno ? errno : ENOMEM;
|
||||
ret = kadm5_log_goto_end(context, sp);
|
||||
if (ret != 0)
|
||||
return ret;
|
||||
log_end = krb5_storage_seek(sp, 0, SEEK_CUR);
|
||||
}
|
||||
|
||||
@@ -2049,81 +2051,50 @@ kadm5_log_foreach(kadm5_server_context *context,
|
||||
}
|
||||
|
||||
/*
|
||||
* Go to the second record, which, if we have an uber record, will be
|
||||
* the first record.
|
||||
* Go to the first record, which, if we have an uber record, will be
|
||||
* the second record.
|
||||
*/
|
||||
static krb5_storage *
|
||||
log_goto_first(kadm5_server_context *server_context, int fd)
|
||||
kadm5_ret_t
|
||||
kadm5_log_goto_first(kadm5_server_context *server_context, krb5_storage *sp)
|
||||
{
|
||||
krb5_storage *sp;
|
||||
enum kadm_ops op;
|
||||
uint32_t ver, len;
|
||||
kadm5_ret_t ret;
|
||||
|
||||
if (fd == -1) {
|
||||
errno = EINVAL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
sp = krb5_storage_from_fd(fd);
|
||||
if (sp == NULL)
|
||||
return NULL;
|
||||
|
||||
if (krb5_storage_seek(sp, 0, SEEK_SET) == -1)
|
||||
return NULL;
|
||||
return KADM5_LOG_CORRUPT;
|
||||
|
||||
ret = get_header(sp, LOG_DOPEEK, &ver, NULL, &op, &len);
|
||||
if (ret) {
|
||||
krb5_storage_free(sp);
|
||||
errno = ret;
|
||||
return NULL;
|
||||
}
|
||||
if (op == kadm_nop && len == LOG_UBER_LEN && seek_next(sp) == -1) {
|
||||
krb5_storage_free(sp);
|
||||
return NULL;
|
||||
}
|
||||
return sp;
|
||||
if (ret)
|
||||
return ret;
|
||||
if (op == kadm_nop && len == LOG_UBER_LEN && seek_next(sp) == -1)
|
||||
return KADM5_LOG_CORRUPT;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Go to end of log.
|
||||
*
|
||||
* XXX This really needs to return a kadm5_ret_t and either output a
|
||||
* krb5_storage * via an argument, or take one as input.
|
||||
*/
|
||||
|
||||
krb5_storage *
|
||||
kadm5_log_goto_end(kadm5_server_context *server_context, int fd)
|
||||
kadm5_ret_t
|
||||
kadm5_log_goto_end(kadm5_server_context *server_context, krb5_storage *sp)
|
||||
{
|
||||
krb5_error_code ret = 0;
|
||||
krb5_storage *sp;
|
||||
enum kadm_ops op;
|
||||
uint32_t ver, len;
|
||||
uint32_t tstamp;
|
||||
uint64_t off;
|
||||
|
||||
if (fd == -1) {
|
||||
errno = EINVAL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
sp = krb5_storage_from_fd(fd);
|
||||
if (sp == NULL)
|
||||
return NULL;
|
||||
|
||||
if (krb5_storage_seek(sp, 0, SEEK_SET) == -1) {
|
||||
ret = errno;
|
||||
goto fail;
|
||||
}
|
||||
if (krb5_storage_seek(sp, 0, SEEK_SET) == -1)
|
||||
return errno;
|
||||
ret = get_header(sp, LOG_NOPEEK, &ver, &tstamp, &op, &len);
|
||||
if (ret == HEIM_ERR_EOF) {
|
||||
(void) krb5_storage_seek(sp, 0, SEEK_SET);
|
||||
return sp;
|
||||
return 0;
|
||||
}
|
||||
if (ret == KADM5_LOG_CORRUPT)
|
||||
goto truncate;
|
||||
if (ret)
|
||||
goto fail;
|
||||
return ret;
|
||||
|
||||
if (op == kadm_nop && len == LOG_UBER_LEN) {
|
||||
/* New style log */
|
||||
@@ -2132,12 +2103,12 @@ kadm5_log_goto_end(kadm5_server_context *server_context, int fd)
|
||||
goto truncate;
|
||||
|
||||
if (krb5_storage_seek(sp, off, SEEK_SET) == -1)
|
||||
goto fail;
|
||||
return ret;
|
||||
|
||||
if (off >= LOG_UBER_SZ) {
|
||||
ret = get_version_prev(sp, &ver, NULL);
|
||||
if (ret == 0)
|
||||
return sp;
|
||||
return 0;
|
||||
}
|
||||
/* Invalid offset in uber entry */
|
||||
goto truncate;
|
||||
@@ -2155,7 +2126,7 @@ kadm5_log_goto_end(kadm5_server_context *server_context, int fd)
|
||||
ret = get_version_prev(sp, &ver, NULL);
|
||||
if (ret)
|
||||
goto truncate;
|
||||
return sp;
|
||||
return 0;
|
||||
|
||||
truncate:
|
||||
/* If we can, truncate */
|
||||
@@ -2164,18 +2135,64 @@ truncate:
|
||||
if (ret == 0) {
|
||||
krb5_warn(server_context->context, ret,
|
||||
"Invalid log; truncating to recover");
|
||||
if (krb5_storage_seek(sp, 0, SEEK_END) == -1)
|
||||
return NULL;
|
||||
return sp;
|
||||
if (krb5_storage_seek(sp, 0, SEEK_END) >= 0)
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
ret = KADM5_LOG_CORRUPT;
|
||||
krb5_warn(server_context->context, ret,
|
||||
"Invalid log; truncate to recover");
|
||||
return ret;
|
||||
}
|
||||
|
||||
fail:
|
||||
errno = ret;
|
||||
krb5_storage_free(sp);
|
||||
return NULL;
|
||||
/*
|
||||
* Return the next log entry.
|
||||
*
|
||||
* The pointer in `sp' is assumed to be at the end of an entry. On success,
|
||||
* the `sp' pointer is set to the next entry (not the data portion). In case
|
||||
* of error, it's not changed at all.
|
||||
*/
|
||||
kadm5_ret_t
|
||||
kadm5_log_next(krb5_context context,
|
||||
krb5_storage *sp,
|
||||
uint32_t *verp,
|
||||
time_t *tstampp,
|
||||
enum kadm_ops *opp,
|
||||
uint32_t *lenp)
|
||||
{
|
||||
uint32_t len = 0;
|
||||
uint32_t len2 = 0;
|
||||
uint32_t ver = verp ? *verp : 0;
|
||||
uint32_t ver2;
|
||||
uint32_t tstamp = tstampp ? *tstampp : 0;
|
||||
enum kadm_ops op = kadm_nop;
|
||||
off_t off = krb5_storage_seek(sp, 0, SEEK_CUR);
|
||||
kadm5_ret_t ret = get_header(sp, LOG_NOPEEK, &ver, &tstamp, &op, &len);
|
||||
|
||||
/* Validate the trailer */
|
||||
if (ret == 0 && krb5_storage_seek(sp, len, SEEK_CUR) == -1)
|
||||
ret = errno;
|
||||
|
||||
if (ret == 0)
|
||||
ret = krb5_ret_uint32(sp, &len2);
|
||||
if (ret == 0)
|
||||
ret = krb5_ret_uint32(sp, &ver2);
|
||||
if (ret == 0 && (len != len2 || ver != ver2))
|
||||
ret = KADM5_LOG_CORRUPT;
|
||||
if (ret != 0) {
|
||||
(void) krb5_storage_seek(sp, off, SEEK_SET);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (verp)
|
||||
*verp = ver;
|
||||
if (tstampp)
|
||||
*tstampp = tstamp;
|
||||
if (opp)
|
||||
*opp = op;
|
||||
if (lenp)
|
||||
*lenp = len;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2549,11 +2566,15 @@ kadm5_log_truncate(kadm5_server_context *context, size_t keep, size_t maxbytes)
|
||||
|
||||
/* Done. Now rebuild the log_context state. */
|
||||
(void) lseek(context->log_context.log_fd, off, SEEK_SET);
|
||||
sp = kadm5_log_goto_end(context, context->log_context.log_fd);
|
||||
sp = krb5_storage_from_fd(context->log_context.log_fd);
|
||||
if (sp == NULL)
|
||||
return krb5_enomem(context->context);
|
||||
ret = get_version_prev(sp, &context->log_context.version, &last_tstamp);
|
||||
context->log_context.last_time = last_tstamp;
|
||||
return errno ? errno : krb5_enomem(context->context);
|
||||
ret = kadm5_log_goto_end(context, sp);
|
||||
if (ret == 0) {
|
||||
ret = get_version_prev(sp, &context->log_context.version, &last_tstamp);
|
||||
if (ret == 0)
|
||||
context->log_context.last_time = last_tstamp;
|
||||
}
|
||||
krb5_storage_free(sp);
|
||||
return ret;
|
||||
}
|
||||
|
@@ -68,6 +68,7 @@ HEIMDAL_KAMD5_SERVER_1.0 {
|
||||
kadm5_log_signal_master;
|
||||
kadm5_log_signal_socket;
|
||||
kadm5_log_previous;
|
||||
kadm5_log_goto_first;
|
||||
kadm5_log_goto_end;
|
||||
kadm5_log_foreach;
|
||||
kadm5_log_get_version_fd;
|
||||
@@ -80,6 +81,7 @@ HEIMDAL_KAMD5_SERVER_1.0 {
|
||||
kadm5_log_init_nb;
|
||||
kadm5_log_init_nolock;
|
||||
kadm5_log_init_sharedlock;
|
||||
kadm5_log_next;
|
||||
kadm5_log_nop;
|
||||
kadm5_log_truncate;
|
||||
kadm5_log_modify;
|
||||
|
Reference in New Issue
Block a user