about | summary | refs | log | tree | commit | diff | stats
path: root/fs/ocfs2/dlmglue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r-- fs/ocfs2/dlmglue.c 369
1 files changed, 237 insertions, 132 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index c5e4a49e3a12..8298608d4165 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -297,6 +297,11 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
297 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 297 lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
298} 298}
299 299
300static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
301{
302 return container_of(lksb, struct ocfs2_lock_res, l_lksb);
303}
304
300static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 305static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
301{ 306{
302 BUG_ON(!ocfs2_is_inode_lock(lockres)); 307 BUG_ON(!ocfs2_is_inode_lock(lockres));
@@ -875,6 +880,14 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo
875 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 880 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
876 881
877 lockres->l_level = lockres->l_requested; 882 lockres->l_level = lockres->l_requested;
883
884 /*
885 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
886 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
887 * downconverting the lock before the upconvert has fully completed.
888 */
889 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
890
878 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 891 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
879 892
880 mlog_exit_void(); 893 mlog_exit_void();
@@ -907,8 +920,6 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
907 920
908 assert_spin_locked(&lockres->l_lock); 921 assert_spin_locked(&lockres->l_lock);
909 922
910 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
911
912 if (level > lockres->l_blocking) { 923 if (level > lockres->l_blocking) {
913 /* only schedule a downconvert if we haven't already scheduled 924 /* only schedule a downconvert if we haven't already scheduled
914 * one that goes low enough to satisfy the level we're 925 * one that goes low enough to satisfy the level we're
@@ -921,6 +932,13 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
921 lockres->l_blocking = level; 932 lockres->l_blocking = level;
922 } 933 }
923 934
935 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
936 lockres->l_name, level, lockres->l_level, lockres->l_blocking,
937 needs_downconvert);
938
939 if (needs_downconvert)
940 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
941
924 mlog_exit(needs_downconvert); 942 mlog_exit(needs_downconvert);
925 return needs_downconvert; 943 return needs_downconvert;
926} 944}
@@ -1031,18 +1049,17 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
1031 return lockres->l_pending_gen; 1049 return lockres->l_pending_gen;
1032} 1050}
1033 1051
1034 1052static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
1035static void ocfs2_blocking_ast(void *opaque, int level)
1036{ 1053{
1037 struct ocfs2_lock_res *lockres = opaque; 1054 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1038 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1055 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1039 int needs_downconvert; 1056 int needs_downconvert;
1040 unsigned long flags; 1057 unsigned long flags;
1041 1058
1042 BUG_ON(level <= DLM_LOCK_NL); 1059 BUG_ON(level <= DLM_LOCK_NL);
1043 1060
1044 mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", 1061 mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
1045 lockres->l_name, level, lockres->l_level, 1062 "type %s\n", lockres->l_name, level, lockres->l_level,
1046 ocfs2_lock_type_string(lockres->l_type)); 1063 ocfs2_lock_type_string(lockres->l_type));
1047 1064
1048 /* 1065 /*
@@ -1063,9 +1080,9 @@ static void ocfs2_blocking_ast(void *opaque, int level)
1063 ocfs2_wake_downconvert_thread(osb); 1080 ocfs2_wake_downconvert_thread(osb);
1064} 1081}
1065 1082
1066static void ocfs2_locking_ast(void *opaque) 1083static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
1067{ 1084{
1068 struct ocfs2_lock_res *lockres = opaque; 1085 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1069 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1086 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1070 unsigned long flags; 1087 unsigned long flags;
1071 int status; 1088 int status;
@@ -1086,6 +1103,10 @@ static void ocfs2_locking_ast(void *opaque)
1086 return; 1103 return;
1087 } 1104 }
1088 1105
1106 mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
1107 "level %d => %d\n", lockres->l_name, lockres->l_action,
1108 lockres->l_unlock_action, lockres->l_level, lockres->l_requested);
1109
1089 switch(lockres->l_action) { 1110 switch(lockres->l_action) {
1090 case OCFS2_AST_ATTACH: 1111 case OCFS2_AST_ATTACH:
1091 ocfs2_generic_handle_attach_action(lockres); 1112 ocfs2_generic_handle_attach_action(lockres);
@@ -1098,8 +1119,8 @@ static void ocfs2_locking_ast(void *opaque)
1098 ocfs2_generic_handle_downconvert_action(lockres); 1119 ocfs2_generic_handle_downconvert_action(lockres);
1099 break; 1120 break;
1100 default: 1121 default:
1101 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u " 1122 mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
1102 "lockres flags = 0x%lx, unlock action: %u\n", 1123 "flags 0x%lx, unlock: %u\n",
1103 lockres->l_name, lockres->l_action, lockres->l_flags, 1124 lockres->l_name, lockres->l_action, lockres->l_flags,
1104 lockres->l_unlock_action); 1125 lockres->l_unlock_action);
1105 BUG(); 1126 BUG();
@@ -1125,6 +1146,88 @@ out:
1125 spin_unlock_irqrestore(&lockres->l_lock, flags); 1146 spin_unlock_irqrestore(&lockres->l_lock, flags);
1126} 1147}
1127 1148
1149static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
1150{
1151 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1152 unsigned long flags;
1153
1154 mlog_entry_void();
1155
1156 mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
1157 lockres->l_name, lockres->l_unlock_action);
1158
1159 spin_lock_irqsave(&lockres->l_lock, flags);
1160 if (error) {
1161 mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
1162 "unlock_action %d\n", error, lockres->l_name,
1163 lockres->l_unlock_action);
1164 spin_unlock_irqrestore(&lockres->l_lock, flags);
1165 mlog_exit_void();
1166 return;
1167 }
1168
1169 switch(lockres->l_unlock_action) {
1170 case OCFS2_UNLOCK_CANCEL_CONVERT:
1171 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
1172 lockres->l_action = OCFS2_AST_INVALID;
1173 /* Downconvert thread may have requeued this lock, we
1174 * need to wake it. */
1175 if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
1176 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
1177 break;
1178 case OCFS2_UNLOCK_DROP_LOCK:
1179 lockres->l_level = DLM_LOCK_IV;
1180 break;
1181 default:
1182 BUG();
1183 }
1184
1185 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1186 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1187 wake_up(&lockres->l_event);
1188 spin_unlock_irqrestore(&lockres->l_lock, flags);
1189
1190 mlog_exit_void();
1191}
1192
1193/*
1194 * This is the filesystem locking protocol. It provides the lock handling
1195 * hooks for the underlying DLM. It has a maximum version number.
1196 * The version number allows interoperability with systems running at
1197 * the same major number and an equal or smaller minor number.
1198 *
1199 * Whenever the filesystem does new things with locks (adds or removes a
1200 * lock, orders them differently, does different things underneath a lock),
1201 * the version must be changed. The protocol is negotiated when joining
1202 * the dlm domain. A node may join the domain if its major version is
1203 * identical to all other nodes and its minor version is greater than
1204 * or equal to all other nodes. When its minor version is greater than
1205 * the other nodes, it will run at the minor version specified by the
1206 * other nodes.
1207 *
1208 * If a locking change is made that will not be compatible with older
1209 * versions, the major number must be increased and the minor version set
1210 * to zero. If a change merely adds a behavior that can be disabled when
1211 * speaking to older versions, the minor version must be increased. If a
1212 * change adds a fully backwards compatible change (eg, LVB changes that
1213 * are just ignored by older versions), the version does not need to be
1214 * updated.
1215 */
1216static struct ocfs2_locking_protocol lproto = {
1217 .lp_max_version = {
1218 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
1219 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
1220 },
1221 .lp_lock_ast = ocfs2_locking_ast,
1222 .lp_blocking_ast = ocfs2_blocking_ast,
1223 .lp_unlock_ast = ocfs2_unlock_ast,
1224};
1225
1226void ocfs2_set_locking_protocol(void)
1227{
1228 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
1229}
1230
1128static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1231static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
1129 int convert) 1232 int convert)
1130{ 1233{
@@ -1133,6 +1236,7 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
1133 mlog_entry_void(); 1236 mlog_entry_void();
1134 spin_lock_irqsave(&lockres->l_lock, flags); 1237 spin_lock_irqsave(&lockres->l_lock, flags);
1135 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1238 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1239 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1136 if (convert) 1240 if (convert)
1137 lockres->l_action = OCFS2_AST_INVALID; 1241 lockres->l_action = OCFS2_AST_INVALID;
1138 else 1242 else
@@ -1179,8 +1283,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
1179 &lockres->l_lksb, 1283 &lockres->l_lksb,
1180 dlm_flags, 1284 dlm_flags,
1181 lockres->l_name, 1285 lockres->l_name,
1182 OCFS2_LOCK_ID_MAX_LEN - 1, 1286 OCFS2_LOCK_ID_MAX_LEN - 1);
1183 lockres);
1184 lockres_clear_pending(lockres, gen, osb); 1287 lockres_clear_pending(lockres, gen, osb);
1185 if (ret) { 1288 if (ret) {
1186 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1289 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -1323,13 +1426,13 @@ static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
1323again: 1426again:
1324 wait = 0; 1427 wait = 0;
1325 1428
1429 spin_lock_irqsave(&lockres->l_lock, flags);
1430
1326 if (catch_signals && signal_pending(current)) { 1431 if (catch_signals && signal_pending(current)) {
1327 ret = -ERESTARTSYS; 1432 ret = -ERESTARTSYS;
1328 goto out; 1433 goto unlock;
1329 } 1434 }
1330 1435
1331 spin_lock_irqsave(&lockres->l_lock, flags);
1332
1333 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1436 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1334 "Cluster lock called on freeing lockres %s! flags " 1437 "Cluster lock called on freeing lockres %s! flags "
1335 "0x%lx\n", lockres->l_name, lockres->l_flags); 1438 "0x%lx\n", lockres->l_name, lockres->l_flags);
@@ -1346,6 +1449,25 @@ again:
1346 goto unlock; 1449 goto unlock;
1347 } 1450 }
1348 1451
1452 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
1453 /*
1454 * We've upconverted. If the lock now has a level we can
1455 * work with, we take it. If, however, the lock is not at the
1456 * required level, we go thru the full cycle. One way this could
1457 * happen is if a process requesting an upconvert to PR is
1458 * closely followed by another requesting upconvert to an EX.
1459 * If the process requesting EX lands here, we want it to
1460 * continue attempting to upconvert and let the process
1461 * requesting PR take the lock.
1462 * If multiple processes request upconvert to PR, the first one
1463 * here will take the lock. The others will have to go thru the
1464 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
1465 * downconvert request.
1466 */
1467 if (level <= lockres->l_level)
1468 goto update_holders;
1469 }
1470
1349 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1471 if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1350 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1472 !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1351 /* is the lock is currently blocked on behalf of 1473 /* is the lock is currently blocked on behalf of
@@ -1383,7 +1505,7 @@ again:
1383 BUG_ON(level == DLM_LOCK_IV); 1505 BUG_ON(level == DLM_LOCK_IV);
1384 BUG_ON(level == DLM_LOCK_NL); 1506 BUG_ON(level == DLM_LOCK_NL);
1385 1507
1386 mlog(0, "lock %s, convert from %d to level = %d\n", 1508 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
1387 lockres->l_name, lockres->l_level, level); 1509 lockres->l_name, lockres->l_level, level);
1388 1510
1389 /* call dlm_lock to upgrade lock now */ 1511 /* call dlm_lock to upgrade lock now */
@@ -1392,8 +1514,7 @@ again:
1392 &lockres->l_lksb, 1514 &lockres->l_lksb,
1393 lkm_flags, 1515 lkm_flags,
1394 lockres->l_name, 1516 lockres->l_name,
1395 OCFS2_LOCK_ID_MAX_LEN - 1, 1517 OCFS2_LOCK_ID_MAX_LEN - 1);
1396 lockres);
1397 lockres_clear_pending(lockres, gen, osb); 1518 lockres_clear_pending(lockres, gen, osb);
1398 if (ret) { 1519 if (ret) {
1399 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1520 if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
@@ -1416,11 +1537,14 @@ again:
1416 goto again; 1537 goto again;
1417 } 1538 }
1418 1539
1540update_holders:
1419 /* Ok, if we get here then we're good to go. */ 1541 /* Ok, if we get here then we're good to go. */
1420 ocfs2_inc_holders(lockres, level); 1542 ocfs2_inc_holders(lockres, level);
1421 1543
1422 ret = 0; 1544 ret = 0;
1423unlock: 1545unlock:
1546 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1547
1424 spin_unlock_irqrestore(&lockres->l_lock, flags); 1548 spin_unlock_irqrestore(&lockres->l_lock, flags);
1425out: 1549out:
1426 /* 1550 /*
@@ -1827,8 +1951,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
1827 spin_unlock_irqrestore(&lockres->l_lock, flags); 1951 spin_unlock_irqrestore(&lockres->l_lock, flags);
1828 1952
1829 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 1953 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1830 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, 1954 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
1831 lockres);
1832 if (ret) { 1955 if (ret) {
1833 if (!trylock || (ret != -EAGAIN)) { 1956 if (!trylock || (ret != -EAGAIN)) {
1834 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1957 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -2957,7 +3080,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
2957 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3080 status = ocfs2_cluster_connect(osb->osb_cluster_stack,
2958 osb->uuid_str, 3081 osb->uuid_str,
2959 strlen(osb->uuid_str), 3082 strlen(osb->uuid_str),
2960 ocfs2_do_node_down, osb, 3083 &lproto, ocfs2_do_node_down, osb,
2961 &conn); 3084 &conn);
2962 if (status) { 3085 if (status) {
2963 mlog_errno(status); 3086 mlog_errno(status);
@@ -3024,50 +3147,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
3024 mlog_exit_void(); 3147 mlog_exit_void();
3025} 3148}
3026 3149
3027static void ocfs2_unlock_ast(void *opaque, int error)
3028{
3029 struct ocfs2_lock_res *lockres = opaque;
3030 unsigned long flags;
3031
3032 mlog_entry_void();
3033
3034 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
3035 lockres->l_unlock_action);
3036
3037 spin_lock_irqsave(&lockres->l_lock, flags);
3038 if (error) {
3039 mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
3040 "unlock_action %d\n", error, lockres->l_name,
3041 lockres->l_unlock_action);
3042 spin_unlock_irqrestore(&lockres->l_lock, flags);
3043 mlog_exit_void();
3044 return;
3045 }
3046
3047 switch(lockres->l_unlock_action) {
3048 case OCFS2_UNLOCK_CANCEL_CONVERT:
3049 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
3050 lockres->l_action = OCFS2_AST_INVALID;
3051 /* Downconvert thread may have requeued this lock, we
3052 * need to wake it. */
3053 if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3054 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
3055 break;
3056 case OCFS2_UNLOCK_DROP_LOCK:
3057 lockres->l_level = DLM_LOCK_IV;
3058 break;
3059 default:
3060 BUG();
3061 }
3062
3063 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
3064 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
3065 wake_up(&lockres->l_event);
3066 spin_unlock_irqrestore(&lockres->l_lock, flags);
3067
3068 mlog_exit_void();
3069}
3070
3071static int ocfs2_drop_lock(struct ocfs2_super *osb, 3150static int ocfs2_drop_lock(struct ocfs2_super *osb,
3072 struct ocfs2_lock_res *lockres) 3151 struct ocfs2_lock_res *lockres)
3073{ 3152{
@@ -3135,8 +3214,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
3135 3214
3136 mlog(0, "lock %s\n", lockres->l_name); 3215 mlog(0, "lock %s\n", lockres->l_name);
3137 3216
3138 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags, 3217 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
3139 lockres);
3140 if (ret) { 3218 if (ret) {
3141 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3219 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3142 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3220 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
@@ -3155,7 +3233,7 @@ out:
3155/* Mark the lockres as being dropped. It will no longer be 3233/* Mark the lockres as being dropped. It will no longer be
3156 * queued if blocking, but we still may have to wait on it 3234 * queued if blocking, but we still may have to wait on it
3157 * being dequeued from the downconvert thread before we can consider 3235 * being dequeued from the downconvert thread before we can consider
3158 * it safe to drop. 3236 * it safe to drop.
3159 * 3237 *
3160 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3238 * You can *not* attempt to call cluster_lock on this lockres anymore. */
3161void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 3239void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
@@ -3244,13 +3322,20 @@ static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3244 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3322 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3245 3323
3246 if (lockres->l_level <= new_level) { 3324 if (lockres->l_level <= new_level) {
3247 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n", 3325 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3248 lockres->l_level, new_level); 3326 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3327 "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3328 new_level, list_empty(&lockres->l_blocked_list),
3329 list_empty(&lockres->l_mask_waiters), lockres->l_type,
3330 lockres->l_flags, lockres->l_ro_holders,
3331 lockres->l_ex_holders, lockres->l_action,
3332 lockres->l_unlock_action, lockres->l_requested,
3333 lockres->l_blocking, lockres->l_pending_gen);
3249 BUG(); 3334 BUG();
3250 } 3335 }
3251 3336
3252 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", 3337 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3253 lockres->l_name, new_level, lockres->l_blocking); 3338 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3254 3339
3255 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3340 lockres->l_action = OCFS2_AST_DOWNCONVERT;
3256 lockres->l_requested = new_level; 3341 lockres->l_requested = new_level;
@@ -3269,6 +3354,9 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3269 3354
3270 mlog_entry_void(); 3355 mlog_entry_void();
3271 3356
3357 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3358 lockres->l_level, new_level);
3359
3272 if (lvb) 3360 if (lvb)
3273 dlm_flags |= DLM_LKF_VALBLK; 3361 dlm_flags |= DLM_LKF_VALBLK;
3274 3362
@@ -3277,8 +3365,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3277 &lockres->l_lksb, 3365 &lockres->l_lksb,
3278 dlm_flags, 3366 dlm_flags,
3279 lockres->l_name, 3367 lockres->l_name,
3280 OCFS2_LOCK_ID_MAX_LEN - 1, 3368 OCFS2_LOCK_ID_MAX_LEN - 1);
3281 lockres);
3282 lockres_clear_pending(lockres, generation, osb); 3369 lockres_clear_pending(lockres, generation, osb);
3283 if (ret) { 3370 if (ret) {
3284 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3371 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -3299,14 +3386,12 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3299 assert_spin_locked(&lockres->l_lock); 3386 assert_spin_locked(&lockres->l_lock);
3300 3387
3301 mlog_entry_void(); 3388 mlog_entry_void();
3302 mlog(0, "lock %s\n", lockres->l_name);
3303 3389
3304 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3390 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3305 /* If we're already trying to cancel a lock conversion 3391 /* If we're already trying to cancel a lock conversion
3306 * then just drop the spinlock and allow the caller to 3392 * then just drop the spinlock and allow the caller to
3307 * requeue this lock. */ 3393 * requeue this lock. */
3308 3394 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3309 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
3310 return 0; 3395 return 0;
3311 } 3396 }
3312 3397
@@ -3321,6 +3406,8 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3321 "lock %s, invalid flags: 0x%lx\n", 3406 "lock %s, invalid flags: 0x%lx\n",
3322 lockres->l_name, lockres->l_flags); 3407 lockres->l_name, lockres->l_flags);
3323 3408
3409 mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3410
3324 return 1; 3411 return 1;
3325} 3412}
3326 3413
@@ -3330,16 +3417,15 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3330 int ret; 3417 int ret;
3331 3418
3332 mlog_entry_void(); 3419 mlog_entry_void();
3333 mlog(0, "lock %s\n", lockres->l_name);
3334 3420
3335 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3421 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3336 DLM_LKF_CANCEL, lockres); 3422 DLM_LKF_CANCEL);
3337 if (ret) { 3423 if (ret) {
3338 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3424 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3339 ocfs2_recover_from_dlm_error(lockres, 0); 3425 ocfs2_recover_from_dlm_error(lockres, 0);
3340 } 3426 }
3341 3427
3342 mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name); 3428 mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3343 3429
3344 mlog_exit(ret); 3430 mlog_exit(ret);
3345 return ret; 3431 return ret;
@@ -3352,6 +3438,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3352 unsigned long flags; 3438 unsigned long flags;
3353 int blocking; 3439 int blocking;
3354 int new_level; 3440 int new_level;
3441 int level;
3355 int ret = 0; 3442 int ret = 0;
3356 int set_lvb = 0; 3443 int set_lvb = 0;
3357 unsigned int gen; 3444 unsigned int gen;
@@ -3360,9 +3447,17 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3360 3447
3361 spin_lock_irqsave(&lockres->l_lock, flags); 3448 spin_lock_irqsave(&lockres->l_lock, flags);
3362 3449
3363 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
3364
3365recheck: 3450recheck:
3451 /*
3452 * Is it still blocking? If not, we have no more work to do.
3453 */
3454 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3455 BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3456 spin_unlock_irqrestore(&lockres->l_lock, flags);
3457 ret = 0;
3458 goto leave;
3459 }
3460
3366 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3461 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3367 /* XXX 3462 /* XXX
3368 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3463 * This is a *big* race. The OCFS2_LOCK_PENDING flag
@@ -3387,8 +3482,11 @@ recheck:
3387 * at the same time they set OCFS2_DLM_BUSY. They must 3482 * at the same time they set OCFS2_DLM_BUSY. They must
3388 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3483 * clear OCFS2_DLM_PENDING after dlm_lock() returns.
3389 */ 3484 */
3390 if (lockres->l_flags & OCFS2_LOCK_PENDING) 3485 if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3486 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3487 lockres->l_name);
3391 goto leave_requeue; 3488 goto leave_requeue;
3489 }
3392 3490
3393 ctl->requeue = 1; 3491 ctl->requeue = 1;
3394 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3492 ret = ocfs2_prepare_cancel_convert(osb, lockres);
@@ -3401,31 +3499,70 @@ recheck:
3401 goto leave; 3499 goto leave;
3402 } 3500 }
3403 3501
3502 /*
3503 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3504 * set when the ast is received for an upconvert just before the
3505 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3506 * on the heels of the ast, we want to delay the downconvert just
3507 * enough to allow the up requestor to do its task. Because this
3508 * lock is in the blocked queue, the lock will be downconverted
3509 * as soon as the requestor is done with the lock.
3510 */
3511 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3512 goto leave_requeue;
3513
3514 /*
3515 * How can we block and yet be at NL? We were trying to upconvert
3516 * from NL and got canceled. The code comes back here, and now
3517 * we notice and clear BLOCKING.
3518 */
3519 if (lockres->l_level == DLM_LOCK_NL) {
3520 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3521 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
3522 lockres->l_blocking = DLM_LOCK_NL;
3523 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3524 spin_unlock_irqrestore(&lockres->l_lock, flags);
3525 goto leave;
3526 }
3527
3404 /* if we're blocking an exclusive and we have *any* holders, 3528 /* if we're blocking an exclusive and we have *any* holders,
3405 * then requeue. */ 3529 * then requeue. */
3406 if ((lockres->l_blocking == DLM_LOCK_EX) 3530 if ((lockres->l_blocking == DLM_LOCK_EX)
3407 && (lockres->l_ex_holders || lockres->l_ro_holders)) 3531 && (lockres->l_ex_holders || lockres->l_ro_holders)) {
3532 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
3533 lockres->l_name, lockres->l_ex_holders,
3534 lockres->l_ro_holders);
3408 goto leave_requeue; 3535 goto leave_requeue;
3536 }
3409 3537
3410 /* If it's a PR we're blocking, then only 3538 /* If it's a PR we're blocking, then only
3411 * requeue if we've got any EX holders */ 3539 * requeue if we've got any EX holders */
3412 if (lockres->l_blocking == DLM_LOCK_PR && 3540 if (lockres->l_blocking == DLM_LOCK_PR &&
3413 lockres->l_ex_holders) 3541 lockres->l_ex_holders) {
3542 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
3543 lockres->l_name, lockres->l_ex_holders);
3414 goto leave_requeue; 3544 goto leave_requeue;
3545 }
3415 3546
3416 /* 3547 /*
3417 * Can we get a lock in this state if the holder counts are 3548 * Can we get a lock in this state if the holder counts are
3418 * zero? The meta data unblock code used to check this. 3549 * zero? The meta data unblock code used to check this.
3419 */ 3550 */
3420 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3551 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3421 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) 3552 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
3553 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
3554 lockres->l_name);
3422 goto leave_requeue; 3555 goto leave_requeue;
3556 }
3423 3557
3424 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3558 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3425 3559
3426 if (lockres->l_ops->check_downconvert 3560 if (lockres->l_ops->check_downconvert
3427 && !lockres->l_ops->check_downconvert(lockres, new_level)) 3561 && !lockres->l_ops->check_downconvert(lockres, new_level)) {
3562 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
3563 lockres->l_name);
3428 goto leave_requeue; 3564 goto leave_requeue;
3565 }
3429 3566
3430 /* If we get here, then we know that there are no more 3567 /* If we get here, then we know that there are no more
3431 * incompatible holders (and anyone asking for an incompatible 3568 * incompatible holders (and anyone asking for an incompatible
@@ -3438,17 +3575,24 @@ recheck:
3438 * may sleep, so we save off a copy of what we're blocking as 3575 * may sleep, so we save off a copy of what we're blocking as
3439 * it may change while we're not holding the spin lock. */ 3576 * it may change while we're not holding the spin lock. */
3440 blocking = lockres->l_blocking; 3577 blocking = lockres->l_blocking;
3578 level = lockres->l_level;
3441 spin_unlock_irqrestore(&lockres->l_lock, flags); 3579 spin_unlock_irqrestore(&lockres->l_lock, flags);
3442 3580
3443 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3581 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3444 3582
3445 if (ctl->unblock_action == UNBLOCK_STOP_POST) 3583 if (ctl->unblock_action == UNBLOCK_STOP_POST) {
3584 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
3585 lockres->l_name);
3446 goto leave; 3586 goto leave;
3587 }
3447 3588
3448 spin_lock_irqsave(&lockres->l_lock, flags); 3589 spin_lock_irqsave(&lockres->l_lock, flags);
3449 if (blocking != lockres->l_blocking) { 3590 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3450 /* If this changed underneath us, then we can't drop 3591 /* If this changed underneath us, then we can't drop
3451 * it just yet. */ 3592 * it just yet. */
3593 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3594 "Recheck\n", lockres->l_name, blocking,
3595 lockres->l_blocking, level, lockres->l_level);
3452 goto recheck; 3596 goto recheck;
3453 } 3597 }
3454 3598
@@ -3843,45 +3987,6 @@ void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
3843 ocfs2_cluster_unlock(osb, lockres, level); 3987 ocfs2_cluster_unlock(osb, lockres, level);
3844} 3988}
3845 3989
3846/*
3847 * This is the filesystem locking protocol. It provides the lock handling
3848 * hooks for the underlying DLM. It has a maximum version number.
3849 * The version number allows interoperability with systems running at
3850 * the same major number and an equal or smaller minor number.
3851 *
3852 * Whenever the filesystem does new things with locks (adds or removes a
3853 * lock, orders them differently, does different things underneath a lock),
3854 * the version must be changed. The protocol is negotiated when joining
3855 * the dlm domain. A node may join the domain if its major version is
3856 * identical to all other nodes and its minor version is greater than
3857 * or equal to all other nodes. When its minor version is greater than
3858 * the other nodes, it will run at the minor version specified by the
3859 * other nodes.
3860 *
3861 * If a locking change is made that will not be compatible with older
3862 * versions, the major number must be increased and the minor version set
3863 * to zero. If a change merely adds a behavior that can be disabled when
3864 * speaking to older versions, the minor version must be increased. If a
3865 * change adds a fully backwards compatible change (eg, LVB changes that
3866 * are just ignored by older versions), the version does not need to be
3867 * updated.
3868 */
3869static struct ocfs2_locking_protocol lproto = {
3870 .lp_max_version = {
3871 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
3872 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
3873 },
3874 .lp_lock_ast = ocfs2_locking_ast,
3875 .lp_blocking_ast = ocfs2_blocking_ast,
3876 .lp_unlock_ast = ocfs2_unlock_ast,
3877};
3878
3879void ocfs2_set_locking_protocol(void)
3880{
3881 ocfs2_stack_glue_set_locking_protocol(&lproto);
3882}
3883
3884
3885static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3990static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3886 struct ocfs2_lock_res *lockres) 3991 struct ocfs2_lock_res *lockres)
3887{ 3992{
@@ -3898,7 +4003,7 @@ static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3898 BUG_ON(!lockres); 4003 BUG_ON(!lockres);
3899 BUG_ON(!lockres->l_ops); 4004 BUG_ON(!lockres->l_ops);
3900 4005
3901 mlog(0, "lockres %s blocked.\n", lockres->l_name); 4006 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
3902 4007
3903 /* Detect whether a lock has been marked as going away while 4008 /* Detect whether a lock has been marked as going away while
3904 * the downconvert thread was processing other things. A lock can 4009 * the downconvert thread was processing other things. A lock can
@@ -3921,7 +4026,7 @@ unqueue:
3921 } else 4026 } else
3922 ocfs2_schedule_blocked_lock(osb, lockres); 4027 ocfs2_schedule_blocked_lock(osb, lockres);
3923 4028
3924 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 4029 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
3925 ctl.requeue ? "yes" : "no"); 4030 ctl.requeue ? "yes" : "no");
3926 spin_unlock_irqrestore(&lockres->l_lock, flags); 4031 spin_unlock_irqrestore(&lockres->l_lock, flags);
3927 4032
@@ -3943,7 +4048,7 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3943 /* Do not schedule a lock for downconvert when it's on 4048 /* Do not schedule a lock for downconvert when it's on
3944 * the way to destruction - any nodes wanting access 4049 * the way to destruction - any nodes wanting access
3945 * to the resource will get it soon. */ 4050 * to the resource will get it soon. */
3946 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", 4051 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
3947 lockres->l_name, lockres->l_flags); 4052 lockres->l_name, lockres->l_flags);
3948 return; 4053 return;
3949 } 4054 }