[PATCH printk v5 5/6] printk: ringbuffer: add finalization/extension support

John Ogness john.ogness at linutronix.de
Mon Sep 14 08:33:53 EDT 2020


Add support for extending the newest data block. For this, introduce
a new finalization state (desc_finalized) denoting a committed
descriptor that cannot be extended.

Until a record is finalized, a writer can reopen that record to
append new data. Reopening a record means transitioning from the
desc_committed state back to the desc_reserved state.

A writer can explicitly finalize a record if there is no intention
of extending it. Also, records are automatically finalized when a
new record is reserved. This relieves writers of needing to
explicitly finalize while also making such records available to
readers sooner. (Readers can only traverse finalized records.)

Four new memory barrier pairs are introduced. Two of them are
insignificant additions (data_realloc:A/desc_read:D and
data_realloc:A/data_push_tail:B) because they are alternate path
memory barriers that exactly match the purpose, pairing, and
context of the two existing memory barrier pairs they provide an
alternate path for. The other two new memory barrier pairs are
significant additions:

desc_reopen_last:A / _prb_commit:B - When reopening a descriptor,
    ensure the state transitions back to desc_reserved before
    fully trusting the descriptor data.

_prb_commit:B / desc_reserve:D - When committing a descriptor,
    ensure the state transitions to desc_committed before checking
    the head ID to see if the descriptor needs to be finalized.

Signed-off-by: John Ogness <john.ogness at linutronix.de>
---
 Documentation/admin-guide/kdump/gdbmacros.txt |   3 +-
 kernel/printk/printk_ringbuffer.c             | 525 ++++++++++++++++--
 kernel/printk/printk_ringbuffer.h             |   6 +-
 scripts/gdb/linux/dmesg.py                    |   3 +-
 4 files changed, 480 insertions(+), 57 deletions(-)

diff --git a/Documentation/admin-guide/kdump/gdbmacros.txt b/Documentation/admin-guide/kdump/gdbmacros.txt
index 8f533b751c46..94fabb165abf 100644
--- a/Documentation/admin-guide/kdump/gdbmacros.txt
+++ b/Documentation/admin-guide/kdump/gdbmacros.txt
@@ -297,6 +297,7 @@ end
 define dmesg
 	# definitions from kernel/printk/printk_ringbuffer.h
 	set var $desc_committed = 1
+	set var $desc_finalized = 2
 	set var $desc_sv_bits = sizeof(long) * 8
 	set var $desc_flags_shift = $desc_sv_bits - 2
 	set var $desc_flags_mask = 3 << $desc_flags_shift
@@ -313,7 +314,7 @@ define dmesg
 
 		# skip non-committed record
 		set var $state = 3 & ($desc->state_var.counter >> $desc_flags_shift)
-		if ($state == $desc_committed)
+		if ($state == $desc_committed || $state == $desc_finalized)
 			dump_record $desc $prev_flags
 			set var $prev_flags = $desc->info.flags
 		end
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index 911fbe150e9a..4e526c79f89c 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -46,20 +46,26 @@
  * into a single descriptor field named @state_var, allowing ID and state to
  * be synchronously and atomically updated.
  *
- * Descriptors have three states:
+ * Descriptors have four states:
  *
  *   reserved
  *     A writer is modifying the record.
  *
  *   committed
- *     The record and all its data are complete and available for reading.
+ *     The record and all its data are written. A writer can reopen the
+ *     descriptor (transitioning it back to reserved), but in the committed
+ *     state the data is consistent.
+ *
+ *   finalized
+ *     The record and all its data are complete and available for reading. A
+ *     writer cannot reopen the descriptor.
  *
  *   reusable
  *     The record exists, but its text and/or dictionary data may no longer
  *     be available.
  *
  * Querying the @state_var of a record requires providing the ID of the
- * descriptor to query. This can yield a possible fourth (pseudo) state:
+ * descriptor to query. This can yield a possible fifth (pseudo) state:
  *
  *   miss
  *     The descriptor being queried has an unexpected ID.
@@ -79,6 +85,28 @@
  * committed or reusable queried state. This makes it possible that a valid
  * sequence number of the tail is always available.
  *
