path: root/kernel/trace/ring_buffer.c
Diffstat (limited to 'kernel/trace/ring_buffer.c')
-rw-r--r--  kernel/trace/ring_buffer.c | 1849
1 file changed, 1343 insertions(+), 506 deletions(-)
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 04dac2638258..bd1c35a4fbcc 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -14,12 +14,14 @@
14#include <linux/module.h> 14#include <linux/module.h>
15#include <linux/percpu.h> 15#include <linux/percpu.h>
16#include <linux/mutex.h> 16#include <linux/mutex.h>
17#include <linux/slab.h>
17#include <linux/init.h> 18#include <linux/init.h>
18#include <linux/hash.h> 19#include <linux/hash.h>
19#include <linux/list.h> 20#include <linux/list.h>
20#include <linux/cpu.h> 21#include <linux/cpu.h>
21#include <linux/fs.h> 22#include <linux/fs.h>
22 23
24#include <asm/local.h>
23#include "trace.h" 25#include "trace.h"
24 26
25/* 27/*
@@ -201,13 +203,19 @@ int tracing_is_on(void)
201} 203}
202EXPORT_SYMBOL_GPL(tracing_is_on); 204EXPORT_SYMBOL_GPL(tracing_is_on);
203 205
204#include "trace.h"
205
206#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 206#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
207#define RB_ALIGNMENT 4U 207#define RB_ALIGNMENT 4U
208#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 208#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
209#define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 209#define RB_EVNT_MIN_SIZE 8U /* two 32bit words */
210 210
211#if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS)
212# define RB_FORCE_8BYTE_ALIGNMENT 0
213# define RB_ARCH_ALIGNMENT RB_ALIGNMENT
214#else
215# define RB_FORCE_8BYTE_ALIGNMENT 1
216# define RB_ARCH_ALIGNMENT 8U
217#endif
218
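For illustration only (not part of the commit), a minimal sketch of how the arch alignment above changes the space reserved per event; it mirrors rb_calculate_event_length() further down, and the worked numbers are my own:

/* Sketch: effect of RB_ARCH_ALIGNMENT on the reserved size. */
static unsigned example_reserved_size(unsigned payload)
{
        unsigned length = payload ? payload : 1;

        /* forced 8-byte alignment always stores the size in array[0] */
        if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
                length += sizeof(u32);          /* event.array[0] */

        length += RB_EVNT_HDR_SIZE;             /* 4-byte bitfield header */
        return ALIGN(length, RB_ARCH_ALIGNMENT);
        /*
         * e.g. a 10-byte payload: 10 + 4 + 4 = 18, rounded up to 24 when
         * RB_FORCE_8BYTE_ALIGNMENT is 1, versus 10 + 4 = 14 rounded up
         * to 16 when it is 0.
         */
}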
211/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 219/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
212#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 220#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
213 221
@@ -216,19 +224,17 @@ enum {
216 RB_LEN_TIME_STAMP = 16, 224 RB_LEN_TIME_STAMP = 16,
217}; 225};
218 226
219static inline int rb_null_event(struct ring_buffer_event *event) 227#define skip_time_extend(event) \
220{ 228 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
221 return event->type_len == RINGBUF_TYPE_PADDING
222 && event->time_delta == 0;
223}
224 229
225static inline int rb_discarded_event(struct ring_buffer_event *event) 230static inline int rb_null_event(struct ring_buffer_event *event)
226{ 231{
227 return event->type_len == RINGBUF_TYPE_PADDING && event->time_delta; 232 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
228} 233}
229 234
230static void rb_event_set_padding(struct ring_buffer_event *event) 235static void rb_event_set_padding(struct ring_buffer_event *event)
231{ 236{
237 /* padding has a NULL time_delta */
232 event->type_len = RINGBUF_TYPE_PADDING; 238 event->type_len = RINGBUF_TYPE_PADDING;
233 event->time_delta = 0; 239 event->time_delta = 0;
234} 240}
@@ -245,8 +251,12 @@ rb_event_data_length(struct ring_buffer_event *event)
245 return length + RB_EVNT_HDR_SIZE; 251 return length + RB_EVNT_HDR_SIZE;
246} 252}
247 253
248/* inline for ring buffer fast paths */ 254/*
249static unsigned 255 * Return the length of the given event. Will return
256 * the length of the time extend if the event is a
257 * time extend.
258 */
259static inline unsigned
250rb_event_length(struct ring_buffer_event *event) 260rb_event_length(struct ring_buffer_event *event)
251{ 261{
252 switch (event->type_len) { 262 switch (event->type_len) {
@@ -271,13 +281,41 @@ rb_event_length(struct ring_buffer_event *event)
271 return 0; 281 return 0;
272} 282}
273 283
284/*
285 * Return total length of time extend and data,
286 * or just the event length for all other events.
287 */
288static inline unsigned
289rb_event_ts_length(struct ring_buffer_event *event)
290{
291 unsigned len = 0;
292
293 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
294 /* time extends include the data event after it */
295 len = RB_LEN_TIME_EXTEND;
296 event = skip_time_extend(event);
297 }
298 return len + rb_event_length(event);
299}
300
274/** 301/**
275 * ring_buffer_event_length - return the length of the event 302 * ring_buffer_event_length - return the length of the event
276 * @event: the event to get the length of 303 * @event: the event to get the length of
304 *
305 * Returns the size of the data load of a data event.
306 * If the event is something other than a data event, it
307 * returns the size of the event itself. With the exception
308 * of a TIME EXTEND, where it still returns the size of the
309 * data load of the data event after it.
277 */ 310 */
278unsigned ring_buffer_event_length(struct ring_buffer_event *event) 311unsigned ring_buffer_event_length(struct ring_buffer_event *event)
279{ 312{
280 unsigned length = rb_event_length(event); 313 unsigned length;
314
315 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
316 event = skip_time_extend(event);
317
318 length = rb_event_length(event);
281 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 319 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
282 return length; 320 return length;
283 length -= RB_EVNT_HDR_SIZE; 321 length -= RB_EVNT_HDR_SIZE;
@@ -291,6 +329,8 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_length);
291static void * 329static void *
292rb_event_data(struct ring_buffer_event *event) 330rb_event_data(struct ring_buffer_event *event)
293{ 331{
332 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
333 event = skip_time_extend(event);
294 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 334 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
295 /* If length is in len field, then array[0] has the data */ 335 /* If length is in len field, then array[0] has the data */
296 if (event->type_len) 336 if (event->type_len)
@@ -316,20 +356,49 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data);
316#define TS_MASK ((1ULL << TS_SHIFT) - 1) 356#define TS_MASK ((1ULL << TS_SHIFT) - 1)
317#define TS_DELTA_TEST (~TS_MASK) 357#define TS_DELTA_TEST (~TS_MASK)
318 358
359/* Flag when events were overwritten */
360#define RB_MISSED_EVENTS (1 << 31)
361/* Missed count stored at end */
362#define RB_MISSED_STORED (1 << 30)
363
319struct buffer_data_page { 364struct buffer_data_page {
320 u64 time_stamp; /* page time stamp */ 365 u64 time_stamp; /* page time stamp */
321 local_t commit; /* write committed index */ 366 local_t commit; /* write committed index */
322 unsigned char data[]; /* data of buffer page */ 367 unsigned char data[]; /* data of buffer page */
323}; 368};
324 369
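A sketch of how these missed-event flags are consumed; the body is condensed from ring_buffer_read_page() (not part of this hunk) and the wrapper function is invented for illustration:

/* Sketch: fold lost-event info into ->commit when handing a page off. */
static void example_store_missed(struct buffer_data_page *bpage,
                                 unsigned long missed_events)
{
        unsigned long commit = local_read(&bpage->commit);

        if (!missed_events)
                return;
        if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
                /* stash the count right after the last event */
                memcpy(&bpage->data[commit], &missed_events,
                       sizeof(missed_events));
                local_add(RB_MISSED_STORED, &bpage->commit);
        }
        local_add(RB_MISSED_EVENTS, &bpage->commit);
}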
370/*
371 * Note, the buffer_page list must be first. The buffer pages
372 * are allocated in cache lines, which means that each buffer
373 * page will be at the beginning of a cache line, and thus
374 * the least significant bits will be zero. We use this to
375 * add flags in the list struct pointers, to make the ring buffer
376 * lockless.
377 */
325struct buffer_page { 378struct buffer_page {
326 struct list_head list; /* list of buffer pages */ 379 struct list_head list; /* list of buffer pages */
327 local_t write; /* index for next write */ 380 local_t write; /* index for next write */
328 unsigned read; /* index for next read */ 381 unsigned read; /* index for next read */
329 local_t entries; /* entries on this page */ 382 local_t entries; /* entries on this page */
383 unsigned long real_end; /* real end of data */
330 struct buffer_data_page *page; /* Actual data page */ 384 struct buffer_data_page *page; /* Actual data page */
331}; 385};
332 386
387/*
388 * The buffer page counters, write and entries, must be reset
389 * atomically when crossing page boundaries. To synchronize this
390 * update, two counters are inserted into the number. One is
391 * the actual counter for the write position or count on the page.
392 *
393 * The other is a counter of updaters. Before an update happens
394 * the update partition of the counter is incremented. This will
395 * allow the updater to update the counter atomically.
396 *
397 * The counter is 20 bits, and the state data is 12.
398 */
399#define RB_WRITE_MASK 0xfffff
400#define RB_WRITE_INTCNT (1 << 20)
401
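A minimal sketch (assuming only the definitions above) of how the split counter is read, and the update pattern that rb_tail_page_update() below relies on:

/* Sketch: the low 20 bits are the index, the upper bits count updaters. */
static unsigned long example_write_index(struct buffer_page *bpage)
{
        return local_read(&bpage->write) & RB_WRITE_MASK;
}

/*
 * An updater first tags the counter:
 *      old = local_add_return(RB_WRITE_INTCNT, &bpage->write);
 * and later clears only the index bits with local_cmpxchg(); if a
 * nested writer bumped the tag in between, the cmpxchg simply fails
 * and the nested writer's value is kept (see rb_tail_page_update()).
 */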
333static void rb_init_page(struct buffer_data_page *bpage) 402static void rb_init_page(struct buffer_data_page *bpage)
334{ 403{
335 local_set(&bpage->commit, 0); 404 local_set(&bpage->commit, 0);
@@ -372,27 +441,33 @@ static inline int test_time_stamp(u64 delta)
372/* Max payload is BUF_PAGE_SIZE - header (8bytes) */ 441/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
373#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) 442#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
374 443
375/* Max number of timestamps that can fit on a page */
376#define RB_TIMESTAMPS_PER_PAGE (BUF_PAGE_SIZE / RB_LEN_TIME_STAMP)
377
378int ring_buffer_print_page_header(struct trace_seq *s) 444int ring_buffer_print_page_header(struct trace_seq *s)
379{ 445{
380 struct buffer_data_page field; 446 struct buffer_data_page field;
381 int ret; 447 int ret;
382 448
383 ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t" 449 ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
384 "offset:0;\tsize:%u;\n", 450 "offset:0;\tsize:%u;\tsigned:%u;\n",
385 (unsigned int)sizeof(field.time_stamp)); 451 (unsigned int)sizeof(field.time_stamp),
452 (unsigned int)is_signed_type(u64));
386 453
387 ret = trace_seq_printf(s, "\tfield: local_t commit;\t" 454 ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
388 "offset:%u;\tsize:%u;\n", 455 "offset:%u;\tsize:%u;\tsigned:%u;\n",
456 (unsigned int)offsetof(typeof(field), commit),
457 (unsigned int)sizeof(field.commit),
458 (unsigned int)is_signed_type(long));
459
460 ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
461 "offset:%u;\tsize:%u;\tsigned:%u;\n",
389 (unsigned int)offsetof(typeof(field), commit), 462 (unsigned int)offsetof(typeof(field), commit),
390 (unsigned int)sizeof(field.commit)); 463 1,
464 (unsigned int)is_signed_type(long));
391 465
392 ret = trace_seq_printf(s, "\tfield: char data;\t" 466 ret = trace_seq_printf(s, "\tfield: char data;\t"
393 "offset:%u;\tsize:%u;\n", 467 "offset:%u;\tsize:%u;\tsigned:%u;\n",
394 (unsigned int)offsetof(typeof(field), data), 468 (unsigned int)offsetof(typeof(field), data),
395 (unsigned int)BUF_PAGE_SIZE); 469 (unsigned int)BUF_PAGE_SIZE,
470 (unsigned int)is_signed_type(char));
396 471
397 return ret; 472 return ret;
398} 473}
@@ -402,25 +477,26 @@ int ring_buffer_print_page_header(struct trace_seq *s)
402 */ 477 */
403struct ring_buffer_per_cpu { 478struct ring_buffer_per_cpu {
404 int cpu; 479 int cpu;
480 atomic_t record_disabled;
405 struct ring_buffer *buffer; 481 struct ring_buffer *buffer;
406 spinlock_t reader_lock; /* serialize readers */ 482 spinlock_t reader_lock; /* serialize readers */
407 raw_spinlock_t lock; 483 arch_spinlock_t lock;
408 struct lock_class_key lock_key; 484 struct lock_class_key lock_key;
409 struct list_head pages; 485 struct list_head *pages;
410 struct buffer_page *head_page; /* read from head */ 486 struct buffer_page *head_page; /* read from head */
411 struct buffer_page *tail_page; /* write to tail */ 487 struct buffer_page *tail_page; /* write to tail */
412 struct buffer_page *commit_page; /* committed pages */ 488 struct buffer_page *commit_page; /* committed pages */
413 struct buffer_page *reader_page; 489 struct buffer_page *reader_page;
414 unsigned long nmi_dropped; 490 unsigned long lost_events;
415 unsigned long commit_overrun; 491 unsigned long last_overrun;
416 unsigned long overrun; 492 local_t commit_overrun;
417 unsigned long read; 493 local_t overrun;
418 local_t entries; 494 local_t entries;
419 local_t committing; 495 local_t committing;
420 local_t commits; 496 local_t commits;
497 unsigned long read;
421 u64 write_stamp; 498 u64 write_stamp;
422 u64 read_stamp; 499 u64 read_stamp;
423 atomic_t record_disabled;
424}; 500};
425 501
426struct ring_buffer { 502struct ring_buffer {
@@ -446,24 +522,31 @@ struct ring_buffer_iter {
446 struct ring_buffer_per_cpu *cpu_buffer; 522 struct ring_buffer_per_cpu *cpu_buffer;
447 unsigned long head; 523 unsigned long head;
448 struct buffer_page *head_page; 524 struct buffer_page *head_page;
525 struct buffer_page *cache_reader_page;
526 unsigned long cache_read;
449 u64 read_stamp; 527 u64 read_stamp;
450}; 528};
451 529
452/* buffer may be either ring_buffer or ring_buffer_per_cpu */ 530/* buffer may be either ring_buffer or ring_buffer_per_cpu */
453#define RB_WARN_ON(buffer, cond) \ 531#define RB_WARN_ON(b, cond) \
454 ({ \ 532 ({ \
455 int _____ret = unlikely(cond); \ 533 int _____ret = unlikely(cond); \
456 if (_____ret) { \ 534 if (_____ret) { \
457 atomic_inc(&buffer->record_disabled); \ 535 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
458 WARN_ON(1); \ 536 struct ring_buffer_per_cpu *__b = \
459 } \ 537 (void *)b; \
460 _____ret; \ 538 atomic_inc(&__b->buffer->record_disabled); \
539 } else \
540 atomic_inc(&b->record_disabled); \
541 WARN_ON(1); \
542 } \
543 _____ret; \
461 }) 544 })
462 545
463/* Up this if you want to test the TIME_EXTENTS and normalization */ 546/* Up this if you want to test the TIME_EXTENTS and normalization */
464#define DEBUG_SHIFT 0 547#define DEBUG_SHIFT 0
465 548
466static inline u64 rb_time_stamp(struct ring_buffer *buffer, int cpu) 549static inline u64 rb_time_stamp(struct ring_buffer *buffer)
467{ 550{
468 /* shift to debug/test normalization and TIME_EXTENTS */ 551 /* shift to debug/test normalization and TIME_EXTENTS */
469 return buffer->clock() << DEBUG_SHIFT; 552 return buffer->clock() << DEBUG_SHIFT;
@@ -474,7 +557,7 @@ u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
474 u64 time; 557 u64 time;
475 558
476 preempt_disable_notrace(); 559 preempt_disable_notrace();
477 time = rb_time_stamp(buffer, cpu); 560 time = rb_time_stamp(buffer);
478 preempt_enable_no_resched_notrace(); 561 preempt_enable_no_resched_notrace();
479 562
480 return time; 563 return time;
@@ -489,6 +572,390 @@ void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
489} 572}
490EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 573EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
491 574
575/*
576 * Making the ring buffer lockless makes things tricky.
577 * Although writes only happen on the CPU that they are on,
578 * and they only need to worry about interrupts. Reads can
579 * happen on any CPU.
580 *
581 * The reader page is always off the ring buffer, but when the
582 * reader finishes with a page, it needs to swap its page with
583 * a new one from the buffer. The reader needs to take from
584 * the head (writes go to the tail). But if a writer is in overwrite
585 * mode and wraps, it must push the head page forward.
586 *
587 * Here lies the problem.
588 *
589 * The reader must be careful to replace only the head page, and
590 * not another one. As described at the top of the file in the
591 * ASCII art, the reader sets its old page to point to the next
592 * page after head. It then sets the page after head to point to
593 * the old reader page. But if the writer moves the head page
594 * during this operation, the reader could end up with the tail.
595 *
596 * We use cmpxchg to help prevent this race. We also do something
597 * special with the page before head. We set the LSB to 1.
598 *
599 * When the writer must push the page forward, it will clear the
600 * bit that points to the head page, move the head, and then set
601 * the bit that points to the new head page.
602 *
603 * We also don't want an interrupt coming in and moving the head
604 * page on another writer. Thus we use the second LSB to catch
605 * that too. Thus:
606 *
607 * head->list->prev->next bit 1 bit 0
608 * ------- -------
609 * Normal page 0 0
610 * Points to head page 0 1
611 * New head page 1 0
612 *
613 * Note we can not trust the prev pointer of the head page, because:
614 *
615 * +----+ +-----+ +-----+
616 * | |------>| T |---X--->| N |
617 * | |<------| | | |
618 * +----+ +-----+ +-----+
619 * ^ ^ |
620 * | +-----+ | |
621 * +----------| R |----------+ |
622 * | |<-----------+
623 * +-----+
624 *
625 * Key: ---X--> HEAD flag set in pointer
626 * T Tail page
627 * R Reader page
628 * N Next page
629 *
630 * (see __rb_reserve_next() to see where this happens)
631 *
632 * What the above shows is that the reader just swapped out
633 * the reader page with a page in the buffer, but before it
634 * could make the new header point back to the new page added
635 * it was preempted by a writer. The writer moved forward onto
636 * the new page added by the reader and is about to move forward
637 * again.
638 *
639 * You can see, it is legitimate for the previous pointer of
640 * the head (or any page) not to point back to itself. But only
641 * temporarily.
642 */
643
644#define RB_PAGE_NORMAL 0UL
645#define RB_PAGE_HEAD 1UL
646#define RB_PAGE_UPDATE 2UL
647
648
649#define RB_FLAG_MASK 3UL
650
651/* PAGE_MOVED is not part of the mask */
652#define RB_PAGE_MOVED 4UL
653
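A sketch of how these flag values are encoded; the helper name is invented, the decoding matches rb_list_head() and rb_is_head_page() below:

/* Sketch: the flag sits in the low bits of the ->next pointer of the
 * page *preceding* the head page. */
static unsigned long example_next_flag(struct buffer_page *prev)
{
        unsigned long val = (unsigned long)prev->list.next;

        /* val & ~RB_FLAG_MASK is the real list_head pointer */
        return val & RB_FLAG_MASK;      /* NORMAL, HEAD or UPDATE */
}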
654/*
655 * rb_list_head - remove any bit
656 */
657static struct list_head *rb_list_head(struct list_head *list)
658{
659 unsigned long val = (unsigned long)list;
660
661 return (struct list_head *)(val & ~RB_FLAG_MASK);
662}
663
664/*
665 * rb_is_head_page - test if the given page is the head page
666 *
667 * Because the reader may move the head_page pointer, we can
668 * not trust what the head page is (it may be pointing to
669 * the reader page). But if the next page is a header page,
670 * its flags will be non zero.
671 */
672static int inline
673rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
674 struct buffer_page *page, struct list_head *list)
675{
676 unsigned long val;
677
678 val = (unsigned long)list->next;
679
680 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
681 return RB_PAGE_MOVED;
682
683 return val & RB_FLAG_MASK;
684}
685
686/*
687 * rb_is_reader_page
688 *
689 * The unique thing about the reader page, is that, if the
690 * writer is ever on it, the previous pointer never points
691 * back to the reader page.
692 */
693static int rb_is_reader_page(struct buffer_page *page)
694{
695 struct list_head *list = page->list.prev;
696
697 return rb_list_head(list->next) != &page->list;
698}
699
700/*
701 * rb_set_list_to_head - set a list_head to be pointing to head.
702 */
703static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
704 struct list_head *list)
705{
706 unsigned long *ptr;
707
708 ptr = (unsigned long *)&list->next;
709 *ptr |= RB_PAGE_HEAD;
710 *ptr &= ~RB_PAGE_UPDATE;
711}
712
713/*
714 * rb_head_page_activate - sets up head page
715 */
716static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
717{
718 struct buffer_page *head;
719
720 head = cpu_buffer->head_page;
721 if (!head)
722 return;
723
724 /*
725 * Set the previous list pointer to have the HEAD flag.
726 */
727 rb_set_list_to_head(cpu_buffer, head->list.prev);
728}
729
730static void rb_list_head_clear(struct list_head *list)
731{
732 unsigned long *ptr = (unsigned long *)&list->next;
733
734 *ptr &= ~RB_FLAG_MASK;
735}
736
737/*
738 * rb_head_page_deactivate - clears head page ptr (for free list)
739 */
740static void
741rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
742{
743 struct list_head *hd;
744
745 /* Go through the whole list and clear any pointers found. */
746 rb_list_head_clear(cpu_buffer->pages);
747
748 list_for_each(hd, cpu_buffer->pages)
749 rb_list_head_clear(hd);
750}
751
752static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
753 struct buffer_page *head,
754 struct buffer_page *prev,
755 int old_flag, int new_flag)
756{
757 struct list_head *list;
758 unsigned long val = (unsigned long)&head->list;
759 unsigned long ret;
760
761 list = &prev->list;
762
763 val &= ~RB_FLAG_MASK;
764
765 ret = cmpxchg((unsigned long *)&list->next,
766 val | old_flag, val | new_flag);
767
768 /* check if the reader took the page */
769 if ((ret & ~RB_FLAG_MASK) != val)
770 return RB_PAGE_MOVED;
771
772 return ret & RB_FLAG_MASK;
773}
774
775static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
776 struct buffer_page *head,
777 struct buffer_page *prev,
778 int old_flag)
779{
780 return rb_head_page_set(cpu_buffer, head, prev,
781 old_flag, RB_PAGE_UPDATE);
782}
783
784static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
785 struct buffer_page *head,
786 struct buffer_page *prev,
787 int old_flag)
788{
789 return rb_head_page_set(cpu_buffer, head, prev,
790 old_flag, RB_PAGE_HEAD);
791}
792
793static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
794 struct buffer_page *head,
795 struct buffer_page *prev,
796 int old_flag)
797{
798 return rb_head_page_set(cpu_buffer, head, prev,
799 old_flag, RB_PAGE_NORMAL);
800}
801
802static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
803 struct buffer_page **bpage)
804{
805 struct list_head *p = rb_list_head((*bpage)->list.next);
806
807 *bpage = list_entry(p, struct buffer_page, list);
808}
809
810static struct buffer_page *
811rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
812{
813 struct buffer_page *head;
814 struct buffer_page *page;
815 struct list_head *list;
816 int i;
817
818 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
819 return NULL;
820
821 /* sanity check */
822 list = cpu_buffer->pages;
823 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
824 return NULL;
825
826 page = head = cpu_buffer->head_page;
827 /*
828 * It is possible that the writer moves the header behind
829 * where we started, and we miss in one loop.
830 * A second loop should grab the header, but we'll do
831 * three loops just because I'm paranoid.
832 */
833 for (i = 0; i < 3; i++) {
834 do {
835 if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
836 cpu_buffer->head_page = page;
837 return page;
838 }
839 rb_inc_page(cpu_buffer, &page);
840 } while (page != head);
841 }
842
843 RB_WARN_ON(cpu_buffer, 1);
844
845 return NULL;
846}
847
848static int rb_head_page_replace(struct buffer_page *old,
849 struct buffer_page *new)
850{
851 unsigned long *ptr = (unsigned long *)&old->list.prev->next;
852 unsigned long val;
853 unsigned long ret;
854
855 val = *ptr & ~RB_FLAG_MASK;
856 val |= RB_PAGE_HEAD;
857
858 ret = cmpxchg(ptr, val, (unsigned long)&new->list);
859
860 return ret == val;
861}
862
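For context, a sketch of the reader side that uses this cmpxchg; it is condensed from rb_get_reader_page() (not part of this hunk) and omits the follow-up pointer fix-ups:

/* Sketch: splice the spare reader page in and claim the head page. */
static int example_swap_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct buffer_page *reader = cpu_buffer->reader_page;
        struct buffer_page *head = rb_set_head_page(cpu_buffer);

        if (!head)
                return 0;

        /* link the spare page in where the head page sits now */
        reader->list.next = rb_list_head(head->list.next);
        reader->list.prev = head->list.prev;

        /* the spare page's ->next must carry the HEAD flag */
        rb_set_list_to_head(cpu_buffer, &reader->list);

        /* only succeeds if the writer did not move the head meanwhile */
        return rb_head_page_replace(head, reader);
}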
863/*
864 * rb_tail_page_update - move the tail page forward
865 *
866 * Returns 1 if moved tail page, 0 if someone else did.
867 */
868static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
869 struct buffer_page *tail_page,
870 struct buffer_page *next_page)
871{
872 struct buffer_page *old_tail;
873 unsigned long old_entries;
874 unsigned long old_write;
875 int ret = 0;
876
877 /*
878 * The tail page now needs to be moved forward.
879 *
880 * We need to reset the tail page, but without messing
881 * with possible erasing of data brought in by interrupts
882 * that have moved the tail page and are currently on it.
883 *
884 * We add a counter to the write field to denote this.
885 */
886 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
887 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
888
889 /*
890 * Just make sure we have seen our old_write and synchronize
891 * with any interrupts that come in.
892 */
893 barrier();
894
895 /*
896 * If the tail page is still the same as what we think
897 * it is, then it is up to us to update the tail
898 * pointer.
899 */
900 if (tail_page == cpu_buffer->tail_page) {
901 /* Zero the write counter */
902 unsigned long val = old_write & ~RB_WRITE_MASK;
903 unsigned long eval = old_entries & ~RB_WRITE_MASK;
904
905 /*
906 * This will only succeed if an interrupt did
907 * not come in and change it. In which case, we
908 * do not want to modify it.
909 *
910 * We add (void) to let the compiler know that we do not care
911 * about the return value of these functions. We use the
912 * cmpxchg to only update if an interrupt did not already
913 * do it for us. If the cmpxchg fails, we don't care.
914 */
915 (void)local_cmpxchg(&next_page->write, old_write, val);
916 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
917
918 /*
919 * No need to worry about races with clearing out the commit.
920 * it only can increment when a commit takes place. But that
921 * only happens in the outer most nested commit.
922 */
923 local_set(&next_page->page->commit, 0);
924
925 old_tail = cmpxchg(&cpu_buffer->tail_page,
926 tail_page, next_page);
927
928 if (old_tail == tail_page)
929 ret = 1;
930 }
931
932 return ret;
933}
934
935static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
936 struct buffer_page *bpage)
937{
938 unsigned long val = (unsigned long)bpage;
939
940 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
941 return 1;
942
943 return 0;
944}
945
946/**
947 * rb_check_list - make sure a pointer to a list has the last bits zero
948 */
949static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
950 struct list_head *list)
951{
952 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
953 return 1;
954 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
955 return 1;
956 return 0;
957}
958
492/** 959/**
493 * check_pages - integrity check of buffer pages 960 * check_pages - integrity check of buffer pages
494 * @cpu_buffer: CPU buffer with pages to test 961 * @cpu_buffer: CPU buffer with pages to test
@@ -498,14 +965,19 @@ EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
498 */ 965 */
499static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 966static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
500{ 967{
501 struct list_head *head = &cpu_buffer->pages; 968 struct list_head *head = cpu_buffer->pages;
502 struct buffer_page *bpage, *tmp; 969 struct buffer_page *bpage, *tmp;
503 970
971 rb_head_page_deactivate(cpu_buffer);
972
504 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 973 if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
505 return -1; 974 return -1;
506 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 975 if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
507 return -1; 976 return -1;
508 977
978 if (rb_check_list(cpu_buffer, head))
979 return -1;
980
509 list_for_each_entry_safe(bpage, tmp, head, list) { 981 list_for_each_entry_safe(bpage, tmp, head, list) {
510 if (RB_WARN_ON(cpu_buffer, 982 if (RB_WARN_ON(cpu_buffer,
511 bpage->list.next->prev != &bpage->list)) 983 bpage->list.next->prev != &bpage->list))
@@ -513,25 +985,33 @@ static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
513 if (RB_WARN_ON(cpu_buffer, 985 if (RB_WARN_ON(cpu_buffer,
514 bpage->list.prev->next != &bpage->list)) 986 bpage->list.prev->next != &bpage->list))
515 return -1; 987 return -1;
988 if (rb_check_list(cpu_buffer, &bpage->list))
989 return -1;
516 } 990 }
517 991
992 rb_head_page_activate(cpu_buffer);
993
518 return 0; 994 return 0;
519} 995}
520 996
521static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 997static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
522 unsigned nr_pages) 998 unsigned nr_pages)
523{ 999{
524 struct list_head *head = &cpu_buffer->pages;
525 struct buffer_page *bpage, *tmp; 1000 struct buffer_page *bpage, *tmp;
526 unsigned long addr; 1001 unsigned long addr;
527 LIST_HEAD(pages); 1002 LIST_HEAD(pages);
528 unsigned i; 1003 unsigned i;
529 1004
1005 WARN_ON(!nr_pages);
1006
530 for (i = 0; i < nr_pages; i++) { 1007 for (i = 0; i < nr_pages; i++) {
531 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1008 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
532 GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); 1009 GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
533 if (!bpage) 1010 if (!bpage)
534 goto free_pages; 1011 goto free_pages;
1012
1013 rb_check_bpage(cpu_buffer, bpage);
1014
535 list_add(&bpage->list, &pages); 1015 list_add(&bpage->list, &pages);
536 1016
537 addr = __get_free_page(GFP_KERNEL); 1017 addr = __get_free_page(GFP_KERNEL);
@@ -541,7 +1021,13 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
541 rb_init_page(bpage->page); 1021 rb_init_page(bpage->page);
542 } 1022 }
543 1023
544 list_splice(&pages, head); 1024 /*
1025 * The ring buffer page list is a circular list that does not
1026 * start and end with a list head. All page list items point to
1027 * other pages.
1028 */
1029 cpu_buffer->pages = pages.next;
1030 list_del(&pages);
545 1031
546 rb_check_pages(cpu_buffer); 1032 rb_check_pages(cpu_buffer);
547 1033
@@ -572,14 +1058,15 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
572 cpu_buffer->buffer = buffer; 1058 cpu_buffer->buffer = buffer;
573 spin_lock_init(&cpu_buffer->reader_lock); 1059 spin_lock_init(&cpu_buffer->reader_lock);
574 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1060 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
575 cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED; 1061 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
576 INIT_LIST_HEAD(&cpu_buffer->pages);
577 1062
578 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1063 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
579 GFP_KERNEL, cpu_to_node(cpu)); 1064 GFP_KERNEL, cpu_to_node(cpu));
580 if (!bpage) 1065 if (!bpage)
581 goto fail_free_buffer; 1066 goto fail_free_buffer;
582 1067
1068 rb_check_bpage(cpu_buffer, bpage);
1069
583 cpu_buffer->reader_page = bpage; 1070 cpu_buffer->reader_page = bpage;
584 addr = __get_free_page(GFP_KERNEL); 1071 addr = __get_free_page(GFP_KERNEL);
585 if (!addr) 1072 if (!addr)
@@ -594,9 +1081,11 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
594 goto fail_free_reader; 1081 goto fail_free_reader;
595 1082
596 cpu_buffer->head_page 1083 cpu_buffer->head_page
597 = list_entry(cpu_buffer->pages.next, struct buffer_page, list); 1084 = list_entry(cpu_buffer->pages, struct buffer_page, list);
598 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 1085 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
599 1086
1087 rb_head_page_activate(cpu_buffer);
1088
600 return cpu_buffer; 1089 return cpu_buffer;
601 1090
602 fail_free_reader: 1091 fail_free_reader:
@@ -609,15 +1098,22 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
609 1098
610static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1099static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
611{ 1100{
612 struct list_head *head = &cpu_buffer->pages; 1101 struct list_head *head = cpu_buffer->pages;
613 struct buffer_page *bpage, *tmp; 1102 struct buffer_page *bpage, *tmp;
614 1103
615 free_buffer_page(cpu_buffer->reader_page); 1104 free_buffer_page(cpu_buffer->reader_page);
616 1105
617 list_for_each_entry_safe(bpage, tmp, head, list) { 1106 rb_head_page_deactivate(cpu_buffer);
618 list_del_init(&bpage->list); 1107
1108 if (head) {
1109 list_for_each_entry_safe(bpage, tmp, head, list) {
1110 list_del_init(&bpage->list);
1111 free_buffer_page(bpage);
1112 }
1113 bpage = list_entry(head, struct buffer_page, list);
619 free_buffer_page(bpage); 1114 free_buffer_page(bpage);
620 } 1115 }
1116
621 kfree(cpu_buffer); 1117 kfree(cpu_buffer);
622} 1118}
623 1119
@@ -735,6 +1231,7 @@ ring_buffer_free(struct ring_buffer *buffer)
735 1231
736 put_online_cpus(); 1232 put_online_cpus();
737 1233
1234 kfree(buffer->buffers);
738 free_cpumask_var(buffer->cpumask); 1235 free_cpumask_var(buffer->cpumask);
739 1236
740 kfree(buffer); 1237 kfree(buffer);
@@ -756,26 +1253,25 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
756 struct list_head *p; 1253 struct list_head *p;
757 unsigned i; 1254 unsigned i;
758 1255
759 atomic_inc(&cpu_buffer->record_disabled); 1256 spin_lock_irq(&cpu_buffer->reader_lock);
760 synchronize_sched(); 1257 rb_head_page_deactivate(cpu_buffer);
761 1258
762 for (i = 0; i < nr_pages; i++) { 1259 for (i = 0; i < nr_pages; i++) {
763 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) 1260 if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
764 return; 1261 goto out;
765 p = cpu_buffer->pages.next; 1262 p = cpu_buffer->pages->next;
766 bpage = list_entry(p, struct buffer_page, list); 1263 bpage = list_entry(p, struct buffer_page, list);
767 list_del_init(&bpage->list); 1264 list_del_init(&bpage->list);
768 free_buffer_page(bpage); 1265 free_buffer_page(bpage);
769 } 1266 }
770 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages))) 1267 if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
771 return; 1268 goto out;
772 1269
773 rb_reset_cpu(cpu_buffer); 1270 rb_reset_cpu(cpu_buffer);
774
775 rb_check_pages(cpu_buffer); 1271 rb_check_pages(cpu_buffer);
776 1272
777 atomic_dec(&cpu_buffer->record_disabled); 1273out:
778 1274 spin_unlock_irq(&cpu_buffer->reader_lock);
779} 1275}
780 1276
781static void 1277static void
@@ -786,22 +1282,22 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
786 struct list_head *p; 1282 struct list_head *p;
787 unsigned i; 1283 unsigned i;
788 1284
789 atomic_inc(&cpu_buffer->record_disabled); 1285 spin_lock_irq(&cpu_buffer->reader_lock);
790 synchronize_sched(); 1286 rb_head_page_deactivate(cpu_buffer);
791 1287
792 for (i = 0; i < nr_pages; i++) { 1288 for (i = 0; i < nr_pages; i++) {
793 if (RB_WARN_ON(cpu_buffer, list_empty(pages))) 1289 if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
794 return; 1290 goto out;
795 p = pages->next; 1291 p = pages->next;
796 bpage = list_entry(p, struct buffer_page, list); 1292 bpage = list_entry(p, struct buffer_page, list);
797 list_del_init(&bpage->list); 1293 list_del_init(&bpage->list);
798 list_add_tail(&bpage->list, &cpu_buffer->pages); 1294 list_add_tail(&bpage->list, cpu_buffer->pages);
799 } 1295 }
800 rb_reset_cpu(cpu_buffer); 1296 rb_reset_cpu(cpu_buffer);
801
802 rb_check_pages(cpu_buffer); 1297 rb_check_pages(cpu_buffer);
803 1298
804 atomic_dec(&cpu_buffer->record_disabled); 1299out:
1300 spin_unlock_irq(&cpu_buffer->reader_lock);
805} 1301}
806 1302
807/** 1303/**
@@ -809,11 +1305,6 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
809 * @buffer: the buffer to resize. 1305 * @buffer: the buffer to resize.
810 * @size: the new size. 1306 * @size: the new size.
811 * 1307 *
812 * The tracer is responsible for making sure that the buffer is
813 * not being used while changing the size.
814 * Note: We may be able to change the above requirement by using
815 * RCU synchronizations.
816 *
817 * Minimum size is 2 * BUF_PAGE_SIZE. 1308 * Minimum size is 2 * BUF_PAGE_SIZE.
818 * 1309 *
819 * Returns -1 on failure. 1310 * Returns -1 on failure.
@@ -845,6 +1336,11 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
845 if (size == buffer_size) 1336 if (size == buffer_size)
846 return size; 1337 return size;
847 1338
1339 atomic_inc(&buffer->record_disabled);
1340
1341 /* Make sure all writers are done with this buffer. */
1342 synchronize_sched();
1343
848 mutex_lock(&buffer->mutex); 1344 mutex_lock(&buffer->mutex);
849 get_online_cpus(); 1345 get_online_cpus();
850 1346
@@ -907,6 +1403,8 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
907 put_online_cpus(); 1403 put_online_cpus();
908 mutex_unlock(&buffer->mutex); 1404 mutex_unlock(&buffer->mutex);
909 1405
1406 atomic_dec(&buffer->record_disabled);
1407
910 return size; 1408 return size;
911 1409
912 free_pages: 1410 free_pages:
@@ -916,6 +1414,7 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
916 } 1414 }
917 put_online_cpus(); 1415 put_online_cpus();
918 mutex_unlock(&buffer->mutex); 1416 mutex_unlock(&buffer->mutex);
1417 atomic_dec(&buffer->record_disabled);
919 return -ENOMEM; 1418 return -ENOMEM;
920 1419
921 /* 1420 /*
@@ -925,6 +1424,7 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
925 out_fail: 1424 out_fail:
926 put_online_cpus(); 1425 put_online_cpus();
927 mutex_unlock(&buffer->mutex); 1426 mutex_unlock(&buffer->mutex);
1427 atomic_dec(&buffer->record_disabled);
928 return -1; 1428 return -1;
929} 1429}
930EXPORT_SYMBOL_GPL(ring_buffer_resize); 1430EXPORT_SYMBOL_GPL(ring_buffer_resize);
@@ -948,21 +1448,14 @@ rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
948} 1448}
949 1449
950static inline struct ring_buffer_event * 1450static inline struct ring_buffer_event *
951rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
952{
953 return __rb_page_index(cpu_buffer->head_page,
954 cpu_buffer->head_page->read);
955}
956
957static inline struct ring_buffer_event *
958rb_iter_head_event(struct ring_buffer_iter *iter) 1451rb_iter_head_event(struct ring_buffer_iter *iter)
959{ 1452{
960 return __rb_page_index(iter->head_page, iter->head); 1453 return __rb_page_index(iter->head_page, iter->head);
961} 1454}
962 1455
963static inline unsigned rb_page_write(struct buffer_page *bpage) 1456static inline unsigned long rb_page_write(struct buffer_page *bpage)
964{ 1457{
965 return local_read(&bpage->write); 1458 return local_read(&bpage->write) & RB_WRITE_MASK;
966} 1459}
967 1460
968static inline unsigned rb_page_commit(struct buffer_page *bpage) 1461static inline unsigned rb_page_commit(struct buffer_page *bpage)
@@ -970,6 +1463,11 @@ static inline unsigned rb_page_commit(struct buffer_page *bpage)
970 return local_read(&bpage->page->commit); 1463 return local_read(&bpage->page->commit);
971} 1464}
972 1465
1466static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1467{
1468 return local_read(&bpage->entries) & RB_WRITE_MASK;
1469}
1470
973 /* Size is determined by what has been committed */ 1471 /* Size is determined by what has been committed */
974static inline unsigned rb_page_size(struct buffer_page *bpage) 1472static inline unsigned rb_page_size(struct buffer_page *bpage)
975{ 1473{
@@ -982,22 +1480,6 @@ rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
982 return rb_page_commit(cpu_buffer->commit_page); 1480 return rb_page_commit(cpu_buffer->commit_page);
983} 1481}
984 1482
985static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
986{
987 return rb_page_commit(cpu_buffer->head_page);
988}
989
990static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
991 struct buffer_page **bpage)
992{
993 struct list_head *p = (*bpage)->list.next;
994
995 if (p == &cpu_buffer->pages)
996 p = p->next;
997
998 *bpage = list_entry(p, struct buffer_page, list);
999}
1000
1001static inline unsigned 1483static inline unsigned
1002rb_event_index(struct ring_buffer_event *event) 1484rb_event_index(struct ring_buffer_event *event)
1003{ 1485{
@@ -1023,6 +1505,8 @@ rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
1023static void 1505static void
1024rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 1506rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
1025{ 1507{
1508 unsigned long max_count;
1509
1026 /* 1510 /*
1027 * We only race with interrupts and NMIs on this CPU. 1511 * We only race with interrupts and NMIs on this CPU.
1028 * If we own the commit event, then we can commit 1512 * If we own the commit event, then we can commit
@@ -1032,9 +1516,16 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
1032 * assign the commit to the tail. 1516 * assign the commit to the tail.
1033 */ 1517 */
1034 again: 1518 again:
1519 max_count = cpu_buffer->buffer->pages * 100;
1520
1035 while (cpu_buffer->commit_page != cpu_buffer->tail_page) { 1521 while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
1036 cpu_buffer->commit_page->page->commit = 1522 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
1037 cpu_buffer->commit_page->write; 1523 return;
1524 if (RB_WARN_ON(cpu_buffer,
1525 rb_is_reader_page(cpu_buffer->tail_page)))
1526 return;
1527 local_set(&cpu_buffer->commit_page->page->commit,
1528 rb_page_write(cpu_buffer->commit_page));
1038 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 1529 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
1039 cpu_buffer->write_stamp = 1530 cpu_buffer->write_stamp =
1040 cpu_buffer->commit_page->page->time_stamp; 1531 cpu_buffer->commit_page->page->time_stamp;
@@ -1043,8 +1534,12 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
1043 } 1534 }
1044 while (rb_commit_index(cpu_buffer) != 1535 while (rb_commit_index(cpu_buffer) !=
1045 rb_page_write(cpu_buffer->commit_page)) { 1536 rb_page_write(cpu_buffer->commit_page)) {
1046 cpu_buffer->commit_page->page->commit = 1537
1047 cpu_buffer->commit_page->write; 1538 local_set(&cpu_buffer->commit_page->page->commit,
1539 rb_page_write(cpu_buffer->commit_page));
1540 RB_WARN_ON(cpu_buffer,
1541 local_read(&cpu_buffer->commit_page->page->commit) &
1542 ~RB_WRITE_MASK);
1048 barrier(); 1543 barrier();
1049 } 1544 }
1050 1545
@@ -1077,7 +1572,7 @@ static void rb_inc_iter(struct ring_buffer_iter *iter)
1077 * to the head page instead of next. 1572 * to the head page instead of next.
1078 */ 1573 */
1079 if (iter->head_page == cpu_buffer->reader_page) 1574 if (iter->head_page == cpu_buffer->reader_page)
1080 iter->head_page = cpu_buffer->head_page; 1575 iter->head_page = rb_set_head_page(cpu_buffer);
1081 else 1576 else
1082 rb_inc_page(cpu_buffer, &iter->head_page); 1577 rb_inc_page(cpu_buffer, &iter->head_page);
1083 1578
@@ -1085,6 +1580,25 @@ static void rb_inc_iter(struct ring_buffer_iter *iter)
1085 iter->head = 0; 1580 iter->head = 0;
1086} 1581}
1087 1582
1583/* Slow path, do not inline */
1584static noinline struct ring_buffer_event *
1585rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)
1586{
1587 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
1588
1589 /* Not the first event on the page? */
1590 if (rb_event_index(event)) {
1591 event->time_delta = delta & TS_MASK;
1592 event->array[0] = delta >> TS_SHIFT;
1593 } else {
1594 /* nope, just zero it */
1595 event->time_delta = 0;
1596 event->array[0] = 0;
1597 }
1598
1599 return skip_time_extend(event);
1600}
1601
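The reader-side inverse of the above, as a short sketch: reassemble the full delta from a TIME_EXTEND event and step to the data event that follows it.

static u64 example_read_time_extend(struct ring_buffer_event *event,
                                    struct ring_buffer_event **data)
{
        u64 delta = event->array[0];

        delta <<= TS_SHIFT;
        delta += event->time_delta;
        *data = skip_time_extend(event);        /* data event follows */
        return delta;
}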
1088/** 1602/**
1089 * ring_buffer_update_event - update event type and data 1603 * ring_buffer_update_event - update event type and data
1089 * @event: the event to update 1603 * @event: the event to update
@@ -1097,28 +1611,188 @@ static void rb_inc_iter(struct ring_buffer_iter *iter)
1097 * data field. 1611 * data field.
1098 */ 1612 */
1099static void 1613static void
1100rb_update_event(struct ring_buffer_event *event, 1614rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
1101 unsigned type, unsigned length) 1615 struct ring_buffer_event *event, unsigned length,
1616 int add_timestamp, u64 delta)
1102{ 1617{
1103 event->type_len = type; 1618 /* Only a commit updates the timestamp */
1619 if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
1620 delta = 0;
1621
1622 /*
1623 * If we need to add a timestamp, then we
1624 * add it to the start of the resevered space.
1625 */
1626 if (unlikely(add_timestamp)) {
1627 event = rb_add_time_stamp(event, delta);
1628 length -= RB_LEN_TIME_EXTEND;
1629 delta = 0;
1630 }
1631
1632 event->time_delta = delta;
1633 length -= RB_EVNT_HDR_SIZE;
1634 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
1635 event->type_len = 0;
1636 event->array[0] = length;
1637 } else
1638 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
1639}
1640
1641/*
1642 * rb_handle_head_page - writer hit the head page
1643 *
1644 * Returns: +1 to retry page
1645 * 0 to continue
1646 * -1 on error
1647 */
1648static int
1649rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
1650 struct buffer_page *tail_page,
1651 struct buffer_page *next_page)
1652{
1653 struct buffer_page *new_head;
1654 int entries;
1655 int type;
1656 int ret;
1657
1658 entries = rb_page_entries(next_page);
1659
1660 /*
1661 * The hard part is here. We need to move the head
1662 * forward, and protect against both readers on
1663 * other CPUs and writers coming in via interrupts.
1664 */
1665 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
1666 RB_PAGE_HEAD);
1667
1668 /*
1669 * type can be one of four:
1670 * NORMAL - an interrupt already moved it for us
1671 * HEAD - we are the first to get here.
1672 * UPDATE - we are the interrupt interrupting
1673 * a current move.
1674 * MOVED - a reader on another CPU moved the next
1675 * pointer to its reader page. Give up
1676 * and try again.
1677 */
1104 1678
1105 switch (type) { 1679 switch (type) {
1680 case RB_PAGE_HEAD:
1681 /*
1682 * We changed the head to UPDATE, thus
1683 * it is our responsibility to update
1684 * the counters.
1685 */
1686 local_add(entries, &cpu_buffer->overrun);
1106 1687
1107 case RINGBUF_TYPE_PADDING: 1688 /*
1108 case RINGBUF_TYPE_TIME_EXTEND: 1689 * The entries will be zeroed out when we move the
1109 case RINGBUF_TYPE_TIME_STAMP: 1690 * tail page.
1691 */
1692
1693 /* still more to do */
1110 break; 1694 break;
1111 1695
1112 case 0: 1696 case RB_PAGE_UPDATE:
1113 length -= RB_EVNT_HDR_SIZE; 1697 /*
1114 if (length > RB_MAX_SMALL_DATA) 1698 * This is an interrupt that interrupt the
1115 event->array[0] = length; 1699 * previous update. Still more to do.
1116 else 1700 */
1117 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
1118 break; 1701 break;
1702 case RB_PAGE_NORMAL:
1703 /*
1704 * An interrupt came in before the update
1705 * and processed this for us.
1706 * Nothing left to do.
1707 */
1708 return 1;
1709 case RB_PAGE_MOVED:
1710 /*
1711 * The reader is on another CPU and just did
1712 * a swap with our next_page.
1713 * Try again.
1714 */
1715 return 1;
1119 default: 1716 default:
1120 BUG(); 1717 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
1718 return -1;
1719 }
1720
1721 /*
1722 * Now that we are here, the old head pointer is
1723 * set to UPDATE. This will keep the reader from
1724 * swapping the head page with the reader page.
1725 * The reader (on another CPU) will spin till
1726 * we are finished.
1727 *
1728 * We just need to protect against interrupts
1729 * doing the job. We will set the next pointer
1730 * to HEAD. After that, we set the old pointer
1731 * to NORMAL, but only if it was HEAD before.
1732 * otherwise we are an interrupt, and only
1733 * want the outer most commit to reset it.
1734 */
1735 new_head = next_page;
1736 rb_inc_page(cpu_buffer, &new_head);
1737
1738 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
1739 RB_PAGE_NORMAL);
1740
1741 /*
1742 * Valid returns are:
1743 * HEAD - an interrupt came in and already set it.
1744 * NORMAL - One of two things:
1745 * 1) We really set it.
1746 * 2) A bunch of interrupts came in and moved
1747 * the page forward again.
1748 */
1749 switch (ret) {
1750 case RB_PAGE_HEAD:
1751 case RB_PAGE_NORMAL:
1752 /* OK */
1753 break;
1754 default:
1755 RB_WARN_ON(cpu_buffer, 1);
1756 return -1;
1121 } 1757 }
1758
1759 /*
1760 * It is possible that an interrupt came in,
1761 * set the head up, then more interrupts came in
1762 * and moved it again. When we get back here,
1763 * the page would have been set to NORMAL but we
1764 * just set it back to HEAD.
1765 *
1766 * How do you detect this? Well, if that happened
1767 * the tail page would have moved.
1768 */
1769 if (ret == RB_PAGE_NORMAL) {
1770 /*
1771 * If the tail had moved past next, then we need
1772 * to reset the pointer.
1773 */
1774 if (cpu_buffer->tail_page != tail_page &&
1775 cpu_buffer->tail_page != next_page)
1776 rb_head_page_set_normal(cpu_buffer, new_head,
1777 next_page,
1778 RB_PAGE_HEAD);
1779 }
1780
1781 /*
1782 * If this was the outer most commit (the one that
1783 * changed the original pointer from HEAD to UPDATE),
1784 * then it is up to us to reset it to NORMAL.
1785 */
1786 if (type == RB_PAGE_HEAD) {
1787 ret = rb_head_page_set_normal(cpu_buffer, next_page,
1788 tail_page,
1789 RB_PAGE_UPDATE);
1790 if (RB_WARN_ON(cpu_buffer,
1791 ret != RB_PAGE_UPDATE))
1792 return -1;
1793 }
1794
1795 return 0;
1122} 1796}
1123 1797
1124static unsigned rb_calculate_event_length(unsigned length) 1798static unsigned rb_calculate_event_length(unsigned length)
@@ -1129,11 +1803,11 @@ static unsigned rb_calculate_event_length(unsigned length)
1129 if (!length) 1803 if (!length)
1130 length = 1; 1804 length = 1;
1131 1805
1132 if (length > RB_MAX_SMALL_DATA) 1806 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
1133 length += sizeof(event.array[0]); 1807 length += sizeof(event.array[0]);
1134 1808
1135 length += RB_EVNT_HDR_SIZE; 1809 length += RB_EVNT_HDR_SIZE;
1136 length = ALIGN(length, RB_ALIGNMENT); 1810 length = ALIGN(length, RB_ARCH_ALIGNMENT);
1137 1811
1138 return length; 1812 return length;
1139} 1813}
@@ -1150,6 +1824,14 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
1150 * must fill the old tail_page with padding. 1824 * must fill the old tail_page with padding.
1151 */ 1825 */
1152 if (tail >= BUF_PAGE_SIZE) { 1826 if (tail >= BUF_PAGE_SIZE) {
1827 /*
1828 * If the page was filled, then we still need
1829 * to update the real_end. Reset it to zero
1830 * and the reader will ignore it.
1831 */
1832 if (tail == BUF_PAGE_SIZE)
1833 tail_page->real_end = 0;
1834
1153 local_sub(length, &tail_page->write); 1835 local_sub(length, &tail_page->write);
1154 return; 1836 return;
1155 } 1837 }
@@ -1158,6 +1840,13 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
1158 kmemcheck_annotate_bitfield(event, bitfield); 1840 kmemcheck_annotate_bitfield(event, bitfield);
1159 1841
1160 /* 1842 /*
1843 * Save the original length to the meta data.
1844 * This will be used by the reader to add lost event
1845 * counter.
1846 */
1847 tail_page->real_end = tail;
1848
1849 /*
1161 * If this event is bigger than the minimum size, then 1850 * If this event is bigger than the minimum size, then
1162 * we need to be careful that we don't subtract the 1851 * we need to be careful that we don't subtract the
1163 * write counter enough to allow another writer to slip 1852 * write counter enough to allow another writer to slip
@@ -1184,111 +1873,108 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
1184 event->type_len = RINGBUF_TYPE_PADDING; 1873 event->type_len = RINGBUF_TYPE_PADDING;
1185 /* time delta must be non zero */ 1874 /* time delta must be non zero */
1186 event->time_delta = 1; 1875 event->time_delta = 1;
1187 /* Account for this as an entry */
1188 local_inc(&tail_page->entries);
1189 local_inc(&cpu_buffer->entries);
1190 1876
1191 /* Set write to end of buffer */ 1877 /* Set write to end of buffer */
1192 length = (tail + length) - BUF_PAGE_SIZE; 1878 length = (tail + length) - BUF_PAGE_SIZE;
1193 local_sub(length, &tail_page->write); 1879 local_sub(length, &tail_page->write);
1194} 1880}
1195 1881
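A sketch of how a reader treats the two kinds of padding produced here and by rb_event_set_padding(); the helper is invented for illustration:

/* Sketch: zero time_delta means "rest of page unused", non-zero means
 * "discarded event, skip its length and keep reading". */
static int example_classify_padding(struct ring_buffer_event *event)
{
        if (event->type_len != RINGBUF_TYPE_PADDING)
                return 0;
        if (!event->time_delta)
                return -1;      /* end of page data */
        return 1;               /* skip rb_event_length(event) bytes */
}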
1196static struct ring_buffer_event * 1882/*
1883 * This is the slow path, force gcc not to inline it.
1884 */
1885static noinline struct ring_buffer_event *
1197rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 1886rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
1198 unsigned long length, unsigned long tail, 1887 unsigned long length, unsigned long tail,
1199 struct buffer_page *commit_page, 1888 struct buffer_page *tail_page, u64 ts)
1200 struct buffer_page *tail_page, u64 *ts)
1201{ 1889{
1202 struct buffer_page *next_page, *head_page, *reader_page; 1890 struct buffer_page *commit_page = cpu_buffer->commit_page;
1203 struct ring_buffer *buffer = cpu_buffer->buffer; 1891 struct ring_buffer *buffer = cpu_buffer->buffer;
1204 bool lock_taken = false; 1892 struct buffer_page *next_page;
1205 unsigned long flags; 1893 int ret;
1206 1894
1207 next_page = tail_page; 1895 next_page = tail_page;
1208 1896
1209 local_irq_save(flags);
1210 /*
1211 * Since the write to the buffer is still not
1212 * fully lockless, we must be careful with NMIs.
1213 * The locks in the writers are taken when a write
1214 * crosses to a new page. The locks protect against
1215 * races with the readers (this will soon be fixed
1216 * with a lockless solution).
1217 *
1218 * Because we can not protect against NMIs, and we
1219 * want to keep traces reentrant, we need to manage
1220 * what happens when we are in an NMI.
1221 *
1222 * NMIs can happen after we take the lock.
1223 * If we are in an NMI, only take the lock
1224 * if it is not already taken. Otherwise
1225 * simply fail.
1226 */
1227 if (unlikely(in_nmi())) {
1228 if (!__raw_spin_trylock(&cpu_buffer->lock)) {
1229 cpu_buffer->nmi_dropped++;
1230 goto out_reset;
1231 }
1232 } else
1233 __raw_spin_lock(&cpu_buffer->lock);
1234
1235 lock_taken = true;
1236
1237 rb_inc_page(cpu_buffer, &next_page); 1897 rb_inc_page(cpu_buffer, &next_page);
1238 1898
1239 head_page = cpu_buffer->head_page;
1240 reader_page = cpu_buffer->reader_page;
1241
1242 /* we grabbed the lock before incrementing */
1243 if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
1244 goto out_reset;
1245
1246 /* 1899 /*
1247 * If for some reason, we had an interrupt storm that made 1900 * If for some reason, we had an interrupt storm that made
1248 * it all the way around the buffer, bail, and warn 1901 * it all the way around the buffer, bail, and warn
1249 * about it. 1902 * about it.
1250 */ 1903 */
1251 if (unlikely(next_page == commit_page)) { 1904 if (unlikely(next_page == commit_page)) {
1252 cpu_buffer->commit_overrun++; 1905 local_inc(&cpu_buffer->commit_overrun);
1253 goto out_reset; 1906 goto out_reset;
1254 } 1907 }
1255 1908
1256 if (next_page == head_page) { 1909 /*
1257 if (!(buffer->flags & RB_FL_OVERWRITE)) 1910 * This is where the fun begins!
1258 goto out_reset; 1911 *
1259 1912 * We are fighting against races between a reader that
1260 /* tail_page has not moved yet? */ 1913 * could be on another CPU trying to swap its reader
1261 if (tail_page == cpu_buffer->tail_page) { 1914 * page with the buffer head.
1262 /* count overflows */ 1915 *
1263 cpu_buffer->overrun += 1916 * We are also fighting against interrupts coming in and
1264 local_read(&head_page->entries); 1917 * moving the head or tail on us as well.
1918 *
1919 * If the next page is the head page then we have filled
1920 * the buffer, unless the commit page is still on the
1921 * reader page.
1922 */
1923 if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
1265 1924
1266 rb_inc_page(cpu_buffer, &head_page); 1925 /*
1267 cpu_buffer->head_page = head_page; 1926 * If the commit is not on the reader page, then
1268 cpu_buffer->head_page->read = 0; 1927 * move the header page.
1928 */
1929 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
1930 /*
1931 * If we are not in overwrite mode,
1932 * this is easy, just stop here.
1933 */
1934 if (!(buffer->flags & RB_FL_OVERWRITE))
1935 goto out_reset;
1936
1937 ret = rb_handle_head_page(cpu_buffer,
1938 tail_page,
1939 next_page);
1940 if (ret < 0)
1941 goto out_reset;
1942 if (ret)
1943 goto out_again;
1944 } else {
1945 /*
1946 * We need to be careful here too. The
1947 * commit page could still be on the reader
1948 * page. We could have a small buffer, and
1949 * have filled up the buffer with events
1950 * from interrupts and such, and wrapped.
1951 *
1952 * Note, if the tail page is also the on the
1953 * reader_page, we let it move out.
1954 */
1955 if (unlikely((cpu_buffer->commit_page !=
1956 cpu_buffer->tail_page) &&
1957 (cpu_buffer->commit_page ==
1958 cpu_buffer->reader_page))) {
1959 local_inc(&cpu_buffer->commit_overrun);
1960 goto out_reset;
1961 }
1269 } 1962 }
1270 } 1963 }
1271 1964
1272 /* 1965 ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
1273 * If the tail page is still the same as what we think 1966 if (ret) {
1274 * it is, then it is up to us to update the tail 1967 /*
1275 * pointer. 1968 * Nested commits always have zero deltas, so
1276 */ 1969 * just reread the time stamp
1277 if (tail_page == cpu_buffer->tail_page) { 1970 */
1278 local_set(&next_page->write, 0); 1971 ts = rb_time_stamp(buffer);
1279 local_set(&next_page->entries, 0); 1972 next_page->page->time_stamp = ts;
1280 local_set(&next_page->page->commit, 0);
1281 cpu_buffer->tail_page = next_page;
1282
1283 /* reread the time stamp */
1284 *ts = rb_time_stamp(buffer, cpu_buffer->cpu);
1285 cpu_buffer->tail_page->page->time_stamp = *ts;
1286 } 1973 }
1287 1974
1288 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1975 out_again:
1289 1976
1290 __raw_spin_unlock(&cpu_buffer->lock); 1977 rb_reset_tail(cpu_buffer, tail_page, tail, length);
1291 local_irq_restore(flags);
1292 1978
1293 /* fail and let the caller try again */ 1979 /* fail and let the caller try again */
1294 return ERR_PTR(-EAGAIN); 1980 return ERR_PTR(-EAGAIN);
@@ -1297,48 +1983,52 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
1297 /* reset write */ 1983 /* reset write */
1298 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1984 rb_reset_tail(cpu_buffer, tail_page, tail, length);
1299 1985
1300 if (likely(lock_taken))
1301 __raw_spin_unlock(&cpu_buffer->lock);
1302 local_irq_restore(flags);
1303 return NULL; 1986 return NULL;
1304} 1987}
1305 1988
1306static struct ring_buffer_event * 1989static struct ring_buffer_event *
1307__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 1990__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
1308 unsigned type, unsigned long length, u64 *ts) 1991 unsigned long length, u64 ts,
1992 u64 delta, int add_timestamp)
1309{ 1993{
1310 struct buffer_page *tail_page, *commit_page; 1994 struct buffer_page *tail_page;
1311 struct ring_buffer_event *event; 1995 struct ring_buffer_event *event;
1312 unsigned long tail, write; 1996 unsigned long tail, write;
1313 1997
1314 commit_page = cpu_buffer->commit_page; 1998 /*
1315 /* we just need to protect against interrupts */ 1999 * If the time delta since the last event is too big to
1316 barrier(); 2000 * hold in the time field of the event, then we append a
2001 * TIME EXTEND event ahead of the data event.
2002 */
2003 if (unlikely(add_timestamp))
2004 length += RB_LEN_TIME_EXTEND;
2005
1317 tail_page = cpu_buffer->tail_page; 2006 tail_page = cpu_buffer->tail_page;
1318 write = local_add_return(length, &tail_page->write); 2007 write = local_add_return(length, &tail_page->write);
2008
2009 /* set write to only the index of the write */
2010 write &= RB_WRITE_MASK;
1319 tail = write - length; 2011 tail = write - length;
1320 2012
1321 /* See if we shot past the end of this buffer page */ 2013 /* See if we shot past the end of this buffer page */
1322 if (write > BUF_PAGE_SIZE) 2014 if (unlikely(write > BUF_PAGE_SIZE))
1323 return rb_move_tail(cpu_buffer, length, tail, 2015 return rb_move_tail(cpu_buffer, length, tail,
1324 commit_page, tail_page, ts); 2016 tail_page, ts);
1325 2017
1326 /* We reserved something on the buffer */ 2018 /* We reserved something on the buffer */
1327 2019
1328 event = __rb_page_index(tail_page, tail); 2020 event = __rb_page_index(tail_page, tail);
1329 kmemcheck_annotate_bitfield(event, bitfield); 2021 kmemcheck_annotate_bitfield(event, bitfield);
1330 rb_update_event(event, type, length); 2022 rb_update_event(cpu_buffer, event, length, add_timestamp, delta);
1331 2023
1332 /* The passed in type is zero for DATA */ 2024 local_inc(&tail_page->entries);
1333 if (likely(!type))
1334 local_inc(&tail_page->entries);
1335 2025
1336 /* 2026 /*
1337 * If this is the first commit on the page, then update 2027 * If this is the first commit on the page, then update
1338 * its timestamp. 2028 * its timestamp.
1339 */ 2029 */
1340 if (!tail) 2030 if (!tail)
1341 tail_page->page->time_stamp = *ts; 2031 tail_page->page->time_stamp = ts;
1342 2032
1343 return event; 2033 return event;
1344} 2034}
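
The reworked __rb_reserve_next() above takes a precomputed delta plus an add_timestamp flag: when the delta no longer fits in the event header's 27-bit time_delta field, RB_LEN_TIME_EXTEND extra bytes are reserved and the overflow is carried by a time-extend event placed ahead of the data. A standalone userspace sketch of that split (the 27-bit width follows TS_SHIFT; everything else here is illustrative, not the kernel's structs):

#include <stdint.h>
#include <stdio.h>

#define TS_SHIFT 27
#define TS_MASK  ((1ULL << TS_SHIFT) - 1)

int main(void)
{
        uint64_t delta = (1ULL << 30) + 12345;          /* too big for 27 bits */
        uint32_t time_delta = delta & TS_MASK;          /* fits in the event header */
        uint32_t extend = delta >> TS_SHIFT;            /* carried in array[0] of the extend event */
        uint64_t rebuilt = ((uint64_t)extend << TS_SHIFT) + time_delta;

        printf("needs time extend: %d, round-trips: %d\n",
               (delta & ~TS_MASK) != 0, rebuilt == delta);
        return 0;
}
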
@@ -1353,19 +2043,23 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
1353 unsigned long addr; 2043 unsigned long addr;
1354 2044
1355 new_index = rb_event_index(event); 2045 new_index = rb_event_index(event);
1356 old_index = new_index + rb_event_length(event); 2046 old_index = new_index + rb_event_ts_length(event);
1357 addr = (unsigned long)event; 2047 addr = (unsigned long)event;
1358 addr &= PAGE_MASK; 2048 addr &= PAGE_MASK;
1359 2049
1360 bpage = cpu_buffer->tail_page; 2050 bpage = cpu_buffer->tail_page;
1361 2051
1362 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 2052 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2053 unsigned long write_mask =
2054 local_read(&bpage->write) & ~RB_WRITE_MASK;
1363 /* 2055 /*
1364 * This is on the tail page. It is possible that 2056 * This is on the tail page. It is possible that
1365 * a write could come in and move the tail page 2057 * a write could come in and move the tail page
1366 * and write to the next page. That is fine 2058 * and write to the next page. That is fine
1367 * because we just shorten what is on this page. 2059 * because we just shorten what is on this page.
1368 */ 2060 */
2061 old_index += write_mask;
2062 new_index += write_mask;
1369 index = local_cmpxchg(&bpage->write, old_index, new_index); 2063 index = local_cmpxchg(&bpage->write, old_index, new_index);
1370 if (index == old_index) 2064 if (index == old_index)
1371 return 1; 2065 return 1;
@@ -1375,80 +2069,13 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
1375 return 0; 2069 return 0;
1376} 2070}
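
rb_try_to_discard() now works on only the low RB_WRITE_MASK bits of the write counter, because the upper bits count how often the tail page has moved rather than bytes written; the cmpxchg pulls the reservation back only if nothing was written (and the page did not move) after it. A hedged userspace sketch of that rollback pattern, using the GCC/Clang __sync builtin and an assumed 20-bit index width:

#include <stdio.h>

#define RB_WRITE_MASK 0xfffffUL         /* low bits: byte index in the page (width assumed) */

static unsigned long page_write;        /* stand-in for bpage->write */

/* Try to undo a reservation of 'len' bytes that ended at index 'end'. */
static int try_discard(unsigned long end, unsigned long len)
{
        unsigned long tag = page_write & ~RB_WRITE_MASK;        /* keep the move counter */
        unsigned long old = tag | end;
        unsigned long new = tag | (end - len);

        /* Succeeds only if no later write or page move happened. */
        return __sync_bool_compare_and_swap(&page_write, old, new);
}

int main(void)
{
        page_write = (3UL << 20) | 64;          /* page moved 3 times, 64 bytes used */
        printf("rolled back: %d, write index now: %lu\n",
               try_discard(64, 16), page_write & RB_WRITE_MASK);
        return 0;
}
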
1377 2071
1378static int
1379rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1380 u64 *ts, u64 *delta)
1381{
1382 struct ring_buffer_event *event;
1383 static int once;
1384 int ret;
1385
1386 if (unlikely(*delta > (1ULL << 59) && !once++)) {
1387 printk(KERN_WARNING "Delta way too big! %llu"
1388 " ts=%llu write stamp = %llu\n",
1389 (unsigned long long)*delta,
1390 (unsigned long long)*ts,
1391 (unsigned long long)cpu_buffer->write_stamp);
1392 WARN_ON(1);
1393 }
1394
1395 /*
1396 * The delta is too big, we need to add a
1397 * new timestamp.
1398 */
1399 event = __rb_reserve_next(cpu_buffer,
1400 RINGBUF_TYPE_TIME_EXTEND,
1401 RB_LEN_TIME_EXTEND,
1402 ts);
1403 if (!event)
1404 return -EBUSY;
1405
1406 if (PTR_ERR(event) == -EAGAIN)
1407 return -EAGAIN;
1408
1409 /* Only a committed time event can update the write stamp */
1410 if (rb_event_is_commit(cpu_buffer, event)) {
1411 /*
1412 * If this is the first on the page, then it was
1413 * updated with the page itself. Try to discard it
1414 * and if we can't just make it zero.
1415 */
1416 if (rb_event_index(event)) {
1417 event->time_delta = *delta & TS_MASK;
1418 event->array[0] = *delta >> TS_SHIFT;
1419 } else {
1420 /* try to discard, since we do not need this */
1421 if (!rb_try_to_discard(cpu_buffer, event)) {
1422 /* nope, just zero it */
1423 event->time_delta = 0;
1424 event->array[0] = 0;
1425 }
1426 }
1427 cpu_buffer->write_stamp = *ts;
1428 /* let the caller know this was the commit */
1429 ret = 1;
1430 } else {
1431 /* Try to discard the event */
1432 if (!rb_try_to_discard(cpu_buffer, event)) {
1433 /* Darn, this is just wasted space */
1434 event->time_delta = 0;
1435 event->array[0] = 0;
1436 }
1437 ret = 0;
1438 }
1439
1440 *delta = 0;
1441
1442 return ret;
1443}
1444
1445static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2072static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
1446{ 2073{
1447 local_inc(&cpu_buffer->committing); 2074 local_inc(&cpu_buffer->committing);
1448 local_inc(&cpu_buffer->commits); 2075 local_inc(&cpu_buffer->commits);
1449} 2076}
1450 2077
1451static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 2078static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
1452{ 2079{
1453 unsigned long commits; 2080 unsigned long commits;
1454 2081
@@ -1481,18 +2108,38 @@ static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
1481} 2108}
1482 2109
1483static struct ring_buffer_event * 2110static struct ring_buffer_event *
1484rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, 2111rb_reserve_next_event(struct ring_buffer *buffer,
2112 struct ring_buffer_per_cpu *cpu_buffer,
1485 unsigned long length) 2113 unsigned long length)
1486{ 2114{
1487 struct ring_buffer_event *event; 2115 struct ring_buffer_event *event;
1488 u64 ts, delta = 0; 2116 u64 ts, delta;
1489 int commit = 0;
1490 int nr_loops = 0; 2117 int nr_loops = 0;
2118 int add_timestamp;
2119 u64 diff;
1491 2120
1492 rb_start_commit(cpu_buffer); 2121 rb_start_commit(cpu_buffer);
1493 2122
2123#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
2124 /*
2125 * Due to the ability to swap a cpu buffer from a buffer
2126 * it is possible it was swapped before we committed.
2127 * (committing stops a swap). We check for it here and
2128 * if it happened, we have to fail the write.
2129 */
2130 barrier();
2131 if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
2132 local_dec(&cpu_buffer->committing);
2133 local_dec(&cpu_buffer->commits);
2134 return NULL;
2135 }
2136#endif
2137
1494 length = rb_calculate_event_length(length); 2138 length = rb_calculate_event_length(length);
1495 again: 2139 again:
2140 add_timestamp = 0;
2141 delta = 0;
2142
1496 /* 2143 /*
1497 * We allow for interrupts to reenter here and do a trace. 2144 * We allow for interrupts to reenter here and do a trace.
1498 * If one does, it will cause this original code to loop 2145 * If one does, it will cause this original code to loop
@@ -1505,57 +2152,33 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1505 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 2152 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
1506 goto out_fail; 2153 goto out_fail;
1507 2154
1508 ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu); 2155 ts = rb_time_stamp(cpu_buffer->buffer);
2156 diff = ts - cpu_buffer->write_stamp;
1509 2157
1510 /* 2158 /* make sure this diff is calculated here */
1511 * Only the first commit can update the timestamp. 2159 barrier();
1512 * Yes there is a race here. If an interrupt comes in
1513 * just after the conditional and it traces too, then it
1514 * will also check the deltas. More than one timestamp may
1515 * also be made. But only the entry that did the actual
1516 * commit will be something other than zero.
1517 */
1518 if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page &&
1519 rb_page_write(cpu_buffer->tail_page) ==
1520 rb_commit_index(cpu_buffer))) {
1521 u64 diff;
1522
1523 diff = ts - cpu_buffer->write_stamp;
1524
1525 /* make sure this diff is calculated here */
1526 barrier();
1527
1528 /* Did the write stamp get updated already? */
1529 if (unlikely(ts < cpu_buffer->write_stamp))
1530 goto get_event;
1531 2160
2161 /* Did the write stamp get updated already? */
2162 if (likely(ts >= cpu_buffer->write_stamp)) {
1532 delta = diff; 2163 delta = diff;
1533 if (unlikely(test_time_stamp(delta))) { 2164 if (unlikely(test_time_stamp(delta))) {
1534 2165 WARN_ONCE(delta > (1ULL << 59),
1535 commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); 2166 KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n",
1536 if (commit == -EBUSY) 2167 (unsigned long long)delta,
1537 goto out_fail; 2168 (unsigned long long)ts,
1538 2169 (unsigned long long)cpu_buffer->write_stamp);
1539 if (commit == -EAGAIN) 2170 add_timestamp = 1;
1540 goto again;
1541
1542 RB_WARN_ON(cpu_buffer, commit < 0);
1543 } 2171 }
1544 } 2172 }
1545 2173
1546 get_event: 2174 event = __rb_reserve_next(cpu_buffer, length, ts,
1547 event = __rb_reserve_next(cpu_buffer, 0, length, &ts); 2175 delta, add_timestamp);
1548 if (unlikely(PTR_ERR(event) == -EAGAIN)) 2176 if (unlikely(PTR_ERR(event) == -EAGAIN))
1549 goto again; 2177 goto again;
1550 2178
1551 if (!event) 2179 if (!event)
1552 goto out_fail; 2180 goto out_fail;
1553 2181
1554 if (!rb_event_is_commit(cpu_buffer, event))
1555 delta = 0;
1556
1557 event->time_delta = delta;
1558
1559 return event; 2182 return event;
1560 2183
1561 out_fail: 2184 out_fail:
@@ -1563,15 +2186,13 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1563 return NULL; 2186 return NULL;
1564} 2187}
1565 2188
2189#ifdef CONFIG_TRACING
2190
1566#define TRACE_RECURSIVE_DEPTH 16 2191#define TRACE_RECURSIVE_DEPTH 16
1567 2192
1568static int trace_recursive_lock(void) 2193/* Keep this code out of the fast path cache */
2194static noinline void trace_recursive_fail(void)
1569{ 2195{
1570 current->trace_recursion++;
1571
1572 if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH))
1573 return 0;
1574
1575 /* Disable all tracing before we do anything else */ 2196 /* Disable all tracing before we do anything else */
1576 tracing_off_permanent(); 2197 tracing_off_permanent();
1577 2198
@@ -1583,17 +2204,33 @@ static int trace_recursive_lock(void)
1583 in_nmi()); 2204 in_nmi());
1584 2205
1585 WARN_ON_ONCE(1); 2206 WARN_ON_ONCE(1);
2207}
2208
2209static inline int trace_recursive_lock(void)
2210{
2211 current->trace_recursion++;
2212
2213 if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH))
2214 return 0;
2215
2216 trace_recursive_fail();
2217
1586 return -1; 2218 return -1;
1587} 2219}
1588 2220
1589static void trace_recursive_unlock(void) 2221static inline void trace_recursive_unlock(void)
1590{ 2222{
1591 WARN_ON_ONCE(!current->trace_recursion); 2223 WARN_ON_ONCE(!current->trace_recursion);
1592 2224
1593 current->trace_recursion--; 2225 current->trace_recursion--;
1594} 2226}
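
trace_recursive_lock()/unlock() are now split so the unlikely failure path lives in a noinline helper and the fast path stays small. The guard itself is just a per-task depth counter; a minimal userspace sketch with a thread-local counter (the depth limit mirrors TRACE_RECURSIVE_DEPTH above, the rest is illustrative):

#include <stdio.h>

#define TRACE_RECURSIVE_DEPTH 16

static __thread int trace_recursion;    /* stands in for current->trace_recursion */

static int recursive_lock(void)
{
        if (++trace_recursion < TRACE_RECURSIVE_DEPTH)
                return 0;
        return -1;      /* too deep: refuse to trace from this context */
}

static void recursive_unlock(void)
{
        trace_recursion--;
}

int main(void)
{
        int i, accepted = 0;

        for (i = 0; i < 20; i++)        /* simulate 20 nested trace entries */
                if (!recursive_lock())
                        accepted++;
        while (trace_recursion)
                recursive_unlock();
        printf("accepted %d of 20 nested entries\n", accepted);
        return 0;
}
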
1595 2227
1596static DEFINE_PER_CPU(int, rb_need_resched); 2228#else
2229
2230#define trace_recursive_lock() (0)
2231#define trace_recursive_unlock() do { } while (0)
2232
2233#endif
1597 2234
1598/** 2235/**
1599 * ring_buffer_lock_reserve - reserve a part of the buffer 2236 * ring_buffer_lock_reserve - reserve a part of the buffer
@@ -1615,16 +2252,16 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
1615{ 2252{
1616 struct ring_buffer_per_cpu *cpu_buffer; 2253 struct ring_buffer_per_cpu *cpu_buffer;
1617 struct ring_buffer_event *event; 2254 struct ring_buffer_event *event;
1618 int cpu, resched; 2255 int cpu;
1619 2256
1620 if (ring_buffer_flags != RB_BUFFERS_ON) 2257 if (ring_buffer_flags != RB_BUFFERS_ON)
1621 return NULL; 2258 return NULL;
1622 2259
1623 if (atomic_read(&buffer->record_disabled))
1624 return NULL;
1625
1626 /* If we are tracing schedule, we don't want to recurse */ 2260 /* If we are tracing schedule, we don't want to recurse */
1627 resched = ftrace_preempt_disable(); 2261 preempt_disable_notrace();
2262
2263 if (atomic_read(&buffer->record_disabled))
2264 goto out_nocheck;
1628 2265
1629 if (trace_recursive_lock()) 2266 if (trace_recursive_lock())
1630 goto out_nocheck; 2267 goto out_nocheck;
@@ -1642,41 +2279,54 @@ ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
1642 if (length > BUF_MAX_DATA_SIZE) 2279 if (length > BUF_MAX_DATA_SIZE)
1643 goto out; 2280 goto out;
1644 2281
1645 event = rb_reserve_next_event(cpu_buffer, length); 2282 event = rb_reserve_next_event(buffer, cpu_buffer, length);
1646 if (!event) 2283 if (!event)
1647 goto out; 2284 goto out;
1648 2285
1649 /*
1650 * Need to store resched state on this cpu.
1651 * Only the first needs to.
1652 */
1653
1654 if (preempt_count() == 1)
1655 per_cpu(rb_need_resched, cpu) = resched;
1656
1657 return event; 2286 return event;
1658 2287
1659 out: 2288 out:
1660 trace_recursive_unlock(); 2289 trace_recursive_unlock();
1661 2290
1662 out_nocheck: 2291 out_nocheck:
1663 ftrace_preempt_enable(resched); 2292 preempt_enable_notrace();
1664 return NULL; 2293 return NULL;
1665} 2294}
1666EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 2295EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
1667 2296
1668static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2297static void
2298rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1669 struct ring_buffer_event *event) 2299 struct ring_buffer_event *event)
1670{ 2300{
1671 local_inc(&cpu_buffer->entries); 2301 u64 delta;
1672 2302
1673 /* 2303 /*
1674 * The event first in the commit queue updates the 2304 * The event first in the commit queue updates the
1675 * time stamp. 2305 * time stamp.
1676 */ 2306 */
1677 if (rb_event_is_commit(cpu_buffer, event)) 2307 if (rb_event_is_commit(cpu_buffer, event)) {
1678 cpu_buffer->write_stamp += event->time_delta; 2308 /*
2309 * A commit event that is first on a page
2310 * updates the write timestamp with the page stamp
2311 */
2312 if (!rb_event_index(event))
2313 cpu_buffer->write_stamp =
2314 cpu_buffer->commit_page->page->time_stamp;
2315 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
2316 delta = event->array[0];
2317 delta <<= TS_SHIFT;
2318 delta += event->time_delta;
2319 cpu_buffer->write_stamp += delta;
2320 } else
2321 cpu_buffer->write_stamp += event->time_delta;
2322 }
2323}
1679 2324
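
rb_update_write_stamp() above rebuilds the full delta of a committed time-extend event from array[0] shifted up by TS_SHIFT plus the small time_delta. A userspace sketch of that decode (the flat struct and the type value are assumptions for illustration; the kernel uses packed bitfields):

#include <stdint.h>
#include <stdio.h>

#define TS_SHIFT 27
#define TYPE_TIME_EXTEND 30             /* value assumed for the sketch */

struct evt {
        uint32_t type_len;              /* event type, or data length class */
        uint32_t time_delta;            /* low 27 bits of the delta */
        uint32_t array[1];              /* high bits for a time extend */
};

static uint64_t apply_delta(uint64_t write_stamp, const struct evt *e)
{
        if (e->type_len == TYPE_TIME_EXTEND) {
                uint64_t delta = e->array[0];

                delta <<= TS_SHIFT;
                delta += e->time_delta;
                return write_stamp + delta;
        }
        return write_stamp + e->time_delta;     /* plain data event */
}

int main(void)
{
        struct evt ext = { TYPE_TIME_EXTEND, 5, { 2 } };

        /* 1000 + (2 << 27) + 5 */
        printf("new write stamp: %llu\n",
               (unsigned long long)apply_delta(1000, &ext));
        return 0;
}
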
2325static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
2326 struct ring_buffer_event *event)
2327{
2328 local_inc(&cpu_buffer->entries);
2329 rb_update_write_stamp(cpu_buffer, event);
1680 rb_end_commit(cpu_buffer); 2330 rb_end_commit(cpu_buffer);
1681} 2331}
1682 2332
@@ -1701,13 +2351,7 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer,
1701 2351
1702 trace_recursive_unlock(); 2352 trace_recursive_unlock();
1703 2353
1704 /* 2354 preempt_enable_notrace();
1705 * Only the last preempt count needs to restore preemption.
1706 */
1707 if (preempt_count() == 1)
1708 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
1709 else
1710 preempt_enable_no_resched_notrace();
1711 2355
1712 return 0; 2356 return 0;
1713} 2357}
@@ -1715,6 +2359,9 @@ EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
1715 2359
1716static inline void rb_event_discard(struct ring_buffer_event *event) 2360static inline void rb_event_discard(struct ring_buffer_event *event)
1717{ 2361{
2362 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
2363 event = skip_time_extend(event);
2364
1718 /* array[0] holds the actual length for the discarded event */ 2365 /* array[0] holds the actual length for the discarded event */
1719 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2366 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
1720 event->type_len = RINGBUF_TYPE_PADDING; 2367 event->type_len = RINGBUF_TYPE_PADDING;
@@ -1723,32 +2370,57 @@ static inline void rb_event_discard(struct ring_buffer_event *event)
1723 event->time_delta = 1; 2370 event->time_delta = 1;
1724} 2371}
1725 2372
1726/** 2373/*
1727 * ring_buffer_event_discard - discard any event in the ring buffer 2374 * Decrement the entries to the page that an event is on.
1728 * @event: the event to discard 2375 * The event does not even need to exist, only the pointer
1729 * 2376 * to the page it is on. This may only be called before the commit
1730 * Sometimes an event that is in the ring buffer needs to be ignored. 2377 * takes place.
1731 * This function lets the user discard an event in the ring buffer
1732 * and then that event will not be read later.
1733 *
1734 * Note, it is up to the user to be careful with this, and protect
1735 * against races. If the user discards an event that has been consumed
1736 * it is possible that it could corrupt the ring buffer.
1737 */ 2378 */
1738void ring_buffer_event_discard(struct ring_buffer_event *event) 2379static inline void
2380rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
2381 struct ring_buffer_event *event)
1739{ 2382{
1740 rb_event_discard(event); 2383 unsigned long addr = (unsigned long)event;
2384 struct buffer_page *bpage = cpu_buffer->commit_page;
2385 struct buffer_page *start;
2386
2387 addr &= PAGE_MASK;
2388
2389 /* Do the likely case first */
2390 if (likely(bpage->page == (void *)addr)) {
2391 local_dec(&bpage->entries);
2392 return;
2393 }
2394
2395 /*
2396 * Because the commit page may be on the reader page we
2397 * start with the next page and check the end loop there.
2398 */
2399 rb_inc_page(cpu_buffer, &bpage);
2400 start = bpage;
2401 do {
2402 if (bpage->page == (void *)addr) {
2403 local_dec(&bpage->entries);
2404 return;
2405 }
2406 rb_inc_page(cpu_buffer, &bpage);
2407 } while (bpage != start);
2408
2409 /* commit not part of this buffer?? */
2410 RB_WARN_ON(cpu_buffer, 1);
1741} 2411}
1742EXPORT_SYMBOL_GPL(ring_buffer_event_discard);
1743 2412
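
rb_decrement_entry() above finds the buffer page an event sits on just by masking the event's address with PAGE_MASK, trying the commit page first and only then walking the page list. A small userspace sketch of the masking step, assuming 4 KiB pages:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define PAGE_SIZE 4096UL
#define PAGE_MASK (~(PAGE_SIZE - 1))

int main(void)
{
        /* A page-aligned "buffer page" and an event somewhere inside it. */
        void *page = aligned_alloc(PAGE_SIZE, PAGE_SIZE);
        char *event = (char *)page + 200;

        /* Masking the event address recovers the start of its page. */
        void *owner = (void *)((uintptr_t)event & PAGE_MASK);

        printf("event belongs to its page: %d\n", owner == page);
        free(page);
        return 0;
}
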
1744/** 2413/**
1745 * ring_buffer_commit_discard - discard an event that has not been committed 2414 * ring_buffer_commit_discard - discard an event that has not been committed
1746 * @buffer: the ring buffer 2415 * @buffer: the ring buffer
1747 * @event: non committed event to discard 2416 * @event: non committed event to discard
1748 * 2417 *
1749 * This is similar to ring_buffer_event_discard but must only be 2418 * Sometimes an event that is in the ring buffer needs to be ignored.
1750 * performed on an event that has not been committed yet. The difference 2419 * This function lets the user discard an event in the ring buffer
1751 * is that this will also try to free the event from the ring buffer 2420 * and then that event will not be read later.
2421 *
2422 * This function only works if it is called before the item has been
2423 * committed. It will try to free the event from the ring buffer
1752 * if another event has not been added behind it. 2424 * if another event has not been added behind it.
1753 * 2425 *
1754 * If another event has been added behind it, it will set the event 2426 * If another event has been added behind it, it will set the event
@@ -1776,26 +2448,21 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer,
1776 */ 2448 */
1777 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 2449 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
1778 2450
1779 if (!rb_try_to_discard(cpu_buffer, event)) 2451 rb_decrement_entry(cpu_buffer, event);
2452 if (rb_try_to_discard(cpu_buffer, event))
1780 goto out; 2453 goto out;
1781 2454
1782 /* 2455 /*
1783 * The commit is still visible by the reader, so we 2456 * The commit is still visible by the reader, so we
1784 * must increment entries. 2457 * must still update the timestamp.
1785 */ 2458 */
1786 local_inc(&cpu_buffer->entries); 2459 rb_update_write_stamp(cpu_buffer, event);
1787 out: 2460 out:
1788 rb_end_commit(cpu_buffer); 2461 rb_end_commit(cpu_buffer);
1789 2462
1790 trace_recursive_unlock(); 2463 trace_recursive_unlock();
1791 2464
1792 /* 2465 preempt_enable_notrace();
1793 * Only the last preempt count needs to restore preemption.
1794 */
1795 if (preempt_count() == 1)
1796 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
1797 else
1798 preempt_enable_no_resched_notrace();
1799 2466
1800} 2467}
1801EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 2468EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
@@ -1821,15 +2488,15 @@ int ring_buffer_write(struct ring_buffer *buffer,
1821 struct ring_buffer_event *event; 2488 struct ring_buffer_event *event;
1822 void *body; 2489 void *body;
1823 int ret = -EBUSY; 2490 int ret = -EBUSY;
1824 int cpu, resched; 2491 int cpu;
1825 2492
1826 if (ring_buffer_flags != RB_BUFFERS_ON) 2493 if (ring_buffer_flags != RB_BUFFERS_ON)
1827 return -EBUSY; 2494 return -EBUSY;
1828 2495
1829 if (atomic_read(&buffer->record_disabled)) 2496 preempt_disable_notrace();
1830 return -EBUSY;
1831 2497
1832 resched = ftrace_preempt_disable(); 2498 if (atomic_read(&buffer->record_disabled))
2499 goto out;
1833 2500
1834 cpu = raw_smp_processor_id(); 2501 cpu = raw_smp_processor_id();
1835 2502
@@ -1844,7 +2511,7 @@ int ring_buffer_write(struct ring_buffer *buffer,
1844 if (length > BUF_MAX_DATA_SIZE) 2511 if (length > BUF_MAX_DATA_SIZE)
1845 goto out; 2512 goto out;
1846 2513
1847 event = rb_reserve_next_event(cpu_buffer, length); 2514 event = rb_reserve_next_event(buffer, cpu_buffer, length);
1848 if (!event) 2515 if (!event)
1849 goto out; 2516 goto out;
1850 2517
@@ -1856,7 +2523,7 @@ int ring_buffer_write(struct ring_buffer *buffer,
1856 2523
1857 ret = 0; 2524 ret = 0;
1858 out: 2525 out:
1859 ftrace_preempt_enable(resched); 2526 preempt_enable_notrace();
1860 2527
1861 return ret; 2528 return ret;
1862} 2529}
@@ -1865,9 +2532,13 @@ EXPORT_SYMBOL_GPL(ring_buffer_write);
1865static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 2532static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
1866{ 2533{
1867 struct buffer_page *reader = cpu_buffer->reader_page; 2534 struct buffer_page *reader = cpu_buffer->reader_page;
1868 struct buffer_page *head = cpu_buffer->head_page; 2535 struct buffer_page *head = rb_set_head_page(cpu_buffer);
1869 struct buffer_page *commit = cpu_buffer->commit_page; 2536 struct buffer_page *commit = cpu_buffer->commit_page;
1870 2537
2538 /* In case of error, head will be NULL */
2539 if (unlikely(!head))
2540 return 1;
2541
1871 return reader->read == rb_page_commit(reader) && 2542 return reader->read == rb_page_commit(reader) &&
1872 (commit == reader || 2543 (commit == reader ||
1873 (commit == head && 2544 (commit == head &&
@@ -1894,7 +2565,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
1894 * @buffer: The ring buffer to enable writes 2565 * @buffer: The ring buffer to enable writes
1895 * 2566 *
1896 * Note, multiple disables will need the same number of enables 2567 * Note, multiple disables will need the same number of enables
1897 * to truely enable the writing (much like preempt_disable). 2568 * to truly enable the writing (much like preempt_disable).
1898 */ 2569 */
1899void ring_buffer_record_enable(struct ring_buffer *buffer) 2570void ring_buffer_record_enable(struct ring_buffer *buffer)
1900{ 2571{
@@ -1930,7 +2601,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
1930 * @cpu: The CPU to enable. 2601 * @cpu: The CPU to enable.
1931 * 2602 *
1932 * Note, multiple disables will need the same number of enables 2603 * Note, multiple disables will need the same number of enables
1933 * to truely enable the writing (much like preempt_disable). 2604 * to truly enable the writing (much like preempt_disable).
1934 */ 2605 */
1935void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 2606void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1936{ 2607{
@@ -1944,6 +2615,19 @@ void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1944} 2615}
1945EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 2616EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
1946 2617
2618/*
2619 * The total entries in the ring buffer is the running counter
2620 * of entries entered into the ring buffer, minus the sum of
2621 * the entries read from the ring buffer and the number of
2622 * entries that were overwritten.
2623 */
2624static inline unsigned long
2625rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
2626{
2627 return local_read(&cpu_buffer->entries) -
2628 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
2629}
2630
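
rb_num_of_entries() replaces the open-coded arithmetic in the readers below: the live count is simply everything ever written minus what was overwritten and what was already read. A trivial worked example:

#include <stdio.h>

int main(void)
{
        unsigned long entries = 1500;   /* events ever written */
        unsigned long overrun = 300;    /* events lost to overwrite */
        unsigned long read    = 900;    /* events consumed by readers */

        /* Same arithmetic as rb_num_of_entries(): 300 still unread. */
        printf("entries left: %lu\n", entries - (overrun + read));
        return 0;
}
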
1947/** 2631/**
1948 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 2632 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1949 * @buffer: The ring buffer 2633 * @buffer: The ring buffer
@@ -1952,16 +2636,13 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
1952unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 2636unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1953{ 2637{
1954 struct ring_buffer_per_cpu *cpu_buffer; 2638 struct ring_buffer_per_cpu *cpu_buffer;
1955 unsigned long ret;
1956 2639
1957 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2640 if (!cpumask_test_cpu(cpu, buffer->cpumask))
1958 return 0; 2641 return 0;
1959 2642
1960 cpu_buffer = buffer->buffers[cpu]; 2643 cpu_buffer = buffer->buffers[cpu];
1961 ret = (local_read(&cpu_buffer->entries) - cpu_buffer->overrun)
1962 - cpu_buffer->read;
1963 2644
1964 return ret; 2645 return rb_num_of_entries(cpu_buffer);
1965} 2646}
1966EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 2647EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
1967 2648
@@ -1979,33 +2660,13 @@ unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1979 return 0; 2660 return 0;
1980 2661
1981 cpu_buffer = buffer->buffers[cpu]; 2662 cpu_buffer = buffer->buffers[cpu];
1982 ret = cpu_buffer->overrun; 2663 ret = local_read(&cpu_buffer->overrun);
1983 2664
1984 return ret; 2665 return ret;
1985} 2666}
1986EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 2667EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
1987 2668
1988/** 2669/**
1989 * ring_buffer_nmi_dropped_cpu - get the number of nmis that were dropped
1990 * @buffer: The ring buffer
1991 * @cpu: The per CPU buffer to get the number of overruns from
1992 */
1993unsigned long ring_buffer_nmi_dropped_cpu(struct ring_buffer *buffer, int cpu)
1994{
1995 struct ring_buffer_per_cpu *cpu_buffer;
1996 unsigned long ret;
1997
1998 if (!cpumask_test_cpu(cpu, buffer->cpumask))
1999 return 0;
2000
2001 cpu_buffer = buffer->buffers[cpu];
2002 ret = cpu_buffer->nmi_dropped;
2003
2004 return ret;
2005}
2006EXPORT_SYMBOL_GPL(ring_buffer_nmi_dropped_cpu);
2007
2008/**
2009 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits 2670 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits
2010 * @buffer: The ring buffer 2671 * @buffer: The ring buffer
2011 * @cpu: The per CPU buffer to get the number of overruns from 2672 * @cpu: The per CPU buffer to get the number of overruns from
@@ -2020,7 +2681,7 @@ ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
2020 return 0; 2681 return 0;
2021 2682
2022 cpu_buffer = buffer->buffers[cpu]; 2683 cpu_buffer = buffer->buffers[cpu];
2023 ret = cpu_buffer->commit_overrun; 2684 ret = local_read(&cpu_buffer->commit_overrun);
2024 2685
2025 return ret; 2686 return ret;
2026} 2687}
@@ -2042,8 +2703,7 @@ unsigned long ring_buffer_entries(struct ring_buffer *buffer)
2042 /* if you care about this being correct, lock the buffer */ 2703 /* if you care about this being correct, lock the buffer */
2043 for_each_buffer_cpu(buffer, cpu) { 2704 for_each_buffer_cpu(buffer, cpu) {
2044 cpu_buffer = buffer->buffers[cpu]; 2705 cpu_buffer = buffer->buffers[cpu];
2045 entries += (local_read(&cpu_buffer->entries) - 2706 entries += rb_num_of_entries(cpu_buffer);
2046 cpu_buffer->overrun) - cpu_buffer->read;
2047 } 2707 }
2048 2708
2049 return entries; 2709 return entries;
@@ -2051,7 +2711,7 @@ unsigned long ring_buffer_entries(struct ring_buffer *buffer)
2051EXPORT_SYMBOL_GPL(ring_buffer_entries); 2711EXPORT_SYMBOL_GPL(ring_buffer_entries);
2052 2712
2053/** 2713/**
2054 * ring_buffer_overrun_cpu - get the number of overruns in buffer 2714 * ring_buffer_overruns - get the number of overruns in buffer
2055 * @buffer: The ring buffer 2715 * @buffer: The ring buffer
2056 * 2716 *
2057 * Returns the total number of overruns in the ring buffer 2717 * Returns the total number of overruns in the ring buffer
@@ -2066,7 +2726,7 @@ unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
2066 /* if you care about this being correct, lock the buffer */ 2726 /* if you care about this being correct, lock the buffer */
2067 for_each_buffer_cpu(buffer, cpu) { 2727 for_each_buffer_cpu(buffer, cpu) {
2068 cpu_buffer = buffer->buffers[cpu]; 2728 cpu_buffer = buffer->buffers[cpu];
2069 overruns += cpu_buffer->overrun; 2729 overruns += local_read(&cpu_buffer->overrun);
2070 } 2730 }
2071 2731
2072 return overruns; 2732 return overruns;
@@ -2079,8 +2739,10 @@ static void rb_iter_reset(struct ring_buffer_iter *iter)
2079 2739
2080 /* Iterator usage is expected to have record disabled */ 2740 /* Iterator usage is expected to have record disabled */
2081 if (list_empty(&cpu_buffer->reader_page->list)) { 2741 if (list_empty(&cpu_buffer->reader_page->list)) {
2082 iter->head_page = cpu_buffer->head_page; 2742 iter->head_page = rb_set_head_page(cpu_buffer);
2083 iter->head = cpu_buffer->head_page->read; 2743 if (unlikely(!iter->head_page))
2744 return;
2745 iter->head = iter->head_page->read;
2084 } else { 2746 } else {
2085 iter->head_page = cpu_buffer->reader_page; 2747 iter->head_page = cpu_buffer->reader_page;
2086 iter->head = cpu_buffer->reader_page->read; 2748 iter->head = cpu_buffer->reader_page->read;
@@ -2089,6 +2751,8 @@ static void rb_iter_reset(struct ring_buffer_iter *iter)
2089 iter->read_stamp = cpu_buffer->read_stamp; 2751 iter->read_stamp = cpu_buffer->read_stamp;
2090 else 2752 else
2091 iter->read_stamp = iter->head_page->page->time_stamp; 2753 iter->read_stamp = iter->head_page->page->time_stamp;
2754 iter->cache_reader_page = cpu_buffer->reader_page;
2755 iter->cache_read = cpu_buffer->read;
2092} 2756}
2093 2757
2094/** 2758/**
@@ -2195,11 +2859,13 @@ static struct buffer_page *
2195rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 2859rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
2196{ 2860{
2197 struct buffer_page *reader = NULL; 2861 struct buffer_page *reader = NULL;
2862 unsigned long overwrite;
2198 unsigned long flags; 2863 unsigned long flags;
2199 int nr_loops = 0; 2864 int nr_loops = 0;
2865 int ret;
2200 2866
2201 local_irq_save(flags); 2867 local_irq_save(flags);
2202 __raw_spin_lock(&cpu_buffer->lock); 2868 arch_spin_lock(&cpu_buffer->lock);
2203 2869
2204 again: 2870 again:
2205 /* 2871 /*
@@ -2230,39 +2896,83 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
2230 goto out; 2896 goto out;
2231 2897
2232 /* 2898 /*
2233 * Splice the empty reader page into the list around the head.
2234 * Reset the reader page to size zero. 2899 * Reset the reader page to size zero.
2235 */ 2900 */
2236
2237 reader = cpu_buffer->head_page;
2238 cpu_buffer->reader_page->list.next = reader->list.next;
2239 cpu_buffer->reader_page->list.prev = reader->list.prev;
2240
2241 local_set(&cpu_buffer->reader_page->write, 0); 2901 local_set(&cpu_buffer->reader_page->write, 0);
2242 local_set(&cpu_buffer->reader_page->entries, 0); 2902 local_set(&cpu_buffer->reader_page->entries, 0);
2243 local_set(&cpu_buffer->reader_page->page->commit, 0); 2903 local_set(&cpu_buffer->reader_page->page->commit, 0);
2904 cpu_buffer->reader_page->real_end = 0;
2905
2906 spin:
2907 /*
2908 * Splice the empty reader page into the list around the head.
2909 */
2910 reader = rb_set_head_page(cpu_buffer);
2911 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
2912 cpu_buffer->reader_page->list.prev = reader->list.prev;
2913
2914 /*
2915 * cpu_buffer->pages just needs to point to the buffer, it
2916 * has no specific buffer page to point to. Let's move it out
2917 * of our way so we don't accidentally swap it.
2918 */
2919 cpu_buffer->pages = reader->list.prev;
2244 2920
2245 /* Make the reader page now replace the head */ 2921 /* The reader page will be pointing to the new head */
2246 reader->list.prev->next = &cpu_buffer->reader_page->list; 2922 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
2247 reader->list.next->prev = &cpu_buffer->reader_page->list;
2248 2923
2249 /* 2924 /*
2250 * If the tail is on the reader, then we must set the head 2925 * We want to make sure we read the overruns after we set up our
2251 * to the inserted page, otherwise we set it one before. 2926 * pointers to the next object. The writer side does a
2927 * cmpxchg to cross pages which acts as the mb on the writer
2928 * side. Note, the reader will constantly fail the swap
2929 * while the writer is updating the pointers, so this
2930 * guarantees that the overwrite recorded here is the one we
2931 * want to compare with the last_overrun.
2252 */ 2932 */
2253 cpu_buffer->head_page = cpu_buffer->reader_page; 2933 smp_mb();
2934 overwrite = local_read(&(cpu_buffer->overrun));
2254 2935
2255 if (cpu_buffer->commit_page != reader) 2936 /*
2256 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 2937 * Here's the tricky part.
2938 *
2939 * We need to move the pointer past the header page.
2940 * But we can only do that if a writer is not currently
2941 * moving it. The page before the header page has the
2942 * flag bit '1' set if it is pointing to the page we want,
2943 * but if the writer is in the process of moving it
2944 * then it will be '2' or already moved '0'.
2945 */
2946
2947 ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
2948
2949 /*
2950 * If we did not convert it, then we must try again.
2951 */
2952 if (!ret)
2953 goto spin;
2954
2955 /*
2956 * Yeah! We succeeded in replacing the page.
2957 *
2958 * Now make the new head point back to the reader page.
2959 */
2960 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
2961 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
2257 2962
2258 /* Finally update the reader page to the new head */ 2963 /* Finally update the reader page to the new head */
2259 cpu_buffer->reader_page = reader; 2964 cpu_buffer->reader_page = reader;
2260 rb_reset_reader_page(cpu_buffer); 2965 rb_reset_reader_page(cpu_buffer);
2261 2966
2967 if (overwrite != cpu_buffer->last_overrun) {
2968 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
2969 cpu_buffer->last_overrun = overwrite;
2970 }
2971
2262 goto again; 2972 goto again;
2263 2973
2264 out: 2974 out:
2265 __raw_spin_unlock(&cpu_buffer->lock); 2975 arch_spin_unlock(&cpu_buffer->lock);
2266 local_irq_restore(flags); 2976 local_irq_restore(flags);
2267 2977
2268 return reader; 2978 return reader;
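
The swap above depends on tagging the low bits of the ->next pointer that leads to the head page ('1' = this is the head, '2' = the writer is moving it, as the comment explains), so rb_head_page_replace() can cmpxchg the reader page in only while the writer is not touching that pointer; the overrun snapshot then lets lost events be reported as the delta against last_overrun. A hedged userspace sketch of the pointer-tag cmpxchg (the flag values follow the comment; the structs and helpers are illustrative):

#include <stdint.h>
#include <stdio.h>

#define RB_PAGE_HEAD   1UL      /* next pointer leads to the head page */
#define RB_PAGE_UPDATE 2UL      /* writer is in the middle of moving it */
#define RB_FLAG_MASK   3UL

struct bpage { struct bpage *next; int id; };

static struct bpage *rb_ptr(struct bpage *p)
{
        return (struct bpage *)((uintptr_t)p & ~RB_FLAG_MASK);
}

/* Reader side: swap our spare page in as head, but only while the
 * previous page's next pointer still carries the HEAD flag. */
static int replace_head(struct bpage **slot, struct bpage *head, struct bpage *spare)
{
        struct bpage *old = (struct bpage *)((uintptr_t)head | RB_PAGE_HEAD);
        struct bpage *new = (struct bpage *)((uintptr_t)spare | RB_PAGE_HEAD);

        return __sync_bool_compare_and_swap(slot, old, new);
}

int main(void)
{
        struct bpage a = { 0, 0 }, b = { 0, 1 }, spare = { 0, 2 };

        a.next = (struct bpage *)((uintptr_t)&b | RB_PAGE_HEAD);        /* b is head */

        printf("swapped: %d, ", replace_head(&a.next, &b, &spare));
        printf("new head id: %d\n", rb_ptr(a.next)->id);                /* 2 */
        return 0;
}
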
@@ -2282,8 +2992,7 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
2282 2992
2283 event = rb_reader_event(cpu_buffer); 2993 event = rb_reader_event(cpu_buffer);
2284 2994
2285 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX 2995 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
2286 || rb_discarded_event(event))
2287 cpu_buffer->read++; 2996 cpu_buffer->read++;
2288 2997
2289 rb_update_read_stamp(cpu_buffer, event); 2998 rb_update_read_stamp(cpu_buffer, event);
@@ -2294,13 +3003,11 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
2294 3003
2295static void rb_advance_iter(struct ring_buffer_iter *iter) 3004static void rb_advance_iter(struct ring_buffer_iter *iter)
2296{ 3005{
2297 struct ring_buffer *buffer;
2298 struct ring_buffer_per_cpu *cpu_buffer; 3006 struct ring_buffer_per_cpu *cpu_buffer;
2299 struct ring_buffer_event *event; 3007 struct ring_buffer_event *event;
2300 unsigned length; 3008 unsigned length;
2301 3009
2302 cpu_buffer = iter->cpu_buffer; 3010 cpu_buffer = iter->cpu_buffer;
2303 buffer = cpu_buffer->buffer;
2304 3011
2305 /* 3012 /*
2306 * Check if we are at the end of the buffer. 3013 * Check if we are at the end of the buffer.
@@ -2336,24 +3043,27 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
2336 rb_advance_iter(iter); 3043 rb_advance_iter(iter);
2337} 3044}
2338 3045
3046static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
3047{
3048 return cpu_buffer->lost_events;
3049}
3050
2339static struct ring_buffer_event * 3051static struct ring_buffer_event *
2340rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) 3052rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
3053 unsigned long *lost_events)
2341{ 3054{
2342 struct ring_buffer_per_cpu *cpu_buffer;
2343 struct ring_buffer_event *event; 3055 struct ring_buffer_event *event;
2344 struct buffer_page *reader; 3056 struct buffer_page *reader;
2345 int nr_loops = 0; 3057 int nr_loops = 0;
2346 3058
2347 cpu_buffer = buffer->buffers[cpu];
2348
2349 again: 3059 again:
2350 /* 3060 /*
2351 * We repeat when a timestamp is encountered. It is possible 3061 * We repeat when a time extend is encountered.
2352 * to get multiple timestamps from an interrupt entering just 3062 * Since the time extend is always attached to a data event,
2353 * as one timestamp is about to be written, or from discarded 3063 * we should never loop more than once.
2354 * commits. The most that we can have is the number on a single page. 3064 * (We never hit the following condition more than twice).
2355 */ 3065 */
2356 if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) 3066 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
2357 return NULL; 3067 return NULL;
2358 3068
2359 reader = rb_get_reader_page(cpu_buffer); 3069 reader = rb_get_reader_page(cpu_buffer);
@@ -2374,7 +3084,6 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
2374 * the box. Return the padding, and we will release 3084 * the box. Return the padding, and we will release
2375 * the current locks, and try again. 3085 * the current locks, and try again.
2376 */ 3086 */
2377 rb_advance_reader(cpu_buffer);
2378 return event; 3087 return event;
2379 3088
2380 case RINGBUF_TYPE_TIME_EXTEND: 3089 case RINGBUF_TYPE_TIME_EXTEND:
@@ -2390,9 +3099,11 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
2390 case RINGBUF_TYPE_DATA: 3099 case RINGBUF_TYPE_DATA:
2391 if (ts) { 3100 if (ts) {
2392 *ts = cpu_buffer->read_stamp + event->time_delta; 3101 *ts = cpu_buffer->read_stamp + event->time_delta;
2393 ring_buffer_normalize_time_stamp(buffer, 3102 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
2394 cpu_buffer->cpu, ts); 3103 cpu_buffer->cpu, ts);
2395 } 3104 }
3105 if (lost_events)
3106 *lost_events = rb_lost_events(cpu_buffer);
2396 return event; 3107 return event;
2397 3108
2398 default: 3109 default:
@@ -2411,27 +3122,39 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2411 struct ring_buffer_event *event; 3122 struct ring_buffer_event *event;
2412 int nr_loops = 0; 3123 int nr_loops = 0;
2413 3124
2414 if (ring_buffer_iter_empty(iter))
2415 return NULL;
2416
2417 cpu_buffer = iter->cpu_buffer; 3125 cpu_buffer = iter->cpu_buffer;
2418 buffer = cpu_buffer->buffer; 3126 buffer = cpu_buffer->buffer;
2419 3127
3128 /*
3129 * Check if someone performed a consuming read to
3130 * the buffer. A consuming read invalidates the iterator
3131 * and we need to reset the iterator in this case.
3132 */
3133 if (unlikely(iter->cache_read != cpu_buffer->read ||
3134 iter->cache_reader_page != cpu_buffer->reader_page))
3135 rb_iter_reset(iter);
3136
2420 again: 3137 again:
3138 if (ring_buffer_iter_empty(iter))
3139 return NULL;
3140
2421 /* 3141 /*
2422 * We repeat when a timestamp is encountered. 3142 * We repeat when a time extend is encountered.
2423 * We can get multiple timestamps by nested interrupts or also 3143 * Since the time extend is always attached to a data event,
2424 * if filtering is on (discarding commits). Since discarding 3144 * we should never loop more than once.
2425 * commits can be frequent we can get a lot of timestamps. 3145 * (We never hit the following condition more than twice).
2426 * But we limit them by not adding timestamps if they begin
2427 * at the start of a page.
2428 */ 3146 */
2429 if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE)) 3147 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
2430 return NULL; 3148 return NULL;
2431 3149
2432 if (rb_per_cpu_empty(cpu_buffer)) 3150 if (rb_per_cpu_empty(cpu_buffer))
2433 return NULL; 3151 return NULL;
2434 3152
3153 if (iter->head >= local_read(&iter->head_page->page->commit)) {
3154 rb_inc_iter(iter);
3155 goto again;
3156 }
3157
2435 event = rb_iter_head_event(iter); 3158 event = rb_iter_head_event(iter);
2436 3159
2437 switch (event->type_len) { 3160 switch (event->type_len) {
@@ -2477,7 +3200,7 @@ static inline int rb_ok_to_lock(void)
2477 * buffer too. A one time deal is all you get from reading 3200 * buffer too. A one time deal is all you get from reading
2478 * the ring buffer from an NMI. 3201 * the ring buffer from an NMI.
2479 */ 3202 */
2480 if (likely(!in_nmi() && !oops_in_progress)) 3203 if (likely(!in_nmi()))
2481 return 1; 3204 return 1;
2482 3205
2483 tracing_off_permanent(); 3206 tracing_off_permanent();
@@ -2489,12 +3212,14 @@ static inline int rb_ok_to_lock(void)
2489 * @buffer: The ring buffer to read 3212 * @buffer: The ring buffer to read
2490 * @cpu: The cpu to peek at 3213 * @cpu: The cpu to peek at
2491 * @ts: The timestamp counter of this event. 3214 * @ts: The timestamp counter of this event.
3215 * @lost_events: a variable to store if events were lost (may be NULL)
2492 * 3216 *
2493 * This will return the event that will be read next, but does 3217 * This will return the event that will be read next, but does
2494 * not consume the data. 3218 * not consume the data.
2495 */ 3219 */
2496struct ring_buffer_event * 3220struct ring_buffer_event *
2497ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) 3221ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
3222 unsigned long *lost_events)
2498{ 3223{
2499 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3224 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2500 struct ring_buffer_event *event; 3225 struct ring_buffer_event *event;
@@ -2509,15 +3234,15 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
2509 local_irq_save(flags); 3234 local_irq_save(flags);
2510 if (dolock) 3235 if (dolock)
2511 spin_lock(&cpu_buffer->reader_lock); 3236 spin_lock(&cpu_buffer->reader_lock);
2512 event = rb_buffer_peek(buffer, cpu, ts); 3237 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
3238 if (event && event->type_len == RINGBUF_TYPE_PADDING)
3239 rb_advance_reader(cpu_buffer);
2513 if (dolock) 3240 if (dolock)
2514 spin_unlock(&cpu_buffer->reader_lock); 3241 spin_unlock(&cpu_buffer->reader_lock);
2515 local_irq_restore(flags); 3242 local_irq_restore(flags);
2516 3243
2517 if (event && event->type_len == RINGBUF_TYPE_PADDING) { 3244 if (event && event->type_len == RINGBUF_TYPE_PADDING)
2518 cpu_relax();
2519 goto again; 3245 goto again;
2520 }
2521 3246
2522 return event; 3247 return event;
2523} 3248}
@@ -2542,10 +3267,8 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2542 event = rb_iter_peek(iter, ts); 3267 event = rb_iter_peek(iter, ts);
2543 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3268 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2544 3269
2545 if (event && event->type_len == RINGBUF_TYPE_PADDING) { 3270 if (event && event->type_len == RINGBUF_TYPE_PADDING)
2546 cpu_relax();
2547 goto again; 3271 goto again;
2548 }
2549 3272
2550 return event; 3273 return event;
2551} 3274}
@@ -2553,13 +3276,17 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2553/** 3276/**
2554 * ring_buffer_consume - return an event and consume it 3277 * ring_buffer_consume - return an event and consume it
2555 * @buffer: The ring buffer to get the next event from 3278 * @buffer: The ring buffer to get the next event from
3279 * @cpu: the cpu to read the buffer from
3280 * @ts: a variable to store the timestamp (may be NULL)
3281 * @lost_events: a variable to store if events were lost (may be NULL)
2556 * 3282 *
2557 * Returns the next event in the ring buffer, and that event is consumed. 3283 * Returns the next event in the ring buffer, and that event is consumed.
2558 * Meaning, that sequential reads will keep returning a different event, 3284 * Meaning, that sequential reads will keep returning a different event,
2559 * and eventually empty the ring buffer if the producer is slower. 3285 * and eventually empty the ring buffer if the producer is slower.
2560 */ 3286 */
2561struct ring_buffer_event * 3287struct ring_buffer_event *
2562ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts) 3288ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
3289 unsigned long *lost_events)
2563{ 3290{
2564 struct ring_buffer_per_cpu *cpu_buffer; 3291 struct ring_buffer_per_cpu *cpu_buffer;
2565 struct ring_buffer_event *event = NULL; 3292 struct ring_buffer_event *event = NULL;
@@ -2580,13 +3307,12 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
2580 if (dolock) 3307 if (dolock)
2581 spin_lock(&cpu_buffer->reader_lock); 3308 spin_lock(&cpu_buffer->reader_lock);
2582 3309
2583 event = rb_buffer_peek(buffer, cpu, ts); 3310 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
2584 if (!event) 3311 if (event) {
2585 goto out_unlock; 3312 cpu_buffer->lost_events = 0;
2586 3313 rb_advance_reader(cpu_buffer);
2587 rb_advance_reader(cpu_buffer); 3314 }
2588 3315
2589 out_unlock:
2590 if (dolock) 3316 if (dolock)
2591 spin_unlock(&cpu_buffer->reader_lock); 3317 spin_unlock(&cpu_buffer->reader_lock);
2592 local_irq_restore(flags); 3318 local_irq_restore(flags);
@@ -2594,33 +3320,38 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
2594 out: 3320 out:
2595 preempt_enable(); 3321 preempt_enable();
2596 3322
2597 if (event && event->type_len == RINGBUF_TYPE_PADDING) { 3323 if (event && event->type_len == RINGBUF_TYPE_PADDING)
2598 cpu_relax();
2599 goto again; 3324 goto again;
2600 }
2601 3325
2602 return event; 3326 return event;
2603} 3327}
2604EXPORT_SYMBOL_GPL(ring_buffer_consume); 3328EXPORT_SYMBOL_GPL(ring_buffer_consume);
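
ring_buffer_peek() and ring_buffer_consume() now take an optional lost_events pointer, so a consumer can learn how many events were overwritten since its last read. A caller-side sketch of the new consume signature (the surrounding function and the printout are illustrative; ring_buffer_event_data()/ring_buffer_event_length() are the existing accessors):

#include <linux/kernel.h>
#include <linux/ring_buffer.h>

/* Drain one CPU's buffer, reporting any drops along the way.
 * 'buffer' is assumed to come from ring_buffer_alloc(). */
static void drain_cpu(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_event *event;
        unsigned long lost = 0;
        u64 ts;

        while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
                void *data = ring_buffer_event_data(event);

                pr_info("cpu%d ts=%llu len=%lu lost=%lu data=%p\n", cpu,
                        (unsigned long long)ts,
                        (unsigned long)ring_buffer_event_length(event),
                        lost, data);
        }
}
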
2605 3329
2606/** 3330/**
2607 * ring_buffer_read_start - start a non consuming read of the buffer 3331 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
2608 * @buffer: The ring buffer to read from 3332 * @buffer: The ring buffer to read from
2609 * @cpu: The cpu buffer to iterate over 3333 * @cpu: The cpu buffer to iterate over
2610 * 3334 *
2611 * This starts up an iteration through the buffer. It also disables 3335 * This performs the initial preparations necessary to iterate
2612 * the recording to the buffer until the reading is finished. 3336 * through the buffer. Memory is allocated, buffer recording
2613 * This prevents the reading from being corrupted. This is not 3337 * is disabled, and the iterator pointer is returned to the caller.
2614 * a consuming read, so a producer is not expected.
2615 * 3338 *
2616 * Must be paired with ring_buffer_finish. 3339 * Disabling buffer recording prevents the reading from being
3340 * corrupted. This is not a consuming read, so a producer is not
3341 * expected.
3342 *
3343 * After a sequence of ring_buffer_read_prepare calls, the user is
3344 * expected to make at least one call to ring_buffer_prepare_sync.
3345 * Afterwards, ring_buffer_read_start is invoked to get things going
3346 * for real.
3347 *
3348 * This overall must be paired with ring_buffer_finish.
2617 */ 3349 */
2618struct ring_buffer_iter * 3350struct ring_buffer_iter *
2619ring_buffer_read_start(struct ring_buffer *buffer, int cpu) 3351ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)
2620{ 3352{
2621 struct ring_buffer_per_cpu *cpu_buffer; 3353 struct ring_buffer_per_cpu *cpu_buffer;
2622 struct ring_buffer_iter *iter; 3354 struct ring_buffer_iter *iter;
2623 unsigned long flags;
2624 3355
2625 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3356 if (!cpumask_test_cpu(cpu, buffer->cpumask))
2626 return NULL; 3357 return NULL;
@@ -2634,15 +3365,52 @@ ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
2634 iter->cpu_buffer = cpu_buffer; 3365 iter->cpu_buffer = cpu_buffer;
2635 3366
2636 atomic_inc(&cpu_buffer->record_disabled); 3367 atomic_inc(&cpu_buffer->record_disabled);
3368
3369 return iter;
3370}
3371EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
3372
3373/**
3374 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
3375 *
3376 * All previously invoked ring_buffer_read_prepare calls to prepare
3377 * iterators will be synchronized. Afterwards, ring_buffer_read_start
3378 * calls on those iterators are allowed.
3379 */
3380void
3381ring_buffer_read_prepare_sync(void)
3382{
2637 synchronize_sched(); 3383 synchronize_sched();
3384}
3385EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
3386
3387/**
3388 * ring_buffer_read_start - start a non consuming read of the buffer
3389 * @iter: The iterator returned by ring_buffer_read_prepare
3390 *
3391 * This finalizes the startup of an iteration through the buffer.
3392 * The iterator comes from a call to ring_buffer_read_prepare and
3393 * an intervening ring_buffer_read_prepare_sync must have been
3394 * performed.
3395 *
3396 * Must be paired with ring_buffer_finish.
3397 */
3398void
3399ring_buffer_read_start(struct ring_buffer_iter *iter)
3400{
3401 struct ring_buffer_per_cpu *cpu_buffer;
3402 unsigned long flags;
3403
3404 if (!iter)
3405 return;
3406
3407 cpu_buffer = iter->cpu_buffer;
2638 3408
2639 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3409 spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2640 __raw_spin_lock(&cpu_buffer->lock); 3410 arch_spin_lock(&cpu_buffer->lock);
2641 rb_iter_reset(iter); 3411 rb_iter_reset(iter);
2642 __raw_spin_unlock(&cpu_buffer->lock); 3412 arch_spin_unlock(&cpu_buffer->lock);
2643 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3413 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2644
2645 return iter;
2646} 3414}
2647EXPORT_SYMBOL_GPL(ring_buffer_read_start); 3415EXPORT_SYMBOL_GPL(ring_buffer_read_start);
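
The iterator setup is now three steps so a caller walking many CPUs pays for only one synchronize_sched(): prepare every iterator, sync once, then start each one. A caller-side sketch of that pattern (dump_all_cpus and the iters array are illustrative; the teardown call pairs with ring_buffer_read_finish(), which the comments above shorten to ring_buffer_finish):

#include <linux/kernel.h>
#include <linux/cpumask.h>
#include <linux/ring_buffer.h>

static void dump_all_cpus(struct ring_buffer *buffer,
                          struct ring_buffer_iter **iters)
{
        struct ring_buffer_event *event;
        int cpu;
        u64 ts;

        for_each_online_cpu(cpu)
                iters[cpu] = ring_buffer_read_prepare(buffer, cpu);

        /* One synchronize_sched() covers the whole batch. */
        ring_buffer_read_prepare_sync();

        for_each_online_cpu(cpu) {
                if (!iters[cpu])
                        continue;
                ring_buffer_read_start(iters[cpu]);

                while ((event = ring_buffer_read(iters[cpu], &ts)))
                        pr_info("cpu%d ts=%llu\n", cpu, (unsigned long long)ts);

                ring_buffer_read_finish(iters[cpu]);    /* pairs with read_prepare */
        }
}
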
2648 3416
@@ -2677,21 +3445,19 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
2677 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3445 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2678 unsigned long flags; 3446 unsigned long flags;
2679 3447
2680 again:
2681 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3448 spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3449 again:
2682 event = rb_iter_peek(iter, ts); 3450 event = rb_iter_peek(iter, ts);
2683 if (!event) 3451 if (!event)
2684 goto out; 3452 goto out;
2685 3453
3454 if (event->type_len == RINGBUF_TYPE_PADDING)
3455 goto again;
3456
2686 rb_advance_iter(iter); 3457 rb_advance_iter(iter);
2687 out: 3458 out:
2688 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3459 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2689 3460
2690 if (event && event->type_len == RINGBUF_TYPE_PADDING) {
2691 cpu_relax();
2692 goto again;
2693 }
2694
2695 return event; 3461 return event;
2696} 3462}
2697EXPORT_SYMBOL_GPL(ring_buffer_read); 3463EXPORT_SYMBOL_GPL(ring_buffer_read);
@@ -2709,8 +3475,10 @@ EXPORT_SYMBOL_GPL(ring_buffer_size);
2709static void 3475static void
2710rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 3476rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2711{ 3477{
3478 rb_head_page_deactivate(cpu_buffer);
3479
2712 cpu_buffer->head_page 3480 cpu_buffer->head_page
2713 = list_entry(cpu_buffer->pages.next, struct buffer_page, list); 3481 = list_entry(cpu_buffer->pages, struct buffer_page, list);
2714 local_set(&cpu_buffer->head_page->write, 0); 3482 local_set(&cpu_buffer->head_page->write, 0);
2715 local_set(&cpu_buffer->head_page->entries, 0); 3483 local_set(&cpu_buffer->head_page->entries, 0);
2716 local_set(&cpu_buffer->head_page->page->commit, 0); 3484 local_set(&cpu_buffer->head_page->page->commit, 0);
@@ -2726,16 +3494,20 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2726 local_set(&cpu_buffer->reader_page->page->commit, 0); 3494 local_set(&cpu_buffer->reader_page->page->commit, 0);
2727 cpu_buffer->reader_page->read = 0; 3495 cpu_buffer->reader_page->read = 0;
2728 3496
2729 cpu_buffer->nmi_dropped = 0; 3497 local_set(&cpu_buffer->commit_overrun, 0);
2730 cpu_buffer->commit_overrun = 0; 3498 local_set(&cpu_buffer->overrun, 0);
2731 cpu_buffer->overrun = 0;
2732 cpu_buffer->read = 0;
2733 local_set(&cpu_buffer->entries, 0); 3499 local_set(&cpu_buffer->entries, 0);
2734 local_set(&cpu_buffer->committing, 0); 3500 local_set(&cpu_buffer->committing, 0);
2735 local_set(&cpu_buffer->commits, 0); 3501 local_set(&cpu_buffer->commits, 0);
3502 cpu_buffer->read = 0;
2736 3503
2737 cpu_buffer->write_stamp = 0; 3504 cpu_buffer->write_stamp = 0;
2738 cpu_buffer->read_stamp = 0; 3505 cpu_buffer->read_stamp = 0;
3506
3507 cpu_buffer->lost_events = 0;
3508 cpu_buffer->last_overrun = 0;
3509
3510 rb_head_page_activate(cpu_buffer);
2739} 3511}
2740 3512
2741/** 3513/**
@@ -2755,12 +3527,16 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
2755 3527
2756 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3528 spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2757 3529
2758 __raw_spin_lock(&cpu_buffer->lock); 3530 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
3531 goto out;
3532
3533 arch_spin_lock(&cpu_buffer->lock);
2759 3534
2760 rb_reset_cpu(cpu_buffer); 3535 rb_reset_cpu(cpu_buffer);
2761 3536
2762 __raw_spin_unlock(&cpu_buffer->lock); 3537 arch_spin_unlock(&cpu_buffer->lock);
2763 3538
3539 out:
2764 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3540 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2765 3541
2766 atomic_dec(&cpu_buffer->record_disabled); 3542 atomic_dec(&cpu_buffer->record_disabled);
@@ -2843,6 +3619,7 @@ int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
2843} 3619}
2844EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 3620EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
2845 3621
3622#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
2846/** 3623/**
2847 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 3624 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
2848 * @buffer_a: One buffer to swap with 3625 * @buffer_a: One buffer to swap with
@@ -2897,20 +3674,28 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
 	atomic_inc(&cpu_buffer_a->record_disabled);
 	atomic_inc(&cpu_buffer_b->record_disabled);

+	ret = -EBUSY;
+	if (local_read(&cpu_buffer_a->committing))
+		goto out_dec;
+	if (local_read(&cpu_buffer_b->committing))
+		goto out_dec;
+
 	buffer_a->buffers[cpu] = cpu_buffer_b;
 	buffer_b->buffers[cpu] = cpu_buffer_a;

 	cpu_buffer_b->buffer = buffer_a;
 	cpu_buffer_a->buffer = buffer_b;

+	ret = 0;
+
+out_dec:
 	atomic_dec(&cpu_buffer_a->record_disabled);
 	atomic_dec(&cpu_buffer_b->record_disabled);
-
-	ret = 0;
 out:
 	return ret;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
+#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */

 /**
  * ring_buffer_alloc_read_page - allocate a page to read from buffer
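With the committing checks above, ring_buffer_swap_cpu() (now compiled only under CONFIG_RING_BUFFER_ALLOW_SWAP) can return -EBUSY instead of always succeeding, so callers have to treat the swap as best-effort. A hedged caller-side sketch; the retry policy is illustrative and not something this patch adds, only ring_buffer_swap_cpu() itself is real:

/*
 * Hedged caller-side sketch: retry a CPU-buffer swap a bounded number
 * of times if a writer happens to be mid-commit.
 */
#include <linux/ring_buffer.h>
#include <asm/processor.h>

static int demo_swap_cpu_retry(struct ring_buffer *snapshot,
			       struct ring_buffer *live, int cpu, int tries)
{
	int ret;

	do {
		ret = ring_buffer_swap_cpu(snapshot, live, cpu);
		if (ret != -EBUSY)
			break;
		cpu_relax();	/* give the in-flight commit a chance to finish */
	} while (--tries);

	return ret;
}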
@@ -2997,6 +3782,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 	struct ring_buffer_event *event;
 	struct buffer_data_page *bpage;
 	struct buffer_page *reader;
+	unsigned long missed_events;
 	unsigned long flags;
 	unsigned int commit;
 	unsigned int read;
@@ -3033,6 +3819,9 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 	read = reader->read;
 	commit = rb_page_commit(reader);

+	/* Check if any events were dropped */
+	missed_events = cpu_buffer->lost_events;
+
 	/*
 	 * If this page has been partially read or
 	 * if len is not big enough to read the rest of the page or
@@ -3053,7 +3842,8 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 		if (len > (commit - read))
 			len = (commit - read);

-		size = rb_event_length(event);
+		/* Always keep the time extend and data together */
+		size = rb_event_ts_length(event);

 		if (len < size)
 			goto out_unlock;
@@ -3063,6 +3853,13 @@ int ring_buffer_read_page(struct ring_buffer *buffer,

 		/* Need to copy one event at a time */
 		do {
+			/* We need the size of one event, because
+			 * rb_advance_reader only advances by one event,
+			 * whereas rb_event_ts_length may include the size of
+			 * one or two events.
+			 * We have already ensured there's enough space if this
+			 * is a time extend. */
+			size = rb_event_length(event);
 			memcpy(bpage->data + pos, rpage->data + rpos, size);

 			len -= size;
@@ -3071,9 +3868,13 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 			rpos = reader->read;
 			pos += size;

+			if (rpos >= commit)
+				break;
+
 			event = rb_reader_event(cpu_buffer);
-			size = rb_event_length(event);
-		} while (len > size);
+			/* Always keep the time extend and data together */
+			size = rb_event_ts_length(event);
+		} while (len >= size);

 	/* update bpage */
 	local_set(&bpage->commit, pos);
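In the copy loop above, the space check now uses rb_event_ts_length(), which counts a time-extend record together with the event it stamps, while the copy itself still advances one record at a time via rb_event_length(); the loop also stops once the read position reaches the commit. A plain user-space model of that sizing rule follows, under hypothetical types (not kernel code):

/*
 * Hedged model: "fit" is judged with the combined length so a
 * time-extend record is never separated from the event it stamps,
 * but the copy still advances one record at a time.
 */
#include <stddef.h>
#include <string.h>

struct demo_rec {
	size_t		len;	/* length of this record alone */
	size_t		ts_len;	/* len plus the following event if this is a
				 * time extend, otherwise equal to len */
	const void	*data;
};

static size_t demo_copy(void *dst, size_t room,
			const struct demo_rec *rec, size_t nr)
{
	size_t pos = 0, i = 0;

	while (i < nr && rec[i].ts_len <= room - pos) {
		memcpy((char *)dst + pos, rec[i].data, rec[i].len);
		pos += rec[i].len;
		i++;
	}
	/* a time-extend/data pair is copied whole or not at all */
	return pos;
}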
@@ -3083,7 +3884,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 		read = 0;
 	} else {
 		/* update the entry counter */
-		cpu_buffer->read += local_read(&reader->entries);
+		cpu_buffer->read += rb_page_entries(reader);

 		/* swap the pages */
 		rb_init_page(bpage);
@@ -3093,9 +3894,42 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 		local_set(&reader->entries, 0);
 		reader->read = 0;
 		*data_page = bpage;
+
+		/*
+		 * Use the real_end for the data size,
+		 * This gives us a chance to store the lost events
+		 * on the page.
+		 */
+		if (reader->real_end)
+			local_set(&bpage->commit, reader->real_end);
 	}
 	ret = read;

+	cpu_buffer->lost_events = 0;
+
+	commit = local_read(&bpage->commit);
+	/*
+	 * Set a flag in the commit field if we lost events
+	 */
+	if (missed_events) {
+		/* If there is room at the end of the page to save the
+		 * missed events, then record it there.
+		 */
+		if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
+			memcpy(&bpage->data[commit], &missed_events,
+			       sizeof(missed_events));
+			local_add(RB_MISSED_STORED, &bpage->commit);
+			commit += sizeof(missed_events);
+		}
+		local_add(RB_MISSED_EVENTS, &bpage->commit);
+	}
+
+	/*
+	 * This page may be off to user land. Zero it out here.
+	 */
+	if (commit < BUF_PAGE_SIZE)
+		memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
+
  out_unlock:
 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

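When events were dropped, the reader now tags the page's commit word and, if the page still has room, appends the missed-event count right after the data; anything beyond the commit is zeroed before the page is handed out. A hedged user-space decoding sketch follows; the flag bit values and the payload size are assumptions modelled on this patch, and the real definitions live in ring_buffer.c:

/*
 * Hedged decoding sketch.  DEMO_* constants and the page layout are
 * assumptions for illustration only.
 */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define DEMO_MISSED_EVENTS	(1UL << 31)	/* assumed: events were lost */
#define DEMO_MISSED_STORED	(1UL << 30)	/* assumed: count stored after the data */
#define DEMO_PAGE_DATA_SIZE	4080		/* assumed payload bytes per buffer page */

struct demo_data_page {
	uint64_t	time_stamp;
	unsigned long	commit;			/* data length plus the flag bits */
	unsigned char	data[DEMO_PAGE_DATA_SIZE];
};

static void demo_report_missed(const struct demo_data_page *page)
{
	unsigned long commit = page->commit &
			       ~(DEMO_MISSED_EVENTS | DEMO_MISSED_STORED);

	if (!(page->commit & DEMO_MISSED_EVENTS)) {
		printf("no events were lost\n");
		return;
	}

	if (page->commit & DEMO_MISSED_STORED) {
		unsigned long missed;

		/* The count was appended right after the last event. */
		memcpy(&missed, page->data + commit, sizeof(missed));
		printf("%lu events lost\n", missed);
	} else {
		printf("events lost, but no room was left to store the count\n");
	}
}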
@@ -3104,6 +3938,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read_page);

+#ifdef CONFIG_TRACING
 static ssize_t
 rb_simple_read(struct file *filp, char __user *ubuf,
 	       size_t cnt, loff_t *ppos)
@@ -3155,6 +3990,7 @@ static const struct file_operations rb_simple_fops = {
 	.open = tracing_open_generic,
 	.read = rb_simple_read,
 	.write = rb_simple_write,
+	.llseek = default_llseek,
 };


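rb_simple_fops now names its llseek handler explicitly rather than relying on the historical implicit default for files that never set one. A hedged sketch of the same idea for an unrelated attribute file; everything except default_llseek and simple_read_from_buffer is illustrative:

/*
 * Hedged sketch, not the kernel's rb_simple_* code: a read-only
 * attribute that spells out its llseek handler.
 */
#include <linux/fs.h>

static ssize_t demo_read(struct file *filp, char __user *ubuf,
			 size_t cnt, loff_t *ppos)
{
	static const char buf[] = "1\n";

	return simple_read_from_buffer(ubuf, cnt, ppos, buf, sizeof(buf) - 1);
}

static const struct file_operations demo_fops = {
	.read	= demo_read,
	.llseek	= default_llseek,	/* explicit, like rb_simple_fops now */
};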
@@ -3171,6 +4007,7 @@ static __init int rb_init_debugfs(void)
 }

 fs_initcall(rb_init_debugfs);
+#endif

 #ifdef CONFIG_HOTPLUG_CPU
 static int rb_cpu_notify(struct notifier_block *self,