Diffstat (limited to 'fs/ceph')
-rw-r--r--  fs/ceph/Kconfig        |   1
-rw-r--r--  fs/ceph/addr.c         |  19
-rw-r--r--  fs/ceph/auth_x.c       |  15
-rw-r--r--  fs/ceph/caps.c         |  59
-rw-r--r--  fs/ceph/debugfs.c      |   4
-rw-r--r--  fs/ceph/dir.c          |  12
-rw-r--r--  fs/ceph/inode.c        |  16
-rw-r--r--  fs/ceph/locks.c        |  14
-rw-r--r--  fs/ceph/mds_client.c   | 103
-rw-r--r--  fs/ceph/mds_client.h   |   3
-rw-r--r--  fs/ceph/osd_client.c   |   2
-rw-r--r--  fs/ceph/pagelist.c     |  12
-rw-r--r--  fs/ceph/snap.c         | 177
-rw-r--r--  fs/ceph/super.h        |  16
-rw-r--r--  fs/ceph/xattr.c        |   1
15 files changed, 266 insertions, 188 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index bc87b9c1d27e..0fcd2640c23f 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -3,6 +3,7 @@ config CEPH_FS
3 depends on INET && EXPERIMENTAL 3 depends on INET && EXPERIMENTAL
4 select LIBCRC32C 4 select LIBCRC32C
5 select CRYPTO_AES 5 select CRYPTO_AES
6 select CRYPTO
6 help 7 help
7 Choose Y or M here to include support for mounting the 8 Choose Y or M here to include support for mounting the
8 experimental Ceph distributed file system. Ceph is an extremely 9 experimental Ceph distributed file system. Ceph is an extremely
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5598a0d02295..efbc604001c8 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -87,7 +87,7 @@ static int ceph_set_page_dirty(struct page *page)
87 87
88 /* dirty the head */ 88 /* dirty the head */
89 spin_lock(&inode->i_lock); 89 spin_lock(&inode->i_lock);
90 if (ci->i_wrbuffer_ref_head == 0) 90 if (ci->i_head_snapc == NULL)
91 ci->i_head_snapc = ceph_get_snap_context(snapc); 91 ci->i_head_snapc = ceph_get_snap_context(snapc);
92 ++ci->i_wrbuffer_ref_head; 92 ++ci->i_wrbuffer_ref_head;
93 if (ci->i_wrbuffer_ref == 0) 93 if (ci->i_wrbuffer_ref == 0)
@@ -105,13 +105,7 @@ static int ceph_set_page_dirty(struct page *page)
105 spin_lock_irq(&mapping->tree_lock); 105 spin_lock_irq(&mapping->tree_lock);
106 if (page->mapping) { /* Race with truncate? */ 106 if (page->mapping) { /* Race with truncate? */
107 WARN_ON_ONCE(!PageUptodate(page)); 107 WARN_ON_ONCE(!PageUptodate(page));
108 108 account_page_dirtied(page, page->mapping);
109 if (mapping_cap_account_dirty(mapping)) {
110 __inc_zone_page_state(page, NR_FILE_DIRTY);
111 __inc_bdi_stat(mapping->backing_dev_info,
112 BDI_RECLAIMABLE);
113 task_io_account_write(PAGE_CACHE_SIZE);
114 }
115 radix_tree_tag_set(&mapping->page_tree, 109 radix_tree_tag_set(&mapping->page_tree,
116 page_index(page), PAGECACHE_TAG_DIRTY); 110 page_index(page), PAGECACHE_TAG_DIRTY);
117 111
@@ -352,7 +346,7 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
352 break; 346 break;
353 } 347 }
354 } 348 }
355 if (!snapc && ci->i_head_snapc) { 349 if (!snapc && ci->i_wrbuffer_ref_head) {
356 snapc = ceph_get_snap_context(ci->i_head_snapc); 350 snapc = ceph_get_snap_context(ci->i_head_snapc);
357 dout(" head snapc %p has %d dirty pages\n", 351 dout(" head snapc %p has %d dirty pages\n",
358 snapc, ci->i_wrbuffer_ref_head); 352 snapc, ci->i_wrbuffer_ref_head);
@@ -417,8 +411,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
417 if (i_size < page_off + len) 411 if (i_size < page_off + len)
418 len = i_size - page_off; 412 len = i_size - page_off;
419 413
420 dout("writepage %p page %p index %lu on %llu~%u\n", 414 dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
421 inode, page, page->index, page_off, len); 415 inode, page, page->index, page_off, len, snapc);
422 416
423 writeback_stat = atomic_long_inc_return(&client->writeback_count); 417 writeback_stat = atomic_long_inc_return(&client->writeback_count);
424 if (writeback_stat > 418 if (writeback_stat >
@@ -772,7 +766,8 @@ get_more_pages:
772 /* ok */ 766 /* ok */
773 if (locked_pages == 0) { 767 if (locked_pages == 0) {
774 /* prepare async write request */ 768 /* prepare async write request */
775 offset = page->index << PAGE_CACHE_SHIFT; 769 offset = (unsigned long long)page->index
770 << PAGE_CACHE_SHIFT;
776 len = wsize; 771 len = wsize;
777 req = ceph_osdc_new_request(&client->osdc, 772 req = ceph_osdc_new_request(&client->osdc,
778 &ci->i_layout, 773 &ci->i_layout,
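A note on the final addr.c hunk above: page->index is a 32-bit quantity on 32-bit machines, so shifting it by PAGE_CACHE_SHIFT without first widening it computes the byte offset modulo 4 GB. The following stand-alone sketch (not part of the patch; it assumes 4 KB pages and uses made-up values) reproduces the wraparound and the fix:

#include <stdint.h>
#include <stdio.h>

#define PAGE_SHIFT 12				/* assume 4 KB pages */

int main(void)
{
	uint32_t index = 0x00300000;		/* page index ~3M => byte offset 12 GB */

	/* shift performed in 32-bit arithmetic: result wraps modulo 4 GB */
	uint64_t wrapped = index << PAGE_SHIFT;
	/* widen first, as the patch does with (unsigned long long)page->index */
	uint64_t correct = (uint64_t)index << PAGE_SHIFT;

	printf("wrapped=%llu correct=%llu\n",
	       (unsigned long long)wrapped, (unsigned long long)correct);
	return 0;
}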
diff --git a/fs/ceph/auth_x.c b/fs/ceph/auth_x.c
index 582e0b2caf8a..a2d002cbdec2 100644
--- a/fs/ceph/auth_x.c
+++ b/fs/ceph/auth_x.c
@@ -376,7 +376,7 @@ static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
376 376
377 th = get_ticket_handler(ac, service); 377 th = get_ticket_handler(ac, service);
378 378
379 if (!th) { 379 if (IS_ERR(th)) {
380 *pneed |= service; 380 *pneed |= service;
381 continue; 381 continue;
382 } 382 }
@@ -399,6 +399,9 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
399 struct ceph_x_ticket_handler *th = 399 struct ceph_x_ticket_handler *th =
400 get_ticket_handler(ac, CEPH_ENTITY_TYPE_AUTH); 400 get_ticket_handler(ac, CEPH_ENTITY_TYPE_AUTH);
401 401
402 if (IS_ERR(th))
403 return PTR_ERR(th);
404
402 ceph_x_validate_tickets(ac, &need); 405 ceph_x_validate_tickets(ac, &need);
403 406
404 dout("build_request want %x have %x need %x\n", 407 dout("build_request want %x have %x need %x\n",
@@ -450,7 +453,6 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
450 return -ERANGE; 453 return -ERANGE;
451 head->op = cpu_to_le16(CEPHX_GET_PRINCIPAL_SESSION_KEY); 454 head->op = cpu_to_le16(CEPHX_GET_PRINCIPAL_SESSION_KEY);
452 455
453 BUG_ON(!th);
454 ret = ceph_x_build_authorizer(ac, th, &xi->auth_authorizer); 456 ret = ceph_x_build_authorizer(ac, th, &xi->auth_authorizer);
455 if (ret) 457 if (ret)
456 return ret; 458 return ret;
@@ -505,7 +507,8 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result,
505 507
506 case CEPHX_GET_PRINCIPAL_SESSION_KEY: 508 case CEPHX_GET_PRINCIPAL_SESSION_KEY:
507 th = get_ticket_handler(ac, CEPH_ENTITY_TYPE_AUTH); 509 th = get_ticket_handler(ac, CEPH_ENTITY_TYPE_AUTH);
508 BUG_ON(!th); 510 if (IS_ERR(th))
511 return PTR_ERR(th);
509 ret = ceph_x_proc_ticket_reply(ac, &th->session_key, 512 ret = ceph_x_proc_ticket_reply(ac, &th->session_key,
510 buf + sizeof(*head), end); 513 buf + sizeof(*head), end);
511 break; 514 break;
@@ -563,8 +566,8 @@ static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
563 void *end = p + sizeof(au->reply_buf); 566 void *end = p + sizeof(au->reply_buf);
564 567
565 th = get_ticket_handler(ac, au->service); 568 th = get_ticket_handler(ac, au->service);
566 if (!th) 569 if (IS_ERR(th))
567 return -EIO; /* hrm! */ 570 return PTR_ERR(th);
568 ret = ceph_x_decrypt(&th->session_key, &p, end, &reply, sizeof(reply)); 571 ret = ceph_x_decrypt(&th->session_key, &p, end, &reply, sizeof(reply));
569 if (ret < 0) 572 if (ret < 0)
570 return ret; 573 return ret;
@@ -626,7 +629,7 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
626 struct ceph_x_ticket_handler *th; 629 struct ceph_x_ticket_handler *th;
627 630
628 th = get_ticket_handler(ac, peer_type); 631 th = get_ticket_handler(ac, peer_type);
629 if (th && !IS_ERR(th)) 632 if (!IS_ERR(th))
630 remove_ticket_handler(ac, th); 633 remove_ticket_handler(ac, th);
631} 634}
632 635
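The auth_x.c hunks all follow from one change of convention: get_ticket_handler() is now assumed to return an ERR_PTR()-encoded errno on failure rather than NULL, so callers test IS_ERR() and propagate PTR_ERR() instead of relying on BUG_ON() or returning -EIO. A compact userspace re-creation of that convention (lookup_ticket() and its error case are invented for the example):

#include <errno.h>
#include <stdio.h>

#define MAX_ERRNO 4095

/* Re-creation of the kernel's ERR_PTR()/IS_ERR()/PTR_ERR() helpers. */
static inline void *ERR_PTR(long error)      { return (void *)error; }
static inline long PTR_ERR(const void *ptr)  { return (long)ptr; }
static inline int IS_ERR(const void *ptr)
{
	return (unsigned long)ptr >= (unsigned long)-MAX_ERRNO;
}

struct ticket { int service; };
static struct ticket auth_ticket = { .service = 1 };

/* Stand-in for get_ticket_handler(): failure is an encoded errno, never NULL. */
static struct ticket *lookup_ticket(int service)
{
	if (service != 1)
		return ERR_PTR(-ENOENT);
	return &auth_ticket;
}

int main(void)
{
	struct ticket *th = lookup_ticket(2);

	if (IS_ERR(th)) {			/* the pattern the callers adopt */
		printf("lookup failed: %ld\n", PTR_ERR(th));
		return 1;
	}
	printf("got ticket for service %d\n", th->service);
	return 0;
}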
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 7bf182b03973..73c153092f72 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -814,7 +814,7 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
814 used |= CEPH_CAP_PIN; 814 used |= CEPH_CAP_PIN;
815 if (ci->i_rd_ref) 815 if (ci->i_rd_ref)
816 used |= CEPH_CAP_FILE_RD; 816 used |= CEPH_CAP_FILE_RD;
817 if (ci->i_rdcache_ref || ci->i_rdcache_gen) 817 if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages)
818 used |= CEPH_CAP_FILE_CACHE; 818 used |= CEPH_CAP_FILE_CACHE;
819 if (ci->i_wr_ref) 819 if (ci->i_wr_ref)
820 used |= CEPH_CAP_FILE_WR; 820 used |= CEPH_CAP_FILE_WR;
@@ -1082,6 +1082,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1082 gid_t gid; 1082 gid_t gid;
1083 struct ceph_mds_session *session; 1083 struct ceph_mds_session *session;
1084 u64 xattr_version = 0; 1084 u64 xattr_version = 0;
1085 struct ceph_buffer *xattr_blob = NULL;
1085 int delayed = 0; 1086 int delayed = 0;
1086 u64 flush_tid = 0; 1087 u64 flush_tid = 0;
1087 int i; 1088 int i;
@@ -1142,6 +1143,10 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1142 for (i = 0; i < CEPH_CAP_BITS; i++) 1143 for (i = 0; i < CEPH_CAP_BITS; i++)
1143 if (flushing & (1 << i)) 1144 if (flushing & (1 << i))
1144 ci->i_cap_flush_tid[i] = flush_tid; 1145 ci->i_cap_flush_tid[i] = flush_tid;
1146
1147 follows = ci->i_head_snapc->seq;
1148 } else {
1149 follows = 0;
1145 } 1150 }
1146 1151
1147 keep = cap->implemented; 1152 keep = cap->implemented;
@@ -1155,14 +1160,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1155 mtime = inode->i_mtime; 1160 mtime = inode->i_mtime;
1156 atime = inode->i_atime; 1161 atime = inode->i_atime;
1157 time_warp_seq = ci->i_time_warp_seq; 1162 time_warp_seq = ci->i_time_warp_seq;
1158 follows = ci->i_snap_realm->cached_context->seq;
1159 uid = inode->i_uid; 1163 uid = inode->i_uid;
1160 gid = inode->i_gid; 1164 gid = inode->i_gid;
1161 mode = inode->i_mode; 1165 mode = inode->i_mode;
1162 1166
1163 if (dropping & CEPH_CAP_XATTR_EXCL) { 1167 if (flushing & CEPH_CAP_XATTR_EXCL) {
1164 __ceph_build_xattrs_blob(ci); 1168 __ceph_build_xattrs_blob(ci);
1165 xattr_version = ci->i_xattrs.version + 1; 1169 xattr_blob = ci->i_xattrs.blob;
1170 xattr_version = ci->i_xattrs.version;
1166 } 1171 }
1167 1172
1168 spin_unlock(&inode->i_lock); 1173 spin_unlock(&inode->i_lock);
@@ -1170,9 +1175,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1170 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, 1175 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
1171 op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, 1176 op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
1172 size, max_size, &mtime, &atime, time_warp_seq, 1177 size, max_size, &mtime, &atime, time_warp_seq,
1173 uid, gid, mode, 1178 uid, gid, mode, xattr_version, xattr_blob,
1174 xattr_version,
1175 (flushing & CEPH_CAP_XATTR_EXCL) ? ci->i_xattrs.blob : NULL,
1176 follows); 1179 follows);
1177 if (ret < 0) { 1180 if (ret < 0) {
1178 dout("error sending cap msg, must requeue %p\n", inode); 1181 dout("error sending cap msg, must requeue %p\n", inode);
@@ -1192,10 +1195,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1192 * asynchronously back to the MDS once sync writes complete and dirty 1195 * asynchronously back to the MDS once sync writes complete and dirty
1193 * data is written out. 1196 * data is written out.
1194 * 1197 *
1198 * Unless @again is true, skip cap_snaps that were already sent to
1199 * the MDS (i.e., during this session).
1200 *
1195 * Called under i_lock. Takes s_mutex as needed. 1201 * Called under i_lock. Takes s_mutex as needed.
1196 */ 1202 */
1197void __ceph_flush_snaps(struct ceph_inode_info *ci, 1203void __ceph_flush_snaps(struct ceph_inode_info *ci,
1198 struct ceph_mds_session **psession) 1204 struct ceph_mds_session **psession,
1205 int again)
1199 __releases(ci->vfs_inode->i_lock) 1206 __releases(ci->vfs_inode->i_lock)
1200 __acquires(ci->vfs_inode->i_lock) 1207 __acquires(ci->vfs_inode->i_lock)
1201{ 1208{
@@ -1224,7 +1231,7 @@ retry:
1224 * pages to be written out. 1231 * pages to be written out.
1225 */ 1232 */
1226 if (capsnap->dirty_pages || capsnap->writing) 1233 if (capsnap->dirty_pages || capsnap->writing)
1227 continue; 1234 break;
1228 1235
1229 /* 1236 /*
1230 * if cap writeback already occurred, we should have dropped 1237 * if cap writeback already occurred, we should have dropped
@@ -1237,6 +1244,13 @@ retry:
1237 dout("no auth cap (migrating?), doing nothing\n"); 1244 dout("no auth cap (migrating?), doing nothing\n");
1238 goto out; 1245 goto out;
1239 } 1246 }
1247
1248 /* only flush each capsnap once */
1249 if (!again && !list_empty(&capsnap->flushing_item)) {
1250 dout("already flushed %p, skipping\n", capsnap);
1251 continue;
1252 }
1253
1240 mds = ci->i_auth_cap->session->s_mds; 1254 mds = ci->i_auth_cap->session->s_mds;
1241 mseq = ci->i_auth_cap->mseq; 1255 mseq = ci->i_auth_cap->mseq;
1242 1256
@@ -1273,8 +1287,8 @@ retry:
1273 &session->s_cap_snaps_flushing); 1287 &session->s_cap_snaps_flushing);
1274 spin_unlock(&inode->i_lock); 1288 spin_unlock(&inode->i_lock);
1275 1289
1276 dout("flush_snaps %p cap_snap %p follows %lld size %llu\n", 1290 dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
1277 inode, capsnap, next_follows, capsnap->size); 1291 inode, capsnap, capsnap->follows, capsnap->flush_tid);
1278 send_cap_msg(session, ceph_vino(inode).ino, 0, 1292 send_cap_msg(session, ceph_vino(inode).ino, 0,
1279 CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, 1293 CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
1280 capsnap->dirty, 0, capsnap->flush_tid, 0, mseq, 1294 capsnap->dirty, 0, capsnap->flush_tid, 0, mseq,
@@ -1282,7 +1296,7 @@ retry:
1282 &capsnap->mtime, &capsnap->atime, 1296 &capsnap->mtime, &capsnap->atime,
1283 capsnap->time_warp_seq, 1297 capsnap->time_warp_seq,
1284 capsnap->uid, capsnap->gid, capsnap->mode, 1298 capsnap->uid, capsnap->gid, capsnap->mode,
1285 0, NULL, 1299 capsnap->xattr_version, capsnap->xattr_blob,
1286 capsnap->follows); 1300 capsnap->follows);
1287 1301
1288 next_follows = capsnap->follows + 1; 1302 next_follows = capsnap->follows + 1;
@@ -1311,7 +1325,7 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
1311 struct inode *inode = &ci->vfs_inode; 1325 struct inode *inode = &ci->vfs_inode;
1312 1326
1313 spin_lock(&inode->i_lock); 1327 spin_lock(&inode->i_lock);
1314 __ceph_flush_snaps(ci, NULL); 1328 __ceph_flush_snaps(ci, NULL, 0);
1315 spin_unlock(&inode->i_lock); 1329 spin_unlock(&inode->i_lock);
1316} 1330}
1317 1331
@@ -1332,7 +1346,11 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
1332 ceph_cap_string(was | mask)); 1346 ceph_cap_string(was | mask));
1333 ci->i_dirty_caps |= mask; 1347 ci->i_dirty_caps |= mask;
1334 if (was == 0) { 1348 if (was == 0) {
1335 dout(" inode %p now dirty\n", &ci->vfs_inode); 1349 if (!ci->i_head_snapc)
1350 ci->i_head_snapc = ceph_get_snap_context(
1351 ci->i_snap_realm->cached_context);
1352 dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode,
1353 ci->i_head_snapc);
1336 BUG_ON(!list_empty(&ci->i_dirty_item)); 1354 BUG_ON(!list_empty(&ci->i_dirty_item));
1337 spin_lock(&mdsc->cap_dirty_lock); 1355 spin_lock(&mdsc->cap_dirty_lock);
1338 list_add(&ci->i_dirty_item, &mdsc->cap_dirty); 1356 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
@@ -1470,7 +1488,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
1470 1488
1471 /* flush snaps first time around only */ 1489 /* flush snaps first time around only */
1472 if (!list_empty(&ci->i_cap_snaps)) 1490 if (!list_empty(&ci->i_cap_snaps))
1473 __ceph_flush_snaps(ci, &session); 1491 __ceph_flush_snaps(ci, &session, 0);
1474 goto retry_locked; 1492 goto retry_locked;
1475retry: 1493retry:
1476 spin_lock(&inode->i_lock); 1494 spin_lock(&inode->i_lock);
@@ -1887,7 +1905,7 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
1887 if (cap && cap->session == session) { 1905 if (cap && cap->session == session) {
1888 dout("kick_flushing_caps %p cap %p capsnap %p\n", inode, 1906 dout("kick_flushing_caps %p cap %p capsnap %p\n", inode,
1889 cap, capsnap); 1907 cap, capsnap);
1890 __ceph_flush_snaps(ci, &session); 1908 __ceph_flush_snaps(ci, &session, 1);
1891 } else { 1909 } else {
1892 pr_err("%p auth cap %p not mds%d ???\n", inode, 1910 pr_err("%p auth cap %p not mds%d ???\n", inode,
1893 cap, session->s_mds); 1911 cap, session->s_mds);
@@ -2190,7 +2208,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
2190 2208
2191 if (ci->i_head_snapc == snapc) { 2209 if (ci->i_head_snapc == snapc) {
2192 ci->i_wrbuffer_ref_head -= nr; 2210 ci->i_wrbuffer_ref_head -= nr;
2193 if (!ci->i_wrbuffer_ref_head) { 2211 if (ci->i_wrbuffer_ref_head == 0 &&
2212 ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) {
2213 BUG_ON(!ci->i_head_snapc);
2194 ceph_put_snap_context(ci->i_head_snapc); 2214 ceph_put_snap_context(ci->i_head_snapc);
2195 ci->i_head_snapc = NULL; 2215 ci->i_head_snapc = NULL;
2196 } 2216 }
@@ -2483,6 +2503,11 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
2483 dout(" inode %p now clean\n", inode); 2503 dout(" inode %p now clean\n", inode);
2484 BUG_ON(!list_empty(&ci->i_dirty_item)); 2504 BUG_ON(!list_empty(&ci->i_dirty_item));
2485 drop = 1; 2505 drop = 1;
2506 if (ci->i_wrbuffer_ref_head == 0) {
2507 BUG_ON(!ci->i_head_snapc);
2508 ceph_put_snap_context(ci->i_head_snapc);
2509 ci->i_head_snapc = NULL;
2510 }
2486 } else { 2511 } else {
2487 BUG_ON(list_empty(&ci->i_dirty_item)); 2512 BUG_ON(list_empty(&ci->i_dirty_item));
2488 } 2513 }
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 360c4f22718d..6fd8b20a8611 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -171,6 +171,8 @@ static int mdsc_show(struct seq_file *s, void *p)
171 } else if (req->r_dentry) { 171 } else if (req->r_dentry) {
172 path = ceph_mdsc_build_path(req->r_dentry, &pathlen, 172 path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
173 &pathbase, 0); 173 &pathbase, 0);
174 if (IS_ERR(path))
175 path = NULL;
174 spin_lock(&req->r_dentry->d_lock); 176 spin_lock(&req->r_dentry->d_lock);
175 seq_printf(s, " #%llx/%.*s (%s)", 177 seq_printf(s, " #%llx/%.*s (%s)",
176 ceph_ino(req->r_dentry->d_parent->d_inode), 178 ceph_ino(req->r_dentry->d_parent->d_inode),
@@ -187,6 +189,8 @@ static int mdsc_show(struct seq_file *s, void *p)
187 if (req->r_old_dentry) { 189 if (req->r_old_dentry) {
188 path = ceph_mdsc_build_path(req->r_old_dentry, &pathlen, 190 path = ceph_mdsc_build_path(req->r_old_dentry, &pathlen,
189 &pathbase, 0); 191 &pathbase, 0);
192 if (IS_ERR(path))
193 path = NULL;
190 spin_lock(&req->r_old_dentry->d_lock); 194 spin_lock(&req->r_old_dentry->d_lock);
191 seq_printf(s, " #%llx/%.*s (%s)", 195 seq_printf(s, " #%llx/%.*s (%s)",
192 ceph_ino(req->r_old_dentry->d_parent->d_inode), 196 ceph_ino(req->r_old_dentry->d_parent->d_inode),
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 67bbb41d5526..a1986eb52045 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -46,7 +46,7 @@ int ceph_init_dentry(struct dentry *dentry)
46 else 46 else
47 dentry->d_op = &ceph_snap_dentry_ops; 47 dentry->d_op = &ceph_snap_dentry_ops;
48 48
49 di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS); 49 di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO);
50 if (!di) 50 if (!di)
51 return -ENOMEM; /* oh well */ 51 return -ENOMEM; /* oh well */
52 52
@@ -1021,11 +1021,15 @@ out_touch:
1021static void ceph_dentry_release(struct dentry *dentry) 1021static void ceph_dentry_release(struct dentry *dentry)
1022{ 1022{
1023 struct ceph_dentry_info *di = ceph_dentry(dentry); 1023 struct ceph_dentry_info *di = ceph_dentry(dentry);
1024 struct inode *parent_inode = dentry->d_parent->d_inode; 1024 struct inode *parent_inode = NULL;
1025 u64 snapid = ceph_snap(parent_inode); 1025 u64 snapid = CEPH_NOSNAP;
1026 1026
1027 if (!IS_ROOT(dentry)) {
1028 parent_inode = dentry->d_parent->d_inode;
1029 if (parent_inode)
1030 snapid = ceph_snap(parent_inode);
1031 }
1027 dout("dentry_release %p parent %p\n", dentry, parent_inode); 1032 dout("dentry_release %p parent %p\n", dentry, parent_inode);
1028
1029 if (parent_inode && snapid != CEPH_SNAPDIR) { 1033 if (parent_inode && snapid != CEPH_SNAPDIR) {
1030 struct ceph_inode_info *ci = ceph_inode(parent_inode); 1034 struct ceph_inode_info *ci = ceph_inode(parent_inode);
1031 1035
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 5d893d31e399..62377ec37edf 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -677,6 +677,7 @@ static int fill_inode(struct inode *inode,
677 if (ci->i_files == 0 && ci->i_subdirs == 0 && 677 if (ci->i_files == 0 && ci->i_subdirs == 0 &&
678 ceph_snap(inode) == CEPH_NOSNAP && 678 ceph_snap(inode) == CEPH_NOSNAP &&
679 (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && 679 (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
680 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
680 (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) { 681 (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
681 dout(" marking %p complete (empty)\n", inode); 682 dout(" marking %p complete (empty)\n", inode);
682 ci->i_ceph_flags |= CEPH_I_COMPLETE; 683 ci->i_ceph_flags |= CEPH_I_COMPLETE;
@@ -844,7 +845,7 @@ static void ceph_set_dentry_offset(struct dentry *dn)
844 * the caller) if we fail. 845 * the caller) if we fail.
845 */ 846 */
846static struct dentry *splice_dentry(struct dentry *dn, struct inode *in, 847static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
847 bool *prehash) 848 bool *prehash, bool set_offset)
848{ 849{
849 struct dentry *realdn; 850 struct dentry *realdn;
850 851
@@ -876,7 +877,8 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
876 } 877 }
877 if ((!prehash || *prehash) && d_unhashed(dn)) 878 if ((!prehash || *prehash) && d_unhashed(dn))
878 d_rehash(dn); 879 d_rehash(dn);
879 ceph_set_dentry_offset(dn); 880 if (set_offset)
881 ceph_set_dentry_offset(dn);
880out: 882out:
881 return dn; 883 return dn;
882} 884}
@@ -1061,7 +1063,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1061 d_delete(dn); 1063 d_delete(dn);
1062 goto done; 1064 goto done;
1063 } 1065 }
1064 dn = splice_dentry(dn, in, &have_lease); 1066 dn = splice_dentry(dn, in, &have_lease, true);
1065 if (IS_ERR(dn)) { 1067 if (IS_ERR(dn)) {
1066 err = PTR_ERR(dn); 1068 err = PTR_ERR(dn);
1067 goto done; 1069 goto done;
@@ -1104,7 +1106,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1104 goto done; 1106 goto done;
1105 } 1107 }
1106 dout(" linking snapped dir %p to dn %p\n", in, dn); 1108 dout(" linking snapped dir %p to dn %p\n", in, dn);
1107 dn = splice_dentry(dn, in, NULL); 1109 dn = splice_dentry(dn, in, NULL, true);
1108 if (IS_ERR(dn)) { 1110 if (IS_ERR(dn)) {
1109 err = PTR_ERR(dn); 1111 err = PTR_ERR(dn);
1110 goto done; 1112 goto done;
@@ -1229,14 +1231,14 @@ retry_lookup:
1229 in = dn->d_inode; 1231 in = dn->d_inode;
1230 } else { 1232 } else {
1231 in = ceph_get_inode(parent->d_sb, vino); 1233 in = ceph_get_inode(parent->d_sb, vino);
1232 if (in == NULL) { 1234 if (IS_ERR(in)) {
1233 dout("new_inode badness\n"); 1235 dout("new_inode badness\n");
1234 d_delete(dn); 1236 d_delete(dn);
1235 dput(dn); 1237 dput(dn);
1236 err = -ENOMEM; 1238 err = PTR_ERR(in);
1237 goto out; 1239 goto out;
1238 } 1240 }
1239 dn = splice_dentry(dn, in, NULL); 1241 dn = splice_dentry(dn, in, NULL, false);
1240 if (IS_ERR(dn)) 1242 if (IS_ERR(dn))
1241 dn = NULL; 1243 dn = NULL;
1242 } 1244 }
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index ae85af06454f..ff4e753aae92 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -82,7 +82,8 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
82 length = fl->fl_end - fl->fl_start + 1; 82 length = fl->fl_end - fl->fl_start + 1;
83 83
84 err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file, 84 err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
85 (u64)fl->fl_pid, (u64)fl->fl_nspid, 85 (u64)fl->fl_pid,
86 (u64)(unsigned long)fl->fl_nspid,
86 lock_cmd, fl->fl_start, 87 lock_cmd, fl->fl_start,
87 length, wait); 88 length, wait);
88 if (!err) { 89 if (!err) {
@@ -92,7 +93,8 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
92 /* undo! This should only happen if the kernel detects 93 /* undo! This should only happen if the kernel detects
93 * local deadlock. */ 94 * local deadlock. */
94 ceph_lock_message(CEPH_LOCK_FCNTL, op, file, 95 ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
95 (u64)fl->fl_pid, (u64)fl->fl_nspid, 96 (u64)fl->fl_pid,
97 (u64)(unsigned long)fl->fl_nspid,
96 CEPH_LOCK_UNLOCK, fl->fl_start, 98 CEPH_LOCK_UNLOCK, fl->fl_start,
97 length, 0); 99 length, 0);
98 dout("got %d on posix_lock_file, undid lock", err); 100 dout("got %d on posix_lock_file, undid lock", err);
@@ -132,7 +134,8 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
132 length = fl->fl_end - fl->fl_start + 1; 134 length = fl->fl_end - fl->fl_start + 1;
133 135
134 err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK, 136 err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
135 file, (u64)fl->fl_pid, (u64)fl->fl_nspid, 137 file, (u64)fl->fl_pid,
138 (u64)(unsigned long)fl->fl_nspid,
136 lock_cmd, fl->fl_start, 139 lock_cmd, fl->fl_start,
137 length, wait); 140 length, wait);
138 if (!err) { 141 if (!err) {
@@ -141,7 +144,7 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
141 ceph_lock_message(CEPH_LOCK_FLOCK, 144 ceph_lock_message(CEPH_LOCK_FLOCK,
142 CEPH_MDS_OP_SETFILELOCK, 145 CEPH_MDS_OP_SETFILELOCK,
143 file, (u64)fl->fl_pid, 146 file, (u64)fl->fl_pid,
144 (u64)fl->fl_nspid, 147 (u64)(unsigned long)fl->fl_nspid,
145 CEPH_LOCK_UNLOCK, fl->fl_start, 148 CEPH_LOCK_UNLOCK, fl->fl_start,
146 length, 0); 149 length, 0);
147 dout("got %d on flock_lock_file_wait, undid lock", err); 150 dout("got %d on flock_lock_file_wait, undid lock", err);
@@ -235,7 +238,8 @@ int lock_to_ceph_filelock(struct file_lock *lock,
235 cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1); 238 cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
236 cephlock->client = cpu_to_le64(0); 239 cephlock->client = cpu_to_le64(0);
237 cephlock->pid = cpu_to_le64(lock->fl_pid); 240 cephlock->pid = cpu_to_le64(lock->fl_pid);
238 cephlock->pid_namespace = cpu_to_le64((u64)lock->fl_nspid); 241 cephlock->pid_namespace =
242 cpu_to_le64((u64)(unsigned long)lock->fl_nspid);
239 243
240 switch (lock->fl_type) { 244 switch (lock->fl_type) {
241 case F_RDLCK: 245 case F_RDLCK:
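The locks.c hunks change only the casts: fl->fl_nspid is a pointer, and converting a 32-bit pointer directly to u64 can draw a "cast from pointer to integer of different size" warning, so the patch widens it through unsigned long first. A tiny illustration of the idiom (the variable is made up; the resulting value is the same either way, only the diagnostic differs):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	int object;
	void *p = &object;			/* stands in for fl->fl_nspid */

	/*
	 * On a 32-bit build, (uint64_t)p can warn about a pointer/integer
	 * size mismatch.  Going through unsigned long, which matches the
	 * pointer width, keeps the conversion warning-free.
	 */
	uint64_t id = (uint64_t)(unsigned long)p;

	printf("%llu\n", (unsigned long long)id);
	return 0;
}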
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a75ddbf9fe37..fad95f8f2608 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -560,6 +560,13 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
560 * 560 *
561 * Called under mdsc->mutex. 561 * Called under mdsc->mutex.
562 */ 562 */
563struct dentry *get_nonsnap_parent(struct dentry *dentry)
564{
565 while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
566 dentry = dentry->d_parent;
567 return dentry;
568}
569
563static int __choose_mds(struct ceph_mds_client *mdsc, 570static int __choose_mds(struct ceph_mds_client *mdsc,
564 struct ceph_mds_request *req) 571 struct ceph_mds_request *req)
565{ 572{
@@ -590,14 +597,29 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
590 if (req->r_inode) { 597 if (req->r_inode) {
591 inode = req->r_inode; 598 inode = req->r_inode;
592 } else if (req->r_dentry) { 599 } else if (req->r_dentry) {
593 if (req->r_dentry->d_inode) { 600 struct inode *dir = req->r_dentry->d_parent->d_inode;
601
602 if (dir->i_sb != mdsc->client->sb) {
603 /* not this fs! */
604 inode = req->r_dentry->d_inode;
605 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
606 /* direct snapped/virtual snapdir requests
607 * based on parent dir inode */
608 struct dentry *dn =
609 get_nonsnap_parent(req->r_dentry->d_parent);
610 inode = dn->d_inode;
611 dout("__choose_mds using nonsnap parent %p\n", inode);
612 } else if (req->r_dentry->d_inode) {
613 /* dentry target */
594 inode = req->r_dentry->d_inode; 614 inode = req->r_dentry->d_inode;
595 } else { 615 } else {
596 inode = req->r_dentry->d_parent->d_inode; 616 /* dir + name */
617 inode = dir;
597 hash = req->r_dentry->d_name.hash; 618 hash = req->r_dentry->d_name.hash;
598 is_hash = true; 619 is_hash = true;
599 } 620 }
600 } 621 }
622
601 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash, 623 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
602 (int)hash, mode); 624 (int)hash, mode);
603 if (!inode) 625 if (!inode)
@@ -2208,7 +2230,7 @@ static void handle_session(struct ceph_mds_session *session,
2208 pr_info("mds%d reconnect denied\n", session->s_mds); 2230 pr_info("mds%d reconnect denied\n", session->s_mds);
2209 remove_session_caps(session); 2231 remove_session_caps(session);
2210 wake = 1; /* for good measure */ 2232 wake = 1; /* for good measure */
2211 complete_all(&mdsc->session_close_waiters); 2233 wake_up_all(&mdsc->session_close_wq);
2212 kick_requests(mdsc, mds); 2234 kick_requests(mdsc, mds);
2213 break; 2235 break;
2214 2236
@@ -2302,7 +2324,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2302 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0); 2324 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2303 if (IS_ERR(path)) { 2325 if (IS_ERR(path)) {
2304 err = PTR_ERR(path); 2326 err = PTR_ERR(path);
2305 BUG_ON(err); 2327 goto out_dput;
2306 } 2328 }
2307 } else { 2329 } else {
2308 path = NULL; 2330 path = NULL;
@@ -2310,7 +2332,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2310 } 2332 }
2311 err = ceph_pagelist_encode_string(pagelist, path, pathlen); 2333 err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2312 if (err) 2334 if (err)
2313 goto out; 2335 goto out_free;
2314 2336
2315 spin_lock(&inode->i_lock); 2337 spin_lock(&inode->i_lock);
2316 cap->seq = 0; /* reset cap seq */ 2338 cap->seq = 0; /* reset cap seq */
@@ -2352,10 +2374,13 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2352 num_fcntl_locks, 2374 num_fcntl_locks,
2353 num_flock_locks); 2375 num_flock_locks);
2354 unlock_kernel(); 2376 unlock_kernel();
2377 } else {
2378 err = ceph_pagelist_append(pagelist, &rec, reclen);
2355 } 2379 }
2356 2380
2357out: 2381out_free:
2358 kfree(path); 2382 kfree(path);
2383out_dput:
2359 dput(dentry); 2384 dput(dentry);
2360 return err; 2385 return err;
2361} 2386}
@@ -2876,7 +2901,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2876 return -ENOMEM; 2901 return -ENOMEM;
2877 2902
2878 init_completion(&mdsc->safe_umount_waiters); 2903 init_completion(&mdsc->safe_umount_waiters);
2879 init_completion(&mdsc->session_close_waiters); 2904 init_waitqueue_head(&mdsc->session_close_wq);
2880 INIT_LIST_HEAD(&mdsc->waiting_for_map); 2905 INIT_LIST_HEAD(&mdsc->waiting_for_map);
2881 mdsc->sessions = NULL; 2906 mdsc->sessions = NULL;
2882 mdsc->max_sessions = 0; 2907 mdsc->max_sessions = 0;
@@ -3021,6 +3046,23 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3021 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush)); 3046 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3022} 3047}
3023 3048
3049/*
3050 * true if all sessions are closed, or we force unmount
3051 */
3052bool done_closing_sessions(struct ceph_mds_client *mdsc)
3053{
3054 int i, n = 0;
3055
3056 if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
3057 return true;
3058
3059 mutex_lock(&mdsc->mutex);
3060 for (i = 0; i < mdsc->max_sessions; i++)
3061 if (mdsc->sessions[i])
3062 n++;
3063 mutex_unlock(&mdsc->mutex);
3064 return n == 0;
3065}
3024 3066
3025/* 3067/*
3026 * called after sb is ro. 3068 * called after sb is ro.
@@ -3029,45 +3071,32 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3029{ 3071{
3030 struct ceph_mds_session *session; 3072 struct ceph_mds_session *session;
3031 int i; 3073 int i;
3032 int n;
3033 struct ceph_client *client = mdsc->client; 3074 struct ceph_client *client = mdsc->client;
3034 unsigned long started, timeout = client->mount_args->mount_timeout * HZ; 3075 unsigned long timeout = client->mount_args->mount_timeout * HZ;
3035 3076
3036 dout("close_sessions\n"); 3077 dout("close_sessions\n");
3037 3078
3038 mutex_lock(&mdsc->mutex);
3039
3040 /* close sessions */ 3079 /* close sessions */
3041 started = jiffies; 3080 mutex_lock(&mdsc->mutex);
3042 while (time_before(jiffies, started + timeout)) { 3081 for (i = 0; i < mdsc->max_sessions; i++) {
3043 dout("closing sessions\n"); 3082 session = __ceph_lookup_mds_session(mdsc, i);
3044 n = 0; 3083 if (!session)
3045 for (i = 0; i < mdsc->max_sessions; i++) { 3084 continue;
3046 session = __ceph_lookup_mds_session(mdsc, i);
3047 if (!session)
3048 continue;
3049 mutex_unlock(&mdsc->mutex);
3050 mutex_lock(&session->s_mutex);
3051 __close_session(mdsc, session);
3052 mutex_unlock(&session->s_mutex);
3053 ceph_put_mds_session(session);
3054 mutex_lock(&mdsc->mutex);
3055 n++;
3056 }
3057 if (n == 0)
3058 break;
3059
3060 if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
3061 break;
3062
3063 dout("waiting for sessions to close\n");
3064 mutex_unlock(&mdsc->mutex); 3085 mutex_unlock(&mdsc->mutex);
3065 wait_for_completion_timeout(&mdsc->session_close_waiters, 3086 mutex_lock(&session->s_mutex);
3066 timeout); 3087 __close_session(mdsc, session);
3088 mutex_unlock(&session->s_mutex);
3089 ceph_put_mds_session(session);
3067 mutex_lock(&mdsc->mutex); 3090 mutex_lock(&mdsc->mutex);
3068 } 3091 }
3092 mutex_unlock(&mdsc->mutex);
3093
3094 dout("waiting for sessions to close\n");
3095 wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3096 timeout);
3069 3097
3070 /* tear down remaining sessions */ 3098 /* tear down remaining sessions */
3099 mutex_lock(&mdsc->mutex);
3071 for (i = 0; i < mdsc->max_sessions; i++) { 3100 for (i = 0; i < mdsc->max_sessions; i++) {
3072 if (mdsc->sessions[i]) { 3101 if (mdsc->sessions[i]) {
3073 session = get_session(mdsc->sessions[i]); 3102 session = get_session(mdsc->sessions[i]);
@@ -3080,9 +3109,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3080 mutex_lock(&mdsc->mutex); 3109 mutex_lock(&mdsc->mutex);
3081 } 3110 }
3082 } 3111 }
3083
3084 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 3112 WARN_ON(!list_empty(&mdsc->cap_delay_list));
3085
3086 mutex_unlock(&mdsc->mutex); 3113 mutex_unlock(&mdsc->mutex);
3087 3114
3088 ceph_cleanup_empty_realms(mdsc); 3115 ceph_cleanup_empty_realms(mdsc);
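The largest mds_client.c change swaps the session_close_waiters completion for a wait queue: ceph_mdsc_close_sessions() now sleeps in wait_event_timeout() on a predicate, done_closing_sessions(), that is re-evaluated on every wakeup, instead of looping and counting completions. A rough userspace analogue of that wait-for-predicate-with-timeout pattern, using pthreads (all names and timings here are invented, not from the patch):

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t closed_cond = PTHREAD_COND_INITIALIZER;
static int open_sessions = 3;

/* Analogue of done_closing_sessions(): the condition the waiter re-checks. */
static bool done_closing_sessions(void)
{
	return open_sessions == 0;
}

/* Analogue of session close replies trickling in and waking the waiter. */
static void *closer(void *arg)
{
	(void)arg;
	for (int i = 0; i < 3; i++) {
		usleep(1000);
		pthread_mutex_lock(&lock);
		open_sessions--;
		pthread_cond_broadcast(&closed_cond);	/* wake_up_all() */
		pthread_mutex_unlock(&lock);
	}
	return NULL;
}

int main(void)
{
	pthread_t t;
	struct timespec deadline;

	clock_gettime(CLOCK_REALTIME, &deadline);
	deadline.tv_sec += 5;				/* mount_timeout stand-in */

	pthread_create(&t, NULL, closer, NULL);

	/* wait_event_timeout(): sleep until the predicate holds or we time out */
	pthread_mutex_lock(&lock);
	while (!done_closing_sessions()) {
		if (pthread_cond_timedwait(&closed_cond, &lock, &deadline))
			break;				/* ETIMEDOUT */
	}
	pthread_mutex_unlock(&lock);

	pthread_join(t, NULL);
	printf("sessions still open: %d\n", open_sessions);
	return 0;
}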
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index ab7e89f5e344..c98267ce6d2a 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -234,7 +234,8 @@ struct ceph_mds_client {
234 struct mutex mutex; /* all nested structures */ 234 struct mutex mutex; /* all nested structures */
235 235
236 struct ceph_mdsmap *mdsmap; 236 struct ceph_mdsmap *mdsmap;
237 struct completion safe_umount_waiters, session_close_waiters; 237 struct completion safe_umount_waiters;
238 wait_queue_head_t session_close_wq;
238 struct list_head waiting_for_map; 239 struct list_head waiting_for_map;
239 240
240 struct ceph_mds_session **sessions; /* NULL for mds if no session */ 241 struct ceph_mds_session **sessions; /* NULL for mds if no session */
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index bed6391e52c7..dfced1dacbcd 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -661,7 +661,7 @@ static int __send_request(struct ceph_osd_client *osdc,
661 reqhead->reassert_version = req->r_reassert_version; 661 reqhead->reassert_version = req->r_reassert_version;
662 662
663 req->r_stamp = jiffies; 663 req->r_stamp = jiffies;
664 list_move_tail(&osdc->req_lru, &req->r_req_lru_item); 664 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
665 665
666 ceph_msg_get(req->r_request); /* send consumes a ref */ 666 ceph_msg_get(req->r_request); /* send consumes a ref */
667 ceph_con_send(&req->r_osd->o_con, req->r_request); 667 ceph_con_send(&req->r_osd->o_con, req->r_request);
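The one-line osd_client.c fix swaps the arguments to list_move_tail(): the entry being moved (req->r_req_lru_item) must come first and the destination list (osdc->req_lru) second, otherwise the LRU head itself gets spliced onto the request. A minimal re-creation of list_move_tail() showing the intended order (types and names simplified; this is an illustration, not the kernel implementation):

#include <stdio.h>

struct list_head { struct list_head *prev, *next; };

static void INIT_LIST_HEAD(struct list_head *h)
{
	h->prev = h->next = h;
}

static void list_del(struct list_head *e)
{
	e->prev->next = e->next;
	e->next->prev = e->prev;
}

static void list_add_tail(struct list_head *e, struct list_head *head)
{
	e->prev = head->prev;
	e->next = head;
	head->prev->next = e;
	head->prev = e;
}

/* First argument: the entry to move.  Second: the list it joins. */
static void list_move_tail(struct list_head *entry, struct list_head *head)
{
	list_del(entry);
	list_add_tail(entry, head);
}

int main(void)
{
	struct list_head req_lru, r_req_lru_item;

	INIT_LIST_HEAD(&req_lru);
	INIT_LIST_HEAD(&r_req_lru_item);
	list_add_tail(&r_req_lru_item, &req_lru);

	/* correct order, as in the fixed line */
	list_move_tail(&r_req_lru_item, &req_lru);
	printf("request is at LRU tail: %d\n", req_lru.prev == &r_req_lru_item);
	return 0;
}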
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
index b6859f47d364..46a368b6dce5 100644
--- a/fs/ceph/pagelist.c
+++ b/fs/ceph/pagelist.c
@@ -5,10 +5,18 @@
5 5
6#include "pagelist.h" 6#include "pagelist.h"
7 7
8static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
9{
10 struct page *page = list_entry(pl->head.prev, struct page,
11 lru);
12 kunmap(page);
13}
14
8int ceph_pagelist_release(struct ceph_pagelist *pl) 15int ceph_pagelist_release(struct ceph_pagelist *pl)
9{ 16{
10 if (pl->mapped_tail) 17 if (pl->mapped_tail)
11 kunmap(pl->mapped_tail); 18 ceph_pagelist_unmap_tail(pl);
19
12 while (!list_empty(&pl->head)) { 20 while (!list_empty(&pl->head)) {
13 struct page *page = list_first_entry(&pl->head, struct page, 21 struct page *page = list_first_entry(&pl->head, struct page,
14 lru); 22 lru);
@@ -26,7 +34,7 @@ static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
26 pl->room += PAGE_SIZE; 34 pl->room += PAGE_SIZE;
27 list_add_tail(&page->lru, &pl->head); 35 list_add_tail(&page->lru, &pl->head);
28 if (pl->mapped_tail) 36 if (pl->mapped_tail)
29 kunmap(pl->mapped_tail); 37 ceph_pagelist_unmap_tail(pl);
30 pl->mapped_tail = kmap(page); 38 pl->mapped_tail = kmap(page);
31 return 0; 39 return 0;
32} 40}
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index c0b26b6badba..190b6c4a6f2b 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -119,6 +119,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
119 INIT_LIST_HEAD(&realm->children); 119 INIT_LIST_HEAD(&realm->children);
120 INIT_LIST_HEAD(&realm->child_item); 120 INIT_LIST_HEAD(&realm->child_item);
121 INIT_LIST_HEAD(&realm->empty_item); 121 INIT_LIST_HEAD(&realm->empty_item);
122 INIT_LIST_HEAD(&realm->dirty_item);
122 INIT_LIST_HEAD(&realm->inodes_with_caps); 123 INIT_LIST_HEAD(&realm->inodes_with_caps);
123 spin_lock_init(&realm->inodes_with_caps_lock); 124 spin_lock_init(&realm->inodes_with_caps_lock);
124 __insert_snap_realm(&mdsc->snap_realms, realm); 125 __insert_snap_realm(&mdsc->snap_realms, realm);
@@ -435,7 +436,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
435{ 436{
436 struct inode *inode = &ci->vfs_inode; 437 struct inode *inode = &ci->vfs_inode;
437 struct ceph_cap_snap *capsnap; 438 struct ceph_cap_snap *capsnap;
438 int used; 439 int used, dirty;
439 440
440 capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS); 441 capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
441 if (!capsnap) { 442 if (!capsnap) {
@@ -445,6 +446,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
445 446
446 spin_lock(&inode->i_lock); 447 spin_lock(&inode->i_lock);
447 used = __ceph_caps_used(ci); 448 used = __ceph_caps_used(ci);
449 dirty = __ceph_caps_dirty(ci);
448 if (__ceph_have_pending_cap_snap(ci)) { 450 if (__ceph_have_pending_cap_snap(ci)) {
449 /* there is no point in queuing multiple "pending" cap_snaps, 451 /* there is no point in queuing multiple "pending" cap_snaps,
450 as no new writes are allowed to start when pending, so any 452 as no new writes are allowed to start when pending, so any
@@ -452,27 +454,37 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
452 cap_snap. lucky us. */ 454 cap_snap. lucky us. */
453 dout("queue_cap_snap %p already pending\n", inode); 455 dout("queue_cap_snap %p already pending\n", inode);
454 kfree(capsnap); 456 kfree(capsnap);
455 } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) { 457 } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR) ||
458 (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
459 CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
456 struct ceph_snap_context *snapc = ci->i_head_snapc; 460 struct ceph_snap_context *snapc = ci->i_head_snapc;
457 461
462 dout("queue_cap_snap %p cap_snap %p queuing under %p\n", inode,
463 capsnap, snapc);
458 igrab(inode); 464 igrab(inode);
459 465
460 atomic_set(&capsnap->nref, 1); 466 atomic_set(&capsnap->nref, 1);
461 capsnap->ci = ci; 467 capsnap->ci = ci;
462 INIT_LIST_HEAD(&capsnap->ci_item); 468 INIT_LIST_HEAD(&capsnap->ci_item);
463 INIT_LIST_HEAD(&capsnap->flushing_item); 469 INIT_LIST_HEAD(&capsnap->flushing_item);
464 470
465 capsnap->follows = snapc->seq - 1; 471 capsnap->follows = snapc->seq;
466 capsnap->issued = __ceph_caps_issued(ci, NULL); 472 capsnap->issued = __ceph_caps_issued(ci, NULL);
467 capsnap->dirty = __ceph_caps_dirty(ci); 473 capsnap->dirty = dirty;
468 474
469 capsnap->mode = inode->i_mode; 475 capsnap->mode = inode->i_mode;
470 capsnap->uid = inode->i_uid; 476 capsnap->uid = inode->i_uid;
471 capsnap->gid = inode->i_gid; 477 capsnap->gid = inode->i_gid;
472 478
473 /* fixme? */ 479 if (dirty & CEPH_CAP_XATTR_EXCL) {
474 capsnap->xattr_blob = NULL; 480 __ceph_build_xattrs_blob(ci);
475 capsnap->xattr_len = 0; 481 capsnap->xattr_blob =
482 ceph_buffer_get(ci->i_xattrs.blob);
483 capsnap->xattr_version = ci->i_xattrs.version;
484 } else {
485 capsnap->xattr_blob = NULL;
486 capsnap->xattr_version = 0;
487 }
476 488
477 /* dirty page count moved from _head to this cap_snap; 489 /* dirty page count moved from _head to this cap_snap;
478 all subsequent writes page dirties occur _after_ this 490 all subsequent writes page dirties occur _after_ this
@@ -480,7 +492,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
480 capsnap->dirty_pages = ci->i_wrbuffer_ref_head; 492 capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
481 ci->i_wrbuffer_ref_head = 0; 493 ci->i_wrbuffer_ref_head = 0;
482 capsnap->context = snapc; 494 capsnap->context = snapc;
483 ci->i_head_snapc = NULL; 495 ci->i_head_snapc =
496 ceph_get_snap_context(ci->i_snap_realm->cached_context);
497 dout(" new snapc is %p\n", ci->i_head_snapc);
484 list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); 498 list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
485 499
486 if (used & CEPH_CAP_FILE_WR) { 500 if (used & CEPH_CAP_FILE_WR) {
@@ -539,6 +553,41 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
539 return 1; /* caller may want to ceph_flush_snaps */ 553 return 1; /* caller may want to ceph_flush_snaps */
540} 554}
541 555
556/*
557 * Queue cap_snaps for snap writeback for this realm and its children.
558 * Called under snap_rwsem, so realm topology won't change.
559 */
560static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
561{
562 struct ceph_inode_info *ci;
563 struct inode *lastinode = NULL;
564 struct ceph_snap_realm *child;
565
566 dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino);
567
568 spin_lock(&realm->inodes_with_caps_lock);
569 list_for_each_entry(ci, &realm->inodes_with_caps,
570 i_snap_realm_item) {
571 struct inode *inode = igrab(&ci->vfs_inode);
572 if (!inode)
573 continue;
574 spin_unlock(&realm->inodes_with_caps_lock);
575 if (lastinode)
576 iput(lastinode);
577 lastinode = inode;
578 ceph_queue_cap_snap(ci);
579 spin_lock(&realm->inodes_with_caps_lock);
580 }
581 spin_unlock(&realm->inodes_with_caps_lock);
582 if (lastinode)
583 iput(lastinode);
584
585 dout("queue_realm_cap_snaps %p %llx children\n", realm, realm->ino);
586 list_for_each_entry(child, &realm->children, child_item)
587 queue_realm_cap_snaps(child);
588
589 dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
590}
542 591
543/* 592/*
544 * Parse and apply a snapblob "snap trace" from the MDS. This specifies 593 * Parse and apply a snapblob "snap trace" from the MDS. This specifies
@@ -556,6 +605,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
556 struct ceph_snap_realm *realm; 605 struct ceph_snap_realm *realm;
557 int invalidate = 0; 606 int invalidate = 0;
558 int err = -ENOMEM; 607 int err = -ENOMEM;
608 LIST_HEAD(dirty_realms);
559 609
560 dout("update_snap_trace deletion=%d\n", deletion); 610 dout("update_snap_trace deletion=%d\n", deletion);
561more: 611more:
@@ -578,45 +628,6 @@ more:
578 } 628 }
579 } 629 }
580 630
581 if (le64_to_cpu(ri->seq) > realm->seq) {
582 dout("update_snap_trace updating %llx %p %lld -> %lld\n",
583 realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
584 /*
585 * if the realm seq has changed, queue a cap_snap for every
586 * inode with open caps. we do this _before_ we update
587 * the realm info so that we prepare for writeback under the
588 * _previous_ snap context.
589 *
590 * ...unless it's a snap deletion!
591 */
592 if (!deletion) {
593 struct ceph_inode_info *ci;
594 struct inode *lastinode = NULL;
595
596 spin_lock(&realm->inodes_with_caps_lock);
597 list_for_each_entry(ci, &realm->inodes_with_caps,
598 i_snap_realm_item) {
599 struct inode *inode = igrab(&ci->vfs_inode);
600 if (!inode)
601 continue;
602 spin_unlock(&realm->inodes_with_caps_lock);
603 if (lastinode)
604 iput(lastinode);
605 lastinode = inode;
606 ceph_queue_cap_snap(ci);
607 spin_lock(&realm->inodes_with_caps_lock);
608 }
609 spin_unlock(&realm->inodes_with_caps_lock);
610 if (lastinode)
611 iput(lastinode);
612 dout("update_snap_trace cap_snaps queued\n");
613 }
614
615 } else {
616 dout("update_snap_trace %llx %p seq %lld unchanged\n",
617 realm->ino, realm, realm->seq);
618 }
619
620 /* ensure the parent is correct */ 631 /* ensure the parent is correct */
621 err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent)); 632 err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
622 if (err < 0) 633 if (err < 0)
@@ -624,6 +635,8 @@ more:
624 invalidate += err; 635 invalidate += err;
625 636
626 if (le64_to_cpu(ri->seq) > realm->seq) { 637 if (le64_to_cpu(ri->seq) > realm->seq) {
638 dout("update_snap_trace updating %llx %p %lld -> %lld\n",
639 realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
627 /* update realm parameters, snap lists */ 640 /* update realm parameters, snap lists */
628 realm->seq = le64_to_cpu(ri->seq); 641 realm->seq = le64_to_cpu(ri->seq);
629 realm->created = le64_to_cpu(ri->created); 642 realm->created = le64_to_cpu(ri->created);
@@ -641,9 +654,17 @@ more:
641 if (err < 0) 654 if (err < 0)
642 goto fail; 655 goto fail;
643 656
657 /* queue realm for cap_snap creation */
658 list_add(&realm->dirty_item, &dirty_realms);
659
644 invalidate = 1; 660 invalidate = 1;
645 } else if (!realm->cached_context) { 661 } else if (!realm->cached_context) {
662 dout("update_snap_trace %llx %p seq %lld new\n",
663 realm->ino, realm, realm->seq);
646 invalidate = 1; 664 invalidate = 1;
665 } else {
666 dout("update_snap_trace %llx %p seq %lld unchanged\n",
667 realm->ino, realm, realm->seq);
647 } 668 }
648 669
649 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, 670 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
@@ -656,6 +677,14 @@ more:
656 if (invalidate) 677 if (invalidate)
657 rebuild_snap_realms(realm); 678 rebuild_snap_realms(realm);
658 679
680 /*
681 * queue cap snaps _after_ we've built the new snap contexts,
682 * so that i_head_snapc can be set appropriately.
683 */
684 list_for_each_entry(realm, &dirty_realms, dirty_item) {
685 queue_realm_cap_snaps(realm);
686 }
687
659 __cleanup_empty_realms(mdsc); 688 __cleanup_empty_realms(mdsc);
660 return 0; 689 return 0;
661 690
@@ -688,7 +717,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
688 igrab(inode); 717 igrab(inode);
689 spin_unlock(&mdsc->snap_flush_lock); 718 spin_unlock(&mdsc->snap_flush_lock);
690 spin_lock(&inode->i_lock); 719 spin_lock(&inode->i_lock);
691 __ceph_flush_snaps(ci, &session); 720 __ceph_flush_snaps(ci, &session, 0);
692 spin_unlock(&inode->i_lock); 721 spin_unlock(&inode->i_lock);
693 iput(inode); 722 iput(inode);
694 spin_lock(&mdsc->snap_flush_lock); 723 spin_lock(&mdsc->snap_flush_lock);
@@ -789,6 +818,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
789 }; 818 };
790 struct inode *inode = ceph_find_inode(sb, vino); 819 struct inode *inode = ceph_find_inode(sb, vino);
791 struct ceph_inode_info *ci; 820 struct ceph_inode_info *ci;
821 struct ceph_snap_realm *oldrealm;
792 822
793 if (!inode) 823 if (!inode)
794 continue; 824 continue;
@@ -814,18 +844,19 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
814 dout(" will move %p to split realm %llx %p\n", 844 dout(" will move %p to split realm %llx %p\n",
815 inode, realm->ino, realm); 845 inode, realm->ino, realm);
816 /* 846 /*
817 * Remove the inode from the realm's inode 847 * Move the inode to the new realm
818 * list, but don't add it to the new realm
819 * yet. We don't want the cap_snap to be
820 * queued (again) by ceph_update_snap_trace()
821 * below. Queue it _now_, under the old context.
822 */ 848 */
823 spin_lock(&realm->inodes_with_caps_lock); 849 spin_lock(&realm->inodes_with_caps_lock);
824 list_del_init(&ci->i_snap_realm_item); 850 list_del_init(&ci->i_snap_realm_item);
851 list_add(&ci->i_snap_realm_item,
852 &realm->inodes_with_caps);
853 oldrealm = ci->i_snap_realm;
854 ci->i_snap_realm = realm;
825 spin_unlock(&realm->inodes_with_caps_lock); 855 spin_unlock(&realm->inodes_with_caps_lock);
826 spin_unlock(&inode->i_lock); 856 spin_unlock(&inode->i_lock);
827 857
828 ceph_queue_cap_snap(ci); 858 ceph_get_snap_realm(mdsc, realm);
859 ceph_put_snap_realm(mdsc, oldrealm);
829 860
830 iput(inode); 861 iput(inode);
831 continue; 862 continue;
@@ -853,43 +884,9 @@ skip_inode:
853 ceph_update_snap_trace(mdsc, p, e, 884 ceph_update_snap_trace(mdsc, p, e,
854 op == CEPH_SNAP_OP_DESTROY); 885 op == CEPH_SNAP_OP_DESTROY);
855 886
856 if (op == CEPH_SNAP_OP_SPLIT) { 887 if (op == CEPH_SNAP_OP_SPLIT)
857 /*
858 * ok, _now_ add the inodes into the new realm.
859 */
860 for (i = 0; i < num_split_inos; i++) {
861 struct ceph_vino vino = {
862 .ino = le64_to_cpu(split_inos[i]),
863 .snap = CEPH_NOSNAP,
864 };
865 struct inode *inode = ceph_find_inode(sb, vino);
866 struct ceph_inode_info *ci;
867
868 if (!inode)
869 continue;
870 ci = ceph_inode(inode);
871 spin_lock(&inode->i_lock);
872 if (list_empty(&ci->i_snap_realm_item)) {
873 struct ceph_snap_realm *oldrealm =
874 ci->i_snap_realm;
875
876 dout(" moving %p to split realm %llx %p\n",
877 inode, realm->ino, realm);
878 spin_lock(&realm->inodes_with_caps_lock);
879 list_add(&ci->i_snap_realm_item,
880 &realm->inodes_with_caps);
881 ci->i_snap_realm = realm;
882 spin_unlock(&realm->inodes_with_caps_lock);
883 ceph_get_snap_realm(mdsc, realm);
884 ceph_put_snap_realm(mdsc, oldrealm);
885 }
886 spin_unlock(&inode->i_lock);
887 iput(inode);
888 }
889
890 /* we took a reference when we created the realm, above */ 888 /* we took a reference when we created the realm, above */
891 ceph_put_snap_realm(mdsc, realm); 889 ceph_put_snap_realm(mdsc, realm);
892 }
893 890
894 __cleanup_empty_realms(mdsc); 891 __cleanup_empty_realms(mdsc);
895 892
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 2482d696f0de..b87638e84c4b 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -216,8 +216,7 @@ struct ceph_cap_snap {
216 uid_t uid; 216 uid_t uid;
217 gid_t gid; 217 gid_t gid;
218 218
219 void *xattr_blob; 219 struct ceph_buffer *xattr_blob;
220 int xattr_len;
221 u64 xattr_version; 220 u64 xattr_version;
222 221
223 u64 size; 222 u64 size;
@@ -229,8 +228,11 @@ struct ceph_cap_snap {
229 228
230static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) 229static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
231{ 230{
232 if (atomic_dec_and_test(&capsnap->nref)) 231 if (atomic_dec_and_test(&capsnap->nref)) {
232 if (capsnap->xattr_blob)
233 ceph_buffer_put(capsnap->xattr_blob);
233 kfree(capsnap); 234 kfree(capsnap);
235 }
234} 236}
235 237
236/* 238/*
@@ -342,7 +344,8 @@ struct ceph_inode_info {
342 unsigned i_cap_exporting_issued; 344 unsigned i_cap_exporting_issued;
343 struct ceph_cap_reservation i_cap_migration_resv; 345 struct ceph_cap_reservation i_cap_migration_resv;
344 struct list_head i_cap_snaps; /* snapped state pending flush to mds */ 346 struct list_head i_cap_snaps; /* snapped state pending flush to mds */
345 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 */ 347 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or
348 dirty|flushing caps */
346 unsigned i_snap_caps; /* cap bits for snapped files */ 349 unsigned i_snap_caps; /* cap bits for snapped files */
347 350
348 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ 351 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
@@ -687,6 +690,8 @@ struct ceph_snap_realm {
687 690
688 struct list_head empty_item; /* if i have ref==0 */ 691 struct list_head empty_item; /* if i have ref==0 */
689 692
693 struct list_head dirty_item; /* if realm needs new context */
694
690 /* the current set of snaps for this realm */ 695 /* the current set of snaps for this realm */
691 struct ceph_snap_context *cached_context; 696 struct ceph_snap_context *cached_context;
692 697
@@ -823,7 +828,8 @@ extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
823extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, 828extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
824 struct ceph_snap_context *snapc); 829 struct ceph_snap_context *snapc);
825extern void __ceph_flush_snaps(struct ceph_inode_info *ci, 830extern void __ceph_flush_snaps(struct ceph_inode_info *ci,
826 struct ceph_mds_session **psession); 831 struct ceph_mds_session **psession,
832 int again);
827extern void ceph_check_caps(struct ceph_inode_info *ci, int flags, 833extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
828 struct ceph_mds_session *session); 834 struct ceph_mds_session *session);
829extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc); 835extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 097a2654c00f..9578af610b73 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -485,6 +485,7 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
485 ci->i_xattrs.blob = ci->i_xattrs.prealloc_blob; 485 ci->i_xattrs.blob = ci->i_xattrs.prealloc_blob;
486 ci->i_xattrs.prealloc_blob = NULL; 486 ci->i_xattrs.prealloc_blob = NULL;
487 ci->i_xattrs.dirty = false; 487 ci->i_xattrs.dirty = false;
488 ci->i_xattrs.version++;
488 } 489 }
489} 490}
490 491