+ * Descriptor Finalization
+ * ~~~~~~~~~~~~~~~~~~~~~~~
+ * When a writer calls the commit function prb_commit(), record data is
+ * fully stored and is consistent within the ringbuffer. However, a writer can
+ * reopen that record, claiming exclusive access (as with prb_reserve()), and
+ * modify that record. When finished, the writer must again commit the record.
+ *
+ * In order for a record to be made available to readers (and also become
+ * recyclable for writers), it must be finalized. A finalized record cannot be
+ * reopened and can never become "unfinalized". Record finalization can occur
+ * in three different scenarios:
+ *
+ *   1) A writer can simultaneously commit and finalize its record by calling
+ *      prb_final_commit() instead of prb_commit().
+ *
+ *   2) When a new record is reserved and the previous record has been
+ *      committed via prb_commit(), that previous record is automatically
+ *      finalized.
+ *
+ *   3) When a record is committed via prb_commit() and a newer record
+ *      already exists, the record being committed is automatically finalized.
+ *
  * Data Rings
  * ~~~~~~~~~~
  * The two data rings (text and dictionary) function identically. They exist
@@ -97,7 +125,7 @@
  * are met:
  *
  *   1) The descriptor associated with the data block is in the committed
- *      queried state.
+ *      or finalized queried state.
  *
  *   2) The blk_lpos struct within the descriptor associated with the data
  *      block references back to the same data block.
@@ -156,9 +184,38 @@
  *
  *		r.info->ts_nsec = local_clock();
  *
+ *		prb_final_commit(&e);
+ *	}
+ *
+ * Note that additional writer functions are available to extend a record
+ * after it has been committed but not yet finalized. This can be done as
+ * long as no new records have been reserved and the caller is the same.
+ *
+ * Sample writer code (record extending)::
+ *
+ *		// alternate rest of previous example
+ *		r.info->ts_nsec = local_clock();
+ *		r.info->text_len = strlen(textstr);
+ *		r.info->caller_id = printk_caller_id();
+ *
+ *		// commit the record (but do not finalize yet)
  *		prb_commit(&e);
  *	}
  *
+ *	...
+ *
+ *	// specify additional text space to extend (5 chars + terminator)
+ *	prb_rec_init_wr(&r, 6, 0);
+ *
+ *	if (prb_reserve_in_last(&e, &test_rb, &r, printk_caller_id())) {
+ *		snprintf(&r.text_buf[r.info->text_len],
+ *			 r.text_buf_size - r.info->text_len, "hello");
+ *
+ *		r.info->text_len += 5;
+ *
+ *		prb_final_commit(&e);
+ *	}
+ *
  * Sample reader code::
  *
  *	struct printk_info info;
@@ -236,15 +293,21 @@
  *   desc_reserve:F / desc_read:D
  *     set new descriptor id and reserved (state), then allow writer changes
  *
- *   data_alloc:A / desc_read:D
+ *   data_alloc:A (or data_realloc:A) / desc_read:D
  *     set old descriptor reusable (state), then modify new data block area
  *
- *   data_alloc:A / data_push_tail:B
+ *   data_alloc:A (or data_realloc:A) / data_push_tail:B
  *     push data tail (lpos), then modify new data block area
  *
- *   prb_commit:B / desc_read:B
+ *   _prb_commit:B / desc_read:B
  *     store writer changes, then set new descriptor committed (state)
  *
+ *   desc_reopen_last:A / _prb_commit:B
+ *     set descriptor reserved (state), then read descriptor data
+ *
+ *   _prb_commit:B / desc_reserve:D
+ *     set new descriptor committed (state), then check descriptor head (id)
+ *
  *   data_push_tail:D / data_push_tail:A
  *     set descriptor reusable (state), then push data tail (lpos)
  *
@@ -380,16 +443,16 @@ static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
 	/*
 	 * Guarantee the state is loaded before copying the descriptor
 	 * content. This avoids copying obsolete descriptor content that might
-	 * not apply to the descriptor state. This pairs with prb_commit:B.
+	 * not apply to the descriptor state. This pairs with _prb_commit:B.
 	 *
 	 * Memory barrier involvement:
 	 *
-	 * If desc_read:A reads from prb_commit:B, then desc_read:C reads
-	 * from prb_commit:A.
+	 * If desc_read:A reads from _prb_commit:B, then desc_read:C reads
+	 * from _prb_commit:A.
 	 *
 	 * Relies on:
 	 *
-	 * WMB from prb_commit:A to prb_commit:B
+	 * WMB from _prb_commit:A to _prb_commit:B
 	 *    matching
 	 * RMB from desc_read:A to desc_read:C
 	 */
