author	Doug Ledford <dledford@redhat.com>	2012-05-31 19:26:35 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2012-05-31 20:49:31 -0400
commit	d6629859b36d953a4b1369b749f178736911bf10 (patch)
tree	154cfc0d8ff3b65f59b9052bcc41edaabf974063 /ipc
parent	50069a5851323ba5def0e414a21e234345016870 (diff)
ipc/mqueue: improve performance of send/recv
The existing implementation of the POSIX message queue send and recv
functions is, well, abysmal.  Even worse than abysmal.  I submitted a
patch to increase the maximum POSIX message queue limit to 65536 due to
customer needs, however, upon looking over the send/recv implementation,
I realized that my customer needs help with that too even if they don't
know it.  The basic problem is that, given the fairly typical use case
scenario for a large queue of queueing lots of messages all at the same
priority (I verified with my customer that this is indeed what their
app does), the msg_insert routine is basically a frikkin' bubble sort.
I mean, whoa, that's *so* middle school.

OK, OK, to not slam the original author too much, I'm sure they didn't
envision a queue depth of 50,000+ messages.  No one would think that
moving elements in an array, one at a time, and dereferencing each
pointer in that array to check the priority of the message being pointed
to, again one at a time, for 50,000+ times would be good.  So let's
assume that, as is typical, the users have found a way to break our code
simply by using it in a way we didn't envision.  Fair enough.

"So, just how broken is it?", you ask.  I wondered the same thing, so I
wrote an app to let me know.  It's my next patch.  It gave me some
interesting results.  Here's what it tested:

Interference with other apps - In continuous mode, the app just sits
there and hits a message queue forever, while you go do something
productive on another terminal using other CPUs.  You then measure how
long it takes you to do that something productive.  Then you restart the
app in fake continuous mode, and it sits in a tight loop on a CPU while
you repeat your tests.  The whole point of this is to keep one CPU tied
up (so it can't be used in your other work) but in one case tied up
hitting the mqueue code so we can see the effect of walking that 65,528
element array one pointer at a time on the global CPU cache.  If it's
bad, then it will slow down your app on the other CPUs just by polluting
cache mercilessly.  In the fake case, it will be in a tight loop, but
not polluting cache.

Testing the mqueue subsystem directly - Here we just run a number of
tests to see how the mqueue subsystem performs under different
conditions.  A couple of conditions are known to be worst case for the
old system, and some routines, so this tests all of them.

So, on to the results already:

Subsystem/Test                        Old                 New

Time to compile linux
kernel (make -j12 on a
6 core CPU)
  Running mqueue test     user 49m10.744s     user 45m26.294s
                           sys  5m51.924s      sys  4m59.894s
                         total 55m02.668s    total 50m26.188s

  Running fake test       user 45m32.686s     user 45m18.552s
                           sys  5m12.465s      sys  4m56.468s
                         total 50m45.151s    total 50m15.020s

  % slowdown from mqueue
    cache thrashing            ~8%                 ~.5%

Avg time to send/recv (in nanoseconds per message)
  when queue empty            305/288             349/318
  when queue full (65528 messages)
    constant priority      526589/823             362/314
    increasing priority    403105/916             495/445
    decreasing priority     73420/594             482/409
    random priority        280147/920             546/436

Time to fill/drain queue (65528 messages, in seconds)
  constant priority         17.37/.12             .13/.12
  increasing priority        4.14/.14             .21/.18
  decreasing priority       12.93/.13             .21/.18
  random priority            8.88/.16             .22/.17

So, I think the results speak for themselves.
It's possible this implementation could be improved by caching at least
one priority level in the node tree (that would bring the queue-empty
performance more in line with the old implementation), but this works
and is *so* much better than what we had, especially for the common case
of a single priority in use, that further refinements can be in follow-on
patches.

[akpm@linux-foundation.org: fix typo in comment, remove stray semicolon]
[levinsasha928@gmail.com: use correct gfp flags in msg_insert]
Signed-off-by: Doug Ledford <dledford@redhat.com>
Cc: Stephen Rothwell <sfr@canb.auug.org.au>
Cc: Manfred Spraul <manfred@colorfullife.com>
Acked-by: KOSAKI Motohiro <kosaki.motohiro@jp.fujitsu.com>
Signed-off-by: Sasha Levin <levinsasha928@gmail.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
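As a rough illustration of the constant-priority fill/drain test described
in the message above (the author's actual test app is in the follow-on
patch, not shown here), a minimal user-space harness against the public
POSIX mq API could look like the sketch below.  The queue name, message
size, and attr values are illustrative assumptions only; a queue this deep
requires raising /proc/sys/fs/mqueue/msg_max first, and the program links
with -lrt.

/*
 * Minimal sketch of a constant-priority fill/drain timing test for a
 * POSIX message queue.  NOT the test app referenced in the commit
 * message; queue name, depth, and message size are illustrative.
 */
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <time.h>

#define NR_MSGS  65528          /* queue depth used in the tests above */
#define MSG_SIZE 16             /* payload size is arbitrary here */

static double now_sec(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return ts.tv_sec + ts.tv_nsec / 1e9;
}

int main(void)
{
	struct mq_attr attr = { .mq_maxmsg = NR_MSGS, .mq_msgsize = MSG_SIZE };
	char buf[MSG_SIZE] = "x";
	double t0, t1, t2;
	mqd_t mq;
	int i;

	mq = mq_open("/mq_perf_test", O_CREAT | O_RDWR, 0600, &attr);
	if (mq == (mqd_t)-1) {
		perror("mq_open");	/* EINVAL if msg_max is too low */
		return 1;
	}

	t0 = now_sec();
	for (i = 0; i < NR_MSGS; i++)	/* fill at a single priority */
		mq_send(mq, buf, MSG_SIZE, 1);
	t1 = now_sec();
	for (i = 0; i < NR_MSGS; i++)	/* drain */
		mq_receive(mq, buf, MSG_SIZE, NULL);
	t2 = now_sec();

	printf("fill %.2fs drain %.2fs\n", t1 - t0, t2 - t1);
	mq_close(mq);
	mq_unlink("/mq_perf_test");
	return 0;
}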
Diffstat (limited to 'ipc')
-rw-r--r--	ipc/mqueue.c	173
1 file changed, 130 insertions(+), 43 deletions(-)
diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 609d53f7a915..3c72a05dc326 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -50,6 +50,12 @@
 #define STATE_PENDING	1
 #define STATE_READY	2
 
+struct posix_msg_tree_node {
+	struct rb_node		rb_node;
+	struct list_head	msg_list;
+	int			priority;
+};
+
 struct ext_wait_queue {		/* queue of sleeping tasks */
 	struct task_struct *task;
 	struct list_head list;
@@ -62,7 +68,7 @@ struct mqueue_inode_info {
 	struct inode vfs_inode;
 	wait_queue_head_t wait_q;
 
-	struct msg_msg **messages;
+	struct rb_root msg_tree;
 	struct mq_attr attr;
 
 	struct sigevent notify;
@@ -110,6 +116,90 @@ static struct ipc_namespace *get_ns_from_inode(struct inode *inode)
 	return ns;
 }
 
+/* Auxiliary functions to manipulate messages' list */
+static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info)
+{
+	struct rb_node **p, *parent = NULL;
+	struct posix_msg_tree_node *leaf;
+
+	p = &info->msg_tree.rb_node;
+	while (*p) {
+		parent = *p;
+		leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node);
+
+		if (likely(leaf->priority == msg->m_type))
+			goto insert_msg;
+		else if (msg->m_type < leaf->priority)
+			p = &(*p)->rb_left;
+		else
+			p = &(*p)->rb_right;
+	}
+	leaf = kzalloc(sizeof(*leaf), GFP_ATOMIC);
+	if (!leaf)
+		return -ENOMEM;
+	rb_init_node(&leaf->rb_node);
+	INIT_LIST_HEAD(&leaf->msg_list);
+	leaf->priority = msg->m_type;
+	rb_link_node(&leaf->rb_node, parent, p);
+	rb_insert_color(&leaf->rb_node, &info->msg_tree);
+	info->qsize += sizeof(struct posix_msg_tree_node);
+insert_msg:
+	info->attr.mq_curmsgs++;
+	info->qsize += msg->m_ts;
+	list_add_tail(&msg->m_list, &leaf->msg_list);
+	return 0;
+}
+
+static inline struct msg_msg *msg_get(struct mqueue_inode_info *info)
+{
+	struct rb_node **p, *parent = NULL;
+	struct posix_msg_tree_node *leaf;
+	struct msg_msg *msg;
+
+try_again:
+	p = &info->msg_tree.rb_node;
+	while (*p) {
+		parent = *p;
+		/*
+		 * During insert, low priorities go to the left and high to the
+		 * right. On receive, we want the highest priorities first, so
+		 * walk all the way to the right.
+		 */
+		p = &(*p)->rb_right;
+	}
+	if (!parent) {
+		if (info->attr.mq_curmsgs) {
+			pr_warn_once("Inconsistency in POSIX message queue, "
+				     "no tree element, but supposedly messages "
+				     "should exist!\n");
+			info->attr.mq_curmsgs = 0;
+		}
+		return NULL;
+	}
+	leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node);
+	if (list_empty(&leaf->msg_list)) {
+		pr_warn_once("Inconsistency in POSIX message queue, "
+			     "empty leaf node but we haven't implemented "
+			     "lazy leaf delete!\n");
+		rb_erase(&leaf->rb_node, &info->msg_tree);
+		info->qsize -= sizeof(struct posix_msg_tree_node);
+		kfree(leaf);
+		goto try_again;
+	} else {
+		msg = list_first_entry(&leaf->msg_list,
+				       struct msg_msg, m_list);
+		list_del(&msg->m_list);
+		if (list_empty(&leaf->msg_list)) {
+			rb_erase(&leaf->rb_node, &info->msg_tree);
+			info->qsize -= sizeof(struct posix_msg_tree_node);
+			kfree(leaf);
+		}
+	}
+	info->attr.mq_curmsgs--;
+	info->qsize -= msg->m_ts;
+	return msg;
+}
+
 static struct inode *mqueue_get_inode(struct super_block *sb,
 		struct ipc_namespace *ipc_ns, umode_t mode,
 		struct mq_attr *attr)
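In user-space terms, the new msg_insert()/msg_get() pair above amounts to
one bucket per priority, with a FIFO list inside each bucket: insert finds
or creates the bucket for a priority and appends in O(1), receive pops from
the highest-priority bucket.  The toy sketch below mimics that shape, with
a sorted linked list standing in for the kernel's rbtree (the per-priority
FIFO behavior is the point here, not the tree); every name in it is
invented for illustration.

/*
 * Toy analogue of the rbtree-of-FIFOs structure.  The kernel keys
 * buckets with an rbtree (O(log P) for P distinct priorities); a
 * descending-sorted linked list stands in for it here.
 */
#include <stdio.h>
#include <stdlib.h>

struct toy_msg {
	int payload;
	struct toy_msg *next;		/* FIFO chain within a bucket */
};

struct toy_bucket {
	int priority;
	struct toy_msg *head, *tail;	/* FIFO: append at tail, pop head */
	struct toy_bucket *next;	/* buckets sorted by descending prio */
};

static struct toy_bucket *buckets;

static void toy_send(int prio, int payload)
{
	struct toy_bucket **p = &buckets, *b;
	struct toy_msg *m = malloc(sizeof(*m));

	m->payload = payload;
	m->next = NULL;
	while (*p && (*p)->priority > prio)	/* find insertion point */
		p = &(*p)->next;
	if (!*p || (*p)->priority != prio) {	/* no bucket yet: create */
		b = calloc(1, sizeof(*b));
		b->priority = prio;
		b->next = *p;
		*p = b;
	}
	b = *p;
	if (b->tail)				/* O(1) FIFO append */
		b->tail->next = m;
	else
		b->head = m;
	b->tail = m;
}

static int toy_recv(int *payload)
{
	struct toy_bucket *b = buckets;		/* highest priority first */
	struct toy_msg *m;

	if (!b)
		return -1;
	m = b->head;
	b->head = m->next;
	if (!b->head) {				/* bucket drained: drop it */
		buckets = b->next;
		free(b);
	}
	*payload = m->payload;
	free(m);
	return 0;
}

int main(void)
{
	int v;

	toy_send(1, 10);
	toy_send(5, 20);
	toy_send(1, 11);
	while (toy_recv(&v) == 0)
		printf("%d\n", v);	/* prints 20, 10, 11 */
	return 0;
}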
@@ -130,7 +220,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
 
 	if (S_ISREG(mode)) {
 		struct mqueue_inode_info *info;
-		unsigned long mq_bytes, mq_msg_tblsz;
+		unsigned long mq_bytes, mq_treesize;
 
 		inode->i_fop = &mqueue_file_operations;
 		inode->i_size = FILENT_SIZE;
@@ -144,6 +234,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
 		info->notify_user_ns = NULL;
 		info->qsize = 0;
 		info->user = NULL;	/* set when all is ok */
+		info->msg_tree = RB_ROOT;
 		memset(&info->attr, 0, sizeof(info->attr));
 		info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max,
 					   ipc_ns->mq_msg_default);
@@ -153,16 +244,25 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
 			info->attr.mq_maxmsg = attr->mq_maxmsg;
 			info->attr.mq_msgsize = attr->mq_msgsize;
 		}
-		mq_msg_tblsz = info->attr.mq_maxmsg * sizeof(struct msg_msg *);
-		if (mq_msg_tblsz > PAGE_SIZE)
-			info->messages = vmalloc(mq_msg_tblsz);
-		else
-			info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL);
-		if (!info->messages)
-			goto out_inode;
+		/*
+		 * We used to allocate a static array of pointers and account
+		 * the size of that array as well as one msg_msg struct per
+		 * possible message into the queue size. That's no longer
+		 * accurate as the queue is now an rbtree and will grow and
+		 * shrink depending on usage patterns.  We can, however, still
+		 * account one msg_msg struct per message, but the nodes are
+		 * allocated depending on priority usage, and most programs
+		 * only use one, or a handful, of priorities.  However, since
+		 * this is pinned memory, we need to assume worst case, so
+		 * that means the min(mq_maxmsg, max_priorities) * struct
+		 * posix_msg_tree_node.
+		 */
+		mq_treesize = info->attr.mq_maxmsg * sizeof(struct msg_msg) +
+			min_t(unsigned int, info->attr.mq_maxmsg, MQ_PRIO_MAX) *
+			sizeof(struct posix_msg_tree_node);
 
-		mq_bytes = (mq_msg_tblsz +
-			(info->attr.mq_maxmsg * info->attr.mq_msgsize));
+		mq_bytes = mq_treesize + (info->attr.mq_maxmsg *
+					  info->attr.mq_msgsize);
 
 		spin_lock(&mq_lock);
 		if (u->mq_bytes + mq_bytes < u->mq_bytes ||
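To make the worst-case accounting in the comment above concrete, the small
sketch below runs the same arithmetic in user space.  The two sizeof values
are made-up placeholders (the real numbers depend on architecture and
config); MQ_PRIO_MAX is 32768 on Linux.

/*
 * Illustrative worst-case pinned-memory charge per the accounting
 * comment above.  Structure sizes are placeholders, not real kernel
 * numbers.
 */
#include <stdio.h>

#define MQ_PRIO_MAX		32768	/* Linux value */
#define SIZEOF_MSG_MSG		48	/* placeholder */
#define SIZEOF_TREE_NODE	56	/* placeholder */

int main(void)
{
	unsigned long mq_maxmsg = 65536, mq_msgsize = 16;
	unsigned long prios = mq_maxmsg < MQ_PRIO_MAX ? mq_maxmsg : MQ_PRIO_MAX;
	unsigned long mq_treesize, mq_bytes;

	/* one msg_msg per message + one tree node per possible priority */
	mq_treesize = mq_maxmsg * SIZEOF_MSG_MSG + prios * SIZEOF_TREE_NODE;
	mq_bytes = mq_treesize + mq_maxmsg * mq_msgsize;
	printf("pinned-memory charge: %lu bytes\n", mq_bytes);
	return 0;
}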
@@ -253,9 +353,9 @@ static void mqueue_evict_inode(struct inode *inode)
 {
 	struct mqueue_inode_info *info;
 	struct user_struct *user;
-	unsigned long mq_bytes;
-	int i;
+	unsigned long mq_bytes, mq_treesize;
 	struct ipc_namespace *ipc_ns;
+	struct msg_msg *msg;
 
 	clear_inode(inode);
 
@@ -265,17 +365,18 @@ static void mqueue_evict_inode(struct inode *inode)
 	ipc_ns = get_ns_from_inode(inode);
 	info = MQUEUE_I(inode);
 	spin_lock(&info->lock);
-	for (i = 0; i < info->attr.mq_curmsgs; i++)
-		free_msg(info->messages[i]);
-	if (is_vmalloc_addr(info->messages))
-		vfree(info->messages);
-	else
-		kfree(info->messages);
+	while ((msg = msg_get(info)) != NULL)
+		free_msg(msg);
 	spin_unlock(&info->lock);
 
 	/* Total amount of bytes accounted for the mqueue */
-	mq_bytes = info->attr.mq_maxmsg * (sizeof(struct msg_msg *)
-	    + info->attr.mq_msgsize);
+	mq_treesize = info->attr.mq_maxmsg * sizeof(struct msg_msg) +
+		min_t(unsigned int, info->attr.mq_maxmsg, MQ_PRIO_MAX) *
+		sizeof(struct posix_msg_tree_node);
+
+	mq_bytes = mq_treesize + (info->attr.mq_maxmsg *
+				  info->attr.mq_msgsize);
+
 	user = info->user;
 	if (user) {
 		spin_lock(&mq_lock);
@@ -495,26 +596,6 @@ static struct ext_wait_queue *wq_get_first_waiter(
 	return list_entry(ptr, struct ext_wait_queue, list);
 }
 
-/* Auxiliary functions to manipulate messages' list */
-static void msg_insert(struct msg_msg *ptr, struct mqueue_inode_info *info)
-{
-	int k;
-
-	k = info->attr.mq_curmsgs - 1;
-	while (k >= 0 && info->messages[k]->m_type >= ptr->m_type) {
-		info->messages[k + 1] = info->messages[k];
-		k--;
-	}
-	info->attr.mq_curmsgs++;
-	info->qsize += ptr->m_ts;
-	info->messages[k + 1] = ptr;
-}
-
-static inline struct msg_msg *msg_get(struct mqueue_inode_info *info)
-{
-	info->qsize -= info->messages[--info->attr.mq_curmsgs]->m_ts;
-	return info->messages[info->attr.mq_curmsgs];
-}
 
 static inline void set_cookie(struct sk_buff *skb, char code)
 {
@@ -848,7 +929,8 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
 		wake_up_interruptible(&info->wait_q);
 		return;
 	}
-	msg_insert(sender->msg, info);
+	if (msg_insert(sender->msg, info))
+		return;
 	list_del(&sender->list);
 	sender->state = STATE_PENDING;
 	wake_up_process(sender->task);
@@ -936,7 +1018,12 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 			pipelined_send(info, msg_ptr, receiver);
 		} else {
 			/* adds message to the queue */
-			msg_insert(msg_ptr, info);
+			if (msg_insert(msg_ptr, info)) {
+				free_msg(msg_ptr);
+				ret = -ENOMEM;
+				spin_unlock(&info->lock);
+				goto out_fput;
+			}
 			__do_notify(info);
 		}
 		inode->i_atime = inode->i_mtime = inode->i_ctime =