aboutsummaryrefslogtreecommitdiffstats
path: root/kernel/trace/ring_buffer.c
diff options
context:
space:
mode:
authorSteven Rostedt <rostedt@goodmis.org>2008-09-29 23:02:38 -0400
committerIngo Molnar <mingo@elte.hu>2008-10-14 04:38:54 -0400
commit7a8e76a3829f1067b70f715771ff88baf2fbf3c3 (patch)
tree71126d4ad6f89abd00cac688318aff14323a96bb /kernel/trace/ring_buffer.c
parent5aa60c6073456812251caf9177cb921b2de68f77 (diff)
tracing: unified trace buffer
This is a unified tracing buffer that implements a ring buffer that hopefully everyone will eventually be able to use. The events recorded into the buffer have the following structure: struct ring_buffer_event { u32 type:2, len:3, time_delta:27; u32 array[]; }; The minimum size of an event is 8 bytes. All events are 4 byte aligned inside the buffer. There are 4 types (all internal use for the ring buffer, only the data type is exported to the interface users). RINGBUF_TYPE_PADDING: this type is used to note extra space at the end of a buffer page. RINGBUF_TYPE_TIME_EXTENT: This type is used when the time between events is greater than the 27 bit delta can hold. We add another 32 bits, and record that in its own event (8 byte size). RINGBUF_TYPE_TIME_STAMP: (Not implemented yet). This will hold data to help keep the buffer timestamps in sync. RINGBUF_TYPE_DATA: The event actually holds user data. The "len" field is only three bits. Since the data must be 4 byte aligned, this field is shifted left by 2, giving a max length of 28 bytes. If the data load is greater than 28 bytes, the first array field holds the full length of the data load and the len field is set to zero. Example, data size of 7 bytes: type = RINGBUF_TYPE_DATA len = 2 time_delta: <time-stamp> - <prev_event-time-stamp> array[0..1]: <7 bytes of data> <1 byte empty> This event is saved in 12 bytes of the buffer. An event with 82 bytes of data: type = RINGBUF_TYPE_DATA len = 0 time_delta: <time-stamp> - <prev_event-time-stamp> array[0]: 84 (Note the alignment) array[1..14]: <82 bytes of data> <2 bytes empty> The above event is saved in 92 bytes (if my math is correct). 82 bytes of data, 2 bytes empty, 4 byte header, 4 byte length. Do not reference the above event struct directly. Use the following functions to gain access to the event table, since the ring_buffer_event structure may change in the future. ring_buffer_event_length(event): get the length of the event. This is the size of the memory used to record this event, and not the size of the data pay load. ring_buffer_time_delta(event): get the time delta of the event This returns the delta time stamp since the last event. Note: Even though this is in the header, there should be no reason to access this directly, accept for debugging. ring_buffer_event_data(event): get the data from the event This is the function to use to get the actual data from the event. Note, it is only a pointer to the data inside the buffer. This data must be copied to another location otherwise you risk it being written over in the buffer. ring_buffer_lock: A way to lock the entire buffer. ring_buffer_unlock: unlock the buffer. ring_buffer_alloc: create a new ring buffer. Can choose between overwrite or consumer/producer mode. Overwrite will overwrite old data, where as consumer producer will throw away new data if the consumer catches up with the producer. The consumer/producer is the default. ring_buffer_free: free the ring buffer. ring_buffer_resize: resize the buffer. Changes the size of each cpu buffer. Note, it is up to the caller to provide that the buffer is not being used while this is happening. This requirement may go away but do not count on it. ring_buffer_lock_reserve: locks the ring buffer and allocates an entry on the buffer to write to. ring_buffer_unlock_commit: unlocks the ring buffer and commits it to the buffer. ring_buffer_write: writes some data into the ring buffer. ring_buffer_peek: Look at a next item in the cpu buffer. ring_buffer_consume: get the next item in the cpu buffer and consume it. That is, this function increments the head pointer. ring_buffer_read_start: Start an iterator of a cpu buffer. For now, this disables the cpu buffer, until you issue a finish. This is just because we do not want the iterator to be overwritten. This restriction may change in the future. But note, this is used for static reading of a buffer which is usually done "after" a trace. Live readings would want to use the ring_buffer_consume above, which will not disable the ring buffer. ring_buffer_read_finish: Finishes the read iterator and reenables the ring buffer. ring_buffer_iter_peek: Look at the next item in the cpu iterator. ring_buffer_read: Read the iterator and increment it. ring_buffer_iter_reset: Reset the iterator to point to the beginning of the cpu buffer. ring_buffer_iter_empty: Returns true if the iterator is at the end of the cpu buffer. ring_buffer_size: returns the size in bytes of each cpu buffer. Note, the real size is this times the number of CPUs. ring_buffer_reset_cpu: Sets the cpu buffer to empty ring_buffer_reset: sets all cpu buffers to empty ring_buffer_swap_cpu: swaps a cpu buffer from one buffer with a cpu buffer of another buffer. This is handy when you want to take a snap shot of a running trace on just one cpu. Having a backup buffer, to swap with facilitates this. Ftrace max latencies use this. ring_buffer_empty: Returns true if the ring buffer is empty. ring_buffer_empty_cpu: Returns true if the cpu buffer is empty. ring_buffer_record_disable: disable all cpu buffers (read only) ring_buffer_record_disable_cpu: disable a single cpu buffer (read only) ring_buffer_record_enable: enable all cpu buffers. ring_buffer_record_enabl_cpu: enable a single cpu buffer. ring_buffer_entries: The number of entries in a ring buffer. ring_buffer_overruns: The number of entries removed due to writing wrap. ring_buffer_time_stamp: Get the time stamp used by the ring buffer ring_buffer_normalize_time_stamp: normalize the ring buffer time stamp into nanosecs. I still need to implement the GTOD feature. But we need support from the cpu frequency infrastructure. But this can be done at a later time without affecting the ring buffer interface. Signed-off-by: Steven Rostedt <srostedt@redhat.com> Signed-off-by: Ingo Molnar <mingo@elte.hu>
Diffstat (limited to 'kernel/trace/ring_buffer.c')
-rw-r--r--kernel/trace/ring_buffer.c1672
1 files changed, 1672 insertions, 0 deletions
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
new file mode 100644
index 000000000000..830a2930dd91
--- /dev/null
+++ b/kernel/trace/ring_buffer.c
@@ -0,0 +1,1672 @@
1/*
2 * Generic ring buffer
3 *
4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5 */
6#include <linux/ring_buffer.h>
7#include <linux/spinlock.h>
8#include <linux/debugfs.h>
9#include <linux/uaccess.h>
10#include <linux/module.h>
11#include <linux/percpu.h>
12#include <linux/mutex.h>
13#include <linux/sched.h> /* used for sched_clock() (for now) */
14#include <linux/init.h>
15#include <linux/hash.h>
16#include <linux/list.h>
17#include <linux/fs.h>
18
19/* Up this if you want to test the TIME_EXTENTS and normalization */
20#define DEBUG_SHIFT 0
21
22/* FIXME!!! */
23u64 ring_buffer_time_stamp(int cpu)
24{
25 /* shift to debug/test normalization and TIME_EXTENTS */
26 return sched_clock() << DEBUG_SHIFT;
27}
28
29void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
30{
31 /* Just stupid testing the normalize function and deltas */
32 *ts >>= DEBUG_SHIFT;
33}
34
35#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
36#define RB_ALIGNMENT_SHIFT 2
37#define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT)
38#define RB_MAX_SMALL_DATA 28
39
40enum {
41 RB_LEN_TIME_EXTEND = 8,
42 RB_LEN_TIME_STAMP = 16,
43};
44
45/* inline for ring buffer fast paths */
46static inline unsigned
47rb_event_length(struct ring_buffer_event *event)
48{
49 unsigned length;
50
51 switch (event->type) {
52 case RINGBUF_TYPE_PADDING:
53 /* undefined */
54 return -1;
55
56 case RINGBUF_TYPE_TIME_EXTEND:
57 return RB_LEN_TIME_EXTEND;
58
59 case RINGBUF_TYPE_TIME_STAMP:
60 return RB_LEN_TIME_STAMP;
61
62 case RINGBUF_TYPE_DATA:
63 if (event->len)
64 length = event->len << RB_ALIGNMENT_SHIFT;
65 else
66 length = event->array[0];
67 return length + RB_EVNT_HDR_SIZE;
68 default:
69 BUG();
70 }
71 /* not hit */
72 return 0;
73}
74
75/**
76 * ring_buffer_event_length - return the length of the event
77 * @event: the event to get the length of
78 */
79unsigned ring_buffer_event_length(struct ring_buffer_event *event)
80{
81 return rb_event_length(event);
82}
83
84/* inline for ring buffer fast paths */
85static inline void *
86rb_event_data(struct ring_buffer_event *event)
87{
88 BUG_ON(event->type != RINGBUF_TYPE_DATA);
89 /* If length is in len field, then array[0] has the data */
90 if (event->len)
91 return (void *)&event->array[0];
92 /* Otherwise length is in array[0] and array[1] has the data */
93 return (void *)&event->array[1];
94}
95
96/**
97 * ring_buffer_event_data - return the data of the event
98 * @event: the event to get the data from
99 */
100void *ring_buffer_event_data(struct ring_buffer_event *event)
101{
102 return rb_event_data(event);
103}
104
105#define for_each_buffer_cpu(buffer, cpu) \
106 for_each_cpu_mask(cpu, buffer->cpumask)
107
108#define TS_SHIFT 27
109#define TS_MASK ((1ULL << TS_SHIFT) - 1)
110#define TS_DELTA_TEST (~TS_MASK)
111
112/*
113 * This hack stolen from mm/slob.c.
114 * We can store per page timing information in the page frame of the page.
115 * Thanks to Peter Zijlstra for suggesting this idea.
116 */
117struct buffer_page {
118 union {
119 struct {
120 unsigned long flags; /* mandatory */
121 atomic_t _count; /* mandatory */
122 u64 time_stamp; /* page time stamp */
123 unsigned size; /* size of page data */
124 struct list_head list; /* list of free pages */
125 };
126 struct page page;
127 };
128};
129
130/*
131 * We need to fit the time_stamp delta into 27 bits.
132 */
133static inline int test_time_stamp(u64 delta)
134{
135 if (delta & TS_DELTA_TEST)
136 return 1;
137 return 0;
138}
139
140#define BUF_PAGE_SIZE PAGE_SIZE
141
142/*
143 * head_page == tail_page && head == tail then buffer is empty.
144 */
145struct ring_buffer_per_cpu {
146 int cpu;
147 struct ring_buffer *buffer;
148 spinlock_t lock;
149 struct lock_class_key lock_key;
150 struct list_head pages;
151 unsigned long head; /* read from head */
152 unsigned long tail; /* write to tail */
153 struct buffer_page *head_page;
154 struct buffer_page *tail_page;
155 unsigned long overrun;
156 unsigned long entries;
157 u64 write_stamp;
158 u64 read_stamp;
159 atomic_t record_disabled;
160};
161
162struct ring_buffer {
163 unsigned long size;
164 unsigned pages;
165 unsigned flags;
166 int cpus;
167 cpumask_t cpumask;
168 atomic_t record_disabled;
169
170 struct mutex mutex;
171
172 struct ring_buffer_per_cpu **buffers;
173};
174
175struct ring_buffer_iter {
176 struct ring_buffer_per_cpu *cpu_buffer;
177 unsigned long head;
178 struct buffer_page *head_page;
179 u64 read_stamp;
180};
181
182#define RB_WARN_ON(buffer, cond) \
183 if (unlikely(cond)) { \
184 atomic_inc(&buffer->record_disabled); \
185 WARN_ON(1); \
186 return -1; \
187 }
188
189/**
190 * check_pages - integrity check of buffer pages
191 * @cpu_buffer: CPU buffer with pages to test
192 *
193 * As a safty measure we check to make sure the data pages have not
194 * been corrupted.
195 */
196static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
197{
198 struct list_head *head = &cpu_buffer->pages;
199 struct buffer_page *page, *tmp;
200
201 RB_WARN_ON(cpu_buffer, head->next->prev != head);
202 RB_WARN_ON(cpu_buffer, head->prev->next != head);
203
204 list_for_each_entry_safe(page, tmp, head, list) {
205 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
206 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
207 }
208
209 return 0;
210}
211
212static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
213{
214 return cpu_buffer->head_page->size;
215}
216
217static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
218 unsigned nr_pages)
219{
220 struct list_head *head = &cpu_buffer->pages;
221 struct buffer_page *page, *tmp;
222 unsigned long addr;
223 LIST_HEAD(pages);
224 unsigned i;
225
226 for (i = 0; i < nr_pages; i++) {
227 addr = __get_free_page(GFP_KERNEL);
228 if (!addr)
229 goto free_pages;
230 page = (struct buffer_page *)virt_to_page(addr);
231 list_add(&page->list, &pages);
232 }
233
234 list_splice(&pages, head);
235
236 rb_check_pages(cpu_buffer);
237
238 return 0;
239
240 free_pages:
241 list_for_each_entry_safe(page, tmp, &pages, list) {
242 list_del_init(&page->list);
243 __free_page(&page->page);
244 }
245 return -ENOMEM;
246}
247
248static struct ring_buffer_per_cpu *
249rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
250{
251 struct ring_buffer_per_cpu *cpu_buffer;
252 int ret;
253
254 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
255 GFP_KERNEL, cpu_to_node(cpu));
256 if (!cpu_buffer)
257 return NULL;
258
259 cpu_buffer->cpu = cpu;
260 cpu_buffer->buffer = buffer;
261 spin_lock_init(&cpu_buffer->lock);
262 INIT_LIST_HEAD(&cpu_buffer->pages);
263
264 ret = rb_allocate_pages(cpu_buffer, buffer->pages);
265 if (ret < 0)
266 goto fail_free_buffer;
267
268 cpu_buffer->head_page
269 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
270 cpu_buffer->tail_page
271 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
272
273 return cpu_buffer;
274
275 fail_free_buffer:
276 kfree(cpu_buffer);
277 return NULL;
278}
279
280static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
281{
282 struct list_head *head = &cpu_buffer->pages;
283 struct buffer_page *page, *tmp;
284
285 list_for_each_entry_safe(page, tmp, head, list) {
286 list_del_init(&page->list);
287 __free_page(&page->page);
288 }
289 kfree(cpu_buffer);
290}
291
292/**
293 * ring_buffer_alloc - allocate a new ring_buffer
294 * @size: the size in bytes that is needed.
295 * @flags: attributes to set for the ring buffer.
296 *
297 * Currently the only flag that is available is the RB_FL_OVERWRITE
298 * flag. This flag means that the buffer will overwrite old data
299 * when the buffer wraps. If this flag is not set, the buffer will
300 * drop data when the tail hits the head.
301 */
302struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
303{
304 struct ring_buffer *buffer;
305 int bsize;
306 int cpu;
307
308 /* keep it in its own cache line */
309 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
310 GFP_KERNEL);
311 if (!buffer)
312 return NULL;
313
314 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
315 buffer->flags = flags;
316
317 /* need at least two pages */
318 if (buffer->pages == 1)
319 buffer->pages++;
320
321 buffer->cpumask = cpu_possible_map;
322 buffer->cpus = nr_cpu_ids;
323
324 bsize = sizeof(void *) * nr_cpu_ids;
325 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
326 GFP_KERNEL);
327 if (!buffer->buffers)
328 goto fail_free_buffer;
329
330 for_each_buffer_cpu(buffer, cpu) {
331 buffer->buffers[cpu] =
332 rb_allocate_cpu_buffer(buffer, cpu);
333 if (!buffer->buffers[cpu])
334 goto fail_free_buffers;
335 }
336
337 mutex_init(&buffer->mutex);
338
339 return buffer;
340
341 fail_free_buffers:
342 for_each_buffer_cpu(buffer, cpu) {
343 if (buffer->buffers[cpu])
344 rb_free_cpu_buffer(buffer->buffers[cpu]);
345 }
346 kfree(buffer->buffers);
347
348 fail_free_buffer:
349 kfree(buffer);
350 return NULL;
351}
352
353/**
354 * ring_buffer_free - free a ring buffer.
355 * @buffer: the buffer to free.
356 */
357void
358ring_buffer_free(struct ring_buffer *buffer)
359{
360 int cpu;
361
362 for_each_buffer_cpu(buffer, cpu)
363 rb_free_cpu_buffer(buffer->buffers[cpu]);
364
365 kfree(buffer);
366}
367
368static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
369
370static void
371rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
372{
373 struct buffer_page *page;
374 struct list_head *p;
375 unsigned i;
376
377 atomic_inc(&cpu_buffer->record_disabled);
378 synchronize_sched();
379
380 for (i = 0; i < nr_pages; i++) {
381 BUG_ON(list_empty(&cpu_buffer->pages));
382 p = cpu_buffer->pages.next;
383 page = list_entry(p, struct buffer_page, list);
384 list_del_init(&page->list);
385 __free_page(&page->page);
386 }
387 BUG_ON(list_empty(&cpu_buffer->pages));
388
389 rb_reset_cpu(cpu_buffer);
390
391 rb_check_pages(cpu_buffer);
392
393 atomic_dec(&cpu_buffer->record_disabled);
394
395}
396
397static void
398rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
399 struct list_head *pages, unsigned nr_pages)
400{
401 struct buffer_page *page;
402 struct list_head *p;
403 unsigned i;
404
405 atomic_inc(&cpu_buffer->record_disabled);
406 synchronize_sched();
407
408 for (i = 0; i < nr_pages; i++) {
409 BUG_ON(list_empty(pages));
410 p = pages->next;
411 page = list_entry(p, struct buffer_page, list);
412 list_del_init(&page->list);
413 list_add_tail(&page->list, &cpu_buffer->pages);
414 }
415 rb_reset_cpu(cpu_buffer);
416
417 rb_check_pages(cpu_buffer);
418
419 atomic_dec(&cpu_buffer->record_disabled);
420}
421
422/**
423 * ring_buffer_resize - resize the ring buffer
424 * @buffer: the buffer to resize.
425 * @size: the new size.
426 *
427 * The tracer is responsible for making sure that the buffer is
428 * not being used while changing the size.
429 * Note: We may be able to change the above requirement by using
430 * RCU synchronizations.
431 *
432 * Minimum size is 2 * BUF_PAGE_SIZE.
433 *
434 * Returns -1 on failure.
435 */
436int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
437{
438 struct ring_buffer_per_cpu *cpu_buffer;
439 unsigned nr_pages, rm_pages, new_pages;
440 struct buffer_page *page, *tmp;
441 unsigned long buffer_size;
442 unsigned long addr;
443 LIST_HEAD(pages);
444 int i, cpu;
445
446 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
447 size *= BUF_PAGE_SIZE;
448 buffer_size = buffer->pages * BUF_PAGE_SIZE;
449
450 /* we need a minimum of two pages */
451 if (size < BUF_PAGE_SIZE * 2)
452 size = BUF_PAGE_SIZE * 2;
453
454 if (size == buffer_size)
455 return size;
456
457 mutex_lock(&buffer->mutex);
458
459 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
460
461 if (size < buffer_size) {
462
463 /* easy case, just free pages */
464 BUG_ON(nr_pages >= buffer->pages);
465
466 rm_pages = buffer->pages - nr_pages;
467
468 for_each_buffer_cpu(buffer, cpu) {
469 cpu_buffer = buffer->buffers[cpu];
470 rb_remove_pages(cpu_buffer, rm_pages);
471 }
472 goto out;
473 }
474
475 /*
476 * This is a bit more difficult. We only want to add pages
477 * when we can allocate enough for all CPUs. We do this
478 * by allocating all the pages and storing them on a local
479 * link list. If we succeed in our allocation, then we
480 * add these pages to the cpu_buffers. Otherwise we just free
481 * them all and return -ENOMEM;
482 */
483 BUG_ON(nr_pages <= buffer->pages);
484 new_pages = nr_pages - buffer->pages;
485
486 for_each_buffer_cpu(buffer, cpu) {
487 for (i = 0; i < new_pages; i++) {
488 addr = __get_free_page(GFP_KERNEL);
489 if (!addr)
490 goto free_pages;
491 page = (struct buffer_page *)virt_to_page(addr);
492 list_add(&page->list, &pages);
493 }
494 }
495
496 for_each_buffer_cpu(buffer, cpu) {
497 cpu_buffer = buffer->buffers[cpu];
498 rb_insert_pages(cpu_buffer, &pages, new_pages);
499 }
500
501 BUG_ON(!list_empty(&pages));
502
503 out:
504 buffer->pages = nr_pages;
505 mutex_unlock(&buffer->mutex);
506
507 return size;
508
509 free_pages:
510 list_for_each_entry_safe(page, tmp, &pages, list) {
511 list_del_init(&page->list);
512 __free_page(&page->page);
513 }
514 return -ENOMEM;
515}
516
517static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
518{
519 return cpu_buffer->head_page == cpu_buffer->tail_page &&
520 cpu_buffer->head == cpu_buffer->tail;
521}
522
523static inline int rb_null_event(struct ring_buffer_event *event)
524{
525 return event->type == RINGBUF_TYPE_PADDING;
526}
527
528static inline void *rb_page_index(struct buffer_page *page, unsigned index)
529{
530 void *addr = page_address(&page->page);
531
532 return addr + index;
533}
534
535static inline struct ring_buffer_event *
536rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
537{
538 return rb_page_index(cpu_buffer->head_page,
539 cpu_buffer->head);
540}
541
542static inline struct ring_buffer_event *
543rb_iter_head_event(struct ring_buffer_iter *iter)
544{
545 return rb_page_index(iter->head_page,
546 iter->head);
547}
548
549/*
550 * When the tail hits the head and the buffer is in overwrite mode,
551 * the head jumps to the next page and all content on the previous
552 * page is discarded. But before doing so, we update the overrun
553 * variable of the buffer.
554 */
555static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
556{
557 struct ring_buffer_event *event;
558 unsigned long head;
559
560 for (head = 0; head < rb_head_size(cpu_buffer);
561 head += rb_event_length(event)) {
562
563 event = rb_page_index(cpu_buffer->head_page, head);
564 BUG_ON(rb_null_event(event));
565 /* Only count data entries */
566 if (event->type != RINGBUF_TYPE_DATA)
567 continue;
568 cpu_buffer->overrun++;
569 cpu_buffer->entries--;
570 }
571}
572
573static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
574 struct buffer_page **page)
575{
576 struct list_head *p = (*page)->list.next;
577
578 if (p == &cpu_buffer->pages)
579 p = p->next;
580
581 *page = list_entry(p, struct buffer_page, list);
582}
583
584static inline void
585rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
586{
587 cpu_buffer->tail_page->time_stamp = *ts;
588 cpu_buffer->write_stamp = *ts;
589}
590
591static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
592{
593 cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
594 cpu_buffer->head = 0;
595}
596
597static void
598rb_reset_iter_read_page(struct ring_buffer_iter *iter)
599{
600 iter->read_stamp = iter->head_page->time_stamp;
601 iter->head = 0;
602}
603
604/**
605 * ring_buffer_update_event - update event type and data
606 * @event: the even to update
607 * @type: the type of event
608 * @length: the size of the event field in the ring buffer
609 *
610 * Update the type and data fields of the event. The length
611 * is the actual size that is written to the ring buffer,
612 * and with this, we can determine what to place into the
613 * data field.
614 */
615static inline void
616rb_update_event(struct ring_buffer_event *event,
617 unsigned type, unsigned length)
618{
619 event->type = type;
620
621 switch (type) {
622
623 case RINGBUF_TYPE_PADDING:
624 break;
625
626 case RINGBUF_TYPE_TIME_EXTEND:
627 event->len =
628 (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
629 >> RB_ALIGNMENT_SHIFT;
630 break;
631
632 case RINGBUF_TYPE_TIME_STAMP:
633 event->len =
634 (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
635 >> RB_ALIGNMENT_SHIFT;
636 break;
637
638 case RINGBUF_TYPE_DATA:
639 length -= RB_EVNT_HDR_SIZE;
640 if (length > RB_MAX_SMALL_DATA) {
641 event->len = 0;
642 event->array[0] = length;
643 } else
644 event->len =
645 (length + (RB_ALIGNMENT-1))
646 >> RB_ALIGNMENT_SHIFT;
647 break;
648 default:
649 BUG();
650 }
651}
652
653static inline unsigned rb_calculate_event_length(unsigned length)
654{
655 struct ring_buffer_event event; /* Used only for sizeof array */
656
657 /* zero length can cause confusions */
658 if (!length)
659 length = 1;
660
661 if (length > RB_MAX_SMALL_DATA)
662 length += sizeof(event.array[0]);
663
664 length += RB_EVNT_HDR_SIZE;
665 length = ALIGN(length, RB_ALIGNMENT);
666
667 return length;
668}
669
670static struct ring_buffer_event *
671__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
672 unsigned type, unsigned long length, u64 *ts)
673{
674 struct buffer_page *head_page, *tail_page;
675 unsigned long tail;
676 struct ring_buffer *buffer = cpu_buffer->buffer;
677 struct ring_buffer_event *event;
678
679 tail_page = cpu_buffer->tail_page;
680 head_page = cpu_buffer->head_page;
681 tail = cpu_buffer->tail;
682
683 if (tail + length > BUF_PAGE_SIZE) {
684 struct buffer_page *next_page = tail_page;
685
686 rb_inc_page(cpu_buffer, &next_page);
687
688 if (next_page == head_page) {
689 if (!(buffer->flags & RB_FL_OVERWRITE))
690 return NULL;
691
692 /* count overflows */
693 rb_update_overflow(cpu_buffer);
694
695 rb_inc_page(cpu_buffer, &head_page);
696 cpu_buffer->head_page = head_page;
697 rb_reset_read_page(cpu_buffer);
698 }
699
700 if (tail != BUF_PAGE_SIZE) {
701 event = rb_page_index(tail_page, tail);
702 /* page padding */
703 event->type = RINGBUF_TYPE_PADDING;
704 }
705
706 tail_page->size = tail;
707 tail_page = next_page;
708 tail_page->size = 0;
709 tail = 0;
710 cpu_buffer->tail_page = tail_page;
711 cpu_buffer->tail = tail;
712 rb_add_stamp(cpu_buffer, ts);
713 }
714
715 BUG_ON(tail + length > BUF_PAGE_SIZE);
716
717 event = rb_page_index(tail_page, tail);
718 rb_update_event(event, type, length);
719
720 return event;
721}
722
723static int
724rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
725 u64 *ts, u64 *delta)
726{
727 struct ring_buffer_event *event;
728 static int once;
729
730 if (unlikely(*delta > (1ULL << 59) && !once++)) {
731 printk(KERN_WARNING "Delta way too big! %llu"
732 " ts=%llu write stamp = %llu\n",
733 *delta, *ts, cpu_buffer->write_stamp);
734 WARN_ON(1);
735 }
736
737 /*
738 * The delta is too big, we to add a
739 * new timestamp.
740 */
741 event = __rb_reserve_next(cpu_buffer,
742 RINGBUF_TYPE_TIME_EXTEND,
743 RB_LEN_TIME_EXTEND,
744 ts);
745 if (!event)
746 return -1;
747
748 /* check to see if we went to the next page */
749 if (cpu_buffer->tail) {
750 /* Still on same page, update timestamp */
751 event->time_delta = *delta & TS_MASK;
752 event->array[0] = *delta >> TS_SHIFT;
753 /* commit the time event */
754 cpu_buffer->tail +=
755 rb_event_length(event);
756 cpu_buffer->write_stamp = *ts;
757 *delta = 0;
758 }
759
760 return 0;
761}
762
763static struct ring_buffer_event *
764rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
765 unsigned type, unsigned long length)
766{
767 struct ring_buffer_event *event;
768 u64 ts, delta;
769
770 ts = ring_buffer_time_stamp(cpu_buffer->cpu);
771
772 if (cpu_buffer->tail) {
773 delta = ts - cpu_buffer->write_stamp;
774
775 if (test_time_stamp(delta)) {
776 int ret;
777
778 ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
779 if (ret < 0)
780 return NULL;
781 }
782 } else {
783 rb_add_stamp(cpu_buffer, &ts);
784 delta = 0;
785 }
786
787 event = __rb_reserve_next(cpu_buffer, type, length, &ts);
788 if (!event)
789 return NULL;
790
791 /* If the reserve went to the next page, our delta is zero */
792 if (!cpu_buffer->tail)
793 delta = 0;
794
795 event->time_delta = delta;
796
797 return event;
798}
799
800/**
801 * ring_buffer_lock_reserve - reserve a part of the buffer
802 * @buffer: the ring buffer to reserve from
803 * @length: the length of the data to reserve (excluding event header)
804 * @flags: a pointer to save the interrupt flags
805 *
806 * Returns a reseverd event on the ring buffer to copy directly to.
807 * The user of this interface will need to get the body to write into
808 * and can use the ring_buffer_event_data() interface.
809 *
810 * The length is the length of the data needed, not the event length
811 * which also includes the event header.
812 *
813 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
814 * If NULL is returned, then nothing has been allocated or locked.
815 */
816struct ring_buffer_event *
817ring_buffer_lock_reserve(struct ring_buffer *buffer,
818 unsigned long length,
819 unsigned long *flags)
820{
821 struct ring_buffer_per_cpu *cpu_buffer;
822 struct ring_buffer_event *event;
823 int cpu;
824
825 if (atomic_read(&buffer->record_disabled))
826 return NULL;
827
828 raw_local_irq_save(*flags);
829 cpu = raw_smp_processor_id();
830
831 if (!cpu_isset(cpu, buffer->cpumask))
832 goto out_irq;
833
834 cpu_buffer = buffer->buffers[cpu];
835 spin_lock(&cpu_buffer->lock);
836
837 if (atomic_read(&cpu_buffer->record_disabled))
838 goto no_record;
839
840 length = rb_calculate_event_length(length);
841 if (length > BUF_PAGE_SIZE)
842 return NULL;
843
844 event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
845 if (!event)
846 goto no_record;
847
848 return event;
849
850 no_record:
851 spin_unlock(&cpu_buffer->lock);
852 out_irq:
853 local_irq_restore(*flags);
854 return NULL;
855}
856
857static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
858 struct ring_buffer_event *event)
859{
860 cpu_buffer->tail += rb_event_length(event);
861 cpu_buffer->tail_page->size = cpu_buffer->tail;
862 cpu_buffer->write_stamp += event->time_delta;
863 cpu_buffer->entries++;
864}
865
866/**
867 * ring_buffer_unlock_commit - commit a reserved
868 * @buffer: The buffer to commit to
869 * @event: The event pointer to commit.
870 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
871 *
872 * This commits the data to the ring buffer, and releases any locks held.
873 *
874 * Must be paired with ring_buffer_lock_reserve.
875 */
876int ring_buffer_unlock_commit(struct ring_buffer *buffer,
877 struct ring_buffer_event *event,
878 unsigned long flags)
879{
880 struct ring_buffer_per_cpu *cpu_buffer;
881 int cpu = raw_smp_processor_id();
882
883 cpu_buffer = buffer->buffers[cpu];
884
885 assert_spin_locked(&cpu_buffer->lock);
886
887 rb_commit(cpu_buffer, event);
888
889 spin_unlock(&cpu_buffer->lock);
890 raw_local_irq_restore(flags);
891
892 return 0;
893}
894
895/**
896 * ring_buffer_write - write data to the buffer without reserving
897 * @buffer: The ring buffer to write to.
898 * @length: The length of the data being written (excluding the event header)
899 * @data: The data to write to the buffer.
900 *
901 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
902 * one function. If you already have the data to write to the buffer, it
903 * may be easier to simply call this function.
904 *
905 * Note, like ring_buffer_lock_reserve, the length is the length of the data
906 * and not the length of the event which would hold the header.
907 */
908int ring_buffer_write(struct ring_buffer *buffer,
909 unsigned long length,
910 void *data)
911{
912 struct ring_buffer_per_cpu *cpu_buffer;
913 struct ring_buffer_event *event;
914 unsigned long event_length, flags;
915 void *body;
916 int ret = -EBUSY;
917 int cpu;
918
919 if (atomic_read(&buffer->record_disabled))
920 return -EBUSY;
921
922 local_irq_save(flags);
923 cpu = raw_smp_processor_id();
924
925 if (!cpu_isset(cpu, buffer->cpumask))
926 goto out_irq;
927
928 cpu_buffer = buffer->buffers[cpu];
929 spin_lock(&cpu_buffer->lock);
930
931 if (atomic_read(&cpu_buffer->record_disabled))
932 goto out;
933
934 event_length = rb_calculate_event_length(length);
935 event = rb_reserve_next_event(cpu_buffer,
936 RINGBUF_TYPE_DATA, event_length);
937 if (!event)
938 goto out;
939
940 body = rb_event_data(event);
941
942 memcpy(body, data, length);
943
944 rb_commit(cpu_buffer, event);
945
946 ret = 0;
947 out:
948 spin_unlock(&cpu_buffer->lock);
949 out_irq:
950 local_irq_restore(flags);
951
952 return ret;
953}
954
955/**
956 * ring_buffer_lock - lock the ring buffer
957 * @buffer: The ring buffer to lock
958 * @flags: The place to store the interrupt flags
959 *
960 * This locks all the per CPU buffers.
961 *
962 * Must be unlocked by ring_buffer_unlock.
963 */
964void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
965{
966 struct ring_buffer_per_cpu *cpu_buffer;
967 int cpu;
968
969 local_irq_save(*flags);
970
971 for_each_buffer_cpu(buffer, cpu) {
972 cpu_buffer = buffer->buffers[cpu];
973 spin_lock(&cpu_buffer->lock);
974 }
975}
976
977/**
978 * ring_buffer_unlock - unlock a locked buffer
979 * @buffer: The locked buffer to unlock
980 * @flags: The interrupt flags received by ring_buffer_lock
981 */
982void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
983{
984 struct ring_buffer_per_cpu *cpu_buffer;
985 int cpu;
986
987 for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
988 if (!cpu_isset(cpu, buffer->cpumask))
989 continue;
990 cpu_buffer = buffer->buffers[cpu];
991 spin_unlock(&cpu_buffer->lock);
992 }
993
994 local_irq_restore(flags);
995}
996
997/**
998 * ring_buffer_record_disable - stop all writes into the buffer
999 * @buffer: The ring buffer to stop writes to.
1000 *
1001 * This prevents all writes to the buffer. Any attempt to write
1002 * to the buffer after this will fail and return NULL.
1003 *
1004 * The caller should call synchronize_sched() after this.
1005 */
1006void ring_buffer_record_disable(struct ring_buffer *buffer)
1007{
1008 atomic_inc(&buffer->record_disabled);
1009}
1010
1011/**
1012 * ring_buffer_record_enable - enable writes to the buffer
1013 * @buffer: The ring buffer to enable writes
1014 *
1015 * Note, multiple disables will need the same number of enables
1016 * to truely enable the writing (much like preempt_disable).
1017 */
1018void ring_buffer_record_enable(struct ring_buffer *buffer)
1019{
1020 atomic_dec(&buffer->record_disabled);
1021}
1022
1023/**
1024 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1025 * @buffer: The ring buffer to stop writes to.
1026 * @cpu: The CPU buffer to stop
1027 *
1028 * This prevents all writes to the buffer. Any attempt to write
1029 * to the buffer after this will fail and return NULL.
1030 *
1031 * The caller should call synchronize_sched() after this.
1032 */
1033void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1034{
1035 struct ring_buffer_per_cpu *cpu_buffer;
1036
1037 if (!cpu_isset(cpu, buffer->cpumask))
1038 return;
1039
1040 cpu_buffer = buffer->buffers[cpu];
1041 atomic_inc(&cpu_buffer->record_disabled);
1042}
1043
1044/**
1045 * ring_buffer_record_enable_cpu - enable writes to the buffer
1046 * @buffer: The ring buffer to enable writes
1047 * @cpu: The CPU to enable.
1048 *
1049 * Note, multiple disables will need the same number of enables
1050 * to truely enable the writing (much like preempt_disable).
1051 */
1052void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1053{
1054 struct ring_buffer_per_cpu *cpu_buffer;
1055
1056 if (!cpu_isset(cpu, buffer->cpumask))
1057 return;
1058
1059 cpu_buffer = buffer->buffers[cpu];
1060 atomic_dec(&cpu_buffer->record_disabled);
1061}
1062
1063/**
1064 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1065 * @buffer: The ring buffer
1066 * @cpu: The per CPU buffer to get the entries from.
1067 */
1068unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1069{
1070 struct ring_buffer_per_cpu *cpu_buffer;
1071
1072 if (!cpu_isset(cpu, buffer->cpumask))
1073 return 0;
1074
1075 cpu_buffer = buffer->buffers[cpu];
1076 return cpu_buffer->entries;
1077}
1078
1079/**
1080 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1081 * @buffer: The ring buffer
1082 * @cpu: The per CPU buffer to get the number of overruns from
1083 */
1084unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1085{
1086 struct ring_buffer_per_cpu *cpu_buffer;
1087
1088 if (!cpu_isset(cpu, buffer->cpumask))
1089 return 0;
1090
1091 cpu_buffer = buffer->buffers[cpu];
1092 return cpu_buffer->overrun;
1093}
1094
1095/**
1096 * ring_buffer_entries - get the number of entries in a buffer
1097 * @buffer: The ring buffer
1098 *
1099 * Returns the total number of entries in the ring buffer
1100 * (all CPU entries)
1101 */
1102unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1103{
1104 struct ring_buffer_per_cpu *cpu_buffer;
1105 unsigned long entries = 0;
1106 int cpu;
1107
1108 /* if you care about this being correct, lock the buffer */
1109 for_each_buffer_cpu(buffer, cpu) {
1110 cpu_buffer = buffer->buffers[cpu];
1111 entries += cpu_buffer->entries;
1112 }
1113
1114 return entries;
1115}
1116
1117/**
1118 * ring_buffer_overrun_cpu - get the number of overruns in buffer
1119 * @buffer: The ring buffer
1120 *
1121 * Returns the total number of overruns in the ring buffer
1122 * (all CPU entries)
1123 */
1124unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1125{
1126 struct ring_buffer_per_cpu *cpu_buffer;
1127 unsigned long overruns = 0;
1128 int cpu;
1129
1130 /* if you care about this being correct, lock the buffer */
1131 for_each_buffer_cpu(buffer, cpu) {
1132 cpu_buffer = buffer->buffers[cpu];
1133 overruns += cpu_buffer->overrun;
1134 }
1135
1136 return overruns;
1137}
1138
1139/**
1140 * ring_buffer_iter_reset - reset an iterator
1141 * @iter: The iterator to reset
1142 *
1143 * Resets the iterator, so that it will start from the beginning
1144 * again.
1145 */
1146void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1147{
1148 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1149
1150 iter->head_page = cpu_buffer->head_page;
1151 iter->head = cpu_buffer->head;
1152 rb_reset_iter_read_page(iter);
1153}
1154
1155/**
1156 * ring_buffer_iter_empty - check if an iterator has no more to read
1157 * @iter: The iterator to check
1158 */
1159int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1160{
1161 struct ring_buffer_per_cpu *cpu_buffer;
1162
1163 cpu_buffer = iter->cpu_buffer;
1164
1165 return iter->head_page == cpu_buffer->tail_page &&
1166 iter->head == cpu_buffer->tail;
1167}
1168
1169static void
1170rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1171 struct ring_buffer_event *event)
1172{
1173 u64 delta;
1174
1175 switch (event->type) {
1176 case RINGBUF_TYPE_PADDING:
1177 return;
1178
1179 case RINGBUF_TYPE_TIME_EXTEND:
1180 delta = event->array[0];
1181 delta <<= TS_SHIFT;
1182 delta += event->time_delta;
1183 cpu_buffer->read_stamp += delta;
1184 return;
1185
1186 case RINGBUF_TYPE_TIME_STAMP:
1187 /* FIXME: not implemented */
1188 return;
1189
1190 case RINGBUF_TYPE_DATA:
1191 cpu_buffer->read_stamp += event->time_delta;
1192 return;
1193
1194 default:
1195 BUG();
1196 }
1197 return;
1198}
1199
1200static void
1201rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1202 struct ring_buffer_event *event)
1203{
1204 u64 delta;
1205
1206 switch (event->type) {
1207 case RINGBUF_TYPE_PADDING:
1208 return;
1209
1210 case RINGBUF_TYPE_TIME_EXTEND:
1211 delta = event->array[0];
1212 delta <<= TS_SHIFT;
1213 delta += event->time_delta;
1214 iter->read_stamp += delta;
1215 return;
1216
1217 case RINGBUF_TYPE_TIME_STAMP:
1218 /* FIXME: not implemented */
1219 return;
1220
1221 case RINGBUF_TYPE_DATA:
1222 iter->read_stamp += event->time_delta;
1223 return;
1224
1225 default:
1226 BUG();
1227 }
1228 return;
1229}
1230
1231static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
1232{
1233 struct ring_buffer_event *event;
1234 unsigned length;
1235
1236 /*
1237 * Check if we are at the end of the buffer.
1238 */
1239 if (cpu_buffer->head >= cpu_buffer->head_page->size) {
1240 BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
1241 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1242 rb_reset_read_page(cpu_buffer);
1243 return;
1244 }
1245
1246 event = rb_head_event(cpu_buffer);
1247
1248 if (event->type == RINGBUF_TYPE_DATA)
1249 cpu_buffer->entries--;
1250
1251 length = rb_event_length(event);
1252
1253 /*
1254 * This should not be called to advance the header if we are
1255 * at the tail of the buffer.
1256 */
1257 BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
1258 (cpu_buffer->head + length > cpu_buffer->tail));
1259
1260 rb_update_read_stamp(cpu_buffer, event);
1261
1262 cpu_buffer->head += length;
1263
1264 /* check for end of page */
1265 if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
1266 (cpu_buffer->head_page != cpu_buffer->tail_page))
1267 rb_advance_head(cpu_buffer);
1268}
1269
1270static void rb_advance_iter(struct ring_buffer_iter *iter)
1271{
1272 struct ring_buffer *buffer;
1273 struct ring_buffer_per_cpu *cpu_buffer;
1274 struct ring_buffer_event *event;
1275 unsigned length;
1276
1277 cpu_buffer = iter->cpu_buffer;
1278 buffer = cpu_buffer->buffer;
1279
1280 /*
1281 * Check if we are at the end of the buffer.
1282 */
1283 if (iter->head >= iter->head_page->size) {
1284 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1285 rb_inc_page(cpu_buffer, &iter->head_page);
1286 rb_reset_iter_read_page(iter);
1287 return;
1288 }
1289
1290 event = rb_iter_head_event(iter);
1291
1292 length = rb_event_length(event);
1293
1294 /*
1295 * This should not be called to advance the header if we are
1296 * at the tail of the buffer.
1297 */
1298 BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1299 (iter->head + length > cpu_buffer->tail));
1300
1301 rb_update_iter_read_stamp(iter, event);
1302
1303 iter->head += length;
1304
1305 /* check for end of page padding */
1306 if ((iter->head >= iter->head_page->size) &&
1307 (iter->head_page != cpu_buffer->tail_page))
1308 rb_advance_iter(iter);
1309}
1310
1311/**
1312 * ring_buffer_peek - peek at the next event to be read
1313 * @buffer: The ring buffer to read
1314 * @cpu: The cpu to peak at
1315 * @ts: The timestamp counter of this event.
1316 *
1317 * This will return the event that will be read next, but does
1318 * not consume the data.
1319 */
1320struct ring_buffer_event *
1321ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1322{
1323 struct ring_buffer_per_cpu *cpu_buffer;
1324 struct ring_buffer_event *event;
1325
1326 if (!cpu_isset(cpu, buffer->cpumask))
1327 return NULL;
1328
1329 cpu_buffer = buffer->buffers[cpu];
1330
1331 again:
1332 if (rb_per_cpu_empty(cpu_buffer))
1333 return NULL;
1334
1335 event = rb_head_event(cpu_buffer);
1336
1337 switch (event->type) {
1338 case RINGBUF_TYPE_PADDING:
1339 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1340 rb_reset_read_page(cpu_buffer);
1341 goto again;
1342
1343 case RINGBUF_TYPE_TIME_EXTEND:
1344 /* Internal data, OK to advance */
1345 rb_advance_head(cpu_buffer);
1346 goto again;
1347
1348 case RINGBUF_TYPE_TIME_STAMP:
1349 /* FIXME: not implemented */
1350 rb_advance_head(cpu_buffer);
1351 goto again;
1352
1353 case RINGBUF_TYPE_DATA:
1354 if (ts) {
1355 *ts = cpu_buffer->read_stamp + event->time_delta;
1356 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1357 }
1358 return event;
1359
1360 default:
1361 BUG();
1362 }
1363
1364 return NULL;
1365}
1366
1367/**
1368 * ring_buffer_iter_peek - peek at the next event to be read
1369 * @iter: The ring buffer iterator
1370 * @ts: The timestamp counter of this event.
1371 *
1372 * This will return the event that will be read next, but does
1373 * not increment the iterator.
1374 */
1375struct ring_buffer_event *
1376ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1377{
1378 struct ring_buffer *buffer;
1379 struct ring_buffer_per_cpu *cpu_buffer;
1380 struct ring_buffer_event *event;
1381
1382 if (ring_buffer_iter_empty(iter))
1383 return NULL;
1384
1385 cpu_buffer = iter->cpu_buffer;
1386 buffer = cpu_buffer->buffer;
1387
1388 again:
1389 if (rb_per_cpu_empty(cpu_buffer))
1390 return NULL;
1391
1392 event = rb_iter_head_event(iter);
1393
1394 switch (event->type) {
1395 case RINGBUF_TYPE_PADDING:
1396 rb_inc_page(cpu_buffer, &iter->head_page);
1397 rb_reset_iter_read_page(iter);
1398 goto again;
1399
1400 case RINGBUF_TYPE_TIME_EXTEND:
1401 /* Internal data, OK to advance */
1402 rb_advance_iter(iter);
1403 goto again;
1404
1405 case RINGBUF_TYPE_TIME_STAMP:
1406 /* FIXME: not implemented */
1407 rb_advance_iter(iter);
1408 goto again;
1409
1410 case RINGBUF_TYPE_DATA:
1411 if (ts) {
1412 *ts = iter->read_stamp + event->time_delta;
1413 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1414 }
1415 return event;
1416
1417 default:
1418 BUG();
1419 }
1420
1421 return NULL;
1422}
1423
1424/**
1425 * ring_buffer_consume - return an event and consume it
1426 * @buffer: The ring buffer to get the next event from
1427 *
1428 * Returns the next event in the ring buffer, and that event is consumed.
1429 * Meaning, that sequential reads will keep returning a different event,
1430 * and eventually empty the ring buffer if the producer is slower.
1431 */
1432struct ring_buffer_event *
1433ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1434{
1435 struct ring_buffer_per_cpu *cpu_buffer;
1436 struct ring_buffer_event *event;
1437
1438 if (!cpu_isset(cpu, buffer->cpumask))
1439 return NULL;
1440
1441 event = ring_buffer_peek(buffer, cpu, ts);
1442 if (!event)
1443 return NULL;
1444
1445 cpu_buffer = buffer->buffers[cpu];
1446 rb_advance_head(cpu_buffer);
1447
1448 return event;
1449}
1450
1451/**
1452 * ring_buffer_read_start - start a non consuming read of the buffer
1453 * @buffer: The ring buffer to read from
1454 * @cpu: The cpu buffer to iterate over
1455 *
1456 * This starts up an iteration through the buffer. It also disables
1457 * the recording to the buffer until the reading is finished.
1458 * This prevents the reading from being corrupted. This is not
1459 * a consuming read, so a producer is not expected.
1460 *
1461 * Must be paired with ring_buffer_finish.
1462 */
1463struct ring_buffer_iter *
1464ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1465{
1466 struct ring_buffer_per_cpu *cpu_buffer;
1467 struct ring_buffer_iter *iter;
1468
1469 if (!cpu_isset(cpu, buffer->cpumask))
1470 return NULL;
1471
1472 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1473 if (!iter)
1474 return NULL;
1475
1476 cpu_buffer = buffer->buffers[cpu];
1477
1478 iter->cpu_buffer = cpu_buffer;
1479
1480 atomic_inc(&cpu_buffer->record_disabled);
1481 synchronize_sched();
1482
1483 spin_lock(&cpu_buffer->lock);
1484 iter->head = cpu_buffer->head;
1485 iter->head_page = cpu_buffer->head_page;
1486 rb_reset_iter_read_page(iter);
1487 spin_unlock(&cpu_buffer->lock);
1488
1489 return iter;
1490}
1491
1492/**
1493 * ring_buffer_finish - finish reading the iterator of the buffer
1494 * @iter: The iterator retrieved by ring_buffer_start
1495 *
1496 * This re-enables the recording to the buffer, and frees the
1497 * iterator.
1498 */
1499void
1500ring_buffer_read_finish(struct ring_buffer_iter *iter)
1501{
1502 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1503
1504 atomic_dec(&cpu_buffer->record_disabled);
1505 kfree(iter);
1506}
1507
1508/**
1509 * ring_buffer_read - read the next item in the ring buffer by the iterator
1510 * @iter: The ring buffer iterator
1511 * @ts: The time stamp of the event read.
1512 *
1513 * This reads the next event in the ring buffer and increments the iterator.
1514 */
1515struct ring_buffer_event *
1516ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1517{
1518 struct ring_buffer_event *event;
1519
1520 event = ring_buffer_iter_peek(iter, ts);
1521 if (!event)
1522 return NULL;
1523
1524 rb_advance_iter(iter);
1525
1526 return event;
1527}
1528
1529/**
1530 * ring_buffer_size - return the size of the ring buffer (in bytes)
1531 * @buffer: The ring buffer.
1532 */
1533unsigned long ring_buffer_size(struct ring_buffer *buffer)
1534{
1535 return BUF_PAGE_SIZE * buffer->pages;
1536}
1537
1538static void
1539rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1540{
1541 cpu_buffer->head_page
1542 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1543 cpu_buffer->tail_page
1544 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1545
1546 cpu_buffer->head = cpu_buffer->tail = 0;
1547 cpu_buffer->overrun = 0;
1548 cpu_buffer->entries = 0;
1549}
1550
1551/**
1552 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1553 * @buffer: The ring buffer to reset a per cpu buffer of
1554 * @cpu: The CPU buffer to be reset
1555 */
1556void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1557{
1558 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1559 unsigned long flags;
1560
1561 if (!cpu_isset(cpu, buffer->cpumask))
1562 return;
1563
1564 raw_local_irq_save(flags);
1565 spin_lock(&cpu_buffer->lock);
1566
1567 rb_reset_cpu(cpu_buffer);
1568
1569 spin_unlock(&cpu_buffer->lock);
1570 raw_local_irq_restore(flags);
1571}
1572
1573/**
1574 * ring_buffer_reset - reset a ring buffer
1575 * @buffer: The ring buffer to reset all cpu buffers
1576 */
1577void ring_buffer_reset(struct ring_buffer *buffer)
1578{
1579 unsigned long flags;
1580 int cpu;
1581
1582 ring_buffer_lock(buffer, &flags);
1583
1584 for_each_buffer_cpu(buffer, cpu)
1585 rb_reset_cpu(buffer->buffers[cpu]);
1586
1587 ring_buffer_unlock(buffer, flags);
1588}
1589
1590/**
1591 * rind_buffer_empty - is the ring buffer empty?
1592 * @buffer: The ring buffer to test
1593 */
1594int ring_buffer_empty(struct ring_buffer *buffer)
1595{
1596 struct ring_buffer_per_cpu *cpu_buffer;
1597 int cpu;
1598
1599 /* yes this is racy, but if you don't like the race, lock the buffer */
1600 for_each_buffer_cpu(buffer, cpu) {
1601 cpu_buffer = buffer->buffers[cpu];
1602 if (!rb_per_cpu_empty(cpu_buffer))
1603 return 0;
1604 }
1605 return 1;
1606}
1607
1608/**
1609 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1610 * @buffer: The ring buffer
1611 * @cpu: The CPU buffer to test
1612 */
1613int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1614{
1615 struct ring_buffer_per_cpu *cpu_buffer;
1616
1617 if (!cpu_isset(cpu, buffer->cpumask))
1618 return 1;
1619
1620 cpu_buffer = buffer->buffers[cpu];
1621 return rb_per_cpu_empty(cpu_buffer);
1622}
1623
1624/**
1625 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1626 * @buffer_a: One buffer to swap with
1627 * @buffer_b: The other buffer to swap with
1628 *
1629 * This function is useful for tracers that want to take a "snapshot"
1630 * of a CPU buffer and has another back up buffer lying around.
1631 * it is expected that the tracer handles the cpu buffer not being
1632 * used at the moment.
1633 */
1634int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1635 struct ring_buffer *buffer_b, int cpu)
1636{
1637 struct ring_buffer_per_cpu *cpu_buffer_a;
1638 struct ring_buffer_per_cpu *cpu_buffer_b;
1639
1640 if (!cpu_isset(cpu, buffer_a->cpumask) ||
1641 !cpu_isset(cpu, buffer_b->cpumask))
1642 return -EINVAL;
1643
1644 /* At least make sure the two buffers are somewhat the same */
1645 if (buffer_a->size != buffer_b->size ||
1646 buffer_a->pages != buffer_b->pages)
1647 return -EINVAL;
1648
1649 cpu_buffer_a = buffer_a->buffers[cpu];
1650 cpu_buffer_b = buffer_b->buffers[cpu];
1651
1652 /*
1653 * We can't do a synchronize_sched here because this
1654 * function can be called in atomic context.
1655 * Normally this will be called from the same CPU as cpu.
1656 * If not it's up to the caller to protect this.
1657 */
1658 atomic_inc(&cpu_buffer_a->record_disabled);
1659 atomic_inc(&cpu_buffer_b->record_disabled);
1660
1661 buffer_a->buffers[cpu] = cpu_buffer_b;
1662 buffer_b->buffers[cpu] = cpu_buffer_a;
1663
1664 cpu_buffer_b->buffer = buffer_a;
1665 cpu_buffer_a->buffer = buffer_b;
1666
1667 atomic_dec(&cpu_buffer_a->record_disabled);
1668 atomic_dec(&cpu_buffer_b->record_disabled);
1669
1670 return 0;
1671}
1672