aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
authorSteve French <sfrench@us.ibm.com>2007-07-18 20:38:57 -0400
committerSteve French <sfrench@us.ibm.com>2007-07-18 20:38:57 -0400
commit1ff8392c32a2645d2665ca779ecb91bb29361c13 (patch)
tree860b95e9a499ade4060848740fc6ce1fbb4e4e8d /fs/dlm/lock.c
parent70b315b0dd3879cb3ab8aadffb14f10b2d19b9c3 (diff)
parent5bae7ac9feba925fd0099057f6b23d7be80b7b41 (diff)
Merge branch 'master' of /pub/scm/linux/kernel/git/torvalds/linux-2.6
Conflicts: fs/cifs/export.c
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c470
1 files changed, 369 insertions, 101 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index d8d6e729f96b..b455919c1998 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -82,10 +82,13 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb); 82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static int send_remove(struct dlm_rsb *r); 83static int send_remove(struct dlm_rsb *r);
84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 86static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 struct dlm_message *ms); 87 struct dlm_message *ms);
87static int receive_extralen(struct dlm_message *ms); 88static int receive_extralen(struct dlm_message *ms);
88static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 89static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90static void del_timeout(struct dlm_lkb *lkb);
91void dlm_timeout_warn(struct dlm_lkb *lkb);
89 92
90/* 93/*
91 * Lock compatibilty matrix - thanks Steve 94 * Lock compatibilty matrix - thanks Steve
@@ -194,17 +197,17 @@ void dlm_dump_rsb(struct dlm_rsb *r)
194 197
195/* Threads cannot use the lockspace while it's being recovered */ 198/* Threads cannot use the lockspace while it's being recovered */
196 199
197static inline void lock_recovery(struct dlm_ls *ls) 200static inline void dlm_lock_recovery(struct dlm_ls *ls)
198{ 201{
199 down_read(&ls->ls_in_recovery); 202 down_read(&ls->ls_in_recovery);
200} 203}
201 204
202static inline void unlock_recovery(struct dlm_ls *ls) 205void dlm_unlock_recovery(struct dlm_ls *ls)
203{ 206{
204 up_read(&ls->ls_in_recovery); 207 up_read(&ls->ls_in_recovery);
205} 208}
206 209
207static inline int lock_recovery_try(struct dlm_ls *ls) 210int dlm_lock_recovery_try(struct dlm_ls *ls)
208{ 211{
209 return down_read_trylock(&ls->ls_in_recovery); 212 return down_read_trylock(&ls->ls_in_recovery);
210} 213}
@@ -286,8 +289,22 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
286 if (is_master_copy(lkb)) 289 if (is_master_copy(lkb))
287 return; 290 return;
288 291
292 del_timeout(lkb);
293
289 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); 294 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
290 295
296 /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297 timeout caused the cancel then return -ETIMEDOUT */
298 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300 rv = -ETIMEDOUT;
301 }
302
303 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305 rv = -EDEADLK;
306 }
307
291 lkb->lkb_lksb->sb_status = rv; 308 lkb->lkb_lksb->sb_status = rv;
292 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags; 309 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
293 310
@@ -581,6 +598,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
581 kref_init(&lkb->lkb_ref); 598 kref_init(&lkb->lkb_ref);
582 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 599 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 600 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
601 INIT_LIST_HEAD(&lkb->lkb_time_list);
584 602
585 get_random_bytes(&bucket, sizeof(bucket)); 603 get_random_bytes(&bucket, sizeof(bucket));
586 bucket &= (ls->ls_lkbtbl_size - 1); 604 bucket &= (ls->ls_lkbtbl_size - 1);
@@ -985,15 +1003,136 @@ void dlm_scan_rsbs(struct dlm_ls *ls)
985{ 1003{
986 int i; 1004 int i;
987 1005
988 if (dlm_locking_stopped(ls))
989 return;
990
991 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 1006 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
992 shrink_bucket(ls, i); 1007 shrink_bucket(ls, i);
1008 if (dlm_locking_stopped(ls))
1009 break;
993 cond_resched(); 1010 cond_resched();
994 } 1011 }
995} 1012}
996 1013
1014static void add_timeout(struct dlm_lkb *lkb)
1015{
1016 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1017
1018 if (is_master_copy(lkb)) {
1019 lkb->lkb_timestamp = jiffies;
1020 return;
1021 }
1022
1023 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1024 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1025 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1026 goto add_it;
1027 }
1028 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1029 goto add_it;
1030 return;
1031
1032 add_it:
1033 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1034 mutex_lock(&ls->ls_timeout_mutex);
1035 hold_lkb(lkb);
1036 lkb->lkb_timestamp = jiffies;
1037 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1038 mutex_unlock(&ls->ls_timeout_mutex);
1039}
1040
1041static void del_timeout(struct dlm_lkb *lkb)
1042{
1043 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1044
1045 mutex_lock(&ls->ls_timeout_mutex);
1046 if (!list_empty(&lkb->lkb_time_list)) {
1047 list_del_init(&lkb->lkb_time_list);
1048 unhold_lkb(lkb);
1049 }
1050 mutex_unlock(&ls->ls_timeout_mutex);
1051}
1052
1053/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1054 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1055 and then lock rsb because of lock ordering in add_timeout. We may need
1056 to specify some special timeout-related bits in the lkb that are just to
1057 be accessed under the timeout_mutex. */
1058
1059void dlm_scan_timeout(struct dlm_ls *ls)
1060{
1061 struct dlm_rsb *r;
1062 struct dlm_lkb *lkb;
1063 int do_cancel, do_warn;
1064
1065 for (;;) {
1066 if (dlm_locking_stopped(ls))
1067 break;
1068
1069 do_cancel = 0;
1070 do_warn = 0;
1071 mutex_lock(&ls->ls_timeout_mutex);
1072 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1073
1074 if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1075 time_after_eq(jiffies, lkb->lkb_timestamp +
1076 lkb->lkb_timeout_cs * HZ/100))
1077 do_cancel = 1;
1078
1079 if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1080 time_after_eq(jiffies, lkb->lkb_timestamp +
1081 dlm_config.ci_timewarn_cs * HZ/100))
1082 do_warn = 1;
1083
1084 if (!do_cancel && !do_warn)
1085 continue;
1086 hold_lkb(lkb);
1087 break;
1088 }
1089 mutex_unlock(&ls->ls_timeout_mutex);
1090
1091 if (!do_cancel && !do_warn)
1092 break;
1093
1094 r = lkb->lkb_resource;
1095 hold_rsb(r);
1096 lock_rsb(r);
1097
1098 if (do_warn) {
1099 /* clear flag so we only warn once */
1100 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1101 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1102 del_timeout(lkb);
1103 dlm_timeout_warn(lkb);
1104 }
1105
1106 if (do_cancel) {
1107 log_debug(ls, "timeout cancel %x node %d %s",
1108 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1109 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1110 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1111 del_timeout(lkb);
1112 _cancel_lock(r, lkb);
1113 }
1114
1115 unlock_rsb(r);
1116 unhold_rsb(r);
1117 dlm_put_lkb(lkb);
1118 }
1119}
1120
1121/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1122 dlm_recoverd before checking/setting ls_recover_begin. */
1123
1124void dlm_adjust_timeouts(struct dlm_ls *ls)
1125{
1126 struct dlm_lkb *lkb;
1127 long adj = jiffies - ls->ls_recover_begin;
1128
1129 ls->ls_recover_begin = 0;
1130 mutex_lock(&ls->ls_timeout_mutex);
1131 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1132 lkb->lkb_timestamp += adj;
1133 mutex_unlock(&ls->ls_timeout_mutex);
1134}
1135
997/* lkb is master or local copy */ 1136/* lkb is master or local copy */
998 1137
999static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1138static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -1275,10 +1414,8 @@ static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1275 * queue for one resource. The granted mode of each lock blocks the requested 1414 * queue for one resource. The granted mode of each lock blocks the requested
1276 * mode of the other lock." 1415 * mode of the other lock."
1277 * 1416 *
1278 * Part 2: if the granted mode of lkb is preventing the first lkb in the 1417 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1279 * convert queue from being granted, then demote lkb (set grmode to NL). 1418 * convert queue from being granted, then deadlk/demote lkb.
1280 * This second form requires that we check for conv-deadlk even when
1281 * now == 0 in _can_be_granted().
1282 * 1419 *
1283 * Example: 1420 * Example:
1284 * Granted Queue: empty 1421 * Granted Queue: empty
@@ -1287,41 +1424,52 @@ static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1287 * 1424 *
1288 * The first lock can't be granted because of the granted mode of the second 1425 * The first lock can't be granted because of the granted mode of the second
1289 * lock and the second lock can't be granted because it's not first in the 1426 * lock and the second lock can't be granted because it's not first in the
1290 * list. We demote the granted mode of the second lock (the lkb passed to this 1427 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1291 * function). 1428 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1429 * flag set and return DEMOTED in the lksb flags.
1430 *
1431 * Originally, this function detected conv-deadlk in a more limited scope:
1432 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1433 * - if lkb1 was the first entry in the queue (not just earlier), and was
1434 * blocked by the granted mode of lkb2, and there was nothing on the
1435 * granted queue preventing lkb1 from being granted immediately, i.e.
1436 * lkb2 was the only thing preventing lkb1 from being granted.
1437 *
1438 * That second condition meant we'd only say there was conv-deadlk if
1439 * resolving it (by demotion) would lead to the first lock on the convert
1440 * queue being granted right away. It allowed conversion deadlocks to exist
1441 * between locks on the convert queue while they couldn't be granted anyway.
1292 * 1442 *
1293 * After the resolution, the "grant pending" function needs to go back and try 1443 * Now, we detect and take action on conversion deadlocks immediately when
1294 * to grant locks on the convert queue again since the first lock can now be 1444 * they're created, even if they may not be immediately consequential. If
1295 * granted. 1445 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1446 * mode that would prevent lkb1's conversion from being granted, we do a
1447 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1448 * I think this means that the lkb_is_ahead condition below should always
1449 * be zero, i.e. there will never be conv-deadlk between two locks that are
1450 * both already on the convert queue.
1296 */ 1451 */
1297 1452
1298static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb) 1453static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1299{ 1454{
1300 struct dlm_lkb *this, *first = NULL, *self = NULL; 1455 struct dlm_lkb *lkb1;
1456 int lkb_is_ahead = 0;
1301 1457
1302 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) { 1458 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1303 if (!first) 1459 if (lkb1 == lkb2) {
1304 first = this; 1460 lkb_is_ahead = 1;
1305 if (this == lkb) {
1306 self = lkb;
1307 continue; 1461 continue;
1308 } 1462 }
1309 1463
1310 if (!modes_compat(this, lkb) && !modes_compat(lkb, this)) 1464 if (!lkb_is_ahead) {
1311 return 1; 1465 if (!modes_compat(lkb2, lkb1))
1312 } 1466 return 1;
1313 1467 } else {
1314 /* if lkb is on the convert queue and is preventing the first 1468 if (!modes_compat(lkb2, lkb1) &&
1315 from being granted, then there's deadlock and we demote lkb. 1469 !modes_compat(lkb1, lkb2))
1316 multiple converting locks may need to do this before the first 1470 return 1;
1317 converting lock can be granted. */ 1471 }
1318
1319 if (self && self != first) {
1320 if (!modes_compat(lkb, first) &&
1321 !queue_conflict(&rsb->res_grantqueue, first))
1322 return 1;
1323 } 1472 }
1324
1325 return 0; 1473 return 0;
1326} 1474}
1327 1475
@@ -1450,42 +1598,57 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1450 if (!now && !conv && list_empty(&r->res_convertqueue) && 1598 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1451 first_in_list(lkb, &r->res_waitqueue)) 1599 first_in_list(lkb, &r->res_waitqueue))
1452 return 1; 1600 return 1;
1453
1454 out: 1601 out:
1455 /*
1456 * The following, enabled by CONVDEADLK, departs from VMS.
1457 */
1458
1459 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1460 conversion_deadlock_detect(r, lkb)) {
1461 lkb->lkb_grmode = DLM_LOCK_NL;
1462 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1463 }
1464
1465 return 0; 1602 return 0;
1466} 1603}
1467 1604
1468/* 1605static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1469 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a 1606 int *err)
1470 * simple way to provide a big optimization to applications that can use them.
1471 */
1472
1473static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1474{ 1607{
1475 uint32_t flags = lkb->lkb_exflags;
1476 int rv; 1608 int rv;
1477 int8_t alt = 0, rqmode = lkb->lkb_rqmode; 1609 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1610 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1611
1612 if (err)
1613 *err = 0;
1478 1614
1479 rv = _can_be_granted(r, lkb, now); 1615 rv = _can_be_granted(r, lkb, now);
1480 if (rv) 1616 if (rv)
1481 goto out; 1617 goto out;
1482 1618
1483 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED) 1619 /*
1620 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1621 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1622 * cancels one of the locks.
1623 */
1624
1625 if (is_convert && can_be_queued(lkb) &&
1626 conversion_deadlock_detect(r, lkb)) {
1627 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1628 lkb->lkb_grmode = DLM_LOCK_NL;
1629 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1630 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1631 if (err)
1632 *err = -EDEADLK;
1633 else {
1634 log_print("can_be_granted deadlock %x now %d",
1635 lkb->lkb_id, now);
1636 dlm_dump_rsb(r);
1637 }
1638 }
1484 goto out; 1639 goto out;
1640 }
1485 1641
1486 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR) 1642 /*
1643 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1644 * to grant a request in a mode other than the normal rqmode. It's a
1645 * simple way to provide a big optimization to applications that can
1646 * use them.
1647 */
1648
1649 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1487 alt = DLM_LOCK_PR; 1650 alt = DLM_LOCK_PR;
1488 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW) 1651 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1489 alt = DLM_LOCK_CW; 1652 alt = DLM_LOCK_CW;
1490 1653
1491 if (alt) { 1654 if (alt) {
@@ -1500,10 +1663,20 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1500 return rv; 1663 return rv;
1501} 1664}
1502 1665
1666/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1667 for locks pending on the convert list. Once verified (watch for these
1668 log_prints), we should be able to just call _can_be_granted() and not
1669 bother with the demote/deadlk cases here (and there's no easy way to deal
1670 with a deadlk here, we'd have to generate something like grant_lock with
1671 the deadlk error.) */
1672
1673/* returns the highest requested mode of all blocked conversions */
1674
1503static int grant_pending_convert(struct dlm_rsb *r, int high) 1675static int grant_pending_convert(struct dlm_rsb *r, int high)
1504{ 1676{
1505 struct dlm_lkb *lkb, *s; 1677 struct dlm_lkb *lkb, *s;
1506 int hi, demoted, quit, grant_restart, demote_restart; 1678 int hi, demoted, quit, grant_restart, demote_restart;
1679 int deadlk;
1507 1680
1508 quit = 0; 1681 quit = 0;
1509 restart: 1682 restart:
@@ -1513,14 +1686,29 @@ static int grant_pending_convert(struct dlm_rsb *r, int high)
1513 1686
1514 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 1687 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1515 demoted = is_demoted(lkb); 1688 demoted = is_demoted(lkb);
1516 if (can_be_granted(r, lkb, 0)) { 1689 deadlk = 0;
1690
1691 if (can_be_granted(r, lkb, 0, &deadlk)) {
1517 grant_lock_pending(r, lkb); 1692 grant_lock_pending(r, lkb);
1518 grant_restart = 1; 1693 grant_restart = 1;
1519 } else { 1694 continue;
1520 hi = max_t(int, lkb->lkb_rqmode, hi);
1521 if (!demoted && is_demoted(lkb))
1522 demote_restart = 1;
1523 } 1695 }
1696
1697 if (!demoted && is_demoted(lkb)) {
1698 log_print("WARN: pending demoted %x node %d %s",
1699 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1700 demote_restart = 1;
1701 continue;
1702 }
1703
1704 if (deadlk) {
1705 log_print("WARN: pending deadlock %x node %d %s",
1706 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1707 dlm_dump_rsb(r);
1708 continue;
1709 }
1710
1711 hi = max_t(int, lkb->lkb_rqmode, hi);
1524 } 1712 }
1525 1713
1526 if (grant_restart) 1714 if (grant_restart)
@@ -1538,7 +1726,7 @@ static int grant_pending_wait(struct dlm_rsb *r, int high)
1538 struct dlm_lkb *lkb, *s; 1726 struct dlm_lkb *lkb, *s;
1539 1727
1540 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 1728 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1541 if (can_be_granted(r, lkb, 0)) 1729 if (can_be_granted(r, lkb, 0, NULL))
1542 grant_lock_pending(r, lkb); 1730 grant_lock_pending(r, lkb);
1543 else 1731 else
1544 high = max_t(int, lkb->lkb_rqmode, high); 1732 high = max_t(int, lkb->lkb_rqmode, high);
@@ -1733,7 +1921,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
1733} 1921}
1734 1922
1735static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 1923static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1736 int namelen, uint32_t parent_lkid, void *ast, 1924 int namelen, unsigned long timeout_cs, void *ast,
1737 void *astarg, void *bast, struct dlm_args *args) 1925 void *astarg, void *bast, struct dlm_args *args)
1738{ 1926{
1739 int rv = -EINVAL; 1927 int rv = -EINVAL;
@@ -1776,10 +1964,6 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1776 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) 1964 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1777 goto out; 1965 goto out;
1778 1966
1779 /* parent/child locks not yet supported */
1780 if (parent_lkid)
1781 goto out;
1782
1783 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) 1967 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1784 goto out; 1968 goto out;
1785 1969
@@ -1791,6 +1975,7 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1791 args->astaddr = ast; 1975 args->astaddr = ast;
1792 args->astparam = (long) astarg; 1976 args->astparam = (long) astarg;
1793 args->bastaddr = bast; 1977 args->bastaddr = bast;
1978 args->timeout = timeout_cs;
1794 args->mode = mode; 1979 args->mode = mode;
1795 args->lksb = lksb; 1980 args->lksb = lksb;
1796 rv = 0; 1981 rv = 0;
@@ -1845,6 +2030,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1845 lkb->lkb_lksb = args->lksb; 2030 lkb->lkb_lksb = args->lksb;
1846 lkb->lkb_lvbptr = args->lksb->sb_lvbptr; 2031 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1847 lkb->lkb_ownpid = (int) current->pid; 2032 lkb->lkb_ownpid = (int) current->pid;
2033 lkb->lkb_timeout_cs = args->timeout;
1848 rv = 0; 2034 rv = 0;
1849 out: 2035 out:
1850 return rv; 2036 return rv;
@@ -1903,6 +2089,9 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1903 if (is_overlap(lkb)) 2089 if (is_overlap(lkb))
1904 goto out; 2090 goto out;
1905 2091
2092 /* don't let scand try to do a cancel */
2093 del_timeout(lkb);
2094
1906 if (lkb->lkb_flags & DLM_IFL_RESEND) { 2095 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1907 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 2096 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1908 rv = -EBUSY; 2097 rv = -EBUSY;
@@ -1934,6 +2123,9 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1934 if (is_overlap_unlock(lkb)) 2123 if (is_overlap_unlock(lkb))
1935 goto out; 2124 goto out;
1936 2125
2126 /* don't let scand try to do a cancel */
2127 del_timeout(lkb);
2128
1937 if (lkb->lkb_flags & DLM_IFL_RESEND) { 2129 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1938 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 2130 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1939 rv = -EBUSY; 2131 rv = -EBUSY;
@@ -1984,7 +2176,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1984{ 2176{
1985 int error = 0; 2177 int error = 0;
1986 2178
1987 if (can_be_granted(r, lkb, 1)) { 2179 if (can_be_granted(r, lkb, 1, NULL)) {
1988 grant_lock(r, lkb); 2180 grant_lock(r, lkb);
1989 queue_cast(r, lkb, 0); 2181 queue_cast(r, lkb, 0);
1990 goto out; 2182 goto out;
@@ -1994,6 +2186,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1994 error = -EINPROGRESS; 2186 error = -EINPROGRESS;
1995 add_lkb(r, lkb, DLM_LKSTS_WAITING); 2187 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1996 send_blocking_asts(r, lkb); 2188 send_blocking_asts(r, lkb);
2189 add_timeout(lkb);
1997 goto out; 2190 goto out;
1998 } 2191 }
1999 2192
@@ -2009,16 +2202,32 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2009static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 2202static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2010{ 2203{
2011 int error = 0; 2204 int error = 0;
2205 int deadlk = 0;
2012 2206
2013 /* changing an existing lock may allow others to be granted */ 2207 /* changing an existing lock may allow others to be granted */
2014 2208
2015 if (can_be_granted(r, lkb, 1)) { 2209 if (can_be_granted(r, lkb, 1, &deadlk)) {
2016 grant_lock(r, lkb); 2210 grant_lock(r, lkb);
2017 queue_cast(r, lkb, 0); 2211 queue_cast(r, lkb, 0);
2018 grant_pending_locks(r); 2212 grant_pending_locks(r);
2019 goto out; 2213 goto out;
2020 } 2214 }
2021 2215
2216 /* can_be_granted() detected that this lock would block in a conversion
2217 deadlock, so we leave it on the granted queue and return EDEADLK in
2218 the ast for the convert. */
2219
2220 if (deadlk) {
2221 /* it's left on the granted queue */
2222 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2223 lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2224 lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2225 revert_lock(r, lkb);
2226 queue_cast(r, lkb, -EDEADLK);
2227 error = -EDEADLK;
2228 goto out;
2229 }
2230
2022 /* is_demoted() means the can_be_granted() above set the grmode 2231 /* is_demoted() means the can_be_granted() above set the grmode
2023 to NL, and left us on the granted queue. This auto-demotion 2232 to NL, and left us on the granted queue. This auto-demotion
2024 (due to CONVDEADLK) might mean other locks, and/or this lock, are 2233 (due to CONVDEADLK) might mean other locks, and/or this lock, are
@@ -2041,6 +2250,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2041 del_lkb(r, lkb); 2250 del_lkb(r, lkb);
2042 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 2251 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2043 send_blocking_asts(r, lkb); 2252 send_blocking_asts(r, lkb);
2253 add_timeout(lkb);
2044 goto out; 2254 goto out;
2045 } 2255 }
2046 2256
@@ -2274,7 +2484,7 @@ int dlm_lock(dlm_lockspace_t *lockspace,
2274 if (!ls) 2484 if (!ls)
2275 return -EINVAL; 2485 return -EINVAL;
2276 2486
2277 lock_recovery(ls); 2487 dlm_lock_recovery(ls);
2278 2488
2279 if (convert) 2489 if (convert)
2280 error = find_lkb(ls, lksb->sb_lkid, &lkb); 2490 error = find_lkb(ls, lksb->sb_lkid, &lkb);
@@ -2284,7 +2494,7 @@ int dlm_lock(dlm_lockspace_t *lockspace,
2284 if (error) 2494 if (error)
2285 goto out; 2495 goto out;
2286 2496
2287 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast, 2497 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2288 astarg, bast, &args); 2498 astarg, bast, &args);
2289 if (error) 2499 if (error)
2290 goto out_put; 2500 goto out_put;
@@ -2299,10 +2509,10 @@ int dlm_lock(dlm_lockspace_t *lockspace,
2299 out_put: 2509 out_put:
2300 if (convert || error) 2510 if (convert || error)
2301 __put_lkb(ls, lkb); 2511 __put_lkb(ls, lkb);
2302 if (error == -EAGAIN) 2512 if (error == -EAGAIN || error == -EDEADLK)
2303 error = 0; 2513 error = 0;
2304 out: 2514 out:
2305 unlock_recovery(ls); 2515 dlm_unlock_recovery(ls);
2306 dlm_put_lockspace(ls); 2516 dlm_put_lockspace(ls);
2307 return error; 2517 return error;
2308} 2518}
@@ -2322,7 +2532,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
2322 if (!ls) 2532 if (!ls)
2323 return -EINVAL; 2533 return -EINVAL;
2324 2534
2325 lock_recovery(ls); 2535 dlm_lock_recovery(ls);
2326 2536
2327 error = find_lkb(ls, lkid, &lkb); 2537 error = find_lkb(ls, lkid, &lkb);
2328 if (error) 2538 if (error)
@@ -2344,7 +2554,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
2344 out_put: 2554 out_put:
2345 dlm_put_lkb(lkb); 2555 dlm_put_lkb(lkb);
2346 out: 2556 out:
2347 unlock_recovery(ls); 2557 dlm_unlock_recovery(ls);
2348 dlm_put_lockspace(ls); 2558 dlm_put_lockspace(ls);
2349 return error; 2559 return error;
2350} 2560}
@@ -2384,7 +2594,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
2384 pass into lowcomms_commit and a message buffer (mb) that we 2594 pass into lowcomms_commit and a message buffer (mb) that we
2385 write our data into */ 2595 write our data into */
2386 2596
2387 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb); 2597 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2388 if (!mh) 2598 if (!mh)
2389 return -ENOBUFS; 2599 return -ENOBUFS;
2390 2600
@@ -3111,9 +3321,10 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3111 lkb->lkb_remid = ms->m_lkid; 3321 lkb->lkb_remid = ms->m_lkid;
3112 if (is_altmode(lkb)) 3322 if (is_altmode(lkb))
3113 munge_altmode(lkb, ms); 3323 munge_altmode(lkb, ms);
3114 if (result) 3324 if (result) {
3115 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3325 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3116 else { 3326 add_timeout(lkb);
3327 } else {
3117 grant_lock_pc(r, lkb, ms); 3328 grant_lock_pc(r, lkb, ms);
3118 queue_cast(r, lkb, 0); 3329 queue_cast(r, lkb, 0);
3119 } 3330 }
@@ -3172,6 +3383,12 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3172 queue_cast(r, lkb, -EAGAIN); 3383 queue_cast(r, lkb, -EAGAIN);
3173 break; 3384 break;
3174 3385
3386 case -EDEADLK:
3387 receive_flags_reply(lkb, ms);
3388 revert_lock_pc(r, lkb);
3389 queue_cast(r, lkb, -EDEADLK);
3390 break;
3391
3175 case -EINPROGRESS: 3392 case -EINPROGRESS:
3176 /* convert was queued on remote master */ 3393 /* convert was queued on remote master */
3177 receive_flags_reply(lkb, ms); 3394 receive_flags_reply(lkb, ms);
@@ -3179,6 +3396,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3179 munge_demoted(lkb, ms); 3396 munge_demoted(lkb, ms);
3180 del_lkb(r, lkb); 3397 del_lkb(r, lkb);
3181 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3398 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3399 add_timeout(lkb);
3182 break; 3400 break;
3183 3401
3184 case 0: 3402 case 0:
@@ -3298,8 +3516,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3298 case -DLM_ECANCEL: 3516 case -DLM_ECANCEL:
3299 receive_flags_reply(lkb, ms); 3517 receive_flags_reply(lkb, ms);
3300 revert_lock_pc(r, lkb); 3518 revert_lock_pc(r, lkb);
3301 if (ms->m_result) 3519 queue_cast(r, lkb, -DLM_ECANCEL);
3302 queue_cast(r, lkb, -DLM_ECANCEL);
3303 break; 3520 break;
3304 case 0: 3521 case 0:
3305 break; 3522 break;
@@ -3424,7 +3641,7 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3424 } 3641 }
3425 } 3642 }
3426 3643
3427 if (lock_recovery_try(ls)) 3644 if (dlm_lock_recovery_try(ls))
3428 break; 3645 break;
3429 schedule(); 3646 schedule();
3430 } 3647 }
@@ -3503,7 +3720,7 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3503 log_error(ls, "unknown message type %d", ms->m_type); 3720 log_error(ls, "unknown message type %d", ms->m_type);
3504 } 3721 }
3505 3722
3506 unlock_recovery(ls); 3723 dlm_unlock_recovery(ls);
3507 out: 3724 out:
3508 dlm_put_lockspace(ls); 3725 dlm_put_lockspace(ls);
3509 dlm_astd_wake(); 3726 dlm_astd_wake();
@@ -4034,13 +4251,13 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4034 4251
4035int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 4252int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4036 int mode, uint32_t flags, void *name, unsigned int namelen, 4253 int mode, uint32_t flags, void *name, unsigned int namelen,
4037 uint32_t parent_lkid) 4254 unsigned long timeout_cs)
4038{ 4255{
4039 struct dlm_lkb *lkb; 4256 struct dlm_lkb *lkb;
4040 struct dlm_args args; 4257 struct dlm_args args;
4041 int error; 4258 int error;
4042 4259
4043 lock_recovery(ls); 4260 dlm_lock_recovery(ls);
4044 4261
4045 error = create_lkb(ls, &lkb); 4262 error = create_lkb(ls, &lkb);
4046 if (error) { 4263 if (error) {
@@ -4062,7 +4279,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4062 When DLM_IFL_USER is set, the dlm knows that this is a userspace 4279 When DLM_IFL_USER is set, the dlm knows that this is a userspace
4063 lock and that lkb_astparam is the dlm_user_args structure. */ 4280 lock and that lkb_astparam is the dlm_user_args structure. */
4064 4281
4065 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid, 4282 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4066 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args); 4283 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4067 lkb->lkb_flags |= DLM_IFL_USER; 4284 lkb->lkb_flags |= DLM_IFL_USER;
4068 ua->old_mode = DLM_LOCK_IV; 4285 ua->old_mode = DLM_LOCK_IV;
@@ -4094,19 +4311,20 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4094 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 4311 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4095 spin_unlock(&ua->proc->locks_spin); 4312 spin_unlock(&ua->proc->locks_spin);
4096 out: 4313 out:
4097 unlock_recovery(ls); 4314 dlm_unlock_recovery(ls);
4098 return error; 4315 return error;
4099} 4316}
4100 4317
4101int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 4318int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4102 int mode, uint32_t flags, uint32_t lkid, char *lvb_in) 4319 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4320 unsigned long timeout_cs)
4103{ 4321{
4104 struct dlm_lkb *lkb; 4322 struct dlm_lkb *lkb;
4105 struct dlm_args args; 4323 struct dlm_args args;
4106 struct dlm_user_args *ua; 4324 struct dlm_user_args *ua;
4107 int error; 4325 int error;
4108 4326
4109 lock_recovery(ls); 4327 dlm_lock_recovery(ls);
4110 4328
4111 error = find_lkb(ls, lkid, &lkb); 4329 error = find_lkb(ls, lkid, &lkb);
4112 if (error) 4330 if (error)
@@ -4127,6 +4345,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4127 if (lvb_in && ua->lksb.sb_lvbptr) 4345 if (lvb_in && ua->lksb.sb_lvbptr)
4128 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 4346 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4129 4347
4348 ua->xid = ua_tmp->xid;
4130 ua->castparam = ua_tmp->castparam; 4349 ua->castparam = ua_tmp->castparam;
4131 ua->castaddr = ua_tmp->castaddr; 4350 ua->castaddr = ua_tmp->castaddr;
4132 ua->bastparam = ua_tmp->bastparam; 4351 ua->bastparam = ua_tmp->bastparam;
@@ -4134,19 +4353,19 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4134 ua->user_lksb = ua_tmp->user_lksb; 4353 ua->user_lksb = ua_tmp->user_lksb;
4135 ua->old_mode = lkb->lkb_grmode; 4354 ua->old_mode = lkb->lkb_grmode;
4136 4355
4137 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST, 4356 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4138 ua, DLM_FAKE_USER_AST, &args); 4357 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4139 if (error) 4358 if (error)
4140 goto out_put; 4359 goto out_put;
4141 4360
4142 error = convert_lock(ls, lkb, &args); 4361 error = convert_lock(ls, lkb, &args);
4143 4362
4144 if (error == -EINPROGRESS || error == -EAGAIN) 4363 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4145 error = 0; 4364 error = 0;
4146 out_put: 4365 out_put:
4147 dlm_put_lkb(lkb); 4366 dlm_put_lkb(lkb);
4148 out: 4367 out:
4149 unlock_recovery(ls); 4368 dlm_unlock_recovery(ls);
4150 kfree(ua_tmp); 4369 kfree(ua_tmp);
4151 return error; 4370 return error;
4152} 4371}
@@ -4159,7 +4378,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4159 struct dlm_user_args *ua; 4378 struct dlm_user_args *ua;
4160 int error; 4379 int error;
4161 4380
4162 lock_recovery(ls); 4381 dlm_lock_recovery(ls);
4163 4382
4164 error = find_lkb(ls, lkid, &lkb); 4383 error = find_lkb(ls, lkid, &lkb);
4165 if (error) 4384 if (error)
@@ -4194,7 +4413,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4194 out_put: 4413 out_put:
4195 dlm_put_lkb(lkb); 4414 dlm_put_lkb(lkb);
4196 out: 4415 out:
4197 unlock_recovery(ls); 4416 dlm_unlock_recovery(ls);
4198 kfree(ua_tmp); 4417 kfree(ua_tmp);
4199 return error; 4418 return error;
4200} 4419}
@@ -4207,7 +4426,7 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4207 struct dlm_user_args *ua; 4426 struct dlm_user_args *ua;
4208 int error; 4427 int error;
4209 4428
4210 lock_recovery(ls); 4429 dlm_lock_recovery(ls);
4211 4430
4212 error = find_lkb(ls, lkid, &lkb); 4431 error = find_lkb(ls, lkid, &lkb);
4213 if (error) 4432 if (error)
@@ -4231,11 +4450,59 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4231 out_put: 4450 out_put:
4232 dlm_put_lkb(lkb); 4451 dlm_put_lkb(lkb);
4233 out: 4452 out:
4234 unlock_recovery(ls); 4453 dlm_unlock_recovery(ls);
4235 kfree(ua_tmp); 4454 kfree(ua_tmp);
4236 return error; 4455 return error;
4237} 4456}
4238 4457
4458int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4459{
4460 struct dlm_lkb *lkb;
4461 struct dlm_args args;
4462 struct dlm_user_args *ua;
4463 struct dlm_rsb *r;
4464 int error;
4465
4466 dlm_lock_recovery(ls);
4467
4468 error = find_lkb(ls, lkid, &lkb);
4469 if (error)
4470 goto out;
4471
4472 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4473
4474 error = set_unlock_args(flags, ua, &args);
4475 if (error)
4476 goto out_put;
4477
4478 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4479
4480 r = lkb->lkb_resource;
4481 hold_rsb(r);
4482 lock_rsb(r);
4483
4484 error = validate_unlock_args(lkb, &args);
4485 if (error)
4486 goto out_r;
4487 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4488
4489 error = _cancel_lock(r, lkb);
4490 out_r:
4491 unlock_rsb(r);
4492 put_rsb(r);
4493
4494 if (error == -DLM_ECANCEL)
4495 error = 0;
4496 /* from validate_unlock_args() */
4497 if (error == -EBUSY)
4498 error = 0;
4499 out_put:
4500 dlm_put_lkb(lkb);
4501 out:
4502 dlm_unlock_recovery(ls);
4503 return error;
4504}
4505
4239/* lkb's that are removed from the waiters list by revert are just left on the 4506/* lkb's that are removed from the waiters list by revert are just left on the
4240 orphans list with the granted orphan locks, to be freed by purge */ 4507 orphans list with the granted orphan locks, to be freed by purge */
4241 4508
@@ -4314,12 +4581,13 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4314{ 4581{
4315 struct dlm_lkb *lkb, *safe; 4582 struct dlm_lkb *lkb, *safe;
4316 4583
4317 lock_recovery(ls); 4584 dlm_lock_recovery(ls);
4318 4585
4319 while (1) { 4586 while (1) {
4320 lkb = del_proc_lock(ls, proc); 4587 lkb = del_proc_lock(ls, proc);
4321 if (!lkb) 4588 if (!lkb)
4322 break; 4589 break;
4590 del_timeout(lkb);
4323 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) 4591 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4324 orphan_proc_lock(ls, lkb); 4592 orphan_proc_lock(ls, lkb);
4325 else 4593 else
@@ -4347,7 +4615,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4347 } 4615 }
4348 4616
4349 mutex_unlock(&ls->ls_clear_proc_locks); 4617 mutex_unlock(&ls->ls_clear_proc_locks);
4350 unlock_recovery(ls); 4618 dlm_unlock_recovery(ls);
4351} 4619}
4352 4620
4353static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) 4621static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
@@ -4429,12 +4697,12 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4429 if (nodeid != dlm_our_nodeid()) { 4697 if (nodeid != dlm_our_nodeid()) {
4430 error = send_purge(ls, nodeid, pid); 4698 error = send_purge(ls, nodeid, pid);
4431 } else { 4699 } else {
4432 lock_recovery(ls); 4700 dlm_lock_recovery(ls);
4433 if (pid == current->pid) 4701 if (pid == current->pid)
4434 purge_proc_locks(ls, proc); 4702 purge_proc_locks(ls, proc);
4435 else 4703 else
4436 do_purge(ls, nodeid, pid); 4704 do_purge(ls, nodeid, pid);
4437 unlock_recovery(ls); 4705 dlm_unlock_recovery(ls);
4438 } 4706 }
4439 return error; 4707 return error;
4440} 4708}