diff options
author | Davidlohr Bueso <dave@stgolabs.net> | 2015-05-04 10:02:46 -0400 |
---|---|---|
committer | Ingo Molnar <mingo@kernel.org> | 2015-05-08 06:23:07 -0400 |
commit | fa6004ad4528153b699a4d5ce5ea6b33acce74cc (patch) | |
tree | eda96bac494cc6e9071e14b2d5741fa8f29958ea /ipc | |
parent | 1d0dcb3ad9d336e6d6ee020a750a7f8d907e28de (diff) |
ipc/mqueue: Implement lockless pipelined wakeups
This patch moves the wakeup_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once it is STATE_READY and it does not need to loop
on SMP if it is still in STATE_PENDING. In the timeout case we still need
to grab the info->lock to verify the state.
This change should also avoid the introduction of preempt_disable() in -rt
which avoids a busy-loop which polls for the STATE_PENDING -> STATE_READY
change if the waiter has a higher priority compared to the waker.
Additionally, this patch micro-optimizes wq_sleep by using the cheaper
cousin of set_current_state(TASK_INTERRUPTIBLE) as we will block no
matter what, thus get rid of the implied barrier.
Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
Signed-off-by: Peter Zijlstra (Intel) <peterz@infradead.org>
Acked-by: George Spelvin <linux@horizon.com>
Acked-by: Thomas Gleixner <tglx@linutronix.de>
Cc: Andrew Morton <akpm@linux-foundation.org>
Cc: Borislav Petkov <bp@alien8.de>
Cc: Chris Mason <clm@fb.com>
Cc: H. Peter Anvin <hpa@zytor.com>
Cc: Linus Torvalds <torvalds@linux-foundation.org>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Peter Zijlstra <peterz@infradead.org>
Cc: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
Cc: Steven Rostedt <rostedt@goodmis.org>
Cc: dave@stgolabs.net
Link: http://lkml.kernel.org/r/1430748166.1940.17.camel@stgolabs.net
Signed-off-by: Ingo Molnar <mingo@kernel.org>
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/mqueue.c | 54 |
1 files changed, 33 insertions, 21 deletions
diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 3aaea7ffd077..a24ba9fe5bb8 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c | |||
@@ -47,8 +47,7 @@ | |||
47 | #define RECV 1 | 47 | #define RECV 1 |
48 | 48 | ||
49 | #define STATE_NONE 0 | 49 | #define STATE_NONE 0 |
50 | #define STATE_PENDING 1 | 50 | #define STATE_READY 1 |
51 | #define STATE_READY 2 | ||
52 | 51 | ||
53 | struct posix_msg_tree_node { | 52 | struct posix_msg_tree_node { |
54 | struct rb_node rb_node; | 53 | struct rb_node rb_node; |
@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr, | |||
571 | wq_add(info, sr, ewp); | 570 | wq_add(info, sr, ewp); |
572 | 571 | ||
573 | for (;;) { | 572 | for (;;) { |
574 | set_current_state(TASK_INTERRUPTIBLE); | 573 | __set_current_state(TASK_INTERRUPTIBLE); |
575 | 574 | ||
576 | spin_unlock(&info->lock); | 575 | spin_unlock(&info->lock); |
577 | time = schedule_hrtimeout_range_clock(timeout, 0, | 576 | time = schedule_hrtimeout_range_clock(timeout, 0, |
578 | HRTIMER_MODE_ABS, CLOCK_REALTIME); | 577 | HRTIMER_MODE_ABS, CLOCK_REALTIME); |
579 | 578 | ||
580 | while (ewp->state == STATE_PENDING) | ||
581 | cpu_relax(); | ||
582 | |||
583 | if (ewp->state == STATE_READY) { | 579 | if (ewp->state == STATE_READY) { |
584 | retval = 0; | 580 | retval = 0; |
585 | goto out; | 581 | goto out; |
@@ -907,11 +903,15 @@ out_name: | |||
907 | * list of waiting receivers. A sender checks that list before adding the new | 903 | * list of waiting receivers. A sender checks that list before adding the new |
908 | * message into the message array. If there is a waiting receiver, then it | 904 | * message into the message array. If there is a waiting receiver, then it |
909 | * bypasses the message array and directly hands the message over to the | 905 | * bypasses the message array and directly hands the message over to the |
910 | * receiver. | 906 | * receiver. The receiver accepts the message and returns without grabbing the |
911 | * The receiver accepts the message and returns without grabbing the queue | 907 | * queue spinlock: |
912 | * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers | 908 | * |
913 | * are necessary. The same algorithm is used for sysv semaphores, see | 909 | * - Set pointer to message. |
914 | * ipc/sem.c for more details. | 910 | * - Queue the receiver task for later wakeup (without the info->lock). |
911 | * - Update its state to STATE_READY. Now the receiver can continue. | ||
912 | * - Wake up the process after the lock is dropped. Should the process wake up | ||
913 | * before this wakeup (due to a timeout or a signal) it will either see | ||
914 | * STATE_READY and continue or acquire the lock to check the state again. | ||
915 | * | 915 | * |
916 | * The same algorithm is used for senders. | 916 | * The same algorithm is used for senders. |
917 | */ | 917 | */ |
@@ -919,21 +919,29 @@ out_name: | |||
919 | /* pipelined_send() - send a message directly to the task waiting in | 919 | /* pipelined_send() - send a message directly to the task waiting in |
920 | * sys_mq_timedreceive() (without inserting message into a queue). | 920 | * sys_mq_timedreceive() (without inserting message into a queue). |
921 | */ | 921 | */ |
922 | static inline void pipelined_send(struct mqueue_inode_info *info, | 922 | static inline void pipelined_send(struct wake_q_head *wake_q, |
923 | struct mqueue_inode_info *info, | ||
923 | struct msg_msg *message, | 924 | struct msg_msg *message, |
924 | struct ext_wait_queue *receiver) | 925 | struct ext_wait_queue *receiver) |
925 | { | 926 | { |
926 | receiver->msg = message; | 927 | receiver->msg = message; |
927 | list_del(&receiver->list); | 928 | list_del(&receiver->list); |
928 | receiver->state = STATE_PENDING; | 929 | wake_q_add(wake_q, receiver->task); |
929 | wake_up_process(receiver->task); | 930 | /* |
930 | smp_wmb(); | 931 | * Rely on the implicit cmpxchg barrier from wake_q_add such |
932 | * that we can ensure that updating receiver->state is the last | ||
933 | * write operation: As once set, the receiver can continue, | ||
934 | * and if we don't have the reference count from the wake_q, | ||
935 | * yet, at that point we can later have a use-after-free | ||
936 | * condition and bogus wakeup. | ||
937 | */ | ||
931 | receiver->state = STATE_READY; | 938 | receiver->state = STATE_READY; |
932 | } | 939 | } |
933 | 940 | ||
934 | /* pipelined_receive() - if there is task waiting in sys_mq_timedsend() | 941 | /* pipelined_receive() - if there is task waiting in sys_mq_timedsend() |
935 | * gets its message and put to the queue (we have one free place for sure). */ | 942 | * gets its message and put to the queue (we have one free place for sure). */ |
936 | static inline void pipelined_receive(struct mqueue_inode_info *info) | 943 | static inline void pipelined_receive(struct wake_q_head *wake_q, |
944 | struct mqueue_inode_info *info) | ||
937 | { | 945 | { |
938 | struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND); | 946 | struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND); |
939 | 947 | ||
@@ -944,10 +952,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info) | |||
944 | } | 952 | } |
945 | if (msg_insert(sender->msg, info)) | 953 | if (msg_insert(sender->msg, info)) |
946 | return; | 954 | return; |
955 | |||
947 | list_del(&sender->list); | 956 | list_del(&sender->list); |
948 | sender->state = STATE_PENDING; | 957 | wake_q_add(wake_q, sender->task); |
949 | wake_up_process(sender->task); | ||
950 | smp_wmb(); | ||
951 | sender->state = STATE_READY; | 958 | sender->state = STATE_READY; |
952 | } | 959 | } |
953 | 960 | ||
@@ -965,6 +972,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, | |||
965 | struct timespec ts; | 972 | struct timespec ts; |
966 | struct posix_msg_tree_node *new_leaf = NULL; | 973 | struct posix_msg_tree_node *new_leaf = NULL; |
967 | int ret = 0; | 974 | int ret = 0; |
975 | WAKE_Q(wake_q); | ||
968 | 976 | ||
969 | if (u_abs_timeout) { | 977 | if (u_abs_timeout) { |
970 | int res = prepare_timeout(u_abs_timeout, &expires, &ts); | 978 | int res = prepare_timeout(u_abs_timeout, &expires, &ts); |
@@ -1049,7 +1057,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, | |||
1049 | } else { | 1057 | } else { |
1050 | receiver = wq_get_first_waiter(info, RECV); | 1058 | receiver = wq_get_first_waiter(info, RECV); |
1051 | if (receiver) { | 1059 | if (receiver) { |
1052 | pipelined_send(info, msg_ptr, receiver); | 1060 | pipelined_send(&wake_q, info, msg_ptr, receiver); |
1053 | } else { | 1061 | } else { |
1054 | /* adds message to the queue */ | 1062 | /* adds message to the queue */ |
1055 | ret = msg_insert(msg_ptr, info); | 1063 | ret = msg_insert(msg_ptr, info); |
@@ -1062,6 +1070,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, | |||
1062 | } | 1070 | } |
1063 | out_unlock: | 1071 | out_unlock: |
1064 | spin_unlock(&info->lock); | 1072 | spin_unlock(&info->lock); |
1073 | wake_up_q(&wake_q); | ||
1065 | out_free: | 1074 | out_free: |
1066 | if (ret) | 1075 | if (ret) |
1067 | free_msg(msg_ptr); | 1076 | free_msg(msg_ptr); |
@@ -1149,14 +1158,17 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr, | |||
1149 | msg_ptr = wait.msg; | 1158 | msg_ptr = wait.msg; |
1150 | } | 1159 | } |
1151 | } else { | 1160 | } else { |
1161 | WAKE_Q(wake_q); | ||
1162 | |||
1152 | msg_ptr = msg_get(info); | 1163 | msg_ptr = msg_get(info); |
1153 | 1164 | ||
1154 | inode->i_atime = inode->i_mtime = inode->i_ctime = | 1165 | inode->i_atime = inode->i_mtime = inode->i_ctime = |
1155 | CURRENT_TIME; | 1166 | CURRENT_TIME; |
1156 | 1167 | ||
1157 | /* There is now free space in queue. */ | 1168 | /* There is now free space in queue. */ |
1158 | pipelined_receive(info); | 1169 | pipelined_receive(&wake_q, info); |
1159 | spin_unlock(&info->lock); | 1170 | spin_unlock(&info->lock); |
1171 | wake_up_q(&wake_q); | ||
1160 | ret = 0; | 1172 | ret = 0; |
1161 | } | 1173 | } |
1162 | if (ret == 0) { | 1174 | if (ret == 0) { |