author     Davidlohr Bueso <dave@stgolabs.net>             2016-10-11 16:55:02 -0400
committer  Linus Torvalds <torvalds@linux-foundation.org>  2016-10-11 18:06:33 -0400
commit     ed27f9122c541a1720898739ac55f824f820b7ff (patch)
tree       a458a1ee07957da30167b9afe6cb9e1df3cd2b6f /ipc/msg.c
parent     d0d6a2a95e80e63827ea1ca184754a990438c072 (diff)
ipc/msg: avoid waking sender upon full queue
Blocked tasks queued in q_senders waiting for their message to fit in the
queue are blindly awoken every time we think there's a remote chance this
might happen.  This can cause numerous (and expensive -- thundering
herd-ish) bogus wakeups if the queue is still really full.  Adding to the
scheduling cost/overhead, there's also the fact that we need to take the
ipc object lock and requeue ourselves in the q_senders list.

By keeping track of the blocked sender's message size, we can know in
advance whether the wakeup ought to occur or not.  Otherwise, to maintain
the current wakeup order, we just move the sender to the tail of the
list.  This is exactly what occurs right now if the sender needs to go
back to sleep.

The case of EIDRM is left completely untouched, as we need to wake up all
the tasks and shouldn't be playing games in the first place.

This patch was seen to save ~15% in context switches on the 'msgctl10'
LTP testcase (average over ten runs).  Although these tests are really
about functionality (as opposed to performance), it does show the direct
benefit of the optimization.

[akpm@linux-foundation.org: coding-style fixes]
Link: http://lkml.kernel.org/r/1469748819-19484-6-git-send-email-dave@stgolabs.net
Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
Acked-by: Peter Zijlstra (Intel) <peterz@infradead.org>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
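[Editor's illustration] The fit check that the patch factors out as msg_fits_inqueue() can be exercised in isolation.  Below is a minimal user-space sketch, not kernel code: struct msg_queue_sketch and the sample numbers are made up, but the predicate itself is taken verbatim from the patch.  Note the second condition: SysV historically reuses q_qbytes as the bound on the number of queued messages as well.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Made-up stand-in for the three struct msg_queue fields the check reads. */
struct msg_queue_sketch {
	size_t q_cbytes;	/* bytes currently enqueued */
	size_t q_qnum;		/* messages currently enqueued */
	size_t q_qbytes;	/* queue capacity in bytes (MSGMNB) */
};

/* Predicate verbatim from the patch: byte budget AND message-count budget. */
static bool msg_fits_inqueue(struct msg_queue_sketch *msq, size_t msgsz)
{
	return msgsz + msq->q_cbytes <= msq->q_qbytes &&
		1 + msq->q_qnum <= msq->q_qbytes;
}

int main(void)
{
	/* A 16 KB queue already holding 15 KB in 3 messages. */
	struct msg_queue_sketch msq = { 15 * 1024, 3, 16 * 1024 };

	printf("512 B sender fits: %d\n", msg_fits_inqueue(&msq, 512));  /* 1 */
	printf("2 KB sender fits:  %d\n", msg_fits_inqueue(&msq, 2048)); /* 0 */
	return 0;
}

In this made-up state a 512-byte sender is woken while a 2 KB sender stays queued; before the patch, both would have been awoken and the 2 KB sender would have had to retake the ipc lock and requeue itself.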
Diffstat (limited to 'ipc/msg.c')
-rw-r--r--  ipc/msg.c | 53 +++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 43 insertions(+), 10 deletions(-)
diff --git a/ipc/msg.c b/ipc/msg.c
index 3c44bbcc05f6..e12307d0c920 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -58,6 +58,7 @@ struct msg_receiver {
 struct msg_sender {
 	struct list_head list;
 	struct task_struct *tsk;
+	size_t msgsz;
 };
 
 #define SEARCH_ANY		1
@@ -153,27 +154,60 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)
 	return msq->q_perm.id;
 }
 
-static inline void ss_add(struct msg_queue *msq, struct msg_sender *mss)
+static inline bool msg_fits_inqueue(struct msg_queue *msq, size_t msgsz)
+{
+	return msgsz + msq->q_cbytes <= msq->q_qbytes &&
+		1 + msq->q_qnum <= msq->q_qbytes;
+}
+
+static inline void ss_add(struct msg_queue *msq,
+			  struct msg_sender *mss, size_t msgsz)
 {
 	mss->tsk = current;
+	mss->msgsz = msgsz;
 	__set_current_state(TASK_INTERRUPTIBLE);
 	list_add_tail(&mss->list, &msq->q_senders);
 }
 
 static inline void ss_del(struct msg_sender *mss)
 {
-	if (mss->list.next != NULL)
+	if (mss->list.next)
 		list_del(&mss->list);
 }
 
-static void ss_wakeup(struct list_head *h,
+static void ss_wakeup(struct msg_queue *msq,
 		      struct wake_q_head *wake_q, bool kill)
 {
 	struct msg_sender *mss, *t;
+	struct task_struct *stop_tsk = NULL;
+	struct list_head *h = &msq->q_senders;
 
 	list_for_each_entry_safe(mss, t, h, list) {
 		if (kill)
 			mss->list.next = NULL;
+
+		/*
+		 * Stop at the first task we don't wakeup,
+		 * we've already iterated the original
+		 * sender queue.
+		 */
+		else if (stop_tsk == mss->tsk)
+			break;
+		/*
+		 * We are not in an EIDRM scenario here, therefore
+		 * verify that we really need to wakeup the task.
+		 * To maintain current semantics and wakeup order,
+		 * move the sender to the tail on behalf of the
+		 * blocked task.
+		 */
+		else if (!msg_fits_inqueue(msq, mss->msgsz)) {
+			if (!stop_tsk)
+				stop_tsk = mss->tsk;
+
+			list_move_tail(&mss->list, &msq->q_senders);
+			continue;
+		}
+
 		wake_q_add(wake_q, mss->tsk);
 	}
 }
@@ -204,7 +238,7 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	WAKE_Q(wake_q);
 
 	expunge_all(msq, -EIDRM, &wake_q);
-	ss_wakeup(&msq->q_senders, &wake_q, true);
+	ss_wakeup(msq, &wake_q, true);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
 	wake_up_q(&wake_q);
@@ -388,7 +422,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		 * Sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
-		ss_wakeup(&msq->q_senders, &wake_q, false);
+		ss_wakeup(msq, &wake_q, false);
 		ipc_unlock_object(&msq->q_perm);
 		wake_up_q(&wake_q);
 
@@ -642,10 +676,8 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 		if (err)
 			goto out_unlock0;
 
-		if (msgsz + msq->q_cbytes <= msq->q_qbytes &&
-		    1 + msq->q_qnum <= msq->q_qbytes) {
+		if (msg_fits_inqueue(msq, msgsz))
 			break;
-		}
 
 		/* queue full, wait: */
 		if (msgflg & IPC_NOWAIT) {
@@ -654,7 +686,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 		}
 
 		/* enqueue the sender and prepare to block */
-		ss_add(msq, &s);
+		ss_add(msq, &s, msgsz);
 
 		if (!ipc_rcu_getref(msq)) {
 			err = -EIDRM;
@@ -682,6 +714,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 		}
 
 	}
+
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
 
@@ -882,7 +915,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 		msq->q_cbytes -= msg->m_ts;
 		atomic_sub(msg->m_ts, &ns->msg_bytes);
 		atomic_dec(&ns->msg_hdrs);
-		ss_wakeup(&msq->q_senders, &wake_q, false);
+		ss_wakeup(msq, &wake_q, false);
 
 		goto out_unlock0;
 	}
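
[Editor's illustration] To reproduce the pattern this patch optimizes from user space, the hedged sketch below (loosely modeled on the LTP msgctl10 workload; the name msgbuf_demo is invented, error handling is minimal, and the sleep() synchronization is deliberately crude) parks several senders on a full SysV queue and then drains it.  The saving is not visible in the program's output; with the patch, each msgrcv() wakes only the blocked senders whose recorded message size now fits, instead of the whole q_senders list.

#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <unistd.h>

struct msgbuf_demo {
	long mtype;
	char mtext[4096];
};

int main(void)
{
	struct msgbuf_demo msg = { .mtype = 1 };
	int qid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);

	if (qid < 0)
		return 1;

	/* Fill the queue until a non-blocking send would block. */
	while (msgsnd(qid, &msg, sizeof(msg.mtext), IPC_NOWAIT) == 0)
		;

	/* Fork senders that block in do_msgsnd() on the full queue. */
	for (int i = 0; i < 4; i++) {
		if (fork() == 0) {
			msgsnd(qid, &msg, sizeof(msg.mtext), 0); /* blocks */
			_exit(0);
		}
	}

	sleep(1);	/* crude: let the children reach q_senders */

	/*
	 * Each receive frees room; with the patch, only senders whose
	 * msgsz now fits are woken, the rest are moved to the tail.
	 */
	for (int i = 0; i < 4; i++)
		msgrcv(qid, &msg, sizeof(msg.mtext), 0, 0);

	for (int i = 0; i < 4; i++)
		wait(NULL);
	msgctl(qid, IPC_RMID, NULL);
	return 0;
}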