aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/ocfs2/dlmglue.c51
-rw-r--r--fs/ocfs2/dlmglue.h11
-rw-r--r--fs/ocfs2/journal.c107
-rw-r--r--fs/ocfs2/journal.h4
-rw-r--r--fs/ocfs2/ocfs2.h10
-rw-r--r--fs/ocfs2/ocfs2_lockid.h5
-rw-r--r--fs/ocfs2/super.c9
7 files changed, 197 insertions, 0 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index e15fc7d50827..6cdeaa76f27f 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -248,6 +248,10 @@ static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
248 .flags = 0, 248 .flags = 0,
249}; 249};
250 250
251static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
252 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
253};
254
251static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 255static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
252 .get_osb = ocfs2_get_dentry_osb, 256 .get_osb = ocfs2_get_dentry_osb,
253 .post_unlock = ocfs2_dentry_post_unlock, 257 .post_unlock = ocfs2_dentry_post_unlock,
@@ -637,6 +641,19 @@ static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
637 &ocfs2_nfs_sync_lops, osb); 641 &ocfs2_nfs_sync_lops, osb);
638} 642}
639 643
644static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
645 struct ocfs2_super *osb)
646{
647 struct ocfs2_orphan_scan_lvb *lvb;
648
649 ocfs2_lock_res_init_once(res);
650 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
651 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
652 &ocfs2_orphan_scan_lops, osb);
653 lvb = ocfs2_dlm_lvb(&res->l_lksb);
654 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
655}
656
640void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 657void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
641 struct ocfs2_file_private *fp) 658 struct ocfs2_file_private *fp)
642{ 659{
@@ -2352,6 +2369,37 @@ void ocfs2_inode_unlock(struct inode *inode,
2352 mlog_exit_void(); 2369 mlog_exit_void();
2353} 2370}
2354 2371
2372int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno, int ex)
2373{
2374 struct ocfs2_lock_res *lockres;
2375 struct ocfs2_orphan_scan_lvb *lvb;
2376 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2377 int status = 0;
2378
2379 lockres = &osb->osb_orphan_scan.os_lockres;
2380 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2381 if (status < 0)
2382 return status;
2383
2384 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2385 if (lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2386 *seqno = be32_to_cpu(lvb->lvb_os_seqno);
2387 return status;
2388}
2389
2390void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno, int ex)
2391{
2392 struct ocfs2_lock_res *lockres;
2393 struct ocfs2_orphan_scan_lvb *lvb;
2394 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2395
2396 lockres = &osb->osb_orphan_scan.os_lockres;
2397 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2398 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2399 lvb->lvb_os_seqno = cpu_to_be32(seqno);
2400 ocfs2_cluster_unlock(osb, lockres, level);
2401}
2402
2355int ocfs2_super_lock(struct ocfs2_super *osb, 2403int ocfs2_super_lock(struct ocfs2_super *osb,
2356 int ex) 2404 int ex)
2357{ 2405{
@@ -2842,6 +2890,7 @@ local:
2842 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2890 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2843 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2891 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2844 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 2892 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
2893 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
2845 2894
2846 osb->cconn = conn; 2895 osb->cconn = conn;
2847 2896
@@ -2878,6 +2927,7 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
2878 ocfs2_lock_res_free(&osb->osb_super_lockres); 2927 ocfs2_lock_res_free(&osb->osb_super_lockres);
2879 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2928 ocfs2_lock_res_free(&osb->osb_rename_lockres);
2880 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 2929 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
2930 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
2881 2931
2882 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 2932 ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
2883 osb->cconn = NULL; 2933 osb->cconn = NULL;
@@ -3061,6 +3111,7 @@ static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3061 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3111 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3062 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3112 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3063 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3113 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3114 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3064} 3115}
3065 3116
3066int ocfs2_drop_inode_locks(struct inode *inode) 3117int ocfs2_drop_inode_locks(struct inode *inode)
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index e1fd5721cd7f..31b90d7b8f51 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -62,6 +62,14 @@ struct ocfs2_qinfo_lvb {
62 __be32 lvb_free_entry; 62 __be32 lvb_free_entry;
63}; 63};
64 64
65#define OCFS2_ORPHAN_LVB_VERSION 1
66
67struct ocfs2_orphan_scan_lvb {
68 __u8 lvb_version;
69 __u8 lvb_reserved[3];
70 __be32 lvb_os_seqno;
71};
72
65/* ocfs2_inode_lock_full() 'arg_flags' flags */ 73/* ocfs2_inode_lock_full() 'arg_flags' flags */
66/* don't wait on recovery. */ 74/* don't wait on recovery. */
67#define OCFS2_META_LOCK_RECOVERY (0x01) 75#define OCFS2_META_LOCK_RECOVERY (0x01)
@@ -113,6 +121,9 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
113 int ex); 121 int ex);
114void ocfs2_super_unlock(struct ocfs2_super *osb, 122void ocfs2_super_unlock(struct ocfs2_super *osb,
115 int ex); 123 int ex);
124int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno, int ex);
125void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno, int ex);
126
116int ocfs2_rename_lock(struct ocfs2_super *osb); 127int ocfs2_rename_lock(struct ocfs2_super *osb);
117void ocfs2_rename_unlock(struct ocfs2_super *osb); 128void ocfs2_rename_unlock(struct ocfs2_super *osb);
118int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex); 129int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex);
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index a20a0f1e37fd..44ed768782ed 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -28,6 +28,8 @@
28#include <linux/slab.h> 28#include <linux/slab.h>
29#include <linux/highmem.h> 29#include <linux/highmem.h>
30#include <linux/kthread.h> 30#include <linux/kthread.h>
31#include <linux/time.h>
32#include <linux/random.h>
31 33
32#define MLOG_MASK_PREFIX ML_JOURNAL 34#define MLOG_MASK_PREFIX ML_JOURNAL
33#include <cluster/masklog.h> 35#include <cluster/masklog.h>
@@ -52,6 +54,8 @@
52 54
53DEFINE_SPINLOCK(trans_inc_lock); 55DEFINE_SPINLOCK(trans_inc_lock);
54 56
57#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
58
55static int ocfs2_force_read_journal(struct inode *inode); 59static int ocfs2_force_read_journal(struct inode *inode);
56static int ocfs2_recover_node(struct ocfs2_super *osb, 60static int ocfs2_recover_node(struct ocfs2_super *osb,
57 int node_num, int slot_num); 61 int node_num, int slot_num);
@@ -1841,6 +1845,109 @@ bail:
1841 return status; 1845 return status;
1842} 1846}
1843 1847
1848/*
1849 * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some
1850 * randomness to the timeout to minimize multple nodes firing the timer at the
1851 * same time.
1852 */
1853static inline unsigned long ocfs2_orphan_scan_timeout(void)
1854{
1855 unsigned long time;
1856
1857 get_random_bytes(&time, sizeof(time));
1858 time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000);
1859 return msecs_to_jiffies(time);
1860}
1861
1862/*
1863 * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for
1864 * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This
1865 * is done to catch any orphans that are left over in orphan directories.
1866 *
1867 * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
1868 * seconds. It gets an EX lock on os_lockres and checks sequence number
1869 * stored in LVB. If the sequence number has changed, it means some other
1870 * node has done the scan. This node skips the scan and tracks the
1871 * sequence number. If the sequence number didn't change, it means a scan
1872 * hasn't happened. The node queues a scan and increments the
1873 * sequence number in the LVB.
1874 */
1875void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
1876{
1877 struct ocfs2_orphan_scan *os;
1878 int status, i;
1879 u32 seqno = 0;
1880
1881 os = &osb->osb_orphan_scan;
1882
1883 status = ocfs2_orphan_scan_lock(osb, &seqno, DLM_LOCK_EX);
1884 if (status < 0) {
1885 if (status != -EAGAIN)
1886 mlog_errno(status);
1887 goto out;
1888 }
1889
1890 if (os->os_seqno != seqno) {
1891 os->os_seqno = seqno;
1892 goto unlock;
1893 }
1894
1895 for (i = 0; i < osb->max_slots; i++)
1896 ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
1897 NULL);
1898 /*
1899 * We queued a recovery on orphan slots, increment the sequence
1900 * number and update LVB so other node will skip the scan for a while
1901 */
1902 seqno++;
1903unlock:
1904 ocfs2_orphan_scan_unlock(osb, seqno, DLM_LOCK_EX);
1905out:
1906 return;
1907}
1908
1909/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */
1910void ocfs2_orphan_scan_work(struct work_struct *work)
1911{
1912 struct ocfs2_orphan_scan *os;
1913 struct ocfs2_super *osb;
1914
1915 os = container_of(work, struct ocfs2_orphan_scan,
1916 os_orphan_scan_work.work);
1917 osb = os->os_osb;
1918
1919 mutex_lock(&os->os_lock);
1920 ocfs2_queue_orphan_scan(osb);
1921 schedule_delayed_work(&os->os_orphan_scan_work,
1922 ocfs2_orphan_scan_timeout());
1923 mutex_unlock(&os->os_lock);
1924}
1925
1926void ocfs2_orphan_scan_stop(struct ocfs2_super *osb)
1927{
1928 struct ocfs2_orphan_scan *os;
1929
1930 os = &osb->osb_orphan_scan;
1931 mutex_lock(&os->os_lock);
1932 cancel_delayed_work(&os->os_orphan_scan_work);
1933 mutex_unlock(&os->os_lock);
1934}
1935
1936int ocfs2_orphan_scan_init(struct ocfs2_super *osb)
1937{
1938 struct ocfs2_orphan_scan *os;
1939
1940 os = &osb->osb_orphan_scan;
1941 os->os_osb = osb;
1942 mutex_init(&os->os_lock);
1943
1944 INIT_DELAYED_WORK(&os->os_orphan_scan_work,
1945 ocfs2_orphan_scan_work);
1946 schedule_delayed_work(&os->os_orphan_scan_work,
1947 ocfs2_orphan_scan_timeout());
1948 return 0;
1949}
1950
1844struct ocfs2_orphan_filldir_priv { 1951struct ocfs2_orphan_filldir_priv {
1845 struct inode *head; 1952 struct inode *head;
1846 struct ocfs2_super *osb; 1953 struct ocfs2_super *osb;
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index eb7b76331eb7..61045eeb3f6e 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -144,6 +144,10 @@ static inline void ocfs2_inode_set_new(struct ocfs2_super *osb,
144} 144}
145 145
146/* Exported only for the journal struct init code in super.c. Do not call. */ 146/* Exported only for the journal struct init code in super.c. Do not call. */
147int ocfs2_orphan_scan_init(struct ocfs2_super *osb);
148void ocfs2_orphan_scan_stop(struct ocfs2_super *osb);
149void ocfs2_orphan_scan_exit(struct ocfs2_super *osb);
150
147void ocfs2_complete_recovery(struct work_struct *work); 151void ocfs2_complete_recovery(struct work_struct *work);
148void ocfs2_wait_for_recovery(struct ocfs2_super *osb); 152void ocfs2_wait_for_recovery(struct ocfs2_super *osb);
149 153
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 1386281950db..1fde52c96d25 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -151,6 +151,14 @@ struct ocfs2_lock_res {
151#endif 151#endif
152}; 152};
153 153
154struct ocfs2_orphan_scan {
155 struct mutex os_lock;
156 struct ocfs2_super *os_osb;
157 struct ocfs2_lock_res os_lockres; /* lock to synchronize scans */
158 struct delayed_work os_orphan_scan_work;
159 u32 os_seqno; /* incremented on every scan */
160};
161
154struct ocfs2_dlm_debug { 162struct ocfs2_dlm_debug {
155 struct kref d_refcnt; 163 struct kref d_refcnt;
156 struct dentry *d_locking_state; 164 struct dentry *d_locking_state;
@@ -341,6 +349,8 @@ struct ocfs2_super
341 unsigned int *osb_orphan_wipes; 349 unsigned int *osb_orphan_wipes;
342 wait_queue_head_t osb_wipe_event; 350 wait_queue_head_t osb_wipe_event;
343 351
352 struct ocfs2_orphan_scan osb_orphan_scan;
353
344 /* used to protect metaecc calculation check of xattr. */ 354 /* used to protect metaecc calculation check of xattr. */
345 spinlock_t osb_xattr_lock; 355 spinlock_t osb_xattr_lock;
346 356
diff --git a/fs/ocfs2/ocfs2_lockid.h b/fs/ocfs2/ocfs2_lockid.h
index a53ce87481bf..fcdba091af3d 100644
--- a/fs/ocfs2/ocfs2_lockid.h
+++ b/fs/ocfs2/ocfs2_lockid.h
@@ -48,6 +48,7 @@ enum ocfs2_lock_type {
48 OCFS2_LOCK_TYPE_FLOCK, 48 OCFS2_LOCK_TYPE_FLOCK,
49 OCFS2_LOCK_TYPE_QINFO, 49 OCFS2_LOCK_TYPE_QINFO,
50 OCFS2_LOCK_TYPE_NFS_SYNC, 50 OCFS2_LOCK_TYPE_NFS_SYNC,
51 OCFS2_LOCK_TYPE_ORPHAN_SCAN,
51 OCFS2_NUM_LOCK_TYPES 52 OCFS2_NUM_LOCK_TYPES
52}; 53};
53 54
@@ -85,6 +86,9 @@ static inline char ocfs2_lock_type_char(enum ocfs2_lock_type type)
85 case OCFS2_LOCK_TYPE_NFS_SYNC: 86 case OCFS2_LOCK_TYPE_NFS_SYNC:
86 c = 'Y'; 87 c = 'Y';
87 break; 88 break;
89 case OCFS2_LOCK_TYPE_ORPHAN_SCAN:
90 c = 'P';
91 break;
88 default: 92 default:
89 c = '\0'; 93 c = '\0';
90 } 94 }
@@ -104,6 +108,7 @@ static char *ocfs2_lock_type_strings[] = {
104 [OCFS2_LOCK_TYPE_OPEN] = "Open", 108 [OCFS2_LOCK_TYPE_OPEN] = "Open",
105 [OCFS2_LOCK_TYPE_FLOCK] = "Flock", 109 [OCFS2_LOCK_TYPE_FLOCK] = "Flock",
106 [OCFS2_LOCK_TYPE_QINFO] = "Quota", 110 [OCFS2_LOCK_TYPE_QINFO] = "Quota",
111 [OCFS2_LOCK_TYPE_ORPHAN_SCAN] = "OrphanScan",
107}; 112};
108 113
109static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type) 114static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 79ff8d9d37e0..44ac27e2d1f5 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -1802,6 +1802,8 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
1802 1802
1803 ocfs2_truncate_log_shutdown(osb); 1803 ocfs2_truncate_log_shutdown(osb);
1804 1804
1805 ocfs2_orphan_scan_stop(osb);
1806
1805 /* This will disable recovery and flush any recovery work. */ 1807 /* This will disable recovery and flush any recovery work. */
1806 ocfs2_recovery_exit(osb); 1808 ocfs2_recovery_exit(osb);
1807 1809
@@ -1957,6 +1959,13 @@ static int ocfs2_initialize_super(struct super_block *sb,
1957 goto bail; 1959 goto bail;
1958 } 1960 }
1959 1961
1962 status = ocfs2_orphan_scan_init(osb);
1963 if (status) {
1964 mlog(ML_ERROR, "Unable to initialize delayed orphan scan\n");
1965 mlog_errno(status);
1966 goto bail;
1967 }
1968
1960 init_waitqueue_head(&osb->checkpoint_event); 1969 init_waitqueue_head(&osb->checkpoint_event);
1961 atomic_set(&osb->needs_checkpoint, 0); 1970 atomic_set(&osb->needs_checkpoint, 0);
1962 1971