@@ -420,7 +483,8 @@ static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
 	 *
 	 * 2. Guarantee the record data is loaded before re-checking the
 	 *    state. This avoids reading an obsolete descriptor state that may
-	 *    not apply to the copied data. This pairs with data_alloc:A.
+	 *    not apply to the copied data. This pairs with data_alloc:A and
+	 *    data_realloc:A.
 	 *
 	 *    Memory barrier involvement:
 	 *
@@ -446,19 +510,19 @@ static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
 }
 
 /*
- * Take a specified descriptor out of the committed state by attempting
- * the transition from committed to reusable. Either this context or some
+ * Take a specified descriptor out of the finalized state by attempting
+ * the transition from finalized to reusable. Either this context or some
  * other context will have been successful.
  */
 static void desc_make_reusable(struct prb_desc_ring *desc_ring,
 			       unsigned long id)
 {
-	unsigned long val_committed = DESC_SV(id, desc_committed);
+	unsigned long val_finalized = DESC_SV(id, desc_finalized);
 	unsigned long val_reusable = DESC_SV(id, desc_reusable);
 	struct prb_desc *desc = to_desc(desc_ring, id);
 	atomic_long_t *state_var = &desc->state_var;
 
-	atomic_long_cmpxchg_relaxed(state_var, val_committed,
+	atomic_long_cmpxchg_relaxed(state_var, val_finalized,
 				    val_reusable); /* LMM(desc_make_reusable:A) */
 }
 
@@ -467,7 +531,7 @@ static void desc_make_reusable(struct prb_desc_ring *desc_ring,
  * data block from @lpos_begin until @lpos_end into the reusable state.
  *
  * If there is any problem making the associated descriptor reusable, either
- * the descriptor has not yet been committed or another writer context has
+ * the descriptor has not yet been finalized or another writer context has
  * already pushed the tail lpos past the problematic data block. Regardless,
  * on error the caller can re-load the tail lpos to determine the situation.
  */
@@ -511,10 +575,10 @@ static bool data_make_reusable(struct printk_ringbuffer *rb,
 
 		switch (d_state) {
 		case desc_miss:
-			return false;
 		case desc_reserved:
-			return false;
 		case desc_committed:
+			return false;
+		case desc_finalized:
 			/*
 			 * This data block is invalid if the descriptor
 			 * does not point back to it.
@@ -599,7 +663,7 @@ static bool data_push_tail(struct printk_ringbuffer *rb,
 			 *    data_make_reusable() may be due to a newly
 			 *    recycled data area causing the tail lpos to
 			 *    have been previously pushed. This pairs with
-			 *    data_alloc:A.
+			 *    data_alloc:A and data_realloc:A.
 			 *
 			 *    Memory barrier involvement:
 			 *
@@ -712,8 +776,9 @@ static bool desc_push_tail(struct printk_ringbuffer *rb,
 		 */
 		return true;
 	case desc_reserved:
-		return false;
 	case desc_committed:
+		return false;
+	case desc_finalized:
 		desc_make_reusable(desc_ring, tail_id);
 		break;
 	case desc_reusable:
@@ -734,7 +799,7 @@ static bool desc_push_tail(struct printk_ringbuffer *rb,
 
 	/*
 	 * Check the next descriptor after @tail_id before pushing the tail
-	 * to it because the tail must always be in a committed or reusable
+	 * to it because the tail must always be in a finalized or reusable
 	 * state. The implementation of prb_first_seq() relies on this.
 	 *
 	 * A successful read implies that the next descriptor is less than or
@@ -743,7 +808,7 @@ static bool desc_push_tail(struct printk_ringbuffer *rb,
 	 */
 	d_state = desc_read(desc_ring, DESC_ID(tail_id + 1), &desc); /* LMM(desc_push_tail:A) */
 
-	if (d_state == desc_committed || d_state == desc_reusable) {
+	if (d_state == desc_finalized || d_state == desc_reusable) {
 		/*
 		 * Guarantee any descriptor states that have transitioned to
 		 * reusable are stored before pushing the tail ID. This allows
@@ -878,6 +943,10 @@ static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
 		 *    another CPU may have pushed the tail ID. This pairs
 		 *    with desc_push_tail:C and this also pairs with
 		 *    prb_first_seq:C.
+		 *
+		 * 5. Guarantee the head ID is stored before trying to
+		 *    finalize the previous descriptor. This pairs with
+		 *    _prb_commit:B.
 		 */
 	} while (!atomic_long_try_cmpxchg(&desc_ring->head_id, &head_id,
 					  id)); /* LMM(desc_reserve:D) */
@@ -1007,6 +1076,84 @@ static char *data_alloc(struct printk_ringbuffer *rb,
 	return &blk->data[0];
 }
 
+/*
+ * Try to resize an existing data block associated with the descriptor
+ * specified by @id. If the resized data block should become wrapped, it
+ * copies the old data to the new data block. If @size yields a data block
+ * with the same or less size, the data block is left as is.
+ *
+ * Fail if this is not the last allocated data block or if there is not
+ * enough space or it is not possible to make enough space.
+ *
+ * Return a pointer to the beginning of the entire data buffer or NULL on
+ * failure.
+ */
+static char *data_realloc(struct printk_ringbuffer *rb,
+			  struct prb_data_ring *data_ring, unsigned int size,
+			  struct prb_data_blk_lpos *blk_lpos, unsigned long id)
+{
+	struct prb_data_block *blk;
+	unsigned long head_lpos;
+	unsigned long next_lpos;
+	bool wrapped;
+
+	/* Reallocation only works if @blk_lpos is the newest data block. */
+	head_lpos = atomic_long_read(&data_ring->head_lpos);
+	if (head_lpos != blk_lpos->next)
+		return NULL;
+
+	/* Keep track if @blk_lpos was a wrapping data block. */
+	wrapped = (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, blk_lpos->next));
+
+	size = to_blk_size(size);
+
+	next_lpos = get_next_lpos(data_ring, blk_lpos->begin, size);
+
+	/* If the data block does not increase, there is nothing to do. */
+	if (head_lpos - next_lpos < DATA_SIZE(data_ring)) {
+		blk = to_block(data_ring, blk_lpos->begin);
+		return &blk->data[0];
+	}
+
+	if (!data_push_tail(rb, data_ring, next_lpos - DATA_SIZE(data_ring)))
+		return NULL;
+
+	/* The memory barrier involvement is the same as data_alloc:A. */
+	if (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &head_lpos,
+				     next_lpos)) { /* LMM(data_realloc:A) */
+		return NULL;
+	}
+
+	blk = to_block(data_ring, blk_lpos->begin);
+
+	if (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, next_lpos)) {
+		struct prb_data_block *old_blk = blk;
+
+		/* Wrapping data blocks store their data at the beginning. */
+		blk = to_block(data_ring, 0);
+
+		/*
+		 * Store the ID on the wrapped block for consistency.
+		 * The printk_ringbuffer does not actually use it.
+		 */
+		blk->id = id;
+
+		if (!wrapped) {
+			/*
+			 * Since the allocated space is now in the newly
+			 * created wrapping data block, copy the content
+			 * from the old data block.
+			 */
+			memcpy(&blk->data[0], &old_blk->data[0],
+			       (blk_lpos->next - blk_lpos->begin) - sizeof(blk->id));
+		}
+	}
+
+	blk_lpos->next = next_lpos;
+
+	return &blk->data[0];
+}
+
 /* Return the number of bytes used by a data block. */
 static unsigned int space_used(struct prb_data_ring *data_ring,
 			       struct prb_data_blk_lpos *blk_lpos)
@@ -1087,6 +1234,206 @@ static const char *get_data(struct prb_data_ring *data_ring,
 	return &db->data[0];
 }
 
+/*
+ * Attempt to transition the newest descriptor from committed back to reserved
+ * so that the record can be modified by a writer again. This is only possible
+ * if the descriptor is not yet finalized and the provided @caller_id matches.
+ */
+static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring,
+					 u32 caller_id, unsigned long *id_out)
+{
+	unsigned long prev_state_val;
+	enum desc_state d_state;
+	struct prb_desc desc;
+	struct prb_desc *d;
+	unsigned long id;
+
+	id = atomic_long_read(&desc_ring->head_id);
+
+	/*
+	 * To reduce unnecessary reopening, first check if the descriptor
+	 * state and caller ID are correct.
+	 */
+	d_state = desc_read(desc_ring, id, &desc);
+	if (d_state != desc_committed || desc.info.caller_id != caller_id)
+		return NULL;
+
+	d = to_desc(desc_ring, id);
+
+	prev_state_val = DESC_SV(id, desc_committed);
+
+	/*
+	 * Guarantee the reserved state is stored before reading any
+	 * record data. A full memory barrier is needed because @state_var
+	 * modification is followed by reading. This pairs with _prb_commit:B.
+	 *
+	 * Memory barrier involvement:
+	 *
+	 * If desc_reopen_last:A reads from _prb_commit:B, then
+	 * prb_reserve_in_last:A reads from _prb_commit:A.
+	 *
+	 * Relies on:
+	 *
+	 * WMB from _prb_commit:A to _prb_commit:B
+	 *    matching
+	 * MB from desc_reopen_last:A to prb_reserve_in_last:A
+	 */
+	if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
+			DESC_SV(id, desc_reserved))) { /* LMM(desc_reopen_last:A) */
+		return NULL;
+	}
+
+	*id_out = id;
+	return d;
+}
+
+/**
+ * prb_reserve_in_last() - Re-reserve and extend the space in the ringbuffer
+ *                         used by the newest record.
+ *
+ * @e:         The entry structure to setup.
+ * @rb:        The ringbuffer to re-reserve and extend data in.
+ * @r:         The record structure to allocate buffers for.
+ * @caller_id: The caller ID of the caller (reserving writer).
+ *
+ * This is the public function available to writers to re-reserve and extend
+ * data.
+ *
+ * The writer specifies the text size to extend (not the new total size) by
+ * setting the @text_buf_size field of @r. Extending dictionaries is not
+ * supported, so @dict_buf_size of @r should be set to 0. To ensure proper
+ * initialization of @r, prb_rec_init_wr() should be used.
+ *
+ * This function will fail if @caller_id does not match the caller ID of the
+ * newest record. In that case the caller must reserve new data using
+ * prb_reserve().
+ *
+ * Context: Any context. Disables local interrupts on success.
+ * Return: true if text data could be extended, otherwise false.
+ *
+ * On success:
+ *
+ *   - @r->text_buf points to the beginning of the entire text buffer.
+ *
+ *   - @r->text_buf_size is set to the new total size of the buffer.
+ *
+ *   - @r->dict_buf and @r->dict_buf_size are cleared because extending
+ *     the dict buffer is not supported.
+ *
+ *   - @r->info points to the existing meta data of the record, which
+ *     is kept so that @r->info->text_len can be used to append the text.
+ *
+ *   - prb_record_text_space() can be used on @e to query the new
+ *     actually used space.
+ *
+ * Important: All @r->info fields will already be set with the current values
+ *            for the record. I.e. @r->info->text_len will be less than
+ *            @text_buf_size and @r->info->dict_len may be set, even though
+ *            @dict_buf_size is 0. Writers can use @r->info->text_len to know
+ *            where concatenation begins and writers should update
+ *            @r->info->text_len after concatenating.
+ */
+bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+			 struct printk_record *r, u32 caller_id)
+{
+	unsigned int data_size;
+	struct prb_desc *d;
+	unsigned long id;
+
+	local_irq_save(e->irqflags);
+
+	/* Transition the newest descriptor back to the reserved state. */
+	d = desc_reopen_last(&rb->desc_ring, caller_id, &id);
+	if (!d) {
+		local_irq_restore(e->irqflags);
+		goto fail_reopen;
+	}
+
+	/* Now the writer has exclusive access: LMM(prb_reserve_in_last:A) */
+
+	/*
+	 * Set the @e fields here so that prb_commit() can be used if
+	 * anything fails from now on.
+	 */
+	e->rb = rb;
+	e->id = id;
+
+	/*
+	 * desc_reopen_last() checked the caller_id, but there was no
+	 * exclusive access at that point. The descriptor may have
+	 * changed since then.
+	 */
+	if (caller_id != d->info.caller_id)
+		goto fail;
+
+	if (BLK_DATALESS(&d->text_blk_lpos)) {
+		if (WARN_ON_ONCE(d->info.text_len != 0)) {
+			pr_warn_once("wrong text_len value (%hu, expecting 0)\n",
+				     d->info.text_len);
+			d->info.text_len = 0;
+		}
+
+		if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
+			goto fail;
+
+		r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
+					 &d->text_blk_lpos, id);
+	} else {
+		if (!get_data(&rb->text_data_ring, &d->text_blk_lpos, &data_size))
+			goto fail;
+
+		/*
+		 * Increase the buffer size to include the original size. If
+		 * the meta data (@text_len) is not sane, use the full data
+		 * block size.
+		 */
+		if (WARN_ON_ONCE(d->info.text_len > data_size)) {
+			pr_warn_once("wrong text_len value (%hu, expecting <=%u)\n",
+				     d->info.text_len, data_size);
+			d->info.text_len = data_size;
+		}
+		r->text_buf_size += d->info.text_len;
+
+		if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
+			goto fail;
+
+		r->text_buf = data_realloc(rb, &rb->text_data_ring, r->text_buf_size,
+					   &d->text_blk_lpos, id);
+	}
+	if (r->text_buf_size && !r->text_buf)
+		goto fail;
+
+	/* Although dictionary data may be in use, it cannot be extended. */
+	r->dict_buf = NULL;
+	r->dict_buf_size = 0;
+
+	r->info = &d->info;
+
+	e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
+
+	return true;
+fail:
+	prb_commit(e);
+	/* prb_commit() re-enabled interrupts. */
+fail_reopen:
+	/* Make it clear to the caller that the re-reserve failed. */
+	memset(r, 0, sizeof(*r));
+	return false;
+}
+
+/*
+ * Attempt to finalize a specified descriptor. If this fails, the descriptor
+ * is either already final or it will finalize itself when the writer commits.
+ */
+static void desc_make_final(struct prb_desc_ring *desc_ring, unsigned long id)
+{
+	unsigned long prev_state_val = DESC_SV(id, desc_committed);
+	struct prb_desc *d = to_desc(desc_ring, id);
+
+	atomic_long_cmpxchg_relaxed(&d->state_var, prev_state_val,
+			DESC_SV(id, desc_finalized)); /* LMM(desc_make_final:A) */
+}
+
 /**
  * prb_reserve() - Reserve space in the ringbuffer.
  *
@@ -1180,6 +1527,15 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
 	else
 		d->info.seq = seq + DESCS_COUNT(desc_ring);
 
+	/*
+	 * New data is about to be reserved. Once that happens, previous
+	 * descriptors are no longer able to be extended. Finalize the
+	 * previous descriptor now so that it can be made available to
+	 * readers. (For seq==0 there is no previous descriptor.)
+	 */
+	if (d->info.seq > 0)
+		desc_make_final(desc_ring, DESC_ID(id - 1));
+
 	r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
 				 &d->text_blk_lpos, id);
 	/* If text data allocation fails, a data-less record is committed. */
