aboutsummaryrefslogtreecommitdiffstats
path: root/ipc
diff options
context:
space:
mode:
authorDavidlohr Bueso <dave@stgolabs.net>2015-05-04 10:02:46 -0400
committerIngo Molnar <mingo@kernel.org>2015-05-08 06:23:07 -0400
commitfa6004ad4528153b699a4d5ce5ea6b33acce74cc (patch)
treeeda96bac494cc6e9071e14b2d5741fa8f29958ea /ipc
parent1d0dcb3ad9d336e6d6ee020a750a7f8d907e28de (diff)
ipc/mqueue: Implement lockless pipelined wakeups
This patch moves the wakeup_process() invocation so it is not done under the info->lock by making use of a lockless wake_q. With this change, the waiter is woken up once it is STATE_READY and it does not need to loop on SMP if it is still in STATE_PENDING. In the timeout case we still need to grab the info->lock to verify the state. This change should also avoid the introduction of preempt_disable() in -rt which avoids a busy-loop which polls for the STATE_PENDING -> STATE_READY change if the waiter has a higher priority compared to the waker. Additionally, this patch micro-optimizes wq_sleep by using the cheaper cousin of set_current_state(TASK_INTERRUPTIBLE) as we will block no matter what, thus getting rid of the implied barrier. Signed-off-by: Davidlohr Bueso <dbueso@suse.de> Signed-off-by: Peter Zijlstra (Intel) <peterz@infradead.org> Acked-by: George Spelvin <linux@horizon.com> Acked-by: Thomas Gleixner <tglx@linutronix.de> Cc: Andrew Morton <akpm@linux-foundation.org> Cc: Borislav Petkov <bp@alien8.de> Cc: Chris Mason <clm@fb.com> Cc: H. Peter Anvin <hpa@zytor.com> Cc: Linus Torvalds <torvalds@linux-foundation.org> Cc: Manfred Spraul <manfred@colorfullife.com> Cc: Peter Zijlstra <peterz@infradead.org> Cc: Sebastian Andrzej Siewior <bigeasy@linutronix.de> Cc: Steven Rostedt <rostedt@goodmis.org> Cc: dave@stgolabs.net Link: http://lkml.kernel.org/r/1430748166.1940.17.camel@stgolabs.net Signed-off-by: Ingo Molnar <mingo@kernel.org>
Diffstat (limited to 'ipc')
-rw-r--r--ipc/mqueue.c54
1 file changed, 33 insertions, 21 deletions
diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7ffd077..a24ba9fe5bb8 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
47#define RECV 1 47#define RECV 1
48 48
49#define STATE_NONE 0 49#define STATE_NONE 0
50#define STATE_PENDING 1 50#define STATE_READY 1
51#define STATE_READY 2
52 51
53struct posix_msg_tree_node { 52struct posix_msg_tree_node {
54 struct rb_node rb_node; 53 struct rb_node rb_node;
@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
571 wq_add(info, sr, ewp); 570 wq_add(info, sr, ewp);
572 571
573 for (;;) { 572 for (;;) {
574 set_current_state(TASK_INTERRUPTIBLE); 573 __set_current_state(TASK_INTERRUPTIBLE);
575 574
576 spin_unlock(&info->lock); 575 spin_unlock(&info->lock);
577 time = schedule_hrtimeout_range_clock(timeout, 0, 576 time = schedule_hrtimeout_range_clock(timeout, 0,
578 HRTIMER_MODE_ABS, CLOCK_REALTIME); 577 HRTIMER_MODE_ABS, CLOCK_REALTIME);
579 578
580 while (ewp->state == STATE_PENDING)
581 cpu_relax();
582
583 if (ewp->state == STATE_READY) { 579 if (ewp->state == STATE_READY) {
584 retval = 0; 580 retval = 0;
585 goto out; 581 goto out;
@@ -907,11 +903,15 @@ out_name:
907 * list of waiting receivers. A sender checks that list before adding the new 903 * list of waiting receivers. A sender checks that list before adding the new
908 * message into the message array. If there is a waiting receiver, then it 904 * message into the message array. If there is a waiting receiver, then it
909 * bypasses the message array and directly hands the message over to the 905 * bypasses the message array and directly hands the message over to the
910 * receiver. 906 * receiver. The receiver accepts the message and returns without grabbing the
911 * The receiver accepts the message and returns without grabbing the queue 907 * queue spinlock:
912 * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers 908 *
913 * are necessary. The same algorithm is used for sysv semaphores, see 909 * - Set pointer to message.
914 * ipc/sem.c for more details. 910 * - Queue the receiver task for later wakeup (without the info->lock).
911 * - Update its state to STATE_READY. Now the receiver can continue.
912 * - Wake up the process after the lock is dropped. Should the process wake up
913 * before this wakeup (due to a timeout or a signal) it will either see
914 * STATE_READY and continue or acquire the lock to check the state again.
915 * 915 *
916 * The same algorithm is used for senders. 916 * The same algorithm is used for senders.
917 */ 917 */
@@ -919,21 +919,29 @@ out_name:
919/* pipelined_send() - send a message directly to the task waiting in 919/* pipelined_send() - send a message directly to the task waiting in
920 * sys_mq_timedreceive() (without inserting message into a queue). 920 * sys_mq_timedreceive() (without inserting message into a queue).
921 */ 921 */
922static inline void pipelined_send(struct mqueue_inode_info *info, 922static inline void pipelined_send(struct wake_q_head *wake_q,
923 struct mqueue_inode_info *info,
923 struct msg_msg *message, 924 struct msg_msg *message,
924 struct ext_wait_queue *receiver) 925 struct ext_wait_queue *receiver)
925{ 926{
926 receiver->msg = message; 927 receiver->msg = message;
927 list_del(&receiver->list); 928 list_del(&receiver->list);
928 receiver->state = STATE_PENDING; 929 wake_q_add(wake_q, receiver->task);
929 wake_up_process(receiver->task); 930 /*
930 smp_wmb(); 931 * Rely on the implicit cmpxchg barrier from wake_q_add such
932 * that we can ensure that updating receiver->state is the last
933 * write operation: As once set, the receiver can continue,
934 * and if we don't have the reference count from the wake_q,
935 * yet, at that point we can later have a use-after-free
936 * condition and bogus wakeup.
937 */
931 receiver->state = STATE_READY; 938 receiver->state = STATE_READY;
932} 939}
933 940
934/* pipelined_receive() - if there is task waiting in sys_mq_timedsend() 941/* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
935 * gets its message and put to the queue (we have one free place for sure). */ 942 * gets its message and put to the queue (we have one free place for sure). */
936static inline void pipelined_receive(struct mqueue_inode_info *info) 943static inline void pipelined_receive(struct wake_q_head *wake_q,
944 struct mqueue_inode_info *info)
937{ 945{
938 struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND); 946 struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
939 947
@@ -944,10 +952,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
944 } 952 }
945 if (msg_insert(sender->msg, info)) 953 if (msg_insert(sender->msg, info))
946 return; 954 return;
955
947 list_del(&sender->list); 956 list_del(&sender->list);
948 sender->state = STATE_PENDING; 957 wake_q_add(wake_q, sender->task);
949 wake_up_process(sender->task);
950 smp_wmb();
951 sender->state = STATE_READY; 958 sender->state = STATE_READY;
952} 959}
953 960
@@ -965,6 +972,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
965 struct timespec ts; 972 struct timespec ts;
966 struct posix_msg_tree_node *new_leaf = NULL; 973 struct posix_msg_tree_node *new_leaf = NULL;
967 int ret = 0; 974 int ret = 0;
975 WAKE_Q(wake_q);
968 976
969 if (u_abs_timeout) { 977 if (u_abs_timeout) {
970 int res = prepare_timeout(u_abs_timeout, &expires, &ts); 978 int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1049,7 +1057,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
1049 } else { 1057 } else {
1050 receiver = wq_get_first_waiter(info, RECV); 1058 receiver = wq_get_first_waiter(info, RECV);
1051 if (receiver) { 1059 if (receiver) {
1052 pipelined_send(info, msg_ptr, receiver); 1060 pipelined_send(&wake_q, info, msg_ptr, receiver);
1053 } else { 1061 } else {
1054 /* adds message to the queue */ 1062 /* adds message to the queue */
1055 ret = msg_insert(msg_ptr, info); 1063 ret = msg_insert(msg_ptr, info);
@@ -1062,6 +1070,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
1062 } 1070 }
1063out_unlock: 1071out_unlock:
1064 spin_unlock(&info->lock); 1072 spin_unlock(&info->lock);
1073 wake_up_q(&wake_q);
1065out_free: 1074out_free:
1066 if (ret) 1075 if (ret)
1067 free_msg(msg_ptr); 1076 free_msg(msg_ptr);
@@ -1149,14 +1158,17 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
1149 msg_ptr = wait.msg; 1158 msg_ptr = wait.msg;
1150 } 1159 }
1151 } else { 1160 } else {
1161 WAKE_Q(wake_q);
1162
1152 msg_ptr = msg_get(info); 1163 msg_ptr = msg_get(info);
1153 1164
1154 inode->i_atime = inode->i_mtime = inode->i_ctime = 1165 inode->i_atime = inode->i_mtime = inode->i_ctime =
1155 CURRENT_TIME; 1166 CURRENT_TIME;
1156 1167
1157 /* There is now free space in queue. */ 1168 /* There is now free space in queue. */
1158 pipelined_receive(info); 1169 pipelined_receive(&wake_q, info);
1159 spin_unlock(&info->lock); 1170 spin_unlock(&info->lock);
1171 wake_up_q(&wake_q);
1160 ret = 0; 1172 ret = 0;
1161 } 1173 }
1162 if (ret == 0) { 1174 if (ret == 0) {