Fix iprop against legacy master with full log

When the master's log has all entries from version 1 to now, and no
uber entry (legacy master), then new slaves will not pull version 1,
because their uber record has version 1.

The fix is to force the uber version to 0 always, and avoid adding a
truncate nop when doing a full prop.  The uber record now records the
database version even in the absence of any other log entries so that
we know what to pull going forward.
This commit is contained in:
Viktor Dukhovni
2016-06-08 21:26:43 +00:00
parent 7d9fcb46b9
commit ffd0dda237
5 changed files with 110 additions and 55 deletions

View File

@@ -157,7 +157,7 @@ init(struct init_options *opt, int argc, char **argv)
krb5_warn(context, ret, "hdb_open");
return 0;
}
ret = kadm5_log_reinit(kadm_handle);
ret = kadm5_log_reinit(kadm_handle, 0);
if (ret)
krb5_err(context, 1, ret, "Failed iprop log initialization");
kadm5_log_end(kadm_handle);

View File

@@ -362,7 +362,7 @@ parse_extensions(char *str, HDB_extensions **e)
static int
doit(const char *filename, int mergep)
{
krb5_error_code ret;
krb5_error_code ret = 0;
FILE *f;
char s[8192]; /* XXX should fix this properly */
char *p;
@@ -378,13 +378,18 @@ doit(const char *filename, int mergep)
return 1;
}
/*
* We don't have a version number in the dump, so we don't know
* which iprop log entries to keep, if any. We throw the log away.
* We don't have a version number in the dump, so we don't know which iprop
* log entries to keep, if any. We throw the log away.
*
* We could merge the ipropd-master/slave dump/load here as an
* option, in which case we would first load the dump.
* We could merge the ipropd-master/slave dump/load here as an option, in
* which case we would first load the dump.
*
* If we're merging, first recover unconfirmed records in the existing log.
*/
ret = kadm5_log_reinit(kadm_handle);
if (mergep)
ret = kadm5_log_init(kadm_handle);
if (ret == 0)
ret = kadm5_log_reinit(kadm_handle, 0);
if (ret) {
fclose (f);
krb5_warn(context, ret, "kadm5_log_reinit");

View File

@@ -376,7 +376,10 @@ iprop_truncate(struct truncate_options *opt, int argc, char **argv)
opt->max_bytes_integer = 0;
if (opt->reset_flag) {
ret = kadm5_log_reinit(server_context);
/* First recover unconfirmed records */
ret = kadm5_log_init(server_context);
if (ret == 0)
ret = kadm5_log_reinit(server_context, 0);
} else {
ret = kadm5_log_init(server_context);
if (ret)

View File

@@ -467,7 +467,7 @@ reinit_log(krb5_context context,
if (verbose)
krb5_warnx(context, "truncating log on slave");
ret = kadm5_log_reinit(server_context);
ret = kadm5_log_reinit(server_context, vno);
if (ret)
krb5_err(context, IPROPD_RESTART_SLOW, ret, "kadm5_log_reinit");
}
@@ -567,7 +567,6 @@ receive_everything(krb5_context context, int fd,
if (ret)
krb5_err(context, IPROPD_RESTART_SLOW, ret, "db->rename");
server_context->log_context.version = vno;
return 0;

View File

@@ -117,8 +117,8 @@ RCSID("$Id$");
* - The zeroth record in new logs is a nop with a 16 byte payload:
*
* offset of end of last confirmed record 8 bytes
* version number of last confirmed record 4 bytes
* timestamp of last confirmed record 4 bytes
* version number of last confirmed record 4 bytes
*
* - New non-zeroth nop records:
*
@@ -446,11 +446,12 @@ static krb5_storage *log_goto_first(kadm5_server_context *, int);
* Get the version and timestamp metadata of either the first, or last
* confirmed entry in the log.
*
* If `which' is LOG_VERSION_UBER (0), then this gets the metadata for
* the uber record. If `which' is LOG_VERSION_FIRST (1), then this gets
* the metadata for the logically first entry past the uberblock. If
* `which' is LOG_VERSION_LAST (-1), then this gets metadata for the
* last confirmed entry's version and timestpamp.
* If `which' is LOG_VERSION_UBER (0), then this gets the version number of the
* last committed entry (or if none, the version from any just-completed full
* prop from master) from the uber record. If `which' is LOG_VERSION_FIRST (1),
* then this gets the metadata for the logically first entry past the uberblock.
* If `which' is LOG_VERSION_LAST (-1), then this gets metadata for the last
* confirmed entry's version and timestpamp.
*
* The `fd''s offset will be set to the start of the header of the entry
* identified by `which'.
@@ -499,6 +500,16 @@ kadm5_log_get_version_fd(kadm5_server_context *server_context, int fd,
ret = errno;
if (ret == 0 && (op != kadm_nop || len != LOG_UBER_SZ))
ret = KADM5_LOG_NEEDS_UPGRADE;
if (ret != 0)
return ret;
/* Skip 8 byte offset and 4 byte time */
if (krb5_storage_seek(sp, LOG_HEADER_SZ + 12, SEEK_CUR)
!= LOG_HEADER_SZ + 12)
ret = errno;
if (ret == 0)
ret = krb5_ret_uint32(sp, ver);
if (krb5_storage_seek(sp, 0, SEEK_SET) != 0)
ret = errno;
krb5_storage_free(sp);
break;
default:
@@ -529,19 +540,14 @@ kadm5_log_set_version(kadm5_server_context *context, uint32_t vno)
/*
* Open the log and setup server_context->log_context
*
* XXX: This and its callers should be named after "open", not "init"
*/
static kadm5_ret_t
log_init(kadm5_server_context *server_context, int lock_mode)
log_open(kadm5_server_context *server_context, int lock_mode)
{
int fd = -1;
int lock_it = 0;
int lock_nb = 0;
int oflags = O_RDWR;
struct stat st;
uint32_t uber_ver;
size_t maxbytes = get_max_log_size(server_context->context);
kadm5_ret_t ret;
kadm5_log_context *log_context = &server_context->log_context;
@@ -572,8 +578,7 @@ log_init(kadm5_server_context *server_context, int lock_mode)
if (fd < 0) {
ret = errno;
krb5_set_error_message(server_context->context, ret,
"kadm5_log_init: open %s",
log_context->log_file);
"log_open: open %s", log_context->log_file);
return ret;
}
lock_it = (lock_mode != LOCK_UN);
@@ -581,8 +586,7 @@ log_init(kadm5_server_context *server_context, int lock_mode)
if (lock_it && flock(fd, lock_mode | lock_nb) < 0) {
ret = errno;
krb5_set_error_message(server_context->context, ret,
"kadm5_log_init: flock %s",
log_context->log_file);
"log_open: flock %s", log_context->log_file);
if (fd != log_context->log_fd)
(void) close(fd);
return ret;
@@ -592,7 +596,32 @@ log_init(kadm5_server_context *server_context, int lock_mode)
log_context->lock_mode = lock_mode;
log_context->read_only = (lock_mode != LOCK_EX);
ret = 0;
return 0;
}
/*
* Open the log and setup server_context->log_context
*/
static kadm5_ret_t
log_init(kadm5_server_context *server_context, int lock_mode)
{
int fd;
struct stat st;
uint32_t last_ver;
size_t maxbytes = get_max_log_size(server_context->context);
kadm5_ret_t ret;
kadm5_log_context *log_context = &server_context->log_context;
if (strcmp(log_context->log_file, "/dev/null") == 0) {
/* log_context->log_fd should be -1 here */
return 0;
}
ret = log_open(server_context, lock_mode);
if (ret)
return ret;
fd = log_context->log_fd;
if (!log_context->read_only) {
if (fstat(fd, &st) == -1)
ret = errno;
@@ -605,7 +634,8 @@ log_init(kadm5_server_context *server_context, int lock_mode)
}
if (ret == 0) {
ret = kadm5_log_get_version_fd(server_context, fd,
LOG_VERSION_UBER, &uber_ver, NULL);
LOG_VERSION_UBER,
&log_context->version, NULL);
/* Upgrade the log if it was an old-style log */
if (ret == KADM5_LOG_NEEDS_UPGRADE)
@@ -613,11 +643,17 @@ log_init(kadm5_server_context *server_context, int lock_mode)
}
if (ret == 0)
ret = kadm5_log_recover(server_context, kadm_recover_replay);
} else {
ret = kadm5_log_get_version_fd(server_context, fd,
LOG_VERSION_UBER,
&log_context->version, NULL);
}
if (ret == 0) {
ret = kadm5_log_get_version_fd(server_context, fd, LOG_VERSION_LAST,
&log_context->version, NULL);
&last_ver, NULL);
if (ret == 0 && last_ver != 0 && last_ver != log_context->version)
ret = KADM5_LOG_CORRUPT;
if (ret == HEIM_ERR_EOF)
ret = 0;
}
@@ -662,12 +698,12 @@ kadm5_log_init_sharedlock(kadm5_server_context *server_context, int lock_flags)
* Reinitialize the log and open it
*/
kadm5_ret_t
kadm5_log_reinit(kadm5_server_context *server_context)
kadm5_log_reinit(kadm5_server_context *server_context, uint32_t vno)
{
int ret;
kadm5_log_context *log_context = &server_context->log_context;
ret = log_init(server_context, LOCK_EX);
ret = log_open(server_context, LOCK_EX);
if (ret)
return ret;
if (log_context->log_fd != -1) {
@@ -680,10 +716,10 @@ kadm5_log_reinit(kadm5_server_context *server_context)
return ret;
}
}
log_context->version = 0;
/* Write uber entry */
ret = kadm5_log_nop(server_context, kadm_nop_trunc);
/* Write uber entry and truncation nop with version `vno` */
log_context->version = vno;
ret = kadm5_log_nop(server_context, kadm_nop_plain);
return 0;
}
@@ -719,13 +755,14 @@ kadm5_log_end(kadm5_server_context *server_context)
static kadm5_ret_t
kadm5_log_preamble(kadm5_server_context *context,
krb5_storage *sp,
enum kadm_ops op)
enum kadm_ops op,
uint32_t vno)
{
kadm5_log_context *log_context = &context->log_context;
time_t now = time(NULL);
kadm5_ret_t ret;
ret = krb5_store_uint32(sp, log_context->version + 1);
ret = krb5_store_uint32(sp, vno);
if (ret)
return ret;
ret = krb5_store_uint32(sp, now);
@@ -741,9 +778,10 @@ kadm5_log_preamble(kadm5_server_context *context,
/* Writes the version part of the trailer */
static kadm5_ret_t
kadm5_log_postamble(kadm5_log_context *context,
krb5_storage *sp)
krb5_storage *sp,
uint32_t vno)
{
return krb5_store_uint32(sp, context->version + 1);
return krb5_store_uint32(sp, vno);
}
/*
@@ -849,10 +887,10 @@ kadm5_log_flush(kadm5_server_context *context, krb5_storage *sp)
return ret;
}
if (prev_ver != log_context->version)
if (prev_ver != 0 && prev_ver != log_context->version)
return EINVAL; /* Internal error, really; just a consistency check */
if (new_ver != prev_ver + 1) {
if (prev_ver != 0 && new_ver != prev_ver + 1) {
krb5_warnx(context->context, "refusing to write a log record "
"with non-monotonic version (new: %u, old: %u)",
new_ver, prev_ver);
@@ -919,7 +957,8 @@ kadm5_log_create(kadm5_server_context *context, hdb_entry *entry)
if (sp == NULL)
ret = ENOMEM;
if (ret == 0)
ret = kadm5_log_preamble(context, sp, kadm_create);
ret = kadm5_log_preamble(context, sp, kadm_create,
log_context->version + 1);
if (ret == 0)
ret = krb5_store_uint32(sp, value.length);
if (ret == 0) {
@@ -930,7 +969,8 @@ kadm5_log_create(kadm5_server_context *context, hdb_entry *entry)
if (ret == 0)
ret = krb5_store_uint32(sp, value.length);
if (ret == 0)
ret = kadm5_log_postamble(log_context, sp);
ret = kadm5_log_postamble(log_context, sp,
log_context->version + 1);
if (ret == 0)
ret = kadm5_log_flush(context, sp);
krb5_storage_free(sp);
@@ -1000,7 +1040,8 @@ kadm5_log_delete(kadm5_server_context *context,
if (sp == NULL)
ret = ENOMEM;
if (ret == 0)
ret = kadm5_log_preamble(context, sp, kadm_delete);
ret = kadm5_log_preamble(context, sp, kadm_delete,
log_context->version + 1);
if (ret) {
krb5_storage_free(sp);
return ret;
@@ -1041,7 +1082,8 @@ kadm5_log_delete(kadm5_server_context *context,
if (ret == 0)
ret = krb5_store_uint32(sp, len);
if (ret == 0)
ret = kadm5_log_postamble(log_context, sp);
ret = kadm5_log_postamble(log_context, sp,
log_context->version + 1);
if (ret == 0)
ret = kadm5_log_flush(context, sp);
if (ret == 0)
@@ -1128,7 +1170,8 @@ kadm5_log_rename(kadm5_server_context *context,
if (sp == NULL)
ret = ENOMEM;
if (ret == 0)
ret = kadm5_log_preamble(context, sp, kadm_rename);
ret = kadm5_log_preamble(context, sp, kadm_rename,
log_context->version + 1);
if (ret == 0)
ret = hdb_entry2value(context->context, entry, &value);
if (ret) {
@@ -1177,7 +1220,8 @@ kadm5_log_rename(kadm5_server_context *context,
if (ret == 0)
ret = krb5_store_uint32(sp, len);
if (ret == 0)
ret = kadm5_log_postamble(log_context, sp);
ret = kadm5_log_postamble(log_context, sp,
log_context->version + 1);
if (ret == 0)
ret = kadm5_log_flush(context, sp);
if (ret == 0)
@@ -1286,7 +1330,8 @@ kadm5_log_modify(kadm5_server_context *context,
if (value.length > len || len > INT32_MAX)
ret = E2BIG;
if (ret == 0)
ret = kadm5_log_preamble(context, sp, kadm_modify);
ret = kadm5_log_preamble(context, sp, kadm_modify,
log_context->version + 1);
if (ret == 0)
ret = krb5_store_uint32(sp, len);
if (ret == 0)
@@ -1299,7 +1344,8 @@ kadm5_log_modify(kadm5_server_context *context,
if (ret == 0)
ret = krb5_store_uint32(sp, len);
if (ret == 0)
ret = kadm5_log_postamble(log_context, sp);
ret = kadm5_log_postamble(log_context, sp,
log_context->version + 1);
if (ret == 0)
ret = kadm5_log_flush(context, sp);
if (ret == 0)
@@ -1628,7 +1674,8 @@ kadm5_log_nop(kadm5_server_context *context, enum kadm_nop_type nop_type)
return errno;
sp = krb5_storage_emem();
ret = kadm5_log_preamble(context, sp, kadm_nop);
ret = kadm5_log_preamble(context, sp, kadm_nop,
off == 0 ? 0 : log_context->version + 1);
if (ret)
goto out;
@@ -1660,12 +1707,13 @@ kadm5_log_nop(kadm5_server_context *context, enum kadm_nop_type nop_type)
}
if (ret == 0)
ret = kadm5_log_postamble(log_context, sp);
ret = kadm5_log_postamble(log_context, sp,
off == 0 ? 0 : log_context->version + 1);
if (ret == 0)
ret = kadm5_log_flush(context, sp);
if (ret == 0 && off == 0 && nop_type != kadm_nop_plain)
ret = kadm5_log_nop(context, nop_type); /* shouldn't happen */
ret = kadm5_log_nop(context, nop_type);
if (ret == 0 && off != 0)
ret = kadm5_log_recover(context, kadm_recover_commit);
@@ -2097,7 +2145,7 @@ kadm5_log_goto_end(kadm5_server_context *server_context, int fd)
truncate:
/* If we can, truncate */
if (server_context->log_context.lock_mode == LOCK_EX) {
ret = kadm5_log_reinit(server_context);
ret = kadm5_log_reinit(server_context, 0);
if (ret == 0) {
krb5_warn(server_context->context, ret,
"Invalid log; truncating to recover");
@@ -2440,7 +2488,7 @@ kadm5_log_truncate(kadm5_server_context *context, size_t keep, size_t maxbytes)
krb5_data_free(&entries);
return errno;
}
ret = krb5_store_uint32(sp, first - 1);
ret = krb5_store_uint32(sp, 0);
if (ret == 0)
ret = krb5_store_uint32(sp, now);
if (ret == 0)
@@ -2452,11 +2500,11 @@ kadm5_log_truncate(kadm5_server_context *context, size_t keep, size_t maxbytes)
if (ret == 0)
ret = krb5_store_uint32(sp, 0);
if (ret == 0)
ret = krb5_store_uint32(sp, 0); /* end of payload */
ret = krb5_store_uint32(sp, context->log_context.version);
if (ret == 0)
ret = krb5_store_uint32(sp, LOG_UBER_SZ);
if (ret == 0)
ret = krb5_store_uint32(sp, first - 1); /* end of trailer */
ret = krb5_store_uint32(sp, 0); /* end of trailer */
if (ret == 0) {
bytes = krb5_storage_write(sp, entries.data, entries.length);
if (bytes == -1)