@@ -1210,33 +1566,40 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
 	return false;
 }
 
-/**
- * prb_commit() - Commit (previously reserved) data to the ringbuffer.
- *
- * @e: The entry containing the reserved data information.
- *
- * This is the public function available to writers to commit data.
- *
- * Context: Any context. Enables local interrupts.
- */
-void prb_commit(struct prb_reserved_entry *e)
+/* Commit the data (possibly finalizing it) and restore interrupts. */
+static void _prb_commit(struct prb_reserved_entry *e, unsigned long state_val)
 {
 	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
 	struct prb_desc *d = to_desc(desc_ring, e->id);
 	unsigned long prev_state_val = DESC_SV(e->id, desc_reserved);
 
-	/* Now the writer has finished all writing: LMM(prb_commit:A) */
+	/* Now the writer has finished all writing: LMM(_prb_commit:A) */
 
 	/*
 	 * Set the descriptor as committed. See "ABA Issues" about why
 	 * cmpxchg() instead of set() is used.
 	 *
-	 * Guarantee all record data is stored before the descriptor state
-	 * is stored as committed. A write memory barrier is sufficient for
-	 * this. This pairs with desc_read:B.
+	 * 1. Guarantee all record data is stored before the descriptor state
+	 *    is stored as committed. A write memory barrier is sufficient
+	 *    for this. This pairs with desc_read:B and desc_reopen_last:A.
+	 *
+	 * 2. Guarantee the descriptor state is stored as committed before
+	 *    re-checking the head ID in order to possibly finalize this
+	 *    descriptor. This pairs with desc_reserve:D.
+	 *
+	 *    Memory barrier involvement:
+	 *
+	 *    If prb_commit:A reads from desc_reserve:D, then
+	 *    desc_make_final:A reads from _prb_commit:B.
+	 *
+	 *    Relies on:
+	 *
+	 *    MB _prb_commit:B to prb_commit:A
+	 *       matching
+	 *    MB desc_reserve:D to desc_make_final:A
 	 */
 	if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
-			DESC_SV(e->id, desc_committed))) { /* LMM(prb_commit:B) */
+			DESC_SV(e->id, state_val))) { /* LMM(_prb_commit:B) */
 		WARN_ON_ONCE(1);
 	}
 
