aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJan Kara <jack@suse.cz>2010-03-31 10:25:37 -0400
committerJan Kara <jack@suse.cz>2010-05-21 13:30:47 -0400
commitfb8dd8d780140a3f0e9074831a59054fec6cc451 (patch)
tree09e9f7bf157784fc6b0a7df71c1fbfe711349055
parentae4f6ef13417deaa49471c0e903914a3ef3be258 (diff)
ocfs2: Fix quota locking
OCFS2 had three issues with quota locking: a) When reading dquot from global quota file, we started a transaction while holding dqio_mutex which is prone to deadlocks because other paths do it the other way around b) During ocfs2_sync_dquot we were not protected against concurrent writers on the same node. Because we first copy data to local buffer, a race could happen resulting in old data being written to global quota file and thus causing quota inconsistency after a crash. c) ip_alloc_sem of quota files was acquired while a transaction is started in ocfs2_quota_write which can deadlock because we first get ip_alloc_sem and then start a transaction when extending quota files. We fix the problem a) by pulling all necessary code to ocfs2_acquire_dquot and ocfs2_release_dquot. Thus we no longer depend on generic dquot_acquire to do the locking and can force proper lock ordering. Problems b) and c) are fixed by locking i_mutex and ip_alloc_sem of global quota file in ocfs2_lock_global_qf and removing ip_alloc_sem from ocfs2_quota_read and ocfs2_quota_write. Acked-by: Joel Becker <Joel.Becker@oracle.com> Signed-off-by: Jan Kara <jack@suse.cz>
-rw-r--r--fs/ocfs2/quota.h5
-rw-r--r--fs/ocfs2/quota_global.c307
-rw-r--r--fs/ocfs2/quota_local.c88
3 files changed, 214 insertions, 186 deletions
diff --git a/fs/ocfs2/quota.h b/fs/ocfs2/quota.h
index c7623430ca3c..903ffa933d53 100644
--- a/fs/ocfs2/quota.h
+++ b/fs/ocfs2/quota.h
@@ -104,10 +104,11 @@ static inline int ocfs2_global_release_dquot(struct dquot *dquot)
104 104
105int ocfs2_lock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex); 105int ocfs2_lock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex);
106void ocfs2_unlock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex); 106void ocfs2_unlock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex);
107int ocfs2_read_quota_block(struct inode *inode, u64 v_block, 107int ocfs2_validate_quota_block(struct super_block *sb, struct buffer_head *bh);
108 struct buffer_head **bh);
109int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block, 108int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block,
110 struct buffer_head **bh); 109 struct buffer_head **bh);
110int ocfs2_create_local_dquot(struct dquot *dquot);
111int ocfs2_local_release_dquot(handle_t *handle, struct dquot *dquot);
111 112
112extern const struct dquot_operations ocfs2_quota_operations; 113extern const struct dquot_operations ocfs2_quota_operations;
113extern struct quota_format_type ocfs2_quota_format; 114extern struct quota_format_type ocfs2_quota_format;
diff --git a/fs/ocfs2/quota_global.c b/fs/ocfs2/quota_global.c
index f391b11ea98c..1e3e0d5b3ae7 100644
--- a/fs/ocfs2/quota_global.c
+++ b/fs/ocfs2/quota_global.c
@@ -28,6 +28,41 @@
28#include "buffer_head_io.h" 28#include "buffer_head_io.h"
29#include "quota.h" 29#include "quota.h"
30 30
31/*
32 * Locking of quotas with OCFS2 is rather complex. Here are rules that
33 * should be obeyed by all the functions:
34 * - any write of quota structure (either to local or global file) is protected
35 * by dqio_mutex or dquot->dq_lock.
36 * - any modification of global quota file holds inode cluster lock, i_mutex,
37 * and ip_alloc_sem of the global quota file (achieved by
38 * ocfs2_lock_global_qf). It also has to hold qinfo_lock.
39 * - an allocation of new blocks for local quota file is protected by
40 * its ip_alloc_sem
41 *
42 * A rough sketch of locking dependencies (lf = local file, gf = global file):
43 * Normal filesystem operation:
44 * start_trans -> dqio_mutex -> write to lf
45 * Syncing of local and global file:
46 * ocfs2_lock_global_qf -> start_trans -> dqio_mutex -> qinfo_lock ->
47 * write to gf
48 * -> write to lf
49 * Acquire dquot for the first time:
50 * dq_lock -> ocfs2_lock_global_qf -> qinfo_lock -> read from gf
51 * -> alloc space for gf
52 * -> start_trans -> qinfo_lock -> write to gf
53 * -> ip_alloc_sem of lf -> alloc space for lf
54 * -> write to lf
55 * Release last reference to dquot:
56 * dq_lock -> ocfs2_lock_global_qf -> start_trans -> qinfo_lock -> write to gf
57 * -> write to lf
58 * Note that all the above operations also hold the inode cluster lock of lf.
59 * Recovery:
60 * inode cluster lock of recovered lf
61 * -> read bitmaps -> ip_alloc_sem of lf
62 * -> ocfs2_lock_global_qf -> start_trans -> dqio_mutex -> qinfo_lock ->
63 * write to gf
64 */
65
31static struct workqueue_struct *ocfs2_quota_wq = NULL; 66static struct workqueue_struct *ocfs2_quota_wq = NULL;
32 67
33static void qsync_work_fn(struct work_struct *work); 68static void qsync_work_fn(struct work_struct *work);
@@ -92,8 +127,7 @@ struct qtree_fmt_operations ocfs2_global_ops = {
92 .is_id = ocfs2_global_is_id, 127 .is_id = ocfs2_global_is_id,
93}; 128};
94 129
95static int ocfs2_validate_quota_block(struct super_block *sb, 130int ocfs2_validate_quota_block(struct super_block *sb, struct buffer_head *bh)
96 struct buffer_head *bh)
97{ 131{
98 struct ocfs2_disk_dqtrailer *dqt = 132 struct ocfs2_disk_dqtrailer *dqt =
99 ocfs2_block_dqtrailer(sb->s_blocksize, bh->b_data); 133 ocfs2_block_dqtrailer(sb->s_blocksize, bh->b_data);
@@ -111,33 +145,6 @@ static int ocfs2_validate_quota_block(struct super_block *sb,
111 return ocfs2_validate_meta_ecc(sb, bh->b_data, &dqt->dq_check); 145 return ocfs2_validate_meta_ecc(sb, bh->b_data, &dqt->dq_check);
112} 146}
113 147
114int ocfs2_read_quota_block(struct inode *inode, u64 v_block,
115 struct buffer_head **bh)
116{
117 int rc = 0;
118 struct buffer_head *tmp = *bh;
119
120 if (i_size_read(inode) >> inode->i_sb->s_blocksize_bits <= v_block) {
121 ocfs2_error(inode->i_sb,
122 "Quota file %llu is probably corrupted! Requested "
123 "to read block %Lu but file has size only %Lu\n",
124 (unsigned long long)OCFS2_I(inode)->ip_blkno,
125 (unsigned long long)v_block,
126 (unsigned long long)i_size_read(inode));
127 return -EIO;
128 }
129 rc = ocfs2_read_virt_blocks(inode, v_block, 1, &tmp, 0,
130 ocfs2_validate_quota_block);
131 if (rc)
132 mlog_errno(rc);
133
134 /* If ocfs2_read_virt_blocks() got us a new bh, pass it up. */
135 if (!rc && !*bh)
136 *bh = tmp;
137
138 return rc;
139}
140
141int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block, 148int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block,
142 struct buffer_head **bhp) 149 struct buffer_head **bhp)
143{ 150{
@@ -151,27 +158,6 @@ int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block,
151 return rc; 158 return rc;
152} 159}
153 160
154static int ocfs2_get_quota_block(struct inode *inode, int block,
155 struct buffer_head **bh)
156{
157 u64 pblock, pcount;
158 int err;
159
160 down_read(&OCFS2_I(inode)->ip_alloc_sem);
161 err = ocfs2_extent_map_get_blocks(inode, block, &pblock, &pcount, NULL);
162 up_read(&OCFS2_I(inode)->ip_alloc_sem);
163 if (err) {
164 mlog_errno(err);
165 return err;
166 }
167 *bh = sb_getblk(inode->i_sb, pblock);
168 if (!*bh) {
169 err = -EIO;
170 mlog_errno(err);
171 }
172 return err;
173}
174
175/* Read data from global quotafile - avoid pagecache and such because we cannot 161/* Read data from global quotafile - avoid pagecache and such because we cannot
176 * afford acquiring the locks... We use quota cluster lock to serialize 162 * afford acquiring the locks... We use quota cluster lock to serialize
177 * operations. Caller is responsible for acquiring it. */ 163 * operations. Caller is responsible for acquiring it. */
@@ -186,6 +172,7 @@ ssize_t ocfs2_quota_read(struct super_block *sb, int type, char *data,
186 int err = 0; 172 int err = 0;
187 struct buffer_head *bh; 173 struct buffer_head *bh;
188 size_t toread, tocopy; 174 size_t toread, tocopy;
175 u64 pblock = 0, pcount = 0;
189 176
190 if (off > i_size) 177 if (off > i_size)
191 return 0; 178 return 0;
@@ -194,8 +181,19 @@ ssize_t ocfs2_quota_read(struct super_block *sb, int type, char *data,
194 toread = len; 181 toread = len;
195 while (toread > 0) { 182 while (toread > 0) {
196 tocopy = min_t(size_t, (sb->s_blocksize - offset), toread); 183 tocopy = min_t(size_t, (sb->s_blocksize - offset), toread);
184 if (!pcount) {
185 err = ocfs2_extent_map_get_blocks(gqinode, blk, &pblock,
186 &pcount, NULL);
187 if (err) {
188 mlog_errno(err);
189 return err;
190 }
191 } else {
192 pcount--;
193 pblock++;
194 }
197 bh = NULL; 195 bh = NULL;
198 err = ocfs2_read_quota_block(gqinode, blk, &bh); 196 err = ocfs2_read_quota_phys_block(gqinode, pblock, &bh);
199 if (err) { 197 if (err) {
200 mlog_errno(err); 198 mlog_errno(err);
201 return err; 199 return err;
@@ -223,6 +221,7 @@ ssize_t ocfs2_quota_write(struct super_block *sb, int type,
223 int err = 0, new = 0, ja_type; 221 int err = 0, new = 0, ja_type;
224 struct buffer_head *bh = NULL; 222 struct buffer_head *bh = NULL;
225 handle_t *handle = journal_current_handle(); 223 handle_t *handle = journal_current_handle();
224 u64 pblock, pcount;
226 225
227 if (!handle) { 226 if (!handle) {
228 mlog(ML_ERROR, "Quota write (off=%llu, len=%llu) cancelled " 227 mlog(ML_ERROR, "Quota write (off=%llu, len=%llu) cancelled "
@@ -235,12 +234,11 @@ ssize_t ocfs2_quota_write(struct super_block *sb, int type,
235 len = sb->s_blocksize - OCFS2_QBLK_RESERVED_SPACE - offset; 234 len = sb->s_blocksize - OCFS2_QBLK_RESERVED_SPACE - offset;
236 } 235 }
237 236
238 mutex_lock_nested(&gqinode->i_mutex, I_MUTEX_QUOTA);
239 if (gqinode->i_size < off + len) { 237 if (gqinode->i_size < off + len) {
240 loff_t rounded_end = 238 loff_t rounded_end =
241 ocfs2_align_bytes_to_blocks(sb, off + len); 239 ocfs2_align_bytes_to_blocks(sb, off + len);
242 240
243 /* Space is already allocated in ocfs2_global_read_dquot() */ 241 /* Space is already allocated in ocfs2_acquire_dquot() */
244 err = ocfs2_simple_size_update(gqinode, 242 err = ocfs2_simple_size_update(gqinode,
245 oinfo->dqi_gqi_bh, 243 oinfo->dqi_gqi_bh,
246 rounded_end); 244 rounded_end);
@@ -248,13 +246,20 @@ ssize_t ocfs2_quota_write(struct super_block *sb, int type,
248 goto out; 246 goto out;
249 new = 1; 247 new = 1;
250 } 248 }
249 err = ocfs2_extent_map_get_blocks(gqinode, blk, &pblock, &pcount, NULL);
250 if (err) {
251 mlog_errno(err);
252 goto out;
253 }
251 /* Not rewriting whole block? */ 254 /* Not rewriting whole block? */
252 if ((offset || len < sb->s_blocksize - OCFS2_QBLK_RESERVED_SPACE) && 255 if ((offset || len < sb->s_blocksize - OCFS2_QBLK_RESERVED_SPACE) &&
253 !new) { 256 !new) {
254 err = ocfs2_read_quota_block(gqinode, blk, &bh); 257 err = ocfs2_read_quota_phys_block(gqinode, pblock, &bh);
255 ja_type = OCFS2_JOURNAL_ACCESS_WRITE; 258 ja_type = OCFS2_JOURNAL_ACCESS_WRITE;
256 } else { 259 } else {
257 err = ocfs2_get_quota_block(gqinode, blk, &bh); 260 bh = sb_getblk(sb, pblock);
261 if (!bh)
262 err = -ENOMEM;
258 ja_type = OCFS2_JOURNAL_ACCESS_CREATE; 263 ja_type = OCFS2_JOURNAL_ACCESS_CREATE;
259 } 264 }
260 if (err) { 265 if (err) {
@@ -279,13 +284,11 @@ ssize_t ocfs2_quota_write(struct super_block *sb, int type,
279 brelse(bh); 284 brelse(bh);
280out: 285out:
281 if (err) { 286 if (err) {
282 mutex_unlock(&gqinode->i_mutex);
283 mlog_errno(err); 287 mlog_errno(err);
284 return err; 288 return err;
285 } 289 }
286 gqinode->i_version++; 290 gqinode->i_version++;
287 ocfs2_mark_inode_dirty(handle, gqinode, oinfo->dqi_gqi_bh); 291 ocfs2_mark_inode_dirty(handle, gqinode, oinfo->dqi_gqi_bh);
288 mutex_unlock(&gqinode->i_mutex);
289 return len; 292 return len;
290} 293}
291 294
@@ -303,11 +306,23 @@ int ocfs2_lock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex)
303 else 306 else
304 WARN_ON(bh != oinfo->dqi_gqi_bh); 307 WARN_ON(bh != oinfo->dqi_gqi_bh);
305 spin_unlock(&dq_data_lock); 308 spin_unlock(&dq_data_lock);
309 if (ex) {
310 mutex_lock(&oinfo->dqi_gqinode->i_mutex);
311 down_write(&OCFS2_I(oinfo->dqi_gqinode)->ip_alloc_sem);
312 } else {
313 down_read(&OCFS2_I(oinfo->dqi_gqinode)->ip_alloc_sem);
314 }
306 return 0; 315 return 0;
307} 316}
308 317
309void ocfs2_unlock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex) 318void ocfs2_unlock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex)
310{ 319{
320 if (ex) {
321 up_write(&OCFS2_I(oinfo->dqi_gqinode)->ip_alloc_sem);
322 mutex_unlock(&oinfo->dqi_gqinode->i_mutex);
323 } else {
324 up_read(&OCFS2_I(oinfo->dqi_gqinode)->ip_alloc_sem);
325 }
311 ocfs2_inode_unlock(oinfo->dqi_gqinode, ex); 326 ocfs2_inode_unlock(oinfo->dqi_gqinode, ex);
312 brelse(oinfo->dqi_gqi_bh); 327 brelse(oinfo->dqi_gqi_bh);
313 spin_lock(&dq_data_lock); 328 spin_lock(&dq_data_lock);
@@ -458,75 +473,6 @@ static int ocfs2_calc_global_qinit_credits(struct super_block *sb, int type)
458 OCFS2_QUOTA_BLOCK_UPDATE_CREDITS; 473 OCFS2_QUOTA_BLOCK_UPDATE_CREDITS;
459} 474}
460 475
461/* Read in information from global quota file and acquire a reference to it.
462 * dquot_acquire() has already started the transaction and locked quota file */
463int ocfs2_global_read_dquot(struct dquot *dquot)
464{
465 int err, err2, ex = 0;
466 struct super_block *sb = dquot->dq_sb;
467 int type = dquot->dq_type;
468 struct ocfs2_mem_dqinfo *info = sb_dqinfo(sb, type)->dqi_priv;
469 struct ocfs2_super *osb = OCFS2_SB(sb);
470 struct inode *gqinode = info->dqi_gqinode;
471 int need_alloc = ocfs2_global_qinit_alloc(sb, type);
472 handle_t *handle = NULL;
473
474 err = ocfs2_qinfo_lock(info, 0);
475 if (err < 0)
476 goto out;
477 err = qtree_read_dquot(&info->dqi_gi, dquot);
478 if (err < 0)
479 goto out_qlock;
480 OCFS2_DQUOT(dquot)->dq_use_count++;
481 OCFS2_DQUOT(dquot)->dq_origspace = dquot->dq_dqb.dqb_curspace;
482 OCFS2_DQUOT(dquot)->dq_originodes = dquot->dq_dqb.dqb_curinodes;
483 ocfs2_qinfo_unlock(info, 0);
484
485 if (!dquot->dq_off) { /* No real quota entry? */
486 ex = 1;
487 /*
488 * Add blocks to quota file before we start a transaction since
489 * locking allocators ranks above a transaction start
490 */
491 WARN_ON(journal_current_handle());
492 down_write(&OCFS2_I(gqinode)->ip_alloc_sem);
493 err = ocfs2_extend_no_holes(gqinode,
494 gqinode->i_size + (need_alloc << sb->s_blocksize_bits),
495 gqinode->i_size);
496 up_write(&OCFS2_I(gqinode)->ip_alloc_sem);
497 if (err < 0)
498 goto out;
499 }
500
501 handle = ocfs2_start_trans(osb,
502 ocfs2_calc_global_qinit_credits(sb, type));
503 if (IS_ERR(handle)) {
504 err = PTR_ERR(handle);
505 goto out;
506 }
507 err = ocfs2_qinfo_lock(info, ex);
508 if (err < 0)
509 goto out_trans;
510 err = qtree_write_dquot(&info->dqi_gi, dquot);
511 if (ex && info_dirty(sb_dqinfo(dquot->dq_sb, dquot->dq_type))) {
512 err2 = __ocfs2_global_write_info(dquot->dq_sb, dquot->dq_type);
513 if (!err)
514 err = err2;
515 }
516out_qlock:
517 if (ex)
518 ocfs2_qinfo_unlock(info, 1);
519 else
520 ocfs2_qinfo_unlock(info, 0);
521out_trans:
522 if (handle)
523 ocfs2_commit_trans(osb, handle);
524out:
525 if (err < 0)
526 mlog_errno(err);
527 return err;
528}
529
530/* Sync local information about quota modifications with global quota file. 476/* Sync local information about quota modifications with global quota file.
531 * Caller must have started the transaction and obtained exclusive lock for 477 * Caller must have started the transaction and obtained exclusive lock for
532 * global quota file inode */ 478 * global quota file inode */
@@ -742,6 +688,10 @@ static int ocfs2_release_dquot(struct dquot *dquot)
742 688
743 mlog_entry("id=%u, type=%d", dquot->dq_id, dquot->dq_type); 689 mlog_entry("id=%u, type=%d", dquot->dq_id, dquot->dq_type);
744 690
691 mutex_lock(&dquot->dq_lock);
692 /* Check whether we are not racing with some other dqget() */
693 if (atomic_read(&dquot->dq_count) > 1)
694 goto out;
745 status = ocfs2_lock_global_qf(oinfo, 1); 695 status = ocfs2_lock_global_qf(oinfo, 1);
746 if (status < 0) 696 if (status < 0)
747 goto out; 697 goto out;
@@ -752,30 +702,113 @@ static int ocfs2_release_dquot(struct dquot *dquot)
752 mlog_errno(status); 702 mlog_errno(status);
753 goto out_ilock; 703 goto out_ilock;
754 } 704 }
755 status = dquot_release(dquot); 705
706 status = ocfs2_global_release_dquot(dquot);
707 if (status < 0) {
708 mlog_errno(status);
709 goto out_trans;
710 }
711 status = ocfs2_local_release_dquot(handle, dquot);
712 /*
713 * If we fail here, we cannot do much as global structure is
714 * already released. So just complain...
715 */
716 if (status < 0)
717 mlog_errno(status);
718 clear_bit(DQ_ACTIVE_B, &dquot->dq_flags);
719out_trans:
756 ocfs2_commit_trans(osb, handle); 720 ocfs2_commit_trans(osb, handle);
757out_ilock: 721out_ilock:
758 ocfs2_unlock_global_qf(oinfo, 1); 722 ocfs2_unlock_global_qf(oinfo, 1);
759out: 723out:
724 mutex_unlock(&dquot->dq_lock);
760 mlog_exit(status); 725 mlog_exit(status);
761 return status; 726 return status;
762} 727}
763 728
729/*
730 * Read global dquot structure from disk or create it if it does
731 * not exist. Also update use count of the global structure and
732 * create structure in node-local quota file.
733 */
764static int ocfs2_acquire_dquot(struct dquot *dquot) 734static int ocfs2_acquire_dquot(struct dquot *dquot)
765{ 735{
766 struct ocfs2_mem_dqinfo *oinfo = 736 int status = 0, err;
767 sb_dqinfo(dquot->dq_sb, dquot->dq_type)->dqi_priv; 737 int ex = 0;
768 int status = 0; 738 struct super_block *sb = dquot->dq_sb;
739 struct ocfs2_super *osb = OCFS2_SB(sb);
740 int type = dquot->dq_type;
741 struct ocfs2_mem_dqinfo *info = sb_dqinfo(sb, type)->dqi_priv;
742 struct inode *gqinode = info->dqi_gqinode;
743 int need_alloc = ocfs2_global_qinit_alloc(sb, type);
744 handle_t *handle;
769 745
770 mlog_entry("id=%u, type=%d", dquot->dq_id, dquot->dq_type); 746 mlog_entry("id=%u, type=%d", dquot->dq_id, type);
771 /* We need an exclusive lock, because we're going to update use count 747 mutex_lock(&dquot->dq_lock);
772 * and instantiate possibly new dquot structure */ 748 /*
773 status = ocfs2_lock_global_qf(oinfo, 1); 749 * We need an exclusive lock, because we're going to update use count
750 * and instantiate possibly new dquot structure
751 */
752 status = ocfs2_lock_global_qf(info, 1);
774 if (status < 0) 753 if (status < 0)
775 goto out; 754 goto out;
776 status = dquot_acquire(dquot); 755 if (!test_bit(DQ_READ_B, &dquot->dq_flags)) {
777 ocfs2_unlock_global_qf(oinfo, 1); 756 status = ocfs2_qinfo_lock(info, 0);
757 if (status < 0)
758 goto out_dq;
759 status = qtree_read_dquot(&info->dqi_gi, dquot);
760 ocfs2_qinfo_unlock(info, 0);
761 if (status < 0)
762 goto out_dq;
763 }
764 set_bit(DQ_READ_B, &dquot->dq_flags);
765
766 OCFS2_DQUOT(dquot)->dq_use_count++;
767 OCFS2_DQUOT(dquot)->dq_origspace = dquot->dq_dqb.dqb_curspace;
768 OCFS2_DQUOT(dquot)->dq_originodes = dquot->dq_dqb.dqb_curinodes;
769 if (!dquot->dq_off) { /* No real quota entry? */
770 ex = 1;
771 /*
772 * Add blocks to quota file before we start a transaction since
773 * locking allocators ranks above a transaction start
774 */
775 WARN_ON(journal_current_handle());
776 status = ocfs2_extend_no_holes(gqinode,
777 gqinode->i_size + (need_alloc << sb->s_blocksize_bits),
778 gqinode->i_size);
779 if (status < 0)
780 goto out_dq;
781 }
782
783 handle = ocfs2_start_trans(osb,
784 ocfs2_calc_global_qinit_credits(sb, type));
785 if (IS_ERR(handle)) {
786 status = PTR_ERR(handle);
787 goto out_dq;
788 }
789 status = ocfs2_qinfo_lock(info, ex);
790 if (status < 0)
791 goto out_trans;
792 status = qtree_write_dquot(&info->dqi_gi, dquot);
793 if (ex && info_dirty(sb_dqinfo(sb, type))) {
794 err = __ocfs2_global_write_info(sb, type);
795 if (!status)
796 status = err;
797 }
798 ocfs2_qinfo_unlock(info, ex);
799out_trans:
800 ocfs2_commit_trans(osb, handle);
801out_dq:
802 ocfs2_unlock_global_qf(info, 1);
803 if (status < 0)
804 goto out;
805
806 status = ocfs2_create_local_dquot(dquot);
807 if (status < 0)
808 goto out;
809 set_bit(DQ_ACTIVE_B, &dquot->dq_flags);
778out: 810out:
811 mutex_unlock(&dquot->dq_lock);
779 mlog_exit(status); 812 mlog_exit(status);
780 return status; 813 return status;
781} 814}
@@ -820,7 +853,9 @@ static int ocfs2_mark_dquot_dirty(struct dquot *dquot)
820 mlog_errno(status); 853 mlog_errno(status);
821 goto out_ilock; 854 goto out_ilock;
822 } 855 }
856 mutex_lock(&sb_dqopt(sb)->dqio_mutex);
823 status = ocfs2_sync_dquot(dquot); 857 status = ocfs2_sync_dquot(dquot);
858 mutex_unlock(&sb_dqopt(sb)->dqio_mutex);
824 if (status < 0) { 859 if (status < 0) {
825 mlog_errno(status); 860 mlog_errno(status);
826 goto out_trans; 861 goto out_trans;
diff --git a/fs/ocfs2/quota_local.c b/fs/ocfs2/quota_local.c
index 962e8380852b..778947f0e951 100644
--- a/fs/ocfs2/quota_local.c
+++ b/fs/ocfs2/quota_local.c
@@ -22,6 +22,7 @@
22#include "dlmglue.h" 22#include "dlmglue.h"
23#include "quota.h" 23#include "quota.h"
24#include "uptodate.h" 24#include "uptodate.h"
25#include "super.h"
25 26
26/* Number of local quota structures per block */ 27/* Number of local quota structures per block */
27static inline unsigned int ol_quota_entries_per_block(struct super_block *sb) 28static inline unsigned int ol_quota_entries_per_block(struct super_block *sb)
@@ -129,6 +130,39 @@ static int ocfs2_modify_bh(struct inode *inode, struct buffer_head *bh,
129 return 0; 130 return 0;
130} 131}
131 132
133/*
134 * Read quota block from a given logical offset.
135 *
136 * This function acquires ip_alloc_sem and thus it must not be called with a
137 * transaction started.
138 */
139static int ocfs2_read_quota_block(struct inode *inode, u64 v_block,
140 struct buffer_head **bh)
141{
142 int rc = 0;
143 struct buffer_head *tmp = *bh;
144
145 if (i_size_read(inode) >> inode->i_sb->s_blocksize_bits <= v_block) {
146 ocfs2_error(inode->i_sb,
147 "Quota file %llu is probably corrupted! Requested "
148 "to read block %Lu but file has size only %Lu\n",
149 (unsigned long long)OCFS2_I(inode)->ip_blkno,
150 (unsigned long long)v_block,
151 (unsigned long long)i_size_read(inode));
152 return -EIO;
153 }
154 rc = ocfs2_read_virt_blocks(inode, v_block, 1, &tmp, 0,
155 ocfs2_validate_quota_block);
156 if (rc)
157 mlog_errno(rc);
158
159 /* If ocfs2_read_virt_blocks() got us a new bh, pass it up. */
160 if (!rc && !*bh)
161 *bh = tmp;
162
163 return rc;
164}
165
132/* Check whether we understand format of quota files */ 166/* Check whether we understand format of quota files */
133static int ocfs2_local_check_quota_file(struct super_block *sb, int type) 167static int ocfs2_local_check_quota_file(struct super_block *sb, int type)
134{ 168{
@@ -972,10 +1006,8 @@ static struct ocfs2_quota_chunk *ocfs2_local_quota_add_chunk(
972 } 1006 }
973 1007
974 /* Initialize chunk header */ 1008 /* Initialize chunk header */
975 down_read(&OCFS2_I(lqinode)->ip_alloc_sem);
976 status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks, 1009 status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks,
977 &p_blkno, NULL, NULL); 1010 &p_blkno, NULL, NULL);
978 up_read(&OCFS2_I(lqinode)->ip_alloc_sem);
979 if (status < 0) { 1011 if (status < 0) {
980 mlog_errno(status); 1012 mlog_errno(status);
981 goto out_trans; 1013 goto out_trans;
@@ -1003,10 +1035,8 @@ static struct ocfs2_quota_chunk *ocfs2_local_quota_add_chunk(
1003 ocfs2_journal_dirty(handle, bh); 1035 ocfs2_journal_dirty(handle, bh);
1004 1036
1005 /* Initialize new block with structures */ 1037 /* Initialize new block with structures */
1006 down_read(&OCFS2_I(lqinode)->ip_alloc_sem);
1007 status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks + 1, 1038 status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks + 1,
1008 &p_blkno, NULL, NULL); 1039 &p_blkno, NULL, NULL);
1009 up_read(&OCFS2_I(lqinode)->ip_alloc_sem);
1010 if (status < 0) { 1040 if (status < 0) {
1011 mlog_errno(status); 1041 mlog_errno(status);
1012 goto out_trans; 1042 goto out_trans;
@@ -1103,10 +1133,8 @@ static struct ocfs2_quota_chunk *ocfs2_extend_local_quota_file(
1103 } 1133 }
1104 1134
1105 /* Get buffer from the just added block */ 1135 /* Get buffer from the just added block */
1106 down_read(&OCFS2_I(lqinode)->ip_alloc_sem);
1107 status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks, 1136 status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks,
1108 &p_blkno, NULL, NULL); 1137 &p_blkno, NULL, NULL);
1109 up_read(&OCFS2_I(lqinode)->ip_alloc_sem);
1110 if (status < 0) { 1138 if (status < 0) {
1111 mlog_errno(status); 1139 mlog_errno(status);
1112 goto out; 1140 goto out;
@@ -1187,7 +1215,7 @@ static void olq_alloc_dquot(struct buffer_head *bh, void *private)
1187} 1215}
1188 1216
1189/* Create dquot in the local file for given id */ 1217/* Create dquot in the local file for given id */
1190static int ocfs2_create_local_dquot(struct dquot *dquot) 1218int ocfs2_create_local_dquot(struct dquot *dquot)
1191{ 1219{
1192 struct super_block *sb = dquot->dq_sb; 1220 struct super_block *sb = dquot->dq_sb;
1193 int type = dquot->dq_type; 1221 int type = dquot->dq_type;
@@ -1237,36 +1265,11 @@ out:
1237 return status; 1265 return status;
1238} 1266}
1239 1267
1240/* Create entry in local file for dquot, load data from the global file */ 1268/*
1241static int ocfs2_local_read_dquot(struct dquot *dquot) 1269 * Release dquot structure from local quota file. ocfs2_release_dquot() has
1242{ 1270 * already started a transaction and written all changes to global quota file
1243 int status; 1271 */
1244 1272int ocfs2_local_release_dquot(handle_t *handle, struct dquot *dquot)
1245 mlog_entry("id=%u, type=%d\n", dquot->dq_id, dquot->dq_type);
1246
1247 status = ocfs2_global_read_dquot(dquot);
1248 if (status < 0) {
1249 mlog_errno(status);
1250 goto out_err;
1251 }
1252
1253 /* Now create entry in the local quota file */
1254 status = ocfs2_create_local_dquot(dquot);
1255 if (status < 0) {
1256 mlog_errno(status);
1257 goto out_err;
1258 }
1259 mlog_exit(0);
1260 return 0;
1261out_err:
1262 mlog_exit(status);
1263 return status;
1264}
1265
1266/* Release dquot structure from local quota file. ocfs2_release_dquot() has
1267 * already started a transaction and obtained exclusive lock for global
1268 * quota file. */
1269static int ocfs2_local_release_dquot(struct dquot *dquot)
1270{ 1273{
1271 int status; 1274 int status;
1272 int type = dquot->dq_type; 1275 int type = dquot->dq_type;
@@ -1274,15 +1277,6 @@ static int ocfs2_local_release_dquot(struct dquot *dquot)
1274 struct super_block *sb = dquot->dq_sb; 1277 struct super_block *sb = dquot->dq_sb;
1275 struct ocfs2_local_disk_chunk *dchunk; 1278 struct ocfs2_local_disk_chunk *dchunk;
1276 int offset; 1279 int offset;
1277 handle_t *handle = journal_current_handle();
1278
1279 BUG_ON(!handle);
1280 /* First write all local changes to global file */
1281 status = ocfs2_global_release_dquot(dquot);
1282 if (status < 0) {
1283 mlog_errno(status);
1284 goto out;
1285 }
1286 1280
1287 status = ocfs2_journal_access_dq(handle, 1281 status = ocfs2_journal_access_dq(handle,
1288 INODE_CACHE(sb_dqopt(sb)->files[type]), 1282 INODE_CACHE(sb_dqopt(sb)->files[type]),
@@ -1315,9 +1309,7 @@ static const struct quota_format_ops ocfs2_format_ops = {
1315 .read_file_info = ocfs2_local_read_info, 1309 .read_file_info = ocfs2_local_read_info,
1316 .write_file_info = ocfs2_global_write_info, 1310 .write_file_info = ocfs2_global_write_info,
1317 .free_file_info = ocfs2_local_free_info, 1311 .free_file_info = ocfs2_local_free_info,
1318 .read_dqblk = ocfs2_local_read_dquot,
1319 .commit_dqblk = ocfs2_local_write_dquot, 1312 .commit_dqblk = ocfs2_local_write_dquot,
1320 .release_dqblk = ocfs2_local_release_dquot,
1321}; 1313};
1322 1314
1323struct quota_format_type ocfs2_quota_format = { 1315struct quota_format_type ocfs2_quota_format = {