path: root/fs/ceph
author	Sage Weil <sage@newdream.net>	2009-10-06 14:31:13 -0400
committer	Sage Weil <sage@newdream.net>	2009-10-06 14:31:13 -0400
commit	8fc91fd85950d106883852c6d215614ec28cc92d (patch)
tree	d2367bb82957da5ddfb48f64c6fd14e0dac6b4f8 /fs/ceph
parent	31b8006e1d79e127a776c9414e3e0b5f9508047e (diff)
ceph: message pools
The msgpool is a basic mempool_t-like structure to preallocate messages we
expect to receive over the wire.  This ensures we have the necessary memory
preallocated to process replies to requests, or to process unsolicited
messages from various servers.

Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph')
-rw-r--r--	fs/ceph/msgpool.c	167
-rw-r--r--	fs/ceph/msgpool.h	26
2 files changed, 193 insertions, 0 deletions
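
To make the request/response strategy from the commit message concrete, here is a rough caller-side sketch. It is illustration only, not part of this patch: everything other than the ceph_msgpool_* calls themselves is hypothetical, and error handling is trimmed. The idea is to grow the pool's reservation when a request is registered so the eventual reply is guaranteed memory, and to shrink it again once the exchange completes.

/* Hypothetical caller (illustration only, not part of this patch). */
#include <linux/err.h>
#include "msgpool.h"

static struct ceph_msgpool reply_pool;	/* assumed set up elsewhere */

static int example_register_request(void)
{
	/* grow the target pool size so the eventual reply has memory */
	return ceph_msgpool_resv(&reply_pool, 1);
}

static void example_handle_reply(void)
{
	struct ceph_msg *reply;

	/* take a preallocated message rather than allocating under
	 * memory pressure; a blocking pool waits instead of failing */
	reply = ceph_msgpool_get(&reply_pool);
	if (IS_ERR(reply))
		return;

	/* ... decode and process the reply ... */

	/* hand the message back and drop the reservation */
	ceph_msgpool_put(&reply_pool, reply);
	ceph_msgpool_resv(&reply_pool, -1);
}
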
diff --git a/fs/ceph/msgpool.c b/fs/ceph/msgpool.c
new file mode 100644
index 000000000000..39d4d7ed82ce
--- /dev/null
+++ b/fs/ceph/msgpool.c
@@ -0,0 +1,167 @@
#include "ceph_debug.h"

#include <linux/err.h>
#include <linux/sched.h>
#include <linux/types.h>
#include <linux/vmalloc.h>

#include "msgpool.h"

/*
 * We use msg pools to preallocate memory for messages we expect to
 * receive over the wire, to avoid getting ourselves into OOM
 * conditions at unexpected times.  We use a few different
 * strategies:
 *
 * - for request/response type interactions, we preallocate the
 *   memory needed for the response when we generate the request.
 *
 * - for messages we can receive at any time from the MDS, we preallocate
 *   a pool of messages we can re-use.
 *
 * - for writeback, we preallocate some number of messages to use for
 *   requests and their replies, so that we always make forward
 *   progress.
 *
 * The msgpool behaves like a mempool_t, but keeps preallocated
 * ceph_msgs strung together on a list_head instead of using a pointer
 * vector.  This avoids vector reallocation when we adjust the number
 * of preallocated items (which happens frequently).
 */


/*
 * Allocate or release as necessary to meet our target pool size.
 */
static int __fill_msgpool(struct ceph_msgpool *pool)
{
	struct ceph_msg *msg;

	while (pool->num < pool->min) {
		dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num,
		     pool->min);
		spin_unlock(&pool->lock);
		msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
		spin_lock(&pool->lock);
		if (IS_ERR(msg))
			return PTR_ERR(msg);
		msg->pool = pool;
		list_add(&msg->list_head, &pool->msgs);
		pool->num++;
	}
	while (pool->num > pool->min) {
		msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head);
		dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num,
		     pool->min, msg);
		list_del_init(&msg->list_head);
		pool->num--;
		ceph_msg_kfree(msg);
	}
	return 0;
}

int ceph_msgpool_init(struct ceph_msgpool *pool,
		      int front_len, int min, bool blocking)
{
	int ret;

	dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min);
	spin_lock_init(&pool->lock);
	pool->front_len = front_len;
	INIT_LIST_HEAD(&pool->msgs);
	pool->num = 0;
	pool->min = min;
	pool->blocking = blocking;
	init_waitqueue_head(&pool->wait);

	spin_lock(&pool->lock);
	ret = __fill_msgpool(pool);
	spin_unlock(&pool->lock);
	return ret;
}

void ceph_msgpool_destroy(struct ceph_msgpool *pool)
{
	dout("msgpool_destroy %p\n", pool);
	spin_lock(&pool->lock);
	pool->min = 0;
	__fill_msgpool(pool);
	spin_unlock(&pool->lock);
}

int ceph_msgpool_resv(struct ceph_msgpool *pool, int delta)
{
	int ret;

	spin_lock(&pool->lock);
	dout("msgpool_resv %p delta %d\n", pool, delta);
	pool->min += delta;
	ret = __fill_msgpool(pool);
	spin_unlock(&pool->lock);
	return ret;
}

struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool)
{
	wait_queue_t wait;
	struct ceph_msg *msg;

	if (pool->blocking) {
		/* mempool_t behavior; first try to alloc */
		msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
		if (!IS_ERR(msg))
			return msg;
	}

	while (1) {
		spin_lock(&pool->lock);
		if (likely(pool->num)) {
			msg = list_entry(pool->msgs.next, struct ceph_msg,
					 list_head);
			list_del_init(&msg->list_head);
			pool->num--;
			dout("msgpool_get %p got %p, now %d/%d\n", pool, msg,
			     pool->num, pool->min);
			spin_unlock(&pool->lock);
			return msg;
		}
		pr_err("msgpool_get %p now %d/%d, %s\n", pool, pool->num,
		       pool->min, pool->blocking ? "waiting" : "failing");
		spin_unlock(&pool->lock);

		if (!pool->blocking) {
			WARN_ON(1);

			/* maybe we can allocate it now? */
			msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
			if (!IS_ERR(msg))
				return msg;

			return ERR_PTR(-ENOMEM);
		}

		init_wait(&wait);
		prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE);
		schedule();
		finish_wait(&pool->wait, &wait);
	}
}

void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
{
	spin_lock(&pool->lock);
	if (pool->num < pool->min) {
		ceph_msg_get(msg);	/* retake a single ref */
		list_add(&msg->list_head, &pool->msgs);
		pool->num++;
		dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg,
		     pool->num, pool->min);
		spin_unlock(&pool->lock);
		wake_up(&pool->wait);
	} else {
		dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg,
		     pool->num, pool->min);
		spin_unlock(&pool->lock);
		ceph_msg_kfree(msg);
	}
}
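
A side note on ceph_msgpool_get() above: the blocking flag chosen at init time decides what happens when the pool runs dry, and a blocking pool also tries a plain allocation first, mempool_t style. The snippet below is an illustrative sketch only; the pool names and the front_len/min values are made up.

/* Illustration only: blocking vs. non-blocking pool behaviour. */
#include <linux/err.h>
#include "msgpool.h"

static struct ceph_msgpool wb_pool;	/* hypothetical writeback pool */
static struct ceph_msgpool misc_pool;	/* hypothetical non-blocking pool */

static int example_setup(void)
{
	int ret;

	/* blocking: get() sleeps on pool->wait until a message is put
	 * back, so callers always make forward progress */
	ret = ceph_msgpool_init(&wb_pool, 4096, 10, true);
	if (ret < 0)
		return ret;

	/* non-blocking: an empty pool makes get() warn, retry a plain
	 * allocation, and possibly return ERR_PTR(-ENOMEM) */
	ret = ceph_msgpool_init(&misc_pool, 512, 2, false);
	if (ret < 0) {
		ceph_msgpool_destroy(&wb_pool);
		return ret;
	}
	return 0;
}

static void example_use_nonblocking(void)
{
	struct ceph_msg *msg = ceph_msgpool_get(&misc_pool);

	if (IS_ERR(msg))
		return;		/* must handle failure in this mode */

	/* ... use msg ... */
	ceph_msgpool_put(&misc_pool, msg);
}
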
diff --git a/fs/ceph/msgpool.h b/fs/ceph/msgpool.h
new file mode 100644
index 000000000000..07a2decaa6d8
--- /dev/null
+++ b/fs/ceph/msgpool.h
@@ -0,0 +1,26 @@
#ifndef _FS_CEPH_MSGPOOL
#define _FS_CEPH_MSGPOOL

#include "messenger.h"

/*
 * we use memory pools for preallocating messages we may receive, to
 * avoid unexpected OOM conditions.
 */
struct ceph_msgpool {
	spinlock_t lock;
	int front_len;          /* preallocated payload size */
	struct list_head msgs;  /* msgs in the pool; each has 1 ref */
	int num, min;           /* cur, min # msgs in the pool */
	bool blocking;
	wait_queue_head_t wait;
};

extern int ceph_msgpool_init(struct ceph_msgpool *pool,
			     int front_len, int size, bool blocking);
extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
extern int ceph_msgpool_resv(struct ceph_msgpool *, int delta);
extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *);
extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *);

#endif
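
Finally, the msg->pool assignment in __fill_msgpool() presumably lets the generic message release path hand pool-owned messages back to their pool instead of freeing them outright; that path is not part of this diff, so the sketch below is hypothetical.

/* Hypothetical release path (not in this patch): recycle pool-owned
 * messages, free the rest. */
#include "msgpool.h"

static void example_release_msg(struct ceph_msg *msg)
{
	if (msg->pool)
		ceph_msgpool_put(msg->pool, msg);	/* recycle or free */
	else
		ceph_msg_kfree(msg);			/* plain message */
}

When handed back this way, ceph_msgpool_put() either reclaims the message (retaking a reference) while the pool is below its target size, or frees it.
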