@@ -1244,6 +1607,59 @@ void prb_commit(struct prb_reserved_entry *e)
 	local_irq_restore(e->irqflags);
 }
 
+/**
+ * prb_commit() - Commit (previously reserved) data to the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit data.
+ *
+ * Note that the data is not available to readers until it is finalized.
+ * Finalizing happens automatically when space for the next record is
+ * reserved.
+ *
+ * See prb_final_commit() for a version of this function that finalizes
+ * immediately.
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
+	unsigned long head_id;
+
+	_prb_commit(e, desc_committed);
+
+	/*
+	 * If this descriptor is no longer the head (i.e. a new record has
+	 * been allocated), extending the data for this record is no longer
+	 * allowed and therefore it must be finalized.
+	 */
+	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
+	if (head_id != e->id)
+		desc_make_final(desc_ring, e->id);
+}
+
+/**
+ * prb_final_commit() - Commit and finalize (previously reserved) data to
+ *                      the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit+finalize data.
+ *
+ * By finalizing, the data is made immediately available to readers.
+ *
+ * This function should only be used if there are no intentions of extending
+ * this data using prb_reserve_in_last().
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_final_commit(struct prb_reserved_entry *e)
+{
+	_prb_commit(e, desc_finalized);
+}
+
 /*
  * Count the number of lines in provided text. All text has at least 1 line
  * (even if @text_size is 0). Each '\n' processed is counted as an additional
@@ -1295,7 +1711,7 @@ static bool copy_data(struct prb_data_ring *data_ring,
 	 * because of the trailing alignment padding.
 	 */
 	if (WARN_ON_ONCE(data_size < (unsigned int)len)) {
-		pr_warn_once("wrong data size (%u, expecting %hu) for data: %.*s\n",
+		pr_warn_once("wrong data size (%u, expecting >=%hu) for data: %.*s\n",
 			     data_size, len, data_size, data);
 		return false;
 	}
@@ -1316,16 +1732,16 @@ static bool copy_data(struct prb_data_ring *data_ring,
 
 /*
  * This is an extended version of desc_read(). It gets a copy of a specified
- * descriptor. However, it also verifies that the record is committed and has
+ * descriptor. However, it also verifies that the record is finalized and has
  * the sequence number @seq. On success, 0 is returned.
  *
  * Error return values:
- * -EINVAL: A committed record with sequence number @seq does not exist.
- * -ENOENT: A committed record with sequence number @seq exists, but its data
+ * -EINVAL: A finalized record with sequence number @seq does not exist.
+ * -ENOENT: A finalized record with sequence number @seq exists, but its data
  *          is not available. This is a valid record, so readers should
  *          continue with the next record.
  */
-static int desc_read_committed_seq(struct prb_desc_ring *desc_ring,
+static int desc_read_finalized_seq(struct prb_desc_ring *desc_ring,
 				   unsigned long id, u64 seq,
 				   struct prb_desc *desc_out)
 {
@@ -1336,11 +1752,12 @@ static int desc_read_committed_seq(struct prb_desc_ring *desc_ring,
 
 	/*
 	 * An unexpected @id (desc_miss) or @seq mismatch means the record
-	 * does not exist. A descriptor in the reserved state means the
-	 * record does not yet exist for the reader.
+	 * does not exist. A descriptor in the reserved or committed state
+	 * means the record does not yet exist for the reader.
 	 */
 	if (d_state == desc_miss ||
 	    d_state == desc_reserved ||
+	    d_state == desc_committed ||
 	    desc_out->info.seq != seq) {
 		return -EINVAL;
 	}
@@ -1362,7 +1779,7 @@ static int desc_read_committed_seq(struct prb_desc_ring *desc_ring,
  * Copy the ringbuffer data from the record with @seq to the provided
  * @r buffer. On success, 0 is returned.
  *
- * See desc_read_committed_seq() for error return values.
+ * See desc_read_finalized_seq() for error return values.
  */
 static int prb_read(struct printk_ringbuffer *rb, u64 seq,
 		    struct printk_record *r, unsigned int *line_count)
@@ -1378,7 +1795,7 @@ static int prb_read(struct printk_ringbuffer *rb, u64 seq,
 	id = DESC_ID(atomic_long_read(state_var));
 
 	/* Get a local copy of the correct descriptor (if available). */
-	err = desc_read_committed_seq(desc_ring, id, seq, &desc);
+	err = desc_read_finalized_seq(desc_ring, id, seq, &desc);
 
 	/*
 	 * If @r is NULL, the caller is only interested in the availability
@@ -1408,8 +1825,8 @@ static int prb_read(struct printk_ringbuffer *rb, u64 seq,
 			r->info->dict_len = 0;
 	}
 
-	/* Ensure the record is still committed and has the same @seq. */
-	return desc_read_committed_seq(desc_ring, id, seq, &desc);
+	/* Ensure the record is still finalized and has the same @seq. */
+	return desc_read_finalized_seq(desc_ring, id, seq, &desc);
 }
 
 /* Get the sequence number of the tail descriptor. */
@@ -1427,9 +1844,9 @@ static u64 prb_first_seq(struct printk_ringbuffer *rb)
 
 		/*
 		 * This loop will not be infinite because the tail is
-		 * _always_ in the committed or reusable state.
+		 * _always_ in the finalized or reusable state.
 		 */
-		if (d_state == desc_committed || d_state == desc_reusable)
+		if (d_state == desc_finalized || d_state == desc_reusable)
 			break;
 
 		/*
@@ -1456,8 +1873,8 @@ static u64 prb_first_seq(struct printk_ringbuffer *rb)
 }
 
 /*
- * Non-blocking read of a record. Updates @seq to the last committed record
- * (which may have no data).
+ * Non-blocking read of a record. Updates @seq to the last finalized record
+ * (which may have no data available).
  *
  * See the description of prb_read_valid() and prb_read_valid_info()
  * for details.
@@ -1483,7 +1900,7 @@ static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
 			(*seq)++;
 
 		} else {
-			/* Non-existent/non-committed record. Must stop. */
+			/* Non-existent/non-finalized record. Must stop. */
 			return false;
 		}
 	}
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
index a9d85a6727b1..853ea62dc5f2 100644
--- a/kernel/printk/printk_ringbuffer.h
+++ b/kernel/printk/printk_ringbuffer.h
@@ -116,7 +116,8 @@ struct prb_reserved_entry {
 enum desc_state {
 	desc_miss	=  -1,	/* ID mismatch (pseudo state) */
 	desc_reserved	= 0x0,	/* reserved, in use by writer */
-	desc_committed	= 0x1,	/* committed by writer */
+	desc_committed	= 0x1,	/* committed by writer, could get reopened */
+	desc_finalized	= 0x2,	/* committed, no further modification allowed */
 	desc_reusable	= 0x3,	/* free, not yet used by any writer */
 };
 
@@ -327,7 +328,10 @@ static inline void prb_rec_init_wr(struct printk_record *r,
 
 bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
 		 struct printk_record *r);
+bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+			 struct printk_record *r, u32 caller_id);
 void prb_commit(struct prb_reserved_entry *e);
+void prb_final_commit(struct prb_reserved_entry *e);
 
 void prb_init(struct printk_ringbuffer *rb,
 	      char *text_buf, unsigned int text_buf_size,
diff --git a/scripts/gdb/linux/dmesg.py b/scripts/gdb/linux/dmesg.py
index dd8c0b95063a..bce14de5f610 100644
--- a/scripts/gdb/linux/dmesg.py
+++ b/scripts/gdb/linux/dmesg.py
@@ -79,6 +79,7 @@ class LxDmesg(gdb.Command):
 
         # definitions from kernel/printk/printk_ringbuffer.h
         desc_committed = 1
+        desc_finalized = 2
         desc_sv_bits = utils.get_long_type().sizeof * 8
         desc_flags_shift = desc_sv_bits - 2
         desc_flags_mask = 3 << desc_flags_shift
@@ -98,7 +99,7 @@ class LxDmesg(gdb.Command):
             # skip non-committed record
             state = 3 & (utils.read_u64(descs, desc_off + sv_off +
                                         counter_off) >> desc_flags_shift)
-            if state != desc_committed:
+            if state != desc_committed and state != desc_finalized:
                 if did == head_id:
                     break
                 did = (did + 1) & desc_id_mask
-- 
2.20.1




More information about the kexec mailing list