author    Steven Rostedt <srostedt@redhat.com>  2010-03-31 13:21:56 -0400
committer Steven Rostedt <rostedt@goodmis.org>  2010-03-31 22:57:04 -0400
commit    66a8cb95ed04025664d1db4e952155ee1dccd048 (patch)
tree      74417422a78bc8198de46b0e52e490175af866e0
parent    eb0c53771fb2f5f66b0edb3ebce33be4bbf1c285 (diff)
ring-buffer: Add place holder recording of dropped events
Currently, when the ring buffer drops events, it does not record the fact that it did so. It does inform the writer that the event was dropped by returning a NULL event, but it does not put in any placeholder where the event was dropped.

This is not a trivial thing to add because the ring buffer mostly runs in overwrite (flight recorder) mode. That is, when the ring buffer is full, new data will overwrite old data.

In a producer/consumer mode, where new data is simply dropped when the ring buffer is full, it is trivial to add the placeholder for dropped events. When there's more room to write new data, then a special event can be added to notify the reader about the dropped events.

But in overwrite mode, any new write can overwrite events. A placeholder can not be inserted into the ring buffer since there may never be room. A reader could also come in at any time and miss the placeholder.

Luckily, the way the ring buffer works, the read side can find out if events were lost or not, and how many events. Every time a write takes place, if it overwrites the header page (the next read) it updates an "overrun" variable that keeps track of the number of lost events. When a reader swaps out a page from the ring buffer, it can record this number, perform the swap, and then check to see if the number changed, and take the diff if it has, which would be the number of events dropped. This can be stored by the reader and returned to callers of the reader.

Since the reader page swap will fail if the writer moved the head page since the time the reader page set up the swap, this gives room to record the overruns without worrying about races. If the reader sets up the pages, records the overrun, then performs the swap, and the swap succeeds, then the overrun variable has not been updated since the setup before the swap.

For binary readers of the ring buffer, a flag is set in the header of each sub page (sub buffer) of the ring buffer. This flag is embedded in the size field of the data on the sub buffer, in the 31st bit (the size can be 32 or 64 bits depending on the architecture), but only 27 bits need to be used for the actual size (less actually).

We could add a new field in the sub buffer header to also record the number of events dropped since the last read, but this would change the format of the binary ring buffer a bit too much. Perhaps this change can be made if the information on the number of events dropped is considered important enough.

Note, the notification of dropped events is only used by consuming reads or peeking at the ring buffer. Iterating over the ring buffer does not keep this information because the necessary data is only available when a page swap is made, and the iterator does not swap out pages.

Cc: Robert Richter <robert.richter@amd.com>
Cc: Andi Kleen <andi@firstfloor.org>
Cc: Li Zefan <lizf@cn.fujitsu.com>
Cc: Arnaldo Carvalho de Melo <acme@redhat.com>
Cc: "Luis Claudio R. Goncalves" <lclaudio@uudg.org>
Cc: Frederic Weisbecker <fweisbec@gmail.com>
Signed-off-by: Steven Rostedt <rostedt@goodmis.org>
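The core of the scheme is small enough to sketch in isolation. The following user-space C program models the reader-side accounting this patch adds; it is an illustration, not the kernel code itself. The names cpu_buffer, swap_reader_page and get_reader_page are hypothetical stand-ins, the swap is stubbed out, and C11 atomics stand in for local_t and the smp_mb() ordering.

/*
 * Minimal user-space model of the overrun accounting (assumptions as
 * noted above): snapshot the writer's overrun count, try the page
 * swap, and on success take the diff against the last snapshot.
 */
#include <stdatomic.h>
#include <stdio.h>

struct cpu_buffer {
	atomic_ulong  overrun;      /* bumped by the writer per lost event */
	unsigned long last_overrun; /* reader's snapshot from the last swap */
	unsigned long lost_events;  /* drops seen since the previous read */
};

/* Stub: the real swap fails if the writer moved the head page. */
static int swap_reader_page(struct cpu_buffer *b) { (void)b; return 0; }

static void get_reader_page(struct cpu_buffer *b)
{
	/* Read overrun only after the swap is set up (smp_mb() in the patch). */
	unsigned long overwrite = atomic_load(&b->overrun);

	if (swap_reader_page(b))
		return; /* writer interfered; discard the snapshot and retry */

	/* The swap succeeded, so overrun has not moved since the snapshot. */
	if (overwrite != b->last_overrun) {
		b->lost_events  = overwrite - b->last_overrun;
		b->last_overrun = overwrite;
	}
}

int main(void)
{
	struct cpu_buffer b = { .overrun = 7, .last_overrun = 3 };

	get_reader_page(&b);
	printf("events lost since last read: %lu\n", b.lost_events); /* 4 */
	return 0;
}

The ordering argument is the same as in the patch: because the swap fails whenever the writer has moved the head page, a successful swap proves the overrun snapshot is still current, so the difference against last_overrun is exactly the number of events dropped since the previous read.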
-rw-r--r--  drivers/oprofile/cpu_buffer.c         |  4
-rw-r--r--  include/linux/ring_buffer.h           |  6
-rw-r--r--  kernel/trace/ring_buffer.c            | 72
-rw-r--r--  kernel/trace/ring_buffer_benchmark.c  |  2
-rw-r--r--  kernel/trace/trace.c                  |  4
-rw-r--r--  kernel/trace/trace_functions_graph.c  |  5
-rw-r--r--  kernel/trace/trace_selftest.c         |  2
7 files changed, 79 insertions(+), 16 deletions(-)
diff --git a/drivers/oprofile/cpu_buffer.c b/drivers/oprofile/cpu_buffer.c
index 166b67ea622f..7581dbe456da 100644
--- a/drivers/oprofile/cpu_buffer.c
+++ b/drivers/oprofile/cpu_buffer.c
@@ -186,14 +186,14 @@ int op_cpu_buffer_write_commit(struct op_entry *entry)
 struct op_sample *op_cpu_buffer_read_entry(struct op_entry *entry, int cpu)
 {
 	struct ring_buffer_event *e;
-	e = ring_buffer_consume(op_ring_buffer_read, cpu, NULL);
+	e = ring_buffer_consume(op_ring_buffer_read, cpu, NULL, NULL);
 	if (e)
 		goto event;
 	if (ring_buffer_swap_cpu(op_ring_buffer_read,
 				 op_ring_buffer_write,
 				 cpu))
 		return NULL;
-	e = ring_buffer_consume(op_ring_buffer_read, cpu, NULL);
+	e = ring_buffer_consume(op_ring_buffer_read, cpu, NULL, NULL);
 	if (e)
 		goto event;
 	return NULL;
diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index 5fcc31ed5771..c8297761e414 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -120,9 +120,11 @@ int ring_buffer_write(struct ring_buffer *buffer,
 			unsigned long length, void *data);
 
 struct ring_buffer_event *
-ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts);
+ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
+		 unsigned long *lost_events);
 struct ring_buffer_event *
-ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts);
+ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
+		    unsigned long *lost_events);
 
 struct ring_buffer_iter *
 ring_buffer_read_start(struct ring_buffer *buffer, int cpu);
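With the widened prototypes above, callers that care about drops pass a variable for the new last argument, and callers that do not pass NULL, as every call site updated below does. A minimal sketch of a consuming reader, assuming kernel context; drain_cpu is a hypothetical helper, not part of this patch:

#include <linux/kernel.h>
#include <linux/ring_buffer.h>

static void drain_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	unsigned long lost = 0;
	u64 ts;

	/* lost is filled with the number of events dropped before
	 * the returned event, as this patch wires it up. */
	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
		if (lost)
			pr_info("dropped %lu events before this one\n", lost);
		/* handle ring_buffer_event_data(event) here */
	}
}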
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index d1187ef20caf..8295650444c5 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -318,6 +318,9 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data);
 #define TS_MASK		((1ULL << TS_SHIFT) - 1)
 #define TS_DELTA_TEST	(~TS_MASK)
 
+/* Flag when events were overwritten */
+#define RB_MISSED_EVENTS	(1 << 31)
+
 struct buffer_data_page {
 	u64		 time_stamp;	/* page time stamp */
 	local_t		 commit;	/* write committed index */
@@ -416,6 +419,12 @@ int ring_buffer_print_page_header(struct trace_seq *s)
 			       (unsigned int)sizeof(field.commit),
 			       (unsigned int)is_signed_type(long));
 
+	ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
+			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
+			       (unsigned int)offsetof(typeof(field), commit),
+			       1,
+			       (unsigned int)is_signed_type(long));
+
 	ret = trace_seq_printf(s, "\tfield: char data;\t"
 			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
 			       (unsigned int)offsetof(typeof(field), data),
@@ -439,6 +448,8 @@ struct ring_buffer_per_cpu {
 	struct buffer_page		*tail_page;	/* write to tail */
 	struct buffer_page		*commit_page;	/* committed pages */
 	struct buffer_page		*reader_page;
+	unsigned long			lost_events;
+	unsigned long			last_overrun;
 	local_t				commit_overrun;
 	local_t				overrun;
 	local_t				entries;
@@ -2835,6 +2846,7 @@ static struct buffer_page *
 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 {
 	struct buffer_page *reader = NULL;
+	unsigned long overwrite;
 	unsigned long flags;
 	int nr_loops = 0;
 	int ret;
@@ -2896,6 +2908,18 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 	rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
 
 	/*
+	 * We want to make sure we read the overruns after we set up our
+	 * pointers to the next object. The writer side does a
+	 * cmpxchg to cross pages which acts as the mb on the writer
+	 * side. Note, the reader will constantly fail the swap
+	 * while the writer is updating the pointers, so this
+	 * guarantees that the overwrite recorded here is the one we
+	 * want to compare with the last_overrun.
+	 */
+	smp_mb();
+	overwrite = local_read(&(cpu_buffer->overrun));
+
+	/*
 	 * Here's the tricky part.
 	 *
 	 * We need to move the pointer past the header page.
@@ -2926,6 +2950,11 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->reader_page = reader;
 	rb_reset_reader_page(cpu_buffer);
 
+	if (overwrite != cpu_buffer->last_overrun) {
+		cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
+		cpu_buffer->last_overrun = overwrite;
+	}
+
 	goto again;
 
  out:
@@ -3002,8 +3031,14 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
 		rb_advance_iter(iter);
 }
 
+static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	return cpu_buffer->lost_events;
+}
+
 static struct ring_buffer_event *
-rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
+rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
+	       unsigned long *lost_events)
 {
 	struct ring_buffer_event *event;
 	struct buffer_page *reader;
@@ -3055,6 +3090,8 @@ rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
 		ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
 						 cpu_buffer->cpu, ts);
 	}
+	if (lost_events)
+		*lost_events = rb_lost_events(cpu_buffer);
 	return event;
 
 	default:
@@ -3165,12 +3202,14 @@ static inline int rb_ok_to_lock(void)
  * @buffer: The ring buffer to read
  * @cpu: The cpu to peak at
  * @ts: The timestamp counter of this event.
+ * @lost_events: a variable to store if events were lost (may be NULL)
  *
  * This will return the event that will be read next, but does
  * not consume the data.
  */
 struct ring_buffer_event *
-ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
+ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
+		 unsigned long *lost_events)
 {
 	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
 	struct ring_buffer_event *event;
@@ -3185,7 +3224,7 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
 	local_irq_save(flags);
 	if (dolock)
 		spin_lock(&cpu_buffer->reader_lock);
-	event = rb_buffer_peek(cpu_buffer, ts);
+	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
 	if (event && event->type_len == RINGBUF_TYPE_PADDING)
 		rb_advance_reader(cpu_buffer);
 	if (dolock)
@@ -3227,13 +3266,17 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
 /**
  * ring_buffer_consume - return an event and consume it
  * @buffer: The ring buffer to get the next event from
+ * @cpu: the cpu to read the buffer from
+ * @ts: a variable to store the timestamp (may be NULL)
+ * @lost_events: a variable to store if events were lost (may be NULL)
  *
  * Returns the next event in the ring buffer, and that event is consumed.
  * Meaning, that sequential reads will keep returning a different event,
  * and eventually empty the ring buffer if the producer is slower.
  */
 struct ring_buffer_event *
-ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
+ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
+		    unsigned long *lost_events)
 {
 	struct ring_buffer_per_cpu *cpu_buffer;
 	struct ring_buffer_event *event = NULL;
@@ -3254,9 +3297,11 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
 	if (dolock)
 		spin_lock(&cpu_buffer->reader_lock);
 
-	event = rb_buffer_peek(cpu_buffer, ts);
-	if (event)
+	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
+	if (event) {
+		cpu_buffer->lost_events = 0;
 		rb_advance_reader(cpu_buffer);
+	}
 
 	if (dolock)
 		spin_unlock(&cpu_buffer->reader_lock);
@@ -3405,6 +3450,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->write_stamp = 0;
 	cpu_buffer->read_stamp = 0;
 
+	cpu_buffer->lost_events = 0;
+	cpu_buffer->last_overrun = 0;
+
 	rb_head_page_activate(cpu_buffer);
 }
 
@@ -3684,6 +3732,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 	unsigned int commit;
 	unsigned int read;
 	u64 save_timestamp;
+	int missed_events = 0;
 	int ret = -1;
 
 	if (!cpumask_test_cpu(cpu, buffer->cpumask))
@@ -3716,6 +3765,10 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 	read = reader->read;
 	commit = rb_page_commit(reader);
 
+	/* Check if any events were dropped */
+	if (cpu_buffer->lost_events)
+		missed_events = 1;
+
 	/*
 	 * If this page has been partially read or
 	 * if len is not big enough to read the rest of the page or
@@ -3779,6 +3832,13 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 		}
 		ret = read;
 
+		cpu_buffer->lost_events = 0;
+		/*
+		 * Set a flag in the commit field if we lost events
+		 */
+		if (missed_events)
+			local_add(RB_MISSED_EVENTS, &bpage->commit);
+
  out_unlock:
 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
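For binary readers, the only visible change is the flag that ring_buffer_read_page() above folds into the commit word of the sub-buffer header. A standalone sketch of how such a reader might test it; data_page_header and inspect_page are illustrative names, the real header is struct buffer_data_page and its commit field is a local_t, so 32 or 64 bits depending on the architecture:

/* Standalone illustration of checking the missed-events flag in a
 * sub-buffer header read out of the ring buffer (assumed layout). */
#include <stdint.h>
#include <stdio.h>

#define RB_MISSED_EVENTS ((uint64_t)1 << 31)

struct data_page_header {	/* models struct buffer_data_page */
	uint64_t time_stamp;	/* page time stamp */
	uint64_t commit;	/* committed size, flag in bit 31 */
};

static void inspect_page(const struct data_page_header *hdr)
{
	uint64_t size = hdr->commit & ~RB_MISSED_EVENTS;

	if (hdr->commit & RB_MISSED_EVENTS)
		printf("events dropped before this page; %llu data bytes\n",
		       (unsigned long long)size);
	else
		printf("no drops; %llu data bytes\n",
		       (unsigned long long)size);
}

int main(void)
{
	struct data_page_header hdr = { 0, RB_MISSED_EVENTS | 128 };

	inspect_page(&hdr);
	return 0;
}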
diff --git a/kernel/trace/ring_buffer_benchmark.c b/kernel/trace/ring_buffer_benchmark.c
index df74c7982255..dc56556b55a2 100644
--- a/kernel/trace/ring_buffer_benchmark.c
+++ b/kernel/trace/ring_buffer_benchmark.c
@@ -81,7 +81,7 @@ static enum event_status read_event(int cpu)
 	int *entry;
 	u64 ts;
 
-	event = ring_buffer_consume(buffer, cpu, &ts);
+	event = ring_buffer_consume(buffer, cpu, &ts, NULL);
 	if (!event)
 		return EVENT_DROPPED;
 
diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index 3ec2ee6f6560..fabb0033a9be 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -1556,7 +1556,7 @@ peek_next_entry(struct trace_iterator *iter, int cpu, u64 *ts)
 	if (buf_iter)
 		event = ring_buffer_iter_peek(buf_iter, ts);
 	else
-		event = ring_buffer_peek(iter->tr->buffer, cpu, ts);
+		event = ring_buffer_peek(iter->tr->buffer, cpu, ts, NULL);
 
 	ftrace_enable_cpu();
 
@@ -1635,7 +1635,7 @@ static void trace_consume(struct trace_iterator *iter)
 {
 	/* Don't allow ftrace to trace into the ring buffers */
 	ftrace_disable_cpu();
-	ring_buffer_consume(iter->tr->buffer, iter->cpu, &iter->ts);
+	ring_buffer_consume(iter->tr->buffer, iter->cpu, &iter->ts, NULL);
 	ftrace_enable_cpu();
 }
 
diff --git a/kernel/trace/trace_functions_graph.c b/kernel/trace/trace_functions_graph.c
index e6989d9b44da..a7f75fb10aa4 100644
--- a/kernel/trace/trace_functions_graph.c
+++ b/kernel/trace/trace_functions_graph.c
@@ -489,9 +489,10 @@ get_return_for_leaf(struct trace_iterator *iter,
 		 * We need to consume the current entry to see
 		 * the next one.
 		 */
-		ring_buffer_consume(iter->tr->buffer, iter->cpu, NULL);
+		ring_buffer_consume(iter->tr->buffer, iter->cpu,
+				    NULL, NULL);
 		event = ring_buffer_peek(iter->tr->buffer, iter->cpu,
-					 NULL);
+					 NULL, NULL);
 	}
 
 	if (!event)
diff --git a/kernel/trace/trace_selftest.c b/kernel/trace/trace_selftest.c
index 280fea470d67..e50180874c63 100644
--- a/kernel/trace/trace_selftest.c
+++ b/kernel/trace/trace_selftest.c
@@ -29,7 +29,7 @@ static int trace_test_buffer_cpu(struct trace_array *tr, int cpu)
 	struct trace_entry *entry;
 	unsigned int loops = 0;
 
-	while ((event = ring_buffer_consume(tr->buffer, cpu, NULL))) {
+	while ((event = ring_buffer_consume(tr->buffer, cpu, NULL, NULL))) {
 		entry = ring_buffer_event_data(event);
 
 		/*