aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSteven Rostedt <srostedt@redhat.com>2009-06-16 12:37:57 -0400
committerSteven Rostedt <rostedt@goodmis.org>2009-06-16 16:25:33 -0400
commitfa7439531dee58277748c819785a44d3203c4b51 (patch)
treeb867f2e8ff7702d107a2e6dc08c24e451d170a80
parent263294f3e1e883b9dcbf0c09a54b644918f7729d (diff)
ring-buffer: use commit counters for commit pointer accounting
The ring buffer is made up of three sets of pointers. The head page pointer, which points to the next page for the reader to get. The commit pointer and commit index, which points to the page and index of the last committed write respectively. The tail pointer and tail index, which points to the page and the index of the last reserved data respectively (non committed). The commit pointer is only moved forward by the outer most writer. If a nested writer comes in, it will not move the pointer forward. The current implementation has a flaw. It assumes that the outer most writer successfully reserved data. There's a small race window where the outer most writer could find the tail pointer, but a nested writer could come in (via interrupt) and move the tail forward, and even the commit forward. The outer writer would not realized the commit moved forward and the accounting will break. This patch changes the design to use counters in the per cpu buffers to keep track of commits. The counters are incremented at the start of the commit, and decremented at the end. If the end commit counter is 1, then it moves the commit pointers. A loop is made to check for races between checking and moving the commit pointers. Only the outer commit should move the pointers anyway. The test of knowing if a reserve is equal to the last commit update is still needed to know for time keeping. The time code is much less racey than the commit updates. This change not only solves the mentioned race, but also makes the code simpler. [ Impact: fix commit race and simplify code ] Signed-off-by: Steven Rostedt <rostedt@goodmis.org>
-rw-r--r--kernel/trace/ring_buffer.c154
1 files changed, 75 insertions, 79 deletions
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index e857e9443987..ed3559944fcf 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -415,6 +415,8 @@ struct ring_buffer_per_cpu {
415 unsigned long overrun; 415 unsigned long overrun;
416 unsigned long read; 416 unsigned long read;
417 local_t entries; 417 local_t entries;
418 local_t committing;
419 local_t commits;
418 u64 write_stamp; 420 u64 write_stamp;
419 u64 read_stamp; 421 u64 read_stamp;
420 atomic_t record_disabled; 422 atomic_t record_disabled;
@@ -1015,8 +1017,8 @@ rb_event_index(struct ring_buffer_event *event)
1015} 1017}
1016 1018
1017static inline int 1019static inline int
1018rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 1020rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
1019 struct ring_buffer_event *event) 1021 struct ring_buffer_event *event)
1020{ 1022{
1021 unsigned long addr = (unsigned long)event; 1023 unsigned long addr = (unsigned long)event;
1022 unsigned long index; 1024 unsigned long index;
@@ -1029,31 +1031,6 @@ rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
1029} 1031}
1030 1032
1031static void 1033static void
1032rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
1033 struct ring_buffer_event *event)
1034{
1035 unsigned long addr = (unsigned long)event;
1036 unsigned long index;
1037
1038 index = rb_event_index(event);
1039 addr &= PAGE_MASK;
1040
1041 while (cpu_buffer->commit_page->page != (void *)addr) {
1042 if (RB_WARN_ON(cpu_buffer,
1043 cpu_buffer->commit_page == cpu_buffer->tail_page))
1044 return;
1045 cpu_buffer->commit_page->page->commit =
1046 cpu_buffer->commit_page->write;
1047 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
1048 cpu_buffer->write_stamp =
1049 cpu_buffer->commit_page->page->time_stamp;
1050 }
1051
1052 /* Now set the commit to the event's index */
1053 local_set(&cpu_buffer->commit_page->page->commit, index);
1054}
1055
1056static void
1057rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 1034rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
1058{ 1035{
1059 /* 1036 /*
@@ -1319,15 +1296,6 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
1319 1296
1320 rb_reset_tail(cpu_buffer, tail_page, tail, length); 1297 rb_reset_tail(cpu_buffer, tail_page, tail, length);
1321 1298
1322 /*
1323 * If this was a commit entry that failed,
1324 * increment that too
1325 */
1326 if (tail_page == cpu_buffer->commit_page &&
1327 tail == rb_commit_index(cpu_buffer)) {
1328 rb_set_commit_to_write(cpu_buffer);
1329 }
1330
1331 __raw_spin_unlock(&cpu_buffer->lock); 1299 __raw_spin_unlock(&cpu_buffer->lock);
1332 local_irq_restore(flags); 1300 local_irq_restore(flags);
1333 1301
@@ -1377,11 +1345,11 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
1377 local_inc(&tail_page->entries); 1345 local_inc(&tail_page->entries);
1378 1346
1379 /* 1347 /*
1380 * If this is a commit and the tail is zero, then update 1348 * If this is the first commit on the page, then update
1381 * this page's time stamp. 1349 * its timestamp.
1382 */ 1350 */
1383 if (!tail && rb_is_commit(cpu_buffer, event)) 1351 if (!tail)
1384 cpu_buffer->commit_page->page->time_stamp = *ts; 1352 tail_page->page->time_stamp = *ts;
1385 1353
1386 return event; 1354 return event;
1387} 1355}
@@ -1450,16 +1418,16 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1450 return -EAGAIN; 1418 return -EAGAIN;
1451 1419
1452 /* Only a commited time event can update the write stamp */ 1420 /* Only a commited time event can update the write stamp */
1453 if (rb_is_commit(cpu_buffer, event)) { 1421 if (rb_event_is_commit(cpu_buffer, event)) {
1454 /* 1422 /*
1455 * If this is the first on the page, then we need to 1423 * If this is the first on the page, then it was
1456 * update the page itself, and just put in a zero. 1424 * updated with the page itself. Try to discard it
1425 * and if we can't just make it zero.
1457 */ 1426 */
1458 if (rb_event_index(event)) { 1427 if (rb_event_index(event)) {
1459 event->time_delta = *delta & TS_MASK; 1428 event->time_delta = *delta & TS_MASK;
1460 event->array[0] = *delta >> TS_SHIFT; 1429 event->array[0] = *delta >> TS_SHIFT;
1461 } else { 1430 } else {
1462 cpu_buffer->commit_page->page->time_stamp = *ts;
1463 /* try to discard, since we do not need this */ 1431 /* try to discard, since we do not need this */
1464 if (!rb_try_to_discard(cpu_buffer, event)) { 1432 if (!rb_try_to_discard(cpu_buffer, event)) {
1465 /* nope, just zero it */ 1433 /* nope, just zero it */
@@ -1485,6 +1453,44 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1485 return ret; 1453 return ret;
1486} 1454}
1487 1455
1456static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
1457{
1458 local_inc(&cpu_buffer->committing);
1459 local_inc(&cpu_buffer->commits);
1460}
1461
1462static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
1463{
1464 unsigned long commits;
1465
1466 if (RB_WARN_ON(cpu_buffer,
1467 !local_read(&cpu_buffer->committing)))
1468 return;
1469
1470 again:
1471 commits = local_read(&cpu_buffer->commits);
1472 /* synchronize with interrupts */
1473 barrier();
1474 if (local_read(&cpu_buffer->committing) == 1)
1475 rb_set_commit_to_write(cpu_buffer);
1476
1477 local_dec(&cpu_buffer->committing);
1478
1479 /* synchronize with interrupts */
1480 barrier();
1481
1482 /*
1483 * Need to account for interrupts coming in between the
1484 * updating of the commit page and the clearing of the
1485 * committing counter.
1486 */
1487 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
1488 !local_read(&cpu_buffer->committing)) {
1489 local_inc(&cpu_buffer->committing);
1490 goto again;
1491 }
1492}
1493
1488static struct ring_buffer_event * 1494static struct ring_buffer_event *
1489rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, 1495rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1490 unsigned long length) 1496 unsigned long length)
@@ -1494,6 +1500,8 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1494 int commit = 0; 1500 int commit = 0;
1495 int nr_loops = 0; 1501 int nr_loops = 0;
1496 1502
1503 rb_start_commit(cpu_buffer);
1504
1497 length = rb_calculate_event_length(length); 1505 length = rb_calculate_event_length(length);
1498 again: 1506 again:
1499 /* 1507 /*
@@ -1506,7 +1514,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1506 * Bail! 1514 * Bail!
1507 */ 1515 */
1508 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 1516 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
1509 return NULL; 1517 goto out_fail;
1510 1518
1511 ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu); 1519 ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu);
1512 1520
@@ -1537,7 +1545,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1537 1545
1538 commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); 1546 commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
1539 if (commit == -EBUSY) 1547 if (commit == -EBUSY)
1540 return NULL; 1548 goto out_fail;
1541 1549
1542 if (commit == -EAGAIN) 1550 if (commit == -EAGAIN)
1543 goto again; 1551 goto again;
@@ -1551,28 +1559,19 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1551 if (unlikely(PTR_ERR(event) == -EAGAIN)) 1559 if (unlikely(PTR_ERR(event) == -EAGAIN))
1552 goto again; 1560 goto again;
1553 1561
1554 if (!event) { 1562 if (!event)
1555 if (unlikely(commit)) 1563 goto out_fail;
1556 /*
1557 * Ouch! We needed a timestamp and it was commited. But
1558 * we didn't get our event reserved.
1559 */
1560 rb_set_commit_to_write(cpu_buffer);
1561 return NULL;
1562 }
1563 1564
1564 /* 1565 if (!rb_event_is_commit(cpu_buffer, event))
1565 * If the timestamp was commited, make the commit our entry
1566 * now so that we will update it when needed.
1567 */
1568 if (unlikely(commit))
1569 rb_set_commit_event(cpu_buffer, event);
1570 else if (!rb_is_commit(cpu_buffer, event))
1571 delta = 0; 1566 delta = 0;
1572 1567
1573 event->time_delta = delta; 1568 event->time_delta = delta;
1574 1569
1575 return event; 1570 return event;
1571
1572 out_fail:
1573 rb_end_commit(cpu_buffer);
1574 return NULL;
1576} 1575}
1577 1576
1578#define TRACE_RECURSIVE_DEPTH 16 1577#define TRACE_RECURSIVE_DEPTH 16
@@ -1682,13 +1681,14 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
1682{ 1681{
1683 local_inc(&cpu_buffer->entries); 1682 local_inc(&cpu_buffer->entries);
1684 1683
1685 /* Only process further if we own the commit */ 1684 /*
1686 if (!rb_is_commit(cpu_buffer, event)) 1685 * The event first in the commit queue updates the
1687 return; 1686 * time stamp.
1688 1687 */
1689 cpu_buffer->write_stamp += event->time_delta; 1688 if (rb_event_is_commit(cpu_buffer, event))
1689 cpu_buffer->write_stamp += event->time_delta;
1690 1690
1691 rb_set_commit_to_write(cpu_buffer); 1691 rb_end_commit(cpu_buffer);
1692} 1692}
1693 1693
1694/** 1694/**
@@ -1777,15 +1777,15 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer,
1777 /* The event is discarded regardless */ 1777 /* The event is discarded regardless */
1778 rb_event_discard(event); 1778 rb_event_discard(event);
1779 1779
1780 cpu = smp_processor_id();
1781 cpu_buffer = buffer->buffers[cpu];
1782
1780 /* 1783 /*
1781 * This must only be called if the event has not been 1784 * This must only be called if the event has not been
1782 * committed yet. Thus we can assume that preemption 1785 * committed yet. Thus we can assume that preemption
1783 * is still disabled. 1786 * is still disabled.
1784 */ 1787 */
1785 RB_WARN_ON(buffer, preemptible()); 1788 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
1786
1787 cpu = smp_processor_id();
1788 cpu_buffer = buffer->buffers[cpu];
1789 1789
1790 if (!rb_try_to_discard(cpu_buffer, event)) 1790 if (!rb_try_to_discard(cpu_buffer, event))
1791 goto out; 1791 goto out;
@@ -1796,13 +1796,7 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer,
1796 */ 1796 */
1797 local_inc(&cpu_buffer->entries); 1797 local_inc(&cpu_buffer->entries);
1798 out: 1798 out:
1799 /* 1799 rb_end_commit(cpu_buffer);
1800 * If a write came in and pushed the tail page
1801 * we still need to update the commit pointer
1802 * if we were the commit.
1803 */
1804 if (rb_is_commit(cpu_buffer, event))
1805 rb_set_commit_to_write(cpu_buffer);
1806 1800
1807 trace_recursive_unlock(); 1801 trace_recursive_unlock();
1808 1802
@@ -2720,6 +2714,8 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2720 cpu_buffer->overrun = 0; 2714 cpu_buffer->overrun = 0;
2721 cpu_buffer->read = 0; 2715 cpu_buffer->read = 0;
2722 local_set(&cpu_buffer->entries, 0); 2716 local_set(&cpu_buffer->entries, 0);
2717 local_set(&cpu_buffer->committing, 0);
2718 local_set(&cpu_buffer->commits, 0);
2723 2719
2724 cpu_buffer->write_stamp = 0; 2720 cpu_buffer->write_stamp = 0;
2725 cpu_buffer->read_stamp = 0; 2721 cpu_buffer->read_stamp = 0;