aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2011-04-04 16:19:59 -0400
committerDavid Teigland <teigland@redhat.com>2011-04-05 11:54:47 -0400
commit2a7ce0edd661b3144c7b916ecf1eba0967b6d4a5 (patch)
treeb667622eda00739c54e2010c5f38bba9c76947c7
parentc6ff669bac5c409f4cb74366248f51b73f7d6feb (diff)
dlm: remove shared message stub for recovery
kmalloc a stub message struct during recovery instead of sharing the struct in the lockspace. This leaves the lockspace stub_ms only for faking downconvert replies, where it is never modified and sharing is not a problem. Also improve the debug messages in the same recovery function. Signed-off-by: David Teigland <teigland@redhat.com>
-rw-r--r--fs/dlm/dlm_internal.h1
-rw-r--r--fs/dlm/lock.c82
2 files changed, 50 insertions, 33 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 6a92478fe1f1..0262451eb9c6 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -209,6 +209,7 @@ struct dlm_args {
209#define DLM_IFL_WATCH_TIMEWARN 0x00400000 209#define DLM_IFL_WATCH_TIMEWARN 0x00400000
210#define DLM_IFL_TIMEOUT_CANCEL 0x00800000 210#define DLM_IFL_TIMEOUT_CANCEL 0x00800000
211#define DLM_IFL_DEADLOCK_CANCEL 0x01000000 211#define DLM_IFL_DEADLOCK_CANCEL 0x01000000
212#define DLM_IFL_STUB_MS 0x02000000 /* magic number for m_flags */
212#define DLM_IFL_USER 0x00000001 213#define DLM_IFL_USER 0x00000001
213#define DLM_IFL_ORPHAN 0x00000002 214#define DLM_IFL_ORPHAN 0x00000002
214 215
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e3c864120371..81227799d47a 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1037,10 +1037,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1037 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1037 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1038 int error; 1038 int error;
1039 1039
1040 if (ms != &ls->ls_stub_ms) 1040 if (ms->m_flags != DLM_IFL_STUB_MS)
1041 mutex_lock(&ls->ls_waiters_mutex); 1041 mutex_lock(&ls->ls_waiters_mutex);
1042 error = _remove_from_waiters(lkb, ms->m_type, ms); 1042 error = _remove_from_waiters(lkb, ms->m_type, ms);
1043 if (ms != &ls->ls_stub_ms) 1043 if (ms->m_flags != DLM_IFL_STUB_MS)
1044 mutex_unlock(&ls->ls_waiters_mutex); 1044 mutex_unlock(&ls->ls_waiters_mutex);
1045 return error; 1045 return error;
1046} 1046}
@@ -1462,14 +1462,8 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1462 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become 1462 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1463 compatible with other granted locks */ 1463 compatible with other granted locks */
1464 1464
1465static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms) 1465static void munge_demoted(struct dlm_lkb *lkb)
1466{ 1466{
1467 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1468 log_print("munge_demoted %x invalid reply type %d",
1469 lkb->lkb_id, ms->m_type);
1470 return;
1471 }
1472
1473 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { 1467 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1474 log_print("munge_demoted %x invalid modes gr %d rq %d", 1468 log_print("munge_demoted %x invalid modes gr %d rq %d",
1475 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); 1469 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
@@ -2966,9 +2960,9 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2966 /* down conversions go without a reply from the master */ 2960 /* down conversions go without a reply from the master */
2967 if (!error && down_conversion(lkb)) { 2961 if (!error && down_conversion(lkb)) {
2968 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); 2962 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2963 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2969 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 2964 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2970 r->res_ls->ls_stub_ms.m_result = 0; 2965 r->res_ls->ls_stub_ms.m_result = 0;
2971 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2972 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 2966 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2973 } 2967 }
2974 2968
@@ -3156,6 +3150,9 @@ static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3156 3150
3157static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3151static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3158{ 3152{
3153 if (ms->m_flags == DLM_IFL_STUB_MS)
3154 return;
3155
3159 lkb->lkb_sbflags = ms->m_sbflags; 3156 lkb->lkb_sbflags = ms->m_sbflags;
3160 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3157 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3161 (ms->m_flags & 0x0000FFFF); 3158 (ms->m_flags & 0x0000FFFF);
@@ -3698,7 +3695,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3698 /* convert was queued on remote master */ 3695 /* convert was queued on remote master */
3699 receive_flags_reply(lkb, ms); 3696 receive_flags_reply(lkb, ms);
3700 if (is_demoted(lkb)) 3697 if (is_demoted(lkb))
3701 munge_demoted(lkb, ms); 3698 munge_demoted(lkb);
3702 del_lkb(r, lkb); 3699 del_lkb(r, lkb);
3703 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3700 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3704 add_timeout(lkb); 3701 add_timeout(lkb);
@@ -3708,7 +3705,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3708 /* convert was granted on remote master */ 3705 /* convert was granted on remote master */
3709 receive_flags_reply(lkb, ms); 3706 receive_flags_reply(lkb, ms);
3710 if (is_demoted(lkb)) 3707 if (is_demoted(lkb))
3711 munge_demoted(lkb, ms); 3708 munge_demoted(lkb);
3712 grant_lock_pc(r, lkb, ms); 3709 grant_lock_pc(r, lkb, ms);
3713 queue_cast(r, lkb, 0); 3710 queue_cast(r, lkb, 0);
3714 break; 3711 break;
@@ -4082,15 +4079,17 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4082 dlm_put_lockspace(ls); 4079 dlm_put_lockspace(ls);
4083} 4080}
4084 4081
4085static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) 4082static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4083 struct dlm_message *ms_stub)
4086{ 4084{
4087 if (middle_conversion(lkb)) { 4085 if (middle_conversion(lkb)) {
4088 hold_lkb(lkb); 4086 hold_lkb(lkb);
4089 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 4087 memset(ms_stub, 0, sizeof(struct dlm_message));
4090 ls->ls_stub_ms.m_result = -EINPROGRESS; 4088 ms_stub->m_flags = DLM_IFL_STUB_MS;
4091 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 4089 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4092 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; 4090 ms_stub->m_result = -EINPROGRESS;
4093 _receive_convert_reply(lkb, &ls->ls_stub_ms); 4091 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4092 _receive_convert_reply(lkb, ms_stub);
4094 4093
4095 /* Same special case as in receive_rcom_lock_args() */ 4094 /* Same special case as in receive_rcom_lock_args() */
4096 lkb->lkb_grmode = DLM_LOCK_IV; 4095 lkb->lkb_grmode = DLM_LOCK_IV;
@@ -4131,13 +4130,27 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4131void dlm_recover_waiters_pre(struct dlm_ls *ls) 4130void dlm_recover_waiters_pre(struct dlm_ls *ls)
4132{ 4131{
4133 struct dlm_lkb *lkb, *safe; 4132 struct dlm_lkb *lkb, *safe;
4133 struct dlm_message *ms_stub;
4134 int wait_type, stub_unlock_result, stub_cancel_result; 4134 int wait_type, stub_unlock_result, stub_cancel_result;
4135 4135
4136 ms_stub = kmalloc(GFP_KERNEL, sizeof(struct dlm_message));
4137 if (!ms_stub) {
4138 log_error(ls, "dlm_recover_waiters_pre no mem");
4139 return;
4140 }
4141
4136 mutex_lock(&ls->ls_waiters_mutex); 4142 mutex_lock(&ls->ls_waiters_mutex);
4137 4143
4138 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 4144 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4139 log_debug(ls, "pre recover waiter lkid %x type %d flags %x", 4145
4140 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags); 4146 /* exclude debug messages about unlocks because there can be so
4147 many and they aren't very interesting */
4148
4149 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4150 log_debug(ls, "recover_waiter %x nodeid %d "
4151 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4152 lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4153 }
4141 4154
4142 /* all outstanding lookups, regardless of destination will be 4155 /* all outstanding lookups, regardless of destination will be
4143 resent after recovery is done */ 4156 resent after recovery is done */
@@ -4183,26 +4196,28 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
4183 break; 4196 break;
4184 4197
4185 case DLM_MSG_CONVERT: 4198 case DLM_MSG_CONVERT:
4186 recover_convert_waiter(ls, lkb); 4199 recover_convert_waiter(ls, lkb, ms_stub);
4187 break; 4200 break;
4188 4201
4189 case DLM_MSG_UNLOCK: 4202 case DLM_MSG_UNLOCK:
4190 hold_lkb(lkb); 4203 hold_lkb(lkb);
4191 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; 4204 memset(ms_stub, 0, sizeof(struct dlm_message));
4192 ls->ls_stub_ms.m_result = stub_unlock_result; 4205 ms_stub->m_flags = DLM_IFL_STUB_MS;
4193 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 4206 ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4194 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; 4207 ms_stub->m_result = stub_unlock_result;
4195 _receive_unlock_reply(lkb, &ls->ls_stub_ms); 4208 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4209 _receive_unlock_reply(lkb, ms_stub);
4196 dlm_put_lkb(lkb); 4210 dlm_put_lkb(lkb);
4197 break; 4211 break;
4198 4212
4199 case DLM_MSG_CANCEL: 4213 case DLM_MSG_CANCEL:
4200 hold_lkb(lkb); 4214 hold_lkb(lkb);
4201 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; 4215 memset(ms_stub, 0, sizeof(struct dlm_message));
4202 ls->ls_stub_ms.m_result = stub_cancel_result; 4216 ms_stub->m_flags = DLM_IFL_STUB_MS;
4203 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 4217 ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4204 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; 4218 ms_stub->m_result = stub_cancel_result;
4205 _receive_cancel_reply(lkb, &ls->ls_stub_ms); 4219 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4220 _receive_cancel_reply(lkb, ms_stub);
4206 dlm_put_lkb(lkb); 4221 dlm_put_lkb(lkb);
4207 break; 4222 break;
4208 4223
@@ -4213,6 +4228,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
4213 schedule(); 4228 schedule();
4214 } 4229 }
4215 mutex_unlock(&ls->ls_waiters_mutex); 4230 mutex_unlock(&ls->ls_waiters_mutex);
4231 kfree(ms_stub);
4216} 4232}
4217 4233
4218static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) 4234static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
@@ -4277,8 +4293,8 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
4277 ou = is_overlap_unlock(lkb); 4293 ou = is_overlap_unlock(lkb);
4278 err = 0; 4294 err = 0;
4279 4295
4280 log_debug(ls, "recover_waiters_post %x type %d flags %x %s", 4296 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4281 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); 4297 lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4282 4298
4283 /* At this point we assume that we won't get a reply to any 4299 /* At this point we assume that we won't get a reply to any
4284 previous op or overlap op on this lock. First, do a big 4300 previous op or overlap op on this lock. First, do a big