author	Davidlohr Bueso <dave@stgolabs.net>	2015-06-30 17:58:39 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2015-06-30 22:44:58 -0400
commit	ff35e5ef86fea1fa84eb7fdc939d0b1e3f1222bf (patch)
tree	dcbd89deb61c178925663ce5ad0457f232715a23 /ipc
parent	c5c8975b2eb4eb7604e8ce4f762987f56d2a96a2 (diff)
ipc,msg: provide barrier pairings for lockless receive
We currently use a full barrier on the sender side to avoid receiver tasks disappearing on us while still performing the sender-side wakeup. We lack, however, the proper CPU-CPU interaction pairing on the receiver side, which busy-waits for the message. Similarly, we do not need a full smp_mb() and can relax the semantics for the writer and reader sides of the message. This is safe as we are only ordering loads and stores to r_msg, and in both smp_wmb() and smp_rmb() there are no stores after the calls _anyway_. This obviously applies to pipelined_send() and expunge_all(), the latter for EIDRM when destroying a queue.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
Cc: Manfred Spraul <manfred@colorfullife.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
Diffstat (limited to 'ipc')
-rw-r--r--	ipc/msg.c	48
1 file changed, 38 insertions(+), 10 deletions(-)
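For readers unfamiliar with the pattern, here is a minimal userspace sketch of the same write/read barrier pairing, using C11 atomic fences as a portable stand-in for the kernel's smp_wmb()/smp_rmb(). The C11 fences are stronger than the kernel primitives, and all names here (waker, receiver, r_msg, payload) are illustrative, not taken from the patch. Note one deliberate difference: the patch issues smp_rmb() at the top of each poll iteration, while the C11 idiom places the acquire fence after the load that observed a non-NULL value.

/* Build with: cc -std=c11 -pthread sketch.c */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

static int payload;                    /* message contents, plain data   */
static _Atomic(void *) r_msg;          /* NULL until the waker publishes */

static void *waker(void *unused)
{
	payload = 42;
	/* barrier (B): order the payload store before the r_msg store,
	 * as smp_wmb() does in expunge_all() and pipelined_send() */
	atomic_thread_fence(memory_order_release);
	atomic_store_explicit(&r_msg, &payload, memory_order_relaxed);
	return NULL;
}

static void *receiver(void *unused)
{
	void *msg;

	/* the atomic load forces a fresh read on every pass, which is
	 * what the cpu_relax() compiler barrier achieves in the patch */
	while (!(msg = atomic_load_explicit(&r_msg, memory_order_relaxed)))
		;
	/* barrier (A): pairs with (B); orders the r_msg read before the
	 * payload read, as smp_rmb() does in do_msgrcv() */
	atomic_thread_fence(memory_order_acquire);
	printf("received %d\n", *(int *)msg);
	return NULL;
}

int main(void)
{
	pthread_t w, r;

	pthread_create(&r, NULL, receiver, NULL);
	pthread_create(&w, NULL, waker, NULL);
	pthread_join(r, NULL);
	pthread_join(w, NULL);
	return 0;
}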
diff --git a/ipc/msg.c b/ipc/msg.c
index 2b6fdbb9e0e9..a9c3c519490a 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res)
 		 * or dealing with -EAGAIN cases. See lockless receive part 1
 		 * and 2 in do_msgrcv().
 		 */
-		smp_mb();
+		smp_wmb(); /* barrier (B) */
 		msr->r_msg = ERR_PTR(res);
 	}
 }
@@ -580,7 +580,8 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 				/* initialize pipelined send ordering */
 				msr->r_msg = NULL;
 				wake_up_process(msr->r_tsk);
-				smp_mb(); /* see barrier comment below */
+				/* barrier (B) see barrier comment below */
+				smp_wmb();
 				msr->r_msg = ERR_PTR(-E2BIG);
 			} else {
 				msr->r_msg = NULL;
@@ -589,11 +590,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 				wake_up_process(msr->r_tsk);
 				/*
 				 * Ensure that the wakeup is visible before
-				 * setting r_msg, as the receiving end depends
-				 * on it. See lockless receive part 1 and 2 in
-				 * do_msgrcv().
+				 * setting r_msg, as the receiver can otherwise
+				 * exit - once r_msg is set, the receiver can
+				 * continue. See lockless receive part 1 and 2
+				 * in do_msgrcv(). Barrier (B).
 				 */
-				smp_mb();
+				smp_wmb();
 				msr->r_msg = msg;
 
 				return 1;
@@ -932,12 +934,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 		/* Lockless receive, part 2:
 		 * Wait until pipelined_send or expunge_all are outside of
 		 * wake_up_process(). There is a race with exit(), see
-		 * ipc/mqueue.c for the details.
+		 * ipc/mqueue.c for the details. The correct serialization
+		 * ensures that a receiver cannot continue without the wakeup
+		 * being visible _before_ setting r_msg:
+		 *
+		 * CPU 0                             CPU 1
+		 * <loop receiver>
+		 *   smp_rmb(); (A) <-- pair -.      <waker thread>
+		 *   <load ->r_msg>           |        msr->r_msg = NULL;
+		 *                            |        wake_up_process();
+		 * <continue>                 `------> smp_wmb(); (B)
+		 *                                     msr->r_msg = msg;
+		 *
+		 * Where (A) orders the message value read and where (B) orders
+		 * the write to r_msg -- done in both pipelined_send and
+		 * expunge_all.
 		 */
-		msg = (struct msg_msg *)msr_d.r_msg;
-		while (msg == NULL) {
-			cpu_relax();
+		for (;;) {
+			/*
+			 * Pairs with writer barrier in pipelined_send
+			 * or expunge_all.
+			 */
+			smp_rmb(); /* barrier (A) */
 			msg = (struct msg_msg *)msr_d.r_msg;
+			if (msg)
+				break;
+
+			/*
+			 * The cpu_relax() call is a compiler barrier
+			 * which forces everything in this loop to be
+			 * re-loaded.
+			 */
+			cpu_relax();
 		}
 
 		/* Lockless receive, part 3:
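A note on the cpu_relax() comment above: the loop spins on a plain (non-atomic) load of msr_d.r_msg, so it relies on cpu_relax() containing a compiler barrier that forces the load to be re-issued on every iteration. The sketch below is illustrative only, not kernel source: the macro shows roughly what the kernel's barrier() expands to on GCC/Clang, the struct and names are invented for the demo, and the cross-thread plain accesses are a data race by the letter of the C standard, tolerated here only to mirror the kernel idiom.

/* Build with: cc -pthread demo.c */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

/* roughly what the kernel's barrier() expands to on GCC/Clang */
#define compiler_barrier() __asm__ __volatile__("" : : : "memory")

static struct {
	void *r_msg;		/* hypothetical stand-in for msr_d.r_msg */
} msr_d;

static int payload = 42;

static void *waker(void *unused)
{
	sleep(1);
	msr_d.r_msg = &payload;
	return NULL;
}

int main(void)
{
	pthread_t t;
	void *msg;

	pthread_create(&t, NULL, waker, NULL);
	for (;;) {
		msg = msr_d.r_msg;	/* plain load ... */
		if (msg)
			break;
		compiler_barrier();	/* ... re-issued on every pass;
					 * without this the compiler may
					 * hoist the load out of the loop */
	}
	printf("got %d\n", *(int *)msg);
	pthread_join(t, NULL);
	return 0;
}

In current kernels, READ_ONCE() is the usual way to obtain this re-load guarantee rather than relying on a bare compiler barrier.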