about summary refs log tree commit diff stats
path: root/fs/ceph/caps.c
diff options
context:
space:
mode:
author J. Bruce Fields <bfields@redhat.com> 2010-08-26 13:22:27 -0400
committer J. Bruce Fields <bfields@redhat.com> 2010-08-26 13:22:27 -0400
commit f632265d0ffb5acf331252d98c64939849d96bb2 (patch)
tree 31187d9a726bf1ca6ca12e26ad8e7c609eaf4d8b /fs/ceph/caps.c
parent 7d94784293096c0a46897acdb83be5abd9278ece (diff)
parent da5cabf80e2433131bf0ed8993abc0f7ea618c73 (diff)
Merge commit 'v2.6.36-rc1' into HEAD
Diffstat (limited to 'fs/ceph/caps.c')
-rw-r--r-- fs/ceph/caps.c 426
1 files changed, 249 insertions, 177 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index ae3e3a306445..7bf182b03973 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -113,58 +113,41 @@ const char *ceph_cap_string(int caps)
113 return cap_str[i]; 113 return cap_str[i];
114} 114}
115 115
116/* 116void ceph_caps_init(struct ceph_mds_client *mdsc)
117 * Cap reservations
118 *
119 * Maintain a global pool of preallocated struct ceph_caps, referenced
120 * by struct ceph_caps_reservations. This ensures that we preallocate
121 * memory needed to successfully process an MDS response. (If an MDS
122 * sends us cap information and we fail to process it, we will have
123 * problems due to the client and MDS being out of sync.)
124 *
125 * Reservations are 'owned' by a ceph_cap_reservation context.
126 */
127static spinlock_t caps_list_lock;
128static struct list_head caps_list; /* unused (reserved or unreserved) */
129static int caps_total_count; /* total caps allocated */
130static int caps_use_count; /* in use */
131static int caps_reserve_count; /* unused, reserved */
132static int caps_avail_count; /* unused, unreserved */
133static int caps_min_count; /* keep at least this many (unreserved) */
134
135void __init ceph_caps_init(void)
136{ 117{
137 INIT_LIST_HEAD(&caps_list); 118 INIT_LIST_HEAD(&mdsc->caps_list);
138 spin_lock_init(&caps_list_lock); 119 spin_lock_init(&mdsc->caps_list_lock);
139} 120}
140 121
141void ceph_caps_finalize(void) 122void ceph_caps_finalize(struct ceph_mds_client *mdsc)
142{ 123{
143 struct ceph_cap *cap; 124 struct ceph_cap *cap;
144 125
145 spin_lock(&caps_list_lock); 126 spin_lock(&mdsc->caps_list_lock);
146 while (!list_empty(&caps_list)) { 127 while (!list_empty(&mdsc->caps_list)) {
147 cap = list_first_entry(&caps_list, struct ceph_cap, caps_item); 128 cap = list_first_entry(&mdsc->caps_list,
129 struct ceph_cap, caps_item);
148 list_del(&cap->caps_item); 130 list_del(&cap->caps_item);
149 kmem_cache_free(ceph_cap_cachep, cap); 131 kmem_cache_free(ceph_cap_cachep, cap);
150 } 132 }
151 caps_total_count = 0; 133 mdsc->caps_total_count = 0;
152 caps_avail_count = 0; 134 mdsc->caps_avail_count = 0;
153 caps_use_count = 0; 135 mdsc->caps_use_count = 0;
154 caps_reserve_count = 0; 136 mdsc->caps_reserve_count = 0;
155 caps_min_count = 0; 137 mdsc->caps_min_count = 0;
156 spin_unlock(&caps_list_lock); 138 spin_unlock(&mdsc->caps_list_lock);
157} 139}
158 140
159void ceph_adjust_min_caps(int delta) 141void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
160{ 142{
161 spin_lock(&caps_list_lock); 143 spin_lock(&mdsc->caps_list_lock);
162 caps_min_count += delta; 144 mdsc->caps_min_count += delta;
163 BUG_ON(caps_min_count < 0); 145 BUG_ON(mdsc->caps_min_count < 0);
164 spin_unlock(&caps_list_lock); 146 spin_unlock(&mdsc->caps_list_lock);
165} 147}
166 148
167int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need) 149int ceph_reserve_caps(struct ceph_mds_client *mdsc,
150 struct ceph_cap_reservation *ctx, int need)
168{ 151{
169 int i; 152 int i;
170 struct ceph_cap *cap; 153 struct ceph_cap *cap;
@@ -176,16 +159,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
176 dout("reserve caps ctx=%p need=%d\n", ctx, need); 159 dout("reserve caps ctx=%p need=%d\n", ctx, need);
177 160
178 /* first reserve any caps that are already allocated */ 161 /* first reserve any caps that are already allocated */
179 spin_lock(&caps_list_lock); 162 spin_lock(&mdsc->caps_list_lock);
180 if (caps_avail_count >= need) 163 if (mdsc->caps_avail_count >= need)
181 have = need; 164 have = need;
182 else 165 else
183 have = caps_avail_count; 166 have = mdsc->caps_avail_count;
184 caps_avail_count -= have; 167 mdsc->caps_avail_count -= have;
185 caps_reserve_count += have; 168 mdsc->caps_reserve_count += have;
186 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 169 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
187 caps_avail_count); 170 mdsc->caps_reserve_count +
188 spin_unlock(&caps_list_lock); 171 mdsc->caps_avail_count);
172 spin_unlock(&mdsc->caps_list_lock);
189 173
190 for (i = have; i < need; i++) { 174 for (i = have; i < need; i++) {
191 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 175 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
@@ -198,19 +182,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
198 } 182 }
199 BUG_ON(have + alloc != need); 183 BUG_ON(have + alloc != need);
200 184
201 spin_lock(&caps_list_lock); 185 spin_lock(&mdsc->caps_list_lock);
202 caps_total_count += alloc; 186 mdsc->caps_total_count += alloc;
203 caps_reserve_count += alloc; 187 mdsc->caps_reserve_count += alloc;
204 list_splice(&newcaps, &caps_list); 188 list_splice(&newcaps, &mdsc->caps_list);
205 189
206 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 190 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
207 caps_avail_count); 191 mdsc->caps_reserve_count +
208 spin_unlock(&caps_list_lock); 192 mdsc->caps_avail_count);
193 spin_unlock(&mdsc->caps_list_lock);
209 194
210 ctx->count = need; 195 ctx->count = need;
211 dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", 196 dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
212 ctx, caps_total_count, caps_use_count, caps_reserve_count, 197 ctx, mdsc->caps_total_count, mdsc->caps_use_count,
213 caps_avail_count); 198 mdsc->caps_reserve_count, mdsc->caps_avail_count);
214 return 0; 199 return 0;
215 200
216out_alloc_count: 201out_alloc_count:
@@ -220,92 +205,104 @@ out_alloc_count:
220 return ret; 205 return ret;
221} 206}
222 207
223int ceph_unreserve_caps(struct ceph_cap_reservation *ctx) 208int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
209 struct ceph_cap_reservation *ctx)
224{ 210{
225 dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count); 211 dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
226 if (ctx->count) { 212 if (ctx->count) {
227 spin_lock(&caps_list_lock); 213 spin_lock(&mdsc->caps_list_lock);
228 BUG_ON(caps_reserve_count < ctx->count); 214 BUG_ON(mdsc->caps_reserve_count < ctx->count);
229 caps_reserve_count -= ctx->count; 215 mdsc->caps_reserve_count -= ctx->count;
230 caps_avail_count += ctx->count; 216 mdsc->caps_avail_count += ctx->count;
231 ctx->count = 0; 217 ctx->count = 0;
232 dout("unreserve caps %d = %d used + %d resv + %d avail\n", 218 dout("unreserve caps %d = %d used + %d resv + %d avail\n",
233 caps_total_count, caps_use_count, caps_reserve_count, 219 mdsc->caps_total_count, mdsc->caps_use_count,
234 caps_avail_count); 220 mdsc->caps_reserve_count, mdsc->caps_avail_count);
235 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 221 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
236 caps_avail_count); 222 mdsc->caps_reserve_count +
237 spin_unlock(&caps_list_lock); 223 mdsc->caps_avail_count);
224 spin_unlock(&mdsc->caps_list_lock);
238 } 225 }
239 return 0; 226 return 0;
240} 227}
241 228
242static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx) 229static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
230 struct ceph_cap_reservation *ctx)
243{ 231{
244 struct ceph_cap *cap = NULL; 232 struct ceph_cap *cap = NULL;
245 233
246 /* temporary, until we do something about cap import/export */ 234 /* temporary, until we do something about cap import/export */
247 if (!ctx) 235 if (!ctx) {
248 return kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 236 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
237 if (cap) {
238 mdsc->caps_use_count++;
239 mdsc->caps_total_count++;
240 }
241 return cap;
242 }
249 243
250 spin_lock(&caps_list_lock); 244 spin_lock(&mdsc->caps_list_lock);
251 dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n", 245 dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
252 ctx, ctx->count, caps_total_count, caps_use_count, 246 ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
253 caps_reserve_count, caps_avail_count); 247 mdsc->caps_reserve_count, mdsc->caps_avail_count);
254 BUG_ON(!ctx->count); 248 BUG_ON(!ctx->count);
255 BUG_ON(ctx->count > caps_reserve_count); 249 BUG_ON(ctx->count > mdsc->caps_reserve_count);
256 BUG_ON(list_empty(&caps_list)); 250 BUG_ON(list_empty(&mdsc->caps_list));
257 251
258 ctx->count--; 252 ctx->count--;
259 caps_reserve_count--; 253 mdsc->caps_reserve_count--;
260 caps_use_count++; 254 mdsc->caps_use_count++;
261 255
262 cap = list_first_entry(&caps_list, struct ceph_cap, caps_item); 256 cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
263 list_del(&cap->caps_item); 257 list_del(&cap->caps_item);
264 258
265 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 259 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
266 caps_avail_count); 260 mdsc->caps_reserve_count + mdsc->caps_avail_count);
267 spin_unlock(&caps_list_lock); 261 spin_unlock(&mdsc->caps_list_lock);
268 return cap; 262 return cap;
269} 263}
270 264
271void ceph_put_cap(struct ceph_cap *cap) 265void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
272{ 266{
273 spin_lock(&caps_list_lock); 267 spin_lock(&mdsc->caps_list_lock);
274 dout("put_cap %p %d = %d used + %d resv + %d avail\n", 268 dout("put_cap %p %d = %d used + %d resv + %d avail\n",
275 cap, caps_total_count, caps_use_count, 269 cap, mdsc->caps_total_count, mdsc->caps_use_count,
276 caps_reserve_count, caps_avail_count); 270 mdsc->caps_reserve_count, mdsc->caps_avail_count);
277 caps_use_count--; 271 mdsc->caps_use_count--;
278 /* 272 /*
279 * Keep some preallocated caps around (ceph_min_count), to 273 * Keep some preallocated caps around (ceph_min_count), to
280 * avoid lots of free/alloc churn. 274 * avoid lots of free/alloc churn.
281 */ 275 */
282 if (caps_avail_count >= caps_reserve_count + caps_min_count) { 276 if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
283 caps_total_count--; 277 mdsc->caps_min_count) {
278 mdsc->caps_total_count--;
284 kmem_cache_free(ceph_cap_cachep, cap); 279 kmem_cache_free(ceph_cap_cachep, cap);
285 } else { 280 } else {
286 caps_avail_count++; 281 mdsc->caps_avail_count++;
287 list_add(&cap->caps_item, &caps_list); 282 list_add(&cap->caps_item, &mdsc->caps_list);
288 } 283 }
289 284
290 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 285 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
291 caps_avail_count); 286 mdsc->caps_reserve_count + mdsc->caps_avail_count);
292 spin_unlock(&caps_list_lock); 287 spin_unlock(&mdsc->caps_list_lock);
293} 288}
294 289
295void ceph_reservation_status(struct ceph_client *client, 290void ceph_reservation_status(struct ceph_client *client,
296 int *total, int *avail, int *used, int *reserved, 291 int *total, int *avail, int *used, int *reserved,
297 int *min) 292 int *min)
298{ 293{
294 struct ceph_mds_client *mdsc = &client->mdsc;
295
299 if (total) 296 if (total)
300 *total = caps_total_count; 297 *total = mdsc->caps_total_count;
301 if (avail) 298 if (avail)
302 *avail = caps_avail_count; 299 *avail = mdsc->caps_avail_count;
303 if (used) 300 if (used)
304 *used = caps_use_count; 301 *used = mdsc->caps_use_count;
305 if (reserved) 302 if (reserved)
306 *reserved = caps_reserve_count; 303 *reserved = mdsc->caps_reserve_count;
307 if (min) 304 if (min)
308 *min = caps_min_count; 305 *min = mdsc->caps_min_count;
309} 306}
310 307
311/* 308/*
@@ -330,22 +327,29 @@ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
330 return NULL; 327 return NULL;
331} 328}
332 329
330struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
331{
332 struct ceph_cap *cap;
333
334 spin_lock(&ci->vfs_inode.i_lock);
335 cap = __get_cap_for_mds(ci, mds);
336 spin_unlock(&ci->vfs_inode.i_lock);
337 return cap;
338}
339
333/* 340/*
334 * Return id of any MDS with a cap, preferably FILE_WR|WRBUFFER|EXCL, else 341 * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
335 * -1.
336 */ 342 */
337static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq) 343static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
338{ 344{
339 struct ceph_cap *cap; 345 struct ceph_cap *cap;
340 int mds = -1; 346 int mds = -1;
341 struct rb_node *p; 347 struct rb_node *p;
342 348
343 /* prefer mds with WR|WRBUFFER|EXCL caps */ 349 /* prefer mds with WR|BUFFER|EXCL caps */
344 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 350 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
345 cap = rb_entry(p, struct ceph_cap, ci_node); 351 cap = rb_entry(p, struct ceph_cap, ci_node);
346 mds = cap->mds; 352 mds = cap->mds;
347 if (mseq)
348 *mseq = cap->mseq;
349 if (cap->issued & (CEPH_CAP_FILE_WR | 353 if (cap->issued & (CEPH_CAP_FILE_WR |
350 CEPH_CAP_FILE_BUFFER | 354 CEPH_CAP_FILE_BUFFER |
351 CEPH_CAP_FILE_EXCL)) 355 CEPH_CAP_FILE_EXCL))
@@ -358,7 +362,7 @@ int ceph_get_cap_mds(struct inode *inode)
358{ 362{
359 int mds; 363 int mds;
360 spin_lock(&inode->i_lock); 364 spin_lock(&inode->i_lock);
361 mds = __ceph_get_cap_mds(ceph_inode(inode), NULL); 365 mds = __ceph_get_cap_mds(ceph_inode(inode));
362 spin_unlock(&inode->i_lock); 366 spin_unlock(&inode->i_lock);
363 return mds; 367 return mds;
364} 368}
@@ -477,8 +481,8 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
477 * Each time we receive FILE_CACHE anew, we increment 481 * Each time we receive FILE_CACHE anew, we increment
478 * i_rdcache_gen. 482 * i_rdcache_gen.
479 */ 483 */
480 if ((issued & CEPH_CAP_FILE_CACHE) && 484 if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
481 (had & CEPH_CAP_FILE_CACHE) == 0) 485 (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
482 ci->i_rdcache_gen++; 486 ci->i_rdcache_gen++;
483 487
484 /* 488 /*
@@ -537,7 +541,7 @@ retry:
537 new_cap = NULL; 541 new_cap = NULL;
538 } else { 542 } else {
539 spin_unlock(&inode->i_lock); 543 spin_unlock(&inode->i_lock);
540 new_cap = get_cap(caps_reservation); 544 new_cap = get_cap(mdsc, caps_reservation);
541 if (new_cap == NULL) 545 if (new_cap == NULL)
542 return -ENOMEM; 546 return -ENOMEM;
543 goto retry; 547 goto retry;
@@ -582,6 +586,7 @@ retry:
582 } else { 586 } else {
583 pr_err("ceph_add_cap: couldn't find snap realm %llx\n", 587 pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
584 realmino); 588 realmino);
589 WARN_ON(!realm);
585 } 590 }
586 } 591 }
587 592
@@ -621,7 +626,7 @@ retry:
621 if (fmode >= 0) 626 if (fmode >= 0)
622 __ceph_get_fmode(ci, fmode); 627 __ceph_get_fmode(ci, fmode);
623 spin_unlock(&inode->i_lock); 628 spin_unlock(&inode->i_lock);
624 wake_up(&ci->i_cap_wq); 629 wake_up_all(&ci->i_cap_wq);
625 return 0; 630 return 0;
626} 631}
627 632
@@ -825,7 +830,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
825{ 830{
826 int want = 0; 831 int want = 0;
827 int mode; 832 int mode;
828 for (mode = 0; mode < 4; mode++) 833 for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
829 if (ci->i_nr_by_mode[mode]) 834 if (ci->i_nr_by_mode[mode])
830 want |= ceph_caps_for_mode(mode); 835 want |= ceph_caps_for_mode(mode);
831 return want; 836 return want;
@@ -895,7 +900,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
895 ci->i_auth_cap = NULL; 900 ci->i_auth_cap = NULL;
896 901
897 if (removed) 902 if (removed)
898 ceph_put_cap(cap); 903 ceph_put_cap(mdsc, cap);
899 904
900 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) { 905 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
901 struct ceph_snap_realm *realm = ci->i_snap_realm; 906 struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -981,6 +986,46 @@ static int send_cap_msg(struct ceph_mds_session *session,
981 return 0; 986 return 0;
982} 987}
983 988
989static void __queue_cap_release(struct ceph_mds_session *session,
990 u64 ino, u64 cap_id, u32 migrate_seq,
991 u32 issue_seq)
992{
993 struct ceph_msg *msg;
994 struct ceph_mds_cap_release *head;
995 struct ceph_mds_cap_item *item;
996
997 spin_lock(&session->s_cap_lock);
998 BUG_ON(!session->s_num_cap_releases);
999 msg = list_first_entry(&session->s_cap_releases,
1000 struct ceph_msg, list_head);
1001
1002 dout(" adding %llx release to mds%d msg %p (%d left)\n",
1003 ino, session->s_mds, msg, session->s_num_cap_releases);
1004
1005 BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
1006 head = msg->front.iov_base;
1007 head->num = cpu_to_le32(le32_to_cpu(head->num) + 1);
1008 item = msg->front.iov_base + msg->front.iov_len;
1009 item->ino = cpu_to_le64(ino);
1010 item->cap_id = cpu_to_le64(cap_id);
1011 item->migrate_seq = cpu_to_le32(migrate_seq);
1012 item->seq = cpu_to_le32(issue_seq);
1013
1014 session->s_num_cap_releases--;
1015
1016 msg->front.iov_len += sizeof(*item);
1017 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1018 dout(" release msg %p full\n", msg);
1019 list_move_tail(&msg->list_head, &session->s_cap_releases_done);
1020 } else {
1021 dout(" release msg %p at %d/%d (%d)\n", msg,
1022 (int)le32_to_cpu(head->num),
1023 (int)CEPH_CAPS_PER_RELEASE,
1024 (int)msg->front.iov_len);
1025 }
1026 spin_unlock(&session->s_cap_lock);
1027}
1028
984/* 1029/*
985 * Queue cap releases when an inode is dropped from our cache. Since 1030 * Queue cap releases when an inode is dropped from our cache. Since
986 * inode is about to be destroyed, there is no need for i_lock. 1031 * inode is about to be destroyed, there is no need for i_lock.
@@ -994,41 +1039,9 @@ void ceph_queue_caps_release(struct inode *inode)
994 while (p) { 1039 while (p) {
995 struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); 1040 struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
996 struct ceph_mds_session *session = cap->session; 1041 struct ceph_mds_session *session = cap->session;
997 struct ceph_msg *msg;
998 struct ceph_mds_cap_release *head;
999 struct ceph_mds_cap_item *item;
1000 1042
1001 spin_lock(&session->s_cap_lock); 1043 __queue_cap_release(session, ceph_ino(inode), cap->cap_id,
1002 BUG_ON(!session->s_num_cap_releases); 1044 cap->mseq, cap->issue_seq);
1003 msg = list_first_entry(&session->s_cap_releases,
1004 struct ceph_msg, list_head);
1005
1006 dout(" adding %p release to mds%d msg %p (%d left)\n",
1007 inode, session->s_mds, msg, session->s_num_cap_releases);
1008
1009 BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
1010 head = msg->front.iov_base;
1011 head->num = cpu_to_le32(le32_to_cpu(head->num) + 1);
1012 item = msg->front.iov_base + msg->front.iov_len;
1013 item->ino = cpu_to_le64(ceph_ino(inode));
1014 item->cap_id = cpu_to_le64(cap->cap_id);
1015 item->migrate_seq = cpu_to_le32(cap->mseq);
1016 item->seq = cpu_to_le32(cap->issue_seq);
1017
1018 session->s_num_cap_releases--;
1019
1020 msg->front.iov_len += sizeof(*item);
1021 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1022 dout(" release msg %p full\n", msg);
1023 list_move_tail(&msg->list_head,
1024 &session->s_cap_releases_done);
1025 } else {
1026 dout(" release msg %p at %d/%d (%d)\n", msg,
1027 (int)le32_to_cpu(head->num),
1028 (int)CEPH_CAPS_PER_RELEASE,
1029 (int)msg->front.iov_len);
1030 }
1031 spin_unlock(&session->s_cap_lock);
1032 p = rb_next(p); 1045 p = rb_next(p);
1033 __ceph_remove_cap(cap); 1046 __ceph_remove_cap(cap);
1034 } 1047 }
@@ -1167,7 +1180,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1167 } 1180 }
1168 1181
1169 if (wake) 1182 if (wake)
1170 wake_up(&ci->i_cap_wq); 1183 wake_up_all(&ci->i_cap_wq);
1171 1184
1172 return delayed; 1185 return delayed;
1173} 1186}
@@ -1183,6 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1183 */ 1196 */
1184void __ceph_flush_snaps(struct ceph_inode_info *ci, 1197void __ceph_flush_snaps(struct ceph_inode_info *ci,
1185 struct ceph_mds_session **psession) 1198 struct ceph_mds_session **psession)
1199 __releases(ci->vfs_inode->i_lock)
1200 __acquires(ci->vfs_inode->i_lock)
1186{ 1201{
1187 struct inode *inode = &ci->vfs_inode; 1202 struct inode *inode = &ci->vfs_inode;
1188 int mds; 1203 int mds;
@@ -1218,7 +1233,13 @@ retry:
1218 BUG_ON(capsnap->dirty == 0); 1233 BUG_ON(capsnap->dirty == 0);
1219 1234
1220 /* pick mds, take s_mutex */ 1235 /* pick mds, take s_mutex */
1221 mds = __ceph_get_cap_mds(ci, &mseq); 1236 if (ci->i_auth_cap == NULL) {
1237 dout("no auth cap (migrating?), doing nothing\n");
1238 goto out;
1239 }
1240 mds = ci->i_auth_cap->session->s_mds;
1241 mseq = ci->i_auth_cap->mseq;
1242
1222 if (session && session->s_mds != mds) { 1243 if (session && session->s_mds != mds) {
1223 dout("oops, wrong session %p mutex\n", session); 1244 dout("oops, wrong session %p mutex\n", session);
1224 mutex_unlock(&session->s_mutex); 1245 mutex_unlock(&session->s_mutex);
@@ -1237,8 +1258,8 @@ retry:
1237 } 1258 }
1238 /* 1259 /*
1239 * if session == NULL, we raced against a cap 1260 * if session == NULL, we raced against a cap
1240 * deletion. retry, and we'll get a better 1261 * deletion or migration. retry, and we'll
1241 * @mds value next time. 1262 * get a better @mds value next time.
1242 */ 1263 */
1243 spin_lock(&inode->i_lock); 1264 spin_lock(&inode->i_lock);
1244 goto retry; 1265 goto retry;
@@ -1276,6 +1297,7 @@ retry:
1276 list_del_init(&ci->i_snap_flush_item); 1297 list_del_init(&ci->i_snap_flush_item);
1277 spin_unlock(&mdsc->snap_flush_lock); 1298 spin_unlock(&mdsc->snap_flush_lock);
1278 1299
1300out:
1279 if (psession) 1301 if (psession)
1280 *psession = session; 1302 *psession = session;
1281 else if (session) { 1303 else if (session) {
@@ -1421,7 +1443,6 @@ static int try_nonblocking_invalidate(struct inode *inode)
1421 */ 1443 */
1422void ceph_check_caps(struct ceph_inode_info *ci, int flags, 1444void ceph_check_caps(struct ceph_inode_info *ci, int flags,
1423 struct ceph_mds_session *session) 1445 struct ceph_mds_session *session)
1424 __releases(session->s_mutex)
1425{ 1446{
1426 struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode); 1447 struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
1427 struct ceph_mds_client *mdsc = &client->mdsc; 1448 struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1496,11 +1517,13 @@ retry_locked:
1496 ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ 1517 ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
1497 ci->i_rdcache_gen && /* may have cached pages */ 1518 ci->i_rdcache_gen && /* may have cached pages */
1498 (file_wanted == 0 || /* no open files */ 1519 (file_wanted == 0 || /* no open files */
1499 (revoking & CEPH_CAP_FILE_CACHE)) && /* or revoking cache */ 1520 (revoking & (CEPH_CAP_FILE_CACHE|
1521 CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */
1500 !tried_invalidate) { 1522 !tried_invalidate) {
1501 dout("check_caps trying to invalidate on %p\n", inode); 1523 dout("check_caps trying to invalidate on %p\n", inode);
1502 if (try_nonblocking_invalidate(inode) < 0) { 1524 if (try_nonblocking_invalidate(inode) < 0) {
1503 if (revoking & CEPH_CAP_FILE_CACHE) { 1525 if (revoking & (CEPH_CAP_FILE_CACHE|
1526 CEPH_CAP_FILE_LAZYIO)) {
1504 dout("check_caps queuing invalidate\n"); 1527 dout("check_caps queuing invalidate\n");
1505 queue_invalidate = 1; 1528 queue_invalidate = 1;
1506 ci->i_rdcache_revoking = ci->i_rdcache_gen; 1529 ci->i_rdcache_revoking = ci->i_rdcache_gen;
@@ -2139,7 +2162,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
2139 else if (flushsnaps) 2162 else if (flushsnaps)
2140 ceph_flush_snaps(ci); 2163 ceph_flush_snaps(ci);
2141 if (wake) 2164 if (wake)
2142 wake_up(&ci->i_cap_wq); 2165 wake_up_all(&ci->i_cap_wq);
2143 if (put) 2166 if (put)
2144 iput(inode); 2167 iput(inode);
2145} 2168}
@@ -2215,7 +2238,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
2215 iput(inode); 2238 iput(inode);
2216 } else if (complete_capsnap) { 2239 } else if (complete_capsnap) {
2217 ceph_flush_snaps(ci); 2240 ceph_flush_snaps(ci);
2218 wake_up(&ci->i_cap_wq); 2241 wake_up_all(&ci->i_cap_wq);
2219 } 2242 }
2220 if (drop_capsnap) 2243 if (drop_capsnap)
2221 iput(inode); 2244 iput(inode);
@@ -2236,8 +2259,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2236 struct ceph_mds_session *session, 2259 struct ceph_mds_session *session,
2237 struct ceph_cap *cap, 2260 struct ceph_cap *cap,
2238 struct ceph_buffer *xattr_buf) 2261 struct ceph_buffer *xattr_buf)
2239 __releases(inode->i_lock) 2262 __releases(inode->i_lock)
2240 __releases(session->s_mutex)
2241{ 2263{
2242 struct ceph_inode_info *ci = ceph_inode(inode); 2264 struct ceph_inode_info *ci = ceph_inode(inode);
2243 int mds = session->s_mds; 2265 int mds = session->s_mds;
@@ -2264,6 +2286,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2264 * will invalidate _after_ writeback.) 2286 * will invalidate _after_ writeback.)
2265 */ 2287 */
2266 if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && 2288 if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
2289 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
2267 !ci->i_wrbuffer_ref) { 2290 !ci->i_wrbuffer_ref) {
2268 if (try_nonblocking_invalidate(inode) == 0) { 2291 if (try_nonblocking_invalidate(inode) == 0) {
2269 revoked_rdcache = 1; 2292 revoked_rdcache = 1;
@@ -2355,15 +2378,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2355 2378
2356 /* revocation, grant, or no-op? */ 2379 /* revocation, grant, or no-op? */
2357 if (cap->issued & ~newcaps) { 2380 if (cap->issued & ~newcaps) {
2358 dout("revocation: %s -> %s\n", ceph_cap_string(cap->issued), 2381 int revoking = cap->issued & ~newcaps;
2359 ceph_cap_string(newcaps)); 2382
2360 if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER) 2383 dout("revocation: %s -> %s (revoking %s)\n",
2361 writeback = 1; /* will delay ack */ 2384 ceph_cap_string(cap->issued),
2362 else if (dirty & ~newcaps) 2385 ceph_cap_string(newcaps),
2363 check_caps = 1; /* initiate writeback in check_caps */ 2386 ceph_cap_string(revoking));
2364 else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 || 2387 if (revoking & used & CEPH_CAP_FILE_BUFFER)
2365 revoked_rdcache) 2388 writeback = 1; /* initiate writeback; will delay ack */
2366 check_caps = 2; /* send revoke ack in check_caps */ 2389 else if (revoking == CEPH_CAP_FILE_CACHE &&
2390 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
2391 queue_invalidate)
2392 ; /* do nothing yet, invalidation will be queued */
2393 else if (cap == ci->i_auth_cap)
2394 check_caps = 1; /* check auth cap only */
2395 else
2396 check_caps = 2; /* check all caps */
2367 cap->issued = newcaps; 2397 cap->issued = newcaps;
2368 cap->implemented |= newcaps; 2398 cap->implemented |= newcaps;
2369 } else if (cap->issued == newcaps) { 2399 } else if (cap->issued == newcaps) {
@@ -2391,7 +2421,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2391 if (queue_invalidate) 2421 if (queue_invalidate)
2392 ceph_queue_invalidate(inode); 2422 ceph_queue_invalidate(inode);
2393 if (wake) 2423 if (wake)
2394 wake_up(&ci->i_cap_wq); 2424 wake_up_all(&ci->i_cap_wq);
2395 2425
2396 if (check_caps == 1) 2426 if (check_caps == 1)
2397 ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, 2427 ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
@@ -2446,7 +2476,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
2446 struct ceph_inode_info, 2476 struct ceph_inode_info,
2447 i_flushing_item)->vfs_inode); 2477 i_flushing_item)->vfs_inode);
2448 mdsc->num_cap_flushing--; 2478 mdsc->num_cap_flushing--;
2449 wake_up(&mdsc->cap_flushing_wq); 2479 wake_up_all(&mdsc->cap_flushing_wq);
2450 dout(" inode %p now !flushing\n", inode); 2480 dout(" inode %p now !flushing\n", inode);
2451 2481
2452 if (ci->i_dirty_caps == 0) { 2482 if (ci->i_dirty_caps == 0) {
@@ -2458,7 +2488,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
2458 } 2488 }
2459 } 2489 }
2460 spin_unlock(&mdsc->cap_dirty_lock); 2490 spin_unlock(&mdsc->cap_dirty_lock);
2461 wake_up(&ci->i_cap_wq); 2491 wake_up_all(&ci->i_cap_wq);
2462 2492
2463out: 2493out:
2464 spin_unlock(&inode->i_lock); 2494 spin_unlock(&inode->i_lock);
@@ -2554,7 +2584,8 @@ static void handle_cap_trunc(struct inode *inode,
2554 * caller holds s_mutex 2584 * caller holds s_mutex
2555 */ 2585 */
2556static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, 2586static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2557 struct ceph_mds_session *session) 2587 struct ceph_mds_session *session,
2588 int *open_target_sessions)
2558{ 2589{
2559 struct ceph_inode_info *ci = ceph_inode(inode); 2590 struct ceph_inode_info *ci = ceph_inode(inode);
2560 int mds = session->s_mds; 2591 int mds = session->s_mds;
@@ -2586,6 +2617,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2586 ci->i_cap_exporting_mds = mds; 2617 ci->i_cap_exporting_mds = mds;
2587 ci->i_cap_exporting_mseq = mseq; 2618 ci->i_cap_exporting_mseq = mseq;
2588 ci->i_cap_exporting_issued = cap->issued; 2619 ci->i_cap_exporting_issued = cap->issued;
2620
2621 /*
2622 * make sure we have open sessions with all possible
2623 * export targets, so that we get the matching IMPORT
2624 */
2625 *open_target_sessions = 1;
2589 } 2626 }
2590 __ceph_remove_cap(cap); 2627 __ceph_remove_cap(cap);
2591 } 2628 }
@@ -2655,12 +2692,16 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2655 struct ceph_mds_caps *h; 2692 struct ceph_mds_caps *h;
2656 int mds = session->s_mds; 2693 int mds = session->s_mds;
2657 int op; 2694 int op;
2658 u32 seq; 2695 u32 seq, mseq;
2659 struct ceph_vino vino; 2696 struct ceph_vino vino;
2660 u64 cap_id; 2697 u64 cap_id;
2661 u64 size, max_size; 2698 u64 size, max_size;
2662 u64 tid; 2699 u64 tid;
2663 void *snaptrace; 2700 void *snaptrace;
2701 size_t snaptrace_len;
2702 void *flock;
2703 u32 flock_len;
2704 int open_target_sessions = 0;
2664 2705
2665 dout("handle_caps from mds%d\n", mds); 2706 dout("handle_caps from mds%d\n", mds);
2666 2707
@@ -2669,15 +2710,30 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2669 if (msg->front.iov_len < sizeof(*h)) 2710 if (msg->front.iov_len < sizeof(*h))
2670 goto bad; 2711 goto bad;
2671 h = msg->front.iov_base; 2712 h = msg->front.iov_base;
2672 snaptrace = h + 1;
2673 op = le32_to_cpu(h->op); 2713 op = le32_to_cpu(h->op);
2674 vino.ino = le64_to_cpu(h->ino); 2714 vino.ino = le64_to_cpu(h->ino);
2675 vino.snap = CEPH_NOSNAP; 2715 vino.snap = CEPH_NOSNAP;
2676 cap_id = le64_to_cpu(h->cap_id); 2716 cap_id = le64_to_cpu(h->cap_id);
2677 seq = le32_to_cpu(h->seq); 2717 seq = le32_to_cpu(h->seq);
2718 mseq = le32_to_cpu(h->migrate_seq);
2678 size = le64_to_cpu(h->size); 2719 size = le64_to_cpu(h->size);
2679 max_size = le64_to_cpu(h->max_size); 2720 max_size = le64_to_cpu(h->max_size);
2680 2721
2722 snaptrace = h + 1;
2723 snaptrace_len = le32_to_cpu(h->snap_trace_len);
2724
2725 if (le16_to_cpu(msg->hdr.version) >= 2) {
2726 void *p, *end;
2727
2728 p = snaptrace + snaptrace_len;
2729 end = msg->front.iov_base + msg->front.iov_len;
2730 ceph_decode_32_safe(&p, end, flock_len, bad);
2731 flock = p;
2732 } else {
2733 flock = NULL;
2734 flock_len = 0;
2735 }
2736
2681 mutex_lock(&session->s_mutex); 2737 mutex_lock(&session->s_mutex);
2682 session->s_seq++; 2738 session->s_seq++;
2683 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 2739 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2689,6 +2745,18 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2689 vino.snap, inode); 2745 vino.snap, inode);
2690 if (!inode) { 2746 if (!inode) {
2691 dout(" i don't have ino %llx\n", vino.ino); 2747 dout(" i don't have ino %llx\n", vino.ino);
2748
2749 if (op == CEPH_CAP_OP_IMPORT)
2750 __queue_cap_release(session, vino.ino, cap_id,
2751 mseq, seq);
2752
2753 /*
2754 * send any full release message to try to move things
2755 * along for the mds (who clearly thinks we still have this
2756 * cap).
2757 */
2758 ceph_add_cap_releases(mdsc, session);
2759 ceph_send_cap_releases(mdsc, session);
2692 goto done; 2760 goto done;
2693 } 2761 }
2694 2762
@@ -2699,12 +2767,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2699 goto done; 2767 goto done;
2700 2768
2701 case CEPH_CAP_OP_EXPORT: 2769 case CEPH_CAP_OP_EXPORT:
2702 handle_cap_export(inode, h, session); 2770 handle_cap_export(inode, h, session, &open_target_sessions);
2703 goto done; 2771 goto done;
2704 2772
2705 case CEPH_CAP_OP_IMPORT: 2773 case CEPH_CAP_OP_IMPORT:
2706 handle_cap_import(mdsc, inode, h, session, 2774 handle_cap_import(mdsc, inode, h, session,
2707 snaptrace, le32_to_cpu(h->snap_trace_len)); 2775 snaptrace, snaptrace_len);
2708 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, 2776 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
2709 session); 2777 session);
2710 goto done_unlocked; 2778 goto done_unlocked;
@@ -2714,7 +2782,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2714 spin_lock(&inode->i_lock); 2782 spin_lock(&inode->i_lock);
2715 cap = __get_cap_for_mds(ceph_inode(inode), mds); 2783 cap = __get_cap_for_mds(ceph_inode(inode), mds);
2716 if (!cap) { 2784 if (!cap) {
2717 dout("no cap on %p ino %llx.%llx from mds%d, releasing\n", 2785 dout(" no cap on %p ino %llx.%llx from mds%d\n",
2718 inode, ceph_ino(inode), ceph_snap(inode), mds); 2786 inode, ceph_ino(inode), ceph_snap(inode), mds);
2719 spin_unlock(&inode->i_lock); 2787 spin_unlock(&inode->i_lock);
2720 goto done; 2788 goto done;
@@ -2746,6 +2814,8 @@ done:
2746done_unlocked: 2814done_unlocked:
2747 if (inode) 2815 if (inode)
2748 iput(inode); 2816 iput(inode);
2817 if (open_target_sessions)
2818 ceph_mdsc_open_export_target_sessions(mdsc, session);
2749 return; 2819 return;
2750 2820
2751bad: 2821bad:
@@ -2865,18 +2935,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
2865 struct ceph_inode_info *ci = ceph_inode(inode); 2935 struct ceph_inode_info *ci = ceph_inode(inode);
2866 struct ceph_cap *cap; 2936 struct ceph_cap *cap;
2867 struct ceph_mds_request_release *rel = *p; 2937 struct ceph_mds_request_release *rel = *p;
2938 int used, dirty;
2868 int ret = 0; 2939 int ret = 0;
2869 int used = 0;
2870 2940
2871 spin_lock(&inode->i_lock); 2941 spin_lock(&inode->i_lock);
2872 used = __ceph_caps_used(ci); 2942 used = __ceph_caps_used(ci);
2943 dirty = __ceph_caps_dirty(ci);
2873 2944
2874 dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode, 2945 dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
2875 mds, ceph_cap_string(used), ceph_cap_string(drop), 2946 inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
2876 ceph_cap_string(unless)); 2947 ceph_cap_string(unless));
2877 2948
2878 /* only drop unused caps */ 2949 /* only drop unused, clean caps */
2879 drop &= ~used; 2950 drop &= ~(used | dirty);
2880 2951
2881 cap = __get_cap_for_mds(ci, mds); 2952 cap = __get_cap_for_mds(ci, mds);
2882 if (cap && __cap_is_valid(cap)) { 2953 if (cap && __cap_is_valid(cap)) {
@@ -2956,6 +3027,7 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry,
2956 memcpy(*p, dentry->d_name.name, dentry->d_name.len); 3027 memcpy(*p, dentry->d_name.name, dentry->d_name.len);
2957 *p += dentry->d_name.len; 3028 *p += dentry->d_name.len;
2958 rel->dname_seq = cpu_to_le32(di->lease_seq); 3029 rel->dname_seq = cpu_to_le32(di->lease_seq);
3030 __ceph_mdsc_drop_dentry_lease(dentry);
2959 } 3031 }
2960 spin_unlock(&dentry->d_lock); 3032 spin_unlock(&dentry->d_lock);
2961 return ret; 3033 return ret;