aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/nfs/client.c2
-rw-r--r--fs/nfs/direct.c9
-rw-r--r--fs/nfs/inode.c43
-rw-r--r--fs/nfs/internal.h1
-rw-r--r--fs/nfs/nfs4proc.c10
-rw-r--r--fs/nfs/nfs4state.c8
-rw-r--r--fs/nfs/read.c16
-rw-r--r--fs/nfs/write.c63
-rw-r--r--include/linux/sunrpc/clnt.h9
-rw-r--r--include/linux/sunrpc/sched.h41
-rw-r--r--net/sunrpc/auth_gss/auth_gss.c5
-rw-r--r--net/sunrpc/clnt.c8
-rw-r--r--net/sunrpc/rpcb_clnt.c2
-rw-r--r--net/sunrpc/sched.c255
-rw-r--r--net/sunrpc/xprt.c45
-rw-r--r--net/sunrpc/xprtsock.c7
16 files changed, 281 insertions, 243 deletions
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index c5c0175898f6..06f064d8fbbe 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -170,6 +170,8 @@ static void nfs4_shutdown_client(struct nfs_client *clp)
170 BUG_ON(!RB_EMPTY_ROOT(&clp->cl_state_owners)); 170 BUG_ON(!RB_EMPTY_ROOT(&clp->cl_state_owners));
171 if (__test_and_clear_bit(NFS_CS_IDMAP, &clp->cl_res_state)) 171 if (__test_and_clear_bit(NFS_CS_IDMAP, &clp->cl_res_state))
172 nfs_idmap_delete(clp); 172 nfs_idmap_delete(clp);
173
174 rpc_destroy_wait_queue(&clp->cl_rpcwaitq);
173#endif 175#endif
174} 176}
175 177
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 16844f98f50e..e44200579c8d 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -280,6 +280,7 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
280 .rpc_client = NFS_CLIENT(inode), 280 .rpc_client = NFS_CLIENT(inode),
281 .rpc_message = &msg, 281 .rpc_message = &msg,
282 .callback_ops = &nfs_read_direct_ops, 282 .callback_ops = &nfs_read_direct_ops,
283 .workqueue = nfsiod_workqueue,
283 .flags = RPC_TASK_ASYNC, 284 .flags = RPC_TASK_ASYNC,
284 }; 285 };
285 unsigned int pgbase; 286 unsigned int pgbase;
@@ -323,7 +324,7 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
323 data->inode = inode; 324 data->inode = inode;
324 data->cred = msg.rpc_cred; 325 data->cred = msg.rpc_cred;
325 data->args.fh = NFS_FH(inode); 326 data->args.fh = NFS_FH(inode);
326 data->args.context = ctx; 327 data->args.context = get_nfs_open_context(ctx);
327 data->args.offset = pos; 328 data->args.offset = pos;
328 data->args.pgbase = pgbase; 329 data->args.pgbase = pgbase;
329 data->args.pages = data->pagevec; 330 data->args.pages = data->pagevec;
@@ -446,6 +447,7 @@ static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
446 struct rpc_task_setup task_setup_data = { 447 struct rpc_task_setup task_setup_data = {
447 .rpc_client = NFS_CLIENT(inode), 448 .rpc_client = NFS_CLIENT(inode),
448 .callback_ops = &nfs_write_direct_ops, 449 .callback_ops = &nfs_write_direct_ops,
450 .workqueue = nfsiod_workqueue,
449 .flags = RPC_TASK_ASYNC, 451 .flags = RPC_TASK_ASYNC,
450 }; 452 };
451 453
@@ -537,6 +539,7 @@ static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
537 .rpc_message = &msg, 539 .rpc_message = &msg,
538 .callback_ops = &nfs_commit_direct_ops, 540 .callback_ops = &nfs_commit_direct_ops,
539 .callback_data = data, 541 .callback_data = data,
542 .workqueue = nfsiod_workqueue,
540 .flags = RPC_TASK_ASYNC, 543 .flags = RPC_TASK_ASYNC,
541 }; 544 };
542 545
@@ -546,6 +549,7 @@ static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
546 data->args.fh = NFS_FH(data->inode); 549 data->args.fh = NFS_FH(data->inode);
547 data->args.offset = 0; 550 data->args.offset = 0;
548 data->args.count = 0; 551 data->args.count = 0;
552 data->args.context = get_nfs_open_context(dreq->ctx);
549 data->res.count = 0; 553 data->res.count = 0;
550 data->res.fattr = &data->fattr; 554 data->res.fattr = &data->fattr;
551 data->res.verf = &data->verf; 555 data->res.verf = &data->verf;
@@ -682,6 +686,7 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
682 .rpc_client = NFS_CLIENT(inode), 686 .rpc_client = NFS_CLIENT(inode),
683 .rpc_message = &msg, 687 .rpc_message = &msg,
684 .callback_ops = &nfs_write_direct_ops, 688 .callback_ops = &nfs_write_direct_ops,
689 .workqueue = nfsiod_workqueue,
685 .flags = RPC_TASK_ASYNC, 690 .flags = RPC_TASK_ASYNC,
686 }; 691 };
687 size_t wsize = NFS_SERVER(inode)->wsize; 692 size_t wsize = NFS_SERVER(inode)->wsize;
@@ -728,7 +733,7 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
728 data->inode = inode; 733 data->inode = inode;
729 data->cred = msg.rpc_cred; 734 data->cred = msg.rpc_cred;
730 data->args.fh = NFS_FH(inode); 735 data->args.fh = NFS_FH(inode);
731 data->args.context = ctx; 736 data->args.context = get_nfs_open_context(ctx);
732 data->args.offset = pos; 737 data->args.offset = pos;
733 data->args.pgbase = pgbase; 738 data->args.pgbase = pgbase;
734 data->args.pages = data->pagevec; 739 data->args.pages = data->pagevec;
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index a4c7cf2bff3a..c49f6d8b42d2 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -522,8 +522,12 @@ struct nfs_open_context *get_nfs_open_context(struct nfs_open_context *ctx)
522 522
523static void __put_nfs_open_context(struct nfs_open_context *ctx, int wait) 523static void __put_nfs_open_context(struct nfs_open_context *ctx, int wait)
524{ 524{
525 struct inode *inode = ctx->path.dentry->d_inode; 525 struct inode *inode;
526 526
527 if (ctx == NULL)
528 return;
529
530 inode = ctx->path.dentry->d_inode;
527 if (!atomic_dec_and_lock(&ctx->count, &inode->i_lock)) 531 if (!atomic_dec_and_lock(&ctx->count, &inode->i_lock))
528 return; 532 return;
529 list_del(&ctx->list); 533 list_del(&ctx->list);
@@ -1217,6 +1221,36 @@ static void nfs_destroy_inodecache(void)
1217 kmem_cache_destroy(nfs_inode_cachep); 1221 kmem_cache_destroy(nfs_inode_cachep);
1218} 1222}
1219 1223
1224struct workqueue_struct *nfsiod_workqueue;
1225
1226/*
1227 * start up the nfsiod workqueue
1228 */
1229static int nfsiod_start(void)
1230{
1231 struct workqueue_struct *wq;
1232 dprintk("RPC: creating workqueue nfsiod\n");
1233 wq = create_singlethread_workqueue("nfsiod");
1234 if (wq == NULL)
1235 return -ENOMEM;
1236 nfsiod_workqueue = wq;
1237 return 0;
1238}
1239
1240/*
1241 * Destroy the nfsiod workqueue
1242 */
1243static void nfsiod_stop(void)
1244{
1245 struct workqueue_struct *wq;
1246
1247 wq = nfsiod_workqueue;
1248 if (wq == NULL)
1249 return;
1250 nfsiod_workqueue = NULL;
1251 destroy_workqueue(wq);
1252}
1253
1220/* 1254/*
1221 * Initialize NFS 1255 * Initialize NFS
1222 */ 1256 */
@@ -1224,6 +1258,10 @@ static int __init init_nfs_fs(void)
1224{ 1258{
1225 int err; 1259 int err;
1226 1260
1261 err = nfsiod_start();
1262 if (err)
1263 goto out6;
1264
1227 err = nfs_fs_proc_init(); 1265 err = nfs_fs_proc_init();
1228 if (err) 1266 if (err)
1229 goto out5; 1267 goto out5;
@@ -1270,6 +1308,8 @@ out3:
1270out4: 1308out4:
1271 nfs_fs_proc_exit(); 1309 nfs_fs_proc_exit();
1272out5: 1310out5:
1311 nfsiod_stop();
1312out6:
1273 return err; 1313 return err;
1274} 1314}
1275 1315
@@ -1285,6 +1325,7 @@ static void __exit exit_nfs_fs(void)
1285#endif 1325#endif
1286 unregister_nfs_fs(); 1326 unregister_nfs_fs();
1287 nfs_fs_proc_exit(); 1327 nfs_fs_proc_exit();
1328 nfsiod_stop();
1288} 1329}
1289 1330
1290/* Not quite true; I just maintain it */ 1331/* Not quite true; I just maintain it */
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index 931992763e68..4c1122a13844 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -146,6 +146,7 @@ extern struct rpc_procinfo nfs4_procedures[];
146extern int nfs_access_cache_shrinker(int nr_to_scan, gfp_t gfp_mask); 146extern int nfs_access_cache_shrinker(int nr_to_scan, gfp_t gfp_mask);
147 147
148/* inode.c */ 148/* inode.c */
149extern struct workqueue_struct *nfsiod_workqueue;
149extern struct inode *nfs_alloc_inode(struct super_block *sb); 150extern struct inode *nfs_alloc_inode(struct super_block *sb);
150extern void nfs_destroy_inode(struct inode *); 151extern void nfs_destroy_inode(struct inode *);
151extern int nfs_write_inode(struct inode *,int); 152extern int nfs_write_inode(struct inode *,int);
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 7ce07862c2fb..bbb0d58ee6ac 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -51,6 +51,7 @@
51 51
52#include "nfs4_fs.h" 52#include "nfs4_fs.h"
53#include "delegation.h" 53#include "delegation.h"
54#include "internal.h"
54#include "iostat.h" 55#include "iostat.h"
55 56
56#define NFSDBG_FACILITY NFSDBG_PROC 57#define NFSDBG_FACILITY NFSDBG_PROC
@@ -773,6 +774,7 @@ static int _nfs4_proc_open_confirm(struct nfs4_opendata *data)
773 .rpc_message = &msg, 774 .rpc_message = &msg,
774 .callback_ops = &nfs4_open_confirm_ops, 775 .callback_ops = &nfs4_open_confirm_ops,
775 .callback_data = data, 776 .callback_data = data,
777 .workqueue = nfsiod_workqueue,
776 .flags = RPC_TASK_ASYNC, 778 .flags = RPC_TASK_ASYNC,
777 }; 779 };
778 int status; 780 int status;
@@ -910,6 +912,7 @@ static int _nfs4_proc_open(struct nfs4_opendata *data)
910 .rpc_message = &msg, 912 .rpc_message = &msg,
911 .callback_ops = &nfs4_open_ops, 913 .callback_ops = &nfs4_open_ops,
912 .callback_data = data, 914 .callback_data = data,
915 .workqueue = nfsiod_workqueue,
913 .flags = RPC_TASK_ASYNC, 916 .flags = RPC_TASK_ASYNC,
914 }; 917 };
915 int status; 918 int status;
@@ -1315,6 +1318,7 @@ int nfs4_do_close(struct path *path, struct nfs4_state *state, int wait)
1315 .rpc_client = server->client, 1318 .rpc_client = server->client,
1316 .rpc_message = &msg, 1319 .rpc_message = &msg,
1317 .callback_ops = &nfs4_close_ops, 1320 .callback_ops = &nfs4_close_ops,
1321 .workqueue = nfsiod_workqueue,
1318 .flags = RPC_TASK_ASYNC, 1322 .flags = RPC_TASK_ASYNC,
1319 }; 1323 };
1320 int status = -ENOMEM; 1324 int status = -ENOMEM;
@@ -2761,10 +2765,10 @@ nfs4_async_handle_error(struct rpc_task *task, const struct nfs_server *server)
2761 case -NFS4ERR_STALE_CLIENTID: 2765 case -NFS4ERR_STALE_CLIENTID:
2762 case -NFS4ERR_STALE_STATEID: 2766 case -NFS4ERR_STALE_STATEID:
2763 case -NFS4ERR_EXPIRED: 2767 case -NFS4ERR_EXPIRED:
2764 rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL, NULL); 2768 rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL);
2765 nfs4_schedule_state_recovery(clp); 2769 nfs4_schedule_state_recovery(clp);
2766 if (test_bit(NFS4CLNT_STATE_RECOVER, &clp->cl_state) == 0) 2770 if (test_bit(NFS4CLNT_STATE_RECOVER, &clp->cl_state) == 0)
2767 rpc_wake_up_task(task); 2771 rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task);
2768 task->tk_status = 0; 2772 task->tk_status = 0;
2769 return -EAGAIN; 2773 return -EAGAIN;
2770 case -NFS4ERR_DELAY: 2774 case -NFS4ERR_DELAY:
@@ -3235,6 +3239,7 @@ static struct rpc_task *nfs4_do_unlck(struct file_lock *fl,
3235 .rpc_client = NFS_CLIENT(lsp->ls_state->inode), 3239 .rpc_client = NFS_CLIENT(lsp->ls_state->inode),
3236 .rpc_message = &msg, 3240 .rpc_message = &msg,
3237 .callback_ops = &nfs4_locku_ops, 3241 .callback_ops = &nfs4_locku_ops,
3242 .workqueue = nfsiod_workqueue,
3238 .flags = RPC_TASK_ASYNC, 3243 .flags = RPC_TASK_ASYNC,
3239 }; 3244 };
3240 3245
@@ -3419,6 +3424,7 @@ static int _nfs4_do_setlk(struct nfs4_state *state, int cmd, struct file_lock *f
3419 .rpc_client = NFS_CLIENT(state->inode), 3424 .rpc_client = NFS_CLIENT(state->inode),
3420 .rpc_message = &msg, 3425 .rpc_message = &msg,
3421 .callback_ops = &nfs4_lock_ops, 3426 .callback_ops = &nfs4_lock_ops,
3427 .workqueue = nfsiod_workqueue,
3422 .flags = RPC_TASK_ASYNC, 3428 .flags = RPC_TASK_ASYNC,
3423 }; 3429 };
3424 int ret; 3430 int ret;
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index b962397004c1..7775435ea7a5 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -292,8 +292,10 @@ struct nfs4_state_owner *nfs4_get_state_owner(struct nfs_server *server, struct
292 spin_unlock(&clp->cl_lock); 292 spin_unlock(&clp->cl_lock);
293 if (sp == new) 293 if (sp == new)
294 get_rpccred(cred); 294 get_rpccred(cred);
295 else 295 else {
296 rpc_destroy_wait_queue(&new->so_sequence.wait);
296 kfree(new); 297 kfree(new);
298 }
297 return sp; 299 return sp;
298} 300}
299 301
@@ -310,6 +312,7 @@ void nfs4_put_state_owner(struct nfs4_state_owner *sp)
310 return; 312 return;
311 nfs4_remove_state_owner(clp, sp); 313 nfs4_remove_state_owner(clp, sp);
312 spin_unlock(&clp->cl_lock); 314 spin_unlock(&clp->cl_lock);
315 rpc_destroy_wait_queue(&sp->so_sequence.wait);
313 put_rpccred(cred); 316 put_rpccred(cred);
314 kfree(sp); 317 kfree(sp);
315} 318}
@@ -529,6 +532,7 @@ static void nfs4_free_lock_state(struct nfs4_lock_state *lsp)
529 spin_lock(&clp->cl_lock); 532 spin_lock(&clp->cl_lock);
530 nfs_free_unique_id(&clp->cl_lockowner_id, &lsp->ls_id); 533 nfs_free_unique_id(&clp->cl_lockowner_id, &lsp->ls_id);
531 spin_unlock(&clp->cl_lock); 534 spin_unlock(&clp->cl_lock);
535 rpc_destroy_wait_queue(&lsp->ls_sequence.wait);
532 kfree(lsp); 536 kfree(lsp);
533} 537}
534 538
@@ -731,7 +735,7 @@ int nfs_wait_on_sequence(struct nfs_seqid *seqid, struct rpc_task *task)
731 list_add_tail(&seqid->list, &sequence->list); 735 list_add_tail(&seqid->list, &sequence->list);
732 if (list_first_entry(&sequence->list, struct nfs_seqid, list) == seqid) 736 if (list_first_entry(&sequence->list, struct nfs_seqid, list) == seqid)
733 goto unlock; 737 goto unlock;
734 rpc_sleep_on(&sequence->wait, task, NULL, NULL); 738 rpc_sleep_on(&sequence->wait, task, NULL);
735 status = -EAGAIN; 739 status = -EAGAIN;
736unlock: 740unlock:
737 spin_unlock(&sequence->lock); 741 spin_unlock(&sequence->lock);
diff --git a/fs/nfs/read.c b/fs/nfs/read.c
index 3d7d9631e125..be9e8270f4d7 100644
--- a/fs/nfs/read.c
+++ b/fs/nfs/read.c
@@ -58,22 +58,19 @@ struct nfs_read_data *nfs_readdata_alloc(unsigned int pagecount)
58 return p; 58 return p;
59} 59}
60 60
61static void nfs_readdata_rcu_free(struct rcu_head *head) 61static void nfs_readdata_free(struct nfs_read_data *p)
62{ 62{
63 struct nfs_read_data *p = container_of(head, struct nfs_read_data, task.u.tk_rcu);
64 if (p && (p->pagevec != &p->page_array[0])) 63 if (p && (p->pagevec != &p->page_array[0]))
65 kfree(p->pagevec); 64 kfree(p->pagevec);
66 mempool_free(p, nfs_rdata_mempool); 65 mempool_free(p, nfs_rdata_mempool);
67} 66}
68 67
69static void nfs_readdata_free(struct nfs_read_data *rdata)
70{
71 call_rcu_bh(&rdata->task.u.tk_rcu, nfs_readdata_rcu_free);
72}
73
74void nfs_readdata_release(void *data) 68void nfs_readdata_release(void *data)
75{ 69{
76 nfs_readdata_free(data); 70 struct nfs_read_data *rdata = data;
71
72 put_nfs_open_context(rdata->args.context);
73 nfs_readdata_free(rdata);
77} 74}
78 75
79static 76static
@@ -174,6 +171,7 @@ static void nfs_read_rpcsetup(struct nfs_page *req, struct nfs_read_data *data,
174 .rpc_message = &msg, 171 .rpc_message = &msg,
175 .callback_ops = call_ops, 172 .callback_ops = call_ops,
176 .callback_data = data, 173 .callback_data = data,
174 .workqueue = nfsiod_workqueue,
177 .flags = RPC_TASK_ASYNC | swap_flags, 175 .flags = RPC_TASK_ASYNC | swap_flags,
178 }; 176 };
179 177
@@ -186,7 +184,7 @@ static void nfs_read_rpcsetup(struct nfs_page *req, struct nfs_read_data *data,
186 data->args.pgbase = req->wb_pgbase + offset; 184 data->args.pgbase = req->wb_pgbase + offset;
187 data->args.pages = data->pagevec; 185 data->args.pages = data->pagevec;
188 data->args.count = count; 186 data->args.count = count;
189 data->args.context = req->wb_context; 187 data->args.context = get_nfs_open_context(req->wb_context);
190 188
191 data->res.fattr = &data->fattr; 189 data->res.fattr = &data->fattr;
192 data->res.count = count; 190 data->res.count = count;
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index 80c61fdb2720..1667e3984418 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -58,19 +58,13 @@ struct nfs_write_data *nfs_commit_alloc(void)
58 return p; 58 return p;
59} 59}
60 60
61static void nfs_commit_rcu_free(struct rcu_head *head) 61void nfs_commit_free(struct nfs_write_data *p)
62{ 62{
63 struct nfs_write_data *p = container_of(head, struct nfs_write_data, task.u.tk_rcu);
64 if (p && (p->pagevec != &p->page_array[0])) 63 if (p && (p->pagevec != &p->page_array[0]))
65 kfree(p->pagevec); 64 kfree(p->pagevec);
66 mempool_free(p, nfs_commit_mempool); 65 mempool_free(p, nfs_commit_mempool);
67} 66}
68 67
69void nfs_commit_free(struct nfs_write_data *wdata)
70{
71 call_rcu_bh(&wdata->task.u.tk_rcu, nfs_commit_rcu_free);
72}
73
74struct nfs_write_data *nfs_writedata_alloc(unsigned int pagecount) 68struct nfs_write_data *nfs_writedata_alloc(unsigned int pagecount)
75{ 69{
76 struct nfs_write_data *p = mempool_alloc(nfs_wdata_mempool, GFP_NOFS); 70 struct nfs_write_data *p = mempool_alloc(nfs_wdata_mempool, GFP_NOFS);
@@ -92,21 +86,18 @@ struct nfs_write_data *nfs_writedata_alloc(unsigned int pagecount)
92 return p; 86 return p;
93} 87}
94 88
95static void nfs_writedata_rcu_free(struct rcu_head *head) 89static void nfs_writedata_free(struct nfs_write_data *p)
96{ 90{
97 struct nfs_write_data *p = container_of(head, struct nfs_write_data, task.u.tk_rcu);
98 if (p && (p->pagevec != &p->page_array[0])) 91 if (p && (p->pagevec != &p->page_array[0]))
99 kfree(p->pagevec); 92 kfree(p->pagevec);
100 mempool_free(p, nfs_wdata_mempool); 93 mempool_free(p, nfs_wdata_mempool);
101} 94}
102 95
103static void nfs_writedata_free(struct nfs_write_data *wdata) 96void nfs_writedata_release(void *data)
104{ 97{
105 call_rcu_bh(&wdata->task.u.tk_rcu, nfs_writedata_rcu_free); 98 struct nfs_write_data *wdata = data;
106}
107 99
108void nfs_writedata_release(void *wdata) 100 put_nfs_open_context(wdata->args.context);
109{
110 nfs_writedata_free(wdata); 101 nfs_writedata_free(wdata);
111} 102}
112 103
@@ -360,15 +351,13 @@ int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc)
360/* 351/*
361 * Insert a write request into an inode 352 * Insert a write request into an inode
362 */ 353 */
363static int nfs_inode_add_request(struct inode *inode, struct nfs_page *req) 354static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
364{ 355{
365 struct nfs_inode *nfsi = NFS_I(inode); 356 struct nfs_inode *nfsi = NFS_I(inode);
366 int error; 357 int error;
367 358
368 error = radix_tree_insert(&nfsi->nfs_page_tree, req->wb_index, req); 359 error = radix_tree_insert(&nfsi->nfs_page_tree, req->wb_index, req);
369 BUG_ON(error == -EEXIST); 360 BUG_ON(error);
370 if (error)
371 return error;
372 if (!nfsi->npages) { 361 if (!nfsi->npages) {
373 igrab(inode); 362 igrab(inode);
374 if (nfs_have_delegation(inode, FMODE_WRITE)) 363 if (nfs_have_delegation(inode, FMODE_WRITE))
@@ -378,8 +367,8 @@ static int nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
378 set_page_private(req->wb_page, (unsigned long)req); 367 set_page_private(req->wb_page, (unsigned long)req);
379 nfsi->npages++; 368 nfsi->npages++;
380 kref_get(&req->wb_kref); 369 kref_get(&req->wb_kref);
381 radix_tree_tag_set(&nfsi->nfs_page_tree, req->wb_index, NFS_PAGE_TAG_LOCKED); 370 radix_tree_tag_set(&nfsi->nfs_page_tree, req->wb_index,
382 return 0; 371 NFS_PAGE_TAG_LOCKED);
383} 372}
384 373
385/* 374/*
@@ -591,6 +580,13 @@ static struct nfs_page * nfs_update_request(struct nfs_open_context* ctx,
591 /* Loop over all inode entries and see if we find 580 /* Loop over all inode entries and see if we find
592 * A request for the page we wish to update 581 * A request for the page we wish to update
593 */ 582 */
583 if (new) {
584 if (radix_tree_preload(GFP_NOFS)) {
585 nfs_release_request(new);
586 return ERR_PTR(-ENOMEM);
587 }
588 }
589
594 spin_lock(&inode->i_lock); 590 spin_lock(&inode->i_lock);
595 req = nfs_page_find_request_locked(page); 591 req = nfs_page_find_request_locked(page);
596 if (req) { 592 if (req) {
@@ -601,28 +597,27 @@ static struct nfs_page * nfs_update_request(struct nfs_open_context* ctx,
601 error = nfs_wait_on_request(req); 597 error = nfs_wait_on_request(req);
602 nfs_release_request(req); 598 nfs_release_request(req);
603 if (error < 0) { 599 if (error < 0) {
604 if (new) 600 if (new) {
601 radix_tree_preload_end();
605 nfs_release_request(new); 602 nfs_release_request(new);
603 }
606 return ERR_PTR(error); 604 return ERR_PTR(error);
607 } 605 }
608 continue; 606 continue;
609 } 607 }
610 spin_unlock(&inode->i_lock); 608 spin_unlock(&inode->i_lock);
611 if (new) 609 if (new) {
610 radix_tree_preload_end();
612 nfs_release_request(new); 611 nfs_release_request(new);
612 }
613 break; 613 break;
614 } 614 }
615 615
616 if (new) { 616 if (new) {
617 int error;
618 nfs_lock_request_dontget(new); 617 nfs_lock_request_dontget(new);
619 error = nfs_inode_add_request(inode, new); 618 nfs_inode_add_request(inode, new);
620 if (error) {
621 spin_unlock(&inode->i_lock);
622 nfs_unlock_request(new);
623 return ERR_PTR(error);
624 }
625 spin_unlock(&inode->i_lock); 619 spin_unlock(&inode->i_lock);
620 radix_tree_preload_end();
626 req = new; 621 req = new;
627 goto zero_page; 622 goto zero_page;
628 } 623 }
@@ -800,6 +795,7 @@ static void nfs_write_rpcsetup(struct nfs_page *req,
800 .rpc_message = &msg, 795 .rpc_message = &msg,
801 .callback_ops = call_ops, 796 .callback_ops = call_ops,
802 .callback_data = data, 797 .callback_data = data,
798 .workqueue = nfsiod_workqueue,
803 .flags = flags, 799 .flags = flags,
804 .priority = priority, 800 .priority = priority,
805 }; 801 };
@@ -816,7 +812,7 @@ static void nfs_write_rpcsetup(struct nfs_page *req,
816 data->args.pgbase = req->wb_pgbase + offset; 812 data->args.pgbase = req->wb_pgbase + offset;
817 data->args.pages = data->pagevec; 813 data->args.pages = data->pagevec;
818 data->args.count = count; 814 data->args.count = count;
819 data->args.context = req->wb_context; 815 data->args.context = get_nfs_open_context(req->wb_context);
820 data->args.stable = NFS_UNSTABLE; 816 data->args.stable = NFS_UNSTABLE;
821 if (how & FLUSH_STABLE) { 817 if (how & FLUSH_STABLE) {
822 data->args.stable = NFS_DATA_SYNC; 818 data->args.stable = NFS_DATA_SYNC;
@@ -1153,8 +1149,11 @@ int nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
1153 1149
1154 1150
1155#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4) 1151#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
1156void nfs_commit_release(void *wdata) 1152void nfs_commit_release(void *data)
1157{ 1153{
1154 struct nfs_write_data *wdata = data;
1155
1156 put_nfs_open_context(wdata->args.context);
1158 nfs_commit_free(wdata); 1157 nfs_commit_free(wdata);
1159} 1158}
1160 1159
@@ -1181,6 +1180,7 @@ static void nfs_commit_rpcsetup(struct list_head *head,
1181 .rpc_message = &msg, 1180 .rpc_message = &msg,
1182 .callback_ops = &nfs_commit_ops, 1181 .callback_ops = &nfs_commit_ops,
1183 .callback_data = data, 1182 .callback_data = data,
1183 .workqueue = nfsiod_workqueue,
1184 .flags = flags, 1184 .flags = flags,
1185 .priority = priority, 1185 .priority = priority,
1186 }; 1186 };
@@ -1197,6 +1197,7 @@ static void nfs_commit_rpcsetup(struct list_head *head,
1197 /* Note: we always request a commit of the entire inode */ 1197 /* Note: we always request a commit of the entire inode */
1198 data->args.offset = 0; 1198 data->args.offset = 0;
1199 data->args.count = 0; 1199 data->args.count = 0;
1200 data->args.context = get_nfs_open_context(first->wb_context);
1200 data->res.count = 0; 1201 data->res.count = 0;
1201 data->res.fattr = &data->fattr; 1202 data->res.fattr = &data->fattr;
1202 data->res.verf = &data->verf; 1203 data->res.verf = &data->verf;
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 129a86e25d29..6fff7f82ef12 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -127,11 +127,12 @@ int rpcb_getport_sync(struct sockaddr_in *, u32, u32, int);
127void rpcb_getport_async(struct rpc_task *); 127void rpcb_getport_async(struct rpc_task *);
128 128
129void rpc_call_start(struct rpc_task *); 129void rpc_call_start(struct rpc_task *);
130int rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, 130int rpc_call_async(struct rpc_clnt *clnt,
131 int flags, const struct rpc_call_ops *tk_ops, 131 const struct rpc_message *msg, int flags,
132 const struct rpc_call_ops *tk_ops,
132 void *calldata); 133 void *calldata);
133int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, 134int rpc_call_sync(struct rpc_clnt *clnt,
134 int flags); 135 const struct rpc_message *msg, int flags);
135struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, 136struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred,
136 int flags); 137 int flags);
137void rpc_restart_call(struct rpc_task *); 138void rpc_restart_call(struct rpc_task *);
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index f689f02e6793..d1a5c8c1a0f1 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -11,7 +11,6 @@
11 11
12#include <linux/timer.h> 12#include <linux/timer.h>
13#include <linux/sunrpc/types.h> 13#include <linux/sunrpc/types.h>
14#include <linux/rcupdate.h>
15#include <linux/spinlock.h> 14#include <linux/spinlock.h>
16#include <linux/wait.h> 15#include <linux/wait.h>
17#include <linux/workqueue.h> 16#include <linux/workqueue.h>
@@ -33,7 +32,8 @@ struct rpc_wait_queue;
33struct rpc_wait { 32struct rpc_wait {
34 struct list_head list; /* wait queue links */ 33 struct list_head list; /* wait queue links */
35 struct list_head links; /* Links to related tasks */ 34 struct list_head links; /* Links to related tasks */
36 struct rpc_wait_queue * rpc_waitq; /* RPC wait queue we're on */ 35 struct list_head timer_list; /* Timer list */
36 unsigned long expires;
37}; 37};
38 38
39/* 39/*
@@ -57,33 +57,25 @@ struct rpc_task {
57 __u8 tk_cred_retry; 57 __u8 tk_cred_retry;
58 58
59 /* 59 /*
60 * timeout_fn to be executed by timer bottom half
61 * callback to be executed after waking up 60 * callback to be executed after waking up
62 * action next procedure for async tasks 61 * action next procedure for async tasks
63 * tk_ops caller callbacks 62 * tk_ops caller callbacks
64 */ 63 */
65 void (*tk_timeout_fn)(struct rpc_task *);
66 void (*tk_callback)(struct rpc_task *); 64 void (*tk_callback)(struct rpc_task *);
67 void (*tk_action)(struct rpc_task *); 65 void (*tk_action)(struct rpc_task *);
68 const struct rpc_call_ops *tk_ops; 66 const struct rpc_call_ops *tk_ops;
69 void * tk_calldata; 67 void * tk_calldata;
70 68
71 /*
72 * tk_timer is used for async processing by the RPC scheduling
73 * primitives. You should not access this directly unless
74 * you have a pathological interest in kernel oopses.
75 */
76 struct timer_list tk_timer; /* kernel timer */
77 unsigned long tk_timeout; /* timeout for rpc_sleep() */ 69 unsigned long tk_timeout; /* timeout for rpc_sleep() */
78 unsigned short tk_flags; /* misc flags */ 70 unsigned short tk_flags; /* misc flags */
79 unsigned long tk_runstate; /* Task run status */ 71 unsigned long tk_runstate; /* Task run status */
80 struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could 72 struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could
81 * be any workqueue 73 * be any workqueue
82 */ 74 */
75 struct rpc_wait_queue *tk_waitqueue; /* RPC wait queue we're on */
83 union { 76 union {
84 struct work_struct tk_work; /* Async task work queue */ 77 struct work_struct tk_work; /* Async task work queue */
85 struct rpc_wait tk_wait; /* RPC wait */ 78 struct rpc_wait tk_wait; /* RPC wait */
86 struct rcu_head tk_rcu; /* for task deletion */
87 } u; 79 } u;
88 80
89 unsigned short tk_timeouts; /* maj timeouts */ 81 unsigned short tk_timeouts; /* maj timeouts */
@@ -123,6 +115,7 @@ struct rpc_task_setup {
123 const struct rpc_message *rpc_message; 115 const struct rpc_message *rpc_message;
124 const struct rpc_call_ops *callback_ops; 116 const struct rpc_call_ops *callback_ops;
125 void *callback_data; 117 void *callback_data;
118 struct workqueue_struct *workqueue;
126 unsigned short flags; 119 unsigned short flags;
127 signed char priority; 120 signed char priority;
128}; 121};
@@ -147,9 +140,7 @@ struct rpc_task_setup {
147 140
148#define RPC_TASK_RUNNING 0 141#define RPC_TASK_RUNNING 0
149#define RPC_TASK_QUEUED 1 142#define RPC_TASK_QUEUED 1
150#define RPC_TASK_WAKEUP 2 143#define RPC_TASK_ACTIVE 2
151#define RPC_TASK_HAS_TIMER 3
152#define RPC_TASK_ACTIVE 4
153 144
154#define RPC_IS_RUNNING(t) test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate) 145#define RPC_IS_RUNNING(t) test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
155#define rpc_set_running(t) set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate) 146#define rpc_set_running(t) set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
@@ -171,15 +162,6 @@ struct rpc_task_setup {
171 smp_mb__after_clear_bit(); \ 162 smp_mb__after_clear_bit(); \
172 } while (0) 163 } while (0)
173 164
174#define rpc_start_wakeup(t) \
175 (test_and_set_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate) == 0)
176#define rpc_finish_wakeup(t) \
177 do { \
178 smp_mb__before_clear_bit(); \
179 clear_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate); \
180 smp_mb__after_clear_bit(); \
181 } while (0)
182
183#define RPC_IS_ACTIVATED(t) test_bit(RPC_TASK_ACTIVE, &(t)->tk_runstate) 165#define RPC_IS_ACTIVATED(t) test_bit(RPC_TASK_ACTIVE, &(t)->tk_runstate)
184 166
185/* 167/*
@@ -192,6 +174,12 @@ struct rpc_task_setup {
192#define RPC_PRIORITY_HIGH (1) 174#define RPC_PRIORITY_HIGH (1)
193#define RPC_NR_PRIORITY (1 + RPC_PRIORITY_HIGH - RPC_PRIORITY_LOW) 175#define RPC_NR_PRIORITY (1 + RPC_PRIORITY_HIGH - RPC_PRIORITY_LOW)
194 176
177struct rpc_timer {
178 struct timer_list timer;
179 struct list_head list;
180 unsigned long expires;
181};
182
195/* 183/*
196 * RPC synchronization objects 184 * RPC synchronization objects
197 */ 185 */
@@ -204,6 +192,7 @@ struct rpc_wait_queue {
204 unsigned char count; /* # task groups remaining serviced so far */ 192 unsigned char count; /* # task groups remaining serviced so far */
205 unsigned char nr; /* # tasks remaining for cookie */ 193 unsigned char nr; /* # tasks remaining for cookie */
206 unsigned short qlen; /* total # tasks waiting in queue */ 194 unsigned short qlen; /* total # tasks waiting in queue */
195 struct rpc_timer timer_list;
207#ifdef RPC_DEBUG 196#ifdef RPC_DEBUG
208 const char * name; 197 const char * name;
209#endif 198#endif
@@ -229,9 +218,11 @@ void rpc_killall_tasks(struct rpc_clnt *);
229void rpc_execute(struct rpc_task *); 218void rpc_execute(struct rpc_task *);
230void rpc_init_priority_wait_queue(struct rpc_wait_queue *, const char *); 219void rpc_init_priority_wait_queue(struct rpc_wait_queue *, const char *);
231void rpc_init_wait_queue(struct rpc_wait_queue *, const char *); 220void rpc_init_wait_queue(struct rpc_wait_queue *, const char *);
221void rpc_destroy_wait_queue(struct rpc_wait_queue *);
232void rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *, 222void rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *,
233 rpc_action action, rpc_action timer); 223 rpc_action action);
234void rpc_wake_up_task(struct rpc_task *); 224void rpc_wake_up_queued_task(struct rpc_wait_queue *,
225 struct rpc_task *);
235void rpc_wake_up(struct rpc_wait_queue *); 226void rpc_wake_up(struct rpc_wait_queue *);
236struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *); 227struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
237void rpc_wake_up_status(struct rpc_wait_queue *, int); 228void rpc_wake_up_status(struct rpc_wait_queue *, int);
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 6dac38792288..ef6384961808 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -266,6 +266,7 @@ gss_release_msg(struct gss_upcall_msg *gss_msg)
266 BUG_ON(!list_empty(&gss_msg->list)); 266 BUG_ON(!list_empty(&gss_msg->list));
267 if (gss_msg->ctx != NULL) 267 if (gss_msg->ctx != NULL)
268 gss_put_ctx(gss_msg->ctx); 268 gss_put_ctx(gss_msg->ctx);
269 rpc_destroy_wait_queue(&gss_msg->rpc_waitqueue);
269 kfree(gss_msg); 270 kfree(gss_msg);
270} 271}
271 272
@@ -408,13 +409,13 @@ gss_refresh_upcall(struct rpc_task *task)
408 } 409 }
409 spin_lock(&inode->i_lock); 410 spin_lock(&inode->i_lock);
410 if (gss_cred->gc_upcall != NULL) 411 if (gss_cred->gc_upcall != NULL)
411 rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL, NULL); 412 rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL);
412 else if (gss_msg->ctx == NULL && gss_msg->msg.errno >= 0) { 413 else if (gss_msg->ctx == NULL && gss_msg->msg.errno >= 0) {
413 task->tk_timeout = 0; 414 task->tk_timeout = 0;
414 gss_cred->gc_upcall = gss_msg; 415 gss_cred->gc_upcall = gss_msg;
415 /* gss_upcall_callback will release the reference to gss_upcall_msg */ 416 /* gss_upcall_callback will release the reference to gss_upcall_msg */
416 atomic_inc(&gss_msg->count); 417 atomic_inc(&gss_msg->count);
417 rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback, NULL); 418 rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback);
418 } else 419 } else
419 err = gss_msg->msg.errno; 420 err = gss_msg->msg.errno;
420 spin_unlock(&inode->i_lock); 421 spin_unlock(&inode->i_lock);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 8c6a7f1a25e9..ea14314331b0 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -548,7 +548,7 @@ EXPORT_SYMBOL_GPL(rpc_run_task);
548 * @msg: RPC call parameters 548 * @msg: RPC call parameters
549 * @flags: RPC call flags 549 * @flags: RPC call flags
550 */ 550 */
551int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags) 551int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
552{ 552{
553 struct rpc_task *task; 553 struct rpc_task *task;
554 struct rpc_task_setup task_setup_data = { 554 struct rpc_task_setup task_setup_data = {
@@ -579,7 +579,7 @@ EXPORT_SYMBOL_GPL(rpc_call_sync);
579 * @data: user call data 579 * @data: user call data
580 */ 580 */
581int 581int
582rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags, 582rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
583 const struct rpc_call_ops *tk_ops, void *data) 583 const struct rpc_call_ops *tk_ops, void *data)
584{ 584{
585 struct rpc_task *task; 585 struct rpc_task *task;
@@ -1066,7 +1066,7 @@ call_transmit(struct rpc_task *task)
1066 if (task->tk_msg.rpc_proc->p_decode != NULL) 1066 if (task->tk_msg.rpc_proc->p_decode != NULL)
1067 return; 1067 return;
1068 task->tk_action = rpc_exit_task; 1068 task->tk_action = rpc_exit_task;
1069 rpc_wake_up_task(task); 1069 rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
1070} 1070}
1071 1071
1072/* 1072/*
@@ -1535,7 +1535,7 @@ void rpc_show_tasks(void)
1535 proc = -1; 1535 proc = -1;
1536 1536
1537 if (RPC_IS_QUEUED(t)) 1537 if (RPC_IS_QUEUED(t))
1538 rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq); 1538 rpc_waitq = rpc_qname(t->tk_waitqueue);
1539 1539
1540 printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n", 1540 printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
1541 t->tk_pid, proc, 1541 t->tk_pid, proc,
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index 3164a0871cf0..f480c718b400 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -298,7 +298,7 @@ void rpcb_getport_async(struct rpc_task *task)
298 298
299 /* Put self on queue before sending rpcbind request, in case 299 /* Put self on queue before sending rpcbind request, in case
300 * rpcb_getport_done completes before we return from rpc_run_task */ 300 * rpcb_getport_done completes before we return from rpc_run_task */
301 rpc_sleep_on(&xprt->binding, task, NULL, NULL); 301 rpc_sleep_on(&xprt->binding, task, NULL);
302 302
303 /* Someone else may have bound if we slept */ 303 /* Someone else may have bound if we slept */
304 if (xprt_bound(xprt)) { 304 if (xprt_bound(xprt)) {
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 4c669121e607..cae219c8caeb 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -38,9 +38,9 @@ static struct kmem_cache *rpc_buffer_slabp __read_mostly;
38static mempool_t *rpc_task_mempool __read_mostly; 38static mempool_t *rpc_task_mempool __read_mostly;
39static mempool_t *rpc_buffer_mempool __read_mostly; 39static mempool_t *rpc_buffer_mempool __read_mostly;
40 40
41static void __rpc_default_timer(struct rpc_task *task);
42static void rpc_async_schedule(struct work_struct *); 41static void rpc_async_schedule(struct work_struct *);
43static void rpc_release_task(struct rpc_task *task); 42static void rpc_release_task(struct rpc_task *task);
43static void __rpc_queue_timer_fn(unsigned long ptr);
44 44
45/* 45/*
46 * RPC tasks sit here while waiting for conditions to improve. 46 * RPC tasks sit here while waiting for conditions to improve.
@@ -57,41 +57,30 @@ struct workqueue_struct *rpciod_workqueue;
57 * queue->lock and bh_disabled in order to avoid races within 57 * queue->lock and bh_disabled in order to avoid races within
58 * rpc_run_timer(). 58 * rpc_run_timer().
59 */ 59 */
60static inline void 60static void
61__rpc_disable_timer(struct rpc_task *task) 61__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
62{ 62{
63 if (task->tk_timeout == 0)
64 return;
63 dprintk("RPC: %5u disabling timer\n", task->tk_pid); 65 dprintk("RPC: %5u disabling timer\n", task->tk_pid);
64 task->tk_timeout_fn = NULL;
65 task->tk_timeout = 0; 66 task->tk_timeout = 0;
67 list_del(&task->u.tk_wait.timer_list);
68 if (list_empty(&queue->timer_list.list))
69 del_timer(&queue->timer_list.timer);
66} 70}
67 71
68/* 72static void
69 * Run a timeout function. 73rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
70 * We use the callback in order to allow __rpc_wake_up_task()
71 * and friends to disable the timer synchronously on SMP systems
72 * without calling del_timer_sync(). The latter could cause a
73 * deadlock if called while we're holding spinlocks...
74 */
75static void rpc_run_timer(struct rpc_task *task)
76{ 74{
77 void (*callback)(struct rpc_task *); 75 queue->timer_list.expires = expires;
78 76 mod_timer(&queue->timer_list.timer, expires);
79 callback = task->tk_timeout_fn;
80 task->tk_timeout_fn = NULL;
81 if (callback && RPC_IS_QUEUED(task)) {
82 dprintk("RPC: %5u running timer\n", task->tk_pid);
83 callback(task);
84 }
85 smp_mb__before_clear_bit();
86 clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
87 smp_mb__after_clear_bit();
88} 77}
89 78
90/* 79/*
91 * Set up a timer for the current task. 80 * Set up a timer for the current task.
92 */ 81 */
93static inline void 82static void
94__rpc_add_timer(struct rpc_task *task, rpc_action timer) 83__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
95{ 84{
96 if (!task->tk_timeout) 85 if (!task->tk_timeout)
97 return; 86 return;
@@ -99,27 +88,10 @@ __rpc_add_timer(struct rpc_task *task, rpc_action timer)
99 dprintk("RPC: %5u setting alarm for %lu ms\n", 88 dprintk("RPC: %5u setting alarm for %lu ms\n",
100 task->tk_pid, task->tk_timeout * 1000 / HZ); 89 task->tk_pid, task->tk_timeout * 1000 / HZ);
101 90
102 if (timer) 91 task->u.tk_wait.expires = jiffies + task->tk_timeout;
103 task->tk_timeout_fn = timer; 92 if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
104 else 93 rpc_set_queue_timer(queue, task->u.tk_wait.expires);
105 task->tk_timeout_fn = __rpc_default_timer; 94 list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
106 set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
107 mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
108}
109
110/*
111 * Delete any timer for the current task. Because we use del_timer_sync(),
112 * this function should never be called while holding queue->lock.
113 */
114static void
115rpc_delete_timer(struct rpc_task *task)
116{
117 if (RPC_IS_QUEUED(task))
118 return;
119 if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
120 del_singleshot_timer_sync(&task->tk_timer);
121 dprintk("RPC: %5u deleting timer\n", task->tk_pid);
122 }
123} 95}
124 96
125/* 97/*
@@ -161,7 +133,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *
161 list_add(&task->u.tk_wait.list, &queue->tasks[0]); 133 list_add(&task->u.tk_wait.list, &queue->tasks[0]);
162 else 134 else
163 list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]); 135 list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
164 task->u.tk_wait.rpc_waitq = queue; 136 task->tk_waitqueue = queue;
165 queue->qlen++; 137 queue->qlen++;
166 rpc_set_queued(task); 138 rpc_set_queued(task);
167 139
@@ -181,22 +153,18 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
181 list_move(&t->u.tk_wait.list, &task->u.tk_wait.list); 153 list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
182 list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links); 154 list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
183 } 155 }
184 list_del(&task->u.tk_wait.list);
185} 156}
186 157
187/* 158/*
188 * Remove request from queue. 159 * Remove request from queue.
189 * Note: must be called with spin lock held. 160 * Note: must be called with spin lock held.
190 */ 161 */
191static void __rpc_remove_wait_queue(struct rpc_task *task) 162static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
192{ 163{
193 struct rpc_wait_queue *queue; 164 __rpc_disable_timer(queue, task);
194 queue = task->u.tk_wait.rpc_waitq;
195
196 if (RPC_IS_PRIORITY(queue)) 165 if (RPC_IS_PRIORITY(queue))
197 __rpc_remove_wait_queue_priority(task); 166 __rpc_remove_wait_queue_priority(task);
198 else 167 list_del(&task->u.tk_wait.list);
199 list_del(&task->u.tk_wait.list);
200 queue->qlen--; 168 queue->qlen--;
201 dprintk("RPC: %5u removed from queue %p \"%s\"\n", 169 dprintk("RPC: %5u removed from queue %p \"%s\"\n",
202 task->tk_pid, queue, rpc_qname(queue)); 170 task->tk_pid, queue, rpc_qname(queue));
@@ -229,6 +197,9 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
229 INIT_LIST_HEAD(&queue->tasks[i]); 197 INIT_LIST_HEAD(&queue->tasks[i]);
230 queue->maxpriority = nr_queues - 1; 198 queue->maxpriority = nr_queues - 1;
231 rpc_reset_waitqueue_priority(queue); 199 rpc_reset_waitqueue_priority(queue);
200 queue->qlen = 0;
201 setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
202 INIT_LIST_HEAD(&queue->timer_list.list);
232#ifdef RPC_DEBUG 203#ifdef RPC_DEBUG
233 queue->name = qname; 204 queue->name = qname;
234#endif 205#endif
@@ -245,6 +216,12 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
245} 216}
246EXPORT_SYMBOL_GPL(rpc_init_wait_queue); 217EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
247 218
219void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
220{
221 del_timer_sync(&queue->timer_list.timer);
222}
223EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
224
248static int rpc_wait_bit_killable(void *word) 225static int rpc_wait_bit_killable(void *word)
249{ 226{
250 if (fatal_signal_pending(current)) 227 if (fatal_signal_pending(current))
@@ -313,7 +290,6 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
313 */ 290 */
314static void rpc_make_runnable(struct rpc_task *task) 291static void rpc_make_runnable(struct rpc_task *task)
315{ 292{
316 BUG_ON(task->tk_timeout_fn);
317 rpc_clear_queued(task); 293 rpc_clear_queued(task);
318 if (rpc_test_and_set_running(task)) 294 if (rpc_test_and_set_running(task))
319 return; 295 return;
@@ -326,7 +302,7 @@ static void rpc_make_runnable(struct rpc_task *task)
326 int status; 302 int status;
327 303
328 INIT_WORK(&task->u.tk_work, rpc_async_schedule); 304 INIT_WORK(&task->u.tk_work, rpc_async_schedule);
329 status = queue_work(task->tk_workqueue, &task->u.tk_work); 305 status = queue_work(rpciod_workqueue, &task->u.tk_work);
330 if (status < 0) { 306 if (status < 0) {
331 printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); 307 printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
332 task->tk_status = status; 308 task->tk_status = status;
@@ -343,7 +319,7 @@ static void rpc_make_runnable(struct rpc_task *task)
343 * as it's on a wait queue. 319 * as it's on a wait queue.
344 */ 320 */
345static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, 321static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
346 rpc_action action, rpc_action timer) 322 rpc_action action)
347{ 323{
348 dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n", 324 dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
349 task->tk_pid, rpc_qname(q), jiffies); 325 task->tk_pid, rpc_qname(q), jiffies);
@@ -357,11 +333,11 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
357 333
358 BUG_ON(task->tk_callback != NULL); 334 BUG_ON(task->tk_callback != NULL);
359 task->tk_callback = action; 335 task->tk_callback = action;
360 __rpc_add_timer(task, timer); 336 __rpc_add_timer(q, task);
361} 337}
362 338
363void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, 339void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
364 rpc_action action, rpc_action timer) 340 rpc_action action)
365{ 341{
366 /* Mark the task as being activated if so needed */ 342 /* Mark the task as being activated if so needed */
367 rpc_set_active(task); 343 rpc_set_active(task);
@@ -370,18 +346,19 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
370 * Protect the queue operations. 346 * Protect the queue operations.
371 */ 347 */
372 spin_lock_bh(&q->lock); 348 spin_lock_bh(&q->lock);
373 __rpc_sleep_on(q, task, action, timer); 349 __rpc_sleep_on(q, task, action);
374 spin_unlock_bh(&q->lock); 350 spin_unlock_bh(&q->lock);
375} 351}
376EXPORT_SYMBOL_GPL(rpc_sleep_on); 352EXPORT_SYMBOL_GPL(rpc_sleep_on);
377 353
378/** 354/**
379 * __rpc_do_wake_up_task - wake up a single rpc_task 355 * __rpc_do_wake_up_task - wake up a single rpc_task
356 * @queue: wait queue
380 * @task: task to be woken up 357 * @task: task to be woken up
381 * 358 *
382 * Caller must hold queue->lock, and have cleared the task queued flag. 359 * Caller must hold queue->lock, and have cleared the task queued flag.
383 */ 360 */
384static void __rpc_do_wake_up_task(struct rpc_task *task) 361static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
385{ 362{
386 dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n", 363 dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
387 task->tk_pid, jiffies); 364 task->tk_pid, jiffies);
@@ -395,8 +372,7 @@ static void __rpc_do_wake_up_task(struct rpc_task *task)
395 return; 372 return;
396 } 373 }
397 374
398 __rpc_disable_timer(task); 375 __rpc_remove_wait_queue(queue, task);
399 __rpc_remove_wait_queue(task);
400 376
401 rpc_make_runnable(task); 377 rpc_make_runnable(task);
402 378
@@ -404,48 +380,32 @@ static void __rpc_do_wake_up_task(struct rpc_task *task)
404} 380}
405 381
406/* 382/*
407 * Wake up the specified task 383 * Wake up a queued task while the queue lock is being held
408 */ 384 */
409static void __rpc_wake_up_task(struct rpc_task *task) 385static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
410{ 386{
411 if (rpc_start_wakeup(task)) { 387 if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
412 if (RPC_IS_QUEUED(task)) 388 __rpc_do_wake_up_task(queue, task);
413 __rpc_do_wake_up_task(task);
414 rpc_finish_wakeup(task);
415 }
416} 389}
417 390
418/* 391/*
419 * Default timeout handler if none specified by user 392 * Wake up a task on a specific queue
420 */ 393 */
421static void 394void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
422__rpc_default_timer(struct rpc_task *task)
423{ 395{
424 dprintk("RPC: %5u timeout (default timer)\n", task->tk_pid); 396 spin_lock_bh(&queue->lock);
425 task->tk_status = -ETIMEDOUT; 397 rpc_wake_up_task_queue_locked(queue, task);
426 rpc_wake_up_task(task); 398 spin_unlock_bh(&queue->lock);
427} 399}
400EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
428 401
429/* 402/*
430 * Wake up the specified task 403 * Wake up the specified task
431 */ 404 */
432void rpc_wake_up_task(struct rpc_task *task) 405static void rpc_wake_up_task(struct rpc_task *task)
433{ 406{
434 rcu_read_lock_bh(); 407 rpc_wake_up_queued_task(task->tk_waitqueue, task);
435 if (rpc_start_wakeup(task)) {
436 if (RPC_IS_QUEUED(task)) {
437 struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
438
439 /* Note: we're already in a bh-safe context */
440 spin_lock(&queue->lock);
441 __rpc_do_wake_up_task(task);
442 spin_unlock(&queue->lock);
443 }
444 rpc_finish_wakeup(task);
445 }
446 rcu_read_unlock_bh();
447} 408}
448EXPORT_SYMBOL_GPL(rpc_wake_up_task);
449 409
450/* 410/*
451 * Wake up the next task on a priority queue. 411 * Wake up the next task on a priority queue.
@@ -495,7 +455,7 @@ new_queue:
495new_owner: 455new_owner:
496 rpc_set_waitqueue_owner(queue, task->tk_owner); 456 rpc_set_waitqueue_owner(queue, task->tk_owner);
497out: 457out:
498 __rpc_wake_up_task(task); 458 rpc_wake_up_task_queue_locked(queue, task);
499 return task; 459 return task;
500} 460}
501 461
@@ -508,16 +468,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
508 468
509 dprintk("RPC: wake_up_next(%p \"%s\")\n", 469 dprintk("RPC: wake_up_next(%p \"%s\")\n",
510 queue, rpc_qname(queue)); 470 queue, rpc_qname(queue));
511 rcu_read_lock_bh(); 471 spin_lock_bh(&queue->lock);
512 spin_lock(&queue->lock);
513 if (RPC_IS_PRIORITY(queue)) 472 if (RPC_IS_PRIORITY(queue))
514 task = __rpc_wake_up_next_priority(queue); 473 task = __rpc_wake_up_next_priority(queue);
515 else { 474 else {
516 task_for_first(task, &queue->tasks[0]) 475 task_for_first(task, &queue->tasks[0])
517 __rpc_wake_up_task(task); 476 rpc_wake_up_task_queue_locked(queue, task);
518 } 477 }
519 spin_unlock(&queue->lock); 478 spin_unlock_bh(&queue->lock);
520 rcu_read_unlock_bh();
521 479
522 return task; 480 return task;
523} 481}
@@ -534,18 +492,16 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
534 struct rpc_task *task, *next; 492 struct rpc_task *task, *next;
535 struct list_head *head; 493 struct list_head *head;
536 494
537 rcu_read_lock_bh(); 495 spin_lock_bh(&queue->lock);
538 spin_lock(&queue->lock);
539 head = &queue->tasks[queue->maxpriority]; 496 head = &queue->tasks[queue->maxpriority];
540 for (;;) { 497 for (;;) {
541 list_for_each_entry_safe(task, next, head, u.tk_wait.list) 498 list_for_each_entry_safe(task, next, head, u.tk_wait.list)
542 __rpc_wake_up_task(task); 499 rpc_wake_up_task_queue_locked(queue, task);
543 if (head == &queue->tasks[0]) 500 if (head == &queue->tasks[0])
544 break; 501 break;
545 head--; 502 head--;
546 } 503 }
547 spin_unlock(&queue->lock); 504 spin_unlock_bh(&queue->lock);
548 rcu_read_unlock_bh();
549} 505}
550EXPORT_SYMBOL_GPL(rpc_wake_up); 506EXPORT_SYMBOL_GPL(rpc_wake_up);
551 507
@@ -561,26 +517,48 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
561 struct rpc_task *task, *next; 517 struct rpc_task *task, *next;
562 struct list_head *head; 518 struct list_head *head;
563 519
564 rcu_read_lock_bh(); 520 spin_lock_bh(&queue->lock);
565 spin_lock(&queue->lock);
566 head = &queue->tasks[queue->maxpriority]; 521 head = &queue->tasks[queue->maxpriority];
567 for (;;) { 522 for (;;) {
568 list_for_each_entry_safe(task, next, head, u.tk_wait.list) { 523 list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
569 task->tk_status = status; 524 task->tk_status = status;
570 __rpc_wake_up_task(task); 525 rpc_wake_up_task_queue_locked(queue, task);
571 } 526 }
572 if (head == &queue->tasks[0]) 527 if (head == &queue->tasks[0])
573 break; 528 break;
574 head--; 529 head--;
575 } 530 }
576 spin_unlock(&queue->lock); 531 spin_unlock_bh(&queue->lock);
577 rcu_read_unlock_bh();
578} 532}
579EXPORT_SYMBOL_GPL(rpc_wake_up_status); 533EXPORT_SYMBOL_GPL(rpc_wake_up_status);
580 534
535static void __rpc_queue_timer_fn(unsigned long ptr)
536{
537 struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
538 struct rpc_task *task, *n;
539 unsigned long expires, now, timeo;
540
541 spin_lock(&queue->lock);
542 expires = now = jiffies;
543 list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
544 timeo = task->u.tk_wait.expires;
545 if (time_after_eq(now, timeo)) {
546 dprintk("RPC: %5u timeout\n", task->tk_pid);
547 task->tk_status = -ETIMEDOUT;
548 rpc_wake_up_task_queue_locked(queue, task);
549 continue;
550 }
551 if (expires == now || time_after(expires, timeo))
552 expires = timeo;
553 }
554 if (!list_empty(&queue->timer_list.list))
555 rpc_set_queue_timer(queue, expires);
556 spin_unlock(&queue->lock);
557}
558
581static void __rpc_atrun(struct rpc_task *task) 559static void __rpc_atrun(struct rpc_task *task)
582{ 560{
583 rpc_wake_up_task(task); 561 task->tk_status = 0;
584} 562}
585 563
586/* 564/*
@@ -589,7 +567,7 @@ static void __rpc_atrun(struct rpc_task *task)
589void rpc_delay(struct rpc_task *task, unsigned long delay) 567void rpc_delay(struct rpc_task *task, unsigned long delay)
590{ 568{
591 task->tk_timeout = delay; 569 task->tk_timeout = delay;
592 rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun); 570 rpc_sleep_on(&delay_queue, task, __rpc_atrun);
593} 571}
594EXPORT_SYMBOL_GPL(rpc_delay); 572EXPORT_SYMBOL_GPL(rpc_delay);
595 573
@@ -644,10 +622,6 @@ static void __rpc_execute(struct rpc_task *task)
644 BUG_ON(RPC_IS_QUEUED(task)); 622 BUG_ON(RPC_IS_QUEUED(task));
645 623
646 for (;;) { 624 for (;;) {
647 /*
648 * Garbage collection of pending timers...
649 */
650 rpc_delete_timer(task);
651 625
652 /* 626 /*
653 * Execute any pending callback. 627 * Execute any pending callback.
@@ -816,8 +790,6 @@ EXPORT_SYMBOL_GPL(rpc_free);
816static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data) 790static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
817{ 791{
818 memset(task, 0, sizeof(*task)); 792 memset(task, 0, sizeof(*task));
819 setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer,
820 (unsigned long)task);
821 atomic_set(&task->tk_count, 1); 793 atomic_set(&task->tk_count, 1);
822 task->tk_flags = task_setup_data->flags; 794 task->tk_flags = task_setup_data->flags;
823 task->tk_ops = task_setup_data->callback_ops; 795 task->tk_ops = task_setup_data->callback_ops;
@@ -832,7 +804,7 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
832 task->tk_owner = current->tgid; 804 task->tk_owner = current->tgid;
833 805
834 /* Initialize workqueue for async tasks */ 806 /* Initialize workqueue for async tasks */
835 task->tk_workqueue = rpciod_workqueue; 807 task->tk_workqueue = task_setup_data->workqueue;
836 808
837 task->tk_client = task_setup_data->rpc_client; 809 task->tk_client = task_setup_data->rpc_client;
838 if (task->tk_client != NULL) { 810 if (task->tk_client != NULL) {
@@ -868,13 +840,6 @@ rpc_alloc_task(void)
868 return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS); 840 return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
869} 841}
870 842
871static void rpc_free_task(struct rcu_head *rcu)
872{
873 struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
874 dprintk("RPC: %5u freeing task\n", task->tk_pid);
875 mempool_free(task, rpc_task_mempool);
876}
877
878/* 843/*
879 * Create a new task for the specified client. 844 * Create a new task for the specified client.
880 */ 845 */
@@ -898,12 +863,25 @@ out:
898 return task; 863 return task;
899} 864}
900 865
901 866static void rpc_free_task(struct rpc_task *task)
902void rpc_put_task(struct rpc_task *task)
903{ 867{
904 const struct rpc_call_ops *tk_ops = task->tk_ops; 868 const struct rpc_call_ops *tk_ops = task->tk_ops;
905 void *calldata = task->tk_calldata; 869 void *calldata = task->tk_calldata;
906 870
871 if (task->tk_flags & RPC_TASK_DYNAMIC) {
872 dprintk("RPC: %5u freeing task\n", task->tk_pid);
873 mempool_free(task, rpc_task_mempool);
874 }
875 rpc_release_calldata(tk_ops, calldata);
876}
877
878static void rpc_async_release(struct work_struct *work)
879{
880 rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
881}
882
883void rpc_put_task(struct rpc_task *task)
884{
907 if (!atomic_dec_and_test(&task->tk_count)) 885 if (!atomic_dec_and_test(&task->tk_count))
908 return; 886 return;
909 /* Release resources */ 887 /* Release resources */
@@ -915,9 +893,11 @@ void rpc_put_task(struct rpc_task *task)
915 rpc_release_client(task->tk_client); 893 rpc_release_client(task->tk_client);
916 task->tk_client = NULL; 894 task->tk_client = NULL;
917 } 895 }
918 if (task->tk_flags & RPC_TASK_DYNAMIC) 896 if (task->tk_workqueue != NULL) {
919 call_rcu_bh(&task->u.tk_rcu, rpc_free_task); 897 INIT_WORK(&task->u.tk_work, rpc_async_release);
920 rpc_release_calldata(tk_ops, calldata); 898 queue_work(task->tk_workqueue, &task->u.tk_work);
899 } else
900 rpc_free_task(task);
921} 901}
922EXPORT_SYMBOL_GPL(rpc_put_task); 902EXPORT_SYMBOL_GPL(rpc_put_task);
923 903
@@ -937,9 +917,6 @@ static void rpc_release_task(struct rpc_task *task)
937 } 917 }
938 BUG_ON (RPC_IS_QUEUED(task)); 918 BUG_ON (RPC_IS_QUEUED(task));
939 919
940 /* Synchronously delete any running timer */
941 rpc_delete_timer(task);
942
943#ifdef RPC_DEBUG 920#ifdef RPC_DEBUG
944 task->tk_magic = 0; 921 task->tk_magic = 0;
945#endif 922#endif
@@ -1029,11 +1006,20 @@ rpc_destroy_mempool(void)
1029 kmem_cache_destroy(rpc_task_slabp); 1006 kmem_cache_destroy(rpc_task_slabp);
1030 if (rpc_buffer_slabp) 1007 if (rpc_buffer_slabp)
1031 kmem_cache_destroy(rpc_buffer_slabp); 1008 kmem_cache_destroy(rpc_buffer_slabp);
1009 rpc_destroy_wait_queue(&delay_queue);
1032} 1010}
1033 1011
1034int 1012int
1035rpc_init_mempool(void) 1013rpc_init_mempool(void)
1036{ 1014{
1015 /*
1016 * The following is not strictly a mempool initialisation,
1017 * but there is no harm in doing it here
1018 */
1019 rpc_init_wait_queue(&delay_queue, "delayq");
1020 if (!rpciod_start())
1021 goto err_nomem;
1022
1037 rpc_task_slabp = kmem_cache_create("rpc_tasks", 1023 rpc_task_slabp = kmem_cache_create("rpc_tasks",
1038 sizeof(struct rpc_task), 1024 sizeof(struct rpc_task),
1039 0, SLAB_HWCACHE_ALIGN, 1025 0, SLAB_HWCACHE_ALIGN,
@@ -1054,13 +1040,6 @@ rpc_init_mempool(void)
1054 rpc_buffer_slabp); 1040 rpc_buffer_slabp);
1055 if (!rpc_buffer_mempool) 1041 if (!rpc_buffer_mempool)
1056 goto err_nomem; 1042 goto err_nomem;
1057 if (!rpciod_start())
1058 goto err_nomem;
1059 /*
1060 * The following is not strictly a mempool initialisation,
1061 * but there is no harm in doing it here
1062 */
1063 rpc_init_wait_queue(&delay_queue, "delayq");
1064 return 0; 1043 return 0;
1065err_nomem: 1044err_nomem:
1066 rpc_destroy_mempool(); 1045 rpc_destroy_mempool();
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index d5553b8179f9..85199c647022 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -188,9 +188,9 @@ out_sleep:
188 task->tk_timeout = 0; 188 task->tk_timeout = 0;
189 task->tk_status = -EAGAIN; 189 task->tk_status = -EAGAIN;
190 if (req && req->rq_ntrans) 190 if (req && req->rq_ntrans)
191 rpc_sleep_on(&xprt->resend, task, NULL, NULL); 191 rpc_sleep_on(&xprt->resend, task, NULL);
192 else 192 else
193 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 193 rpc_sleep_on(&xprt->sending, task, NULL);
194 return 0; 194 return 0;
195} 195}
196EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 196EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
@@ -238,9 +238,9 @@ out_sleep:
238 task->tk_timeout = 0; 238 task->tk_timeout = 0;
239 task->tk_status = -EAGAIN; 239 task->tk_status = -EAGAIN;
240 if (req && req->rq_ntrans) 240 if (req && req->rq_ntrans)
241 rpc_sleep_on(&xprt->resend, task, NULL, NULL); 241 rpc_sleep_on(&xprt->resend, task, NULL);
242 else 242 else
243 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 243 rpc_sleep_on(&xprt->sending, task, NULL);
244 return 0; 244 return 0;
245} 245}
246EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 246EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -453,7 +453,7 @@ void xprt_wait_for_buffer_space(struct rpc_task *task)
453 struct rpc_xprt *xprt = req->rq_xprt; 453 struct rpc_xprt *xprt = req->rq_xprt;
454 454
455 task->tk_timeout = req->rq_timeout; 455 task->tk_timeout = req->rq_timeout;
456 rpc_sleep_on(&xprt->pending, task, NULL, NULL); 456 rpc_sleep_on(&xprt->pending, task, NULL);
457} 457}
458EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 458EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
459 459
@@ -472,7 +472,7 @@ void xprt_write_space(struct rpc_xprt *xprt)
472 if (xprt->snd_task) { 472 if (xprt->snd_task) {
473 dprintk("RPC: write space: waking waiting task on " 473 dprintk("RPC: write space: waking waiting task on "
474 "xprt %p\n", xprt); 474 "xprt %p\n", xprt);
475 rpc_wake_up_task(xprt->snd_task); 475 rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
476 } 476 }
477 spin_unlock_bh(&xprt->transport_lock); 477 spin_unlock_bh(&xprt->transport_lock);
478} 478}
@@ -602,8 +602,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
602 /* Try to schedule an autoclose RPC call */ 602 /* Try to schedule an autoclose RPC call */
603 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 603 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
604 queue_work(rpciod_workqueue, &xprt->task_cleanup); 604 queue_work(rpciod_workqueue, &xprt->task_cleanup);
605 else if (xprt->snd_task != NULL) 605 xprt_wake_pending_tasks(xprt, -ENOTCONN);
606 rpc_wake_up_task(xprt->snd_task);
607 spin_unlock_bh(&xprt->transport_lock); 606 spin_unlock_bh(&xprt->transport_lock);
608} 607}
609EXPORT_SYMBOL_GPL(xprt_force_disconnect); 608EXPORT_SYMBOL_GPL(xprt_force_disconnect);
@@ -653,7 +652,7 @@ void xprt_connect(struct rpc_task *task)
653 task->tk_rqstp->rq_bytes_sent = 0; 652 task->tk_rqstp->rq_bytes_sent = 0;
654 653
655 task->tk_timeout = xprt->connect_timeout; 654 task->tk_timeout = xprt->connect_timeout;
656 rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); 655 rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
657 xprt->stat.connect_start = jiffies; 656 xprt->stat.connect_start = jiffies;
658 xprt->ops->connect(task); 657 xprt->ops->connect(task);
659 } 658 }
@@ -749,18 +748,19 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt);
749void xprt_complete_rqst(struct rpc_task *task, int copied) 748void xprt_complete_rqst(struct rpc_task *task, int copied)
750{ 749{
751 struct rpc_rqst *req = task->tk_rqstp; 750 struct rpc_rqst *req = task->tk_rqstp;
751 struct rpc_xprt *xprt = req->rq_xprt;
752 752
753 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 753 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
754 task->tk_pid, ntohl(req->rq_xid), copied); 754 task->tk_pid, ntohl(req->rq_xid), copied);
755 755
756 task->tk_xprt->stat.recvs++; 756 xprt->stat.recvs++;
757 task->tk_rtt = (long)jiffies - req->rq_xtime; 757 task->tk_rtt = (long)jiffies - req->rq_xtime;
758 758
759 list_del_init(&req->rq_list); 759 list_del_init(&req->rq_list);
760 /* Ensure all writes are done before we update req->rq_received */ 760 /* Ensure all writes are done before we update req->rq_received */
761 smp_wmb(); 761 smp_wmb();
762 req->rq_received = req->rq_private_buf.len = copied; 762 req->rq_received = req->rq_private_buf.len = copied;
763 rpc_wake_up_task(task); 763 rpc_wake_up_queued_task(&xprt->pending, task);
764} 764}
765EXPORT_SYMBOL_GPL(xprt_complete_rqst); 765EXPORT_SYMBOL_GPL(xprt_complete_rqst);
766 766
@@ -769,17 +769,17 @@ static void xprt_timer(struct rpc_task *task)
769 struct rpc_rqst *req = task->tk_rqstp; 769 struct rpc_rqst *req = task->tk_rqstp;
770 struct rpc_xprt *xprt = req->rq_xprt; 770 struct rpc_xprt *xprt = req->rq_xprt;
771 771
772 if (task->tk_status != -ETIMEDOUT)
773 return;
772 dprintk("RPC: %5u xprt_timer\n", task->tk_pid); 774 dprintk("RPC: %5u xprt_timer\n", task->tk_pid);
773 775
774 spin_lock(&xprt->transport_lock); 776 spin_lock_bh(&xprt->transport_lock);
775 if (!req->rq_received) { 777 if (!req->rq_received) {
776 if (xprt->ops->timer) 778 if (xprt->ops->timer)
777 xprt->ops->timer(task); 779 xprt->ops->timer(task);
778 task->tk_status = -ETIMEDOUT; 780 } else
779 } 781 task->tk_status = 0;
780 task->tk_timeout = 0; 782 spin_unlock_bh(&xprt->transport_lock);
781 rpc_wake_up_task(task);
782 spin_unlock(&xprt->transport_lock);
783} 783}
784 784
785/** 785/**
@@ -864,7 +864,7 @@ void xprt_transmit(struct rpc_task *task)
864 if (!xprt_connected(xprt)) 864 if (!xprt_connected(xprt))
865 task->tk_status = -ENOTCONN; 865 task->tk_status = -ENOTCONN;
866 else if (!req->rq_received) 866 else if (!req->rq_received)
867 rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); 867 rpc_sleep_on(&xprt->pending, task, xprt_timer);
868 spin_unlock_bh(&xprt->transport_lock); 868 spin_unlock_bh(&xprt->transport_lock);
869 return; 869 return;
870 } 870 }
@@ -875,7 +875,7 @@ void xprt_transmit(struct rpc_task *task)
875 */ 875 */
876 task->tk_status = status; 876 task->tk_status = status;
877 if (status == -ECONNREFUSED) 877 if (status == -ECONNREFUSED)
878 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 878 rpc_sleep_on(&xprt->sending, task, NULL);
879} 879}
880 880
881static inline void do_xprt_reserve(struct rpc_task *task) 881static inline void do_xprt_reserve(struct rpc_task *task)
@@ -895,7 +895,7 @@ static inline void do_xprt_reserve(struct rpc_task *task)
895 dprintk("RPC: waiting for request slot\n"); 895 dprintk("RPC: waiting for request slot\n");
896 task->tk_status = -EAGAIN; 896 task->tk_status = -EAGAIN;
897 task->tk_timeout = 0; 897 task->tk_timeout = 0;
898 rpc_sleep_on(&xprt->backlog, task, NULL, NULL); 898 rpc_sleep_on(&xprt->backlog, task, NULL);
899} 899}
900 900
901/** 901/**
@@ -1052,6 +1052,11 @@ static void xprt_destroy(struct kref *kref)
1052 xprt->shutdown = 1; 1052 xprt->shutdown = 1;
1053 del_timer_sync(&xprt->timer); 1053 del_timer_sync(&xprt->timer);
1054 1054
1055 rpc_destroy_wait_queue(&xprt->binding);
1056 rpc_destroy_wait_queue(&xprt->pending);
1057 rpc_destroy_wait_queue(&xprt->sending);
1058 rpc_destroy_wait_queue(&xprt->resend);
1059 rpc_destroy_wait_queue(&xprt->backlog);
1055 /* 1060 /*
1056 * Tear down transport state and free the rpc_xprt 1061 * Tear down transport state and free the rpc_xprt
1057 */ 1062 */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 30e7ac243a90..8bd3b0f73ac0 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1073,6 +1073,7 @@ static void xs_tcp_data_ready(struct sock *sk, int bytes)
1073{ 1073{
1074 struct rpc_xprt *xprt; 1074 struct rpc_xprt *xprt;
1075 read_descriptor_t rd_desc; 1075 read_descriptor_t rd_desc;
1076 int read;
1076 1077
1077 dprintk("RPC: xs_tcp_data_ready...\n"); 1078 dprintk("RPC: xs_tcp_data_ready...\n");
1078 1079
@@ -1084,8 +1085,10 @@ static void xs_tcp_data_ready(struct sock *sk, int bytes)
1084 1085
1085 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ 1086 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1086 rd_desc.arg.data = xprt; 1087 rd_desc.arg.data = xprt;
1087 rd_desc.count = 65536; 1088 do {
1088 tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); 1089 rd_desc.count = 65536;
1090 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1091 } while (read > 0);
1089out: 1092out:
1090 read_unlock(&sk->sk_callback_lock); 1093 read_unlock(&sk->sk_callback_lock);
1091} 1094}