summaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorYan, Zheng <zyan@redhat.com>2019-01-14 04:21:19 -0500
committerIlya Dryomov <idryomov@gmail.com>2019-03-05 12:55:16 -0500
commite3ec8d6898f71636a067dae683174ef9bf81bc96 (patch)
treece14ef6392b06d84016f35e96e140dce9dcc12a4 /fs
parent08796873a5183bfaab52a3bd899fe82f9e64be94 (diff)
ceph: send cap releases more aggressively
When pending cap releases fill up one message, start a work to send cap release message. (old way is sending cap releases every 5 seconds) Signed-off-by: "Yan, Zheng" <zyan@redhat.com> Reviewed-by: Jeff Layton <jlayton@redhat.com> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/caps.c29
-rw-r--r--fs/ceph/inode.c2
-rw-r--r--fs/ceph/mds_client.c61
-rw-r--r--fs/ceph/mds_client.h10
-rw-r--r--fs/ceph/super.c9
-rw-r--r--fs/ceph/super.h6
6 files changed, 83 insertions, 34 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 0eaf1b48c431..da5b56e14cc7 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1081,9 +1081,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
1081 (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { 1081 (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
1082 cap->queue_release = 1; 1082 cap->queue_release = 1;
1083 if (removed) { 1083 if (removed) {
1084 list_add_tail(&cap->session_caps, 1084 __ceph_queue_cap_release(session, cap);
1085 &session->s_cap_releases);
1086 session->s_num_cap_releases++;
1087 removed = 0; 1085 removed = 0;
1088 } 1086 }
1089 } else { 1087 } else {
@@ -1245,7 +1243,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
1245 * Queue cap releases when an inode is dropped from our cache. Since 1243 * Queue cap releases when an inode is dropped from our cache. Since
1246 * inode is about to be destroyed, there is no need for i_ceph_lock. 1244 * inode is about to be destroyed, there is no need for i_ceph_lock.
1247 */ 1245 */
1248void ceph_queue_caps_release(struct inode *inode) 1246void __ceph_remove_caps(struct inode *inode)
1249{ 1247{
1250 struct ceph_inode_info *ci = ceph_inode(inode); 1248 struct ceph_inode_info *ci = ceph_inode(inode);
1251 struct rb_node *p; 1249 struct rb_node *p;
@@ -3886,12 +3884,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3886 cap->seq = seq; 3884 cap->seq = seq;
3887 cap->issue_seq = seq; 3885 cap->issue_seq = seq;
3888 spin_lock(&session->s_cap_lock); 3886 spin_lock(&session->s_cap_lock);
3889 list_add_tail(&cap->session_caps, 3887 __ceph_queue_cap_release(session, cap);
3890 &session->s_cap_releases);
3891 session->s_num_cap_releases++;
3892 spin_unlock(&session->s_cap_lock); 3888 spin_unlock(&session->s_cap_lock);
3893 } 3889 }
3894 goto flush_cap_releases; 3890 goto done;
3895 } 3891 }
3896 3892
3897 /* these will work even if we don't have a cap yet */ 3893 /* these will work even if we don't have a cap yet */
@@ -3961,7 +3957,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3961 ceph_cap_op_name(op)); 3957 ceph_cap_op_name(op));
3962 } 3958 }
3963 3959
3964 goto done; 3960done:
3961 mutex_unlock(&session->s_mutex);
3962done_unlocked:
3963 iput(inode);
3964 ceph_put_string(extra_info.pool_ns);
3965 return;
3965 3966
3966flush_cap_releases: 3967flush_cap_releases:
3967 /* 3968 /*
@@ -3969,14 +3970,8 @@ flush_cap_releases:
3969 * along for the mds (who clearly thinks we still have this 3970 * along for the mds (who clearly thinks we still have this
3970 * cap). 3971 * cap).
3971 */ 3972 */
3972 ceph_send_cap_releases(mdsc, session); 3973 ceph_flush_cap_releases(mdsc, session);
3973 3974 goto done;
3974done:
3975 mutex_unlock(&session->s_mutex);
3976done_unlocked:
3977 iput(inode);
3978 ceph_put_string(extra_info.pool_ns);
3979 return;
3980 3975
3981bad: 3976bad:
3982 pr_err("ceph_handle_caps: corrupt message\n"); 3977 pr_err("ceph_handle_caps: corrupt message\n");
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e6012de58aae..f588b2d7b598 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -537,7 +537,7 @@ void ceph_destroy_inode(struct inode *inode)
537 537
538 ceph_fscache_unregister_inode_cookie(ci); 538 ceph_fscache_unregister_inode_cookie(ci);
539 539
540 ceph_queue_caps_release(inode); 540 __ceph_remove_caps(inode);
541 541
542 if (__ceph_has_any_quota(ci)) 542 if (__ceph_has_any_quota(ci))
543 ceph_adjust_quota_realms_count(inode, false); 543 ceph_adjust_quota_realms_count(inode, false);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index ddfb6a45575b..c9d4561336fc 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -57,6 +57,7 @@ struct ceph_reconnect_state {
57 57
58static void __wake_requests(struct ceph_mds_client *mdsc, 58static void __wake_requests(struct ceph_mds_client *mdsc,
59 struct list_head *head); 59 struct list_head *head);
60static void ceph_cap_release_work(struct work_struct *work);
60 61
61static const struct ceph_connection_operations mds_con_ops; 62static const struct ceph_connection_operations mds_con_ops;
62 63
@@ -636,6 +637,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
636 s->s_cap_reconnect = 0; 637 s->s_cap_reconnect = 0;
637 s->s_cap_iterator = NULL; 638 s->s_cap_iterator = NULL;
638 INIT_LIST_HEAD(&s->s_cap_releases); 639 INIT_LIST_HEAD(&s->s_cap_releases);
640 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
641
639 INIT_LIST_HEAD(&s->s_cap_flushing); 642 INIT_LIST_HEAD(&s->s_cap_flushing);
640 643
641 mdsc->sessions[mds] = s; 644 mdsc->sessions[mds] = s;
@@ -661,6 +664,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
661 dout("__unregister_session mds%d %p\n", s->s_mds, s); 664 dout("__unregister_session mds%d %p\n", s->s_mds, s);
662 BUG_ON(mdsc->sessions[s->s_mds] != s); 665 BUG_ON(mdsc->sessions[s->s_mds] != s);
663 mdsc->sessions[s->s_mds] = NULL; 666 mdsc->sessions[s->s_mds] = NULL;
667 s->s_state = 0;
664 ceph_con_close(&s->s_con); 668 ceph_con_close(&s->s_con);
665 ceph_put_mds_session(s); 669 ceph_put_mds_session(s);
666 atomic_dec(&mdsc->num_sessions); 670 atomic_dec(&mdsc->num_sessions);
@@ -1323,13 +1327,10 @@ static int iterate_session_caps(struct ceph_mds_session *session,
1323 cap->session = NULL; 1327 cap->session = NULL;
1324 list_del_init(&cap->session_caps); 1328 list_del_init(&cap->session_caps);
1325 session->s_nr_caps--; 1329 session->s_nr_caps--;
1326 if (cap->queue_release) { 1330 if (cap->queue_release)
1327 list_add_tail(&cap->session_caps, 1331 __ceph_queue_cap_release(session, cap);
1328 &session->s_cap_releases); 1332 else
1329 session->s_num_cap_releases++;
1330 } else {
1331 old_cap = cap; /* put_cap it w/o locks held */ 1333 old_cap = cap; /* put_cap it w/o locks held */
1332 }
1333 } 1334 }
1334 if (ret < 0) 1335 if (ret < 0)
1335 goto out; 1336 goto out;
@@ -1764,7 +1765,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
1764 session->s_trim_caps = 0; 1765 session->s_trim_caps = 0;
1765 } 1766 }
1766 1767
1767 ceph_send_cap_releases(mdsc, session); 1768 ceph_flush_cap_releases(mdsc, session);
1768 return 0; 1769 return 0;
1769} 1770}
1770 1771
@@ -1807,8 +1808,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
1807/* 1808/*
1808 * called under s_mutex 1809 * called under s_mutex
1809 */ 1810 */
1810void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1811static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1811 struct ceph_mds_session *session) 1812 struct ceph_mds_session *session)
1812{ 1813{
1813 struct ceph_msg *msg = NULL; 1814 struct ceph_msg *msg = NULL;
1814 struct ceph_mds_cap_release *head; 1815 struct ceph_mds_cap_release *head;
@@ -1900,6 +1901,48 @@ out_err:
1900 spin_unlock(&session->s_cap_lock); 1901 spin_unlock(&session->s_cap_lock);
1901} 1902}
1902 1903
1904static void ceph_cap_release_work(struct work_struct *work)
1905{
1906 struct ceph_mds_session *session =
1907 container_of(work, struct ceph_mds_session, s_cap_release_work);
1908
1909 mutex_lock(&session->s_mutex);
1910 if (session->s_state == CEPH_MDS_SESSION_OPEN ||
1911 session->s_state == CEPH_MDS_SESSION_HUNG)
1912 ceph_send_cap_releases(session->s_mdsc, session);
1913 mutex_unlock(&session->s_mutex);
1914 ceph_put_mds_session(session);
1915}
1916
1917void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
1918 struct ceph_mds_session *session)
1919{
1920 if (mdsc->stopping)
1921 return;
1922
1923 get_session(session);
1924 if (queue_work(mdsc->fsc->cap_wq,
1925 &session->s_cap_release_work)) {
1926 dout("cap release work queued\n");
1927 } else {
1928 ceph_put_mds_session(session);
1929 dout("failed to queue cap release work\n");
1930 }
1931}
1932
1933/*
1934 * caller holds session->s_cap_lock
1935 */
1936void __ceph_queue_cap_release(struct ceph_mds_session *session,
1937 struct ceph_cap *cap)
1938{
1939 list_add_tail(&cap->session_caps, &session->s_cap_releases);
1940 session->s_num_cap_releases++;
1941
1942 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
1943 ceph_flush_cap_releases(session->s_mdsc, session);
1944}
1945
1903/* 1946/*
1904 * requests 1947 * requests
1905 */ 1948 */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index af3b25e59e90..2147ecd0c9e5 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -172,12 +172,13 @@ struct ceph_mds_session {
172 /* protected by s_cap_lock */ 172 /* protected by s_cap_lock */
173 spinlock_t s_cap_lock; 173 spinlock_t s_cap_lock;
174 struct list_head s_caps; /* all caps issued by this session */ 174 struct list_head s_caps; /* all caps issued by this session */
175 struct ceph_cap *s_cap_iterator;
175 int s_nr_caps, s_trim_caps; 176 int s_nr_caps, s_trim_caps;
176 int s_num_cap_releases; 177 int s_num_cap_releases;
177 int s_cap_reconnect; 178 int s_cap_reconnect;
178 int s_readonly; 179 int s_readonly;
179 struct list_head s_cap_releases; /* waiting cap_release messages */ 180 struct list_head s_cap_releases; /* waiting cap_release messages */
180 struct ceph_cap *s_cap_iterator; 181 struct work_struct s_cap_release_work;
181 182
182 /* protected by mutex */ 183 /* protected by mutex */
183 struct list_head s_cap_flushing; /* inodes w/ flushing caps */ 184 struct list_head s_cap_flushing; /* inodes w/ flushing caps */
@@ -457,9 +458,10 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
457 kref_put(&req->r_kref, ceph_mdsc_release_request); 458 kref_put(&req->r_kref, ceph_mdsc_release_request);
458} 459}
459 460
460extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 461extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
461 struct ceph_mds_session *session); 462 struct ceph_cap *cap);
462 463extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
464 struct ceph_mds_session *session);
463extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc); 465extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
464 466
465extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, 467extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index da2cd8e89062..200836bcf542 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -671,6 +671,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
671 fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1); 671 fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
672 if (!fsc->trunc_wq) 672 if (!fsc->trunc_wq)
673 goto fail_pg_inv_wq; 673 goto fail_pg_inv_wq;
674 fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1);
675 if (!fsc->cap_wq)
676 goto fail_trunc_wq;
674 677
675 /* set up mempools */ 678 /* set up mempools */
676 err = -ENOMEM; 679 err = -ENOMEM;
@@ -678,13 +681,15 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
678 size = sizeof (struct page *) * (page_count ? page_count : 1); 681 size = sizeof (struct page *) * (page_count ? page_count : 1);
679 fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size); 682 fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
680 if (!fsc->wb_pagevec_pool) 683 if (!fsc->wb_pagevec_pool)
681 goto fail_trunc_wq; 684 goto fail_cap_wq;
682 685
683 /* caps */ 686 /* caps */
684 fsc->min_caps = fsopt->max_readdir; 687 fsc->min_caps = fsopt->max_readdir;
685 688
686 return fsc; 689 return fsc;
687 690
691fail_cap_wq:
692 destroy_workqueue(fsc->cap_wq);
688fail_trunc_wq: 693fail_trunc_wq:
689 destroy_workqueue(fsc->trunc_wq); 694 destroy_workqueue(fsc->trunc_wq);
690fail_pg_inv_wq: 695fail_pg_inv_wq:
@@ -706,6 +711,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc)
706 flush_workqueue(fsc->wb_wq); 711 flush_workqueue(fsc->wb_wq);
707 flush_workqueue(fsc->pg_inv_wq); 712 flush_workqueue(fsc->pg_inv_wq);
708 flush_workqueue(fsc->trunc_wq); 713 flush_workqueue(fsc->trunc_wq);
714 flush_workqueue(fsc->cap_wq);
709} 715}
710 716
711static void destroy_fs_client(struct ceph_fs_client *fsc) 717static void destroy_fs_client(struct ceph_fs_client *fsc)
@@ -715,6 +721,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
715 destroy_workqueue(fsc->wb_wq); 721 destroy_workqueue(fsc->wb_wq);
716 destroy_workqueue(fsc->pg_inv_wq); 722 destroy_workqueue(fsc->pg_inv_wq);
717 destroy_workqueue(fsc->trunc_wq); 723 destroy_workqueue(fsc->trunc_wq);
724 destroy_workqueue(fsc->cap_wq);
718 725
719 mempool_destroy(fsc->wb_pagevec_pool); 726 mempool_destroy(fsc->wb_pagevec_pool);
720 727
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index df44a7761472..c4a79eadc55a 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -107,10 +107,12 @@ struct ceph_fs_client {
107 107
108 /* writeback */ 108 /* writeback */
109 mempool_t *wb_pagevec_pool; 109 mempool_t *wb_pagevec_pool;
110 atomic_long_t writeback_count;
111
110 struct workqueue_struct *wb_wq; 112 struct workqueue_struct *wb_wq;
111 struct workqueue_struct *pg_inv_wq; 113 struct workqueue_struct *pg_inv_wq;
112 struct workqueue_struct *trunc_wq; 114 struct workqueue_struct *trunc_wq;
113 atomic_long_t writeback_count; 115 struct workqueue_struct *cap_wq;
114 116
115#ifdef CONFIG_DEBUG_FS 117#ifdef CONFIG_DEBUG_FS
116 struct dentry *debugfs_dentry_lru, *debugfs_caps; 118 struct dentry *debugfs_dentry_lru, *debugfs_caps;
@@ -988,11 +990,11 @@ extern void ceph_add_cap(struct inode *inode,
988 unsigned cap, unsigned seq, u64 realmino, int flags, 990 unsigned cap, unsigned seq, u64 realmino, int flags,
989 struct ceph_cap **new_cap); 991 struct ceph_cap **new_cap);
990extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); 992extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
993extern void __ceph_remove_caps(struct inode* inode);
991extern void ceph_put_cap(struct ceph_mds_client *mdsc, 994extern void ceph_put_cap(struct ceph_mds_client *mdsc,
992 struct ceph_cap *cap); 995 struct ceph_cap *cap);
993extern int ceph_is_any_caps(struct inode *inode); 996extern int ceph_is_any_caps(struct inode *inode);
994 997
995extern void ceph_queue_caps_release(struct inode *inode);
996extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); 998extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
997extern int ceph_fsync(struct file *file, loff_t start, loff_t end, 999extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
998 int datasync); 1000 int datasync);