Fix handling of uber record nominal version

When flushing the uber record, retain the current log version.  When the
uber record is the only (thus last) record in the log, return its nominal
version as the last version, not 0.  Upgrade the log if the current uber
record version number is not 0.
This commit is contained in:
Viktor Dukhovni
2016-06-09 09:11:30 +00:00
parent 316e0d2184
commit a5237e6940

View File

@@ -51,14 +51,18 @@ RCSID("$Id$");
* of an record's start or end one can traverse the log forwards and * of an record's start or end one can traverse the log forwards and
* backwards. * backwards.
* *
* The log always starts with a nop record that contains the offset (8 * The log always starts with a nop record (uber record) that contains the
* bytes) where the next record should start after the last one, and the * offset (8 bytes) of the first unconfirmed record (typically EOF), and the
* version number and timestamp of the last record: * version number and timestamp of the preceding last confirmed record:
* *
* offset of next new record 8 bytes * offset of next new record 8 bytes
* last record time 4 bytes * last record time 4 bytes
* last record version number 4 bytes * last record version number 4 bytes
* *
* When an iprop slave receives a complete database, it saves that version as
* the last confirmed version, without writing any other records to the log. We
* use that version as the basis for further updates.
*
* kadm5 write operations are done in this order: * kadm5 write operations are done in this order:
* *
* - replay unconfirmed log records * - replay unconfirmed log records
@@ -173,7 +177,8 @@ RCSID("$Id$");
#define LOG_HEADER_SZ (sizeof(uint32_t) * 4) #define LOG_HEADER_SZ (sizeof(uint32_t) * 4)
#define LOG_TRAILER_SZ (sizeof(uint32_t) * 2) #define LOG_TRAILER_SZ (sizeof(uint32_t) * 2)
#define LOG_WRAPPER_SZ (LOG_HEADER_SZ + LOG_TRAILER_SZ) #define LOG_WRAPPER_SZ (LOG_HEADER_SZ + LOG_TRAILER_SZ)
#define LOG_UBER_SZ (sizeof(uint64_t) + sizeof(uint32_t) * 2) #define LOG_UBER_LEN (sizeof(uint64_t) + sizeof(uint32_t) * 2)
#define LOG_UBER_SZ (LOG_WRAPPER_SZ + LOG_UBER_LEN)
#define LOG_NOPEEK 0 #define LOG_NOPEEK 0
#define LOG_DOPEEK 1 #define LOG_DOPEEK 1
@@ -376,6 +381,7 @@ log_corrupt:
/* /*
* Get the version of the entry ending at the current offset into sp. * Get the version of the entry ending at the current offset into sp.
* If it is the uber record, return its nominal version instead.
* *
* Returns HEIM_ERR_EOF if sp is at offset zero. * Returns HEIM_ERR_EOF if sp is at offset zero.
* *
@@ -385,7 +391,7 @@ static kadm5_ret_t
get_version_prev(krb5_storage *sp, uint32_t *verp, uint32_t *tstampp) get_version_prev(krb5_storage *sp, uint32_t *verp, uint32_t *tstampp)
{ {
krb5_error_code ret; krb5_error_code ret;
uint32_t ver2, len, len2; uint32_t ver, ver2, len, len2;
off_t off, prev_off, new_off; off_t off, prev_off, new_off;
*verp = 0; *verp = 0;
@@ -399,13 +405,28 @@ get_version_prev(krb5_storage *sp, uint32_t *verp, uint32_t *tstampp)
return HEIM_ERR_EOF; return HEIM_ERR_EOF;
/* Read the trailer and seek back */ /* Read the trailer and seek back */
prev_off = seek_prev(sp, verp, &len); prev_off = seek_prev(sp, &ver, &len);
if (prev_off == -1) if (prev_off == -1)
return errno; return errno;
/* Uber record? Return nominal version. */
if (prev_off == 0 && len == LOG_UBER_LEN && ver == 0) {
/* Skip 8 byte offset and 4 byte time */
if (krb5_storage_seek(sp, LOG_HEADER_SZ + 12, SEEK_SET)
!= LOG_HEADER_SZ + 12)
return errno;
ret = krb5_ret_uint32(sp, verp);
if (krb5_storage_seek(sp, 0, SEEK_SET) != 0)
return errno;
if (ret != 0)
return ret;
} else {
*verp = ver;
}
/* Verify that the trailer matches header */ /* Verify that the trailer matches header */
ret = get_header(sp, LOG_NOPEEK, &ver2, tstampp, NULL, &len2); ret = get_header(sp, LOG_NOPEEK, &ver2, tstampp, NULL, &len2);
if (ret || *verp != ver2 || len != len2) if (ret || ver != ver2 || len != len2)
goto log_corrupt; goto log_corrupt;
/* Preserve offset */ /* Preserve offset */
@@ -434,7 +455,7 @@ get_max_log_size(krb5_context context)
"kdc", "kdc",
"log-max-size", "log-max-size",
NULL); NULL);
if (n >= 4 * (LOG_UBER_SZ + LOG_WRAPPER_SZ) && n == (size_t)n) if (n >= 4 * (LOG_UBER_LEN + LOG_WRAPPER_SZ) && n == (size_t)n)
return (size_t)n; return (size_t)n;
return 0; return 0;
} }
@@ -446,12 +467,17 @@ static krb5_storage *log_goto_first(kadm5_server_context *, int);
* Get the version and timestamp metadata of either the first, or last * Get the version and timestamp metadata of either the first, or last
* confirmed entry in the log. * confirmed entry in the log.
* *
* If `which' is LOG_VERSION_UBER (0), then this gets the version number of the * If `which' is LOG_VERSION_UBER, then this gets the version number of the uber
* last committed entry (or if none, the version from any just-completed full * uber record which must be 0, or else we need to upgrade the log.
* prop from master) from the uber record. If `which' is LOG_VERSION_FIRST (1), *
* then this gets the metadata for the logically first entry past the uberblock. * If `which' is LOG_VERSION_FIRST, then this gets the metadata for the
* If `which' is LOG_VERSION_LAST (-1), then this gets metadata for the last * logically first entry past the uberblock, or returns HEIM_EOF if
* confirmed entry's version and timestpamp. * only the uber record is present.
*
* If `which' is LOG_VERSION_LAST, then this gets metadata for the last
* confirmed entry's version and timestamp. If only the uber record is present,
* then the version will be its "nominal" version, which may differ from its
* actual version (0).
* *
* The `fd''s offset will be set to the start of the header of the entry * The `fd''s offset will be set to the start of the header of the entry
* identified by `which'. * identified by `which'.
@@ -498,18 +524,8 @@ kadm5_log_get_version_fd(kadm5_server_context *server_context, int fd,
ret = get_header(sp, LOG_DOPEEK, ver, tstamp, &op, &len); ret = get_header(sp, LOG_DOPEEK, ver, tstamp, &op, &len);
else else
ret = errno; ret = errno;
if (ret == 0 && (op != kadm_nop || len != LOG_UBER_SZ)) if (ret == 0 && (op != kadm_nop || len != LOG_UBER_LEN || *ver != 0))
ret = KADM5_LOG_NEEDS_UPGRADE; ret = KADM5_LOG_NEEDS_UPGRADE;
if (ret != 0)
return ret;
/* Skip 8 byte offset and 4 byte time */
if (krb5_storage_seek(sp, LOG_HEADER_SZ + 12, SEEK_CUR)
!= LOG_HEADER_SZ + 12)
ret = errno;
if (ret == 0)
ret = krb5_ret_uint32(sp, ver);
if (krb5_storage_seek(sp, 0, SEEK_SET) != 0)
ret = errno;
krb5_storage_free(sp); krb5_storage_free(sp);
break; break;
default: default:
@@ -607,7 +623,7 @@ log_init(kadm5_server_context *server_context, int lock_mode)
{ {
int fd; int fd;
struct stat st; struct stat st;
uint32_t last_ver; uint32_t vno;
size_t maxbytes = get_max_log_size(server_context->context); size_t maxbytes = get_max_log_size(server_context->context);
kadm5_ret_t ret; kadm5_ret_t ret;
kadm5_log_context *log_context = &server_context->log_context; kadm5_log_context *log_context = &server_context->log_context;
@@ -634,8 +650,7 @@ log_init(kadm5_server_context *server_context, int lock_mode)
} }
if (ret == 0) { if (ret == 0) {
ret = kadm5_log_get_version_fd(server_context, fd, ret = kadm5_log_get_version_fd(server_context, fd,
LOG_VERSION_UBER, LOG_VERSION_UBER, &vno, NULL);
&log_context->version, NULL);
/* Upgrade the log if it was an old-style log */ /* Upgrade the log if it was an old-style log */
if (ret == KADM5_LOG_NEEDS_UPGRADE) if (ret == KADM5_LOG_NEEDS_UPGRADE)
@@ -643,17 +658,11 @@ log_init(kadm5_server_context *server_context, int lock_mode)
} }
if (ret == 0) if (ret == 0)
ret = kadm5_log_recover(server_context, kadm_recover_replay); ret = kadm5_log_recover(server_context, kadm_recover_replay);
} else {
ret = kadm5_log_get_version_fd(server_context, fd,
LOG_VERSION_UBER,
&log_context->version, NULL);
} }
if (ret == 0) { if (ret == 0) {
ret = kadm5_log_get_version_fd(server_context, fd, LOG_VERSION_LAST, ret = kadm5_log_get_version_fd(server_context, fd, LOG_VERSION_LAST,
&last_ver, NULL); &log_context->version, NULL);
if (ret == 0 && last_ver != 0 && last_ver != log_context->version)
ret = KADM5_LOG_CORRUPT;
if (ret == HEIM_ERR_EOF) if (ret == HEIM_ERR_EOF)
ret = 0; ret = 0;
} }
@@ -914,7 +923,9 @@ kadm5_log_flush(kadm5_server_context *context, krb5_storage *sp)
if (ret) if (ret)
return ret; return ret;
log_context->version = new_ver; /* Retain the nominal database version when flushing the uber record */
if (new_ver != 0)
log_context->version = new_ver;
return 0; return 0;
} }
@@ -1609,7 +1620,7 @@ log_update_uber(kadm5_server_context *context, off_t off)
/* If the first entry is not a 16-byte nop, ditto */ /* If the first entry is not a 16-byte nop, ditto */
ret = krb5_ret_uint32(sp, &len); ret = krb5_ret_uint32(sp, &len);
if (ret || len != LOG_UBER_SZ) if (ret || len != LOG_UBER_LEN)
goto out; goto out;
/* /*
@@ -1665,6 +1676,7 @@ kadm5_log_nop(kadm5_server_context *context, enum kadm_nop_type nop_type)
kadm5_ret_t ret; kadm5_ret_t ret;
kadm5_log_context *log_context = &context->log_context; kadm5_log_context *log_context = &context->log_context;
off_t off; off_t off;
uint32_t vno = log_context->version;
if (strcmp(log_context->log_file, "/dev/null") == 0) if (strcmp(log_context->log_file, "/dev/null") == 0)
return 0; return 0;
@@ -1674,8 +1686,7 @@ kadm5_log_nop(kadm5_server_context *context, enum kadm_nop_type nop_type)
return errno; return errno;
sp = krb5_storage_emem(); sp = krb5_storage_emem();
ret = kadm5_log_preamble(context, sp, kadm_nop, ret = kadm5_log_preamble(context, sp, kadm_nop, off == 0 ? 0 : vno + 1);
off == 0 ? 0 : log_context->version + 1);
if (ret) if (ret)
goto out; goto out;
@@ -1684,16 +1695,16 @@ kadm5_log_nop(kadm5_server_context *context, enum kadm_nop_type nop_type)
* First entry (uber-entry) gets room for offset of next new * First entry (uber-entry) gets room for offset of next new
* entry and time and version of last entry. * entry and time and version of last entry.
*/ */
ret = krb5_store_uint32(sp, LOG_UBER_SZ); ret = krb5_store_uint32(sp, LOG_UBER_LEN);
/* These get overwritten with the same values below */ /* These get overwritten with the same values below */
if (ret == 0) if (ret == 0)
ret = krb5_store_uint64(sp, LOG_WRAPPER_SZ + LOG_UBER_SZ); ret = krb5_store_uint64(sp, LOG_UBER_SZ);
if (ret == 0) if (ret == 0)
ret = krb5_store_uint32(sp, log_context->last_time); ret = krb5_store_uint32(sp, log_context->last_time);
if (ret == 0) if (ret == 0)
ret = krb5_store_uint32(sp, log_context->version); ret = krb5_store_uint32(sp, vno);
if (ret == 0) if (ret == 0)
ret = krb5_store_uint32(sp, LOG_UBER_SZ); ret = krb5_store_uint32(sp, LOG_UBER_LEN);
} else if (nop_type == kadm_nop_plain) { } else if (nop_type == kadm_nop_plain) {
ret = krb5_store_uint32(sp, 0); ret = krb5_store_uint32(sp, 0);
if (ret == 0) if (ret == 0)
@@ -1707,8 +1718,7 @@ kadm5_log_nop(kadm5_server_context *context, enum kadm_nop_type nop_type)
} }
if (ret == 0) if (ret == 0)
ret = kadm5_log_postamble(log_context, sp, ret = kadm5_log_postamble(log_context, sp, off == 0 ? 0 : vno + 1);
off == 0 ? 0 : log_context->version + 1);
if (ret == 0) if (ret == 0)
ret = kadm5_log_flush(context, sp); ret = kadm5_log_flush(context, sp);
@@ -2063,7 +2073,7 @@ log_goto_first(kadm5_server_context *server_context, int fd)
errno = ret; errno = ret;
return NULL; return NULL;
} }
if (op == kadm_nop && len == LOG_UBER_SZ && seek_next(sp) == -1) { if (op == kadm_nop && len == LOG_UBER_LEN && seek_next(sp) == -1) {
krb5_storage_free(sp); krb5_storage_free(sp);
return NULL; return NULL;
} }
@@ -2110,7 +2120,7 @@ kadm5_log_goto_end(kadm5_server_context *server_context, int fd)
if (ret) if (ret)
goto fail; goto fail;
if (op == kadm_nop && len == LOG_UBER_SZ) { if (op == kadm_nop && len == LOG_UBER_LEN) {
/* New style log */ /* New style log */
ret = krb5_ret_uint64(sp, &off); ret = krb5_ret_uint64(sp, &off);
if (ret) if (ret)
@@ -2119,7 +2129,7 @@ kadm5_log_goto_end(kadm5_server_context *server_context, int fd)
if (krb5_storage_seek(sp, off, SEEK_SET) == -1) if (krb5_storage_seek(sp, off, SEEK_SET) == -1)
goto fail; goto fail;
if (off >= LOG_WRAPPER_SZ + LOG_UBER_SZ) { if (off >= LOG_UBER_SZ) {
ret = get_version_prev(sp, &ver, NULL); ret = get_version_prev(sp, &ver, NULL);
if (ret == 0) if (ret == 0)
return sp; return sp;
@@ -2410,7 +2420,7 @@ kadm5_log_truncate(kadm5_server_context *context, size_t keep, size_t maxbytes)
krb5_data entries; krb5_data entries;
krb5_storage *sp; krb5_storage *sp;
ssize_t bytes; ssize_t bytes;
uint64_t head_sz, sz; uint64_t sz;
off_t off; off_t off;
if (maxbytes == 0) if (maxbytes == 0)
@@ -2438,11 +2448,8 @@ kadm5_log_truncate(kadm5_server_context *context, size_t keep, size_t maxbytes)
return EINVAL; return EINVAL;
} }
/* Size of the uber record */
head_sz = LOG_WRAPPER_SZ + LOG_UBER_SZ;
/* Check that entries.length won't overflow off_t */ /* Check that entries.length won't overflow off_t */
sz = head_sz + entries.length; sz = LOG_UBER_SZ + entries.length;
off = (off_t)sz; off = (off_t)sz;
if (off < 0 || off != sz || sz < entries.length) { if (off < 0 || off != sz || sz < entries.length) {
krb5_data_free(&entries); krb5_data_free(&entries);
@@ -2494,15 +2501,15 @@ kadm5_log_truncate(kadm5_server_context *context, size_t keep, size_t maxbytes)
if (ret == 0) if (ret == 0)
ret = krb5_store_uint32(sp, kadm_nop); /* end of preamble */ ret = krb5_store_uint32(sp, kadm_nop); /* end of preamble */
if (ret == 0) if (ret == 0)
ret = krb5_store_uint32(sp, LOG_UBER_SZ); /* end of header */ ret = krb5_store_uint32(sp, LOG_UBER_LEN); /* end of header */
if (ret == 0) if (ret == 0)
ret = krb5_store_uint64(sp, 0); ret = krb5_store_uint64(sp, LOG_UBER_SZ);
if (ret == 0) if (ret == 0)
ret = krb5_store_uint32(sp, 0); ret = krb5_store_uint32(sp, 0);
if (ret == 0) if (ret == 0)
ret = krb5_store_uint32(sp, context->log_context.version); ret = krb5_store_uint32(sp, context->log_context.version);
if (ret == 0) if (ret == 0)
ret = krb5_store_uint32(sp, LOG_UBER_SZ); ret = krb5_store_uint32(sp, LOG_UBER_LEN);
if (ret == 0) if (ret == 0)
ret = krb5_store_uint32(sp, 0); /* end of trailer */ ret = krb5_store_uint32(sp, 0); /* end of trailer */
if (ret == 0) { if (ret == 0) {
@@ -2524,7 +2531,7 @@ kadm5_log_truncate(kadm5_server_context *context, size_t keep, size_t maxbytes)
if (ret) { if (ret) {
krb5_warn(context->context, ret, "Unable to keep entries"); krb5_warn(context->context, ret, "Unable to keep entries");
(void) ftruncate(context->log_context.log_fd, head_sz); (void) ftruncate(context->log_context.log_fd, LOG_UBER_SZ);
(void) lseek(context->log_context.log_fd, 0, SEEK_SET); (void) lseek(context->log_context.log_fd, 0, SEEK_SET);
return ret; return ret;
} }