aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/lock.c139
1 files changed, 104 insertions, 35 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index c3b9fca17044..c2890efb0259 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3008,8 +3008,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3008 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST); 3008 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
3009 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP); 3009 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
3010 3010
3011 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
3012
3013 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3011 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3014 /* lkb was just created so there won't be an lvb yet */ 3012 /* lkb was just created so there won't be an lvb yet */
3015 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3013 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
@@ -3023,16 +3021,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3023static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3021static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3024 struct dlm_message *ms) 3022 struct dlm_message *ms)
3025{ 3023{
3026 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
3027 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
3028 lkb->lkb_nodeid, ms->m_header.h_nodeid,
3029 lkb->lkb_id, lkb->lkb_remid);
3030 return -EINVAL;
3031 }
3032
3033 if (!is_master_copy(lkb))
3034 return -EINVAL;
3035
3036 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 3024 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3037 return -EBUSY; 3025 return -EBUSY;
3038 3026
@@ -3048,8 +3036,6 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3048static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3036static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3049 struct dlm_message *ms) 3037 struct dlm_message *ms)
3050{ 3038{
3051 if (!is_master_copy(lkb))
3052 return -EINVAL;
3053 if (receive_lvb(ls, lkb, ms)) 3039 if (receive_lvb(ls, lkb, ms))
3054 return -ENOMEM; 3040 return -ENOMEM;
3055 return 0; 3041 return 0;
@@ -3065,6 +3051,50 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3065 lkb->lkb_remid = ms->m_lkid; 3051 lkb->lkb_remid = ms->m_lkid;
3066} 3052}
3067 3053
3054/* This is called after the rsb is locked so that we can safely inspect
3055 fields in the lkb. */
3056
3057static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3058{
3059 int from = ms->m_header.h_nodeid;
3060 int error = 0;
3061
3062 switch (ms->m_type) {
3063 case DLM_MSG_CONVERT:
3064 case DLM_MSG_UNLOCK:
3065 case DLM_MSG_CANCEL:
3066 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3067 error = -EINVAL;
3068 break;
3069
3070 case DLM_MSG_CONVERT_REPLY:
3071 case DLM_MSG_UNLOCK_REPLY:
3072 case DLM_MSG_CANCEL_REPLY:
3073 case DLM_MSG_GRANT:
3074 case DLM_MSG_BAST:
3075 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3076 error = -EINVAL;
3077 break;
3078
3079 case DLM_MSG_REQUEST_REPLY:
3080 if (!is_process_copy(lkb))
3081 error = -EINVAL;
3082 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3083 error = -EINVAL;
3084 break;
3085
3086 default:
3087 error = -EINVAL;
3088 }
3089
3090 if (error)
3091 log_error(lkb->lkb_resource->res_ls,
3092 "ignore invalid message %d from %d %x %x %x %d",
3093 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3094 lkb->lkb_flags, lkb->lkb_nodeid);
3095 return error;
3096}
3097
3068static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) 3098static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3069{ 3099{
3070 struct dlm_lkb *lkb; 3100 struct dlm_lkb *lkb;
@@ -3126,17 +3156,21 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3126 hold_rsb(r); 3156 hold_rsb(r);
3127 lock_rsb(r); 3157 lock_rsb(r);
3128 3158
3159 error = validate_message(lkb, ms);
3160 if (error)
3161 goto out;
3162
3129 receive_flags(lkb, ms); 3163 receive_flags(lkb, ms);
3130 error = receive_convert_args(ls, lkb, ms); 3164 error = receive_convert_args(ls, lkb, ms);
3131 if (error) 3165 if (error)
3132 goto out; 3166 goto out_reply;
3133 reply = !down_conversion(lkb); 3167 reply = !down_conversion(lkb);
3134 3168
3135 error = do_convert(r, lkb); 3169 error = do_convert(r, lkb);
3136 out: 3170 out_reply:
3137 if (reply) 3171 if (reply)
3138 send_convert_reply(r, lkb, error); 3172 send_convert_reply(r, lkb, error);
3139 3173 out:
3140 unlock_rsb(r); 3174 unlock_rsb(r);
3141 put_rsb(r); 3175 put_rsb(r);
3142 dlm_put_lkb(lkb); 3176 dlm_put_lkb(lkb);
@@ -3162,15 +3196,19 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3162 hold_rsb(r); 3196 hold_rsb(r);
3163 lock_rsb(r); 3197 lock_rsb(r);
3164 3198
3199 error = validate_message(lkb, ms);
3200 if (error)
3201 goto out;
3202
3165 receive_flags(lkb, ms); 3203 receive_flags(lkb, ms);
3166 error = receive_unlock_args(ls, lkb, ms); 3204 error = receive_unlock_args(ls, lkb, ms);
3167 if (error) 3205 if (error)
3168 goto out; 3206 goto out_reply;
3169 3207
3170 error = do_unlock(r, lkb); 3208 error = do_unlock(r, lkb);
3171 out: 3209 out_reply:
3172 send_unlock_reply(r, lkb, error); 3210 send_unlock_reply(r, lkb, error);
3173 3211 out:
3174 unlock_rsb(r); 3212 unlock_rsb(r);
3175 put_rsb(r); 3213 put_rsb(r);
3176 dlm_put_lkb(lkb); 3214 dlm_put_lkb(lkb);
@@ -3198,9 +3236,13 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3198 hold_rsb(r); 3236 hold_rsb(r);
3199 lock_rsb(r); 3237 lock_rsb(r);
3200 3238
3239 error = validate_message(lkb, ms);
3240 if (error)
3241 goto out;
3242
3201 error = do_cancel(r, lkb); 3243 error = do_cancel(r, lkb);
3202 send_cancel_reply(r, lkb, error); 3244 send_cancel_reply(r, lkb, error);
3203 3245 out:
3204 unlock_rsb(r); 3246 unlock_rsb(r);
3205 put_rsb(r); 3247 put_rsb(r);
3206 dlm_put_lkb(lkb); 3248 dlm_put_lkb(lkb);
@@ -3219,22 +3261,26 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3219 3261
3220 error = find_lkb(ls, ms->m_remid, &lkb); 3262 error = find_lkb(ls, ms->m_remid, &lkb);
3221 if (error) { 3263 if (error) {
3222 log_error(ls, "receive_grant no lkb"); 3264 log_debug(ls, "receive_grant from %d no lkb %x",
3265 ms->m_header.h_nodeid, ms->m_remid);
3223 return; 3266 return;
3224 } 3267 }
3225 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3226 3268
3227 r = lkb->lkb_resource; 3269 r = lkb->lkb_resource;
3228 3270
3229 hold_rsb(r); 3271 hold_rsb(r);
3230 lock_rsb(r); 3272 lock_rsb(r);
3231 3273
3274 error = validate_message(lkb, ms);
3275 if (error)
3276 goto out;
3277
3232 receive_flags_reply(lkb, ms); 3278 receive_flags_reply(lkb, ms);
3233 if (is_altmode(lkb)) 3279 if (is_altmode(lkb))
3234 munge_altmode(lkb, ms); 3280 munge_altmode(lkb, ms);
3235 grant_lock_pc(r, lkb, ms); 3281 grant_lock_pc(r, lkb, ms);
3236 queue_cast(r, lkb, 0); 3282 queue_cast(r, lkb, 0);
3237 3283 out:
3238 unlock_rsb(r); 3284 unlock_rsb(r);
3239 put_rsb(r); 3285 put_rsb(r);
3240 dlm_put_lkb(lkb); 3286 dlm_put_lkb(lkb);
@@ -3248,18 +3294,22 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3248 3294
3249 error = find_lkb(ls, ms->m_remid, &lkb); 3295 error = find_lkb(ls, ms->m_remid, &lkb);
3250 if (error) { 3296 if (error) {
3251 log_error(ls, "receive_bast no lkb"); 3297 log_debug(ls, "receive_bast from %d no lkb %x",
3298 ms->m_header.h_nodeid, ms->m_remid);
3252 return; 3299 return;
3253 } 3300 }
3254 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3255 3301
3256 r = lkb->lkb_resource; 3302 r = lkb->lkb_resource;
3257 3303
3258 hold_rsb(r); 3304 hold_rsb(r);
3259 lock_rsb(r); 3305 lock_rsb(r);
3260 3306
3261 queue_bast(r, lkb, ms->m_bastmode); 3307 error = validate_message(lkb, ms);
3308 if (error)
3309 goto out;
3262 3310
3311 queue_bast(r, lkb, ms->m_bastmode);
3312 out:
3263 unlock_rsb(r); 3313 unlock_rsb(r);
3264 put_rsb(r); 3314 put_rsb(r);
3265 dlm_put_lkb(lkb); 3315 dlm_put_lkb(lkb);
@@ -3325,15 +3375,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3325 3375
3326 error = find_lkb(ls, ms->m_remid, &lkb); 3376 error = find_lkb(ls, ms->m_remid, &lkb);
3327 if (error) { 3377 if (error) {
3328 log_error(ls, "receive_request_reply no lkb"); 3378 log_debug(ls, "receive_request_reply from %d no lkb %x",
3379 ms->m_header.h_nodeid, ms->m_remid);
3329 return; 3380 return;
3330 } 3381 }
3331 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3332 3382
3333 r = lkb->lkb_resource; 3383 r = lkb->lkb_resource;
3334 hold_rsb(r); 3384 hold_rsb(r);
3335 lock_rsb(r); 3385 lock_rsb(r);
3336 3386
3387 error = validate_message(lkb, ms);
3388 if (error)
3389 goto out;
3390
3337 mstype = lkb->lkb_wait_type; 3391 mstype = lkb->lkb_wait_type;
3338 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 3392 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3339 if (error) 3393 if (error)
@@ -3466,6 +3520,10 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3466 hold_rsb(r); 3520 hold_rsb(r);
3467 lock_rsb(r); 3521 lock_rsb(r);
3468 3522
3523 error = validate_message(lkb, ms);
3524 if (error)
3525 goto out;
3526
3469 /* stub reply can happen with waiters_mutex held */ 3527 /* stub reply can happen with waiters_mutex held */
3470 error = remove_from_waiters_ms(lkb, ms); 3528 error = remove_from_waiters_ms(lkb, ms);
3471 if (error) 3529 if (error)
@@ -3484,10 +3542,10 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3484 3542
3485 error = find_lkb(ls, ms->m_remid, &lkb); 3543 error = find_lkb(ls, ms->m_remid, &lkb);
3486 if (error) { 3544 if (error) {
3487 log_error(ls, "receive_convert_reply no lkb"); 3545 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3546 ms->m_header.h_nodeid, ms->m_remid);
3488 return; 3547 return;
3489 } 3548 }
3490 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3491 3549
3492 _receive_convert_reply(lkb, ms); 3550 _receive_convert_reply(lkb, ms);
3493 dlm_put_lkb(lkb); 3551 dlm_put_lkb(lkb);
@@ -3501,6 +3559,10 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3501 hold_rsb(r); 3559 hold_rsb(r);
3502 lock_rsb(r); 3560 lock_rsb(r);
3503 3561
3562 error = validate_message(lkb, ms);
3563 if (error)
3564 goto out;
3565
3504 /* stub reply can happen with waiters_mutex held */ 3566 /* stub reply can happen with waiters_mutex held */
3505 error = remove_from_waiters_ms(lkb, ms); 3567 error = remove_from_waiters_ms(lkb, ms);
3506 if (error) 3568 if (error)
@@ -3532,10 +3594,10 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3532 3594
3533 error = find_lkb(ls, ms->m_remid, &lkb); 3595 error = find_lkb(ls, ms->m_remid, &lkb);
3534 if (error) { 3596 if (error) {
3535 log_error(ls, "receive_unlock_reply no lkb"); 3597 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3598 ms->m_header.h_nodeid, ms->m_remid);
3536 return; 3599 return;
3537 } 3600 }
3538 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3539 3601
3540 _receive_unlock_reply(lkb, ms); 3602 _receive_unlock_reply(lkb, ms);
3541 dlm_put_lkb(lkb); 3603 dlm_put_lkb(lkb);
@@ -3549,6 +3611,10 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3549 hold_rsb(r); 3611 hold_rsb(r);
3550 lock_rsb(r); 3612 lock_rsb(r);
3551 3613
3614 error = validate_message(lkb, ms);
3615 if (error)
3616 goto out;
3617
3552 /* stub reply can happen with waiters_mutex held */ 3618 /* stub reply can happen with waiters_mutex held */
3553 error = remove_from_waiters_ms(lkb, ms); 3619 error = remove_from_waiters_ms(lkb, ms);
3554 if (error) 3620 if (error)
@@ -3580,10 +3646,10 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3580 3646
3581 error = find_lkb(ls, ms->m_remid, &lkb); 3647 error = find_lkb(ls, ms->m_remid, &lkb);
3582 if (error) { 3648 if (error) {
3583 log_error(ls, "receive_cancel_reply no lkb"); 3649 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3650 ms->m_header.h_nodeid, ms->m_remid);
3584 return; 3651 return;
3585 } 3652 }
3586 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3587 3653
3588 _receive_cancel_reply(lkb, ms); 3654 _receive_cancel_reply(lkb, ms);
3589 dlm_put_lkb(lkb); 3655 dlm_put_lkb(lkb);
@@ -3816,6 +3882,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3816 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 3882 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3817 ls->ls_stub_ms.m_result = -EINPROGRESS; 3883 ls->ls_stub_ms.m_result = -EINPROGRESS;
3818 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3884 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3885 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3819 _receive_convert_reply(lkb, &ls->ls_stub_ms); 3886 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3820 3887
3821 /* Same special case as in receive_rcom_lock_args() */ 3888 /* Same special case as in receive_rcom_lock_args() */
@@ -3917,6 +3984,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
3917 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; 3984 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3918 ls->ls_stub_ms.m_result = stub_unlock_result; 3985 ls->ls_stub_ms.m_result = stub_unlock_result;
3919 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3986 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3987 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3920 _receive_unlock_reply(lkb, &ls->ls_stub_ms); 3988 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3921 dlm_put_lkb(lkb); 3989 dlm_put_lkb(lkb);
3922 break; 3990 break;
@@ -3926,6 +3994,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
3926 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; 3994 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3927 ls->ls_stub_ms.m_result = stub_cancel_result; 3995 ls->ls_stub_ms.m_result = stub_cancel_result;
3928 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3996 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3997 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3929 _receive_cancel_reply(lkb, &ls->ls_stub_ms); 3998 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3930 dlm_put_lkb(lkb); 3999 dlm_put_lkb(lkb);
3931 break; 4000 break;