aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2
diff options
context:
space:
mode:
authorSrinivas Eeda <srinivas.eeda@oracle.com>2009-06-03 20:02:55 -0400
committerJoel Becker <joel.becker@oracle.com>2009-06-03 22:14:31 -0400
commit83273932fbefb6ceef9c0b82ac4d23900728f4d9 (patch)
tree7ece8471af3d2a22543542e990369aca47aa3d25 /fs/ocfs2
parentedd45c08499a3e9d4c25431cd2b6a9ce5f692c92 (diff)
ocfs2: timer to queue scan of all orphan slots
When a dentry is unlinked, the unlinking node takes an EX on the dentry lock before moving the dentry to the orphan directory. Other nodes that have this dentry in cache have a PR on the same dentry lock. When the EX is requested, the other nodes flag the corresponding inode as MAYBE_ORPHANED during downconvert. The inode is finally deleted when the last node to iput the inode sees that i_nlink==0 and the MAYBE_ORPHANED flag is set. A problem arises if a node is forced to free dentry locks because of memory pressure. If this happens, the node will no longer get downconvert notifications for the dentries that have been unlinked on another node. If it also happens that node is actively using the corresponding inode and happens to be the one performing the last iput on that inode, it will fail to delete the inode as it will not have the MAYBE_ORPHANED flag set. This patch fixes this shortcoming by introducing a periodic scan of the orphan directories to delete such inodes. Care has been taken to distribute the workload across the cluster so that no one node has to perform the task all the time. Signed-off-by: Srinivas Eeda <srinivas.eeda@oracle.com> Signed-off-by: Joel Becker <joel.becker@oracle.com>
Diffstat (limited to 'fs/ocfs2')
-rw-r--r--fs/ocfs2/dlmglue.c51
-rw-r--r--fs/ocfs2/dlmglue.h11
-rw-r--r--fs/ocfs2/journal.c107
-rw-r--r--fs/ocfs2/journal.h4
-rw-r--r--fs/ocfs2/ocfs2.h10
-rw-r--r--fs/ocfs2/ocfs2_lockid.h5
-rw-r--r--fs/ocfs2/super.c9
7 files changed, 197 insertions, 0 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index e15fc7d50827..6cdeaa76f27f 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -248,6 +248,10 @@ static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
248 .flags = 0, 248 .flags = 0,
249}; 249};
250 250
251static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
252 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
253};
254
251static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 255static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
252 .get_osb = ocfs2_get_dentry_osb, 256 .get_osb = ocfs2_get_dentry_osb,
253 .post_unlock = ocfs2_dentry_post_unlock, 257 .post_unlock = ocfs2_dentry_post_unlock,
@@ -637,6 +641,19 @@ static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
637 &ocfs2_nfs_sync_lops, osb); 641 &ocfs2_nfs_sync_lops, osb);
638} 642}
639 643
644static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
645 struct ocfs2_super *osb)
646{
647 struct ocfs2_orphan_scan_lvb *lvb;
648
649 ocfs2_lock_res_init_once(res);
650 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
651 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
652 &ocfs2_orphan_scan_lops, osb);
653 lvb = ocfs2_dlm_lvb(&res->l_lksb);
654 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
655}
656
640void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 657void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
641 struct ocfs2_file_private *fp) 658 struct ocfs2_file_private *fp)
642{ 659{
@@ -2352,6 +2369,37 @@ void ocfs2_inode_unlock(struct inode *inode,
2352 mlog_exit_void(); 2369 mlog_exit_void();
2353} 2370}
2354 2371
2372int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno, int ex)
2373{
2374 struct ocfs2_lock_res *lockres;
2375 struct ocfs2_orphan_scan_lvb *lvb;
2376 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2377 int status = 0;
2378
2379 lockres = &osb->osb_orphan_scan.os_lockres;
2380 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2381 if (status < 0)
2382 return status;
2383
2384 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2385 if (lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2386 *seqno = be32_to_cpu(lvb->lvb_os_seqno);
2387 return status;
2388}
2389
2390void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno, int ex)
2391{
2392 struct ocfs2_lock_res *lockres;
2393 struct ocfs2_orphan_scan_lvb *lvb;
2394 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2395
2396 lockres = &osb->osb_orphan_scan.os_lockres;
2397 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2398 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2399 lvb->lvb_os_seqno = cpu_to_be32(seqno);
2400 ocfs2_cluster_unlock(osb, lockres, level);
2401}
2402
2355int ocfs2_super_lock(struct ocfs2_super *osb, 2403int ocfs2_super_lock(struct ocfs2_super *osb,
2356 int ex) 2404 int ex)
2357{ 2405{
@@ -2842,6 +2890,7 @@ local:
2842 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2890 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2843 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2891 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2844 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 2892 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
2893 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
2845 2894
2846 osb->cconn = conn; 2895 osb->cconn = conn;
2847 2896
@@ -2878,6 +2927,7 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
2878 ocfs2_lock_res_free(&osb->osb_super_lockres); 2927 ocfs2_lock_res_free(&osb->osb_super_lockres);
2879 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2928 ocfs2_lock_res_free(&osb->osb_rename_lockres);
2880 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 2929 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
2930 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
2881 2931
2882 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 2932 ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
2883 osb->cconn = NULL; 2933 osb->cconn = NULL;
@@ -3061,6 +3111,7 @@ static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3061 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3111 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3062 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3112 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3063 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3113 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3114 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3064} 3115}
3065 3116
3066int ocfs2_drop_inode_locks(struct inode *inode) 3117int ocfs2_drop_inode_locks(struct inode *inode)
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index e1fd5721cd7f..31b90d7b8f51 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -62,6 +62,14 @@ struct ocfs2_qinfo_lvb {
62 __be32 lvb_free_entry; 62 __be32 lvb_free_entry;
63}; 63};
64 64
65#define OCFS2_ORPHAN_LVB_VERSION 1
66
67struct ocfs2_orphan_scan_lvb {
68 __u8 lvb_version;
69 __u8 lvb_reserved[3];
70 __be32 lvb_os_seqno;
71};
72
65/* ocfs2_inode_lock_full() 'arg_flags' flags */ 73/* ocfs2_inode_lock_full() 'arg_flags' flags */
66/* don't wait on recovery. */ 74/* don't wait on recovery. */
67#define OCFS2_META_LOCK_RECOVERY (0x01) 75#define OCFS2_META_LOCK_RECOVERY (0x01)
@@ -113,6 +121,9 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
113 int ex); 121 int ex);
114void ocfs2_super_unlock(struct ocfs2_super *osb, 122void ocfs2_super_unlock(struct ocfs2_super *osb,
115 int ex); 123 int ex);
124int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno, int ex);
125void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno, int ex);
126
116int ocfs2_rename_lock(struct ocfs2_super *osb); 127int ocfs2_rename_lock(struct ocfs2_super *osb);
117void ocfs2_rename_unlock(struct ocfs2_super *osb); 128void ocfs2_rename_unlock(struct ocfs2_super *osb);
118int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex); 129int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex);
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index a20a0f1e37fd..44ed768782ed 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -28,6 +28,8 @@
28#include <linux/slab.h> 28#include <linux/slab.h>
29#include <linux/highmem.h> 29#include <linux/highmem.h>
30#include <linux/kthread.h> 30#include <linux/kthread.h>
31#include <linux/time.h>
32#include <linux/random.h>
31 33
32#define MLOG_MASK_PREFIX ML_JOURNAL 34#define MLOG_MASK_PREFIX ML_JOURNAL
33#include <cluster/masklog.h> 35#include <cluster/masklog.h>
@@ -52,6 +54,8 @@
52 54
53DEFINE_SPINLOCK(trans_inc_lock); 55DEFINE_SPINLOCK(trans_inc_lock);
54 56
57#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
58
55static int ocfs2_force_read_journal(struct inode *inode); 59static int ocfs2_force_read_journal(struct inode *inode);
56static int ocfs2_recover_node(struct ocfs2_super *osb, 60static int ocfs2_recover_node(struct ocfs2_super *osb,
57 int node_num, int slot_num); 61 int node_num, int slot_num);
@@ -1841,6 +1845,109 @@ bail:
1841 return status; 1845 return status;
1842} 1846}
1843 1847
1848/*
1849 * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some
1850 * randomness to the timeout to minimize multple nodes firing the timer at the
1851 * same time.
1852 */
1853static inline unsigned long ocfs2_orphan_scan_timeout(void)
1854{
1855 unsigned long time;
1856
1857 get_random_bytes(&time, sizeof(time));
1858 time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000);
1859 return msecs_to_jiffies(time);
1860}
1861
1862/*
1863 * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for
1864 * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This
1865 * is done to catch any orphans that are left over in orphan directories.
1866 *
1867 * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
1868 * seconds. It gets an EX lock on os_lockres and checks sequence number
1869 * stored in LVB. If the sequence number has changed, it means some other
1870 * node has done the scan. This node skips the scan and tracks the
1871 * sequence number. If the sequence number didn't change, it means a scan
1872 * hasn't happened. The node queues a scan and increments the
1873 * sequence number in the LVB.
1874 */
1875void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
1876{
1877 struct ocfs2_orphan_scan *os;
1878 int status, i;
1879 u32 seqno = 0;
1880
1881 os = &osb->osb_orphan_scan;
1882
1883 status = ocfs2_orphan_scan_lock(osb, &seqno, DLM_LOCK_EX);
1884 if (status < 0) {
1885 if (status != -EAGAIN)
1886 mlog_errno(status);
1887 goto out;
1888 }
1889
1890 if (os->os_seqno != seqno) {
1891 os->os_seqno = seqno;
1892 goto unlock;
1893 }
1894
1895 for (i = 0; i < osb->max_slots; i++)
1896 ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
1897 NULL);
1898 /*
1899 * We queued a recovery on orphan slots, increment the sequence
1900 * number and update LVB so other node will skip the scan for a while
1901 */
1902 seqno++;
1903unlock:
1904 ocfs2_orphan_scan_unlock(osb, seqno, DLM_LOCK_EX);
1905out:
1906 return;
1907}
1908
1909/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */
1910void ocfs2_orphan_scan_work(struct work_struct *work)
1911{
1912 struct ocfs2_orphan_scan *os;
1913 struct ocfs2_super *osb;
1914
1915 os = container_of(work, struct ocfs2_orphan_scan,
1916 os_orphan_scan_work.work);
1917 osb = os->os_osb;
1918
1919 mutex_lock(&os->os_lock);
1920 ocfs2_queue_orphan_scan(osb);
1921 schedule_delayed_work(&os->os_orphan_scan_work,
1922 ocfs2_orphan_scan_timeout());
1923 mutex_unlock(&os->os_lock);
1924}
1925
1926void ocfs2_orphan_scan_stop(struct ocfs2_super *osb)
1927{
1928 struct ocfs2_orphan_scan *os;
1929
1930 os = &osb->osb_orphan_scan;
1931 mutex_lock(&os->os_lock);
1932 cancel_delayed_work(&os->os_orphan_scan_work);
1933 mutex_unlock(&os->os_lock);
1934}
1935
1936int ocfs2_orphan_scan_init(struct ocfs2_super *osb)
1937{
1938 struct ocfs2_orphan_scan *os;
1939
1940 os = &osb->osb_orphan_scan;
1941 os->os_osb = osb;
1942 mutex_init(&os->os_lock);
1943
1944 INIT_DELAYED_WORK(&os->os_orphan_scan_work,
1945 ocfs2_orphan_scan_work);
1946 schedule_delayed_work(&os->os_orphan_scan_work,
1947 ocfs2_orphan_scan_timeout());
1948 return 0;
1949}
1950
1844struct ocfs2_orphan_filldir_priv { 1951struct ocfs2_orphan_filldir_priv {
1845 struct inode *head; 1952 struct inode *head;
1846 struct ocfs2_super *osb; 1953 struct ocfs2_super *osb;
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index eb7b76331eb7..61045eeb3f6e 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -144,6 +144,10 @@ static inline void ocfs2_inode_set_new(struct ocfs2_super *osb,
144} 144}
145 145
146/* Exported only for the journal struct init code in super.c. Do not call. */ 146/* Exported only for the journal struct init code in super.c. Do not call. */
147int ocfs2_orphan_scan_init(struct ocfs2_super *osb);
148void ocfs2_orphan_scan_stop(struct ocfs2_super *osb);
149void ocfs2_orphan_scan_exit(struct ocfs2_super *osb);
150
147void ocfs2_complete_recovery(struct work_struct *work); 151void ocfs2_complete_recovery(struct work_struct *work);
148void ocfs2_wait_for_recovery(struct ocfs2_super *osb); 152void ocfs2_wait_for_recovery(struct ocfs2_super *osb);
149 153
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 1386281950db..1fde52c96d25 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -151,6 +151,14 @@ struct ocfs2_lock_res {
151#endif 151#endif
152}; 152};
153 153
154struct ocfs2_orphan_scan {
155 struct mutex os_lock;
156 struct ocfs2_super *os_osb;
157 struct ocfs2_lock_res os_lockres; /* lock to synchronize scans */
158 struct delayed_work os_orphan_scan_work;
159 u32 os_seqno; /* incremented on every scan */
160};
161
154struct ocfs2_dlm_debug { 162struct ocfs2_dlm_debug {
155 struct kref d_refcnt; 163 struct kref d_refcnt;
156 struct dentry *d_locking_state; 164 struct dentry *d_locking_state;
@@ -341,6 +349,8 @@ struct ocfs2_super
341 unsigned int *osb_orphan_wipes; 349 unsigned int *osb_orphan_wipes;
342 wait_queue_head_t osb_wipe_event; 350 wait_queue_head_t osb_wipe_event;
343 351
352 struct ocfs2_orphan_scan osb_orphan_scan;
353
344 /* used to protect metaecc calculation check of xattr. */ 354 /* used to protect metaecc calculation check of xattr. */
345 spinlock_t osb_xattr_lock; 355 spinlock_t osb_xattr_lock;
346 356
diff --git a/fs/ocfs2/ocfs2_lockid.h b/fs/ocfs2/ocfs2_lockid.h
index a53ce87481bf..fcdba091af3d 100644
--- a/fs/ocfs2/ocfs2_lockid.h
+++ b/fs/ocfs2/ocfs2_lockid.h
@@ -48,6 +48,7 @@ enum ocfs2_lock_type {
48 OCFS2_LOCK_TYPE_FLOCK, 48 OCFS2_LOCK_TYPE_FLOCK,
49 OCFS2_LOCK_TYPE_QINFO, 49 OCFS2_LOCK_TYPE_QINFO,
50 OCFS2_LOCK_TYPE_NFS_SYNC, 50 OCFS2_LOCK_TYPE_NFS_SYNC,
51 OCFS2_LOCK_TYPE_ORPHAN_SCAN,
51 OCFS2_NUM_LOCK_TYPES 52 OCFS2_NUM_LOCK_TYPES
52}; 53};
53 54
@@ -85,6 +86,9 @@ static inline char ocfs2_lock_type_char(enum ocfs2_lock_type type)
85 case OCFS2_LOCK_TYPE_NFS_SYNC: 86 case OCFS2_LOCK_TYPE_NFS_SYNC:
86 c = 'Y'; 87 c = 'Y';
87 break; 88 break;
89 case OCFS2_LOCK_TYPE_ORPHAN_SCAN:
90 c = 'P';
91 break;
88 default: 92 default:
89 c = '\0'; 93 c = '\0';
90 } 94 }
@@ -104,6 +108,7 @@ static char *ocfs2_lock_type_strings[] = {
104 [OCFS2_LOCK_TYPE_OPEN] = "Open", 108 [OCFS2_LOCK_TYPE_OPEN] = "Open",
105 [OCFS2_LOCK_TYPE_FLOCK] = "Flock", 109 [OCFS2_LOCK_TYPE_FLOCK] = "Flock",
106 [OCFS2_LOCK_TYPE_QINFO] = "Quota", 110 [OCFS2_LOCK_TYPE_QINFO] = "Quota",
111 [OCFS2_LOCK_TYPE_ORPHAN_SCAN] = "OrphanScan",
107}; 112};
108 113
109static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type) 114static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 79ff8d9d37e0..44ac27e2d1f5 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -1802,6 +1802,8 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
1802 1802
1803 ocfs2_truncate_log_shutdown(osb); 1803 ocfs2_truncate_log_shutdown(osb);
1804 1804
1805 ocfs2_orphan_scan_stop(osb);
1806
1805 /* This will disable recovery and flush any recovery work. */ 1807 /* This will disable recovery and flush any recovery work. */
1806 ocfs2_recovery_exit(osb); 1808 ocfs2_recovery_exit(osb);
1807 1809
@@ -1957,6 +1959,13 @@ static int ocfs2_initialize_super(struct super_block *sb,
1957 goto bail; 1959 goto bail;
1958 } 1960 }
1959 1961
1962 status = ocfs2_orphan_scan_init(osb);
1963 if (status) {
1964 mlog(ML_ERROR, "Unable to initialize delayed orphan scan\n");
1965 mlog_errno(status);
1966 goto bail;
1967 }
1968
1960 init_waitqueue_head(&osb->checkpoint_event); 1969 init_waitqueue_head(&osb->checkpoint_event);
1961 atomic_set(&osb->needs_checkpoint, 0); 1970 atomic_set(&osb->needs_checkpoint, 0);
1962 1971