aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/linux/ring_buffer.h130
-rw-r--r--kernel/trace/Kconfig4
-rw-r--r--kernel/trace/Makefile1
-rw-r--r--kernel/trace/ring_buffer.c1672
4 files changed, 1807 insertions, 0 deletions
diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
new file mode 100644
index 000000000000..c52375b8330d
--- /dev/null
+++ b/include/linux/ring_buffer.h
@@ -0,0 +1,130 @@
1#ifndef _LINUX_RING_BUFFER_H
2#define _LINUX_RING_BUFFER_H
3
4#include <linux/mm.h>
5#include <linux/seq_file.h>
6
7struct ring_buffer;
8struct ring_buffer_iter;
9
10/*
11 * Don't reference this struct directly, use functions below.
12 */
13struct ring_buffer_event {
14 u32 type:2, len:3, time_delta:27;
15 u32 array[];
16};
17
18/**
19 * enum ring_buffer_type - internal ring buffer types
20 *
21 * @RINGBUF_TYPE_PADDING: Left over page padding
22 * array is ignored
23 * size is variable depending on how much
24 * padding is needed
25 *
26 * @RINGBUF_TYPE_TIME_EXTEND: Extend the time delta
27 * array[0] = time delta (28 .. 59)
28 * size = 8 bytes
29 *
30 * @RINGBUF_TYPE_TIME_STAMP: Sync time stamp with external clock
31 * array[0] = tv_nsec
32 * array[1] = tv_sec
33 * size = 16 bytes
34 *
35 * @RINGBUF_TYPE_DATA: Data record
36 * If len is zero:
37 * array[0] holds the actual length
38 * array[1..(length+3)/4-1] holds data
39 * else
40 * length = len << 2
41 * array[0..(length+3)/4] holds data
42 */
43enum ring_buffer_type {
44 RINGBUF_TYPE_PADDING,
45 RINGBUF_TYPE_TIME_EXTEND,
46 /* FIXME: RINGBUF_TYPE_TIME_STAMP not implemented */
47 RINGBUF_TYPE_TIME_STAMP,
48 RINGBUF_TYPE_DATA,
49};
50
51unsigned ring_buffer_event_length(struct ring_buffer_event *event);
52void *ring_buffer_event_data(struct ring_buffer_event *event);
53
54/**
55 * ring_buffer_event_time_delta - return the delta timestamp of the event
56 * @event: the event to get the delta timestamp of
57 *
58 * The delta timestamp is the 27 bit timestamp since the last event.
59 */
60static inline unsigned
61ring_buffer_event_time_delta(struct ring_buffer_event *event)
62{
63 return event->time_delta;
64}
65
66void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags);
67void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags);
68
69/*
70 * size is in bytes for each per CPU buffer.
71 */
72struct ring_buffer *
73ring_buffer_alloc(unsigned long size, unsigned flags);
74void ring_buffer_free(struct ring_buffer *buffer);
75
76int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size);
77
78struct ring_buffer_event *
79ring_buffer_lock_reserve(struct ring_buffer *buffer,
80 unsigned long length,
81 unsigned long *flags);
82int ring_buffer_unlock_commit(struct ring_buffer *buffer,
83 struct ring_buffer_event *event,
84 unsigned long flags);
85int ring_buffer_write(struct ring_buffer *buffer,
86 unsigned long length, void *data);
87
88struct ring_buffer_event *
89ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts);
90struct ring_buffer_event *
91ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts);
92
93struct ring_buffer_iter *
94ring_buffer_read_start(struct ring_buffer *buffer, int cpu);
95void ring_buffer_read_finish(struct ring_buffer_iter *iter);
96
97struct ring_buffer_event *
98ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts);
99struct ring_buffer_event *
100ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts);
101void ring_buffer_iter_reset(struct ring_buffer_iter *iter);
102int ring_buffer_iter_empty(struct ring_buffer_iter *iter);
103
104unsigned long ring_buffer_size(struct ring_buffer *buffer);
105
106void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu);
107void ring_buffer_reset(struct ring_buffer *buffer);
108
109int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
110 struct ring_buffer *buffer_b, int cpu);
111
112int ring_buffer_empty(struct ring_buffer *buffer);
113int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu);
114
115void ring_buffer_record_disable(struct ring_buffer *buffer);
116void ring_buffer_record_enable(struct ring_buffer *buffer);
117void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu);
118void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu);
119
120unsigned long ring_buffer_entries(struct ring_buffer *buffer);
121unsigned long ring_buffer_overruns(struct ring_buffer *buffer);
122
123u64 ring_buffer_time_stamp(int cpu);
124void ring_buffer_normalize_time_stamp(int cpu, u64 *ts);
125
126enum ring_buffer_flags {
127 RB_FL_OVERWRITE = 1 << 0,
128};
129
130#endif /* _LINUX_RING_BUFFER_H */
diff --git a/kernel/trace/Kconfig b/kernel/trace/Kconfig
index 4feb3c81f94d..396aea11398e 100644
--- a/kernel/trace/Kconfig
+++ b/kernel/trace/Kconfig
@@ -18,9 +18,13 @@ config HAVE_FTRACE_MCOUNT_RECORD
18config TRACER_MAX_TRACE 18config TRACER_MAX_TRACE
19 bool 19 bool
20 20
21config RING_BUFFER
22 bool
23
21config TRACING 24config TRACING
22 bool 25 bool
23 select DEBUG_FS 26 select DEBUG_FS
27 select RING_BUFFER
24 select STACKTRACE 28 select STACKTRACE
25 select TRACEPOINTS 29 select TRACEPOINTS
26 30
diff --git a/kernel/trace/Makefile b/kernel/trace/Makefile
index 35a07f7cfa86..a85dfba88ba0 100644
--- a/kernel/trace/Makefile
+++ b/kernel/trace/Makefile
@@ -11,6 +11,7 @@ obj-y += trace_selftest_dynamic.o
11endif 11endif
12 12
13obj-$(CONFIG_FTRACE) += libftrace.o 13obj-$(CONFIG_FTRACE) += libftrace.o
14obj-$(CONFIG_RING_BUFFER) += ring_buffer.o
14 15
15obj-$(CONFIG_TRACING) += trace.o 16obj-$(CONFIG_TRACING) += trace.o
16obj-$(CONFIG_CONTEXT_SWITCH_TRACER) += trace_sched_switch.o 17obj-$(CONFIG_CONTEXT_SWITCH_TRACER) += trace_sched_switch.o
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
new file mode 100644
index 000000000000..830a2930dd91
--- /dev/null
+++ b/kernel/trace/ring_buffer.c
@@ -0,0 +1,1672 @@
1/*
2 * Generic ring buffer
3 *
4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5 */
6#include <linux/ring_buffer.h>
7#include <linux/spinlock.h>
8#include <linux/debugfs.h>
9#include <linux/uaccess.h>
10#include <linux/module.h>
11#include <linux/percpu.h>
12#include <linux/mutex.h>
13#include <linux/sched.h> /* used for sched_clock() (for now) */
14#include <linux/init.h>
15#include <linux/hash.h>
16#include <linux/list.h>
17#include <linux/fs.h>
18
19/* Up this if you want to test the TIME_EXTENTS and normalization */
20#define DEBUG_SHIFT 0
21
22/* FIXME!!! */
23u64 ring_buffer_time_stamp(int cpu)
24{
25 /* shift to debug/test normalization and TIME_EXTENTS */
26 return sched_clock() << DEBUG_SHIFT;
27}
28
29void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
30{
31 /* Just stupid testing the normalize function and deltas */
32 *ts >>= DEBUG_SHIFT;
33}
34
35#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
36#define RB_ALIGNMENT_SHIFT 2
37#define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT)
38#define RB_MAX_SMALL_DATA 28
39
40enum {
41 RB_LEN_TIME_EXTEND = 8,
42 RB_LEN_TIME_STAMP = 16,
43};
44
45/* inline for ring buffer fast paths */
46static inline unsigned
47rb_event_length(struct ring_buffer_event *event)
48{
49 unsigned length;
50
51 switch (event->type) {
52 case RINGBUF_TYPE_PADDING:
53 /* undefined */
54 return -1;
55
56 case RINGBUF_TYPE_TIME_EXTEND:
57 return RB_LEN_TIME_EXTEND;
58
59 case RINGBUF_TYPE_TIME_STAMP:
60 return RB_LEN_TIME_STAMP;
61
62 case RINGBUF_TYPE_DATA:
63 if (event->len)
64 length = event->len << RB_ALIGNMENT_SHIFT;
65 else
66 length = event->array[0];
67 return length + RB_EVNT_HDR_SIZE;
68 default:
69 BUG();
70 }
71 /* not hit */
72 return 0;
73}
74
75/**
76 * ring_buffer_event_length - return the length of the event
77 * @event: the event to get the length of
78 */
79unsigned ring_buffer_event_length(struct ring_buffer_event *event)
80{
81 return rb_event_length(event);
82}
83
84/* inline for ring buffer fast paths */
85static inline void *
86rb_event_data(struct ring_buffer_event *event)
87{
88 BUG_ON(event->type != RINGBUF_TYPE_DATA);
89 /* If length is in len field, then array[0] has the data */
90 if (event->len)
91 return (void *)&event->array[0];
92 /* Otherwise length is in array[0] and array[1] has the data */
93 return (void *)&event->array[1];
94}
95
96/**
97 * ring_buffer_event_data - return the data of the event
98 * @event: the event to get the data from
99 */
100void *ring_buffer_event_data(struct ring_buffer_event *event)
101{
102 return rb_event_data(event);
103}
104
105#define for_each_buffer_cpu(buffer, cpu) \
106 for_each_cpu_mask(cpu, buffer->cpumask)
107
108#define TS_SHIFT 27
109#define TS_MASK ((1ULL << TS_SHIFT) - 1)
110#define TS_DELTA_TEST (~TS_MASK)
111
112/*
113 * This hack stolen from mm/slob.c.
114 * We can store per page timing information in the page frame of the page.
115 * Thanks to Peter Zijlstra for suggesting this idea.
116 */
117struct buffer_page {
118 union {
119 struct {
120 unsigned long flags; /* mandatory */
121 atomic_t _count; /* mandatory */
122 u64 time_stamp; /* page time stamp */
123 unsigned size; /* size of page data */
124 struct list_head list; /* list of free pages */
125 };
126 struct page page;
127 };
128};
129
130/*
131 * We need to fit the time_stamp delta into 27 bits.
132 */
133static inline int test_time_stamp(u64 delta)
134{
135 if (delta & TS_DELTA_TEST)
136 return 1;
137 return 0;
138}
139
140#define BUF_PAGE_SIZE PAGE_SIZE
141
142/*
143 * head_page == tail_page && head == tail then buffer is empty.
144 */
145struct ring_buffer_per_cpu {
146 int cpu;
147 struct ring_buffer *buffer;
148 spinlock_t lock;
149 struct lock_class_key lock_key;
150 struct list_head pages;
151 unsigned long head; /* read from head */
152 unsigned long tail; /* write to tail */
153 struct buffer_page *head_page;
154 struct buffer_page *tail_page;
155 unsigned long overrun;
156 unsigned long entries;
157 u64 write_stamp;
158 u64 read_stamp;
159 atomic_t record_disabled;
160};
161
162struct ring_buffer {
163 unsigned long size;
164 unsigned pages;
165 unsigned flags;
166 int cpus;
167 cpumask_t cpumask;
168 atomic_t record_disabled;
169
170 struct mutex mutex;
171
172 struct ring_buffer_per_cpu **buffers;
173};
174
175struct ring_buffer_iter {
176 struct ring_buffer_per_cpu *cpu_buffer;
177 unsigned long head;
178 struct buffer_page *head_page;
179 u64 read_stamp;
180};
181
182#define RB_WARN_ON(buffer, cond) \
183 if (unlikely(cond)) { \
184 atomic_inc(&buffer->record_disabled); \
185 WARN_ON(1); \
186 return -1; \
187 }
188
189/**
190 * check_pages - integrity check of buffer pages
191 * @cpu_buffer: CPU buffer with pages to test
192 *
193 * As a safty measure we check to make sure the data pages have not
194 * been corrupted.
195 */
196static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
197{
198 struct list_head *head = &cpu_buffer->pages;
199 struct buffer_page *page, *tmp;
200
201 RB_WARN_ON(cpu_buffer, head->next->prev != head);
202 RB_WARN_ON(cpu_buffer, head->prev->next != head);
203
204 list_for_each_entry_safe(page, tmp, head, list) {
205 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
206 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
207 }
208
209 return 0;
210}
211
212static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
213{
214 return cpu_buffer->head_page->size;
215}
216
217static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
218 unsigned nr_pages)
219{
220 struct list_head *head = &cpu_buffer->pages;
221 struct buffer_page *page, *tmp;
222 unsigned long addr;
223 LIST_HEAD(pages);
224 unsigned i;
225
226 for (i = 0; i < nr_pages; i++) {
227 addr = __get_free_page(GFP_KERNEL);
228 if (!addr)
229 goto free_pages;
230 page = (struct buffer_page *)virt_to_page(addr);
231 list_add(&page->list, &pages);
232 }
233
234 list_splice(&pages, head);
235
236 rb_check_pages(cpu_buffer);
237
238 return 0;
239
240 free_pages:
241 list_for_each_entry_safe(page, tmp, &pages, list) {
242 list_del_init(&page->list);
243 __free_page(&page->page);
244 }
245 return -ENOMEM;
246}
247
248static struct ring_buffer_per_cpu *
249rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
250{
251 struct ring_buffer_per_cpu *cpu_buffer;
252 int ret;
253
254 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
255 GFP_KERNEL, cpu_to_node(cpu));
256 if (!cpu_buffer)
257 return NULL;
258
259 cpu_buffer->cpu = cpu;
260 cpu_buffer->buffer = buffer;
261 spin_lock_init(&cpu_buffer->lock);
262 INIT_LIST_HEAD(&cpu_buffer->pages);
263
264 ret = rb_allocate_pages(cpu_buffer, buffer->pages);
265 if (ret < 0)
266 goto fail_free_buffer;
267
268 cpu_buffer->head_page
269 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
270 cpu_buffer->tail_page
271 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
272
273 return cpu_buffer;
274
275 fail_free_buffer:
276 kfree(cpu_buffer);
277 return NULL;
278}
279
280static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
281{
282 struct list_head *head = &cpu_buffer->pages;
283 struct buffer_page *page, *tmp;
284
285 list_for_each_entry_safe(page, tmp, head, list) {
286 list_del_init(&page->list);
287 __free_page(&page->page);
288 }
289 kfree(cpu_buffer);
290}
291
292/**
293 * ring_buffer_alloc - allocate a new ring_buffer
294 * @size: the size in bytes that is needed.
295 * @flags: attributes to set for the ring buffer.
296 *
297 * Currently the only flag that is available is the RB_FL_OVERWRITE
298 * flag. This flag means that the buffer will overwrite old data
299 * when the buffer wraps. If this flag is not set, the buffer will
300 * drop data when the tail hits the head.
301 */
302struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
303{
304 struct ring_buffer *buffer;
305 int bsize;
306 int cpu;
307
308 /* keep it in its own cache line */
309 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
310 GFP_KERNEL);
311 if (!buffer)
312 return NULL;
313
314 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
315 buffer->flags = flags;
316
317 /* need at least two pages */
318 if (buffer->pages == 1)
319 buffer->pages++;
320
321 buffer->cpumask = cpu_possible_map;
322 buffer->cpus = nr_cpu_ids;
323
324 bsize = sizeof(void *) * nr_cpu_ids;
325 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
326 GFP_KERNEL);
327 if (!buffer->buffers)
328 goto fail_free_buffer;
329
330 for_each_buffer_cpu(buffer, cpu) {
331 buffer->buffers[cpu] =
332 rb_allocate_cpu_buffer(buffer, cpu);
333 if (!buffer->buffers[cpu])
334 goto fail_free_buffers;
335 }
336
337 mutex_init(&buffer->mutex);
338
339 return buffer;
340
341 fail_free_buffers:
342 for_each_buffer_cpu(buffer, cpu) {
343 if (buffer->buffers[cpu])
344 rb_free_cpu_buffer(buffer->buffers[cpu]);
345 }
346 kfree(buffer->buffers);
347
348 fail_free_buffer:
349 kfree(buffer);
350 return NULL;
351}
352
353/**
354 * ring_buffer_free - free a ring buffer.
355 * @buffer: the buffer to free.
356 */
357void
358ring_buffer_free(struct ring_buffer *buffer)
359{
360 int cpu;
361
362 for_each_buffer_cpu(buffer, cpu)
363 rb_free_cpu_buffer(buffer->buffers[cpu]);
364
365 kfree(buffer);
366}
367
368static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
369
370static void
371rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
372{
373 struct buffer_page *page;
374 struct list_head *p;
375 unsigned i;
376
377 atomic_inc(&cpu_buffer->record_disabled);
378 synchronize_sched();
379
380 for (i = 0; i < nr_pages; i++) {
381 BUG_ON(list_empty(&cpu_buffer->pages));
382 p = cpu_buffer->pages.next;
383 page = list_entry(p, struct buffer_page, list);
384 list_del_init(&page->list);
385 __free_page(&page->page);
386 }
387 BUG_ON(list_empty(&cpu_buffer->pages));
388
389 rb_reset_cpu(cpu_buffer);
390
391 rb_check_pages(cpu_buffer);
392
393 atomic_dec(&cpu_buffer->record_disabled);
394
395}
396
397static void
398rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
399 struct list_head *pages, unsigned nr_pages)
400{
401 struct buffer_page *page;
402 struct list_head *p;
403 unsigned i;
404
405 atomic_inc(&cpu_buffer->record_disabled);
406 synchronize_sched();
407
408 for (i = 0; i < nr_pages; i++) {
409 BUG_ON(list_empty(pages));
410 p = pages->next;
411 page = list_entry(p, struct buffer_page, list);
412 list_del_init(&page->list);
413 list_add_tail(&page->list, &cpu_buffer->pages);
414 }
415 rb_reset_cpu(cpu_buffer);
416
417 rb_check_pages(cpu_buffer);
418
419 atomic_dec(&cpu_buffer->record_disabled);
420}
421
422/**
423 * ring_buffer_resize - resize the ring buffer
424 * @buffer: the buffer to resize.
425 * @size: the new size.
426 *
427 * The tracer is responsible for making sure that the buffer is
428 * not being used while changing the size.
429 * Note: We may be able to change the above requirement by using
430 * RCU synchronizations.
431 *
432 * Minimum size is 2 * BUF_PAGE_SIZE.
433 *
434 * Returns -1 on failure.
435 */
436int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
437{
438 struct ring_buffer_per_cpu *cpu_buffer;
439 unsigned nr_pages, rm_pages, new_pages;
440 struct buffer_page *page, *tmp;
441 unsigned long buffer_size;
442 unsigned long addr;
443 LIST_HEAD(pages);
444 int i, cpu;
445
446 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
447 size *= BUF_PAGE_SIZE;
448 buffer_size = buffer->pages * BUF_PAGE_SIZE;
449
450 /* we need a minimum of two pages */
451 if (size < BUF_PAGE_SIZE * 2)
452 size = BUF_PAGE_SIZE * 2;
453
454 if (size == buffer_size)
455 return size;
456
457 mutex_lock(&buffer->mutex);
458
459 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
460
461 if (size < buffer_size) {
462
463 /* easy case, just free pages */
464 BUG_ON(nr_pages >= buffer->pages);
465
466 rm_pages = buffer->pages - nr_pages;
467
468 for_each_buffer_cpu(buffer, cpu) {
469 cpu_buffer = buffer->buffers[cpu];
470 rb_remove_pages(cpu_buffer, rm_pages);
471 }
472 goto out;
473 }
474
475 /*
476 * This is a bit more difficult. We only want to add pages
477 * when we can allocate enough for all CPUs. We do this
478 * by allocating all the pages and storing them on a local
479 * link list. If we succeed in our allocation, then we
480 * add these pages to the cpu_buffers. Otherwise we just free
481 * them all and return -ENOMEM;
482 */
483 BUG_ON(nr_pages <= buffer->pages);
484 new_pages = nr_pages - buffer->pages;
485
486 for_each_buffer_cpu(buffer, cpu) {
487 for (i = 0; i < new_pages; i++) {
488 addr = __get_free_page(GFP_KERNEL);
489 if (!addr)
490 goto free_pages;
491 page = (struct buffer_page *)virt_to_page(addr);
492 list_add(&page->list, &pages);
493 }
494 }
495
496 for_each_buffer_cpu(buffer, cpu) {
497 cpu_buffer = buffer->buffers[cpu];
498 rb_insert_pages(cpu_buffer, &pages, new_pages);
499 }
500
501 BUG_ON(!list_empty(&pages));
502
503 out:
504 buffer->pages = nr_pages;
505 mutex_unlock(&buffer->mutex);
506
507 return size;
508
509 free_pages:
510 list_for_each_entry_safe(page, tmp, &pages, list) {
511 list_del_init(&page->list);
512 __free_page(&page->page);
513 }
514 return -ENOMEM;
515}
516
517static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
518{
519 return cpu_buffer->head_page == cpu_buffer->tail_page &&
520 cpu_buffer->head == cpu_buffer->tail;
521}
522
523static inline int rb_null_event(struct ring_buffer_event *event)
524{
525 return event->type == RINGBUF_TYPE_PADDING;
526}
527
528static inline void *rb_page_index(struct buffer_page *page, unsigned index)
529{
530 void *addr = page_address(&page->page);
531
532 return addr + index;
533}
534
535static inline struct ring_buffer_event *
536rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
537{
538 return rb_page_index(cpu_buffer->head_page,
539 cpu_buffer->head);
540}
541
542static inline struct ring_buffer_event *
543rb_iter_head_event(struct ring_buffer_iter *iter)
544{
545 return rb_page_index(iter->head_page,
546 iter->head);
547}
548
549/*
550 * When the tail hits the head and the buffer is in overwrite mode,
551 * the head jumps to the next page and all content on the previous
552 * page is discarded. But before doing so, we update the overrun
553 * variable of the buffer.
554 */
555static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
556{
557 struct ring_buffer_event *event;
558 unsigned long head;
559
560 for (head = 0; head < rb_head_size(cpu_buffer);
561 head += rb_event_length(event)) {
562
563 event = rb_page_index(cpu_buffer->head_page, head);
564 BUG_ON(rb_null_event(event));
565 /* Only count data entries */
566 if (event->type != RINGBUF_TYPE_DATA)
567 continue;
568 cpu_buffer->overrun++;
569 cpu_buffer->entries--;
570 }
571}
572
573static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
574 struct buffer_page **page)
575{
576 struct list_head *p = (*page)->list.next;
577
578 if (p == &cpu_buffer->pages)
579 p = p->next;
580
581 *page = list_entry(p, struct buffer_page, list);
582}
583
584static inline void
585rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
586{
587 cpu_buffer->tail_page->time_stamp = *ts;
588 cpu_buffer->write_stamp = *ts;
589}
590
591static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
592{
593 cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
594 cpu_buffer->head = 0;
595}
596
597static void
598rb_reset_iter_read_page(struct ring_buffer_iter *iter)
599{
600 iter->read_stamp = iter->head_page->time_stamp;
601 iter->head = 0;
602}
603
604/**
605 * ring_buffer_update_event - update event type and data
606 * @event: the even to update
607 * @type: the type of event
608 * @length: the size of the event field in the ring buffer
609 *
610 * Update the type and data fields of the event. The length
611 * is the actual size that is written to the ring buffer,
612 * and with this, we can determine what to place into the
613 * data field.
614 */
615static inline void
616rb_update_event(struct ring_buffer_event *event,
617 unsigned type, unsigned length)
618{
619 event->type = type;
620
621 switch (type) {
622
623 case RINGBUF_TYPE_PADDING:
624 break;
625
626 case RINGBUF_TYPE_TIME_EXTEND:
627 event->len =
628 (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
629 >> RB_ALIGNMENT_SHIFT;
630 break;
631
632 case RINGBUF_TYPE_TIME_STAMP:
633 event->len =
634 (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
635 >> RB_ALIGNMENT_SHIFT;
636 break;
637
638 case RINGBUF_TYPE_DATA:
639 length -= RB_EVNT_HDR_SIZE;
640 if (length > RB_MAX_SMALL_DATA) {
641 event->len = 0;
642 event->array[0] = length;
643 } else
644 event->len =
645 (length + (RB_ALIGNMENT-1))
646 >> RB_ALIGNMENT_SHIFT;
647 break;
648 default:
649 BUG();
650 }
651}
652
653static inline unsigned rb_calculate_event_length(unsigned length)
654{
655 struct ring_buffer_event event; /* Used only for sizeof array */
656
657 /* zero length can cause confusions */
658 if (!length)
659 length = 1;
660
661 if (length > RB_MAX_SMALL_DATA)
662 length += sizeof(event.array[0]);
663
664 length += RB_EVNT_HDR_SIZE;
665 length = ALIGN(length, RB_ALIGNMENT);
666
667 return length;
668}
669
670static struct ring_buffer_event *
671__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
672 unsigned type, unsigned long length, u64 *ts)
673{
674 struct buffer_page *head_page, *tail_page;
675 unsigned long tail;
676 struct ring_buffer *buffer = cpu_buffer->buffer;
677 struct ring_buffer_event *event;
678
679 tail_page = cpu_buffer->tail_page;
680 head_page = cpu_buffer->head_page;
681 tail = cpu_buffer->tail;
682
683 if (tail + length > BUF_PAGE_SIZE) {
684 struct buffer_page *next_page = tail_page;
685
686 rb_inc_page(cpu_buffer, &next_page);
687
688 if (next_page == head_page) {
689 if (!(buffer->flags & RB_FL_OVERWRITE))
690 return NULL;
691
692 /* count overflows */
693 rb_update_overflow(cpu_buffer);
694
695 rb_inc_page(cpu_buffer, &head_page);
696 cpu_buffer->head_page = head_page;
697 rb_reset_read_page(cpu_buffer);
698 }
699
700 if (tail != BUF_PAGE_SIZE) {
701 event = rb_page_index(tail_page, tail);
702 /* page padding */
703 event->type = RINGBUF_TYPE_PADDING;
704 }
705
706 tail_page->size = tail;
707 tail_page = next_page;
708 tail_page->size = 0;
709 tail = 0;
710 cpu_buffer->tail_page = tail_page;
711 cpu_buffer->tail = tail;
712 rb_add_stamp(cpu_buffer, ts);
713 }
714
715 BUG_ON(tail + length > BUF_PAGE_SIZE);
716
717 event = rb_page_index(tail_page, tail);
718 rb_update_event(event, type, length);
719
720 return event;
721}
722
723static int
724rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
725 u64 *ts, u64 *delta)
726{
727 struct ring_buffer_event *event;
728 static int once;
729
730 if (unlikely(*delta > (1ULL << 59) && !once++)) {
731 printk(KERN_WARNING "Delta way too big! %llu"
732 " ts=%llu write stamp = %llu\n",
733 *delta, *ts, cpu_buffer->write_stamp);
734 WARN_ON(1);
735 }
736
737 /*
738 * The delta is too big, we to add a
739 * new timestamp.
740 */
741 event = __rb_reserve_next(cpu_buffer,
742 RINGBUF_TYPE_TIME_EXTEND,
743 RB_LEN_TIME_EXTEND,
744 ts);
745 if (!event)
746 return -1;
747
748 /* check to see if we went to the next page */
749 if (cpu_buffer->tail) {
750 /* Still on same page, update timestamp */
751 event->time_delta = *delta & TS_MASK;
752 event->array[0] = *delta >> TS_SHIFT;
753 /* commit the time event */
754 cpu_buffer->tail +=
755 rb_event_length(event);
756 cpu_buffer->write_stamp = *ts;
757 *delta = 0;
758 }
759
760 return 0;
761}
762
763static struct ring_buffer_event *
764rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
765 unsigned type, unsigned long length)
766{
767 struct ring_buffer_event *event;
768 u64 ts, delta;
769
770 ts = ring_buffer_time_stamp(cpu_buffer->cpu);
771
772 if (cpu_buffer->tail) {
773 delta = ts - cpu_buffer->write_stamp;
774
775 if (test_time_stamp(delta)) {
776 int ret;
777
778 ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
779 if (ret < 0)
780 return NULL;
781 }
782 } else {
783 rb_add_stamp(cpu_buffer, &ts);
784 delta = 0;
785 }
786
787 event = __rb_reserve_next(cpu_buffer, type, length, &ts);
788 if (!event)
789 return NULL;
790
791 /* If the reserve went to the next page, our delta is zero */
792 if (!cpu_buffer->tail)
793 delta = 0;
794
795 event->time_delta = delta;
796
797 return event;
798}
799
800/**
801 * ring_buffer_lock_reserve - reserve a part of the buffer
802 * @buffer: the ring buffer to reserve from
803 * @length: the length of the data to reserve (excluding event header)
804 * @flags: a pointer to save the interrupt flags
805 *
806 * Returns a reseverd event on the ring buffer to copy directly to.
807 * The user of this interface will need to get the body to write into
808 * and can use the ring_buffer_event_data() interface.
809 *
810 * The length is the length of the data needed, not the event length
811 * which also includes the event header.
812 *
813 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
814 * If NULL is returned, then nothing has been allocated or locked.
815 */
816struct ring_buffer_event *
817ring_buffer_lock_reserve(struct ring_buffer *buffer,
818 unsigned long length,
819 unsigned long *flags)
820{
821 struct ring_buffer_per_cpu *cpu_buffer;
822 struct ring_buffer_event *event;
823 int cpu;
824
825 if (atomic_read(&buffer->record_disabled))
826 return NULL;
827
828 raw_local_irq_save(*flags);
829 cpu = raw_smp_processor_id();
830
831 if (!cpu_isset(cpu, buffer->cpumask))
832 goto out_irq;
833
834 cpu_buffer = buffer->buffers[cpu];
835 spin_lock(&cpu_buffer->lock);
836
837 if (atomic_read(&cpu_buffer->record_disabled))
838 goto no_record;
839
840 length = rb_calculate_event_length(length);
841 if (length > BUF_PAGE_SIZE)
842 return NULL;
843
844 event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
845 if (!event)
846 goto no_record;
847
848 return event;
849
850 no_record:
851 spin_unlock(&cpu_buffer->lock);
852 out_irq:
853 local_irq_restore(*flags);
854 return NULL;
855}
856
857static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
858 struct ring_buffer_event *event)
859{
860 cpu_buffer->tail += rb_event_length(event);
861 cpu_buffer->tail_page->size = cpu_buffer->tail;
862 cpu_buffer->write_stamp += event->time_delta;
863 cpu_buffer->entries++;
864}
865
866/**
867 * ring_buffer_unlock_commit - commit a reserved
868 * @buffer: The buffer to commit to
869 * @event: The event pointer to commit.
870 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
871 *
872 * This commits the data to the ring buffer, and releases any locks held.
873 *
874 * Must be paired with ring_buffer_lock_reserve.
875 */
876int ring_buffer_unlock_commit(struct ring_buffer *buffer,
877 struct ring_buffer_event *event,
878 unsigned long flags)
879{
880 struct ring_buffer_per_cpu *cpu_buffer;
881 int cpu = raw_smp_processor_id();
882
883 cpu_buffer = buffer->buffers[cpu];
884
885 assert_spin_locked(&cpu_buffer->lock);
886
887 rb_commit(cpu_buffer, event);
888
889 spin_unlock(&cpu_buffer->lock);
890 raw_local_irq_restore(flags);
891
892 return 0;
893}
894
895/**
896 * ring_buffer_write - write data to the buffer without reserving
897 * @buffer: The ring buffer to write to.
898 * @length: The length of the data being written (excluding the event header)
899 * @data: The data to write to the buffer.
900 *
901 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
902 * one function. If you already have the data to write to the buffer, it
903 * may be easier to simply call this function.
904 *
905 * Note, like ring_buffer_lock_reserve, the length is the length of the data
906 * and not the length of the event which would hold the header.
907 */
908int ring_buffer_write(struct ring_buffer *buffer,
909 unsigned long length,
910 void *data)
911{
912 struct ring_buffer_per_cpu *cpu_buffer;
913 struct ring_buffer_event *event;
914 unsigned long event_length, flags;
915 void *body;
916 int ret = -EBUSY;
917 int cpu;
918
919 if (atomic_read(&buffer->record_disabled))
920 return -EBUSY;
921
922 local_irq_save(flags);
923 cpu = raw_smp_processor_id();
924
925 if (!cpu_isset(cpu, buffer->cpumask))
926 goto out_irq;
927
928 cpu_buffer = buffer->buffers[cpu];
929 spin_lock(&cpu_buffer->lock);
930
931 if (atomic_read(&cpu_buffer->record_disabled))
932 goto out;
933
934 event_length = rb_calculate_event_length(length);
935 event = rb_reserve_next_event(cpu_buffer,
936 RINGBUF_TYPE_DATA, event_length);
937 if (!event)
938 goto out;
939
940 body = rb_event_data(event);
941
942 memcpy(body, data, length);
943
944 rb_commit(cpu_buffer, event);
945
946 ret = 0;
947 out:
948 spin_unlock(&cpu_buffer->lock);
949 out_irq:
950 local_irq_restore(flags);
951
952 return ret;
953}
954
955/**
956 * ring_buffer_lock - lock the ring buffer
957 * @buffer: The ring buffer to lock
958 * @flags: The place to store the interrupt flags
959 *
960 * This locks all the per CPU buffers.
961 *
962 * Must be unlocked by ring_buffer_unlock.
963 */
964void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
965{
966 struct ring_buffer_per_cpu *cpu_buffer;
967 int cpu;
968
969 local_irq_save(*flags);
970
971 for_each_buffer_cpu(buffer, cpu) {
972 cpu_buffer = buffer->buffers[cpu];
973 spin_lock(&cpu_buffer->lock);
974 }
975}
976
977/**
978 * ring_buffer_unlock - unlock a locked buffer
979 * @buffer: The locked buffer to unlock
980 * @flags: The interrupt flags received by ring_buffer_lock
981 */
982void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
983{
984 struct ring_buffer_per_cpu *cpu_buffer;
985 int cpu;
986
987 for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
988 if (!cpu_isset(cpu, buffer->cpumask))
989 continue;
990 cpu_buffer = buffer->buffers[cpu];
991 spin_unlock(&cpu_buffer->lock);
992 }
993
994 local_irq_restore(flags);
995}
996
997/**
998 * ring_buffer_record_disable - stop all writes into the buffer
999 * @buffer: The ring buffer to stop writes to.
1000 *
1001 * This prevents all writes to the buffer. Any attempt to write
1002 * to the buffer after this will fail and return NULL.
1003 *
1004 * The caller should call synchronize_sched() after this.
1005 */
1006void ring_buffer_record_disable(struct ring_buffer *buffer)
1007{
1008 atomic_inc(&buffer->record_disabled);
1009}
1010
1011/**
1012 * ring_buffer_record_enable - enable writes to the buffer
1013 * @buffer: The ring buffer to enable writes
1014 *
1015 * Note, multiple disables will need the same number of enables
1016 * to truely enable the writing (much like preempt_disable).
1017 */
1018void ring_buffer_record_enable(struct ring_buffer *buffer)
1019{
1020 atomic_dec(&buffer->record_disabled);
1021}
1022
1023/**
1024 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1025 * @buffer: The ring buffer to stop writes to.
1026 * @cpu: The CPU buffer to stop
1027 *
1028 * This prevents all writes to the buffer. Any attempt to write
1029 * to the buffer after this will fail and return NULL.
1030 *
1031 * The caller should call synchronize_sched() after this.
1032 */
1033void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1034{
1035 struct ring_buffer_per_cpu *cpu_buffer;
1036
1037 if (!cpu_isset(cpu, buffer->cpumask))
1038 return;
1039
1040 cpu_buffer = buffer->buffers[cpu];
1041 atomic_inc(&cpu_buffer->record_disabled);
1042}
1043
1044/**
1045 * ring_buffer_record_enable_cpu - enable writes to the buffer
1046 * @buffer: The ring buffer to enable writes
1047 * @cpu: The CPU to enable.
1048 *
1049 * Note, multiple disables will need the same number of enables
1050 * to truely enable the writing (much like preempt_disable).
1051 */
1052void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1053{
1054 struct ring_buffer_per_cpu *cpu_buffer;
1055
1056 if (!cpu_isset(cpu, buffer->cpumask))
1057 return;
1058
1059 cpu_buffer = buffer->buffers[cpu];
1060 atomic_dec(&cpu_buffer->record_disabled);
1061}
1062
1063/**
1064 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1065 * @buffer: The ring buffer
1066 * @cpu: The per CPU buffer to get the entries from.
1067 */
1068unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1069{
1070 struct ring_buffer_per_cpu *cpu_buffer;
1071
1072 if (!cpu_isset(cpu, buffer->cpumask))
1073 return 0;
1074
1075 cpu_buffer = buffer->buffers[cpu];
1076 return cpu_buffer->entries;
1077}
1078
1079/**
1080 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1081 * @buffer: The ring buffer
1082 * @cpu: The per CPU buffer to get the number of overruns from
1083 */
1084unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1085{
1086 struct ring_buffer_per_cpu *cpu_buffer;
1087
1088 if (!cpu_isset(cpu, buffer->cpumask))
1089 return 0;
1090
1091 cpu_buffer = buffer->buffers[cpu];
1092 return cpu_buffer->overrun;
1093}
1094
1095/**
1096 * ring_buffer_entries - get the number of entries in a buffer
1097 * @buffer: The ring buffer
1098 *
1099 * Returns the total number of entries in the ring buffer
1100 * (all CPU entries)
1101 */
1102unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1103{
1104 struct ring_buffer_per_cpu *cpu_buffer;
1105 unsigned long entries = 0;
1106 int cpu;
1107
1108 /* if you care about this being correct, lock the buffer */
1109 for_each_buffer_cpu(buffer, cpu) {
1110 cpu_buffer = buffer->buffers[cpu];
1111 entries += cpu_buffer->entries;
1112 }
1113
1114 return entries;
1115}
1116
1117/**
1118 * ring_buffer_overrun_cpu - get the number of overruns in buffer
1119 * @buffer: The ring buffer
1120 *
1121 * Returns the total number of overruns in the ring buffer
1122 * (all CPU entries)
1123 */
1124unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1125{
1126 struct ring_buffer_per_cpu *cpu_buffer;
1127 unsigned long overruns = 0;
1128 int cpu;
1129
1130 /* if you care about this being correct, lock the buffer */
1131 for_each_buffer_cpu(buffer, cpu) {
1132 cpu_buffer = buffer->buffers[cpu];
1133 overruns += cpu_buffer->overrun;
1134 }
1135
1136 return overruns;
1137}
1138
1139/**
1140 * ring_buffer_iter_reset - reset an iterator
1141 * @iter: The iterator to reset
1142 *
1143 * Resets the iterator, so that it will start from the beginning
1144 * again.
1145 */
1146void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1147{
1148 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1149
1150 iter->head_page = cpu_buffer->head_page;
1151 iter->head = cpu_buffer->head;
1152 rb_reset_iter_read_page(iter);
1153}
1154
1155/**
1156 * ring_buffer_iter_empty - check if an iterator has no more to read
1157 * @iter: The iterator to check
1158 */
1159int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1160{
1161 struct ring_buffer_per_cpu *cpu_buffer;
1162
1163 cpu_buffer = iter->cpu_buffer;
1164
1165 return iter->head_page == cpu_buffer->tail_page &&
1166 iter->head == cpu_buffer->tail;
1167}
1168
1169static void
1170rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1171 struct ring_buffer_event *event)
1172{
1173 u64 delta;
1174
1175 switch (event->type) {
1176 case RINGBUF_TYPE_PADDING:
1177 return;
1178
1179 case RINGBUF_TYPE_TIME_EXTEND:
1180 delta = event->array[0];
1181 delta <<= TS_SHIFT;
1182 delta += event->time_delta;
1183 cpu_buffer->read_stamp += delta;
1184 return;
1185
1186 case RINGBUF_TYPE_TIME_STAMP:
1187 /* FIXME: not implemented */
1188 return;
1189
1190 case RINGBUF_TYPE_DATA:
1191 cpu_buffer->read_stamp += event->time_delta;
1192 return;
1193
1194 default:
1195 BUG();
1196 }
1197 return;
1198}
1199
1200static void
1201rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1202 struct ring_buffer_event *event)
1203{
1204 u64 delta;
1205
1206 switch (event->type) {
1207 case RINGBUF_TYPE_PADDING:
1208 return;
1209
1210 case RINGBUF_TYPE_TIME_EXTEND:
1211 delta = event->array[0];
1212 delta <<= TS_SHIFT;
1213 delta += event->time_delta;
1214 iter->read_stamp += delta;
1215 return;
1216
1217 case RINGBUF_TYPE_TIME_STAMP:
1218 /* FIXME: not implemented */
1219 return;
1220
1221 case RINGBUF_TYPE_DATA:
1222 iter->read_stamp += event->time_delta;
1223 return;
1224
1225 default:
1226 BUG();
1227 }
1228 return;
1229}
1230
1231static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
1232{
1233 struct ring_buffer_event *event;
1234 unsigned length;
1235
1236 /*
1237 * Check if we are at the end of the buffer.
1238 */
1239 if (cpu_buffer->head >= cpu_buffer->head_page->size) {
1240 BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
1241 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1242 rb_reset_read_page(cpu_buffer);
1243 return;
1244 }
1245
1246 event = rb_head_event(cpu_buffer);
1247
1248 if (event->type == RINGBUF_TYPE_DATA)
1249 cpu_buffer->entries--;
1250
1251 length = rb_event_length(event);
1252
1253 /*
1254 * This should not be called to advance the header if we are
1255 * at the tail of the buffer.
1256 */
1257 BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
1258 (cpu_buffer->head + length > cpu_buffer->tail));
1259
1260 rb_update_read_stamp(cpu_buffer, event);
1261
1262 cpu_buffer->head += length;
1263
1264 /* check for end of page */
1265 if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
1266 (cpu_buffer->head_page != cpu_buffer->tail_page))
1267 rb_advance_head(cpu_buffer);
1268}
1269
1270static void rb_advance_iter(struct ring_buffer_iter *iter)
1271{
1272 struct ring_buffer *buffer;
1273 struct ring_buffer_per_cpu *cpu_buffer;
1274 struct ring_buffer_event *event;
1275 unsigned length;
1276
1277 cpu_buffer = iter->cpu_buffer;
1278 buffer = cpu_buffer->buffer;
1279
1280 /*
1281 * Check if we are at the end of the buffer.
1282 */
1283 if (iter->head >= iter->head_page->size) {
1284 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1285 rb_inc_page(cpu_buffer, &iter->head_page);
1286 rb_reset_iter_read_page(iter);
1287 return;
1288 }
1289
1290 event = rb_iter_head_event(iter);
1291
1292 length = rb_event_length(event);
1293
1294 /*
1295 * This should not be called to advance the header if we are
1296 * at the tail of the buffer.
1297 */
1298 BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1299 (iter->head + length > cpu_buffer->tail));
1300
1301 rb_update_iter_read_stamp(iter, event);
1302
1303 iter->head += length;
1304
1305 /* check for end of page padding */
1306 if ((iter->head >= iter->head_page->size) &&
1307 (iter->head_page != cpu_buffer->tail_page))
1308 rb_advance_iter(iter);
1309}
1310
1311/**
1312 * ring_buffer_peek - peek at the next event to be read
1313 * @buffer: The ring buffer to read
1314 * @cpu: The cpu to peak at
1315 * @ts: The timestamp counter of this event.
1316 *
1317 * This will return the event that will be read next, but does
1318 * not consume the data.
1319 */
1320struct ring_buffer_event *
1321ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1322{
1323 struct ring_buffer_per_cpu *cpu_buffer;
1324 struct ring_buffer_event *event;
1325
1326 if (!cpu_isset(cpu, buffer->cpumask))
1327 return NULL;
1328
1329 cpu_buffer = buffer->buffers[cpu];
1330
1331 again:
1332 if (rb_per_cpu_empty(cpu_buffer))
1333 return NULL;
1334
1335 event = rb_head_event(cpu_buffer);
1336
1337 switch (event->type) {
1338 case RINGBUF_TYPE_PADDING:
1339 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1340 rb_reset_read_page(cpu_buffer);
1341 goto again;
1342
1343 case RINGBUF_TYPE_TIME_EXTEND:
1344 /* Internal data, OK to advance */
1345 rb_advance_head(cpu_buffer);
1346 goto again;
1347
1348 case RINGBUF_TYPE_TIME_STAMP:
1349 /* FIXME: not implemented */
1350 rb_advance_head(cpu_buffer);
1351 goto again;
1352
1353 case RINGBUF_TYPE_DATA:
1354 if (ts) {
1355 *ts = cpu_buffer->read_stamp + event->time_delta;
1356 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1357 }
1358 return event;
1359
1360 default:
1361 BUG();
1362 }
1363
1364 return NULL;
1365}
1366
1367/**
1368 * ring_buffer_iter_peek - peek at the next event to be read
1369 * @iter: The ring buffer iterator
1370 * @ts: The timestamp counter of this event.
1371 *
1372 * This will return the event that will be read next, but does
1373 * not increment the iterator.
1374 */
1375struct ring_buffer_event *
1376ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1377{
1378 struct ring_buffer *buffer;
1379 struct ring_buffer_per_cpu *cpu_buffer;
1380 struct ring_buffer_event *event;
1381
1382 if (ring_buffer_iter_empty(iter))
1383 return NULL;
1384
1385 cpu_buffer = iter->cpu_buffer;
1386 buffer = cpu_buffer->buffer;
1387
1388 again:
1389 if (rb_per_cpu_empty(cpu_buffer))
1390 return NULL;
1391
1392 event = rb_iter_head_event(iter);
1393
1394 switch (event->type) {
1395 case RINGBUF_TYPE_PADDING:
1396 rb_inc_page(cpu_buffer, &iter->head_page);
1397 rb_reset_iter_read_page(iter);
1398 goto again;
1399
1400 case RINGBUF_TYPE_TIME_EXTEND:
1401 /* Internal data, OK to advance */
1402 rb_advance_iter(iter);
1403 goto again;
1404
1405 case RINGBUF_TYPE_TIME_STAMP:
1406 /* FIXME: not implemented */
1407 rb_advance_iter(iter);
1408 goto again;
1409
1410 case RINGBUF_TYPE_DATA:
1411 if (ts) {
1412 *ts = iter->read_stamp + event->time_delta;
1413 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1414 }
1415 return event;
1416
1417 default:
1418 BUG();
1419 }
1420
1421 return NULL;
1422}
1423
1424/**
1425 * ring_buffer_consume - return an event and consume it
1426 * @buffer: The ring buffer to get the next event from
1427 *
1428 * Returns the next event in the ring buffer, and that event is consumed.
1429 * Meaning, that sequential reads will keep returning a different event,
1430 * and eventually empty the ring buffer if the producer is slower.
1431 */
1432struct ring_buffer_event *
1433ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1434{
1435 struct ring_buffer_per_cpu *cpu_buffer;
1436 struct ring_buffer_event *event;
1437
1438 if (!cpu_isset(cpu, buffer->cpumask))
1439 return NULL;
1440
1441 event = ring_buffer_peek(buffer, cpu, ts);
1442 if (!event)
1443 return NULL;
1444
1445 cpu_buffer = buffer->buffers[cpu];
1446 rb_advance_head(cpu_buffer);
1447
1448 return event;
1449}
1450
1451/**
1452 * ring_buffer_read_start - start a non consuming read of the buffer
1453 * @buffer: The ring buffer to read from
1454 * @cpu: The cpu buffer to iterate over
1455 *
1456 * This starts up an iteration through the buffer. It also disables
1457 * the recording to the buffer until the reading is finished.
1458 * This prevents the reading from being corrupted. This is not
1459 * a consuming read, so a producer is not expected.
1460 *
1461 * Must be paired with ring_buffer_finish.
1462 */
1463struct ring_buffer_iter *
1464ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1465{
1466 struct ring_buffer_per_cpu *cpu_buffer;
1467 struct ring_buffer_iter *iter;
1468
1469 if (!cpu_isset(cpu, buffer->cpumask))
1470 return NULL;
1471
1472 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1473 if (!iter)
1474 return NULL;
1475
1476 cpu_buffer = buffer->buffers[cpu];
1477
1478 iter->cpu_buffer = cpu_buffer;
1479
1480 atomic_inc(&cpu_buffer->record_disabled);
1481 synchronize_sched();
1482
1483 spin_lock(&cpu_buffer->lock);
1484 iter->head = cpu_buffer->head;
1485 iter->head_page = cpu_buffer->head_page;
1486 rb_reset_iter_read_page(iter);
1487 spin_unlock(&cpu_buffer->lock);
1488
1489 return iter;
1490}
1491
1492/**
1493 * ring_buffer_finish - finish reading the iterator of the buffer
1494 * @iter: The iterator retrieved by ring_buffer_start
1495 *
1496 * This re-enables the recording to the buffer, and frees the
1497 * iterator.
1498 */
1499void
1500ring_buffer_read_finish(struct ring_buffer_iter *iter)
1501{
1502 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1503
1504 atomic_dec(&cpu_buffer->record_disabled);
1505 kfree(iter);
1506}
1507
1508/**
1509 * ring_buffer_read - read the next item in the ring buffer by the iterator
1510 * @iter: The ring buffer iterator
1511 * @ts: The time stamp of the event read.
1512 *
1513 * This reads the next event in the ring buffer and increments the iterator.
1514 */
1515struct ring_buffer_event *
1516ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1517{
1518 struct ring_buffer_event *event;
1519
1520 event = ring_buffer_iter_peek(iter, ts);
1521 if (!event)
1522 return NULL;
1523
1524 rb_advance_iter(iter);
1525
1526 return event;
1527}
1528
1529/**
1530 * ring_buffer_size - return the size of the ring buffer (in bytes)
1531 * @buffer: The ring buffer.
1532 */
1533unsigned long ring_buffer_size(struct ring_buffer *buffer)
1534{
1535 return BUF_PAGE_SIZE * buffer->pages;
1536}
1537
1538static void
1539rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1540{
1541 cpu_buffer->head_page
1542 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1543 cpu_buffer->tail_page
1544 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1545
1546 cpu_buffer->head = cpu_buffer->tail = 0;
1547 cpu_buffer->overrun = 0;
1548 cpu_buffer->entries = 0;
1549}
1550
1551/**
1552 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1553 * @buffer: The ring buffer to reset a per cpu buffer of
1554 * @cpu: The CPU buffer to be reset
1555 */
1556void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1557{
1558 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1559 unsigned long flags;
1560
1561 if (!cpu_isset(cpu, buffer->cpumask))
1562 return;
1563
1564 raw_local_irq_save(flags);
1565 spin_lock(&cpu_buffer->lock);
1566
1567 rb_reset_cpu(cpu_buffer);
1568
1569 spin_unlock(&cpu_buffer->lock);
1570 raw_local_irq_restore(flags);
1571}
1572
1573/**
1574 * ring_buffer_reset - reset a ring buffer
1575 * @buffer: The ring buffer to reset all cpu buffers
1576 */
1577void ring_buffer_reset(struct ring_buffer *buffer)
1578{
1579 unsigned long flags;
1580 int cpu;
1581
1582 ring_buffer_lock(buffer, &flags);
1583
1584 for_each_buffer_cpu(buffer, cpu)
1585 rb_reset_cpu(buffer->buffers[cpu]);
1586
1587 ring_buffer_unlock(buffer, flags);
1588}
1589
1590/**
1591 * rind_buffer_empty - is the ring buffer empty?
1592 * @buffer: The ring buffer to test
1593 */
1594int ring_buffer_empty(struct ring_buffer *buffer)
1595{
1596 struct ring_buffer_per_cpu *cpu_buffer;
1597 int cpu;
1598
1599 /* yes this is racy, but if you don't like the race, lock the buffer */
1600 for_each_buffer_cpu(buffer, cpu) {
1601 cpu_buffer = buffer->buffers[cpu];
1602 if (!rb_per_cpu_empty(cpu_buffer))
1603 return 0;
1604 }
1605 return 1;
1606}
1607
1608/**
1609 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1610 * @buffer: The ring buffer
1611 * @cpu: The CPU buffer to test
1612 */
1613int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1614{
1615 struct ring_buffer_per_cpu *cpu_buffer;
1616
1617 if (!cpu_isset(cpu, buffer->cpumask))
1618 return 1;
1619
1620 cpu_buffer = buffer->buffers[cpu];
1621 return rb_per_cpu_empty(cpu_buffer);
1622}
1623
1624/**
1625 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1626 * @buffer_a: One buffer to swap with
1627 * @buffer_b: The other buffer to swap with
1628 *
1629 * This function is useful for tracers that want to take a "snapshot"
1630 * of a CPU buffer and has another back up buffer lying around.
1631 * it is expected that the tracer handles the cpu buffer not being
1632 * used at the moment.
1633 */
1634int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1635 struct ring_buffer *buffer_b, int cpu)
1636{
1637 struct ring_buffer_per_cpu *cpu_buffer_a;
1638 struct ring_buffer_per_cpu *cpu_buffer_b;
1639
1640 if (!cpu_isset(cpu, buffer_a->cpumask) ||
1641 !cpu_isset(cpu, buffer_b->cpumask))
1642 return -EINVAL;
1643
1644 /* At least make sure the two buffers are somewhat the same */
1645 if (buffer_a->size != buffer_b->size ||
1646 buffer_a->pages != buffer_b->pages)
1647 return -EINVAL;
1648
1649 cpu_buffer_a = buffer_a->buffers[cpu];
1650 cpu_buffer_b = buffer_b->buffers[cpu];
1651
1652 /*
1653 * We can't do a synchronize_sched here because this
1654 * function can be called in atomic context.
1655 * Normally this will be called from the same CPU as cpu.
1656 * If not it's up to the caller to protect this.
1657 */
1658 atomic_inc(&cpu_buffer_a->record_disabled);
1659 atomic_inc(&cpu_buffer_b->record_disabled);
1660
1661 buffer_a->buffers[cpu] = cpu_buffer_b;
1662 buffer_b->buffers[cpu] = cpu_buffer_a;
1663
1664 cpu_buffer_b->buffer = buffer_a;
1665 cpu_buffer_a->buffer = buffer_b;
1666
1667 atomic_dec(&cpu_buffer_a->record_disabled);
1668 atomic_dec(&cpu_buffer_b->record_disabled);
1669
1670 return 0;
1671}
1672