aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/caps.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/caps.c')
-rw-r--r--fs/ceph/caps.c303
1 files changed, 173 insertions, 130 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b81be9a56487..7bf182b03973 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -113,58 +113,41 @@ const char *ceph_cap_string(int caps)
113 return cap_str[i]; 113 return cap_str[i];
114} 114}
115 115
116/* 116void ceph_caps_init(struct ceph_mds_client *mdsc)
117 * Cap reservations
118 *
119 * Maintain a global pool of preallocated struct ceph_caps, referenced
120 * by struct ceph_caps_reservations. This ensures that we preallocate
121 * memory needed to successfully process an MDS response. (If an MDS
122 * sends us cap information and we fail to process it, we will have
123 * problems due to the client and MDS being out of sync.)
124 *
125 * Reservations are 'owned' by a ceph_cap_reservation context.
126 */
127static spinlock_t caps_list_lock;
128static struct list_head caps_list; /* unused (reserved or unreserved) */
129static int caps_total_count; /* total caps allocated */
130static int caps_use_count; /* in use */
131static int caps_reserve_count; /* unused, reserved */
132static int caps_avail_count; /* unused, unreserved */
133static int caps_min_count; /* keep at least this many (unreserved) */
134
135void __init ceph_caps_init(void)
136{ 117{
137 INIT_LIST_HEAD(&caps_list); 118 INIT_LIST_HEAD(&mdsc->caps_list);
138 spin_lock_init(&caps_list_lock); 119 spin_lock_init(&mdsc->caps_list_lock);
139} 120}
140 121
141void ceph_caps_finalize(void) 122void ceph_caps_finalize(struct ceph_mds_client *mdsc)
142{ 123{
143 struct ceph_cap *cap; 124 struct ceph_cap *cap;
144 125
145 spin_lock(&caps_list_lock); 126 spin_lock(&mdsc->caps_list_lock);
146 while (!list_empty(&caps_list)) { 127 while (!list_empty(&mdsc->caps_list)) {
147 cap = list_first_entry(&caps_list, struct ceph_cap, caps_item); 128 cap = list_first_entry(&mdsc->caps_list,
129 struct ceph_cap, caps_item);
148 list_del(&cap->caps_item); 130 list_del(&cap->caps_item);
149 kmem_cache_free(ceph_cap_cachep, cap); 131 kmem_cache_free(ceph_cap_cachep, cap);
150 } 132 }
151 caps_total_count = 0; 133 mdsc->caps_total_count = 0;
152 caps_avail_count = 0; 134 mdsc->caps_avail_count = 0;
153 caps_use_count = 0; 135 mdsc->caps_use_count = 0;
154 caps_reserve_count = 0; 136 mdsc->caps_reserve_count = 0;
155 caps_min_count = 0; 137 mdsc->caps_min_count = 0;
156 spin_unlock(&caps_list_lock); 138 spin_unlock(&mdsc->caps_list_lock);
157} 139}
158 140
159void ceph_adjust_min_caps(int delta) 141void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
160{ 142{
161 spin_lock(&caps_list_lock); 143 spin_lock(&mdsc->caps_list_lock);
162 caps_min_count += delta; 144 mdsc->caps_min_count += delta;
163 BUG_ON(caps_min_count < 0); 145 BUG_ON(mdsc->caps_min_count < 0);
164 spin_unlock(&caps_list_lock); 146 spin_unlock(&mdsc->caps_list_lock);
165} 147}
166 148
167int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need) 149int ceph_reserve_caps(struct ceph_mds_client *mdsc,
150 struct ceph_cap_reservation *ctx, int need)
168{ 151{
169 int i; 152 int i;
170 struct ceph_cap *cap; 153 struct ceph_cap *cap;
@@ -176,16 +159,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
176 dout("reserve caps ctx=%p need=%d\n", ctx, need); 159 dout("reserve caps ctx=%p need=%d\n", ctx, need);
177 160
178 /* first reserve any caps that are already allocated */ 161 /* first reserve any caps that are already allocated */
179 spin_lock(&caps_list_lock); 162 spin_lock(&mdsc->caps_list_lock);
180 if (caps_avail_count >= need) 163 if (mdsc->caps_avail_count >= need)
181 have = need; 164 have = need;
182 else 165 else
183 have = caps_avail_count; 166 have = mdsc->caps_avail_count;
184 caps_avail_count -= have; 167 mdsc->caps_avail_count -= have;
185 caps_reserve_count += have; 168 mdsc->caps_reserve_count += have;
186 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 169 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
187 caps_avail_count); 170 mdsc->caps_reserve_count +
188 spin_unlock(&caps_list_lock); 171 mdsc->caps_avail_count);
172 spin_unlock(&mdsc->caps_list_lock);
189 173
190 for (i = have; i < need; i++) { 174 for (i = have; i < need; i++) {
191 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 175 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
@@ -198,19 +182,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
198 } 182 }
199 BUG_ON(have + alloc != need); 183 BUG_ON(have + alloc != need);
200 184
201 spin_lock(&caps_list_lock); 185 spin_lock(&mdsc->caps_list_lock);
202 caps_total_count += alloc; 186 mdsc->caps_total_count += alloc;
203 caps_reserve_count += alloc; 187 mdsc->caps_reserve_count += alloc;
204 list_splice(&newcaps, &caps_list); 188 list_splice(&newcaps, &mdsc->caps_list);
205 189
206 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 190 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
207 caps_avail_count); 191 mdsc->caps_reserve_count +
208 spin_unlock(&caps_list_lock); 192 mdsc->caps_avail_count);
193 spin_unlock(&mdsc->caps_list_lock);
209 194
210 ctx->count = need; 195 ctx->count = need;
211 dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", 196 dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
212 ctx, caps_total_count, caps_use_count, caps_reserve_count, 197 ctx, mdsc->caps_total_count, mdsc->caps_use_count,
213 caps_avail_count); 198 mdsc->caps_reserve_count, mdsc->caps_avail_count);
214 return 0; 199 return 0;
215 200
216out_alloc_count: 201out_alloc_count:
@@ -220,26 +205,29 @@ out_alloc_count:
220 return ret; 205 return ret;
221} 206}
222 207
223int ceph_unreserve_caps(struct ceph_cap_reservation *ctx) 208int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
209 struct ceph_cap_reservation *ctx)
224{ 210{
225 dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count); 211 dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
226 if (ctx->count) { 212 if (ctx->count) {
227 spin_lock(&caps_list_lock); 213 spin_lock(&mdsc->caps_list_lock);
228 BUG_ON(caps_reserve_count < ctx->count); 214 BUG_ON(mdsc->caps_reserve_count < ctx->count);
229 caps_reserve_count -= ctx->count; 215 mdsc->caps_reserve_count -= ctx->count;
230 caps_avail_count += ctx->count; 216 mdsc->caps_avail_count += ctx->count;
231 ctx->count = 0; 217 ctx->count = 0;
232 dout("unreserve caps %d = %d used + %d resv + %d avail\n", 218 dout("unreserve caps %d = %d used + %d resv + %d avail\n",
233 caps_total_count, caps_use_count, caps_reserve_count, 219 mdsc->caps_total_count, mdsc->caps_use_count,
234 caps_avail_count); 220 mdsc->caps_reserve_count, mdsc->caps_avail_count);
235 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 221 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
236 caps_avail_count); 222 mdsc->caps_reserve_count +
237 spin_unlock(&caps_list_lock); 223 mdsc->caps_avail_count);
224 spin_unlock(&mdsc->caps_list_lock);
238 } 225 }
239 return 0; 226 return 0;
240} 227}
241 228
242static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx) 229static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
230 struct ceph_cap_reservation *ctx)
243{ 231{
244 struct ceph_cap *cap = NULL; 232 struct ceph_cap *cap = NULL;
245 233
@@ -247,71 +235,74 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
247 if (!ctx) { 235 if (!ctx) {
248 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 236 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
249 if (cap) { 237 if (cap) {
250 caps_use_count++; 238 mdsc->caps_use_count++;
251 caps_total_count++; 239 mdsc->caps_total_count++;
252 } 240 }
253 return cap; 241 return cap;
254 } 242 }
255 243
256 spin_lock(&caps_list_lock); 244 spin_lock(&mdsc->caps_list_lock);
257 dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n", 245 dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
258 ctx, ctx->count, caps_total_count, caps_use_count, 246 ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
259 caps_reserve_count, caps_avail_count); 247 mdsc->caps_reserve_count, mdsc->caps_avail_count);
260 BUG_ON(!ctx->count); 248 BUG_ON(!ctx->count);
261 BUG_ON(ctx->count > caps_reserve_count); 249 BUG_ON(ctx->count > mdsc->caps_reserve_count);
262 BUG_ON(list_empty(&caps_list)); 250 BUG_ON(list_empty(&mdsc->caps_list));
263 251
264 ctx->count--; 252 ctx->count--;
265 caps_reserve_count--; 253 mdsc->caps_reserve_count--;
266 caps_use_count++; 254 mdsc->caps_use_count++;
267 255
268 cap = list_first_entry(&caps_list, struct ceph_cap, caps_item); 256 cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
269 list_del(&cap->caps_item); 257 list_del(&cap->caps_item);
270 258
271 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 259 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
272 caps_avail_count); 260 mdsc->caps_reserve_count + mdsc->caps_avail_count);
273 spin_unlock(&caps_list_lock); 261 spin_unlock(&mdsc->caps_list_lock);
274 return cap; 262 return cap;
275} 263}
276 264
277void ceph_put_cap(struct ceph_cap *cap) 265void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
278{ 266{
279 spin_lock(&caps_list_lock); 267 spin_lock(&mdsc->caps_list_lock);
280 dout("put_cap %p %d = %d used + %d resv + %d avail\n", 268 dout("put_cap %p %d = %d used + %d resv + %d avail\n",
281 cap, caps_total_count, caps_use_count, 269 cap, mdsc->caps_total_count, mdsc->caps_use_count,
282 caps_reserve_count, caps_avail_count); 270 mdsc->caps_reserve_count, mdsc->caps_avail_count);
283 caps_use_count--; 271 mdsc->caps_use_count--;
284 /* 272 /*
285 * Keep some preallocated caps around (ceph_min_count), to 273 * Keep some preallocated caps around (ceph_min_count), to
286 * avoid lots of free/alloc churn. 274 * avoid lots of free/alloc churn.
287 */ 275 */
288 if (caps_avail_count >= caps_reserve_count + caps_min_count) { 276 if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
289 caps_total_count--; 277 mdsc->caps_min_count) {
278 mdsc->caps_total_count--;
290 kmem_cache_free(ceph_cap_cachep, cap); 279 kmem_cache_free(ceph_cap_cachep, cap);
291 } else { 280 } else {
292 caps_avail_count++; 281 mdsc->caps_avail_count++;
293 list_add(&cap->caps_item, &caps_list); 282 list_add(&cap->caps_item, &mdsc->caps_list);
294 } 283 }
295 284
296 BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + 285 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
297 caps_avail_count); 286 mdsc->caps_reserve_count + mdsc->caps_avail_count);
298 spin_unlock(&caps_list_lock); 287 spin_unlock(&mdsc->caps_list_lock);
299} 288}
300 289
301void ceph_reservation_status(struct ceph_client *client, 290void ceph_reservation_status(struct ceph_client *client,
302 int *total, int *avail, int *used, int *reserved, 291 int *total, int *avail, int *used, int *reserved,
303 int *min) 292 int *min)
304{ 293{
294 struct ceph_mds_client *mdsc = &client->mdsc;
295
305 if (total) 296 if (total)
306 *total = caps_total_count; 297 *total = mdsc->caps_total_count;
307 if (avail) 298 if (avail)
308 *avail = caps_avail_count; 299 *avail = mdsc->caps_avail_count;
309 if (used) 300 if (used)
310 *used = caps_use_count; 301 *used = mdsc->caps_use_count;
311 if (reserved) 302 if (reserved)
312 *reserved = caps_reserve_count; 303 *reserved = mdsc->caps_reserve_count;
313 if (min) 304 if (min)
314 *min = caps_min_count; 305 *min = mdsc->caps_min_count;
315} 306}
316 307
317/* 308/*
@@ -336,22 +327,29 @@ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
336 return NULL; 327 return NULL;
337} 328}
338 329
330struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
331{
332 struct ceph_cap *cap;
333
334 spin_lock(&ci->vfs_inode.i_lock);
335 cap = __get_cap_for_mds(ci, mds);
336 spin_unlock(&ci->vfs_inode.i_lock);
337 return cap;
338}
339
339/* 340/*
340 * Return id of any MDS with a cap, preferably FILE_WR|WRBUFFER|EXCL, else 341 * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
341 * -1.
342 */ 342 */
343static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq) 343static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
344{ 344{
345 struct ceph_cap *cap; 345 struct ceph_cap *cap;
346 int mds = -1; 346 int mds = -1;
347 struct rb_node *p; 347 struct rb_node *p;
348 348
349 /* prefer mds with WR|WRBUFFER|EXCL caps */ 349 /* prefer mds with WR|BUFFER|EXCL caps */
350 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 350 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
351 cap = rb_entry(p, struct ceph_cap, ci_node); 351 cap = rb_entry(p, struct ceph_cap, ci_node);
352 mds = cap->mds; 352 mds = cap->mds;
353 if (mseq)
354 *mseq = cap->mseq;
355 if (cap->issued & (CEPH_CAP_FILE_WR | 353 if (cap->issued & (CEPH_CAP_FILE_WR |
356 CEPH_CAP_FILE_BUFFER | 354 CEPH_CAP_FILE_BUFFER |
357 CEPH_CAP_FILE_EXCL)) 355 CEPH_CAP_FILE_EXCL))
@@ -364,7 +362,7 @@ int ceph_get_cap_mds(struct inode *inode)
364{ 362{
365 int mds; 363 int mds;
366 spin_lock(&inode->i_lock); 364 spin_lock(&inode->i_lock);
367 mds = __ceph_get_cap_mds(ceph_inode(inode), NULL); 365 mds = __ceph_get_cap_mds(ceph_inode(inode));
368 spin_unlock(&inode->i_lock); 366 spin_unlock(&inode->i_lock);
369 return mds; 367 return mds;
370} 368}
@@ -483,8 +481,8 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
483 * Each time we receive FILE_CACHE anew, we increment 481 * Each time we receive FILE_CACHE anew, we increment
484 * i_rdcache_gen. 482 * i_rdcache_gen.
485 */ 483 */
486 if ((issued & CEPH_CAP_FILE_CACHE) && 484 if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
487 (had & CEPH_CAP_FILE_CACHE) == 0) 485 (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
488 ci->i_rdcache_gen++; 486 ci->i_rdcache_gen++;
489 487
490 /* 488 /*
@@ -543,7 +541,7 @@ retry:
543 new_cap = NULL; 541 new_cap = NULL;
544 } else { 542 } else {
545 spin_unlock(&inode->i_lock); 543 spin_unlock(&inode->i_lock);
546 new_cap = get_cap(caps_reservation); 544 new_cap = get_cap(mdsc, caps_reservation);
547 if (new_cap == NULL) 545 if (new_cap == NULL)
548 return -ENOMEM; 546 return -ENOMEM;
549 goto retry; 547 goto retry;
@@ -588,6 +586,7 @@ retry:
588 } else { 586 } else {
589 pr_err("ceph_add_cap: couldn't find snap realm %llx\n", 587 pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
590 realmino); 588 realmino);
589 WARN_ON(!realm);
591 } 590 }
592 } 591 }
593 592
@@ -831,7 +830,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
831{ 830{
832 int want = 0; 831 int want = 0;
833 int mode; 832 int mode;
834 for (mode = 0; mode < 4; mode++) 833 for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
835 if (ci->i_nr_by_mode[mode]) 834 if (ci->i_nr_by_mode[mode])
836 want |= ceph_caps_for_mode(mode); 835 want |= ceph_caps_for_mode(mode);
837 return want; 836 return want;
@@ -901,7 +900,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
901 ci->i_auth_cap = NULL; 900 ci->i_auth_cap = NULL;
902 901
903 if (removed) 902 if (removed)
904 ceph_put_cap(cap); 903 ceph_put_cap(mdsc, cap);
905 904
906 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) { 905 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
907 struct ceph_snap_realm *realm = ci->i_snap_realm; 906 struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -1197,6 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1197 */ 1196 */
1198void __ceph_flush_snaps(struct ceph_inode_info *ci, 1197void __ceph_flush_snaps(struct ceph_inode_info *ci,
1199 struct ceph_mds_session **psession) 1198 struct ceph_mds_session **psession)
1199 __releases(ci->vfs_inode->i_lock)
1200 __acquires(ci->vfs_inode->i_lock)
1200{ 1201{
1201 struct inode *inode = &ci->vfs_inode; 1202 struct inode *inode = &ci->vfs_inode;
1202 int mds; 1203 int mds;
@@ -1232,7 +1233,13 @@ retry:
1232 BUG_ON(capsnap->dirty == 0); 1233 BUG_ON(capsnap->dirty == 0);
1233 1234
1234 /* pick mds, take s_mutex */ 1235 /* pick mds, take s_mutex */
1235 mds = __ceph_get_cap_mds(ci, &mseq); 1236 if (ci->i_auth_cap == NULL) {
1237 dout("no auth cap (migrating?), doing nothing\n");
1238 goto out;
1239 }
1240 mds = ci->i_auth_cap->session->s_mds;
1241 mseq = ci->i_auth_cap->mseq;
1242
1236 if (session && session->s_mds != mds) { 1243 if (session && session->s_mds != mds) {
1237 dout("oops, wrong session %p mutex\n", session); 1244 dout("oops, wrong session %p mutex\n", session);
1238 mutex_unlock(&session->s_mutex); 1245 mutex_unlock(&session->s_mutex);
@@ -1251,8 +1258,8 @@ retry:
1251 } 1258 }
1252 /* 1259 /*
1253 * if session == NULL, we raced against a cap 1260 * if session == NULL, we raced against a cap
1254 * deletion. retry, and we'll get a better 1261 * deletion or migration. retry, and we'll
1255 * @mds value next time. 1262 * get a better @mds value next time.
1256 */ 1263 */
1257 spin_lock(&inode->i_lock); 1264 spin_lock(&inode->i_lock);
1258 goto retry; 1265 goto retry;
@@ -1290,6 +1297,7 @@ retry:
1290 list_del_init(&ci->i_snap_flush_item); 1297 list_del_init(&ci->i_snap_flush_item);
1291 spin_unlock(&mdsc->snap_flush_lock); 1298 spin_unlock(&mdsc->snap_flush_lock);
1292 1299
1300out:
1293 if (psession) 1301 if (psession)
1294 *psession = session; 1302 *psession = session;
1295 else if (session) { 1303 else if (session) {
@@ -1435,7 +1443,6 @@ static int try_nonblocking_invalidate(struct inode *inode)
1435 */ 1443 */
1436void ceph_check_caps(struct ceph_inode_info *ci, int flags, 1444void ceph_check_caps(struct ceph_inode_info *ci, int flags,
1437 struct ceph_mds_session *session) 1445 struct ceph_mds_session *session)
1438 __releases(session->s_mutex)
1439{ 1446{
1440 struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode); 1447 struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
1441 struct ceph_mds_client *mdsc = &client->mdsc; 1448 struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1510,11 +1517,13 @@ retry_locked:
1510 ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ 1517 ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
1511 ci->i_rdcache_gen && /* may have cached pages */ 1518 ci->i_rdcache_gen && /* may have cached pages */
1512 (file_wanted == 0 || /* no open files */ 1519 (file_wanted == 0 || /* no open files */
1513 (revoking & CEPH_CAP_FILE_CACHE)) && /* or revoking cache */ 1520 (revoking & (CEPH_CAP_FILE_CACHE|
1521 CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */
1514 !tried_invalidate) { 1522 !tried_invalidate) {
1515 dout("check_caps trying to invalidate on %p\n", inode); 1523 dout("check_caps trying to invalidate on %p\n", inode);
1516 if (try_nonblocking_invalidate(inode) < 0) { 1524 if (try_nonblocking_invalidate(inode) < 0) {
1517 if (revoking & CEPH_CAP_FILE_CACHE) { 1525 if (revoking & (CEPH_CAP_FILE_CACHE|
1526 CEPH_CAP_FILE_LAZYIO)) {
1518 dout("check_caps queuing invalidate\n"); 1527 dout("check_caps queuing invalidate\n");
1519 queue_invalidate = 1; 1528 queue_invalidate = 1;
1520 ci->i_rdcache_revoking = ci->i_rdcache_gen; 1529 ci->i_rdcache_revoking = ci->i_rdcache_gen;
@@ -2250,8 +2259,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2250 struct ceph_mds_session *session, 2259 struct ceph_mds_session *session,
2251 struct ceph_cap *cap, 2260 struct ceph_cap *cap,
2252 struct ceph_buffer *xattr_buf) 2261 struct ceph_buffer *xattr_buf)
2253 __releases(inode->i_lock) 2262 __releases(inode->i_lock)
2254 __releases(session->s_mutex)
2255{ 2263{
2256 struct ceph_inode_info *ci = ceph_inode(inode); 2264 struct ceph_inode_info *ci = ceph_inode(inode);
2257 int mds = session->s_mds; 2265 int mds = session->s_mds;
@@ -2278,6 +2286,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2278 * will invalidate _after_ writeback.) 2286 * will invalidate _after_ writeback.)
2279 */ 2287 */
2280 if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && 2288 if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
2289 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
2281 !ci->i_wrbuffer_ref) { 2290 !ci->i_wrbuffer_ref) {
2282 if (try_nonblocking_invalidate(inode) == 0) { 2291 if (try_nonblocking_invalidate(inode) == 0) {
2283 revoked_rdcache = 1; 2292 revoked_rdcache = 1;
@@ -2369,15 +2378,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2369 2378
2370 /* revocation, grant, or no-op? */ 2379 /* revocation, grant, or no-op? */
2371 if (cap->issued & ~newcaps) { 2380 if (cap->issued & ~newcaps) {
2372 dout("revocation: %s -> %s\n", ceph_cap_string(cap->issued), 2381 int revoking = cap->issued & ~newcaps;
2373 ceph_cap_string(newcaps)); 2382
2374 if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER) 2383 dout("revocation: %s -> %s (revoking %s)\n",
2375 writeback = 1; /* will delay ack */ 2384 ceph_cap_string(cap->issued),
2376 else if (dirty & ~newcaps) 2385 ceph_cap_string(newcaps),
2377 check_caps = 1; /* initiate writeback in check_caps */ 2386 ceph_cap_string(revoking));
2378 else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 || 2387 if (revoking & used & CEPH_CAP_FILE_BUFFER)
2379 revoked_rdcache) 2388 writeback = 1; /* initiate writeback; will delay ack */
2380 check_caps = 2; /* send revoke ack in check_caps */ 2389 else if (revoking == CEPH_CAP_FILE_CACHE &&
2390 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
2391 queue_invalidate)
2392 ; /* do nothing yet, invalidation will be queued */
2393 else if (cap == ci->i_auth_cap)
2394 check_caps = 1; /* check auth cap only */
2395 else
2396 check_caps = 2; /* check all caps */
2381 cap->issued = newcaps; 2397 cap->issued = newcaps;
2382 cap->implemented |= newcaps; 2398 cap->implemented |= newcaps;
2383 } else if (cap->issued == newcaps) { 2399 } else if (cap->issued == newcaps) {
@@ -2568,7 +2584,8 @@ static void handle_cap_trunc(struct inode *inode,
2568 * caller holds s_mutex 2584 * caller holds s_mutex
2569 */ 2585 */
2570static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, 2586static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2571 struct ceph_mds_session *session) 2587 struct ceph_mds_session *session,
2588 int *open_target_sessions)
2572{ 2589{
2573 struct ceph_inode_info *ci = ceph_inode(inode); 2590 struct ceph_inode_info *ci = ceph_inode(inode);
2574 int mds = session->s_mds; 2591 int mds = session->s_mds;
@@ -2600,6 +2617,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2600 ci->i_cap_exporting_mds = mds; 2617 ci->i_cap_exporting_mds = mds;
2601 ci->i_cap_exporting_mseq = mseq; 2618 ci->i_cap_exporting_mseq = mseq;
2602 ci->i_cap_exporting_issued = cap->issued; 2619 ci->i_cap_exporting_issued = cap->issued;
2620
2621 /*
2622 * make sure we have open sessions with all possible
2623 * export targets, so that we get the matching IMPORT
2624 */
2625 *open_target_sessions = 1;
2603 } 2626 }
2604 __ceph_remove_cap(cap); 2627 __ceph_remove_cap(cap);
2605 } 2628 }
@@ -2675,6 +2698,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2675 u64 size, max_size; 2698 u64 size, max_size;
2676 u64 tid; 2699 u64 tid;
2677 void *snaptrace; 2700 void *snaptrace;
2701 size_t snaptrace_len;
2702 void *flock;
2703 u32 flock_len;
2704 int open_target_sessions = 0;
2678 2705
2679 dout("handle_caps from mds%d\n", mds); 2706 dout("handle_caps from mds%d\n", mds);
2680 2707
@@ -2683,7 +2710,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2683 if (msg->front.iov_len < sizeof(*h)) 2710 if (msg->front.iov_len < sizeof(*h))
2684 goto bad; 2711 goto bad;
2685 h = msg->front.iov_base; 2712 h = msg->front.iov_base;
2686 snaptrace = h + 1;
2687 op = le32_to_cpu(h->op); 2713 op = le32_to_cpu(h->op);
2688 vino.ino = le64_to_cpu(h->ino); 2714 vino.ino = le64_to_cpu(h->ino);
2689 vino.snap = CEPH_NOSNAP; 2715 vino.snap = CEPH_NOSNAP;
@@ -2693,6 +2719,21 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2693 size = le64_to_cpu(h->size); 2719 size = le64_to_cpu(h->size);
2694 max_size = le64_to_cpu(h->max_size); 2720 max_size = le64_to_cpu(h->max_size);
2695 2721
2722 snaptrace = h + 1;
2723 snaptrace_len = le32_to_cpu(h->snap_trace_len);
2724
2725 if (le16_to_cpu(msg->hdr.version) >= 2) {
2726 void *p, *end;
2727
2728 p = snaptrace + snaptrace_len;
2729 end = msg->front.iov_base + msg->front.iov_len;
2730 ceph_decode_32_safe(&p, end, flock_len, bad);
2731 flock = p;
2732 } else {
2733 flock = NULL;
2734 flock_len = 0;
2735 }
2736
2696 mutex_lock(&session->s_mutex); 2737 mutex_lock(&session->s_mutex);
2697 session->s_seq++; 2738 session->s_seq++;
2698 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 2739 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2714,7 +2755,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2714 * along for the mds (who clearly thinks we still have this 2755 * along for the mds (who clearly thinks we still have this
2715 * cap). 2756 * cap).
2716 */ 2757 */
2717 ceph_add_cap_releases(mdsc, session, -1); 2758 ceph_add_cap_releases(mdsc, session);
2718 ceph_send_cap_releases(mdsc, session); 2759 ceph_send_cap_releases(mdsc, session);
2719 goto done; 2760 goto done;
2720 } 2761 }
@@ -2726,12 +2767,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2726 goto done; 2767 goto done;
2727 2768
2728 case CEPH_CAP_OP_EXPORT: 2769 case CEPH_CAP_OP_EXPORT:
2729 handle_cap_export(inode, h, session); 2770 handle_cap_export(inode, h, session, &open_target_sessions);
2730 goto done; 2771 goto done;
2731 2772
2732 case CEPH_CAP_OP_IMPORT: 2773 case CEPH_CAP_OP_IMPORT:
2733 handle_cap_import(mdsc, inode, h, session, 2774 handle_cap_import(mdsc, inode, h, session,
2734 snaptrace, le32_to_cpu(h->snap_trace_len)); 2775 snaptrace, snaptrace_len);
2735 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, 2776 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
2736 session); 2777 session);
2737 goto done_unlocked; 2778 goto done_unlocked;
@@ -2773,6 +2814,8 @@ done:
2773done_unlocked: 2814done_unlocked:
2774 if (inode) 2815 if (inode)
2775 iput(inode); 2816 iput(inode);
2817 if (open_target_sessions)
2818 ceph_mdsc_open_export_target_sessions(mdsc, session);
2776 return; 2819 return;
2777 2820
2778bad: 2821bad: