path: root/ipc/mqueue.c
author	Doug Ledford <dledford@redhat.com>	2012-05-31 19:26:38 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2012-05-31 20:49:31 -0400
commit	ce2d52cc1364a22fc1a161781e60ee3cbb499a6d (patch)
tree	f21a7a5ddfdfc0a470304268928c047399e71c7a /ipc/mqueue.c
parent	7820b0715b6fb1378fab41b27fb7aa3950852cb7 (diff)
ipc/mqueue: add rbtree node caching support
When I wrote the first patch that added rbtree support for message queue insertion, it drastically sped up the case where the queue was very full compared to the original code. It did, however, slow down the case where the queue was empty (though not drastically).

This patch caches the last freed rbtree node struct so we can quickly reuse it when we get a new message. This is the common path for any queue that very frequently goes from 0 to 1 and then back to 0 messages in the queue.

Andrew Morton didn't like that we were doing a GFP_ATOMIC allocation in msg_insert, so this patch attempts to speculatively allocate a new node struct outside of the spin lock when we know we need it, but will still fall back to a GFP_ATOMIC allocation if it has to.

Once I added the caching, the various ret = ...; spin_unlock gyrations in mq_timedsend were getting pretty ugly, so this also slightly refactors that function to streamline the flow of the code and the function exit.

Finally, while working on getting the performance back, I made sure that all of the node structs were always fully initialized when they were first used, rendering the use of kzalloc unnecessary and a waste of CPU cycles.

The net result of all of this is:

1) We will avoid a GFP_ATOMIC allocation when possible, but fall back on it when necessary.

2) We will speculatively allocate a node struct using GFP_KERNEL if our cache is empty (and save the struct to our cache if it's still empty after we have obtained the spin lock).

3) The performance of the common queue-empty case has significantly improved and is now much more in line with the older performance for this case.

The performance changes are:

                        Old mqueue   new mqueue   new mqueue + caching
queue empty
send/recv               305/288ns    349/318ns    310/322ns

I don't think we'll ever be able to get the recv performance back, but that's because the old recv performance was a direct result and consequence of the old method's abysmal send performance. The recv path simply must do more so that the send path does not incur such a penalty under higher queue depths.

As it turns out, the new caching code also sped up the various queue-full cases relative to my last patch. That could be because of the difference between the syscall path in 3.3.4-rc5 and 3.3.4-rc6, or because of the change in code flow in the mq_timedsend routine. Regardless, I'll take it. It wasn't huge, and I *would* say it was within the margin for error, but after many repeated runs what I'm seeing is that the old numbers trend slightly higher (about 10 to 20ns depending on which test is running).

[akpm@linux-foundation.org: checkpatch fixes]
Signed-off-by: Doug Ledford <dledford@redhat.com>
Cc: Frederic Weisbecker <fweisbec@gmail.com>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Stephen Rothwell <sfr@canb.auug.org.au>
Cc: KOSAKI Motohiro <kosaki.motohiro@jp.fujitsu.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
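The "queue empty send/recv" row above measures the access pattern that benefits from the node cache: a queue that bounces between 0 and 1 messages. Below is a minimal userspace sketch of that pattern, not the benchmark that produced the numbers above; the queue name and attributes are made up for illustration (build with -lrt on Linux):

	#include <mqueue.h>
	#include <fcntl.h>
	#include <stdio.h>

	int main(void)
	{
		/* hypothetical queue name and sizes, purely for illustration */
		struct mq_attr attr = { .mq_maxmsg = 10, .mq_msgsize = 64 };
		char buf[64];
		mqd_t mq = mq_open("/cache_demo", O_CREAT | O_RDWR, 0600, &attr);

		if (mq == (mqd_t)-1) {
			perror("mq_open");
			return 1;
		}

		for (int i = 0; i < 100000; i++) {
			/* each iteration takes the queue from 0 to 1 message... */
			if (mq_send(mq, "ping", 5, 0) == -1) {
				perror("mq_send");
				break;
			}
			/* ...and back to 0, freeing the rbtree leaf again */
			if (mq_receive(mq, buf, sizeof(buf), NULL) == -1) {
				perror("mq_receive");
				break;
			}
		}

		mq_close(mq);
		mq_unlink("/cache_demo");
		return 0;
	}

Each iteration frees the rbtree leaf on receive and would otherwise reallocate it on the next send; with this patch the freed leaf is parked in info->node_cache and reused instead.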
Diffstat (limited to 'ipc/mqueue.c')
-rw-r--r--	ipc/mqueue.c	104
1 file changed, 81 insertions(+), 23 deletions(-)
diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index af1692556c52..8ce57691e7b6 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -69,6 +69,7 @@ struct mqueue_inode_info {
 	wait_queue_head_t wait_q;
 
 	struct rb_root msg_tree;
+	struct posix_msg_tree_node *node_cache;
 	struct mq_attr attr;
 
 	struct sigevent notify;
@@ -134,15 +135,20 @@ static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info)
 		else
 			p = &(*p)->rb_right;
 	}
-	leaf = kzalloc(sizeof(*leaf), GFP_ATOMIC);
-	if (!leaf)
-		return -ENOMEM;
-	rb_init_node(&leaf->rb_node);
-	INIT_LIST_HEAD(&leaf->msg_list);
+	if (info->node_cache) {
+		leaf = info->node_cache;
+		info->node_cache = NULL;
+	} else {
+		leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
+		if (!leaf)
+			return -ENOMEM;
+		rb_init_node(&leaf->rb_node);
+		INIT_LIST_HEAD(&leaf->msg_list);
+		info->qsize += sizeof(*leaf);
+	}
 	leaf->priority = msg->m_type;
 	rb_link_node(&leaf->rb_node, parent, p);
 	rb_insert_color(&leaf->rb_node, &info->msg_tree);
-	info->qsize += sizeof(struct posix_msg_tree_node);
 insert_msg:
 	info->attr.mq_curmsgs++;
 	info->qsize += msg->m_ts;
@@ -177,13 +183,17 @@ try_again:
 			return NULL;
 		}
 		leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node);
-		if (list_empty(&leaf->msg_list)) {
+		if (unlikely(list_empty(&leaf->msg_list))) {
 			pr_warn_once("Inconsistency in POSIX message queue, "
 				     "empty leaf node but we haven't implemented "
 				     "lazy leaf delete!\n");
 			rb_erase(&leaf->rb_node, &info->msg_tree);
-			info->qsize -= sizeof(struct posix_msg_tree_node);
-			kfree(leaf);
+			if (info->node_cache) {
+				info->qsize -= sizeof(*leaf);
+				kfree(leaf);
+			} else {
+				info->node_cache = leaf;
+			}
 			goto try_again;
 		} else {
 			msg = list_first_entry(&leaf->msg_list,
@@ -191,8 +201,12 @@ try_again:
 			list_del(&msg->m_list);
 			if (list_empty(&leaf->msg_list)) {
 				rb_erase(&leaf->rb_node, &info->msg_tree);
-				info->qsize -= sizeof(struct posix_msg_tree_node);
-				kfree(leaf);
+				if (info->node_cache) {
+					info->qsize -= sizeof(*leaf);
+					kfree(leaf);
+				} else {
+					info->node_cache = leaf;
+				}
 			}
 		}
 		info->attr.mq_curmsgs--;
@@ -235,6 +249,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
 		info->qsize = 0;
 		info->user = NULL;	/* set when all is ok */
 		info->msg_tree = RB_ROOT;
+		info->node_cache = NULL;
 		memset(&info->attr, 0, sizeof(info->attr));
 		info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max,
 					   ipc_ns->mq_msg_default);
@@ -367,6 +382,7 @@ static void mqueue_evict_inode(struct inode *inode)
 	spin_lock(&info->lock);
 	while ((msg = msg_get(info)) != NULL)
 		free_msg(msg);
+	kfree(info->node_cache);
 	spin_unlock(&info->lock);
 
 	/* Total amount of bytes accounted for the mqueue */
@@ -964,7 +980,8 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	struct mqueue_inode_info *info;
 	ktime_t expires, *timeout = NULL;
 	struct timespec ts;
-	int ret;
+	struct posix_msg_tree_node *new_leaf = NULL;
+	int ret = 0;
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1012,39 +1029,60 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	msg_ptr->m_ts = msg_len;
 	msg_ptr->m_type = msg_prio;
 
+	/*
+	 * msg_insert really wants us to have a valid, spare node struct so
+	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
+	 * fall back to that if necessary.
+	 */
+	if (!info->node_cache)
+		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
+
 	spin_lock(&info->lock);
 
+	if (!info->node_cache && new_leaf) {
+		/* Save our speculative allocation into the cache */
+		rb_init_node(&new_leaf->rb_node);
+		INIT_LIST_HEAD(&new_leaf->msg_list);
+		info->node_cache = new_leaf;
+		info->qsize += sizeof(*new_leaf);
+		new_leaf = NULL;
+	} else {
+		kfree(new_leaf);
+	}
+
 	if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
 		if (filp->f_flags & O_NONBLOCK) {
-			spin_unlock(&info->lock);
 			ret = -EAGAIN;
 		} else {
 			wait.task = current;
 			wait.msg = (void *) msg_ptr;
 			wait.state = STATE_NONE;
 			ret = wq_sleep(info, SEND, timeout, &wait);
+			/*
+			 * wq_sleep must be called with info->lock held, and
+			 * returns with the lock released
+			 */
+			goto out_free;
 		}
-		if (ret < 0)
-			free_msg(msg_ptr);
 	} else {
 		receiver = wq_get_first_waiter(info, RECV);
 		if (receiver) {
 			pipelined_send(info, msg_ptr, receiver);
 		} else {
 			/* adds message to the queue */
-			if (msg_insert(msg_ptr, info)) {
-				free_msg(msg_ptr);
-				ret = -ENOMEM;
-				spin_unlock(&info->lock);
-				goto out_fput;
-			}
+			ret = msg_insert(msg_ptr, info);
+			if (ret)
+				goto out_unlock;
 			__do_notify(info);
 		}
 		inode->i_atime = inode->i_mtime = inode->i_ctime =
 				CURRENT_TIME;
-		spin_unlock(&info->lock);
-		ret = 0;
 	}
+out_unlock:
+	spin_unlock(&info->lock);
+out_free:
+	if (ret)
+		free_msg(msg_ptr);
 out_fput:
 	fput(filp);
 out:
@@ -1063,6 +1101,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 	struct ext_wait_queue wait;
 	ktime_t expires, *timeout = NULL;
 	struct timespec ts;
+	struct posix_msg_tree_node *new_leaf = NULL;
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1098,7 +1137,26 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 		goto out_fput;
 	}
 
+	/*
+	 * msg_insert really wants us to have a valid, spare node struct so
+	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
+	 * fall back to that if necessary.
+	 */
+	if (!info->node_cache)
+		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
+
 	spin_lock(&info->lock);
+
+	if (!info->node_cache && new_leaf) {
+		/* Save our speculative allocation into the cache */
+		rb_init_node(&new_leaf->rb_node);
+		INIT_LIST_HEAD(&new_leaf->msg_list);
+		info->node_cache = new_leaf;
+		info->qsize += sizeof(*new_leaf);
+	} else {
+		kfree(new_leaf);
+	}
+
 	if (info->attr.mq_curmsgs == 0) {
 		if (filp->f_flags & O_NONBLOCK) {
 			spin_unlock(&info->lock);